feat(peer): prototype streamed installs

Add a streamed-install prototype that can receive archive-derived install bytes
straight into local/ without first storing the peer-owned root archive payload.
This is intended for low-disk clients that want to install a game but opt out of
becoming a downloadable peer source for that game.

The protocol gains a current-version-only StreamInstall request and framed
StreamInstallFrame responses. The peer core owns the generic transport,
transaction, path validation, size checks, CRC32 verification, and lifecycle
state. The archive-specific work is hidden behind StreamInstallProvider so the
prototype can use unrar while the final implementation can swap in a better
provider without rewriting the peer command path.

The receiver writes into .local.installing and only promotes to local/ after the
full stream verifies. It deliberately does not write the root version.ini or
archive files, so the settled local state is installed=true, downloaded=false,
and availability=LocalOnly. That preserves the existing rule that local/ is not
served to peers and makes streamed receivers non-sources by construction.

The CLI is the only caller for now. It exposes stream-install and provides the
prototype unrar implementation with unrar lt for entry metadata and unrar p for
file bytes. This is simple and good enough to prove non-solid archive streaming,
but it is not the production provider shape for solid archives because per-file
unrar p would repeatedly decompress prefixes. The Tauri app explicitly passes
stream_install_provider: None, so the GUI behavior stays unchanged until a real
product path is designed.

Document the production-readiness work in NEXT_STEPS.md. The main follow-up is
to make the provider abstraction final-ish and replace the per-file CLI unrar
provider with a one-pass archive provider, then wire a deliberate GUI low-disk
mode, retry semantics, and broader failure scenarios.

Test Plan:
- just fmt
- RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \
  S39 S40 --build-image
- RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy
- git diff --check
- git diff --cached --check

Follow-up: NEXT_STEPS.md
This commit is contained in:
2026-06-07 20:31:51 +02:00
parent 8a8437036d
commit 373def6d44
22 changed files with 1465 additions and 14 deletions
+3
View File
@@ -14,11 +14,14 @@ path = "src/main.rs"
lanspread-compat = { path = "../lanspread-compat" }
lanspread-db = { path = "../lanspread-db" }
lanspread-peer = { path = "../lanspread-peer" }
lanspread-proto = { path = "../lanspread-proto" }
bytes = { workspace = true }
eyre = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
[lints.clippy]
needless_pass_by_value = "allow"
+4 -2
View File
@@ -4,14 +4,16 @@ WORKDIR /work
COPY . .
RUN cargo build --release -p lanspread-peer-cli
FROM debian:bookworm-slim
FROM debian:trixie-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates \
&& apt-get install -y --no-install-recommends ca-certificates libstdc++6 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /work/target/release/lanspread-peer-cli /usr/local/bin/lanspread-peer-cli
COPY crates/lanspread-tauri-deno-ts/src-tauri/game.db /app/game.db
COPY crates/lanspread-tauri-deno-ts/src-tauri/binaries/unrar-x86_64-unknown-linux-gnu /usr/local/bin/unrar
RUN chmod +x /usr/local/bin/unrar
ENTRYPOINT ["lanspread-peer-cli"]
CMD ["--games-dir", "/games", "--state-dir", "/state", "--catalog-db", "/app/game.db"]
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Run the peer-cli scenarios S1-S36 through Docker."""
"""Run the peer-cli scenarios S1-S40 through Docker."""
from __future__ import annotations
@@ -8,6 +8,7 @@ import hashlib
import json
import os
import queue
import shlex
import shutil
import subprocess
import sys
@@ -325,6 +326,8 @@ class Runner:
("S35", self.s35_unknown_game_filtered),
("S36", self.s36_latest_singleton),
("S37", self.s37_single_source_download_throughput),
("S39", self.s39_streamed_install_local_only),
("S40", self.s40_streamed_receiver_not_source),
]
for scenario_id, scenario in scenarios:
@@ -1060,6 +1063,114 @@ class Runner:
f"{throughput['chunks']} chunks"
)
def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]:
source_dir = self.fixture_root / f"{prefix}-bravo"
copy_game("cnctw", source_dir, version="20160128")
source = self.peer(f"{prefix}-bravo", games_dir=source_dir)
client = self.peer(f"{prefix}-client")
connect_many(client, [source])
wait_remote_game(client, "cnctw", peer_count=1)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("got-game-files", "cnctw"),
timeout=20,
description="got cnctw files",
waiter=waiter,
)
client.wait_for(
event_is("download-begin", "cnctw"),
timeout=20,
description="stream begin cnctw",
waiter=waiter,
)
client.wait_for(
event_is("download-finished", "cnctw"),
timeout=60,
description="stream finish cnctw",
waiter=waiter,
)
client.wait_for(
event_is("install-finished", "cnctw"),
timeout=30,
description="stream install cnctw",
waiter=waiter,
)
return source, client
def s39_streamed_install_local_only(self) -> str:
source, client = self.stream_install_cnctw("s39")
game = wait_local_game(client, "cnctw", downloaded=False, installed=True)
assert_game_state(
game,
downloaded=False,
installed=True,
availability="LocalOnly",
)
game_root = client.host_games_dir / "cnctw"
assert_not_exists(game_root / "version.ini")
assert_not_exists(game_root / "cnctw.eti")
expected = {
"bin/cnctw-payload.bin": unrar_entry_sha256(
source, "cnctw", "bin/cnctw-payload.bin"
),
"data/cnctw-assets.dat": unrar_entry_sha256(
source, "cnctw", "data/cnctw-assets.dat"
),
}
actual = {
rel: sha256_file(game_root / "local" / rel)
for rel in expected
}
if actual != expected:
raise ScenarioError(f"streamed local payload hashes mismatched: {actual} != {expected}")
streamed_bytes = sum(
int(item.get("data", {}).get("length", 0))
for item in client.output
if item.get("type") == "event"
and item.get("event") == "download-chunk-finished"
and item.get("data", {}).get("game_id") == "cnctw"
)
expected_bytes = 3 * 1024 * 1024
if streamed_bytes != expected_bytes:
raise ScenarioError(
f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}"
)
return (
"cnctw streamed into local/ only; root archive and version.ini absent; "
f"payload hashes={actual}"
)
def s40_streamed_receiver_not_source(self) -> str:
_source, receiver = self.stream_install_cnctw("s40")
observer = self.peer("s40-observer")
connect_many(observer, [receiver])
receiver_snapshot = wait_peer_has_game(observer, receiver.peer_id, "cnctw")
summary = next(
game
for game in receiver_snapshot.get("games", [])
if game.get("id") == "cnctw"
)
if summary.get("availability") != "LocalOnly" or summary.get("downloaded"):
raise ScenarioError(f"receiver did not advertise cnctw as local-only: {summary}")
wait_remote_absent(observer, "cnctw", timeout=5)
err = observer.send(
{"cmd": "download", "game_id": "cnctw", "install": False},
expect_error=True,
)
if "no peers have game cnctw" not in err["error"]:
raise ScenarioError(f"unexpected local-only download error: {err}")
assert_not_exists(observer.host_games_dir / "cnctw")
return (
"observer saw receiver's local-only cnctw snapshot, but remote aggregation hid it "
f"and download errored '{err['error']}'"
)
def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]:
result = subprocess.run(
@@ -1177,6 +1288,25 @@ def create_large_sparse_game(root: Path, *, size: int) -> None:
handle.truncate(size)
def sha256_file(path: Path) -> str:
hasher = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
hasher.update(chunk)
return hasher.hexdigest()
def unrar_entry_sha256(peer: Peer, game_id: str, relative_path: str) -> str:
command = (
f"unrar p -inul /games/{shlex.quote(game_id)}/{shlex.quote(game_id)}.eti "
f"{shlex.quote(relative_path)} | sha256sum"
)
output = peer.docker_exec("sh", "-c", command).stdout.strip()
if not output:
raise ScenarioError(f"empty sha256 output for {game_id}:{relative_path}")
return output.split()[0]
def format_bytes(size: int) -> str:
return f"{size / 1024 / 1024 / 1024:.2f} GiB"
+329 -1
View File
@@ -5,15 +5,21 @@
use std::{
net::SocketAddr,
path::{Path, PathBuf},
process::Stdio,
time::Duration,
};
use bytes::Bytes;
use eyre::{Context, OptionExt};
use lanspread_peer::{UnpackFuture, Unpacker};
use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker};
use lanspread_proto::StreamInstallFrame;
use serde::Serialize;
use serde_json::{Value, json};
use tokio::{io::AsyncReadExt, sync::mpsc};
use tokio_util::sync::CancellationToken;
pub const DEFAULT_FIXTURE_VERSION: &str = "20250101";
const STREAM_CHUNK_SIZE: usize = 256 * 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandEnvelope {
@@ -33,6 +39,9 @@ pub enum CliCommand {
game_id: String,
install_after_download: bool,
},
StreamInstall {
game_id: String,
},
Install {
game_id: String,
},
@@ -63,6 +72,7 @@ impl CliCommand {
Self::ListGames => "list-games",
Self::SetGameDir { .. } => "set-game-dir",
Self::Download { .. } => "download",
Self::StreamInstall { .. } => "stream-install",
Self::Install { .. } => "install",
Self::Uninstall { .. } => "uninstall",
Self::Play { .. } => "play",
@@ -101,6 +111,9 @@ pub fn parse_command_value(value: &Value) -> eyre::Result<CommandEnvelope> {
game_id: game_id(object)?,
install_after_download: install_after_download(object)?,
},
"stream-install" => CliCommand::StreamInstall {
game_id: game_id(object)?,
},
"install" => CliCommand::Install {
game_id: game_id(object)?,
},
@@ -254,6 +267,270 @@ impl Unpacker for ExternalUnrarUnpacker {
}
}
pub struct ExternalUnrarStreamProvider {
program: PathBuf,
}
impl ExternalUnrarStreamProvider {
#[must_use]
pub fn new(program: PathBuf) -> Self {
Self { program }
}
}
impl StreamInstallProvider for ExternalUnrarStreamProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
let listing = unrar_listing(&self.program, archive).await?;
let archive_name = archive
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("archive.eti")
.to_string();
send_stream_frame(
&frames,
StreamInstallFrame::ArchiveBegin {
archive_name: archive_name.clone(),
solid: listing.solid,
},
)
.await?;
for entry in listing.entries {
if cancel_token.is_cancelled() {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
&frames,
StreamInstallFrame::Directory {
relative_path: entry.relative_path,
},
)
.await?;
}
RarEntryKind::File => {
send_stream_frame(
&frames,
StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32: entry.crc32,
},
)
.await?;
stream_unrar_file(
&self.program,
archive,
&entry.relative_path,
&frames,
cancel_token.clone(),
)
.await?;
send_stream_frame(
&frames,
StreamInstallFrame::FileEnd {
relative_path: entry.relative_path,
},
)
.await?;
}
}
}
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarListing {
solid: bool,
entries: Vec<RarEntry>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RarEntry {
relative_path: String,
kind: RarEntryKind,
size: u64,
crc32: Option<u32>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RarEntryKind {
File,
Directory,
}
#[derive(Default)]
struct RarEntryDraft {
relative_path: Option<String>,
kind: Option<RarEntryKind>,
size: Option<u64>,
crc32: Option<u32>,
}
async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result<RarListing> {
let output = tokio::process::Command::new(program)
.arg("lt")
.arg("-cfg-")
.arg(archive)
.output()
.await?;
if !output.status.success() {
eyre::bail!(
"unrar lt failed for {} with status {}: {}",
archive.display(),
output.status,
String::from_utf8_lossy(&output.stderr)
);
}
parse_unrar_listing(&String::from_utf8_lossy(&output.stdout))
}
fn parse_unrar_listing(output: &str) -> eyre::Result<RarListing> {
let mut solid = false;
let mut entries = Vec::new();
let mut current = RarEntryDraft::default();
for line in output.lines() {
let trimmed = line.trim();
if let Some(details) = trimmed.strip_prefix("Details:") {
solid = details.to_ascii_lowercase().contains("solid");
continue;
}
if let Some(name) = trimmed.strip_prefix("Name:") {
push_rar_entry(&mut entries, std::mem::take(&mut current))?;
current.relative_path = Some(name.trim().to_string());
continue;
}
if let Some(kind) = trimmed.strip_prefix("Type:") {
current.kind = match kind.trim() {
"File" => Some(RarEntryKind::File),
"Directory" => Some(RarEntryKind::Directory),
_ => None,
};
continue;
}
if let Some(size) = trimmed.strip_prefix("Size:") {
current.size = Some(size.trim().parse()?);
continue;
}
if let Some(crc) = trimmed.strip_prefix("CRC32:") {
current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?);
}
}
push_rar_entry(&mut entries, current)?;
Ok(RarListing { solid, entries })
}
fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Result<()> {
let Some(relative_path) = draft.relative_path else {
return Ok(());
};
let Some(kind) = draft.kind else {
return Ok(());
};
let size = match kind {
RarEntryKind::File => draft
.size
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?,
RarEntryKind::Directory => 0,
};
entries.push(RarEntry {
relative_path,
kind,
size,
crc32: draft.crc32,
});
Ok(())
}
async fn stream_unrar_file(
program: &Path,
archive: &Path,
relative_path: &str,
frames: &mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
let mut child = tokio::process::Command::new(program)
.arg("p")
.arg("-inul")
.arg("-cfg-")
.arg(archive)
.arg(relative_path)
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()?;
let mut stdout = child
.stdout
.take()
.ok_or_eyre("unrar stdout was not captured")?;
let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE];
loop {
let read = tokio::select! {
() = cancel_token.cancelled() => {
let _ = child.kill().await;
eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display());
}
read = stdout.read(&mut buffer) => read?,
};
if read == 0 {
break;
}
send_stream_frame(
frames,
StreamInstallFrame::FileChunk {
bytes: Bytes::copy_from_slice(&buffer[..read]),
},
)
.await?;
}
let status = child.wait().await?;
if !status.success() {
eyre::bail!(
"unrar p failed for {}:{} with status {status}",
archive.display(),
relative_path
);
}
Ok(())
}
async fn send_stream_frame(
frames: &mpsc::Sender<StreamInstallFrame>,
frame: StreamInstallFrame,
) -> eyre::Result<()> {
frames
.send(frame)
.await
.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
}
pub fn result_line(id: &Option<Value>, command: &str, data: Value) -> eyre::Result<String> {
output_line(json!({
"type": "result",
@@ -344,6 +621,57 @@ mod tests {
assert_eq!(parsed["data"]["peer_count"], 0);
}
#[test]
fn parses_stream_install_command() {
let parsed = parse_command_line(r#"{"cmd":"stream-install","game_id":"cnctw"}"#)
.expect("command should parse");
assert_eq!(
parsed.command,
CliCommand::StreamInstall {
game_id: "cnctw".to_string(),
}
);
}
#[test]
fn parses_unrar_technical_listing() {
let listing = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5
Name: bin/payload.bin
Type: File
Size: 123
CRC32: 38B488A7
Name: bin
Type: Directory
"#,
)
.expect("listing should parse");
assert!(!listing.solid);
assert_eq!(
listing.entries,
vec![
RarEntry {
relative_path: "bin/payload.bin".to_string(),
kind: RarEntryKind::File,
size: 123,
crc32: Some(0x38B4_88A7),
},
RarEntry {
relative_path: "bin".to_string(),
kind: RarEntryKind::Directory,
size: 0,
crc32: None,
},
]
);
}
#[tokio::test]
async fn fixture_unpacker_creates_install_payload() {
let temp = TempDir::new("lanspread-peer-cli-fixture");
+28 -1
View File
@@ -17,6 +17,7 @@ use lanspread_peer::{
ActiveOperation,
ActiveOperationKind,
InstallOperation,
NoopStreamInstallProvider,
PeerCommand,
PeerEvent,
PeerGameDB,
@@ -24,6 +25,7 @@ use lanspread_peer::{
PeerRuntimeHandle,
PeerSnapshot,
PeerStartOptions,
StreamInstallProvider,
migrate_legacy_state,
start_peer_with_options,
};
@@ -31,6 +33,7 @@ use lanspread_peer_cli::{
CliCommand,
CommandEnvelope,
DEFAULT_FIXTURE_VERSION,
ExternalUnrarStreamProvider,
ExternalUnrarUnpacker,
FixtureSeed,
FixtureUnpacker,
@@ -134,10 +137,15 @@ async fn main() -> eyre::Result<()> {
let (tx_events, rx_events) = mpsc::unbounded_channel();
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let catalog = Arc::new(RwLock::new(catalog));
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar {
let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program);
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar.clone() {
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
None => Arc::new(FixtureUnpacker),
};
let stream_install_provider: Arc<dyn StreamInstallProvider> = match unrar_for_streaming {
Some(path) => Arc::new(ExternalUnrarStreamProvider::new(path)),
None => Arc::new(NoopStreamInstallProvider),
};
let mut handle = start_peer_with_options(
args.games_dir.clone(),
@@ -148,6 +156,7 @@ async fn main() -> eyre::Result<()> {
PeerStartOptions {
state_dir: Some(args.state_dir.clone()),
active_outbound_transfers: None,
stream_install_provider: Some(stream_install_provider),
},
)?;
let sender = handle.sender();
@@ -249,6 +258,15 @@ async fn handle_command(
})?;
Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download}))
}
CliCommand::StreamInstall { game_id } => {
ensure_catalog_game(shared, game_id).await?;
ensure_no_active_operation(shared, game_id).await?;
let _ = game_files_for_download(sender, shared, game_id).await?;
sender.send(PeerCommand::StreamInstallGame {
id: game_id.clone(),
})?;
Ok(json!({"queued": true, "game_id": game_id}))
}
CliCommand::Install { game_id } => {
ensure_catalog_game(shared, game_id).await?;
ensure_no_active_operation(shared, game_id).await?;
@@ -729,6 +747,15 @@ fn default_catalog_db() -> Option<PathBuf> {
.find(|path| path.exists())
}
fn default_unrar_program() -> Option<PathBuf> {
[
PathBuf::from("/usr/local/bin/unrar"),
PathBuf::from("/usr/bin/unrar"),
]
.into_iter()
.find(|path| path.exists())
}
fn next_string(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<String> {
args.next()
.ok_or_else(|| eyre::eyre!("{flag} requires a value"))?
+1
View File
@@ -14,6 +14,7 @@ lanspread-utils = { path = "../lanspread-utils" }
# external
bytes = { workspace = true }
crc32fast = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
gethostname = { workspace = true }
+13 -1
View File
@@ -6,7 +6,14 @@ use lanspread_db::db::{GameCatalog, GameDB};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB};
use crate::{
PeerEvent,
StreamInstallProvider,
Unpacker,
events,
library::LocalLibraryState,
peer_db::PeerGameDB,
};
/// Thread-safe map of active outbound file transfers grouped by game ID.
pub type OutboundTransfers = Arc<RwLock<HashMap<String, Vec<(u64, CancellationToken)>>>>;
@@ -38,6 +45,7 @@ pub struct Ctx {
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
pub active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
pub unpacker: Arc<dyn Unpacker>,
pub stream_install_provider: Arc<dyn StreamInstallProvider>,
pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>,
pub shutdown: CancellationToken,
@@ -57,6 +65,7 @@ pub struct PeerCtx {
pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>,
pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender<PeerEvent>,
pub stream_install_provider: Arc<dyn StreamInstallProvider>,
pub shutdown: CancellationToken,
pub task_tracker: TaskTracker,
pub active_outbound_transfers: OutboundTransfers,
@@ -86,6 +95,7 @@ impl Ctx {
task_tracker: TaskTracker,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: OutboundTransfers,
stream_install_provider: Arc<dyn StreamInstallProvider>,
) -> Self {
Self {
game_dir: Arc::new(RwLock::new(game_dir)),
@@ -97,6 +107,7 @@ impl Ctx {
active_operations: Arc::new(RwLock::new(HashMap::new())),
active_downloads: Arc::new(RwLock::new(HashMap::new())),
unpacker,
stream_install_provider,
catalog,
peer_id: Arc::new(peer_id),
shutdown,
@@ -120,6 +131,7 @@ impl Ctx {
catalog: self.catalog.clone(),
peer_id: self.peer_id.clone(),
tx_notify_ui,
stream_install_provider: self.stream_install_provider.clone(),
shutdown: self.shutdown.clone(),
task_tracker: self.task_tracker.clone(),
active_outbound_transfers: self.active_outbound_transfers.clone(),
+233
View File
@@ -11,6 +11,7 @@ use std::{
use lanspread_db::db::{GameDB, GameFileDescription};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::sync::CancellationToken;
use crate::{
InstallOperation,
@@ -33,6 +34,7 @@ use crate::{
peer_db::PeerGameDB,
remote_peer::ensure_peer_id_for_addr,
services::{HandshakeCtx, perform_handshake_with_peer},
stream_install::receive_streamed_install,
};
// =============================================================================
@@ -450,6 +452,91 @@ pub async fn handle_install_game_command(
spawn_install_operation(ctx, tx_notify_ui, id);
}
pub async fn handle_stream_install_game_command(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: String,
) {
if !catalog_contains(ctx, &id).await {
log::warn!("Ignoring streamed install command for non-catalog game {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
let games_folder = { ctx.game_dir.read().await.clone() };
let game_root = games_folder.join(&id);
if local_dir_is_directory(&game_root).await {
log::warn!("Ignoring streamed install command for already-installed game {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
let expected_version = catalog_expected_version(ctx, &id).await;
let mut peers = {
match ctx
.peer_game_db
.read()
.await
.validate_file_sizes_majority(&id, expected_version.as_deref())
{
Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => {
peer_whitelist
}
Ok(_) => {
log::error!("No trusted peers available for streamed install of {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
Err(err) => {
log::error!(
"File size majority validation failed for streamed install {id}: {err}"
);
send_download_failed(tx_notify_ui, &id);
return;
}
}
};
peers.sort();
let Some(peer_addr) = peers.into_iter().next() else {
log::error!("No peer selected for streamed install of {id}");
send_download_failed(tx_notify_ui, &id);
return;
};
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring streamed install request");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before streamed install of {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
}
let cancel_token = ctx.shutdown.child_token();
ctx.active_downloads
.write()
.await
.insert(id.clone(), cancel_token.clone());
let ctx_clone = ctx.clone();
let tx_notify_ui = tx_notify_ui.clone();
ctx.task_tracker.spawn(async move {
run_stream_install_operation(
ctx_clone,
tx_notify_ui,
id,
game_root,
peer_addr,
cancel_token,
)
.await;
});
}
/// Handles the `UninstallGame` command.
pub async fn handle_uninstall_game_command(
ctx: &Ctx,
@@ -490,6 +577,151 @@ pub async fn handle_cancel_download_command(
cancel_token.cancel();
}
async fn run_stream_install_operation(
ctx: Ctx,
tx_notify_ui: UnboundedSender<PeerEvent>,
id: String,
game_root: PathBuf,
peer_addr: SocketAddr,
cancel_token: CancellationToken,
) {
let download_guard = OperationGuard::download(
id.clone(),
ctx.active_operations.clone(),
ctx.active_downloads.clone(),
tx_notify_ui.clone(),
);
events::send(
&tx_notify_ui,
PeerEvent::DownloadGameFilesBegin { id: id.clone() },
);
let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id)
.await
{
Ok(transaction) => transaction,
Err(err) => {
log::error!("Failed to prepare streamed install for {id}: {err}");
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await;
return;
}
};
let receive_result = receive_streamed_install(
peer_addr,
&id,
transaction.staging_dir(),
tx_notify_ui.clone(),
cancel_token.clone(),
)
.await;
match receive_result {
Ok(()) => {
if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing)
.await
{
clear_active_download(&ctx, &id).await;
send_download_finished(&tx_notify_ui, &id);
download_guard.disarm();
commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await;
} else {
if let Err(err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {err}");
}
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
.await;
}
}
Err(err) => {
if let Err(rollback_err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {rollback_err}");
}
let download_was_cancelled = cancel_token.is_cancelled();
if download_was_cancelled {
log::info!("Streamed install download cancelled for {id}: {err}");
} else {
log::error!("Streamed install download failed for {id}: {err}");
}
finish_failed_stream_download(
&ctx,
&tx_notify_ui,
&id,
download_guard,
download_was_cancelled,
)
.await;
}
}
}
async fn finish_failed_stream_download(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
guard: OperationGuard,
cancelled: bool,
) {
if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, id).await {
log::error!("Failed to refresh local library after streamed install failure: {err}");
}
end_download_operation(ctx, tx_notify_ui, id).await;
guard.disarm();
send_download_failed_unless_cancelled(tx_notify_ui, id, cancelled);
}
async fn commit_streamed_install(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: String,
transaction: install::StreamedInstallTransaction,
) {
let operation_guard = OperationGuard::new(
id.clone(),
ctx.active_operations.clone(),
tx_notify_ui.clone(),
);
events::send(
tx_notify_ui,
PeerEvent::InstallGameBegin {
id: id.clone(),
operation: InstallOperation::Installing,
},
);
match transaction.commit().await {
Ok(()) => {
if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await
{
log::error!("Failed to refresh local library after streamed install: {err}");
}
end_operation(ctx, tx_notify_ui, &id).await;
operation_guard.disarm();
events::send(
tx_notify_ui,
PeerEvent::InstallGameFinished { id: id.clone() },
);
}
Err(err) => {
log::error!("Streamed install commit failed for {id}: {err}");
if let Err(refresh_err) =
refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await
{
log::error!(
"Failed to refresh local library after streamed install commit failure: {refresh_err}"
);
}
end_operation(ctx, tx_notify_ui, &id).await;
operation_guard.disarm();
events::send(
tx_notify_ui,
PeerEvent::InstallGameFailed { id: id.clone() },
);
}
}
}
fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: String) {
let ctx = ctx.clone();
let tx_notify_ui = tx_notify_ui.clone();
@@ -1264,6 +1496,7 @@ mod tests {
TaskTracker::new(),
Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))),
Arc::new(RwLock::new(HashMap::new())),
Arc::new(crate::NoopStreamInstallProvider),
)
}
+9 -1
View File
@@ -4,5 +4,13 @@ mod transaction;
pub mod unpack;
pub use remove::remove_downloaded;
pub use transaction::{install, recover_on_startup, uninstall, update};
pub(crate) use transaction::root_eti_archives;
pub use transaction::{
StreamedInstallTransaction,
begin_streamed_install,
install,
recover_on_startup,
uninstall,
update,
};
pub use unpack::{UnpackFuture, Unpacker};
@@ -33,6 +33,103 @@ struct InstallFsState {
backup: FsEntryState,
}
pub struct StreamedInstallTransaction {
game_root: PathBuf,
state_dir: PathBuf,
id: String,
staging: PathBuf,
eti_version: Option<String>,
}
impl StreamedInstallTransaction {
#[must_use]
pub fn staging_dir(&self) -> &Path {
&self.staging
}
pub async fn commit(self) -> eyre::Result<()> {
let local = local_dir(&self.game_root);
let result = async {
tokio::fs::rename(&self.staging, &local)
.await
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?;
reset_launch_settings_marker(&self.state_dir, &self.id).await?;
write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await
}
.await;
if result.is_err() {
if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await {
log::warn!(
"Failed to clean streamed install staging {}: {cleanup_err}",
self.staging.display()
);
}
let _ = write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await;
}
result
}
pub async fn rollback(self) -> eyre::Result<()> {
let staging_result = remove_dir_all_if_exists(&self.staging).await;
let intent_result = write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await;
staging_result?;
intent_result
}
}
pub async fn begin_streamed_install(
game_root: &Path,
state_dir: &Path,
id: &str,
) -> eyre::Result<StreamedInstallTransaction> {
if path_is_dir(&local_dir(game_root)).await {
eyre::bail!("game {id} is already installed");
}
tokio::fs::create_dir_all(game_root).await?;
let eti_version = read_downloaded_version(game_root).await;
write_intent(
state_dir,
id,
&InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()),
)
.await?;
let staging = installing_dir(game_root);
if let Err(err) = prepare_owned_empty_dir(&staging).await {
let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await;
return Err(err);
}
let staging = tokio::fs::canonicalize(&staging).await.unwrap_or(staging);
Ok(StreamedInstallTransaction {
game_root: game_root.to_path_buf(),
state_dir: state_dir.to_path_buf(),
id: id.to_string(),
staging,
eti_version,
})
}
pub async fn install(
game_root: &Path,
state_dir: &Path,
@@ -258,7 +355,7 @@ async fn unpack_archives(
Ok(())
}
async fn root_eti_archives(game_root: &Path) -> eyre::Result<Vec<PathBuf>> {
pub(crate) async fn root_eti_archives(game_root: &Path) -> eyre::Result<Vec<PathBuf>> {
let mut entries = tokio::fs::read_dir(game_root).await?;
let mut archives = Vec::new();
while let Some(entry) = entries.next_entry().await? {
+32 -1
View File
@@ -32,6 +32,7 @@ mod remote_peer;
mod services;
mod startup;
mod state_paths;
mod stream_install;
#[cfg(test)]
mod test_support;
@@ -82,6 +83,7 @@ pub use crate::{
launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once},
startup::PeerRuntimeHandle,
state_paths::{launch_settings_applied_path, setup_done_path},
stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider},
};
// =============================================================================
@@ -243,6 +245,8 @@ pub enum PeerCommand {
file_descriptions: Vec<GameFileDescription>,
install_after_download: bool,
},
/// Stream archive-expanded bytes directly into `local/` without keeping root archives.
StreamInstallGame { id: String },
/// Install already-downloaded archives into `local/`.
InstallGame { id: String },
/// Remove only the `local/` install for a game.
@@ -260,11 +264,29 @@ pub enum PeerCommand {
}
/// Optional startup settings for non-GUI callers and tests.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct PeerStartOptions {
/// Directory used for peer identity and other state.
pub state_dir: Option<PathBuf>,
pub active_outbound_transfers: Option<crate::context::OutboundTransfers>,
/// Provider used to stream archive entries for low-disk streamed installs.
pub stream_install_provider: Option<Arc<dyn StreamInstallProvider>>,
}
impl std::fmt::Debug for PeerStartOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PeerStartOptions")
.field("state_dir", &self.state_dir)
.field(
"active_outbound_transfers",
&self.active_outbound_transfers.as_ref().map(|_| "..."),
)
.field(
"stream_install_provider",
&self.stream_install_provider.as_ref().map(|_| "..."),
)
.finish()
}
}
// =============================================================================
@@ -314,11 +336,14 @@ pub fn start_peer_with_options(
let PeerStartOptions {
state_dir,
active_outbound_transfers,
stream_install_provider,
} = options;
let state_dir = resolve_state_dir(state_dir.as_deref());
let game_dir = game_dir.into();
let active_outbound_transfers = active_outbound_transfers
.unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new())));
let stream_install_provider =
stream_install_provider.unwrap_or_else(|| Arc::new(NoopStreamInstallProvider));
log::info!(
"Starting peer system with game directory: {}",
game_dir.display()
@@ -338,6 +363,7 @@ pub fn start_peer_with_options(
unpacker,
catalog,
active_outbound_transfers,
stream_install_provider,
))
}
@@ -355,6 +381,7 @@ async fn run_peer(
task_tracker: TaskTracker,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
stream_install_provider: Arc<dyn StreamInstallProvider>,
) -> eyre::Result<()> {
let ctx = Ctx::new(
peer_game_db,
@@ -366,6 +393,7 @@ async fn run_peer(
task_tracker,
catalog,
active_outbound_transfers,
stream_install_provider,
);
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
log::error!("Failed to load initial local game database: {err}");
@@ -439,6 +467,9 @@ async fn handle_peer_commands(
)
.await;
}
PeerCommand::StreamInstallGame { id } => {
handlers::handle_stream_install_game_command(ctx, tx_notify_ui, id).await;
}
PeerCommand::InstallGame { id } => {
handle_install_game_command(ctx, tx_notify_ui, id).await;
}
@@ -319,6 +319,7 @@ mod tests {
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(HashMap::new())),
Arc::new(crate::NoopStreamInstallProvider),
);
*ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000));
@@ -384,6 +384,7 @@ mod tests {
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
Arc::new(crate::NoopStreamInstallProvider),
)
}
@@ -15,6 +15,7 @@ use crate::{
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog},
peer::{send_game_file_chunk, send_game_file_data},
services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync},
stream_install::{send_game_install_stream, send_stream_install_error},
};
type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;
@@ -99,6 +100,9 @@ async fn dispatch_request(
} => {
handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await
}
Request::StreamInstall { game_id } => {
handle_stream_install_request(ctx, game_id, framed_tx).await
}
Request::Goodbye { peer_id } => {
handle_goodbye(ctx, remote_addr, peer_id).await;
framed_tx
@@ -386,6 +390,49 @@ async fn handle_file_chunk_request(
FramedWrite::new(tx, LengthDelimitedCodec::new())
}
async fn handle_stream_install_request(
ctx: &PeerCtx,
game_id: String,
framed_tx: ResponseWriter,
) -> ResponseWriter {
log::info!("Received StreamInstall request for {game_id} from peer");
let (guard, cancel_token) = TransferGuard::new(
game_id.clone(),
ctx.active_outbound_transfers.clone(),
ctx.tx_notify_ui.clone(),
&ctx.shutdown,
)
.await;
let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone();
if !can_serve_game(ctx, &game_dir, &game_id).await {
log::info!(
"Declining StreamInstall for {game_id} because the game is not currently transferable"
);
tx = send_stream_install_error(tx, format!("game {game_id} is not transferable")).await;
drop(guard);
return FramedWrite::new(tx, LengthDelimitedCodec::new());
}
let game_root = game_dir.join(&game_id);
let (returned_tx, result) = send_game_install_stream(
ctx.stream_install_provider.clone(),
tx,
&game_root,
&game_id,
cancel_token,
)
.await;
if let Err(err) = result {
log::warn!("StreamInstall for {game_id} ended with error: {err}");
}
drop(guard);
FramedWrite::new(returned_tx, LengthDelimitedCodec::new())
}
async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option<SocketAddr>, peer_id: String) {
log::info!("Received Goodbye from peer {peer_id}");
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
@@ -442,6 +489,7 @@ mod tests {
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
Arc::new(crate::NoopStreamInstallProvider),
)
.to_peer_ctx(tx_notify_ui)
}
+3
View File
@@ -23,6 +23,7 @@ use crate::{
PeerCommand,
PeerEvent,
PeerRuntimeComponent,
StreamInstallProvider,
Unpacker,
context::Ctx,
events,
@@ -87,6 +88,7 @@ pub(crate) fn spawn_peer_runtime(
unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
stream_install_provider: Arc<dyn StreamInstallProvider>,
) -> PeerRuntimeHandle {
let shutdown = CancellationToken::new();
let task_tracker = TaskTracker::new();
@@ -107,6 +109,7 @@ pub(crate) fn spawn_peer_runtime(
runtime_tracker.clone(),
catalog,
active_outbound_transfers,
stream_install_provider,
)
.await
{
+372
View File
@@ -0,0 +1,372 @@
use std::{
future::Future,
net::SocketAddr,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use bytes::Bytes;
use crc32fast::Hasher;
use futures::{SinkExt, StreamExt};
use lanspread_proto::{Message, Request, StreamInstallFrame};
use s2n_quic::stream::SendStream;
use tokio::{
fs::File,
io::AsyncWriteExt,
sync::{mpsc, mpsc::UnboundedSender},
};
use tokio_util::{
codec::{FramedRead, FramedWrite, LengthDelimitedCodec},
sync::CancellationToken,
};
use crate::{
PeerEvent,
install::root_eti_archives,
network::connect_to_peer,
path_validation::validate_game_file_path,
};
const FRAME_CHANNEL_DEPTH: usize = 16;
pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>;
pub trait StreamInstallProvider: Send + Sync {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a>;
}
#[derive(Debug, Default)]
pub struct NoopStreamInstallProvider;
impl StreamInstallProvider for NoopStreamInstallProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
_frames: mpsc::Sender<StreamInstallFrame>,
_cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
eyre::bail!(
"streamed install provider is not configured for {}",
archive.display()
)
})
}
}
pub(crate) async fn send_stream_install_error(
tx: SendStream,
message: impl Into<String>,
) -> SendStream {
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
if let Err(err) = framed_tx
.send(
StreamInstallFrame::Error {
message: message.into(),
}
.encode(),
)
.await
{
log::warn!("Failed to send streamed install error frame: {err}");
}
if let Err(err) = framed_tx.close().await {
log::debug!("Failed to close streamed install error response: {err}");
}
framed_tx.into_inner()
}
pub(crate) async fn send_game_install_stream(
provider: Arc<dyn StreamInstallProvider>,
tx: SendStream,
game_root: &Path,
game_id: &str,
cancel_token: CancellationToken,
) -> (SendStream, eyre::Result<()>) {
let archives = match root_eti_archives(game_root).await {
Ok(archives) => archives,
Err(err) => {
let message = err.to_string();
let tx = send_stream_install_error(tx, message.clone()).await;
return (tx, Err(eyre::eyre!(message)));
}
};
if archives.is_empty() {
let message = format!("no .eti archives found for {game_id}");
let tx = send_stream_install_error(tx, message.clone()).await;
return (tx, Err(eyre::eyre!(message)));
}
let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH);
let producer_cancel = cancel_token.child_token();
let game_id_for_producer = game_id.to_string();
let producer = tokio::spawn({
let provider = provider.clone();
let producer_cancel = producer_cancel.clone();
async move {
for archive in archives {
if producer_cancel.is_cancelled() {
eyre::bail!("streamed install for {game_id_for_producer} was cancelled");
}
if let Err(err) = provider
.stream_archive(&archive, frame_tx.clone(), producer_cancel.clone())
.await
{
let message = err.to_string();
let _ = frame_tx.send(StreamInstallFrame::Error { message }).await;
return Err(err);
}
}
let _ = frame_tx.send(StreamInstallFrame::Complete).await;
Ok(())
}
});
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let mut send_result = Ok(());
while let Some(frame) = frame_rx.recv().await {
if let Err(err) = framed_tx.send(frame.encode()).await {
producer_cancel.cancel();
send_result = Err(eyre::eyre!("failed to send streamed install frame: {err}"));
break;
}
}
let close_result = framed_tx
.close()
.await
.map_err(|err| eyre::eyre!("failed to close streamed install stream: {err}"));
let tx = framed_tx.into_inner();
let producer_result = match producer.await {
Ok(result) => result,
Err(err) => Err(eyre::eyre!("streamed install producer task failed: {err}")),
};
let result = send_result.and(producer_result).and(close_result);
(tx, result)
}
pub(crate) async fn receive_streamed_install(
peer_addr: SocketAddr,
game_id: &str,
staging_dir: &Path,
tx_notify_ui: UnboundedSender<PeerEvent>,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
let staging_dir = tokio::fs::canonicalize(staging_dir)
.await
.unwrap_or_else(|_| staging_dir.to_path_buf());
let mut conn = connect_to_peer(peer_addr).await?;
let stream = conn.open_bidirectional_stream().await?;
let (rx, tx) = stream.split();
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
framed_tx
.send(
Request::StreamInstall {
game_id: game_id.to_string(),
}
.encode(),
)
.await?;
framed_tx.close().await?;
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut current_file: Option<IncomingFile> = None;
loop {
let next = tokio::select! {
() = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"),
next = framed_rx.next() => next,
};
let Some(frame) = next else {
eyre::bail!("streamed install ended before Complete");
};
let frame = frame?.freeze();
let frame = StreamInstallFrame::decode(frame);
match frame {
StreamInstallFrame::ArchiveBegin {
archive_name,
solid,
} => {
log::info!(
"Receiving streamed install archive {archive_name} for {game_id} (solid={solid})"
);
}
StreamInstallFrame::Directory { relative_path } => {
let path = resolve_stream_path(&staging_dir, &relative_path)?;
tokio::fs::create_dir_all(path).await?;
}
StreamInstallFrame::FileBegin {
relative_path,
size,
crc32,
} => {
if current_file.is_some() {
eyre::bail!("received FileBegin for {relative_path} before previous FileEnd");
}
let path = resolve_stream_path(&staging_dir, &relative_path)?;
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = File::create(&path).await?;
current_file = Some(IncomingFile::new(relative_path, path, size, crc32, file));
}
StreamInstallFrame::FileChunk { bytes } => {
let Some(file) = current_file.as_mut() else {
eyre::bail!("received FileChunk without FileBegin");
};
file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes)
.await?;
}
StreamInstallFrame::FileEnd { relative_path } => {
let Some(file) = current_file.take() else {
eyre::bail!("received FileEnd for {relative_path} without FileBegin");
};
file.finish(&relative_path).await?;
}
StreamInstallFrame::ArchiveEnd { archive_name } => {
log::info!("Finished streamed install archive {archive_name} for {game_id}");
}
StreamInstallFrame::Complete => {
if current_file.is_some() {
eyre::bail!("streamed install completed with an open file");
}
return Ok(());
}
StreamInstallFrame::Error { message } => {
eyre::bail!("streamed install sender failed: {message}");
}
}
}
}
struct IncomingFile {
relative_path: String,
path: PathBuf,
expected_size: u64,
expected_crc32: Option<u32>,
received: u64,
hasher: Hasher,
file: File,
}
impl IncomingFile {
fn new(
relative_path: String,
path: PathBuf,
expected_size: u64,
expected_crc32: Option<u32>,
file: File,
) -> Self {
Self {
relative_path,
path,
expected_size,
expected_crc32,
received: 0,
hasher: Hasher::new(),
file,
}
}
async fn write_chunk(
&mut self,
game_id: &str,
peer_addr: SocketAddr,
tx_notify_ui: &UnboundedSender<PeerEvent>,
bytes: Bytes,
) -> eyre::Result<()> {
let offset = self.received;
let length = u64::try_from(bytes.len())?;
if offset.saturating_add(length) > self.expected_size {
eyre::bail!(
"streamed file {} exceeded expected size {}",
self.relative_path,
self.expected_size
);
}
self.file.write_all(&bytes).await?;
self.hasher.update(&bytes);
self.received = self.received.saturating_add(length);
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
id: game_id.to_string(),
peer_addr,
relative_path: format!("{game_id}/local/{}", self.relative_path),
offset,
length,
});
Ok(())
}
async fn finish(mut self, relative_path: &str) -> eyre::Result<()> {
if self.relative_path != relative_path {
eyre::bail!(
"streamed file end mismatch: began {}, ended {relative_path}",
self.relative_path
);
}
self.file.flush().await?;
if self.received != self.expected_size {
eyre::bail!(
"streamed file {} size mismatch: got {}, expected {}",
self.relative_path,
self.received,
self.expected_size
);
}
if let Some(expected) = self.expected_crc32 {
let actual = self.hasher.finalize();
if actual != expected {
eyre::bail!(
"streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}",
self.relative_path
);
}
}
log::debug!(
"Received streamed file {} -> {}",
self.relative_path,
self.path.display()
);
Ok(())
}
}
fn resolve_stream_path(staging_dir: &Path, relative_path: &str) -> eyre::Result<PathBuf> {
validate_game_file_path(staging_dir, relative_path)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_support::TempDir;
#[test]
fn stream_paths_stay_inside_staging_dir() {
let temp = TempDir::new("lanspread-stream-install-path");
let staging = temp.path().join("staging");
std::fs::create_dir_all(&staging).expect("staging should be created");
let staging = std::fs::canonicalize(staging).expect("staging should canonicalize");
assert!(resolve_stream_path(&staging, "bin/game.exe").is_ok());
assert!(resolve_stream_path(&staging, "../outside").is_err());
assert!(resolve_stream_path(&staging, "/absolute").is_err());
assert!(resolve_stream_path(&staging, "C:/windows").is_err());
}
}
+59 -1
View File
@@ -4,7 +4,7 @@ use bytes::Bytes;
use lanspread_db::db::{Game, GameFileDescription};
use serde::{Deserialize, Serialize};
pub const PROTOCOL_VERSION: u32 = 4;
pub const PROTOCOL_VERSION: u32 = 5;
pub use lanspread_db::db::Availability;
@@ -67,6 +67,9 @@ pub enum Request {
offset: u64,
length: u64,
},
StreamInstall {
game_id: String,
},
Hello(Hello),
LibraryDelta {
peer_id: String,
@@ -94,6 +97,35 @@ pub enum Response {
InternalPeerError(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StreamInstallFrame {
ArchiveBegin {
archive_name: String,
solid: bool,
},
Directory {
relative_path: String,
},
FileBegin {
relative_path: String,
size: u64,
crc32: Option<u32>,
},
FileChunk {
bytes: Bytes,
},
FileEnd {
relative_path: String,
},
ArchiveEnd {
archive_name: String,
},
Complete,
Error {
message: String,
},
}
// Add Message trait
pub trait Message {
fn decode(bytes: Bytes) -> Self;
@@ -145,3 +177,29 @@ impl Message for Response {
}
}
}
impl Message for StreamInstallFrame {
fn decode(bytes: Bytes) -> Self {
match serde_json::from_slice(&bytes) {
Ok(t) => t,
Err(e) => {
tracing::error!(?e, "StreamInstallFrame decoding error");
StreamInstallFrame::Error {
message: format!("stream install frame decoding error: {e}"),
}
}
}
}
fn encode(&self) -> Bytes {
match serde_json::to_vec(self) {
Ok(s) => Bytes::from(s),
Err(e) => {
tracing::error!(?e, "StreamInstallFrame encoding error");
Bytes::from(format!(
r#"{{"Error": {{"message": "encoding error: {e}"}}}}"#
))
}
}
}
}
@@ -1876,6 +1876,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
PeerStartOptions {
state_dir: Some(state_dir),
active_outbound_transfers: Some(state.active_outbound_transfers.clone()),
stream_install_provider: None,
},
) {
Ok(handle) => {