Files
lanspread/crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
T
ddidderr 9835e77e8d feat: store launcher state outside game dirs
Move launcher-owned metadata from game roots into the configured peer state
area. Peer identity, the local library index, install intent logs, and setup
markers now live under app/CLI state instead of being written beside games.
The Tauri shell passes its app data directory into the peer, and the peer CLI
runs the same path through its explicit --state-dir.

Add a dedicated pre-start migration phase for legacy files. It migrates the
old global library index, per-game install intents, and the old first-start
marker into app state, then deletes legacy files only after the replacement
write succeeds. Normal scan, install, recovery, and transfer paths no longer
read legacy state files.

Rename the old first-start meaning to setup_done and only set it after
launching game_setup.cmd. Start/setup scripts keep the shared argument shape,
while server_start.cmd now uses cmd /k and a visible window so server logs stay
open for inspection.

While validating the Docker scenario matrix, make download terminal events
come from the handler after local state refresh and operation cleanup. This
makes download-finished/download-failed safe points for immediate follow-up CLI
commands. Also update the multi-peer chunking scenario to use a sparse archive
large enough to actually span multiple production chunks.

Test Plan:
- just fmt
- just test
- just frontend-test
- just build
- just clippy
- git diff --check
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py

Refs: local app-state migration discussion
2026-05-21 21:32:28 +02:00

1455 lines
62 KiB
Python

#!/usr/bin/env python3
"""Run the peer-cli scenarios S1-S37 through Docker."""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import queue
import shutil
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable
# Fourth ancestor of this script's resolved path; used as cwd for subprocesses.
REPO = Path(__file__).resolve().parents[3]
# Scratch area for per-run state, games, and generated fixtures.
RUN_ROOT = REPO / ".lanspread-peer-cli" / "extended-scenarios"
# Docker image, network, and container-name prefix used for every peer.
IMAGE = "lanspread-peer-cli:dev"
NETWORK = "lanspread"
CONTAINER_PREFIX = "lanspread-peer-cli-ext"
# Catalog database path passed to each peer via --catalog-db (a container path).
CATALOG_DB = "/app/game.db"
# Checked-in fixture game directories mounted into source peers.
FIXTURES = REPO / "crates" / "lanspread-peer-cli" / "fixtures"
# 128 MiB. S14 sizes its archive just above this and bounds chunk imbalance by it.
# NOTE(review): presumably mirrors the peer's production chunk size — confirm.
CHUNK_SIZE = 128 * 1024 * 1024
# Game id used by S14 for its generated sparse game.
PERF_GAME_ID = "bf1942"
# 2 GiB. NOTE(review): consumer is outside this chunk (likely S37) — confirm.
PERF_GAME_SIZE = 2 * 1024 * 1024 * 1024
# NOTE(review): presumably names skipped when diffing game dirs — verify in diff_game_dirs.
IGNORED_DIFF_NAMES = {".lanspread", ".lanspread.json", "local"}
class ScenarioError(RuntimeError):
    """Raised when a scenario observes something other than what it expects."""
@dataclass
class LineWaiter:
    """Cursor into a peer's parsed output list.

    ``seen`` is the index of the first output entry not yet consumed by a
    ``Peer.wait_for`` call that was given this waiter.
    """

    seen: int = field(default=0)
