fix(peer): bound outbound transfer drain waits

Update and remove-download operations must wait for existing outbound readers to
release game files before mutating or deleting the game root. That wait was
unbounded, so a stuck transfer guard could leave the game permanently marked as
Updating or RemovingDownload and prevent the requested operation from ever
starting.

Return a structured begin-operation result and put a five-second timeout around
the drain wait. If the transfer count does not reach zero, the operation start
fails, the active-operation snapshot is cleared, and the caller emits the
normal failure event for the attempted operation. The destructive mutation is
not allowed to proceed after a timeout.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #3
This commit is contained in:
2026-05-30 16:01:53 +02:00
parent 7e40cf4bfb
commit 9b700c7e3f
+173 -30
View File
@@ -6,6 +6,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::Duration,
}; };
use lanspread_db::db::{GameDB, GameFileDescription}; use lanspread_db::db::{GameDB, GameFileDescription};
@@ -38,6 +39,9 @@ use crate::{
// Command handlers // Command handlers
// ============================================================================= // =============================================================================
const OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(10);
const OUTBOUND_TRANSFER_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
/// Handles the `ListGames` command. /// Handles the `ListGames` command.
pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) { pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
log::info!("ListGames command received"); log::info!("ListGames command received");
@@ -304,9 +308,17 @@ pub async fn handle_download_game_files_command(
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
log::warn!("Operation for {id} already in progress; ignoring new download request"); BeginOperationResult::Started => {}
return; BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring new download request");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before downloading {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
} }
let active_operations = ctx.active_operations.clone(); let active_operations = ctx.active_operations.clone();
@@ -491,9 +503,20 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
return; return;
}; };
if !begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await { match begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await {
log::warn!("Operation for {id} already in progress; ignoring install command"); BeginOperationResult::Started => {}
return; BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring install command");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before install/update of {id}");
events::send(
tx_notify_ui,
PeerEvent::InstallGameFailed { id: id.clone() },
);
return;
}
} }
run_started_install_operation(ctx, tx_notify_ui, id, prepared).await; run_started_install_operation(ctx, tx_notify_ui, id, prepared).await;
@@ -616,9 +639,20 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await {
log::warn!("Operation for {id} already in progress; ignoring uninstall command"); BeginOperationResult::Started => {}
return; BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring uninstall command");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before uninstall of {id}");
events::send(
tx_notify_ui,
PeerEvent::UninstallGameFailed { id: id.clone() },
);
return;
}
} }
let game_root = { ctx.game_dir.read().await.join(&id) }; let game_root = { ctx.game_dir.read().await.join(&id) };
@@ -678,9 +712,20 @@ async fn run_remove_downloaded_operation(
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await {
log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal"); BeginOperationResult::Started => {}
return; BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before removal of {id}");
events::send(
tx_notify_ui,
PeerEvent::RemoveDownloadedGameFailed { id: id.clone() },
);
return;
}
} }
let game_dir = { ctx.game_dir.read().await.clone() }; let game_dir = { ctx.game_dir.read().await.clone() };
@@ -730,12 +775,36 @@ async fn run_remove_downloaded_operation(
} }
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BeginOperationResult {
Started,
AlreadyActive,
DrainTimedOut,
}
async fn begin_operation( async fn begin_operation(
ctx: &Ctx, ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str, id: &str,
operation: OperationKind, operation: OperationKind,
) -> bool { ) -> BeginOperationResult {
begin_operation_with_drain_timeout(
ctx,
tx_notify_ui,
id,
operation,
OUTBOUND_TRANSFER_DRAIN_TIMEOUT,
)
.await
}
async fn begin_operation_with_drain_timeout(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
drain_timeout: Duration,
) -> BeginOperationResult {
let started = { let started = {
let mut active_operations = ctx.active_operations.write().await; let mut active_operations = ctx.active_operations.write().await;
match active_operations.entry(id.to_string()) { match active_operations.entry(id.to_string()) {
@@ -748,27 +817,44 @@ async fn begin_operation(
}; };
if !started { if !started {
return false; return BeginOperationResult::AlreadyActive;
} }
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
if operation == OperationKind::Updating || operation == OperationKind::RemovingDownload { if operation_requires_outbound_drain(operation)
// Cancel all active outbound transfers for this game && !cancel_and_wait_for_outbound_transfers(ctx, id, drain_timeout).await
let mut tokens_to_cancel = Vec::new(); {
{ end_operation(ctx, tx_notify_ui, id).await;
let active = ctx.active_outbound_transfers.read().await; return BeginOperationResult::DrainTimedOut;
if let Some(transfers) = active.get(id) { }
for (_, token) in transfers {
tokens_to_cancel.push(token.clone()); BeginOperationResult::Started
} }
fn operation_requires_outbound_drain(operation: OperationKind) -> bool {
operation == OperationKind::Updating || operation == OperationKind::RemovingDownload
}
async fn cancel_and_wait_for_outbound_transfers(
ctx: &Ctx,
id: &str,
drain_timeout: Duration,
) -> bool {
let mut tokens_to_cancel = Vec::new();
{
let active = ctx.active_outbound_transfers.read().await;
if let Some(transfers) = active.get(id) {
for (_, token) in transfers {
tokens_to_cancel.push(token.clone());
} }
} }
for token in tokens_to_cancel { }
token.cancel(); for token in tokens_to_cancel {
} token.cancel();
}
// Wait until active outbound transfers drop to 0 let drained = tokio::time::timeout(drain_timeout, async {
loop { loop {
let count = { let count = {
let active = ctx.active_outbound_transfers.read().await; let active = ctx.active_outbound_transfers.read().await;
@@ -777,11 +863,23 @@ async fn begin_operation(
if count == 0 { if count == 0 {
break; break;
} }
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; tokio::time::sleep(OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL).await;
} }
})
.await
.is_ok();
if !drained {
let count = {
let active = ctx.active_outbound_transfers.read().await;
active.get(id).map_or(0, Vec::len)
};
log::error!(
"Timed out after {drain_timeout:?} waiting for {count} outbound transfer(s) to drain for {id}"
);
} }
true drained
} }
async fn transition_download_to_install( async fn transition_download_to_install(
@@ -1495,7 +1593,10 @@ mod tests {
let ctx = test_ctx(temp.path().to_path_buf()); let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await); assert_eq!(
begin_operation(&ctx, &tx, "game", OperationKind::Updating).await,
BeginOperationResult::Started
);
assert_active_update( assert_active_update(
recv_event(&mut rx).await, recv_event(&mut rx).await,
vec![ActiveOperation { vec![ActiveOperation {
@@ -1505,6 +1606,48 @@ mod tests {
); );
} }
#[tokio::test]
async fn begin_operation_timeout_clears_active_operation_snapshot() {
let temp = TempDir::new("lanspread-handler-active-drain-timeout");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let token = CancellationToken::new();
ctx.active_outbound_transfers
.write()
.await
.insert("game".to_string(), vec![(1, token.clone())]);
assert_eq!(
begin_operation_with_drain_timeout(
&ctx,
&tx,
"game",
OperationKind::Updating,
Duration::from_millis(1),
)
.await,
BeginOperationResult::DrainTimedOut
);
assert!(token.is_cancelled());
assert_active_update(
recv_event(&mut rx).await,
vec![ActiveOperation {
id: "game".to_string(),
operation: ActiveOperationKind::Updating,
}],
);
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(
!ctx.active_operations.read().await.contains_key("game"),
"timed-out drain should not leave the operation stuck active"
);
}
#[tokio::test] #[tokio::test]
async fn unchanged_settled_scan_is_not_reemitted() { async fn unchanged_settled_scan_is_not_reemitted() {
let temp = TempDir::new("lanspread-handler-settled-unchanged"); let temp = TempDir::new("lanspread-handler-settled-unchanged");