#!/usr/bin/env python3 """Run the peer-cli scenarios S1-S47 through Docker.""" from __future__ import annotations import argparse import hashlib import ipaddress import json import os import queue import shlex import shutil import subprocess import sys import threading import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable REPO = Path(__file__).resolve().parents[3] RUN_ROOT = REPO / ".lanspread-peer-cli" / "extended-scenarios" IMAGE = "lanspread-peer-cli:dev" NETWORK = "lanspread" CONTAINER_PREFIX = "lanspread-peer-cli-ext" CATALOG_DB = "/app/game.db" FIXTURES = REPO / "crates" / "lanspread-peer-cli" / "fixtures" CHUNK_SIZE = 128 * 1024 * 1024 CATALOG_VERSIONS = { "alienswarm": "20190317", "bf1942": "20160130", "bfbc2": "20210416", "cnc4": "20170204", "cnctw": "20160128", "cod5": "20160920", "cod6": "20200315", "coh": "20200907", "css": "20240623", "ggoo": "20200721", } PERF_GAME_ID = "bf1942" PERF_GAME_VERSION = CATALOG_VERSIONS[PERF_GAME_ID] PERF_GAME_SIZE = 2 * 1024 * 1024 * 1024 IGNORED_DIFF_NAMES = {".lanspread", ".lanspread.json", "local"} class ScenarioError(RuntimeError): pass @dataclass class LineWaiter: seen: int = 0 @dataclass class Peer: runner: "Runner" name: str games_dir: Path | None = None readonly_games: bool = False tmpfs_size: str | None = None fixtures: list[str] = field(default_factory=list) extra_args: list[str] = field(default_factory=list) process: subprocess.Popen[str] | None = None output: list[dict[str, Any]] = field(default_factory=list) raw_output: list[str] = field(default_factory=list) events: queue.Queue[dict[str, Any]] = field(default_factory=queue.Queue) condition: threading.Condition = field(default_factory=threading.Condition) request_index: int = 0 ready_addr: str | None = None peer_id: str | None = None @property def container_name(self) -> str: return f"{CONTAINER_PREFIX}-{self.runner.run_id}-{self.name}" @property def host_games_dir(self) -> Path: if self.games_dir is not None: return self.games_dir return self.runner.games_root / self.name @property def host_state_dir(self) -> Path: return self.runner.state_root / self.name def start(self) -> "Peer": self.host_state_dir.mkdir(parents=True, exist_ok=True) if self.tmpfs_size is None: self.host_games_dir.mkdir(parents=True, exist_ok=True) command = [ "docker", "run", "--rm", "--init", "--network", NETWORK, "--name", self.container_name, "-i", "-v", f"{self.host_state_dir}:/state", ] if self.tmpfs_size is None: mode = "ro" if self.readonly_games else "rw" command.extend(["-v", f"{self.host_games_dir}:/games:{mode}"]) else: command.extend(["--tmpfs", f"/games:size={self.tmpfs_size}"]) command.extend( [ IMAGE, "--name", self.name, "--games-dir", "/games", "--state-dir", "/state", "--catalog-db", CATALOG_DB, ] ) for fixture in self.fixtures: command.extend(["--fixture", fixture]) command.extend(self.extra_args) self.runner.log(f"start {self.name}: {' '.join(command)}") self.process = subprocess.Popen( command, cwd=REPO, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) threading.Thread(target=self._read_output, daemon=True).start() self.wait_ready() return self def _read_output(self) -> None: assert self.process is not None assert self.process.stdout is not None for line in self.process.stdout: stripped = line.rstrip("\r\n") with self.condition: self.raw_output.append(stripped) try: parsed = json.loads(stripped) except json.JSONDecodeError: parsed = {"type": "raw", "line": stripped} self.output.append(parsed) self.condition.notify_all() self.events.put(parsed) def wait_ready(self) -> None: line = self.wait_for( lambda item: item.get("type") == "event" and item.get("event") == "local-peer-ready", timeout=20, description=f"{self.name} local-peer-ready", ) data = line["data"] self.ready_addr = data["addr"] self.peer_id = data["peer_id"] def send( self, payload: dict[str, Any], *, expect_error: bool = False, timeout: float = 20, ) -> dict[str, Any]: assert self.process is not None assert self.process.stdin is not None self.request_index += 1 request_id = f"{self.name}-{self.request_index}" payload = dict(payload) payload["id"] = request_id line = json.dumps(payload, separators=(",", ":")) self.process.stdin.write(line + "\n") self.process.stdin.flush() response = self.wait_for( lambda item: item.get("id") == request_id and item.get("type") in {"result", "error"}, timeout=timeout, description=f"{self.name} response to {payload.get('cmd')}", ) if response["type"] == "error": if expect_error: return response raise ScenarioError(f"{self.name} command failed: {response}") if expect_error: raise ScenarioError(f"{self.name} command unexpectedly succeeded: {response}") return response def wait_for( self, predicate: Callable[[dict[str, Any]], bool], *, timeout: float, description: str, waiter: LineWaiter | None = None, ) -> dict[str, Any]: deadline = time.monotonic() + timeout with self.condition: if waiter is None: start = 0 else: start = waiter.seen while True: for index in range(start, len(self.output)): item = self.output[index] if predicate(item): if waiter is not None: waiter.seen = index + 1 return item start = len(self.output) if waiter is not None: waiter.seen = start remaining = deadline - time.monotonic() if remaining <= 0: tail = "\n".join(self.raw_output[-20:]) raise ScenarioError( f"timed out waiting for {description} on {self.name}\n{tail}" ) self.condition.wait(remaining) def list_games(self) -> dict[str, Any]: return self.send({"cmd": "list-games"})["data"] def list_peers(self) -> list[dict[str, Any]]: return self.send({"cmd": "list-peers"})["data"]["peers"] def status(self) -> dict[str, Any]: return self.send({"cmd": "status"})["data"] def connect_to(self, other: "Peer") -> None: if other.ready_addr is None: raise ScenarioError(f"{other.name} is not ready") self.send({"cmd": "connect", "addr": other.ready_addr}) self.send({"cmd": "wait-peers", "count": 1, "timeout_ms": 10000}) def shutdown(self) -> None: if self.process is None: return if self.process.poll() is None: try: self.send({"cmd": "shutdown"}, timeout=8) except Exception: self.kill() try: self.process.wait(timeout=8) except subprocess.TimeoutExpired: self.kill() def kill(self) -> None: subprocess.run( ["docker", "rm", "-f", self.container_name], cwd=REPO, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, ) if self.process is not None: try: self.process.wait(timeout=5) except subprocess.TimeoutExpired: self.process.kill() def docker_exec(self, *args: str, check: bool = True) -> subprocess.CompletedProcess[str]: return subprocess.run( ["docker", "exec", self.container_name, *args], cwd=REPO, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=check, ) class Runner: def __init__(self, selected: set[str] | None = None, build_image: bool = False) -> None: self.selected = selected self.build_image = build_image self.run_id = str(int(time.time())) self.state_root = RUN_ROOT / "state" self.games_root = RUN_ROOT / "games" self.fixture_root = RUN_ROOT / "fixtures" self.current_peers: list[Peer] = [] self.results: list[tuple[str, str]] = [] def log(self, message: str) -> None: print(message, flush=True) def run(self) -> None: self.prepare() scenarios: list[tuple[str, Callable[[], str]]] = [ ("S1", self.s1_startup_scan), ("S2", self.s2_direct_connect_handshake), ("S3", self.s3_remote_aggregation), ("S4", self.s4_single_source_download), ("S5", self.s5_auto_install_download), ("S6", self.s6_manual_install_uninstall), ("S7", self.s7_duplicate_source_download), ("S8", self.s8_ambiguous_metadata_rejection), ("S9", self.s9_missing_game), ("S10", self.s10_shutdown_cleanup), ("S11", self.s11_same_identity_reconnect), ("S12", self.s12_transfer_serving_gates), ("S13", self.s13_exact_transfer_equality), ("S14", self.s14_large_multi_peer_chunking), ("S15", self.s15_three_way_version_skew), ("S16", self.s16_catalog_fanout_with_stale), ("S17", self.s17_catalog_conflict_rejection), ("S18", self.s18_redundant_source_drop), ("S19", self.s19_sole_source_drop), ("S20", self.s20_receiver_write_failure), ("S21", self.s21_add_game_propagation), ("S22", self.s22_remove_game_propagation), ("S23", self.s23_version_bump_propagation), ("S24", self.s24_two_clients_one_source), ("S25", self.s25_two_downloads_one_client), ("S26", self.s26_duplicate_download_rejection), ("S27", self.s27_self_connect_rejection), ("S28", self.s28_address_change_unit), ("S29", self.s29_empty_peer_participates), ("S30", self.s30_mesh_aggregation), ("S31", self.s31_bootstrapped_peer_source), ("S32", self.s32_reinstall_after_uninstall), ("S33", self.s33_install_after_mutation), ("S34", self.s34_many_small_files), ("S35", self.s35_unknown_game_filtered), ("S36", self.s36_catalog_singleton), ("S37", self.s37_single_source_download_throughput), ("S38", self.s38_first_play_launch_settings), ("S39", self.s39_streamed_install_local_only), ("S40", self.s40_streamed_receiver_not_source), ("S41", self.s41_solid_archive_streamed_install), ("S42", self.s42_streamed_install_retries_next_source), ("S43", self.s43_streamed_install_rejects_installed_game), ("S44", self.s44_corrupt_stream_rolls_back), ("S45", self.s45_sender_disconnect_mid_stream), ("S46", self.s46_receiver_cancel_mid_stream), ("S47", self.s47_multi_archive_streams_in_sorted_order), ] for scenario_id, scenario in scenarios: if self.selected and scenario_id.lower() not in self.selected: continue self.cleanup_containers() self.current_peers = [] try: self.log(f"\n== {scenario_id} ==") evidence = scenario() self.results.append((scenario_id, evidence)) self.log(f"{scenario_id} PASS: {evidence}") finally: self.stop_peers() if not self.results: raise ScenarioError("no scenarios selected") self.log("\nSummary:") for scenario_id, evidence in self.results: self.log(f"- {scenario_id}: {evidence}") def prepare(self) -> None: reset_run_root() self.state_root.mkdir(parents=True, exist_ok=True) self.games_root.mkdir(parents=True, exist_ok=True) self.fixture_root.mkdir(parents=True, exist_ok=True) self.cleanup_containers() if self.build_image: run(["just", "peer-cli-image"], "build peer-cli image") run(["just", "peer-cli-net"], "prepare peer-cli docker network") def cleanup_containers(self) -> None: result = subprocess.run( ["docker", "ps", "-a", "--format", "{{.Names}}"], cwd=REPO, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, ) names = [ name for name in result.stdout.splitlines() if name.startswith(f"{CONTAINER_PREFIX}-") ] if names: subprocess.run( ["docker", "rm", "-f", *names], cwd=REPO, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, ) def stop_peers(self) -> None: for peer in reversed(self.current_peers): peer.shutdown() self.cleanup_containers() def peer( self, name: str, *, games_dir: Path | None = None, readonly_games: bool = False, tmpfs_size: str | None = None, fixtures: list[str] | None = None, extra_args: list[str] | None = None, ) -> Peer: peer = Peer( runner=self, name=name, games_dir=games_dir, readonly_games=readonly_games, tmpfs_size=tmpfs_size, fixtures=fixtures or [], extra_args=extra_args or [], ).start() self.current_peers.append(peer) return peer def s1_startup_scan(self) -> str: alpha = self.peer("s1-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) games = alpha.list_games()["local"] expected = {"alienswarm", "bf1942", "ggoo"} seen = {game["id"] for game in games} if not expected.issubset(seen): raise ScenarioError(f"fixture-alpha games missing: expected {expected}, saw {seen}") for game in games: if game["id"] in expected: assert_game_state(game, downloaded=True, installed=False, availability="Ready") return "fixture-alpha emitted ready local games alienswarm, bf1942, and ggoo" def s2_direct_connect_handshake(self) -> str: alpha = self.peer("s2-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bravo = self.peer("s2-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) connect_many(alpha, [bravo]) # Poll for library convergence rather than reading once: wait-peers only # guarantees peer presence, and the mDNS path can upsert a peer with an # empty library before its snapshot arrives. wait_peer_has_game(alpha, bravo.peer_id, "bfbc2") peers = alpha.list_peers() if len(peers) != 1 or peers[0]["peer_id"] == alpha.peer_id: raise ScenarioError(f"bad alpha peers after connect: {peers}") if peers[0]["game_count"] != 4: raise ScenarioError(f"expected bravo game_count=4, got {peers}") # The handshake is bidirectional: bravo must also record alpha's library # without a separate connect, proving both directions of the exchange. bravo_view = wait_peer_has_game(bravo, alpha.peer_id, "alienswarm") if bravo_view["game_count"] != 3: raise ScenarioError(f"expected alpha game_count=3 on bravo, got {bravo_view}") return "alpha<->bravo exchanged libraries: bravo had 4 games on alpha, alpha had 3 on bravo" def s3_remote_aggregation(self) -> str: alpha = self.peer("s3-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bravo = self.peer("s3-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s3-client") connect_many(client, [alpha, bravo]) expected_counts = { "ggoo": 2, "alienswarm": 1, "bf1942": 1, "bfbc2": 1, "cnc4": 1, "cnctw": 1, } for game_id, peer_count in expected_counts.items(): wait_remote_game(client, game_id, peer_count=peer_count) return "empty client aggregated alpha/bravo with ggoo peer_count=2 and unique games peer_count=1" def s4_single_source_download(self) -> str: bravo = self.peer("s4-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s4-client") connect_many(client, [bravo]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.wait_for(event_is("got-game-files", "bfbc2"), timeout=20, description="got bfbc2", waiter=waiter) client.wait_for(event_is("download-begin", "bfbc2"), timeout=20, description="begin bfbc2", waiter=waiter) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="finish bfbc2", waiter=waiter) game = wait_local_game(client, "bfbc2", downloaded=True, installed=False) diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", client.host_games_dir / "bfbc2") if (client.host_games_dir / "bfbc2" / "local").exists(): raise ScenarioError("bfbc2 local/ exists after install=false") return f"bfbc2 downloaded with install=false, local state installed={game['installed']}, diff matched" def s5_auto_install_download(self) -> str: bravo = self.peer("s5-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s5-client") connect_many(client, [bravo]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "cnctw"}) client.wait_for(event_is("download-finished", "cnctw"), timeout=60, description="finish cnctw", waiter=waiter) client.wait_for(event_is("install-finished", "cnctw"), timeout=30, description="install cnctw", waiter=waiter) wait_local_game(client, "cnctw", downloaded=True, installed=True) if not (client.host_games_dir / "cnctw" / "local" / "fixture-payload.txt").is_file(): raise ScenarioError("cnctw install payload missing") diff_game_dirs(FIXTURES / "fixture-bravo" / "cnctw", client.host_games_dir / "cnctw") return "cnctw auto-installed, local fixture payload existed, root diff matched excluding local metadata" def s6_manual_install_uninstall(self) -> str: bravo = self.peer("s6-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s6-client") connect_many(client, [bravo]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="finish bfbc2", waiter=waiter) client.send({"cmd": "install", "game_id": "bfbc2"}) client.wait_for(event_is("install-finished", "bfbc2"), timeout=30, description="install bfbc2", waiter=waiter) wait_local_game(client, "bfbc2", downloaded=True, installed=True) client.send({"cmd": "uninstall", "game_id": "bfbc2"}) client.wait_for(event_is("uninstall-finished", "bfbc2"), timeout=30, description="uninstall bfbc2", waiter=waiter) wait_local_game(client, "bfbc2", downloaded=True, installed=False) if (client.host_games_dir / "bfbc2" / "local").exists(): raise ScenarioError("bfbc2 local/ remained after uninstall") diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", client.host_games_dir / "bfbc2") return "manual install/uninstall toggled installed state, removed local/, preserved downloaded root" def s7_duplicate_source_download(self) -> str: alpha = self.peer("s7-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bravo = self.peer("s7-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s7-client") connect_many(client, [alpha, bravo]) wait_remote_game(client, "ggoo", peer_count=2) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "ggoo", "install": False}) client.wait_for(event_is("download-finished", "ggoo"), timeout=60, description="finish ggoo", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-alpha" / "ggoo", client.host_games_dir / "ggoo") # The fixtures are byte-identical, so the diff alone is source-agnostic. # Prove the duplicate-source path: download committed exactly once, every # chunk came from the validated 2-peer set, both peers actually served, # and nothing was fetched twice. if count_events(client, "download-finished", "ggoo") != 1: raise ScenarioError("ggoo did not finish exactly once") assert_only_chunk_sources(client, "ggoo", {alpha.ready_addr, bravo.ready_addr}) if chunk_sources(client, "ggoo") != {alpha.ready_addr, bravo.ready_addr}: raise ScenarioError( f"expected both validated sources to serve ggoo, got {chunk_sources(client, 'ggoo')}" ) assert_no_duplicate_chunks(client, "ggoo") return "ggoo downloaded once, served from both validated sources with no duplicate chunks" def s8_ambiguous_metadata_rejection(self) -> str: dir_a = self.fixture_root / "s8-a" dir_b = self.fixture_root / "s8-b" copy_game("ggoo", dir_a) copy_game("ggoo", dir_b) with (dir_b / "ggoo" / "ggoo.eti").open("ab") as handle: handle.write(b"conflict") peer_a = self.peer("s8-a", games_dir=dir_a) peer_b = self.peer("s8-b", games_dir=dir_b) client = self.peer("s8-client") connect_many(client, [peer_a, peer_b]) wait_remote_game(client, "ggoo", peer_count=2, version=CATALOG_VERSIONS["ggoo"]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "ggoo", "install": False}) client.wait_for(event_is("download-failed", "ggoo"), timeout=30, description="ggoo failed", waiter=waiter) assert_not_exists(client.host_games_dir / "ggoo" / "version.ini") return "conflicting catalog-version ggoo file sizes emitted download-failed and left no version.ini" def s9_missing_game(self) -> str: client = self.peer("s9-client") err = client.send({"cmd": "download", "game_id": "cod2", "install": False}, expect_error=True) if "no peers have game cod2" not in err["error"]: raise ScenarioError(f"unexpected missing game error: {err}") assert_not_exists(client.host_games_dir / "cod2") return f"missing game command errored '{err['error']}' and created no local directory" def s10_shutdown_cleanup(self) -> str: alpha = self.peer("s10-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bravo = self.peer("s10-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) connect_many(alpha, [bravo]) wait_remote_game(alpha, "bfbc2", peer_count=1) bravo.shutdown() wait_remote_absent(alpha, "bfbc2") if alpha.list_peers(): raise ScenarioError(f"alpha still has peers after bravo shutdown: {alpha.list_peers()}") return "bravo graceful shutdown removed peer and bravo-only games from alpha" def s11_same_identity_reconnect(self) -> str: alpha = self.peer("s11-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bravo_dir = FIXTURES / "fixture-bravo" bravo = self.peer("s11-bravo", games_dir=bravo_dir, readonly_games=True) connect_many(alpha, [bravo]) first_peer = alpha.list_peers()[0] first_addr = first_peer["addr"] first_id = first_peer["peer_id"] bravo.shutdown() wait_remote_absent(alpha, "bfbc2") bravo = self.peer("s11-bravo", games_dir=bravo_dir, readonly_games=True) connect_many(alpha, [bravo]) peers = alpha.list_peers() # The real invariant is a single peer entry reusing the same identity (no # duplicate). The listener address is an OS-assigned ephemeral port: it # almost always differs across restarts, but asserting it MUST change # tests the kernel's port allocator, not the peer, and can fail spuriously # if the same port is reused. So we only require the identity to be stable. if len(peers) != 1: raise ScenarioError(f"expected one bravo peer after reconnect, got {peers}") if peers[0]["peer_id"] != first_id: raise ScenarioError(f"bravo peer id changed: {first_id} -> {peers[0]['peer_id']}") changed = "new" if peers[0]["addr"] != first_addr else "same" return f"bravo reused peer id {first_id} as a single entry at {changed} address {peers[0]['addr']}" def s12_transfer_serving_gates(self) -> str: output = run_just_test() required = [ "local_download_available_gates_on_catalog_operation_and_sentinel", "get_game_response_respects_serve_gates", "file_transfer_dispatch_respects_serve_gates", "local_relative_paths_are_never_transferable", ] missing = [name for name in required if f"{name} ... ok" not in output] if missing: raise ScenarioError(f"S12 unit proof tests did not run-and-pass: {missing}") return "just test passed including catalog/sentinel/active/local-path serve gate tests" def s13_exact_transfer_equality(self) -> str: bravo = self.peer("s13-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) alpha = self.peer("s13-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) client = self.peer("s13-client") connect_many(client, [bravo, alpha]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="bfbc2 finish", waiter=waiter) client.send({"cmd": "download", "game_id": "alienswarm", "install": False}) client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="alienswarm finish", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", client.host_games_dir / "bfbc2") diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", client.host_games_dir / "alienswarm") return "small bfbc2 and large alienswarm transfers both diffed cleanly against sources" def s14_large_multi_peer_chunking(self) -> str: game_id = PERF_GAME_ID source_dir = self.fixture_root / "s14-alpha" # Four 128 MiB chunks so the balance assertion is meaningful: with two # peers a fair split is 2+2 chunks (diff 0) and a 3+1 imbalance would # exceed one CHUNK_SIZE. A 2-chunk file could never trip the check. file_size = CHUNK_SIZE * 4 create_large_sparse_game(source_dir / game_id, size=file_size) alpha = self.peer("s14-alpha", games_dir=source_dir) stage = self.peer("s14-stage") connect_many(stage, [alpha]) waiter = LineWaiter(len(stage.output)) stage.send({"cmd": "download", "game_id": game_id, "install": False}) stage.wait_for(event_is("download-finished", game_id), timeout=180, description="stage finish", waiter=waiter) diff_game_dirs(source_dir / game_id, stage.host_games_dir / game_id) client = self.peer("s14-client") connect_many(client, [alpha, stage]) wait_remote_game(client, game_id, peer_count=2, version=PERF_GAME_VERSION) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": game_id, "install": False}) client.wait_for(event_is("download-finished", game_id), timeout=180, description="client finish", waiter=waiter) diff_game_dirs(source_dir / game_id, client.host_games_dir / game_id) totals = chunk_totals(client, game_id, f"{game_id}/{game_id}.eti") if len(totals) != 2: raise ScenarioError(f"expected .eti chunks from exactly two peers, got {totals}") if sum(totals.values()) != file_size: raise ScenarioError(f"chunk bytes {sum(totals.values())} != file size {file_size}: {totals}") values = list(totals.values()) if max(values) - min(values) > CHUNK_SIZE: raise ScenarioError(f"chunk totals not balanced within one chunk: {totals}") return f"{game_id} ({file_size // (1024 * 1024)} MiB) split across two sources, balanced, diff matched: {totals}" def s15_three_way_version_skew(self) -> str: specs = [ ("s15-a", "20150101"), ("s15-b", "20160101"), ("s15-c", CATALOG_VERSIONS["cnc4"]), ] peers = [] for name, version in specs: game_dir = self.fixture_root / name copy_game("cnc4", game_dir, version=version) peers.append(self.peer(name, games_dir=game_dir)) client = self.peer("s15-client") connect_many(client, peers) wait_remote_game(client, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"]) # Cross-check the RAW advertised versions via list-peers (not the # catalog-synthesized list-games field), proving the three peers really # differ and that only the catalog-version peer is aggregated. for peer, expected_version in zip(peers, ["20150101", "20160101", CATALOG_VERSIONS["cnc4"]]): advertised = peer_advertised_version(client, peer.peer_id, "cnc4") if advertised != expected_version: raise ScenarioError( f"{peer.name} advertised cnc4 version {advertised}, expected {expected_version}" ) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "cnc4", "install": False}) client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="cnc4 finish", waiter=waiter) assert_only_chunk_sources(client, "cnc4", {peers[2].ready_addr}) diff_game_dirs(peers[2].host_games_dir / "cnc4", client.host_games_dir / "cnc4") return "three-way skew exposed only the catalog-version peer and receiver diffed cleanly" def s16_catalog_fanout_with_stale(self) -> str: specs = [ ("s16-a", "20180101"), ("s16-b", CATALOG_VERSIONS["alienswarm"]), ("s16-c", CATALOG_VERSIONS["alienswarm"]), ] peers = [] for name, version in specs: game_dir = self.fixture_root / name copy_game("alienswarm", game_dir, version=version) # Two 128 MiB chunks so the .eti can actually fan out across the two # catalog-version peers; the stock 120 MiB fixture is a single chunk # that can only ever come from one source. inflate_archive_sparse(game_dir / "alienswarm", "alienswarm", CHUNK_SIZE * 2) peers.append(self.peer(name, games_dir=game_dir)) client = self.peer("s16-client") connect_many(client, peers) wait_remote_game(client, "alienswarm", peer_count=2, version=CATALOG_VERSIONS["alienswarm"]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "alienswarm", "install": False}) client.wait_for(event_is("download-finished", "alienswarm"), timeout=180, description="alienswarm finish", waiter=waiter) assert_only_chunk_sources(client, "alienswarm", {peers[1].ready_addr, peers[2].ready_addr}) totals = chunk_totals(client, "alienswarm", "alienswarm/alienswarm.eti") if peers[0].ready_addr in totals: raise ScenarioError(f"stale peer contributed chunks: {totals}") if set(totals) != {peers[1].ready_addr, peers[2].ready_addr}: raise ScenarioError(f"expected .eti to fan out across both B and C, got {totals}") diff_game_dirs(peers[1].host_games_dir / "alienswarm", client.host_games_dir / "alienswarm") return f"catalog-version B/C peers split alienswarm.eti while stale A contributed zero; totals={totals}" def s17_catalog_conflict_rejection(self) -> str: specs = [ ("s17-a", "20150101", False), ("s17-b", CATALOG_VERSIONS["cnc4"], False), ("s17-c", CATALOG_VERSIONS["cnc4"], True), ] peers = [] for name, version, conflict in specs: game_dir = self.fixture_root / name copy_game("cnc4", game_dir, version=version) if conflict: with (game_dir / "cnc4" / "cnc4.eti").open("ab") as handle: handle.write(b"conflict") peers.append(self.peer(name, games_dir=game_dir)) client = self.peer("s17-client") connect_many(client, peers) wait_remote_game(client, "cnc4", peer_count=2, version=CATALOG_VERSIONS["cnc4"]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "cnc4", "install": False}) client.wait_for(event_is("download-failed", "cnc4"), timeout=30, description="cnc4 failed", waiter=waiter) assert_not_exists(client.host_games_dir / "cnc4" / "version.ini") return "catalog-version file conflict failed download and left no committed version.ini" def s18_redundant_source_drop(self) -> str: game_id = "bf1942" source_a_dir = self.fixture_root / "s18-a" source_b_dir = self.fixture_root / "s18-b" # Multi-chunk sparse archive so BOTH peers are assigned .eti chunks; a # single-chunk file could be served entirely by one peer, so killing the # other would prove nothing. We verify the download SURVIVES a mid-download # source kill (every byte still arrives, no download-failed, diff matches). # Retry-onto-survivor is the mechanism that makes this work and is # exercised whenever the kill interrupts an unfinished chunk, but the race # against `docker rm -f` means we cannot deterministically force it, so we # do not assert it. Sparse zero bytes are identical, so duplicate-source # majority validation still agrees. file_size = CHUNK_SIZE * 4 create_large_sparse_game(source_a_dir / game_id, size=file_size) create_large_sparse_game(source_b_dir / game_id, size=file_size) source_a = self.peer("s18-a", games_dir=source_a_dir) source_b = self.peer("s18-b", games_dir=source_b_dir) client = self.peer("s18-client") connect_many(client, [source_a, source_b]) wait_remote_game(client, game_id, peer_count=2) start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "download", "game_id": game_id, "install": False}) client.wait_for(event_is("download-begin", game_id), timeout=20, description="download begin", waiter=waiter) source_a.kill() client.wait_for(event_is("download-finished", game_id), timeout=180, description="download finish", waiter=waiter) # Scan the WHOLE download window (start..) for download-failed. The old # assert_no_event reused a waiter already advanced past download-finished, # so it only saw the empty tail and could never fire. assert_no_event_since(client, start, "download-failed", game_id) diff_game_dirs(source_b_dir / game_id, client.host_games_dir / game_id) totals = chunk_totals(client, game_id, f"{game_id}/{game_id}.eti") if source_b.ready_addr not in totals: raise ScenarioError(f"surviving source served no .eti chunks: {totals}") if sum(totals.values()) != file_size: raise ScenarioError(f"download did not deliver the whole archive ({sum(totals.values())} != {file_size}): {totals}") # We deliberately do NOT assert the exact per-source split. The killed # source can serve a chunk or two before `docker rm -f` lands (a fast-LAN # race), so requiring totals == {survivor: file_size} would be flaky. The # robust proof of redundancy is that a source died mid-download yet every # byte still arrived (sum == file_size plus the diff), the survivor served # part of it, and no download-failed was emitted. survivor_bytes = totals[source_b.ready_addr] return ( f"source killed after begin; all {file_size} bytes delivered " f"({survivor_bytes} from the survivor), no download-failed; diff matched; bytes={totals}" ) def s19_sole_source_drop(self) -> str: game_id = "bf1942" source_dir = self.fixture_root / "s19-source" # Multi-chunk sparse archive force-killed right after download-begin. A # 120 MiB single-chunk file served by a graceful shutdown could finish # (~0.15s at LAN speed) before the drop landed, flipping the expected # failure into a download-finished. With four 128 MiB chunks and a # forceful kill issued right after download-begin, the source dies with # the bulk of the transfer still outstanding (an individual chunk may # complete first, but the full download cannot), so a terminal failure is # deterministic. create_large_sparse_game(source_dir / game_id, size=CHUNK_SIZE * 4) source = self.peer("s19-source", games_dir=source_dir) client = self.peer("s19-client") connect_many(client, [source]) wait_remote_game(client, game_id, peer_count=1) start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "download", "game_id": game_id, "install": False}) client.wait_for(event_is("download-begin", game_id), timeout=20, description="download begin", waiter=waiter) # Forceful kill (no Goodbye) drops the connection mid-transfer. source.kill() terminal = client.wait_for( event_name_in({"download-failed", "download-peers-gone"}, game_id), timeout=120, description="sole-source drop terminal failure", waiter=waiter, ) assert_no_event_since(client, start, "download-finished", game_id) assert_not_exists(client.host_games_dir / game_id / "version.ini") assert_no_active(client, game_id) assert_local_absent(client, game_id) return f"sole-source forceful drop mid-transfer -> {terminal['event']}; version.ini absent; no ready row; no active op" def s20_receiver_write_failure(self) -> str: source_dir = self.fixture_root / "s20-source" copy_game("alienswarm", source_dir) source = self.peer("s20-source", games_dir=source_dir) client = self.peer("s20-client", tmpfs_size="32m") connect_many(client, [source]) wait_remote_game(client, "alienswarm", peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "alienswarm", "install": False}) client.wait_for(event_is("download-failed", "alienswarm"), timeout=90, description="download failed", waiter=waiter) client.docker_exec("test", "!", "-e", "/games/alienswarm/version.ini") assert_no_active(client, "alienswarm") return "32m tmpfs receiver emitted download-failed; /games/alienswarm/version.ini absent; active operations empty" def s21_add_game_propagation(self) -> str: alpha = self.peer("s21-alpha") bravo_dir = self.fixture_root / "s21-bravo" bravo_dir.mkdir(parents=True, exist_ok=True) bravo = self.peer("s21-bravo", games_dir=bravo_dir) connect_many(alpha, [bravo]) assert len(alpha.list_peers()) == 1 stage_game_drop(bravo_dir, "cod5") game = wait_remote_game(alpha, "cod5", peer_count=1) return f"alpha saw {game['id']} from the existing bravo peer with peer_count={game['peer_count']}" def s22_remove_game_propagation(self) -> str: alpha = self.peer("s22-alpha") bravo_dir = self.fixture_root / "s22-bravo" copy_game("cod5", bravo_dir) bravo = self.peer("s22-bravo", games_dir=bravo_dir) connect_many(alpha, [bravo]) wait_remote_game(alpha, "cod5", peer_count=1) shutil.rmtree(bravo_dir / "cod5") wait_remote_absent(alpha, "cod5") peers = alpha.list_peers() if len(peers) != 1: raise ScenarioError(f"expected bravo peer to remain, got {peers}") return "alpha removed cod5 while keeping one bravo peer" def s23_version_bump_propagation(self) -> str: alpha = self.peer("s23-alpha") bravo_dir = self.fixture_root / "s23-bravo" copy_game("cnc4", bravo_dir, version="20160101") bravo = self.peer("s23-bravo", games_dir=bravo_dir) connect_many(alpha, [bravo]) wait_remote_absent(alpha, "cnc4", timeout=5) (bravo_dir / "cnc4" / "version.ini").write_text(CATALOG_VERSIONS["cnc4"], encoding="utf-8") wait_remote_game(alpha, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"]) return "alpha observed stale cnc4 become catalog-version downloadable without reconnect" def s24_two_clients_one_source(self) -> str: source = self.peer("s24-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) c1 = self.peer("s24-client-a") c2 = self.peer("s24-client-b") connect_many(c1, [source]) connect_many(c2, [source]) waiter1 = LineWaiter(len(c1.output)) waiter2 = LineWaiter(len(c2.output)) c1.send({"cmd": "download", "game_id": "alienswarm", "install": False}) c2.send({"cmd": "download", "game_id": "alienswarm", "install": False}) c1.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="client-a finish", waiter=waiter1) c2.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="client-b finish", waiter=waiter2) diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", c1.host_games_dir / "alienswarm") diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", c2.host_games_dir / "alienswarm") wait_local_game(c1, "alienswarm", downloaded=True, installed=False) wait_local_game(c2, "alienswarm", downloaded=True, installed=False) # Responsiveness: the source still answers and still advertises its games. if not any(g["id"] == "alienswarm" for g in source.list_games()["local"]): raise ScenarioError("source no longer advertises alienswarm after serving two clients") return "two concurrent clients finished alienswarm install=false; both diffs matched; source still responsive" def s25_two_downloads_one_client(self) -> str: source = self.peer("s25-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s25-client") connect_many(client, [source]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.send({"cmd": "download", "game_id": "cnctw", "install": False}) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="bfbc2 finish", waiter=waiter) client.wait_for(event_is("download-finished", "cnctw"), timeout=60, description="cnctw finish", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", client.host_games_dir / "bfbc2") diff_game_dirs(FIXTURES / "fixture-bravo" / "cnctw", client.host_games_dir / "cnctw") wait_local_game(client, "bfbc2", downloaded=True, installed=False) wait_local_game(client, "cnctw", downloaded=True, installed=False) return "bfbc2 and cnctw concurrent downloads both finished install=false and diffed cleanly" def s26_duplicate_download_rejection(self) -> str: game_id = "bf1942" source_dir = self.fixture_root / "s26-source" # A large sparse archive keeps the first download active long enough that # the duplicate request is guaranteed to race against an in-progress # operation rather than a near-instant 3 MB transfer that may already be # finished by the time the second command is read. create_large_sparse_game(source_dir / game_id, size=CHUNK_SIZE * 2) source = self.peer("s26-source", games_dir=source_dir) client = self.peer("s26-client") connect_many(client, [source]) wait_remote_game(client, game_id, peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": game_id, "install": False}) active = client.wait_for( lambda item: item.get("type") == "event" and item.get("event") == "active-operations-changed" and any( op.get("game_id") == game_id for op in item.get("data", {}).get("active_operations", []) ), timeout=20, description="active operation", waiter=waiter, ) # Verify the operation kind, not just its presence (the only scenario that # inspects the active_operations 'operation' field). op = next(o for o in active["data"]["active_operations"] if o.get("game_id") == game_id) if op.get("operation") != "Downloading": raise ScenarioError(f"expected Downloading active operation, got {op}") err = client.send( {"cmd": "download", "game_id": game_id, "install": False}, expect_error=True, ) if "operation already in progress" not in err["error"]: raise ScenarioError(f"unexpected duplicate error: {err}") client.wait_for(event_is("download-finished", game_id), timeout=180, description="first download finish", waiter=waiter) diff_game_dirs(source_dir / game_id, client.host_games_dir / game_id) return f"duplicate rejected while Downloading active ('{err['error']}'); first download diff matched" def s27_self_connect_rejection(self) -> str: alpha = self.peer("s27-alpha") err = alpha.send({"cmd": "connect", "addr": alpha.ready_addr}, expect_error=True) if "cannot connect peer to itself" not in err["error"]: raise ScenarioError(f"unexpected self-connect error: {err}") peers = alpha.list_peers() if peers: raise ScenarioError(f"self-connect created peers: {peers}") alpha.status() return f"self-connect errored '{err['error']}'; peer list stayed empty" def s28_address_change_unit(self) -> str: output = run_just_test() test_name = "peer_db::tests::address_update_preserves_peer_identity_and_library" if f"{test_name} ... ok" not in output: raise ScenarioError(f"S28 unit proof did not run-and-pass in just test output:\n{output}") return "`just test` passed including address_update_preserves_peer_identity_and_library" def s29_empty_peer_participates(self) -> str: source = self.peer("s29-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) empty = self.peer("s29-empty") observer = self.peer("s29-observer") connect_many(observer, [empty]) peers = observer.list_peers() if len(peers) != 1 or peers[0]["game_count"] != 0: raise ScenarioError(f"expected empty peer with zero games, got {peers}") connect_many(empty, [source]) wait_remote_game(empty, "alienswarm", peer_count=1) waiter = LineWaiter(len(empty.output)) empty.send({"cmd": "download", "game_id": "alienswarm", "install": False}) empty.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="empty download finish", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", empty.host_games_dir / "alienswarm") wait_peer_has_game(observer, empty.peer_id, "alienswarm") return "observer saw zero-game peer; empty downloaded alienswarm, diff matched, then observer's snapshot for that peer contained alienswarm" def s30_mesh_aggregation(self) -> str: dirs = [] specs = [ ("s30-a", [("ggoo", CATALOG_VERSIONS["ggoo"]), ("bf1942", CATALOG_VERSIONS["bf1942"])]), ("s30-b", [("ggoo", CATALOG_VERSIONS["ggoo"]), ("cnc4", CATALOG_VERSIONS["cnc4"])]), ("s30-c", [("cnc4", CATALOG_VERSIONS["cnc4"]), ("cod5", CATALOG_VERSIONS["cod5"])]), ("s30-d", [("cnctw", CATALOG_VERSIONS["cnctw"]), ("coh", CATALOG_VERSIONS["coh"])]), ("s30-e", [("cnctw", CATALOG_VERSIONS["cnctw"]), ("bf1942", CATALOG_VERSIONS["bf1942"])]), ] peers = [] for name, games in specs: game_dir = self.fixture_root / name for game_id, version in games: copy_game(game_id, game_dir, version=version) dirs.append(game_dir) peers.append(self.peer(name, games_dir=game_dir)) client = self.peer("s30-client") connect_many(client, peers) expected = { "ggoo": (2, CATALOG_VERSIONS["ggoo"]), "bf1942": (2, CATALOG_VERSIONS["bf1942"]), "cnc4": (2, CATALOG_VERSIONS["cnc4"]), "cod5": (1, CATALOG_VERSIONS["cod5"]), "cnctw": (2, CATALOG_VERSIONS["cnctw"]), "coh": (1, CATALOG_VERSIONS["coh"]), } for game_id, (peer_count, version) in expected.items(): wait_remote_game(client, game_id, peer_count=peer_count, version=version) game_rows = client.list_games()["remote"] ids = [game["id"] for game in game_rows] if len(ids) != len(set(ids)): raise ScenarioError(f"duplicate game rows: {ids}") if any(peer["peer_id"] == client.peer_id for peer in client.list_peers()): raise ScenarioError("client listed itself as a peer") return f"client aggregated {len(expected)} IDs from 5 peers with expected peer_count/catalog versions" def s31_bootstrapped_peer_source(self) -> str: source = self.peer("s31-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True) bootstrap = self.peer("s31-bootstrap") connect_many(bootstrap, [source]) waiter = LineWaiter(len(bootstrap.output)) bootstrap.send({"cmd": "download", "game_id": "alienswarm", "install": False}) bootstrap.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="bootstrap finish", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", bootstrap.host_games_dir / "alienswarm") source.kill() third = self.peer("s31-third") connect_many(third, [bootstrap]) waiter = LineWaiter(len(third.output)) third.send({"cmd": "download", "game_id": "alienswarm", "install": False}) third.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="third finish", waiter=waiter) diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", third.host_games_dir / "alienswarm") return "third peer downloaded from bootstrapped client after original source kill; diff matched original" def s32_reinstall_after_uninstall(self) -> str: source = self.peer("s32-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s32-client") connect_many(client, [source]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="download finish", waiter=waiter) client.send({"cmd": "install", "game_id": "bfbc2"}) client.wait_for(event_is("install-finished", "bfbc2"), timeout=30, description="install finish", waiter=waiter) client.send({"cmd": "uninstall", "game_id": "bfbc2"}) client.wait_for(event_is("uninstall-finished", "bfbc2"), timeout=30, description="uninstall finish", waiter=waiter) before = len(client.output) client.send({"cmd": "install", "game_id": "bfbc2"}) reinstall_waiter = LineWaiter(before) client.wait_for(event_is("install-finished", "bfbc2"), timeout=30, description="reinstall finish", waiter=reinstall_waiter) recent = client.output[before:] if any(item.get("event") == "download-chunk-finished" for item in recent): raise ScenarioError("reinstall produced transfer chunk events") wait_local_game(client, "bfbc2", downloaded=True, installed=True) if not (client.host_games_dir / "bfbc2" / "local").is_dir(): raise ScenarioError("local/ was not recreated") return "reinstall recreated local/, local state installed=true, no transfer events during reinstall" def s33_install_after_mutation(self) -> str: source = self.peer("s33-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True) client = self.peer("s33-client") connect_many(client, [source]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bfbc2", "install": False}) client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="download finish", waiter=waiter) client.docker_exec( "sh", "-c", "printf 'mutated archive bytes\\n' > /games/bfbc2/bfbc2.eti", ) client.send({"cmd": "install", "game_id": "bfbc2"}) client.wait_for(event_is("install-finished", "bfbc2"), timeout=30, description="install finish", waiter=waiter) client.docker_exec( "cmp", "/games/bfbc2/bfbc2.eti", "/games/bfbc2/local/fixture-payload.txt", ) return "fixture installer installed current mutated archive bytes exactly" def s34_many_small_files(self) -> str: source_dir = self.fixture_root / "s34-source" create_many_small_game(source_dir / "bf1942") source = self.peer("s34-source", games_dir=source_dir) client = self.peer("s34-client") connect_many(client, [source]) wait_remote_game(client, "bf1942", peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "bf1942", "install": False}) client.wait_for(event_is("download-finished", "bf1942"), timeout=60, description="download finish", waiter=waiter) diff_game_dirs(source_dir / "bf1942", client.host_games_dir / "bf1942") chunks = [ item for item in client.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == "bf1942" ] # 20 small files + version.ini, each a single coherent chunk: exactly 21 # chunk events, 21 distinct relative paths, no splits and no duplicates. if len(chunks) != 21: raise ScenarioError(f"expected exactly 21 file chunks (20 files + version.ini), got {len(chunks)}") assert_no_duplicate_chunks(client, "bf1942") distinct_paths = {item.get("data", {}).get("relative_path") for item in chunks} if len(distinct_paths) != 21: raise ScenarioError(f"expected 21 distinct file paths, got {len(distinct_paths)}: {sorted(distinct_paths)}") return f"20 small files plus version.ini each transferred as one coherent chunk; diff matched; chunk events={len(chunks)}" def s35_unknown_game_filtered(self) -> str: source = self.peer("s35-source", fixtures=["mystery-game"]) client = self.peer("s35-client") connect_many(client, [source]) # Establish the premise: the source really does advertise mystery-game in # its raw library snapshot. Without this, wait_remote_absent could pass # vacuously ("absent because never sent" vs "absent because filtered"). wait_peer_has_game(client, source.peer_id, "mystery-game") wait_remote_absent(client, "mystery-game") err = client.send({"cmd": "download", "game_id": "mystery-game", "install": False}, expect_error=True) if "not in the local catalog" not in err["error"]: raise ScenarioError(f"unexpected unknown game error: {err}") assert_not_exists(client.host_games_dir / "mystery-game") return f"unknown game absent from list-games; download errored '{err['error']}'; no local files" def s36_catalog_singleton(self) -> str: peers = [] for index in range(5): game_dir = self.fixture_root / f"s36-{index}" version = CATALOG_VERSIONS["cnc4"] if index == 0 else "20160101" copy_game("cnc4", game_dir, version=version) peers.append(self.peer(f"s36-{index}", games_dir=game_dir)) client = self.peer("s36-client") connect_many(client, peers) wait_remote_game(client, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"]) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": "cnc4", "install": False}) got = client.wait_for(event_is("got-game-files", "cnc4"), timeout=20, description="got game files", waiter=waiter) client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="download finish", waiter=waiter) catalog_addr = peers[0].ready_addr if catalog_addr is None: raise ScenarioError("catalog-version peer had no ready addr") for item in client.output: if item.get("type") != "event" or item.get("event") != "download-chunk-finished": continue data = item["data"] if data.get("game_id") == "cnc4" and data.get("peer_addr") != catalog_addr: raise ScenarioError(f"stale peer contributed chunk: {data}") diff_game_dirs(peers[0].host_games_dir / "cnc4", client.host_games_dir / "cnc4") descs = got["data"]["file_descriptions"] if not descs: raise ScenarioError("got-game-files had no descriptors") return "client reported singleton catalog-version peer; stale peers stayed hidden and sent no chunks; diff matched" def s37_single_source_download_throughput(self) -> str: source_dir = self.fixture_root / "s37-source" create_large_sparse_game(source_dir / PERF_GAME_ID, size=PERF_GAME_SIZE) source = self.peer("s37-source", games_dir=source_dir) client = self.peer("s37-client") connect_many(client, [source]) wait_remote_game(client, PERF_GAME_ID, peer_count=1, version=PERF_GAME_VERSION) waiter = LineWaiter(len(client.output)) client.send({"cmd": "download", "game_id": PERF_GAME_ID, "install": False}) finished = client.wait_for( event_is("download-finished", PERF_GAME_ID), timeout=300, description=f"{PERF_GAME_ID} throughput download", waiter=waiter, ) destination_archive = client.host_games_dir / PERF_GAME_ID / f"{PERF_GAME_ID}.eti" if destination_archive.stat().st_size != PERF_GAME_SIZE: raise ScenarioError( f"downloaded archive size mismatch: {destination_archive.stat().st_size} != {PERF_GAME_SIZE}" ) throughput = finished.get("data", {}).get("throughput") if not throughput: raise ScenarioError(f"download-finished did not include throughput: {finished}") expected_bytes = PERF_GAME_SIZE + len(PERF_GAME_VERSION) if int(throughput["bytes"]) != expected_bytes: raise ScenarioError( f"throughput byte count mismatch: {throughput['bytes']} != {expected_bytes}" ) # The byte count alone never exercises the rate math. Validate the rate # fields are positive and mutually consistent so a units/divisor bug # (MiB vs MB, off-by-1000, zero duration) cannot slip through. if throughput["duration_ms"] <= 0: raise ScenarioError(f"throughput duration_ms not positive: {throughput}") if throughput["mib_per_s"] <= 0 or throughput["mbit_per_s"] <= 0: raise ScenarioError(f"throughput rates not positive: {throughput}") derived_mib = (int(throughput["bytes"]) / 1_048_576) / (throughput["duration_ms"] / 1000.0) if abs(derived_mib - throughput["mib_per_s"]) > max(1.0, derived_mib * 0.02): raise ScenarioError( f"mib_per_s {throughput['mib_per_s']} disagrees with bytes/duration {derived_mib}" ) # mbit_per_s = bytes*8/s/1e6; mib_per_s = bytes/s/1048576 -> ratio is fixed. ratio = throughput["mbit_per_s"] / throughput["mib_per_s"] expected_ratio = 1_048_576 * 8 / 1_000_000 # 8.388608 if abs(ratio - expected_ratio) > 0.01: raise ScenarioError( f"mbit/mib ratio {ratio} != expected {expected_ratio}: {throughput}" ) return ( f"{PERF_GAME_ID} {format_bytes(PERF_GAME_SIZE)} single-source download: " f"{throughput['mib_per_s']:.2f} MiB/s, " f"{throughput['mbit_per_s']:.2f} Mbit/s, " f"{throughput['duration_ms'] / 1000.0:.3f}s, " f"{throughput['chunks']} chunks" ) def s38_first_play_launch_settings(self) -> str: client_dir = self.fixture_root / "s38-client" copy_game("css", client_dir) client = self.peer( "s38-client", games_dir=client_dir, extra_args=["--unrar", "/usr/local/bin/unrar"], ) waiter = LineWaiter(len(client.output)) client.send({"cmd": "install", "game_id": "css"}) client.wait_for( event_is("install-finished", "css"), timeout=30, description="css install", waiter=waiter, ) wait_local_game(client, "css", downloaded=True, installed=True) marker = client.host_state_dir / "games" / "css" / "launch_settings_applied" if marker.exists(): raise ScenarioError("launch settings marker existed before first play") local_root = client.host_games_dir / "css" / "local" account_file = local_root / "profiles" / "local" / "account_name.txt" language_file = local_root / "profiles" / "local" / "language.txt" ini_file = ( local_root / "engine" / "bin" / "win64" / "steam_settings" / "SmartSteamEmu.ini" ) for path in [account_file, language_file, ini_file]: if not path.is_file(): raise ScenarioError(f"expected installed launch settings file: {path}") if b"PersonaName = stubplayer\r\n" not in ini_file.read_bytes(): raise ScenarioError("installed SmartSteamEmu.ini did not preserve CRLF stub PersonaName") first = client.send( { "cmd": "play", "game_id": "css", "username": "Lan Hero", "language": "german", } )["data"]["outcome"] expected_first = { "already_applied": False, "account_name_written": True, "language_written": True, "persona_name_written": True, } if first != expected_first: raise ScenarioError(f"unexpected first play outcome: {first}") if not marker.is_file(): raise ScenarioError("launch settings marker was not written after first play") if account_file.read_text(encoding="utf-8") != "Lan Hero": raise ScenarioError("account_name.txt was not stamped with username") if language_file.read_text(encoding="utf-8") != "german": raise ScenarioError("language.txt was not stamped with language") stamped_ini = ini_file.read_bytes() if b"PersonaName = Lan Hero\r\n" not in stamped_ini: raise ScenarioError("PersonaName was not stamped with CRLF preserved") if b"AppId = 240\r\n" not in stamped_ini or b"Language = english\r\n" not in stamped_ini: raise ScenarioError("SmartSteamEmu.ini sibling lines were not preserved") client.docker_exec( "sh", "-c", "printf resetaccount > /games/css/local/profiles/local/account_name.txt", ) client.docker_exec( "sh", "-c", "printf resetlang > /games/css/local/profiles/local/language.txt", ) client.docker_exec( "sh", "-c", "printf '[Settings]\\r\\nAppId = 240\\r\\n" "PersonaName = resetplayer\\r\\nLanguage = english\\r\\n' > " "/games/css/local/engine/bin/win64/steam_settings/SmartSteamEmu.ini", ) second = client.send( { "cmd": "play", "game_id": "css", "username": "Second User", "language": "french", } )["data"]["outcome"] expected_second = { "already_applied": True, "account_name_written": False, "language_written": False, "persona_name_written": False, } if second != expected_second: raise ScenarioError(f"unexpected second play outcome: {second}") if account_file.read_text(encoding="utf-8") != "resetaccount": raise ScenarioError("second play rewrote account_name.txt despite marker") if language_file.read_text(encoding="utf-8") != "resetlang": raise ScenarioError("second play rewrote language.txt despite marker") if b"PersonaName = resetplayer\r\n" not in ini_file.read_bytes(): raise ScenarioError("second play rewrote PersonaName despite marker") return "css first play stamped launch settings once; second play respected the marker" def stream_install_cnctw(self, prefix: str) -> tuple[Peer, Peer]: source_dir = self.fixture_root / f"{prefix}-bravo" copy_game("cnctw", source_dir, version="20160128") source = self.peer(f"{prefix}-bravo", games_dir=source_dir) client = self.peer(f"{prefix}-client") connect_many(client, [source]) wait_remote_game(client, "cnctw", peer_count=1) waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-begin", "cnctw"), timeout=20, description="stream begin cnctw", waiter=waiter, ) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, description="stream finish cnctw", waiter=waiter, ) client.wait_for( event_is("install-finished", "cnctw"), timeout=30, description="stream install cnctw", waiter=waiter, ) return source, client def s39_streamed_install_local_only(self) -> str: source, client = self.stream_install_cnctw("s39") game = wait_local_game(client, "cnctw", downloaded=False, installed=True) assert_game_state( game, downloaded=False, installed=True, availability="LocalOnly", ) game_root = client.host_games_dir / "cnctw" assert_not_exists(game_root / "version.ini") assert_not_exists(game_root / "cnctw.eti") expected = { "bin/cnctw-payload.bin": unrar_entry_sha256( source, "cnctw", "bin/cnctw-payload.bin" ), "data/cnctw-assets.dat": unrar_entry_sha256( source, "cnctw", "data/cnctw-assets.dat" ), } actual = { rel: sha256_file(game_root / "local" / rel) for rel in expected } if actual != expected: raise ScenarioError(f"streamed local payload hashes mismatched: {actual} != {expected}") streamed_bytes = sum( int(item.get("data", {}).get("length", 0)) for item in client.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == "cnctw" ) expected_bytes = 3 * 1024 * 1024 if streamed_bytes != expected_bytes: raise ScenarioError( f"streamed byte count mismatch: {streamed_bytes} != {expected_bytes}" ) wait_no_outbound_transfer(source, "cnctw") return ( "cnctw streamed into local/ only; root archive and version.ini absent; " f"payload hashes={actual}; source outbound transfer drained" ) def s40_streamed_receiver_not_source(self) -> str: _source, receiver = self.stream_install_cnctw("s40") observer = self.peer("s40-observer") connect_many(observer, [receiver]) receiver_snapshot = wait_peer_has_game(observer, receiver.peer_id, "cnctw") summary = next( game for game in receiver_snapshot.get("games", []) if game.get("id") == "cnctw" ) if summary.get("availability") != "LocalOnly" or summary.get("downloaded"): raise ScenarioError(f"receiver did not advertise cnctw as local-only: {summary}") wait_remote_absent(observer, "cnctw", timeout=5) err = observer.send( {"cmd": "download", "game_id": "cnctw", "install": False}, expect_error=True, ) if "no peers have game cnctw" not in err["error"]: raise ScenarioError(f"unexpected local-only download error: {err}") assert_not_exists(observer.host_games_dir / "cnctw") return ( "observer saw receiver's local-only cnctw snapshot, but remote aggregation hid it " f"and download errored '{err['error']}'" ) def s41_solid_archive_streamed_install(self) -> str: source_dir = self.fixture_root / "s41-solid-source" source_game = source_dir / "cnctw" shutil.copytree(FIXTURES / "fixture-solid" / "cnctw", source_game) source = self.peer("s41-solid-source", games_dir=source_dir) assert_peer_rar_archive_solid(source, "cnctw") client = self.peer("s41-solid-client") connect_many(client, [source]) wait_remote_game(client, "cnctw", peer_count=1, version="20160128") waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, description="solid stream finish cnctw", waiter=waiter, ) client.wait_for( event_is("install-finished", "cnctw"), timeout=30, description="solid stream install cnctw", waiter=waiter, ) game = wait_local_game(client, "cnctw", downloaded=False, installed=True) assert_game_state( game, downloaded=False, installed=True, availability="LocalOnly", ) game_root = client.host_games_dir / "cnctw" assert_not_exists(game_root / "version.ini") assert_not_exists(game_root / "cnctw.eti") expected = { "bin/cnctw-solid-payload.bin": unrar_entry_sha256( source, "cnctw", "bin/cnctw-solid-payload.bin" ), "data/cnctw-solid-assets.dat": unrar_entry_sha256( source, "cnctw", "data/cnctw-solid-assets.dat" ), } actual = { rel: sha256_file(game_root / "local" / rel) for rel in expected } if actual != expected: raise ScenarioError( f"solid streamed payload hashes mismatched: {actual} != {expected}" ) streamed_bytes = sum( int(item.get("data", {}).get("length", 0)) for item in client.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == "cnctw" ) expected_bytes = sum((game_root / "local" / rel).stat().st_size for rel in expected) if streamed_bytes != expected_bytes: raise ScenarioError( f"solid streamed byte count mismatch: {streamed_bytes} != {expected_bytes}" ) return ( "solid cnctw archive streamed through one local-only install; " f"payload hashes={actual}, bytes={streamed_bytes}" ) def s42_streamed_install_retries_next_source(self) -> str: bad_dir = self.fixture_root / "s42-bad-source" good_dir = self.fixture_root / "s42-good-source" copy_game("cnctw", bad_dir, version="20160128") copy_game("cnctw", good_dir, version="20160128") bad = self.peer( "s42-bad-source", games_dir=bad_dir, extra_args=["--unrar", "/missing-unrar"], ) good = self.peer("s42-good-source", games_dir=good_dir) if socket_addr_sort_key(bad.ready_addr) > socket_addr_sort_key(good.ready_addr): raise ScenarioError( "S42 requires the broken source to sort before the good source; " f"bad={bad.ready_addr}, good={good.ready_addr}" ) client = self.peer("s42-client") connect_many(client, [bad, good]) wait_remote_game(client, "cnctw", peer_count=2, version="20160128") waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-finished", "cnctw"), timeout=60, description="retry stream finish cnctw", waiter=waiter, ) client.wait_for( event_is("install-finished", "cnctw"), timeout=30, description="retry stream install cnctw", waiter=waiter, ) game = wait_local_game(client, "cnctw", downloaded=False, installed=True) assert_game_state( game, downloaded=False, installed=True, availability="LocalOnly", ) game_root = client.host_games_dir / "cnctw" assert_not_exists(game_root / ".local.installing") assert_not_exists(game_root / "version.ini") assert_not_exists(game_root / "cnctw.eti") assert_only_chunk_sources(client, "cnctw", {good.ready_addr}) expected = { "bin/cnctw-payload.bin": unrar_entry_sha256( good, "cnctw", "bin/cnctw-payload.bin" ), "data/cnctw-assets.dat": unrar_entry_sha256( good, "cnctw", "data/cnctw-assets.dat" ), } actual = { rel: sha256_file(game_root / "local" / rel) for rel in expected } if actual != expected: raise ScenarioError(f"retry streamed payload hashes mismatched: {actual} != {expected}") streamed_bytes = sum( int(item.get("data", {}).get("length", 0)) for item in client.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == "cnctw" ) expected_bytes = 3 * 1024 * 1024 if streamed_bytes != expected_bytes: raise ScenarioError( f"retry streamed byte count mismatch: {streamed_bytes} != {expected_bytes}" ) return ( "broken first source failed without chunks, next source completed whole stream; " f"good={good.ready_addr}, bad={bad.ready_addr}, bytes={streamed_bytes}" ) def s43_streamed_install_rejects_installed_game(self) -> str: _source, client = self.stream_install_cnctw("s43") start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-failed", "cnctw"), timeout=20, description="already-installed stream rejection", waiter=waiter, ) assert_no_event_since(client, start, "install-finished", "cnctw") assert_no_event_since(client, start, "download-finished", "cnctw") wait_no_active(client, "cnctw") game = wait_local_game(client, "cnctw", downloaded=False, installed=True) assert_game_state( game, downloaded=False, installed=True, availability="LocalOnly", ) return "already-installed cnctw rejected a second streamed install without state drift" def s44_corrupt_stream_rolls_back(self) -> str: source_dir = self.fixture_root / "s44-corrupt-source" copy_game("cnctw", source_dir, version="20160128") (source_dir / "cnctw" / "cnctw.eti").write_bytes(b"not a rar archive") source = self.peer("s44-corrupt-source", games_dir=source_dir) client = self.peer("s44-client") connect_many(client, [source]) wait_remote_game(client, "cnctw", peer_count=1, version="20160128") start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-failed", "cnctw"), timeout=30, description="corrupt stream failed", waiter=waiter, ) assert_no_event_since(client, start, "download-finished", "cnctw") assert_no_event_since(client, start, "install-finished", "cnctw") wait_no_active(client, "cnctw") assert_failed_stream_left_no_local(client, "cnctw") return "corrupt cnctw archive emitted download-failed and left no local install" def s45_sender_disconnect_mid_stream(self) -> str: source_dir = self.fixture_root / "s45-source" copy_game("alienswarm", source_dir, version="20190317") source = self.peer("s45-source", games_dir=source_dir) client = self.peer("s45-client") connect_many(client, [source]) wait_remote_game(client, "alienswarm", peer_count=1, version="20190317") start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "stream-install", "game_id": "alienswarm"}) client.wait_for( event_is("download-chunk-finished", "alienswarm"), timeout=30, description="first alienswarm stream chunk before source drop", waiter=waiter, ) source.kill() terminal = client.wait_for( event_name_in({"download-failed", "download-peers-gone"}, "alienswarm"), timeout=60, description="sender disconnect terminal event", waiter=waiter, ) assert_no_event_since(client, start, "download-finished", "alienswarm") assert_no_event_since(client, start, "install-finished", "alienswarm") wait_no_active(client, "alienswarm") assert_failed_stream_left_no_local(client, "alienswarm") return ( "sender disconnect after first alienswarm chunk rolled back stream; " f"terminal={terminal['event']}" ) def s46_receiver_cancel_mid_stream(self) -> str: source_dir = self.fixture_root / "s46-source" copy_game("alienswarm", source_dir, version="20190317") source = self.peer("s46-source", games_dir=source_dir) client = self.peer("s46-client") connect_many(client, [source]) wait_remote_game(client, "alienswarm", peer_count=1, version="20190317") start = len(client.output) waiter = LineWaiter(start) client.send({"cmd": "stream-install", "game_id": "alienswarm"}) client.wait_for( event_is("download-chunk-finished", "alienswarm"), timeout=30, description="first alienswarm stream chunk before receiver cancel", waiter=waiter, ) client.send({"cmd": "cancel-download", "game_id": "alienswarm"}) wait_no_active(client, "alienswarm", timeout=60) assert_no_event_since(client, start, "download-finished", "alienswarm") assert_no_event_since(client, start, "download-failed", "alienswarm") assert_no_event_since(client, start, "install-finished", "alienswarm") assert_failed_stream_left_no_local(client, "alienswarm") return "receiver cancel after first alienswarm chunk rolled back without failed event" def s47_multi_archive_streams_in_sorted_order(self) -> str: source_dir = self.fixture_root / "s47-source" source_game = source_dir / "cnctw" shutil.copytree(FIXTURES / "fixture-multi" / "cnctw", source_game) source = self.peer("s47-source", games_dir=source_dir) client = self.peer("s47-client") connect_many(client, [source]) wait_remote_game(client, "cnctw", peer_count=1, version="20160128") waiter = LineWaiter(len(client.output)) client.send({"cmd": "stream-install", "game_id": "cnctw"}) client.wait_for( event_is("download-finished", "cnctw"), timeout=30, description="multi-archive stream finish", waiter=waiter, ) client.wait_for( event_is("install-finished", "cnctw"), timeout=30, description="multi-archive stream install", waiter=waiter, ) game = wait_local_game(client, "cnctw", downloaded=False, installed=True) assert_game_state( game, downloaded=False, installed=True, availability="LocalOnly", ) game_root = client.host_games_dir / "cnctw" assert_not_exists(game_root / "version.ini") assert_not_exists(game_root / "a-first.eti") assert_not_exists(game_root / "z-second.eti") chunk_paths = streamed_chunk_paths(client, "cnctw") expected_paths = [ "cnctw/.local.installing/order/first.txt", "cnctw/.local.installing/order/second.txt", ] if chunk_paths != expected_paths: raise ScenarioError(f"multi-archive stream order mismatch: {chunk_paths}") first = (game_root / "local" / "order" / "first.txt").read_text(encoding="utf-8") second = (game_root / "local" / "order" / "second.txt").read_text(encoding="utf-8") if first != "first archive payload\n" or second != "second archive payload\n": raise ScenarioError(f"multi-archive payload mismatch: {first!r}, {second!r}") return f"multi-archive cnctw streamed in sorted order: {chunk_paths}" def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]: result = subprocess.run( command, cwd=REPO, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if result.returncode != 0: raise ScenarioError(f"{description} failed:\n{result.stdout}") return result def run_just_test() -> str: env = os.environ.copy() env["RUSTC_WRAPPER"] = "" result = subprocess.run( ["just", "test"], cwd=REPO, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if result.returncode != 0: raise ScenarioError(f"just test failed:\n{result.stdout}") return result.stdout def reset_run_root() -> None: if not RUN_ROOT.exists(): return try: shutil.rmtree(RUN_ROOT) return except PermissionError: pass subprocess.run( [ "docker", "run", "--rm", "-v", f"{RUN_ROOT}:/cleanup", "--entrypoint", "/bin/sh", "debian:bookworm-slim", "-c", "rm -rf /cleanup/* /cleanup/.[!.]* /cleanup/..?*", ], cwd=REPO, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False, ) shutil.rmtree(RUN_ROOT) def copy_game(game_id: str, destination_games_dir: Path, *, version: str | None = None) -> None: source = find_fixture_game(game_id) destination = destination_games_dir / game_id if destination.exists(): shutil.rmtree(destination) destination.parent.mkdir(parents=True, exist_ok=True) shutil.copytree(source, destination) version = version if version is not None else CATALOG_VERSIONS.get(game_id) if version is not None: (destination / "version.ini").write_text(version, encoding="utf-8") def find_fixture_game(game_id: str) -> Path: # Sorted so resolution is deterministic: several fixtures define the same # game id (e.g. cnctw exists under fixture-bravo, fixture-multi and # fixture-solid), and downstream assertions expect the fixture-bravo layout. for fixture_dir in sorted(FIXTURES.iterdir()): candidate = fixture_dir / game_id if candidate.exists(): return candidate raise ScenarioError(f"fixture game not found: {game_id}") def stage_game_drop(destination_games_dir: Path, game_id: str) -> None: source = find_fixture_game(game_id) root = destination_games_dir / game_id if root.exists(): shutil.rmtree(root) root.mkdir(parents=True) for child in source.iterdir(): if child.name == "version.ini": continue target = root / child.name if child.is_dir(): shutil.copytree(child, target) else: shutil.copy2(child, target) shutil.copy2(source / "version.ini", root / "version.ini") def create_many_small_game(root: Path) -> None: if root.exists(): shutil.rmtree(root) root.mkdir(parents=True) for index in range(20): child = root / f"file-{index:02}.bin" child.write_bytes(hashlib.sha256(f"small-{index}".encode()).digest() * 8) (root / "version.ini").write_text(CATALOG_VERSIONS.get(root.name, "20250101"), encoding="utf-8") def create_large_sparse_game(root: Path, *, size: int, version: str | None = None) -> None: if root.exists(): shutil.rmtree(root) root.mkdir(parents=True) resolved = version if version is not None else CATALOG_VERSIONS.get(root.name, PERF_GAME_VERSION) (root / "version.ini").write_text(resolved, encoding="utf-8") archive = root / f"{root.name}.eti" with archive.open("wb") as handle: handle.truncate(size) def inflate_archive_sparse(game_root: Path, game_id: str, size: int) -> None: """Replace a copied game's `.eti` with a sparse file of `size` bytes so the archive spans multiple 128 MiB download chunks. Sparse zero bytes are identical across copies, so duplicate-source majority validation still agrees and a `diff` against any catalog-version source still matches.""" archive = game_root / f"{game_id}.eti" with archive.open("wb") as handle: handle.truncate(size) def sha256_file(path: Path) -> str: hasher = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): hasher.update(chunk) return hasher.hexdigest() def unrar_entry_sha256(peer: Peer, game_id: str, relative_path: str) -> str: command = ( f"unrar p -inul /games/{shlex.quote(game_id)}/{shlex.quote(game_id)}.eti " f"{shlex.quote(relative_path)} | sha256sum" ) output = peer.docker_exec("sh", "-c", command).stdout.strip() if not output: raise ScenarioError(f"empty sha256 output for {game_id}:{relative_path}") return output.split()[0] def assert_peer_rar_archive_solid(peer: Peer, game_id: str) -> None: output = peer.docker_exec( "unrar", "lt", "-cfg-", f"/games/{game_id}/{game_id}.eti", ).stdout for line in output.splitlines(): stripped = line.strip() if stripped.startswith("Details:"): if "solid" in stripped.lower(): return raise ScenarioError(f"RAR archive is not solid: {game_id}") raise ScenarioError(f"RAR archive details were not reported: {game_id}") def socket_addr_sort_key(addr: str | None) -> tuple[int, int]: if addr is None: raise ScenarioError("cannot sort missing peer address") host, port = addr.rsplit(":", 1) host = host.removeprefix("[").removesuffix("]") return (int(ipaddress.ip_address(host)), int(port)) def format_bytes(size: int) -> str: return f"{size / 1024 / 1024 / 1024:.2f} GiB" def connect_many(client: Peer, peers: list[Peer]) -> None: for peer in peers: client.connect_to(peer) client.send({"cmd": "wait-peers", "count": len(peers), "timeout_ms": 15000}) def wait_remote_game( peer: Peer, game_id: str, *, peer_count: int | None = None, version: str | None = None, timeout: float = 20, ) -> dict[str, Any]: deadline = time.monotonic() + timeout last_rows: list[dict[str, Any]] = [] while time.monotonic() < deadline: rows = peer.list_games()["remote"] last_rows = rows for row in rows: if row["id"] != game_id: continue if peer_count is not None and row.get("peer_count") != peer_count: continue if version is not None and row.get("eti_game_version") != version: continue return row time.sleep(0.4) raise ScenarioError( f"{peer.name} never saw remote {game_id} peer_count={peer_count} version={version}; rows={last_rows}" ) def wait_remote_absent(peer: Peer, game_id: str, timeout: float = 20) -> None: deadline = time.monotonic() + timeout last_rows: list[dict[str, Any]] = [] while time.monotonic() < deadline: rows = peer.list_games()["remote"] last_rows = rows if all(row["id"] != game_id for row in rows): return time.sleep(0.4) raise ScenarioError(f"{peer.name} still lists remote {game_id}; rows={last_rows}") def wait_local_game( peer: Peer, game_id: str, *, downloaded: bool | None = None, installed: bool | None = None, timeout: float = 20, ) -> dict[str, Any]: deadline = time.monotonic() + timeout last_rows: list[dict[str, Any]] = [] while time.monotonic() < deadline: rows = peer.list_games()["local"] last_rows = rows for row in rows: if row["id"] != game_id: continue if downloaded is not None and row.get("downloaded") != downloaded: continue if installed is not None and row.get("installed") != installed: continue return row time.sleep(0.4) raise ScenarioError( f"{peer.name} never reached local {game_id} downloaded={downloaded} installed={installed}; rows={last_rows}" ) def wait_no_active(peer: Peer, game_id: str, timeout: float = 20) -> None: deadline = time.monotonic() + timeout last_active: list[dict[str, Any]] = [] while time.monotonic() < deadline: active = peer.status()["active_operations"] last_active = active if all(item["game_id"] != game_id for item in active): return time.sleep(0.4) raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {last_active}") def wait_no_outbound_transfer(peer: Peer, game_id: str, timeout: float = 20) -> None: deadline = time.monotonic() + timeout last_active: dict[str, int] = {} while time.monotonic() < deadline: active = peer.status()["active_outbound_transfers"] last_active = active if active.get(game_id, 0) == 0: return time.sleep(0.4) raise ScenarioError( f"{peer.name} still has outbound transfer for {game_id}: {last_active}" ) def assert_game_state( game: dict[str, Any], *, downloaded: bool, installed: bool, availability: str, ) -> None: if ( game.get("downloaded") != downloaded or game.get("installed") != installed or game.get("availability") != availability ): raise ScenarioError( f"unexpected game state for {game.get('id')}: " f"downloaded={game.get('downloaded')} installed={game.get('installed')} " f"availability={game.get('availability')}" ) def wait_peer_has_game( observer: Peer, peer_id: str | None, game_id: str, timeout: float = 20, ) -> dict[str, Any]: if peer_id is None: raise ScenarioError("cannot wait for a peer without peer_id") deadline = time.monotonic() + timeout last_peers: list[dict[str, Any]] = [] while time.monotonic() < deadline: peers = observer.list_peers() last_peers = peers for peer in peers: if peer.get("peer_id") != peer_id: continue if any(game.get("id") == game_id for game in peer.get("games", [])): return peer time.sleep(0.4) raise ScenarioError( f"{observer.name} never saw peer {peer_id} advertise {game_id}; peers={last_peers}" ) def assert_local_absent(peer: Peer, game_id: str) -> None: rows = peer.list_games()["local"] if any( row["id"] == game_id and (row.get("downloaded") or row.get("installed")) for row in rows ): raise ScenarioError(f"{peer.name} advertises failed local {game_id}: {rows}") def assert_no_active(peer: Peer, game_id: str) -> None: status = peer.status() active = status["active_operations"] if any(item["game_id"] == game_id for item in active): raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {active}") def assert_not_exists(path: Path) -> None: if path.exists(): raise ScenarioError(f"expected path to be absent: {path}") def assert_failed_stream_left_no_local(peer: Peer, game_id: str) -> None: game_root = peer.host_games_dir / game_id assert_local_absent(peer, game_id) assert_not_exists(game_root / "local") assert_not_exists(game_root / ".local.installing") assert_not_exists(game_root / "version.ini") assert_not_exists(game_root / f"{game_id}.eti") def event_is(event: str, game_id: str | None = None) -> Callable[[dict[str, Any]], bool]: def predicate(item: dict[str, Any]) -> bool: if item.get("type") != "event" or item.get("event") != event: return False if game_id is None: return True return item.get("data", {}).get("game_id") == game_id return predicate def event_name_in(events: set[str], game_id: str | None = None) -> Callable[[dict[str, Any]], bool]: def predicate(item: dict[str, Any]) -> bool: if item.get("type") != "event" or item.get("event") not in events: return False if game_id is None: return True return item.get("data", {}).get("game_id") == game_id return predicate def assert_no_event_since(peer: Peer, start: int, event: str, game_id: str) -> None: for item in peer.output[start:]: if item.get("type") == "event" and item.get("event") == event: if item.get("data", {}).get("game_id") == game_id: raise ScenarioError(f"unexpected {event} for {game_id}: {item}") def assert_only_chunk_sources( peer: Peer, game_id: str, allowed_sources: set[str | None], ) -> None: allowed = {source for source in allowed_sources if source is not None} if not allowed: raise ScenarioError("no allowed chunk sources supplied") seen: set[str] = set() for item in peer.output: if item.get("type") != "event" or item.get("event") != "download-chunk-finished": continue data = item["data"] if data.get("game_id") != game_id: continue source = data.get("peer_addr") seen.add(source) if source not in allowed: raise ScenarioError(f"unexpected chunk source for {game_id}: {data}") if not seen: raise ScenarioError(f"no chunk events recorded for {game_id}") def streamed_chunk_paths(peer: Peer, game_id: str) -> list[str]: return [ item["data"]["relative_path"] for item in peer.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == game_id ] def chunk_totals(peer: Peer, game_id: str, relative_path: str) -> dict[str, int]: totals: dict[str, int] = {} for item in peer.output: if item.get("type") != "event" or item.get("event") != "download-chunk-finished": continue data = item["data"] if data.get("game_id") != game_id or data.get("relative_path") != relative_path: continue totals[data["peer_addr"]] = totals.get(data["peer_addr"], 0) + int(data["length"]) return totals def chunk_sources(peer: Peer, game_id: str) -> set[str]: return { item["data"]["peer_addr"] for item in peer.output if item.get("type") == "event" and item.get("event") == "download-chunk-finished" and item.get("data", {}).get("game_id") == game_id } def assert_no_duplicate_chunks(peer: Peer, game_id: str) -> None: seen: set[tuple[str | None, int]] = set() for item in peer.output: if item.get("type") != "event" or item.get("event") != "download-chunk-finished": continue data = item["data"] if data.get("game_id") != game_id: continue key = (data.get("relative_path"), int(data.get("offset", 0))) if key in seen: raise ScenarioError(f"{peer.name} downloaded a duplicate chunk for {game_id}: {key}") seen.add(key) def count_events(peer: Peer, event: str, game_id: str) -> int: return sum( 1 for item in peer.output if item.get("type") == "event" and item.get("event") == event and item.get("data", {}).get("game_id") == game_id ) def peer_advertised_version( observer: Peer, peer_id: str | None, game_id: str, timeout: float = 20 ) -> str | None: """Returns the raw `eti_version` a peer advertises for a game in its library snapshot (list-peers). Unlike the list-games `remote` rows, this value is NOT synthesized from the local catalog, so it faithfully reports the source. Polls until the peer's library snapshot is observed.""" if peer_id is None: raise ScenarioError("cannot read advertised version without peer_id") deadline = time.monotonic() + timeout while time.monotonic() < deadline: for peer in observer.list_peers(): if peer.get("peer_id") != peer_id: continue for game in peer.get("games", []): if game.get("id") == game_id: return game.get("eti_version") time.sleep(0.4) raise ScenarioError(f"{observer.name} does not see {game_id} advertised by {peer_id}") def diff_game_dirs(source: Path, destination: Path) -> None: source_manifest = manifest(source) destination_manifest = manifest(destination) if source_manifest != destination_manifest: diff = run_diff(source, destination) raise ScenarioError( f"manifest mismatch between {source} and {destination}\n{diff}" ) diff = run_diff(source, destination) if diff: raise ScenarioError(f"diff mismatch between {source} and {destination}\n{diff}") def manifest(root: Path) -> dict[str, str]: if not root.exists(): raise ScenarioError(f"missing manifest root: {root}") entries: dict[str, str] = {} for path in sorted(root.rglob("*")): if not path.is_file(): continue rel = path.relative_to(root) if any(part in IGNORED_DIFF_NAMES for part in rel.parts): continue hasher = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): hasher.update(chunk) entries[str(rel)] = hasher.hexdigest() return entries def run_diff(source: Path, destination: Path) -> str: command = [ "diff", "-r", "-x", ".lanspread", "-x", ".lanspread.json", "-x", "local", str(source), str(destination), ] result = subprocess.run( command, cwd=REPO, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) return result.stdout if result.returncode else "" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "scenarios", nargs="*", help="Scenario IDs to run, e.g. S18 S20. Defaults to all implemented scenarios.", ) parser.add_argument( "--build-image", action="store_true", help="Run `just peer-cli-image` before starting scenarios.", ) return parser.parse_args() def main() -> int: args = parse_args() selected = {item.lower() for item in args.scenarios} if args.scenarios else None try: Runner(selected=selected, build_image=args.build_image).run() except ScenarioError as error: print(f"\nFAILED: {error}", file=sys.stderr) return 1 return 0 if __name__ == "__main__": sys.exit(main())