Compare commits
2 Commits
8a8437036d
...
cc147def73
| Author | SHA1 | Date | |
|---|---|---|---|
|
cc147def73
|
|||
|
373def6d44
|
Generated
+4
@@ -2035,6 +2035,7 @@ name = "lanspread-peer"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"crc32fast",
|
||||
"eyre",
|
||||
"futures",
|
||||
"gethostname",
|
||||
@@ -2059,13 +2060,16 @@ dependencies = [
|
||||
name = "lanspread-peer-cli"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"eyre",
|
||||
"lanspread-compat",
|
||||
"lanspread-db",
|
||||
"lanspread-peer",
|
||||
"lanspread-proto",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -14,6 +14,7 @@ members = [
|
||||
[workspace.dependencies]
|
||||
base64 = "0.22"
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
crc32fast = "1"
|
||||
eyre = "0.6"
|
||||
futures = "0.3"
|
||||
gethostname = "1"
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
# Streamed Install Next Steps
|
||||
|
||||
I’d treat the prototype as proof of the hard part: “can we stream
|
||||
archive-derived install bytes into `local/` without making the receiver a
|
||||
source?” Yes. Next I’d harden the pieces that decide whether this is
|
||||
product-ready.
|
||||
|
||||
1. **Move from CLI-only to real app integration**
|
||||
|
||||
Add a GUI command/control path for “stream install / low disk mode”,
|
||||
probably behind an explicit option. The Tauri crate currently opts out with
|
||||
`stream_install_provider: None`, so the GUI cannot use it yet.
|
||||
|
||||
2. **Replace per-file `unrar p` with a final archive provider**
|
||||
|
||||
The prototype provider is intentionally simple: `unrar lt`, then `unrar p`
|
||||
per file. Good for non-solid archives, bad for solid archives. Final shape
|
||||
should be a one-pass provider with real entry boundaries, likely via libunrar
|
||||
or a purpose-built wrapper.
|
||||
|
||||
3. **Handle solid archives deliberately**
|
||||
|
||||
Add archive inspection that decides:
|
||||
|
||||
- non-solid: per-file streaming is fine
|
||||
- solid: one sequential archive pass only
|
||||
|
||||
This is the big architectural fork we discussed, and the prototype’s
|
||||
provider is the thing to swap.
|
||||
|
||||
4. **Decide the integrity model**
|
||||
|
||||
Current prototype verifies streamed bytes against RAR CRC32 from the
|
||||
sender’s archive headers. That catches corruption and provider bugs. It does
|
||||
not protect against a malicious peer lying. If you care about that, the next
|
||||
step is catalog-side trusted hashes for archive or extracted files.
|
||||
|
||||
5. **Upgrade retry/resume semantics**
|
||||
|
||||
Right now, failed stream means failed operation and rollback. Next useful
|
||||
step:
|
||||
|
||||
- retry whole stream from another trusted peer
|
||||
- later, maybe keep completed files and restart only the interrupted file
|
||||
- avoid byte-offset resume until there’s a strong reason
|
||||
|
||||
6. **Expand scenario coverage**
|
||||
|
||||
I’d add cases for:
|
||||
|
||||
- sender disconnect mid-stream
|
||||
- receiver cancel mid-stream
|
||||
- corrupted/truncated stream fails and leaves no `local/`
|
||||
- already-installed game rejects streamed install
|
||||
- multi-archive `.eti` roots stream in sorted order
|
||||
|
||||
7. **Clean product semantics**
|
||||
|
||||
Decide how the UI labels this state. It is installed but not downloaded, so
|
||||
“Local only” is technically correct, but users may need a clear affordance
|
||||
like “Installed, not shareable”.
|
||||
|
||||
My recommended next slice: make the provider abstraction final-ish, then
|
||||
implement a real one-pass provider. Everything else builds cleanly on that.
|
||||
@@ -0,0 +1,20 @@
|
||||
# Issues
|
||||
|
||||
1. (Major) File bytes are JSON-encoded — ~3–4× wire bloat + heavy CPU. StreamInstallFrame::FileChunk { bytes: Bytes } goes over the wire via serde_json (the Message impl). serde_json has no native byte type, so Bytes serializes as a JSON array of integers ([12,255,7,…]) — roughly 3–4 ASCII chars per payload byte, plus serialize/parse cost on both ends. The existing transfer path (peer.rs::stream_file_bytes) sends raw bytes straight on the QUIC stream for exactly this reason. For a feature whose entire purpose is moving file bytes efficiently on constrained peers, routing the payload through JSON undermines the goal. The 3 MiB S39 fixture hides it; a multi-GB game would not. Consider a binary codec (postcard/bincode) for the frame stream, or keep the control frames as-is but stream raw bytes for file content.
|
||||
|
||||
2. (Design) Solid archives → O(n²) extraction. Each file is pulled with a separate unrar p archive <file> (stream_unrar_file). For a solid .eti, unrar must re-decompress every preceding file on each invocation. solid is detected and sent in ArchiveBegin but never acted on. A solid multi-file archive could make streamed install pathologically slow. At minimum document the limitation; ideally a single-pass extraction for the solid case.
|
||||
|
||||
3. (Robustness) Single peer, no fallback. handle_stream_install_game_command picks one peer (peers.sort(); peers.into_iter().next()). If that peer declines (can_serve_game false) or the connection drops, the whole install fails — no retry against the other validated peers, unlike the normal multi-peer download. Fine for a prototype, worth a TODO.
|
||||
|
||||
4. (Minor) unrar p <name> is a wildcard mask. unrar treats the file argument as a pattern; a name containing */?/[ could match the wrong file or multiple files (multiple matches concatenate into stdout → corrupt stream). CRC32 would catch it as a failure rather than silent corruption, but it's a sharp edge. Worth noting / pinning exact-match if the unrar build supports it.
|
||||
|
||||
5. (Minor) CRC32 is optional. If unrar omits CRC32: (older builds, header-encrypted archives), crc32 is None and the receiver verifies size only.
|
||||
|
||||
6. (Minor / cosmetic)
|
||||
- DownloadGameFileChunkFinished reports relative_path as "{game_id}/local/…" while bytes actually land in staging (.local.installing); and there's no total-size up front, so the UI can't show a percentage.
|
||||
- parse_unrar_listing depends on unrar's human-readable lt output (Details:/Name:/Type:/Size:/CRC32:). -cfg- guards against user config, but it's still brittle across unrar versions/locales.
|
||||
|
||||
## Nits
|
||||
|
||||
- PeerStartOptions dropped #[derive(Debug)] for a hand-written Debug (Arc isn't Debug) — correct, and Clone/Default are retained. Good.
|
||||
- commit() renames staging into local/, so the .lanspread_owned marker lands inside local/ — but that's identical to the existing install_inner behavior, so no regression.
|
||||
+30
-4
@@ -46,6 +46,8 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
|
||||
| S36 | Latest singleton beats stale majority | Five peers advertise one game; one peer has `20260501`, four peers have `20250101`. | `list-games` reports `eti_game_version=20260501`; all descriptors and chunks come from the singleton latest peer; stale peers contribute zero bytes. |
|
||||
| S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. |
|
||||
| S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. |
|
||||
| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. |
|
||||
| S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. |
|
||||
|
||||
## Version-Skew Contract
|
||||
|
||||
@@ -105,13 +107,37 @@ Use S38 to pin down how launcher settings are stamped into an installed game:
|
||||
line keeps its existing line ending (`\n` or `\r\n`).
|
||||
- The marker records only that we *tried*: it is written unconditionally after
|
||||
the first play, so a game with none of these files is still marked done.
|
||||
- S38 needs a real archive expanded with `--unrar`, so it runs against the host
|
||||
`lanspread-peer-cli` binary rather than the Docker matrix image (which omits
|
||||
`unrar`). The peer crate's `launch_settings` unit tests cover the rewrite,
|
||||
line-ending, and marker logic deterministically.
|
||||
- S38 needs a real archive expanded with `--unrar`; the Docker matrix image now
|
||||
carries the Linux sidecar for streamed-install coverage, while the peer
|
||||
crate's `launch_settings` unit tests cover the rewrite, line-ending, and
|
||||
marker logic deterministically.
|
||||
|
||||
## Run Log
|
||||
|
||||
### 2026-06-07 - Streamed Install Prototype (S39-S40)
|
||||
|
||||
- Code under test added `stream-install` to `lanspread-peer-cli`, a peer
|
||||
`StreamInstallGame` command, streamed install frames over QUIC, and an
|
||||
injected `unrar lt`/`unrar p` provider for archive-derived bytes.
|
||||
- Gates before Docker: `just fmt` and
|
||||
`RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test` passed for the
|
||||
workspace.
|
||||
- Runner:
|
||||
`python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S39 S40 --build-image`
|
||||
passed against the rebuilt `lanspread-peer-cli:dev` image.
|
||||
- S39 streamed a catalog-version-adjusted `cnctw` fixture from a real RAR
|
||||
`.eti` into the receiver's `local/` only. The receiver had
|
||||
`downloaded=false`, `installed=true`, `availability=LocalOnly`, no root
|
||||
`version.ini`, no root `.eti`, and payload SHA-256 hashes
|
||||
`82f4da22dc042166def2a5ee2eca19fc9e52785f99838e86c32167cb342e2588`
|
||||
(`bin/cnctw-payload.bin`) and
|
||||
`abf833a06c74ea9f17d505c2684186491898ce906405e0f098f0deac19476b06`
|
||||
(`data/cnctw-assets.dat`) matching `unrar p`.
|
||||
- S40 connected an observer only to that streamed-install receiver. The
|
||||
observer saw the receiver's `cnctw` summary as local-only, remote aggregation
|
||||
hid it as a downloadable source, and `download cnctw` failed with
|
||||
`no peers have game cnctw`.
|
||||
|
||||
### 2026-05-28 - First-Play Launch-Setting Stamping (S38)
|
||||
|
||||
- Code under test moved the `account_name.txt`/`language.txt` overwrite out of
|
||||
|
||||
@@ -14,11 +14,14 @@ path = "src/main.rs"
|
||||
lanspread-compat = { path = "../lanspread-compat" }
|
||||
lanspread-db = { path = "../lanspread-db" }
|
||||
lanspread-peer = { path = "../lanspread-peer" }
|
||||
lanspread-proto = { path = "../lanspread-proto" }
|
||||
|
||||
bytes = { workspace = true }
|
||||
eyre = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
|
||||
[lints.clippy]
|
||||
needless_pass_by_value = "allow"
|
||||
|
||||
@@ -4,14 +4,16 @@ WORKDIR /work
|
||||
COPY . .
|
||||
RUN cargo build --release -p lanspread-peer-cli
|
||||
|
||||
FROM debian:bookworm-slim
|
||||
FROM debian:trixie-slim
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends ca-certificates \
|
||||
&& apt-get install -y --no-install-recommends ca-certificates libstdc++6 \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=build /work/target/release/lanspread-peer-cli /usr/local/bin/lanspread-peer-cli
|
||||
COPY crates/lanspread-tauri-deno-ts/src-tauri/game.db /app/game.db
|
||||
COPY crates/lanspread-tauri-deno-ts/src-tauri/binaries/unrar-x86_64-unknown-linux-gnu /usr/local/bin/unrar
|
||||
RUN chmod +x /usr/local/bin/unrar
|
||||
|
||||
ENTRYPOINT ["lanspread-peer-cli"]
|
||||
CMD ["--games-dir", "/games", "--state-dir", "/state", "--catalog-db", "/app/game.db"]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Run the peer-cli scenarios S1-S36 through Docker."""
|
||||
"""Run the peer-cli scenarios S1-S40 through Docker."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -8,6 +8,7 @@ import hashlib
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -325,6 +326,8 @@ class Runner:
|
||||
("S35", self.s35_unknown_game_filtered),
|
||||
("S36", self.s36_latest_singleton),
|
||||
("S37", self.s37_single_source_download_throughput),
|
||||
("S39", self.s39_streamed_install_local_only),
|
||||
("S40", self.s40_streamed_receiver_not_source),
|
||||
]
|
||||
|
||||
for scenario_id, scenario in scenarios:
|
||||
@@ -1060,6 +1063,114 @@ class Runner:
|
||||
f"{throughput['chunks']} chunks"
|
||||
)
|
||||
|
||||
def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]:
|
||||
source_dir = self.fixture_root / f"{prefix}-bravo"
|
||||
copy_game("cnctw", source_dir, version="20160128")
|
||||
source = self.peer(f"{prefix}-bravo", games_dir=source_dir)
|
||||
client = self.peer(f"{prefix}-client")
|
||||
connect_many(client, [source])
|
||||
wait_remote_game(client, "cnctw", peer_count=1)
|
||||
waiter = LineWaiter(len(client.output))
|
||||
client.send({"cmd": "stream-install", "game_id": "cnctw"})
|
||||
client.wait_for(
|
||||
event_is("got-game-files", "cnctw"),
|
||||
timeout=20,
|
||||
description="got cnctw files",
|
||||
waiter=waiter,
|
||||
)
|
||||
client.wait_for(
|
||||
event_is("download-begin", "cnctw"),
|
||||
timeout=20,
|
||||
description="stream begin cnctw",
|
||||
waiter=waiter,
|
||||
)
|
||||
client.wait_for(
|
||||
event_is("download-finished", "cnctw"),
|
||||
timeout=60,
|
||||
description="stream finish cnctw",
|
||||
waiter=waiter,
|
||||
)
|
||||
client.wait_for(
|
||||
event_is("install-finished", "cnctw"),
|
||||
timeout=30,
|
||||
description="stream install cnctw",
|
||||
waiter=waiter,
|
||||
)
|
||||
return source, client
|
||||
|
||||
def s39_streamed_install_local_only(self) -> str:
|
||||
source, client = self.stream_install_cnctw("s39")
|
||||
game = wait_local_game(client, "cnctw", downloaded=False, installed=True)
|
||||
assert_game_state(
|
||||
game,
|
||||
downloaded=False,
|
||||
installed=True,
|
||||
availability="LocalOnly",
|
||||
)
|
||||
|
||||
game_root = client.host_games_dir / "cnctw"
|
||||
assert_not_exists(game_root / "version.ini")
|
||||
assert_not_exists(game_root / "cnctw.eti")
|
||||
|
||||
expected = {
|
||||
"bin/cnctw-payload.bin": unrar_entry_sha256(
|
||||
source, "cnctw", "bin/cnctw-payload.bin"
|
||||
),
|
||||
"data/cnctw-assets.dat": unrar_entry_sha256(
|
||||
source, "cnctw", "data/cnctw-assets.dat"
|
||||
),
|
||||
}
|
||||
actual = {
|
||||
rel: sha256_file(game_root / "local" / rel)
|
||||
for rel in expected
|
||||
}
|
||||
if actual != expected:
|
||||
raise ScenarioError(f"streamed local payload hashes mismatched: {actual} != {expected}")
|
||||
|
||||
streamed_bytes = sum(
|
||||
int(item.get("data", {}).get("length", 0))
|
||||
for item in client.output
|
||||
if item.get("type") == "event"
|
||||
and item.get("event") == "download-chunk-finished"
|
||||
and item.get("data", {}).get("game_id") == "cnctw"
|
||||
)
|
||||
expected_bytes = 3 * 1024 * 1024
|
||||
if streamed_bytes != expected_bytes:
|
||||
raise ScenarioError(
|
||||
f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}"
|
||||
)
|
||||
|
||||
return (
|
||||
"cnctw streamed into local/ only; root archive and version.ini absent; "
|
||||
f"payload hashes={actual}"
|
||||
)
|
||||
|
||||
def s40_streamed_receiver_not_source(self) -> str:
|
||||
_source, receiver = self.stream_install_cnctw("s40")
|
||||
observer = self.peer("s40-observer")
|
||||
connect_many(observer, [receiver])
|
||||
receiver_snapshot = wait_peer_has_game(observer, receiver.peer_id, "cnctw")
|
||||
summary = next(
|
||||
game
|
||||
for game in receiver_snapshot.get("games", [])
|
||||
if game.get("id") == "cnctw"
|
||||
)
|
||||
if summary.get("availability") != "LocalOnly" or summary.get("downloaded"):
|
||||
raise ScenarioError(f"receiver did not advertise cnctw as local-only: {summary}")
|
||||
|
||||
wait_remote_absent(observer, "cnctw", timeout=5)
|
||||
err = observer.send(
|
||||
{"cmd": "download", "game_id": "cnctw", "install": False},
|
||||
expect_error=True,
|
||||
)
|
||||
if "no peers have game cnctw" not in err["error"]:
|
||||
raise ScenarioError(f"unexpected local-only download error: {err}")
|
||||
assert_not_exists(observer.host_games_dir / "cnctw")
|
||||
return (
|
||||
"observer saw receiver's local-only cnctw snapshot, but remote aggregation hid it "
|
||||
f"and download errored '{err['error']}'"
|
||||
)
|
||||
|
||||
|
||||
def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]:
|
||||
result = subprocess.run(
|
||||
@@ -1177,6 +1288,25 @@ def create_large_sparse_game(root: Path, *, size: int) -> None:
|
||||
handle.truncate(size)
|
||||
|
||||
|
||||
def sha256_file(path: Path) -> str:
|
||||
hasher = hashlib.sha256()
|
||||
with path.open("rb") as handle:
|
||||
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
|
||||
hasher.update(chunk)
|
||||
return hasher.hexdigest()
|
||||
|
||||
|
||||
def unrar_entry_sha256(peer: Peer, game_id: str, relative_path: str) -> str:
|
||||
command = (
|
||||
f"unrar p -inul /games/{shlex.quote(game_id)}/{shlex.quote(game_id)}.eti "
|
||||
f"{shlex.quote(relative_path)} | sha256sum"
|
||||
)
|
||||
output = peer.docker_exec("sh", "-c", command).stdout.strip()
|
||||
if not output:
|
||||
raise ScenarioError(f"empty sha256 output for {game_id}:{relative_path}")
|
||||
return output.split()[0]
|
||||
|
||||
|
||||
def format_bytes(size: int) -> str:
|
||||
return f"{size / 1024 / 1024 / 1024:.2f} GiB"
|
||||
|
||||
|
||||
@@ -5,15 +5,21 @@
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
process::Stdio,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use eyre::{Context, OptionExt};
|
||||
use lanspread_peer::{UnpackFuture, Unpacker};
|
||||
use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, Unpacker};
|
||||
use lanspread_proto::StreamInstallFrame;
|
||||
use serde::Serialize;
|
||||
use serde_json::{Value, json};
|
||||
use tokio::{io::AsyncReadExt, sync::mpsc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub const DEFAULT_FIXTURE_VERSION: &str = "20250101";
|
||||
const STREAM_CHUNK_SIZE: usize = 256 * 1024;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct CommandEnvelope {
|
||||
@@ -33,6 +39,9 @@ pub enum CliCommand {
|
||||
game_id: String,
|
||||
install_after_download: bool,
|
||||
},
|
||||
StreamInstall {
|
||||
game_id: String,
|
||||
},
|
||||
Install {
|
||||
game_id: String,
|
||||
},
|
||||
@@ -63,6 +72,7 @@ impl CliCommand {
|
||||
Self::ListGames => "list-games",
|
||||
Self::SetGameDir { .. } => "set-game-dir",
|
||||
Self::Download { .. } => "download",
|
||||
Self::StreamInstall { .. } => "stream-install",
|
||||
Self::Install { .. } => "install",
|
||||
Self::Uninstall { .. } => "uninstall",
|
||||
Self::Play { .. } => "play",
|
||||
@@ -101,6 +111,9 @@ pub fn parse_command_value(value: &Value) -> eyre::Result<CommandEnvelope> {
|
||||
game_id: game_id(object)?,
|
||||
install_after_download: install_after_download(object)?,
|
||||
},
|
||||
"stream-install" => CliCommand::StreamInstall {
|
||||
game_id: game_id(object)?,
|
||||
},
|
||||
"install" => CliCommand::Install {
|
||||
game_id: game_id(object)?,
|
||||
},
|
||||
@@ -254,6 +267,270 @@ impl Unpacker for ExternalUnrarUnpacker {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ExternalUnrarStreamProvider {
|
||||
program: PathBuf,
|
||||
}
|
||||
|
||||
impl ExternalUnrarStreamProvider {
|
||||
#[must_use]
|
||||
pub fn new(program: PathBuf) -> Self {
|
||||
Self { program }
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamInstallProvider for ExternalUnrarStreamProvider {
|
||||
fn stream_archive<'a>(
|
||||
&'a self,
|
||||
archive: &'a Path,
|
||||
frames: mpsc::Sender<StreamInstallFrame>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> StreamInstallFuture<'a> {
|
||||
Box::pin(async move {
|
||||
let listing = unrar_listing(&self.program, archive).await?;
|
||||
let archive_name = archive
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
.unwrap_or("archive.eti")
|
||||
.to_string();
|
||||
|
||||
send_stream_frame(
|
||||
&frames,
|
||||
StreamInstallFrame::ArchiveBegin {
|
||||
archive_name: archive_name.clone(),
|
||||
solid: listing.solid,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
for entry in listing.entries {
|
||||
if cancel_token.is_cancelled() {
|
||||
eyre::bail!("streamed archive {} was cancelled", archive.display());
|
||||
}
|
||||
|
||||
match entry.kind {
|
||||
RarEntryKind::Directory => {
|
||||
send_stream_frame(
|
||||
&frames,
|
||||
StreamInstallFrame::Directory {
|
||||
relative_path: entry.relative_path,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
RarEntryKind::File => {
|
||||
send_stream_frame(
|
||||
&frames,
|
||||
StreamInstallFrame::FileBegin {
|
||||
relative_path: entry.relative_path.clone(),
|
||||
size: entry.size,
|
||||
crc32: entry.crc32,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
stream_unrar_file(
|
||||
&self.program,
|
||||
archive,
|
||||
&entry.relative_path,
|
||||
&frames,
|
||||
cancel_token.clone(),
|
||||
)
|
||||
.await?;
|
||||
send_stream_frame(
|
||||
&frames,
|
||||
StreamInstallFrame::FileEnd {
|
||||
relative_path: entry.relative_path,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct RarListing {
|
||||
solid: bool,
|
||||
entries: Vec<RarEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct RarEntry {
|
||||
relative_path: String,
|
||||
kind: RarEntryKind,
|
||||
size: u64,
|
||||
crc32: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum RarEntryKind {
|
||||
File,
|
||||
Directory,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct RarEntryDraft {
|
||||
relative_path: Option<String>,
|
||||
kind: Option<RarEntryKind>,
|
||||
size: Option<u64>,
|
||||
crc32: Option<u32>,
|
||||
}
|
||||
|
||||
async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result<RarListing> {
|
||||
let output = tokio::process::Command::new(program)
|
||||
.arg("lt")
|
||||
.arg("-cfg-")
|
||||
.arg(archive)
|
||||
.output()
|
||||
.await?;
|
||||
if !output.status.success() {
|
||||
eyre::bail!(
|
||||
"unrar lt failed for {} with status {}: {}",
|
||||
archive.display(),
|
||||
output.status,
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
parse_unrar_listing(&String::from_utf8_lossy(&output.stdout))
|
||||
}
|
||||
|
||||
fn parse_unrar_listing(output: &str) -> eyre::Result<RarListing> {
|
||||
let mut solid = false;
|
||||
let mut entries = Vec::new();
|
||||
let mut current = RarEntryDraft::default();
|
||||
|
||||
for line in output.lines() {
|
||||
let trimmed = line.trim();
|
||||
if let Some(details) = trimmed.strip_prefix("Details:") {
|
||||
solid = details.to_ascii_lowercase().contains("solid");
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(name) = trimmed.strip_prefix("Name:") {
|
||||
push_rar_entry(&mut entries, std::mem::take(&mut current))?;
|
||||
current.relative_path = Some(name.trim().to_string());
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(kind) = trimmed.strip_prefix("Type:") {
|
||||
current.kind = match kind.trim() {
|
||||
"File" => Some(RarEntryKind::File),
|
||||
"Directory" => Some(RarEntryKind::Directory),
|
||||
_ => None,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(size) = trimmed.strip_prefix("Size:") {
|
||||
current.size = Some(size.trim().parse()?);
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(crc) = trimmed.strip_prefix("CRC32:") {
|
||||
current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?);
|
||||
}
|
||||
}
|
||||
|
||||
push_rar_entry(&mut entries, current)?;
|
||||
Ok(RarListing { solid, entries })
|
||||
}
|
||||
|
||||
fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Result<()> {
|
||||
let Some(relative_path) = draft.relative_path else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(kind) = draft.kind else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let size = match kind {
|
||||
RarEntryKind::File => draft
|
||||
.size
|
||||
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?,
|
||||
RarEntryKind::Directory => 0,
|
||||
};
|
||||
|
||||
entries.push(RarEntry {
|
||||
relative_path,
|
||||
kind,
|
||||
size,
|
||||
crc32: draft.crc32,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stream_unrar_file(
|
||||
program: &Path,
|
||||
archive: &Path,
|
||||
relative_path: &str,
|
||||
frames: &mpsc::Sender<StreamInstallFrame>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> eyre::Result<()> {
|
||||
let mut child = tokio::process::Command::new(program)
|
||||
.arg("p")
|
||||
.arg("-inul")
|
||||
.arg("-cfg-")
|
||||
.arg(archive)
|
||||
.arg(relative_path)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::null())
|
||||
.spawn()?;
|
||||
|
||||
let mut stdout = child
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_eyre("unrar stdout was not captured")?;
|
||||
let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE];
|
||||
|
||||
loop {
|
||||
let read = tokio::select! {
|
||||
() = cancel_token.cancelled() => {
|
||||
let _ = child.kill().await;
|
||||
eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display());
|
||||
}
|
||||
read = stdout.read(&mut buffer) => read?,
|
||||
};
|
||||
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
send_stream_frame(
|
||||
frames,
|
||||
StreamInstallFrame::FileChunk {
|
||||
bytes: Bytes::copy_from_slice(&buffer[..read]),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let status = child.wait().await?;
|
||||
if !status.success() {
|
||||
eyre::bail!(
|
||||
"unrar p failed for {}:{} with status {status}",
|
||||
archive.display(),
|
||||
relative_path
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_stream_frame(
|
||||
frames: &mpsc::Sender<StreamInstallFrame>,
|
||||
frame: StreamInstallFrame,
|
||||
) -> eyre::Result<()> {
|
||||
frames
|
||||
.send(frame)
|
||||
.await
|
||||
.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
|
||||
}
|
||||
|
||||
pub fn result_line(id: &Option<Value>, command: &str, data: Value) -> eyre::Result<String> {
|
||||
output_line(json!({
|
||||
"type": "result",
|
||||
@@ -344,6 +621,57 @@ mod tests {
|
||||
assert_eq!(parsed["data"]["peer_count"], 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_stream_install_command() {
|
||||
let parsed = parse_command_line(r#"{"cmd":"stream-install","game_id":"cnctw"}"#)
|
||||
.expect("command should parse");
|
||||
|
||||
assert_eq!(
|
||||
parsed.command,
|
||||
CliCommand::StreamInstall {
|
||||
game_id: "cnctw".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parses_unrar_technical_listing() {
|
||||
let listing = parse_unrar_listing(
|
||||
r#"
|
||||
Archive: game.eti
|
||||
Details: RAR 5
|
||||
|
||||
Name: bin/payload.bin
|
||||
Type: File
|
||||
Size: 123
|
||||
CRC32: 38B488A7
|
||||
|
||||
Name: bin
|
||||
Type: Directory
|
||||
"#,
|
||||
)
|
||||
.expect("listing should parse");
|
||||
|
||||
assert!(!listing.solid);
|
||||
assert_eq!(
|
||||
listing.entries,
|
||||
vec![
|
||||
RarEntry {
|
||||
relative_path: "bin/payload.bin".to_string(),
|
||||
kind: RarEntryKind::File,
|
||||
size: 123,
|
||||
crc32: Some(0x38B4_88A7),
|
||||
},
|
||||
RarEntry {
|
||||
relative_path: "bin".to_string(),
|
||||
kind: RarEntryKind::Directory,
|
||||
size: 0,
|
||||
crc32: None,
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fixture_unpacker_creates_install_payload() {
|
||||
let temp = TempDir::new("lanspread-peer-cli-fixture");
|
||||
|
||||
@@ -17,6 +17,7 @@ use lanspread_peer::{
|
||||
ActiveOperation,
|
||||
ActiveOperationKind,
|
||||
InstallOperation,
|
||||
NoopStreamInstallProvider,
|
||||
PeerCommand,
|
||||
PeerEvent,
|
||||
PeerGameDB,
|
||||
@@ -24,6 +25,7 @@ use lanspread_peer::{
|
||||
PeerRuntimeHandle,
|
||||
PeerSnapshot,
|
||||
PeerStartOptions,
|
||||
StreamInstallProvider,
|
||||
migrate_legacy_state,
|
||||
start_peer_with_options,
|
||||
};
|
||||
@@ -31,6 +33,7 @@ use lanspread_peer_cli::{
|
||||
CliCommand,
|
||||
CommandEnvelope,
|
||||
DEFAULT_FIXTURE_VERSION,
|
||||
ExternalUnrarStreamProvider,
|
||||
ExternalUnrarUnpacker,
|
||||
FixtureSeed,
|
||||
FixtureUnpacker,
|
||||
@@ -134,10 +137,15 @@ async fn main() -> eyre::Result<()> {
|
||||
let (tx_events, rx_events) = mpsc::unbounded_channel();
|
||||
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
||||
let catalog = Arc::new(RwLock::new(catalog));
|
||||
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar {
|
||||
let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program);
|
||||
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar.clone() {
|
||||
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
|
||||
None => Arc::new(FixtureUnpacker),
|
||||
};
|
||||
let stream_install_provider: Arc<dyn StreamInstallProvider> = match unrar_for_streaming {
|
||||
Some(path) => Arc::new(ExternalUnrarStreamProvider::new(path)),
|
||||
None => Arc::new(NoopStreamInstallProvider),
|
||||
};
|
||||
|
||||
let mut handle = start_peer_with_options(
|
||||
args.games_dir.clone(),
|
||||
@@ -148,6 +156,7 @@ async fn main() -> eyre::Result<()> {
|
||||
PeerStartOptions {
|
||||
state_dir: Some(args.state_dir.clone()),
|
||||
active_outbound_transfers: None,
|
||||
stream_install_provider: Some(stream_install_provider),
|
||||
},
|
||||
)?;
|
||||
let sender = handle.sender();
|
||||
@@ -249,6 +258,15 @@ async fn handle_command(
|
||||
})?;
|
||||
Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download}))
|
||||
}
|
||||
CliCommand::StreamInstall { game_id } => {
|
||||
ensure_catalog_game(shared, game_id).await?;
|
||||
ensure_no_active_operation(shared, game_id).await?;
|
||||
let _ = game_files_for_download(sender, shared, game_id).await?;
|
||||
sender.send(PeerCommand::StreamInstallGame {
|
||||
id: game_id.clone(),
|
||||
})?;
|
||||
Ok(json!({"queued": true, "game_id": game_id}))
|
||||
}
|
||||
CliCommand::Install { game_id } => {
|
||||
ensure_catalog_game(shared, game_id).await?;
|
||||
ensure_no_active_operation(shared, game_id).await?;
|
||||
@@ -729,6 +747,15 @@ fn default_catalog_db() -> Option<PathBuf> {
|
||||
.find(|path| path.exists())
|
||||
}
|
||||
|
||||
fn default_unrar_program() -> Option<PathBuf> {
|
||||
[
|
||||
PathBuf::from("/usr/local/bin/unrar"),
|
||||
PathBuf::from("/usr/bin/unrar"),
|
||||
]
|
||||
.into_iter()
|
||||
.find(|path| path.exists())
|
||||
}
|
||||
|
||||
fn next_string(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<String> {
|
||||
args.next()
|
||||
.ok_or_else(|| eyre::eyre!("{flag} requires a value"))?
|
||||
|
||||
@@ -14,6 +14,7 @@ lanspread-utils = { path = "../lanspread-utils" }
|
||||
|
||||
# external
|
||||
bytes = { workspace = true }
|
||||
crc32fast = { workspace = true }
|
||||
eyre = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
gethostname = { workspace = true }
|
||||
|
||||
@@ -6,7 +6,14 @@ use lanspread_db::db::{GameCatalog, GameDB};
|
||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
||||
|
||||
use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB};
|
||||
use crate::{
|
||||
PeerEvent,
|
||||
StreamInstallProvider,
|
||||
Unpacker,
|
||||
events,
|
||||
library::LocalLibraryState,
|
||||
peer_db::PeerGameDB,
|
||||
};
|
||||
|
||||
/// Thread-safe map of active outbound file transfers grouped by game ID.
|
||||
pub type OutboundTransfers = Arc<RwLock<HashMap<String, Vec<(u64, CancellationToken)>>>>;
|
||||
@@ -38,6 +45,7 @@ pub struct Ctx {
|
||||
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
|
||||
pub active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
|
||||
pub unpacker: Arc<dyn Unpacker>,
|
||||
pub stream_install_provider: Arc<dyn StreamInstallProvider>,
|
||||
pub catalog: Arc<RwLock<GameCatalog>>,
|
||||
pub peer_id: Arc<String>,
|
||||
pub shutdown: CancellationToken,
|
||||
@@ -57,6 +65,7 @@ pub struct PeerCtx {
|
||||
pub catalog: Arc<RwLock<GameCatalog>>,
|
||||
pub peer_id: Arc<String>,
|
||||
pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender<PeerEvent>,
|
||||
pub stream_install_provider: Arc<dyn StreamInstallProvider>,
|
||||
pub shutdown: CancellationToken,
|
||||
pub task_tracker: TaskTracker,
|
||||
pub active_outbound_transfers: OutboundTransfers,
|
||||
@@ -86,6 +95,7 @@ impl Ctx {
|
||||
task_tracker: TaskTracker,
|
||||
catalog: Arc<RwLock<GameCatalog>>,
|
||||
active_outbound_transfers: OutboundTransfers,
|
||||
stream_install_provider: Arc<dyn StreamInstallProvider>,
|
||||
) -> Self {
|
||||
Self {
|
||||
game_dir: Arc::new(RwLock::new(game_dir)),
|
||||
@@ -97,6 +107,7 @@ impl Ctx {
|
||||
active_operations: Arc::new(RwLock::new(HashMap::new())),
|
||||
active_downloads: Arc::new(RwLock::new(HashMap::new())),
|
||||
unpacker,
|
||||
stream_install_provider,
|
||||
catalog,
|
||||
peer_id: Arc::new(peer_id),
|
||||
shutdown,
|
||||
@@ -120,6 +131,7 @@ impl Ctx {
|
||||
catalog: self.catalog.clone(),
|
||||
peer_id: self.peer_id.clone(),
|
||||
tx_notify_ui,
|
||||
stream_install_provider: self.stream_install_provider.clone(),
|
||||
shutdown: self.shutdown.clone(),
|
||||
task_tracker: self.task_tracker.clone(),
|
||||
active_outbound_transfers: self.active_outbound_transfers.clone(),
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::{
|
||||
|
||||
use lanspread_db::db::{GameDB, GameFileDescription};
|
||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
InstallOperation,
|
||||
@@ -33,6 +34,7 @@ use crate::{
|
||||
peer_db::PeerGameDB,
|
||||
remote_peer::ensure_peer_id_for_addr,
|
||||
services::{HandshakeCtx, perform_handshake_with_peer},
|
||||
stream_install::receive_streamed_install,
|
||||
};
|
||||
|
||||
// =============================================================================
|
||||
@@ -450,6 +452,91 @@ pub async fn handle_install_game_command(
|
||||
spawn_install_operation(ctx, tx_notify_ui, id);
|
||||
}
|
||||
|
||||
pub async fn handle_stream_install_game_command(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
) {
|
||||
if !catalog_contains(ctx, &id).await {
|
||||
log::warn!("Ignoring streamed install command for non-catalog game {id}");
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
}
|
||||
|
||||
let games_folder = { ctx.game_dir.read().await.clone() };
|
||||
let game_root = games_folder.join(&id);
|
||||
if local_dir_is_directory(&game_root).await {
|
||||
log::warn!("Ignoring streamed install command for already-installed game {id}");
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
}
|
||||
|
||||
let expected_version = catalog_expected_version(ctx, &id).await;
|
||||
let mut peers = {
|
||||
match ctx
|
||||
.peer_game_db
|
||||
.read()
|
||||
.await
|
||||
.validate_file_sizes_majority(&id, expected_version.as_deref())
|
||||
{
|
||||
Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => {
|
||||
peer_whitelist
|
||||
}
|
||||
Ok(_) => {
|
||||
log::error!("No trusted peers available for streamed install of {id}");
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
"File size majority validation failed for streamed install {id}: {err}"
|
||||
);
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
peers.sort();
|
||||
let Some(peer_addr) = peers.into_iter().next() else {
|
||||
log::error!("No peer selected for streamed install of {id}");
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
};
|
||||
|
||||
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
|
||||
BeginOperationResult::Started => {}
|
||||
BeginOperationResult::AlreadyActive => {
|
||||
log::warn!("Operation for {id} already in progress; ignoring streamed install request");
|
||||
return;
|
||||
}
|
||||
BeginOperationResult::DrainTimedOut => {
|
||||
log::error!("Timed out waiting for outbound transfers before streamed install of {id}");
|
||||
send_download_failed(tx_notify_ui, &id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let cancel_token = ctx.shutdown.child_token();
|
||||
ctx.active_downloads
|
||||
.write()
|
||||
.await
|
||||
.insert(id.clone(), cancel_token.clone());
|
||||
|
||||
let ctx_clone = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
ctx.task_tracker.spawn(async move {
|
||||
run_stream_install_operation(
|
||||
ctx_clone,
|
||||
tx_notify_ui,
|
||||
id,
|
||||
game_root,
|
||||
peer_addr,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
/// Handles the `UninstallGame` command.
|
||||
pub async fn handle_uninstall_game_command(
|
||||
ctx: &Ctx,
|
||||
@@ -490,6 +577,151 @@ pub async fn handle_cancel_download_command(
|
||||
cancel_token.cancel();
|
||||
}
|
||||
|
||||
async fn run_stream_install_operation(
|
||||
ctx: Ctx,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
game_root: PathBuf,
|
||||
peer_addr: SocketAddr,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
let download_guard = OperationGuard::download(
|
||||
id.clone(),
|
||||
ctx.active_operations.clone(),
|
||||
ctx.active_downloads.clone(),
|
||||
tx_notify_ui.clone(),
|
||||
);
|
||||
|
||||
events::send(
|
||||
&tx_notify_ui,
|
||||
PeerEvent::DownloadGameFilesBegin { id: id.clone() },
|
||||
);
|
||||
|
||||
let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id)
|
||||
.await
|
||||
{
|
||||
Ok(transaction) => transaction,
|
||||
Err(err) => {
|
||||
log::error!("Failed to prepare streamed install for {id}: {err}");
|
||||
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let receive_result = receive_streamed_install(
|
||||
peer_addr,
|
||||
&id,
|
||||
transaction.staging_dir(),
|
||||
tx_notify_ui.clone(),
|
||||
cancel_token.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match receive_result {
|
||||
Ok(()) => {
|
||||
if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing)
|
||||
.await
|
||||
{
|
||||
clear_active_download(&ctx, &id).await;
|
||||
send_download_finished(&tx_notify_ui, &id);
|
||||
download_guard.disarm();
|
||||
commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await;
|
||||
} else {
|
||||
if let Err(err) = transaction.rollback().await {
|
||||
log::error!("Failed to roll back streamed install for {id}: {err}");
|
||||
}
|
||||
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
if let Err(rollback_err) = transaction.rollback().await {
|
||||
log::error!("Failed to roll back streamed install for {id}: {rollback_err}");
|
||||
}
|
||||
let download_was_cancelled = cancel_token.is_cancelled();
|
||||
if download_was_cancelled {
|
||||
log::info!("Streamed install download cancelled for {id}: {err}");
|
||||
} else {
|
||||
log::error!("Streamed install download failed for {id}: {err}");
|
||||
}
|
||||
finish_failed_stream_download(
|
||||
&ctx,
|
||||
&tx_notify_ui,
|
||||
&id,
|
||||
download_guard,
|
||||
download_was_cancelled,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn finish_failed_stream_download(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: &str,
|
||||
guard: OperationGuard,
|
||||
cancelled: bool,
|
||||
) {
|
||||
if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, id).await {
|
||||
log::error!("Failed to refresh local library after streamed install failure: {err}");
|
||||
}
|
||||
end_download_operation(ctx, tx_notify_ui, id).await;
|
||||
guard.disarm();
|
||||
send_download_failed_unless_cancelled(tx_notify_ui, id, cancelled);
|
||||
}
|
||||
|
||||
async fn commit_streamed_install(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
transaction: install::StreamedInstallTransaction,
|
||||
) {
|
||||
let operation_guard = OperationGuard::new(
|
||||
id.clone(),
|
||||
ctx.active_operations.clone(),
|
||||
tx_notify_ui.clone(),
|
||||
);
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameBegin {
|
||||
id: id.clone(),
|
||||
operation: InstallOperation::Installing,
|
||||
},
|
||||
);
|
||||
|
||||
match transaction.commit().await {
|
||||
Ok(()) => {
|
||||
if let Err(err) = refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await
|
||||
{
|
||||
log::error!("Failed to refresh local library after streamed install: {err}");
|
||||
}
|
||||
end_operation(ctx, tx_notify_ui, &id).await;
|
||||
operation_guard.disarm();
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameFinished { id: id.clone() },
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Streamed install commit failed for {id}: {err}");
|
||||
if let Err(refresh_err) =
|
||||
refresh_local_game_for_ending_operation(ctx, tx_notify_ui, &id).await
|
||||
{
|
||||
log::error!(
|
||||
"Failed to refresh local library after streamed install commit failure: {refresh_err}"
|
||||
);
|
||||
}
|
||||
end_operation(ctx, tx_notify_ui, &id).await;
|
||||
operation_guard.disarm();
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameFailed { id: id.clone() },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: String) {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
@@ -1264,6 +1496,7 @@ mod tests {
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))),
|
||||
Arc::new(RwLock::new(HashMap::new())),
|
||||
Arc::new(crate::NoopStreamInstallProvider),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,5 +4,13 @@ mod transaction;
|
||||
pub mod unpack;
|
||||
|
||||
pub use remove::remove_downloaded;
|
||||
pub use transaction::{install, recover_on_startup, uninstall, update};
|
||||
pub(crate) use transaction::root_eti_archives;
|
||||
pub use transaction::{
|
||||
StreamedInstallTransaction,
|
||||
begin_streamed_install,
|
||||
install,
|
||||
recover_on_startup,
|
||||
uninstall,
|
||||
update,
|
||||
};
|
||||
pub use unpack::{UnpackFuture, Unpacker};
|
||||
|
||||
@@ -33,6 +33,103 @@ struct InstallFsState {
|
||||
backup: FsEntryState,
|
||||
}
|
||||
|
||||
pub struct StreamedInstallTransaction {
|
||||
game_root: PathBuf,
|
||||
state_dir: PathBuf,
|
||||
id: String,
|
||||
staging: PathBuf,
|
||||
eti_version: Option<String>,
|
||||
}
|
||||
|
||||
impl StreamedInstallTransaction {
|
||||
#[must_use]
|
||||
pub fn staging_dir(&self) -> &Path {
|
||||
&self.staging
|
||||
}
|
||||
|
||||
pub async fn commit(self) -> eyre::Result<()> {
|
||||
let local = local_dir(&self.game_root);
|
||||
let result = async {
|
||||
tokio::fs::rename(&self.staging, &local)
|
||||
.await
|
||||
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?;
|
||||
reset_launch_settings_marker(&self.state_dir, &self.id).await?;
|
||||
write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
.await
|
||||
}
|
||||
.await;
|
||||
|
||||
if result.is_err() {
|
||||
if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await {
|
||||
log::warn!(
|
||||
"Failed to clean streamed install staging {}: {cleanup_err}",
|
||||
self.staging.display()
|
||||
);
|
||||
}
|
||||
let _ = write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub async fn rollback(self) -> eyre::Result<()> {
|
||||
let staging_result = remove_dir_all_if_exists(&self.staging).await;
|
||||
let intent_result = write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
.await;
|
||||
|
||||
staging_result?;
|
||||
intent_result
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn begin_streamed_install(
|
||||
game_root: &Path,
|
||||
state_dir: &Path,
|
||||
id: &str,
|
||||
) -> eyre::Result<StreamedInstallTransaction> {
|
||||
if path_is_dir(&local_dir(game_root)).await {
|
||||
eyre::bail!("game {id} is already installed");
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all(game_root).await?;
|
||||
let eti_version = read_downloaded_version(game_root).await;
|
||||
write_intent(
|
||||
state_dir,
|
||||
id,
|
||||
&InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let staging = installing_dir(game_root);
|
||||
if let Err(err) = prepare_owned_empty_dir(&staging).await {
|
||||
let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let staging = tokio::fs::canonicalize(&staging).await.unwrap_or(staging);
|
||||
|
||||
Ok(StreamedInstallTransaction {
|
||||
game_root: game_root.to_path_buf(),
|
||||
state_dir: state_dir.to_path_buf(),
|
||||
id: id.to_string(),
|
||||
staging,
|
||||
eti_version,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn install(
|
||||
game_root: &Path,
|
||||
state_dir: &Path,
|
||||
@@ -258,7 +355,7 @@ async fn unpack_archives(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn root_eti_archives(game_root: &Path) -> eyre::Result<Vec<PathBuf>> {
|
||||
pub(crate) async fn root_eti_archives(game_root: &Path) -> eyre::Result<Vec<PathBuf>> {
|
||||
let mut entries = tokio::fs::read_dir(game_root).await?;
|
||||
let mut archives = Vec::new();
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
|
||||
@@ -32,6 +32,7 @@ mod remote_peer;
|
||||
mod services;
|
||||
mod startup;
|
||||
mod state_paths;
|
||||
mod stream_install;
|
||||
#[cfg(test)]
|
||||
mod test_support;
|
||||
|
||||
@@ -82,6 +83,7 @@ pub use crate::{
|
||||
launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once},
|
||||
startup::PeerRuntimeHandle,
|
||||
state_paths::{launch_settings_applied_path, setup_done_path},
|
||||
stream_install::{NoopStreamInstallProvider, StreamInstallFuture, StreamInstallProvider},
|
||||
};
|
||||
|
||||
// =============================================================================
|
||||
@@ -243,6 +245,8 @@ pub enum PeerCommand {
|
||||
file_descriptions: Vec<GameFileDescription>,
|
||||
install_after_download: bool,
|
||||
},
|
||||
/// Stream archive-expanded bytes directly into `local/` without keeping root archives.
|
||||
StreamInstallGame { id: String },
|
||||
/// Install already-downloaded archives into `local/`.
|
||||
InstallGame { id: String },
|
||||
/// Remove only the `local/` install for a game.
|
||||
@@ -260,11 +264,29 @@ pub enum PeerCommand {
|
||||
}
|
||||
|
||||
/// Optional startup settings for non-GUI callers and tests.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct PeerStartOptions {
|
||||
/// Directory used for peer identity and other state.
|
||||
pub state_dir: Option<PathBuf>,
|
||||
pub active_outbound_transfers: Option<crate::context::OutboundTransfers>,
|
||||
/// Provider used to stream archive entries for low-disk streamed installs.
|
||||
pub stream_install_provider: Option<Arc<dyn StreamInstallProvider>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PeerStartOptions {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PeerStartOptions")
|
||||
.field("state_dir", &self.state_dir)
|
||||
.field(
|
||||
"active_outbound_transfers",
|
||||
&self.active_outbound_transfers.as_ref().map(|_| "..."),
|
||||
)
|
||||
.field(
|
||||
"stream_install_provider",
|
||||
&self.stream_install_provider.as_ref().map(|_| "..."),
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
@@ -314,11 +336,14 @@ pub fn start_peer_with_options(
|
||||
let PeerStartOptions {
|
||||
state_dir,
|
||||
active_outbound_transfers,
|
||||
stream_install_provider,
|
||||
} = options;
|
||||
let state_dir = resolve_state_dir(state_dir.as_deref());
|
||||
let game_dir = game_dir.into();
|
||||
let active_outbound_transfers = active_outbound_transfers
|
||||
.unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new())));
|
||||
let stream_install_provider =
|
||||
stream_install_provider.unwrap_or_else(|| Arc::new(NoopStreamInstallProvider));
|
||||
log::info!(
|
||||
"Starting peer system with game directory: {}",
|
||||
game_dir.display()
|
||||
@@ -338,6 +363,7 @@ pub fn start_peer_with_options(
|
||||
unpacker,
|
||||
catalog,
|
||||
active_outbound_transfers,
|
||||
stream_install_provider,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -355,6 +381,7 @@ async fn run_peer(
|
||||
task_tracker: TaskTracker,
|
||||
catalog: Arc<RwLock<GameCatalog>>,
|
||||
active_outbound_transfers: crate::context::OutboundTransfers,
|
||||
stream_install_provider: Arc<dyn StreamInstallProvider>,
|
||||
) -> eyre::Result<()> {
|
||||
let ctx = Ctx::new(
|
||||
peer_game_db,
|
||||
@@ -366,6 +393,7 @@ async fn run_peer(
|
||||
task_tracker,
|
||||
catalog,
|
||||
active_outbound_transfers,
|
||||
stream_install_provider,
|
||||
);
|
||||
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
|
||||
log::error!("Failed to load initial local game database: {err}");
|
||||
@@ -439,6 +467,9 @@ async fn handle_peer_commands(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
PeerCommand::StreamInstallGame { id } => {
|
||||
handlers::handle_stream_install_game_command(ctx, tx_notify_ui, id).await;
|
||||
}
|
||||
PeerCommand::InstallGame { id } => {
|
||||
handle_install_game_command(ctx, tx_notify_ui, id).await;
|
||||
}
|
||||
|
||||
@@ -319,6 +319,7 @@ mod tests {
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(catalog)),
|
||||
Arc::new(RwLock::new(HashMap::new())),
|
||||
Arc::new(crate::NoopStreamInstallProvider),
|
||||
);
|
||||
*ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000));
|
||||
|
||||
|
||||
@@ -384,6 +384,7 @@ mod tests {
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(catalog)),
|
||||
Arc::new(RwLock::new(std::collections::HashMap::new())),
|
||||
Arc::new(crate::NoopStreamInstallProvider),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::{
|
||||
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog},
|
||||
peer::{send_game_file_chunk, send_game_file_data},
|
||||
services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync},
|
||||
stream_install::{send_game_install_stream, send_stream_install_error},
|
||||
};
|
||||
|
||||
type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;
|
||||
@@ -99,6 +100,9 @@ async fn dispatch_request(
|
||||
} => {
|
||||
handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await
|
||||
}
|
||||
Request::StreamInstall { game_id } => {
|
||||
handle_stream_install_request(ctx, game_id, framed_tx).await
|
||||
}
|
||||
Request::Goodbye { peer_id } => {
|
||||
handle_goodbye(ctx, remote_addr, peer_id).await;
|
||||
framed_tx
|
||||
@@ -386,6 +390,49 @@ async fn handle_file_chunk_request(
|
||||
FramedWrite::new(tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
async fn handle_stream_install_request(
|
||||
ctx: &PeerCtx,
|
||||
game_id: String,
|
||||
framed_tx: ResponseWriter,
|
||||
) -> ResponseWriter {
|
||||
log::info!("Received StreamInstall request for {game_id} from peer");
|
||||
|
||||
let (guard, cancel_token) = TransferGuard::new(
|
||||
game_id.clone(),
|
||||
ctx.active_outbound_transfers.clone(),
|
||||
ctx.tx_notify_ui.clone(),
|
||||
&ctx.shutdown,
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut tx = framed_tx.into_inner();
|
||||
let game_dir = ctx.game_dir.read().await.clone();
|
||||
if !can_serve_game(ctx, &game_dir, &game_id).await {
|
||||
log::info!(
|
||||
"Declining StreamInstall for {game_id} because the game is not currently transferable"
|
||||
);
|
||||
tx = send_stream_install_error(tx, format!("game {game_id} is not transferable")).await;
|
||||
drop(guard);
|
||||
return FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
}
|
||||
|
||||
let game_root = game_dir.join(&game_id);
|
||||
let (returned_tx, result) = send_game_install_stream(
|
||||
ctx.stream_install_provider.clone(),
|
||||
tx,
|
||||
&game_root,
|
||||
&game_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
if let Err(err) = result {
|
||||
log::warn!("StreamInstall for {game_id} ended with error: {err}");
|
||||
}
|
||||
|
||||
drop(guard);
|
||||
FramedWrite::new(returned_tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option<SocketAddr>, peer_id: String) {
|
||||
log::info!("Received Goodbye from peer {peer_id}");
|
||||
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
|
||||
@@ -442,6 +489,7 @@ mod tests {
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(catalog)),
|
||||
Arc::new(RwLock::new(std::collections::HashMap::new())),
|
||||
Arc::new(crate::NoopStreamInstallProvider),
|
||||
)
|
||||
.to_peer_ctx(tx_notify_ui)
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::{
|
||||
PeerCommand,
|
||||
PeerEvent,
|
||||
PeerRuntimeComponent,
|
||||
StreamInstallProvider,
|
||||
Unpacker,
|
||||
context::Ctx,
|
||||
events,
|
||||
@@ -87,6 +88,7 @@ pub(crate) fn spawn_peer_runtime(
|
||||
unpacker: Arc<dyn Unpacker>,
|
||||
catalog: Arc<RwLock<GameCatalog>>,
|
||||
active_outbound_transfers: crate::context::OutboundTransfers,
|
||||
stream_install_provider: Arc<dyn StreamInstallProvider>,
|
||||
) -> PeerRuntimeHandle {
|
||||
let shutdown = CancellationToken::new();
|
||||
let task_tracker = TaskTracker::new();
|
||||
@@ -107,6 +109,7 @@ pub(crate) fn spawn_peer_runtime(
|
||||
runtime_tracker.clone(),
|
||||
catalog,
|
||||
active_outbound_transfers,
|
||||
stream_install_provider,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -0,0 +1,372 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use crc32fast::Hasher;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use lanspread_proto::{Message, Request, StreamInstallFrame};
|
||||
use s2n_quic::stream::SendStream;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::AsyncWriteExt,
|
||||
sync::{mpsc, mpsc::UnboundedSender},
|
||||
};
|
||||
use tokio_util::{
|
||||
codec::{FramedRead, FramedWrite, LengthDelimitedCodec},
|
||||
sync::CancellationToken,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
PeerEvent,
|
||||
install::root_eti_archives,
|
||||
network::connect_to_peer,
|
||||
path_validation::validate_game_file_path,
|
||||
};
|
||||
|
||||
const FRAME_CHANNEL_DEPTH: usize = 16;
|
||||
|
||||
pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>;
|
||||
|
||||
pub trait StreamInstallProvider: Send + Sync {
|
||||
fn stream_archive<'a>(
|
||||
&'a self,
|
||||
archive: &'a Path,
|
||||
frames: mpsc::Sender<StreamInstallFrame>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> StreamInstallFuture<'a>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct NoopStreamInstallProvider;
|
||||
|
||||
impl StreamInstallProvider for NoopStreamInstallProvider {
|
||||
fn stream_archive<'a>(
|
||||
&'a self,
|
||||
archive: &'a Path,
|
||||
_frames: mpsc::Sender<StreamInstallFrame>,
|
||||
_cancel_token: CancellationToken,
|
||||
) -> StreamInstallFuture<'a> {
|
||||
Box::pin(async move {
|
||||
eyre::bail!(
|
||||
"streamed install provider is not configured for {}",
|
||||
archive.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_stream_install_error(
|
||||
tx: SendStream,
|
||||
message: impl Into<String>,
|
||||
) -> SendStream {
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
if let Err(err) = framed_tx
|
||||
.send(
|
||||
StreamInstallFrame::Error {
|
||||
message: message.into(),
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::warn!("Failed to send streamed install error frame: {err}");
|
||||
}
|
||||
if let Err(err) = framed_tx.close().await {
|
||||
log::debug!("Failed to close streamed install error response: {err}");
|
||||
}
|
||||
framed_tx.into_inner()
|
||||
}
|
||||
|
||||
pub(crate) async fn send_game_install_stream(
|
||||
provider: Arc<dyn StreamInstallProvider>,
|
||||
tx: SendStream,
|
||||
game_root: &Path,
|
||||
game_id: &str,
|
||||
cancel_token: CancellationToken,
|
||||
) -> (SendStream, eyre::Result<()>) {
|
||||
let archives = match root_eti_archives(game_root).await {
|
||||
Ok(archives) => archives,
|
||||
Err(err) => {
|
||||
let message = err.to_string();
|
||||
let tx = send_stream_install_error(tx, message.clone()).await;
|
||||
return (tx, Err(eyre::eyre!(message)));
|
||||
}
|
||||
};
|
||||
if archives.is_empty() {
|
||||
let message = format!("no .eti archives found for {game_id}");
|
||||
let tx = send_stream_install_error(tx, message.clone()).await;
|
||||
return (tx, Err(eyre::eyre!(message)));
|
||||
}
|
||||
|
||||
let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH);
|
||||
let producer_cancel = cancel_token.child_token();
|
||||
let game_id_for_producer = game_id.to_string();
|
||||
let producer = tokio::spawn({
|
||||
let provider = provider.clone();
|
||||
let producer_cancel = producer_cancel.clone();
|
||||
async move {
|
||||
for archive in archives {
|
||||
if producer_cancel.is_cancelled() {
|
||||
eyre::bail!("streamed install for {game_id_for_producer} was cancelled");
|
||||
}
|
||||
|
||||
if let Err(err) = provider
|
||||
.stream_archive(&archive, frame_tx.clone(), producer_cancel.clone())
|
||||
.await
|
||||
{
|
||||
let message = err.to_string();
|
||||
let _ = frame_tx.send(StreamInstallFrame::Error { message }).await;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = frame_tx.send(StreamInstallFrame::Complete).await;
|
||||
Ok(())
|
||||
}
|
||||
});
|
||||
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
let mut send_result = Ok(());
|
||||
|
||||
while let Some(frame) = frame_rx.recv().await {
|
||||
if let Err(err) = framed_tx.send(frame.encode()).await {
|
||||
producer_cancel.cancel();
|
||||
send_result = Err(eyre::eyre!("failed to send streamed install frame: {err}"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let close_result = framed_tx
|
||||
.close()
|
||||
.await
|
||||
.map_err(|err| eyre::eyre!("failed to close streamed install stream: {err}"));
|
||||
let tx = framed_tx.into_inner();
|
||||
let producer_result = match producer.await {
|
||||
Ok(result) => result,
|
||||
Err(err) => Err(eyre::eyre!("streamed install producer task failed: {err}")),
|
||||
};
|
||||
let result = send_result.and(producer_result).and(close_result);
|
||||
|
||||
(tx, result)
|
||||
}
|
||||
|
||||
pub(crate) async fn receive_streamed_install(
|
||||
peer_addr: SocketAddr,
|
||||
game_id: &str,
|
||||
staging_dir: &Path,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
cancel_token: CancellationToken,
|
||||
) -> eyre::Result<()> {
|
||||
let staging_dir = tokio::fs::canonicalize(staging_dir)
|
||||
.await
|
||||
.unwrap_or_else(|_| staging_dir.to_path_buf());
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
framed_tx
|
||||
.send(
|
||||
Request::StreamInstall {
|
||||
game_id: game_id.to_string(),
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await?;
|
||||
framed_tx.close().await?;
|
||||
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut current_file: Option<IncomingFile> = None;
|
||||
|
||||
loop {
|
||||
let next = tokio::select! {
|
||||
() = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"),
|
||||
next = framed_rx.next() => next,
|
||||
};
|
||||
|
||||
let Some(frame) = next else {
|
||||
eyre::bail!("streamed install ended before Complete");
|
||||
};
|
||||
let frame = frame?.freeze();
|
||||
let frame = StreamInstallFrame::decode(frame);
|
||||
|
||||
match frame {
|
||||
StreamInstallFrame::ArchiveBegin {
|
||||
archive_name,
|
||||
solid,
|
||||
} => {
|
||||
log::info!(
|
||||
"Receiving streamed install archive {archive_name} for {game_id} (solid={solid})"
|
||||
);
|
||||
}
|
||||
StreamInstallFrame::Directory { relative_path } => {
|
||||
let path = resolve_stream_path(&staging_dir, &relative_path)?;
|
||||
tokio::fs::create_dir_all(path).await?;
|
||||
}
|
||||
StreamInstallFrame::FileBegin {
|
||||
relative_path,
|
||||
size,
|
||||
crc32,
|
||||
} => {
|
||||
if current_file.is_some() {
|
||||
eyre::bail!("received FileBegin for {relative_path} before previous FileEnd");
|
||||
}
|
||||
let path = resolve_stream_path(&staging_dir, &relative_path)?;
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
let file = File::create(&path).await?;
|
||||
current_file = Some(IncomingFile::new(relative_path, path, size, crc32, file));
|
||||
}
|
||||
StreamInstallFrame::FileChunk { bytes } => {
|
||||
let Some(file) = current_file.as_mut() else {
|
||||
eyre::bail!("received FileChunk without FileBegin");
|
||||
};
|
||||
file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes)
|
||||
.await?;
|
||||
}
|
||||
StreamInstallFrame::FileEnd { relative_path } => {
|
||||
let Some(file) = current_file.take() else {
|
||||
eyre::bail!("received FileEnd for {relative_path} without FileBegin");
|
||||
};
|
||||
file.finish(&relative_path).await?;
|
||||
}
|
||||
StreamInstallFrame::ArchiveEnd { archive_name } => {
|
||||
log::info!("Finished streamed install archive {archive_name} for {game_id}");
|
||||
}
|
||||
StreamInstallFrame::Complete => {
|
||||
if current_file.is_some() {
|
||||
eyre::bail!("streamed install completed with an open file");
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
StreamInstallFrame::Error { message } => {
|
||||
eyre::bail!("streamed install sender failed: {message}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct IncomingFile {
|
||||
relative_path: String,
|
||||
path: PathBuf,
|
||||
expected_size: u64,
|
||||
expected_crc32: Option<u32>,
|
||||
received: u64,
|
||||
hasher: Hasher,
|
||||
file: File,
|
||||
}
|
||||
|
||||
impl IncomingFile {
|
||||
fn new(
|
||||
relative_path: String,
|
||||
path: PathBuf,
|
||||
expected_size: u64,
|
||||
expected_crc32: Option<u32>,
|
||||
file: File,
|
||||
) -> Self {
|
||||
Self {
|
||||
relative_path,
|
||||
path,
|
||||
expected_size,
|
||||
expected_crc32,
|
||||
received: 0,
|
||||
hasher: Hasher::new(),
|
||||
file,
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_chunk(
|
||||
&mut self,
|
||||
game_id: &str,
|
||||
peer_addr: SocketAddr,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
bytes: Bytes,
|
||||
) -> eyre::Result<()> {
|
||||
let offset = self.received;
|
||||
let length = u64::try_from(bytes.len())?;
|
||||
if offset.saturating_add(length) > self.expected_size {
|
||||
eyre::bail!(
|
||||
"streamed file {} exceeded expected size {}",
|
||||
self.relative_path,
|
||||
self.expected_size
|
||||
);
|
||||
}
|
||||
self.file.write_all(&bytes).await?;
|
||||
self.hasher.update(&bytes);
|
||||
self.received = self.received.saturating_add(length);
|
||||
|
||||
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
|
||||
id: game_id.to_string(),
|
||||
peer_addr,
|
||||
relative_path: format!("{game_id}/local/{}", self.relative_path),
|
||||
offset,
|
||||
length,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finish(mut self, relative_path: &str) -> eyre::Result<()> {
|
||||
if self.relative_path != relative_path {
|
||||
eyre::bail!(
|
||||
"streamed file end mismatch: began {}, ended {relative_path}",
|
||||
self.relative_path
|
||||
);
|
||||
}
|
||||
self.file.flush().await?;
|
||||
|
||||
if self.received != self.expected_size {
|
||||
eyre::bail!(
|
||||
"streamed file {} size mismatch: got {}, expected {}",
|
||||
self.relative_path,
|
||||
self.received,
|
||||
self.expected_size
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(expected) = self.expected_crc32 {
|
||||
let actual = self.hasher.finalize();
|
||||
if actual != expected {
|
||||
eyre::bail!(
|
||||
"streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}",
|
||||
self.relative_path
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!(
|
||||
"Received streamed file {} -> {}",
|
||||
self.relative_path,
|
||||
self.path.display()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_stream_path(staging_dir: &Path, relative_path: &str) -> eyre::Result<PathBuf> {
|
||||
validate_game_file_path(staging_dir, relative_path)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_support::TempDir;
|
||||
|
||||
#[test]
|
||||
fn stream_paths_stay_inside_staging_dir() {
|
||||
let temp = TempDir::new("lanspread-stream-install-path");
|
||||
let staging = temp.path().join("staging");
|
||||
std::fs::create_dir_all(&staging).expect("staging should be created");
|
||||
let staging = std::fs::canonicalize(staging).expect("staging should canonicalize");
|
||||
|
||||
assert!(resolve_stream_path(&staging, "bin/game.exe").is_ok());
|
||||
assert!(resolve_stream_path(&staging, "../outside").is_err());
|
||||
assert!(resolve_stream_path(&staging, "/absolute").is_err());
|
||||
assert!(resolve_stream_path(&staging, "C:/windows").is_err());
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,7 @@ use bytes::Bytes;
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const PROTOCOL_VERSION: u32 = 4;
|
||||
pub const PROTOCOL_VERSION: u32 = 5;
|
||||
|
||||
pub use lanspread_db::db::Availability;
|
||||
|
||||
@@ -67,6 +67,9 @@ pub enum Request {
|
||||
offset: u64,
|
||||
length: u64,
|
||||
},
|
||||
StreamInstall {
|
||||
game_id: String,
|
||||
},
|
||||
Hello(Hello),
|
||||
LibraryDelta {
|
||||
peer_id: String,
|
||||
@@ -94,6 +97,35 @@ pub enum Response {
|
||||
InternalPeerError(String),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum StreamInstallFrame {
|
||||
ArchiveBegin {
|
||||
archive_name: String,
|
||||
solid: bool,
|
||||
},
|
||||
Directory {
|
||||
relative_path: String,
|
||||
},
|
||||
FileBegin {
|
||||
relative_path: String,
|
||||
size: u64,
|
||||
crc32: Option<u32>,
|
||||
},
|
||||
FileChunk {
|
||||
bytes: Bytes,
|
||||
},
|
||||
FileEnd {
|
||||
relative_path: String,
|
||||
},
|
||||
ArchiveEnd {
|
||||
archive_name: String,
|
||||
},
|
||||
Complete,
|
||||
Error {
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
// Add Message trait
|
||||
pub trait Message {
|
||||
fn decode(bytes: Bytes) -> Self;
|
||||
@@ -145,3 +177,29 @@ impl Message for Response {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Message for StreamInstallFrame {
|
||||
fn decode(bytes: Bytes) -> Self {
|
||||
match serde_json::from_slice(&bytes) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::error!(?e, "StreamInstallFrame decoding error");
|
||||
StreamInstallFrame::Error {
|
||||
message: format!("stream install frame decoding error: {e}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn encode(&self) -> Bytes {
|
||||
match serde_json::to_vec(self) {
|
||||
Ok(s) => Bytes::from(s),
|
||||
Err(e) => {
|
||||
tracing::error!(?e, "StreamInstallFrame encoding error");
|
||||
Bytes::from(format!(
|
||||
r#"{{"Error": {{"message": "encoding error: {e}"}}}}"#
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1876,6 +1876,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
|
||||
PeerStartOptions {
|
||||
state_dir: Some(state_dir),
|
||||
active_outbound_transfers: Some(state.active_outbound_transfers.clone()),
|
||||
stream_install_provider: None,
|
||||
},
|
||||
) {
|
||||
Ok(handle) => {
|
||||
|
||||
Reference in New Issue
Block a user