diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index b32e43d..25d298e 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -1281,7 +1281,16 @@ async fn calculate_directory_size(dir: &Path) -> eyre::Result { Ok(total_size) } -async fn local_download_available(game_dir: &str, game_id: &str) -> bool { +async fn local_download_available( + game_dir: &str, + game_id: &str, + downloading_games: &Arc>>, +) -> bool { + if downloading_games.read().await.contains(game_id) { + log::debug!("Not serving game {game_id} locally because it is still downloading"); + return false; + } + let game_path = PathBuf::from(game_dir).join(game_id); let eti_path = game_path.join(format!("{game_id}.eti")); @@ -1299,6 +1308,7 @@ struct Ctx { local_game_db: Arc>>, peer_game_db: Arc>, local_peer_addr: Arc>>, + downloading_games: Arc>>, } #[derive(Clone)] @@ -1306,6 +1316,7 @@ struct PeerCtx { game_dir: Arc>>, local_game_db: Arc>>, local_peer_addr: Arc>>, + downloading_games: Arc>>, } impl std::fmt::Debug for PeerCtx { @@ -1314,6 +1325,7 @@ impl std::fmt::Debug for PeerCtx { .field("game_dir", &"...") .field("local_game_db", &"...") .field("local_peer_addr", &"...") + .field("downloading_games", &"...") .finish() } } @@ -1334,12 +1346,14 @@ pub async fn run_peer( local_game_db: Arc::new(RwLock::new(None)), peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())), local_peer_addr: Arc::new(RwLock::new(None)), + downloading_games: Arc::new(RwLock::new(HashSet::new())), }; let peer_ctx = PeerCtx { game_dir: ctx.game_dir.clone(), local_game_db: ctx.local_game_db.clone(), local_peer_addr: ctx.local_peer_addr.clone(), + downloading_games: ctx.downloading_games.clone(), }; // Start server component @@ -1519,7 +1533,7 @@ async fn try_serve_local_game( return false; }; - if !local_download_available(&game_dir, id).await { + if !local_download_available(&game_dir, id, &ctx.downloading_games).await { return false; } @@ -1646,7 +1660,7 @@ async fn handle_download_game_files_command( } if peer_whitelist.is_empty() { - if local_download_available(&games_folder, &id).await { + if local_download_available(&games_folder, &id, &ctx.downloading_games).await { log::info!("Using locally downloaded files for game {id}; skipping peer transfer"); if let Err(e) = tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: id.clone() }) { @@ -1663,9 +1677,18 @@ async fn handle_download_game_files_command( return; } + { + let mut in_progress = ctx.downloading_games.write().await; + if !in_progress.insert(id.clone()) { + log::warn!("Download for {id} already in progress; ignoring new request"); + return; + } + } + + let downloading_games = ctx.downloading_games.clone(); let tx_notify_ui = tx_notify_ui.clone(); tokio::spawn(async move { - match download_game_files( + let result = download_game_files( &id, resolved_descriptions, games_folder, @@ -1673,16 +1696,19 @@ async fn handle_download_game_files_command( file_peer_map, tx_notify_ui.clone(), ) - .await + .await; + { - Ok(()) => {} - Err(e) => { - log::error!("Download failed for {id}: {e}"); - if let Err(send_err) = - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) - { - log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); - } + let mut guard = downloading_games.write().await; + guard.remove(&id); + } + + if let Err(e) = result { + log::error!("Download failed for {id}: {e}"); + if let Err(send_err) = + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) + { + log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); } } }); @@ -1780,10 +1806,22 @@ async fn handle_peer_stream( Request::ListGames => { // Return list of games from this peer log::info!("Received ListGames request from peer"); - let games = if let Some(ref db) = *ctx.local_game_db.read().await { - db.all_games().into_iter().cloned().collect() + let snapshot = { + let db_guard = ctx.local_game_db.read().await; + if let Some(ref db) = *db_guard { + db.all_games().into_iter().cloned().collect::>() + } else { + Vec::new() + } + }; + let games = if snapshot.is_empty() { + snapshot } else { - Vec::new() + let downloading = ctx.downloading_games.read().await; + snapshot + .into_iter() + .filter(|game| !downloading.contains(&game.id)) + .collect() }; if let Err(e) = tx.send(Response::ListGames(games).encode()).await { @@ -1792,7 +1830,13 @@ async fn handle_peer_stream( } Request::GetGame { id } => { log::info!("Received GetGame request for {id} from peer"); - let response = if let Some(ref game_dir) = *ctx.game_dir.read().await { + let downloading = ctx.downloading_games.read().await.contains(&id); + let response = if downloading { + log::info!( + "Declining to serve GetGame for {id} because download is in progress" + ); + Response::GameNotFound(id) + } else if let Some(ref game_dir) = *ctx.game_dir.read().await { if let Some(ref db) = *ctx.local_game_db.read().await { if db.get_game_by_id(&id).is_some() { match get_game_file_descriptions(&id, game_dir).await {