test(peer-cli): harden S1-S47 scenario suite against vacuous and flaky checks

An adversarial audit of the headless peer-to-peer scenario suite
(crates/lanspread-peer-cli/scripts/run_extended_scenarios.py, driven by
`just peer-cli-tests`) found assertions that passed even when the behavior
they claim to test was not happening, plus timing races and doc-vs-code
divergences. A full baseline run (S1-S47) passed beforehand, confirming these
were test-quality gaps, not peer regressions; the baseline output itself
exposed the worst offenders -- e.g. S14 chunk totals {128 MiB, 1 MiB} (a
two-chunk file whose "balanced within one chunk" check can never fail) and
S16/S18 serving the whole ~120 MiB alienswarm.eti from a single source, so
fanout and retry were never exercised.

Test-correctness fixes (a broken behavior could previously pass green):
- S18: the "no download-failed" check was dead -- it reused a LineWaiter
  already advanced past download-finished, so it scanned an empty tail.
  Replaced with assert_no_event_since over the whole window. Switched to a
  4*CHUNK_SIZE sparse archive so both peers get chunks; the test now proves the
  download SURVIVES a mid-download source kill (every byte delivered, survivor
  served part, no download-failed, clean diff). Retry-onto-survivor is the
  mechanism but is not asserted: the kill/serve race against `docker rm -f`
  cannot be forced, so asserting an exact split would be flaky.
- S7: the only check was a diff against byte-identical ggoo fixtures, so it was
  source-agnostic. Added assertions that the download committed exactly once,
  every chunk came from the validated two-peer set, both peers served, and no
  chunk was fetched twice.
- S14: enlarged to 4*CHUNK_SIZE so the balance check can fail under a 3+1
  imbalance; asserts an exact 2+2 split summing to the file size.
- S16: inflated the .eti to 2*CHUNK_SIZE so it fans out across both
  catalog-version peers (the stock 120 MiB fixture is a single chunk).
- S37: validate the throughput rate fields (positive, self-consistent
  mbit/mib == 8.388608, mib_per_s == bytes/duration), not just the byte count.
- S35: assert the source actually advertises the unknown game before checking
  it is filtered, so "absent" means "filtered" and not "never sent".
- S15: cross-check each peer's raw advertised eti_version via list-peers; the
  list-games eti_game_version is synthesized from the catalog and can only ever
  equal the asserted value.
- S2: poll for library convergence and verify the bidirectional exchange
  (bravo sees alpha's 3 games, not just alpha seeing bravo's 4).
- S12/S28: require the gating unit test to appear as "<name> ... ok" so an
  #[ignore]d (un-run) test no longer satisfies the check.
- S24/S25: assert the requested install=false final state.
- S34: assert exactly 21 coherent chunks (20 files + version.ini), 21 distinct
  paths, no duplicates, instead of a >= 21 floor.

Flake fixes:
- S19: force-kill the sole source right after download-begin on a 4*CHUNK_SIZE
  file and accept download-failed or download-peers-gone. The old graceful
  shutdown on a single-chunk file could let the transfer finish first, turning
  the expected failure into a download-finished. A chunk may complete before
  the kill lands, but the full transfer cannot, so the failure is deterministic.
- S26: use a large sparse source so the first operation is reliably still
  active when the duplicate request is issued (TOCTOU on active_operations);
  also assert the active operation == "Downloading".
- S11: drop the "listener address must change" assertion -- it tested the OS
  ephemeral-port allocator and could fail spuriously; keep the same-identity /
  no-duplicate invariant.

Coverage and determinism:
- S27: add handshake::tests::inbound_hello_from_self_is_ignored for the
  protocol-level self guard. The CLI scenario only exercises the CLI
  string-compare guard, which short-circuits before any network call, so the
  peer-crate guard had no test.
- find_fixture_game now iterates sorted(FIXTURES) so the ambiguous cnctw
  (fixture-bravo/multi/solid) resolves deterministically to fixture-bravo.

Reviewed and deliberately left as-is (documented in the run log): S20, S21,
S30, S32/S39/S44 absence checks, S42 IP-order precondition, S45.