@dataclass
class Peer:
    """A peer-cli instance running in a Docker container, driven over stdio.

    Commands are written to the container's stdin as one JSON object per line.
    A background reader thread collects every stdout/stderr line into
    ``raw_output`` (verbatim) and ``output`` (parsed dicts) while holding
    ``condition``, and wakes any thread blocked in :meth:`wait_for`.
    """

    runner: "Runner"
    name: str
    games_dir: Path | None = None
    readonly_games: bool = False
    tmpfs_size: str | None = None
    fixtures: list[str] = field(default_factory=list)
    extra_args: list[str] = field(default_factory=list)
    process: subprocess.Popen[str] | None = None
    output: list[dict[str, Any]] = field(default_factory=list)
    raw_output: list[str] = field(default_factory=list)
    events: queue.Queue[dict[str, Any]] = field(default_factory=queue.Queue)
    condition: threading.Condition = field(default_factory=threading.Condition)
    request_index: int = 0
    ready_addr: str | None = None
    peer_id: str | None = None
    # Set by the reader thread once the container's output stream has ended.
    output_closed: bool = False

    @property
    def container_name(self) -> str:
        """Docker container name: prefix, run id, and peer name."""
        return f"{CONTAINER_PREFIX}-{self.runner.run_id}-{self.name}"

    @property
    def host_games_dir(self) -> Path:
        """Host directory mounted at /games (explicit ``games_dir`` wins)."""
        if self.games_dir is not None:
            return self.games_dir
        return self.runner.games_root / self.name

    @property
    def host_state_dir(self) -> Path:
        """Host directory mounted at /state for this peer."""
        return self.runner.state_root / self.name

    def start(self) -> "Peer":
        """Launch the container, start the reader thread, and wait for readiness.

        Returns:
            ``self``, so construction and start can be chained.

        Raises:
            ScenarioError: if ``local-peer-ready`` is not seen in time.
        """
        self.host_state_dir.mkdir(parents=True, exist_ok=True)
        if self.tmpfs_size is None:
            self.host_games_dir.mkdir(parents=True, exist_ok=True)
        command = [
            "docker",
            "run",
            "--rm",
            "--init",
            "--network",
            NETWORK,
            "--name",
            self.container_name,
            "-i",
            "-v",
            f"{self.host_state_dir}:/state",
        ]
        if self.tmpfs_size is None:
            mode = "ro" if self.readonly_games else "rw"
            command.extend(["-v", f"{self.host_games_dir}:/games:{mode}"])
        else:
            command.extend(["--tmpfs", f"/games:size={self.tmpfs_size}"])
        command.extend(
            [
                IMAGE,
                "--name",
                self.name,
                "--games-dir",
                "/games",
                "--state-dir",
                "/state",
                "--catalog-db",
                CATALOG_DB,
            ]
        )
        for fixture in self.fixtures:
            command.extend(["--fixture", fixture])
        command.extend(self.extra_args)
        self.runner.log(f"start {self.name}: {' '.join(command)}")
        with self.condition:
            # A fresh process means a fresh output stream.
            self.output_closed = False
        self.process = subprocess.Popen(
            command,
            cwd=REPO,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        threading.Thread(target=self._read_output, daemon=True).start()
        self.wait_ready()
        return self

    def _read_output(self) -> None:
        """Reader-thread body: record each output line until the stream ends.

        Lines that are not JSON objects are stored as
        ``{"type": "raw", "line": ...}``. stderr is merged into stdout, so
        arbitrary log lines such as ``42`` or ``[1]`` can be valid JSON without
        being protocol messages; keeping them as dicts ensures predicates can
        always call ``item.get``.
        """
        assert self.process is not None
        assert self.process.stdout is not None
        try:
            for line in self.process.stdout:
                stripped = line.rstrip("\r\n")
                try:
                    parsed = json.loads(stripped)
                except json.JSONDecodeError:
                    parsed = None
                if not isinstance(parsed, dict):
                    parsed = {"type": "raw", "line": stripped}
                with self.condition:
                    self.raw_output.append(stripped)
                    self.output.append(parsed)
                    self.condition.notify_all()
                self.events.put(parsed)
        finally:
            # Wake waiters so they fail immediately instead of sleeping until
            # their timeout for output that can no longer arrive.
            with self.condition:
                self.output_closed = True
                self.condition.notify_all()

    def wait_ready(self) -> None:
        """Block until ``local-peer-ready`` and record its addr and peer id."""
        line = self.wait_for(
            lambda item: item.get("type") == "event"
            and item.get("event") == "local-peer-ready",
            timeout=20,
            description=f"{self.name} local-peer-ready",
        )
        data = line["data"]
        self.ready_addr = data["addr"]
        self.peer_id = data["peer_id"]

    def send(
        self,
        payload: dict[str, Any],
        *,
        expect_error: bool = False,
        timeout: float = 20,
    ) -> dict[str, Any]:
        """Send one command and return the response carrying the same id.

        Args:
            payload: command object; an ``id`` key is added to a copy of it.
            expect_error: when true, an ``error`` response is the success case.
            timeout: seconds to wait for the response.

        Raises:
            ScenarioError: on timeout, or when the response type does not match
                ``expect_error``.
        """
        assert self.process is not None
        assert self.process.stdin is not None
        self.request_index += 1
        request_id = f"{self.name}-{self.request_index}"
        payload = dict(payload)
        payload["id"] = request_id
        line = json.dumps(payload, separators=(",", ":"))
        self.process.stdin.write(line + "\n")
        self.process.stdin.flush()
        response = self.wait_for(
            lambda item: item.get("id") == request_id
            and item.get("type") in {"result", "error"},
            timeout=timeout,
            description=f"{self.name} response to {payload.get('cmd')}",
        )
        if response["type"] == "error":
            if expect_error:
                return response
            raise ScenarioError(f"{self.name} command failed: {response}")
        if expect_error:
            raise ScenarioError(f"{self.name} command unexpectedly succeeded: {response}")
        return response

    def wait_for(
        self,
        predicate: Callable[[dict[str, Any]], bool],
        *,
        timeout: float,
        description: str,
        waiter: LineWaiter | None = None,
    ) -> dict[str, Any]:
        """Return the first output entry matching ``predicate``.

        Scanning starts at ``waiter.seen`` (or at the beginning when no waiter
        is given). The waiter's cursor is advanced past the match or past
        everything scanned so far.

        Raises:
            ScenarioError: when the timeout elapses, or when the output stream
                has ended without a match. The last 20 raw lines are attached.
        """
        deadline = time.monotonic() + timeout
        with self.condition:
            start = 0 if waiter is None else waiter.seen
            while True:
                for index in range(start, len(self.output)):
                    item = self.output[index]
                    if predicate(item):
                        if waiter is not None:
                            waiter.seen = index + 1
                        return item
                start = len(self.output)
                if waiter is not None:
                    waiter.seen = start
                if self.output_closed:
                    # Everything ever produced has been scanned; give up now.
                    tail = "\n".join(self.raw_output[-20:])
                    raise ScenarioError(
                        f"output closed while waiting for {description} on {self.name}\n{tail}"
                    )
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    tail = "\n".join(self.raw_output[-20:])
                    raise ScenarioError(
                        f"timed out waiting for {description} on {self.name}\n{tail}"
                    )
                self.condition.wait(remaining)

    def list_games(self) -> dict[str, Any]:
        """Return the ``data`` payload of a ``list-games`` command."""
        return self.send({"cmd": "list-games"})["data"]

    def list_peers(self) -> list[dict[str, Any]]:
        """Return the ``peers`` list of a ``list-peers`` command."""
        return self.send({"cmd": "list-peers"})["data"]["peers"]

    def status(self) -> dict[str, Any]:
        """Return the ``data`` payload of a ``status`` command."""
        return self.send({"cmd": "status"})["data"]

    def connect_to(self, other: "Peer") -> None:
        """Connect to ``other``'s ready address, then issue ``wait-peers``."""
        if other.ready_addr is None:
            raise ScenarioError(f"{other.name} is not ready")
        self.send({"cmd": "connect", "addr": other.ready_addr})
        self.send({"cmd": "wait-peers", "count": 1, "timeout_ms": 10000})

    def shutdown(self) -> None:
        """Ask the peer to shut down; force-remove the container on failure."""
        if self.process is None:
            return
        if self.process.poll() is None:
            try:
                self.send({"cmd": "shutdown"}, timeout=8)
            except Exception:
                # Best-effort teardown: any failure falls back to a hard kill.
                self.kill()
        try:
            self.process.wait(timeout=8)
        except subprocess.TimeoutExpired:
            self.kill()

    def kill(self) -> None:
        """Force-remove the container and make sure the child is reaped."""
        subprocess.run(
            ["docker", "rm", "-f", self.container_name],
            cwd=REPO,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
        if self.process is not None:
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
                try:
                    # Reap the killed client so it does not linger as a zombie.
                    self.process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    pass

    def docker_exec(self, *args: str, check: bool = True) -> subprocess.CompletedProcess[str]:
        """Run a command inside the container, with stderr merged into stdout."""
        return subprocess.run(
            ["docker", "exec", self.container_name, *args],
            cwd=REPO,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=check,
        )
class Runner:
def __init__(self, selected: set[str] | None = None, build_image: bool = False) -> None:
    """Record run options and derive the per-run directories under RUN_ROOT.

    Args:
        selected: scenario ids to run, compared against lower-cased ids; a
            falsy value runs every scenario.
        build_image: whether ``prepare`` builds the peer-cli image first.
    """
    self.selected, self.build_image = selected, build_image
    # Whole-second timestamp; becomes part of every container name.
    self.run_id = f"{int(time.time())}"
    self.state_root = RUN_ROOT / "state"
    self.games_root = RUN_ROOT / "games"
    self.fixture_root = RUN_ROOT / "fixtures"
    self.current_peers: list[Peer] = []
    self.results: list[tuple[str, str]] = []
def log(self, message: str) -> None:
    """Write ``message`` to stdout and flush immediately."""
    print(message, file=sys.stdout, flush=True)
def run(self) -> None:
    """Prepare the environment, run the selected scenarios, and log a summary.

    Each scenario starts from a clean container slate, and its peers are
    stopped even when it raises. The first failing scenario aborts the run.

    Raises:
        ScenarioError: when no scenario was run.
    """
    self.prepare()
    scenarios: list[tuple[str, Callable[[], str]]] = [
        ("S1", self.s1_startup_scan),
        ("S2", self.s2_direct_connect_handshake),
        ("S3", self.s3_remote_aggregation),
        ("S4", self.s4_single_source_download),
        ("S5", self.s5_auto_install_download),
        ("S6", self.s6_manual_install_uninstall),
        ("S7", self.s7_duplicate_source_download),
        ("S8", self.s8_ambiguous_metadata_rejection),
        ("S9", self.s9_missing_game),
        ("S10", self.s10_shutdown_cleanup),
        ("S11", self.s11_same_identity_reconnect),
        ("S12", self.s12_transfer_serving_gates),
        ("S13", self.s13_exact_transfer_equality),
        ("S14", self.s14_large_multi_peer_chunking),
        ("S15", self.s15_three_way_version_skew),
        ("S16", self.s16_latest_fanout_with_stale),
        ("S17", self.s17_latest_conflict_rejection),
        ("S18", self.s18_redundant_source_drop),
        ("S19", self.s19_sole_source_drop),
        ("S20", self.s20_receiver_write_failure),
        ("S21", self.s21_add_game_propagation),
        ("S22", self.s22_remove_game_propagation),
        ("S23", self.s23_version_bump_propagation),
        ("S24", self.s24_two_clients_one_source),
        ("S25", self.s25_two_downloads_one_client),
        ("S26", self.s26_duplicate_download_rejection),
        ("S27", self.s27_self_connect_rejection),
        ("S28", self.s28_address_change_unit),
        ("S29", self.s29_empty_peer_participates),
        ("S30", self.s30_mesh_aggregation),
        ("S31", self.s31_bootstrapped_peer_source),
        ("S32", self.s32_reinstall_after_uninstall),
        ("S33", self.s33_install_after_mutation),
        ("S34", self.s34_many_small_files),
        ("S35", self.s35_unknown_game_filtered),
        ("S36", self.s36_latest_singleton),
        ("S37", self.s37_single_source_download_throughput),
    ]
    for scenario_id, scenario in scenarios:
        wanted = self.selected
        # A falsy selection means "run everything".
        excluded = bool(wanted) and scenario_id.lower() not in wanted
        if excluded:
            continue
        self.cleanup_containers()
        self.current_peers = []
        try:
            self.log(f"\n== {scenario_id} ==")
            evidence = scenario()
            self.results.append((scenario_id, evidence))
            self.log(f"{scenario_id} PASS: {evidence}")
        finally:
            self.stop_peers()
    if len(self.results) == 0:
        raise ScenarioError("no scenarios selected")
    self.log("\nSummary:")
    for passed_id, passed_evidence in self.results:
        self.log(f"- {passed_id}: {passed_evidence}")
def prepare(self) -> None:
    """Reset the run root, recreate its directories, and ready Docker."""
    reset_run_root()
    for directory in (self.state_root, self.games_root, self.fixture_root):
        directory.mkdir(parents=True, exist_ok=True)
    self.cleanup_containers()
    if self.build_image:
        run(["just", "peer-cli-image"], "build peer-cli image")
    run(["just", "peer-cli-net"], "prepare peer-cli docker network")
def cleanup_containers(self) -> None:
    """Force-remove every container whose name starts with the scenario prefix.

    Listing containers must succeed (``check=True``); the removal itself is
    best-effort and its output is discarded.
    """
    listing = subprocess.run(
        ["docker", "ps", "-a", "--format", "{{.Names}}"],
        cwd=REPO,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,
    )
    prefix = f"{CONTAINER_PREFIX}-"
    stale = [entry for entry in listing.stdout.splitlines() if entry.startswith(prefix)]
    if not stale:
        return
    subprocess.run(
        ["docker", "rm", "-f", *stale],
        cwd=REPO,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
def stop_peers(self) -> None:
    """Shut peers down newest-first, then sweep any leftover containers."""
    for started in self.current_peers[::-1]:
        started.shutdown()
    self.cleanup_containers()
def peer(
    self,
    name: str,
    *,
    games_dir: Path | None = None,
    readonly_games: bool = False,
    tmpfs_size: str | None = None,
    fixtures: list[str] | None = None,
    extra_args: list[str] | None = None,
) -> Peer:
    """Create and start a peer, registering it for teardown once it is ready.

    The peer is only added to ``current_peers`` after ``start`` returns.
    """
    started = Peer(
        runner=self,
        name=name,
        games_dir=games_dir,
        readonly_games=readonly_games,
        tmpfs_size=tmpfs_size,
        fixtures=fixtures if fixtures else [],
        extra_args=extra_args if extra_args else [],
    ).start()
    self.current_peers.append(started)
    return started
def s1_startup_scan(self) -> str:
    """Start a read-only fixture-alpha peer and check its expected local games."""
    alpha = self.peer("s1-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    local_rows = alpha.list_games()["local"]
    expected = {"alienswarm", "bf1942", "ggoo"}
    seen = {row["id"] for row in local_rows}
    if not expected <= seen:
        raise ScenarioError(f"fixture-alpha games missing: expected {expected}, saw {seen}")
    for row in local_rows:
        if row["id"] not in expected:
            continue
        assert_game_state(row, downloaded=True, installed=False, availability="Ready")
    return "fixture-alpha emitted ready local games alienswarm, bf1942, and ggoo"
def s2_direct_connect_handshake(self) -> str:
    """Connect alpha to bravo and check alpha's peer list afterwards."""
    alpha = self.peer("s2-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    bravo = self.peer("s2-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    connect_many(alpha, [bravo])
    peers = alpha.list_peers()
    single_non_self = len(peers) == 1 and peers[0]["peer_id"] != alpha.peer_id
    if not single_non_self:
        raise ScenarioError(f"bad alpha peers after connect: {peers}")
    if peers[0]["game_count"] != 4:
        raise ScenarioError(f"expected bravo game_count=4, got {peers}")
    return "alpha connected to bravo, saw one non-self peer with four games"
def s3_remote_aggregation(self) -> str:
    """Connect an empty client to alpha and bravo and check remote peer counts."""
    alpha = self.peer("s3-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    bravo = self.peer("s3-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s3-client")
    connect_many(client, [alpha, bravo])
    # (game id, expected peer_count), checked in this order.
    expectations = (
        ("ggoo", 2),
        ("alienswarm", 1),
        ("bf1942", 1),
        ("bfbc2", 1),
        ("cnc4", 1),
        ("cnctw", 1),
    )
    for game_id, peer_count in expectations:
        wait_remote_game(client, game_id, peer_count=peer_count)
    return "empty client aggregated alpha/bravo with ggoo peer_count=2 and unique games peer_count=1"
def s4_single_source_download(self) -> str:
    """Download bfbc2 from bravo with install=False and verify the result."""
    bravo = self.peer("s4-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s4-client")
    connect_many(client, [bravo])
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "bfbc2", "install": False})
    # Events are awaited in this order through the same cursor.
    stages = (
        ("got-game-files", 20, "got bfbc2"),
        ("download-begin", 20, "begin bfbc2"),
        ("download-finished", 60, "finish bfbc2"),
    )
    for event_name, limit, label in stages:
        client.wait_for(event_is(event_name, "bfbc2"), timeout=limit, description=label, waiter=waiter)
    row = wait_local_game(client, "bfbc2", downloaded=True, installed=False)
    received = client.host_games_dir / "bfbc2"
    diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", received)
    if (received / "local").exists():
        raise ScenarioError("bfbc2 local/ exists after install=false")
    return f"bfbc2 downloaded with install=false, local state installed={row['installed']}, diff matched"
def s5_auto_install_download(self) -> str:
    """Download cnctw without an install flag and expect it to be installed."""
    bravo = self.peer("s5-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s5-client")
    connect_many(client, [bravo])
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "cnctw"})
    client.wait_for(event_is("download-finished", "cnctw"), timeout=60, description="finish cnctw", waiter=waiter)
    client.wait_for(event_is("install-finished", "cnctw"), timeout=30, description="install cnctw", waiter=waiter)
    wait_local_game(client, "cnctw", downloaded=True, installed=True)
    received = client.host_games_dir / "cnctw"
    payload = received / "local" / "fixture-payload.txt"
    if not payload.is_file():
        raise ScenarioError("cnctw install payload missing")
    diff_game_dirs(FIXTURES / "fixture-bravo" / "cnctw", received)
    return "cnctw auto-installed, local fixture payload existed, root diff matched excluding local metadata"
def s6_manual_install_uninstall(self) -> str:
    """Download bfbc2, then install and uninstall it via explicit commands."""
    bravo = self.peer("s6-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s6-client")
    connect_many(client, [bravo])
    waiter = LineWaiter(len(client.output))

    client.send({"cmd": "download", "game_id": "bfbc2", "install": False})
    client.wait_for(event_is("download-finished", "bfbc2"), timeout=60, description="finish bfbc2", waiter=waiter)

    client.send({"cmd": "install", "game_id": "bfbc2"})
    client.wait_for(event_is("install-finished", "bfbc2"), timeout=30, description="install bfbc2", waiter=waiter)
    wait_local_game(client, "bfbc2", downloaded=True, installed=True)

    client.send({"cmd": "uninstall", "game_id": "bfbc2"})
    client.wait_for(event_is("uninstall-finished", "bfbc2"), timeout=30, description="uninstall bfbc2", waiter=waiter)
    wait_local_game(client, "bfbc2", downloaded=True, installed=False)

    received = client.host_games_dir / "bfbc2"
    if (received / "local").exists():
        raise ScenarioError("bfbc2 local/ remained after uninstall")
    diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", received)
    return "manual install/uninstall toggled installed state, removed local/, preserved downloaded root"
def s7_duplicate_source_download(self) -> str:
    """Download ggoo once it is reported with peer_count=2 and diff the result."""
    sources = [
        self.peer("s7-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True),
        self.peer("s7-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True),
    ]
    client = self.peer("s7-client")
    connect_many(client, sources)
    wait_remote_game(client, "ggoo", peer_count=2)
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "ggoo", "install": False})
    client.wait_for(event_is("download-finished", "ggoo"), timeout=60, description="finish ggoo", waiter=waiter)
    diff_game_dirs(FIXTURES / "fixture-alpha" / "ggoo", client.host_games_dir / "ggoo")
    return "shared ggoo downloaded once from duplicate sources and diffed cleanly"
def s8_ambiguous_metadata_rejection(self) -> str:
    """Expect download-failed when two same-version ggoo sources disagree.

    Both sources get a ggoo copy at version 20260101; extra bytes are appended
    to the second copy's ggoo.eti so the two differ.
    """
    dir_a = self.fixture_root / "s8-a"
    dir_b = self.fixture_root / "s8-b"
    for directory in (dir_a, dir_b):
        copy_game("ggoo", directory, version="20260101")
    with (dir_b / "ggoo" / "ggoo.eti").open("ab") as archive:
        archive.write(b"conflict")
    peer_a = self.peer("s8-a", games_dir=dir_a)
    peer_b = self.peer("s8-b", games_dir=dir_b)
    client = self.peer("s8-client")
    connect_many(client, [peer_a, peer_b])
    wait_remote_game(client, "ggoo", peer_count=2, version="20260101")
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "ggoo", "install": False})
    client.wait_for(event_is("download-failed", "ggoo"), timeout=30, description="ggoo failed", waiter=waiter)
    assert_not_exists(client.host_games_dir / "ggoo" / "version.ini")
    return "conflicting latest ggoo file sizes emitted download-failed and left no version.ini"
