Compare commits

...

4 Commits

Author SHA1 Message Date
ddidderr c00e6eae84 fix(peer): drain streamed install senders after completion
A streamed install sender kept the original frame sink alive outside the
producer task. After the producer sent Complete, or an Error for a provider
failure, the forwarding loop still had a live mpsc sender in scope and waited
forever for another frame.

Move the sink into the producer so the channel closes when the producer exits.
That lets the QUIC writer close, the request task return, and the outbound
TransferGuard drop after successful streamed installs and provider-side
failures.

The peer-cli harness now keeps the outbound-transfer map it passes into the
peer runtime and exposes per-game counts in status. S39 asserts that the source
has no active outbound transfer for cnctw after the streamed install finishes,
which catches the sender-side lifecycle leak that receiver-only assertions
missed. The peer-cli README and scenario table document that status field and
expectation.

Test Plan:
- just fmt
- just test
- just clippy
- git diff --check
- git diff --cached --check
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S39 S40 --build-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S41 S42 S43 S44 S45 S46 S47

Refs: NEXT_STEPS.md streamed install lifecycle hardening
2026-06-11 08:31:12 +02:00
ddidderr 66c7d5912b fix(peer): harden streamed install lifecycle
Claude Fable 5's branch review found that receiver cancellation or a QUIC
send failure could leave the sender-side archive producer blocked on the
bounded frame channel. That kept the outbound transfer guard alive and could
block later installs or updates of the same game.

Route archive frames through a cancellable StreamInstallFrameSink instead of
exposing the raw channel sender to providers. The QUIC forwarder now cancels
and closes the receive side before awaiting the producer, so a blocked send
wakes and the transfer guard can drop normally.

Make PeerCommand::StreamInstallGame own its peer metadata preflight inside the
peer core. The Tauri layer now sends the command directly, and the peer runtime
fetches file details from catalog-version peers before running the existing
majority validation and retry logic. This removes the UI-only pending streamed
install set and gives PeerEvent::GotGameFiles one meaning again: continue a
normal archive download.

Tighten the receiver transaction edge cases too. Rollback removes a newly
created empty game root, but preserves pre-existing roots. Once streamed
staging has been promoted to local/, intent or launch-settings cleanup failures
are logged for startup recovery instead of reporting a failed install for bytes
that are already committed.

Accept missing RAR CRC32 metadata for zero-byte files as CRC32 00000000 while
still requiring CRC32 metadata for non-empty files. Update the peer README,
scenario docs, and next-steps handoff so the documented ownership and remaining
trust limitation match the implementation.

Test Plan:
- just fmt
- just test
- just frontend-test
- just clippy
- git diff --check
- python3 -m py_compile \
  crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \
  S39 S40 S41 S42 S43 S44 S45 S46 S47 --build-image

Refs: streamed-install review handoff from Claude Fable 5
2026-06-11 07:33:34 +02:00
ddidderr 9c765aba9c [deps] cargo update
Updating http                       v1.4.1   -> v1.4.2
Updating js-sys                     v0.3.99  -> v0.3.100
Updating regex-syntax               v0.8.10  -> v0.8.11
Updating regex                      v1.12.3  -> v1.12.4
Updating s2n-codec                  v0.81.0  -> v0.82.0
Updating s2n-quic-core              v0.81.0  -> v0.82.0
Updating s2n-quic-crypto            v0.81.0  -> v0.82.0
Updating s2n-quic-platform          v0.81.0  -> v0.82.0
Updating s2n-quic-rustls            v0.81.0  -> v0.82.0
Updating s2n-quic-tls-default       v0.81.0  -> v0.82.0
Updating s2n-quic-tls               v0.81.0  -> v0.82.0
Updating s2n-quic-transport         v0.81.0  -> v0.82.0
Updating s2n-quic                   v1.81.0  -> v1.82.0
Updating uuid                       v1.23.2  -> v1.23.3
Updating wasm-bindgen-futures       v0.4.72  -> v0.4.73
Updating wasm-bindgen-macro-support v0.2.122 -> v0.2.123
Updating wasm-bindgen-macro         v0.2.122 -> v0.2.123
Updating wasm-bindgen-shared        v0.2.122 -> v0.2.123
Updating wasm-bindgen               v0.2.122 -> v0.2.123
Updating web-sys                    v0.3.99  -> v0.3.100
Updating zerocopy-derive            v0.8.50  -> v0.8.52
Updating zerocopy                   v0.8.50  -> v0.8.52
2026-06-10 22:13:23 +02:00
ddidderr 47ef87748f test(peer-cli): align scenarios with catalog versions
Remote aggregation now filters to catalog-version roots, but the checked-in
peer-cli fixtures and skew scenarios still stamped synthetic future versions.
That hid fixture rows in S3 and left scenario docs asserting latest-version
behavior.

Teach the harness the catalog versions for fixture game IDs, stamp generated
fixtures with catalog versions by default, and update skew, mesh, propagation,
and throughput scenarios to expect only catalog-version peers. Also wire S38
into the executable matrix so the documented first-play launch-setting scenario
is covered by the same full run as S1-S47.

This keeps stale peers as negative coverage: they are absent from list-games and
cannot provide descriptors, votes, or chunks. The fixture version.ini updates
are checked in so alpha, bravo, charlie, and persona roots advertise
downloadable catalog games again.

Test Plan:
- python3 -m py_compile
  crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \
  S3 S8 S14 S15 S16 S17 S21 S22 S23 S24 S29 S30 S31 S34 S36 S37 \
  S39 S40 S41 S42 S43 S44 S45 S46 S47 --build-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S38
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- git diff --check
- git diff --cached --check

