a8edcd7450
Merge the S18-S36 scenario ideas into the official peer-cli scenario matrix and add a Docker-backed runner that now exercises S1-S36 with concrete file proofs.

The runner creates temporary fixtures under .lanspread-peer-cli, drives JSONL peer containers, checks transferred roots with diff and SHA-256 manifests, and covers startup, discovery, transfer, failure, mutation, concurrency, mesh, lifecycle, and catalog edge cases.

The scenarios exposed a few harness/runtime boundary gaps that would otherwise make the contract ambiguous. The peer CLI now rejects self-connects, rejects commands for game IDs outside the receiver catalog, filters unknown remote games from its command/event surface, and reports duplicate active same-game commands as operation-in-progress errors. The peer core also refuses non-catalog download commands before transfer, and PeerGameDB has a unit check that address changes preserve identity and library state.

S12 and S28 remain unit-level invariants because the CLI cannot stably race raw serve-gate requests or rebind a live listener without restart. The runner treats those scenarios as covered by `just test` and checks that the expected unit test names appear in the output.

Test Plan:
- just fmt
- python3 -m py_compile crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- RUSTC_WRAPPER= just test
- RUSTC_WRAPPER= just clippy
- RUSTC_WRAPPER= just peer-cli-build
- just peer-cli-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py
- git diff --check