def s9_missing_game(self) -> str:
    """Request a game no peer offers and expect an error with no local dir."""
    client = self.peer("s9-client")
    request = {"cmd": "download", "game_id": "cod2", "install": False}
    err = client.send(request, expect_error=True)
    message = err["error"]
    if "no peers have game cod2" not in message:
        raise ScenarioError(f"unexpected missing game error: {err}")
    assert_not_exists(client.host_games_dir / "cod2")
    return f"missing game command errored '{err['error']}' and created no local directory"
def s10_shutdown_cleanup(self) -> str:
    """Shut bravo down gracefully and check alpha drops the peer and its games.

    Raises:
        ScenarioError: if alpha still lists any peer after bfbc2 disappears.
    """
    alpha = self.peer("s10-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    bravo = self.peer("s10-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    connect_many(alpha, [bravo])
    wait_remote_game(alpha, "bfbc2", peer_count=1)
    bravo.shutdown()
    wait_remote_absent(alpha, "bfbc2")
    # Query once so the error reports exactly the list that failed the check;
    # a second query could already return a different (even empty) list.
    remaining = alpha.list_peers()
    if remaining:
        raise ScenarioError(f"alpha still has peers after bravo shutdown: {remaining}")
    return "bravo graceful shutdown removed peer and bravo-only games from alpha"
def s11_same_identity_reconnect(self) -> str:
    """Restart bravo under the same name; expect the same peer id, new address."""
    alpha = self.peer("s11-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    bravo_dir = FIXTURES / "fixture-bravo"
    bravo = self.peer("s11-bravo", games_dir=bravo_dir, readonly_games=True)
    connect_many(alpha, [bravo])
    before = alpha.list_peers()[0]
    first_addr, first_id = before["addr"], before["peer_id"]
    bravo.shutdown()
    wait_remote_absent(alpha, "bfbc2")
    # Same peer name, so the restarted peer mounts the same host state dir.
    bravo = self.peer("s11-bravo", games_dir=bravo_dir, readonly_games=True)
    connect_many(alpha, [bravo])
    peers = alpha.list_peers()
    if len(peers) != 1:
        raise ScenarioError(f"expected one bravo peer after reconnect, got {peers}")
    after = peers[0]
    if after["peer_id"] != first_id:
        raise ScenarioError(f"bravo peer id changed: {first_id} -> {peers[0]['peer_id']}")
    if after["addr"] == first_addr:
        raise ScenarioError(f"bravo listener address did not change: {first_addr}")
    return f"bravo reused peer id {first_id} with new address {peers[0]['addr']}"
def s12_transfer_serving_gates(self) -> str:
    """Run the test suite and require the serve-gate test names in its output."""
    output = run_just_test()
    required = (
        "local_download_available_gates_on_catalog_operation_and_sentinel",
        "get_game_response_respects_serve_gates",
        "file_transfer_dispatch_respects_serve_gates",
        "local_relative_paths_are_never_transferable",
    )
    missing = []
    for test_name in required:
        if test_name not in output:
            missing.append(test_name)
    if missing:
        raise ScenarioError(f"S12 unit proof missing tests: {missing}")
    return "just test passed including catalog/sentinel/active/local-path serve gate tests"
def s13_exact_transfer_equality(self) -> str:
    """Download bfbc2 then alienswarm, one after the other, and diff both."""
    bravo = self.peer("s13-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    alpha = self.peer("s13-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    client = self.peer("s13-client")
    connect_many(client, [bravo, alpha])
    waiter = LineWaiter(len(client.output))
    # Sequential: each download must finish before the next one is requested.
    transfers = (
        ("bfbc2", 60, "bfbc2 finish"),
        ("alienswarm", 90, "alienswarm finish"),
    )
    for game_id, limit, label in transfers:
        client.send({"cmd": "download", "game_id": game_id, "install": False})
        client.wait_for(event_is("download-finished", game_id), timeout=limit, description=label, waiter=waiter)
    received_root = client.host_games_dir
    diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", received_root / "bfbc2")
    diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", received_root / "alienswarm")
    return "small bfbc2 and large alienswarm transfers both diffed cleanly against sources"
def s14_large_multi_peer_chunking(self) -> str:
    """Download a sparse game larger than one chunk from two sources.

    A staging peer first downloads the game from alpha so that the client
    then sees two sources; per-source chunk totals must differ by at most
    CHUNK_SIZE.
    """
    game_id = PERF_GAME_ID
    source_dir = self.fixture_root / "s14-alpha"
    source_game = source_dir / game_id
    # One MiB past a single chunk.
    create_large_sparse_game(source_game, size=CHUNK_SIZE + 1024 * 1024)
    alpha = self.peer("s14-alpha", games_dir=source_dir)

    stage = self.peer("s14-stage")
    connect_many(stage, [alpha])
    stage_waiter = LineWaiter(len(stage.output))
    stage.send({"cmd": "download", "game_id": game_id, "install": False})
    stage.wait_for(event_is("download-finished", game_id), timeout=90, description="stage finish", waiter=stage_waiter)
    diff_game_dirs(source_game, stage.host_games_dir / game_id)

    client = self.peer("s14-client")
    connect_many(client, [alpha, stage])
    wait_remote_game(client, game_id, peer_count=2, version="20260520")
    client_waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": game_id, "install": False})
    client.wait_for(event_is("download-finished", game_id), timeout=90, description="client finish", waiter=client_waiter)
    diff_game_dirs(source_game, client.host_games_dir / game_id)

    totals = chunk_totals(client, game_id, f"{game_id}/{game_id}.eti")
    if len(totals) < 2:
        raise ScenarioError(f"expected chunks from two peers, got {totals}")
    per_source = list(totals.values())
    spread = max(per_source) - min(per_source)
    if spread > CHUNK_SIZE:
        raise ScenarioError(f"chunk totals not balanced within one chunk: {totals}")
    return f"{game_id} downloaded from two sources, diff matched, chunk totals={totals}"
def s15_three_way_version_skew(self) -> str:
    """With three cnc4 versions on offer, expect chunks only from the 20250301 peer."""
    specs = (("s15-a", "20250101"), ("s15-b", "20250201"), ("s15-c", "20250301"))
    peers = []
    for peer_name, version in specs:
        game_dir = self.fixture_root / peer_name
        copy_game("cnc4", game_dir, version=version)
        peers.append(self.peer(peer_name, games_dir=game_dir))
    newest = peers[2]
    client = self.peer("s15-client")
    connect_many(client, peers)
    wait_remote_game(client, "cnc4", peer_count=3, version="20250301")
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "cnc4", "install": False})
    client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="cnc4 finish", waiter=waiter)
    assert_only_chunk_sources(client, "cnc4", {newest.ready_addr})
    diff_game_dirs(newest.host_games_dir / "cnc4", client.host_games_dir / "cnc4")
    return "three-way skew selected only 20250301 peer and receiver diffed cleanly"
