fix(peer): drain streamed install senders after completion

A streamed install sender kept the original frame sink alive outside the
producer task. After the producer sent Complete, or an Error for a provider
failure, the forwarding loop still had a live mpsc sender in scope and waited
forever for another frame.

Move the sink into the producer so the channel closes when the producer exits.
That lets the QUIC writer close, the request task return, and the outbound
TransferGuard drop after successful streamed installs and provider-side
failures.

The peer-cli harness now keeps the outbound-transfer map it passes into the
peer runtime and exposes per-game counts in status. S39 asserts that the source
has no active outbound transfer for cnctw after the streamed install finishes,
which catches the sender-side lifecycle leak that receiver-only assertions
missed. The peer-cli README and scenario table document that status field and
expectation.

Test Plan:
- just fmt
- just test
- just clippy
- git diff --check
- git diff --cached --check
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S39 S40 --build-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S41 S42 S43 S44 S45 S46 S47

Refs: NEXT_STEPS.md streamed install lifecycle hardening
This commit was merged in pull request #27.
This commit is contained in:
2026-06-11 08:31:12 +02:00
parent 66c7d5912b
commit c00e6eae84
6 changed files with 36 additions and 4 deletions
+1 -1
View File
@@ -46,7 +46,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
| S36 | Catalog singleton beats stale majority | Five peers advertise one game; one peer has the catalog version and four peers have stale versions. | `list-games` reports `peer_count=1` and the catalog `eti_game_version`; all descriptors and chunks come from the singleton catalog-version peer, while stale peers remain hidden and contribute zero bytes. |
| S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. |
| S38 | First-play launch-setting stamping | `fixture-persona/css` ships a real RAR `.eti` whose tree buries a CRLF `SmartSteamEmu.ini` with a stub `PersonaName` line under `engine/bin/win64/steam_settings/`, plus a stub `account_name.txt` and `language.txt` under `profiles/local/`. A peer installs `css` (with `--unrar`), then sends `play css` with a username and language, then `play css` again. | After install the marker `games/css/launch_settings_applied` is absent and the stub files are intact under `local/`. The first `play` returns `already_applied=false` with `account_name_written`, `language_written`, and `persona_name_written` all true; the deep `SmartSteamEmu.ini` `PersonaName` value becomes the username with its `\r\n` ending and sibling lines preserved, `account_name.txt` becomes the username, `language.txt` becomes the passed language, and the marker now exists. A second `play` returns `already_applied=true`, rewrites nothing, and leaves the files untouched even if their values were reset externally. |
| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256. |
| S39 | Streamed install without keeping archive payload | Empty client connects to `fixture-bravo`, then sends `stream-install cnctw`. The source has real RAR `.eti` payload entries under `bin/` and `data/`; the receiver uses the container-bundled `unrar` stream provider. | Client emits `download-begin`, streamed `download-chunk-finished`, `download-finished`, `install-begin`, and `install-finished`. Local `cnctw` is `downloaded=false`, `installed=true`, `availability=LocalOnly`; root `version.ini` and `.eti` are absent; `local/bin/cnctw-payload.bin` and `local/data/cnctw-assets.dat` match `unrar p` output by SHA-256; the source reports no active outbound transfer for `cnctw` after completion. |
| S40 | Streamed install receiver is not a peer source | After S39, a third peer connects only to the streamed-install receiver. | The third peer may see the receiver's local-only summary in peer snapshots, but `list-games` remote aggregation does not expose `cnctw` as downloadable, `peer_count` remains zero/absent, and attempting `download cnctw` fails with no local files created. |
| S41 | Solid archive streamed install | Empty client connects to a peer serving `fixture-solid/cnctw`, whose `.eti` is a real solid RAR archive. The receiver uses the container-bundled `unrar` stream provider. | The fixture is verified as solid with `unrar lt`; streamed install finishes with `downloaded=false`, `installed=true`, `availability=LocalOnly`; root archive and `version.ini` are absent; streamed byte count equals the extracted solid entries; local payload SHA-256 hashes match `unrar p` output. |
| S42 | Streamed install whole-stream retry | Empty client connects to two peers serving the same catalog-version `cnctw`: one broken source whose `--unrar` path is missing, followed by one good source. | The broken source sorts before the good source in retry order, contributes zero chunks, and the good source completes a fresh whole-stream attempt. The final state is local-only installed, no root archive/sentinel, no `.local.installing`, byte count matches the extracted entries, and payload hashes match the good source. |
+4
View File
@@ -42,3 +42,7 @@ echoed back on the result or error line.
{"id":"u1","cmd":"uninstall","game_id":"fixture-one"}
{"id":"q1","cmd":"shutdown"}
```
The `status` result includes receiver-side `active_operations` and
sender-side `active_outbound_transfers` counts by game ID, which the scenario
runner uses to verify transfer lifecycle cleanup.
@@ -1270,9 +1270,11 @@ class Runner:
f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}"
)
wait_no_outbound_transfer(source, "cnctw")
return (
"cnctw streamed into local/ only; root archive and version.ini absent; "
f"payload hashes={actual}"
f"payload hashes={actual}; source outbound transfer drained"
)
def s40_streamed_receiver_not_source(self) -> str:
@@ -1865,6 +1867,20 @@ def wait_no_active(peer: Peer, game_id: str, timeout: float = 20) -> None:
raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {last_active}")
def wait_no_outbound_transfer(peer: Peer, game_id: str, timeout: float = 20) -> None:
deadline = time.monotonic() + timeout
last_active: dict[str, int] = {}
while time.monotonic() < deadline:
active = peer.status()["active_outbound_transfers"]
last_active = active
if active.get(game_id, 0) == 0:
return
time.sleep(0.4)
raise ScenarioError(
f"{peer.name} still has outbound transfer for {game_id}: {last_active}"
)
def assert_game_state(
game: dict[str, Any],
*,
+13 -1
View File
@@ -19,6 +19,7 @@ use lanspread_peer::{
ExternalUnrarStreamProvider,
InstallOperation,
NoopStreamInstallProvider,
OutboundTransfers,
PeerCommand,
PeerEvent,
PeerGameDB,
@@ -119,6 +120,7 @@ struct SharedState {
state: RwLock<CliState>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: OutboundTransfers,
notify: Notify,
games_dir: PathBuf,
state_dir: PathBuf,
@@ -137,6 +139,7 @@ async fn main() -> eyre::Result<()> {
let (tx_events, rx_events) = mpsc::unbounded_channel();
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let catalog = Arc::new(RwLock::new(catalog));
let active_outbound_transfers: OutboundTransfers = Arc::new(RwLock::new(HashMap::new()));
let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program);
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar.clone() {
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
@@ -155,7 +158,7 @@ async fn main() -> eyre::Result<()> {
catalog.clone(),
PeerStartOptions {
state_dir: Some(args.state_dir.clone()),
active_outbound_transfers: None,
active_outbound_transfers: Some(active_outbound_transfers.clone()),
stream_install_provider: Some(stream_install_provider),
},
)?;
@@ -165,6 +168,7 @@ async fn main() -> eyre::Result<()> {
state: RwLock::new(CliState::default()),
peer_game_db,
catalog: catalog.clone(),
active_outbound_transfers,
notify: Notify::new(),
games_dir: args.games_dir.clone(),
state_dir: args.state_dir.clone(),
@@ -313,12 +317,20 @@ async fn handle_command(
async fn status(shared: &SharedState) -> eyre::Result<Value> {
let state = shared.state.read().await;
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
let active_outbound_transfers = {
let active = shared.active_outbound_transfers.read().await;
active
.iter()
.map(|(game_id, transfers)| (game_id.clone(), transfers.len()))
.collect::<HashMap<_, _>>()
};
Ok(json!({
"local_peer": state.local_peer.clone(),
"peer_count": peer_count,
"local_games": state.local_games.len(),
"remote_games": state.remote_games.len(),
"active_operations": active_operations_json(&state.active_operations),
"active_outbound_transfers": active_outbound_transfers,
}))
}
+1
View File
@@ -80,6 +80,7 @@ use crate::{
state_paths::resolve_state_dir,
};
pub use crate::{
context::OutboundTransfers,
launch_settings::{LaunchSettingsOutcome, apply_launch_settings_once},
startup::PeerRuntimeHandle,
state_paths::{launch_settings_applied_path, setup_done_path},
@@ -518,7 +518,6 @@ pub(crate) async fn send_game_install_stream(
let producer = tokio::spawn({
let provider = provider.clone();
let producer_cancel = producer_cancel.clone();
let frame_sink = frame_sink.clone();
async move {
for archive in archives {
if producer_cancel.is_cancelled() {