From b60dcef471b94858dab4f31fc6fdf2cfdcc40586 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Tue, 13 Jan 2026 18:59:12 +0100 Subject: [PATCH] ChatGPT Codex 5.2 xhigh refactored > 45min --- Cargo.lock | 2 + crates/lanspread-mdns/src/lib.rs | 38 +- crates/lanspread-peer/Cargo.toml | 2 + crates/lanspread-peer/README.md | 6 +- crates/lanspread-peer/src/config.rs | 9 +- crates/lanspread-peer/src/context.rs | 12 +- crates/lanspread-peer/src/handlers.rs | 139 +++-- crates/lanspread-peer/src/identity.rs | 44 ++ crates/lanspread-peer/src/lib.rs | 32 +- crates/lanspread-peer/src/library.rs | 129 +++++ crates/lanspread-peer/src/local_games.rs | 482 +++++++++++----- crates/lanspread-peer/src/network.rs | 78 ++- crates/lanspread-peer/src/peer_db.rs | 297 ++++++++-- crates/lanspread-peer/src/services.rs | 700 +++++++++++++++++++---- crates/lanspread-proto/src/lib.rs | 69 +++ 15 files changed, 1672 insertions(+), 367 deletions(-) create mode 100644 crates/lanspread-peer/src/identity.rs create mode 100644 crates/lanspread-peer/src/library.rs diff --git a/Cargo.lock b/Cargo.lock index 083d85a..31528dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2167,6 +2167,8 @@ dependencies = [ "lanspread-utils", "log", "s2n-quic", + "serde", + "serde_json", "tokio", "tokio-util", "tracing", diff --git a/crates/lanspread-mdns/src/lib.rs b/crates/lanspread-mdns/src/lib.rs index e69db19..b6fab05 100644 --- a/crates/lanspread-mdns/src/lib.rs +++ b/crates/lanspread-mdns/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::missing_errors_doc, clippy::missing_panics_doc)] -use std::net::SocketAddr; +use std::{collections::HashMap, net::SocketAddr}; use eyre::bail; pub use mdns_sd::DaemonEvent; @@ -15,7 +15,12 @@ pub struct MdnsAdvertiser { } impl MdnsAdvertiser { - pub fn new(service_type: &str, instance_name: &str, address: SocketAddr) -> eyre::Result { + pub fn new( + service_type: &str, + instance_name: &str, + address: SocketAddr, + properties: Option>, + ) -> eyre::Result { let host_name = format!("{}.local.", address.ip()); let daemon = ServiceDaemon::new()?; let service_info = ServiceInfo::new( @@ -24,7 +29,7 @@ impl MdnsAdvertiser { &host_name, address.ip(), address.port(), - None, + properties, )?; let monitor = daemon.monitor()?; @@ -53,6 +58,14 @@ pub struct MdnsBrowser { service_type: String, } +#[derive(Debug, Clone)] +pub struct MdnsService { + pub addr: SocketAddr, + pub fullname: String, + pub hostname: String, + pub properties: HashMap, +} + impl MdnsBrowser { pub fn new(service_type: &str) -> eyre::Result { let daemon = ServiceDaemon::new()?; @@ -64,10 +77,10 @@ impl MdnsBrowser { }) } - pub fn next_address( + pub fn next_service( &self, ignore_addr: Option, - ) -> eyre::Result> { + ) -> eyre::Result> { loop { match self.receiver.recv() { Ok(ServiceEvent::ServiceResolved(info)) => { @@ -93,7 +106,13 @@ impl MdnsBrowser { } log::info!("Found server at {addr}"); - return Ok(Some(addr)); + let properties = info.get_properties().clone().into_property_map_str(); + return Ok(Some(MdnsService { + addr, + fullname: info.get_fullname().to_string(), + hostname: info.get_hostname().to_string(), + properties, + })); } if ignored_match { @@ -116,6 +135,13 @@ impl MdnsBrowser { } } } + + pub fn next_address( + &self, + ignore_addr: Option, + ) -> eyre::Result> { + Ok(self.next_service(ignore_addr)?.map(|service| service.addr)) + } } impl Drop for MdnsBrowser { diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index 3d79ea1..fbdd779 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -24,6 +24,8 @@ gethostname = { workspace = true } if-addrs = { workspace = true } log = { workspace = true } s2n-quic = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 7b6abce..9facd4f 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -34,9 +34,9 @@ lifetime of the process: 3. **Ping service** (`run_ping_service`) – periodically issues QUIC ping requests to keep peer liveness up to date and prunes stale entries from `PeerGameDB`. -`load_local_game_db` scans the configured game directory (looking for folders -with a `version.ini`) and hydrates a `GameDB`. That database is used to respond -to incoming metadata requests (`Request::ListGames` / `Request::GetGame`). +`scan_local_library` maintains a lightweight on-disk index and produces both a +`GameDB` and protocol summaries. The resulting database is used to respond to +incoming metadata requests (`Request::ListGames` / `Request::GetGame`). ## Networking and File Transfer diff --git a/crates/lanspread-peer/src/config.rs b/crates/lanspread-peer/src/config.rs index 7ff9521..473fd55 100644 --- a/crates/lanspread-peer/src/config.rs +++ b/crates/lanspread-peer/src/config.rs @@ -3,10 +3,13 @@ use std::time::Duration; /// Interval between peer ping checks (seconds). -pub const PEER_PING_INTERVAL_SECS: u64 = 5; +pub const PEER_PING_INTERVAL_SECS: u64 = 20; + +/// Minimum idle time before pinging a peer (seconds). +pub const PEER_PING_IDLE_SECS: u64 = 30; /// Timeout after which a peer is considered stale (seconds). -pub const PEER_STALE_TIMEOUT_SECS: u64 = 12; +pub const PEER_STALE_TIMEOUT_SECS: u64 = 90; /// Size of each download chunk (32 MB). pub const CHUNK_SIZE: u64 = 32 * 1024 * 1024; @@ -15,7 +18,7 @@ pub const CHUNK_SIZE: u64 = 32 * 1024 * 1024; pub const MAX_RETRY_COUNT: usize = 3; /// Interval for local game directory monitoring (seconds). -pub const LOCAL_GAME_MONITOR_INTERVAL_SECS: u64 = 5; +pub const LOCAL_GAME_MONITOR_INTERVAL_SECS: u64 = 15; /// TLS certificate for QUIC connections. pub static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 1abede0..71dbcf9 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -9,17 +9,19 @@ use std::{ use lanspread_db::db::GameDB; use tokio::{sync::RwLock, task::JoinHandle}; -use crate::{PeerEvent, peer_db::PeerGameDB}; +use crate::{PeerEvent, library::LocalLibraryState, peer_db::PeerGameDB}; /// Main context for the peer system. #[derive(Clone)] pub struct Ctx { pub game_dir: Arc>>, pub local_game_db: Arc>>, + pub local_library: Arc>, pub peer_game_db: Arc>, pub local_peer_addr: Arc>>, pub downloading_games: Arc>>, pub active_downloads: Arc>>>, + pub peer_id: Arc, } /// Context for peer connection handling. @@ -27,9 +29,11 @@ pub struct Ctx { pub struct PeerCtx { pub game_dir: Arc>>, pub local_game_db: Arc>>, + pub local_library: Arc>, pub local_peer_addr: Arc>>, pub downloading_games: Arc>>, pub peer_game_db: Arc>, + pub peer_id: Arc, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, } @@ -46,14 +50,16 @@ impl std::fmt::Debug for PeerCtx { impl Ctx { /// Creates a new context with the given peer game database. - pub fn new(peer_game_db: Arc>) -> Self { + pub fn new(peer_game_db: Arc>, peer_id: String) -> Self { Self { game_dir: Arc::new(RwLock::new(None)), local_game_db: Arc::new(RwLock::new(None)), + local_library: Arc::new(RwLock::new(LocalLibraryState::empty())), peer_game_db, local_peer_addr: Arc::new(RwLock::new(None)), downloading_games: Arc::new(RwLock::new(HashSet::new())), active_downloads: Arc::new(RwLock::new(HashMap::new())), + peer_id: Arc::new(peer_id), } } @@ -65,9 +71,11 @@ impl Ctx { PeerCtx { game_dir: self.game_dir.clone(), local_game_db: self.local_game_db.clone(), + local_library: self.local_library.clone(), local_peer_addr: self.local_peer_addr.clone(), downloading_games: self.downloading_games.clone(), peer_game_db: self.peer_game_db.clone(), + peer_id: self.peer_id.clone(), tx_notify_ui, } } diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 3336aa2..9187ac5 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -1,17 +1,23 @@ //! Command handlers for peer commands. -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; -use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_db::db::{Game, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ PeerEvent, context::Ctx, download::download_game_files, - local_games::{get_game_file_descriptions, load_local_game_db, local_download_available}, - network::{announce_games_to_peer, request_game_details_from_peer}, - peer_db::PeerGameDB, + identity::FEATURE_LIBRARY_DELTA, + local_games::{ + LocalLibraryScan, + get_game_file_descriptions, + local_download_available, + scan_local_library, + }, + network::{announce_games_to_peer, request_game_details_from_peer, send_library_delta}, + peer_db::{PeerGameDB, PeerId}, }; // ============================================================================= @@ -35,6 +41,20 @@ pub async fn emit_peer_game_list( } } +async fn ensure_peer_id_for_addr( + peer_game_db: &Arc>, + peer_addr: SocketAddr, +) -> PeerId { + let mut db = peer_game_db.write().await; + if let Some(peer_id) = db.peer_id_for_addr(&peer_addr).cloned() { + return peer_id; + } + + let legacy_id = format!("legacy-{peer_addr}"); + db.upsert_peer(legacy_id.clone(), peer_addr); + legacy_id +} + /// Tries to serve a game from local files. async fn try_serve_local_game( ctx: &Ctx, @@ -128,10 +148,11 @@ async fn request_game_details_and_update( peer_game_db: Arc>, ) -> eyre::Result> { let (file_descriptions, _) = request_game_details_from_peer(peer_addr, game_id).await?; + let peer_id = ensure_peer_id_for_addr(&peer_game_db, peer_addr).await; { let mut db = peer_game_db.write().await; - db.update_peer_game_files(peer_addr, game_id, file_descriptions.clone()); + db.update_peer_game_files(&peer_id, game_id, file_descriptions.clone()); } Ok(file_descriptions) @@ -277,9 +298,9 @@ pub async fn handle_set_game_dir_command( let ctx_clone = ctx.clone(); tokio::spawn(async move { - match load_local_game_db(&game_dir).await { - Ok(db) => { - update_and_announce_games(&ctx_clone, &tx_notify_ui, db).await; + match scan_local_library(&game_dir).await { + Ok(scan) => { + update_and_announce_games(&ctx_clone, &tx_notify_ui, scan).await; log::info!("Local game database loaded successfully"); } Err(e) => { @@ -306,67 +327,61 @@ pub async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSe pub async fn update_and_announce_games( ctx: &Ctx, tx_notify_ui: &UnboundedSender, - new_db: GameDB, + scan: LocalLibraryScan, ) { - let local_game_db = ctx.local_game_db.clone(); - let mut db_guard = local_game_db.write().await; + let LocalLibraryScan { + game_db, + summaries, + revision, + } = scan; - let previous_games = db_guard - .as_ref() - .map(|db| db.games.keys().cloned().collect::>()) - .unwrap_or_default(); + let delta = { + let mut library_guard = ctx.local_library.write().await; + library_guard.update_from_scan(summaries, revision) + }; - let current_game_ids = new_db.games.keys().cloned().collect::>(); + let Some(delta) = delta else { + return; + }; - // Check if any games were removed - let removed_games: Vec = previous_games - .difference(¤t_game_ids) + { + let mut db_guard = ctx.local_game_db.write().await; + *db_guard = Some(game_db.clone()); + } + + let all_games = game_db + .all_games() + .into_iter() .cloned() - .collect(); + .collect::>(); - if removed_games.is_empty() { - // Check if any games were added or updated - if previous_games != current_game_ids { - log::debug!("Local games directory structure changed, updating database"); - *db_guard = Some(new_db); + if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { + log::error!("Failed to send LocalGamesUpdated event: {e}"); + } - let all_games = db_guard - .as_ref() - .map(|db| db.all_games().into_iter().cloned().collect::>()) - .unwrap_or_default(); + let peer_targets = { + let db = ctx.peer_game_db.read().await; + db.peer_identities() + .into_iter() + .map(|(peer_id, addr)| { + let features = db.peer_features(&peer_id); + (peer_id, addr, features) + }) + .collect::>() + }; - if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { - log::error!("Failed to send LocalGamesUpdated event: {e}"); - } - - // Broadcast update to all peers - let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; - for peer_addr in peer_addresses { - let games_clone = all_games.clone(); - tokio::spawn(async move { - if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await { - log::warn!("Failed to announce games to {peer_addr}: {e}"); - } - }); - } - } - } else { - log::info!("Detected removed games: {removed_games:?}"); - *db_guard = Some(new_db); - - // Notify UI about the change - let all_games = db_guard - .as_ref() - .map(|db| db.all_games().into_iter().cloned().collect::>()) - .unwrap_or_default(); - - if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { - log::error!("Failed to send LocalGamesUpdated event: {e}"); - } - - // Broadcast update to all peers - let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; - for peer_addr in peer_addresses { + for (_peer_id, peer_addr, features) in peer_targets { + if features + .iter() + .any(|feature| feature == FEATURE_LIBRARY_DELTA) + { + let delta = delta.clone(); + tokio::spawn(async move { + if let Err(e) = send_library_delta(peer_addr, delta).await { + log::warn!("Failed to send library delta to {peer_addr}: {e}"); + } + }); + } else { let games_clone = all_games.clone(); tokio::spawn(async move { if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await { diff --git a/crates/lanspread-peer/src/identity.rs b/crates/lanspread-peer/src/identity.rs new file mode 100644 index 0000000..33d889f --- /dev/null +++ b/crates/lanspread-peer/src/identity.rs @@ -0,0 +1,44 @@ +use std::path::PathBuf; + +use uuid::Uuid; + +const PEER_ID_FILE: &str = "peer_id"; + +pub const FEATURE_LIBRARY_DELTA: &str = "library-delta-v1"; +pub const FEATURE_LIBRARY_SNAPSHOT: &str = "library-snapshot-v1"; + +pub fn load_or_create_peer_id() -> eyre::Result { + let path = peer_id_path(); + if let Ok(existing) = std::fs::read_to_string(&path) { + let trimmed = existing.trim(); + if !trimmed.is_empty() { + return Ok(trimmed.to_string()); + } + } + + let peer_id = Uuid::now_v7().simple().to_string(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(&path, peer_id.as_bytes())?; + Ok(peer_id) +} + +pub fn default_features() -> Vec { + vec![ + FEATURE_LIBRARY_DELTA.to_string(), + FEATURE_LIBRARY_SNAPSHOT.to_string(), + ] +} + +fn peer_id_path() -> PathBuf { + if let Some(dir) = std::env::var_os("LANSPREAD_STATE_DIR") { + return PathBuf::from(dir).join(PEER_ID_FILE); + } + + if let Some(home) = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE")) { + return PathBuf::from(home).join(".lanspread").join(PEER_ID_FILE); + } + + std::env::temp_dir().join("lanspread").join(PEER_ID_FILE) +} diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 302495d..c162885 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -17,6 +17,8 @@ mod context; mod download; mod error; mod handlers; +mod identity; +mod library; mod local_games; mod network; mod path_validation; @@ -33,7 +35,7 @@ use std::{net::SocketAddr, sync::Arc}; pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use error::PeerError; use lanspread_db::db::{Game, GameFileDescription}; -pub use peer_db::{MajorityValidationResult, PeerGameDB, PeerInfo}; +pub use peer_db::{MajorityValidationResult, PeerGameDB, PeerId, PeerInfo, PeerUpsert}; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, @@ -48,6 +50,7 @@ use crate::{ handle_list_games_command, handle_set_game_dir_command, }, + network::send_goodbye, services::{ run_local_game_monitor, run_peer_discovery, @@ -137,13 +140,14 @@ pub fn start_peer( peer_game_db: Arc>, ) -> eyre::Result> { log::info!("Starting peer system with game directory: {game_dir}"); + let peer_id = identity::load_or_create_peer_id()?; let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); // Start the peer in a background task let tx_control_clone = tx_control.clone(); tokio::spawn(async move { - if let Err(e) = run_peer(rx_control, tx_notify_ui, peer_game_db).await { + if let Err(e) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await { log::error!("Peer system failed: {e}"); } }); @@ -159,9 +163,10 @@ async fn run_peer( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, + peer_id: String, ) -> eyre::Result<()> { // Create the shared context - let ctx = Ctx::new(peer_game_db.clone()); + let ctx = Ctx::new(peer_game_db.clone(), peer_id); let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone()); // Start server component @@ -178,15 +183,9 @@ async fn run_peer( // Start peer discovery task let tx_notify_ui_discovery = tx_notify_ui.clone(); - let peer_game_db_discovery = ctx.peer_game_db.clone(); - let local_peer_addr = ctx.local_peer_addr.clone(); + let ctx_discovery = ctx.clone(); tokio::spawn(async move { - run_peer_discovery( - tx_notify_ui_discovery, - peer_game_db_discovery, - local_peer_addr, - ) - .await; + run_peer_discovery(tx_notify_ui_discovery, ctx_discovery).await; }); // Start ping service task @@ -240,5 +239,16 @@ async fn run_peer( } } + let peer_id = ctx.peer_id.as_ref().clone(); + let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; + for peer_addr in peer_addresses { + let peer_id = peer_id.clone(); + tokio::spawn(async move { + if let Err(e) = send_goodbye(peer_addr, peer_id).await { + log::warn!("Failed to send Goodbye to {peer_addr}: {e}"); + } + }); + } + Ok(()) } diff --git a/crates/lanspread-peer/src/library.rs b/crates/lanspread-peer/src/library.rs new file mode 100644 index 0000000..cf0038b --- /dev/null +++ b/crates/lanspread-peer/src/library.rs @@ -0,0 +1,129 @@ +use std::{ + collections::{HashMap, VecDeque}, + hash::{Hash, Hasher}, +}; + +use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot, LibrarySummary}; + +const MAX_DELTA_HISTORY: usize = 8; + +#[derive(Debug, Clone)] +pub struct LocalLibraryState { + pub revision: u64, + pub digest: u64, + pub games: HashMap, + pub recent_deltas: VecDeque, +} + +impl LocalLibraryState { + pub fn empty() -> Self { + Self { + revision: 0, + digest: 0, + games: HashMap::new(), + recent_deltas: VecDeque::new(), + } + } + + pub fn update_from_scan( + &mut self, + summaries: HashMap, + revision: u64, + ) -> Option { + let new_digest = compute_library_digest(&summaries); + let changed = + self.revision != revision || self.digest != new_digest || self.games != summaries; + + if !changed { + return None; + } + + let delta = compute_library_delta(self.revision, revision, &self.games, &summaries); + self.revision = revision; + self.digest = new_digest; + self.games = summaries; + self.recent_deltas.push_back(delta.clone()); + while self.recent_deltas.len() > MAX_DELTA_HISTORY { + self.recent_deltas.pop_front(); + } + Some(delta) + } + + pub fn delta_since(&self, from_rev: u64) -> Option { + self.recent_deltas + .iter() + .find(|delta| delta.from_rev == from_rev) + .cloned() + } +} + +pub fn compute_library_digest(games: &HashMap) -> u64 { + let mut entries: Vec<&GameSummary> = games.values().collect(); + entries.sort_by(|a, b| a.id.cmp(&b.id)); + + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + for summary in entries { + summary.id.hash(&mut hasher); + summary.name.hash(&mut hasher); + summary.size.hash(&mut hasher); + summary.downloaded.hash(&mut hasher); + summary.installed.hash(&mut hasher); + summary.eti_version.hash(&mut hasher); + summary.manifest_hash.hash(&mut hasher); + summary.availability.hash(&mut hasher); + } + hasher.finish() +} + +pub fn build_library_summary(state: &LocalLibraryState) -> LibrarySummary { + LibrarySummary { + library_rev: state.revision, + library_digest: state.digest, + game_count: state.games.len(), + } +} + +pub fn build_library_snapshot(state: &LocalLibraryState) -> LibrarySnapshot { + let mut games: Vec = state.games.values().cloned().collect(); + games.sort_by(|a, b| a.id.cmp(&b.id)); + LibrarySnapshot { + library_rev: state.revision, + games, + } +} + +pub fn compute_library_delta( + from_rev: u64, + to_rev: u64, + previous: &HashMap, + next: &HashMap, +) -> LibraryDelta { + let mut added = Vec::new(); + let mut updated = Vec::new(); + let mut removed = Vec::new(); + + for (game_id, summary) in next { + match previous.get(game_id) { + None => added.push(summary.clone()), + Some(existing) => { + if existing != summary { + updated.push(summary.clone()); + } + } + } + } + + for game_id in previous.keys() { + if !next.contains_key(game_id) { + removed.push(game_id.clone()); + } + } + + LibraryDelta { + from_rev, + to_rev, + added, + updated, + removed, + } +} diff --git a/crates/lanspread-peer/src/local_games.rs b/crates/lanspread-peer/src/local_games.rs index 658c36b..8ed74c4 100644 --- a/crates/lanspread-peer/src/local_games.rs +++ b/crates/lanspread-peer/src/local_games.rs @@ -1,12 +1,16 @@ //! Local game scanning and database management. use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, + hash::{Hash, Hasher}, io::ErrorKind, path::{Path, PathBuf}, + time::{SystemTime, UNIX_EPOCH}, }; use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_proto::{Availability, GameSummary}; +use serde::{Deserialize, Serialize}; use crate::error::PeerError; @@ -72,137 +76,138 @@ pub async fn local_download_available( } // ============================================================================= -// Directory size calculation +// Local library index and scanning // ============================================================================= -/// Calculates the total size of a directory recursively. -pub async fn calculate_directory_size(dir: &Path, is_root: bool) -> eyre::Result { - let mut total_size = 0u64; - let mut entries = tokio::fs::read_dir(dir).await?; +const LIBRARY_INDEX_DIR: &str = ".lanspread"; +const LIBRARY_INDEX_FILE: &str = "library_index.json"; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - - if is_root { - if name_str == ".sync" || name_str == ".softlan_first_start_done" { - continue; - } - if entry.file_type().await?.is_dir() && is_local_dir_name(&name_str) { - continue; - } - } - - let metadata = tokio::fs::metadata(&path).await?; - - if metadata.is_dir() { - total_size += Box::pin(calculate_directory_size(&path, false)).await?; - } else { - total_size += metadata.len(); - } - } - - Ok(total_size) +#[derive(Debug, Clone, Serialize, Deserialize)] +struct LibraryIndex { + revision: u64, + games: HashMap, } -// ============================================================================= -// Game database loading -// ============================================================================= +#[derive(Debug, Clone, Serialize, Deserialize)] +struct GameIndexEntry { + summary: GameSummary, + fingerprint: GameFingerprint, +} -/// Loads the local game database from the game directory. -pub async fn load_local_game_db(game_dir: &str) -> eyre::Result { - let game_path = PathBuf::from(game_dir); +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct GameFingerprint { + eti_size: Option, + eti_mtime: Option, + version_mtime: Option, + local_dir_present: bool, +} - let metadata = match tokio::fs::metadata(&game_path).await { - Ok(metadata) => metadata, +#[derive(Debug, Clone)] +pub struct LocalLibraryScan { + pub game_db: GameDB, + pub summaries: HashMap, + pub revision: u64, +} + +fn library_index_path(game_dir: &str) -> PathBuf { + PathBuf::from(game_dir) + .join(LIBRARY_INDEX_DIR) + .join(LIBRARY_INDEX_FILE) +} + +async fn load_library_index(path: &Path) -> LibraryIndex { + let data = match tokio::fs::read_to_string(path).await { + Ok(data) => data, Err(err) => { - if err.kind() == ErrorKind::NotFound { - log::warn!( - "Local game directory {} missing; reporting empty game database", - game_path.display() - ); - return Ok(GameDB::empty()); + if err.kind() != ErrorKind::NotFound { + log::warn!("Failed to read library index {}: {err}", path.display()); } - return Err(err.into()); + return LibraryIndex { + revision: 0, + games: HashMap::new(), + }; } }; - if !metadata.is_dir() { - log::warn!( - "Configured game directory {} is not a directory; reporting empty game database", - game_path.display() - ); - return Ok(GameDB::empty()); + match serde_json::from_str(&data) { + Ok(index) => index, + Err(err) => { + log::warn!("Failed to parse library index {}: {err}", path.display()); + LibraryIndex { + revision: 0, + games: HashMap::new(), + } + } + } +} + +async fn save_library_index(path: &Path, index: &LibraryIndex) -> eyre::Result<()> { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let data = serde_json::to_vec_pretty(index)?; + tokio::fs::write(path, data).await?; + Ok(()) +} + +fn system_time_to_secs(time: SystemTime) -> u64 { + time.duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +async fn fingerprint_game_dir(game_path: &Path, game_id: &str) -> eyre::Result { + let eti_path = game_path.join(format!("{game_id}.eti")); + let (eti_size, eti_mtime) = match tokio::fs::metadata(&eti_path).await { + Ok(metadata) => ( + Some(metadata.len()), + metadata.modified().ok().map(system_time_to_secs), + ), + Err(_) => (None, None), + }; + + let version_path = game_path.join("version.ini"); + let version_mtime = match tokio::fs::metadata(&version_path).await { + Ok(metadata) => metadata.modified().ok().map(system_time_to_secs), + Err(_) => None, + }; + + let local_dir_present = local_dir_has_content(game_path).await; + + Ok(GameFingerprint { + eti_size, + eti_mtime, + version_mtime, + local_dir_present, + }) +} + +fn should_skip_root_entry(entry: &walkdir::DirEntry) -> bool { + if entry.depth() != 1 { + return false; } - let mut games = Vec::new(); + if entry.file_type().is_dir() && entry.file_name().to_str().is_some_and(is_local_dir_name) { + return true; + } - // Scan game directory and create entries for installed games - let mut entries = tokio::fs::read_dir(&game_path).await?; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() - && let Some(game_id) = path.file_name().and_then(|n| n.to_str()) - { - let eti_path = path.join(format!("{game_id}.eti")); - let downloaded = tokio::fs::metadata(&eti_path).await.is_ok(); - if !downloaded { - continue; - } - - let installed = local_dir_has_content(&path).await; - let local_version = if installed { - match lanspread_db::db::read_version_from_ini(&path) { - Ok(version) => version, - Err(e) => { - log::warn!("Failed to read version.ini for installed game {game_id}: {e}"); - None - } - } - } else { - None - }; - - let size = calculate_directory_size(&path, true).await?; - let game = Game { - id: game_id.to_string(), - name: game_id.to_string(), - description: String::new(), - release_year: String::new(), - publisher: String::new(), - max_players: 1, - version: "1.0".to_string(), - genre: String::new(), - size, - downloaded, - installed, - eti_game_version: local_version.clone(), - local_version, - peer_count: 0, // Local games start with 0 peers - }; - games.push(game); + if let Some(name) = entry.file_name().to_str() { + if entry.file_type().is_dir() && name == ".sync" { + return true; + } + if entry.file_type().is_file() && name == ".softlan_game_installed" { + return true; } } - Ok(GameDB::from(games)) + false } -/// Scans the local games directory and returns a `GameDB` with current games. -pub async fn scan_local_games(game_dir: &str) -> eyre::Result { - load_local_game_db(game_dir).await -} - -// ============================================================================= -// Game file descriptions -// ============================================================================= - -/// Gets file descriptions for a game from the local filesystem. -pub async fn get_game_file_descriptions( +async fn scan_game_descriptions( game_id: &str, - game_dir: &str, + game_dir: &Path, ) -> Result, PeerError> { - let base_dir = PathBuf::from(game_dir); + let base_dir = game_dir; let game_path = base_dir.join(game_id); if !game_path.exists() { @@ -216,30 +221,10 @@ pub async fn get_game_file_descriptions( for entry in walkdir::WalkDir::new(&game_path) .into_iter() - .filter_entry(|entry| { - if entry.depth() == 1 { - if entry.file_type().is_dir() - && entry.file_name().to_str().is_some_and(is_local_dir_name) - { - // Skip the local install folder entirely so WalkDir never enters it. - return false; - } - - if let Some(name) = entry.file_name().to_str() { - if entry.file_type().is_dir() && name == ".sync" { - return false; - } - if entry.file_type().is_file() && name == ".softlan_game_installed" { - return false; - } - } - } - - true - }) + .filter_entry(|entry| !should_skip_root_entry(entry)) .filter_map(std::result::Result::ok) { - let relative_path = match entry.path().strip_prefix(&base_dir) { + let relative_path = match entry.path().strip_prefix(base_dir) { Ok(path) => path.to_string_lossy().to_string(), Err(e) => { log::error!( @@ -279,3 +264,236 @@ pub async fn get_game_file_descriptions( Ok(file_descriptions) } + +fn manifest_hash(file_descriptions: &[GameFileDescription]) -> u64 { + let mut entries: Vec<_> = file_descriptions + .iter() + .filter(|desc| !desc.is_dir) + .map(|desc| (&desc.relative_path, desc.size, desc.is_dir)) + .collect(); + entries.sort_by(|a, b| a.0.cmp(b.0).then(a.1.cmp(&b.1))); + + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + for (path, size, is_dir) in entries { + path.hash(&mut hasher); + size.hash(&mut hasher); + is_dir.hash(&mut hasher); + } + hasher.finish() +} + +async fn build_game_summary(game_dir: &Path, game_id: &str) -> Result { + let game_path = game_dir.join(game_id); + let eti_path = game_path.join(format!("{game_id}.eti")); + let downloaded = tokio::fs::metadata(&eti_path).await.is_ok(); + if !downloaded { + return Err(PeerError::Other(eyre::eyre!( + "Game is not downloaded: {game_id}" + ))); + } + + let installed = local_dir_has_content(&game_path).await; + let eti_version = if installed { + match lanspread_db::db::read_version_from_ini(&game_path) { + Ok(version) => version, + Err(e) => { + log::warn!("Failed to read version.ini for installed game {game_id}: {e}"); + None + } + } + } else { + None + }; + + let file_descriptions = scan_game_descriptions(game_id, game_dir).await?; + let total_size = file_descriptions + .iter() + .filter(|desc| !desc.is_dir) + .map(|desc| desc.size) + .sum(); + let manifest_hash = manifest_hash(&file_descriptions); + + Ok(GameSummary { + id: game_id.to_string(), + name: game_id.to_string(), + size: total_size, + downloaded, + installed, + eti_version, + manifest_hash, + availability: Availability::Ready, + }) +} + +fn game_from_summary(summary: &GameSummary) -> Game { + Game { + id: summary.id.clone(), + name: summary.name.clone(), + description: String::new(), + release_year: String::new(), + publisher: String::new(), + max_players: 1, + version: "1.0".to_string(), + genre: String::new(), + size: summary.size, + downloaded: summary.downloaded, + installed: summary.installed, + eti_game_version: summary.eti_version.clone(), + local_version: summary.eti_version.clone(), + peer_count: 0, + } +} + +struct IndexUpdate { + summary: Option, + changed: bool, +} + +async fn update_index_for_game( + game_root: &Path, + game_id: &str, + index: &mut LibraryIndex, +) -> eyre::Result { + let game_path = game_root.join(game_id); + let fingerprint = fingerprint_game_dir(&game_path, game_id).await?; + + if fingerprint.eti_size.is_none() { + return Ok(IndexUpdate { + summary: None, + changed: index.games.remove(game_id).is_some(), + }); + } + + let mut changed = false; + let summary = match index.games.get(game_id) { + Some(entry) if entry.fingerprint == fingerprint => entry.summary.clone(), + _ => { + changed = true; + build_game_summary(game_root, game_id).await? + } + }; + + if index + .games + .get(game_id) + .is_some_and(|entry| entry.summary.manifest_hash != summary.manifest_hash) + { + changed = true; + } + + index.games.insert( + game_id.to_string(), + GameIndexEntry { + summary: summary.clone(), + fingerprint, + }, + ); + + Ok(IndexUpdate { + summary: Some(summary), + changed, + }) +} + +fn empty_scan() -> LocalLibraryScan { + LocalLibraryScan { + game_db: GameDB::empty(), + summaries: HashMap::new(), + revision: 0, + } +} + +// ============================================================================= +// Game database loading +// ============================================================================= + +/// Scans the local game directory and returns summaries plus a game database. +pub async fn scan_local_library(game_dir: &str) -> eyre::Result { + let game_path = PathBuf::from(game_dir); + + let metadata = match tokio::fs::metadata(&game_path).await { + Ok(metadata) => metadata, + Err(err) => { + if err.kind() == ErrorKind::NotFound { + log::warn!( + "Local game directory {} missing; reporting empty game database", + game_path.display() + ); + return Ok(empty_scan()); + } + return Err(err.into()); + } + }; + + if !metadata.is_dir() { + log::warn!( + "Configured game directory {} is not a directory; reporting empty game database", + game_path.display() + ); + return Ok(empty_scan()); + } + + let index_path = library_index_path(game_dir); + let mut index = load_library_index(&index_path).await; + let mut seen_ids = HashSet::new(); + let mut summaries = HashMap::new(); + let mut games = Vec::new(); + let mut changed = false; + + let mut entries = tokio::fs::read_dir(&game_path).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + let Some(game_id) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + + let update = update_index_for_game(&game_path, game_id, &mut index).await?; + changed |= update.changed; + + let Some(summary) = update.summary else { + continue; + }; + + seen_ids.insert(game_id.to_string()); + summaries.insert(game_id.to_string(), summary.clone()); + games.push(game_from_summary(&summary)); + } + + let before = index.games.len(); + index.games.retain(|game_id, _| seen_ids.contains(game_id)); + if index.games.len() != before { + changed = true; + } + + if changed { + index.revision = index.revision.saturating_add(1); + if let Err(err) = save_library_index(&index_path, &index).await { + log::warn!( + "Failed to persist library index {}: {err}", + index_path.display() + ); + } + } + + Ok(LocalLibraryScan { + game_db: GameDB::from(games), + summaries, + revision: index.revision, + }) +} + +// ============================================================================= +// Game file descriptions +// ============================================================================= + +/// Gets file descriptions for a game from the local filesystem. +pub async fn get_game_file_descriptions( + game_id: &str, + game_dir: &str, +) -> Result, PeerError> { + scan_game_descriptions(game_id, &PathBuf::from(game_dir)).await +} diff --git a/crates/lanspread-peer/src/network.rs b/crates/lanspread-peer/src/network.rs index 7f1c2b4..c115efa 100644 --- a/crates/lanspread-peer/src/network.rs +++ b/crates/lanspread-peer/src/network.rs @@ -9,7 +9,16 @@ use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use if_addrs::{IfAddr, Interface, get_if_addrs}; use lanspread_db::db::{Game, GameFileDescription}; -use lanspread_proto::{Message, Request, Response}; +use lanspread_proto::{ + Hello, + HelloAck, + LibraryDelta, + LibrarySnapshot, + LibrarySummary, + Message, + Request, + Response, +}; use s2n_quic::{Client as QuicClient, Connection, client::Connect, provider::limits::Limits}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; @@ -77,6 +86,43 @@ pub async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { Ok(is_alive) } +/// Sends a single request without waiting for a response. +pub async fn send_oneway_request(peer_addr: SocketAddr, request: Request) -> eyre::Result<()> { + let mut conn = connect_to_peer(peer_addr).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (_, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx.send(request.encode()).await?; + let _ = framed_tx.close().await; + Ok(()) +} + +/// Performs a hello/ack handshake with a peer. +pub async fn exchange_hello(peer_addr: SocketAddr, hello: Hello) -> eyre::Result { + let mut conn = connect_to_peer(peer_addr).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx.send(Request::Hello(hello).encode()).await?; + let _ = framed_tx.close().await; + + let mut data = BytesMut::new(); + while let Some(Ok(bytes)) = framed_rx.next().await { + data.extend_from_slice(&bytes); + } + + let response = Response::decode(data.freeze()); + match response { + Response::HelloAck(ack) => Ok(ack), + other => eyre::bail!("Unexpected response from peer {peer_addr}: {other:?}"), + } +} + /// Fetches the list of games from a peer. pub async fn fetch_games_from_peer(peer_addr: SocketAddr) -> eyre::Result> { let mut conn = connect_to_peer(peer_addr).await?; @@ -107,19 +153,29 @@ pub async fn fetch_games_from_peer(peer_addr: SocketAddr) -> eyre::Result) -> eyre::Result<()> { - let mut conn = connect_to_peer(peer_addr).await?; + send_oneway_request(peer_addr, Request::AnnounceGames(games)).await +} - let stream = conn.open_bidirectional_stream().await?; - let (_, tx) = stream.split(); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); +pub async fn send_library_summary( + peer_addr: SocketAddr, + summary: LibrarySummary, +) -> eyre::Result<()> { + send_oneway_request(peer_addr, Request::LibrarySummary(summary)).await +} - // Send AnnounceGames request - framed_tx - .send(Request::AnnounceGames(games).encode()) - .await?; - let _ = framed_tx.close().await; +pub async fn send_library_snapshot( + peer_addr: SocketAddr, + snapshot: LibrarySnapshot, +) -> eyre::Result<()> { + send_oneway_request(peer_addr, Request::LibrarySnapshot(snapshot)).await +} - Ok(()) +pub async fn send_library_delta(peer_addr: SocketAddr, delta: LibraryDelta) -> eyre::Result<()> { + send_oneway_request(peer_addr, Request::LibraryDelta(delta)).await +} + +pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Result<()> { + send_oneway_request(peer_addr, Request::Goodbye { peer_id }).await } /// Requests game file details from a peer. diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index cdafdd2..1d52009 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -8,16 +8,28 @@ use std::{ }; use lanspread_db::db::{Game, GameFileDescription}; +use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; + +use crate::library::compute_library_digest; +pub type PeerId = String; /// Information about a discovered peer. #[derive(Clone, Debug)] pub struct PeerInfo { + /// Stable peer identifier. + pub peer_id: PeerId, /// Network address of the peer. pub addr: SocketAddr, /// Last time we heard from this peer. pub last_seen: Instant, + /// Latest library revision advertised by the peer. + pub library_rev: u64, + /// Digest of the peer library state. + pub library_digest: u64, + /// Capability flags advertised by the peer. + pub features: Vec, /// Games this peer has available, keyed by game ID. - pub games: HashMap, + pub games: HashMap, /// File descriptions for each game, keyed by game ID. pub files: HashMap>, } @@ -25,7 +37,14 @@ pub struct PeerInfo { /// Database tracking all discovered peers and their games. #[derive(Debug)] pub struct PeerGameDB { - peers: HashMap, + peers: HashMap, + addr_index: HashMap, +} + +#[derive(Debug, Clone, Copy)] +pub struct PeerUpsert { + pub is_new: bool, + pub addr_changed: bool, } impl Default for PeerGameDB { @@ -39,59 +58,205 @@ impl PeerGameDB { pub fn new() -> Self { Self { peers: HashMap::new(), + addr_index: HashMap::new(), } } - /// Adds a new peer to the database. - pub fn add_peer(&mut self, addr: SocketAddr) { + /// Adds a new peer to the database or updates its address. + pub fn upsert_peer(&mut self, peer_id: PeerId, addr: SocketAddr) -> PeerUpsert { + if let Some(existing_id) = self.addr_index.get(&addr).cloned() + && existing_id != peer_id + { + self.peers.remove(&existing_id); + self.addr_index.remove(&addr); + } + + if let Some(peer) = self.peers.get_mut(&peer_id) { + let addr_changed = peer.addr != addr; + if addr_changed { + self.addr_index.remove(&peer.addr); + self.addr_index.insert(addr, peer_id.clone()); + peer.addr = addr; + } + peer.last_seen = Instant::now(); + return PeerUpsert { + is_new: false, + addr_changed, + }; + } + let peer_info = PeerInfo { + peer_id: peer_id.clone(), addr, last_seen: Instant::now(), + library_rev: 0, + library_digest: 0, + features: Vec::new(), games: HashMap::new(), files: HashMap::new(), }; - self.peers.insert(addr, peer_info); + self.peers.insert(peer_id.clone(), peer_info); + self.addr_index.insert(addr, peer_id); log::info!("Added peer: {addr}"); + PeerUpsert { + is_new: true, + addr_changed: false, + } } - /// Removes a peer from the database. - pub fn remove_peer(&mut self, addr: &SocketAddr) -> Option { - self.peers.remove(addr) + /// Removes a peer from the database by id. + pub fn remove_peer(&mut self, peer_id: &PeerId) -> Option { + if let Some(peer) = self.peers.remove(peer_id) { + self.addr_index.remove(&peer.addr); + return Some(peer); + } + None + } + + /// Removes a peer by address. + pub fn remove_peer_by_addr(&mut self, addr: &SocketAddr) -> Option { + let peer_id = self.addr_index.remove(addr)?; + self.peers.remove(&peer_id) + } + + /// Returns the peer id for an address if known. + #[must_use] + pub fn peer_id_for_addr(&self, addr: &SocketAddr) -> Option<&PeerId> { + self.addr_index.get(addr) + } + + /// Returns the library state for a peer if known. + #[must_use] + pub fn peer_library_state(&self, peer_id: &PeerId) -> Option<(u64, u64)> { + self.peers + .get(peer_id) + .map(|peer| (peer.library_rev, peer.library_digest)) + } + + /// Returns the number of games known for a peer. + #[must_use] + pub fn peer_game_count(&self, peer_id: &PeerId) -> usize { + self.peers.get(peer_id).map_or(0, |peer| peer.games.len()) + } + + /// Returns the feature list for a peer. + #[must_use] + pub fn peer_features(&self, peer_id: &PeerId) -> Vec { + self.peers + .get(peer_id) + .map(|peer| peer.features.clone()) + .unwrap_or_default() + } + + /// Returns the address for a peer id. + #[must_use] + pub fn peer_addr(&self, peer_id: &PeerId) -> Option { + self.peers.get(peer_id).map(|peer| peer.addr) } /// Updates the games list for a peer. - pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec) { - if let Some(peer) = self.peers.get_mut(&addr) { + pub fn update_peer_games(&mut self, peer_id: &PeerId, games: Vec) { + if let Some(peer) = self.peers.get_mut(peer_id) { let mut map = HashMap::with_capacity(games.len()); for game in games { map.insert(game.id.clone(), game); } peer.games = map; peer.last_seen = Instant::now(); - log::info!("Updated games for peer: {addr}"); + log::info!("Updated games for peer: {}", peer.addr); } } /// Updates the file descriptions for a specific game from a peer. pub fn update_peer_game_files( &mut self, - addr: SocketAddr, + peer_id: &PeerId, game_id: &str, files: Vec, ) { - if let Some(peer) = self.peers.get_mut(&addr) { + if let Some(peer) = self.peers.get_mut(peer_id) { peer.files.insert(game_id.to_string(), files); peer.last_seen = Instant::now(); } } /// Updates the last seen timestamp for a peer. - pub fn update_last_seen(&mut self, addr: &SocketAddr) { - if let Some(peer) = self.peers.get_mut(addr) { + pub fn update_last_seen(&mut self, peer_id: &PeerId) { + if let Some(peer) = self.peers.get_mut(peer_id) { peer.last_seen = Instant::now(); } } + /// Updates the last seen timestamp for a peer by address. + pub fn update_last_seen_by_addr(&mut self, addr: &SocketAddr) { + if let Some(peer_id) = self.addr_index.get(addr).cloned() + && let Some(peer) = self.peers.get_mut(&peer_id) + { + peer.last_seen = Instant::now(); + } + } + + /// Updates the library metadata for a peer. + pub fn update_peer_library( + &mut self, + peer_id: &PeerId, + library_rev: u64, + library_digest: u64, + features: Vec, + ) { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.library_rev = library_rev; + peer.library_digest = library_digest; + peer.features = features; + peer.last_seen = Instant::now(); + } + } + + /// Applies a full library snapshot for a peer. + pub fn apply_library_snapshot(&mut self, peer_id: &PeerId, snapshot: LibrarySnapshot) { + if let Some(peer) = self.peers.get_mut(peer_id) { + let mut map = HashMap::with_capacity(snapshot.games.len()); + for game in snapshot.games { + map.insert(game.id.clone(), game); + } + let digest = compute_library_digest(&map); + peer.games = map; + peer.library_rev = snapshot.library_rev; + peer.library_digest = digest; + peer.last_seen = Instant::now(); + } + } + + /// Applies a library delta for a peer. Returns true when applied. + pub fn apply_library_delta(&mut self, peer_id: &PeerId, delta: LibraryDelta) -> bool { + let Some(peer) = self.peers.get_mut(peer_id) else { + return false; + }; + + if delta.to_rev <= peer.library_rev { + return false; + } + + if delta.from_rev != peer.library_rev { + return false; + } + + for game in delta.added { + peer.games.insert(game.id.clone(), game); + } + for game in delta.updated { + peer.games.insert(game.id.clone(), game); + } + for game_id in delta.removed { + peer.games.remove(&game_id); + } + + peer.library_rev = delta.to_rev; + peer.library_digest = compute_library_digest(&peer.games); + peer.last_seen = Instant::now(); + true + } + /// Returns all games aggregated from all peers. #[must_use] pub fn get_all_games(&self) -> Vec { @@ -112,19 +277,27 @@ impl PeerGameDB { .entry(game.id.clone()) .and_modify(|existing| { if let (Some(new_version), Some(current)) = - (&game.eti_game_version, &existing.eti_game_version) + (&game.eti_version, &existing.eti_game_version) { if new_version > current { existing.eti_game_version = Some(new_version.clone()); } } else if existing.eti_game_version.is_none() { - existing.eti_game_version.clone_from(&game.eti_game_version); + existing.eti_game_version.clone_from(&game.eti_version); } - // Update peer count existing.peer_count = peer_counts[&game.id]; + if game.size > existing.size { + existing.size = game.size; + } + if game.downloaded { + existing.downloaded = true; + } + if game.installed { + existing.installed = true; + } }) .or_insert_with(|| { - let mut game_clone = game.clone(); + let mut game_clone = summary_to_game(game); game_clone.peer_count = peer_counts[&game.id]; game_clone }); @@ -143,7 +316,7 @@ impl PeerGameDB { for peer in self.peers.values() { if let Some(game) = peer.games.get(game_id) - && let Some(ref version) = game.eti_game_version + && let Some(ref version) = game.eti_version { match &latest_version { None => latest_version = Some(version.clone()), @@ -162,13 +335,37 @@ impl PeerGameDB { /// Returns all peer addresses. #[must_use] pub fn get_peer_addresses(&self) -> Vec { - self.peers.keys().copied().collect() + self.peers.values().map(|peer| peer.addr).collect() + } + + /// Returns peer liveness info for ping scheduling. + #[must_use] + pub fn peer_liveness_snapshot(&self) -> Vec<(PeerId, SocketAddr, Instant)> { + self.peers + .values() + .map(|peer| (peer.peer_id.clone(), peer.addr, peer.last_seen)) + .collect() + } + + /// Returns peer ids with their current addresses. + #[must_use] + pub fn peer_identities(&self) -> Vec<(PeerId, SocketAddr)> { + self.peers + .values() + .map(|peer| (peer.peer_id.clone(), peer.addr)) + .collect() } /// Checks if a peer is in the database. #[must_use] - pub fn contains_peer(&self, addr: &SocketAddr) -> bool { - self.peers.contains_key(addr) + pub fn contains_peer(&self, peer_id: &PeerId) -> bool { + self.peers.contains_key(peer_id) + } + + /// Checks if a peer address is in the database. + #[must_use] + pub fn contains_peer_addr(&self, addr: &SocketAddr) -> bool { + self.addr_index.contains_key(addr) } /// Returns addresses of peers that have a specific game. @@ -177,7 +374,7 @@ impl PeerGameDB { self.peers .iter() .filter(|(_, peer)| peer.games.contains_key(game_id)) - .map(|(addr, _)| *addr) + .map(|(_, peer)| peer.addr) .collect() } @@ -191,7 +388,7 @@ impl PeerGameDB { .iter() .filter(|(_, peer)| { if let Some(game) = peer.games.get(game_id) { - if let Some(ref version) = game.eti_game_version { + if let Some(ref version) = game.eti_version { version == latest } else { false @@ -200,7 +397,7 @@ impl PeerGameDB { false } }) - .map(|(addr, _)| *addr) + .map(|(_, peer)| peer.addr) .collect() } else { // If no version info is available, fall back to all peers with the game @@ -213,7 +410,12 @@ impl PeerGameDB { pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec)> { self.peers .iter() - .filter_map(|(addr, peer)| peer.files.get(game_id).cloned().map(|files| (*addr, files))) + .filter_map(|(_, peer)| { + peer.files + .get(game_id) + .cloned() + .map(|files| (peer.addr, files)) + }) .collect() } @@ -373,10 +575,8 @@ impl PeerGameDB { peers: &[SocketAddr], ) -> Option { if let Some(first_peer) = peers.first() - && let Some(files) = self - .peers - .get(first_peer) - .and_then(|p| p.files.get(game_id)) + && let Some(peer_id) = self.addr_index.get(first_peer) + && let Some(files) = self.peers.get(peer_id).and_then(|p| p.files.get(game_id)) && let Some(file_desc) = files .iter() .find(|f| f.relative_path == relative_path && f.size == size) @@ -390,9 +590,19 @@ impl PeerGameDB { #[must_use] pub fn get_stale_peers(&self, timeout: Duration) -> Vec { self.peers - .iter() - .filter(|(_, peer)| peer.last_seen.elapsed() > timeout) - .map(|(addr, _)| *addr) + .values() + .filter(|peer| peer.last_seen.elapsed() > timeout) + .map(|peer| peer.addr) + .collect() + } + + /// Returns stale peer ids that exceeded the timeout. + #[must_use] + pub fn get_stale_peer_ids(&self, timeout: Duration) -> Vec { + self.peers + .values() + .filter(|peer| peer.last_seen.elapsed() > timeout) + .map(|peer| peer.peer_id.clone()) .collect() } } @@ -500,3 +710,22 @@ fn create_peer_whitelist(peer_scores: HashMap) -> Vec Game { + Game { + id: summary.id.clone(), + name: summary.name.clone(), + description: String::new(), + release_year: String::new(), + publisher: String::new(), + max_players: 1, + version: "1.0".to_string(), + genre: String::new(), + size: summary.size, + downloaded: summary.downloaded, + installed: summary.installed, + eti_game_version: summary.eti_version.clone(), + local_version: None, + peer_count: 0, + } +} diff --git a/crates/lanspread-peer/src/services.rs b/crates/lanspread-peer/src/services.rs index 7497300..a50c71e 100644 --- a/crates/lanspread-peer/src/services.rs +++ b/crates/lanspread-peer/src/services.rs @@ -10,15 +10,23 @@ use std::{ use futures::{SinkExt, StreamExt}; use lanspread_db::db::Game; -use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser}; -use lanspread_proto::{Message, Request, Response}; +use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser, MdnsService}; +use lanspread_proto::{ + Hello, + HelloAck, + LibraryDelta, + LibrarySnapshot, + Message, + PROTOCOL_VERSION, + Request, + Response, +}; use s2n_quic::{Connection, Server, provider::limits::Limits, stream::BidirectionalStream}; use tokio::{ sync::{RwLock, mpsc::UnboundedSender}, task::JoinHandle, }; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; -use uuid::Uuid; use crate::{ PeerEvent, @@ -32,10 +40,25 @@ use crate::{ context::{Ctx, PeerCtx}, error::PeerError, handlers::{emit_peer_game_list, update_and_announce_games}, - local_games::{get_game_file_descriptions, scan_local_games}, - network::{fetch_games_from_peer, ping_peer, select_advertise_ip}, + identity::default_features, + library::{ + LocalLibraryState, + build_library_snapshot, + build_library_summary, + compute_library_digest, + }, + local_games::{get_game_file_descriptions, scan_local_library}, + network::{ + exchange_hello, + fetch_games_from_peer, + ping_peer, + select_advertise_ip, + send_library_delta, + send_library_snapshot, + send_library_summary, + }, peer::{send_game_file_chunk, send_game_file_data}, - peer_db::PeerGameDB, + peer_db::{PeerGameDB, PeerId, PeerUpsert}, }; // ============================================================================= @@ -70,7 +93,7 @@ pub async fn run_server_component( } // Start mDNS advertising for peer discovery - let peer_id = Uuid::now_v7().simple().to_string(); + let peer_id = ctx.peer_id.as_ref().clone(); let hostname = gethostname::gethostname(); let hostname_str = hostname.to_str().unwrap_or(""); @@ -83,13 +106,32 @@ pub async fn run_server_component( }; let combined_str = if truncated_hostname.is_empty() { - peer_id + peer_id.clone() } else { format!("{truncated_hostname}-{peer_id}") }; + let (library_rev, library_digest) = { + let library_guard = ctx.local_library.read().await; + (library_guard.revision, library_guard.digest) + }; + + let mut properties = HashMap::new(); + properties.insert("peer_id".to_string(), peer_id.clone()); + properties.insert("proto_ver".to_string(), PROTOCOL_VERSION.to_string()); + properties.insert("library_rev".to_string(), library_rev.to_string()); + properties.insert("library_digest".to_string(), library_digest.to_string()); + if !hostname_str.is_empty() { + properties.insert("hostname".to_string(), hostname_str.to_string()); + } + let mdns = tokio::task::spawn_blocking(move || { - MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, advertise_addr) + MdnsAdvertiser::new( + LANSPREAD_SERVICE_TYPE, + &combined_str, + advertise_addr, + Some(properties), + ) }) .await??; @@ -157,6 +199,279 @@ async fn handle_peer_connection( Ok(()) } +enum LibraryUpdate { + Delta(LibraryDelta), + Snapshot(LibrarySnapshot), +} + +async fn build_hello_from_state( + peer_id: &str, + local_library: &Arc>, +) -> Hello { + let library_guard = local_library.read().await; + Hello { + peer_id: peer_id.to_string(), + proto_ver: PROTOCOL_VERSION, + library_rev: library_guard.revision, + library_digest: library_guard.digest, + features: default_features(), + } +} + +async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck { + let library_guard = ctx.local_library.read().await; + HelloAck { + peer_id: ctx.peer_id.as_ref().clone(), + proto_ver: PROTOCOL_VERSION, + library_rev: library_guard.revision, + library_digest: library_guard.digest, + features: default_features(), + } +} + +async fn select_library_update( + local_library: &Arc>, + remote_rev: u64, + remote_digest: u64, +) -> Option { + let library_guard = local_library.read().await; + if library_guard.digest == remote_digest { + return None; + } + + if remote_rev > library_guard.revision { + return None; + } + + if let Some(delta) = library_guard.delta_since(remote_rev) { + return Some(LibraryUpdate::Delta(delta)); + } + + Some(LibraryUpdate::Snapshot(build_library_snapshot( + &library_guard, + ))) +} + +async fn ensure_peer_id_for_addr( + peer_game_db: &Arc>, + peer_addr: SocketAddr, +) -> PeerId { + let mut db = peer_game_db.write().await; + if let Some(peer_id) = db.peer_id_for_addr(&peer_addr).cloned() { + return peer_id; + } + + let legacy_id = format!("legacy-{peer_addr}"); + db.upsert_peer(legacy_id.clone(), peer_addr); + legacy_id +} + +fn summary_from_game(game: &Game) -> lanspread_proto::GameSummary { + lanspread_proto::GameSummary { + id: game.id.clone(), + name: game.name.clone(), + size: game.size, + downloaded: game.downloaded, + installed: game.installed, + eti_version: game.eti_game_version.clone(), + manifest_hash: 0, + availability: lanspread_proto::Availability::Ready, + } +} + +async fn perform_handshake_with_peer( + peer_id: Arc, + local_library: Arc>, + peer_game_db: Arc>, + tx_notify_ui: UnboundedSender, + peer_addr: SocketAddr, + peer_id_hint: Option, +) -> eyre::Result<()> { + let hello = build_hello_from_state(peer_id.as_ref(), &local_library).await; + let ack = exchange_hello(peer_addr, hello).await?; + + if ack.proto_ver != PROTOCOL_VERSION { + log::warn!( + "Peer {peer_addr} uses incompatible protocol {} (expected {PROTOCOL_VERSION})", + ack.proto_ver + ); + return Ok(()); + } + + if ack.peer_id == *peer_id { + log::trace!("Ignoring handshake with self for {peer_addr}"); + return Ok(()); + } + + if let Some(expected) = peer_id_hint.as_ref() + && expected != &ack.peer_id + { + log::warn!( + "Peer {peer_addr} id mismatch: mDNS advertised {expected}, hello ack returned {}", + ack.peer_id + ); + let _ = peer_game_db.write().await.remove_peer(expected); + } + + let upsert = { + let mut db = peer_game_db.write().await; + let upsert = db.upsert_peer(ack.peer_id.clone(), peer_addr); + db.update_peer_library( + &ack.peer_id, + ack.library_rev, + ack.library_digest, + ack.features.clone(), + ); + upsert + }; + + if upsert.is_new { + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { + log::error!("Failed to send PeerDiscovered event: {e}"); + } + + let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() }; + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) { + log::error!("Failed to send PeerCountUpdated event: {e}"); + } + + let summary = { + let library_guard = local_library.read().await; + build_library_summary(&library_guard) + }; + tokio::spawn(async move { + if let Err(e) = send_library_summary(peer_addr, summary).await { + log::warn!("Failed to send library summary to {peer_addr}: {e}"); + } + }); + } + + if let Some(update) = + select_library_update(&local_library, ack.library_rev, ack.library_digest).await + { + tokio::spawn(async move { + let result = match update { + LibraryUpdate::Delta(delta) => send_library_delta(peer_addr, delta).await, + LibraryUpdate::Snapshot(snapshot) => { + send_library_snapshot(peer_addr, snapshot).await + } + }; + + if let Err(e) = result { + log::warn!("Failed to send library update to {peer_addr}: {e}"); + } + }); + } + + Ok(()) +} + +struct MdnsPeerInfo { + addr: SocketAddr, + peer_id: Option, + proto_ver: Option, + library_rev: u64, + library_digest: u64, +} + +fn parse_mdns_peer(service: &MdnsService) -> MdnsPeerInfo { + let peer_id = service.properties.get("peer_id").cloned(); + let proto_ver = service + .properties + .get("proto_ver") + .and_then(|value| value.parse::().ok()); + let library_rev = service + .properties + .get("library_rev") + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + let library_digest = service + .properties + .get("library_digest") + .and_then(|value| value.parse::().ok()) + .unwrap_or(0); + + MdnsPeerInfo { + addr: service.addr, + peer_id, + proto_ver, + library_rev, + library_digest, + } +} + +async fn is_self_advertisement(info: &MdnsPeerInfo, ctx: &Ctx) -> bool { + let guard = ctx.local_peer_addr.read().await; + guard.as_ref().is_some_and(|addr| *addr == info.addr) + || info + .peer_id + .as_ref() + .is_some_and(|peer_id| peer_id == ctx.peer_id.as_ref()) +} + +async fn handle_discovered_peer( + info: MdnsPeerInfo, + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, +) { + let peer_id = info + .peer_id + .unwrap_or_else(|| format!("legacy-{}", info.addr)); + let upsert = { + let mut db = ctx.peer_game_db.write().await; + let upsert = db.upsert_peer(peer_id.clone(), info.addr); + let features = db.peer_features(&peer_id); + if info.library_rev > 0 || info.library_digest > 0 { + db.update_peer_library(&peer_id, info.library_rev, info.library_digest, features); + } + upsert + }; + + if upsert.is_new { + log::info!("Discovered peer at: {}", info.addr); + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(info.addr)) { + log::error!("Failed to send PeerDiscovered event: {e}"); + } + + let current_peer_count = ctx.peer_game_db.read().await.get_peer_addresses().len(); + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) { + log::error!("Failed to send PeerCountUpdated event: {e}"); + } + } + + if upsert.is_new || upsert.addr_changed { + let peer_id_arc = ctx.peer_id.clone(); + let local_library = ctx.local_library.clone(); + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui_clone = tx_notify_ui.clone(); + let peer_id_hint = Some(peer_id.clone()); + + tokio::spawn(async move { + let handshake_result = + if info.proto_ver.is_none() || info.proto_ver == Some(PROTOCOL_VERSION) { + perform_handshake_with_peer( + peer_id_arc, + local_library, + peer_game_db.clone(), + tx_notify_ui_clone.clone(), + info.addr, + peer_id_hint.clone(), + ) + .await + } else { + Err(eyre::eyre!("Skipping hello for legacy peer")) + }; + + if handshake_result.is_err() + && let Err(e) = + request_games_from_peer(info.addr, tx_notify_ui_clone, peer_game_db, 0).await + { + log::error!("Failed to request games from peer {}: {e}", info.addr); + } + }); + } +} + /// Handles a bidirectional stream from a peer. #[allow(clippy::too_many_lines)] async fn handle_peer_stream( @@ -183,22 +498,124 @@ async fn handle_peer_stream( let request = Request::decode(data.freeze()); log::debug!("{remote_addr:?} msg: {request:?}"); + if let Some(addr) = remote_addr { + ctx.peer_game_db + .write() + .await + .update_last_seen_by_addr(&addr); + } + match request { Request::Ping => { - // Respond with pong if let Err(e) = framed_tx.send(Response::Pong.encode()).await { log::error!("Failed to send pong: {e}"); } } + Request::Hello(hello) => { + if hello.peer_id == *ctx.peer_id { + log::trace!("Ignoring hello from self"); + let ack = build_hello_ack(&ctx).await; + if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await { + log::error!("Failed to send HelloAck: {e}"); + } + continue; + } + + if hello.proto_ver != PROTOCOL_VERSION { + log::warn!( + "Incompatible protocol from {remote_addr:?}: {}", + hello.proto_ver + ); + let ack = build_hello_ack(&ctx).await; + if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await { + log::error!("Failed to send HelloAck: {e}"); + } + continue; + } + + let upsert = if let Some(addr) = remote_addr { + let mut db = ctx.peer_game_db.write().await; + let upsert = db.upsert_peer(hello.peer_id.clone(), addr); + db.update_peer_library( + &hello.peer_id, + hello.library_rev, + hello.library_digest, + hello.features.clone(), + ); + upsert + } else { + PeerUpsert { + is_new: false, + addr_changed: false, + } + }; + + if upsert.is_new + && let Some(addr) = remote_addr + { + if let Err(e) = ctx.tx_notify_ui.send(PeerEvent::PeerDiscovered(addr)) { + log::error!("Failed to send PeerDiscovered event: {e}"); + } + + let current_peer_count = + { ctx.peer_game_db.read().await.get_peer_addresses().len() }; + if let Err(e) = ctx + .tx_notify_ui + .send(PeerEvent::PeerCountUpdated(current_peer_count)) + { + log::error!("Failed to send PeerCountUpdated event: {e}"); + } + } + + let ack = build_hello_ack(&ctx).await; + if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await { + log::error!("Failed to send HelloAck: {e}"); + } + + if let Some(addr) = remote_addr { + if upsert.is_new { + let summary = { + let library_guard = ctx.local_library.read().await; + build_library_summary(&library_guard) + }; + tokio::spawn(async move { + if let Err(e) = send_library_summary(addr, summary).await { + log::warn!("Failed to send library summary to {addr}: {e}"); + } + }); + } + + if let Some(update) = select_library_update( + &ctx.local_library, + hello.library_rev, + hello.library_digest, + ) + .await + { + tokio::spawn(async move { + let result = match update { + LibraryUpdate::Delta(delta) => { + send_library_delta(addr, delta).await + } + LibraryUpdate::Snapshot(snapshot) => { + send_library_snapshot(addr, snapshot).await + } + }; + + if let Err(e) = result { + log::warn!("Failed to send library update to {addr}: {e}"); + } + }); + } + } + } Request::ListGames => { - // Return list of games from this peer log::info!("Received ListGames request from peer"); let snapshot = { let db_guard = ctx.local_game_db.read().await; if let Some(ref db) = *db_guard { db.all_games().into_iter().cloned().collect::>() } else { - // Local database not loaded yet, return empty result log::info!( "Local game database not yet loaded, responding with empty game list" ); @@ -219,6 +636,94 @@ async fn handle_peer_stream( log::error!("Failed to send ListGames response: {e}"); } } + Request::LibrarySummary(summary) => { + if let Some(addr) = remote_addr { + let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await; + let (previous_digest, previous_count, features) = { + let db = ctx.peer_game_db.read().await; + let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0)); + ( + digest, + db.peer_game_count(&peer_id), + db.peer_features(&peer_id), + ) + }; + + { + let mut db = ctx.peer_game_db.write().await; + db.update_peer_library( + &peer_id, + summary.library_rev, + summary.library_digest, + features, + ); + } + + if summary.library_digest != previous_digest || previous_count == 0 { + let peer_id_arc = ctx.peer_id.clone(); + let local_library = ctx.local_library.clone(); + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui = ctx.tx_notify_ui.clone(); + tokio::spawn(async move { + if let Err(e) = perform_handshake_with_peer( + peer_id_arc, + local_library, + peer_game_db, + tx_notify_ui, + addr, + Some(peer_id), + ) + .await + { + log::warn!("Failed to refresh library from {addr}: {e}"); + } + }); + } + } + } + Request::LibrarySnapshot(snapshot) => { + if let Some(addr) = remote_addr { + let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await; + { + let mut db = ctx.peer_game_db.write().await; + db.apply_library_snapshot(&peer_id, snapshot); + } + + emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + } + } + Request::LibraryDelta(delta) => { + if let Some(addr) = remote_addr { + let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await; + let applied = { + let mut db = ctx.peer_game_db.write().await; + db.apply_library_delta(&peer_id, delta) + }; + + if applied { + emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + } else { + let peer_id_arc = ctx.peer_id.clone(); + let local_library = ctx.local_library.clone(); + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui = ctx.tx_notify_ui.clone(); + tokio::spawn(async move { + if let Err(e) = perform_handshake_with_peer( + peer_id_arc, + local_library, + peer_game_db, + tx_notify_ui, + addr, + Some(peer_id), + ) + .await + { + log::warn!("Failed to resync library from {addr}: {e}"); + } + }); + } + } + } Request::GetGame { id } => { log::info!("Received GetGame request for {id} from peer"); let downloading = ctx.downloading_games.read().await.contains(&id); @@ -274,10 +779,8 @@ async fn handle_peer_stream( let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); - // For file data, we need the raw stream, so we unwrap the FramedWrite let mut tx = framed_tx.into_inner(); send_game_file_data(&desc, &mut tx, &base_dir).await; - // Re-wrap for next iteration (though usually stream closes after file transfer) framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); } else if let Err(e) = framed_tx .send( @@ -305,7 +808,6 @@ async fn handle_peer_stream( let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); - // For file data, we need the raw stream, so we unwrap the FramedWrite let mut tx = framed_tx.into_inner(); send_game_file_chunk( &game_id, @@ -316,7 +818,6 @@ async fn handle_peer_stream( &base_dir, ) .await; - // Re-wrap for next iteration framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); } else if let Err(e) = framed_tx .send( @@ -331,6 +832,28 @@ async fn handle_peer_stream( log::error!("Failed to send GetGameFileChunk error: {e}"); } } + Request::Goodbye { peer_id } => { + log::info!("Received Goodbye from peer {peer_id}"); + let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) }; + if removed.is_some() { + if let Some(addr) = remote_addr { + if let Err(e) = ctx.tx_notify_ui.send(PeerEvent::PeerLost(addr)) { + log::error!("Failed to send PeerLost event: {e}"); + } + + let current_peer_count = + { ctx.peer_game_db.read().await.get_peer_addresses().len() }; + if let Err(e) = ctx + .tx_notify_ui + .send(PeerEvent::PeerCountUpdated(current_peer_count)) + { + log::error!("Failed to send PeerCountUpdated event: {e}"); + } + } + + emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + } + } Request::Invalid(_, _) => { log::error!("Received invalid request from peer"); } @@ -340,9 +863,18 @@ async fn handle_peer_stream( games.len() ); if let Some(addr) = remote_addr { + let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await; + let summaries: Vec<_> = games.iter().map(summary_from_game).collect(); + let mut map = HashMap::with_capacity(summaries.len()); + for summary in &summaries { + map.insert(summary.id.clone(), summary.clone()); + } + let digest = compute_library_digest(&map); let aggregated_games = { let mut db = ctx.peer_game_db.write().await; - db.update_peer_games(addr, games); + db.update_peer_games(&peer_id, summaries); + let features = db.peer_features(&peer_id); + db.update_peer_library(&peer_id, 0, digest, features); db.get_all_games() }; @@ -376,24 +908,20 @@ async fn handle_peer_stream( // ============================================================================= /// Runs the peer discovery service using mDNS. -pub async fn run_peer_discovery( - tx_notify_ui: UnboundedSender, - peer_game_db: Arc>, - local_peer_addr: Arc>>, -) { +pub async fn run_peer_discovery(tx_notify_ui: UnboundedSender, ctx: Ctx) { log::info!("Starting peer discovery task"); let service_type = LANSPREAD_SERVICE_TYPE.to_string(); loop { - let (addr_tx, mut addr_rx) = tokio::sync::mpsc::unbounded_channel(); + let (service_tx, mut service_rx) = tokio::sync::mpsc::unbounded_channel(); let service_type_clone = service_type.clone(); let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> { let browser = MdnsBrowser::new(&service_type_clone)?; loop { - if let Some(addr) = browser.next_address(None)? { - if addr_tx.send(addr).is_err() { + if let Some(service) = browser.next_service(None)? { + if service_tx.send(service).is_err() { log::debug!("Peer discovery consumer dropped; stopping worker"); break; } @@ -405,55 +933,14 @@ pub async fn run_peer_discovery( Ok(()) }); - while let Some(peer_addr) = addr_rx.recv().await { - let is_self = { - let guard = local_peer_addr.read().await; - guard.as_ref().is_some_and(|addr| *addr == peer_addr) - }; - - if is_self { - log::trace!("Ignoring self advertisement at {peer_addr}"); + while let Some(service) = service_rx.recv().await { + let info = parse_mdns_peer(&service); + if is_self_advertisement(&info, &ctx).await { + log::trace!("Ignoring self advertisement at {}", info.addr); continue; } - let is_new_peer = { - let mut db = peer_game_db.write().await; - if db.contains_peer(&peer_addr) { - db.update_last_seen(&peer_addr); - false - } else { - db.add_peer(peer_addr); - true - } - }; - - if is_new_peer { - log::info!("Discovered peer at: {peer_addr}"); - - if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { - log::error!("Failed to send PeerDiscovered event: {e}"); - } - - let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() }; - if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) { - log::error!("Failed to send PeerCountUpdated event: {e}"); - } - - let tx_notify_ui_clone = tx_notify_ui.clone(); - let peer_game_db_clone = peer_game_db.clone(); - tokio::spawn(async move { - if let Err(e) = request_games_from_peer( - peer_addr, - tx_notify_ui_clone, - peer_game_db_clone, - 0, - ) - .await - { - log::error!("Failed to request games from peer {peer_addr}: {e}"); - } - }); - } + handle_discovered_peer(info, &ctx, &tx_notify_ui).await; } match worker_handle.await { @@ -491,9 +978,21 @@ async fn request_games_from_peer( continue; } + let mut map = HashMap::with_capacity(games.len()); + let mut summaries = Vec::with_capacity(games.len()); + for game in &games { + let summary = summary_from_game(game); + map.insert(summary.id.clone(), summary.clone()); + summaries.push(summary); + } + let digest = compute_library_digest(&map); + let peer_id = ensure_peer_id_for_addr(&peer_game_db, peer_addr).await; + let aggregated_games = { let mut db = peer_game_db.write().await; - db.update_peer_games(peer_addr, games); + db.update_peer_games(&peer_id, summaries); + let features = db.peer_features(&peer_id); + db.update_peer_library(&peer_id, 0, digest, features); db.get_all_games() }; @@ -521,7 +1020,8 @@ pub async fn run_ping_service( ) { log::info!( "Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \ -{}s timeout)", +{}s idle threshold, {}s timeout)", + crate::config::PEER_PING_IDLE_SECS, peer_stale_timeout().as_secs() ); @@ -530,9 +1030,13 @@ pub async fn run_ping_service( loop { interval.tick().await; - let peer_addresses = { peer_game_db.read().await.get_peer_addresses() }; + let peer_snapshots = { peer_game_db.read().await.peer_liveness_snapshot() }; + + for (peer_id, peer_addr, last_seen) in peer_snapshots { + if last_seen.elapsed() < Duration::from_secs(crate::config::PEER_PING_IDLE_SECS) { + continue; + } - for peer_addr in peer_addresses { let tx_notify_ui_clone = tx_notify_ui.clone(); let peer_game_db_clone = peer_game_db.clone(); let downloading_games_clone = downloading_games.clone(); @@ -542,26 +1046,19 @@ pub async fn run_ping_service( match ping_peer(peer_addr).await { Ok(is_alive) => { if is_alive { - // Update last seen time - peer_game_db_clone - .write() - .await - .update_last_seen(&peer_addr); + peer_game_db_clone.write().await.update_last_seen(&peer_id); } else { log::warn!("Peer {peer_addr} failed ping check"); - - // Remove stale peer let removed_peer = - peer_game_db_clone.write().await.remove_peer(&peer_addr); - if removed_peer.is_some() { - log::info!("Removed stale peer: {peer_addr}"); + peer_game_db_clone.write().await.remove_peer(&peer_id); + if let Some(peer) = removed_peer { + log::info!("Removed stale peer: {}", peer.addr); if let Err(e) = - tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) + tx_notify_ui_clone.send(PeerEvent::PeerLost(peer.addr)) { log::error!("Failed to send PeerLost event: {e}"); } - // Send updated peer count let current_peer_count = { peer_game_db_clone.read().await.get_peer_addresses().len() }; if let Err(e) = tx_notify_ui_clone @@ -583,17 +1080,14 @@ pub async fn run_ping_service( } Err(e) => { log::error!("Failed to ping peer {peer_addr}: {e}"); - - // Remove peer on error - let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr); - if removed_peer.is_some() { - log::info!("Removed peer due to ping error: {peer_addr}"); - if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) + let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_id); + if let Some(peer) = removed_peer { + log::info!("Removed peer due to ping error: {}", peer.addr); + if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer.addr)) { log::error!("Failed to send PeerLost event: {e}"); } - // Send updated peer count let current_peer_count = { peer_game_db_clone.read().await.get_peer_addresses().len() }; if let Err(e) = tx_notify_ui_clone @@ -621,14 +1115,14 @@ pub async fn run_ping_service( peer_game_db .read() .await - .get_stale_peers(peer_stale_timeout()) + .get_stale_peer_ids(peer_stale_timeout()) }; let mut removed_any = false; - for stale_addr in stale_peers { - let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr); - if removed_peer.is_some() { - log::info!("Removed stale peer: {stale_addr}"); - if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) { + for stale_peer_id in stale_peers { + let removed_peer = peer_game_db.write().await.remove_peer(&stale_peer_id); + if let Some(peer) = removed_peer { + log::info!("Removed stale peer: {}", peer.addr); + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(peer.addr)) { log::error!("Failed to send PeerLost event: {e}"); } @@ -718,9 +1212,9 @@ pub async fn run_local_game_monitor(tx_notify_ui: UnboundedSender, ct }; if let Some(ref game_dir) = game_dir { - match scan_local_games(game_dir).await { - Ok(current_games) => { - update_and_announce_games(&ctx, &tx_notify_ui, current_games).await; + match scan_local_library(game_dir).await { + Ok(scan) => { + update_and_announce_games(&ctx, &tx_notify_ui, scan).await; } Err(e) => { log::error!("Failed to scan local games directory: {e}"); diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 6f7732c..e2a2fab 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -2,6 +2,67 @@ use bytes::Bytes; use lanspread_db::db::{Game, GameFileDescription}; use serde::{Deserialize, Serialize}; +pub const PROTOCOL_VERSION: u32 = 2; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub enum Availability { + Ready, + Downloading, + LocalOnly, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct GameSummary { + pub id: String, + pub name: String, + pub size: u64, + pub downloaded: bool, + pub installed: bool, + pub eti_version: Option, + pub manifest_hash: u64, + pub availability: Availability, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Hello { + pub peer_id: String, + pub proto_ver: u32, + pub library_rev: u64, + pub library_digest: u64, + pub features: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct HelloAck { + pub peer_id: String, + pub proto_ver: u32, + pub library_rev: u64, + pub library_digest: u64, + pub features: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LibrarySummary { + pub library_rev: u64, + pub library_digest: u64, + pub game_count: usize, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LibrarySnapshot { + pub library_rev: u64, + pub games: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LibraryDelta { + pub from_rev: u64, + pub to_rev: u64, + pub added: Vec, + pub updated: Vec, + pub removed: Vec, +} + #[derive(Debug, Serialize, Deserialize)] pub enum Request { Ping, @@ -17,6 +78,13 @@ pub enum Request { length: u64, }, AnnounceGames(Vec), + Hello(Hello), + LibrarySummary(LibrarySummary), + LibrarySnapshot(LibrarySnapshot), + LibraryDelta(LibraryDelta), + Goodbye { + peer_id: String, + }, Invalid(Bytes, String), } @@ -28,6 +96,7 @@ pub enum Response { id: String, file_descriptions: Vec, }, + HelloAck(HelloAck), GameNotFound(String), InvalidRequest(Bytes, String), EncodingError(String),