diff --git a/Cargo.lock b/Cargo.lock index aff387a..32f6751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2035,6 +2035,7 @@ name = "lanspread-peer" version = "0.1.0" dependencies = [ "bytes", + "crc32fast", "eyre", "futures", "gethostname", @@ -2059,13 +2060,16 @@ dependencies = [ name = "lanspread-peer-cli" version = "0.1.0" dependencies = [ + "bytes", "eyre", "lanspread-compat", "lanspread-db", "lanspread-peer", + "lanspread-proto", "serde", "serde_json", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c3d57d5..705d512 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ [workspace.dependencies] base64 = "0.22" bytes = { version = "1", features = ["serde"] } +crc32fast = "1" eyre = "0.6" futures = "0.3" gethostname = "1" diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md new file mode 100644 index 0000000..1806a1d --- /dev/null +++ b/NEXT_STEPS.md @@ -0,0 +1,64 @@ +# Streamed Install Next Steps + +I’d treat the prototype as proof of the hard part: “can we stream +archive-derived install bytes into `local/` without making the receiver a +source?” Yes. Next I’d harden the pieces that decide whether this is +product-ready. + +1. **Move from CLI-only to real app integration** + + Add a GUI command/control path for “stream install / low disk mode”, + probably behind an explicit option. The Tauri crate currently opts out with + `stream_install_provider: None`, so the GUI cannot use it yet. + +2. **Replace per-file `unrar p` with a final archive provider** + + The prototype provider is intentionally simple: `unrar lt`, then `unrar p` + per file. Good for non-solid archives, bad for solid archives. Final shape + should be a one-pass provider with real entry boundaries, likely via libunrar + or a purpose-built wrapper. + +3. **Handle solid archives deliberately** + + Add archive inspection that decides: + + - non-solid: per-file streaming is fine + - solid: one sequential archive pass only + + This is the big architectural fork we discussed, and the prototype’s + provider is the thing to swap. + +4. **Decide the integrity model** + + Current prototype verifies streamed bytes against RAR CRC32 from the + sender’s archive headers. That catches corruption and provider bugs. It does + not protect against a malicious peer lying. If you care about that, the next + step is catalog-side trusted hashes for archive or extracted files. + +5. **Upgrade retry/resume semantics** + + Right now, failed stream means failed operation and rollback. Next useful + step: + + - retry whole stream from another trusted peer + - later, maybe keep completed files and restart only the interrupted file + - avoid byte-offset resume until there’s a strong reason + +6. **Expand scenario coverage** + + I’d add cases for: + + - sender disconnect mid-stream + - receiver cancel mid-stream + - corrupted/truncated stream fails and leaves no `local/` + - already-installed game rejects streamed install + - multi-archive `.eti` roots stream in sorted order + +7. **Clean product semantics** + + Decide how the UI labels this state. It is installed but not downloaded, so + “Local only” is technically correct, but users may need a clear affordance + like “Installed, not shareable”. + +My recommended next slice: make the provider abstraction final-ish, then +implement a real one-pass provider. Everything else builds cleanly on that. diff --git a/PEER_CLI_SCENARIOS.md b/PEER_CLI_SCENARIOS.md index a7aee4a..1cb8cbf 100644 --- a/PEER_CLI_SCENARIOS.md +++ b/PEER_CLI_SCENARIOS.md @@ -46,6 +46,8 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path. | S36 | Latest singleton beats stale majority | Five peers advertise one game; one peer has `20260501`, four peers have `20250101`. | `list-games` reports `eti_game_version=20260501`; all descriptors and chunks come from the singleton latest peer; stale peers contribute zero bytes. | | S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. | | S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. | +| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. | +| S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. | ## Version-Skew Contract @@ -105,13 +107,37 @@ Use S38 to pin down how launcher settings are stamped into an installed game: line keeps its existing line ending (`\n` or `\r\n`). - The marker records only that we *tried*: it is written unconditionally after the first play, so a game with none of these files is still marked done. -- S38 needs a real archive expanded with `--unrar`, so it runs against the host - `lanspread-peer-cli` binary rather than the Docker matrix image (which omits - `unrar`). The peer crate's `launch_settings` unit tests cover the rewrite, - line-ending, and marker logic deterministically. +- S38 needs a real archive expanded with `--unrar`; the Docker matrix image now + carries the Linux sidecar for streamed-install coverage, while the peer + crate's `launch_settings` unit tests cover the rewrite, line-ending, and + marker logic deterministically. ## Run Log +### 2026-06-07 - Streamed Install Prototype (S39-S40) + +- Code under test added `stream-install` to `lanspread-peer-cli`, a peer + `StreamInstallGame` command, streamed install frames over QUIC, and an + injected `unrar lt`/`unrar p` provider for archive-derived bytes. +- Gates before Docker: `just fmt` and + `RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test` passed for the + workspace. +- Runner: + `python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S39 S40 --build-image` + passed against the rebuilt `lanspread-peer-cli:dev` image. +- S39 streamed a catalog-version-adjusted `cnctw` fixture from a real RAR + `.eti` into the receiver's `local/` only. The receiver had + `downloaded=false`, `installed=true`, `availability=LocalOnly`, no root + `version.ini`, no root `.eti`, and payload SHA-256 hashes + `82f4da22dc042166def2a5ee2eca19fc9e52785f99838e86c32167cb342e2588` + (`bin/cnctw-payload.bin`) and + `abf833a06c74ea9f17d505c2684186491898ce906405e0f098f0deac19476b06` + (`data/cnctw-assets.dat`) matching `unrar p`. +- S40 connected an observer only to that streamed-install receiver. The + observer saw the receiver's `cnctw` summary as local-only, remote aggregation + hid it as a downloadable source, and `download cnctw` failed with + `no peers have game cnctw`. + ### 2026-05-28 - First-Play Launch-Setting Stamping (S38) - Code under test moved the `account_name.txt`/`language.txt` overwrite out of diff --git a/crates/lanspread-peer-cli/Cargo.toml b/crates/lanspread-peer-cli/Cargo.toml index 7c18e7a..5a96902 100644 --- a/crates/lanspread-peer-cli/Cargo.toml +++ b/crates/lanspread-peer-cli/Cargo.toml @@ -14,11 +14,14 @@ path = "src/main.rs" lanspread-compat = { path = "../lanspread-compat" } lanspread-db = { path = "../lanspread-db" } lanspread-peer = { path = "../lanspread-peer" } +lanspread-proto = { path = "../lanspread-proto" } +bytes = { workspace = true } eyre = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } [lints.clippy] needless_pass_by_value = "allow" diff --git a/crates/lanspread-peer-cli/Dockerfile b/crates/lanspread-peer-cli/Dockerfile index ca24bfc..ad636f3 100644 --- a/crates/lanspread-peer-cli/Dockerfile +++ b/crates/lanspread-peer-cli/Dockerfile @@ -4,14 +4,16 @@ WORKDIR /work COPY . . RUN cargo build --release -p lanspread-peer-cli -FROM debian:bookworm-slim +FROM debian:trixie-slim RUN apt-get update \ - && apt-get install -y --no-install-recommends ca-certificates \ + && apt-get install -y --no-install-recommends ca-certificates libstdc++6 \ && rm -rf /var/lib/apt/lists/* COPY --from=build /work/target/release/lanspread-peer-cli /usr/local/bin/lanspread-peer-cli COPY crates/lanspread-tauri-deno-ts/src-tauri/game.db /app/game.db +COPY crates/lanspread-tauri-deno-ts/src-tauri/binaries/unrar-x86_64-unknown-linux-gnu /usr/local/bin/unrar +RUN chmod +x /usr/local/bin/unrar ENTRYPOINT ["lanspread-peer-cli"] CMD ["--games-dir", "/games", "--state-dir", "/state", "--catalog-db", "/app/game.db"] diff --git a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py index 8c262a6..85874fa 100644 --- a/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py +++ b/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Run the peer-cli scenarios S1-S36 through Docker.""" +"""Run the peer-cli scenarios S1-S40 through Docker.""" from __future__ import annotations @@ -8,6 +8,7 @@ import hashlib import json import os import queue +import shlex import shutil import subprocess import sys @@ -325,6 +326,8 @@ class Runner: ("S35", self.s35_unknown_game_filtered), ("S36", self.s36_latest_singleton), ("S37", self.s37_single_source_download_throughput), + ("S39", self.s39_streamed_install_local_only), + ("S40", self.s40_streamed_receiver_not_source), ] for scenario_id, scenario in scenarios: @@ -1060,6 +1063,114 @@ class Runner: f"{throughput['chunks']} chunks" ) + def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]: + source_dir = self.fixture_root / f"{prefix}-bravo" + copy_game("cnctw", source_dir, version="20160128") + source = self.peer(f"{prefix}-bravo", games_dir=source_dir) + client = self.peer(f"{prefix}-client") + connect_many(client, [source]) + wait_remote_game(client, "cnctw", peer_count=1) + waiter = LineWaiter(len(client.output)) + client.send({"cmd": "stream-install", "game_id": "cnctw"}) + client.wait_for( + event_is("got-game-files", "cnctw"), + timeout=20, + description="got cnctw files", + waiter=waiter, + ) + client.wait_for( + event_is("download-begin", "cnctw"), + timeout=20, + description="stream begin cnctw", + waiter=waiter, + ) + client.wait_for( + event_is("download-finished", "cnctw"), + timeout=60, + description="stream finish cnctw", + waiter=waiter, + ) + client.wait_for( + event_is("install-finished", "cnctw"), + timeout=30, + description="stream install cnctw", + waiter=waiter, + ) + return source, client + + def s39_streamed_install_local_only(self) -> str: + source, client = self.stream_install_cnctw("s39") + game = wait_local_game(client, "cnctw", downloaded=False, installed=True) + assert_game_state( + game, + downloaded=False, + installed=True, + availability="LocalOnly", + ) + + game_root = client.host_games_dir / "cnctw" + assert_not_exists(game_root / "version.ini") + assert_not_exists(game_root / "cnctw.eti") + + expected = { + "bin/cnctw-payload.bin": unrar_entry_sha256( + source, "cnctw", "bin/cnctw-payload.bin" + ), + "data/cnctw-assets.dat": unrar_entry_sha256( + source, "cnctw", "data/cnctw-assets.dat" + ), + } + actual = { + rel: sha256_file(game_root / "local" / rel) + for rel in expected + } + if actual != expected: + raise ScenarioError(f"streamed local payload hashes mismatched: {actual} != {expected}") + + streamed_bytes = sum( + int(item.get("data", {}).get("length", 0)) + for item in client.output + if item.get("type") == "event" + and item.get("event") == "download-chunk-finished" + and item.get("data", {}).get("game_id") == "cnctw" + ) + expected_bytes = 3 * 1024 * 1024 + if streamed_bytes != expected_bytes: + raise ScenarioError( + f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}" + ) + + return ( + "cnctw streamed into local/ only; root archive and version.ini absent; " + f"payload hashes={actual}" + ) + + def s40_streamed_receiver_not_source(self) -> str: + _source, receiver = self.stream_install_cnctw("s40") + observer = self.peer("s40-observer") + connect_many(observer, [receiver]) + receiver_snapshot = wait_peer_has_game(observer, receiver.peer_id, "cnctw") + summary = next( + game + for game in receiver_snapshot.get("games", []) + if game.get("id") == "cnctw" + ) + if summary.get("availability") != "LocalOnly" or summary.get("downloaded"): + raise ScenarioError(f"receiver did not advertise cnctw as local-only: {summary}") + + wait_remote_absent(observer, "cnctw", timeout=5) + err = observer.send( + {"cmd": "download", "game_id": "cnctw", "install": False}, + expect_error=True, + ) + if "no peers have game cnctw" not in err["error"]: + raise ScenarioError(f"unexpected local-only download error: {err}") + assert_not_exists(observer.host_games_dir / "cnctw") + return ( + "observer saw receiver's local-only cnctw snapshot, but remote aggregation hid it " + f"and download errored '{err['error']}'" + ) + def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]: result = subprocess.run( @@ -1177,6 +1288,25 @@ def create_large_sparse_game(root: Path, *, size: int) -> None: handle.truncate(size) +def sha256_file(path: Path) -> str: + hasher = hashlib.sha256() + with path.open("rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +def unrar_entry_sha256(peer: Peer, game_id: str, relative_path: str) -> str: + command = ( + f"unrar p -inul /games/{shlex.quote(game_id)}/{shlex.quote(game_id)}.eti " + f"{shlex.quote(relative_path)} | sha256sum" + ) + output = peer.docker_exec("sh", "-c", command).stdout.strip() + if not output: + raise ScenarioError(f"empty sha256 output for {game_id}:{relative_path}") + return output.split()[0] + + def format_bytes(size: int) -> str: return f"{size / 1024 / 1024 / 1024:.2f} GiB" diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index 78e4315..b05eefa 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -5,15 +5,21 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, + process::Stdio, time::Duration, }; +use bytes::Bytes; use eyre::{Context, OptionExt}; -use lanspread_peer::{UnpackFuture, Unpacker}; +use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker}; +use lanspread_proto::StreamInstallFrame; use serde::Serialize; use serde_json::{Value, json}; +use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; +const STREAM_CHUNK_SIZE: usize = 256 * 1024; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CommandEnvelope { @@ -33,6 +39,9 @@ pub enum CliCommand { game_id: String, install_after_download: bool, }, + StreamInstall { + game_id: String, + }, Install { game_id: String, }, @@ -63,6 +72,7 @@ impl CliCommand { Self::ListGames => "list-games", Self::SetGameDir { .. } => "set-game-dir", Self::Download { .. } => "download", + Self::StreamInstall { .. } => "stream-install", Self::Install { .. } => "install", Self::Uninstall { .. } => "uninstall", Self::Play { .. } => "play", @@ -101,6 +111,9 @@ pub fn parse_command_value(value: &Value) -> eyre::Result { game_id: game_id(object)?, install_after_download: install_after_download(object)?, }, + "stream-install" => CliCommand::StreamInstall { + game_id: game_id(object)?, + }, "install" => CliCommand::Install { game_id: game_id(object)?, }, @@ -254,6 +267,270 @@ impl Unpacker for ExternalUnrarUnpacker { } } +pub struct ExternalUnrarStreamProvider { + program: PathBuf, +} + +impl ExternalUnrarStreamProvider { + #[must_use] + pub fn new(program: PathBuf) -> Self { + Self { program } + } +} + +impl StreamInstallProvider for ExternalUnrarStreamProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + let listing = unrar_listing(&self.program, archive).await?; + let archive_name = archive + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("archive.eti") + .to_string(); + + send_stream_frame( + &frames, + StreamInstallFrame::ArchiveBegin { + archive_name: archive_name.clone(), + solid: listing.solid, + }, + ) + .await?; + + for entry in listing.entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + &frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path, + }, + ) + .await?; + } + RarEntryKind::File => { + send_stream_frame( + &frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32: entry.crc32, + }, + ) + .await?; + stream_unrar_file( + &self.program, + archive, + &entry.relative_path, + &frames, + cancel_token.clone(), + ) + .await?; + send_stream_frame( + &frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path, + }, + ) + .await?; + } + } + } + + send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarListing { + solid: bool, + entries: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarEntry { + relative_path: String, + kind: RarEntryKind, + size: u64, + crc32: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RarEntryKind { + File, + Directory, +} + +#[derive(Default)] +struct RarEntryDraft { + relative_path: Option, + kind: Option, + size: Option, + crc32: Option, +} + +async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { + let output = tokio::process::Command::new(program) + .arg("lt") + .arg("-cfg-") + .arg(archive) + .output() + .await?; + if !output.status.success() { + eyre::bail!( + "unrar lt failed for {} with status {}: {}", + archive.display(), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + + parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) +} + +fn parse_unrar_listing(output: &str) -> eyre::Result { + let mut solid = false; + let mut entries = Vec::new(); + let mut current = RarEntryDraft::default(); + + for line in output.lines() { + let trimmed = line.trim(); + if let Some(details) = trimmed.strip_prefix("Details:") { + solid = details.to_ascii_lowercase().contains("solid"); + continue; + } + + if let Some(name) = trimmed.strip_prefix("Name:") { + push_rar_entry(&mut entries, std::mem::take(&mut current))?; + current.relative_path = Some(name.trim().to_string()); + continue; + } + + if let Some(kind) = trimmed.strip_prefix("Type:") { + current.kind = match kind.trim() { + "File" => Some(RarEntryKind::File), + "Directory" => Some(RarEntryKind::Directory), + _ => None, + }; + continue; + } + + if let Some(size) = trimmed.strip_prefix("Size:") { + current.size = Some(size.trim().parse()?); + continue; + } + + if let Some(crc) = trimmed.strip_prefix("CRC32:") { + current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); + } + } + + push_rar_entry(&mut entries, current)?; + Ok(RarListing { solid, entries }) +} + +fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { + let Some(relative_path) = draft.relative_path else { + return Ok(()); + }; + + let Some(kind) = draft.kind else { + return Ok(()); + }; + + let size = match kind { + RarEntryKind::File => draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, + RarEntryKind::Directory => 0, + }; + + entries.push(RarEntry { + relative_path, + kind, + size, + crc32: draft.crc32, + }); + Ok(()) +} + +async fn stream_unrar_file( + program: &Path, + archive: &Path, + relative_path: &str, + frames: &mpsc::Sender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let mut child = tokio::process::Command::new(program) + .arg("p") + .arg("-inul") + .arg("-cfg-") + .arg(archive) + .arg(relative_path) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .spawn()?; + + let mut stdout = child + .stdout + .take() + .ok_or_eyre("unrar stdout was not captured")?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + + loop { + let read = tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display()); + } + read = stdout.read(&mut buffer) => read?, + }; + + if read == 0 { + break; + } + + send_stream_frame( + frames, + StreamInstallFrame::FileChunk { + bytes: Bytes::copy_from_slice(&buffer[..read]), + }, + ) + .await?; + } + + let status = child.wait().await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {}:{} with status {status}", + archive.display(), + relative_path + ); + } + + Ok(()) +} + +async fn send_stream_frame( + frames: &mpsc::Sender, + frame: StreamInstallFrame, +) -> eyre::Result<()> { + frames + .send(frame) + .await + .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) +} + pub fn result_line(id: &Option, command: &str, data: Value) -> eyre::Result { output_line(json!({ "type": "result", @@ -344,6 +621,57 @@ mod tests { assert_eq!(parsed["data"]["peer_count"], 0); } + #[test] + fn parses_stream_install_command() { + let parsed = parse_command_line(r#"{"cmd":"stream-install","game_id":"cnctw"}"#) + .expect("command should parse"); + + assert_eq!( + parsed.command, + CliCommand::StreamInstall { + game_id: "cnctw".to_string(), + } + ); + } + + #[test] + fn parses_unrar_technical_listing() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 + CRC32: 38B488A7 + + Name: bin + Type: Directory +"#, + ) + .expect("listing should parse"); + + assert!(!listing.solid); + assert_eq!( + listing.entries, + vec![ + RarEntry { + relative_path: "bin/payload.bin".to_string(), + kind: RarEntryKind::File, + size: 123, + crc32: Some(0x38B4_88A7), + }, + RarEntry { + relative_path: "bin".to_string(), + kind: RarEntryKind::Directory, + size: 0, + crc32: None, + }, + ] + ); + } + #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index f5155fa..995512d 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -17,6 +17,7 @@ use lanspread_peer::{ ActiveOperation, ActiveOperationKind, InstallOperation, + NoopStreamInstallProvider, PeerCommand, PeerEvent, PeerGameDB, @@ -24,6 +25,7 @@ use lanspread_peer::{ PeerRuntimeHandle, PeerSnapshot, PeerStartOptions, + StreamInstallProvider, migrate_legacy_state, start_peer_with_options, }; @@ -31,6 +33,7 @@ use lanspread_peer_cli::{ CliCommand, CommandEnvelope, DEFAULT_FIXTURE_VERSION, + ExternalUnrarStreamProvider, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, @@ -134,10 +137,15 @@ async fn main() -> eyre::Result<()> { let (tx_events, rx_events) = mpsc::unbounded_channel(); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let catalog = Arc::new(RwLock::new(catalog)); - let unpacker: Arc = match args.unrar { + let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program); + let unpacker: Arc = match args.unrar.clone() { Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)), None => Arc::new(FixtureUnpacker), }; + let stream_install_provider: Arc = match unrar_for_streaming { + Some(path) => Arc::new(ExternalUnrarStreamProvider::new(path)), + None => Arc::new(NoopStreamInstallProvider), + }; let mut handle = start_peer_with_options( args.games_dir.clone(), @@ -148,6 +156,7 @@ async fn main() -> eyre::Result<()> { PeerStartOptions { state_dir: Some(args.state_dir.clone()), active_outbound_transfers: None, + stream_install_provider: Some(stream_install_provider), }, )?; let sender = handle.sender(); @@ -249,6 +258,15 @@ async fn handle_command( })?; Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download})) } + CliCommand::StreamInstall { game_id } => { + ensure_catalog_game(shared, game_id).await?; + ensure_no_active_operation(shared, game_id).await?; + let _ = game_files_for_download(sender, shared, game_id).await?; + sender.send(PeerCommand::StreamInstallGame { + id: game_id.clone(), + })?; + Ok(json!({"queued": true, "game_id": game_id})) + } CliCommand::Install { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; @@ -729,6 +747,15 @@ fn default_catalog_db() -> Option { .find(|path| path.exists()) } +fn default_unrar_program() -> Option { + [ + PathBuf::from("/usr/local/bin/unrar"), + PathBuf::from("/usr/bin/unrar"), + ] + .into_iter() + .find(|path| path.exists()) +} + fn next_string(args: &mut impl Iterator, flag: &str) -> eyre::Result { args.next() .ok_or_else(|| eyre::eyre!("{flag} requires a value"))? diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index e580140..6437e69 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -14,6 +14,7 @@ lanspread-utils = { path = "../lanspread-utils" } # external bytes = { workspace = true } +crc32fast = { workspace = true } eyre = { workspace = true } futures = { workspace = true } gethostname = { workspace = true } diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index a4f3da3..db47b93 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -6,7 +6,14 @@ use lanspread_db::db::{GameCatalog, GameDB}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB}; +use crate::{ + PeerEvent, + StreamInstallProvider, + Unpacker, + events, + library::LocalLibraryState, + peer_db::PeerGameDB, +}; /// Thread-safe map of active outbound file transfers grouped by game ID. pub type OutboundTransfers = Arc>>>; @@ -38,6 +45,7 @@ pub struct Ctx { pub active_operations: Arc>>, pub active_downloads: Arc>>, pub unpacker: Arc, + pub stream_install_provider: Arc, pub catalog: Arc>, pub peer_id: Arc, pub shutdown: CancellationToken, @@ -57,6 +65,7 @@ pub struct PeerCtx { pub catalog: Arc>, pub peer_id: Arc, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, + pub stream_install_provider: Arc, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, pub active_outbound_transfers: OutboundTransfers, @@ -86,6 +95,7 @@ impl Ctx { task_tracker: TaskTracker, catalog: Arc>, active_outbound_transfers: OutboundTransfers, + stream_install_provider: Arc, ) -> Self { Self { game_dir: Arc::new(RwLock::new(game_dir)), @@ -97,6 +107,7 @@ impl Ctx { active_operations: Arc::new(RwLock::new(HashMap::new())), active_downloads: Arc::new(RwLock::new(HashMap::new())), unpacker, + stream_install_provider, catalog, peer_id: Arc::new(peer_id), shutdown, @@ -120,6 +131,7 @@ impl Ctx { catalog: self.catalog.clone(), peer_id: self.peer_id.clone(), tx_notify_ui, + stream_install_provider: self.stream_install_provider.clone(), shutdown: self.shutdown.clone(), task_tracker: self.task_tracker.clone(), active_outbound_transfers: self.active_outbound_transfers.clone(), diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 9fe7f91..c5851c5 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -11,6 +11,7 @@ use std::{ use lanspread_db::db::{GameDB, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; +use tokio_util::sync::CancellationToken; use crate::{ InstallOperation, @@ -33,6 +34,7 @@ use crate::{ peer_db::PeerGameDB, remote_peer::ensure_peer_id_for_addr, services::{HandshakeCtx, perform_handshake_with_peer}, + stream_install::receive_streamed_install, }; // ============================================================================= @@ -450,6 +452,91 @@ pub async fn handle_install_game_command( spawn_install_operation(ctx, tx_notify_ui, id); } +pub async fn handle_stream_install_game_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, +) { + if !catalog_contains(ctx, &id).await { + log::warn!("Ignoring streamed install command for non-catalog game {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + + let games_folder = { ctx.game_dir.read().await.clone() }; + let game_root = games_folder.join(&id); + if local_dir_is_directory(&game_root).await { + log::warn!("Ignoring streamed install command for already-installed game {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + + let expected_version = catalog_expected_version(ctx, &id).await; + let mut peers = { + match ctx + .peer_game_db + .read() + .await + .validate_file_sizes_majority(&id, expected_version.as_deref()) + { + Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => { + peer_whitelist + } + Ok(_) => { + log::error!("No trusted peers available for streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + Err(err) => { + log::error!( + "File size majority validation failed for streamed install {id}: {err}" + ); + send_download_failed(tx_notify_ui, &id); + return; + } + } + }; + peers.sort(); + let Some(peer_addr) = peers.into_iter().next() else { + log::error!("No peer selected for streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + }; + + match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { + BeginOperationResult::Started => {} + BeginOperationResult::AlreadyActive => { + log::warn!("Operation for {id} already in progress; ignoring streamed install request"); + return; + } + BeginOperationResult::DrainTimedOut => { + log::error!("Timed out waiting for outbound transfers before streamed install of {id}"); + send_download_failed(tx_notify_ui, &id); + return; + } + } + + let cancel_token = ctx.shutdown.child_token(); + ctx.active_downloads + .write() + .await + .insert(id.clone(), cancel_token.clone()); + + let ctx_clone = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + ctx.task_tracker.spawn(async move { + run_stream_install_operation( + ctx_clone, + tx_notify_ui, + id, + game_root, + peer_addr, + cancel_token, + ) + .await; + }); +} + /// Handles the `UninstallGame` command. pub async fn handle_uninstall_game_command( ctx: &Ctx, @@ -490,6 +577,151 @@ pub async fn handle_cancel_download_command( cancel_token.cancel(); } +async fn run_stream_install_operation( + ctx: Ctx, + tx_notify_ui: UnboundedSender, + id: String, + game_root: PathBuf, + peer_addr: SocketAddr, + cancel_token: CancellationToken, +) { + let download_guard = OperationGuard::download( + id.clone(), + ctx.active_operations.clone(), + ctx.active_downloads.clone(), + tx_notify_ui.clone(), + ); + + events::send( + &tx_notify_ui, + PeerEvent::DownloadGameFilesBegin { id: id.clone() }, + ); + + let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) + .await + { + Ok(transaction) => transaction, + Err(err) => { + log::error!("Failed to prepare streamed install for {id}: {err}"); + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; + return; + } + }; + + let receive_result = receive_streamed_install( + peer_addr, + &id, + transaction.staging_dir(), + tx_notify_ui.clone(), + cancel_token.clone(), + ) + .await; + + match receive_result { + Ok(()) => { + if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + .await + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + } else { + if let Err(err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {err}"); + } + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) + .await; + } + } + Err(err) => { + if let Err(rollback_err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + } + let download_was_cancelled = cancel_token.is_cancelled(); + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; + } + } +} + +async fn finish_failed_stream_download( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + guard: OperationGuard, + cancelled: bool, +) { + if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, id).await { + log::error!("Failed to refresh local library after streamed install failure: {err}"); + } + end_download_operation(ctx, tx_notify_ui, id).await; + guard.disarm(); + send_download_failed_unless_cancelled(tx_notify_ui, id, cancelled); +} + +async fn commit_streamed_install( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + transaction: install::StreamedInstallTransaction, +) { + let operation_guard = OperationGuard::new( + id.clone(), + ctx.active_operations.clone(), + tx_notify_ui.clone(), + ); + events::send( + tx_notify_ui, + PeerEvent::InstallGameBegin { + id: id.clone(), + operation: InstallOperation::Installing, + }, + ); + + match transaction.commit().await { + Ok(()) => { + if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await + { + log::error!("Failed to refresh local library after streamed install: {err}"); + } + end_operation(ctx, tx_notify_ui, &id).await; + operation_guard.disarm(); + events::send( + tx_notify_ui, + PeerEvent::InstallGameFinished { id: id.clone() }, + ); + } + Err(err) => { + log::error!("Streamed install commit failed for {id}: {err}"); + if let Err(refresh_err) = + refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await + { + log::error!( + "Failed to refresh local library after streamed install commit failure: {refresh_err}" + ); + } + end_operation(ctx, tx_notify_ui, &id).await; + operation_guard.disarm(); + events::send( + tx_notify_ui, + PeerEvent::InstallGameFailed { id: id.clone() }, + ); + } + } +} + fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); @@ -1264,6 +1496,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))), Arc::new(RwLock::new(HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) } diff --git a/crates/lanspread-peer/src/install/mod.rs b/crates/lanspread-peer/src/install/mod.rs index 4d518d7..e0c404b 100644 --- a/crates/lanspread-peer/src/install/mod.rs +++ b/crates/lanspread-peer/src/install/mod.rs @@ -4,5 +4,13 @@ mod transaction; pub mod unpack; pub use remove::remove_downloaded; -pub use transaction::{install, recover_on_startup, uninstall, update}; +pub(crate) use transaction::root_eti_archives; +pub use transaction::{ + StreamedInstallTransaction, + begin_streamed_install, + install, + recover_on_startup, + uninstall, + update, +}; pub use unpack::{UnpackFuture, Unpacker}; diff --git a/crates/lanspread-peer/src/install/transaction.rs b/crates/lanspread-peer/src/install/transaction.rs index a7ebc85..959dc1c 100644 --- a/crates/lanspread-peer/src/install/transaction.rs +++ b/crates/lanspread-peer/src/install/transaction.rs @@ -33,6 +33,103 @@ struct InstallFsState { backup: FsEntryState, } +pub struct StreamedInstallTransaction { + game_root: PathBuf, + state_dir: PathBuf, + id: String, + staging: PathBuf, + eti_version: Option, +} + +impl StreamedInstallTransaction { + #[must_use] + pub fn staging_dir(&self) -> &Path { + &self.staging + } + + pub async fn commit(self) -> eyre::Result<()> { + let local = local_dir(&self.game_root); + let result = async { + tokio::fs::rename(&self.staging, &local) + .await + .wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?; + reset_launch_settings_marker(&self.state_dir, &self.id).await?; + write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await + } + .await; + + if result.is_err() { + if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await { + log::warn!( + "Failed to clean streamed install staging {}: {cleanup_err}", + self.staging.display() + ); + } + let _ = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await; + } + + result + } + + pub async fn rollback(self) -> eyre::Result<()> { + let staging_result = remove_dir_all_if_exists(&self.staging).await; + let intent_result = write_intent( + &self.state_dir, + &self.id, + &InstallIntent::none(&self.id, self.eti_version.clone()), + ) + .await; + + staging_result?; + intent_result + } +} + +pub async fn begin_streamed_install( + game_root: &Path, + state_dir: &Path, + id: &str, +) -> eyre::Result { + if path_is_dir(&local_dir(game_root)).await { + eyre::bail!("game {id} is already installed"); + } + + tokio::fs::create_dir_all(game_root).await?; + let eti_version = read_downloaded_version(game_root).await; + write_intent( + state_dir, + id, + &InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()), + ) + .await?; + + let staging = installing_dir(game_root); + if let Err(err) = prepare_owned_empty_dir(&staging).await { + let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await; + return Err(err); + } + + let staging = tokio::fs::canonicalize(&staging).await.unwrap_or(staging); + + Ok(StreamedInstallTransaction { + game_root: game_root.to_path_buf(), + state_dir: state_dir.to_path_buf(), + id: id.to_string(), + staging, + eti_version, + }) +} + pub async fn install( game_root: &Path, state_dir: &Path, @@ -258,7 +355,7 @@ async fn unpack_archives( Ok(()) } -async fn root_eti_archives(game_root: &Path) -> eyre::Result> { +pub(crate) async fn root_eti_archives(game_root: &Path) -> eyre::Result> { let mut entries = tokio::fs::read_dir(game_root).await?; let mut archives = Vec::new(); while let Some(entry) = entries.next_entry().await? { diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 3d254de..540d0e2 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -32,6 +32,7 @@ mod remote_peer; mod services; mod startup; mod state_paths; +mod stream_install; #[cfg(test)] mod test_support; @@ -82,6 +83,7 @@ pub use crate::{ launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once}, startup::PeerRuntimeHandle, state_paths::{launch_settings_applied_path, setup_done_path}, + stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider}, }; // ============================================================================= @@ -243,6 +245,8 @@ pub enum PeerCommand { file_descriptions: Vec, install_after_download: bool, }, + /// Stream archive-expanded bytes directly into `local/` without keeping root archives. + StreamInstallGame { id: String }, /// Install already-downloaded archives into `local/`. InstallGame { id: String }, /// Remove only the `local/` install for a game. @@ -260,11 +264,29 @@ pub enum PeerCommand { } /// Optional startup settings for non-GUI callers and tests. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Default)] pub struct PeerStartOptions { /// Directory used for peer identity and other state. pub state_dir: Option, pub active_outbound_transfers: Option, + /// Provider used to stream archive entries for low-disk streamed installs. + pub stream_install_provider: Option>, +} + +impl std::fmt::Debug for PeerStartOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerStartOptions") + .field("state_dir", &self.state_dir) + .field( + "active_outbound_transfers", + &self.active_outbound_transfers.as_ref().map(|_| "..."), + ) + .field( + "stream_install_provider", + &self.stream_install_provider.as_ref().map(|_| "..."), + ) + .finish() + } } // ============================================================================= @@ -314,11 +336,14 @@ pub fn start_peer_with_options( let PeerStartOptions { state_dir, active_outbound_transfers, + stream_install_provider, } = options; let state_dir = resolve_state_dir(state_dir.as_deref()); let game_dir = game_dir.into(); let active_outbound_transfers = active_outbound_transfers .unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new()))); + let stream_install_provider = + stream_install_provider.unwrap_or_else(|| Arc::new(NoopStreamInstallProvider)); log::info!( "Starting peer system with game directory: {}", game_dir.display() @@ -338,6 +363,7 @@ pub fn start_peer_with_options( unpacker, catalog, active_outbound_transfers, + stream_install_provider, )) } @@ -355,6 +381,7 @@ async fn run_peer( task_tracker: TaskTracker, catalog: Arc>, active_outbound_transfers: crate::context::OutboundTransfers, + stream_install_provider: Arc, ) -> eyre::Result<()> { let ctx = Ctx::new( peer_game_db, @@ -366,6 +393,7 @@ async fn run_peer( task_tracker, catalog, active_outbound_transfers, + stream_install_provider, ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); @@ -439,6 +467,9 @@ async fn handle_peer_commands( ) .await; } + PeerCommand::StreamInstallGame { id } => { + handlers::handle_stream_install_game_command(ctx, tx_notify_ui, id).await; + } PeerCommand::InstallGame { id } => { handle_install_game_command(ctx, tx_notify_ui, id).await; } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index f147582..7ba2aa8 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -319,6 +319,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ); *ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000)); diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index 2885539..f0ce2e9 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -384,6 +384,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(std::collections::HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) } diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 27ae69e..7dee903 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -15,6 +15,7 @@ use crate::{ local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog}, peer::{send_game_file_chunk, send_game_file_data}, services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, + stream_install::{send_game_install_stream, send_stream_install_error}, }; type ResponseWriter = FramedWrite; @@ -99,6 +100,9 @@ async fn dispatch_request( } => { handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await } + Request::StreamInstall { game_id } => { + handle_stream_install_request(ctx, game_id, framed_tx).await + } Request::Goodbye { peer_id } => { handle_goodbye(ctx, remote_addr, peer_id).await; framed_tx @@ -386,6 +390,49 @@ async fn handle_file_chunk_request( FramedWrite::new(tx, LengthDelimitedCodec::new()) } +async fn handle_stream_install_request( + ctx: &PeerCtx, + game_id: String, + framed_tx: ResponseWriter, +) -> ResponseWriter { + log::info!("Received StreamInstall request for {game_id} from peer"); + + let (guard, cancel_token) = TransferGuard::new( + game_id.clone(), + ctx.active_outbound_transfers.clone(), + ctx.tx_notify_ui.clone(), + &ctx.shutdown, + ) + .await; + + let mut tx = framed_tx.into_inner(); + let game_dir = ctx.game_dir.read().await.clone(); + if !can_serve_game(ctx, &game_dir, &game_id).await { + log::info!( + "Declining StreamInstall for {game_id} because the game is not currently transferable" + ); + tx = send_stream_install_error(tx, format!("game {game_id} is not transferable")).await; + drop(guard); + return FramedWrite::new(tx, LengthDelimitedCodec::new()); + } + + let game_root = game_dir.join(&game_id); + let (returned_tx, result) = send_game_install_stream( + ctx.stream_install_provider.clone(), + tx, + &game_root, + &game_id, + cancel_token, + ) + .await; + if let Err(err) = result { + log::warn!("StreamInstall for {game_id} ended with error: {err}"); + } + + drop(guard); + FramedWrite::new(returned_tx, LengthDelimitedCodec::new()) +} + async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option, peer_id: String) { log::info!("Received Goodbye from peer {peer_id}"); let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) }; @@ -442,6 +489,7 @@ mod tests { TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(std::collections::HashMap::new())), + Arc::new(crate::NoopStreamInstallProvider), ) .to_peer_ctx(tx_notify_ui) } diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs index 2ab4571..96e4479 100644 --- a/crates/lanspread-peer/src/startup.rs +++ b/crates/lanspread-peer/src/startup.rs @@ -23,6 +23,7 @@ use crate::{ PeerCommand, PeerEvent, PeerRuntimeComponent, + StreamInstallProvider, Unpacker, context::Ctx, events, @@ -87,6 +88,7 @@ pub(crate) fn spawn_peer_runtime( unpacker: Arc, catalog: Arc>, active_outbound_transfers: crate::context::OutboundTransfers, + stream_install_provider: Arc, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); @@ -107,6 +109,7 @@ pub(crate) fn spawn_peer_runtime( runtime_tracker.clone(), catalog, active_outbound_transfers, + stream_install_provider, ) .await { diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs new file mode 100644 index 0000000..3d2e168 --- /dev/null +++ b/crates/lanspread-peer/src/stream_install.rs @@ -0,0 +1,372 @@ +use std::{ + future::Future, + net::SocketAddr, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, +}; + +use bytes::Bytes; +use crc32fast::Hasher; +use futures::{SinkExt, StreamExt}; +use lanspread_proto::{Message, Request, StreamInstallFrame}; +use s2n_quic::stream::SendStream; +use tokio::{ + fs::File, + io::AsyncWriteExt, + sync::{mpsc, mpsc::UnboundedSender}, +}; +use tokio_util::{ + codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, + sync::CancellationToken, +}; + +use crate::{ + PeerEvent, + install::root_eti_archives, + network::connect_to_peer, + path_validation::validate_game_file_path, +}; + +const FRAME_CHANNEL_DEPTH: usize = 16; + +pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; + +pub trait StreamInstallProvider: Send + Sync { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a>; +} + +#[derive(Debug, Default)] +pub struct NoopStreamInstallProvider; + +impl StreamInstallProvider for NoopStreamInstallProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + _frames: mpsc::Sender, + _cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + eyre::bail!( + "streamed install provider is not configured for {}", + archive.display() + ) + }) + } +} + +pub(crate) async fn send_stream_install_error( + tx: SendStream, + message: impl Into, +) -> SendStream { + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + if let Err(err) = framed_tx + .send( + StreamInstallFrame::Error { + message: message.into(), + } + .encode(), + ) + .await + { + log::warn!("Failed to send streamed install error frame: {err}"); + } + if let Err(err) = framed_tx.close().await { + log::debug!("Failed to close streamed install error response: {err}"); + } + framed_tx.into_inner() +} + +pub(crate) async fn send_game_install_stream( + provider: Arc, + tx: SendStream, + game_root: &Path, + game_id: &str, + cancel_token: CancellationToken, +) -> (SendStream, eyre::Result<()>) { + let archives = match root_eti_archives(game_root).await { + Ok(archives) => archives, + Err(err) => { + let message = err.to_string(); + let tx = send_stream_install_error(tx, message.clone()).await; + return (tx, Err(eyre::eyre!(message))); + } + }; + if archives.is_empty() { + let message = format!("no .eti archives found for {game_id}"); + let tx = send_stream_install_error(tx, message.clone()).await; + return (tx, Err(eyre::eyre!(message))); + } + + let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH); + let producer_cancel = cancel_token.child_token(); + let game_id_for_producer = game_id.to_string(); + let producer = tokio::spawn({ + let provider = provider.clone(); + let producer_cancel = producer_cancel.clone(); + async move { + for archive in archives { + if producer_cancel.is_cancelled() { + eyre::bail!("streamed install for {game_id_for_producer} was cancelled"); + } + + if let Err(err) = provider + .stream_archive(&archive, frame_tx.clone(), producer_cancel.clone()) + .await + { + let message = err.to_string(); + let _ = frame_tx.send(StreamInstallFrame::Error { message }).await; + return Err(err); + } + } + + let _ = frame_tx.send(StreamInstallFrame::Complete).await; + Ok(()) + } + }); + + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + let mut send_result = Ok(()); + + while let Some(frame) = frame_rx.recv().await { + if let Err(err) = framed_tx.send(frame.encode()).await { + producer_cancel.cancel(); + send_result = Err(eyre::eyre!("failed to send streamed install frame: {err}")); + break; + } + } + + let close_result = framed_tx + .close() + .await + .map_err(|err| eyre::eyre!("failed to close streamed install stream: {err}")); + let tx = framed_tx.into_inner(); + let producer_result = match producer.await { + Ok(result) => result, + Err(err) => Err(eyre::eyre!("streamed install producer task failed: {err}")), + }; + let result = send_result.and(producer_result).and(close_result); + + (tx, result) +} + +pub(crate) async fn receive_streamed_install( + peer_addr: SocketAddr, + game_id: &str, + staging_dir: &Path, + tx_notify_ui: UnboundedSender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let staging_dir = tokio::fs::canonicalize(staging_dir) + .await + .unwrap_or_else(|_| staging_dir.to_path_buf()); + let mut conn = connect_to_peer(peer_addr).await?; + let stream = conn.open_bidirectional_stream().await?; + let (rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx + .send( + Request::StreamInstall { + game_id: game_id.to_string(), + } + .encode(), + ) + .await?; + framed_tx.close().await?; + + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut current_file: Option = None; + + loop { + let next = tokio::select! { + () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), + next = framed_rx.next() => next, + }; + + let Some(frame) = next else { + eyre::bail!("streamed install ended before Complete"); + }; + let frame = frame?.freeze(); + let frame = StreamInstallFrame::decode(frame); + + match frame { + StreamInstallFrame::ArchiveBegin { + archive_name, + solid, + } => { + log::info!( + "Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" + ); + } + StreamInstallFrame::Directory { relative_path } => { + let path = resolve_stream_path(&staging_dir, &relative_path)?; + tokio::fs::create_dir_all(path).await?; + } + StreamInstallFrame::FileBegin { + relative_path, + size, + crc32, + } => { + if current_file.is_some() { + eyre::bail!("received FileBegin for {relative_path} before previous FileEnd"); + } + let path = resolve_stream_path(&staging_dir, &relative_path)?; + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = File::create(&path).await?; + current_file = Some(IncomingFile::new(relative_path, path, size, crc32, file)); + } + StreamInstallFrame::FileChunk { bytes } => { + let Some(file) = current_file.as_mut() else { + eyre::bail!("received FileChunk without FileBegin"); + }; + file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) + .await?; + } + StreamInstallFrame::FileEnd { relative_path } => { + let Some(file) = current_file.take() else { + eyre::bail!("received FileEnd for {relative_path} without FileBegin"); + }; + file.finish(&relative_path).await?; + } + StreamInstallFrame::ArchiveEnd { archive_name } => { + log::info!("Finished streamed install archive {archive_name} for {game_id}"); + } + StreamInstallFrame::Complete => { + if current_file.is_some() { + eyre::bail!("streamed install completed with an open file"); + } + return Ok(()); + } + StreamInstallFrame::Error { message } => { + eyre::bail!("streamed install sender failed: {message}"); + } + } + } +} + +struct IncomingFile { + relative_path: String, + path: PathBuf, + expected_size: u64, + expected_crc32: Option, + received: u64, + hasher: Hasher, + file: File, +} + +impl IncomingFile { + fn new( + relative_path: String, + path: PathBuf, + expected_size: u64, + expected_crc32: Option, + file: File, + ) -> Self { + Self { + relative_path, + path, + expected_size, + expected_crc32, + received: 0, + hasher: Hasher::new(), + file, + } + } + + async fn write_chunk( + &mut self, + game_id: &str, + peer_addr: SocketAddr, + tx_notify_ui: &UnboundedSender, + bytes: Bytes, + ) -> eyre::Result<()> { + let offset = self.received; + let length = u64::try_from(bytes.len())?; + if offset.saturating_add(length) > self.expected_size { + eyre::bail!( + "streamed file {} exceeded expected size {}", + self.relative_path, + self.expected_size + ); + } + self.file.write_all(&bytes).await?; + self.hasher.update(&bytes); + self.received = self.received.saturating_add(length); + + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { + id: game_id.to_string(), + peer_addr, + relative_path: format!("{game_id}/local/{}", self.relative_path), + offset, + length, + }); + Ok(()) + } + + async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { + if self.relative_path != relative_path { + eyre::bail!( + "streamed file end mismatch: began {}, ended {relative_path}", + self.relative_path + ); + } + self.file.flush().await?; + + if self.received != self.expected_size { + eyre::bail!( + "streamed file {} size mismatch: got {}, expected {}", + self.relative_path, + self.received, + self.expected_size + ); + } + + if let Some(expected) = self.expected_crc32 { + let actual = self.hasher.finalize(); + if actual != expected { + eyre::bail!( + "streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", + self.relative_path + ); + } + } + + log::debug!( + "Received streamed file {} -> {}", + self.relative_path, + self.path.display() + ); + Ok(()) + } +} + +fn resolve_stream_path(staging_dir: &Path, relative_path: &str) -> eyre::Result { + validate_game_file_path(staging_dir, relative_path) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_support::TempDir; + + #[test] + fn stream_paths_stay_inside_staging_dir() { + let temp = TempDir::new("lanspread-stream-install-path"); + let staging = temp.path().join("staging"); + std::fs::create_dir_all(&staging).expect("staging should be created"); + let staging = std::fs::canonicalize(staging).expect("staging should canonicalize"); + + assert!(resolve_stream_path(&staging, "bin/game.exe").is_ok()); + assert!(resolve_stream_path(&staging, "../outside").is_err()); + assert!(resolve_stream_path(&staging, "/absolute").is_err()); + assert!(resolve_stream_path(&staging, "C:/windows").is_err()); + } +} diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index ea8f2f7..27609c5 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use lanspread_db::db::{Game, GameFileDescription}; use serde::{Deserialize, Serialize}; -pub const PROTOCOL_VERSION: u32 = 4; +pub const PROTOCOL_VERSION: u32 = 5; pub use lanspread_db::db::Availability; @@ -67,6 +67,9 @@ pub enum Request { offset: u64, length: u64, }, + StreamInstall { + game_id: String, + }, Hello(Hello), LibraryDelta { peer_id: String, @@ -94,6 +97,35 @@ pub enum Response { InternalPeerError(String), } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum StreamInstallFrame { + ArchiveBegin { + archive_name: String, + solid: bool, + }, + Directory { + relative_path: String, + }, + FileBegin { + relative_path: String, + size: u64, + crc32: Option, + }, + FileChunk { + bytes: Bytes, + }, + FileEnd { + relative_path: String, + }, + ArchiveEnd { + archive_name: String, + }, + Complete, + Error { + message: String, + }, +} + // Add Message trait pub trait Message { fn decode(bytes: Bytes) -> Self; @@ -145,3 +177,29 @@ impl Message for Response { } } } + +impl Message for StreamInstallFrame { + fn decode(bytes: Bytes) -> Self { + match serde_json::from_slice(&bytes) { + Ok(t) => t, + Err(e) => { + tracing::error!(?e, "StreamInstallFrame decoding error"); + StreamInstallFrame::Error { + message: format!("stream install frame decoding error: {e}"), + } + } + } + } + + fn encode(&self) -> Bytes { + match serde_json::to_vec(self) { + Ok(s) => Bytes::from(s), + Err(e) => { + tracing::error!(?e, "StreamInstallFrame encoding error"); + Bytes::from(format!( + r#"{{"Error": {{"message": "encoding error: {e}"}}}}"# + )) + } + } + } +} diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 2a9fa11..12c3a51 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -1876,6 +1876,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { PeerStartOptions { state_dir: Some(state_dir), active_outbound_transfers: Some(state.active_outbound_transfers.clone()), + stream_install_provider: None, }, ) { Ok(handle) => {