def s16_latest_fanout_with_stale(self) -> str:
    """Expect alienswarm chunks only from the two 20250301 peers, not the stale one."""
    specs = (
        ("s16-a", "20250101"),
        ("s16-b", "20250301"),
        ("s16-c", "20250301"),
    )
    peers = []
    for peer_name, version in specs:
        game_dir = self.fixture_root / peer_name
        copy_game("alienswarm", game_dir, version=version)
        peers.append(self.peer(peer_name, games_dir=game_dir))
    stale, latest_b, latest_c = peers
    client = self.peer("s16-client")
    connect_many(client, peers)
    wait_remote_game(client, "alienswarm", peer_count=3, version="20250301")
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
    client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="alienswarm finish", waiter=waiter)
    assert_only_chunk_sources(client, "alienswarm", {latest_b.ready_addr, latest_c.ready_addr})
    totals = chunk_totals(client, "alienswarm", "alienswarm/alienswarm.eti")
    if stale.ready_addr in totals:
        raise ScenarioError(f"stale peer contributed chunks: {totals}")
    diff_game_dirs(latest_b.host_games_dir / "alienswarm", client.host_games_dir / "alienswarm")
    return f"latest B/C peers served alienswarm while stale A contributed zero; totals={totals}"
def s17_latest_conflict_rejection(self) -> str:
    """Expect download-failed when the two 20250301 cnc4 sources differ."""
    # (peer name, version, whether extra bytes are appended to cnc4.eti)
    specs = (
        ("s17-a", "20250101", False),
        ("s17-b", "20250301", False),
        ("s17-c", "20250301", True),
    )
    peers = []
    for peer_name, version, conflict in specs:
        game_dir = self.fixture_root / peer_name
        copy_game("cnc4", game_dir, version=version)
        if conflict:
            archive_path = game_dir / "cnc4" / "cnc4.eti"
            with archive_path.open("ab") as archive:
                archive.write(b"conflict")
        peers.append(self.peer(peer_name, games_dir=game_dir))
    client = self.peer("s17-client")
    connect_many(client, peers)
    wait_remote_game(client, "cnc4", peer_count=3, version="20250301")
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "cnc4", "install": False})
    client.wait_for(event_is("download-failed", "cnc4"), timeout=30, description="cnc4 failed", waiter=waiter)
    assert_not_exists(client.host_games_dir / "cnc4" / "version.ini")
    return "latest-version file conflict failed download and left no committed version.ini"
def s18_redundant_source_drop(self) -> str:
    """Kill one of two sources after download-begin; the download must finish."""
    source_a_dir = self.fixture_root / "s18-a"
    source_b_dir = self.fixture_root / "s18-b"
    for directory in (source_a_dir, source_b_dir):
        copy_game("alienswarm", directory)
    source_a = self.peer("s18-a", games_dir=source_a_dir)
    source_b = self.peer("s18-b", games_dir=source_b_dir)
    client = self.peer("s18-client")
    connect_many(client, [source_a, source_b])
    wait_remote_game(client, "alienswarm", peer_count=2)
    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
    client.wait_for(event_is("download-begin", "alienswarm"), timeout=20, description="download begin", waiter=waiter)
    # Hard kill (container force-removed), not a graceful shutdown.
    source_a.kill()
    client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="download finish", waiter=waiter)
    assert_no_event(client, waiter, "download-failed", "alienswarm")
    diff_game_dirs(source_b_dir / "alienswarm", client.host_games_dir / "alienswarm")
    totals = chunk_totals(client, "alienswarm", "alienswarm/alienswarm.eti")
    if not totals:
        raise ScenarioError("S18 did not record chunk evidence")
    return f"source killed after begin; download finished; diff matched; chunk bytes={totals}"
def s19_sole_source_drop(self) -> str:
    """Shut the only source down after download-begin; expect a clean failure."""
    source_dir = self.fixture_root / "s19-source"
    copy_game("alienswarm", source_dir)
    source = self.peer("s19-source", games_dir=source_dir)
    client = self.peer("s19-client")
    connect_many(client, [source])
    wait_remote_game(client, "alienswarm", peer_count=1)
    waiter = LineWaiter(len(client.output))
    request = {"cmd": "download", "game_id": "alienswarm", "install": False}
    client.send(request)
    client.wait_for(event_is("download-begin", "alienswarm"), timeout=20, description="download begin", waiter=waiter)
    source.shutdown()
    client.wait_for(event_is("download-failed", "alienswarm"), timeout=90, description="download failed", waiter=waiter)
    committed_marker = client.host_games_dir / "alienswarm" / "version.ini"
    assert_not_exists(committed_marker)
    assert_no_active(client, "alienswarm")
    assert_local_absent(client, "alienswarm")
    return "download-failed emitted; version.ini absent; local ready row absent; active operations empty"
def s20_receiver_write_failure(self) -> str:
    """Download alienswarm into a 32m tmpfs /games and expect download-failed."""
    source_dir = self.fixture_root / "s20-source"
    copy_game("alienswarm", source_dir)
    source = self.peer("s20-source", games_dir=source_dir)
    # /games lives on a tmpfs, so results are inspected via docker exec.
    client = self.peer("s20-client", tmpfs_size="32m")
    connect_many(client, [source])
    wait_remote_game(client, "alienswarm", peer_count=1)
    waiter = LineWaiter(len(client.output))
    request = {"cmd": "download", "game_id": "alienswarm", "install": False}
    client.send(request)
    client.wait_for(event_is("download-failed", "alienswarm"), timeout=90, description="download failed", waiter=waiter)
    # docker_exec defaults to check=True, so an existing file raises here.
    client.docker_exec("test", "!", "-e", "/games/alienswarm/version.ini")
    assert_no_active(client, "alienswarm")
    return "32m tmpfs receiver emitted download-failed; /games/alienswarm/version.ini absent; active operations empty"