Docs: PEER_CLI_SCENARIOS.md
2026-06-08 07:06:21 +02:00
24 changed files with 765 additions and 375 deletions
Generated
+44 -45
View File
@@ -1521,9 +1521,9 @@ dependencies = [
[[package]]
name = "http"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0"
checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425"
dependencies = [
"bytes",
"itoa",
@@ -1937,13 +1937,12 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.99"
version = "0.3.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11"
checksum = "f2025f20d7a4fa7785846e7b63d10a76d3f1cee98ee5cb79ea59703f95e42162"
dependencies = [
"cfg-if",
"futures-util",
"once_cell",
"wasm-bindgen",
]
@@ -3104,9 +3103,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.12.3"
version = "1.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
checksum = "f1292b7759ae1cb9ec195452d1390a074f0cd8541ab7a5a8c31cd6db45d4a6ba"
dependencies = [
"aho-corasick",
"memchr",
@@ -3127,9 +3126,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.10"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4"
[[package]]
name = "reqwest"
@@ -3275,9 +3274,9 @@ checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "s2n-codec"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d197a3c92bbe21fc00ba8366f6ba14edb8685316b6c8c14c622d3aba0a3816d8"
checksum = "a650d3f187901f3519ec8a1fe7da3faccc0b2fb40f350eda2c7851fdf2bda0f6"
dependencies = [
"byteorder",
"bytes",
@@ -3286,9 +3285,9 @@ dependencies = [
[[package]]
name = "s2n-quic"
version = "1.81.0"
version = "1.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8728244102e791769cebe44a4abace966d8826f3266e9691c4233f47921b94b8"
checksum = "c27c34127facefcd3e5530c4de5739a62cd4a593710b1194dacbd8e884b6be92"
dependencies = [
"bytes",
"cfg-if",
@@ -3310,9 +3309,9 @@ dependencies = [
[[package]]
name = "s2n-quic-core"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cc69861a4909ea508b26309504899f4b0f77bb35348f6a36b7de9a28b1a4b92"
checksum = "79fbc3f06797d985363f74de105d18554b5a272b924b166d73a6564943da1230"
dependencies = [
"atomic-waker",
"byteorder",
@@ -3332,9 +3331,9 @@ dependencies = [
[[package]]
name = "s2n-quic-crypto"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3ce7f399a87be4b49d76895cdddb987620d34f334072d011bcac913d20fe69"
checksum = "e58ea5aa39eecc29559d1e1bb4a5d55a747fa7b80cff5a3400c57489510644e3"
dependencies = [
"aws-lc-rs",
"cfg-if",
@@ -3346,9 +3345,9 @@ dependencies = [
[[package]]
name = "s2n-quic-platform"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa9004809ae3a778b8e015581a47e9fb389f9ec230456a24b81c6287b000fefe"
checksum = "4eebb6007139cfffdf3d473d39f01a214032c339432a6293b16b0f7b25343f40"
dependencies = [
"cfg-if",
"futures",
@@ -3361,9 +3360,9 @@ dependencies = [
[[package]]
name = "s2n-quic-rustls"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf7c34876c77f7560ee4385cd5ff0510acade2eb66dc237a45f7c63d2e7f1af3"
checksum = "eb0084afa65eefae2c37d9ab44118a14dfc5bb78dbf997c0f5176f7cf8d2e633"
dependencies = [
"bytes",
"rustls",
@@ -3375,9 +3374,9 @@ dependencies = [
[[package]]
name = "s2n-quic-tls"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7b14505cff3d9e39b930c31c150fe2965ee5fe1b654f7c6d33b1f50680ac0b"
checksum = "91150b25ce824ffea581b449ad04acf9b4aef2fa68a46f667cdc9cc6f7b87823"
dependencies = [
"bytes",
"errno",
@@ -3390,9 +3389,9 @@ dependencies = [
[[package]]
name = "s2n-quic-tls-default"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3297fc8531b3c19f339a3ce1969fdc0e9928cfe439ca6c9f9d9d7ca4522a3b8c"
checksum = "e1f5ae64863972facee778dc80a24317e613f035296631f267b71f225e569c22"
dependencies = [
"s2n-quic-rustls",
"s2n-quic-tls",
@@ -3400,9 +3399,9 @@ dependencies = [
[[package]]
name = "s2n-quic-transport"
version = "0.81.0"
version = "0.82.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1ddd739c1776770dd2ab0b33da1cf372a395500252ae5250c08e2d6bf51b38f"
checksum = "3b82fca53ce1734cc1d1dca96cc9ceb65ed528f27cb43b7de865215b6cf17908"
dependencies = [
"bytes",
"futures-channel",
@@ -5003,9 +5002,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "uuid"
version = "1.23.2"
version = "1.23.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7"
checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7"
dependencies = [
"getrandom 0.4.2",
"js-sys",
@@ -5108,9 +5107,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409"
checksum = "a254a4b10c19a76f09a27640e7ffbf9bc30bf67e16a3bf28aaefa4920fe81563"
dependencies = [
"cfg-if",
"once_cell",
@@ -5121,9 +5120,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.72"
version = "0.4.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f"
checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -5131,9 +5130,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6"
checksum = "24a40fc75b0ec6f3746ceb10d36f53a93dcd68a93b11b6445983945d79eba0dc"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -5141,9 +5140,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e"
checksum = "908f34bd9b9ce3d4caf07b72dfab63d61504d156856c6bd3cd87fa350cf3985b"
dependencies = [
"bumpalo",
"proc-macro2",
@@ -5154,9 +5153,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437"
checksum = "7acbf7616c27b194bbb550bf77ed0c2c3e5b7fd1260a93082b95fb7f47959b92"
dependencies = [
"unicode-ident",
]
@@ -5210,9 +5209,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.99"
version = "0.3.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436"
checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -6019,18 +6018,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.50"
version = "0.8.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b065d4f0e55f82fae73202e189638116a87c55ab6b8e6c2721e13dd9d854ad1"
checksum = "ce1022995ff5ff5d841ad7d994facc23098cd40152f2c1d11cd607c6f530653f"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.50"
version = "0.8.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631b19d36a892ab55420c92dbc83ccd79274f25be714855d3074aa71cab639"
checksum = "1ae7f38b72ec2a254e2b87ef277cf2cd4fb97cbebf944faa6f33354da0867930"
dependencies = [
"proc-macro2",
"quote",
+3 -2
View File
@@ -61,5 +61,6 @@ product-ready.
modal status shows `Installed, not shareable`. Downloaded-and-installed games
keep the normal `Installed` label.
My recommended next slice: make the provider abstraction final-ish, then
implement a real one-pass provider. Everything else builds cleanly on that.
The remaining production-readiness step is additive: move from sender-owned RAR
metadata to catalog-owned archive or extracted-file hashes, then verify those
at the receiver before committing the streamed install.
+47 -25
View File
@@ -22,31 +22,31 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
| S12 | Transfer serving gates | A peer has a non-catalog, missing-sentinel, active-operation, or `local/` path request. | The serving peer declines metadata/data; covered by unit tests where timing is too small for a stable CLI race test. |
| S13 | Exact transferred-file equality | Repeat small and large downloads, then compare every transferred regular file against its source with SHA-256 manifests. | Source and receiver manifests match exactly for each transferred file; no extra or missing files appear in the downloaded game root. |
| S14 | Large multi-peer chunked download | `fixture-alpha/alienswarm` contains a renamed RAR `.eti` larger than 100 MB. A second peer downloads it, then a third peer downloads `alienswarm` from both peers. | The third peer's downloaded files match the source by SHA-256; `download-chunk-finished` events show the large `.eti` chunks coming from both peers with byte counts balanced within one chunk. |
| S15 | Three-way version skew | Three peers advertise the same catalog game ID. Peer A has `version.ini=20250101`, peer B has `version.ini=20250201`, and peer C has `version.ini=20250301`; each version has distinguishable file contents. An empty client connects to all three and downloads the game with `install=false`. | `list-games` shows one row for the game with `peer_count=3` and `eti_game_version=20250301`. The `got-game-files` descriptor set and transfer source are peer C's newest version only; no chunks come from A or B. The receiver's `version.ini` and SHA-256 manifest match C exactly. |
| S16 | Latest-version fanout with stale peers present | Peer A has an older version of a game. Peers B and C both advertise the same newest version with matching file manifests; use a large file when proving chunk split. | The aggregated row still counts all ready peers, but eligible transfer peers are only B and C. Large-file chunks may split between B and C; peer A contributes no manifest majority vote and no file chunks. |
| S17 | Latest-version conflict rejection | Peer A has an older version. Peers B and C both advertise the newest version, but their latest-version file sizes conflict. | Validation considers only the latest-version peers, so A cannot rescue the majority. The download fails with `download-failed`, and no committed target `version.ini` remains. |
| S15 | Catalog-version skew | Three peers advertise the same catalog game ID. Peers A and B have stale `version.ini` values; peer C has the catalog's expected version. An empty client connects to all three and downloads the game with `install=false`. | `list-games` shows one row for the game with `peer_count=1` and the catalog `eti_game_version`. The `got-game-files` descriptor set and transfer source are peer C only; no chunks come from A or B. The receiver's `version.ini` and SHA-256 manifest match C exactly. |
| S16 | Catalog-version fanout with stale peers present | Peer A has a stale version of a game. Peers B and C both advertise the catalog version with matching file manifests; use a large file when proving chunk split. | The aggregated row counts only catalog-version ready peers. Large-file chunks may split between B and C; peer A is not listed as downloadable and contributes no manifest vote or file chunks. |
| S17 | Catalog-version conflict rejection | Peer A has a stale version. Peers B and C both advertise the catalog version, but their file sizes conflict. | Validation considers only the catalog-version peers, so A cannot rescue the majority. The download fails with `download-failed`, and no committed target `version.ini` remains. |
| S18 | Mid-download source drop with redundancy | Client downloads a large shared game from two ready peers, then one source is killed after the download has begun. | Failed chunks are retried against the surviving source; the download finishes, no `download-failed` is emitted, and the receiver's files match the source by diff or SHA-256. |
| S19 | Mid-download sole-source drop | Client downloads a large game from one source, then that source is killed after the download has begun. | The download emits `download-failed`; no committed target `version.ini` remains; any partial payload is not advertised as ready; active operation state clears so a retry is possible. |
| S20 | Receiver write failure | Client downloads a large game into a constrained `/games` filesystem. | The download fails deterministically, no committed `version.ini` is advertised, and active operation state clears so the peer can retry later. |
| S21 | Add-game propagation | Two connected peers are running; one peer gains a new catalog game root through a completed download or an external drop. | The other peer receives a library update without reconnecting, and `list-games` shows the new remote game under the existing peer. |
| S22 | Remove-game propagation | Two connected peers are running; one peer loses a previously advertised game root. | The other peer receives a library update without dropping the peer, and `list-games` no longer shows that remote game. |
| S23 | Version bump propagation | Two connected peers are running; one peer's ready game root gets a newer `version.ini`. | The other peer receives a library update without reconnecting, and the aggregated row reflects the newer `eti_game_version`. |
| S23 | Version bump propagation | Two connected peers are running; one peer's ready game root starts with a stale `version.ini`, then changes to the catalog version. | The other peer receives a library update without reconnecting; the stale row is absent before the change, then the catalog-version game appears as downloadable. |
| S24 | Two clients pull from one source | Two empty clients connect to the same source and download the same large game concurrently. | Both downloads finish, both receivers match the source by diff or SHA-256, and the source remains responsive. |
| S25 | One client downloads two games concurrently | One client connected to a source issues two different `download` commands without waiting for the first to finish. | Both operations may run in parallel; both eventually finish, each game reaches the requested install state, and each transferred root matches its source. |
| S26 | Same-game duplicate download rejection | A client starts downloading a game, then issues a second `download` command for the same game while the first operation is active. | The second request is rejected deterministically as an operation-in-progress condition; the first download is not corrupted and still reaches its documented final state. |
| S27 | Self-connect rejection | A peer sends `connect` to its own advertised listener address. | The command fails cleanly, no self-peer entry is created, and the peer remains responsive. |
| S28 | Address change without identity change | A known peer is rediscovered with the same peer ID and a different listener address while its library is still known. | The peer record updates in place to the new address, the existing library stays attached to that peer ID, and no duplicate peer entry appears. This is covered with a deterministic unit-level check until the CLI can rebind a live listener without restart. |
| S29 | Empty-library peer participates | A peer with no games connects into the mesh. | Other peers list it as a peer with zero games; it can receive a download, advertise the new game without restart, and become a source. |
| S30 | 5+ peer mesh aggregation | Five peers advertise partially overlapping catalog games with a mix of unique games, shared games, and differing versions; a sixth client connects to all five. | The client shows one row per game ID, correct ready-source `peer_count`, latest `eti_game_version`, no duplicates, and no self entries. |
| S30 | 5+ peer mesh aggregation | Five peers advertise partially overlapping catalog games with a mix of unique and shared catalog-version games; a sixth client connects to all five. | The client shows one row per game ID, correct catalog-version ready-source `peer_count`, catalog `eti_game_version`, no duplicates, and no self entries. |
| S31 | Bootstrapped peer becomes source in same session | An empty client downloads a game from a source, the original source shuts down, then a fresh third peer downloads the same game from the bootstrapped client. | The third peer's files match the original source by diff or SHA-256, proving downloaded files become servable without restart. |
| S32 | Reinstall after uninstall | A downloaded game is installed, uninstalled, then installed again without another download. | `local/` is recreated from preserved root files, no transfer events occur during reinstall, and the game returns to `installed=true`. |
| S33 | Install after external root mutation | A downloaded game root is externally mutated before `install` is issued. | The CLI fixture installer installs from the current root bytes. The resulting `local/fixture-payload.txt` must match the mutated archive bytes exactly. |
| S34 | Many-small-files game without `.eti` | A catalog game root contains `version.ini` plus many small regular files and no archive. | Download with `install=false` transfers every file, chunk events are coherent for small files, and source/receiver manifests match exactly. |
| S35 | Unknown game ID from remote peer | A remote peer advertises a game ID that is not in the receiver's catalog. | The receiver does not list the unknown game as downloadable, download attempts fail deterministically, and no local files are created. |
| S36 | Latest singleton beats stale majority | Five peers advertise one game; one peer has `20260501`, four peers have `20250101`. | `list-games` reports `eti_game_version=20260501`; all descriptors and chunks come from the singleton latest peer; stale peers contribute zero bytes. |
| S36 | Catalog singleton beats stale majority | Five peers advertise one game; one peer has the catalog version and four peers have stale versions. | `list-games` reports `peer_count=1` and the catalog `eti_game_version`; all descriptors and chunks come from the singleton catalog-version peer, while stale peers remain hidden and contribute zero bytes. |
| S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. |
| S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. |
| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `got-game-files`, `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. |
| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256; the source reports no active outbound transfer for `cnctw` after completion. |
| S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. |
| S41 | Solid archive streamed install | Empty client connects to a peer serving `fixture-solid/cnctw`, whose `.eti` is a real solid RAR archive. The receiver uses the container-bundled `unrar` stream provider. | The fixture is verified as solid with `unrar lt`; streamed install finishes with `downloaded=false`, `installed=true`, `availability=LocalOnly`; root archive and `version.ini` are absent; streamed byte count equals the extracted solid entries; local payload SHA-256 hashes match `unrar p` output. |
| S42 | Streamed install whole-stream retry | Empty client connects to two peers serving the same catalog-version `cnctw`: one broken source whose `--unrar` path is missing, followed by one good source. | The broken source sorts before the good source in retry order, contributes zero chunks, and the good source completes a fresh whole-stream attempt. The final state is local-only installed, no root archive/sentinel, no `.local.installing`, byte count matches the extracted entries, and payload hashes match the good source. |
@@ -58,22 +58,21 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
## Version-Skew Contract
Use S15-S17 to pin down what "newer" means when several peers have the same
game ID:
Use S15-S17 to pin down what happens when several peers have the same game ID
but only some match the local catalog version:
- Version comparison uses the eight-digit `version.ini` string, so use sortable
`YYYYMMDD` values in manual fixtures.
- The receiver's catalog is authoritative. A remote root whose `version.ini`
does not match the catalog's expected version for that game ID is not
downloadable.
- `list-games` aggregates by game ID. The game appears once; `peer_count`
counts all ready peers with that ID, including peers that only have older
versions.
- The aggregated `eti_game_version` must be the newest ready version.
counts only ready peers with that ID and the catalog version.
- The aggregated `eti_game_version` must be the catalog version.
- The descriptor set emitted to the download path, file-size validation, and
transfer planning are latest-only. Older-version peers may be queried by a
generic detail request, but their descriptors must not supply download
descriptors, majority votes, or chunks once a newer version exists.
- If exactly one peer has the latest version, that peer is the only transfer
source. If several peers tie on the latest version, validation and chunk
fanout happen among that latest-version set only.
transfer planning are catalog-version-only. Stale peers must not supply
download descriptors, majority votes, or chunks.
- If exactly one peer has the catalog version, that peer is the only transfer
source. If several peers match the catalog version, validation and chunk
fanout happen among that catalog-version set only.
- Capture proof with the `list-games` row, `got-game-files` descriptors,
`download-chunk-finished` source addresses, and source/receiver SHA-256
manifests.
@@ -87,7 +86,7 @@ GUI:
payload files may remain, but they must not be advertised as a ready local
game and must not leave an active operation stuck.
- Source failure during a redundant download should retry failed chunks against
another validated source for the same latest-version file.
another validated source for the same catalog-version file.
- Live local library changes are observable by connected peers through library
deltas; reconnect is not required for add, remove, or version-bump cases.
- Same-game operations are single-flight. A duplicate download request while a
@@ -96,10 +95,10 @@ GUI:
are not downloadable.
For a manual run, prefer a catalog game ID already served by the fixture lab,
such as `cnc4`, then create temporary `just peer-cli-run` game roots with
different `version.ini` contents. The existing alpha/bravo/charlie fixtures
cover duplicate-source and shared-game cases, but not the three-version skew
until a dedicated fixture or temporary games root is prepared.
such as `cnc4`, then create temporary `just peer-cli-run` game roots where some
peers match the catalog version and others deliberately use stale
`version.ini` contents. The existing alpha/bravo/charlie fixtures cover
duplicate-source and shared-game cases; S15-S17 add the focused skew cases.
## First-Play Launch-Setting Contract
@@ -144,6 +143,29 @@ Use S39-S41 to pin down low-disk streamed installs:
## Run Log
### 2026-06-07 - Catalog-Version Matrix Alignment (S1-S47)
- Code under test aligned checked-in fixture `version.ini` sentinels with the
catalog, made `run_extended_scenarios.py` stamp generated fixture games with
catalog versions by default, updated S15-S17/S23/S30/S36/S37 to assert
catalog-authoritative aggregation, and wired S38 into the executable matrix.
- Gates before Docker: `python3 -m py_compile
crates/lanspread-peer-cli/scripts/run_extended_scenarios.py` passed.
- Targeted rebuilt-image runner:
`python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S3 S8 S14 S15 S16 S17 S21 S22 S23 S24 S29 S30 S31 S34 S36 S37 S39 S40 S41 S42 S43 S44 S45 S46 S47 --build-image`
passed.
- S38 standalone runner:
`python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S38`
passed, proving the real-RAR `css` fixture installs with the container
`/usr/local/bin/unrar` sidecar and stamps launch settings only once.
- Full matrix runner:
`python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py`
passed for S1-S47 against the rebuilt `lanspread-peer-cli:dev` image.
- The final full-run highlights included S3 aggregation, S15-S17
catalog-version skew/fanout/conflict, S23 stale-to-catalog propagation, S30
mesh aggregation, S36 catalog singleton over stale majority, S37 throughput,
S38 first-play stamping, and S39-S47 streamed-install coverage.
### 2026-06-07 - Streamed Install Edge Coverage (S43-S47)
- Code under test added `cancel-download` to `lanspread-peer-cli`, added the
+4
View File
@@ -42,3 +42,7 @@ echoed back on the result or error line.
{"id":"u1","cmd":"uninstall","game_id":"fixture-one"}
{"id":"q1","cmd":"shutdown"}
```
The `status` result includes receiver-side `active_operations` and
sender-side `active_outbound_transfers` counts by game ID, which the scenario
runner uses to verify transfer lifecycle cleanup.
@@ -1 +1 @@
20250101
20190317
@@ -1 +1 @@
20250103
20160130
@@ -1 +1 @@
20250102
20200721
@@ -1 +1 @@
20250201
20210416
@@ -1 +1 @@
20250202
20170204
@@ -1 +1 @@
20250203
20160128
@@ -1 +1 @@
20250102
20200721
@@ -1 +1 @@
20250202
20170204
@@ -1 +1 @@
20250301
20160920
@@ -1 +1 @@
20250302
20200315
@@ -1 +1 @@
20250303
20200907
@@ -1 +1 @@
20250101
20240623
@@ -28,7 +28,20 @@ CONTAINER_PREFIX = "lanspread-peer-cli-ext"
CATALOG_DB = "/app/game.db"
FIXTURES = REPO / "crates" / "lanspread-peer-cli" / "fixtures"
CHUNK_SIZE = 128 * 1024 * 1024
CATALOG_VERSIONS = {
"alienswarm": "20190317",
"bf1942": "20160130",
"bfbc2": "20210416",
"cnc4": "20170204",
"cnctw": "20160128",
"cod5": "20160920",
"cod6": "20200315",
"coh": "20200907",
"css": "20240623",
"ggoo": "20200721",
}
PERF_GAME_ID = "bf1942"
PERF_GAME_VERSION = CATALOG_VERSIONS[PERF_GAME_ID]
PERF_GAME_SIZE = 2 * 1024 * 1024 * 1024
IGNORED_DIFF_NAMES = {".lanspread", ".lanspread.json", "local"}
@@ -305,8 +318,8 @@ class Runner:
("S13", self.s13_exact_transfer_equality),
("S14", self.s14_large_multi_peer_chunking),
("S15", self.s15_three_way_version_skew),
("S16", self.s16_latest_fanout_with_stale),
("S17", self.s17_latest_conflict_rejection),
("S16", self.s16_catalog_fanout_with_stale),
("S17", self.s17_catalog_conflict_rejection),
("S18", self.s18_redundant_source_drop),
("S19", self.s19_sole_source_drop),
("S20", self.s20_receiver_write_failure),
@@ -325,8 +338,9 @@ class Runner:
("S33", self.s33_install_after_mutation),
("S34", self.s34_many_small_files),
("S35", self.s35_unknown_game_filtered),
("S36", self.s36_latest_singleton),
("S36", self.s36_catalog_singleton),
("S37", self.s37_single_source_download_throughput),
("S38", self.s38_first_play_launch_settings),
("S39", self.s39_streamed_install_local_only),
("S40", self.s40_streamed_receiver_not_source),
("S41", self.s41_solid_archive_streamed_install),
@@ -520,20 +534,20 @@ class Runner:
def s8_ambiguous_metadata_rejection(self) -> str:
dir_a = self.fixture_root / "s8-a"
dir_b = self.fixture_root / "s8-b"
copy_game("ggoo", dir_a, version="20260101")
copy_game("ggoo", dir_b, version="20260101")
copy_game("ggoo", dir_a)
copy_game("ggoo", dir_b)
with (dir_b / "ggoo" / "ggoo.eti").open("ab") as handle:
handle.write(b"conflict")
peer_a = self.peer("s8-a", games_dir=dir_a)
peer_b = self.peer("s8-b", games_dir=dir_b)
client = self.peer("s8-client")
connect_many(client, [peer_a, peer_b])
wait_remote_game(client, "ggoo", peer_count=2, version="20260101")
wait_remote_game(client, "ggoo", peer_count=2, version=CATALOG_VERSIONS["ggoo"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "ggoo", "install": False})
client.wait_for(event_is("download-failed", "ggoo"), timeout=30, description="ggoo failed", waiter=waiter)
assert_not_exists(client.host_games_dir / "ggoo" / "version.ini")
return "conflicting latest ggoo file sizes emitted download-failed and left no version.ini"
return "conflicting catalog-version ggoo file sizes emitted download-failed and left no version.ini"
def s9_missing_game(self) -> str:
client = self.peer("s9-client")
@@ -615,7 +629,7 @@ class Runner:
diff_game_dirs(source_dir / game_id, stage.host_games_dir / game_id)
client = self.peer("s14-client")
connect_many(client, [alpha, stage])
wait_remote_game(client, game_id, peer_count=2, version="20260520")
wait_remote_game(client, game_id, peer_count=2, version=PERF_GAME_VERSION)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": game_id, "install": False})
client.wait_for(event_is("download-finished", game_id), timeout=90, description="client finish", waiter=waiter)
@@ -629,7 +643,11 @@ class Runner:
return f"{game_id} downloaded from two sources, diff matched, chunk totals={totals}"
def s15_three_way_version_skew(self) -> str:
specs = [("s15-a", "20250101"), ("s15-b", "20250201"), ("s15-c", "20250301")]
specs = [
("s15-a", "20150101"),
("s15-b", "20160101"),
("s15-c", CATALOG_VERSIONS["cnc4"]),
]
peers = []
for name, version in specs:
game_dir = self.fixture_root / name
@@ -637,19 +655,19 @@ class Runner:
peers.append(self.peer(name, games_dir=game_dir))
client = self.peer("s15-client")
connect_many(client, peers)
wait_remote_game(client, "cnc4", peer_count=3, version="20250301")
wait_remote_game(client, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "cnc4", "install": False})
client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="cnc4 finish", waiter=waiter)
assert_only_chunk_sources(client, "cnc4", {peers[2].ready_addr})
diff_game_dirs(peers[2].host_games_dir / "cnc4", client.host_games_dir / "cnc4")
return "three-way skew selected only 20250301 peer and receiver diffed cleanly"
return "three-way skew exposed only the catalog-version peer and receiver diffed cleanly"
def s16_latest_fanout_with_stale(self) -> str:
def s16_catalog_fanout_with_stale(self) -> str:
specs = [
("s16-a", "20250101"),
("s16-b", "20250301"),
("s16-c", "20250301"),
("s16-a", "20180101"),
("s16-b", CATALOG_VERSIONS["alienswarm"]),
("s16-c", CATALOG_VERSIONS["alienswarm"]),
]
peers = []
for name, version in specs:
@@ -658,7 +676,7 @@ class Runner:
peers.append(self.peer(name, games_dir=game_dir))
client = self.peer("s16-client")
connect_many(client, peers)
wait_remote_game(client, "alienswarm", peer_count=3, version="20250301")
wait_remote_game(client, "alienswarm", peer_count=2, version=CATALOG_VERSIONS["alienswarm"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="alienswarm finish", waiter=waiter)
@@ -667,13 +685,13 @@ class Runner:
if peers[0].ready_addr in totals:
raise ScenarioError(f"stale peer contributed chunks: {totals}")
diff_game_dirs(peers[1].host_games_dir / "alienswarm", client.host_games_dir / "alienswarm")
return f"latest B/C peers served alienswarm while stale A contributed zero; totals={totals}"
return f"catalog-version B/C peers served alienswarm while stale A contributed zero; totals={totals}"
def s17_latest_conflict_rejection(self) -> str:
def s17_catalog_conflict_rejection(self) -> str:
specs = [
("s17-a", "20250101", False),
("s17-b", "20250301", False),
("s17-c", "20250301", True),
("s17-a", "20150101", False),
("s17-b", CATALOG_VERSIONS["cnc4"], False),
("s17-c", CATALOG_VERSIONS["cnc4"], True),
]
peers = []
for name, version, conflict in specs:
@@ -685,12 +703,12 @@ class Runner:
peers.append(self.peer(name, games_dir=game_dir))
client = self.peer("s17-client")
connect_many(client, peers)
wait_remote_game(client, "cnc4", peer_count=3, version="20250301")
wait_remote_game(client, "cnc4", peer_count=2, version=CATALOG_VERSIONS["cnc4"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "cnc4", "install": False})
client.wait_for(event_is("download-failed", "cnc4"), timeout=30, description="cnc4 failed", waiter=waiter)
assert_not_exists(client.host_games_dir / "cnc4" / "version.ini")
return "latest-version file conflict failed download and left no committed version.ini"
return "catalog-version file conflict failed download and left no committed version.ini"
def s18_redundant_source_drop(self) -> str:
source_a_dir = self.fixture_root / "s18-a"
@@ -773,13 +791,13 @@ class Runner:
def s23_version_bump_propagation(self) -> str:
alpha = self.peer("s23-alpha")
bravo_dir = self.fixture_root / "s23-bravo"
copy_game("cnc4", bravo_dir, version="20250101")
copy_game("cnc4", bravo_dir, version="20160101")
bravo = self.peer("s23-bravo", games_dir=bravo_dir)
connect_many(alpha, [bravo])
wait_remote_game(alpha, "cnc4", peer_count=1, version="20250101")
(bravo_dir / "cnc4" / "version.ini").write_text("20260501", encoding="utf-8")
wait_remote_game(alpha, "cnc4", peer_count=1, version="20260501")
return "alpha observed cnc4 eti_game_version change 20250101 -> 20260501 without reconnect"
wait_remote_absent(alpha, "cnc4", timeout=5)
(bravo_dir / "cnc4" / "version.ini").write_text(CATALOG_VERSIONS["cnc4"], encoding="utf-8")
wait_remote_game(alpha, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"])
return "alpha observed stale cnc4 become catalog-version downloadable without reconnect"
def s24_two_clients_one_source(self) -> str:
source = self.peer("s24-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
@@ -876,11 +894,11 @@ class Runner:
def s30_mesh_aggregation(self) -> str:
dirs = []
specs = [
("s30-a", [("ggoo", "20250101"), ("bf1942", "20250101")]),
("s30-b", [("ggoo", "20250101"), ("cnc4", "20250101")]),
("s30-c", [("cnc4", "20250301"), ("cod5", "20250101")]),
("s30-d", [("cnctw", "20250101"), ("coh", "20250101")]),
("s30-e", [("cnctw", "20250201"), ("bf1942", "20250201")]),
("s30-a", [("ggoo", CATALOG_VERSIONS["ggoo"]), ("bf1942", CATALOG_VERSIONS["bf1942"])]),
("s30-b", [("ggoo", CATALOG_VERSIONS["ggoo"]), ("cnc4", CATALOG_VERSIONS["cnc4"])]),
("s30-c", [("cnc4", CATALOG_VERSIONS["cnc4"]), ("cod5", CATALOG_VERSIONS["cod5"])]),
("s30-d", [("cnctw", CATALOG_VERSIONS["cnctw"]), ("coh", CATALOG_VERSIONS["coh"])]),
("s30-e", [("cnctw", CATALOG_VERSIONS["cnctw"]), ("bf1942", CATALOG_VERSIONS["bf1942"])]),
]
peers = []
for name, games in specs:
@@ -892,12 +910,12 @@ class Runner:
client = self.peer("s30-client")
connect_many(client, peers)
expected = {
"ggoo": (2, "20250101"),
"bf1942": (2, "20250201"),
"cnc4": (2, "20250301"),
"cod5": (1, "20250101"),
"cnctw": (2, "20250201"),
"coh": (1, "20250101"),
"ggoo": (2, CATALOG_VERSIONS["ggoo"]),
"bf1942": (2, CATALOG_VERSIONS["bf1942"]),
"cnc4": (2, CATALOG_VERSIONS["cnc4"]),
"cod5": (1, CATALOG_VERSIONS["cod5"]),
"cnctw": (2, CATALOG_VERSIONS["cnctw"]),
"coh": (1, CATALOG_VERSIONS["coh"]),
}
for game_id, (peer_count, version) in expected.items():
wait_remote_game(client, game_id, peer_count=peer_count, version=version)
@@ -907,7 +925,7 @@ class Runner:
raise ScenarioError(f"duplicate game rows: {ids}")
if any(peer["peer_id"] == client.peer_id for peer in client.list_peers()):
raise ScenarioError("client listed itself as a peer")
return f"client aggregated {len(expected)} IDs from 5 peers with expected peer_count/latest versions"
return f"client aggregated {len(expected)} IDs from 5 peers with expected peer_count/catalog versions"
def s31_bootstrapped_peer_source(self) -> str:
source = self.peer("s31-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
@@ -1003,34 +1021,34 @@ class Runner:
assert_not_exists(client.host_games_dir / "mystery-game")
return f"unknown game absent from list-games; download errored '{err['error']}'; no local files"
def s36_latest_singleton(self) -> str:
def s36_catalog_singleton(self) -> str:
peers = []
for index in range(5):
game_dir = self.fixture_root / f"s36-{index}"
version = "20260501" if index == 0 else "20250101"
version = CATALOG_VERSIONS["cnc4"] if index == 0 else "20160101"
copy_game("cnc4", game_dir, version=version)
peers.append(self.peer(f"s36-{index}", games_dir=game_dir))
client = self.peer("s36-client")
connect_many(client, peers)
wait_remote_game(client, "cnc4", peer_count=5, version="20260501")
wait_remote_game(client, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "cnc4", "install": False})
got = client.wait_for(event_is("got-game-files", "cnc4"), timeout=20, description="got game files", waiter=waiter)
client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="download finish", waiter=waiter)
latest_addr = peers[0].ready_addr
if latest_addr is None:
raise ScenarioError("latest peer had no ready addr")
catalog_addr = peers[0].ready_addr
if catalog_addr is None:
raise ScenarioError("catalog-version peer had no ready addr")
for item in client.output:
if item.get("type") != "event" or item.get("event") != "download-chunk-finished":
continue
data = item["data"]
if data.get("game_id") == "cnc4" and data.get("peer_addr") != latest_addr:
if data.get("game_id") == "cnc4" and data.get("peer_addr") != catalog_addr:
raise ScenarioError(f"stale peer contributed chunk: {data}")
diff_game_dirs(peers[0].host_games_dir / "cnc4", client.host_games_dir / "cnc4")
descs = got["data"]["file_descriptions"]
if not descs:
raise ScenarioError("got-game-files had no descriptors")
return "client reported latest 20260501 with peer_count=5; only singleton latest peer sent chunks; diff matched"
return "client reported singleton catalog-version peer; stale peers stayed hidden and sent no chunks; diff matched"
def s37_single_source_download_throughput(self) -> str:
source_dir = self.fixture_root / "s37-source"
@@ -1038,7 +1056,7 @@ class Runner:
source = self.peer("s37-source", games_dir=source_dir)
client = self.peer("s37-client")
connect_many(client, [source])
wait_remote_game(client, PERF_GAME_ID, peer_count=1, version="20260520")
wait_remote_game(client, PERF_GAME_ID, peer_count=1, version=PERF_GAME_VERSION)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": PERF_GAME_ID, "install": False})
@@ -1057,7 +1075,7 @@ class Runner:
throughput = finished.get("data", {}).get("throughput")
if not throughput:
raise ScenarioError(f"download-finished did not include throughput: {finished}")
expected_bytes = PERF_GAME_SIZE + len("20260520")
expected_bytes = PERF_GAME_SIZE + len(PERF_GAME_VERSION)
if int(throughput["bytes"]) != expected_bytes:
raise ScenarioError(
f"throughput byte count mismatch: {throughput['bytes']} != {expected_bytes}"
@@ -1071,6 +1089,116 @@ class Runner:
f"{throughput['chunks']} chunks"
)
def s38_first_play_launch_settings(self) -> str:
client_dir = self.fixture_root / "s38-client"
copy_game("css", client_dir)
client = self.peer(
"s38-client",
games_dir=client_dir,
extra_args=["--unrar", "/usr/local/bin/unrar"],
)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "install", "game_id": "css"})
client.wait_for(
event_is("install-finished", "css"),
timeout=30,
description="css install",
waiter=waiter,
)
wait_local_game(client, "css", downloaded=True, installed=True)
marker = client.host_state_dir / "games" / "css" / "launch_settings_applied"
if marker.exists():
raise ScenarioError("launch settings marker existed before first play")
local_root = client.host_games_dir / "css" / "local"
account_file = local_root / "profiles" / "local" / "account_name.txt"
language_file = local_root / "profiles" / "local" / "language.txt"
ini_file = (
local_root
/ "engine"
/ "bin"
/ "win64"
/ "steam_settings"
/ "SmartSteamEmu.ini"
)
for path in [account_file, language_file, ini_file]:
if not path.is_file():
raise ScenarioError(f"expected installed launch settings file: {path}")
if b"PersonaName = stubplayer\r\n" not in ini_file.read_bytes():
raise ScenarioError("installed SmartSteamEmu.ini did not preserve CRLF stub PersonaName")
first = client.send(
{
"cmd": "play",
"game_id": "css",
"username": "Lan Hero",
"language": "german",
}
)["data"]["outcome"]
expected_first = {
"already_applied": False,
"account_name_written": True,
"language_written": True,
"persona_name_written": True,
}
if first != expected_first:
raise ScenarioError(f"unexpected first play outcome: {first}")
if not marker.is_file():
raise ScenarioError("launch settings marker was not written after first play")
if account_file.read_text(encoding="utf-8") != "Lan Hero":
raise ScenarioError("account_name.txt was not stamped with username")
if language_file.read_text(encoding="utf-8") != "german":
raise ScenarioError("language.txt was not stamped with language")
stamped_ini = ini_file.read_bytes()
if b"PersonaName = Lan Hero\r\n" not in stamped_ini:
raise ScenarioError("PersonaName was not stamped with CRLF preserved")
if b"AppId = 240\r\n" not in stamped_ini or b"Language = english\r\n" not in stamped_ini:
raise ScenarioError("SmartSteamEmu.ini sibling lines were not preserved")
client.docker_exec(
"sh",
"-c",
"printf resetaccount > /games/css/local/profiles/local/account_name.txt",
)
client.docker_exec(
"sh",
"-c",
"printf resetlang > /games/css/local/profiles/local/language.txt",
)
client.docker_exec(
"sh",
"-c",
"printf '[Settings]\\r\\nAppId = 240\\r\\n"
"PersonaName = resetplayer\\r\\nLanguage = english\\r\\n' > "
"/games/css/local/engine/bin/win64/steam_settings/SmartSteamEmu.ini",
)
second = client.send(
{
"cmd": "play",
"game_id": "css",
"username": "Second User",
"language": "french",
}
)["data"]["outcome"]
expected_second = {
"already_applied": True,
"account_name_written": False,
"language_written": False,
"persona_name_written": False,
}
if second != expected_second:
raise ScenarioError(f"unexpected second play outcome: {second}")
if account_file.read_text(encoding="utf-8") != "resetaccount":
raise ScenarioError("second play rewrote account_name.txt despite marker")
if language_file.read_text(encoding="utf-8") != "resetlang":
raise ScenarioError("second play rewrote language.txt despite marker")
if b"PersonaName = resetplayer\r\n" not in ini_file.read_bytes():
raise ScenarioError("second play rewrote PersonaName despite marker")
return "css first play stamped launch settings once; second play respected the marker"
def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]:
source_dir = self.fixture_root / f"{prefix}-bravo"
copy_game("cnctw", source_dir, version="20160128")
@@ -1080,12 +1208,6 @@ class Runner:
wait_remote_game(client, "cnctw", peer_count=1)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("got-game-files", "cnctw"),
timeout=20,
description="got cnctw files",
waiter=waiter,
)
client.wait_for(
event_is("download-begin", "cnctw"),
timeout=20,
@@ -1148,9 +1270,11 @@ class Runner:
f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}"
)
wait_no_outbound_transfer(source, "cnctw")
return (
"cnctw streamed into local/ only; root archive and version.ini absent; "
f"payload hashes={actual}"
f"payload hashes={actual}; source outbound transfer drained"
)
def s40_streamed_receiver_not_source(self) -> str:
@@ -1192,12 +1316,6 @@ class Runner:
waiter = LineWaiter(len(client.output))
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("got-game-files", "cnctw"),
timeout=20,
description="got solid cnctw files",
waiter=waiter,
)
client.wait_for(
event_is("download-finished", "cnctw"),
timeout=60,
@@ -1281,12 +1399,6 @@ class Runner:
waiter = LineWaiter(len(client.output))
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("got-game-files", "cnctw"),
timeout=20,
description="got retry cnctw files",
waiter=waiter,
)
client.wait_for(
event_is("download-finished", "cnctw"),
timeout=60,
@@ -1574,6 +1686,7 @@ def copy_game(game_id: str, destination_games_dir: Path, *, version: str | None
shutil.rmtree(destination)
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(source, destination)
version = version if version is not None else CATALOG_VERSIONS.get(game_id)
if version is not None:
(destination / "version.ini").write_text(version, encoding="utf-8")
@@ -1610,14 +1723,14 @@ def create_many_small_game(root: Path) -> None:
for index in range(20):
child = root / f"file-{index:02}.bin"
child.write_bytes(hashlib.sha256(f"small-{index}".encode()).digest() * 8)
(root / "version.ini").write_text("20250101", encoding="utf-8")
(root / "version.ini").write_text(CATALOG_VERSIONS.get(root.name, "20250101"), encoding="utf-8")
def create_large_sparse_game(root: Path, *, size: int) -> None:
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True)
(root / "version.ini").write_text("20260520", encoding="utf-8")
(root / "version.ini").write_text(PERF_GAME_VERSION, encoding="utf-8")
archive = root / f"{root.name}.eti"
with archive.open("wb") as handle:
handle.truncate(size)
@@ -1754,6 +1867,20 @@ def wait_no_active(peer: Peer, game_id: str, timeout: float = 20) -> None:
raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {last_active}")
def wait_no_outbound_transfer(peer: Peer, game_id: str, timeout: float = 20) -> None:
deadline = time.monotonic() + timeout
last_active: dict[str, int] = {}
while time.monotonic() < deadline:
active = peer.status()["active_outbound_transfers"]
last_active = active
if active.get(game_id, 0) == 0:
return
time.sleep(0.4)
raise ScenarioError(
f"{peer.name} still has outbound transfer for {game_id}: {last_active}"
)
def assert_game_state(
game: dict[str, Any],
*,
+13 -2
View File
@@ -19,6 +19,7 @@ use lanspread_peer::{
ExternalUnrarStreamProvider,
InstallOperation,
NoopStreamInstallProvider,
OutboundTransfers,
PeerCommand,
PeerEvent,
PeerGameDB,
@@ -119,6 +120,7 @@ struct SharedState {
state: RwLock<CliState>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: OutboundTransfers,
notify: Notify,
games_dir: PathBuf,
state_dir: PathBuf,
@@ -137,6 +139,7 @@ async fn main() -> eyre::Result<()> {
let (tx_events, rx_events) = mpsc::unbounded_channel();
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let catalog = Arc::new(RwLock::new(catalog));
let active_outbound_transfers: OutboundTransfers = Arc::new(RwLock::new(HashMap::new()));
let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program);
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar.clone() {
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
@@ -155,7 +158,7 @@ async fn main() -> eyre::Result<()> {
catalog.clone(),
PeerStartOptions {
state_dir: Some(args.state_dir.clone()),
active_outbound_transfers: None,
active_outbound_transfers: Some(active_outbound_transfers.clone()),
stream_install_provider: Some(stream_install_provider),
},
)?;
@@ -165,6 +168,7 @@ async fn main() -> eyre::Result<()> {
state: RwLock::new(CliState::default()),
peer_game_db,
catalog: catalog.clone(),
active_outbound_transfers,
notify: Notify::new(),
games_dir: args.games_dir.clone(),
state_dir: args.state_dir.clone(),
@@ -261,7 +265,6 @@ async fn handle_command(
CliCommand::StreamInstall { game_id } => {
ensure_catalog_game(shared, game_id).await?;
ensure_no_active_operation(shared, game_id).await?;
let _ = game_files_for_download(sender, shared, game_id).await?;
sender.send(PeerCommand::StreamInstallGame {
id: game_id.clone(),
})?;
@@ -314,12 +317,20 @@ async fn handle_command(
async fn status(shared: &SharedState) -> eyre::Result<Value> {
let state = shared.state.read().await;
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
let active_outbound_transfers = {
let active = shared.active_outbound_transfers.read().await;
active
.iter()
.map(|(game_id, transfers)| (game_id.clone(), transfers.len()))
.collect::<HashMap<_, _>>()
};
Ok(json!({
"local_peer": state.local_peer.clone(),
"peer_count": peer_count,
"local_games": state.local_games.len(),
"remote_games": state.remote_games.len(),
"active_operations": active_operations_json(&state.active_operations),
"active_outbound_transfers": active_outbound_transfers,
}))
}
+24 -4
View File
@@ -14,8 +14,8 @@ It is designed to run headless other crates (most notably
roots are announced or served.
- `PeerCommand` represents the small control surface exposed to the UI layer:
`ListGames`, `GetGame`, `FetchLatestFromPeers`, `DownloadGameFiles`,
`InstallGame`, `UninstallGame`, `RemoveDownloadedGame`, `CancelDownload`,
`SetGameDir`, and `GetPeerCount`.
`StreamInstallGame`, `InstallGame`, `UninstallGame`, `RemoveDownloadedGame`,
`CancelDownload`, `SetGameDir`, and `GetPeerCount`.
- `PeerEvent` enumerates everything the peer runtime reports back to the UI:
library snapshots, download/install/uninstall lifecycle updates, runtime
failures, and peer membership changes.
@@ -28,8 +28,8 @@ lifetime of the process:
1. **Server component** (`run_server_component`) listens for QUIC connections,
advertises via mDNS, and serves `Request::ListGames`, `Request::GetGame`,
`Request::GetGameFileData`, and `Request::GetGameFileChunk` by reading from
the local game directory.
`Request::GetGameFileData`, `Request::GetGameFileChunk`, and
`Request::StreamInstall` by reading from the local game directory.
2. **Discovery loop** (`run_peer_discovery`) uses the `lanspread-mdns`
helper to discover other peers. The blocking mDNS work is executed on a
dedicated thread via `tokio::task::spawn_blocking` so that the Tokio runtime
@@ -87,6 +87,26 @@ When the UI asks to download a game:
7. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished`
is emitted and the peer auto-runs the install transaction.
### Streamed Install Pipeline
Low-disk installs use `PeerCommand::StreamInstallGame` instead of the normal
archive download pipeline. The peer core owns the whole operation: it refreshes
file metadata from catalog-version peers, runs the same majority file-size
validation used by normal downloads, selects a validated peer list, and emits
the regular download/install lifecycle events while streaming archive-expanded
bytes directly into a `StreamedInstallTransaction`.
The sender-side `StreamInstallProvider` writes control and chunk frames through
a cancellable `StreamInstallFrameSink`. If the QUIC writer fails because the
receiver cancelled or disconnected, the sink wakes any producer blocked on the
bounded frame channel and lets the transfer guard drop normally.
Each failed peer attempt rolls back its staging directory before trying the next
validated peer. A transaction that created a previously missing game root
removes that root again when rollback leaves it empty. Once staging has been
renamed to `local/`, post-promote intent or launch-settings cleanup failures are
logged for startup recovery rather than reported as a failed install.
`PeerCommand::CancelDownload` cancels the tracked download token for an active
transfer. The transfer task remains responsible for clearing `active_operations`,
discarding partial payload files, and refreshing the settled local snapshot, so
+160 -81
View File
@@ -471,38 +471,6 @@ pub async fn handle_stream_install_game_command(
return;
}
let expected_version = catalog_expected_version(ctx, &id).await;
let mut peers = {
match ctx
.peer_game_db
.read()
.await
.validate_file_sizes_majority(&id, expected_version.as_deref())
{
Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => {
peer_whitelist
}
Ok(_) => {
log::error!("No trusted peers available for streamed install of {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
Err(err) => {
log::error!(
"File size majority validation failed for streamed install {id}: {err}"
);
send_download_failed(tx_notify_ui, &id);
return;
}
}
};
peers.sort();
if peers.is_empty() {
log::error!("No peer selected for streamed install of {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
@@ -516,6 +484,7 @@ pub async fn handle_stream_install_game_command(
}
}
let expected_version = catalog_expected_version(ctx, &id).await;
let cancel_token = ctx.shutdown.child_token();
ctx.active_downloads
.write()
@@ -525,7 +494,14 @@ pub async fn handle_stream_install_game_command(
let ctx_clone = ctx.clone();
let tx_notify_ui = tx_notify_ui.clone();
ctx.task_tracker.spawn(async move {
run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token)
run_stream_install_operation(
ctx_clone,
tx_notify_ui,
id,
game_root,
expected_version,
cancel_token,
)
.await;
});
}
@@ -575,7 +551,7 @@ async fn run_stream_install_operation(
tx_notify_ui: UnboundedSender<PeerEvent>,
id: String,
game_root: PathBuf,
peer_addrs: Vec<SocketAddr>,
expected_version: Option<String>,
cancel_token: CancellationToken,
) {
let download_guard = OperationGuard::download(
@@ -590,42 +566,43 @@ async fn run_stream_install_operation(
PeerEvent::DownloadGameFilesBegin { id: id.clone() },
);
let mut last_receive_error = None;
for peer_addr in peer_addrs {
if cancel_token.is_cancelled() {
last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled"));
break;
}
let transaction =
match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await {
Ok(transaction) => transaction,
let peer_addrs =
match select_stream_install_peers(&ctx, &id, expected_version.as_deref(), &cancel_token)
.await
{
Ok(peers) => peers,
Err(err) => {
log::error!("Failed to prepare streamed install for {id}: {err}");
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
let download_was_cancelled = cancel_token.is_cancelled();
if download_was_cancelled {
log::info!("Streamed install preflight cancelled for {id}: {err}");
} else {
log::error!("Streamed install preflight failed for {id}: {err}");
}
finish_failed_stream_download(
&ctx,
&tx_notify_ui,
&id,
download_guard,
download_was_cancelled,
)
.await;
return;
}
};
let receive_result = receive_streamed_install(
peer_addr,
&id,
transaction.staging_dir(),
tx_notify_ui.clone(),
cancel_token.clone(),
)
.await;
match receive_result {
Ok(()) => {
if transition_download_to_install(
match receive_streamed_install_from_peers(
&ctx,
&tx_notify_ui,
&id,
OperationKind::Installing,
&game_root,
&peer_addrs,
&cancel_token,
)
.await
{
Ok(transaction) => {
if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing)
.await
{
clear_active_download(&ctx, &id).await;
send_download_finished(&tx_notify_ui, &id);
@@ -637,18 +614,60 @@ async fn run_stream_install_operation(
if let Err(err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {err}");
}
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
.await;
return;
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await;
}
Err(err) => {
let download_was_cancelled = cancel_token.is_cancelled();
if download_was_cancelled {
log::info!("Streamed install download cancelled for {id}: {err}");
} else {
log::error!("Streamed install download failed for {id}: {err}");
}
finish_failed_stream_download(
&ctx,
&tx_notify_ui,
&id,
download_guard,
download_was_cancelled,
)
.await;
}
}
}
async fn receive_streamed_install_from_peers(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
game_root: &Path,
peer_addrs: &[SocketAddr],
cancel_token: &CancellationToken,
) -> eyre::Result<install::StreamedInstallTransaction> {
let mut last_receive_error = None;
for &peer_addr in peer_addrs {
if cancel_token.is_cancelled() {
eyre::bail!("streamed install for {id} was cancelled");
}
let transaction =
install::begin_streamed_install(game_root, ctx.state_dir.as_ref(), id).await?;
let receive_result = receive_streamed_install(
peer_addr,
id,
transaction.staging_dir(),
tx_notify_ui.clone(),
cancel_token.clone(),
)
.await;
match receive_result {
Ok(()) => return Ok(transaction),
Err(err) => {
if let Err(rollback_err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {rollback_err}");
}
if cancel_token.is_cancelled() {
log::info!("Streamed install download cancelled for {id}: {err}");
last_receive_error = Some(err);
break;
return Err(err);
}
log::warn!(
@@ -659,24 +678,84 @@ async fn run_stream_install_operation(
}
}
let download_was_cancelled = cancel_token.is_cancelled();
if let Some(err) = last_receive_error {
if download_was_cancelled {
log::info!("Streamed install download cancelled for {id}: {err}");
} else {
log::error!("Streamed install download failed for {id}: {err}");
Err(last_receive_error.unwrap_or_else(|| {
eyre::eyre!("streamed install download failed for {id}: no peer attempts were made")
}))
}
async fn select_stream_install_peers(
ctx: &Ctx,
id: &str,
expected_version: Option<&str>,
cancel_token: &CancellationToken,
) -> eyre::Result<Vec<SocketAddr>> {
let mut metadata_peers = {
ctx.peer_game_db
.read()
.await
.peers_with_expected_version(id, expected_version)
};
metadata_peers.sort();
if metadata_peers.is_empty() {
eyre::bail!("no peers have game {id}");
}
} else {
log::error!("Streamed install download failed for {id}: no peer attempts were made");
refresh_stream_install_file_details(ctx, id, &metadata_peers, cancel_token).await?;
let mut peers = match ctx
.peer_game_db
.read()
.await
.validate_file_sizes_majority(id, expected_version)
{
Ok((validated_files, peer_whitelist, _)) if !validated_files.is_empty() => peer_whitelist,
Ok(_) => {
eyre::bail!("no trusted peers available for streamed install of {id}");
}
finish_failed_stream_download(
&ctx,
&tx_notify_ui,
&id,
download_guard,
download_was_cancelled,
)
.await;
Err(err) => {
return Err(err.wrap_err(format!(
"file size majority validation failed for streamed install {id}"
)));
}
};
peers.sort();
if peers.is_empty() {
eyre::bail!("no peer selected for streamed install of {id}");
}
Ok(peers)
}
async fn refresh_stream_install_file_details(
ctx: &Ctx,
id: &str,
peers: &[SocketAddr],
cancel_token: &CancellationToken,
) -> eyre::Result<()> {
let mut fetched_any = false;
for &peer_addr in peers {
if cancel_token.is_cancelled() {
eyre::bail!("streamed install for {id} was cancelled");
}
match request_game_details_and_update(peer_addr, id, ctx.peer_game_db.clone()).await {
Ok(_) => {
log::info!("Fetched streamed-install file list for {id} from peer {peer_addr}");
fetched_any = true;
}
Err(err) => {
log::error!(
"Failed to fetch streamed-install files for {id} from {peer_addr}: {err}"
);
}
}
}
if !fetched_any {
eyre::bail!("failed to retrieve game files for {id} from any peer");
}
Ok(())
}
async fn finish_failed_stream_download(
+150 -19
View File
@@ -39,6 +39,7 @@ pub struct StreamedInstallTransaction {
id: String,
staging: PathBuf,
eti_version: Option<String>,
created_game_root: bool,
}
impl StreamedInstallTransaction {
@@ -49,40 +50,61 @@ impl StreamedInstallTransaction {
pub async fn commit(self) -> eyre::Result<()> {
let local = local_dir(&self.game_root);
let result = async {
tokio::fs::rename(&self.staging, &local)
if let Err(err) = tokio::fs::rename(&self.staging, &local)
.await
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?;
reset_launch_settings_marker(&self.state_dir, &self.id).await?;
write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await
}
.await;
if result.is_err() {
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))
{
if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await {
log::warn!(
"Failed to clean streamed install staging {}: {cleanup_err}",
self.staging.display()
);
}
if let Err(cleanup_err) =
remove_created_empty_game_root(&self.game_root, self.created_game_root).await
{
log::warn!(
"Failed to clean streamed install game root {}: {cleanup_err}",
self.game_root.display()
);
}
let _ = write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await;
return Err(err);
}
result
if let Err(err) = reset_launch_settings_marker(&self.state_dir, &self.id).await {
log::error!(
"Streamed install for {} was promoted but launch-settings marker reset failed: {err}",
self.id
);
}
if let Err(err) = write_intent(
&self.state_dir,
&self.id,
&InstallIntent::none(&self.id, self.eti_version.clone()),
)
.await
{
log::error!(
"Streamed install for {} was promoted but intent cleanup failed: {err}",
self.id
);
}
Ok(())
}
pub async fn rollback(self) -> eyre::Result<()> {
let staging_result = remove_dir_all_if_exists(&self.staging).await;
let cleanup_result = async {
remove_dir_all_if_exists(&self.staging).await?;
remove_created_empty_game_root(&self.game_root, self.created_game_root).await
}
.await;
let intent_result = write_intent(
&self.state_dir,
&self.id,
@@ -90,7 +112,7 @@ impl StreamedInstallTransaction {
)
.await;
staging_result?;
cleanup_result?;
intent_result
}
}
@@ -104,18 +126,36 @@ pub async fn begin_streamed_install(
eyre::bail!("game {id} is already installed");
}
let created_game_root = !path_exists(game_root).await;
tokio::fs::create_dir_all(game_root).await?;
let eti_version = read_downloaded_version(game_root).await;
write_intent(
if let Err(err) = write_intent(
state_dir,
id,
&InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()),
)
.await?;
.await
{
if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await
{
log::warn!(
"Failed to clean streamed install game root {}: {cleanup_err}",
game_root.display()
);
}
return Err(err);
}
let staging = installing_dir(game_root);
if let Err(err) = prepare_owned_empty_dir(&staging).await {
let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await;
if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await
{
log::warn!(
"Failed to clean streamed install game root {}: {cleanup_err}",
game_root.display()
);
}
return Err(err);
}
@@ -127,6 +167,7 @@ pub async fn begin_streamed_install(
id: id.to_string(),
staging,
eti_version,
created_game_root,
})
}
@@ -586,6 +627,28 @@ async fn remove_dir_all_if_exists(path: &Path) -> eyre::Result<()> {
}
}
async fn remove_created_empty_game_root(game_root: &Path, created: bool) -> eyre::Result<()> {
if !created {
return Ok(());
}
remove_empty_dir_if_exists(game_root).await
}
async fn remove_empty_dir_if_exists(path: &Path) -> eyre::Result<()> {
match tokio::fs::remove_dir(path).await {
Ok(()) => Ok(()),
Err(err)
if matches!(
err.kind(),
ErrorKind::NotFound | ErrorKind::DirectoryNotEmpty
) =>
{
Ok(())
}
Err(err) => Err(err.into()),
}
}
async fn path_is_dir(path: &Path) -> bool {
tokio::fs::metadata(path)
.await
@@ -727,6 +790,74 @@ mod tests {
assert!(!launch_settings_applied_path(state.path(), "game").exists());
}
#[tokio::test]
async fn streamed_install_rollback_removes_new_empty_game_root() {
let temp = TempDir::new("lanspread-install");
let state = test_state();
let root = temp.path().join("streamed-game");
let transaction = begin_streamed_install(&root, state.path(), "streamed-game")
.await
.expect("streamed transaction should begin");
assert!(transaction.staging_dir().is_dir());
transaction
.rollback()
.await
.expect("streamed rollback should succeed");
assert!(!root.exists());
let intent = read_intent(state.path(), "streamed-game").await;
assert_eq!(intent.state, InstallIntentState::None);
}
#[tokio::test]
async fn streamed_install_rollback_keeps_existing_game_root() {
let temp = TempDir::new("lanspread-install");
let state = test_state();
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
let transaction = begin_streamed_install(&root, state.path(), "game")
.await
.expect("streamed transaction should begin");
transaction
.rollback()
.await
.expect("streamed rollback should succeed");
assert!(root.is_dir());
assert!(root.join("version.ini").is_file());
assert!(!root.join(INSTALLING_DIR).exists());
}
#[tokio::test]
async fn streamed_install_commit_succeeds_when_post_promote_intent_cleanup_fails() {
let temp = TempDir::new("lanspread-install");
let state = test_state();
let root = temp.game_root();
let transaction = begin_streamed_install(&root, state.path(), "game")
.await
.expect("streamed transaction should begin");
write_file(&transaction.staging_dir().join("payload.txt"), b"installed");
let game_state_dir = crate::state_paths::game_state_dir(state.path(), "game");
std::fs::remove_dir_all(&game_state_dir).expect("game state dir should be removed");
write_file(&game_state_dir, b"not a directory");
transaction
.commit()
.await
.expect("promoted streamed install should be reported as success");
assert_eq!(
std::fs::read(root.join(LOCAL_DIR).join("payload.txt"))
.expect("promoted payload should be present"),
b"installed"
);
}
#[tokio::test]
async fn install_unpacks_multiple_root_eti_archives_in_sorted_order() {
let temp = TempDir::new("lanspread-install");
+2
View File
@@ -80,12 +80,14 @@ use crate::{
state_paths::resolve_state_dir,
};
pub use crate::{
context::OutboundTransfers,
launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once},
startup::PeerRuntimeHandle,
state_paths::{launch_settings_applied_path, setup_done_path},
stream_install::{
ExternalUnrarStreamProvider,
NoopStreamInstallProvider,
StreamInstallFrameSink,
StreamInstallFuture,
StreamInstallProvider,
},
+86 -47
View File
@@ -77,11 +77,37 @@ impl SenderArchiveIntegrity {
pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>;
#[derive(Clone)]
pub struct StreamInstallFrameSink {
frames: mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken,
}
impl StreamInstallFrameSink {
fn new(frames: mpsc::Sender<StreamInstallFrame>, cancel_token: CancellationToken) -> Self {
Self {
frames,
cancel_token,
}
}
pub async fn send(&self, frame: StreamInstallFrame) -> eyre::Result<()> {
tokio::select! {
() = self.cancel_token.cancelled() => {
eyre::bail!("streamed install frame send was cancelled");
}
result = self.frames.send(frame) => {
result.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
}
}
}
}
pub trait StreamInstallProvider: Send + Sync {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
frames: StreamInstallFrameSink,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a>;
}
@@ -93,7 +119,7 @@ impl StreamInstallProvider for NoopStreamInstallProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
_frames: mpsc::Sender<StreamInstallFrame>,
_frames: StreamInstallFrameSink,
_cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
@@ -121,7 +147,7 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider {
fn stream_archive<'a>(
&'a self,
archive: &'a Path,
frames: mpsc::Sender<StreamInstallFrame>,
frames: StreamInstallFrameSink,
cancel_token: CancellationToken,
) -> StreamInstallFuture<'a> {
Box::pin(async move {
@@ -132,14 +158,12 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider {
.unwrap_or("archive.eti")
.to_string();
send_stream_frame(
&frames,
StreamInstallFrame::ArchiveBegin {
frames
.send(StreamInstallFrame::ArchiveBegin {
archive_name: archive_name.clone(),
solid: listing.solid,
unpacked_size: listing.unpacked_size(),
},
)
})
.await?;
stream_unrar_entries(
@@ -151,7 +175,9 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider {
)
.await?;
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
frames
.send(StreamInstallFrame::ArchiveEnd { archive_name })
.await
})
}
}
@@ -268,9 +294,13 @@ fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Re
let size = draft
.size
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?;
let crc32 = draft
.crc32
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?;
let crc32 = match (size, draft.crc32) {
(_, Some(crc32)) => crc32,
(0, None) => 0,
(_, None) => {
eyre::bail!("RAR file entry {relative_path} has no CRC32");
}
};
(size, Some(crc32))
}
RarEntryKind::Directory => (0, None),
@@ -289,7 +319,7 @@ async fn stream_unrar_entries(
program: &Path,
archive: &Path,
entries: &[RarEntry],
frames: &mpsc::Sender<StreamInstallFrame>,
frames: &StreamInstallFrameSink,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
let mut child = Command::new(program)
@@ -315,26 +345,22 @@ async fn stream_unrar_entries(
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
frames,
StreamInstallFrame::Directory {
frames
.send(StreamInstallFrame::Directory {
relative_path: entry.relative_path.clone(),
},
)
})
.await?;
}
RarEntryKind::File => {
let Some(crc32) = entry.crc32 else {
eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path);
};
send_stream_frame(
frames,
StreamInstallFrame::FileBegin {
frames
.send(StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32,
},
)
})
.await?;
stream_unrar_file_from_stdout(
&mut stdout,
@@ -345,12 +371,10 @@ async fn stream_unrar_entries(
&cancel_token,
)
.await?;
send_stream_frame(
frames,
StreamInstallFrame::FileEnd {
frames
.send(StreamInstallFrame::FileEnd {
relative_path: entry.relative_path.clone(),
},
)
})
.await?;
}
}
@@ -388,7 +412,7 @@ async fn stream_unrar_file_from_stdout(
stdout: &mut (impl AsyncRead + Unpin),
archive: &Path,
entry: &RarEntry,
frames: &mpsc::Sender<StreamInstallFrame>,
frames: &StreamInstallFrameSink,
buffer: &mut [u8],
cancel_token: &CancellationToken,
) -> eyre::Result<()> {
@@ -405,12 +429,10 @@ async fn stream_unrar_file_from_stdout(
);
}
send_stream_frame(
frames,
StreamInstallFrame::FileChunk {
frames
.send(StreamInstallFrame::FileChunk {
bytes: Bytes::copy_from_slice(&buffer[..read]),
},
)
})
.await?;
remaining = remaining.saturating_sub(u64::try_from(read)?);
}
@@ -446,16 +468,6 @@ async fn wait_unrar_child(
}
}
async fn send_stream_frame(
frames: &mpsc::Sender<StreamInstallFrame>,
frame: StreamInstallFrame,
) -> eyre::Result<()> {
frames
.send(frame)
.await
.map_err(|_| eyre::eyre!("streamed install frame receiver closed"))
}
pub(crate) async fn send_stream_install_error(
tx: SendStream,
message: impl Into<String>,
@@ -501,6 +513,7 @@ pub(crate) async fn send_game_install_stream(
let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH);
let producer_cancel = cancel_token.child_token();
let frame_sink = StreamInstallFrameSink::new(frame_tx, producer_cancel.clone());
let game_id_for_producer = game_id.to_string();
let producer = tokio::spawn({
let provider = provider.clone();
@@ -512,16 +525,16 @@ pub(crate) async fn send_game_install_stream(
}
if let Err(err) = provider
.stream_archive(&archive, frame_tx.clone(), producer_cancel.clone())
.stream_archive(&archive, frame_sink.clone(), producer_cancel.clone())
.await
{
let message = err.to_string();
let _ = frame_tx.send(StreamInstallFrame::Error { message }).await;
let _ = frame_sink.send(StreamInstallFrame::Error { message }).await;
return Err(err);
}
}
let _ = frame_tx.send(StreamInstallFrame::Complete).await;
let _ = frame_sink.send(StreamInstallFrame::Complete).await;
Ok(())
}
});
@@ -536,6 +549,7 @@ pub(crate) async fn send_game_install_stream(
break;
}
}
drop(frame_rx);
let close_result = framed_tx
.close()
@@ -876,6 +890,31 @@ Details: RAR 5
assert!(err.to_string().contains("has no CRC32"));
}
#[test]
fn accepts_zero_size_unrar_file_entries_without_crc32() {
let listing = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5
Name: bin/empty.cfg
Type: File
Size: 0
"#,
)
.expect("empty file without CRC32 should parse as CRC32 zero");
assert_eq!(
listing.entries,
vec![RarEntry {
relative_path: "bin/empty.cfg".to_string(),
kind: RarEntryKind::File,
size: 0,
crc32: Some(0),
}]
);
}
#[test]
fn sender_archive_integrity_accepts_matching_size_and_crc32() {
let bytes = b"payload";
@@ -85,7 +85,6 @@ struct LanSpreadState {
peer_runtime: Arc<RwLock<Option<PeerRuntimeHandle>>>,
games: Arc<RwLock<GameDB>>,
active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>,
pending_stream_installs: Arc<RwLock<HashSet<String>>>,
games_folder: Arc<RwLock<String>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>,
@@ -259,16 +258,6 @@ async fn install_game(
log::warn!("Game already has an active operation: {id}");
return Ok(false);
}
if state
.inner()
.pending_stream_installs
.read()
.await
.contains(&id)
{
log::warn!("Game already has a pending streamed install: {id}");
return Ok(false);
}
let peer_ctrl_arc = state.inner().peer_ctrl.clone();
let peer_ctrl = peer_ctrl_arc.read().await.clone();
@@ -323,16 +312,6 @@ async fn stream_install_game(
log::warn!("Game already has an active operation: {id}");
return Ok(false);
}
if state
.inner()
.pending_stream_installs
.read()
.await
.contains(&id)
{
log::warn!("Game already has a pending streamed install: {id}");
return Ok(false);
}
let Some((downloaded, installed, peer_count)) = state
.inner()
@@ -360,19 +339,8 @@ async fn stream_install_game(
return Ok(false);
};
{
let mut pending = state.inner().pending_stream_installs.write().await;
pending.insert(id.clone());
}
if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) {
log::error!("Failed to send PeerCommand::GetGame for streamed install: {e:?}");
state
.inner()
.pending_stream_installs
.write()
.await
.remove(&id);
if let Err(e) = peer_ctrl.send(PeerCommand::StreamInstallGame { id }) {
log::error!("Failed to send PeerCommand::StreamInstallGame: {e:?}");
return Ok(false);
}
@@ -2092,7 +2060,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
}
PeerEvent::NoPeersHaveGame { id } => {
log::warn!("PeerEvent::NoPeersHaveGame received for {id}");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event(
app_handle,
"game-no-peers",
@@ -2131,7 +2098,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
}
PeerEvent::DownloadGameFilesFailed { id } => {
log::warn!("PeerEvent::DownloadGameFilesFailed received");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event(
app_handle,
"game-download-failed",
@@ -2141,7 +2107,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
}
PeerEvent::DownloadGameFilesAllPeersGone { id } => {
log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}");
clear_pending_stream_install(app_handle, &id).await;
emit_game_id_event(
app_handle,
"game-download-peers-gone",
@@ -2280,27 +2245,17 @@ async fn handle_got_game_files(
);
let state = app_handle.state::<LanSpreadState>();
let stream_install = state.pending_stream_installs.write().await.remove(&id);
let peer_ctrl = state.peer_ctrl.read().await.clone();
if let Some(peer_ctrl) = peer_ctrl
&& let Err(e) = if stream_install {
peer_ctrl.send(PeerCommand::StreamInstallGame { id })
} else {
peer_ctrl.send(PeerCommand::DownloadGameFiles {
&& let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles {
id,
file_descriptions,
})
}
{
log::error!("Failed to continue queued game transfer: {e}");
}
}
async fn clear_pending_stream_install(app_handle: &AppHandle, id: &str) {
let state = app_handle.state::<LanSpreadState>();
state.pending_stream_installs.write().await.remove(id);
}
fn handle_download_finished(app_handle: &AppHandle, id: String) {
log::info!("PeerEvent::DownloadGameFilesFinished received");
emit_game_id_event(