diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index a0b8262..79cd8cf 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -454,6 +454,7 @@ async fn download_chunk( let path = base_dir.join(&chunk.relative_path); let mut file = OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(&path) .await?; @@ -533,6 +534,7 @@ async fn download_whole_file( let path = base_dir.join(&desc.relative_path); let mut file = OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(&path) .await?; @@ -795,6 +797,12 @@ struct PeerCtx { local_game_db: Arc>>, } +/// Main peer execution loop that handles peer commands and manages the peer system. +/// +/// # Panics +/// +/// This function will panic if the games folder is None after being checked for None. +/// The panic occurs at line 908 where `games_folder.expect("checked above")` is called. pub async fn run_peer( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, @@ -845,129 +853,20 @@ pub async fn run_peer( match cmd { PeerCommand::ListGames => { - log::info!("ListGames command received"); - let all_games = { ctx.peer_game_db.read().await.get_all_games() }; - if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(all_games)) { - log::error!("Failed to send ListGames event: {e}"); - } + handle_list_games_command(&ctx, &tx_notify_ui).await; } PeerCommand::GetGame(id) => { - log::info!("Requesting game from peers: {id}"); - let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; - if peers.is_empty() { - log::warn!("No peers have game {id}"); - continue; - } - - let peer_game_db = ctx.peer_game_db.clone(); - let tx_notify_ui = tx_notify_ui.clone(); - tokio::spawn(async move { - let mut fetched_any = false; - for peer_addr in peers { - match request_game_details_from_peer(peer_addr, &id, peer_game_db.clone()) - .await - { - Ok(_) => { - log::info!("Fetched game file list for {id} from peer {peer_addr}"); - fetched_any = true; - } - Err(e) => { - log::error!( - "Failed to fetch game files for {id} from {peer_addr}: {e}" - ); - } - } - } - - if fetched_any { - let aggregated_files = - { peer_game_db.read().await.aggregated_game_files(&id) }; - - if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { - id: id.clone(), - file_descriptions: aggregated_files, - }) { - log::error!("Failed to send GotGameFiles event: {e}"); - } - } else { - log::warn!("Failed to retrieve game files for {id} from any peer"); - } - }); + handle_get_game_command(&ctx, &tx_notify_ui, id).await; } PeerCommand::DownloadGameFiles { id, file_descriptions, } => { - log::info!("Got PeerCommand::DownloadGameFiles"); - let games_folder = { ctx.game_dir.read().await.clone() }; - if games_folder.is_none() { - log::error!("Cannot handle game file descriptions: games_folder is not set"); - continue; - } - - let games_folder = games_folder.expect("checked above"); - let peers = { ctx.peer_game_db.read().await.peers_with_latest_version(&id) }; - if peers.is_empty() { - log::error!("No peers with latest version available to download game {id}"); - continue; - } - - let resolved_descriptions = if file_descriptions.is_empty() { - ctx.peer_game_db.read().await.aggregated_game_files(&id) - } else { - file_descriptions - }; - - if resolved_descriptions.is_empty() { - log::error!( - "No file descriptions available to download game {id}; request metadata first" - ); - continue; - } - - let tx_notify_ui = tx_notify_ui.clone(); - tokio::spawn(async move { - match download_game_files( - &id, - resolved_descriptions, - games_folder, - peers, - tx_notify_ui.clone(), - ) - .await - { - Ok(()) => {} - Err(e) => { - log::error!("Download failed for {id}: {e}"); - if let Err(send_err) = tx_notify_ui - .send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) - { - log::error!( - "Failed to send DownloadGameFilesFailed event: {send_err}" - ); - } - } - } - }); + handle_download_game_files_command(&ctx, &tx_notify_ui, id, file_descriptions) + .await; } PeerCommand::SetGameDir(game_dir) => { - *ctx.game_dir.write().await = Some(game_dir.clone()); - log::info!("Game directory set to: {game_dir}"); - - // Load local game database when game directory is set - let game_dir = game_dir.clone(); - let local_game_db = ctx.local_game_db.clone(); - tokio::spawn(async move { - match load_local_game_db(&game_dir).await { - Ok(db) => { - *local_game_db.write().await = Some(db); - log::info!("Local game database loaded successfully"); - } - Err(e) => { - log::error!("Failed to load local game database: {e}"); - } - } - }); + handle_set_game_dir_command(&ctx, game_dir).await; } PeerCommand::ConnectToPeer(peer_addr) => { log::info!("Connecting to peer: {peer_addr}"); @@ -1047,6 +946,127 @@ async fn run_server_component( Ok(()) } +async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { + log::info!("ListGames command received"); + let all_games = { ctx.peer_game_db.read().await.get_all_games() }; + if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(all_games)) { + log::error!("Failed to send ListGames event: {e}"); + } +} + +async fn handle_get_game_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { + log::info!("Requesting game from peers: {id}"); + let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; + if peers.is_empty() { + log::warn!("No peers have game {id}"); + return; + } + + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + tokio::spawn(async move { + let mut fetched_any = false; + for peer_addr in peers { + match request_game_details_from_peer(peer_addr, &id, peer_game_db.clone()).await { + Ok(_) => { + log::info!("Fetched game file list for {id} from peer {peer_addr}"); + fetched_any = true; + } + Err(e) => { + log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}"); + } + } + } + + if fetched_any { + let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; + + if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { + id: id.clone(), + file_descriptions: aggregated_files, + }) { + log::error!("Failed to send GotGameFiles event: {e}"); + } + } else { + log::warn!("Failed to retrieve game files for {id} from any peer"); + } + }); +} + +async fn handle_download_game_files_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + file_descriptions: Vec, +) { + log::info!("Got PeerCommand::DownloadGameFiles"); + let games_folder = { ctx.game_dir.read().await.clone() }; + if games_folder.is_none() { + log::error!("Cannot handle game file descriptions: games_folder is not set"); + return; + } + + let games_folder = games_folder.expect("checked above"); + let peers = { ctx.peer_game_db.read().await.peers_with_latest_version(&id) }; + if peers.is_empty() { + log::error!("No peers with latest version available to download game {id}"); + return; + } + + let resolved_descriptions = if file_descriptions.is_empty() { + ctx.peer_game_db.read().await.aggregated_game_files(&id) + } else { + file_descriptions + }; + + if resolved_descriptions.is_empty() { + log::error!("No file descriptions available to download game {id}; request metadata first"); + return; + } + + let tx_notify_ui = tx_notify_ui.clone(); + tokio::spawn(async move { + match download_game_files( + &id, + resolved_descriptions, + games_folder, + peers, + tx_notify_ui.clone(), + ) + .await + { + Ok(()) => {} + Err(e) => { + log::error!("Download failed for {id}: {e}"); + if let Err(send_err) = + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) + { + log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); + } + } + } + }); +} + +async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) { + *ctx.game_dir.write().await = Some(game_dir.clone()); + log::info!("Game directory set to: {game_dir}"); + + // Load local game database when game directory is set + let game_dir = game_dir.clone(); + let local_game_db = ctx.local_game_db.clone(); + tokio::spawn(async move { + match load_local_game_db(&game_dir).await { + Ok(db) => { + *local_game_db.write().await = Some(db); + log::info!("Local game database loaded successfully"); + } + Err(e) => { + log::error!("Failed to load local game database: {e}"); + } + } + }); +} async fn handle_peer_connection( mut connection: Connection, @@ -1079,6 +1099,7 @@ async fn handle_peer_connection( Ok(()) } +#[allow(clippy::too_many_lines)] async fn handle_peer_stream( stream: BidirectionalStream, ctx: PeerCtx, diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index c223f7e..1d6d396 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -19,8 +19,8 @@ async fn stream_file_bytes( let remote_addr = maybe_addr!(tx.connection().remote_addr()); let game_file = base_dir.join(relative_path); log::debug!( - "{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {length:?}", - game_file + "{remote_addr} streaming file bytes for peer: {}, offset: {offset}, length: {length:?}", + game_file.display() ); let mut file = tokio::fs::File::open(&game_file).await?; @@ -58,8 +58,8 @@ async fn stream_file_bytes( #[allow(clippy::cast_precision_loss)] let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); log::debug!( - "{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}", - game_file + "{remote_addr} sending file data: {}, MB/s: {mb_per_s:.2}", + game_file.display() ); last_total_bytes = total_bytes; timestamp = Instant::now(); @@ -68,8 +68,8 @@ async fn stream_file_bytes( } log::debug!( - "{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}", - game_file + "{remote_addr} finished streaming file bytes: {}, total_bytes: {total_bytes}", + game_file.display() ); tx.close().await?;