Refs: PEER_CLI_SCENARIOS.md S1-S36
1032 lines
35 KiB
Rust
1032 lines
35 KiB
Rust
//! Peer database and consensus validation for tracking remote peers and their games.
|
|
|
|
use std::{
|
|
cmp::Reverse,
|
|
collections::HashMap,
|
|
net::SocketAddr,
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use lanspread_db::db::{Availability, Game, GameFileDescription};
|
|
use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot};
|
|
|
|
use crate::library::compute_library_digest;
|
|
/// Identifier under which a peer's record is stored in [`PeerGameDB`].
pub type PeerId = String;
|
|
|
|
/// Information about a discovered peer.
|
|
#[derive(Clone, Debug)]
|
|
pub struct PeerInfo {
|
|
/// Stable peer identifier.
|
|
pub peer_id: PeerId,
|
|
/// Network address of the peer.
|
|
pub addr: SocketAddr,
|
|
/// Last time we heard from this peer.
|
|
pub last_seen: Instant,
|
|
/// Latest library revision advertised by the peer.
|
|
pub library_rev: u64,
|
|
/// Digest of the peer library state.
|
|
pub library_digest: u64,
|
|
/// Capability flags advertised by the peer.
|
|
pub features: Vec<String>,
|
|
/// Games this peer has available, keyed by game ID.
|
|
pub games: HashMap<String, GameSummary>,
|
|
/// File descriptions for each game, keyed by game ID.
|
|
pub files: HashMap<String, Vec<GameFileDescription>>,
|
|
}
|
|
|
|
/// Immutable peer state suitable for CLI assertions and tests.
|
|
#[derive(Clone, Debug)]
|
|
pub struct PeerSnapshot {
|
|
pub peer_id: PeerId,
|
|
pub addr: SocketAddr,
|
|
pub library_rev: u64,
|
|
pub library_digest: u64,
|
|
pub features: Vec<String>,
|
|
pub game_count: usize,
|
|
pub games: Vec<GameSummary>,
|
|
}
|
|
|
|
/// Database tracking all discovered peers and their games.
|
|
#[derive(Debug)]
|
|
pub struct PeerGameDB {
|
|
peers: HashMap<PeerId, PeerInfo>,
|
|
addr_index: HashMap<SocketAddr, PeerId>,
|
|
}
|
|
|
|
/// Outcome of [`PeerGameDB::upsert_peer`].
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare outcomes directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerUpsert {
    /// `true` when no record existed for the peer id and one was created.
    pub is_new: bool,
    /// `true` when an existing record was moved to a different address.
    pub addr_changed: bool,
}
|
|
|
|
impl Default for PeerGameDB {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl PeerGameDB {
|
|
#[must_use]
|
|
pub fn new() -> Self {
|
|
Self {
|
|
peers: HashMap::new(),
|
|
addr_index: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
/// Adds a new peer to the database or updates its address.
|
|
pub fn upsert_peer(&mut self, peer_id: PeerId, addr: SocketAddr) -> PeerUpsert {
|
|
if let Some(existing_id) = self.addr_index.get(&addr).cloned()
|
|
&& existing_id != peer_id
|
|
{
|
|
self.peers.remove(&existing_id);
|
|
self.addr_index.remove(&addr);
|
|
}
|
|
|
|
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
|
let addr_changed = peer.addr != addr;
|
|
if addr_changed {
|
|
self.addr_index.remove(&peer.addr);
|
|
self.addr_index.insert(addr, peer_id.clone());
|
|
peer.addr = addr;
|
|
}
|
|
peer.last_seen = Instant::now();
|
|
return PeerUpsert {
|
|
is_new: false,
|
|
addr_changed,
|
|
};
|
|
}
|
|
|
|
let peer_info = PeerInfo {
|
|
peer_id: peer_id.clone(),
|
|
addr,
|
|
last_seen: Instant::now(),
|
|
library_rev: 0,
|
|
library_digest: 0,
|
|
features: Vec::new(),
|
|
games: HashMap::new(),
|
|
files: HashMap::new(),
|
|
};
|
|
self.peers.insert(peer_id.clone(), peer_info);
|
|
self.addr_index.insert(addr, peer_id);
|
|
log::info!("Added peer: {addr}");
|
|
PeerUpsert {
|
|
is_new: true,
|
|
addr_changed: false,
|
|
}
|
|
}
|
|
|
|
/// Removes a peer from the database by id.
|
|
pub fn remove_peer(&mut self, peer_id: &PeerId) -> Option<PeerInfo> {
|
|
if let Some(peer) = self.peers.remove(peer_id) {
|
|
self.addr_index.remove(&peer.addr);
|
|
return Some(peer);
|
|
}
|
|
None
|
|
}
|
|
|
|
/// Removes a peer by address.
|
|
pub fn remove_peer_by_addr(&mut self, addr: &SocketAddr) -> Option<PeerInfo> {
|
|
let peer_id = self.addr_index.remove(addr)?;
|
|
self.peers.remove(&peer_id)
|
|
}
|
|
|
|
/// Returns the peer id for an address if known.
|
|
#[must_use]
|
|
pub fn peer_id_for_addr(&self, addr: &SocketAddr) -> Option<&PeerId> {
|
|
self.addr_index.get(addr)
|
|
}
|
|
|
|
/// Returns the peer id for a transport source address.
|
|
///
|
|
/// QUIC clients connect from ephemeral source ports, while peer records are
|
|
/// keyed by their advertised listening address. If the exact socket address
|
|
/// is unknown, fall back to a unique peer with the same IP address.
|
|
#[must_use]
|
|
pub fn peer_id_for_transport_addr(&self, addr: &SocketAddr) -> Option<PeerId> {
|
|
if let Some(peer_id) = self.addr_index.get(addr) {
|
|
return Some(peer_id.clone());
|
|
}
|
|
|
|
let mut matches = self
|
|
.peers
|
|
.values()
|
|
.filter(|peer| peer.addr.ip() == addr.ip())
|
|
.map(|peer| peer.peer_id.clone());
|
|
let peer_id = matches.next()?;
|
|
if matches.next().is_some() {
|
|
return None;
|
|
}
|
|
|
|
Some(peer_id)
|
|
}
|
|
|
|
/// Returns the library state for a peer if known.
|
|
#[must_use]
|
|
pub fn peer_library_state(&self, peer_id: &PeerId) -> Option<(u64, u64)> {
|
|
self.peers
|
|
.get(peer_id)
|
|
.map(|peer| (peer.library_rev, peer.library_digest))
|
|
}
|
|
|
|
/// Returns the number of games known for a peer.
|
|
#[must_use]
|
|
pub fn peer_game_count(&self, peer_id: &PeerId) -> usize {
|
|
self.peers.get(peer_id).map_or(0, |peer| peer.games.len())
|
|
}
|
|
|
|
/// Returns the feature list for a peer.
|
|
#[must_use]
|
|
pub fn peer_features(&self, peer_id: &PeerId) -> Vec<String> {
|
|
self.peers
|
|
.get(peer_id)
|
|
.map(|peer| peer.features.clone())
|
|
.unwrap_or_default()
|
|
}
|
|
|
|
/// Returns the address for a peer id.
|
|
#[must_use]
|
|
pub fn peer_addr(&self, peer_id: &PeerId) -> Option<SocketAddr> {
|
|
self.peers.get(peer_id).map(|peer| peer.addr)
|
|
}
|
|
|
|
/// Updates the games list for a peer.
|
|
pub fn update_peer_games(&mut self, peer_id: &PeerId, games: Vec<GameSummary>) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
let mut map = HashMap::with_capacity(games.len());
|
|
for game in games {
|
|
map.insert(game.id.clone(), game);
|
|
}
|
|
peer.games = map;
|
|
peer.last_seen = Instant::now();
|
|
log::info!("Updated games for peer: {}", peer.addr);
|
|
}
|
|
}
|
|
|
|
/// Updates the file descriptions for a specific game from a peer.
|
|
pub fn update_peer_game_files(
|
|
&mut self,
|
|
peer_id: &PeerId,
|
|
game_id: &str,
|
|
files: Vec<GameFileDescription>,
|
|
) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
peer.files.insert(game_id.to_string(), files);
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Updates the last seen timestamp for a peer.
|
|
pub fn update_last_seen(&mut self, peer_id: &PeerId) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Updates the last seen timestamp for a peer by address.
|
|
pub fn update_last_seen_by_addr(&mut self, addr: &SocketAddr) {
|
|
if let Some(peer_id) = self.peer_id_for_transport_addr(addr)
|
|
&& let Some(peer) = self.peers.get_mut(&peer_id)
|
|
{
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Updates the library metadata for a peer.
|
|
pub fn update_peer_library(
|
|
&mut self,
|
|
peer_id: &PeerId,
|
|
library_rev: u64,
|
|
library_digest: u64,
|
|
features: Vec<String>,
|
|
) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
peer.library_rev = library_rev;
|
|
peer.library_digest = library_digest;
|
|
peer.features = features;
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Updates the advertised feature list for a peer.
|
|
pub fn update_peer_features(&mut self, peer_id: &PeerId, features: Vec<String>) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
peer.features = features;
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Applies a full library snapshot for a peer.
|
|
pub fn apply_library_snapshot(&mut self, peer_id: &PeerId, snapshot: LibrarySnapshot) {
|
|
if let Some(peer) = self.peers.get_mut(peer_id) {
|
|
let mut map = HashMap::with_capacity(snapshot.games.len());
|
|
for game in snapshot.games {
|
|
map.insert(game.id.clone(), game);
|
|
}
|
|
let digest = compute_library_digest(&map);
|
|
peer.games = map;
|
|
peer.library_rev = snapshot.library_rev;
|
|
peer.library_digest = digest;
|
|
peer.last_seen = Instant::now();
|
|
}
|
|
}
|
|
|
|
/// Applies a library delta for a peer. Returns true when applied.
|
|
pub fn apply_library_delta(&mut self, peer_id: &PeerId, delta: LibraryDelta) -> bool {
|
|
let Some(peer) = self.peers.get_mut(peer_id) else {
|
|
return false;
|
|
};
|
|
|
|
if delta.to_rev <= peer.library_rev {
|
|
return false;
|
|
}
|
|
|
|
if delta.from_rev != peer.library_rev {
|
|
return false;
|
|
}
|
|
|
|
for game in delta.added {
|
|
peer.games.insert(game.id.clone(), game);
|
|
}
|
|
for game in delta.updated {
|
|
peer.games.insert(game.id.clone(), game);
|
|
}
|
|
for game_id in delta.removed {
|
|
peer.games.remove(&game_id);
|
|
}
|
|
|
|
peer.library_rev = delta.to_rev;
|
|
peer.library_digest = compute_library_digest(&peer.games);
|
|
peer.last_seen = Instant::now();
|
|
true
|
|
}
|
|
|
|
/// Returns all games aggregated from all peers.
|
|
#[must_use]
|
|
pub fn get_all_games(&self) -> Vec<Game> {
|
|
let mut aggregated: HashMap<String, Game> = HashMap::new();
|
|
let mut peer_counts: HashMap<String, u32> = HashMap::new();
|
|
|
|
// Count peers per game
|
|
for peer in self.peers.values() {
|
|
for game in peer.games.values().filter(|game| game_is_ready(game)) {
|
|
*peer_counts.entry(game.id.clone()).or_insert(0) += 1;
|
|
}
|
|
}
|
|
|
|
// Aggregate games with peer counts
|
|
for peer in self.peers.values() {
|
|
for game in peer.games.values() {
|
|
aggregated
|
|
.entry(game.id.clone())
|
|
.and_modify(|existing| {
|
|
if game_is_ready(game) {
|
|
if let (Some(new_version), Some(current)) =
|
|
(&game.eti_version, &existing.eti_game_version)
|
|
{
|
|
if new_version > current {
|
|
existing.eti_game_version = Some(new_version.clone());
|
|
}
|
|
} else if existing.eti_game_version.is_none() {
|
|
existing.eti_game_version.clone_from(&game.eti_version);
|
|
}
|
|
}
|
|
existing.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
|
|
if game.size > existing.size {
|
|
existing.size = game.size;
|
|
}
|
|
if game_is_ready(game) {
|
|
existing.set_downloaded(true);
|
|
} else if !existing.downloaded {
|
|
existing.availability = game.availability.clone();
|
|
}
|
|
if game.installed {
|
|
existing.installed = true;
|
|
}
|
|
})
|
|
.or_insert_with(|| {
|
|
let mut game_clone = summary_to_game(game);
|
|
game_clone.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
|
|
game_clone
|
|
});
|
|
}
|
|
}
|
|
|
|
let mut games: Vec<Game> = aggregated.into_values().collect();
|
|
games.sort_by(|a, b| a.name.cmp(&b.name));
|
|
games
|
|
}
|
|
|
|
/// Returns the latest version of a game across all peers.
|
|
#[must_use]
|
|
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
|
|
let mut latest_version: Option<String> = None;
|
|
|
|
for peer in self.peers.values() {
|
|
if let Some(game) = peer.games.get(game_id)
|
|
&& game_is_ready(game)
|
|
&& let Some(ref version) = game.eti_version
|
|
{
|
|
match &latest_version {
|
|
None => latest_version = Some(version.clone()),
|
|
Some(current_latest) => {
|
|
if version > current_latest {
|
|
latest_version = Some(version.clone());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
latest_version
|
|
}
|
|
|
|
/// Returns all peer addresses.
|
|
#[must_use]
|
|
pub fn get_peer_addresses(&self) -> Vec<SocketAddr> {
|
|
self.peers.values().map(|peer| peer.addr).collect()
|
|
}
|
|
|
|
/// Returns peer liveness info for ping scheduling.
|
|
#[must_use]
|
|
pub fn peer_liveness_snapshot(&self) -> Vec<(PeerId, SocketAddr, Instant)> {
|
|
self.peers
|
|
.values()
|
|
.map(|peer| (peer.peer_id.clone(), peer.addr, peer.last_seen))
|
|
.collect()
|
|
}
|
|
|
|
/// Returns peer ids with their current addresses.
|
|
#[must_use]
|
|
pub fn peer_identities(&self) -> Vec<(PeerId, SocketAddr)> {
|
|
self.peers
|
|
.values()
|
|
.map(|peer| (peer.peer_id.clone(), peer.addr))
|
|
.collect()
|
|
}
|
|
|
|
/// Returns immutable snapshots for all known peers.
|
|
#[must_use]
|
|
pub fn peer_snapshots(&self) -> Vec<PeerSnapshot> {
|
|
let mut peers = self
|
|
.peers
|
|
.values()
|
|
.map(|peer| {
|
|
let mut games = peer.games.values().cloned().collect::<Vec<_>>();
|
|
games.sort_by(|a, b| a.id.cmp(&b.id));
|
|
PeerSnapshot {
|
|
peer_id: peer.peer_id.clone(),
|
|
addr: peer.addr,
|
|
library_rev: peer.library_rev,
|
|
library_digest: peer.library_digest,
|
|
features: peer.features.clone(),
|
|
game_count: games.len(),
|
|
games,
|
|
}
|
|
})
|
|
.collect::<Vec<_>>();
|
|
peers.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
|
|
peers
|
|
}
|
|
|
|
/// Checks if a peer is in the database.
|
|
#[must_use]
|
|
pub fn contains_peer(&self, peer_id: &PeerId) -> bool {
|
|
self.peers.contains_key(peer_id)
|
|
}
|
|
|
|
/// Checks if a peer address is in the database.
|
|
#[must_use]
|
|
pub fn contains_peer_addr(&self, addr: &SocketAddr) -> bool {
|
|
self.addr_index.contains_key(addr)
|
|
}
|
|
|
|
/// Returns addresses of peers that have a specific game.
|
|
#[must_use]
|
|
pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> {
|
|
self.peers
|
|
.iter()
|
|
.filter(|(_, peer)| peer.games.get(game_id).is_some_and(game_is_ready))
|
|
.map(|(_, peer)| peer.addr)
|
|
.collect()
|
|
}
|
|
|
|
/// Returns addresses of peers that have the latest version of a game.
|
|
#[must_use]
|
|
pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
|
|
let latest_version = self.get_latest_version_for_game(game_id);
|
|
|
|
if let Some(ref latest) = latest_version {
|
|
self.peers
|
|
.iter()
|
|
.filter(|(_, peer)| {
|
|
if let Some(game) = peer.games.get(game_id) {
|
|
if game_is_ready(game)
|
|
&& let Some(ref version) = game.eti_version
|
|
{
|
|
version == latest
|
|
} else {
|
|
false
|
|
}
|
|
} else {
|
|
false
|
|
}
|
|
})
|
|
.map(|(_, peer)| peer.addr)
|
|
.collect()
|
|
} else {
|
|
// If no version info is available, fall back to all peers with the game
|
|
self.peers_with_game(game_id)
|
|
}
|
|
}
|
|
|
|
/// Returns file descriptions for a game from all peers.
|
|
#[must_use]
|
|
pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
|
|
self.peers
|
|
.values()
|
|
.filter_map(|peer| {
|
|
if !peer.games.get(game_id).is_some_and(game_is_ready) {
|
|
return None;
|
|
}
|
|
peer.files
|
|
.get(game_id)
|
|
.cloned()
|
|
.map(|files| (peer.addr, files))
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Returns file descriptions from peers that advertise the latest game version.
|
|
#[must_use]
|
|
pub fn latest_game_files_for(
|
|
&self,
|
|
game_id: &str,
|
|
) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
|
|
let latest_peers = self.peers_with_latest_version(game_id);
|
|
if latest_peers.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
|
|
self.game_files_for(game_id)
|
|
.into_iter()
|
|
.filter(|(addr, _)| latest_peers.contains(addr))
|
|
.collect()
|
|
}
|
|
|
|
/// Returns aggregated file descriptions for a game across all peers.
|
|
#[must_use]
|
|
pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> {
|
|
let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
|
|
for (_, files) in self.latest_game_files_for(game_id) {
|
|
for file in files {
|
|
seen.entry(file.relative_path.clone()).or_insert(file);
|
|
}
|
|
}
|
|
seen.into_values().collect()
|
|
}
|
|
|
|
/// Returns the majority-agreed size for a game.
|
|
#[must_use]
|
|
pub fn majority_game_size(&self, game_id: &str) -> Option<u64> {
|
|
let mut size_counts: HashMap<u64, usize> = HashMap::new();
|
|
|
|
for peer in self.peers.values() {
|
|
if let Some(game) = peer.games.get(game_id) {
|
|
if !game_is_ready(game) {
|
|
continue;
|
|
}
|
|
if game.size == 0 {
|
|
continue;
|
|
}
|
|
*size_counts.entry(game.size).or_insert(0) += 1;
|
|
}
|
|
}
|
|
|
|
size_counts
|
|
.into_iter()
|
|
.max_by(|(size_a, count_a), (size_b, count_b)| {
|
|
count_a.cmp(count_b).then_with(|| size_a.cmp(size_b))
|
|
})
|
|
.map(|(size, _)| size)
|
|
}
|
|
|
|
/// Validates file sizes across all peers and returns only the files with majority consensus.
|
|
///
|
|
/// Returns a tuple of (`validated_files`, `peer_whitelist`, `file_peer_map`) where
|
|
/// `peer_whitelist` contains peers that have at least one majority-approved file and
|
|
/// `file_peer_map` lists which peers were validated for each file.
|
|
pub fn validate_file_sizes_majority(
|
|
&self,
|
|
game_id: &str,
|
|
) -> eyre::Result<MajorityValidationResult> {
|
|
let game_files = self.latest_game_files_for(game_id);
|
|
if game_files.is_empty() {
|
|
return Ok((Vec::new(), Vec::new(), HashMap::new()));
|
|
}
|
|
|
|
let (file_size_map, _peer_files) = collect_file_sizes(&game_files);
|
|
let (validated_files, peer_scores, file_peer_map) =
|
|
self.validate_each_file_consensus(game_id, file_size_map)?;
|
|
let peer_whitelist = create_peer_whitelist(peer_scores);
|
|
|
|
Ok((validated_files, peer_whitelist, file_peer_map))
|
|
}
|
|
|
|
/// Validates consensus for each file and returns validated files with peer scores.
|
|
fn validate_each_file_consensus(
|
|
&self,
|
|
game_id: &str,
|
|
file_size_map: FileSizeMap,
|
|
) -> eyre::Result<FileConsensusAggregation> {
|
|
let mut validated_files = Vec::new();
|
|
let mut peer_whitelist_scores: HashMap<SocketAddr, usize> = HashMap::new();
|
|
let mut file_peer_map: HashMap<String, Vec<SocketAddr>> = HashMap::new();
|
|
|
|
for (relative_path, size_map) in file_size_map {
|
|
let total_peers: usize = size_map.values().map(Vec::len).sum();
|
|
|
|
if total_peers == 0 {
|
|
continue; // Skip files with no size information
|
|
}
|
|
|
|
let (consensus_size, consensus_peers) =
|
|
self.determine_size_consensus(&size_map, total_peers, &relative_path)?;
|
|
update_peer_scores(&consensus_peers, &mut peer_whitelist_scores);
|
|
|
|
if let Some((size, peers)) = consensus_size
|
|
&& let Some(file_desc) =
|
|
self.create_validated_file_description(game_id, &relative_path, size, &peers)
|
|
{
|
|
file_peer_map.insert(relative_path.clone(), peers.clone());
|
|
validated_files.push(file_desc);
|
|
}
|
|
}
|
|
|
|
Ok((validated_files, peer_whitelist_scores, file_peer_map))
|
|
}
|
|
|
|
/// Determines the consensus size for a file based on peer reports.
|
|
///
|
|
/// # Panics
|
|
///
|
|
/// Panics if `size_map.iter().next()` returns None when `total_peers` == 1
|
|
#[allow(clippy::unused_self)]
|
|
fn determine_size_consensus(
|
|
&self,
|
|
size_map: &HashMap<u64, Vec<SocketAddr>>,
|
|
total_peers: usize,
|
|
relative_path: &str,
|
|
) -> eyre::Result<(ConsensusResult, Vec<SocketAddr>)> {
|
|
if total_peers == 1 {
|
|
// Only one peer has this file - trust it
|
|
let (&size, peers) = size_map
|
|
.iter()
|
|
.next()
|
|
.expect("size_map should have at least one entry when total_peers == 1");
|
|
return Ok((Some((size, peers.clone())), peers.clone()));
|
|
}
|
|
|
|
let (majority_size, _majority_count) = find_majority_size(size_map);
|
|
|
|
if let Some(size) = majority_size {
|
|
let majority_peers = &size_map[&size];
|
|
let is_majority = majority_peers.len() > total_peers / 2;
|
|
|
|
if is_majority {
|
|
// We have a clear majority
|
|
Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
|
|
} else if total_peers == 2 {
|
|
// Two peers with different sizes - ambiguous, fail
|
|
eyre::bail!(
|
|
"File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
|
|
relative_path
|
|
);
|
|
}
|
|
// If no majority and more than 2 peers, we fall back to plurality (largest group)
|
|
else {
|
|
Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
|
|
}
|
|
} else {
|
|
// No clear majority and it's a tie between different sizes
|
|
if total_peers == 2 {
|
|
eyre::bail!(
|
|
"File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
|
|
relative_path
|
|
);
|
|
}
|
|
// For more than 2 peers, we could fall back to plurality, but for now let's be strict
|
|
eyre::bail!(
|
|
"File size ambiguity for '{}': no clear majority among {} peers",
|
|
relative_path,
|
|
total_peers
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Creates a validated file description from consensus data.
|
|
fn create_validated_file_description(
|
|
&self,
|
|
game_id: &str,
|
|
relative_path: &str,
|
|
size: u64,
|
|
peers: &[SocketAddr],
|
|
) -> Option<GameFileDescription> {
|
|
if let Some(first_peer) = peers.first()
|
|
&& let Some(peer_id) = self.addr_index.get(first_peer)
|
|
&& let Some(files) = self.peers.get(peer_id).and_then(|p| p.files.get(game_id))
|
|
&& let Some(file_desc) = files
|
|
.iter()
|
|
.find(|f| f.relative_path == relative_path && f.size == size)
|
|
{
|
|
return Some(file_desc.clone());
|
|
}
|
|
None
|
|
}
|
|
|
|
/// Returns peers that haven't been seen within the timeout duration.
|
|
#[must_use]
|
|
pub fn get_stale_peers(&self, timeout: Duration) -> Vec<SocketAddr> {
|
|
self.peers
|
|
.values()
|
|
.filter(|peer| peer.last_seen.elapsed() > timeout)
|
|
.map(|peer| peer.addr)
|
|
.collect()
|
|
}
|
|
|
|
/// Returns stale peer ids that exceeded the timeout.
|
|
#[must_use]
|
|
pub fn get_stale_peer_ids(&self, timeout: Duration) -> Vec<PeerId> {
|
|
self.peers
|
|
.values()
|
|
.filter(|peer| peer.last_seen.elapsed() > timeout)
|
|
.map(|peer| peer.peer_id.clone())
|
|
.collect()
|
|
}
|
|
}
|
|
|
|
// =============================================================================
|
|
// Type aliases for consensus validation
|
|
// =============================================================================
|
|
|
|
/// Type alias for file size mapping: path -> size -> peers
|
|
type FileSizeMap = HashMap<String, HashMap<u64, Vec<SocketAddr>>>;
|
|
|
|
/// Type alias for peer file mapping: peer -> path -> size
|
|
type PeerFileMap = HashMap<SocketAddr, HashMap<String, u64>>;
|
|
|
|
/// Type alias for consensus result: (size, peers) or None
|
|
type ConsensusResult = Option<(u64, Vec<SocketAddr>)>;
|
|
|
|
/// Type alias for the aggregated majority validation result.
|
|
pub type MajorityValidationResult = (
|
|
Vec<GameFileDescription>,
|
|
Vec<SocketAddr>,
|
|
HashMap<String, Vec<SocketAddr>>,
|
|
);
|
|
|
|
/// Type alias for per-file consensus aggregation results.
|
|
type FileConsensusAggregation = (
|
|
Vec<GameFileDescription>,
|
|
HashMap<SocketAddr, usize>,
|
|
HashMap<String, Vec<SocketAddr>>,
|
|
);
|
|
|
|
// =============================================================================
|
|
// Helper functions for consensus validation
|
|
// =============================================================================
|
|
|
|
/// Collects file sizes from all peers and organizes them by path and size.
|
|
fn collect_file_sizes(
|
|
game_files: &[(SocketAddr, Vec<GameFileDescription>)],
|
|
) -> (FileSizeMap, PeerFileMap) {
|
|
let mut file_size_map: FileSizeMap = HashMap::new();
|
|
let mut peer_files: PeerFileMap = HashMap::new();
|
|
|
|
for (peer_addr, files) in game_files {
|
|
let mut peer_file_sizes = HashMap::new();
|
|
for file in files {
|
|
if !file.is_dir {
|
|
let size = file.size;
|
|
file_size_map
|
|
.entry(file.relative_path.clone())
|
|
.or_default()
|
|
.entry(size)
|
|
.or_default()
|
|
.push(*peer_addr);
|
|
peer_file_sizes.insert(file.relative_path.clone(), size);
|
|
}
|
|
}
|
|
peer_files.insert(*peer_addr, peer_file_sizes);
|
|
}
|
|
|
|
(file_size_map, peer_files)
|
|
}
|
|
|
|
/// Finds the size reported by the unique largest group of peers.
///
/// Returns `(winner, top_count)`, where `top_count` is the size of the largest
/// group. `winner` is `None` when the map is empty, when no peer reported
/// anything, or when two or more sizes share the largest group size.
///
/// The largest group is determined before ties are checked, so the result does
/// not depend on the iteration order of `size_map`; ties among smaller groups
/// never hide a clear winner.
fn find_majority_size(size_map: &HashMap<u64, Vec<SocketAddr>>) -> (Option<u64>, usize) {
    let top_count = size_map.values().map(Vec::len).max().unwrap_or(0);
    if top_count == 0 {
        return (None, 0);
    }

    let mut leaders = size_map
        .iter()
        .filter(|(_, peers)| peers.len() == top_count)
        .map(|(&size, _)| size);

    // Exactly one size may hold the top spot; a second leader means a tie.
    let winner = match (leaders.next(), leaders.next()) {
        (Some(size), None) => Some(size),
        _ => None,
    };

    (winner, top_count)
}
|
|
|
|
/// Adds one point to the score of every peer listed in `peers`.
///
/// Peers without a score yet start at zero; a peer listed twice gets two
/// points.
fn update_peer_scores(
    peers: &[SocketAddr],
    peer_whitelist_scores: &mut HashMap<SocketAddr, usize>,
) {
    peers.iter().copied().for_each(|peer| {
        let score = peer_whitelist_scores.entry(peer).or_default();
        *score += 1;
    });
}
|
|
|
|
/// Turns agreement scores into an ordered whitelist of peers.
///
/// Peers with a zero score are dropped. The rest are ordered by descending
/// score, with ties broken by ascending address.
fn create_peer_whitelist(peer_scores: HashMap<SocketAddr, usize>) -> Vec<SocketAddr> {
    let mut ranked: Vec<(SocketAddr, usize)> = peer_scores
        .into_iter()
        .filter(|&(_, score)| score > 0)
        .collect();

    ranked.sort_by(|left, right| (Reverse(left.1), left.0).cmp(&(Reverse(right.1), right.0)));

    ranked.into_iter().map(|(peer, _)| peer).collect()
}
|
|
|
|
fn game_is_ready(summary: &GameSummary) -> bool {
|
|
summary.availability == Availability::Ready
|
|
}
|
|
|
|
fn summary_to_game(summary: &GameSummary) -> Game {
|
|
let eti_game_version = game_is_ready(summary)
|
|
.then(|| summary.eti_version.clone())
|
|
.flatten();
|
|
|
|
Game {
|
|
id: summary.id.clone(),
|
|
name: summary.name.clone(),
|
|
description: String::new(),
|
|
release_year: String::new(),
|
|
publisher: String::new(),
|
|
max_players: 1,
|
|
version: "1.0".to_string(),
|
|
genre: String::new(),
|
|
size: summary.size,
|
|
downloaded: game_is_ready(summary),
|
|
installed: summary.installed,
|
|
availability: summary.availability.clone(),
|
|
eti_game_version,
|
|
local_version: None,
|
|
peer_count: 0,
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use super::*;

    /// Game id shared by all tests.
    const GAME: &str = "game";

    /// Loopback address on the given port.
    fn addr(port: u16) -> SocketAddr {
        ip_addr([127, 0, 0, 1], port)
    }

    /// IPv4 address with an explicit IP and port.
    fn ip_addr(ip: [u8; 4], port: u16) -> SocketAddr {
        SocketAddr::from((ip, port))
    }

    /// Owned peer id for the given name.
    fn id(name: &str) -> PeerId {
        name.to_string()
    }

    /// Installed 42-byte game summary with the given version and availability.
    fn summary(id: &str, version: &str, availability: Availability) -> GameSummary {
        let downloaded = availability == Availability::Ready;
        GameSummary {
            id: id.to_string(),
            name: id.to_string(),
            size: 42,
            downloaded,
            installed: true,
            eti_version: Some(version.to_string()),
            manifest_hash: 7,
            availability,
        }
    }

    /// Regular-file description with the given path and size.
    fn file_desc(game_id: &str, relative_path: &str, size: u64) -> GameFileDescription {
        GameFileDescription {
            game_id: game_id.to_string(),
            relative_path: relative_path.to_string(),
            is_dir: false,
            size,
        }
    }

    #[test]
    fn aggregation_counts_only_ready_peers_as_download_sources() {
        let ready_addr = addr(12000);
        let local_only_addr = addr(12001);
        let mut db = PeerGameDB::new();
        db.upsert_peer(id("ready"), ready_addr);
        db.upsert_peer(id("local"), local_only_addr);
        db.update_peer_games(
            &id("ready"),
            vec![summary(GAME, "20240101", Availability::Ready)],
        );
        db.update_peer_games(
            &id("local"),
            vec![summary(GAME, "20990101", Availability::LocalOnly)],
        );

        let games = db.get_all_games();
        assert_eq!(games.len(), 1);
        let game = &games[0];
        assert_eq!(game.peer_count, 1);
        assert!(game.downloaded);
        assert_eq!(game.eti_game_version.as_deref(), Some("20240101"));

        assert_eq!(db.peers_with_game(GAME), vec![ready_addr]);
        assert_eq!(
            db.get_latest_version_for_game(GAME).as_deref(),
            Some("20240101")
        );
        assert_eq!(db.peers_with_latest_version(GAME), vec![ready_addr]);
    }

    #[test]
    fn local_only_peer_does_not_make_game_downloadable() {
        let local_only_addr = addr(12002);
        let mut db = PeerGameDB::new();
        db.upsert_peer(id("local"), local_only_addr);
        db.update_peer_games(
            &id("local"),
            vec![summary(GAME, "20240101", Availability::LocalOnly)],
        );

        let games = db.get_all_games();
        assert_eq!(games.len(), 1);
        let game = &games[0];
        assert_eq!(game.peer_count, 0);
        assert!(!game.downloaded);
        assert_eq!(game.availability, Availability::LocalOnly);
        assert_eq!(game.eti_game_version, None);

        assert!(db.peers_with_game(GAME).is_empty());
        assert_eq!(db.get_latest_version_for_game(GAME), None);
        assert!(db.peers_with_latest_version(GAME).is_empty());
    }

    #[test]
    fn transport_addr_matches_known_peer_on_ephemeral_port() {
        let advertised = ip_addr([10, 66, 0, 2], 40000);
        let transport_source = ip_addr([10, 66, 0, 2], 52000);
        let mut db = PeerGameDB::new();
        db.upsert_peer(id("peer"), advertised);

        let resolved = db.peer_id_for_transport_addr(&transport_source);
        assert_eq!(resolved.as_deref(), Some("peer"));
    }

    #[test]
    fn transport_addr_fallback_requires_unique_peer_ip() {
        let source = ip_addr([10, 66, 0, 2], 52000);
        let mut db = PeerGameDB::new();
        db.upsert_peer(id("first"), ip_addr([10, 66, 0, 2], 40000));
        db.upsert_peer(id("second"), ip_addr([10, 66, 0, 2], 41000));

        assert_eq!(db.peer_id_for_transport_addr(&source), None);
    }

    #[test]
    fn address_update_preserves_peer_identity_and_library() {
        let old_addr = ip_addr([10, 66, 0, 2], 40000);
        let new_addr = ip_addr([10, 66, 0, 3], 41000);
        let mut db = PeerGameDB::new();

        let first = db.upsert_peer(id("peer"), old_addr);
        assert!(first.is_new);
        db.update_peer_games(
            &id("peer"),
            vec![summary(GAME, "20250101", Availability::Ready)],
        );

        let second = db.upsert_peer(id("peer"), new_addr);
        assert!(!second.is_new);
        assert!(second.addr_changed);

        let peers = db.peer_snapshots();
        assert_eq!(peers.len(), 1);
        let peer = &peers[0];
        assert_eq!(peer.peer_id, "peer");
        assert_eq!(peer.addr, new_addr);
        assert_eq!(peer.games.len(), 1);
        assert_eq!(peer.games[0].id, "game");
        assert_eq!(db.peer_id_for_addr(&old_addr), None);
        assert_eq!(
            db.peer_id_for_addr(&new_addr).map(String::as_str),
            Some("peer")
        );
    }

    #[test]
    fn validation_uses_latest_version_file_metadata() {
        let old_addr = addr(12003);
        let new_addr = addr(12004);
        let mut db = PeerGameDB::new();
        db.upsert_peer(id("old"), old_addr);
        db.upsert_peer(id("new"), new_addr);
        db.update_peer_games(
            &id("old"),
            vec![summary(GAME, "20240101", Availability::Ready)],
        );
        db.update_peer_games(
            &id("new"),
            vec![summary(GAME, "20250101", Availability::Ready)],
        );
        db.update_peer_game_files(
            &id("old"),
            GAME,
            vec![
                file_desc(GAME, "game/version.ini", 8),
                file_desc(GAME, "game/archive.eti", 10),
            ],
        );
        db.update_peer_game_files(
            &id("new"),
            GAME,
            vec![
                file_desc(GAME, "game/version.ini", 8),
                file_desc(GAME, "game/archive.eti", 20),
            ],
        );

        let aggregated = db.aggregated_game_files(GAME);
        let archive = aggregated
            .iter()
            .find(|desc| desc.relative_path == "game/archive.eti")
            .expect("latest archive should be present");
        assert_eq!(archive.size, 20);

        let (validated, peers, file_peer_map) = db
            .validate_file_sizes_majority(GAME)
            .expect("old-version file metadata should not create ambiguity");
        assert_eq!(peers, vec![new_addr]);
        let archive = validated
            .iter()
            .find(|desc| desc.relative_path == "game/archive.eti")
            .expect("latest archive should validate");
        assert_eq!(archive.size, 20);
        assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr]));
    }
}
|