test(peer-cli): expand streamed install edge coverage

NEXT_STEPS item 6 called for the remaining streamed-install edge cases to
be covered in the peer-cli matrix. Add S43-S47 for already-installed
rejection, corrupt archive rollback, sender disconnect, receiver cancel,
and sorted multi-archive streaming.

The receiver-cancel scenario needs the harness to drive the same runtime
path as the GUI, so `lanspread-peer-cli` now accepts a narrow
`cancel-download` command that forwards to `PeerCommand::CancelDownload`.
A parser test covers the new JSONL command shape.

Add `fixture-multi/cnctw`, a tiny two-archive RAR fixture. S47 uses it to
prove streamed installs process root `.eti` archives in sorted order and
commit only extracted `local/` payloads, not the root archives or
`version.ini` sentinel.

Test Plan:
- just fmt
- python3 -m py_compile crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S43 S44 S45 S46 S47 --build-image
- just test
- just clippy
- git diff --check
- git diff --cached --check

Refs: NEXT_STEPS.md item 6
This commit is contained in:
2026-06-07 22:26:49 +02:00
parent 88bfaeb04a
commit 9288fda037
8 changed files with 286 additions and 10 deletions
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Run the peer-cli scenarios S1-S42 through Docker."""
"""Run the peer-cli scenarios S1-S47 through Docker."""
from __future__ import annotations
@@ -331,6 +331,11 @@ class Runner:
("S40", self.s40_streamed_receiver_not_source),
("S41", self.s41_solid_archive_streamed_install),
("S42", self.s42_streamed_install_retries_next_source),
("S43", self.s43_streamed_install_rejects_installed_game),
("S44", self.s44_corrupt_stream_rolls_back),
("S45", self.s45_sender_disconnect_mid_stream),
("S46", self.s46_receiver_cancel_mid_stream),
("S47", self.s47_multi_archive_streams_in_sorted_order),
]
for scenario_id, scenario in scenarios:
@@ -1341,6 +1346,166 @@ class Runner:
f"good={good.ready_addr}, bad={bad.ready_addr}, bytes={streamed_bytes}"
)
def s43_streamed_install_rejects_installed_game(self) -> str:
_source, client = self.stream_install_cnctw("s43")
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("download-failed", "cnctw"),
timeout=20,
description="already-installed stream rejection",
waiter=waiter,
)
assert_no_event_since(client, start, "install-finished", "cnctw")
assert_no_event_since(client, start, "download-finished", "cnctw")
wait_no_active(client, "cnctw")
game = wait_local_game(client, "cnctw", downloaded=False, installed=True)
assert_game_state(
game,
downloaded=False,
installed=True,
availability="LocalOnly",
)
return "already-installed cnctw rejected a second streamed install without state drift"
def s44_corrupt_stream_rolls_back(self) -> str:
source_dir = self.fixture_root / "s44-corrupt-source"
copy_game("cnctw", source_dir, version="20160128")
(source_dir / "cnctw" / "cnctw.eti").write_bytes(b"not a rar archive")
source = self.peer("s44-corrupt-source", games_dir=source_dir)
client = self.peer("s44-client")
connect_many(client, [source])
wait_remote_game(client, "cnctw", peer_count=1, version="20160128")
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("download-failed", "cnctw"),
timeout=30,
description="corrupt stream failed",
waiter=waiter,
)
assert_no_event_since(client, start, "download-finished", "cnctw")
assert_no_event_since(client, start, "install-finished", "cnctw")
wait_no_active(client, "cnctw")
assert_failed_stream_left_no_local(client, "cnctw")
return "corrupt cnctw archive emitted download-failed and left no local install"
def s45_sender_disconnect_mid_stream(self) -> str:
source_dir = self.fixture_root / "s45-source"
copy_game("alienswarm", source_dir, version="20190317")
source = self.peer("s45-source", games_dir=source_dir)
client = self.peer("s45-client")
connect_many(client, [source])
wait_remote_game(client, "alienswarm", peer_count=1, version="20190317")
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "stream-install", "game_id": "alienswarm"})
client.wait_for(
event_is("download-chunk-finished", "alienswarm"),
timeout=30,
description="first alienswarm stream chunk before source drop",
waiter=waiter,
)
source.kill()
terminal = client.wait_for(
event_name_in({"download-failed", "download-peers-gone"}, "alienswarm"),
timeout=60,
description="sender disconnect terminal event",
waiter=waiter,
)
assert_no_event_since(client, start, "download-finished", "alienswarm")
assert_no_event_since(client, start, "install-finished", "alienswarm")
wait_no_active(client, "alienswarm")
assert_failed_stream_left_no_local(client, "alienswarm")
return (
"sender disconnect after first alienswarm chunk rolled back stream; "
f"terminal={terminal['event']}"
)
def s46_receiver_cancel_mid_stream(self) -> str:
source_dir = self.fixture_root / "s46-source"
copy_game("alienswarm", source_dir, version="20190317")
source = self.peer("s46-source", games_dir=source_dir)
client = self.peer("s46-client")
connect_many(client, [source])
wait_remote_game(client, "alienswarm", peer_count=1, version="20190317")
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "stream-install", "game_id": "alienswarm"})
client.wait_for(
event_is("download-chunk-finished", "alienswarm"),
timeout=30,
description="first alienswarm stream chunk before receiver cancel",
waiter=waiter,
)
client.send({"cmd": "cancel-download", "game_id": "alienswarm"})
wait_no_active(client, "alienswarm", timeout=60)
assert_no_event_since(client, start, "download-finished", "alienswarm")
assert_no_event_since(client, start, "download-failed", "alienswarm")
assert_no_event_since(client, start, "install-finished", "alienswarm")
assert_failed_stream_left_no_local(client, "alienswarm")
return "receiver cancel after first alienswarm chunk rolled back without failed event"
def s47_multi_archive_streams_in_sorted_order(self) -> str:
source_dir = self.fixture_root / "s47-source"
source_game = source_dir / "cnctw"
shutil.copytree(FIXTURES / "fixture-multi" / "cnctw", source_game)
source = self.peer("s47-source", games_dir=source_dir)
client = self.peer("s47-client")
connect_many(client, [source])
wait_remote_game(client, "cnctw", peer_count=1, version="20160128")
waiter = LineWaiter(len(client.output))
client.send({"cmd": "stream-install", "game_id": "cnctw"})
client.wait_for(
event_is("download-finished", "cnctw"),
timeout=30,
description="multi-archive stream finish",
waiter=waiter,
)
client.wait_for(
event_is("install-finished", "cnctw"),
timeout=30,
description="multi-archive stream install",
waiter=waiter,
)
game = wait_local_game(client, "cnctw", downloaded=False, installed=True)
assert_game_state(
game,
downloaded=False,
installed=True,
availability="LocalOnly",
)
game_root = client.host_games_dir / "cnctw"
assert_not_exists(game_root / "version.ini")
assert_not_exists(game_root / "a-first.eti")
assert_not_exists(game_root / "z-second.eti")
chunk_paths = streamed_chunk_paths(client, "cnctw")
expected_paths = [
"cnctw/.local.installing/order/first.txt",
"cnctw/.local.installing/order/second.txt",
]
if chunk_paths != expected_paths:
raise ScenarioError(f"multi-archive stream order mismatch: {chunk_paths}")
first = (game_root / "local" / "order" / "first.txt").read_text(encoding="utf-8")
second = (game_root / "local" / "order" / "second.txt").read_text(encoding="utf-8")
if first != "first archive payload\n" or second != "second archive payload\n":
raise ScenarioError(f"multi-archive payload mismatch: {first!r}, {second!r}")
return f"multi-archive cnctw streamed in sorted order: {chunk_paths}"
def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]:
result = subprocess.run(
@@ -1577,6 +1742,18 @@ def wait_local_game(
)
def wait_no_active(peer: Peer, game_id: str, timeout: float = 20) -> None:
deadline = time.monotonic() + timeout
last_active: list[dict[str, Any]] = []
while time.monotonic() < deadline:
active = peer.status()["active_operations"]
last_active = active
if all(item["game_id"] != game_id for item in active):
return
time.sleep(0.4)
raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {last_active}")
def assert_game_state(
game: dict[str, Any],
*,
@@ -1623,7 +1800,10 @@ def wait_peer_has_game(
def assert_local_absent(peer: Peer, game_id: str) -> None:
rows = peer.list_games()["local"]
if any(row["id"] == game_id and row.get("downloaded") for row in rows):
if any(
row["id"] == game_id and (row.get("downloaded") or row.get("installed"))
for row in rows
):
raise ScenarioError(f"{peer.name} advertises failed local {game_id}: {rows}")
@@ -1639,6 +1819,15 @@ def assert_not_exists(path: Path) -> None:
raise ScenarioError(f"expected path to be absent: {path}")
def assert_failed_stream_left_no_local(peer: Peer, game_id: str) -> None:
game_root = peer.host_games_dir / game_id
assert_local_absent(peer, game_id)
assert_not_exists(game_root / "local")
assert_not_exists(game_root / ".local.installing")
assert_not_exists(game_root / "version.ini")
assert_not_exists(game_root / f"{game_id}.eti")
def event_is(event: str, game_id: str | None = None) -> Callable[[dict[str, Any]], bool]:
def predicate(item: dict[str, Any]) -> bool:
if item.get("type") != "event" or item.get("event") != event:
@@ -1650,6 +1839,17 @@ def event_is(event: str, game_id: str | None = None) -> Callable[[dict[str, Any]
return predicate
def event_name_in(events: set[str], game_id: str | None = None) -> Callable[[dict[str, Any]], bool]:
def predicate(item: dict[str, Any]) -> bool:
if item.get("type") != "event" or item.get("event") not in events:
return False
if game_id is None:
return True
return item.get("data", {}).get("game_id") == game_id
return predicate
def assert_no_event(peer: Peer, waiter: LineWaiter, event: str, game_id: str) -> None:
for item in peer.output[waiter.seen :]:
if item.get("type") == "event" and item.get("event") == event:
@@ -1657,6 +1857,13 @@ def assert_no_event(peer: Peer, waiter: LineWaiter, event: str, game_id: str) ->
raise ScenarioError(f"unexpected {event} for {game_id}: {item}")
def assert_no_event_since(peer: Peer, start: int, event: str, game_id: str) -> None:
for item in peer.output[start:]:
if item.get("type") == "event" and item.get("event") == event:
if item.get("data", {}).get("game_id") == game_id:
raise ScenarioError(f"unexpected {event} for {game_id}: {item}")
def assert_only_chunk_sources(
peer: Peer,
game_id: str,
@@ -1682,6 +1889,16 @@ def assert_only_chunk_sources(
raise ScenarioError(f"no chunk events recorded for {game_id}")
def streamed_chunk_paths(peer: Peer, game_id: str) -> list[str]:
return [
item["data"]["relative_path"]
for item in peer.output
if item.get("type") == "event"
and item.get("event") == "download-chunk-finished"
and item.get("data", {}).get("game_id") == game_id
]
def chunk_totals(peer: Peer, game_id: str, relative_path: str) -> dict[str, int]:
totals: dict[str, int] = {}
for item in peer.output: