From 40697a73e5bed68bb067134ad87abe467b90f668 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 21:39:02 +0200 Subject: [PATCH] feat(tauri): add low-disk streamed install action NEXT_STEPS item 1 called out that streamed install was still CLI-only because the Tauri app started the peer with no stream provider. Users can now choose an explicit "Low disk install" action from the game detail modal for remote-only games instead of taking the default archive-preserving download path. The GUI command queues a normal peer detail fetch first so the peer database has the file metadata needed for source validation. A small pending handoff in Tauri routes the resulting GotGameFiles event into StreamInstallGame instead of DownloadGameFiles, and clears that pending state on no-peer or download failure events. This keeps the existing download continuation untouched for the default action. The external unrar stream provider moved from the CLI harness into lanspread-peer so CLI and Tauri use the same implementation. Tauri resolves the bundled unrar sidecar path and injects that provider at peer startup; falling back to the noop provider keeps peer startup alive if the sidecar cannot be resolved, while the streamed install operation still fails safely. Test Plan: - just fmt - just test - just frontend-test - just clippy - just build - git diff --check Refs: NEXT_STEPS.md item 1 --- Cargo.lock | 3 - NEXT_STEPS.md | 10 +- crates/lanspread-peer-cli/Cargo.toml | 3 - crates/lanspread-peer-cli/src/lib.rs | 416 +----------------- crates/lanspread-peer-cli/src/main.rs | 2 +- crates/lanspread-peer/src/lib.rs | 7 +- crates/lanspread-peer/src/stream_install.rs | 411 ++++++++++++++++- .../src-tauri/src/lib.rs | 128 +++++- .../src/components/modals/GameDetailModal.tsx | 16 +- .../src/hooks/useGameActions.ts | 22 +- .../src/lib/gameState.ts | 6 + .../src/windows/MainWindow.tsx | 1 + .../tests/gameState.test.ts | 34 ++ 13 files changed, 623 insertions(+), 436 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 32f6751..ea5a41c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,16 +2060,13 @@ dependencies = [ name = "lanspread-peer-cli" version = "0.1.0" dependencies = [ - "bytes", "eyre", "lanspread-compat", "lanspread-db", "lanspread-peer", - "lanspread-proto", "serde", "serde_json", "tokio", - "tokio-util", ] [[package]] diff --git a/NEXT_STEPS.md b/NEXT_STEPS.md index 1806a1d..0996a17 100644 --- a/NEXT_STEPS.md +++ b/NEXT_STEPS.md @@ -5,11 +5,13 @@ archive-derived install bytes into `local/` without making the receiver a source?” Yes. Next I’d harden the pieces that decide whether this is product-ready. -1. **Move from CLI-only to real app integration** +1. **Done — Move from CLI-only to real app integration** - Add a GUI command/control path for “stream install / low disk mode”, - probably behind an explicit option. The Tauri crate currently opts out with - `stream_install_provider: None`, so the GUI cannot use it yet. + The GUI now has an explicit “Low disk install” action in the game detail + modal for remote-only games. The Tauri backend queues that path through + `stream_install_game`, injects the shared external `unrar` stream provider, + and hands fetched file details to `StreamInstallGame` instead of the normal + download command. 2. **Replace per-file `unrar p` with a final archive provider** diff --git a/crates/lanspread-peer-cli/Cargo.toml b/crates/lanspread-peer-cli/Cargo.toml index 5a96902..7c18e7a 100644 --- a/crates/lanspread-peer-cli/Cargo.toml +++ b/crates/lanspread-peer-cli/Cargo.toml @@ -14,14 +14,11 @@ path = "src/main.rs" lanspread-compat = { path = "../lanspread-compat" } lanspread-db = { path = "../lanspread-db" } lanspread-peer = { path = "../lanspread-peer" } -lanspread-proto = { path = "../lanspread-proto" } -bytes = { workspace = true } eyre = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tokio-util = { workspace = true } [lints.clippy] needless_pass_by_value = "allow" diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index 38fab43..9eba46b 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -5,24 +5,15 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, - process::Stdio, time::Duration, }; -use bytes::Bytes; use eyre::{Context, OptionExt}; -use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker}; -use lanspread_proto::StreamInstallFrame; +use lanspread_peer::{UnpackFuture, Unpacker}; use serde::Serialize; use serde_json::{Value, json}; -use tokio::{ - io::{AsyncRead, AsyncReadExt}, - sync::mpsc, -}; -use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; -const STREAM_CHUNK_SIZE: usize = 256 * 1024; #[derive(Debug, Clone, PartialEq, Eq)] pub struct CommandEnvelope { @@ -270,356 +261,6 @@ impl Unpacker for ExternalUnrarUnpacker { } } -pub struct ExternalUnrarStreamProvider { - program: PathBuf, -} - -impl ExternalUnrarStreamProvider { - #[must_use] - pub fn new(program: PathBuf) -> Self { - Self { program } - } -} - -impl StreamInstallProvider for ExternalUnrarStreamProvider { - fn stream_archive<'a>( - &'a self, - archive: &'a Path, - frames: mpsc::Sender, - cancel_token: CancellationToken, - ) -> StreamInstallFuture<'a> { - Box::pin(async move { - let listing = unrar_listing(&self.program, archive).await?; - let archive_name = archive - .file_name() - .and_then(|name| name.to_str()) - .unwrap_or("archive.eti") - .to_string(); - - send_stream_frame( - &frames, - StreamInstallFrame::ArchiveBegin { - archive_name: archive_name.clone(), - solid: listing.solid, - unpacked_size: listing.unpacked_size(), - }, - ) - .await?; - - stream_unrar_entries( - &self.program, - archive, - &listing.entries, - &frames, - cancel_token.clone(), - ) - .await?; - - send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await - }) - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct RarListing { - solid: bool, - entries: Vec, -} - -impl RarListing { - fn unpacked_size(&self) -> u64 { - self.entries - .iter() - .filter(|entry| entry.kind == RarEntryKind::File) - .map(|entry| entry.size) - .sum() - } -} - -#[derive(Debug, Clone, PartialEq, Eq)] -struct RarEntry { - relative_path: String, - kind: RarEntryKind, - size: u64, - crc32: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum RarEntryKind { - File, - Directory, -} - -#[derive(Default)] -struct RarEntryDraft { - relative_path: Option, - kind: Option, - size: Option, - crc32: Option, -} - -async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { - let output = tokio::process::Command::new(program) - .arg("lt") - .arg("-cfg-") - .arg(archive) - .output() - .await?; - if !output.status.success() { - eyre::bail!( - "unrar lt failed for {} with status {}: {}", - archive.display(), - output.status, - String::from_utf8_lossy(&output.stderr) - ); - } - - parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) -} - -fn parse_unrar_listing(output: &str) -> eyre::Result { - let mut solid = false; - let mut entries = Vec::new(); - let mut current = RarEntryDraft::default(); - - for line in output.lines() { - let trimmed = line.trim(); - if let Some(details) = trimmed.strip_prefix("Details:") { - solid = details.to_ascii_lowercase().contains("solid"); - continue; - } - - if let Some(name) = trimmed.strip_prefix("Name:") { - push_rar_entry(&mut entries, std::mem::take(&mut current))?; - current.relative_path = Some(name.trim().to_string()); - continue; - } - - if let Some(kind) = trimmed.strip_prefix("Type:") { - current.kind = match kind.trim() { - "File" => Some(RarEntryKind::File), - "Directory" => Some(RarEntryKind::Directory), - _ => None, - }; - continue; - } - - if let Some(size) = trimmed.strip_prefix("Size:") { - current.size = Some(size.trim().parse()?); - continue; - } - - if let Some(crc) = trimmed.strip_prefix("CRC32:") { - current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); - } - } - - push_rar_entry(&mut entries, current)?; - Ok(RarListing { solid, entries }) -} - -fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { - let Some(relative_path) = draft.relative_path else { - return Ok(()); - }; - - let Some(kind) = draft.kind else { - return Ok(()); - }; - - let (size, crc32) = match kind { - RarEntryKind::File => { - let size = draft - .size - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; - let crc32 = draft - .crc32 - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; - (size, Some(crc32)) - } - RarEntryKind::Directory => (0, None), - }; - - entries.push(RarEntry { - relative_path, - kind, - size, - crc32, - }); - Ok(()) -} - -async fn stream_unrar_entries( - program: &Path, - archive: &Path, - entries: &[RarEntry], - frames: &mpsc::Sender, - cancel_token: CancellationToken, -) -> eyre::Result<()> { - let mut child = tokio::process::Command::new(program) - .arg("p") - .arg("-inul") - .arg("-cfg-") - .arg(archive) - .stdout(Stdio::piped()) - .stderr(Stdio::null()) - .spawn()?; - - let result = async { - let mut stdout = child - .stdout - .take() - .ok_or_eyre("unrar stdout was not captured")?; - let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; - - for entry in entries { - if cancel_token.is_cancelled() { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - - match entry.kind { - RarEntryKind::Directory => { - send_stream_frame( - frames, - StreamInstallFrame::Directory { - relative_path: entry.relative_path.clone(), - }, - ) - .await?; - } - RarEntryKind::File => { - let Some(crc32) = entry.crc32 else { - eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); - }; - send_stream_frame( - frames, - StreamInstallFrame::FileBegin { - relative_path: entry.relative_path.clone(), - size: entry.size, - crc32, - }, - ) - .await?; - stream_unrar_file_from_stdout( - &mut stdout, - archive, - entry, - frames, - &mut buffer, - &cancel_token, - ) - .await?; - send_stream_frame( - frames, - StreamInstallFrame::FileEnd { - relative_path: entry.relative_path.clone(), - }, - ) - .await?; - } - } - } - - let extra = - read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; - if extra != 0 { - eyre::bail!( - "unrar produced bytes after listed entries for {}", - archive.display() - ); - } - - let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; - if !status.success() { - eyre::bail!( - "unrar p failed for {} with status {status}", - archive.display() - ); - } - - Ok(()) - } - .await; - - if result.is_err() { - let _ = child.kill().await; - } - - result -} - -async fn stream_unrar_file_from_stdout( - stdout: &mut (impl AsyncRead + Unpin), - archive: &Path, - entry: &RarEntry, - frames: &mpsc::Sender, - buffer: &mut [u8], - cancel_token: &CancellationToken, -) -> eyre::Result<()> { - let mut remaining = entry.size; - while remaining > 0 { - let read_len = usize::try_from(remaining.min(buffer.len() as u64))?; - let read = - read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; - if read == 0 { - eyre::bail!( - "unrar ended while streaming {} from {}; {remaining} bytes missing", - entry.relative_path, - archive.display() - ); - } - - send_stream_frame( - frames, - StreamInstallFrame::FileChunk { - bytes: Bytes::copy_from_slice(&buffer[..read]), - }, - ) - .await?; - remaining = remaining.saturating_sub(u64::try_from(read)?); - } - - Ok(()) -} - -async fn read_unrar_stdout( - stdout: &mut (impl AsyncRead + Unpin), - buffer: &mut [u8], - cancel_token: &CancellationToken, - archive: &Path, -) -> eyre::Result { - tokio::select! { - () = cancel_token.cancelled() => { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - read = stdout.read(buffer) => Ok(read?), - } -} - -async fn wait_unrar_child( - child: &mut tokio::process::Child, - cancel_token: &CancellationToken, - archive: &Path, -) -> eyre::Result { - tokio::select! { - () = cancel_token.cancelled() => { - let _ = child.kill().await; - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - status = child.wait() => Ok(status?), - } -} - -async fn send_stream_frame( - frames: &mpsc::Sender, - frame: StreamInstallFrame, -) -> eyre::Result<()> { - frames - .send(frame) - .await - .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) -} - pub fn result_line(id: &Option, command: &str, data: Value) -> eyre::Result { output_line(json!({ "type": "result", @@ -723,61 +364,6 @@ mod tests { ); } - #[test] - fn parses_unrar_technical_listing() { - let listing = parse_unrar_listing( - r#" -Archive: game.eti -Details: RAR 5, solid - - Name: bin/payload.bin - Type: File - Size: 123 - CRC32: 38B488A7 - - Name: bin - Type: Directory -"#, - ) - .expect("listing should parse"); - - assert!(listing.solid); - assert_eq!( - listing.entries, - vec![ - RarEntry { - relative_path: "bin/payload.bin".to_string(), - kind: RarEntryKind::File, - size: 123, - crc32: Some(0x38B4_88A7), - }, - RarEntry { - relative_path: "bin".to_string(), - kind: RarEntryKind::Directory, - size: 0, - crc32: None, - }, - ] - ); - } - - #[test] - fn rejects_unrar_file_entries_without_crc32() { - let err = parse_unrar_listing( - r#" -Archive: game.eti -Details: RAR 5 - - Name: bin/payload.bin - Type: File - Size: 123 -"#, - ) - .expect_err("file entries without CRC32 should be rejected"); - - assert!(err.to_string().contains("has no CRC32")); - } - #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 995512d..75cf4a7 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -16,6 +16,7 @@ use lanspread_db::db::{Game, GameCatalog, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, + ExternalUnrarStreamProvider, InstallOperation, NoopStreamInstallProvider, PeerCommand, @@ -33,7 +34,6 @@ use lanspread_peer_cli::{ CliCommand, CommandEnvelope, DEFAULT_FIXTURE_VERSION, - ExternalUnrarStreamProvider, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 540d0e2..f2053bf 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -83,7 +83,12 @@ pub use crate::{ launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once}, startup::PeerRuntimeHandle, state_paths::{launch_settings_applied_path, setup_done_path}, - stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider}, + stream_install::{ + ExternalUnrarStreamProvider, + NoopStreamInstallProvider, + StreamInstallFuture, + StreamInstallProvider, + }, }; // ============================================================================= diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index f6c5df5..dde87fe 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -3,6 +3,7 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, pin::Pin, + process::Stdio, sync::Arc, time::{Duration, Instant}, }; @@ -14,7 +15,8 @@ use lanspread_proto::{Message, Request, StreamInstallFrame}; use s2n_quic::stream::SendStream; use tokio::{ fs::File, - io::AsyncWriteExt, + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + process::Command, sync::{mpsc, mpsc::UnboundedSender}, time::{self, MissedTickBehavior}, }; @@ -33,6 +35,7 @@ use crate::{ const FRAME_CHANNEL_DEPTH: usize = 16; const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); +const STREAM_CHUNK_SIZE: usize = 256 * 1024; pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; @@ -64,6 +67,357 @@ impl StreamInstallProvider for NoopStreamInstallProvider { } } +#[derive(Debug)] +pub struct ExternalUnrarStreamProvider { + program: PathBuf, +} + +impl ExternalUnrarStreamProvider { + #[must_use] + pub fn new(program: PathBuf) -> Self { + Self { program } + } +} + +impl StreamInstallProvider for ExternalUnrarStreamProvider { + fn stream_archive<'a>( + &'a self, + archive: &'a Path, + frames: mpsc::Sender, + cancel_token: CancellationToken, + ) -> StreamInstallFuture<'a> { + Box::pin(async move { + let listing = unrar_listing(&self.program, archive).await?; + let archive_name = archive + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("archive.eti") + .to_string(); + + send_stream_frame( + &frames, + StreamInstallFrame::ArchiveBegin { + archive_name: archive_name.clone(), + solid: listing.solid, + unpacked_size: listing.unpacked_size(), + }, + ) + .await?; + + stream_unrar_entries( + &self.program, + archive, + &listing.entries, + &frames, + cancel_token.clone(), + ) + .await?; + + send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarListing { + solid: bool, + entries: Vec, +} + +impl RarListing { + fn unpacked_size(&self) -> u64 { + self.entries + .iter() + .filter(|entry| entry.kind == RarEntryKind::File) + .map(|entry| entry.size) + .sum() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct RarEntry { + relative_path: String, + kind: RarEntryKind, + size: u64, + crc32: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RarEntryKind { + File, + Directory, +} + +#[derive(Default)] +struct RarEntryDraft { + relative_path: Option, + kind: Option, + size: Option, + crc32: Option, +} + +async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { + let output = Command::new(program) + .arg("lt") + .arg("-cfg-") + .arg(archive) + .output() + .await?; + if !output.status.success() { + eyre::bail!( + "unrar lt failed for {} with status {}: {}", + archive.display(), + output.status, + String::from_utf8_lossy(&output.stderr) + ); + } + + parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) +} + +fn parse_unrar_listing(output: &str) -> eyre::Result { + let mut solid = false; + let mut entries = Vec::new(); + let mut current = RarEntryDraft::default(); + + for line in output.lines() { + let trimmed = line.trim(); + if let Some(details) = trimmed.strip_prefix("Details:") { + solid = details.to_ascii_lowercase().contains("solid"); + continue; + } + + if let Some(name) = trimmed.strip_prefix("Name:") { + push_rar_entry(&mut entries, std::mem::take(&mut current))?; + current.relative_path = Some(name.trim().to_string()); + continue; + } + + if let Some(kind) = trimmed.strip_prefix("Type:") { + current.kind = match kind.trim() { + "File" => Some(RarEntryKind::File), + "Directory" => Some(RarEntryKind::Directory), + _ => None, + }; + continue; + } + + if let Some(size) = trimmed.strip_prefix("Size:") { + current.size = Some(size.trim().parse()?); + continue; + } + + if let Some(crc) = trimmed.strip_prefix("CRC32:") { + current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); + } + } + + push_rar_entry(&mut entries, current)?; + Ok(RarListing { solid, entries }) +} + +fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { + let Some(relative_path) = draft.relative_path else { + return Ok(()); + }; + + let Some(kind) = draft.kind else { + return Ok(()); + }; + + let (size, crc32) = match kind { + RarEntryKind::File => { + let size = draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; + let crc32 = draft + .crc32 + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + (size, Some(crc32)) + } + RarEntryKind::Directory => (0, None), + }; + + entries.push(RarEntry { + relative_path, + kind, + size, + crc32, + }); + Ok(()) +} + +async fn stream_unrar_entries( + program: &Path, + archive: &Path, + entries: &[RarEntry], + frames: &mpsc::Sender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + let mut child = Command::new(program) + .arg("p") + .arg("-inul") + .arg("-cfg-") + .arg(archive) + .stdout(Stdio::piped()) + .stderr(Stdio::null()) + .spawn()?; + + let result = async { + let mut stdout = child + .stdout + .take() + .ok_or_else(|| eyre::eyre!("unrar stdout was not captured"))?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + + for entry in entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + RarEntryKind::File => { + let Some(crc32) = entry.crc32 else { + eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); + }; + send_stream_frame( + frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32, + }, + ) + .await?; + stream_unrar_file_from_stdout( + &mut stdout, + archive, + entry, + frames, + &mut buffer, + &cancel_token, + ) + .await?; + send_stream_frame( + frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + } + } + + let extra = + read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; + if extra != 0 { + eyre::bail!( + "unrar produced bytes after listed entries for {}", + archive.display() + ); + } + + let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {} with status {status}", + archive.display() + ); + } + + Ok(()) + } + .await; + + if result.is_err() { + let _ = child.kill().await; + } + + result +} + +async fn stream_unrar_file_from_stdout( + stdout: &mut (impl AsyncRead + Unpin), + archive: &Path, + entry: &RarEntry, + frames: &mpsc::Sender, + buffer: &mut [u8], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut remaining = entry.size; + while remaining > 0 { + let read_len = usize::try_from(remaining.min(u64::try_from(buffer.len())?))?; + let read = + read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; + if read == 0 { + eyre::bail!( + "unrar ended while streaming {} from {}; {remaining} bytes missing", + entry.relative_path, + archive.display() + ); + } + + send_stream_frame( + frames, + StreamInstallFrame::FileChunk { + bytes: Bytes::copy_from_slice(&buffer[..read]), + }, + ) + .await?; + remaining = remaining.saturating_sub(u64::try_from(read)?); + } + + Ok(()) +} + +async fn read_unrar_stdout( + stdout: &mut (impl AsyncRead + Unpin), + buffer: &mut [u8], + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + read = stdout.read(buffer) => Ok(read?), + } +} + +async fn wait_unrar_child( + child: &mut tokio::process::Child, + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + status = child.wait() => Ok(status?), + } +} + +async fn send_stream_frame( + frames: &mpsc::Sender, + frame: StreamInstallFrame, +) -> eyre::Result<()> { + frames + .send(frame) + .await + .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) +} + pub(crate) async fn send_stream_install_error( tx: SendStream, message: impl Into, @@ -444,4 +798,59 @@ mod tests { assert!(resolve_stream_path(&staging, "/absolute").is_err()); assert!(resolve_stream_path(&staging, "C:/windows").is_err()); } + + #[test] + fn parses_unrar_technical_listing() { + let listing = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5, solid + + Name: bin/payload.bin + Type: File + Size: 123 + CRC32: 38B488A7 + + Name: bin + Type: Directory +"#, + ) + .expect("listing should parse"); + + assert!(listing.solid); + assert_eq!( + listing.entries, + vec![ + RarEntry { + relative_path: "bin/payload.bin".to_string(), + kind: RarEntryKind::File, + size: 123, + crc32: Some(0x38B4_88A7), + }, + RarEntry { + relative_path: "bin".to_string(), + kind: RarEntryKind::Directory, + size: 0, + crc32: None, + }, + ] + ); + } + + #[test] + fn rejects_unrar_file_entries_without_crc32() { + let err = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 +"#, + ) + .expect_err("file entries without CRC32 should be rejected"); + + assert!(err.to_string().contains("has no CRC32")); + } } diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 12c3a51..489ccc5 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -14,11 +14,14 @@ use lanspread_db::db::{Availability, Game, GameCatalog, GameDB, GameFileDescript use lanspread_peer::{ ActiveOperation, ActiveOperationKind, + ExternalUnrarStreamProvider, + NoopStreamInstallProvider, PeerCommand, PeerEvent, PeerGameDB, PeerRuntimeHandle, PeerStartOptions, + StreamInstallProvider, UnpackFuture, Unpacker, migrate_legacy_state, @@ -82,6 +85,7 @@ struct LanSpreadState { peer_runtime: Arc>>, games: Arc>, active_operations: Arc>>, + pending_stream_installs: Arc>>, games_folder: Arc>, peer_game_db: Arc>, catalog: Arc>, @@ -255,6 +259,16 @@ async fn install_game( log::warn!("Game already has an active operation: {id}"); return Ok(false); } + if state + .inner() + .pending_stream_installs + .read() + .await + .contains(&id) + { + log::warn!("Game already has a pending streamed install: {id}"); + return Ok(false); + } let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone(); @@ -294,6 +308,77 @@ async fn install_game( Ok(handled) } +#[tauri::command] +async fn stream_install_game( + id: String, + state: tauri::State<'_, LanSpreadState>, +) -> tauri::Result { + if state + .inner() + .active_operations + .read() + .await + .contains_key(&id) + { + log::warn!("Game already has an active operation: {id}"); + return Ok(false); + } + if state + .inner() + .pending_stream_installs + .read() + .await + .contains(&id) + { + log::warn!("Game already has a pending streamed install: {id}"); + return Ok(false); + } + + let Some((downloaded, installed, peer_count)) = state + .inner() + .games + .read() + .await + .get_game_by_id(&id) + .map(|game| (game.downloaded, game.installed, game.peer_count)) + else { + log::warn!("Ignoring streamed install request for unknown game: {id}"); + return Ok(false); + }; + if downloaded || installed || peer_count == 0 { + log::warn!( + "Ignoring streamed install request for {id}: downloaded={downloaded}, \ + installed={installed}, peer_count={peer_count}" + ); + return Ok(false); + } + + let peer_ctrl_arc = state.inner().peer_ctrl.clone(); + let peer_ctrl = peer_ctrl_arc.read().await.clone(); + let Some(peer_ctrl) = peer_ctrl else { + log::warn!("Peer system not initialized yet"); + return Ok(false); + }; + + { + let mut pending = state.inner().pending_stream_installs.write().await; + pending.insert(id.clone()); + } + + if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) { + log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}"); + state + .inner() + .pending_stream_installs + .write() + .await + .remove(&id); + return Ok(false); + } + + Ok(true) +} + #[tauri::command] async fn update_game( id: String, @@ -1867,6 +1952,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { let unpacker = Arc::new(SidecarUnpacker { app_handle: app_handle.clone(), }); + let stream_install_provider = stream_install_provider_for_app(app_handle); match start_peer_with_options( games_folder.to_path_buf(), tx_peer_event, @@ -1876,7 +1962,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { PeerStartOptions { state_dir: Some(state_dir), active_outbound_transfers: Some(state.active_outbound_transfers.clone()), - stream_install_provider: None, + stream_install_provider: Some(stream_install_provider), }, ) { Ok(handle) => { @@ -1894,6 +1980,22 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { } } +fn stream_install_provider_for_app(app_handle: &AppHandle) -> Arc { + match resolve_unrar_sidecar_program(app_handle) { + Ok(program) => Arc::new(ExternalUnrarStreamProvider::new(program)), + Err(err) => { + log::error!("Failed to resolve streamed-install unrar sidecar: {err}"); + Arc::new(NoopStreamInstallProvider) + } + } +} + +fn resolve_unrar_sidecar_program(app_handle: &AppHandle) -> eyre::Result { + let sidecar = app_handle.shell().sidecar("unrar")?; + let command: std::process::Command = sidecar.into(); + Ok(PathBuf::from(command.get_program())) +} + fn emit_game_id_event(app_handle: &AppHandle, event: &str, id: &str, label: &str) { if let Err(e) = app_handle.emit(event, Some(id.to_owned())) { log::error!("{label}: Failed to emit {event} event: {e}"); @@ -1990,6 +2092,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::NoPeersHaveGame { id } => { log::warn!("PeerEvent::NoPeersHaveGame received for {id}"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-no-peers", @@ -2028,6 +2131,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesFailed { id } => { log::warn!("PeerEvent::DownloadGameFilesFailed received"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-failed", @@ -2037,6 +2141,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::DownloadGameFilesAllPeersGone { id } => { log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); + clear_pending_stream_install(app_handle, &id).await; emit_game_id_event( app_handle, "game-download-peers-gone", @@ -2175,17 +2280,27 @@ async fn handle_got_game_files( ); let state = app_handle.state::(); + let stream_install = state.pending_stream_installs.write().await.remove(&id); let peer_ctrl = state.peer_ctrl.read().await.clone(); if let Some(peer_ctrl) = peer_ctrl - && let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles { - id, - file_descriptions, - }) + && let Err(e) = if stream_install { + peer_ctrl.send(PeerCommand::StreamInstallGame { id }) + } else { + peer_ctrl.send(PeerCommand::DownloadGameFiles { + id, + file_descriptions, + }) + } { - log::error!("Failed to send PeerCommand::DownloadGameFiles: {e}"); + log::error!("Failed to continue queued game transfer: {e}"); } } +async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) { + let state = app_handle.state::(); + state.pending_stream_installs.write().await.remove(id); +} + fn handle_download_finished(app_handle: &AppHandle, id: String) { log::info!("PeerEvent::DownloadGameFilesFinished received"); emit_game_id_event( @@ -2679,6 +2794,7 @@ pub fn run() { .invoke_handler(tauri::generate_handler![ request_games, install_game, + stream_install_game, run_game, start_server, game_directory_exists, diff --git a/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx b/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx index 96e5e87..b5d9693 100644 --- a/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx +++ b/crates/lanspread-tauri-deno-ts/src/components/modals/GameDetailModal.tsx @@ -5,7 +5,7 @@ import { StateChip } from '../StateChip'; import { ActionButton } from '../ActionButton'; import { Game, InstallStatus } from '../../lib/types'; -import { deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState'; +import { canStreamInstall, deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState'; import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format'; interface Props { @@ -13,6 +13,7 @@ interface Props { thumbnailUrl: string | null; onClose: () => void; onPrimary: (game: Game) => void; + onStreamInstall: (game: Game) => void; onUninstall: (game: Game) => void; onRemoveDownload: (game: Game) => void; onCancelDownload: (game: Game) => void; @@ -43,6 +44,7 @@ export const GameDetailModal = ({ thumbnailUrl, onClose, onPrimary, + onStreamInstall, onUninstall, onRemoveDownload, onCancelDownload, @@ -55,6 +57,7 @@ export const GameDetailModal = ({ const canRemoveDownload = game.downloaded && !game.installed && !isInProgress(game.install_status); + const showStreamInstall = canStreamInstall(game); const canViewFiles = game.downloaded || game.installed || game.install_status === InstallStatus.Downloading @@ -133,6 +136,17 @@ export const GameDetailModal = ({ onClick={() => onPrimary(game)} onCancelDownload={onCancelDownload} /> + {showStreamInstall && ( + + )} {game.installed && game.can_host_server === true && (