def s21_add_game_propagation(self) -> str:
    """Drop cod5 into a connected bravo's games dir and expect alpha to see it.

    Raises:
        ScenarioError: if alpha does not list exactly one peer before the
            game is staged, or if cod5 does not show up afterwards.
    """
    alpha = self.peer("s21-alpha")
    bravo_dir = self.fixture_root / "s21-bravo"
    bravo_dir.mkdir(parents=True, exist_ok=True)
    bravo = self.peer("s21-bravo", games_dir=bravo_dir)
    connect_many(alpha, [bravo])
    # Raise explicitly rather than `assert`: the check must survive `python -O`
    # and fail with the same exception type as every other scenario.
    peers = alpha.list_peers()
    if len(peers) != 1:
        raise ScenarioError(f"expected one bravo peer before adding cod5, got {peers}")
    stage_game_drop(bravo_dir, "cod5")
    game = wait_remote_game(alpha, "cod5", peer_count=1)
    return f"alpha saw {game['id']} from the existing bravo peer with peer_count={game['peer_count']}"
def s22_remove_game_propagation(self) -> str:
    """Delete cod5 from bravo's games dir; alpha must drop it but keep the peer."""
    alpha = self.peer("s22-alpha")
    bravo_dir = self.fixture_root / "s22-bravo"
    copy_game("cod5", bravo_dir)
    bravo = self.peer("s22-bravo", games_dir=bravo_dir)
    connect_many(alpha, [bravo])
    wait_remote_game(alpha, "cod5", peer_count=1)
    removed_game = bravo_dir / "cod5"
    shutil.rmtree(removed_game)
    wait_remote_absent(alpha, "cod5")
    peers = alpha.list_peers()
    if len(peers) == 1:
        return "alpha removed cod5 while keeping one bravo peer"
    raise ScenarioError(f"expected bravo peer to remain, got {peers}")
def s23_version_bump_propagation(self) -> str:
    """Rewrite bravo's cnc4 version.ini and expect alpha to see the new version."""
    alpha = self.peer("s23-alpha")
    bravo_dir = self.fixture_root / "s23-bravo"
    copy_game("cnc4", bravo_dir, version="20250101")
    bravo = self.peer("s23-bravo", games_dir=bravo_dir)
    connect_many(alpha, [bravo])
    wait_remote_game(alpha, "cnc4", peer_count=1, version="20250101")
    version_file = bravo_dir / "cnc4" / "version.ini"
    version_file.write_text("20260501", encoding="utf-8")
    wait_remote_game(alpha, "cnc4", peer_count=1, version="20260501")
    return "alpha observed cnc4 eti_game_version change 20250101 -> 20260501 without reconnect"
def s24_two_clients_one_source(self) -> str:
    """Have two clients download alienswarm from one source at the same time."""
    source = self.peer("s24-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
    c1 = self.peer("s24-client-a")
    c2 = self.peer("s24-client-b")
    for downloader in (c1, c2):
        connect_many(downloader, [source])
    waiter1 = LineWaiter(len(c1.output))
    waiter2 = LineWaiter(len(c2.output))
    request = {"cmd": "download", "game_id": "alienswarm", "install": False}
    # Both requests go out before either completion is awaited.
    c1.send(request)
    c2.send(request)
    c1.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="client-a finish", waiter=waiter1)
    c2.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="client-b finish", waiter=waiter2)
    reference = FIXTURES / "fixture-alpha" / "alienswarm"
    diff_game_dirs(reference, c1.host_games_dir / "alienswarm")
    diff_game_dirs(reference, c2.host_games_dir / "alienswarm")
    # Result is unused; the call only has to succeed.
    source.status()
    return "two concurrent clients finished alienswarm; both diffs matched; source status responded"
def s25_two_downloads_one_client(self) -> str:
    """Run two different downloads on one client concurrently and diff both."""
    game_ids = ("bfbc2", "cnctw")
    fixture_games = FIXTURES / "fixture-bravo"

    source = self.peer("s25-bravo", games_dir=fixture_games, readonly_games=True)
    client = self.peer("s25-client")
    connect_many(client, [source])

    waiter = LineWaiter(len(client.output))
    # Queue both downloads before waiting on either of them.
    for game_id in game_ids:
        client.send({"cmd": "download", "game_id": game_id, "install": False})

    for game_id in game_ids:
        client.wait_for(
            event_is("download-finished", game_id),
            timeout=60,
            description=f"{game_id} finish",
            waiter=waiter,
        )

    for game_id in game_ids:
        diff_game_dirs(fixture_games / game_id, client.host_games_dir / game_id)

    return "bfbc2 and cnctw concurrent downloads both finished and diffed cleanly"
def s26_duplicate_download_rejection(self) -> str:
    """Verify a second download of an in-flight game is rejected.

    The duplicate command is only sent once the client has reported the game
    among its active operations; the first download must still finish and
    match the fixture.
    """
    game_id = "alienswarm"
    fixture_games = FIXTURES / "fixture-alpha"

    def download_is_active(item) -> bool:
        # True for an active-operations-changed event that lists this game.
        if item.get("type") != "event":
            return False
        if item.get("event") != "active-operations-changed":
            return False
        operations = item.get("data", {}).get("active_operations", [])
        return any(op.get("game_id") == game_id for op in operations)

    source = self.peer("s26-alpha", games_dir=fixture_games, readonly_games=True)
    client = self.peer("s26-client")
    connect_many(client, [source])

    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": game_id, "install": False})
    client.wait_for(
        download_is_active,
        timeout=20,
        description="active operation",
        waiter=waiter,
    )

    err = client.send(
        {"cmd": "download", "game_id": game_id, "install": False},
        expect_error=True,
    )
    if "operation already in progress" not in err["error"]:
        raise ScenarioError(f"unexpected duplicate error: {err}")

    client.wait_for(
        event_is("download-finished", game_id),
        timeout=90,
        description="first download finish",
        waiter=waiter,
    )
    diff_game_dirs(fixture_games / game_id, client.host_games_dir / game_id)
    return f"second command errored '{err['error']}'; first download diff matched"
def s27_self_connect_rejection(self) -> str:
    """Verify a peer refuses to connect to its own ready address."""
    alpha = self.peer("s27-alpha")

    err = alpha.send(
        {"cmd": "connect", "addr": alpha.ready_addr},
        expect_error=True,
    )
    rejected = "cannot connect peer to itself" in err["error"]
    if not rejected:
        raise ScenarioError(f"unexpected self-connect error: {err}")

    # The failed attempt must not leave a peer entry behind.
    peers = alpha.list_peers()
    if peers:
        raise ScenarioError(f"self-connect created peers: {peers}")

    alpha.status()
    return f"self-connect errored '{err['error']}'; peer list stayed empty"
def s28_address_change_unit(self) -> str:
    """Cover S28 through the unit test suite instead of a Docker scenario.

    Runs ``just test`` and requires the address-update test name to appear in
    its combined output.
    """
    test_name = "peer_db::tests::address_update_preserves_peer_identity_and_library"
    output = run_just_test()
    if test_name in output:
        return "`just test` passed including address_update_preserves_peer_identity_and_library"
    raise ScenarioError(f"S28 unit proof missing from just test output:\n{output}")
def s29_empty_peer_participates(self) -> str:
    """Check a peer with no games is listed, can download, and then advertises.

    The observer first sees the empty peer with ``game_count`` 0; the empty
    peer then downloads a game from the source, and the observer's view of
    that peer must eventually include the game.
    """
    game_id = "alienswarm"
    fixture_games = FIXTURES / "fixture-alpha"

    source = self.peer("s29-alpha", games_dir=fixture_games, readonly_games=True)
    empty = self.peer("s29-empty")
    observer = self.peer("s29-observer")

    connect_many(observer, [empty])
    peers = observer.list_peers()
    sees_single_empty_peer = len(peers) == 1 and peers[0]["game_count"] == 0
    if not sees_single_empty_peer:
        raise ScenarioError(f"expected empty peer with zero games, got {peers}")

    connect_many(empty, [source])
    wait_remote_game(empty, game_id, peer_count=1)

    waiter = LineWaiter(len(empty.output))
    empty.send({"cmd": "download", "game_id": game_id, "install": False})
    empty.wait_for(
        event_is("download-finished", game_id),
        timeout=90,
        description="empty download finish",
        waiter=waiter,
    )
    diff_game_dirs(fixture_games / game_id, empty.host_games_dir / game_id)

    wait_peer_has_game(observer, empty.peer_id, game_id)
    return "observer saw zero-game peer; empty downloaded alienswarm, diff matched, then observer's snapshot for that peer contained alienswarm"
def s30_mesh_aggregation(self) -> str:
    """Check that a client merges overlapping libraries from five peers.

    Each peer is started with its own games directory holding two games at
    specific versions. The client must list every game ID exactly once, with
    the expected peer count and version per ID, and must not list itself as
    a peer.
    """
    specs = [
        ("s30-a", [("ggoo", "20250101"), ("bf1942", "20250101")]),
        ("s30-b", [("ggoo", "20250101"), ("cnc4", "20250101")]),
        ("s30-c", [("cnc4", "20250301"), ("cod5", "20250101")]),
        ("s30-d", [("cnctw", "20250101"), ("coh", "20250101")]),
        ("s30-e", [("cnctw", "20250201"), ("bf1942", "20250201")]),
    ]
    peers = []
    for name, games in specs:
        game_dir = self.fixture_root / name
        for game_id, version in games:
            copy_game(game_id, game_dir, version=version)
        peers.append(self.peer(name, games_dir=game_dir))

    client = self.peer("s30-client")
    connect_many(client, peers)

    # game_id -> (number of peers in the specs holding it, version expected in the client's row)
    expected = {
        "ggoo": (2, "20250101"),
        "bf1942": (2, "20250201"),
        "cnc4": (2, "20250301"),
        "cod5": (1, "20250101"),
        "cnctw": (2, "20250201"),
        "coh": (1, "20250101"),
    }
    for game_id, (peer_count, version) in expected.items():
        wait_remote_game(client, game_id, peer_count=peer_count, version=version)

    # Aggregation must yield one row per game ID, not one per advertising peer.
    ids = [game["id"] for game in client.list_games()["remote"]]
    if len(ids) != len(set(ids)):
        raise ScenarioError(f"duplicate game rows: {ids}")

    if any(peer["peer_id"] == client.peer_id for peer in client.list_peers()):
        raise ScenarioError("client listed itself as a peer")
    return f"client aggregated {len(expected)} IDs from 5 peers with expected peer_count/latest versions"
