diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md index 4bdd410..8d734ed 100644 --- a/NEXT_STEPS.md +++ b/NEXT_STEPS.md @@ -61,5 +61,6 @@ product-ready. modal status shows `Installed, not shareable`. Downloaded-and-installed games keep the normal `Installed` label. -My recommended next slice: make the provider abstraction final-ish, then -implement a real one-pass provider. Everything else builds cleanly on that. +The remaining production-readiness step is additive: move from sender-owned RAR +metadata to catalog-owned archive or extracted-file hashes, then verify those +at the receiver before committing the streamed install. diff --git a/PEER_CLI_SCENARIOS.md b/PEER_CLI_SCENARIOS.md index 3586367..9fe4895 100644 --- a/PEER_CLI_SCENARIOS.md +++ b/PEER_CLI_SCENARIOS.md @@ -46,7 +46,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path. | S36 | Catalog singleton beats stale majority | Five peers advertise one game; one peer has the catalog version and four peers have stale versions. | `list-games` reports `peer_count=1` and the catalog `eti_game_version`; all descriptors and chunks come from the singleton catalog-version peer, while stale peers remain hidden and contribute zero bytes. | | S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. | | S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. | -| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | +| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | | S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. | | S41 | Solid archive streamed install | Empty client connects to a peer serving `fixture-solid/cnctw`, whose `.eti` is a real solid RAR archive. The receiver uses the container-bundled `unrar` stream provider. | The fixture is verified as solid with `unrar lt`; streamed install finishes with `downloaded=false`, `installed=true`, `availability=LocalOnly`; root archive and `version.ini` are absent; streamed byte count equals the extracted solid entries; local payload SHA-256 hashes match `unrar p` output. | | S42 | Streamed install whole-stream retry | Empty client connects to two peers serving the same catalog-version `cnctw`: one broken source whose `--unrar` path is missing, followed by one good source. | The broken source sorts before the good source in retry order, contributes zero chunks, and the good source completes a fresh whole-stream attempt. The final state is local-only installed, no root archive/sentinel, no `.local.installing`, byte count matches the extracted entries, and payload hashes match the good source. | diff --git a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py index ba8abf3..c2077d2 100644 --- a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py +++ b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py @@ -1208,12 +1208,6 @@ class Runner: wait_remote_game(client, "cnctw", peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-begin", "cnctw"), timeout=20, @@ -1320,12 +1314,6 @@ class Runner: waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got solid cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, @@ -1409,12 +1397,6 @@ class Runner: waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) - client.wait_for( - event_is("got-game-files", "cnctw"), - timeout=20, - description="got retry cnctw files", - waiter=waiter, - ) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 24d2426..9663284 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -261,7 +261,6 @@ async fn handle_command( CliCommand::StreamInstall { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; - let _ = game_files_for_download(sender, shared, game_id).await?; sender.send(PeerCommand::StreamInstallGame { id: game_id.clone(), })?; diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 4e34ef8..55a409b 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -14,8 +14,8 @@ It is designed to run headless – other crates (most notably roots are announced or served. - `PeerCommand` represents the small control surface exposed to the UI layer: `ListGames`, `GetGame`, `FetchLatestFromPeers`, `DownloadGameFiles`, - `InstallGame`, `UninstallGame`, `RemoveDownloadedGame`, `CancelDownload`, - `SetGameDir`, and `GetPeerCount`. + `StreamInstallGame`, `InstallGame`, `UninstallGame`, `RemoveDownloadedGame`, + `CancelDownload`, `SetGameDir`, and `GetPeerCount`. - `PeerEvent` enumerates everything the peer runtime reports back to the UI: library snapshots, download/install/uninstall lifecycle updates, runtime failures, and peer membership changes. @@ -28,8 +28,8 @@ lifetime of the process: 1. **Server component** (`run_server_component`) – listens for QUIC connections, advertises via mDNS, and serves `Request::ListGames`, `Request::GetGame`, - `Request::GetGameFileData`, and `Request::GetGameFileChunk` by reading from - the local game directory. + `Request::GetGameFileData`, `Request::GetGameFileChunk`, and + `Request::StreamInstall` by reading from the local game directory. 2. **Discovery loop** (`run_peer_discovery`) – uses the `lanspread-mdns` helper to discover other peers. The blocking mDNS work is executed on a dedicated thread via `tokio::task::spawn_blocking` so that the Tokio runtime @@ -87,6 +87,26 @@ When the UI asks to download a game: 7. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` is emitted and the peer auto-runs the install transaction. +### Streamed Install Pipeline + +Low-disk installs use `PeerCommand::StreamInstallGame` instead of the normal +archive download pipeline. The peer core owns the whole operation: it refreshes +file metadata from catalog-version peers, runs the same majority file-size +validation used by normal downloads, selects a validated peer list, and emits +the regular download/install lifecycle events while streaming archive-expanded +bytes directly into a `StreamedInstallTransaction`. + +The sender-side `StreamInstallProvider` writes control and chunk frames through +a cancellable `StreamInstallFrameSink`. If the QUIC writer fails because the +receiver cancelled or disconnected, the sink wakes any producer blocked on the +bounded frame channel and lets the transfer guard drop normally. + +Each failed peer attempt rolls back its staging directory before trying the next +validated peer. A transaction that created a previously missing game root +removes that root again when rollback leaves it empty. Once staging has been +renamed to `local/`, post-promote intent or launch-settings cleanup failures are +logged for startup recovery rather than reported as a failed install. + `PeerCommand::CancelDownload` cancels the tracked download token for an active transfer. The transfer task remains responsible for clearing `active_operations`, discarding partial payload files, and refreshing the settled local snapshot, so diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 8985f74..10e68c9 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -471,38 +471,6 @@ pub async fn handle_stream_install_game_command( return; } - let expected_version = catalog_expected_version(ctx, &id).await; - let mut peers = { - match ctx - .peer_game_db - .read() - .await - .validate_file_sizes_majority(&id, expected_version.as_deref()) - { - Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => { - peer_whitelist - } - Ok(_) => { - log::error!("No trusted peers available for streamed install of {id}"); - send_download_failed(tx_notify_ui, &id); - return; - } - Err(err) => { - log::error!( - "File size majority validation failed for streamed install {id}: {err}" - ); - send_download_failed(tx_notify_ui, &id); - return; - } - } - }; - peers.sort(); - if peers.is_empty() { - log::error!("No peer selected for streamed install of {id}"); - send_download_failed(tx_notify_ui, &id); - return; - } - match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { BeginOperationResult::Started => {} BeginOperationResult::AlreadyActive => { @@ -516,6 +484,7 @@ pub async fn handle_stream_install_game_command( } } + let expected_version = catalog_expected_version(ctx, &id).await; let cancel_token = ctx.shutdown.child_token(); ctx.active_downloads .write() @@ -525,8 +494,15 @@ pub async fn handle_stream_install_game_command( let ctx_clone = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { - run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token) - .await; + run_stream_install_operation( + ctx_clone, + tx_notify_ui, + id, + game_root, + expected_version, + cancel_token, + ) + .await; }); } @@ -575,7 +551,7 @@ async fn run_stream_install_operation( tx_notify_ui: UnboundedSender, id: String, game_root: PathBuf, - peer_addrs: Vec, + expected_version: Option, cancel_token: CancellationToken, ) { let download_guard = OperationGuard::download( @@ -590,27 +566,94 @@ async fn run_stream_install_operation( PeerEvent::DownloadGameFilesBegin { id: id.clone() }, ); + let peer_addrs = + match select_stream_install_peers(&ctx, &id, expected_version.as_deref(), &cancel_token) + .await + { + Ok(peers) => peers, + Err(err) => { + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install preflight cancelled for {id}: {err}"); + } else { + log::error!("Streamed install preflight failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + return; + } + }; + + match receive_streamed_install_from_peers( + &ctx, + &tx_notify_ui, + &id, + &game_root, + &peer_addrs, + &cancel_token, + ) + .await + { + Ok(transaction) => { + if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + .await + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + return; + } + + if let Err(err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {err}"); + } + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; + } + Err(err) => { + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + } + } +} + +async fn receive_streamed_install_from_peers( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + game_root: &Path, + peer_addrs: &[SocketAddr], + cancel_token: &CancellationToken, +) -> eyre::Result { let mut last_receive_error = None; - for peer_addr in peer_addrs { + for &peer_addr in peer_addrs { if cancel_token.is_cancelled() { - last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled")); - break; + eyre::bail!("streamed install for {id} was cancelled"); } let transaction = - match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await { - Ok(transaction) => transaction, - Err(err) => { - log::error!("Failed to prepare streamed install for {id}: {err}"); - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) - .await; - return; - } - }; - + install::begin_streamed_install(game_root, ctx.state_dir.as_ref(), id).await?; let receive_result = receive_streamed_install( peer_addr, - &id, + id, transaction.staging_dir(), tx_notify_ui.clone(), cancel_token.clone(), @@ -618,37 +661,13 @@ async fn run_stream_install_operation( .await; match receive_result { - Ok(()) => { - if transition_download_to_install( - &ctx, - &tx_notify_ui, - &id, - OperationKind::Installing, - ) - .await - { - clear_active_download(&ctx, &id).await; - send_download_finished(&tx_notify_ui, &id); - download_guard.disarm(); - commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; - return; - } - - if let Err(err) = transaction.rollback().await { - log::error!("Failed to roll back streamed install for {id}: {err}"); - } - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) - .await; - return; - } + Ok(()) => return Ok(transaction), Err(err) => { if let Err(rollback_err) = transaction.rollback().await { log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); } if cancel_token.is_cancelled() { - log::info!("Streamed install download cancelled for {id}: {err}"); - last_receive_error = Some(err); - break; + return Err(err); } log::warn!( @@ -659,24 +678,84 @@ async fn run_stream_install_operation( } } - let download_was_cancelled = cancel_token.is_cancelled(); - if let Some(err) = last_receive_error { - if download_was_cancelled { - log::info!("Streamed install download cancelled for {id}: {err}"); - } else { - log::error!("Streamed install download failed for {id}: {err}"); - } - } else { - log::error!("Streamed install download failed for {id}: no peer attempts were made"); + Err(last_receive_error.unwrap_or_else(|| { + eyre::eyre!("streamed install download failed for {id}: no peer attempts were made") + })) +} + +async fn select_stream_install_peers( + ctx: &Ctx, + id: &str, + expected_version: Option<&str>, + cancel_token: &CancellationToken, +) -> eyre::Result> { + let mut metadata_peers = { + ctx.peer_game_db + .read() + .await + .peers_with_expected_version(id, expected_version) + }; + metadata_peers.sort(); + if metadata_peers.is_empty() { + eyre::bail!("no peers have game {id}"); } - finish_failed_stream_download( - &ctx, - &tx_notify_ui, - &id, - download_guard, - download_was_cancelled, - ) - .await; + + refresh_stream_install_file_details(ctx, id, &metadata_peers, cancel_token).await?; + + let mut peers = match ctx + .peer_game_db + .read() + .await + .validate_file_sizes_majority(id, expected_version) + { + Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => peer_whitelist, + Ok(_) => { + eyre::bail!("no trusted peers available for streamed install of {id}"); + } + Err(err) => { + return Err(err.wrap_err(format!( + "file size majority validation failed for streamed install {id}" + ))); + } + }; + peers.sort(); + if peers.is_empty() { + eyre::bail!("no peer selected for streamed install of {id}"); + } + + Ok(peers) +} + +async fn refresh_stream_install_file_details( + ctx: &Ctx, + id: &str, + peers: &[SocketAddr], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut fetched_any = false; + for &peer_addr in peers { + if cancel_token.is_cancelled() { + eyre::bail!("streamed install for {id} was cancelled"); + } + + match request_game_details_and_update(peer_addr, id, ctx.peer_game_db.clone()).await { + Ok(_) => { + log::info!("Fetched streamed-install file list for {id} from peer {peer_addr}"); + fetched_any = true; + } + Err(err) => { + log::error!( + "Failed to fetch streamed-install files for {id} from {peer_addr}: {err}" + ); + } + } + } + + if !fetched_any { + eyre::bail!("failed to retrieve game files for {id} from any peer"); + } + + Ok(()) } async fn finish_failed_stream_download( diff --git a/crates/lanspread-peer/src/install/transaction.rs b/crates/lanspread-peer/src/install/transaction.rs index 959dc1c..531cba9 100644 --- a/crates/lanspread-peer/src/install/transaction.rs +++ b/crates/lanspread-peer/src/install/transaction.rs @@ -39,6 +39,7 @@ pub struct StreamedInstallTransaction { id: String, staging: PathBuf, eti_version: Option, + created_game_root: bool, } impl StreamedInstallTransaction { @@ -49,40 +50,61 @@ impl StreamedInstallTransaction { pub async fn commit(self) -> eyre::Result<()> { let local = local_dir(&self.game_root); - let result = async { - tokio::fs::rename(&self.staging, &local) - .await - .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?; - reset_launch_settings_marker(&self.state_dir, &self.id).await?; - write_intent( - &self.state_dir, - &self.id, - &InstallIntent::none(&self.id, self.eti_version.clone()), - ) + if let Err(err) = tokio::fs::rename(&self.staging, &local) .await - } - .await; - - if result.is_err() { + .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id)) + { if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await { log::warn!( "Failed to clean streamed install staging {}: {cleanup_err}", self.staging.display() ); } + if let Err(cleanup_err) = + remove_created_empty_game_root(&self.game_root, self.created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + self.game_root.display() + ); + } let _ = write_intent( &self.state_dir, &self.id, &InstallIntent::none(&self.id, self.eti_version.clone()), ) .await; + return Err(err); } - result + if let Err(err) = reset_launch_settings_marker(&self.state_dir, &self.id).await { + log::error!( + "Streamed install for {} was promoted but launch-settings marker reset failed: {err}", + self.id + ); + } + if let Err(err) = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await + { + log::error!( + "Streamed install for {} was promoted but intent cleanup failed: {err}", + self.id + ); + } + + Ok(()) } pub async fn rollback(self) -> eyre::Result<()> { - let staging_result = remove_dir_all_if_exists(&self.staging).await; + let cleanup_result = async { + remove_dir_all_if_exists(&self.staging).await?; + remove_created_empty_game_root(&self.game_root, self.created_game_root).await + } + .await; let intent_result = write_intent( &self.state_dir, &self.id, @@ -90,7 +112,7 @@ impl StreamedInstallTransaction { ) .await; - staging_result?; + cleanup_result?; intent_result } } @@ -104,18 +126,36 @@ pub async fn begin_streamed_install( eyre::bail!("game {id} is already installed"); } + let created_game_root = !path_exists(game_root).await; tokio::fs::create_dir_all(game_root).await?; let eti_version = read_downloaded_version(game_root).await; - write_intent( + if let Err(err) = write_intent( state_dir, id, &InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()), ) - .await?; + .await + { + if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + game_root.display() + ); + } + return Err(err); + } let staging = installing_dir(game_root); if let Err(err) = prepare_owned_empty_dir(&staging).await { let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await; + if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await + { + log::warn!( + "Failed to clean streamed install game root {}: {cleanup_err}", + game_root.display() + ); + } return Err(err); } @@ -127,6 +167,7 @@ pub async fn begin_streamed_install( id: id.to_string(), staging, eti_version, + created_game_root, }) } @@ -586,6 +627,28 @@ async fn remove_dir_all_if_exists(path: &Path) -> eyre::Result<()> { } } +async fn remove_created_empty_game_root(game_root: &Path, created: bool) -> eyre::Result<()> { + if !created { + return Ok(()); + } + remove_empty_dir_if_exists(game_root).await +} + +async fn remove_empty_dir_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_dir(path).await { + Ok(()) => Ok(()), + Err(err) + if matches!( + err.kind(), + ErrorKind::NotFound | ErrorKind::DirectoryNotEmpty + ) => + { + Ok(()) + } + Err(err) => Err(err.into()), + } +} + async fn path_is_dir(path: &Path) -> bool { tokio::fs::metadata(path) .await @@ -727,6 +790,74 @@ mod tests { assert!(!launch_settings_applied_path(state.path(), "game").exists()); } + #[tokio::test] + async fn streamed_install_rollback_removes_new_empty_game_root() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.path().join("streamed-game"); + + let transaction = begin_streamed_install(&root, state.path(), "streamed-game") + .await + .expect("streamed transaction should begin"); + assert!(transaction.staging_dir().is_dir()); + + transaction + .rollback() + .await + .expect("streamed rollback should succeed"); + + assert!(!root.exists()); + let intent = read_intent(state.path(), "streamed-game").await; + assert_eq!(intent.state, InstallIntentState::None); + } + + #[tokio::test] + async fn streamed_install_rollback_keeps_existing_game_root() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + + let transaction = begin_streamed_install(&root, state.path(), "game") + .await + .expect("streamed transaction should begin"); + + transaction + .rollback() + .await + .expect("streamed rollback should succeed"); + + assert!(root.is_dir()); + assert!(root.join("version.ini").is_file()); + assert!(!root.join(INSTALLING_DIR).exists()); + } + + #[tokio::test] + async fn streamed_install_commit_succeeds_when_post_promote_intent_cleanup_fails() { + let temp = TempDir::new("lanspread-install"); + let state = test_state(); + let root = temp.game_root(); + let transaction = begin_streamed_install(&root, state.path(), "game") + .await + .expect("streamed transaction should begin"); + write_file(&transaction.staging_dir().join("payload.txt"), b"installed"); + + let game_state_dir = crate::state_paths::game_state_dir(state.path(), "game"); + std::fs::remove_dir_all(&game_state_dir).expect("game state dir should be removed"); + write_file(&game_state_dir, b"not a directory"); + + transaction + .commit() + .await + .expect("promoted streamed install should be reported as success"); + + assert_eq!( + std::fs::read(root.join(LOCAL_DIR).join("payload.txt")) + .expect("promoted payload should be present"), + b"installed" + ); + } + #[tokio::test] async fn install_unpacks_multiple_root_eti_archives_in_sorted_order() { let temp = TempDir::new("lanspread-install"); diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index f2053bf..f41ecbc 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -86,6 +86,7 @@ pub use crate::{ stream_install::{ ExternalUnrarStreamProvider, NoopStreamInstallProvider, + StreamInstallFrameSink, StreamInstallFuture, StreamInstallProvider, }, diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index 18319c0..489ce1e 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -77,11 +77,37 @@ impl SenderArchiveIntegrity { pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; +#[derive(Clone)] +pub struct StreamInstallFrameSink { + frames: mpsc::Sender, + cancel_token: CancellationToken, +} + +impl StreamInstallFrameSink { + fn new(frames: mpsc::Sender, cancel_token: CancellationToken) -> Self { + Self { + frames, + cancel_token, + } + } + + pub async fn send(&self, frame: StreamInstallFrame) -> eyre::Result<()> { + tokio::select! { + () = self.cancel_token.cancelled() => { + eyre::bail!("streamed install frame send was cancelled"); + } + result = self.frames.send(frame) => { + result.map_err(|_| eyre::eyre!("streamed install frame receiver closed")) + } + } + } +} + pub trait StreamInstallProvider: Send + Sync { fn stream_archive<'a>( &'a self, archive: &'a Path, - frames: mpsc::Sender, + frames: StreamInstallFrameSink, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a>; } @@ -93,7 +119,7 @@ impl StreamInstallProvider for NoopStreamInstallProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, - _frames: mpsc::Sender, + _frames: StreamInstallFrameSink, _cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { @@ -121,7 +147,7 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, - frames: mpsc::Sender, + frames: StreamInstallFrameSink, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { @@ -132,15 +158,13 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { .unwrap_or("archive.eti") .to_string(); - send_stream_frame( - &frames, - StreamInstallFrame::ArchiveBegin { + frames + .send(StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, unpacked_size: listing.unpacked_size(), - }, - ) - .await?; + }) + .await?; stream_unrar_entries( &self.program, @@ -151,7 +175,9 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { ) .await?; - send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + frames + .send(StreamInstallFrame::ArchiveEnd { archive_name }) + .await }) } } @@ -268,9 +294,13 @@ fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Re let size = draft .size .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; - let crc32 = draft - .crc32 - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + let crc32 = match (size, draft.crc32) { + (_, Some(crc32)) => crc32, + (0, None) => 0, + (_, None) => { + eyre::bail!("RAR file entry {relative_path} has no CRC32"); + } + }; (size, Some(crc32)) } RarEntryKind::Directory => (0, None), @@ -289,7 +319,7 @@ async fn stream_unrar_entries( program: &Path, archive: &Path, entries: &[RarEntry], - frames: &mpsc::Sender, + frames: &StreamInstallFrameSink, cancel_token: CancellationToken, ) -> eyre::Result<()> { let mut child = Command::new(program) @@ -315,27 +345,23 @@ async fn stream_unrar_entries( match entry.kind { RarEntryKind::Directory => { - send_stream_frame( - frames, - StreamInstallFrame::Directory { + frames + .send(StreamInstallFrame::Directory { relative_path: entry.relative_path.clone(), - }, - ) - .await?; + }) + .await?; } RarEntryKind::File => { let Some(crc32) = entry.crc32 else { eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); }; - send_stream_frame( - frames, - StreamInstallFrame::FileBegin { + frames + .send(StreamInstallFrame::FileBegin { relative_path: entry.relative_path.clone(), size: entry.size, crc32, - }, - ) - .await?; + }) + .await?; stream_unrar_file_from_stdout( &mut stdout, archive, @@ -345,13 +371,11 @@ async fn stream_unrar_entries( &cancel_token, ) .await?; - send_stream_frame( - frames, - StreamInstallFrame::FileEnd { + frames + .send(StreamInstallFrame::FileEnd { relative_path: entry.relative_path.clone(), - }, - ) - .await?; + }) + .await?; } } } @@ -388,7 +412,7 @@ async fn stream_unrar_file_from_stdout( stdout: &mut (impl AsyncRead + Unpin), archive: &Path, entry: &RarEntry, - frames: &mpsc::Sender, + frames: &StreamInstallFrameSink, buffer: &mut [u8], cancel_token: &CancellationToken, ) -> eyre::Result<()> { @@ -405,13 +429,11 @@ async fn stream_unrar_file_from_stdout( ); } - send_stream_frame( - frames, - StreamInstallFrame::FileChunk { + frames + .send(StreamInstallFrame::FileChunk { bytes: Bytes::copy_from_slice(&buffer[..read]), - }, - ) - .await?; + }) + .await?; remaining = remaining.saturating_sub(u64::try_from(read)?); } @@ -446,16 +468,6 @@ async fn wait_unrar_child( } } -async fn send_stream_frame( - frames: &mpsc::Sender, - frame: StreamInstallFrame, -) -> eyre::Result<()> { - frames - .send(frame) - .await - .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) -} - pub(crate) async fn send_stream_install_error( tx: SendStream, message: impl Into, @@ -501,10 +513,12 @@ pub(crate) async fn send_game_install_stream( let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH); let producer_cancel = cancel_token.child_token(); + let frame_sink = StreamInstallFrameSink::new(frame_tx, producer_cancel.clone()); let game_id_for_producer = game_id.to_string(); let producer = tokio::spawn({ let provider = provider.clone(); let producer_cancel = producer_cancel.clone(); + let frame_sink = frame_sink.clone(); async move { for archive in archives { if producer_cancel.is_cancelled() { @@ -512,16 +526,16 @@ pub(crate) async fn send_game_install_stream( } if let Err(err) = provider - .stream_archive(&archive, frame_tx.clone(), producer_cancel.clone()) + .stream_archive(&archive, frame_sink.clone(), producer_cancel.clone()) .await { let message = err.to_string(); - let _ = frame_tx.send(StreamInstallFrame::Error { message }).await; + let _ = frame_sink.send(StreamInstallFrame::Error { message }).await; return Err(err); } } - let _ = frame_tx.send(StreamInstallFrame::Complete).await; + let _ = frame_sink.send(StreamInstallFrame::Complete).await; Ok(()) } }); @@ -536,6 +550,7 @@ pub(crate) async fn send_game_install_stream( break; } } + drop(frame_rx); let close_result = framed_tx .close() @@ -876,6 +891,31 @@ Details: RAR 5 assert!(err.to_string().contains("has no CRC32")); } + #[test] + fn accepts_zero_size_unrar_file_entries_without_crc32() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/empty.cfg + Type: File + Size: 0 +"#, + ) + .expect("empty file without CRC32 should parse as CRC32 zero"); + + assert_eq!( + listing.entries, + vec![RarEntry { + relative_path: "bin/empty.cfg".to_string(), + kind: RarEntryKind::File, + size: 0, + crc32: Some(0), + }] + ); + } + #[test] fn sender_archive_integrity_accepts_matching_size_and_crc32() { let bytes = b"payload"; diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 489ccc5..b6fb5f2 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -85,7 +85,6 @@ struct LanSpreadState { peer_runtime: Arc>>, games: Arc>, active_operations: Arc>>, - pending_stream_installs: Arc>>, games_folder: Arc>, peer_game_db: Arc>, catalog: Arc>, @@ -259,16 +258,6 @@ async fn install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } - if state - .inner() - .pending_stream_installs - .read() - .await - .contains(&id) - { - log::warn!("Game already has a pending streamed install: {id}"); - return Ok(false); - } let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone(); @@ -323,16 +312,6 @@ async fn stream_install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } - if state - .inner() - .pending_stream_installs - .read() - .await - .contains(&id) - { - log::warn!("Game already has a pending streamed install: {id}"); - return Ok(false); - } let Some((downloaded, installed, peer_count)) = state .inner() @@ -360,19 +339,8 @@ async fn stream_install_game( return Ok(false); }; - { - let mut pending = state.inner().pending_stream_installs.write().await; - pending.insert(id.clone()); - } - - if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) { - log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}"); - state - .inner() - .pending_stream_installs - .write() - .await - .remove(&id); + if let Err(e) = peer_ctrl.send(PeerCommand::StreamInstallGame { id }) { + log::error!("Failed to send PeerCommand::StreamInstallGame: {e:?}"); return Ok(false); } @@ -2092,7 +2060,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::NoPeersHaveGame { id } => { log::warn!("PeerEvent::NoPeersHaveGame received for {id}"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-no-peers", @@ -2131,7 +2098,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesFailed { id } => { log::warn!("PeerEvent::DownloadGameFilesFailed received"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-failed", @@ -2141,7 +2107,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesAllPeersGone { id } => { log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); - clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-peers-gone", @@ -2280,27 +2245,17 @@ async fn handle_got_game_files( ); let state = app_handle.state::(); - let stream_install = state.pending_stream_installs.write().await.remove(&id); let peer_ctrl = state.peer_ctrl.read().await.clone(); if let Some(peer_ctrl) = peer_ctrl - && let Err(e) = if stream_install { - peer_ctrl.send(PeerCommand::StreamInstallGame { id }) - } else { - peer_ctrl.send(PeerCommand::DownloadGameFiles { - id, - file_descriptions, - }) - } + && let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles { + id, + file_descriptions, + }) { log::error!("Failed to continue queued game transfer: {e}"); } } -async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) { - let state = app_handle.state::(); - state.pending_stream_installs.write().await.remove(id); -} - fn handle_download_finished(app_handle: &AppHandle, id: String) { log::info!("PeerEvent::DownloadGameFilesFinished received"); emit_game_id_event(