diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 144c9c1..9fe7f91 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -6,6 +6,7 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use lanspread_db::db::{GameDB, GameFileDescription}; @@ -38,6 +39,9 @@ use crate::{ // Command handlers // ============================================================================= +const OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(10); +const OUTBOUND_TRANSFER_DRAIN_TIMEOUT: Duration = Duration::from_secs(5); + /// Handles the `ListGames` command. pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("ListGames command received"); @@ -304,9 +308,17 @@ pub async fn handle_download_game_files_command( return; } - if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { - log::warn!("Operation for {id} already in progress; ignoring new download request"); - return; + match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { + BeginOperationResult::Started => {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring new download request"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before downloading {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } } let active_operations = ctx.active_operations.clone(); @@ -491,9 +503,20 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring install command"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before install/update of {id}"); + events::send( + tx_notify_ui, + PeerEvent::InstallGameFailed { id: id.clone() }, + ); + return; + } } run_started_install_operation(ctx, tx_notify_ui, id, prepared).await; @@ -616,9 +639,20 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring uninstall command"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before uninstall of {id}"); + events::send( + tx_notify_ui, + PeerEvent::UninstallGameFailed { id: id.clone() }, + ); + return; + } } let game_root = { ctx.game_dir.read().await.join(&id) }; @@ -678,9 +712,20 @@ async fn run_remove_downloaded_operation( return; } - if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await { - log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal"); - return; + match begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await { + BeginOperationResult::Started => {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before removal of {id}"); + events::send( + tx_notify_ui, + PeerEvent::RemoveDownloadedGameFailed { id: id.clone() }, + ); + return; + } } let game_dir = { ctx.game_dir.read().await.clone() }; @@ -730,12 +775,36 @@ async fn run_remove_downloaded_operation( } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum BeginOperationResult { + Started, + AlreadyActive, + DrainTimedOut, +} + async fn begin_operation( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str, operation: OperationKind, -) -> bool { +) -> BeginOperationResult { + begin_operation_with_drain_timeout( + ctx, + tx_notify_ui, + id, + operation, + OUTBOUND_TRANSFER_DRAIN_TIMEOUT, + ) + .await +} + +async fn begin_operation_with_drain_timeout( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + operation: OperationKind, + drain_timeout: Duration, +) -> BeginOperationResult { let started = { let mut active_operations = ctx.active_operations.write().await; match active_operations.entry(id.to_string()) { @@ -748,27 +817,44 @@ async fn begin_operation( }; if !started { - return false; + return BeginOperationResult::AlreadyActive; } events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; - if operation == OperationKind::Updating || operation == OperationKind::RemovingDownload { - // Cancel all active outbound transfers for this game - let mut tokens_to_cancel = Vec::new(); - { - let active = ctx.active_outbound_transfers.read().await; - if let Some(transfers) = active.get(id) { - for (_, token) in transfers { - tokens_to_cancel.push(token.clone()); - } + if operation_requires_outbound_drain(operation) + && !cancel_and_wait_for_outbound_transfers(ctx, id, drain_timeout).await + { + end_operation(ctx, tx_notify_ui, id).await; + return BeginOperationResult::DrainTimedOut; + } + + BeginOperationResult::Started +} + +fn operation_requires_outbound_drain(operation: OperationKind) -> bool { + operation == OperationKind::Updating || operation == OperationKind::RemovingDownload +} + +async fn cancel_and_wait_for_outbound_transfers( + ctx: &Ctx, + id: &str, + drain_timeout: Duration, +) -> bool { + let mut tokens_to_cancel = Vec::new(); + { + let active = ctx.active_outbound_transfers.read().await; + if let Some(transfers) = active.get(id) { + for (_, token) in transfers { + tokens_to_cancel.push(token.clone()); } } - for token in tokens_to_cancel { - token.cancel(); - } + } + for token in tokens_to_cancel { + token.cancel(); + } - // Wait until active outbound transfers drop to 0 + let drained = tokio::time::timeout(drain_timeout, async { loop { let count = { let active = ctx.active_outbound_transfers.read().await; @@ -777,11 +863,23 @@ async fn begin_operation( if count == 0 { break; } - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + tokio::time::sleep(OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL).await; } + }) + .await + .is_ok(); + + if !drained { + let count = { + let active = ctx.active_outbound_transfers.read().await; + active.get(id).map_or(0, Vec::len) + }; + log::error!( + "Timed out after {drain_timeout:?} waiting for {count} outbound transfer(s) to drain for {id}" + ); } - true + drained } async fn transition_download_to_install( @@ -1495,7 +1593,10 @@ mod tests { let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); - assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await); + assert_eq!( + begin_operation(&ctx, &tx, "game", OperationKind::Updating).await, + BeginOperationResult::Started + ); assert_active_update( recv_event(&mut rx).await, vec![ActiveOperation { @@ -1505,6 +1606,48 @@ mod tests { ); } + #[tokio::test] + async fn begin_operation_timeout_clears_active_operation_snapshot() { + let temp = TempDir::new("lanspread-handler-active-drain-timeout"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"archive"); + + let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let token = CancellationToken::new(); + ctx.active_outbound_transfers + .write() + .await + .insert("game".to_string(), vec![(1, token.clone())]); + + assert_eq!( + begin_operation_with_drain_timeout( + &ctx, + &tx, + "game", + OperationKind::Updating, + Duration::from_millis(1), + ) + .await, + BeginOperationResult::DrainTimedOut + ); + + assert!(token.is_cancelled()); + assert_active_update( + recv_event(&mut rx).await, + vec![ActiveOperation { + id: "game".to_string(), + operation: ActiveOperationKind::Updating, + }], + ); + assert_active_update(recv_event(&mut rx).await, Vec::new()); + assert!( + !ctx.active_operations.read().await.contains_key("game"), + "timed-out drain should not leave the operation stuck active" + ); + } + #[tokio::test] async fn unchanged_settled_scan_is_not_reemitted() { let temp = TempDir::new("lanspread-handler-settled-unchanged");