def s31_bootstrapped_peer_source(self) -> str:
    """Check that a peer which downloaded a game can serve it onward.

    The bootstrap peer downloads from the original source, the source is
    killed, and a third peer connected only to the bootstrap peer must then
    obtain a copy that matches the original fixture.
    """
    game_id = "alienswarm"
    fixture_games = FIXTURES / "fixture-alpha"
    reference = fixture_games / game_id

    def fetch(downloader, label: str) -> None:
        # Request the game and block until this peer reports it finished.
        waiter = LineWaiter(len(downloader.output))
        downloader.send({"cmd": "download", "game_id": game_id, "install": False})
        downloader.wait_for(
            event_is("download-finished", game_id),
            timeout=90,
            description=label,
            waiter=waiter,
        )
        diff_game_dirs(reference, downloader.host_games_dir / game_id)

    source = self.peer("s31-alpha", games_dir=fixture_games, readonly_games=True)
    bootstrap = self.peer("s31-bootstrap")
    connect_many(bootstrap, [source])
    fetch(bootstrap, "bootstrap finish")

    # Take the original source away before the third peer is even started.
    source.kill()

    third = self.peer("s31-third")
    connect_many(third, [bootstrap])
    fetch(third, "third finish")
    return "third peer downloaded from bootstrapped client after original source kill; diff matched original"
def s32_reinstall_after_uninstall(self) -> str:
    """Check install works again after an uninstall, without re-downloading.

    After download, install and uninstall, a second install must finish
    without any ``download-chunk-finished`` event in the output produced
    since it was requested, and must leave a ``local`` directory behind.
    """
    game_id = "bfbc2"
    source = self.peer("s32-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s32-client")
    connect_many(client, [source])

    waiter = LineWaiter(len(client.output))
    steps = [
        ({"cmd": "download", "game_id": game_id, "install": False}, "download-finished", 60, "download finish"),
        ({"cmd": "install", "game_id": game_id}, "install-finished", 30, "install finish"),
        ({"cmd": "uninstall", "game_id": game_id}, "uninstall-finished", 30, "uninstall finish"),
    ]
    for command, finished_event, timeout, label in steps:
        client.send(command)
        client.wait_for(
            event_is(finished_event, game_id),
            timeout=timeout,
            description=label,
            waiter=waiter,
        )

    # Everything logged from this index on belongs to the reinstall.
    before = len(client.output)
    client.send({"cmd": "install", "game_id": game_id})
    reinstall_waiter = LineWaiter(before)
    client.wait_for(
        event_is("install-finished", game_id),
        timeout=30,
        description="reinstall finish",
        waiter=reinstall_waiter,
    )

    for item in client.output[before:]:
        if item.get("event") == "download-chunk-finished":
            raise ScenarioError("reinstall produced transfer chunk events")

    wait_local_game(client, game_id, downloaded=True, installed=True)
    local_dir = client.host_games_dir / game_id / "local"
    if not local_dir.is_dir():
        raise ScenarioError("local/ was not recreated")
    return "reinstall recreated local/, local state installed=true, no transfer events during reinstall"