PEER_CLI_SCENARIOS.md rows S2, S11, S14, S16, S18, S19, S27 are updated to
match the harness, and a dated run-log entry records the audit, the fixes, the
accepted items, and the live-run evidence.

Test Plan:
- `just peer-cli-tests` (rebuilds the image, runs S1-S47 in Docker): baseline
  passed; post-fix passed; a final run on the exact committed code passed
  47/47. Evidence: S14 {268435456, 268435456} balanced 2+2; S16 .eti split
  across B and C {134217728, 134217728}; S18 all 536870912 bytes delivered with
  no download-failed; S19 deterministic download-failed; S37 ~874 MiB/s.
- `just test` (incl. inbound_hello_from_self_is_ignored), `just clippy`
  (-D warnings, all-targets), and `just fmt` all pass.

Refs: PEER_CLI_SCENARIOS.md scenario matrix and 2026-06-21 run-log entry.
This commit was merged in pull request #29.
This commit is contained in:
2026-06-21 10:23:45 +02:00
parent 72fa61da65
commit 2d53848e0c
3 changed files with 388 additions and 71 deletions
@@ -448,12 +448,21 @@ class Runner:
alpha = self.peer("s2-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
bravo = self.peer("s2-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
connect_many(alpha, [bravo])
# Poll for library convergence rather than reading once: wait-peers only
# guarantees peer presence, and the mDNS path can upsert a peer with an
# empty library before its snapshot arrives.
wait_peer_has_game(alpha, bravo.peer_id, "bfbc2")
peers = alpha.list_peers()
if len(peers) != 1 or peers[0]["peer_id"] == alpha.peer_id:
raise ScenarioError(f"bad alpha peers after connect: {peers}")
if peers[0]["game_count"] != 4:
raise ScenarioError(f"expected bravo game_count=4, got {peers}")
return "alpha connected to bravo, saw one non-self peer with four games"
# The handshake is bidirectional: bravo must also record alpha's library
# without a separate connect, proving both directions of the exchange.
bravo_view = wait_peer_has_game(bravo, alpha.peer_id, "alienswarm")
if bravo_view["game_count"] != 3:
raise ScenarioError(f"expected alpha game_count=3 on bravo, got {bravo_view}")
return "alpha<->bravo exchanged libraries: bravo had 4 games on alpha, alpha had 3 on bravo"
def s3_remote_aggregation(self) -> str:
alpha = self.peer("s3-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
@@ -529,7 +538,19 @@ class Runner:
client.send({"cmd": "download", "game_id": "ggoo", "install": False})
client.wait_for(event_is("download-finished", "ggoo"), timeout=60, description="finish ggoo", waiter=waiter)
diff_game_dirs(FIXTURES / "fixture-alpha" / "ggoo", client.host_games_dir / "ggoo")
return "shared ggoo downloaded once from duplicate sources and diffed cleanly"
# The fixtures are byte-identical, so the diff alone is source-agnostic.
# Prove the duplicate-source path: download committed exactly once, every
# chunk came from the validated 2-peer set, both peers actually served,
# and nothing was fetched twice.
if count_events(client, "download-finished", "ggoo") != 1:
raise ScenarioError("ggoo did not finish exactly once")
assert_only_chunk_sources(client, "ggoo", {alpha.ready_addr, bravo.ready_addr})
if chunk_sources(client, "ggoo") != {alpha.ready_addr, bravo.ready_addr}:
raise ScenarioError(
f"expected both validated sources to serve ggoo, got {chunk_sources(client, 'ggoo')}"
)
assert_no_duplicate_chunks(client, "ggoo")
return "ggoo downloaded once, served from both validated sources with no duplicate chunks"
def s8_ambiguous_metadata_rejection(self) -> str:
dir_a = self.fixture_root / "s8-a"
@@ -581,13 +602,17 @@ class Runner:
bravo = self.peer("s11-bravo", games_dir=bravo_dir, readonly_games=True)
connect_many(alpha, [bravo])
peers = alpha.list_peers()
# The real invariant is a single peer entry reusing the same identity (no
# duplicate). The listener address is an OS-assigned ephemeral port: it
# almost always differs across restarts, but asserting it MUST change
# tests the kernel's port allocator, not the peer, and can fail spuriously
# if the same port is reused. So we only require the identity to be stable.
if len(peers) != 1:
raise ScenarioError(f"expected one bravo peer after reconnect, got {peers}")
if peers[0]["peer_id"] != first_id:
raise ScenarioError(f"bravo peer id changed: {first_id} -> {peers[0]['peer_id']}")
if peers[0]["addr"] == first_addr:
raise ScenarioError(f"bravo listener address did not change: {first_addr}")
return f"bravo reused peer id {first_id} with new address {peers[0]['addr']}"
changed = "new" if peers[0]["addr"] != first_addr else "same"
return f"bravo reused peer id {first_id} as a single entry at {changed} address {peers[0]['addr']}"
def s12_transfer_serving_gates(self) -> str:
output = run_just_test()
@@ -597,9 +622,9 @@ class Runner:
"file_transfer_dispatch_respects_serve_gates",
"local_relative_paths_are_never_transferable",
]
missing = [name for name in required if name not in output]
missing = [name for name in required if f"{name} ... ok" not in output]
if missing:
raise ScenarioError(f"S12 unit proof missing tests: {missing}")
raise ScenarioError(f"S12 unit proof tests did not run-and-pass: {missing}")
return "just test passed including catalog/sentinel/active/local-path serve gate tests"
def s13_exact_transfer_equality(self) -> str:
@@ -619,28 +644,34 @@ class Runner:
def s14_large_multi_peer_chunking(self) -> str:
game_id = PERF_GAME_ID
source_dir = self.fixture_root / "s14-alpha"
create_large_sparse_game(source_dir / game_id, size=CHUNK_SIZE + 1024 * 1024)
# Four 128 MiB chunks so the balance assertion is meaningful: with two
# peers a fair split is 2+2 chunks (diff 0) and a 3+1 imbalance would
# exceed one CHUNK_SIZE. A 2-chunk file could never trip the check.
file_size = CHUNK_SIZE * 4
create_large_sparse_game(source_dir / game_id, size=file_size)
alpha = self.peer("s14-alpha", games_dir=source_dir)
stage = self.peer("s14-stage")
connect_many(stage, [alpha])
waiter = LineWaiter(len(stage.output))
stage.send({"cmd": "download", "game_id": game_id, "install": False})
stage.wait_for(event_is("download-finished", game_id), timeout=90, description="stage finish", waiter=waiter)
stage.wait_for(event_is("download-finished", game_id), timeout=180, description="stage finish", waiter=waiter)
diff_game_dirs(source_dir / game_id, stage.host_games_dir / game_id)
client = self.peer("s14-client")
connect_many(client, [alpha, stage])
wait_remote_game(client, game_id, peer_count=2, version=PERF_GAME_VERSION)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": game_id, "install": False})
client.wait_for(event_is("download-finished", game_id), timeout=90, description="client finish", waiter=waiter)
client.wait_for(event_is("download-finished", game_id), timeout=180, description="client finish", waiter=waiter)
diff_game_dirs(source_dir / game_id, client.host_games_dir / game_id)
totals = chunk_totals(client, game_id, f"{game_id}/{game_id}.eti")
if len(totals) < 2:
raise ScenarioError(f"expected chunks from two peers, got {totals}")
if len(totals) != 2:
raise ScenarioError(f"expected .eti chunks from exactly two peers, got {totals}")
if sum(totals.values()) != file_size:
raise ScenarioError(f"chunk bytes {sum(totals.values())} != file size {file_size}: {totals}")
values = list(totals.values())
if max(values) - min(values) > CHUNK_SIZE:
raise ScenarioError(f"chunk totals not balanced within one chunk: {totals}")
return f"{game_id} downloaded from two sources, diff matched, chunk totals={totals}"
return f"{game_id} ({file_size // (1024 * 1024)} MiB) split across two sources, balanced, diff matched: {totals}"
def s15_three_way_version_skew(self) -> str:
specs = [
@@ -656,6 +687,15 @@ class Runner:
client = self.peer("s15-client")
connect_many(client, peers)
wait_remote_game(client, "cnc4", peer_count=1, version=CATALOG_VERSIONS["cnc4"])
# Cross-check the RAW advertised versions via list-peers (not the
# catalog-synthesized list-games field), proving the three peers really
# differ and that only the catalog-version peer is aggregated.
for peer, expected_version in zip(peers, ["20150101", "20160101", CATALOG_VERSIONS["cnc4"]]):
advertised = peer_advertised_version(client, peer.peer_id, "cnc4")
if advertised != expected_version:
raise ScenarioError(
f"{peer.name} advertised cnc4 version {advertised}, expected {expected_version}"
)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "cnc4", "install": False})
client.wait_for(event_is("download-finished", "cnc4"), timeout=60, description="cnc4 finish", waiter=waiter)
@@ -673,19 +713,25 @@ class Runner:
for name, version in specs:
game_dir = self.fixture_root / name
copy_game("alienswarm", game_dir, version=version)
# Two 128 MiB chunks so the .eti can actually fan out across the two
# catalog-version peers; the stock 120 MiB fixture is a single chunk
# that can only ever come from one source.
inflate_archive_sparse(game_dir / "alienswarm", "alienswarm", CHUNK_SIZE * 2)
peers.append(self.peer(name, games_dir=game_dir))
client = self.peer("s16-client")
connect_many(client, peers)
wait_remote_game(client, "alienswarm", peer_count=2, version=CATALOG_VERSIONS["alienswarm"])
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="alienswarm finish", waiter=waiter)
client.wait_for(event_is("download-finished", "alienswarm"), timeout=180, description="alienswarm finish", waiter=waiter)
assert_only_chunk_sources(client, "alienswarm", {peers[1].ready_addr, peers[2].ready_addr})
totals = chunk_totals(client, "alienswarm", "alienswarm/alienswarm.eti")
if peers[0].ready_addr in totals:
raise ScenarioError(f"stale peer contributed chunks: {totals}")
if set(totals) != {peers[1].ready_addr, peers[2].ready_addr}:
raise ScenarioError(f"expected .eti to fan out across both B and C, got {totals}")
diff_game_dirs(peers[1].host_games_dir / "alienswarm", client.host_games_dir / "alienswarm")
return f"catalog-version B/C peers served alienswarm while stale A contributed zero; totals={totals}"
return f"catalog-version B/C peers split alienswarm.eti while stale A contributed zero; totals={totals}"
def s17_catalog_conflict_rejection(self) -> str:
specs = [
@@ -711,43 +757,87 @@ class Runner:
return "catalog-version file conflict failed download and left no committed version.ini"
def s18_redundant_source_drop(self) -> str:
game_id = "bf1942"
source_a_dir = self.fixture_root / "s18-a"
source_b_dir = self.fixture_root / "s18-b"
copy_game("alienswarm", source_a_dir)
copy_game("alienswarm", source_b_dir)
# Multi-chunk sparse archive so BOTH peers are assigned .eti chunks; a
# single-chunk file could be served entirely by one peer, so killing the
# other would prove nothing. We verify the download SURVIVES a mid-download
# source kill (every byte still arrives, no download-failed, diff matches).
# Retry-onto-survivor is the mechanism that makes this work and is
# exercised whenever the kill interrupts an unfinished chunk, but the race
# against `docker rm -f` means we cannot deterministically force it, so we
# do not assert it. Sparse zero bytes are identical, so duplicate-source
# majority validation still agrees.
file_size = CHUNK_SIZE * 4
create_large_sparse_game(source_a_dir / game_id, size=file_size)
create_large_sparse_game(source_b_dir / game_id, size=file_size)
source_a = self.peer("s18-a", games_dir=source_a_dir)
source_b = self.peer("s18-b", games_dir=source_b_dir)
client = self.peer("s18-client")
connect_many(client, [source_a, source_b])
wait_remote_game(client, "alienswarm", peer_count=2)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
client.wait_for(event_is("download-begin", "alienswarm"), timeout=20, description="download begin", waiter=waiter)
wait_remote_game(client, game_id, peer_count=2)
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "download", "game_id": game_id, "install": False})
client.wait_for(event_is("download-begin", game_id), timeout=20, description="download begin", waiter=waiter)
source_a.kill()
client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="download finish", waiter=waiter)
assert_no_event(client, waiter, "download-failed", "alienswarm")
diff_game_dirs(source_b_dir / "alienswarm", client.host_games_dir / "alienswarm")
totals = chunk_totals(client, "alienswarm", "alienswarm/alienswarm.eti")
if not totals:
raise ScenarioError("S18 did not record chunk evidence")
return f"source killed after begin; download finished; diff matched; chunk bytes={totals}"
client.wait_for(event_is("download-finished", game_id), timeout=180, description="download finish", waiter=waiter)
# Scan the WHOLE download window (start..) for download-failed. The old
# assert_no_event reused a waiter already advanced past download-finished,
# so it only saw the empty tail and could never fire.
assert_no_event_since(client, start, "download-failed", game_id)
diff_game_dirs(source_b_dir / game_id, client.host_games_dir / game_id)
totals = chunk_totals(client, game_id, f"{game_id}/{game_id}.eti")
if source_b.ready_addr not in totals:
raise ScenarioError(f"surviving source served no .eti chunks: {totals}")
if sum(totals.values()) != file_size:
raise ScenarioError(f"download did not deliver the whole archive ({sum(totals.values())} != {file_size}): {totals}")
# We deliberately do NOT assert the exact per-source split. The killed
# source can serve a chunk or two before `docker rm -f` lands (a fast-LAN
# race), so requiring totals == {survivor: file_size} would be flaky. The
# robust proof of redundancy is that a source died mid-download yet every
# byte still arrived (sum == file_size plus the diff), the survivor served
# part of it, and no download-failed was emitted.
survivor_bytes = totals[source_b.ready_addr]
return (
f"source killed after begin; all {file_size} bytes delivered "
f"({survivor_bytes} from the survivor), no download-failed; diff matched; bytes={totals}"
)
def s19_sole_source_drop(self) -> str:
game_id = "bf1942"
source_dir = self.fixture_root / "s19-source"
copy_game("alienswarm", source_dir)
# Multi-chunk sparse archive force-killed right after download-begin. A
# 120 MiB single-chunk file served by a graceful shutdown could finish
# (~0.15s at LAN speed) before the drop landed, flipping the expected
# failure into a download-finished. With four 128 MiB chunks and a
# forceful kill issued right after download-begin, the source dies with
# the bulk of the transfer still outstanding (an individual chunk may
# complete first, but the full download cannot), so a terminal failure is
# deterministic.
create_large_sparse_game(source_dir / game_id, size=CHUNK_SIZE * 4)
source = self.peer("s19-source", games_dir=source_dir)
client = self.peer("s19-client")
connect_many(client, [source])
wait_remote_game(client, "alienswarm", peer_count=1)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
client.wait_for(event_is("download-begin", "alienswarm"), timeout=20, description="download begin", waiter=waiter)
source.shutdown()
client.wait_for(event_is("download-failed", "alienswarm"), timeout=90, description="download failed", waiter=waiter)
assert_not_exists(client.host_games_dir / "alienswarm" / "version.ini")
assert_no_active(client, "alienswarm")
assert_local_absent(client, "alienswarm")
return "download-failed emitted; version.ini absent; local ready row absent; active operations empty"
wait_remote_game(client, game_id, peer_count=1)
start = len(client.output)
waiter = LineWaiter(start)
client.send({"cmd": "download", "game_id": game_id, "install": False})
client.wait_for(event_is("download-begin", game_id), timeout=20, description="download begin", waiter=waiter)
# Forceful kill (no Goodbye) drops the connection mid-transfer.
source.kill()
terminal = client.wait_for(
event_name_in({"download-failed", "download-peers-gone"}, game_id),
timeout=120,
description="sole-source drop terminal failure",
waiter=waiter,
)
assert_no_event_since(client, start, "download-finished", game_id)
assert_not_exists(client.host_games_dir / game_id / "version.ini")
assert_no_active(client, game_id)
assert_local_absent(client, game_id)
return f"sole-source forceful drop mid-transfer -> {terminal['event']}; version.ini absent; no ready row; no active op"
def s20_receiver_write_failure(self) -> str:
source_dir = self.fixture_root / "s20-source"
@@ -813,8 +903,12 @@ class Runner:
c2.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="client-b finish", waiter=waiter2)
diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", c1.host_games_dir / "alienswarm")
diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", c2.host_games_dir / "alienswarm")
source.status()
return "two concurrent clients finished alienswarm; both diffs matched; source status responded"
wait_local_game(c1, "alienswarm", downloaded=True, installed=False)
wait_local_game(c2, "alienswarm", downloaded=True, installed=False)
# Responsiveness: the source still answers and still advertises its games.
if not any(g["id"] == "alienswarm" for g in source.list_games()["local"]):
raise ScenarioError("source no longer advertises alienswarm after serving two clients")
return "two concurrent clients finished alienswarm install=false; both diffs matched; source still responsive"
def s25_two_downloads_one_client(self) -> str:
source = self.peer("s25-bravo", games_dir=FIXTURES / "fixture-bravo", readonly_games=True)
@@ -827,34 +921,49 @@ class Runner:
client.wait_for(event_is("download-finished", "cnctw"), timeout=60, description="cnctw finish", waiter=waiter)
diff_game_dirs(FIXTURES / "fixture-bravo" / "bfbc2", client.host_games_dir / "bfbc2")
diff_game_dirs(FIXTURES / "fixture-bravo" / "cnctw", client.host_games_dir / "cnctw")
return "bfbc2 and cnctw concurrent downloads both finished and diffed cleanly"
wait_local_game(client, "bfbc2", downloaded=True, installed=False)
wait_local_game(client, "cnctw", downloaded=True, installed=False)
return "bfbc2 and cnctw concurrent downloads both finished install=false and diffed cleanly"
def s26_duplicate_download_rejection(self) -> str:
source = self.peer("s26-alpha", games_dir=FIXTURES / "fixture-alpha", readonly_games=True)
game_id = "bf1942"
source_dir = self.fixture_root / "s26-source"
# A large sparse archive keeps the first download active long enough that
# the duplicate request is guaranteed to race against an in-progress
# operation rather than a near-instant 3 MB transfer that may already be
# finished by the time the second command is read.
create_large_sparse_game(source_dir / game_id, size=CHUNK_SIZE * 2)
source = self.peer("s26-source", games_dir=source_dir)
client = self.peer("s26-client")
connect_many(client, [source])
wait_remote_game(client, game_id, peer_count=1)
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": "alienswarm", "install": False})
client.wait_for(
client.send({"cmd": "download", "game_id": game_id, "install": False})
active = client.wait_for(
lambda item: item.get("type") == "event"
and item.get("event") == "active-operations-changed"
and any(
op.get("game_id") == "alienswarm"
op.get("game_id") == game_id
for op in item.get("data", {}).get("active_operations", [])
),
timeout=20,
description="active operation",
waiter=waiter,
)
# Verify the operation kind, not just its presence (the only scenario that
# inspects the active_operations 'operation' field).
op = next(o for o in active["data"]["active_operations"] if o.get("game_id") == game_id)
if op.get("operation") != "Downloading":
raise ScenarioError(f"expected Downloading active operation, got {op}")
err = client.send(
{"cmd": "download", "game_id": "alienswarm", "install": False},
{"cmd": "download", "game_id": game_id, "install": False},
expect_error=True,
)
if "operation already in progress" not in err["error"]:
raise ScenarioError(f"unexpected duplicate error: {err}")
client.wait_for(event_is("download-finished", "alienswarm"), timeout=90, description="first download finish", waiter=waiter)
diff_game_dirs(FIXTURES / "fixture-alpha" / "alienswarm", client.host_games_dir / "alienswarm")
return f"second command errored '{err['error']}'; first download diff matched"
client.wait_for(event_is("download-finished", game_id), timeout=180, description="first download finish", waiter=waiter)
diff_game_dirs(source_dir / game_id, client.host_games_dir / game_id)
return f"duplicate rejected while Downloading active ('{err['error']}'); first download diff matched"
def s27_self_connect_rejection(self) -> str:
alpha = self.peer("s27-alpha")
@@ -870,8 +979,8 @@ class Runner:
def s28_address_change_unit(self) -> str:
output = run_just_test()
test_name = "peer_db::tests::address_update_preserves_peer_identity_and_library"
if test_name not in output:
raise ScenarioError(f"S28 unit proof missing from just test output:\n{output}")
if f"{test_name} ... ok" not in output:
raise ScenarioError(f"S28 unit proof did not run-and-pass in just test output:\n{output}")
return "`just test` passed including address_update_preserves_peer_identity_and_library"
def s29_empty_peer_participates(self) -> str:
@@ -1006,14 +1115,24 @@ class Runner:
and item.get("event") == "download-chunk-finished"
and item.get("data", {}).get("game_id") == "bf1942"
]
if len(chunks) < 21:
raise ScenarioError(f"expected at least 21 file chunks, got {len(chunks)}")
return f"20 small files plus version.ini transferred; diff matched; chunk events={len(chunks)}"
# 20 small files + version.ini, each a single coherent chunk: exactly 21
# chunk events, 21 distinct relative paths, no splits and no duplicates.
if len(chunks) != 21:
raise ScenarioError(f"expected exactly 21 file chunks (20 files + version.ini), got {len(chunks)}")
assert_no_duplicate_chunks(client, "bf1942")
distinct_paths = {item.get("data", {}).get("relative_path") for item in chunks}
if len(distinct_paths) != 21:
raise ScenarioError(f"expected 21 distinct file paths, got {len(distinct_paths)}: {sorted(distinct_paths)}")
return f"20 small files plus version.ini each transferred as one coherent chunk; diff matched; chunk events={len(chunks)}"
def s35_unknown_game_filtered(self) -> str:
source = self.peer("s35-source", fixtures=["mystery-game"])
client = self.peer("s35-client")
connect_many(client, [source])
# Establish the premise: the source really does advertise mystery-game in
# its raw library snapshot. Without this, wait_remote_absent could pass
# vacuously ("absent because never sent" vs "absent because filtered").
wait_peer_has_game(client, source.peer_id, "mystery-game")
wait_remote_absent(client, "mystery-game")
err = client.send({"cmd": "download", "game_id": "mystery-game", "install": False}, expect_error=True)
if "not in the local catalog" not in err["error"]:
@@ -1081,6 +1200,26 @@ class Runner:
f"throughput byte count mismatch: {throughput['bytes']} != {expected_bytes}"
)
# The byte count alone never exercises the rate math. Validate the rate
# fields are positive and mutually consistent so a units/divisor bug
# (MiB vs MB, off-by-1000, zero duration) cannot slip through.
if throughput["duration_ms"] <= 0:
raise ScenarioError(f"throughput duration_ms not positive: {throughput}")
if throughput["mib_per_s"] <= 0 or throughput["mbit_per_s"] <= 0:
raise ScenarioError(f"throughput rates not positive: {throughput}")
derived_mib = (int(throughput["bytes"]) / 1_048_576) / (throughput["duration_ms"] / 1000.0)
if abs(derived_mib - throughput["mib_per_s"]) > max(1.0, derived_mib * 0.02):
raise ScenarioError(
f"mib_per_s {throughput['mib_per_s']} disagrees with bytes/duration {derived_mib}"
)
# mbit_per_s = bytes*8/s/1e6; mib_per_s = bytes/s/1048576 -> ratio is fixed.
ratio = throughput["mbit_per_s"] / throughput["mib_per_s"]
expected_ratio = 1_048_576 * 8 / 1_000_000 # 8.388608
if abs(ratio - expected_ratio) > 0.01:
raise ScenarioError(
f"mbit/mib ratio {ratio} != expected {expected_ratio}: {throughput}"
)
return (
f"{PERF_GAME_ID} {format_bytes(PERF_GAME_SIZE)} single-source download: "
f"{throughput['mib_per_s']:.2f} MiB/s, "
@@ -1692,7 +1831,10 @@ def copy_game(game_id: str, destination_games_dir: Path, *, version: str | None
def find_fixture_game(game_id: str) -> Path:
for fixture_dir in FIXTURES.iterdir():
# Sorted so resolution is deterministic: several fixtures define the same
# game id (e.g. cnctw exists under fixture-bravo, fixture-multi and
# fixture-solid), and downstream assertions expect the fixture-bravo layout.
for fixture_dir in sorted(FIXTURES.iterdir()):
candidate = fixture_dir / game_id
if candidate.exists():
return candidate
@@ -1726,16 +1868,27 @@ def create_many_small_game(root: Path) -> None:
(root / "version.ini").write_text(CATALOG_VERSIONS.get(root.name, "20250101"), encoding="utf-8")
def create_large_sparse_game(root: Path, *, size: int) -> None:
def create_large_sparse_game(root: Path, *, size: int, version: str | None = None) -> None:
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True)
(root / "version.ini").write_text(PERF_GAME_VERSION, encoding="utf-8")
resolved = version if version is not None else CATALOG_VERSIONS.get(root.name, PERF_GAME_VERSION)
(root / "version.ini").write_text(resolved, encoding="utf-8")
archive = root / f"{root.name}.eti"
with archive.open("wb") as handle:
handle.truncate(size)
def inflate_archive_sparse(game_root: Path, game_id: str, size: int) -> None:
"""Replace a copied game's `.eti` with a sparse file of `size` bytes so the
archive spans multiple 128 MiB download chunks. Sparse zero bytes are
identical across copies, so duplicate-source majority validation still
agrees and a `diff` against any catalog-version source still matches."""
archive = game_root / f"{game_id}.eti"
with archive.open("wb") as handle:
handle.truncate(size)
def sha256_file(path: Path) -> str:
hasher = hashlib.sha256()
with path.open("rb") as handle:
@@ -1977,13 +2130,6 @@ def event_name_in(events: set[str], game_id: str | None = None) -> Callable[[dic
return predicate
def assert_no_event(peer: Peer, waiter: LineWaiter, event: str, game_id: str) -> None:
for item in peer.output[waiter.seen :]:
if item.get("type") == "event" and item.get("event") == event:
if item.get("data", {}).get("game_id") == game_id:
raise ScenarioError(f"unexpected {event} for {game_id}: {item}")
def assert_no_event_since(peer: Peer, start: int, event: str, game_id: str) -> None:
for item in peer.output[start:]:
if item.get("type") == "event" and item.get("event") == event:
@@ -2038,6 +2184,61 @@ def chunk_totals(peer: Peer, game_id: str, relative_path: str) -> dict[str, int]
return totals
def chunk_sources(peer: Peer, game_id: str) -> set[str]:
return {
item["data"]["peer_addr"]
for item in peer.output
if item.get("type") == "event"
and item.get("event") == "download-chunk-finished"
and item.get("data", {}).get("game_id") == game_id
}
def assert_no_duplicate_chunks(peer: Peer, game_id: str) -> None:
seen: set[tuple[str | None, int]] = set()
for item in peer.output:
if item.get("type") != "event" or item.get("event") != "download-chunk-finished":
continue
data = item["data"]
if data.get("game_id") != game_id:
continue
key = (data.get("relative_path"), int(data.get("offset", 0)))
if key in seen:
raise ScenarioError(f"{peer.name} downloaded a duplicate chunk for {game_id}: {key}")
seen.add(key)
def count_events(peer: Peer, event: str, game_id: str) -> int:
return sum(
1
for item in peer.output
if item.get("type") == "event"
and item.get("event") == event
and item.get("data", {}).get("game_id") == game_id
)
def peer_advertised_version(
observer: Peer, peer_id: str | None, game_id: str, timeout: float = 20
) -> str | None:
"""Returns the raw `eti_version` a peer advertises for a game in its library
snapshot (list-peers). Unlike the list-games `remote` rows, this value is NOT
synthesized from the local catalog, so it faithfully reports the source.
Polls until the peer's library snapshot is observed."""
if peer_id is None:
raise ScenarioError("cannot read advertised version without peer_id")
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
for peer in observer.list_peers():
if peer.get("peer_id") != peer_id:
continue
for game in peer.get("games", []):
if game.get("id") == game_id:
return game.get("eti_version")
time.sleep(0.4)
raise ScenarioError(f"{observer.name} does not see {game_id} advertised by {peer_id}")
def diff_game_dirs(source: Path, destination: Path) -> None:
source_manifest = manifest(source)
destination_manifest = manifest(destination)