feat(tauri): add low-disk streamed install action

NEXT_STEPS item 1 called out that streamed install was still CLI-only
because the Tauri app started the peer with no stream provider. Users can now
choose an explicit "Low disk install" action from the game detail modal for
remote-only games instead of taking the default archive-preserving download
path.

The GUI command queues a normal peer detail fetch first so the peer database
has the file metadata needed for source validation. A small pending handoff in
Tauri routes the resulting GotGameFiles event into StreamInstallGame instead
of DownloadGameFiles, and clears that pending state on no-peer or download
failure events. This keeps the existing download continuation untouched for
the default action.

The external unrar stream provider moved from the CLI harness into
lanspread-peer so CLI and Tauri use the same implementation. Tauri resolves
the bundled unrar sidecar path and injects that provider at peer startup;
falling back to the noop provider keeps peer startup alive if the sidecar
cannot be resolved, while the streamed install operation still fails safely.

Test Plan:
- just fmt
- just test
- just frontend-test
- just clippy
- just build
- git diff --check

Refs: NEXT_STEPS.md item 1
This commit is contained in:
2026-06-07 21:39:02 +02:00
parent 389511f620
commit 40697a73e5
13 changed files with 623 additions and 436 deletions
Generated
-3
View File
@@ -2060,16 +2060,13 @@ dependencies = [
name = "lanspread-peer-cli" name = "lanspread-peer-cli"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytes",
"eyre", "eyre",
"lanspread-compat", "lanspread-compat",
"lanspread-db", "lanspread-db",
"lanspread-peer", "lanspread-peer",
"lanspread-proto",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]
+6 -4
View File
@@ -5,11 +5,13 @@ archive-derived install bytes into `local/` without making the receiver a
source?” Yes. Next Id harden the pieces that decide whether this is source?” Yes. Next Id harden the pieces that decide whether this is
product-ready. product-ready.
1. **Move from CLI-only to real app integration** 1. **Done — Move from CLI-only to real app integration**
Add a GUI command/control path for “stream install / low disk mode”, The GUI now has an explicit “Low disk install” action in the game detail
probably behind an explicit option. The Tauri crate currently opts out with modal for remote-only games. The Tauri backend queues that path through
`stream_install_provider: None`, so the GUI cannot use it yet. `stream_install_game`, injects the shared external `unrar` stream provider,
and hands fetched file details to `StreamInstallGame` instead of the normal
download command.
2. **Replace per-file `unrar p` with a final archive provider** 2. **Replace per-file `unrar p` with a final archive provider**
-3
View File
@@ -14,14 +14,11 @@ path = "src/main.rs"
lanspread-compat = { path = "../lanspread-compat" } lanspread-compat = { path = "../lanspread-compat" }
lanspread-db = { path = "../lanspread-db" } lanspread-db = { path = "../lanspread-db" }
lanspread-peer = { path = "../lanspread-peer" } lanspread-peer = { path = "../lanspread-peer" }
lanspread-proto = { path = "../lanspread-proto" }
bytes = { workspace = true }
eyre = { workspace = true } eyre = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-util = { workspace = true }
[lints.clippy] [lints.clippy]
needless_pass_by_value = "allow" needless_pass_by_value = "allow"
+1 -415
View File
@@ -5,24 +5,15 @@
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
process::Stdio,
time::Duration, time::Duration,
}; };
use bytes::Bytes;
use eyre::{Context, OptionExt}; use eyre::{Context, OptionExt};
use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker}; use lanspread_peer::{UnpackFuture, Unpacker};
use lanspread_proto::StreamInstallFrame;
use serde::Serialize; use serde::Serialize;
use serde_json::{Value, json}; use serde_json::{Value, json};
use tokio::{
io::{AsyncRead, AsyncReadExt},
sync::mpsc,
};
use tokio_util::sync::CancellationToken;
pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101";
const STREAM_CHUNK_SIZE: usize = 256 * 1024;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandEnvelope { pub struct CommandEnvelope {
@@ -270,356 +261,6 @@ impl Unpacker for ExternalUnrarUnpacker {
} }
} }
pub struct ExternalUnrarStreamProvider {
program: PathBuf,
}
impl ExternalUnrarStreamProvider {
#[must_use]
pub fn new(program: PathBuf) -> Self {
Self { program }
}
}
impl StreamInstallProvider for ExternalUnrarStreamProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
let listing = unrar_listing(&self.program, archive).await?;
let archive_name = archive
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("archive.eti")
.to_string();
send_stream_frame(
&frames,
StreamInstallFrame::ArchiveBegin {
archive_name: archive_name.clone(),
solid: listing.solid,
unpacked_size: listing.unpacked_size(),
},
)
.await?;
stream_unrar_entries(
&self.program,
archive,
&listing.entries,
&frames,
cancel_token.clone(),
)
.await?;
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarListing {
solid: bool,
entries: Vec<RarEntry>,
}
impl RarListing {
fn unpacked_size(&self) -> u64 {
self.entries
.iter()
.filter(|entry| entry.kind == RarEntryKind::File)
.map(|entry| entry.size)
.sum()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarEntry {
relative_path: String,
kind: RarEntryKind,
size: u64,
crc32: Option<u32>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RarEntryKind {
File,
Directory,
}
#[derive(Default)]
struct RarEntryDraft {
relative_path: Option<String>,
kind: Option<RarEntryKind>,
size: Option<u64>,
crc32: Option<u32>,
}
async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result<RarListing> {
let output = tokio::process::Command::new(program)
.arg("lt")
.arg("-cfg-")
.arg(archive)
.output()
.await?;
if !output.status.success() {
eyre::bail!(
"unrar lt failed for {} with status {}: {}",
archive.display(),
output.status,
String::from_utf8_lossy(&output.stderr)
);
}
parse_unrar_listing(&String::from_utf8_lossy(&output.stdout))
}
fn parse_unrar_listing(output: &str) -> eyre::Result<RarListing> {
let mut solid = false;
let mut entries = Vec::new();
let mut current = RarEntryDraft::default();
for line in output.lines() {
let trimmed = line.trim();
if let Some(details) = trimmed.strip_prefix("Details:") {
solid = details.to_ascii_lowercase().contains("solid");
continue;
}
if let Some(name) = trimmed.strip_prefix("Name:") {
push_rar_entry(&mut entries, std::mem::take(&mut current))?;
current.relative_path = Some(name.trim().to_string());
continue;
}
if let Some(kind) = trimmed.strip_prefix("Type:") {
current.kind = match kind.trim() {
"File" => Some(RarEntryKind::File),
"Directory" => Some(RarEntryKind::Directory),
_ => None,
};
continue;
}
if let Some(size) = trimmed.strip_prefix("Size:") {
current.size = Some(size.trim().parse()?);
continue;
}
if let Some(crc) = trimmed.strip_prefix("CRC32:") {
current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?);
}
}
push_rar_entry(&mut entries, current)?;
Ok(RarListing { solid, entries })
}
fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Result<()> {
let Some(relative_path) = draft.relative_path else {
return Ok(());
};
let Some(kind) = draft.kind else {
return Ok(());
};
let (size, crc32) = match kind {
RarEntryKind::File => {
let size = draft
.size
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?;
let crc32 = draft
.crc32
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?;
(size, Some(crc32))
}
RarEntryKind::Directory => (0, None),
};
entries.push(RarEntry {
relative_path,
kind,
size,
crc32,
});
Ok(())
}
async fn stream_unrar_entries(
program: &Path,
archive: &Path,
entries: &[RarEntry],
frames: &mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
let mut child = tokio::process::Command::new(program)
.arg("p")
.arg("-inul")
.arg("-cfg-")
.arg(archive)
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()?;
let result = async {
let mut stdout = child
.stdout
.take()
.ok_or_eyre("unrar stdout was not captured")?;
let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE];
for entry in entries {
if cancel_token.is_cancelled() {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
frames,
StreamInstallFrame::Directory {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
RarEntryKind::File => {
let Some(crc32) = entry.crc32 else {
eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path);
};
send_stream_frame(
frames,
StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32,
},
)
.await?;
stream_unrar_file_from_stdout(
&mut stdout,
archive,
entry,
frames,
&mut buffer,
&cancel_token,
)
.await?;
send_stream_frame(
frames,
StreamInstallFrame::FileEnd {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
}
}
let extra =
read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?;
if extra != 0 {
eyre::bail!(
"unrar produced bytes after listed entries for {}",
archive.display()
);
}
let status = wait_unrar_child(&mut child, &cancel_token, archive).await?;
if !status.success() {
eyre::bail!(
"unrar p failed for {} with status {status}",
archive.display()
);
}
Ok(())
}
.await;
if result.is_err() {
let _ = child.kill().await;
}
result
}
async fn stream_unrar_file_from_stdout(
stdout: &mut (impl AsyncRead + Unpin),
archive: &Path,
entry: &RarEntry,
frames: &mpsc::Sender<StreamInstallFrame>,
buffer: &mut [u8],
cancel_token: &CancellationToken,
) -> eyre::Result<()> {
let mut remaining = entry.size;
while remaining > 0 {
let read_len = usize::try_from(remaining.min(buffer.len() as u64))?;
let read =
read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?;
if read == 0 {
eyre::bail!(
"unrar ended while streaming {} from {}; {remaining} bytes missing",
entry.relative_path,
archive.display()
);
}
send_stream_frame(
frames,
StreamInstallFrame::FileChunk {
bytes: Bytes::copy_from_slice(&buffer[..read]),
},
)
.await?;
remaining = remaining.saturating_sub(u64::try_from(read)?);
}
Ok(())
}
async fn read_unrar_stdout(
stdout: &mut (impl AsyncRead + Unpin),
buffer: &mut [u8],
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<usize> {
tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
read = stdout.read(buffer) => Ok(read?),
}
}
async fn wait_unrar_child(
child: &mut tokio::process::Child,
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<std::process::ExitStatus> {
tokio::select! {
() = cancel_token.cancelled() => {
let _ = child.kill().await;
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
status = child.wait() => Ok(status?),
}
}
async fn send_stream_frame(
frames: &mpsc::Sender<StreamInstallFrame>,
frame: StreamInstallFrame,
) -> eyre::Result<()> {
frames
.send(frame)
.await
.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
}
pub fn result_line(id: &Option<Value>, command: &str, data: Value) -> eyre::Result<String> { pub fn result_line(id: &Option<Value>, command: &str, data: Value) -> eyre::Result<String> {
output_line(json!({ output_line(json!({
"type": "result", "type": "result",
@@ -723,61 +364,6 @@ mod tests {
); );
} }
#[test]
fn parses_unrar_technical_listing() {
let listing = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5, solid
Name: bin/payload.bin
Type: File
Size: 123
CRC32: 38B488A7
Name: bin
Type: Directory
"#,
)
.expect("listing should parse");
assert!(listing.solid);
assert_eq!(
listing.entries,
vec![
RarEntry {
relative_path: "bin/payload.bin".to_string(),
kind: RarEntryKind::File,
size: 123,
crc32: Some(0x38B4_88A7),
},
RarEntry {
relative_path: "bin".to_string(),
kind: RarEntryKind::Directory,
size: 0,
crc32: None,
},
]
);
}
#[test]
fn rejects_unrar_file_entries_without_crc32() {
let err = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5
Name: bin/payload.bin
Type: File
Size: 123
"#,
)
.expect_err("file entries without CRC32 should be rejected");
assert!(err.to_string().contains("has no CRC32"));
}
#[tokio::test] #[tokio::test]
async fn fixture_unpacker_creates_install_payload() { async fn fixture_unpacker_creates_install_payload() {
let temp = TempDir::new("lanspread-peer-cli-fixture"); let temp = TempDir::new("lanspread-peer-cli-fixture");
+1 -1
View File
@@ -16,6 +16,7 @@ use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
use lanspread_peer::{ use lanspread_peer::{
ActiveOperation, ActiveOperation,
ActiveOperationKind, ActiveOperationKind,
ExternalUnrarStreamProvider,
InstallOperation, InstallOperation,
NoopStreamInstallProvider, NoopStreamInstallProvider,
PeerCommand, PeerCommand,
@@ -33,7 +34,6 @@ use lanspread_peer_cli::{
CliCommand, CliCommand,
CommandEnvelope, CommandEnvelope,
DEFAULT_FIXTURE_VERSION, DEFAULT_FIXTURE_VERSION,
ExternalUnrarStreamProvider,
ExternalUnrarUnpacker, ExternalUnrarUnpacker,
FixtureSeed, FixtureSeed,
FixtureUnpacker, FixtureUnpacker,
+6 -1
View File
@@ -83,7 +83,12 @@ pub use crate::{
launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once}, launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once},
startup::PeerRuntimeHandle, startup::PeerRuntimeHandle,
state_paths::{launch_settings_applied_path, setup_done_path}, state_paths::{launch_settings_applied_path, setup_done_path},
stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider}, stream_install::{
ExternalUnrarStreamProvider,
NoopStreamInstallProvider,
StreamInstallFuture,
StreamInstallProvider,
},
}; };
// ============================================================================= // =============================================================================
+410 -1
View File
@@ -3,6 +3,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
process::Stdio,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -14,7 +15,8 @@ use lanspread_proto::{Message, Request, StreamInstallFrame};
use s2n_quic::stream::SendStream; use s2n_quic::stream::SendStream;
use tokio::{ use tokio::{
fs::File, fs::File,
io::AsyncWriteExt, io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
process::Command,
sync::{mpsc, mpsc::UnboundedSender}, sync::{mpsc, mpsc::UnboundedSender},
time::{self, MissedTickBehavior}, time::{self, MissedTickBehavior},
}; };
@@ -33,6 +35,7 @@ use crate::{
const FRAME_CHANNEL_DEPTH: usize = 16; const FRAME_CHANNEL_DEPTH: usize = 16;
const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
const STREAM_CHUNK_SIZE: usize = 256 * 1024;
pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>; pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>;
@@ -64,6 +67,357 @@ impl StreamInstallProvider for NoopStreamInstallProvider {
} }
} }
#[derive(Debug)]
pub struct ExternalUnrarStreamProvider {
program: PathBuf,
}
impl ExternalUnrarStreamProvider {
#[must_use]
pub fn new(program: PathBuf) -> Self {
Self { program }
}
}
impl StreamInstallProvider for ExternalUnrarStreamProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
let listing = unrar_listing(&self.program, archive).await?;
let archive_name = archive
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("archive.eti")
.to_string();
send_stream_frame(
&frames,
StreamInstallFrame::ArchiveBegin {
archive_name: archive_name.clone(),
solid: listing.solid,
unpacked_size: listing.unpacked_size(),
},
)
.await?;
stream_unrar_entries(
&self.program,
archive,
&listing.entries,
&frames,
cancel_token.clone(),
)
.await?;
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarListing {
solid: bool,
entries: Vec<RarEntry>,
}
impl RarListing {
fn unpacked_size(&self) -> u64 {
self.entries
.iter()
.filter(|entry| entry.kind == RarEntryKind::File)
.map(|entry| entry.size)
.sum()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarEntry {
relative_path: String,
kind: RarEntryKind,
size: u64,
crc32: Option<u32>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RarEntryKind {
File,
Directory,
}
#[derive(Default)]
struct RarEntryDraft {
relative_path: Option<String>,
kind: Option<RarEntryKind>,
size: Option<u64>,
crc32: Option<u32>,
}
async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result<RarListing> {
let output = Command::new(program)
.arg("lt")
.arg("-cfg-")
.arg(archive)
.output()
.await?;
if !output.status.success() {
eyre::bail!(
"unrar lt failed for {} with status {}: {}",
archive.display(),
output.status,
String::from_utf8_lossy(&output.stderr)
);
}
parse_unrar_listing(&String::from_utf8_lossy(&output.stdout))
}
fn parse_unrar_listing(output: &str) -> eyre::Result<RarListing> {
let mut solid = false;
let mut entries = Vec::new();
let mut current = RarEntryDraft::default();
for line in output.lines() {
let trimmed = line.trim();
if let Some(details) = trimmed.strip_prefix("Details:") {
solid = details.to_ascii_lowercase().contains("solid");
continue;
}
if let Some(name) = trimmed.strip_prefix("Name:") {
push_rar_entry(&mut entries, std::mem::take(&mut current))?;
current.relative_path = Some(name.trim().to_string());
continue;
}
if let Some(kind) = trimmed.strip_prefix("Type:") {
current.kind = match kind.trim() {
"File" => Some(RarEntryKind::File),
"Directory" => Some(RarEntryKind::Directory),
_ => None,
};
continue;
}
if let Some(size) = trimmed.strip_prefix("Size:") {
current.size = Some(size.trim().parse()?);
continue;
}
if let Some(crc) = trimmed.strip_prefix("CRC32:") {
current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?);
}
}
push_rar_entry(&mut entries, current)?;
Ok(RarListing { solid, entries })
}
fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Result<()> {
let Some(relative_path) = draft.relative_path else {
return Ok(());
};
let Some(kind) = draft.kind else {
return Ok(());
};
let (size, crc32) = match kind {
RarEntryKind::File => {
let size = draft
.size
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?;
let crc32 = draft
.crc32
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?;
(size, Some(crc32))
}
RarEntryKind::Directory => (0, None),
};
entries.push(RarEntry {
relative_path,
kind,
size,
crc32,
});
Ok(())
}
async fn stream_unrar_entries(
program: &Path,
archive: &Path,
entries: &[RarEntry],
frames: &mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
let mut child = Command::new(program)
.arg("p")
.arg("-inul")
.arg("-cfg-")
.arg(archive)
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()?;
let result = async {
let mut stdout = child
.stdout
.take()
.ok_or_else(|| eyre::eyre!("unrar stdout was not captured"))?;
let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE];
for entry in entries {
if cancel_token.is_cancelled() {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
frames,
StreamInstallFrame::Directory {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
RarEntryKind::File => {
let Some(crc32) = entry.crc32 else {
eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path);
};
send_stream_frame(
frames,
StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32,
},
)
.await?;
stream_unrar_file_from_stdout(
&mut stdout,
archive,
entry,
frames,
&mut buffer,
&cancel_token,
)
.await?;
send_stream_frame(
frames,
StreamInstallFrame::FileEnd {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
}
}
let extra =
read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?;
if extra != 0 {
eyre::bail!(
"unrar produced bytes after listed entries for {}",
archive.display()
);
}
let status = wait_unrar_child(&mut child, &cancel_token, archive).await?;
if !status.success() {
eyre::bail!(
"unrar p failed for {} with status {status}",
archive.display()
);
}
Ok(())
}
.await;
if result.is_err() {
let _ = child.kill().await;
}
result
}
async fn stream_unrar_file_from_stdout(
stdout: &mut (impl AsyncRead + Unpin),
archive: &Path,
entry: &RarEntry,
frames: &mpsc::Sender<StreamInstallFrame>,
buffer: &mut [u8],
cancel_token: &CancellationToken,
) -> eyre::Result<()> {
let mut remaining = entry.size;
while remaining > 0 {
let read_len = usize::try_from(remaining.min(u64::try_from(buffer.len())?))?;
let read =
read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?;
if read == 0 {
eyre::bail!(
"unrar ended while streaming {} from {}; {remaining} bytes missing",
entry.relative_path,
archive.display()
);
}
send_stream_frame(
frames,
StreamInstallFrame::FileChunk {
bytes: Bytes::copy_from_slice(&buffer[..read]),
},
)
.await?;
remaining = remaining.saturating_sub(u64::try_from(read)?);
}
Ok(())
}
async fn read_unrar_stdout(
stdout: &mut (impl AsyncRead + Unpin),
buffer: &mut [u8],
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<usize> {
tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
read = stdout.read(buffer) => Ok(read?),
}
}
async fn wait_unrar_child(
child: &mut tokio::process::Child,
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<std::process::ExitStatus> {
tokio::select! {
() = cancel_token.cancelled() => {
let _ = child.kill().await;
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
status = child.wait() => Ok(status?),
}
}
async fn send_stream_frame(
frames: &mpsc::Sender<StreamInstallFrame>,
frame: StreamInstallFrame,
) -> eyre::Result<()> {
frames
.send(frame)
.await
.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
}
pub(crate) async fn send_stream_install_error( pub(crate) async fn send_stream_install_error(
tx: SendStream, tx: SendStream,
message: impl Into<String>, message: impl Into<String>,
@@ -444,4 +798,59 @@ mod tests {
assert!(resolve_stream_path(&staging, "/absolute").is_err()); assert!(resolve_stream_path(&staging, "/absolute").is_err());
assert!(resolve_stream_path(&staging, "C:/windows").is_err()); assert!(resolve_stream_path(&staging, "C:/windows").is_err());
} }
#[test]
fn parses_unrar_technical_listing() {
let listing = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5, solid
Name: bin/payload.bin
Type: File
Size: 123
CRC32: 38B488A7
Name: bin
Type: Directory
"#,
)
.expect("listing should parse");
assert!(listing.solid);
assert_eq!(
listing.entries,
vec![
RarEntry {
relative_path: "bin/payload.bin".to_string(),
kind: RarEntryKind::File,
size: 123,
crc32: Some(0x38B4_88A7),
},
RarEntry {
relative_path: "bin".to_string(),
kind: RarEntryKind::Directory,
size: 0,
crc32: None,
},
]
);
}
#[test]
fn rejects_unrar_file_entries_without_crc32() {
let err = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5
Name: bin/payload.bin
Type: File
Size: 123
"#,
)
.expect_err("file entries without CRC32 should be rejected");
assert!(err.to_string().contains("has no CRC32"));
}
} }
@@ -14,11 +14,14 @@ use lanspread_db::db::{Availability, Game, GameCatalog, GameDB, GameFileDescript
use lanspread_peer::{ use lanspread_peer::{
ActiveOperation, ActiveOperation,
ActiveOperationKind, ActiveOperationKind,
ExternalUnrarStreamProvider,
NoopStreamInstallProvider,
PeerCommand, PeerCommand,
PeerEvent, PeerEvent,
PeerGameDB, PeerGameDB,
PeerRuntimeHandle, PeerRuntimeHandle,
PeerStartOptions, PeerStartOptions,
StreamInstallProvider,
UnpackFuture, UnpackFuture,
Unpacker, Unpacker,
migrate_legacy_state, migrate_legacy_state,
@@ -82,6 +85,7 @@ struct LanSpreadState {
peer_runtime: Arc<RwLock<Option<PeerRuntimeHandle>>>, peer_runtime: Arc<RwLock<Option<PeerRuntimeHandle>>>,
games: Arc<RwLock<GameDB>>, games: Arc<RwLock<GameDB>>,
active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>, active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>,
pending_stream_installs: Arc<RwLock<HashSet<String>>>,
games_folder: Arc<RwLock<String>>, games_folder: Arc<RwLock<String>>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>, catalog: Arc<RwLock<GameCatalog>>,
@@ -255,6 +259,16 @@ async fn install_game(
log::warn!("Game already has an active operation: {id}"); log::warn!("Game already has an active operation: {id}");
return Ok(false); return Ok(false);
} }
if state
.inner()
.pending_stream_installs
.read()
.await
.contains(&id)
{
log::warn!("Game already has a pending streamed install: {id}");
return Ok(false);
}
let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl_arc = state.inner().peer_ctrl.clone();
let peer_ctrl = peer_ctrl_arc.read().await.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone();
@@ -294,6 +308,77 @@ async fn install_game(
Ok(handled) Ok(handled)
} }
#[tauri::command]
async fn stream_install_game(
id: String,
state: tauri::State<'_, LanSpreadState>,
) -> tauri::Result<bool> {
if state
.inner()
.active_operations
.read()
.await
.contains_key(&id)
{
log::warn!("Game already has an active operation: {id}");
return Ok(false);
}
if state
.inner()
.pending_stream_installs
.read()
.await
.contains(&id)
{
log::warn!("Game already has a pending streamed install: {id}");
return Ok(false);
}
let Some((downloaded, installed, peer_count)) = state
.inner()
.games
.read()
.await
.get_game_by_id(&id)
.map(|game| (game.downloaded, game.installed, game.peer_count))
else {
log::warn!("Ignoring streamed install request for unknown game: {id}");
return Ok(false);
};
if downloaded || installed || peer_count == 0 {
log::warn!(
"Ignoring streamed install request for {id}: downloaded={downloaded}, \
installed={installed}, peer_count={peer_count}"
);
return Ok(false);
}
let peer_ctrl_arc = state.inner().peer_ctrl.clone();
let peer_ctrl = peer_ctrl_arc.read().await.clone();
let Some(peer_ctrl) = peer_ctrl else {
log::warn!("Peer system not initialized yet");
return Ok(false);
};
{
let mut pending = state.inner().pending_stream_installs.write().await;
pending.insert(id.clone());
}
if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) {
log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}");
state
.inner()
.pending_stream_installs
.write()
.await
.remove(&id);
return Ok(false);
}
Ok(true)
}
#[tauri::command] #[tauri::command]
async fn update_game( async fn update_game(
id: String, id: String,
@@ -1867,6 +1952,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
let unpacker = Arc::new(SidecarUnpacker { let unpacker = Arc::new(SidecarUnpacker {
app_handle: app_handle.clone(), app_handle: app_handle.clone(),
}); });
let stream_install_provider = stream_install_provider_for_app(app_handle);
match start_peer_with_options( match start_peer_with_options(
games_folder.to_path_buf(), games_folder.to_path_buf(),
tx_peer_event, tx_peer_event,
@@ -1876,7 +1962,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
PeerStartOptions { PeerStartOptions {
state_dir: Some(state_dir), state_dir: Some(state_dir),
active_outbound_transfers: Some(state.active_outbound_transfers.clone()), active_outbound_transfers: Some(state.active_outbound_transfers.clone()),
stream_install_provider: None, stream_install_provider: Some(stream_install_provider),
}, },
) { ) {
Ok(handle) => { Ok(handle) => {
@@ -1894,6 +1980,22 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
} }
} }
fn stream_install_provider_for_app(app_handle: &AppHandle) -> Arc<dyn StreamInstallProvider> {
match resolve_unrar_sidecar_program(app_handle) {
Ok(program) => Arc::new(ExternalUnrarStreamProvider::new(program)),
Err(err) => {
log::error!("Failed to resolve streamed-install unrar sidecar: {err}");
Arc::new(NoopStreamInstallProvider)
}
}
}
fn resolve_unrar_sidecar_program(app_handle: &AppHandle) -> eyre::Result<PathBuf> {
let sidecar = app_handle.shell().sidecar("unrar")?;
let command: std::process::Command = sidecar.into();
Ok(PathBuf::from(command.get_program()))
}
fn emit_game_id_event(app_handle: &AppHandle, event: &str, id: &str, label: &str) { fn emit_game_id_event(app_handle: &AppHandle, event: &str, id: &str, label: &str) {
if let Err(e) = app_handle.emit(event, Some(id.to_owned())) { if let Err(e) = app_handle.emit(event, Some(id.to_owned())) {
log::error!("{label}: Failed to emit {event} event: {e}"); log::error!("{label}: Failed to emit {event} event: {e}");
@@ -1990,6 +2092,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::NoPeersHaveGame { id } => { PeerEvent::NoPeersHaveGame { id } => {
log::warn!("PeerEvent::NoPeersHaveGame received for {id}"); log::warn!("PeerEvent::NoPeersHaveGame received for {id}");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-no-peers", "game-no-peers",
@@ -2028,6 +2131,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::DownloadGameFilesFailed { id } => { PeerEvent::DownloadGameFilesFailed { id } => {
log::warn!("PeerEvent::DownloadGameFilesFailed received"); log::warn!("PeerEvent::DownloadGameFilesFailed received");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-download-failed", "game-download-failed",
@@ -2037,6 +2141,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::DownloadGameFilesAllPeersGone { id } => { PeerEvent::DownloadGameFilesAllPeersGone { id } => {
log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-download-peers-gone", "game-download-peers-gone",
@@ -2175,15 +2280,25 @@ async fn handle_got_game_files(
); );
let state = app_handle.state::<LanSpreadState>(); let state = app_handle.state::<LanSpreadState>();
let stream_install = state.pending_stream_installs.write().await.remove(&id);
let peer_ctrl = state.peer_ctrl.read().await.clone(); let peer_ctrl = state.peer_ctrl.read().await.clone();
if let Some(peer_ctrl) = peer_ctrl if let Some(peer_ctrl) = peer_ctrl
&& let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles { && let Err(e) = if stream_install {
peer_ctrl.send(PeerCommand::StreamInstallGame { id })
} else {
peer_ctrl.send(PeerCommand::DownloadGameFiles {
id, id,
file_descriptions, file_descriptions,
}) })
{
log::error!("Failed to send PeerCommand::DownloadGameFiles: {e}");
} }
{
log::error!("Failed to continue queued game transfer: {e}");
}
}
async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) {
let state = app_handle.state::<LanSpreadState>();
state.pending_stream_installs.write().await.remove(id);
} }
fn handle_download_finished(app_handle: &AppHandle, id: String) { fn handle_download_finished(app_handle: &AppHandle, id: String) {
@@ -2679,6 +2794,7 @@ pub fn run() {
.invoke_handler(tauri::generate_handler![ .invoke_handler(tauri::generate_handler![
request_games, request_games,
install_game, install_game,
stream_install_game,
run_game, run_game,
start_server, start_server,
game_directory_exists, game_directory_exists,
@@ -5,7 +5,7 @@ import { StateChip } from '../StateChip';
import { ActionButton } from '../ActionButton'; import { ActionButton } from '../ActionButton';
import { Game, InstallStatus } from '../../lib/types'; import { Game, InstallStatus } from '../../lib/types';
import { deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState'; import { canStreamInstall, deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState';
import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format'; import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format';
interface Props { interface Props {
@@ -13,6 +13,7 @@ interface Props {
thumbnailUrl: string | null; thumbnailUrl: string | null;
onClose: () => void; onClose: () => void;
onPrimary: (game: Game) => void; onPrimary: (game: Game) => void;
onStreamInstall: (game: Game) => void;
onUninstall: (game: Game) => void; onUninstall: (game: Game) => void;
onRemoveDownload: (game: Game) => void; onRemoveDownload: (game: Game) => void;
onCancelDownload: (game: Game) => void; onCancelDownload: (game: Game) => void;
@@ -43,6 +44,7 @@ export const GameDetailModal = ({
thumbnailUrl, thumbnailUrl,
onClose, onClose,
onPrimary, onPrimary,
onStreamInstall,
onUninstall, onUninstall,
onRemoveDownload, onRemoveDownload,
onCancelDownload, onCancelDownload,
@@ -55,6 +57,7 @@ export const GameDetailModal = ({
const canRemoveDownload = game.downloaded const canRemoveDownload = game.downloaded
&& !game.installed && !game.installed
&& !isInProgress(game.install_status); && !isInProgress(game.install_status);
const showStreamInstall = canStreamInstall(game);
const canViewFiles = game.downloaded const canViewFiles = game.downloaded
|| game.installed || game.installed
|| game.install_status === InstallStatus.Downloading || game.install_status === InstallStatus.Downloading
@@ -133,6 +136,17 @@ export const GameDetailModal = ({
onClick={() => onPrimary(game)} onClick={() => onPrimary(game)}
onCancelDownload={onCancelDownload} onCancelDownload={onCancelDownload}
/> />
{showStreamInstall && (
<button
type="button"
className="ghost-btn"
title="Install without keeping archive files"
onClick={() => onStreamInstall(game)}
>
<Icon.install />
<span>Low disk install</span>
</button>
)}
{game.installed && game.can_host_server === true && ( {game.installed && game.can_host_server === true && (
<button <button
type="button" type="button"
@@ -9,6 +9,7 @@ export interface GameActions {
play: (id: string) => Promise<void>; play: (id: string) => Promise<void>;
startServer: (id: string) => Promise<void>; startServer: (id: string) => Promise<void>;
install: (id: string) => Promise<void>; install: (id: string) => Promise<void>;
streamInstall: (id: string) => Promise<void>;
update: (id: string) => Promise<void>; update: (id: string) => Promise<void>;
uninstall: (id: string) => Promise<void>; uninstall: (id: string) => Promise<void>;
removeDownload: (id: string) => Promise<void>; removeDownload: (id: string) => Promise<void>;
@@ -68,6 +69,15 @@ export const useGameActions = (
} }
}, [games, settings.language, settings.username]); }, [games, settings.language, settings.username]);
const streamInstall = useCallback(async (id: string) => {
try {
const success = await invoke<boolean>('stream_install_game', { id });
if (success) games.markChecking(id);
} catch (err) {
console.error('stream_install_game failed:', err);
}
}, [games]);
const update = useCallback(async (id: string) => { const update = useCallback(async (id: string) => {
try { try {
const game = games.games.find(item => item.id === id); const game = games.games.find(item => item.id === id);
@@ -129,5 +139,15 @@ export const useGameActions = (
} }
}, []); }, []);
return { play, startServer, install, update, uninstall, removeDownload, cancelDownload, viewFiles }; return {
play,
startServer,
install,
streamInstall,
update,
uninstall,
removeDownload,
cancelDownload,
viewFiles,
};
}; };
@@ -114,6 +114,12 @@ export const needsUpdate = (game: Game): boolean => {
return (compareVersionStamps(game.eti_game_version, game.local_version) ?? 0) > 0; return (compareVersionStamps(game.eti_game_version, game.local_version) ?? 0) > 0;
}; };
export const canStreamInstall = (game: Game): boolean =>
!game.downloaded
&& !game.installed
&& game.peer_count > 0
&& !isInProgress(game.install_status);
/** What pressing the card's main action button should do, given the state. */ /** What pressing the card's main action button should do, given the state. */
export type PrimaryAction = 'play' | 'install' | 'update' | 'download' | 'busy' | 'disabled'; export type PrimaryAction = 'play' | 'install' | 'update' | 'download' | 'busy' | 'disabled';
@@ -192,6 +192,7 @@ export const MainWindow = () => {
thumbnailUrl={thumbnails.get(openGame.id)} thumbnailUrl={thumbnails.get(openGame.id)}
onClose={() => setOpenGameId(null)} onClose={() => setOpenGameId(null)}
onPrimary={handlePrimary} onPrimary={handlePrimary}
onStreamInstall={(g) => actions.streamInstall(g.id)}
onUninstall={handleUninstall} onUninstall={handleUninstall}
onRemoveDownload={handleRemoveDownload} onRemoveDownload={handleRemoveDownload}
onCancelDownload={(g) => actions.cancelDownload(g.id)} onCancelDownload={(g) => actions.cancelDownload(g.id)}
@@ -2,6 +2,7 @@ import {
actionLabel, actionLabel,
activeStatusById, activeStatusById,
applyFilterAndSort, applyFilterAndSort,
canStreamInstall,
countByFilter, countByFilter,
deriveState, deriveState,
downloadProgressPercent, downloadProgressPercent,
@@ -209,3 +210,36 @@ Deno.test('download progress formatting matches the progress-bar layouts', () =>
); );
assertEquals(formatDownloadEta(485), '8 min', 'eta format should stay compact'); assertEquals(formatDownloadEta(485), '8 min', 'eta format should stay compact');
}); });
Deno.test('stream install is available only for idle remote games', () => {
assertEquals(
canStreamInstall(game({ downloaded: false, installed: false, peer_count: 1 })),
true,
'remote-only idle games should allow streamed install',
);
assertEquals(
canStreamInstall(game({ downloaded: true, installed: false, peer_count: 1 })),
false,
'downloaded games should install from local archives',
);
assertEquals(
canStreamInstall(game({ downloaded: false, installed: true, peer_count: 1 })),
false,
'installed games should not expose streamed install',
);
assertEquals(
canStreamInstall(game({ downloaded: false, installed: false, peer_count: 0 })),
false,
'games without peers should not expose streamed install',
);
assertEquals(
canStreamInstall(game({
downloaded: false,
installed: false,
peer_count: 1,
install_status: InstallStatus.CheckingPeers,
})),
false,
'busy games should not expose streamed install',
);
});