From df01131f8d501b017affa8dc177b7043d6b24834 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Tue, 18 Nov 2025 21:42:47 +0100 Subject: [PATCH] refactor: Centralize local game database updates and announcements, and add retry logic for requesting games from peers. --- crates/lanspread-peer/src/lib.rs | 233 +++++++++++++++++-------------- 1 file changed, 127 insertions(+), 106 deletions(-) diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 06c8d5a..6a0d53a 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -1481,7 +1481,7 @@ pub async fn run_peer( .await; } PeerCommand::SetGameDir(game_dir) => { - handle_set_game_dir_command(&ctx, game_dir).await; + handle_set_game_dir_command(&ctx, &tx_notify_ui, game_dir).await; } PeerCommand::GetPeerCount => { handle_get_peer_count_command(&ctx, &tx_notify_ui).await; @@ -1786,17 +1786,23 @@ async fn handle_download_game_files_command( ctx.active_downloads.write().await.insert(id, handle); } -async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) { +async fn handle_set_game_dir_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + game_dir: String, +) { *ctx.game_dir.write().await = Some(game_dir.clone()); log::info!("Game directory set to: {game_dir}"); // Load local game database when game directory is set let game_dir = game_dir.clone(); - let local_game_db = ctx.local_game_db.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + let ctx_clone = ctx.clone(); + tokio::spawn(async move { match load_local_game_db(&game_dir).await { Ok(db) => { - *local_game_db.write().await = Some(db); + update_and_announce_games(&ctx_clone, &tx_notify_ui, db).await; log::info!("Local game database loaded successfully"); } Err(e) => { @@ -1806,6 +1812,80 @@ async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) { }); } +async fn update_and_announce_games( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + new_db: GameDB, +) { + let local_game_db = ctx.local_game_db.clone(); + let mut db_guard = local_game_db.write().await; + + let previous_games = db_guard + .as_ref() + .map(|db| db.games.keys().cloned().collect::>()) + .unwrap_or_default(); + + let current_game_ids = new_db.games.keys().cloned().collect::>(); + + // Check if any games were removed + let removed_games: Vec = previous_games + .difference(¤t_game_ids) + .cloned() + .collect(); + + if removed_games.is_empty() { + // Check if any games were added or updated + if previous_games != current_game_ids { + log::debug!("Local games directory structure changed, updating database"); + *db_guard = Some(new_db); + + let all_games = db_guard + .as_ref() + .map(|db| db.all_games().into_iter().cloned().collect::>()) + .unwrap_or_default(); + + if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { + log::error!("Failed to send LocalGamesUpdated event: {e}"); + } + + // Broadcast update to all peers + let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; + for peer_addr in peer_addresses { + let games_clone = all_games.clone(); + tokio::spawn(async move { + if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await { + log::warn!("Failed to announce games to {peer_addr}: {e}"); + } + }); + } + } + } else { + log::info!("Detected removed games: {removed_games:?}"); + *db_guard = Some(new_db); + + // Notify UI about the change + let all_games = db_guard + .as_ref() + .map(|db| db.all_games().into_iter().cloned().collect::>()) + .unwrap_or_default(); + + if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { + log::error!("Failed to send LocalGamesUpdated event: {e}"); + } + + // Broadcast update to all peers + let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; + for peer_addr in peer_addresses { + let games_clone = all_games.clone(); + tokio::spawn(async move { + if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await { + log::warn!("Failed to announce games to {peer_addr}: {e}"); + } + }); + } + } +} + async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("GetPeerCount command received"); let peer_count = { ctx.peer_game_db.read().await.get_peer_addresses().len() }; @@ -2124,9 +2204,13 @@ async fn run_peer_discovery( let tx_notify_ui_clone = tx_notify_ui.clone(); let peer_game_db_clone = peer_game_db.clone(); tokio::spawn(async move { - if let Err(e) = - request_games_from_peer(peer_addr, tx_notify_ui_clone, peer_game_db_clone) - .await + if let Err(e) = request_games_from_peer( + peer_addr, + tx_notify_ui_clone, + peer_game_db_clone, + 0, + ) + .await { log::error!("Failed to request games from peer {peer_addr}: {e}"); } @@ -2154,7 +2238,37 @@ async fn request_games_from_peer( peer_addr: SocketAddr, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, + mut retry_count: u32, ) -> eyre::Result<()> { + loop { + match fetch_games_from_peer(peer_addr).await { + Ok(games) => { + log::info!("Received {} games from peer {peer_addr}", games.len()); + + if games.is_empty() && retry_count < 1 { + log::info!("Received 0 games from peer {peer_addr}, scheduling retry in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + retry_count += 1; + continue; + } + + let aggregated_games = { + let mut db = peer_game_db.write().await; + db.update_peer_games(peer_addr, games); + db.get_all_games() + }; + + if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) { + log::error!("Failed to send ListGames event: {e}"); + } + return Ok(()); + } + Err(e) => return Err(e), + } + } +} + +async fn fetch_games_from_peer(peer_addr: SocketAddr) -> eyre::Result> { let mut conn = connect_to_peer(peer_addr).await?; let stream = conn.open_bidirectional_stream().await?; @@ -2173,26 +2287,12 @@ async fn request_games_from_peer( } let response = Response::decode(data.freeze()); - match response { - Response::ListGames(games) => { - log::info!("Received {} games from peer {peer_addr}", games.len()); - - let aggregated_games = { - let mut db = peer_game_db.write().await; - db.update_peer_games(peer_addr, games); - db.get_all_games() - }; - - if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) { - log::error!("Failed to send ListGames event: {e}"); - } - } - _ => { - log::warn!("Unexpected response from peer {peer_addr}: {response:?}"); - } + if let Response::ListGames(games) = response { + Ok(games) + } else { + log::warn!("Unexpected response from peer {peer_addr}: {response:?}"); + Ok(Vec::new()) // Treat unexpected response as empty list or error? } - - Ok(()) } async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec) -> eyre::Result<()> { @@ -2465,86 +2565,7 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender, ctx: C if let Some(ref game_dir) = game_dir { match scan_local_games(game_dir).await { Ok(current_games) => { - let local_game_db = ctx.local_game_db.clone(); - let mut db_guard = local_game_db.write().await; - - let previous_games = db_guard - .as_ref() - .map(|db| db.games.keys().cloned().collect::>()) - .unwrap_or_default(); - - let current_game_ids = - current_games.games.keys().cloned().collect::>(); - - // Check if any games were removed - let removed_games: Vec = previous_games - .difference(¤t_game_ids) - .cloned() - .collect(); - - if removed_games.is_empty() { - // Check if any games were added or updated - if previous_games != current_game_ids { - log::debug!( - "Local games directory structure changed, updating database" - ); - *db_guard = Some(current_games); - - let all_games = db_guard - .as_ref() - .map(|db| { - db.all_games().into_iter().cloned().collect::>() - }) - .unwrap_or_default(); - - if let Err(e) = - tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) - { - log::error!("Failed to send LocalGamesUpdated event: {e}"); - } - - // Broadcast update to all peers - let peer_addresses = - { ctx.peer_game_db.read().await.get_peer_addresses() }; - for peer_addr in peer_addresses { - let games_clone = all_games.clone(); - tokio::spawn(async move { - if let Err(e) = - announce_games_to_peer(peer_addr, games_clone).await - { - log::warn!("Failed to announce games to {peer_addr}: {e}"); - } - }); - } - } - } else { - log::info!("Detected removed games: {removed_games:?}"); - *db_guard = Some(current_games); - - // Notify UI about the change - let all_games = db_guard - .as_ref() - .map(|db| db.all_games().into_iter().cloned().collect::>()) - .unwrap_or_default(); - - if let Err(e) = - tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) - { - log::error!("Failed to send LocalGamesUpdated event: {e}"); - } - - // Broadcast update to all peers - let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; - for peer_addr in peer_addresses { - let games_clone = all_games.clone(); - tokio::spawn(async move { - if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await - { - log::warn!("Failed to announce games to {peer_addr}: {e}"); - } - }); - } - } + update_and_announce_games(&ctx, &tx_notify_ui, current_games).await; } Err(e) => { log::error!("Failed to scan local games directory: {e}");