From 66c7d5912bc3c521761e4b0ddf957b576bcd3c68 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 11 Jun 2026 07:33:16 +0200 Subject: [PATCH] fix(peer): harden streamed install lifecycle Claude Fable 5's branch review found that receiver cancellation or a QUIC send failure could leave the sender-side archive producer blocked on the bounded frame channel. That kept the outbound transfer guard alive and could block later installs or updates of the same game. Route archive frames through a cancellable StreamInstallFrameSink instead of exposing the raw channel sender to providers. The QUIC forwarder now cancels and closes the receive side before awaiting the producer, so a blocked send wakes and the transfer guard can drop normally. Make PeerCommand::StreamInstallGame own its peer metadata preflight inside the peer core. The Tauri layer now sends the command directly, and the peer runtime fetches file details from catalog-version peers before running the existing majority validation and retry logic. This removes the UI-only pending streamed install set and gives PeerEvent::GotGameFiles one meaning again: continue a normal archive download. Tighten the receiver transaction edge cases too. Rollback removes a newly created empty game root, but preserves pre-existing roots. Once streamed staging has been promoted to local/, intent or launch-settings cleanup failures are logged for startup recovery instead of reporting a failed install for bytes that are already committed. Accept missing RAR CRC32 metadata for zero-byte files as CRC32 00000000 while still requiring CRC32 metadata for non-empty files. Update the peer README, scenario docs, and next-steps handoff so the documented ownership and remaining trust limitation match the implementation. Test Plan: - just fmt - just test - just frontend-test - just clippy - git diff --check - python3 -m py_compile \ crates/lanspread-peer-cli/scripts/run_extended_scenarios.py - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \ S39 S40 S41 S42 S43 S44 S45 S46 S47 --build-image Refs: streamed-install review handoff from Claude Fable 5 --- NEXT_STEPS.md | 5 +- PEER_CLI_SCENARIOS.md | 2 +- .../scripts/run_extended_scenarios.py | 18 -- crates/lanspread-peer-cli/src/main.rs | 1 - crates/lanspread-peer/README.md | 28 +- crates/lanspread-peer/src/handlers.rs | 263 ++++++++++++------ .../lanspread-peer/src/install/transaction.rs | 169 +++++++++-- crates/lanspread-peer/src/lib.rs | 1 + crates/lanspread-peer/src/stream_install.rs | 144 ++++++---- .../src-tauri/src/lib.rs | 57 +--- 10 files changed, 448 insertions(+), 240 deletions(-) diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md index 4bdd410..8d734ed 100644 --- a/NEXT_STEPS.md +++ b/NEXT_STEPS.md @@ -61,5 +61,6 @@ product-ready. modal status shows `Installed, not shareable`. Downloaded-and-installed games keep the normal `Installed` label. -My recommended next slice: make the provider abstraction final-ish, then -implement a real one-pass provider. Everything else builds cleanly on that. +The remaining production-readiness step is additive: move from sender-owned RAR +metadata to catalog-owned archive or extracted-file hashes, then verify those +at the receiver before committing the streamed install. diff --git a/PEER_CLI_SCENARIOS.md b/PEER_CLI_SCENARIOS.md index 3586367..9fe4895 100644 --- a/PEER_CLI_SCENARIOS.md +++ b/PEER_CLI_SCENARIOS.md @@ -46,7 +46,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path. | S36 | Catalog singleton beats stale majority | Five peers advertise one game; one peer has the catalog version and four peers have stale versions. | `list-games` reports `peer_count=1` and the catalog `eti_game_version`; all descriptors and chunks come from the singleton catalog-version peer, while stale peers remain hidden and contribute zero bytes. | | S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. | | S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. | -| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | +| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | | S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. | | S41 | Solid archive streamed install | Empty client connects to a peer serving `fixture-solid/cnctw`, whose `.eti` is a real solid RAR archive. The receiver uses the container-bundled `unrar` stream provider. | The fixture is verified as solid with `unrar lt`; streamed install finishes with `downloaded=false`, `installed=true`, `availability=LocalOnly`; root archive and `version.ini` are absent; streamed byte count equals the extracted solid entries; local payload SHA-256 hashes match `unrar p` output. | | S42 | Streamed install whole-stream retry | Empty client connects to two peers serving the same catalog-version `cnctw`: one broken source whose `--unrar` path is missing, followed by one good source. | The broken source sorts before the good source in retry order, contributes zero chunks, and the good source completes a fresh whole-stream attempt. The final state is local-only installed, no root archive/sentinel, no `.local.installing`, byte count matches the extracted entries, and payload hashes match the good source. | diff --git a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py index ba8abf3..c2077d2 100644 --- a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py +++ b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py @@ -1208,12 +1208,6 @@ class Runner: wait_remote_game(client, "cnctw", peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-begin", "cnctw"), timeout=20, @@ -1320,12 +1314,6 @@ class Runner: waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got solid cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, @@ -1409,12 +1397,6 @@ class Runner: waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got retry cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 24d2426..9663284 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -261,7 +261,6 @@ async fn handle_command( CliCommand::StreamInstall { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; - let _ = game_files_for_download(sender, shared, game_id).await?; sender.send(PeerCommand::StreamInstallGame { id: game_id.clone(), })?; diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 4e34ef8..55a409b 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -14,8 +14,8 @@ It is designed to run headless – other crates (most notably roots are announced or served. - `PeerCommand` represents the small control surface exposed to the UI layer: `ListGames`, `GetGame`, `FetchLatestFromPeers`, `DownloadGameFiles`, - `InstallGame`, `UninstallGame`, `RemoveDownloadedGame`, `CancelDownload`, - `SetGameDir`, and `GetPeerCount`. + `StreamInstallGame`, `InstallGame`, `UninstallGame`, `RemoveDownloadedGame`, + `CancelDownload`, `SetGameDir`, and `GetPeerCount`. - `PeerEvent` enumerates everything the peer runtime reports back to the UI: library snapshots, download/install/uninstall lifecycle updates, runtime failures, and peer membership changes. @@ -28,8 +28,8 @@ lifetime of the process: 1. **Server component** (`run_server_component`) – listens for QUIC connections, advertises via mDNS, and serves `Request::ListGames`, `Request::GetGame`, - `Request::GetGameFileData`, and `Request::GetGameFileChunk` by reading from - the local game directory. + `Request::GetGameFileData`, `Request::GetGameFileChunk`, and + `Request::StreamInstall` by reading from the local game directory. 2. **Discovery loop** (`run_peer_discovery`) – uses the `lanspread-mdns` helper to discover other peers. The blocking mDNS work is executed on a dedicated thread via `tokio::task::spawn_blocking` so that the Tokio runtime @@ -87,6 +87,26 @@ When the UI asks to download a game: 7. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` is emitted and the peer auto-runs the install transaction. +### Streamed Install Pipeline + +Low-disk installs use `PeerCommand::StreamInstallGame` instead of the normal +archive download pipeline. The peer core owns the whole operation: it refreshes +file metadata from catalog-version peers, runs the same majority file-size +validation used by normal downloads, selects a validated peer list, and emits +the regular download/install lifecycle events while streaming archive-expanded +bytes directly into a `StreamedInstallTransaction`. + +The sender-side `StreamInstallProvider` writes control and chunk frames through +a cancellable `StreamInstallFrameSink`. If the QUIC writer fails because the +receiver cancelled or disconnected, the sink wakes any producer blocked on the +bounded frame channel and lets the transfer guard drop normally. + +Each failed peer attempt rolls back its staging directory before trying the next +validated peer. A transaction that created a previously missing game root +removes that root again when rollback leaves it empty. Once staging has been +renamed to `local/`, post-promote intent or launch-settings cleanup failures are +logged for startup recovery rather than reported as a failed install. + `PeerCommand::CancelDownload` cancels the tracked download token for an active transfer. The transfer task remains responsible for clearing `active_operations`, discarding partial payload files, and refreshing the settled local snapshot, so diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 8985f74..10e68c9 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -471,38 +471,6 @@ pub async fn handle_stream_install_game_command( return; } - let expected_version = catalog_expected_version(ctx, &id).await; - let mut peers = { - match ctx - .peer_game_db - .read() - .await - .validate_file_sizes_majority(&id, expected_version.as_deref()) - { - Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => { - peer_whitelist - } - Ok(_) => { - log::error!("No trusted peers available for streamed install of {id}"); - send_download_failed(tx_notify_ui, &id); - return; - } - Err(err) => { - log::error!( - "File size majority validation failed for streamed install {id}: {err}" - ); - send_download_failed(tx_notify_ui, &id); - return; - } - } - }; - peers.sort(); - if peers.is_empty() { - log::error!("No peer selected for streamed install of {id}"); - send_download_failed(tx_notify_ui, &id); - return; - } - match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { BeginOperationResult::Started => {} BeginOperationResult::AlreadyActive => { @@ -516,6 +484,7 @@ pub async fn handle_stream_install_game_command( } } + let expected_version = catalog_expected_version(ctx, &id).await; let cancel_token = ctx.shutdown.child_token(); ctx.active_downloads .write() @@ -525,8 +494,15 @@ pub async fn handle_stream_install_game_command( let ctx_clone = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { - run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token) - .await; + run_stream_install_operation( + ctx_clone, + tx_notify_ui, + id, + game_root, + expected_version, + cancel_token, + ) + .await; }); } @@ -575,7 +551,7 @@ async fn run_stream_install_operation( tx_notify_ui: UnboundedSender, id: String, game_root: PathBuf, - peer_addrs: Vec, + expected_version: Option, cancel_token: CancellationToken, ) { let download_guard = OperationGuard::download( @@ -590,27 +566,94 @@ async fn run_stream_install_operation( PeerEvent::DownloadGameFilesBegin { id: id.clone() }, ); + let peer_addrs = + match select_stream_install_peers(&ctx, &id, expected_version.as_deref(), &cancel_token) + .await + { + Ok(peers) => peers, + Err(err) => { + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install preflight cancelled for {id}: {err}"); + } else { + log::error!("Streamed install preflight failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + return; + } + }; + + match receive_streamed_install_from_peers( + &ctx, + &tx_notify_ui, + &id, + &game_root, + &peer_addrs, + &cancel_token, + ) + .await + { + Ok(transaction) => { + if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + .await + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + return; + } + + if let Err(err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {err}"); + } + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; + } + Err(err) => { + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + } + } +} + +async fn receive_streamed_install_from_peers( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + game_root: &Path, + peer_addrs: &[SocketAddr], + cancel_token: &CancellationToken, +) -> eyre::Result { let mut last_receive_error = None; - for peer_addr in peer_addrs { + for &peer_addr in peer_addrs { if cancel_token.is_cancelled() { - last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled")); - break; + eyre::bail!("streamed install for {id} was cancelled"); } let transaction = - match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await { - Ok(transaction) => transaction, - Err(err) => { - log::error!("Failed to prepare streamed install for {id}: {err}"); - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) - .await; - return; - } - }; - + install::begin_streamed_install(game_root, ctx.state_dir.as_ref(), id).await?; let receive_result = receive_streamed_install( peer_addr, - &id, + id, transaction.staging_dir(), tx_notify_ui.clone(), cancel_token.clone(), @@ -618,37 +661,13 @@ async fn run_stream_install_operation( .await; match receive_result { - Ok(()) => { - if transition_download_to_install( - &ctx, - &tx_notify_ui, - &id, - OperationKind::Installing, - ) - .await - { - clear_active_download(&ctx, &id).await; - send_download_finished(&tx_notify_ui, &id); - download_guard.disarm(); - commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; - return; - } - - if let Err(err) = transaction.rollback().await { - log::error!("Failed to roll back streamed install for {id}: {err}"); - } - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) - .await; - return; - } + Ok(()) => return Ok(transaction), Err(err) => { if let Err(rollback_err) = transaction.rollback().await { log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); } if cancel_token.is_cancelled() { - log::info!("Streamed install download cancelled for {id}: {err}"); - last_receive_error = Some(err); - break; + return Err(err); } log::warn!( @@ -659,24 +678,84 @@ async fn run_stream_install_operation( } } - let download_was_cancelled = cancel_token.is_cancelled(); - if let Some(err) = last_receive_error { - if download_was_cancelled { - log::info!("Streamed install download cancelled for {id}: {err}"); - } else { - log::error!("Streamed install download failed for {id}: {err}"); - } - } else { - log::error!("Streamed install download failed for {id}: no peer attempts were made"); + Err(last_receive_error.unwrap_or_else(|| { + eyre::eyre!("streamed install download failed for {id}: no peer attempts were made") + })) +} + +async fn select_stream_install_peers( + ctx: &Ctx, + id: &str, + expected_version: Option<&str>, + cancel_token: &CancellationToken, +) -> eyre::Result> { + let mut metadata_peers = { + ctx.peer_game_db + .read() + .await + .peers_with_expected_version(id, expected_version) + }; + metadata_peers.sort(); + if metadata_peers.is_empty() { + eyre::bail!("no peers have game {id}"); } - finish_failed_stream_download( - &ctx, - &tx_notify_ui, - &id, - download_guard, - download_was_cancelled, - ) - .await; + + refresh_stream_install_file_details(ctx, id, &metadata_peers, cancel_token).await?; + + let mut peers = match ctx + .peer_game_db + .read() + .await + .validate_file_sizes_majority(id, expected_version) + { + Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => peer_whitelist, + Ok(_) => { + eyre::bail!("no trusted peers available for streamed install of {id}"); + } + Err(err) => { + return Err(err.wrap_err(format!( + "file size majority validation failed for streamed install {id}" + ))); + } + }; + peers.sort(); + if peers.is_empty() { + eyre::bail!("no peer selected for streamed install of {id}"); + } + + Ok(peers) +} + +async fn refresh_stream_install_file_details( + ctx: &Ctx, + id: &str, + peers: &[SocketAddr], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut fetched_any = false; + for &peer_addr in peers { + if cancel_token.is_cancelled() { + eyre::bail!("streamed install for {id} was cancelled"); + } + + match request_game_details_and_update(peer_addr, id, ctx.peer_game_db.clone()).await { + Ok(_) => { + log::info!("Fetched streamed-install file list for {id} from peer {peer_addr}"); + fetched_any = true; + } + Err(err) => { + log::error!( + "Failed to fetch streamed-install files for {id} from {peer_addr}: {err}" + ); + } + } + } + + if !fetched_any { + eyre::bail!("failed to retrieve game files for {id} from any peer"); + } + + Ok(()) } async fn finish_failed_stream_download( diff --git a/crates/lanspread-peer/src/install/transaction.rs b/crates/lanspread-peer/src/install/transaction.rs index 959dc1c..531cba9 100644 --- a/crates/lanspread-peer/src/install/transaction.rs +++ b/crates/lanspread-peer/src/install/transaction.rs @@ -39,6 +39,7 @@ pub struct StreamedInstallTransaction { id: String, staging: PathBuf, eti_version: Option, + created_game_root: bool, } impl StreamedInstallTransaction { @@ -49,40 +50,61 @@ impl StreamedInstallTransaction { pub async fn commit(self) -> eyre::Result<()> { let local = local_dir(&self.game_root); - let result = async { - tokio::fs::rename(&self.staging, &local) - .await - .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?; - reset_launch_settings_marker(&self.state_dir, &self.id).await?; - write_intent( - &self.state_dir, - &self.id, - &InstallIntent::none(&self.id, self.eti_version.clone()), - ) + if let Err(err) = tokio::fs::rename(&self.staging, &local) .await - } - .await; - - if result.is_err() { + .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id)) + { if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await { log::warn!( "Failed to clean streamed install staging {}: {cleanup_err}", self.staging.display() ); } + if let Err(cleanup_err) = + remove_created_empty_game_root(&self.game_root, self.created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + self.game_root.display() + ); + } let _ = write_intent( &self.state_dir, &self.id, &InstallIntent::none(&self.id, self.eti_version.clone()), ) .await; + return Err(err); } - result + if let Err(err) = reset_launch_settings_marker(&self.state_dir, &self.id).await { + log::error!( + "Streamed install for {} was promoted but launch-settings marker reset failed: {err}", + self.id + ); + } + if let Err(err) = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await + { + log::error!( + "Streamed install for {} was promoted but intent cleanup failed: {err}", + self.id + ); + } + + Ok(()) } pub async fn rollback(self) -> eyre::Result<()> { - let staging_result = remove_dir_all_if_exists(&self.staging).await; + let cleanup_result = async { + remove_dir_all_if_exists(&self.staging).await?; + remove_created_empty_game_root(&self.game_root, self.created_game_root).await + } + .await; let intent_result = write_intent( &self.state_dir, &self.id, @@ -90,7 +112,7 @@ impl StreamedInstallTransaction { ) .await; - staging_result?; + cleanup_result?; intent_result } } @@ -104,18 +126,36 @@ pub async fn begin_streamed_install( eyre::bail!("game {id} is already installed"); } + let created_game_root = !path_exists(game_root).await; tokio::fs::create_dir_all(game_root).await?; let eti_version = read_downloaded_version(game_root).await; - write_intent( + if let Err(err) = write_intent( state_dir, id, &InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()), ) - .await?; + .await + { + if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + game_root.display() + ); + } + return Err(err); + } let staging = installing_dir(game_root); if let Err(err) = prepare_owned_empty_dir(&staging).await { let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await; + if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + game_root.display() + ); + } return Err(err); } @@ -127,6 +167,7 @@ pub async fn begin_streamed_install( id: id.to_string(), staging, eti_version, + created_game_root, }) } @@ -586,6 +627,28 @@ async fn remove_dir_all_if_exists(path: &Path) -> eyre::Result<()> { } } +async fn remove_created_empty_game_root(game_root: &Path, created: bool) -> eyre::Result<()> { + if !created { + return Ok(()); + } + remove_empty_dir_if_exists(game_root).await +} + +async fn remove_empty_dir_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_dir(path).await { + Ok(()) => Ok(()), + Err(err) + if matches!( + err.kind(), + ErrorKind::NotFound | ErrorKind::DirectoryNotEmpty + ) => + { + Ok(()) + } + Err(err) => Err(err.into()), + } +} + async fn path_is_dir(path: &Path) -> bool { tokio::fs::metadata(path) .await @@ -727,6 +790,74 @@ mod tests { assert!(!launch_settings_applied_path(state.path(), "game").exists()); } + #[tokio::test] + async fn streamed_install_rollback_removes_new_empty_game_root() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.path().join("streamed-game"); + + let transaction = begin_streamed_install(&root, state.path(), "streamed-game") + .await + .expect("streamed transaction should begin"); + assert!(transaction.staging_dir().is_dir()); + + transaction + .rollback() + .await + .expect("streamed rollback should succeed"); + + assert!(!root.exists()); + let intent = read_intent(state.path(), "streamed-game").await; + assert_eq!(intent.state, InstallIntentState::None); + } + + #[tokio::test] + async fn streamed_install_rollback_keeps_existing_game_root() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + + let transaction = begin_streamed_install(&root, state.path(), "game") + .await + .expect("streamed transaction should begin"); + + transaction + .rollback() + .await + .expect("streamed rollback should succeed"); + + assert!(root.is_dir()); + assert!(root.join("version.ini").is_file()); + assert!(!root.join(INSTALLING_DIR).exists()); + } + + #[tokio::test] + async fn streamed_install_commit_succeeds_when_post_promote_intent_cleanup_fails() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.game_root(); + let transaction = begin_streamed_install(&root, state.path(), "game") + .await + .expect("streamed transaction should begin"); + write_file(&transaction.staging_dir().join("payload.txt"), b"installed"); + + let game_state_dir = crate::state_paths::game_state_dir(state.path(), "game"); + std::fs::remove_dir_all(&game_state_dir).expect("game state dir should be removed"); + write_file(&game_state_dir, b"not a directory"); + + transaction + .commit() + .await + .expect("promoted streamed install should be reported as success"); + + assert_eq!( + std::fs::read(root.join(LOCAL_DIR).join("payload.txt")) + .expect("promoted payload should be present"), + b"installed" + ); + } + #[tokio::test] async fn install_unpacks_multiple_root_eti_archives_in_sorted_order() { let temp = TempDir::new("lanspread-install"); diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index f2053bf..f41ecbc 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -86,6 +86,7 @@ pub use crate::{ stream_install::{ ExternalUnrarStreamProvider, NoopStreamInstallProvider, + StreamInstallFrameSink, StreamInstallFuture, StreamInstallProvider, }, diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index 18319c0..489ce1e 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -77,11 +77,37 @@ impl SenderArchiveIntegrity { pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; +#[derive(Clone)] +pub struct StreamInstallFrameSink { + frames: mpsc::Sender, + cancel_token: CancellationToken, +} + +impl StreamInstallFrameSink { + fn new(frames: mpsc::Sender, cancel_token: CancellationToken) -> Self { + Self { + frames, + cancel_token, + } + } + + pub async fn send(&self, frame: StreamInstallFrame) -> eyre::Result<()> { + tokio::select! { + () = self.cancel_token.cancelled() => { + eyre::bail!("streamed install frame send was cancelled"); + } + result = self.frames.send(frame) => { + result.map_err(|_| eyre::eyre!("streamed install frame receiver closed")) + } + } + } +} + pub trait StreamInstallProvider: Send + Sync { fn stream_archive<'a>( &'a self, archive: &'a Path, - frames: mpsc::Sender, + frames: StreamInstallFrameSink, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a>; } @@ -93,7 +119,7 @@ impl StreamInstallProvider for NoopStreamInstallProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, - _frames: mpsc::Sender, + _frames: StreamInstallFrameSink, _cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { @@ -121,7 +147,7 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, - frames: mpsc::Sender, + frames: StreamInstallFrameSink, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { @@ -132,15 +158,13 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { .unwrap_or("archive.eti") .to_string(); - send_stream_frame( - &frames, - StreamInstallFrame::ArchiveBegin { + frames + .send(StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, unpacked_size: listing.unpacked_size(), - }, - ) - .await?; + }) + .await?; stream_unrar_entries( &self.program, @@ -151,7 +175,9 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { ) .await?; - send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + frames + .send(StreamInstallFrame::ArchiveEnd { archive_name }) + .await }) } } @@ -268,9 +294,13 @@ fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Re let size = draft .size .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; - let crc32 = draft - .crc32 - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + let crc32 = match (size, draft.crc32) { + (_, Some(crc32)) => crc32, + (0, None) => 0, + (_, None) => { + eyre::bail!("RAR file entry {relative_path} has no CRC32"); + } + }; (size, Some(crc32)) } RarEntryKind::Directory => (0, None), @@ -289,7 +319,7 @@ async fn stream_unrar_entries( program: &Path, archive: &Path, entries: &[RarEntry], - frames: &mpsc::Sender, + frames: &StreamInstallFrameSink, cancel_token: CancellationToken, ) -> eyre::Result<()> { let mut child = Command::new(program) @@ -315,27 +345,23 @@ async fn stream_unrar_entries( match entry.kind { RarEntryKind::Directory => { - send_stream_frame( - frames, - StreamInstallFrame::Directory { + frames + .send(StreamInstallFrame::Directory { relative_path: entry.relative_path.clone(), - }, - ) - .await?; + }) + .await?; } RarEntryKind::File => { let Some(crc32) = entry.crc32 else { eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); }; - send_stream_frame( - frames, - StreamInstallFrame::FileBegin { + frames + .send(StreamInstallFrame::FileBegin { relative_path: entry.relative_path.clone(), size: entry.size, crc32, - }, - ) - .await?; + }) + .await?; stream_unrar_file_from_stdout( &mut stdout, archive, @@ -345,13 +371,11 @@ async fn stream_unrar_entries( &cancel_token, ) .await?; - send_stream_frame( - frames, - StreamInstallFrame::FileEnd { + frames + .send(StreamInstallFrame::FileEnd { relative_path: entry.relative_path.clone(), - }, - ) - .await?; + }) + .await?; } } } @@ -388,7 +412,7 @@ async fn stream_unrar_file_from_stdout( stdout: &mut (impl AsyncRead + Unpin), archive: &Path, entry: &RarEntry, - frames: &mpsc::Sender, + frames: &StreamInstallFrameSink, buffer: &mut [u8], cancel_token: &CancellationToken, ) -> eyre::Result<()> { @@ -405,13 +429,11 @@ async fn stream_unrar_file_from_stdout( ); } - send_stream_frame( - frames, - StreamInstallFrame::FileChunk { + frames + .send(StreamInstallFrame::FileChunk { bytes: Bytes::copy_from_slice(&buffer[..read]), - }, - ) - .await?; + }) + .await?; remaining = remaining.saturating_sub(u64::try_from(read)?); } @@ -446,16 +468,6 @@ async fn wait_unrar_child( } } -async fn send_stream_frame( - frames: &mpsc::Sender, - frame: StreamInstallFrame, -) -> eyre::Result<()> { - frames - .send(frame) - .await - .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) -} - pub(crate) async fn send_stream_install_error( tx: SendStream, message: impl Into, @@ -501,10 +513,12 @@ pub(crate) async fn send_game_install_stream( let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH); let producer_cancel = cancel_token.child_token(); + let frame_sink = StreamInstallFrameSink::new(frame_tx, producer_cancel.clone()); let game_id_for_producer = game_id.to_string(); let producer = tokio::spawn({ let provider = provider.clone(); let producer_cancel = producer_cancel.clone(); + let frame_sink = frame_sink.clone(); async move { for archive in archives { if producer_cancel.is_cancelled() { @@ -512,16 +526,16 @@ pub(crate) async fn send_game_install_stream( } if let Err(err) = provider - .stream_archive(&archive, frame_tx.clone(), producer_cancel.clone()) + .stream_archive(&archive, frame_sink.clone(), producer_cancel.clone()) .await { let message = err.to_string(); - let _ = frame_tx.send(StreamInstallFrame::Error { message }).await; + let _ = frame_sink.send(StreamInstallFrame::Error { message }).await; return Err(err); } } - let _ = frame_tx.send(StreamInstallFrame::Complete).await; + let _ = frame_sink.send(StreamInstallFrame::Complete).await; Ok(()) } }); @@ -536,6 +550,7 @@ pub(crate) async fn send_game_install_stream( break; } } + drop(frame_rx); let close_result = framed_tx .close() @@ -876,6 +891,31 @@ Details: RAR 5 assert!(err.to_string().contains("has no CRC32")); } + #[test] + fn accepts_zero_size_unrar_file_entries_without_crc32() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/empty.cfg + Type: File + Size: 0 +"#, + ) + .expect("empty file without CRC32 should parse as CRC32 zero"); + + assert_eq!( + listing.entries, + vec![RarEntry { + relative_path: "bin/empty.cfg".to_string(), + kind: RarEntryKind::File, + size: 0, + crc32: Some(0), + }] + ); + } + #[test] fn sender_archive_integrity_accepts_matching_size_and_crc32() { let bytes = b"payload"; diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 489ccc5..b6fb5f2 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -85,7 +85,6 @@ struct LanSpreadState { peer_runtime: Arc>>, games: Arc>, active_operations: Arc>>, - pending_stream_installs: Arc>>, games_folder: Arc>, peer_game_db: Arc>, catalog: Arc>, @@ -259,16 +258,6 @@ async fn install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } - if state - .inner() - .pending_stream_installs - .read() - .await - .contains(&id) - { - log::warn!("Game already has a pending streamed install: {id}"); - return Ok(false); - } let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone(); @@ -323,16 +312,6 @@ async fn stream_install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } - if state - .inner() - .pending_stream_installs - .read() - .await - .contains(&id) - { - log::warn!("Game already has a pending streamed install: {id}"); - return Ok(false); - } let Some((downloaded, installed, peer_count)) = state .inner() @@ -360,19 +339,8 @@ async fn stream_install_game( return Ok(false); }; - { - let mut pending = state.inner().pending_stream_installs.write().await; - pending.insert(id.clone()); - } - - if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) { - log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}"); - state - .inner() - .pending_stream_installs - .write() - .await - .remove(&id); + if let Err(e) = peer_ctrl.send(PeerCommand::StreamInstallGame { id }) { + log::error!("Failed to send PeerCommand::StreamInstallGame: {e:?}"); return Ok(false); } @@ -2092,7 +2060,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::NoPeersHaveGame { id } => { log::warn!("PeerEvent::NoPeersHaveGame received for {id}"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-no-peers", @@ -2131,7 +2098,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesFailed { id } => { log::warn!("PeerEvent::DownloadGameFilesFailed received"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-failed", @@ -2141,7 +2107,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesAllPeersGone { id } => { log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-peers-gone", @@ -2280,27 +2245,17 @@ async fn handle_got_game_files( ); let state = app_handle.state::(); - let stream_install = state.pending_stream_installs.write().await.remove(&id); let peer_ctrl = state.peer_ctrl.read().await.clone(); if let Some(peer_ctrl) = peer_ctrl - && let Err(e) = if stream_install { - peer_ctrl.send(PeerCommand::StreamInstallGame { id }) - } else { - peer_ctrl.send(PeerCommand::DownloadGameFiles { - id, - file_descriptions, - }) - } + && let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles { + id, + file_descriptions, + }) { log::error!("Failed to continue queued game transfer: {e}"); } } -async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) { - let state = app_handle.state::(); - state.pending_stream_installs.write().await.remove(id); -} - fn handle_download_finished(app_handle: &AppHandle, id: String) { log::info!("PeerEvent::DownloadGameFilesFinished received"); emit_game_id_event(