From 373def6d44db7f08e079e26358824f4ab662d9b0 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 20:31:51 +0200 Subject: [PATCH 01/15] feat(peer): prototype streamed installs Add a streamed-install prototype that can receive archive-derived install bytes straight into local/ without first storing the peer-owned root archive payload. This is intended for low-disk clients that want to install a game but opt out of becoming a downloadable peer source for that game. The protocol gains a current-version-only StreamInstall request and framed StreamInstallFrame responses. The peer core owns the generic transport, transaction, path validation, size checks, CRC32 verification, and lifecycle state. The archive-specific work is hidden behind StreamInstallProvider so the prototype can use unrar while the final implementation can swap in a better provider without rewriting the peer command path. The receiver writes into .local.installing and only promotes to local/ after the full stream verifies. It deliberately does not write the root version.ini or archive files, so the settled local state is installed=true, downloaded=false, and availability=LocalOnly. That preserves the existing rule that local/ is not served to peers and makes streamed receivers non-sources by construction. The CLI is the only caller for now. It exposes stream-install and provides the prototype unrar implementation with unrar lt for entry metadata and unrar p for file bytes. This is simple and good enough to prove non-solid archive streaming, but it is not the production provider shape for solid archives because per-file unrar p would repeatedly decompress prefixes. The Tauri app explicitly passes stream_install_provider: None, so the GUI behavior stays unchanged until a real product path is designed. Document the production-readiness work in NEXT_STEPS.md. The main follow-up is to make the provider abstraction final-ish and replace the per-file CLI unrar provider with a one-pass archive provider, then wire a deliberate GUI low-disk mode, retry semantics, and broader failure scenarios. Test Plan: - just fmt - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \ S39 S40 --build-image - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy - git diff --check - git diff --cached --check Follow-up: NEXT_STEPS.md --- Cargo.lock | 4 + Cargo.toml | 1 + NEXT_STEPS.md | 64 +++ PEER_CLI_SCENARIOS.md | 34 +- crates/lanspread-peer-cli/Cargo.toml | 3 + crates/lanspread-peer-cli/Dockerfile | 6 +- .../scripts/run_extended_scenarios.py | 132 ++++++- crates/lanspread-peer-cli/src/lib.rs | 330 +++++++++++++++- crates/lanspread-peer-cli/src/main.rs | 29 +- crates/lanspread-peer/Cargo.toml | 1 + crates/lanspread-peer/src/context.rs | 14 +- crates/lanspread-peer/src/handlers.rs | 233 +++++++++++ crates/lanspread-peer/src/install/mod.rs | 10 +- .../lanspread-peer/src/install/transaction.rs | 99 ++++- crates/lanspread-peer/src/lib.rs | 33 +- .../lanspread-peer/src/services/handshake.rs | 1 + .../src/services/local_monitor.rs | 1 + crates/lanspread-peer/src/services/stream.rs | 48 +++ crates/lanspread-peer/src/startup.rs | 3 + crates/lanspread-peer/src/stream_install.rs | 372 ++++++++++++++++++ crates/lanspread-proto/src/lib.rs | 60 ++- .../src-tauri/src/lib.rs | 1 + 22 files changed, 1465 insertions(+), 14 deletions(-) create mode 100644 NEXT_STEPS.md create mode 100644 crates/lanspread-peer/src/stream_install.rs diff --git a/Cargo.lock b/Cargo.lock index aff387a..32f6751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2035,6 +2035,7 @@ name = "lanspread-peer" version = "0.1.0" dependencies = [ "bytes", + "crc32fast", "eyre", "futures", "gethostname", @@ -2059,13 +2060,16 @@ dependencies = [ name = "lanspread-peer-cli" version = "0.1.0" dependencies = [ + "bytes", "eyre", "lanspread-compat", "lanspread-db", "lanspread-peer", + "lanspread-proto", "serde", "serde_json", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c3d57d5..705d512 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ [workspace.dependencies] base64 = "0.22" bytes = { version = "1", features = ["serde"] } +crc32fast = "1" eyre = "0.6" futures = "0.3" gethostname = "1" diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md new file mode 100644 index 0000000..1806a1d --- /dev/null +++ b/NEXT_STEPS.md @@ -0,0 +1,64 @@ +# Streamed Install Next Steps + +I’d treat the prototype as proof of the hard part: “can we stream +archive-derived install bytes into `local/` without making the receiver a +source?” Yes. Next I’d harden the pieces that decide whether this is +product-ready. + +1. **Move from CLI-only to real app integration** + + Add a GUI command/control path for “stream install / low disk mode”, + probably behind an explicit option. The Tauri crate currently opts out with + `stream_install_provider: None`, so the GUI cannot use it yet. + +2. **Replace per-file `unrar p` with a final archive provider** + + The prototype provider is intentionally simple: `unrar lt`, then `unrar p` + per file. Good for non-solid archives, bad for solid archives. Final shape + should be a one-pass provider with real entry boundaries, likely via libunrar + or a purpose-built wrapper. + +3. **Handle solid archives deliberately** + + Add archive inspection that decides: + + - non-solid: per-file streaming is fine + - solid: one sequential archive pass only + + This is the big architectural fork we discussed, and the prototype’s + provider is the thing to swap. + +4. **Decide the integrity model** + + Current prototype verifies streamed bytes against RAR CRC32 from the + sender’s archive headers. That catches corruption and provider bugs. It does + not protect against a malicious peer lying. If you care about that, the next + step is catalog-side trusted hashes for archive or extracted files. + +5. **Upgrade retry/resume semantics** + + Right now, failed stream means failed operation and rollback. Next useful + step: + + - retry whole stream from another trusted peer + - later, maybe keep completed files and restart only the interrupted file + - avoid byte-offset resume until there’s a strong reason + +6. **Expand scenario coverage** + + I’d add cases for: + + - sender disconnect mid-stream + - receiver cancel mid-stream + - corrupted/truncated stream fails and leaves no `local/` + - already-installed game rejects streamed install + - multi-archive `.eti` roots stream in sorted order + +7. **Clean product semantics** + + Decide how the UI labels this state. It is installed but not downloaded, so + “Local only” is technically correct, but users may need a clear affordance + like “Installed, not shareable”. + +My recommended next slice: make the provider abstraction final-ish, then +implement a real one-pass provider. Everything else builds cleanly on that. diff --git a/PEER_CLI_SCENARIOS.md b/PEER_CLI_SCENARIOS.md index a7aee4a..1cb8cbf 100644 --- a/PEER_CLI_SCENARIOS.md +++ b/PEER_CLI_SCENARIOS.md @@ -46,6 +46,8 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path. | S36 | Latest singleton beats stale majority | Five peers advertise one game; one peer has `20260501`, four peers have `20250101`. | `list-games` reports `eti_game_version=20260501`; all descriptors and chunks come from the singleton latest peer; stale peers contribute zero bytes. | | S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. | | S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. | +| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | +| S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. | ## Version-Skew Contract @@ -105,13 +107,37 @@ Use S38 to pin down how launcher settings are stamped into an installed game: line keeps its existing line ending (`\n` or `\r\n`). - The marker records only that we *tried*: it is written unconditionally after the first play, so a game with none of these files is still marked done. -- S38 needs a real archive expanded with `--unrar`, so it runs against the host - `lanspread-peer-cli` binary rather than the Docker matrix image (which omits - `unrar`). The peer crate's `launch_settings` unit tests cover the rewrite, - line-ending, and marker logic deterministically. +- S38 needs a real archive expanded with `--unrar`; the Docker matrix image now + carries the Linux sidecar for streamed-install coverage, while the peer + crate's `launch_settings` unit tests cover the rewrite, line-ending, and + marker logic deterministically. ## Run Log +### 2026-06-07 - Streamed Install Prototype (S39-S40) + +- Code under test added `stream-install` to `lanspread-peer-cli`, a peer + `StreamInstallGame` command, streamed install frames over QUIC, and an + injected `unrar lt`/`unrar p` provider for archive-derived bytes. +- Gates before Docker: `just fmt` and + `RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test` passed for the + workspace. +- Runner: + `python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S39 S40 --build-image` + passed against the rebuilt `lanspread-peer-cli:dev` image. +- S39 streamed a catalog-version-adjusted `cnctw` fixture from a real RAR + `.eti` into the receiver's `local/` only. The receiver had + `downloaded=false`, `installed=true`, `availability=LocalOnly`, no root + `version.ini`, no root `.eti`, and payload SHA-256 hashes + `82f4da22dc042166def2a5ee2eca19fc9e52785f99838e86c32167cb342e2588` + (`bin/cnctw-payload.bin`) and + `abf833a06c74ea9f17d505c2684186491898ce906405e0f098f0deac19476b06` + (`data/cnctw-assets.dat`) matching `unrar p`. +- S40 connected an observer only to that streamed-install receiver. The + observer saw the receiver's `cnctw` summary as local-only, remote aggregation + hid it as a downloadable source, and `download cnctw` failed with + `no peers have game cnctw`. + ### 2026-05-28 - First-Play Launch-Setting Stamping (S38) - Code under test moved the `account_name.txt`/`language.txt` overwrite out of diff --git a/crates/lanspread-peer-cli/Cargo.toml b/crates/lanspread-peer-cli/Cargo.toml index 7c18e7a..5a96902 100644 --- a/crates/lanspread-peer-cli/Cargo.toml +++ b/crates/lanspread-peer-cli/Cargo.toml @@ -14,11 +14,14 @@ path = "src/main.rs" lanspread-compat = { path = "../lanspread-compat" } lanspread-db = { path = "../lanspread-db" } lanspread-peer = { path = "../lanspread-peer" } +lanspread-proto = { path = "../lanspread-proto" } +bytes = { workspace = true } eyre = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } [lints.clippy] needless_pass_by_value = "allow" diff --git a/crates/lanspread-peer-cli/Dockerfile b/crates/lanspread-peer-cli/Dockerfile index ca24bfc..ad636f3 100644 --- a/crates/lanspread-peer-cli/Dockerfile +++ b/crates/lanspread-peer-cli/Dockerfile @@ -4,14 +4,16 @@ WORKDIR /work COPY . . RUN cargo build --release -p lanspread-peer-cli -FROM debian:bookworm-slim +FROM debian:trixie-slim RUN apt-get update \ - && apt-get install -y --no-install-recommends ca-certificates \ + && apt-get install -y --no-install-recommends ca-certificates libstdc++6 \ && rm -rf /var/lib/apt/lists/* COPY --from=build /work/target/release/lanspread-peer-cli /usr/local/bin/lanspread-peer-cli COPY crates/lanspread-tauri-deno-ts/src-tauri/game.db /app/game.db +COPY crates/lanspread-tauri-deno-ts/src-tauri/binaries/unrar-x86_64-unknown-linux-gnu /usr/local/bin/unrar +RUN chmod +x /usr/local/bin/unrar ENTRYPOINT ["lanspread-peer-cli"] CMD ["--games-dir", "/games", "--state-dir", "/state", "--catalog-db", "/app/game.db"] diff --git a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py index 8c262a6..85874fa 100644 --- a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py +++ b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Run the peer-cli scenarios S1-S36 through Docker.""" +"""Run the peer-cli scenarios S1-S40 through Docker.""" from __future__ import annotations @@ -8,6 +8,7 @@ import hashlib import json import os import queue +import shlex import shutil import subprocess import sys @@ -325,6 +326,8 @@ class Runner: ("S35", self.s35_unknown_game_filtered), ("S36", self.s36_latest_singleton), ("S37", self.s37_single_source_download_throughput), + ("S39", self.s39_streamed_install_local_only), + ("S40", self.s40_streamed_receiver_not_source), ] for scenario_id, scenario in scenarios: @@ -1060,6 +1063,114 @@ class Runner: f"{throughput['chunks']} chunks" ) + def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]: + source_dir = self.fixture_root / f"{prefix}-bravo" + copy_game("cnctw", source_dir, version="20160128") + source = self.peer(f"{prefix}-bravo", games_dir=source_dir) + client = self.peer(f"{prefix}-client") + connect_many(client, [source]) + wait_remote_game(client, "cnctw", peer_count=1) + waiter = LineWaiter(len(client.output)) + client.send({"cmd": "stream-install", "game_id": "cnctw"}) + client.wait_for( + event_is("got-game-files", "cnctw"), + timeout=20, + description="got cnctw files", + waiter=waiter, + ) + client.wait_for( + event_is("download-begin", "cnctw"), + timeout=20, + description="stream begin cnctw", + waiter=waiter, + ) + client.wait_for( + event_is("download-finished", "cnctw"), + timeout=60, + description="stream finish cnctw", + waiter=waiter, + ) + client.wait_for( + event_is("install-finished", "cnctw"), + timeout=30, + description="stream install cnctw", + waiter=waiter, + ) + return source, client + + def s39_streamed_install_local_only(self) -> str: + source, client = self.stream_install_cnctw("s39") + game = wait_local_game(client, "cnctw", downloaded=False, installed=True) + assert_game_state( + game, + downloaded=False, + installed=True, + availability="LocalOnly", + ) + + game_root = client.host_games_dir / "cnctw" + assert_not_exists(game_root / "version.ini") + assert_not_exists(game_root / "cnctw.eti") + + expected = { + "bin/cnctw-payload.bin": unrar_entry_sha256( + source, "cnctw", "bin/cnctw-payload.bin" + ), + "data/cnctw-assets.dat": unrar_entry_sha256( + source, "cnctw", "data/cnctw-assets.dat" + ), + } + actual = { + rel: sha256_file(game_root / "local" / rel) + for rel in expected + } + if actual != expected: + raise ScenarioError(f"streamed local payload hashes mismatched: {actual} != {expected}") + + streamed_bytes = sum( + int(item.get("data", {}).get("length", 0)) + for item in client.output + if item.get("type") == "event" + and item.get("event") == "download-chunk-finished" + and item.get("data", {}).get("game_id") == "cnctw" + ) + expected_bytes = 3 * 1024 * 1024 + if streamed_bytes != expected_bytes: + raise ScenarioError( + f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}" + ) + + return ( + "cnctw streamed into local/ only; root archive and version.ini absent; " + f"payload hashes={actual}" + ) + + def s40_streamed_receiver_not_source(self) -> str: + _source, receiver = self.stream_install_cnctw("s40") + observer = self.peer("s40-observer") + connect_many(observer, [receiver]) + receiver_snapshot = wait_peer_has_game(observer, receiver.peer_id, "cnctw") + summary = next( + game + for game in receiver_snapshot.get("games", []) + if game.get("id") == "cnctw" + ) + if summary.get("availability") != "LocalOnly" or summary.get("downloaded"): + raise ScenarioError(f"receiver did not advertise cnctw as local-only: {summary}") + + wait_remote_absent(observer, "cnctw", timeout=5) + err = observer.send( + {"cmd": "download", "game_id": "cnctw", "install": False}, + expect_error=True, + ) + if "no peers have game cnctw" not in err["error"]: + raise ScenarioError(f"unexpected local-only download error: {err}") + assert_not_exists(observer.host_games_dir / "cnctw") + return ( + "observer saw receiver's local-only cnctw snapshot, but remote aggregation hid it " + f"and download errored '{err['error']}'" + ) + def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]: result = subprocess.run( @@ -1177,6 +1288,25 @@ def create_large_sparse_game(root: Path, *, size: int) -> None: handle.truncate(size) +def sha256_file(path: Path) -> str: + hasher = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def unrar_entry_sha256(peer: Peer, game_id: str, relative_path: str) -> str: + command = ( + f"unrar p -inul /games/{shlex.quote(game_id)}/{shlex.quote(game_id)}.eti " + f"{shlex.quote(relative_path)} | sha256sum" + ) + output = peer.docker_exec("sh", "-c", command).stdout.strip() + if not output: + raise ScenarioError(f"empty sha256 output for {game_id}:{relative_path}") + return output.split()[0] + + def format_bytes(size: int) -> str: return f"{size / 1024 / 1024 / 1024:.2f} GiB" diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index 78e4315..b05eefa 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -5,15 +5,21 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, + process::Stdio, time::Duration, }; +use bytes::Bytes; use eyre::{Context, OptionExt}; -use lanspread_peer::{UnpackFuture, Unpacker}; +use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker}; +use lanspread_proto::StreamInstallFrame; use serde::Serialize; use serde_json::{Value, json}; +use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; +const STREAM_CHUNK_SIZE: usize = 256 * 1024; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CommandEnvelope { @@ -33,6 +39,9 @@ pub enum CliCommand { game_id: String, install_after_download: bool, }, + StreamInstall { + game_id: String, + }, Install { game_id: String, }, @@ -63,6 +72,7 @@ impl CliCommand { Self::ListGames => "list-games", Self::SetGameDir { .. } => "set-game-dir", Self::Download { .. } => "download", + Self::StreamInstall { .. } => "stream-install", Self::Install { .. } => "install", Self::Uninstall { .. } => "uninstall", Self::Play { .. } => "play", @@ -101,6 +111,9 @@ pub fn parse_command_value(value: &Value) -> eyre::Result { game_id: game_id(object)?, install_after_download: install_after_download(object)?, }, + "stream-install" => CliCommand::StreamInstall { + game_id: game_id(object)?, + }, "install" => CliCommand::Install { game_id: game_id(object)?, }, @@ -254,6 +267,270 @@ impl Unpacker for ExternalUnrarUnpacker { } } +pub struct ExternalUnrarStreamProvider { + program: PathBuf, +} + +impl ExternalUnrarStreamProvider { + #[must_use] + pub fn new(program: PathBuf) -> Self { + Self { program } + } +} + +impl StreamInstallProvider for ExternalUnrarStreamProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + let listing = unrar_listing(&self.program, archive).await?; + let archive_name = archive + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("archive.eti") + .to_string(); + + send_stream_frame( + &frames, + StreamInstallFrame::ArchiveBegin { + archive_name: archive_name.clone(), + solid: listing.solid, + }, + ) + .await?; + + for entry in listing.entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + &frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path, + }, + ) + .await?; + } + RarEntryKind::File => { + send_stream_frame( + &frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32: entry.crc32, + }, + ) + .await?; + stream_unrar_file( + &self.program, + archive, + &entry.relative_path, + &frames, + cancel_token.clone(), + ) + .await?; + send_stream_frame( + &frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path, + }, + ) + .await?; + } + } + } + + send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarListing { + solid: bool, + entries: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarEntry { + relative_path: String, + kind: RarEntryKind, + size: u64, + crc32: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RarEntryKind { + File, + Directory, +} + +#[derive(Default)] +struct RarEntryDraft { + relative_path: Option, + kind: Option, + size: Option, + crc32: Option, +} + +async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { + let output = tokio::process::Command::new(program) + .arg("lt") + .arg("-cfg-") + .arg(archive) + .output() + .await?; + if !output.status.success() { + eyre::bail!( + "unrar lt failed for {} with status {}: {}", + archive.display(), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + + parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) +} + +fn parse_unrar_listing(output: &str) -> eyre::Result { + let mut solid = false; + let mut entries = Vec::new(); + let mut current = RarEntryDraft::default(); + + for line in output.lines() { + let trimmed = line.trim(); + if let Some(details) = trimmed.strip_prefix("Details:") { + solid = details.to_ascii_lowercase().contains("solid"); + continue; + } + + if let Some(name) = trimmed.strip_prefix("Name:") { + push_rar_entry(&mut entries, std::mem::take(&mut current))?; + current.relative_path = Some(name.trim().to_string()); + continue; + } + + if let Some(kind) = trimmed.strip_prefix("Type:") { + current.kind = match kind.trim() { + "File" => Some(RarEntryKind::File), + "Directory" => Some(RarEntryKind::Directory), + _ => None, + }; + continue; + } + + if let Some(size) = trimmed.strip_prefix("Size:") { + current.size = Some(size.trim().parse()?); + continue; + } + + if let Some(crc) = trimmed.strip_prefix("CRC32:") { + current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); + } + } + + push_rar_entry(&mut entries, current)?; + Ok(RarListing { solid, entries }) +} + +fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { + let Some(relative_path) = draft.relative_path else { + return Ok(()); + }; + + let Some(kind) = draft.kind else { + return Ok(()); + }; + + let size = match kind { + RarEntryKind::File => draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, + RarEntryKind::Directory => 0, + }; + + entries.push(RarEntry { + relative_path, + kind, + size, + crc32: draft.crc32, + }); + Ok(()) +} + +async fn stream_unrar_file( + program: &Path, + archive: &Path, + relative_path: &str, + frames: &mpsc::Sender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let mut child = tokio::process::Command::new(program) + .arg("p") + .arg("-inul") + .arg("-cfg-") + .arg(archive) + .arg(relative_path) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .spawn()?; + + let mut stdout = child + .stdout + .take() + .ok_or_eyre("unrar stdout was not captured")?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + + loop { + let read = tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display()); + } + read = stdout.read(&mut buffer) => read?, + }; + + if read == 0 { + break; + } + + send_stream_frame( + frames, + StreamInstallFrame::FileChunk { + bytes: Bytes::copy_from_slice(&buffer[..read]), + }, + ) + .await?; + } + + let status = child.wait().await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {}:{} with status {status}", + archive.display(), + relative_path + ); + } + + Ok(()) +} + +async fn send_stream_frame( + frames: &mpsc::Sender, + frame: StreamInstallFrame, +) -> eyre::Result<()> { + frames + .send(frame) + .await + .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) +} + pub fn result_line(id: &Option, command: &str, data: Value) -> eyre::Result { output_line(json!({ "type": "result", @@ -344,6 +621,57 @@ mod tests { assert_eq!(parsed["data"]["peer_count"], 0); } + #[test] + fn parses_stream_install_command() { + let parsed = parse_command_line(r#"{"cmd":"stream-install","game_id":"cnctw"}"#) + .expect("command should parse"); + + assert_eq!( + parsed.command, + CliCommand::StreamInstall { + game_id: "cnctw".to_string(), + } + ); + } + + #[test] + fn parses_unrar_technical_listing() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 + CRC32: 38B488A7 + + Name: bin + Type: Directory +"#, + ) + .expect("listing should parse"); + + assert!(!listing.solid); + assert_eq!( + listing.entries, + vec![ + RarEntry { + relative_path: "bin/payload.bin".to_string(), + kind: RarEntryKind::File, + size: 123, + crc32: Some(0x38B4_88A7), + }, + RarEntry { + relative_path: "bin".to_string(), + kind: RarEntryKind::Directory, + size: 0, + crc32: None, + }, + ] + ); + } + #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index f5155fa..995512d 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -17,6 +17,7 @@ use lanspread_peer::{ ActiveOperation, ActiveOperationKind, InstallOperation, + NoopStreamInstallProvider, PeerCommand, PeerEvent, PeerGameDB, @@ -24,6 +25,7 @@ use lanspread_peer::{ PeerRuntimeHandle, PeerSnapshot, PeerStartOptions, + StreamInstallProvider, migrate_legacy_state, start_peer_with_options, }; @@ -31,6 +33,7 @@ use lanspread_peer_cli::{ CliCommand, CommandEnvelope, DEFAULT_FIXTURE_VERSION, + ExternalUnrarStreamProvider, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, @@ -134,10 +137,15 @@ async fn main() -> eyre::Result<()> { let (tx_events, rx_events) = mpsc::unbounded_channel(); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let catalog = Arc::new(RwLock::new(catalog)); - let unpacker: Arc = match args.unrar { + let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program); + let unpacker: Arc = match args.unrar.clone() { Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)), None => Arc::new(FixtureUnpacker), }; + let stream_install_provider: Arc = match unrar_for_streaming { + Some(path) => Arc::new(ExternalUnrarStreamProvider::new(path)), + None => Arc::new(NoopStreamInstallProvider), + }; let mut handle = start_peer_with_options( args.games_dir.clone(), @@ -148,6 +156,7 @@ async fn main() -> eyre::Result<()> { PeerStartOptions { state_dir: Some(args.state_dir.clone()), active_outbound_transfers: None, + stream_install_provider: Some(stream_install_provider), }, )?; let sender = handle.sender(); @@ -249,6 +258,15 @@ async fn handle_command( })?; Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download})) } + CliCommand::StreamInstall { game_id } => { + ensure_catalog_game(shared, game_id).await?; + ensure_no_active_operation(shared, game_id).await?; + let _ = game_files_for_download(sender, shared, game_id).await?; + sender.send(PeerCommand::StreamInstallGame { + id: game_id.clone(), + })?; + Ok(json!({"queued": true, "game_id": game_id})) + } CliCommand::Install { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; @@ -729,6 +747,15 @@ fn default_catalog_db() -> Option { .find(|path| path.exists()) } +fn default_unrar_program() -> Option { + [ + PathBuf::from("/usr/local/bin/unrar"), + PathBuf::from("/usr/bin/unrar"), + ] + .into_iter() + .find(|path| path.exists()) +} + fn next_string(args: &mut impl Iterator, flag: &str) -> eyre::Result { args.next() .ok_or_else(|| eyre::eyre!("{flag} requires a value"))? diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index e580140..6437e69 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -14,6 +14,7 @@ lanspread-utils = { path = "../lanspread-utils" } # external bytes = { workspace = true } +crc32fast = { workspace = true } eyre = { workspace = true } futures = { workspace = true } gethostname = { workspace = true } diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index a4f3da3..db47b93 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -6,7 +6,14 @@ use lanspread_db::db::{GameCatalog, GameDB}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB}; +use crate::{ + PeerEvent, + StreamInstallProvider, + Unpacker, + events, + library::LocalLibraryState, + peer_db::PeerGameDB, +}; /// Thread-safe map of active outbound file transfers grouped by game ID. pub type OutboundTransfers = Arc>>>; @@ -38,6 +45,7 @@ pub struct Ctx { pub active_operations: Arc>>, pub active_downloads: Arc>>, pub unpacker: Arc, + pub stream_install_provider: Arc, pub catalog: Arc>, pub peer_id: Arc, pub shutdown: CancellationToken, @@ -57,6 +65,7 @@ pub struct PeerCtx { pub catalog: Arc>, pub peer_id: Arc, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, + pub stream_install_provider: Arc, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, pub active_outbound_transfers: OutboundTransfers, @@ -86,6 +95,7 @@ impl Ctx { task_tracker: TaskTracker, catalog: Arc>, active_outbound_transfers: OutboundTransfers, + stream_install_provider: Arc, ) -> Self { Self { game_dir: Arc::new(RwLock::new(game_dir)), @@ -97,6 +107,7 @@ impl Ctx { active_operations: Arc::new(RwLock::new(HashMap::new())), active_downloads: Arc::new(RwLock::new(HashMap::new())), unpacker, + stream_install_provider, catalog, peer_id: Arc::new(peer_id), shutdown, @@ -120,6 +131,7 @@ impl Ctx { catalog: self.catalog.clone(), peer_id: self.peer_id.clone(), tx_notify_ui, + stream_install_provider: self.stream_install_provider.clone(), shutdown: self.shutdown.clone(), task_tracker: self.task_tracker.clone(), active_outbound_transfers: self.active_outbound_transfers.clone(), diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 9fe7f91..c5851c5 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -11,6 +11,7 @@ use std::{ use lanspread_db::db::{GameDB, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; +use tokio_util::sync::CancellationToken; use crate::{ InstallOperation, @@ -33,6 +34,7 @@ use crate::{ peer_db::PeerGameDB, remote_peer::ensure_peer_id_for_addr, services::{HandshakeCtx, perform_handshake_with_peer}, + stream_install::receive_streamed_install, }; // ============================================================================= @@ -450,6 +452,91 @@ pub async fn handle_install_game_command( spawn_install_operation(ctx, tx_notify_ui, id); } +pub async fn handle_stream_install_game_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, +) { + if !catalog_contains(ctx, &id).await { + log::warn!("Ignoring streamed install command for non-catalog game {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + + let games_folder = { ctx.game_dir.read().await.clone() }; + let game_root = games_folder.join(&id); + if local_dir_is_directory(&game_root).await { + log::warn!("Ignoring streamed install command for already-installed game {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + + let expected_version = catalog_expected_version(ctx, &id).await; + let mut peers = { + match ctx + .peer_game_db + .read() + .await + .validate_file_sizes_majority(&id, expected_version.as_deref()) + { + Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => { + peer_whitelist + } + Ok(_) => { + log::error!("No trusted peers available for streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + Err(err) => { + log::error!( + "File size majority validation failed for streamed install {id}: {err}" + ); + send_download_failed(tx_notify_ui, &id); + return; + } + } + }; + peers.sort(); + let Some(peer_addr) = peers.into_iter().next() else { + log::error!("No peer selected for streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + }; + + match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { + BeginOperationResult::Started => {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring streamed install request"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + } + + let cancel_token = ctx.shutdown.child_token(); + ctx.active_downloads + .write() + .await + .insert(id.clone(), cancel_token.clone()); + + let ctx_clone = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + ctx.task_tracker.spawn(async move { + run_stream_install_operation( + ctx_clone, + tx_notify_ui, + id, + game_root, + peer_addr, + cancel_token, + ) + .await; + }); +} + /// Handles the `UninstallGame` command. pub async fn handle_uninstall_game_command( ctx: &Ctx, @@ -490,6 +577,151 @@ pub async fn handle_cancel_download_command( cancel_token.cancel(); } +async fn run_stream_install_operation( + ctx: Ctx, + tx_notify_ui: UnboundedSender, + id: String, + game_root: PathBuf, + peer_addr: SocketAddr, + cancel_token: CancellationToken, +) { + let download_guard = OperationGuard::download( + id.clone(), + ctx.active_operations.clone(), + ctx.active_downloads.clone(), + tx_notify_ui.clone(), + ); + + events::send( + &tx_notify_ui, + PeerEvent::DownloadGameFilesBegin { id: id.clone() }, + ); + + let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) + .await + { + Ok(transaction) => transaction, + Err(err) => { + log::error!("Failed to prepare streamed install for {id}: {err}"); + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; + return; + } + }; + + let receive_result = receive_streamed_install( + peer_addr, + &id, + transaction.staging_dir(), + tx_notify_ui.clone(), + cancel_token.clone(), + ) + .await; + + match receive_result { + Ok(()) => { + if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + .await + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + } else { + if let Err(err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {err}"); + } + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) + .await; + } + } + Err(err) => { + if let Err(rollback_err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + } + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + } + } +} + +async fn finish_failed_stream_download( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + guard: OperationGuard, + cancelled: bool, +) { + if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, id).await { + log::error!("Failed to refresh local library after streamed install failure: {err}"); + } + end_download_operation(ctx, tx_notify_ui, id).await; + guard.disarm(); + send_download_failed_unless_cancelled(tx_notify_ui, id, cancelled); +} + +async fn commit_streamed_install( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + transaction: install::StreamedInstallTransaction, +) { + let operation_guard = OperationGuard::new( + id.clone(), + ctx.active_operations.clone(), + tx_notify_ui.clone(), + ); + events::send( + tx_notify_ui, + PeerEvent::InstallGameBegin { + id: id.clone(), + operation: InstallOperation::Installing, + }, + ); + + match transaction.commit().await { + Ok(()) => { + if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await + { + log::error!("Failed to refresh local library after streamed install: {err}"); + } + end_operation(ctx, tx_notify_ui, &id).await; + operation_guard.disarm(); + events::send( + tx_notify_ui, + PeerEvent::InstallGameFinished { id: id.clone() }, + ); + } + Err(err) => { + log::error!("Streamed install commit failed for {id}: {err}"); + if let Err(refresh_err) = + refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await + { + log::error!( + "Failed to refresh local library after streamed install commit failure: {refresh_err}" + ); + } + end_operation(ctx, tx_notify_ui, &id).await; + operation_guard.disarm(); + events::send( + tx_notify_ui, + PeerEvent::InstallGameFailed { id: id.clone() }, + ); + } + } +} + fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); @@ -1264,6 +1496,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))), Arc::new(RwLock::new(HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) } diff --git a/crates/lanspread-peer/src/install/mod.rs b/crates/lanspread-peer/src/install/mod.rs index 4d518d7..e0c404b 100644 --- a/crates/lanspread-peer/src/install/mod.rs +++ b/crates/lanspread-peer/src/install/mod.rs @@ -4,5 +4,13 @@ mod transaction; pub mod unpack; pub use remove::remove_downloaded; -pub use transaction::{install, recover_on_startup, uninstall, update}; +pub(crate) use transaction::root_eti_archives; +pub use transaction::{ + StreamedInstallTransaction, + begin_streamed_install, + install, + recover_on_startup, + uninstall, + update, +}; pub use unpack::{UnpackFuture, Unpacker}; diff --git a/crates/lanspread-peer/src/install/transaction.rs b/crates/lanspread-peer/src/install/transaction.rs index a7ebc85..959dc1c 100644 --- a/crates/lanspread-peer/src/install/transaction.rs +++ b/crates/lanspread-peer/src/install/transaction.rs @@ -33,6 +33,103 @@ struct InstallFsState { backup: FsEntryState, } +pub struct StreamedInstallTransaction { + game_root: PathBuf, + state_dir: PathBuf, + id: String, + staging: PathBuf, + eti_version: Option, +} + +impl StreamedInstallTransaction { + #[must_use] + pub fn staging_dir(&self) -> &Path { + &self.staging + } + + pub async fn commit(self) -> eyre::Result<()> { + let local = local_dir(&self.game_root); + let result = async { + tokio::fs::rename(&self.staging, &local) + .await + .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?; + reset_launch_settings_marker(&self.state_dir, &self.id).await?; + write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await + } + .await; + + if result.is_err() { + if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await { + log::warn!( + "Failed to clean streamed install staging {}: {cleanup_err}", + self.staging.display() + ); + } + let _ = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await; + } + + result + } + + pub async fn rollback(self) -> eyre::Result<()> { + let staging_result = remove_dir_all_if_exists(&self.staging).await; + let intent_result = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await; + + staging_result?; + intent_result + } +} + +pub async fn begin_streamed_install( + game_root: &Path, + state_dir: &Path, + id: &str, +) -> eyre::Result { + if path_is_dir(&local_dir(game_root)).await { + eyre::bail!("game {id} is already installed"); + } + + tokio::fs::create_dir_all(game_root).await?; + let eti_version = read_downloaded_version(game_root).await; + write_intent( + state_dir, + id, + &InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()), + ) + .await?; + + let staging = installing_dir(game_root); + if let Err(err) = prepare_owned_empty_dir(&staging).await { + let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await; + return Err(err); + } + + let staging = tokio::fs::canonicalize(&staging).await.unwrap_or(staging); + + Ok(StreamedInstallTransaction { + game_root: game_root.to_path_buf(), + state_dir: state_dir.to_path_buf(), + id: id.to_string(), + staging, + eti_version, + }) +} + pub async fn install( game_root: &Path, state_dir: &Path, @@ -258,7 +355,7 @@ async fn unpack_archives( Ok(()) } -async fn root_eti_archives(game_root: &Path) -> eyre::Result> { +pub(crate) async fn root_eti_archives(game_root: &Path) -> eyre::Result> { let mut entries = tokio::fs::read_dir(game_root).await?; let mut archives = Vec::new(); while let Some(entry) = entries.next_entry().await? { diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 3d254de..540d0e2 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -32,6 +32,7 @@ mod remote_peer; mod services; mod startup; mod state_paths; +mod stream_install; #[cfg(test)] mod test_support; @@ -82,6 +83,7 @@ pub use crate::{ launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once}, startup::PeerRuntimeHandle, state_paths::{launch_settings_applied_path, setup_done_path}, + stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider}, }; // ============================================================================= @@ -243,6 +245,8 @@ pub enum PeerCommand { file_descriptions: Vec, install_after_download: bool, }, + /// Stream archive-expanded bytes directly into `local/` without keeping root archives. + StreamInstallGame { id: String }, /// Install already-downloaded archives into `local/`. InstallGame { id: String }, /// Remove only the `local/` install for a game. @@ -260,11 +264,29 @@ pub enum PeerCommand { } /// Optional startup settings for non-GUI callers and tests. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Default)] pub struct PeerStartOptions { /// Directory used for peer identity and other state. pub state_dir: Option, pub active_outbound_transfers: Option, + /// Provider used to stream archive entries for low-disk streamed installs. + pub stream_install_provider: Option>, +} + +impl std::fmt::Debug for PeerStartOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerStartOptions") + .field("state_dir", &self.state_dir) + .field( + "active_outbound_transfers", + &self.active_outbound_transfers.as_ref().map(|_| "..."), + ) + .field( + "stream_install_provider", + &self.stream_install_provider.as_ref().map(|_| "..."), + ) + .finish() + } } // ============================================================================= @@ -314,11 +336,14 @@ pub fn start_peer_with_options( let PeerStartOptions { state_dir, active_outbound_transfers, + stream_install_provider, } = options; let state_dir = resolve_state_dir(state_dir.as_deref()); let game_dir = game_dir.into(); let active_outbound_transfers = active_outbound_transfers .unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new()))); + let stream_install_provider = + stream_install_provider.unwrap_or_else(|| Arc::new(NoopStreamInstallProvider)); log::info!( "Starting peer system with game directory: {}", game_dir.display() @@ -338,6 +363,7 @@ pub fn start_peer_with_options( unpacker, catalog, active_outbound_transfers, + stream_install_provider, )) } @@ -355,6 +381,7 @@ async fn run_peer( task_tracker: TaskTracker, catalog: Arc>, active_outbound_transfers: crate::context::OutboundTransfers, + stream_install_provider: Arc, ) -> eyre::Result<()> { let ctx = Ctx::new( peer_game_db, @@ -366,6 +393,7 @@ async fn run_peer( task_tracker, catalog, active_outbound_transfers, + stream_install_provider, ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); @@ -439,6 +467,9 @@ async fn handle_peer_commands( ) .await; } + PeerCommand::StreamInstallGame { id } => { + handlers::handle_stream_install_game_command(ctx, tx_notify_ui, id).await; + } PeerCommand::InstallGame { id } => { handle_install_game_command(ctx, tx_notify_ui, id).await; } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index f147582..7ba2aa8 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -319,6 +319,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ); *ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000)); diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index 2885539..f0ce2e9 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -384,6 +384,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(std::collections::HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) } diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 27ae69e..7dee903 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -15,6 +15,7 @@ use crate::{ local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog}, peer::{send_game_file_chunk, send_game_file_data}, services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, + stream_install::{send_game_install_stream, send_stream_install_error}, }; type ResponseWriter = FramedWrite; @@ -99,6 +100,9 @@ async fn dispatch_request( } => { handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await } + Request::StreamInstall { game_id } => { + handle_stream_install_request(ctx, game_id, framed_tx).await + } Request::Goodbye { peer_id } => { handle_goodbye(ctx, remote_addr, peer_id).await; framed_tx @@ -386,6 +390,49 @@ async fn handle_file_chunk_request( FramedWrite::new(tx, LengthDelimitedCodec::new()) } +async fn handle_stream_install_request( + ctx: &PeerCtx, + game_id: String, + framed_tx: ResponseWriter, +) -> ResponseWriter { + log::info!("Received StreamInstall request for {game_id} from peer"); + + let (guard, cancel_token) = TransferGuard::new( + game_id.clone(), + ctx.active_outbound_transfers.clone(), + ctx.tx_notify_ui.clone(), + &ctx.shutdown, + ) + .await; + + let mut tx = framed_tx.into_inner(); + let game_dir = ctx.game_dir.read().await.clone(); + if !can_serve_game(ctx, &game_dir, &game_id).await { + log::info!( + "Declining StreamInstall for {game_id} because the game is not currently transferable" + ); + tx = send_stream_install_error(tx, format!("game {game_id} is not transferable")).await; + drop(guard); + return FramedWrite::new(tx, LengthDelimitedCodec::new()); + } + + let game_root = game_dir.join(&game_id); + let (returned_tx, result) = send_game_install_stream( + ctx.stream_install_provider.clone(), + tx, + &game_root, + &game_id, + cancel_token, + ) + .await; + if let Err(err) = result { + log::warn!("StreamInstall for {game_id} ended with error: {err}"); + } + + drop(guard); + FramedWrite::new(returned_tx, LengthDelimitedCodec::new()) +} + async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option, peer_id: String) { log::info!("Received Goodbye from peer {peer_id}"); let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) }; @@ -442,6 +489,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(std::collections::HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) .to_peer_ctx(tx_notify_ui) } diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs index 2ab4571..96e4479 100644 --- a/crates/lanspread-peer/src/startup.rs +++ b/crates/lanspread-peer/src/startup.rs @@ -23,6 +23,7 @@ use crate::{ PeerCommand, PeerEvent, PeerRuntimeComponent, + StreamInstallProvider, Unpacker, context::Ctx, events, @@ -87,6 +88,7 @@ pub(crate) fn spawn_peer_runtime( unpacker: Arc, catalog: Arc>, active_outbound_transfers: crate::context::OutboundTransfers, + stream_install_provider: Arc, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); @@ -107,6 +109,7 @@ pub(crate) fn spawn_peer_runtime( runtime_tracker.clone(), catalog, active_outbound_transfers, + stream_install_provider, ) .await { diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs new file mode 100644 index 0000000..3d2e168 --- /dev/null +++ b/crates/lanspread-peer/src/stream_install.rs @@ -0,0 +1,372 @@ +use std::{ + future::Future, + net::SocketAddr, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, +}; + +use bytes::Bytes; +use crc32fast::Hasher; +use futures::{SinkExt, StreamExt}; +use lanspread_proto::{Message, Request, StreamInstallFrame}; +use s2n_quic::stream::SendStream; +use tokio::{ + fs::File, + io::AsyncWriteExt, + sync::{mpsc, mpsc::UnboundedSender}, +}; +use tokio_util::{ + codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, + sync::CancellationToken, +}; + +use crate::{ + PeerEvent, + install::root_eti_archives, + network::connect_to_peer, + path_validation::validate_game_file_path, +}; + +const FRAME_CHANNEL_DEPTH: usize = 16; + +pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; + +pub trait StreamInstallProvider: Send + Sync { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a>; +} + +#[derive(Debug, Default)] +pub struct NoopStreamInstallProvider; + +impl StreamInstallProvider for NoopStreamInstallProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + _frames: mpsc::Sender, + _cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + eyre::bail!( + "streamed install provider is not configured for {}", + archive.display() + ) + }) + } +} + +pub(crate) async fn send_stream_install_error( + tx: SendStream, + message: impl Into, +) -> SendStream { + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + if let Err(err) = framed_tx + .send( + StreamInstallFrame::Error { + message: message.into(), + } + .encode(), + ) + .await + { + log::warn!("Failed to send streamed install error frame: {err}"); + } + if let Err(err) = framed_tx.close().await { + log::debug!("Failed to close streamed install error response: {err}"); + } + framed_tx.into_inner() +} + +pub(crate) async fn send_game_install_stream( + provider: Arc, + tx: SendStream, + game_root: &Path, + game_id: &str, + cancel_token: CancellationToken, +) -> (SendStream, eyre::Result<()>) { + let archives = match root_eti_archives(game_root).await { + Ok(archives) => archives, + Err(err) => { + let message = err.to_string(); + let tx = send_stream_install_error(tx, message.clone()).await; + return (tx, Err(eyre::eyre!(message))); + } + }; + if archives.is_empty() { + let message = format!("no .eti archives found for {game_id}"); + let tx = send_stream_install_error(tx, message.clone()).await; + return (tx, Err(eyre::eyre!(message))); + } + + let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH); + let producer_cancel = cancel_token.child_token(); + let game_id_for_producer = game_id.to_string(); + let producer = tokio::spawn({ + let provider = provider.clone(); + let producer_cancel = producer_cancel.clone(); + async move { + for archive in archives { + if producer_cancel.is_cancelled() { + eyre::bail!("streamed install for {game_id_for_producer} was cancelled"); + } + + if let Err(err) = provider + .stream_archive(&archive, frame_tx.clone(), producer_cancel.clone()) + .await + { + let message = err.to_string(); + let _ = frame_tx.send(StreamInstallFrame::Error { message }).await; + return Err(err); + } + } + + let _ = frame_tx.send(StreamInstallFrame::Complete).await; + Ok(()) + } + }); + + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + let mut send_result = Ok(()); + + while let Some(frame) = frame_rx.recv().await { + if let Err(err) = framed_tx.send(frame.encode()).await { + producer_cancel.cancel(); + send_result = Err(eyre::eyre!("failed to send streamed install frame: {err}")); + break; + } + } + + let close_result = framed_tx + .close() + .await + .map_err(|err| eyre::eyre!("failed to close streamed install stream: {err}")); + let tx = framed_tx.into_inner(); + let producer_result = match producer.await { + Ok(result) => result, + Err(err) => Err(eyre::eyre!("streamed install producer task failed: {err}")), + }; + let result = send_result.and(producer_result).and(close_result); + + (tx, result) +} + +pub(crate) async fn receive_streamed_install( + peer_addr: SocketAddr, + game_id: &str, + staging_dir: &Path, + tx_notify_ui: UnboundedSender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let staging_dir = tokio::fs::canonicalize(staging_dir) + .await + .unwrap_or_else(|_| staging_dir.to_path_buf()); + let mut conn = connect_to_peer(peer_addr).await?; + let stream = conn.open_bidirectional_stream().await?; + let (rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx + .send( + Request::StreamInstall { + game_id: game_id.to_string(), + } + .encode(), + ) + .await?; + framed_tx.close().await?; + + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut current_file: Option = None; + + loop { + let next = tokio::select! { + () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), + next = framed_rx.next() => next, + }; + + let Some(frame) = next else { + eyre::bail!("streamed install ended before Complete"); + }; + let frame = frame?.freeze(); + let frame = StreamInstallFrame::decode(frame); + + match frame { + StreamInstallFrame::ArchiveBegin { + archive_name, + solid, + } => { + log::info!( + "Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" + ); + } + StreamInstallFrame::Directory { relative_path } => { + let path = resolve_stream_path(&staging_dir, &relative_path)?; + tokio::fs::create_dir_all(path).await?; + } + StreamInstallFrame::FileBegin { + relative_path, + size, + crc32, + } => { + if current_file.is_some() { + eyre::bail!("received FileBegin for {relative_path} before previous FileEnd"); + } + let path = resolve_stream_path(&staging_dir, &relative_path)?; + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = File::create(&path).await?; + current_file = Some(IncomingFile::new(relative_path, path, size, crc32, file)); + } + StreamInstallFrame::FileChunk { bytes } => { + let Some(file) = current_file.as_mut() else { + eyre::bail!("received FileChunk without FileBegin"); + }; + file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) + .await?; + } + StreamInstallFrame::FileEnd { relative_path } => { + let Some(file) = current_file.take() else { + eyre::bail!("received FileEnd for {relative_path} without FileBegin"); + }; + file.finish(&relative_path).await?; + } + StreamInstallFrame::ArchiveEnd { archive_name } => { + log::info!("Finished streamed install archive {archive_name} for {game_id}"); + } + StreamInstallFrame::Complete => { + if current_file.is_some() { + eyre::bail!("streamed install completed with an open file"); + } + return Ok(()); + } + StreamInstallFrame::Error { message } => { + eyre::bail!("streamed install sender failed: {message}"); + } + } + } +} + +struct IncomingFile { + relative_path: String, + path: PathBuf, + expected_size: u64, + expected_crc32: Option, + received: u64, + hasher: Hasher, + file: File, +} + +impl IncomingFile { + fn new( + relative_path: String, + path: PathBuf, + expected_size: u64, + expected_crc32: Option, + file: File, + ) -> Self { + Self { + relative_path, + path, + expected_size, + expected_crc32, + received: 0, + hasher: Hasher::new(), + file, + } + } + + async fn write_chunk( + &mut self, + game_id: &str, + peer_addr: SocketAddr, + tx_notify_ui: &UnboundedSender, + bytes: Bytes, + ) -> eyre::Result<()> { + let offset = self.received; + let length = u64::try_from(bytes.len())?; + if offset.saturating_add(length) > self.expected_size { + eyre::bail!( + "streamed file {} exceeded expected size {}", + self.relative_path, + self.expected_size + ); + } + self.file.write_all(&bytes).await?; + self.hasher.update(&bytes); + self.received = self.received.saturating_add(length); + + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { + id: game_id.to_string(), + peer_addr, + relative_path: format!("{game_id}/local/{}", self.relative_path), + offset, + length, + }); + Ok(()) + } + + async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { + if self.relative_path != relative_path { + eyre::bail!( + "streamed file end mismatch: began {}, ended {relative_path}", + self.relative_path + ); + } + self.file.flush().await?; + + if self.received != self.expected_size { + eyre::bail!( + "streamed file {} size mismatch: got {}, expected {}", + self.relative_path, + self.received, + self.expected_size + ); + } + + if let Some(expected) = self.expected_crc32 { + let actual = self.hasher.finalize(); + if actual != expected { + eyre::bail!( + "streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", + self.relative_path + ); + } + } + + log::debug!( + "Received streamed file {} -> {}", + self.relative_path, + self.path.display() + ); + Ok(()) + } +} + +fn resolve_stream_path(staging_dir: &Path, relative_path: &str) -> eyre::Result { + validate_game_file_path(staging_dir, relative_path) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_support::TempDir; + + #[test] + fn stream_paths_stay_inside_staging_dir() { + let temp = TempDir::new("lanspread-stream-install-path"); + let staging = temp.path().join("staging"); + std::fs::create_dir_all(&staging).expect("staging should be created"); + let staging = std::fs::canonicalize(staging).expect("staging should canonicalize"); + + assert!(resolve_stream_path(&staging, "bin/game.exe").is_ok()); + assert!(resolve_stream_path(&staging, "../outside").is_err()); + assert!(resolve_stream_path(&staging, "/absolute").is_err()); + assert!(resolve_stream_path(&staging, "C:/windows").is_err()); + } +} diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index ea8f2f7..27609c5 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use lanspread_db::db::{Game, GameFileDescription}; use serde::{Deserialize, Serialize}; -pub const PROTOCOL_VERSION: u32 = 4; +pub const PROTOCOL_VERSION: u32 = 5; pub use lanspread_db::db::Availability; @@ -67,6 +67,9 @@ pub enum Request { offset: u64, length: u64, }, + StreamInstall { + game_id: String, + }, Hello(Hello), LibraryDelta { peer_id: String, @@ -94,6 +97,35 @@ pub enum Response { InternalPeerError(String), } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum StreamInstallFrame { + ArchiveBegin { + archive_name: String, + solid: bool, + }, + Directory { + relative_path: String, + }, + FileBegin { + relative_path: String, + size: u64, + crc32: Option, + }, + FileChunk { + bytes: Bytes, + }, + FileEnd { + relative_path: String, + }, + ArchiveEnd { + archive_name: String, + }, + Complete, + Error { + message: String, + }, +} + // Add Message trait pub trait Message { fn decode(bytes: Bytes) -> Self; @@ -145,3 +177,29 @@ impl Message for Response { } } } + +impl Message for StreamInstallFrame { + fn decode(bytes: Bytes) -> Self { + match serde_json::from_slice(&bytes) { + Ok(t) => t, + Err(e) => { + tracing::error!(?e, "StreamInstallFrame decoding error"); + StreamInstallFrame::Error { + message: format!("stream install frame decoding error: {e}"), + } + } + } + } + + fn encode(&self) -> Bytes { + match serde_json::to_vec(self) { + Ok(s) => Bytes::from(s), + Err(e) => { + tracing::error!(?e, "StreamInstallFrame encoding error"); + Bytes::from(format!( + r#"{{"Error": {{"message": "encoding error: {e}"}}}}"# + )) + } + } + } +} diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 2a9fa11..12c3a51 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -1876,6 +1876,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { PeerStartOptions { state_dir: Some(state_dir), active_outbound_transfers: Some(state.active_outbound_transfers.clone()), + stream_install_provider: None, }, ) { Ok(handle) => { -- 2.54.0 From cc147def7380226f68a4126fc8610189c4be6d95 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 20:40:33 +0200 Subject: [PATCH 02/15] Claude's Review notes --- NEXT_STEPS_CLAUDES_REVIEW.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 NEXT_STEPS_CLAUDES_REVIEW.md diff --git a/NEXT_STEPS_CLAUDES_REVIEW.md b/NEXT_STEPS_CLAUDES_REVIEW.md new file mode 100644 index 0000000..dd1ec7c --- /dev/null +++ b/NEXT_STEPS_CLAUDES_REVIEW.md @@ -0,0 +1,20 @@ +# Issues + +1. (Major) File bytes are JSON-encoded — ~3–4× wire bloat + heavy CPU. StreamInstallFrame::FileChunk { bytes: Bytes } goes over the wire via serde_json (the Message impl). serde_json has no native byte type, so Bytes serializes as a JSON array of integers ([12,255,7,…]) — roughly 3–4 ASCII chars per payload byte, plus serialize/parse cost on both ends. The existing transfer path (peer.rs::stream_file_bytes) sends raw bytes straight on the QUIC stream for exactly this reason. For a feature whose entire purpose is moving file bytes efficiently on constrained peers, routing the payload through JSON undermines the goal. The 3 MiB S39 fixture hides it; a multi-GB game would not. Consider a binary codec (postcard/bincode) for the frame stream, or keep the control frames as-is but stream raw bytes for file content. + +2. (Design) Solid archives → O(n²) extraction. Each file is pulled with a separate unrar p archive (stream_unrar_file). For a solid .eti, unrar must re-decompress every preceding file on each invocation. solid is detected and sent in ArchiveBegin but never acted on. A solid multi-file archive could make streamed install pathologically slow. At minimum document the limitation; ideally a single-pass extraction for the solid case. + +3. (Robustness) Single peer, no fallback. handle_stream_install_game_command picks one peer (peers.sort(); peers.into_iter().next()). If that peer declines (can_serve_game false) or the connection drops, the whole install fails — no retry against the other validated peers, unlike the normal multi-peer download. Fine for a prototype, worth a TODO. + +4. (Minor) unrar p is a wildcard mask. unrar treats the file argument as a pattern; a name containing */?/[ could match the wrong file or multiple files (multiple matches concatenate into stdout → corrupt stream). CRC32 would catch it as a failure rather than silent corruption, but it's a sharp edge. Worth noting / pinning exact-match if the unrar build supports it. + +5. (Minor) CRC32 is optional. If unrar omits CRC32: (older builds, header-encrypted archives), crc32 is None and the receiver verifies size only. + +6. (Minor / cosmetic) + - DownloadGameFileChunkFinished reports relative_path as "{game_id}/local/…" while bytes actually land in staging (.local.installing); and there's no total-size up front, so the UI can't show a percentage. + - parse_unrar_listing depends on unrar's human-readable lt output (Details:/Name:/Type:/Size:/CRC32:). -cfg- guards against user config, but it's still brittle across unrar versions/locales. + +## Nits + +- PeerStartOptions dropped #[derive(Debug)] for a hand-written Debug (Arc isn't Debug) — correct, and Clone/Default are retained. Good. +- commit() renames staging into local/, so the .lanspread_owned marker lands inside local/ — but that's identical to the existing install_inner behavior, so no regression. -- 2.54.0 From 5dd356eca8cb56bc8a1a8b9ae93d681a4bc30ba5 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 21:12:15 +0200 Subject: [PATCH 03/15] fix(stream-install)!: stream archive payloads as raw frames Streamed installs were sending FileChunk payloads through the shared JSON Message impl. serde_json serializes bytes as arrays of integers, which bloats wire traffic and burns CPU on large archives. Replace StreamInstallFrame encoding with tagged frames: JSON control frames keep their shape under tag 0, while file chunks carry raw bytes under tag 1. The stream install metadata now carries unpacked archive size and mandatory CRC32. The CLI unrar provider validates CRCs up front, runs one archive-wide unrar p stream, splits stdout by listed file sizes, and refuses trailing or missing bytes. That avoids solid archive re-decompression and sidesteps unrar wildcard masks for path arguments. Receivers now sample existing download progress events for streamed installs, report staging-relative chunk paths, and retry trusted peers with a fresh streamed-install transaction after a failed attempt. The current protocol policy does not preserve compatibility with older stream-install builds. Test Plan: - just fmt - just test - just clippy - git diff --check - git diff --cached --check BREAKING CHANGE: StreamInstallFrame now uses tagged frames with raw chunk payloads and requires current peers on both sides of streamed installs. Refs: NEXT_STEPS_CLAUDES_REVIEW.md --- crates/lanspread-peer-cli/src/lib.rs | 260 ++++++++++++------ crates/lanspread-peer/src/handlers.rs | 135 +++++---- crates/lanspread-peer/src/stream_install.rs | 105 ++++++- crates/lanspread-proto/src/lib.rs | 73 +++-- .../tests/stream_install_frame.rs | 42 +++ 5 files changed, 450 insertions(+), 165 deletions(-) create mode 100644 crates/lanspread-proto/tests/stream_install_frame.rs diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index b05eefa..38fab43 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -15,7 +15,10 @@ use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, U use lanspread_proto::StreamInstallFrame; use serde::Serialize; use serde_json::{Value, json}; -use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + sync::mpsc, +}; use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; @@ -298,53 +301,19 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, + unpacked_size: listing.unpacked_size(), }, ) .await?; - for entry in listing.entries { - if cancel_token.is_cancelled() { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - - match entry.kind { - RarEntryKind::Directory => { - send_stream_frame( - &frames, - StreamInstallFrame::Directory { - relative_path: entry.relative_path, - }, - ) - .await?; - } - RarEntryKind::File => { - send_stream_frame( - &frames, - StreamInstallFrame::FileBegin { - relative_path: entry.relative_path.clone(), - size: entry.size, - crc32: entry.crc32, - }, - ) - .await?; - stream_unrar_file( - &self.program, - archive, - &entry.relative_path, - &frames, - cancel_token.clone(), - ) - .await?; - send_stream_frame( - &frames, - StreamInstallFrame::FileEnd { - relative_path: entry.relative_path, - }, - ) - .await?; - } - } - } + stream_unrar_entries( + &self.program, + archive, + &listing.entries, + &frames, + cancel_token.clone(), + ) + .await?; send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await }) @@ -357,6 +326,16 @@ struct RarListing { entries: Vec, } +impl RarListing { + fn unpacked_size(&self) -> u64 { + self.entries + .iter() + .filter(|entry| entry.kind == RarEntryKind::File) + .map(|entry| entry.size) + .sum() + } +} + #[derive(Debug, Clone, PartialEq, Eq)] struct RarEntry { relative_path: String, @@ -448,26 +427,32 @@ fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Re return Ok(()); }; - let size = match kind { - RarEntryKind::File => draft - .size - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, - RarEntryKind::Directory => 0, + let (size, crc32) = match kind { + RarEntryKind::File => { + let size = draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; + let crc32 = draft + .crc32 + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + (size, Some(crc32)) + } + RarEntryKind::Directory => (0, None), }; entries.push(RarEntry { relative_path, kind, size, - crc32: draft.crc32, + crc32, }); Ok(()) } -async fn stream_unrar_file( +async fn stream_unrar_entries( program: &Path, archive: &Path, - relative_path: &str, + entries: &[RarEntry], frames: &mpsc::Sender, cancel_token: CancellationToken, ) -> eyre::Result<()> { @@ -476,28 +461,112 @@ async fn stream_unrar_file( .arg("-inul") .arg("-cfg-") .arg(archive) - .arg(relative_path) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn()?; - let mut stdout = child - .stdout - .take() - .ok_or_eyre("unrar stdout was not captured")?; - let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + let result = async { + let mut stdout = child + .stdout + .take() + .ok_or_eyre("unrar stdout was not captured")?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; - loop { - let read = tokio::select! { - () = cancel_token.cancelled() => { - let _ = child.kill().await; - eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display()); + for entry in entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); } - read = stdout.read(&mut buffer) => read?, - }; + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + RarEntryKind::File => { + let Some(crc32) = entry.crc32 else { + eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); + }; + send_stream_frame( + frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32, + }, + ) + .await?; + stream_unrar_file_from_stdout( + &mut stdout, + archive, + entry, + frames, + &mut buffer, + &cancel_token, + ) + .await?; + send_stream_frame( + frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + } + } + + let extra = + read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; + if extra != 0 { + eyre::bail!( + "unrar produced bytes after listed entries for {}", + archive.display() + ); + } + + let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {} with status {status}", + archive.display() + ); + } + + Ok(()) + } + .await; + + if result.is_err() { + let _ = child.kill().await; + } + + result +} + +async fn stream_unrar_file_from_stdout( + stdout: &mut (impl AsyncRead + Unpin), + archive: &Path, + entry: &RarEntry, + frames: &mpsc::Sender, + buffer: &mut [u8], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut remaining = entry.size; + while remaining > 0 { + let read_len = usize::try_from(remaining.min(buffer.len() as u64))?; + let read = + read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; if read == 0 { - break; + eyre::bail!( + "unrar ended while streaming {} from {}; {remaining} bytes missing", + entry.relative_path, + archive.display() + ); } send_stream_frame( @@ -507,20 +576,40 @@ async fn stream_unrar_file( }, ) .await?; - } - - let status = child.wait().await?; - if !status.success() { - eyre::bail!( - "unrar p failed for {}:{} with status {status}", - archive.display(), - relative_path - ); + remaining = remaining.saturating_sub(u64::try_from(read)?); } Ok(()) } +async fn read_unrar_stdout( + stdout: &mut (impl AsyncRead + Unpin), + buffer: &mut [u8], + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + read = stdout.read(buffer) => Ok(read?), + } +} + +async fn wait_unrar_child( + child: &mut tokio::process::Child, + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + status = child.wait() => Ok(status?), + } +} + async fn send_stream_frame( frames: &mpsc::Sender, frame: StreamInstallFrame, @@ -639,7 +728,7 @@ mod tests { let listing = parse_unrar_listing( r#" Archive: game.eti -Details: RAR 5 +Details: RAR 5, solid Name: bin/payload.bin Type: File @@ -652,7 +741,7 @@ Details: RAR 5 ) .expect("listing should parse"); - assert!(!listing.solid); + assert!(listing.solid); assert_eq!( listing.entries, vec![ @@ -672,6 +761,23 @@ Details: RAR 5 ); } + #[test] + fn rejects_unrar_file_entries_without_crc32() { + let err = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 +"#, + ) + .expect_err("file entries without CRC32 should be rejected"); + + assert!(err.to_string().contains("has no CRC32")); + } + #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index c5851c5..8985f74 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -497,11 +497,11 @@ pub async fn handle_stream_install_game_command( } }; peers.sort(); - let Some(peer_addr) = peers.into_iter().next() else { + if peers.is_empty() { log::error!("No peer selected for streamed install of {id}"); send_download_failed(tx_notify_ui, &id); return; - }; + } match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { BeginOperationResult::Started => {} @@ -525,15 +525,8 @@ pub async fn handle_stream_install_game_command( let ctx_clone = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { - run_stream_install_operation( - ctx_clone, - tx_notify_ui, - id, - game_root, - peer_addr, - cancel_token, - ) - .await; + run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token) + .await; }); } @@ -582,7 +575,7 @@ async fn run_stream_install_operation( tx_notify_ui: UnboundedSender, id: String, game_root: PathBuf, - peer_addr: SocketAddr, + peer_addrs: Vec, cancel_token: CancellationToken, ) { let download_guard = OperationGuard::download( @@ -597,63 +590,93 @@ async fn run_stream_install_operation( PeerEvent::DownloadGameFilesBegin { id: id.clone() }, ); - let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) - .await - { - Ok(transaction) => transaction, - Err(err) => { - log::error!("Failed to prepare streamed install for {id}: {err}"); - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; - return; + let mut last_receive_error = None; + for peer_addr in peer_addrs { + if cancel_token.is_cancelled() { + last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled")); + break; } - }; - let receive_result = receive_streamed_install( - peer_addr, - &id, - transaction.staging_dir(), - tx_notify_ui.clone(), - cancel_token.clone(), - ) - .await; + let transaction = + match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await { + Ok(transaction) => transaction, + Err(err) => { + log::error!("Failed to prepare streamed install for {id}: {err}"); + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) + .await; + return; + } + }; - match receive_result { - Ok(()) => { - if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + let receive_result = receive_streamed_install( + peer_addr, + &id, + transaction.staging_dir(), + tx_notify_ui.clone(), + cancel_token.clone(), + ) + .await; + + match receive_result { + Ok(()) => { + if transition_download_to_install( + &ctx, + &tx_notify_ui, + &id, + OperationKind::Installing, + ) .await - { - clear_active_download(&ctx, &id).await; - send_download_finished(&tx_notify_ui, &id); - download_guard.disarm(); - commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; - } else { + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + return; + } + if let Err(err) = transaction.rollback().await { log::error!("Failed to roll back streamed install for {id}: {err}"); } finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) .await; + return; } - } - Err(err) => { - if let Err(rollback_err) = transaction.rollback().await { - log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + Err(err) => { + if let Err(rollback_err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + } + if cancel_token.is_cancelled() { + log::info!("Streamed install download cancelled for {id}: {err}"); + last_receive_error = Some(err); + break; + } + + log::warn!( + "Streamed install attempt from {peer_addr} failed for {id}; trying another peer if available: {err}" + ); + last_receive_error = Some(err); } - let download_was_cancelled = cancel_token.is_cancelled(); - if download_was_cancelled { - log::info!("Streamed install download cancelled for {id}: {err}"); - } else { - log::error!("Streamed install download failed for {id}: {err}"); - } - finish_failed_stream_download( - &ctx, - &tx_notify_ui, - &id, - download_guard, - download_was_cancelled, - ) - .await; } } + + let download_was_cancelled = cancel_token.is_cancelled(); + if let Some(err) = last_receive_error { + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + } else { + log::error!("Streamed install download failed for {id}: no peer attempts were made"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; } async fn finish_failed_stream_download( diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index 3d2e168..f6c5df5 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::Arc, + time::{Duration, Instant}, }; use bytes::Bytes; @@ -15,6 +16,7 @@ use tokio::{ fs::File, io::AsyncWriteExt, sync::{mpsc, mpsc::UnboundedSender}, + time::{self, MissedTickBehavior}, }; use tokio_util::{ codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, @@ -22,6 +24,7 @@ use tokio_util::{ }; use crate::{ + DownloadProgress, PeerEvent, install::root_eti_archives, network::connect_to_peer, @@ -29,6 +32,7 @@ use crate::{ }; const FRAME_CHANNEL_DEPTH: usize = 16; +const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; @@ -182,10 +186,18 @@ pub(crate) async fn receive_streamed_install( let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut current_file: Option = None; + let mut progress = StreamInstallProgress::new(game_id.to_string()); + let mut progress_interval = time::interval(STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL); + progress_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + progress_interval.tick().await; loop { let next = tokio::select! { () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), + _ = progress_interval.tick() => { + progress.emit_current(&tx_notify_ui); + continue; + } next = framed_rx.next() => next, }; @@ -199,9 +211,13 @@ pub(crate) async fn receive_streamed_install( StreamInstallFrame::ArchiveBegin { archive_name, solid, + unpacked_size, } => { + progress.add_total(unpacked_size); + progress.emit_snapshot(&tx_notify_ui, 0); log::info!( - "Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" + "Receiving streamed install archive {archive_name} for {game_id} \ + (solid={solid}, unpacked_size={unpacked_size})" ); } StreamInstallFrame::Directory { relative_path } => { @@ -227,8 +243,10 @@ pub(crate) async fn receive_streamed_install( let Some(file) = current_file.as_mut() else { eyre::bail!("received FileChunk without FileBegin"); }; - file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) + let length = file + .write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) .await?; + progress.record_bytes(length); } StreamInstallFrame::FileEnd { relative_path } => { let Some(file) = current_file.take() else { @@ -243,6 +261,7 @@ pub(crate) async fn receive_streamed_install( if current_file.is_some() { eyre::bail!("streamed install completed with an open file"); } + progress.emit_snapshot(&tx_notify_ui, 0); return Ok(()); } StreamInstallFrame::Error { message } => { @@ -252,11 +271,68 @@ pub(crate) async fn receive_streamed_install( } } +struct StreamInstallProgress { + id: String, + total_bytes: u64, + downloaded_bytes: u64, + last_downloaded_bytes: u64, + last_at: Instant, +} + +impl StreamInstallProgress { + fn new(id: String) -> Self { + Self { + id, + total_bytes: 0, + downloaded_bytes: 0, + last_downloaded_bytes: 0, + last_at: Instant::now(), + } + } + + fn add_total(&mut self, bytes: u64) { + self.total_bytes = self.total_bytes.saturating_add(bytes); + } + + fn record_bytes(&mut self, bytes: u64) { + self.downloaded_bytes = self.downloaded_bytes.saturating_add(bytes); + } + + fn emit_current(&mut self, tx_notify_ui: &UnboundedSender) { + let now = Instant::now(); + let speed = bytes_per_second( + self.downloaded_bytes + .saturating_sub(self.last_downloaded_bytes), + now.duration_since(self.last_at), + ); + + self.last_downloaded_bytes = self.downloaded_bytes; + self.last_at = now; + self.emit_snapshot(tx_notify_ui, speed); + } + + fn emit_snapshot(&self, tx_notify_ui: &UnboundedSender, bytes_per_second: u64) { + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesProgress(DownloadProgress { + id: self.id.clone(), + downloaded_bytes: self.downloaded_bytes, + total_bytes: self.total_bytes, + bytes_per_second, + active_peer_count: 1, + })); + } +} + +fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 { + let millis = elapsed.as_millis().max(1); + let rate = u128::from(bytes).saturating_mul(1_000) / millis; + u64::try_from(rate).unwrap_or(u64::MAX) +} + struct IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, received: u64, hasher: Hasher, file: File, @@ -267,7 +343,7 @@ impl IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, file: File, ) -> Self { Self { @@ -287,7 +363,7 @@ impl IncomingFile { peer_addr: SocketAddr, tx_notify_ui: &UnboundedSender, bytes: Bytes, - ) -> eyre::Result<()> { + ) -> eyre::Result { let offset = self.received; let length = u64::try_from(bytes.len())?; if offset.saturating_add(length) > self.expected_size { @@ -304,11 +380,11 @@ impl IncomingFile { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { id: game_id.to_string(), peer_addr, - relative_path: format!("{game_id}/local/{}", self.relative_path), + relative_path: format!("{game_id}/.local.installing/{}", self.relative_path), offset, length, }); - Ok(()) + Ok(length) } async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { @@ -329,14 +405,13 @@ impl IncomingFile { ); } - if let Some(expected) = self.expected_crc32 { - let actual = self.hasher.finalize(); - if actual != expected { - eyre::bail!( - "streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", - self.relative_path - ); - } + let actual = self.hasher.finalize(); + if actual != self.expected_crc32 { + eyre::bail!( + "streamed file {} CRC32 mismatch: got {actual:08X}, expected {:08X}", + self.relative_path, + self.expected_crc32 + ); } log::debug!( diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 27609c5..3aad2dc 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -97,11 +97,17 @@ pub enum Response { InternalPeerError(String), } -#[derive(Clone, Debug, Serialize, Deserialize)] +const STREAM_INSTALL_CONTROL_FRAME_TAG: u8 = 0; +const STREAM_INSTALL_FILE_CHUNK_FRAME_TAG: u8 = 1; +const STREAM_INSTALL_ENCODE_ERROR_FRAME: &[u8] = + b"\0{\"Error\":{\"message\":\"stream install frame encoding error\"}}"; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum StreamInstallFrame { ArchiveBegin { archive_name: String, solid: bool, + unpacked_size: u64, }, Directory { relative_path: String, @@ -109,7 +115,7 @@ pub enum StreamInstallFrame { FileBegin { relative_path: String, size: u64, - crc32: Option, + crc32: u32, }, FileChunk { bytes: Bytes, @@ -180,26 +186,59 @@ impl Message for Response { impl Message for StreamInstallFrame { fn decode(bytes: Bytes) -> Self { - match serde_json::from_slice(&bytes) { - Ok(t) => t, - Err(e) => { - tracing::error!(?e, "StreamInstallFrame decoding error"); - StreamInstallFrame::Error { - message: format!("stream install frame decoding error: {e}"), - } - } + if bytes.is_empty() { + return stream_install_decode_error("stream install frame is empty"); + } + + let tag = bytes[0]; + let payload = bytes.slice(1..); + match tag { + STREAM_INSTALL_CONTROL_FRAME_TAG => decode_stream_install_control_frame(&payload), + STREAM_INSTALL_FILE_CHUNK_FRAME_TAG => StreamInstallFrame::FileChunk { bytes: payload }, + _ => stream_install_decode_error(format!("unknown stream install frame tag {tag}")), } } fn encode(&self) -> Bytes { - match serde_json::to_vec(self) { - Ok(s) => Bytes::from(s), - Err(e) => { - tracing::error!(?e, "StreamInstallFrame encoding error"); - Bytes::from(format!( - r#"{{"Error": {{"message": "encoding error: {e}"}}}}"# - )) + match self { + StreamInstallFrame::FileChunk { bytes } => { + tagged_stream_install_frame(STREAM_INSTALL_FILE_CHUNK_FRAME_TAG, bytes) } + _ => match serde_json::to_vec(self) { + Ok(payload) => { + tagged_stream_install_frame(STREAM_INSTALL_CONTROL_FRAME_TAG, &payload) + } + Err(e) => { + tracing::error!(?e, "StreamInstallFrame encoding error"); + Bytes::from_static(STREAM_INSTALL_ENCODE_ERROR_FRAME) + } + }, } } } + +fn decode_stream_install_control_frame(payload: &[u8]) -> StreamInstallFrame { + match serde_json::from_slice(payload) { + Ok(StreamInstallFrame::FileChunk { .. }) => { + stream_install_decode_error("stream install control frame cannot contain file bytes") + } + Ok(frame) => frame, + Err(e) => { + tracing::error!(?e, "StreamInstallFrame decoding error"); + stream_install_decode_error(format!("stream install frame decoding error: {e}")) + } + } +} + +fn tagged_stream_install_frame(tag: u8, payload: &[u8]) -> Bytes { + let mut frame = Vec::with_capacity(1 + payload.len()); + frame.push(tag); + frame.extend_from_slice(payload); + Bytes::from(frame) +} + +fn stream_install_decode_error(message: impl Into) -> StreamInstallFrame { + StreamInstallFrame::Error { + message: message.into(), + } +} diff --git a/crates/lanspread-proto/tests/stream_install_frame.rs b/crates/lanspread-proto/tests/stream_install_frame.rs new file mode 100644 index 0000000..ed44eb7 --- /dev/null +++ b/crates/lanspread-proto/tests/stream_install_frame.rs @@ -0,0 +1,42 @@ +use bytes::Bytes; +use lanspread_proto::{Message, StreamInstallFrame}; + +#[test] +fn file_chunks_encode_raw_bytes() { + let bytes = Bytes::from_static(&[0, 1, 2, 255]); + let encoded = StreamInstallFrame::FileChunk { + bytes: bytes.clone(), + } + .encode(); + + assert_eq!(&encoded[..], &[1, 0, 1, 2, 255]); + assert_eq!( + StreamInstallFrame::decode(encoded), + StreamInstallFrame::FileChunk { bytes } + ); +} + +#[test] +fn control_frames_are_tagged_json() { + let frame = StreamInstallFrame::FileBegin { + relative_path: "bin/game.exe".to_string(), + size: 42, + crc32: 0x38B4_88A7, + }; + let encoded = frame.encode(); + + assert_eq!(encoded[0], 0); + assert_eq!(StreamInstallFrame::decode(encoded), frame); +} + +#[test] +fn empty_frames_decode_as_errors() { + match StreamInstallFrame::decode(Bytes::new()) { + StreamInstallFrame::Error { message } => { + assert!(message.contains("empty")); + } + other => { + panic!("expected error frame, got {other:?}"); + } + } +} -- 2.54.0 From 389511f6206d1d198bfe4099293acf3f6f116ad5 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 21:22:21 +0200 Subject: [PATCH 04/15] remove NEXT_STEPS_CLAUDES_REVIEW.md, it has been applied --- NEXT_STEPS_CLAUDES_REVIEW.md | 20 -------------------- 1 file changed, 20 deletions(-) delete mode 100644 NEXT_STEPS_CLAUDES_REVIEW.md diff --git a/NEXT_STEPS_CLAUDES_REVIEW.md b/NEXT_STEPS_CLAUDES_REVIEW.md deleted file mode 100644 index dd1ec7c..0000000 --- a/NEXT_STEPS_CLAUDES_REVIEW.md +++ /dev/null @@ -1,20 +0,0 @@ -# Issues - -1. (Major) File bytes are JSON-encoded — ~3–4× wire bloat + heavy CPU. StreamInstallFrame::FileChunk { bytes: Bytes } goes over the wire via serde_json (the Message impl). serde_json has no native byte type, so Bytes serializes as a JSON array of integers ([12,255,7,…]) — roughly 3–4 ASCII chars per payload byte, plus serialize/parse cost on both ends. The existing transfer path (peer.rs::stream_file_bytes) sends raw bytes straight on the QUIC stream for exactly this reason. For a feature whose entire purpose is moving file bytes efficiently on constrained peers, routing the payload through JSON undermines the goal. The 3 MiB S39 fixture hides it; a multi-GB game would not. Consider a binary codec (postcard/bincode) for the frame stream, or keep the control frames as-is but stream raw bytes for file content. - -2. (Design) Solid archives → O(n²) extraction. Each file is pulled with a separate unrar p archive (stream_unrar_file). For a solid .eti, unrar must re-decompress every preceding file on each invocation. solid is detected and sent in ArchiveBegin but never acted on. A solid multi-file archive could make streamed install pathologically slow. At minimum document the limitation; ideally a single-pass extraction for the solid case. - -3. (Robustness) Single peer, no fallback. handle_stream_install_game_command picks one peer (peers.sort(); peers.into_iter().next()). If that peer declines (can_serve_game false) or the connection drops, the whole install fails — no retry against the other validated peers, unlike the normal multi-peer download. Fine for a prototype, worth a TODO. - -4. (Minor) unrar p is a wildcard mask. unrar treats the file argument as a pattern; a name containing */?/[ could match the wrong file or multiple files (multiple matches concatenate into stdout → corrupt stream). CRC32 would catch it as a failure rather than silent corruption, but it's a sharp edge. Worth noting / pinning exact-match if the unrar build supports it. - -5. (Minor) CRC32 is optional. If unrar omits CRC32: (older builds, header-encrypted archives), crc32 is None and the receiver verifies size only. - -6. (Minor / cosmetic) - - DownloadGameFileChunkFinished reports relative_path as "{game_id}/local/…" while bytes actually land in staging (.local.installing); and there's no total-size up front, so the UI can't show a percentage. - - parse_unrar_listing depends on unrar's human-readable lt output (Details:/Name:/Type:/Size:/CRC32:). -cfg- guards against user config, but it's still brittle across unrar versions/locales. - -## Nits - -- PeerStartOptions dropped #[derive(Debug)] for a hand-written Debug (Arc isn't Debug) — correct, and Clone/Default are retained. Good. -- commit() renames staging into local/, so the .lanspread_owned marker lands inside local/ — but that's identical to the existing install_inner behavior, so no regression. -- 2.54.0 From 40697a73e5bed68bb067134ad87abe467b90f668 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 21:39:02 +0200 Subject: [PATCH 05/15] feat(tauri): add low-disk streamed install action NEXT_STEPS item 1 called out that streamed install was still CLI-only because the Tauri app started the peer with no stream provider. Users can now choose an explicit "Low disk install" action from the game detail modal for remote-only games instead of taking the default archive-preserving download path. The GUI command queues a normal peer detail fetch first so the peer database has the file metadata needed for source validation. A small pending handoff in Tauri routes the resulting GotGameFiles event into StreamInstallGame instead of DownloadGameFiles, and clears that pending state on no-peer or download failure events. This keeps the existing download continuation untouched for the default action. The external unrar stream provider moved from the CLI harness into lanspread-peer so CLI and Tauri use the same implementation. Tauri resolves the bundled unrar sidecar path and injects that provider at peer startup; falling back to the noop provider keeps peer startup alive if the sidecar cannot be resolved, while the streamed install operation still fails safely. Test Plan: - just fmt - just test - just frontend-test - just clippy - just build - git diff --check Refs: NEXT_STEPS.md item 1 --- Cargo.lock | 3 - NEXT_STEPS.md | 10 +- crates/lanspread-peer-cli/Cargo.toml | 3 - crates/lanspread-peer-cli/src/lib.rs | 416 +----------------- crates/lanspread-peer-cli/src/main.rs | 2 +- crates/lanspread-peer/src/lib.rs | 7 +- crates/lanspread-peer/src/stream_install.rs | 411 ++++++++++++++++- .../src-tauri/src/lib.rs | 128 +++++- .../src/components/modals/GameDetailModal.tsx | 16 +- .../src/hooks/useGameActions.ts | 22 +- .../src/lib/gameState.ts | 6 + .../src/windows/MainWindow.tsx | 1 + .../tests/gameState.test.ts | 34 ++ 13 files changed, 623 insertions(+), 436 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32f6751..ea5a41c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,16 +2060,13 @@ dependencies = [ name = "lanspread-peer-cli" version = "0.1.0" dependencies = [ - "bytes", "eyre", "lanspread-compat", "lanspread-db", "lanspread-peer", - "lanspread-proto", "serde", "serde_json", "tokio", - "tokio-util", ] [[package]] diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md index 1806a1d..0996a17 100644 --- a/NEXT_STEPS.md +++ b/NEXT_STEPS.md @@ -5,11 +5,13 @@ archive-derived install bytes into `local/` without making the receiver a source?” Yes. Next I’d harden the pieces that decide whether this is product-ready. -1. **Move from CLI-only to real app integration** +1. **Done — Move from CLI-only to real app integration** - Add a GUI command/control path for “stream install / low disk mode”, - probably behind an explicit option. The Tauri crate currently opts out with - `stream_install_provider: None`, so the GUI cannot use it yet. + The GUI now has an explicit “Low disk install” action in the game detail + modal for remote-only games. The Tauri backend queues that path through + `stream_install_game`, injects the shared external `unrar` stream provider, + and hands fetched file details to `StreamInstallGame` instead of the normal + download command. 2. **Replace per-file `unrar p` with a final archive provider** diff --git a/crates/lanspread-peer-cli/Cargo.toml b/crates/lanspread-peer-cli/Cargo.toml index 5a96902..7c18e7a 100644 --- a/crates/lanspread-peer-cli/Cargo.toml +++ b/crates/lanspread-peer-cli/Cargo.toml @@ -14,14 +14,11 @@ path = "src/main.rs" lanspread-compat = { path = "../lanspread-compat" } lanspread-db = { path = "../lanspread-db" } lanspread-peer = { path = "../lanspread-peer" } -lanspread-proto = { path = "../lanspread-proto" } -bytes = { workspace = true } eyre = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tokio-util = { workspace = true } [lints.clippy] needless_pass_by_value = "allow" diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index 38fab43..9eba46b 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -5,24 +5,15 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, - process::Stdio, time::Duration, }; -use bytes::Bytes; use eyre::{Context, OptionExt}; -use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker}; -use lanspread_proto::StreamInstallFrame; +use lanspread_peer::{UnpackFuture, Unpacker}; use serde::Serialize; use serde_json::{Value, json}; -use tokio::{ - io::{AsyncRead, AsyncReadExt}, - sync::mpsc, -}; -use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; -const STREAM_CHUNK_SIZE: usize = 256 * 1024; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CommandEnvelope { @@ -270,356 +261,6 @@ impl Unpacker for ExternalUnrarUnpacker { } } -pub struct ExternalUnrarStreamProvider { - program: PathBuf, -} - -impl ExternalUnrarStreamProvider { - #[must_use] - pub fn new(program: PathBuf) -> Self { - Self { program } - } -} - -impl StreamInstallProvider for ExternalUnrarStreamProvider { - fn stream_archive<'a>( - &'a self, - archive: &'a Path, - frames: mpsc::Sender, - cancel_token: CancellationToken, - ) -> StreamInstallFuture<'a> { - Box::pin(async move { - let listing = unrar_listing(&self.program, archive).await?; - let archive_name = archive - .file_name() - .and_then(|name| name.to_str()) - .unwrap_or("archive.eti") - .to_string(); - - send_stream_frame( - &frames, - StreamInstallFrame::ArchiveBegin { - archive_name: archive_name.clone(), - solid: listing.solid, - unpacked_size: listing.unpacked_size(), - }, - ) - .await?; - - stream_unrar_entries( - &self.program, - archive, - &listing.entries, - &frames, - cancel_token.clone(), - ) - .await?; - - send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await - }) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct RarListing { - solid: bool, - entries: Vec, -} - -impl RarListing { - fn unpacked_size(&self) -> u64 { - self.entries - .iter() - .filter(|entry| entry.kind == RarEntryKind::File) - .map(|entry| entry.size) - .sum() - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct RarEntry { - relative_path: String, - kind: RarEntryKind, - size: u64, - crc32: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum RarEntryKind { - File, - Directory, -} - -#[derive(Default)] -struct RarEntryDraft { - relative_path: Option, - kind: Option, - size: Option, - crc32: Option, -} - -async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { - let output = tokio::process::Command::new(program) - .arg("lt") - .arg("-cfg-") - .arg(archive) - .output() - .await?; - if !output.status.success() { - eyre::bail!( - "unrar lt failed for {} with status {}: {}", - archive.display(), - output.status, - String::from_utf8_lossy(&output.stderr) - ); - } - - parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) -} - -fn parse_unrar_listing(output: &str) -> eyre::Result { - let mut solid = false; - let mut entries = Vec::new(); - let mut current = RarEntryDraft::default(); - - for line in output.lines() { - let trimmed = line.trim(); - if let Some(details) = trimmed.strip_prefix("Details:") { - solid = details.to_ascii_lowercase().contains("solid"); - continue; - } - - if let Some(name) = trimmed.strip_prefix("Name:") { - push_rar_entry(&mut entries, std::mem::take(&mut current))?; - current.relative_path = Some(name.trim().to_string()); - continue; - } - - if let Some(kind) = trimmed.strip_prefix("Type:") { - current.kind = match kind.trim() { - "File" => Some(RarEntryKind::File), - "Directory" => Some(RarEntryKind::Directory), - _ => None, - }; - continue; - } - - if let Some(size) = trimmed.strip_prefix("Size:") { - current.size = Some(size.trim().parse()?); - continue; - } - - if let Some(crc) = trimmed.strip_prefix("CRC32:") { - current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); - } - } - - push_rar_entry(&mut entries, current)?; - Ok(RarListing { solid, entries }) -} - -fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { - let Some(relative_path) = draft.relative_path else { - return Ok(()); - }; - - let Some(kind) = draft.kind else { - return Ok(()); - }; - - let (size, crc32) = match kind { - RarEntryKind::File => { - let size = draft - .size - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; - let crc32 = draft - .crc32 - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; - (size, Some(crc32)) - } - RarEntryKind::Directory => (0, None), - }; - - entries.push(RarEntry { - relative_path, - kind, - size, - crc32, - }); - Ok(()) -} - -async fn stream_unrar_entries( - program: &Path, - archive: &Path, - entries: &[RarEntry], - frames: &mpsc::Sender, - cancel_token: CancellationToken, -) -> eyre::Result<()> { - let mut child = tokio::process::Command::new(program) - .arg("p") - .arg("-inul") - .arg("-cfg-") - .arg(archive) - .stdout(Stdio::piped()) - .stderr(Stdio::null()) - .spawn()?; - - let result = async { - let mut stdout = child - .stdout - .take() - .ok_or_eyre("unrar stdout was not captured")?; - let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; - - for entry in entries { - if cancel_token.is_cancelled() { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - - match entry.kind { - RarEntryKind::Directory => { - send_stream_frame( - frames, - StreamInstallFrame::Directory { - relative_path: entry.relative_path.clone(), - }, - ) - .await?; - } - RarEntryKind::File => { - let Some(crc32) = entry.crc32 else { - eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); - }; - send_stream_frame( - frames, - StreamInstallFrame::FileBegin { - relative_path: entry.relative_path.clone(), - size: entry.size, - crc32, - }, - ) - .await?; - stream_unrar_file_from_stdout( - &mut stdout, - archive, - entry, - frames, - &mut buffer, - &cancel_token, - ) - .await?; - send_stream_frame( - frames, - StreamInstallFrame::FileEnd { - relative_path: entry.relative_path.clone(), - }, - ) - .await?; - } - } - } - - let extra = - read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; - if extra != 0 { - eyre::bail!( - "unrar produced bytes after listed entries for {}", - archive.display() - ); - } - - let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; - if !status.success() { - eyre::bail!( - "unrar p failed for {} with status {status}", - archive.display() - ); - } - - Ok(()) - } - .await; - - if result.is_err() { - let _ = child.kill().await; - } - - result -} - -async fn stream_unrar_file_from_stdout( - stdout: &mut (impl AsyncRead + Unpin), - archive: &Path, - entry: &RarEntry, - frames: &mpsc::Sender, - buffer: &mut [u8], - cancel_token: &CancellationToken, -) -> eyre::Result<()> { - let mut remaining = entry.size; - while remaining > 0 { - let read_len = usize::try_from(remaining.min(buffer.len() as u64))?; - let read = - read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; - if read == 0 { - eyre::bail!( - "unrar ended while streaming {} from {}; {remaining} bytes missing", - entry.relative_path, - archive.display() - ); - } - - send_stream_frame( - frames, - StreamInstallFrame::FileChunk { - bytes: Bytes::copy_from_slice(&buffer[..read]), - }, - ) - .await?; - remaining = remaining.saturating_sub(u64::try_from(read)?); - } - - Ok(()) -} - -async fn read_unrar_stdout( - stdout: &mut (impl AsyncRead + Unpin), - buffer: &mut [u8], - cancel_token: &CancellationToken, - archive: &Path, -) -> eyre::Result { - tokio::select! { - () = cancel_token.cancelled() => { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - read = stdout.read(buffer) => Ok(read?), - } -} - -async fn wait_unrar_child( - child: &mut tokio::process::Child, - cancel_token: &CancellationToken, - archive: &Path, -) -> eyre::Result { - tokio::select! { - () = cancel_token.cancelled() => { - let _ = child.kill().await; - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - status = child.wait() => Ok(status?), - } -} - -async fn send_stream_frame( - frames: &mpsc::Sender, - frame: StreamInstallFrame, -) -> eyre::Result<()> { - frames - .send(frame) - .await - .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) -} - pub fn result_line(id: &Option, command: &str, data: Value) -> eyre::Result { output_line(json!({ "type": "result", @@ -723,61 +364,6 @@ mod tests { ); } - #[test] - fn parses_unrar_technical_listing() { - let listing = parse_unrar_listing( - r#" -Archive: game.eti -Details: RAR 5, solid - - Name: bin/payload.bin - Type: File - Size: 123 - CRC32: 38B488A7 - - Name: bin - Type: Directory -"#, - ) - .expect("listing should parse"); - - assert!(listing.solid); - assert_eq!( - listing.entries, - vec![ - RarEntry { - relative_path: "bin/payload.bin".to_string(), - kind: RarEntryKind::File, - size: 123, - crc32: Some(0x38B4_88A7), - }, - RarEntry { - relative_path: "bin".to_string(), - kind: RarEntryKind::Directory, - size: 0, - crc32: None, - }, - ] - ); - } - - #[test] - fn rejects_unrar_file_entries_without_crc32() { - let err = parse_unrar_listing( - r#" -Archive: game.eti -Details: RAR 5 - - Name: bin/payload.bin - Type: File - Size: 123 -"#, - ) - .expect_err("file entries without CRC32 should be rejected"); - - assert!(err.to_string().contains("has no CRC32")); - } - #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 995512d..75cf4a7 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -16,6 +16,7 @@ use lanspread_db::db::{Game, GameCatalog, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, + ExternalUnrarStreamProvider, InstallOperation, NoopStreamInstallProvider, PeerCommand, @@ -33,7 +34,6 @@ use lanspread_peer_cli::{ CliCommand, CommandEnvelope, DEFAULT_FIXTURE_VERSION, - ExternalUnrarStreamProvider, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 540d0e2..f2053bf 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -83,7 +83,12 @@ pub use crate::{ launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once}, startup::PeerRuntimeHandle, state_paths::{launch_settings_applied_path, setup_done_path}, - stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider}, + stream_install::{ + ExternalUnrarStreamProvider, + NoopStreamInstallProvider, + StreamInstallFuture, + StreamInstallProvider, + }, }; // ============================================================================= diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index f6c5df5..dde87fe 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -3,6 +3,7 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, pin::Pin, + process::Stdio, sync::Arc, time::{Duration, Instant}, }; @@ -14,7 +15,8 @@ use lanspread_proto::{Message, Request, StreamInstallFrame}; use s2n_quic::stream::SendStream; use tokio::{ fs::File, - io::AsyncWriteExt, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + process::Command, sync::{mpsc, mpsc::UnboundedSender}, time::{self, MissedTickBehavior}, }; @@ -33,6 +35,7 @@ use crate::{ const FRAME_CHANNEL_DEPTH: usize = 16; const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); +const STREAM_CHUNK_SIZE: usize = 256 * 1024; pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; @@ -64,6 +67,357 @@ impl StreamInstallProvider for NoopStreamInstallProvider { } } +#[derive(Debug)] +pub struct ExternalUnrarStreamProvider { + program: PathBuf, +} + +impl ExternalUnrarStreamProvider { + #[must_use] + pub fn new(program: PathBuf) -> Self { + Self { program } + } +} + +impl StreamInstallProvider for ExternalUnrarStreamProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + let listing = unrar_listing(&self.program, archive).await?; + let archive_name = archive + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("archive.eti") + .to_string(); + + send_stream_frame( + &frames, + StreamInstallFrame::ArchiveBegin { + archive_name: archive_name.clone(), + solid: listing.solid, + unpacked_size: listing.unpacked_size(), + }, + ) + .await?; + + stream_unrar_entries( + &self.program, + archive, + &listing.entries, + &frames, + cancel_token.clone(), + ) + .await?; + + send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarListing { + solid: bool, + entries: Vec, +} + +impl RarListing { + fn unpacked_size(&self) -> u64 { + self.entries + .iter() + .filter(|entry| entry.kind == RarEntryKind::File) + .map(|entry| entry.size) + .sum() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarEntry { + relative_path: String, + kind: RarEntryKind, + size: u64, + crc32: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RarEntryKind { + File, + Directory, +} + +#[derive(Default)] +struct RarEntryDraft { + relative_path: Option, + kind: Option, + size: Option, + crc32: Option, +} + +async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { + let output = Command::new(program) + .arg("lt") + .arg("-cfg-") + .arg(archive) + .output() + .await?; + if !output.status.success() { + eyre::bail!( + "unrar lt failed for {} with status {}: {}", + archive.display(), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + + parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) +} + +fn parse_unrar_listing(output: &str) -> eyre::Result { + let mut solid = false; + let mut entries = Vec::new(); + let mut current = RarEntryDraft::default(); + + for line in output.lines() { + let trimmed = line.trim(); + if let Some(details) = trimmed.strip_prefix("Details:") { + solid = details.to_ascii_lowercase().contains("solid"); + continue; + } + + if let Some(name) = trimmed.strip_prefix("Name:") { + push_rar_entry(&mut entries, std::mem::take(&mut current))?; + current.relative_path = Some(name.trim().to_string()); + continue; + } + + if let Some(kind) = trimmed.strip_prefix("Type:") { + current.kind = match kind.trim() { + "File" => Some(RarEntryKind::File), + "Directory" => Some(RarEntryKind::Directory), + _ => None, + }; + continue; + } + + if let Some(size) = trimmed.strip_prefix("Size:") { + current.size = Some(size.trim().parse()?); + continue; + } + + if let Some(crc) = trimmed.strip_prefix("CRC32:") { + current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); + } + } + + push_rar_entry(&mut entries, current)?; + Ok(RarListing { solid, entries }) +} + +fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { + let Some(relative_path) = draft.relative_path else { + return Ok(()); + }; + + let Some(kind) = draft.kind else { + return Ok(()); + }; + + let (size, crc32) = match kind { + RarEntryKind::File => { + let size = draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; + let crc32 = draft + .crc32 + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + (size, Some(crc32)) + } + RarEntryKind::Directory => (0, None), + }; + + entries.push(RarEntry { + relative_path, + kind, + size, + crc32, + }); + Ok(()) +} + +async fn stream_unrar_entries( + program: &Path, + archive: &Path, + entries: &[RarEntry], + frames: &mpsc::Sender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let mut child = Command::new(program) + .arg("p") + .arg("-inul") + .arg("-cfg-") + .arg(archive) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .spawn()?; + + let result = async { + let mut stdout = child + .stdout + .take() + .ok_or_else(|| eyre::eyre!("unrar stdout was not captured"))?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + + for entry in entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + RarEntryKind::File => { + let Some(crc32) = entry.crc32 else { + eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); + }; + send_stream_frame( + frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32, + }, + ) + .await?; + stream_unrar_file_from_stdout( + &mut stdout, + archive, + entry, + frames, + &mut buffer, + &cancel_token, + ) + .await?; + send_stream_frame( + frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + } + } + + let extra = + read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; + if extra != 0 { + eyre::bail!( + "unrar produced bytes after listed entries for {}", + archive.display() + ); + } + + let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {} with status {status}", + archive.display() + ); + } + + Ok(()) + } + .await; + + if result.is_err() { + let _ = child.kill().await; + } + + result +} + +async fn stream_unrar_file_from_stdout( + stdout: &mut (impl AsyncRead + Unpin), + archive: &Path, + entry: &RarEntry, + frames: &mpsc::Sender, + buffer: &mut [u8], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut remaining = entry.size; + while remaining > 0 { + let read_len = usize::try_from(remaining.min(u64::try_from(buffer.len())?))?; + let read = + read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; + if read == 0 { + eyre::bail!( + "unrar ended while streaming {} from {}; {remaining} bytes missing", + entry.relative_path, + archive.display() + ); + } + + send_stream_frame( + frames, + StreamInstallFrame::FileChunk { + bytes: Bytes::copy_from_slice(&buffer[..read]), + }, + ) + .await?; + remaining = remaining.saturating_sub(u64::try_from(read)?); + } + + Ok(()) +} + +async fn read_unrar_stdout( + stdout: &mut (impl AsyncRead + Unpin), + buffer: &mut [u8], + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + read = stdout.read(buffer) => Ok(read?), + } +} + +async fn wait_unrar_child( + child: &mut tokio::process::Child, + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + status = child.wait() => Ok(status?), + } +} + +async fn send_stream_frame( + frames: &mpsc::Sender, + frame: StreamInstallFrame, +) -> eyre::Result<()> { + frames + .send(frame) + .await + .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) +} + pub(crate) async fn send_stream_install_error( tx: SendStream, message: impl Into, @@ -444,4 +798,59 @@ mod tests { assert!(resolve_stream_path(&staging, "/absolute").is_err()); assert!(resolve_stream_path(&staging, "C:/windows").is_err()); } + + #[test] + fn parses_unrar_technical_listing() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5, solid + + Name: bin/payload.bin + Type: File + Size: 123 + CRC32: 38B488A7 + + Name: bin + Type: Directory +"#, + ) + .expect("listing should parse"); + + assert!(listing.solid); + assert_eq!( + listing.entries, + vec![ + RarEntry { + relative_path: "bin/payload.bin".to_string(), + kind: RarEntryKind::File, + size: 123, + crc32: Some(0x38B4_88A7), + }, + RarEntry { + relative_path: "bin".to_string(), + kind: RarEntryKind::Directory, + size: 0, + crc32: None, + }, + ] + ); + } + + #[test] + fn rejects_unrar_file_entries_without_crc32() { + let err = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 +"#, + ) + .expect_err("file entries without CRC32 should be rejected"); + + assert!(err.to_string().contains("has no CRC32")); + } } diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 12c3a51..489ccc5 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -14,11 +14,14 @@ use lanspread_db::db::{Availability, Game, GameCatalog, GameDB, GameFileDescript use lanspread_peer::{ ActiveOperation, ActiveOperationKind, + ExternalUnrarStreamProvider, + NoopStreamInstallProvider, PeerCommand, PeerEvent, PeerGameDB, PeerRuntimeHandle, PeerStartOptions, + StreamInstallProvider, UnpackFuture, Unpacker, migrate_legacy_state, @@ -82,6 +85,7 @@ struct LanSpreadState { peer_runtime: Arc>>, games: Arc>, active_operations: Arc>>, + pending_stream_installs: Arc>>, games_folder: Arc>, peer_game_db: Arc>, catalog: Arc>, @@ -255,6 +259,16 @@ async fn install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } + if state + .inner() + .pending_stream_installs + .read() + .await + .contains(&id) + { + log::warn!("Game already has a pending streamed install: {id}"); + return Ok(false); + } let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone(); @@ -294,6 +308,77 @@ async fn install_game( Ok(handled) } +#[tauri::command] +async fn stream_install_game( + id: String, + state: tauri::State<'_, LanSpreadState>, +) -> tauri::Result { + if state + .inner() + .active_operations + .read() + .await + .contains_key(&id) + { + log::warn!("Game already has an active operation: {id}"); + return Ok(false); + } + if state + .inner() + .pending_stream_installs + .read() + .await + .contains(&id) + { + log::warn!("Game already has a pending streamed install: {id}"); + return Ok(false); + } + + let Some((downloaded, installed, peer_count)) = state + .inner() + .games + .read() + .await + .get_game_by_id(&id) + .map(|game| (game.downloaded, game.installed, game.peer_count)) + else { + log::warn!("Ignoring streamed install request for unknown game: {id}"); + return Ok(false); + }; + if downloaded || installed || peer_count == 0 { + log::warn!( + "Ignoring streamed install request for {id}: downloaded={downloaded}, \ + installed={installed}, peer_count={peer_count}" + ); + return Ok(false); + } + + let peer_ctrl_arc = state.inner().peer_ctrl.clone(); + let peer_ctrl = peer_ctrl_arc.read().await.clone(); + let Some(peer_ctrl) = peer_ctrl else { + log::warn!("Peer system not initialized yet"); + return Ok(false); + }; + + { + let mut pending = state.inner().pending_stream_installs.write().await; + pending.insert(id.clone()); + } + + if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) { + log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}"); + state + .inner() + .pending_stream_installs + .write() + .await + .remove(&id); + return Ok(false); + } + + Ok(true) +} + #[tauri::command] async fn update_game( id: String, @@ -1867,6 +1952,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { let unpacker = Arc::new(SidecarUnpacker { app_handle: app_handle.clone(), }); + let stream_install_provider = stream_install_provider_for_app(app_handle); match start_peer_with_options( games_folder.to_path_buf(), tx_peer_event, @@ -1876,7 +1962,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { PeerStartOptions { state_dir: Some(state_dir), active_outbound_transfers: Some(state.active_outbound_transfers.clone()), - stream_install_provider: None, + stream_install_provider: Some(stream_install_provider), }, ) { Ok(handle) => { @@ -1894,6 +1980,22 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { } } +fn stream_install_provider_for_app(app_handle: &AppHandle) -> Arc { + match resolve_unrar_sidecar_program(app_handle) { + Ok(program) => Arc::new(ExternalUnrarStreamProvider::new(program)), + Err(err) => { + log::error!("Failed to resolve streamed-install unrar sidecar: {err}"); + Arc::new(NoopStreamInstallProvider) + } + } +} + +fn resolve_unrar_sidecar_program(app_handle: &AppHandle) -> eyre::Result { + let sidecar = app_handle.shell().sidecar("unrar")?; + let command: std::process::Command = sidecar.into(); + Ok(PathBuf::from(command.get_program())) +} + fn emit_game_id_event(app_handle: &AppHandle, event: &str, id: &str, label: &str) { if let Err(e) = app_handle.emit(event, Some(id.to_owned())) { log::error!("{label}: Failed to emit {event} event: {e}"); @@ -1990,6 +2092,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::NoPeersHaveGame { id } => { log::warn!("PeerEvent::NoPeersHaveGame received for {id}"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-no-peers", @@ -2028,6 +2131,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesFailed { id } => { log::warn!("PeerEvent::DownloadGameFilesFailed received"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-failed", @@ -2037,6 +2141,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesAllPeersGone { id } => { log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-peers-gone", @@ -2175,17 +2280,27 @@ async fn handle_got_game_files( ); let state = app_handle.state::(); + let stream_install = state.pending_stream_installs.write().await.remove(&id); let peer_ctrl = state.peer_ctrl.read().await.clone(); if let Some(peer_ctrl) = peer_ctrl - && let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles { - id, - file_descriptions, - }) + && let Err(e) = if stream_install { + peer_ctrl.send(PeerCommand::StreamInstallGame { id }) + } else { + peer_ctrl.send(PeerCommand::DownloadGameFiles { + id, + file_descriptions, + }) + } { - log::error!("Failed to send PeerCommand::DownloadGameFiles: {e}"); + log::error!("Failed to continue queued game transfer: {e}"); } } +async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) { + let state = app_handle.state::(); + state.pending_stream_installs.write().await.remove(id); +} + fn handle_download_finished(app_handle: &AppHandle, id: String) { log::info!("PeerEvent::DownloadGameFilesFinished received"); emit_game_id_event( @@ -2679,6 +2794,7 @@ pub fn run() { .invoke_handler(tauri::generate_handler![ request_games, install_game, + stream_install_game, run_game, start_server, game_directory_exists, diff --git a/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx b/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx index 96e5e87..b5d9693 100644 --- a/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx +++ b/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx @@ -5,7 +5,7 @@ import { StateChip } from '../StateChip'; import { ActionButton } from '../ActionButton'; import { Game, InstallStatus } from '../../lib/types'; -import { deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState'; +import { canStreamInstall, deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState'; import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format'; interface Props { @@ -13,6 +13,7 @@ interface Props { thumbnailUrl: string | null; onClose: () => void; onPrimary: (game: Game) => void; + onStreamInstall: (game: Game) => void; onUninstall: (game: Game) => void; onRemoveDownload: (game: Game) => void; onCancelDownload: (game: Game) => void; @@ -43,6 +44,7 @@ export const GameDetailModal = ({ thumbnailUrl, onClose, onPrimary, + onStreamInstall, onUninstall, onRemoveDownload, onCancelDownload, @@ -55,6 +57,7 @@ export const GameDetailModal = ({ const canRemoveDownload = game.downloaded && !game.installed && !isInProgress(game.install_status); + const showStreamInstall = canStreamInstall(game); const canViewFiles = game.downloaded || game.installed || game.install_status === InstallStatus.Downloading @@ -133,6 +136,17 @@ export const GameDetailModal = ({ onClick={() => onPrimary(game)} onCancelDownload={onCancelDownload} /> + {showStreamInstall && ( + + )} {game.installed && game.can_host_server === true && (