def s33_install_after_mutation(self) -> str:
    """Check install uses the archive bytes present at install time.

    The downloaded archive is overwritten inside the client container before
    installing; afterwards ``cmp`` compares the archive with the installed
    payload file.
    """
    game_id = "bfbc2"
    source = self.peer("s33-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
    client = self.peer("s33-client")
    connect_many(client, [source])

    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": game_id, "install": False})
    client.wait_for(
        event_is("download-finished", game_id),
        timeout=60,
        description="download finish",
        waiter=waiter,
    )

    # Replace the archive contents after the download has completed.
    overwrite_archive = "printf 'mutated archive bytes\\n' > /games/bfbc2/bfbc2.eti"
    client.docker_exec("sh", "-c", overwrite_archive)

    client.send({"cmd": "install", "game_id": game_id})
    client.wait_for(
        event_is("install-finished", game_id),
        timeout=30,
        description="install finish",
        waiter=waiter,
    )

    client.docker_exec(
        "cmp",
        "/games/bfbc2/bfbc2.eti",
        "/games/bfbc2/local/fixture-payload.txt",
    )
    return "fixture installer installed current mutated archive bytes exactly"
def s34_many_small_files(self) -> str:
    """Transfer a game made of many small files and count chunk events.

    The source game is generated with ``create_many_small_game``; at least 21
    ``download-chunk-finished`` events for the game are required.
    """
    game_id = "bf1942"
    source_dir = self.fixture_root / "s34-source"
    create_many_small_game(source_dir / game_id)

    source = self.peer("s34-source", games_dir=source_dir)
    client = self.peer("s34-client")
    connect_many(client, [source])
    wait_remote_game(client, game_id, peer_count=1)

    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": game_id, "install": False})
    client.wait_for(
        event_is("download-finished", game_id),
        timeout=60,
        description="download finish",
        waiter=waiter,
    )
    diff_game_dirs(source_dir / game_id, client.host_games_dir / game_id)

    # Count every chunk event this client ever logged for the game.
    is_chunk_for_game = event_is("download-chunk-finished", game_id)
    chunks = list(filter(is_chunk_for_game, client.output))
    if len(chunks) < 21:
        raise ScenarioError(f"expected at least 21 file chunks, got {len(chunks)}")
    return f"20 small files plus version.ini transferred; diff matched; chunk events={len(chunks)}"
def s35_unknown_game_filtered(self) -> str:
    """Check a game the client rejects as uncatalogued is neither listed nor downloadable."""
    game_id = "mystery-game"
    source = self.peer("s35-source", fixtures=[game_id])
    client = self.peer("s35-client")
    connect_many(client, [source])

    wait_remote_absent(client, game_id)

    err = client.send(
        {"cmd": "download", "game_id": game_id, "install": False},
        expect_error=True,
    )
    rejected = "not in the local catalog" in err["error"]
    if not rejected:
        raise ScenarioError(f"unexpected unknown game error: {err}")

    # A rejected download must not leave a directory for the game behind.
    assert_not_exists(client.host_games_dir / game_id)
    return f"unknown game absent from list-games; download errored '{err['error']}'; no local files"
def s36_latest_singleton(self) -> str:
    """Check that only the single peer holding the newest version serves chunks.

    Five peers hold cnc4; only the first has version 20260501. Every chunk
    event the client logged for the game must name that peer's ready address.
    """
    game_id = "cnc4"
    latest_version = "20260501"
    stale_version = "20250101"

    peers = []
    for index in range(5):
        game_dir = self.fixture_root / f"s36-{index}"
        version = stale_version if index else latest_version
        copy_game(game_id, game_dir, version=version)
        peers.append(self.peer(f"s36-{index}", games_dir=game_dir))
    latest_peer = peers[0]

    client = self.peer("s36-client")
    connect_many(client, peers)
    wait_remote_game(client, game_id, peer_count=5, version=latest_version)

    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": game_id, "install": False})
    got = client.wait_for(
        event_is("got-game-files", game_id),
        timeout=20,
        description="got game files",
        waiter=waiter,
    )
    client.wait_for(
        event_is("download-finished", game_id),
        timeout=60,
        description="download finish",
        waiter=waiter,
    )

    latest_addr = latest_peer.ready_addr
    if latest_addr is None:
        raise ScenarioError("latest peer had no ready addr")

    for item in client.output:
        is_chunk = item.get("type") == "event" and item.get("event") == "download-chunk-finished"
        if not is_chunk:
            continue
        data = item["data"]
        from_stale_peer = data.get("game_id") == game_id and data.get("peer_addr") != latest_addr
        if from_stale_peer:
            raise ScenarioError(f"stale peer contributed chunk: {data}")

    diff_game_dirs(latest_peer.host_games_dir / game_id, client.host_games_dir / game_id)

    if not got["data"]["file_descriptions"]:
        raise ScenarioError("got-game-files had no descriptors")
    return "client reported latest 20260501 with peer_count=5; only singleton latest peer sent chunks; diff matched"
def s37_single_source_download_throughput(self) -> str:
    """Download one large generated archive and report the measured throughput.

    The figures come from the ``throughput`` object of the ``download-finished``
    event; its byte count must equal the archive size plus the length of the
    version string written by ``create_large_sparse_game``.
    """
    source_dir = self.fixture_root / "s37-source"
    create_large_sparse_game(source_dir / PERF_GAME_ID, size=PERF_GAME_SIZE)
    source = self.peer("s37-source", games_dir=source_dir)
    client = self.peer("s37-client")
    connect_many(client, [source])
    wait_remote_game(client, PERF_GAME_ID, peer_count=1, version="20260520")

    waiter = LineWaiter(len(client.output))
    client.send({"cmd": "download", "game_id": PERF_GAME_ID, "install": False})
    finished = client.wait_for(
        event_is("download-finished", PERF_GAME_ID),
        timeout=300,
        description=f"{PERF_GAME_ID} throughput download",
        waiter=waiter,
    )

    destination_archive = client.host_games_dir / PERF_GAME_ID / f"{PERF_GAME_ID}.eti"
    actual_size = destination_archive.stat().st_size
    if actual_size != PERF_GAME_SIZE:
        raise ScenarioError(
            f"downloaded archive size mismatch: {actual_size} != {PERF_GAME_SIZE}"
        )

    throughput = finished.get("data", {}).get("throughput")
    if not throughput:
        raise ScenarioError(f"download-finished did not include throughput: {finished}")

    # The transfer also includes version.ini, whose content is the version string.
    expected_bytes = PERF_GAME_SIZE + len("20260520")
    if int(throughput["bytes"]) != expected_bytes:
        raise ScenarioError(
            f"throughput byte count mismatch: {throughput['bytes']} != {expected_bytes}"
        )

    seconds = throughput["duration_ms"] / 1000.0
    summary = [
        f"{PERF_GAME_ID} {format_bytes(PERF_GAME_SIZE)} single-source download: ",
        f"{throughput['mib_per_s']:.2f} MiB/s, ",
        f"{throughput['mbit_per_s']:.2f} Mbit/s, ",
        f"{seconds:.3f}s, ",
        f"{throughput['chunks']} chunks",
    ]
    return "".join(summary)
def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]:
    """Run *command* from the repository root and return the completed process.

    stderr is merged into stdout. A non-zero exit status raises
    ``ScenarioError`` carrying *description* and the captured output.
    """
    completed = subprocess.run(
        command,
        cwd=REPO,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return completed
    raise ScenarioError(f"{description} failed:\n{completed.stdout}")
def run_just_test() -> str:
    """Run ``just test`` from the repository root and return its combined output.

    The child environment is a copy of the current one with ``RUSTC_WRAPPER``
    set to the empty string. A non-zero exit status raises ``ScenarioError``.
    """
    child_env = {**os.environ, "RUSTC_WRAPPER": ""}
    completed = subprocess.run(
        ["just", "test"],
        cwd=REPO,
        env=child_env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return completed.stdout
    raise ScenarioError(f"just test failed:\n{completed.stdout}")
def reset_run_root() -> None:
    """Delete the run root left over from a previous run, if there is one.

    A plain ``shutil.rmtree`` is tried first. If that hits a
    ``PermissionError``, the directory contents are removed from inside a
    throwaway Docker container and the removal is retried.
    """
    if not RUN_ROOT.exists():
        return

    removed = True
    try:
        shutil.rmtree(RUN_ROOT)
    except PermissionError:
        # Presumably files created by containers under another uid -- TODO confirm.
        removed = False
    if removed:
        return

    cleanup_command = [
        "docker",
        "run",
        "--rm",
        "-v",
        f"{RUN_ROOT}:/cleanup",
        "--entrypoint",
        "/bin/sh",
        "debian:bookworm-slim",
        "-c",
        # Three globs: regular names, dotfiles, and names starting with "..".
        "rm -rf /cleanup/* /cleanup/.[!.]* /cleanup/..?*",
    ]
    # Best effort: the exit status is ignored and the retry below decides.
    subprocess.run(
        cleanup_command,
        cwd=REPO,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
    shutil.rmtree(RUN_ROOT)
def copy_game(game_id: str, destination_games_dir: Path, *, version: str | None = None) -> None:
    """Copy a fixture game into *destination_games_dir*, replacing any existing copy.

    When *version* is given, the copy's ``version.ini`` is overwritten with it.
    """
    fixture_game = find_fixture_game(game_id)
    target = destination_games_dir / game_id

    # Start from a clean slate so stale files never survive a re-copy.
    if target.exists():
        shutil.rmtree(target)
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copytree(fixture_game, target)

    if version is None:
        return
    (target / "version.ini").write_text(version, encoding="utf-8")
def find_fixture_game(game_id: str) -> Path:
    """Return the directory of *game_id* inside one of the fixture sets.

    Fixture sets are searched in sorted order so that a game present in more
    than one set always resolves to the same directory; ``Path.iterdir``
    itself yields entries in arbitrary order.

    Raises:
        ScenarioError: if no fixture set contains *game_id*.
    """
    for fixture_dir in sorted(FIXTURES.iterdir()):
        candidate = fixture_dir / game_id
        if candidate.exists():
            return candidate
    raise ScenarioError(f"fixture game not found: {game_id}")
def stage_game_drop(destination_games_dir: Path, game_id: str) -> None:
    """Recreate a fixture game under *destination_games_dir*, version.ini last.

    Any existing copy is removed first. Every entry except ``version.ini`` is
    copied, and only then is ``version.ini`` itself copied.
    """
    fixture_game = find_fixture_game(game_id)
    staged_root = destination_games_dir / game_id
    if staged_root.exists():
        shutil.rmtree(staged_root)
    staged_root.mkdir(parents=True)

    payload = (entry for entry in fixture_game.iterdir() if entry.name != "version.ini")
    for entry in payload:
        copier = shutil.copytree if entry.is_dir() else shutil.copy2
        copier(entry, staged_root / entry.name)

    # Presumably the version file marks the drop as complete -- TODO confirm.
    shutil.copy2(fixture_game / "version.ini", staged_root / "version.ini")
def create_many_small_game(root: Path) -> None:
    """Create a fresh game directory at *root* with 20 small files and a version.ini.

    Each ``file-NN.bin`` holds the SHA-256 digest of ``small-<index>`` repeated
    eight times, so the content is deterministic.
    """
    if root.exists():
        shutil.rmtree(root)
    root.mkdir(parents=True)

    for index in range(20):
        digest = hashlib.sha256(f"small-{index}".encode()).digest()
        (root / f"file-{index:02d}.bin").write_bytes(digest * 8)

    version_file = root / "version.ini"
    version_file.write_text("20250101", encoding="utf-8")
def create_large_sparse_game(root: Path, *, size: int) -> None:
    """Create a fresh game directory at *root* with one archive of *size* bytes.

    The archive is named after the directory (``<name>.eti``) and is sized
    with ``truncate`` rather than by writing data; ``version.ini`` is set to
    ``20260520``.
    """
    if root.exists():
        shutil.rmtree(root)
    root.mkdir(parents=True)

    version_file = root / "version.ini"
    version_file.write_text("20260520", encoding="utf-8")

    archive_path = root / f"{root.name}.eti"
    with open(archive_path, "wb") as archive:
        # Extends the empty file to the requested length without writing payload.
        archive.truncate(size)
def format_bytes(size: int) -> str:
    """Render a byte count as GiB with two decimal places."""
    gibibytes = size / (1 << 30)
    return f"{gibibytes:.2f} GiB"
def connect_many(client: Peer, peers: list[Peer]) -> None:
    """Connect *client* to every peer, then send a wait-peers command for all of them."""
    for remote in peers:
        client.connect_to(remote)
    wait_command = {
        "cmd": "wait-peers",
        "count": len(peers),
        "timeout_ms": 15000,
    }
    client.send(wait_command)
def wait_remote_game(
    peer: Peer,
    game_id: str,
    *,
    peer_count: int | None = None,
    version: str | None = None,
    timeout: float = 20,
) -> dict[str, Any]:
    """Poll *peer* until its remote game list contains a matching row.

    Args:
        peer: peer whose ``list_games()["remote"]`` rows are inspected.
        game_id: value the row's ``id`` must equal.
        peer_count: when given, required value of the row's ``peer_count``.
        version: when given, required value of the row's ``eti_game_version``.
        timeout: seconds to keep polling.

    Returns:
        The first matching row.

    Raises:
        ScenarioError: if no row matched before the deadline.

    The list is always polled at least once (so ``timeout=0`` means "check
    now") and once more after the final sleep, so a state reached during the
    last interval is not reported as a timeout.
    """

    def matches(row: dict[str, Any]) -> bool:
        if row["id"] != game_id:
            return False
        if peer_count is not None and row.get("peer_count") != peer_count:
            return False
        if version is not None and row.get("eti_game_version") != version:
            return False
        return True

    deadline = time.monotonic() + timeout
    while True:
        rows = peer.list_games()["remote"]
        for row in rows:
            if matches(row):
                return row
        if time.monotonic() >= deadline:
            break
        time.sleep(0.4)
    raise ScenarioError(
        f"{peer.name} never saw remote {game_id} peer_count={peer_count} version={version}; rows={rows}"
    )
def wait_remote_absent(peer: Peer, game_id: str, timeout: float = 20) -> None:
    """Poll *peer* until no remote game row has ``id == game_id``.

    Args:
        peer: peer whose ``list_games()["remote"]`` rows are inspected.
        game_id: game ID that must disappear from the list.
        timeout: seconds to keep polling.

    Raises:
        ScenarioError: if the game is still listed at the deadline.

    The list is always polled at least once (so ``timeout=0`` means "check
    now") and once more after the final sleep.
    """
    deadline = time.monotonic() + timeout
    while True:
        rows = peer.list_games()["remote"]
        if all(row["id"] != game_id for row in rows):
            return
        if time.monotonic() >= deadline:
            break
        time.sleep(0.4)
    raise ScenarioError(f"{peer.name} still lists remote {game_id}; rows={rows}")
def wait_local_game(
    peer: Peer,
    game_id: str,
    *,
    downloaded: bool | None = None,
    installed: bool | None = None,
    timeout: float = 20,
) -> dict[str, Any]:
    """Poll *peer* until its local game list contains a matching row.

    Args:
        peer: peer whose ``list_games()["local"]`` rows are inspected.
        game_id: value the row's ``id`` must equal.
        downloaded: when given, required value of the row's ``downloaded``.
        installed: when given, required value of the row's ``installed``.
        timeout: seconds to keep polling.

    Returns:
        The first matching row.

    Raises:
        ScenarioError: if no row matched before the deadline.

    The list is always polled at least once (so ``timeout=0`` means "check
    now") and once more after the final sleep.
    """

    def matches(row: dict[str, Any]) -> bool:
        if row["id"] != game_id:
            return False
        if downloaded is not None and row.get("downloaded") != downloaded:
            return False
        if installed is not None and row.get("installed") != installed:
            return False
        return True

    deadline = time.monotonic() + timeout
    while True:
        rows = peer.list_games()["local"]
        for row in rows:
            if matches(row):
                return row
        if time.monotonic() >= deadline:
            break
        time.sleep(0.4)
    raise ScenarioError(
        f"{peer.name} never reached local {game_id} downloaded={downloaded} installed={installed}; rows={rows}"
    )
def assert_game_state(
    game: dict[str, Any],
    *,
    downloaded: bool,
    installed: bool,
    availability: str,
) -> None:
    """Raise ``ScenarioError`` unless *game* has exactly the given state fields."""
    observed = (
        game.get("downloaded"),
        game.get("installed"),
        game.get("availability"),
    )
    if observed == (downloaded, installed, availability):
        return
    raise ScenarioError(
        f"unexpected game state for {game.get('id')}: "
        f"downloaded={game.get('downloaded')} installed={game.get('installed')} "
        f"availability={game.get('availability')}"
    )
def wait_peer_has_game(
    observer: Peer,
    peer_id: str | None,
    game_id: str,
    timeout: float = 20,
) -> dict[str, Any]:
    """Poll *observer* until its entry for *peer_id* lists *game_id*.

    Args:
        observer: peer whose ``list_peers()`` output is inspected.
        peer_id: ``peer_id`` of the entry to look for; must not be ``None``.
        game_id: ``id`` that must appear in that entry's ``games``.
        timeout: seconds to keep polling.

    Returns:
        The matching peer entry.

    Raises:
        ScenarioError: if *peer_id* is ``None`` or nothing matched in time.

    The peer list is always polled at least once (so ``timeout=0`` means
    "check now") and once more after the final sleep.
    """
    if peer_id is None:
        raise ScenarioError("cannot wait for a peer without peer_id")

    deadline = time.monotonic() + timeout
    while True:
        peers = observer.list_peers()
        for peer in peers:
            if peer.get("peer_id") != peer_id:
                continue
            if any(game.get("id") == game_id for game in peer.get("games", [])):
                return peer
        if time.monotonic() >= deadline:
            break
        time.sleep(0.4)
    raise ScenarioError(
        f"{observer.name} never saw peer {peer_id} advertise {game_id}; peers={peers}"
    )
def assert_local_absent(peer: Peer, game_id: str) -> None:
    """Raise ``ScenarioError`` if *peer* lists *game_id* locally as downloaded."""
    rows = peer.list_games()["local"]
    for row in rows:
        if row["id"] == game_id and row.get("downloaded"):
            raise ScenarioError(f"{peer.name} advertises failed local {game_id}: {rows}")
def assert_no_active(peer: Peer, game_id: str) -> None:
    """Raise ``ScenarioError`` if *peer*'s status lists an active operation for *game_id*."""
    active = peer.status()["active_operations"]
    for operation in active:
        if operation["game_id"] == game_id:
            raise ScenarioError(f"{peer.name} still has active operation for {game_id}: {active}")
def assert_not_exists(path: Path) -> None:
    """Raise ``ScenarioError`` if *path* exists."""
    if not path.exists():
        return
    raise ScenarioError(f"expected path to be absent: {path}")
def event_is(event: str, game_id: str | None = None) -> Callable[[dict[str, Any]], bool]:
    """Build a predicate matching output items that are the named event.

    With *game_id* given, the item's ``data.game_id`` must equal it as well.
    """

    def predicate(item: dict[str, Any]) -> bool:
        is_named_event = item.get("type") == "event" and item.get("event") == event
        if not is_named_event:
            return False
        return game_id is None or item.get("data", {}).get("game_id") == game_id

    return predicate
def assert_no_event(peer: Peer, waiter: LineWaiter, event: str, game_id: str) -> None:
    """Raise ``ScenarioError`` if *event* for *game_id* appears at or after ``waiter.seen``."""
    for item in peer.output[waiter.seen :]:
        if item.get("type") != "event" or item.get("event") != event:
            continue
        if item.get("data", {}).get("game_id") == game_id:
            raise ScenarioError(f"unexpected {event} for {game_id}: {item}")
def assert_only_chunk_sources(
    peer: Peer,
    game_id: str,
    allowed_sources: set[str | None],
) -> None:
    """Require that every chunk of *game_id* came from one of the allowed addresses.

    ``None`` entries in *allowed_sources* are ignored. Raises ``ScenarioError``
    when no usable allowed source remains, when a chunk event names another
    ``peer_addr``, or when no chunk event for the game was recorded at all.
    """
    permitted = set(allowed_sources)
    permitted.discard(None)
    if not permitted:
        raise ScenarioError("no allowed chunk sources supplied")

    chunk_events = (
        item
        for item in peer.output
        if item.get("type") == "event" and item.get("event") == "download-chunk-finished"
    )
    observed: set[str] = set()
    for item in chunk_events:
        data = item["data"]
        if data.get("game_id") != game_id:
            continue
        source = data.get("peer_addr")
        observed.add(source)
        if source not in permitted:
            raise ScenarioError(f"unexpected chunk source for {game_id}: {data}")

    if not observed:
        raise ScenarioError(f"no chunk events recorded for {game_id}")
def chunk_totals(peer: Peer, game_id: str, relative_path: str) -> dict[str, int]:
    """Sum chunk ``length`` per ``peer_addr`` for one file of one game.

    Only ``download-chunk-finished`` events in ``peer.output`` whose data
    matches both *game_id* and *relative_path* are counted.
    """
    per_source: dict[str, int] = {}
    for item in peer.output:
        is_chunk = item.get("type") == "event" and item.get("event") == "download-chunk-finished"
        if not is_chunk:
            continue
        data = item["data"]
        if data.get("game_id") == game_id and data.get("relative_path") == relative_path:
            addr = data["peer_addr"]
            per_source[addr] = per_source.get(addr, 0) + int(data["length"])
    return per_source
def diff_game_dirs(source: Path, destination: Path) -> None:
    """Require *source* and *destination* to hold the same game files.

    The hash manifests are compared first and ``diff -r`` output second;
    either mismatch raises ``ScenarioError`` with the diff text attached.
    """
    expected = manifest(source)
    actual = manifest(destination)
    if expected != actual:
        details = run_diff(source, destination)
        raise ScenarioError(
            f"manifest mismatch between {source} and {destination}\n{details}"
        )

    details = run_diff(source, destination)
    if not details:
        return
    raise ScenarioError(f"diff mismatch between {source} and {destination}\n{details}")
def manifest(root: Path) -> dict[str, str]:
    """Map each file under *root* to the SHA-256 hex digest of its content.

    Keys are paths relative to *root*. Files with any path component in
    ``IGNORED_DIFF_NAMES`` are left out. Raises ``ScenarioError`` when *root*
    does not exist.
    """
    if not root.exists():
        raise ScenarioError(f"missing manifest root: {root}")

    block_size = 1024 * 1024
    digests: dict[str, str] = {}
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        relative = path.relative_to(root)
        if any(part in IGNORED_DIFF_NAMES for part in relative.parts):
            continue
        hasher = hashlib.sha256()
        with path.open("rb") as handle:
            # Hash in fixed-size blocks so large archives are never loaded whole.
            while True:
                block = handle.read(block_size)
                if not block:
                    break
                hasher.update(block)
        digests[str(relative)] = hasher.hexdigest()
    return digests
def run_diff(source: Path, destination: Path) -> str:
    """Run a recursive ``diff`` of the two directories and return its findings.

    Returns the combined stdout/stderr when ``diff`` exits non-zero and an
    empty string when it exits with status 0.
    """
    excluded_names = (".lanspread", ".lanspread.json", "local")
    command = ["diff", "-r"]
    for name in excluded_names:
        command += ["-x", name]
    command += [str(source), str(destination)]

    completed = subprocess.run(
        command,
        cwd=REPO,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    if completed.returncode == 0:
        return ""
    return completed.stdout
def parse_args() -> argparse.Namespace:
    """Parse the command line: optional scenario IDs plus ``--build-image``."""
    parser = argparse.ArgumentParser(description=__doc__)
    arguments = (
        (
            ("scenarios",),
            {
                "nargs": "*",
                "help": "Scenario IDs to run, e.g. S18 S20. Defaults to all implemented scenarios.",
            },
        ),
        (
            ("--build-image",),
            {
                "action": "store_true",
                "help": "Run `just peer-cli-image` before starting scenarios.",
            },
        ),
    )
    for flags, options in arguments:
        parser.add_argument(*flags, **options)
    return parser.parse_args()
def main() -> int:
    """Run the selected scenarios and return the process exit code.

    Scenario IDs are lower-cased before being handed to ``Runner``; with none
    given, ``None`` is passed instead. A ``ScenarioError`` is reported on
    stderr and yields exit code 1.
    """
    args = parse_args()
    selected = None
    if args.scenarios:
        selected = {item.lower() for item in args.scenarios}

    runner = Runner(selected=selected, build_image=args.build_image)
    try:
        runner.run()
    except ScenarioError as error:
        print(f"\nFAILED: {error}", file=sys.stderr)
        return 1
    return 0
# Script entry point: exit with whatever status main() returns.
if __name__ == "__main__":
    raise SystemExit(main())