From 738095235fa15cda9f1317429da5248ff025e06f Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 30 May 2026 15:37:34 +0200 Subject: [PATCH] feat(peer): coordinate outbound transfers with local game mutations Updating or removing a local game rewrites its on-disk files. Peers that were mid-download of that game would keep streaming bytes from files that are being deleted or replaced, handing them a corrupt or stale copy. There was also no authoritative notion of which game version a peer should serve or accept, so a peer could serve whatever happened to be on disk and downloaders could aggregate files from peers running mismatched versions. This introduces a reader-writer coordination scheme between outbound file transfers (readers) and local mutation operations (writers), and gates both serving and downloading on an authoritative game catalog version. Reader-writer coordination: - Track active outbound transfers per game in a shared `OutboundTransfers` map of (id, CancellationToken), threaded through `Ctx`/`PeerCtx` and registered by a `TransferGuard` in the stream service. The guard is registered *before* the serve-eligibility check to close a TOCTOU window where a writer could miss an in-flight reader. - `stream_file_bytes` now honors a cancellation token at every await point (file read, network send, stream close) via `tokio::select!`, so a transfer aborts promptly instead of hanging on a stalled receiver. - `begin_operation` marks a game active first, then cancels its outbound transfers and waits for the count to reach zero before any Updating/RemovingDownload work touches the filesystem. - Active games are now hidden from library snapshots entirely while an operation is in flight, instead of freezing their last announced state, so peers stop discovering a game that is being mutated. Authoritative version catalog: - Replace the `HashSet` catalog with `GameCatalog`, mapping each game id to its expected version (from the bundled game.db / ETI data). - Serving requires the local `version.ini` to match the catalog version (`local_download_matches_catalog`); peer selection, file aggregation, and majority size validation all filter on the expected version (`peers_with_expected_version`, `aggregated_game_files`, and friends). User-visible changes: - The GUI shows confirmation dialogs before Update and Remove, and surfaces a sharing-status indicator on game cards and the detail modal. - A new `OutboundTransferCountChanged` event lets the UI reflect live outbound transfer activity. Test Plan: - just test - just frontend-test - just clippy --- Cargo.lock | 1 + crates/lanspread-compat/src/eti.rs | 4 +- crates/lanspread-db/src/db.rs | 56 +++++- crates/lanspread-peer-cli/src/main.rs | 36 ++-- crates/lanspread-peer/src/context.rs | 24 ++- crates/lanspread-peer/src/events.rs | 7 +- crates/lanspread-peer/src/handlers.rs | 178 ++++++++++++------ crates/lanspread-peer/src/lib.rs | 23 ++- crates/lanspread-peer/src/local_games.rs | 80 ++++++-- crates/lanspread-peer/src/peer.rs | 78 +++++++- crates/lanspread-peer/src/peer_db.rs | 150 ++++++++++++++- .../lanspread-peer/src/services/handshake.rs | 17 +- .../lanspread-peer/src/services/liveness.rs | 14 +- .../src/services/local_monitor.rs | 15 +- crates/lanspread-peer/src/services/stream.rs | 156 ++++++++++++--- crates/lanspread-peer/src/startup.rs | 8 +- .../src-tauri/Cargo.toml | 1 + .../src-tauri/src/lib.rs | 142 ++++++++++---- .../src/components/grid/GameCard.tsx | 13 +- .../src/components/modals/GameDetailModal.tsx | 22 ++- .../src/hooks/useGameActions.ts | 19 +- .../src/lib/gameState.ts | 31 ++- .../lanspread-tauri-deno-ts/src/lib/types.ts | 3 +- .../src/styles/launcher.css | 16 ++ 24 files changed, 882 insertions(+), 212 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73f7104..b994ecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2217,6 +2217,7 @@ dependencies = [ "tauri-plugin-shell", "tauri-plugin-store", "tokio", + "tokio-util", "walkdir", "windows 0.62.2", ] diff --git a/crates/lanspread-compat/src/eti.rs b/crates/lanspread-compat/src/eti.rs index 5ed6b6f..5253907 100644 --- a/crates/lanspread-compat/src/eti.rs +++ b/crates/lanspread-compat/src/eti.rs @@ -57,14 +57,14 @@ impl From for Game { release_year: eti_game.game_release, publisher: eti_game.game_publisher, max_players: eti_game.game_maxplayers, - version: eti_game.game_version, + version: eti_game.game_version.clone(), genre: eti_game.genre_de, #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] size: (eti_game.game_size * 1024.0 * 1024.0 * 1024.0) as u64, downloaded: false, installed: false, availability: Availability::LocalOnly, - eti_game_version: None, + eti_game_version: Some(eti_game.game_version), local_version: None, peer_count: 0, // ETI games start with 0 peers until peer system discovers them } diff --git a/crates/lanspread-db/src/db.rs b/crates/lanspread-db/src/db.rs index b0ec6cd..32335fe 100644 --- a/crates/lanspread-db/src/db.rs +++ b/crates/lanspread-db/src/db.rs @@ -78,7 +78,7 @@ pub struct Game { /// Backend-reported availability state for this game's local or peer summary. #[serde(default)] pub availability: Availability, - /// ETI game version from version.ini (YYYYMMDD format) (server) + /// Authoritative ETI game version from the bundled game.db (YYYYMMDD format). pub eti_game_version: Option, /// Local game version from version.ini (YYYYMMDD format) pub local_version: Option, @@ -198,6 +198,60 @@ impl Default for GameDB { } } +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct GameCatalog { + expected_versions: HashMap>, +} + +impl GameCatalog { + #[must_use] + pub fn empty() -> Self { + Self { + expected_versions: HashMap::new(), + } + } + + #[must_use] + pub fn from_game_db(game_db: &GameDB) -> Self { + Self { + expected_versions: game_db + .games + .values() + .map(|game| (game.id.clone(), game.eti_game_version.clone())) + .collect(), + } + } + + #[must_use] + pub fn from_ids(ids: impl IntoIterator) -> Self { + Self { + expected_versions: ids.into_iter().map(|id| (id, None)).collect(), + } + } + + pub fn insert(&mut self, id: String, expected_version: Option) { + self.expected_versions.insert(id, expected_version); + } + + #[must_use] + pub fn contains(&self, id: S) -> bool + where + S: AsRef, + { + self.expected_versions.contains_key(id.as_ref()) + } + + #[must_use] + pub fn expected_version(&self, id: S) -> Option<&str> + where + S: AsRef, + { + self.expected_versions + .get(id.as_ref()) + .and_then(Option::as_deref) + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct GameFileDescription { pub game_id: String, diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 81dab6e..f5155fa 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -12,7 +12,7 @@ use std::{ use eyre::Context; use lanspread_compat::eti::get_games; -use lanspread_db::db::{Game, GameFileDescription}; +use lanspread_db::db::{Game, GameCatalog, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, @@ -30,6 +30,7 @@ use lanspread_peer::{ use lanspread_peer_cli::{ CliCommand, CommandEnvelope, + DEFAULT_FIXTURE_VERSION, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, @@ -114,7 +115,7 @@ struct DownloadMeasurement { struct SharedState { state: RwLock, peer_game_db: Arc>, - catalog: Arc>>, + catalog: Arc>, notify: Notify, games_dir: PathBuf, state_dir: PathBuf, @@ -146,6 +147,7 @@ async fn main() -> eyre::Result<()> { catalog.clone(), PeerStartOptions { state_dir: Some(args.state_dir.clone()), + active_outbound_transfers: None, }, )?; let sender = handle.sender(); @@ -303,15 +305,8 @@ async fn list_peers(shared: &SharedState) -> eyre::Result { async fn list_games(shared: &SharedState) -> eyre::Result { let state = shared.state.read().await; - let catalog = shared.catalog.read().await.clone(); - let remote = shared - .peer_game_db - .read() - .await - .get_all_games() - .into_iter() - .filter(|game| catalog.contains(&game.id)) - .collect::>(); + let catalog = shared.catalog.read().await; + let remote = shared.peer_game_db.read().await.get_catalog_games(&catalog); Ok(json!({ "local": state.local_games.clone(), "remote": remote, @@ -434,6 +429,7 @@ async fn event_loop( } } +#[allow(clippy::too_many_lines)] async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) { match event { PeerEvent::LocalPeerReady { peer_id, addr } => { @@ -458,6 +454,7 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s state.local_games.clone_from(&games); ("local-library-changed", json!({ "games": games })) } + PeerEvent::OutboundTransferCountChanged => ("outbound-transfer-count-changed", json!({})), PeerEvent::ActiveOperationsChanged { active_operations } => { let mut state = shared.state.write().await; state.active_operations.clone_from(&active_operations); @@ -668,18 +665,27 @@ fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result, fixtures: &[FixtureSeed]) -> HashSet { - let mut catalog = HashSet::new(); +async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> GameCatalog { + let mut catalog = GameCatalog::empty(); if let Some(path) = catalog_db && path.exists() { match get_games(path).await { - Ok(games) => catalog.extend(games.into_iter().map(|game| game.game_id)), + Ok(games) => { + for game in games { + catalog.insert(game.game_id, Some(game.game_version)); + } + } Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()), } } - catalog.extend(fixtures.iter().map(|seed| seed.game_id.clone())); + for seed in fixtures { + catalog.insert( + seed.game_id.clone(), + Some(DEFAULT_FIXTURE_VERSION.to_string()), + ); + } catalog } diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 3a75a5b..55cb9d0 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -1,18 +1,17 @@ //! Shared context types for the peer system. -use std::{ - collections::{HashMap, HashSet}, - net::SocketAddr, - path::PathBuf, - sync::Arc, -}; +use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc}; -use lanspread_db::db::GameDB; +use lanspread_db::db::{GameCatalog, GameDB}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB}; +/// Thread-safe map of active outbound file transfers grouped by game ID. +pub type OutboundTransfers = Arc>>>; + + /// Mutating filesystem operation currently in flight for a game root. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum OperationKind { @@ -40,10 +39,11 @@ pub struct Ctx { pub active_operations: Arc>>, pub active_downloads: Arc>>, pub unpacker: Arc, - pub catalog: Arc>>, + pub catalog: Arc>, pub peer_id: Arc, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, + pub active_outbound_transfers: OutboundTransfers, } /// Context for peer connection handling. @@ -55,11 +55,12 @@ pub struct PeerCtx { pub local_peer_addr: Arc>>, pub active_operations: Arc>>, pub peer_game_db: Arc>, - pub catalog: Arc>>, + pub catalog: Arc>, pub peer_id: Arc, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, + pub active_outbound_transfers: OutboundTransfers, } impl std::fmt::Debug for PeerCtx { @@ -84,7 +85,8 @@ impl Ctx { unpacker: Arc, shutdown: CancellationToken, task_tracker: TaskTracker, - catalog: Arc>>, + catalog: Arc>, + active_outbound_transfers: OutboundTransfers, ) -> Self { Self { game_dir: Arc::new(RwLock::new(game_dir)), @@ -100,6 +102,7 @@ impl Ctx { peer_id: Arc::new(peer_id), shutdown, task_tracker, + active_outbound_transfers, } } @@ -120,6 +123,7 @@ impl Ctx { tx_notify_ui, shutdown: self.shutdown.clone(), task_tracker: self.task_tracker.clone(), + active_outbound_transfers: self.active_outbound_transfers.clone(), } } } diff --git a/crates/lanspread-peer/src/events.rs b/crates/lanspread-peer/src/events.rs index c479063..6cb475e 100644 --- a/crates/lanspread-peer/src/events.rs +++ b/crates/lanspread-peer/src/events.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use lanspread_db::db::GameCatalog; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ @@ -65,9 +66,13 @@ fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind { pub async fn emit_peer_game_list( peer_game_db: &Arc>, + catalog: &Arc>, tx_notify_ui: &UnboundedSender, ) { - let games = { peer_game_db.read().await.get_all_games() }; + let games = { + let catalog = catalog.read().await; + peer_game_db.read().await.get_catalog_games(&catalog) + }; send(tx_notify_ui, PeerEvent::ListGames(games)); } diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 8adca68..144c9c1 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -23,7 +23,7 @@ use crate::{ game_from_summary, get_game_file_descriptions, local_dir_is_directory, - local_download_available, + local_download_matches_catalog, rescan_local_game, scan_local_library, version_ini_is_regular_file, @@ -41,7 +41,7 @@ use crate::{ /// Handles the `ListGames` command. pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("ListGames command received"); - events::emit_peer_game_list(&ctx.peer_game_db, tx_notify_ui).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, tx_notify_ui).await; } /// Tries to serve a game from local files. @@ -54,7 +54,7 @@ async fn try_serve_local_game( let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; - if !local_download_available(&game_dir, id, &active_operations, &catalog).await { + if !local_download_matches_catalog(&game_dir, id, &active_operations, &catalog).await { return false; } drop(active_operations); @@ -90,9 +90,10 @@ pub(crate) async fn handle_get_game_command( } log::info!("Requesting game from peers: {id}"); + let expected_version = catalog_expected_version(ctx, &id).await; let peers = { let peer_game_db = ctx.peer_game_db.read().await; - source.select_peers(&peer_game_db, &id) + source.select_peers(&peer_game_db, &id, expected_version.as_deref()) }; if peers.is_empty() { log::warn!("No peers have game {id}"); @@ -107,6 +108,7 @@ pub(crate) async fn handle_get_game_command( ctx.task_tracker.spawn(fetch_game_details_from_peers( peers, id, + expected_version, peer_game_db, tx_notify_ui, |peer_addr, game_id, peer_game_db| async move { @@ -126,10 +128,16 @@ impl GameDetailSource { matches!(self, Self::LocalOrPeers) } - fn select_peers(self, peer_game_db: &PeerGameDB, id: &str) -> Vec { + fn select_peers( + self, + peer_game_db: &PeerGameDB, + id: &str, + expected_version: Option<&str>, + ) -> Vec { match self { - Self::LocalOrPeers => peer_game_db.peers_with_game(id), - Self::LatestPeersOnly => peer_game_db.peers_with_latest_version(id), + Self::LocalOrPeers | Self::LatestPeersOnly => { + peer_game_db.peers_with_expected_version(id, expected_version) + } } } } @@ -154,6 +162,7 @@ async fn request_game_details_and_update( async fn fetch_game_details_from_peers( peers: Vec, id: String, + expected_version: Option, peer_game_db: Arc>, tx_notify_ui: UnboundedSender, mut fetch_details: F, @@ -175,7 +184,12 @@ async fn fetch_game_details_from_peers( } if fetched_any { - let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; + let aggregated_files = { + peer_game_db + .read() + .await + .aggregated_game_files(&id, expected_version.as_deref()) + }; if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { id: id.clone(), @@ -210,6 +224,7 @@ pub async fn handle_download_game_files_command( } let games_folder = { ctx.game_dir.read().await.clone() }; + let expected_version = catalog_expected_version(ctx, &id).await; // Use majority validation to get trusted file descriptions and peer whitelist let (validated_descriptions, peer_whitelist, file_peer_map) = { @@ -217,7 +232,7 @@ pub async fn handle_download_game_files_command( .peer_game_db .read() .await - .validate_file_sizes_majority(&id) + .validate_file_sizes_majority(&id, expected_version.as_deref()) { Ok((files, peers, file_peer_map)) => { log::info!( @@ -260,7 +275,7 @@ pub async fn handle_download_game_files_command( let local_dl_available = { let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; - local_download_available(&games_folder, &id, &active_operations, &catalog).await + local_download_matches_catalog(&games_folder, &id, &active_operations, &catalog).await }; if peer_whitelist.is_empty() { @@ -732,11 +747,41 @@ async fn begin_operation( } }; - if started { - events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; + if !started { + return false; } - started + events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; + + if operation == OperationKind::Updating || operation == OperationKind::RemovingDownload { + // Cancel all active outbound transfers for this game + let mut tokens_to_cancel = Vec::new(); + { + let active = ctx.active_outbound_transfers.read().await; + if let Some(transfers) = active.get(id) { + for (_, token) in transfers { + tokens_to_cancel.push(token.clone()); + } + } + } + for token in tokens_to_cancel { + token.cancel(); + } + + // Wait until active outbound transfers drop to 0 + loop { + let count = { + let active = ctx.active_outbound_transfers.read().await; + active.get(id).map_or(0, Vec::len) + }; + if count == 0 { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + } + + true } async fn transition_download_to_install( @@ -818,6 +863,14 @@ async fn catalog_contains(ctx: &Ctx, id: &str) -> bool { ctx.catalog.read().await.contains(id) } +async fn catalog_expected_version(ctx: &Ctx, id: &str) -> Option { + ctx.catalog + .read() + .await + .expected_version(id) + .map(ToOwned::to_owned) +} + /// Handles the `SetGameDir` command. pub async fn handle_set_game_dir_command( ctx: &Ctx, @@ -1008,13 +1061,8 @@ async fn update_and_announce_games_with_policy( active_operation_ids.remove(id); } if !active_operation_ids.is_empty() { - let previous = ctx.local_library.read().await.games.clone(); for id in &active_operation_ids { - if let Some(summary) = previous.get(id.as_str()) { - summaries.insert(id.clone(), summary.clone()); - } else { - summaries.remove(id); - } + summaries.remove(id); } game_db = GameDB::from(summaries.values().map(game_from_summary).collect()); } @@ -1068,13 +1116,14 @@ async fn update_and_announce_games_with_policy( #[cfg(test)] mod tests { use std::{ - collections::HashSet, + collections::HashMap, net::SocketAddr, path::{Path, PathBuf}, sync::{Arc, Mutex}, time::Duration, }; + use lanspread_db::db::GameCatalog; use lanspread_proto::{Availability, GameSummary}; use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -1115,7 +1164,8 @@ mod tests { Arc::new(FakeUnpacker), CancellationToken::new(), TaskTracker::new(), - Arc::new(RwLock::new(HashSet::from(["game".to_string()]))), + Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))), + Arc::new(RwLock::new(HashMap::new())), ) } @@ -1220,7 +1270,7 @@ mod tests { } #[test] - fn update_source_selects_latest_ready_peer_manifest() { + fn update_source_selects_expected_ready_peer_manifest() { let old_addr = addr(12_000); let new_addr = addr(12_001); let local_only_addr = addr(12_002); @@ -1242,13 +1292,13 @@ mod tests { ); assert_eq!( - GameDetailSource::LatestPeersOnly.select_peers(&db, "game"), + GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101")), vec![new_addr] ); } #[tokio::test] - async fn update_fetch_emits_fresh_manifest_from_latest_peer() { + async fn update_fetch_emits_fresh_manifest_from_expected_peer() { let old_addr = addr(12_010); let new_addr = addr(12_011); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); @@ -1267,33 +1317,40 @@ mod tests { } let peers = { let db = peer_game_db.read().await; - GameDetailSource::LatestPeersOnly.select_peers(&db, "game") + GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101")) }; let (tx, mut rx) = mpsc::unbounded_channel(); let fetched_peers = Arc::new(Mutex::new(Vec::new())); - fetch_game_details_from_peers(peers, "game".to_string(), peer_game_db.clone(), tx, { - let fetched_peers = fetched_peers.clone(); - move |peer_addr, game_id, peer_game_db| { + fetch_game_details_from_peers( + peers, + "game".to_string(), + Some("20250101".to_string()), + peer_game_db.clone(), + tx, + { let fetched_peers = fetched_peers.clone(); - async move { - fetched_peers - .lock() - .expect("fetched peer list should not be poisoned") - .push(peer_addr); - let files = vec![ - file_desc(&game_id, "game/version.ini", 8), - file_desc(&game_id, "game/new.eti", 11), - ]; - peer_game_db.write().await.update_peer_game_files( - &"new".to_string(), - &game_id, - files.clone(), - ); - Ok(files) + move |peer_addr, game_id, peer_game_db| { + let fetched_peers = fetched_peers.clone(); + async move { + fetched_peers + .lock() + .expect("fetched peer list should not be poisoned") + .push(peer_addr); + let files = vec![ + file_desc(&game_id, "game/version.ini", 8), + file_desc(&game_id, "game/new.eti", 11), + ]; + peer_game_db.write().await.update_peer_game_files( + &"new".to_string(), + &game_id, + files.clone(), + ); + Ok(files) + } } - } - }) + }, + ) .await; assert_eq!( @@ -1314,7 +1371,7 @@ mod tests { file_descriptions .iter() .any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11), - "latest peer manifest should be emitted to the download path" + "expected-version peer manifest should be emitted to the download path" ); } @@ -1329,6 +1386,7 @@ mod tests { fetch_game_details_from_peers( vec![first_addr, second_addr], "game".to_string(), + Some("20250101".to_string()), peer_game_db, tx.clone(), { @@ -1362,7 +1420,7 @@ mod tests { #[tokio::test] async fn update_request_skips_local_manifest_even_when_download_exists() { - let temp = TempDir::new("lanspread-handler-latest-peer"); + let temp = TempDir::new("lanspread-handler-expected-peer"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20240101"); write_file(&root.join("game.eti"), b"old archive"); @@ -1385,23 +1443,37 @@ mod tests { } #[tokio::test] - async fn local_library_scan_freezes_active_game_state() { - let temp = TempDir::new("lanspread-handler-active-freeze"); + async fn local_library_scan_hides_active_game_state() { + let temp = TempDir::new("lanspread-handler-active-hide"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let catalog = ctx.catalog.read().await.clone(); + + // 1. Initial scan: the game is ready and announced + let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog) + .await + .expect("scan should succeed"); + update_and_announce_games(&ctx, &tx, scan).await; + + let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else { + panic!("expected LocalLibraryChanged"); + }; + assert_eq!(games.len(), 1); + assert_eq!(games[0].id, "game"); + + // 2. Set the game as active/in-progress and scan again ctx.active_operations .write() .await .insert("game".to_string(), OperationKind::Installing); - let (tx, mut rx) = mpsc::unbounded_channel(); - let catalog = ctx.catalog.read().await.clone(); + let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog) .await .expect("scan should succeed"); - update_and_announce_games(&ctx, &tx, scan).await; let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else { @@ -1409,7 +1481,7 @@ mod tests { }; assert!( games.is_empty(), - "active game should keep its previous announced state" + "active game should be hidden/unannounced during operations" ); } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index c29cf7d..3d254de 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -39,12 +39,12 @@ mod test_support; // Public re-exports // ============================================================================= -use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use error::PeerError; pub use install::{UnpackFuture, Unpacker}; -use lanspread_db::db::{Game, GameFileDescription}; +use lanspread_db::db::{Game, GameCatalog, GameFileDescription}; pub use migration::{MigrationReport, migrate_legacy_state}; pub use peer_db::{ MajorityValidationResult, @@ -153,6 +153,8 @@ pub enum PeerEvent { PeerCountUpdated(usize), /// The local library contents changed after a scan. LocalLibraryChanged { games: Vec }, + /// The number of active outbound transfers changed. + OutboundTransferCountChanged, /// The set of in-progress local operations changed. ActiveOperationsChanged { active_operations: Vec, @@ -262,6 +264,7 @@ pub enum PeerCommand { pub struct PeerStartOptions { /// Directory used for peer identity and other state. pub state_dir: Option, + pub active_outbound_transfers: Option, } // ============================================================================= @@ -286,7 +289,7 @@ pub fn start_peer( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, unpacker: Arc, - catalog: Arc>>, + catalog: Arc>, ) -> eyre::Result { start_peer_with_options( game_dir, @@ -305,12 +308,17 @@ pub fn start_peer_with_options( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, unpacker: Arc, - catalog: Arc>>, + catalog: Arc>, options: PeerStartOptions, ) -> eyre::Result { - let PeerStartOptions { state_dir } = options; + let PeerStartOptions { + state_dir, + active_outbound_transfers, + } = options; let state_dir = resolve_state_dir(state_dir.as_deref()); let game_dir = game_dir.into(); + let active_outbound_transfers = active_outbound_transfers + .unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new()))); log::info!( "Starting peer system with game directory: {}", game_dir.display() @@ -329,6 +337,7 @@ pub fn start_peer_with_options( state_dir, unpacker, catalog, + active_outbound_transfers, )) } @@ -344,7 +353,8 @@ async fn run_peer( unpacker: Arc, shutdown: CancellationToken, task_tracker: TaskTracker, - catalog: Arc>>, + catalog: Arc>, + active_outbound_transfers: crate::context::OutboundTransfers, ) -> eyre::Result<()> { let ctx = Ctx::new( peer_game_db, @@ -355,6 +365,7 @@ async fn run_peer( shutdown, task_tracker, catalog, + active_outbound_transfers, ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); diff --git a/crates/lanspread-peer/src/local_games.rs b/crates/lanspread-peer/src/local_games.rs index dbb6ae6..7b938c1 100644 --- a/crates/lanspread-peer/src/local_games.rs +++ b/crates/lanspread-peer/src/local_games.rs @@ -9,7 +9,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_db::db::{Game, GameCatalog, GameDB, GameFileDescription}; use lanspread_proto::{Availability, GameSummary}; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, sync::Mutex}; @@ -51,7 +51,7 @@ pub async fn local_download_available( game_dir: &Path, game_id: &str, active_operations: &HashMap, - catalog: &HashSet, + catalog: &GameCatalog, ) -> bool { if !catalog.contains(game_id) { log::debug!("Not serving game {game_id} locally because it is not in the catalog"); @@ -67,6 +67,40 @@ pub async fn local_download_available( version_ini_is_regular_file(game_path.as_path()).await } +/// Checks if a local game may be served to peers under the authoritative catalog version. +pub async fn local_download_matches_catalog( + game_dir: &Path, + game_id: &str, + active_operations: &HashMap, + catalog: &GameCatalog, +) -> bool { + if !local_download_available(game_dir, game_id, active_operations, catalog).await { + return false; + } + + let Some(expected_version) = catalog.expected_version(game_id) else { + return true; + }; + + let game_path = game_dir.join(game_id); + match lanspread_db::db::read_version_from_ini(&game_path) { + Ok(Some(local_version)) if local_version == expected_version => true, + Ok(Some(local_version)) => { + log::debug!( + "Not serving game {game_id}: local version.ini {local_version} does not match catalog {expected_version}" + ); + false + } + Ok(None) => false, + Err(err) => { + log::warn!( + "Not serving game {game_id}: failed to read local version.ini for catalog comparison: {err}" + ); + false + } + } +} + // ============================================================================= // Local library index and scanning // ============================================================================= @@ -468,7 +502,7 @@ struct IndexUpdate { async fn update_index_for_game( game_root: &Path, game_id: &str, - catalog: &HashSet, + catalog: &GameCatalog, index: &mut LibraryIndex, ) -> eyre::Result { if !catalog.contains(game_id) { @@ -557,7 +591,7 @@ fn scan_from_index(index: &LibraryIndex) -> LocalLibraryScan { pub async fn scan_local_library( game_dir: impl AsRef, state_dir: impl AsRef, - catalog: &HashSet, + catalog: &GameCatalog, ) -> eyre::Result { let game_path = game_dir.as_ref(); let state_path = state_dir.as_ref(); @@ -645,7 +679,7 @@ pub async fn scan_local_library( pub async fn rescan_local_game( game_dir: impl AsRef, state_dir: impl AsRef, - catalog: &HashSet, + catalog: &GameCatalog, game_id: &str, ) -> eyre::Result { let game_path = game_dir.as_ref(); @@ -682,10 +716,7 @@ pub async fn get_game_file_descriptions( #[cfg(test)] mod tests { - use std::{ - collections::{HashMap, HashSet}, - path::Path, - }; + use std::{collections::HashMap, path::Path}; use lanspread_proto::Availability; @@ -776,7 +807,7 @@ mod tests { async fn scan_uses_version_ini_and_local_dir_as_independent_state() { let temp = TempDir::new("lanspread-local-games"); let state = TempDir::new("lanspread-local-games-state"); - let catalog = HashSet::from([ + let catalog = GameCatalog::from_ids([ "ready".to_string(), "local-only".to_string(), "eti-only".to_string(), @@ -830,7 +861,7 @@ mod tests { async fn rescan_promotes_installed_only_game_to_ready_when_sentinel_appears() { let temp = TempDir::new("lanspread-local-games"); let state = TempDir::new("lanspread-local-games-state"); - let catalog = HashSet::from(["game".to_string()]); + let catalog = GameCatalog::from_ids(["game".to_string()]); std::fs::create_dir_all(temp.path().join("game").join("local")) .expect("local install dir should be created"); @@ -864,7 +895,7 @@ mod tests { async fn concurrent_rescans_preserve_both_index_updates() { let temp = TempDir::new("lanspread-local-games-concurrent"); let state = TempDir::new("lanspread-local-games-state"); - let catalog = HashSet::from(["game-a".to_string(), "game-b".to_string()]); + let catalog = GameCatalog::from_ids(["game-a".to_string(), "game-b".to_string()]); write_file(&temp.path().join("game-a").join("version.ini"), b"20250101"); write_file(&temp.path().join("game-b").join("version.ini"), b"20250101"); @@ -909,7 +940,7 @@ mod tests { let game_root = temp.path().join("game"); write_file(&game_root.join("version.ini"), b"20250101"); - let catalog = HashSet::from(["game".to_string()]); + let catalog = GameCatalog::from_ids(["game".to_string()]); let no_operations = HashMap::new(); assert!(local_download_available(temp.path(), "game", &no_operations, &catalog).await); @@ -917,8 +948,29 @@ mod tests { assert!(!local_download_available(temp.path(), "game", &active_operations, &catalog).await); assert!( - !local_download_available(temp.path(), "game", &no_operations, &HashSet::new()).await + !local_download_available(temp.path(), "game", &no_operations, &GameCatalog::empty()) + .await ); assert!(!local_download_available(temp.path(), "missing", &no_operations, &catalog).await); } + + #[tokio::test] + async fn local_download_matches_catalog_requires_expected_version() { + let temp = TempDir::new("lanspread-local-games"); + let game_root = temp.path().join("game"); + write_file(&game_root.join("version.ini"), b"20260101"); + + let mut catalog = GameCatalog::empty(); + catalog.insert("game".to_string(), Some("20250101".to_string())); + let no_operations = HashMap::new(); + + assert!( + !local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await + ); + + catalog.insert("game".to_string(), Some("20260101".to_string())); + assert!( + local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await + ); + } } diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index 43c358b..4d7a53b 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -14,12 +14,14 @@ use tokio::{ use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path}; +#[allow(clippy::too_many_lines)] async fn stream_file_bytes( tx: &mut SendStream, base_dir: &Path, relative_path: &str, offset: u64, length: Option, + cancel_token: tokio_util::sync::CancellationToken, ) -> eyre::Result<()> { let remote_addr = maybe_addr!(tx.connection().remote_addr()); @@ -45,13 +47,32 @@ async fn stream_file_bytes( let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE]; while remaining > 0 { + if cancel_token.is_cancelled() { + log::info!( + "{remote_addr} transfer cancelled for {}", + validated_path.display() + ); + return Err(eyre::eyre!("File transfer cancelled by user")); + } + let read_len = std::cmp::min(remaining, buf.len() as u64); let read_len: usize = read_len.try_into().unwrap_or(usize::MAX); if read_len == 0 { break; } - let bytes_read = file.read(&mut buf[..read_len]).await?; + let bytes_read = tokio::select! { + () = cancel_token.cancelled() => { + log::info!( + "{remote_addr} transfer cancelled for {}", + validated_path.display() + ); + return Err(eyre::eyre!("File transfer cancelled by user")); + } + res = file.read(&mut buf[..read_len]) => { + res? + } + }; if bytes_read == 0 { if !expect_exact { transfer_complete = true; @@ -59,7 +80,18 @@ async fn stream_file_bytes( break; } - tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?; + tokio::select! { + () = cancel_token.cancelled() => { + log::info!( + "{remote_addr} transfer cancelled for {}", + validated_path.display() + ); + return Err(eyre::eyre!("File transfer cancelled by user")); + } + res = tx.send(Bytes::copy_from_slice(&buf[..bytes_read])) => { + res?; + } + } remaining = remaining.saturating_sub(bytes_read as u64); total_bytes += bytes_read as u64; @@ -97,12 +129,20 @@ async fn stream_file_bytes( validated_path.display() ); - match tx.close().await { - Ok(()) => {} - Err(err) if transfer_complete && is_clean_remote_close(&err) => { - log::debug!("{remote_addr} closed stream after transfer completion: {err}"); + tokio::select! { + () = cancel_token.cancelled() => { + log::info!("{remote_addr} transfer cancelled while closing stream"); + return Err(eyre::eyre!("File transfer cancelled by user")); + } + res = tx.close() => { + match res { + Ok(()) => {} + Err(err) if transfer_complete && is_clean_remote_close(&err) => { + log::debug!("{remote_addr} closed stream after transfer completion: {err}"); + } + Err(err) => return Err(err.into()), + } } - Err(err) => return Err(err.into()), } Ok(()) } @@ -121,8 +161,18 @@ pub async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, + cancel_token: tokio_util::sync::CancellationToken, ) { - if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { + if let Err(e) = stream_file_bytes( + tx, + game_dir, + &game_file_desc.relative_path, + 0, + None, + cancel_token, + ) + .await + { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream file {}: {e}", @@ -138,8 +188,18 @@ pub async fn send_game_file_chunk( length: u64, tx: &mut SendStream, game_dir: &Path, + cancel_token: tokio_util::sync::CancellationToken, ) { - if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { + if let Err(e) = stream_file_bytes( + tx, + game_dir, + relative_path, + offset, + Some(length), + cancel_token, + ) + .await + { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index 764729c..0d66beb 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -7,7 +7,7 @@ use std::{ time::{Duration, Instant}, }; -use lanspread_db::db::{Availability, Game, GameFileDescription}; +use lanspread_db::db::{Availability, Game, GameCatalog, GameFileDescription}; use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; use crate::library::compute_library_digest; @@ -357,6 +357,54 @@ impl PeerGameDB { games } + /// Returns catalog games aggregated from peers that advertise the expected catalog version. + #[must_use] + pub fn get_catalog_games(&self, catalog: &GameCatalog) -> Vec { + let mut aggregated: HashMap = HashMap::new(); + let mut peer_counts: HashMap = HashMap::new(); + + for peer in self.peers.values() { + for game in peer.games.values().filter(|game| { + catalog.contains(&game.id) + && game_matches_expected_version(game, catalog.expected_version(&game.id)) + }) { + *peer_counts.entry(game.id.clone()).or_insert(0) += 1; + } + } + + for peer in self.peers.values() { + for game in peer.games.values().filter(|game| { + catalog.contains(&game.id) + && game_matches_expected_version(game, catalog.expected_version(&game.id)) + }) { + aggregated + .entry(game.id.clone()) + .and_modify(|existing| { + existing.peer_count = *peer_counts.get(&game.id).unwrap_or(&0); + if game.size > existing.size { + existing.size = game.size; + } + existing.set_downloaded(true); + if game.installed { + existing.installed = true; + } + }) + .or_insert_with(|| { + let mut game_clone = summary_to_game(game); + if let Some(expected_version) = catalog.expected_version(&game.id) { + game_clone.eti_game_version = Some(expected_version.to_string()); + } + game_clone.peer_count = *peer_counts.get(&game.id).unwrap_or(&0); + game_clone + }); + } + } + + let mut games: Vec = aggregated.into_values().collect(); + games.sort_by(|a, b| a.name.cmp(&b.name)); + games + } + /// Returns the latest version of a game across all peers. #[must_use] pub fn get_latest_version_for_game(&self, game_id: &str) -> Option { @@ -451,6 +499,24 @@ impl PeerGameDB { .collect() } + /// Returns addresses of peers that have the expected catalog version of a game. + #[must_use] + pub fn peers_with_expected_version( + &self, + game_id: &str, + expected_version: Option<&str>, + ) -> Vec { + self.peers + .iter() + .filter(|(_, peer)| { + peer.games + .get(game_id) + .is_some_and(|game| game_matches_expected_version(game, expected_version)) + }) + .map(|(_, peer)| peer.addr) + .collect() + } + /// Returns addresses of peers that have the latest version of a game. #[must_use] pub fn peers_with_latest_version(&self, game_id: &str) -> Vec { @@ -514,11 +580,33 @@ impl PeerGameDB { .collect() } + /// Returns file descriptions from peers that advertise the expected catalog version. + #[must_use] + pub fn expected_version_game_files_for( + &self, + game_id: &str, + expected_version: Option<&str>, + ) -> Vec<(SocketAddr, Vec)> { + let expected_peers = self.peers_with_expected_version(game_id, expected_version); + if expected_peers.is_empty() { + return Vec::new(); + } + + self.game_files_for(game_id) + .into_iter() + .filter(|(addr, _)| expected_peers.contains(addr)) + .collect() + } + /// Returns aggregated file descriptions for a game across all peers. #[must_use] - pub fn aggregated_game_files(&self, game_id: &str) -> Vec { + pub fn aggregated_game_files( + &self, + game_id: &str, + expected_version: Option<&str>, + ) -> Vec { let mut seen: HashMap = HashMap::new(); - for (_, files) in self.latest_game_files_for(game_id) { + for (_, files) in self.expected_version_game_files_for(game_id, expected_version) { for file in files { seen.entry(file.relative_path.clone()).or_insert(file); } @@ -559,8 +647,9 @@ impl PeerGameDB { pub fn validate_file_sizes_majority( &self, game_id: &str, + expected_version: Option<&str>, ) -> eyre::Result { - let game_files = self.latest_game_files_for(game_id); + let game_files = self.expected_version_game_files_for(game_id, expected_version); if game_files.is_empty() { return Ok((Vec::new(), Vec::new(), HashMap::new())); } @@ -813,6 +902,14 @@ fn game_is_ready(summary: &GameSummary) -> bool { summary.availability == Availability::Ready } +fn game_matches_expected_version(summary: &GameSummary, expected_version: Option<&str>) -> bool { + if !game_is_ready(summary) { + return false; + } + + expected_version.is_none_or(|expected| summary.eti_version.as_deref() == Some(expected)) +} + fn summary_to_game(summary: &GameSummary) -> Game { let eti_game_version = game_is_ready(summary) .then(|| summary.eti_version.clone()) @@ -925,6 +1022,41 @@ mod tests { assert!(db.peers_with_latest_version("game").is_empty()); } + #[test] + fn catalog_aggregation_counts_only_expected_version_peers() { + let old_addr = addr(12003); + let expected_addr = addr(12004); + let newer_addr = addr(12005); + let mut db = PeerGameDB::new(); + db.upsert_peer("old".to_string(), old_addr); + db.upsert_peer("expected".to_string(), expected_addr); + db.upsert_peer("newer".to_string(), newer_addr); + db.update_peer_games( + &"old".to_string(), + vec![summary("game", "20240101", Availability::Ready)], + ); + db.update_peer_games( + &"expected".to_string(), + vec![summary("game", "20250101", Availability::Ready)], + ); + db.update_peer_games( + &"newer".to_string(), + vec![summary("game", "20260101", Availability::Ready)], + ); + let mut catalog = GameCatalog::empty(); + catalog.insert("game".to_string(), Some("20250101".to_string())); + + let games = db.get_catalog_games(&catalog); + + assert_eq!(games.len(), 1); + assert_eq!(games[0].peer_count, 1); + assert_eq!(games[0].eti_game_version.as_deref(), Some("20250101")); + assert_eq!( + db.peers_with_expected_version("game", Some("20250101")), + vec![expected_addr] + ); + } + #[test] fn transport_addr_matches_known_peer_on_ephemeral_port() { let advertised = ip_addr([10, 66, 0, 2], 40000); @@ -979,7 +1111,7 @@ mod tests { } #[test] - fn validation_uses_latest_version_file_metadata() { + fn validation_uses_expected_version_file_metadata() { let old_addr = addr(12003); let new_addr = addr(12004); let mut db = PeerGameDB::new(); @@ -1010,21 +1142,21 @@ mod tests { ], ); - let aggregated = db.aggregated_game_files("game"); + let aggregated = db.aggregated_game_files("game", Some("20250101")); let archive = aggregated .iter() .find(|desc| desc.relative_path == "game/archive.eti") - .expect("latest archive should be present"); + .expect("expected-version archive should be present"); assert_eq!(archive.size, 20); let (validated, peers, file_peer_map) = db - .validate_file_sizes_majority("game") + .validate_file_sizes_majority("game", Some("20250101")) .expect("old-version file metadata should not create ambiguity"); assert_eq!(peers, vec![new_addr]); let archive = validated .iter() .find(|desc| desc.relative_path == "game/archive.eti") - .expect("latest archive should validate"); + .expect("expected-version archive should validate"); assert_eq!(archive.size, 20); assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr])); } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index 90ef9a0..f147582 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc}; +use lanspread_db::db::GameCatalog; use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; @@ -22,6 +23,7 @@ pub(crate) struct HandshakeCtx { local_library: Arc>, peer_game_db: Arc>, tx_notify_ui: UnboundedSender, + catalog: Arc>, } impl HandshakeCtx { @@ -32,6 +34,7 @@ impl HandshakeCtx { local_library: ctx.local_library.clone(), peer_game_db: ctx.peer_game_db.clone(), tx_notify_ui: tx_notify_ui.clone(), + catalog: ctx.catalog.clone(), } } @@ -42,6 +45,7 @@ impl HandshakeCtx { local_library: ctx.local_library.clone(), peer_game_db: ctx.peer_game_db.clone(), tx_notify_ui: ctx.tx_notify_ui.clone(), + catalog: ctx.catalog.clone(), } } } @@ -121,7 +125,7 @@ pub(crate) async fn perform_handshake_with_peer( .await; after_peer_library_recorded(&ctx, upsert, record_addr).await; - events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; Ok(()) } @@ -156,7 +160,7 @@ pub(super) async fn accept_inbound_hello( .await; after_peer_library_recorded(&handshake_ctx, upsert, addr).await; - events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; build_hello_ack(ctx).await } @@ -201,12 +205,13 @@ async fn after_peer_library_recorded( #[cfg(test)] mod tests { use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, }; + use lanspread_db::db::GameCatalog; use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION}; use tokio::sync::{RwLock, mpsc}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -242,6 +247,7 @@ mod tests { local_library: Arc::new(RwLock::new(LocalLibraryState::empty())), peer_game_db, tx_notify_ui, + catalog: Arc::new(RwLock::new(GameCatalog::empty())), } } @@ -301,6 +307,8 @@ mod tests { #[tokio::test] async fn inbound_hello_applies_remote_library_snapshot() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + let mut catalog = GameCatalog::empty(); + catalog.insert("remote-game".to_string(), Some("20250101".to_string())); let ctx = Ctx::new( peer_game_db.clone(), "local-peer".to_string(), @@ -309,7 +317,8 @@ mod tests { Arc::new(NoopUnpacker), CancellationToken::new(), TaskTracker::new(), - Arc::new(RwLock::new(HashSet::new())), + Arc::new(RwLock::new(catalog)), + Arc::new(RwLock::new(HashMap::new())), ); *ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000)); diff --git a/crates/lanspread-peer/src/services/liveness.rs b/crates/lanspread-peer/src/services/liveness.rs index f9dcc5b..fce904c 100644 --- a/crates/lanspread-peer/src/services/liveness.rs +++ b/crates/lanspread-peer/src/services/liveness.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use lanspread_db::db::GameCatalog; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -18,6 +19,7 @@ use crate::{ pub async fn run_ping_service( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, + catalog: Arc>, active_operations: Arc>>, active_downloads: Arc>>, shutdown: CancellationToken, @@ -40,6 +42,7 @@ pub async fn run_ping_service( ping_idle_peers( &peer_game_db, + &catalog, &active_operations, &active_downloads, &tx_notify_ui, @@ -50,6 +53,7 @@ pub async fn run_ping_service( prune_stale_peers( &peer_game_db, + &catalog, &active_operations, &active_downloads, &tx_notify_ui, @@ -60,6 +64,7 @@ pub async fn run_ping_service( async fn ping_idle_peers( peer_game_db: &Arc>, + catalog: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, @@ -75,6 +80,7 @@ async fn ping_idle_peers( let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = peer_game_db.clone(); + let catalog = catalog.clone(); let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); let shutdown = shutdown.clone(); @@ -93,6 +99,7 @@ async fn ping_idle_peers( log::warn!("Peer {peer_addr} failed ping check"); remove_peer_and_refresh( &peer_game_db, + &catalog, &active_operations, &active_downloads, &tx_notify_ui, @@ -105,6 +112,7 @@ async fn ping_idle_peers( log::error!("Failed to ping peer {peer_addr}: {err}"); remove_peer_and_refresh( &peer_game_db, + &catalog, &active_operations, &active_downloads, &tx_notify_ui, @@ -120,6 +128,7 @@ async fn ping_idle_peers( async fn prune_stale_peers( peer_game_db: &Arc>, + catalog: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, @@ -137,7 +146,7 @@ async fn prune_stale_peers( } if removed_any { - events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; + events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, active_operations, @@ -150,6 +159,7 @@ async fn prune_stale_peers( async fn remove_peer_and_refresh( peer_game_db: &Arc>, + catalog: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, @@ -157,7 +167,7 @@ async fn remove_peer_and_refresh( log_label: &str, ) { if remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await { - events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; + events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, active_operations, diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index c09122e..2885539 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -336,12 +336,12 @@ fn should_ignore_game_child(name: &str) -> bool { #[cfg(test)] mod tests { use std::{ - collections::HashSet, path::{Path, PathBuf}, sync::Arc, time::Duration, }; + use lanspread_db::db::GameCatalog; use notify::{ EventKind, event::{AccessKind, AccessMode}, @@ -373,7 +373,7 @@ mod tests { std::fs::write(path, bytes).expect("file should be written"); } - fn test_ctx(game_dir: PathBuf, catalog: HashSet) -> Ctx { + fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> Ctx { Ctx::new( Arc::new(RwLock::new(PeerGameDB::new())), "peer".to_string(), @@ -383,6 +383,7 @@ mod tests { CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(catalog)), + Arc::new(RwLock::new(std::collections::HashMap::new())), ) } @@ -445,7 +446,7 @@ mod tests { let temp = TempDir::new("lanspread-local-monitor"); let ctx = test_ctx( temp.path().to_path_buf(), - HashSet::from(["game".to_string()]), + GameCatalog::from_ids(["game".to_string()]), ); ctx.active_operations .write() @@ -480,7 +481,7 @@ mod tests { write_file(&temp.path().join("game").join("version.ini"), b"20250101"); let ctx = test_ctx( temp.path().to_path_buf(), - HashSet::from(["game".to_string()]), + GameCatalog::from_ids(["game".to_string()]), ); let gate = RescanGate::default(); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -515,7 +516,7 @@ mod tests { write_file(&game_root.join("version.ini"), b"20250101"); let ctx = test_ctx( temp.path().to_path_buf(), - HashSet::from(["game".to_string()]), + GameCatalog::from_ids(["game".to_string()]), ); let gate = RescanGate::default(); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -551,7 +552,7 @@ mod tests { write_file(&temp.path().join("game").join("version.ini"), b"20250101"); let ctx = test_ctx( temp.path().to_path_buf(), - HashSet::from(["game".to_string()]), + GameCatalog::from_ids(["game".to_string()]), ); let (tx, mut rx) = mpsc::unbounded_channel(); @@ -575,7 +576,7 @@ mod tests { ); let ctx = test_ctx( temp.path().to_path_buf(), - HashSet::from(["game".to_string()]), + GameCatalog::from_ids(["game".to_string()]), ); let (tx, mut rx) = mpsc::unbounded_channel(); diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 39d540c..be8040d 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -12,7 +12,7 @@ use crate::{ context::PeerCtx, error::PeerError, events, - local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, + local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog}, peer::{send_game_file_chunk, send_game_file_data}, services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, }; @@ -162,7 +162,7 @@ async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelt }; if applied { - events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; } else { let addr = { let db = ctx.peer_game_db.read().await; @@ -209,7 +209,7 @@ async fn get_game_response(ctx: &PeerCtx, id: String) -> Response { async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool { let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; - local_download_available(game_dir, game_id, &active_operations, &catalog).await + local_download_matches_catalog(game_dir, game_id, &active_operations, &catalog).await } async fn can_dispatch_file_transfer( @@ -232,6 +232,67 @@ fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool { } } +use std::sync::atomic::{AtomicU64, Ordering}; + +static NEXT_TRANSFER_ID: AtomicU64 = AtomicU64::new(1); + +struct TransferGuard { + game_id: String, + id: u64, + active_outbound_transfers: crate::context::OutboundTransfers, + tx_notify_ui: tokio::sync::mpsc::UnboundedSender, +} + +impl TransferGuard { + async fn new( + game_id: String, + active_outbound_transfers: crate::context::OutboundTransfers, + tx_notify_ui: tokio::sync::mpsc::UnboundedSender, + shutdown: &tokio_util::sync::CancellationToken, + ) -> (Self, tokio_util::sync::CancellationToken) { + let id = NEXT_TRANSFER_ID.fetch_add(1, Ordering::SeqCst); + let token = shutdown.child_token(); + { + let mut active = active_outbound_transfers.write().await; + active + .entry(game_id.clone()) + .or_default() + .push((id, token.clone())); + } + let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged); + ( + Self { + game_id, + id, + active_outbound_transfers, + tx_notify_ui, + }, + token, + ) + } +} + +impl Drop for TransferGuard { + fn drop(&mut self) { + let game_id = self.game_id.clone(); + let id = self.id; + let active_outbound_transfers = self.active_outbound_transfers.clone(); + let tx_notify_ui = self.tx_notify_ui.clone(); + tokio::spawn(async move { + { + let mut active = active_outbound_transfers.write().await; + if let Some(tokens) = active.get_mut(&game_id) { + tokens.retain(|(tid, _)| *tid != id); + if tokens.is_empty() { + active.remove(&game_id); + } + } + } + let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged); + }); + } +} + async fn handle_file_data_request( ctx: &PeerCtx, desc: GameFileDescription, @@ -242,6 +303,14 @@ async fn handle_file_data_request( desc.relative_path ); + let (guard, cancel_token) = TransferGuard::new( + desc.game_id.clone(), + ctx.active_outbound_transfers.clone(), + ctx.tx_notify_ui.clone(), + &ctx.shutdown, + ) + .await; + let mut tx = framed_tx.into_inner(); let game_dir = ctx.game_dir.read().await.clone(); if !can_dispatch_file_transfer(ctx, &game_dir, &desc.game_id, &desc.relative_path).await { @@ -249,11 +318,13 @@ async fn handle_file_data_request( "Declining GetGameFileData for {} because the game is not currently transferable", desc.relative_path ); + drop(guard); let _ = tx.close().await; return FramedWrite::new(tx, LengthDelimitedCodec::new()); } - send_game_file_data(&desc, &mut tx, &game_dir).await; + send_game_file_data(&desc, &mut tx, &game_dir, cancel_token).await; + drop(guard); FramedWrite::new(tx, LengthDelimitedCodec::new()) } @@ -269,17 +340,36 @@ async fn handle_file_chunk_request( "Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})" ); + let (guard, cancel_token) = TransferGuard::new( + game_id.clone(), + ctx.active_outbound_transfers.clone(), + ctx.tx_notify_ui.clone(), + &ctx.shutdown, + ) + .await; + let mut tx = framed_tx.into_inner(); let game_dir = ctx.game_dir.read().await.clone(); if !can_dispatch_file_transfer(ctx, &game_dir, &game_id, &relative_path).await { log::info!( "Declining GetGameFileChunk for {relative_path} because the game is not currently transferable" ); + drop(guard); let _ = tx.close().await; return FramedWrite::new(tx, LengthDelimitedCodec::new()); } - send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await; + send_game_file_chunk( + &game_id, + &relative_path, + offset, + length, + &mut tx, + &game_dir, + cancel_token, + ) + .await; + drop(guard); FramedWrite::new(tx, LengthDelimitedCodec::new()) } @@ -289,17 +379,17 @@ async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option, peer_id let Some(peer) = removed else { return }; events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await; - events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; } #[cfg(test)] mod tests { use std::{ - collections::HashSet, path::{Path, PathBuf}, sync::Arc, }; + use lanspread_db::db::GameCatalog; use tokio::sync::{RwLock, mpsc}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -327,7 +417,7 @@ mod tests { std::fs::write(path, bytes).expect("file should be written"); } - fn test_ctx(game_dir: PathBuf, catalog: HashSet) -> PeerCtx { + fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> PeerCtx { let (tx_notify_ui, _rx) = mpsc::unbounded_channel(); Ctx::new( Arc::new(RwLock::new(PeerGameDB::new())), @@ -338,6 +428,7 @@ mod tests { CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(catalog)), + Arc::new(RwLock::new(std::collections::HashMap::new())), ) .to_peer_ctx(tx_notify_ui) } @@ -360,17 +451,19 @@ mod tests { b"20250101", ); write_file(&temp.path().join("active").join("version.ini"), b"20250101"); + write_file( + &temp.path().join("wrong-version").join("version.ini"), + b"20260101", + ); std::fs::create_dir_all(temp.path().join("missing-sentinel")) .expect("missing sentinel root should be created"); - let ctx = test_ctx( - temp.path().to_path_buf(), - HashSet::from([ - "ready".to_string(), - "active".to_string(), - "missing-sentinel".to_string(), - ]), - ); + let mut catalog = GameCatalog::empty(); + catalog.insert("ready".to_string(), Some("20250101".to_string())); + catalog.insert("active".to_string(), Some("20250101".to_string())); + catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string())); + catalog.insert("wrong-version".to_string(), Some("20250101".to_string())); + let ctx = test_ctx(temp.path().to_path_buf(), catalog); ctx.active_operations .write() .await @@ -388,6 +481,10 @@ mod tests { get_game_response(&ctx, "active".to_string()).await, Response::GameNotFound(id) if id == "active" )); + assert!(matches!( + get_game_response(&ctx, "wrong-version".to_string()).await, + Response::GameNotFound(id) if id == "wrong-version" + )); assert!(matches!( get_game_response(&ctx, "missing-sentinel".to_string()).await, Response::GameNotFound(id) if id == "missing-sentinel" @@ -403,17 +500,19 @@ mod tests { b"20250101", ); write_file(&temp.path().join("active").join("version.ini"), b"20250101"); + write_file( + &temp.path().join("wrong-version").join("version.ini"), + b"20260101", + ); std::fs::create_dir_all(temp.path().join("missing-sentinel")) .expect("missing sentinel root should be created"); - let ctx = test_ctx( - temp.path().to_path_buf(), - HashSet::from([ - "ready".to_string(), - "active".to_string(), - "missing-sentinel".to_string(), - ]), - ); + let mut catalog = GameCatalog::empty(); + catalog.insert("ready".to_string(), Some("20250101".to_string())); + catalog.insert("active".to_string(), Some("20250101".to_string())); + catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string())); + catalog.insert("wrong-version".to_string(), Some("20250101".to_string())); + let ctx = test_ctx(temp.path().to_path_buf(), catalog); ctx.active_operations .write() .await @@ -432,6 +531,15 @@ mod tests { assert!( !can_dispatch_file_transfer(&ctx, temp.path(), "active", "active/version.ini").await ); + assert!( + !can_dispatch_file_transfer( + &ctx, + temp.path(), + "wrong-version", + "wrong-version/version.ini", + ) + .await + ); assert!( !can_dispatch_file_transfer( &ctx, diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs index edb993d..2ab4571 100644 --- a/crates/lanspread-peer/src/startup.rs +++ b/crates/lanspread-peer/src/startup.rs @@ -11,6 +11,7 @@ use std::{ }; use futures::FutureExt as _; +use lanspread_db::db::GameCatalog; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, @@ -84,7 +85,8 @@ pub(crate) fn spawn_peer_runtime( game_dir: PathBuf, state_dir: PathBuf, unpacker: Arc, - catalog: Arc>>, + catalog: Arc>, + active_outbound_transfers: crate::context::OutboundTransfers, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); @@ -104,6 +106,7 @@ pub(crate) fn spawn_peer_runtime( runtime_shutdown.clone(), runtime_tracker.clone(), catalog, + active_outbound_transfers, ) .await { @@ -190,6 +193,7 @@ fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = ctx.peer_game_db.clone(); + let catalog = ctx.catalog.clone(); let active_operations = ctx.active_operations.clone(); let active_downloads = ctx.active_downloads.clone(); let shutdown = ctx.shutdown.clone(); @@ -207,6 +211,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender>>, +>; + /// Tauri-managed runtime state shared by commands and setup tasks. #[derive(Default)] struct LanSpreadState { @@ -40,9 +44,10 @@ struct LanSpreadState { active_operations: Arc>>, games_folder: Arc>, peer_game_db: Arc>, - catalog: Arc>>, + catalog: Arc>, unpack_logs: Arc>>, state_dir: OnceLock, + active_outbound_transfers: OutboundTransfers, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -79,6 +84,7 @@ struct LauncherGame { #[serde(flatten)] game: Game, can_host_server: bool, + active_outbound_transfers: usize, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -829,6 +835,24 @@ fn apply_peer_local_games(game_db: &mut GameDB, local_games: &[Game]) { } } +fn apply_peer_remote_games(game_db: &mut GameDB, peer_games: Vec) { + // Peer events update availability, but catalog metadata stays anchored to game.db. + for game in game_db.games.values_mut() { + game.peer_count = 0; + } + + for peer_game in peer_games { + if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) { + existing.peer_count = peer_game.peer_count; + } else { + log::debug!( + "Peer advertised unknown game {id}; ignoring because game.db is ground truth", + id = peer_game.id + ); + } + } +} + fn clear_all_local_game_states(game_db: &mut GameDB) { for game in game_db.games.values_mut() { clear_local_game_state(game); @@ -847,17 +871,24 @@ async fn emit_games_list(app_handle: &AppHandle) { return; } + let active_transfers = state.active_outbound_transfers.read().await; + let games_to_emit = game_db .all_games() .into_iter() .cloned() - .map(|game| LauncherGame { - can_host_server: game_can_host_server(&games_folder, &game), - game, + .map(|game| { + let active_outbound_transfers = active_transfers.get(&game.id).map_or(0, Vec::len); + LauncherGame { + can_host_server: game_can_host_server(&games_folder, &game), + active_outbound_transfers, + game, + } }) .collect::>(); drop(game_db); + drop(active_transfers); let active_operations = { let active_operations = state.active_operations.read().await; @@ -996,36 +1027,7 @@ async fn update_game_db(games: Vec, app: AppHandle) { { let mut game_db = state.games.write().await; - - // Reset peer counts up front. Presence/metadata stay anchored to the baked game.db. - for game in game_db.games.values_mut() { - game.peer_count = 0; - } - - for peer_game in games { - if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) { - existing.peer_count = peer_game.peer_count; - - if let Some(peer_version) = &peer_game.eti_game_version { - match &existing.eti_game_version { - Some(current_version) if current_version >= peer_version => {} - _ => { - existing.eti_game_version = Some(peer_version.clone()); - log::debug!( - "Updated eti_game_version for {} to {} based on peer data", - peer_game.id, - peer_version - ); - } - } - } - } else { - log::debug!( - "Peer advertised unknown game {id}; ignoring because game.db is ground truth", - id = peer_game.id - ); - } - } + apply_peer_remote_games(&mut game_db, games); } emit_games_list(&app).await; @@ -1399,7 +1401,7 @@ async fn ensure_bundled_game_db_loaded(app_handle: &AppHandle) { if needs_load { let game_db = load_bundled_game_db(app_handle).await; - let catalog = game_db.games.keys().cloned().collect::>(); + let catalog = GameCatalog::from_game_db(&game_db); *state.games.write().await = game_db; *state.catalog.write().await = catalog; } @@ -1432,6 +1434,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) { state.catalog.clone(), PeerStartOptions { state_dir: Some(state_dir), + active_outbound_transfers: Some(state.active_outbound_transfers.clone()), }, ) { Ok(handle) => { @@ -1495,6 +1498,10 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } emit_games_list(app_handle).await; } + PeerEvent::OutboundTransferCountChanged => { + log::info!("PeerEvent::OutboundTransferCountChanged received"); + emit_games_list(app_handle).await; + } PeerEvent::GotGameFiles { id, file_descriptions, @@ -1747,6 +1754,33 @@ mod tests { } } + fn eti_game_fixture(game_id: &str, game_version: &str) -> lanspread_compat::eti::EtiGame { + lanspread_compat::eti::EtiGame { + game_id: game_id.to_string(), + game_title: "Catalog Game".to_string(), + game_key: "catalog-game".to_string(), + game_release: "2000".to_string(), + game_publisher: "publisher".to_string(), + game_size: 1.0, + game_readme_de: "description".to_string(), + game_readme_en: "description".to_string(), + game_readme_fr: "description".to_string(), + game_maxplayers: 4, + game_master_req: 0, + genre_de: "genre".to_string(), + game_version: game_version.to_string(), + } + } + + #[test] + fn eti_game_conversion_uses_catalog_version_as_authoritative_eti_version() { + let game = Game::from(eti_game_fixture("alpha", "20200721")); + + assert_eq!(game.version, "20200721"); + assert_eq!(game.eti_game_version.as_deref(), Some("20200721")); + assert_eq!(game.local_version, None); + } + #[test] fn terminal_log_cleanup_preserves_crlf_and_collapses_redrawn_lines() { let input = "Extracting foo 10%\rExtracting foo 80%\rExtracting foo OK\r\nAll done\r\n"; @@ -2048,6 +2082,42 @@ mod tests { assert!(game_db.get_game_by_id("unknown").is_none()); } + + #[test] + fn peer_remote_snapshot_updates_counts_without_overwriting_catalog_version() { + let mut alpha = game_fixture("alpha", "Catalog Alpha"); + alpha.size = 999; + alpha.eti_game_version = Some("20200721".to_string()); + + let mut beta = game_fixture("beta", "Catalog Beta"); + beta.peer_count = 2; + beta.eti_game_version = Some("20200101".to_string()); + + let mut game_db = GameDB::from(vec![alpha, beta]); + + let mut peer_alpha = game_fixture("alpha", "Peer Alpha"); + peer_alpha.size = 42; + peer_alpha.peer_count = 3; + peer_alpha.eti_game_version = Some("20990101".to_string()); + + let mut unknown = game_fixture("unknown", "Unknown"); + unknown.peer_count = 1; + unknown.eti_game_version = Some("20990101".to_string()); + + apply_peer_remote_games(&mut game_db, vec![peer_alpha, unknown]); + + let alpha = game_db.get_game_by_id("alpha").expect("alpha remains"); + assert_eq!(alpha.name, "Catalog Alpha"); + assert_eq!(alpha.size, 999); + assert_eq!(alpha.peer_count, 3); + assert_eq!(alpha.eti_game_version.as_deref(), Some("20200721")); + + let beta = game_db.get_game_by_id("beta").expect("beta remains"); + assert_eq!(beta.peer_count, 0); + assert_eq!(beta.eti_game_version.as_deref(), Some("20200101")); + + assert!(game_db.get_game_by_id("unknown").is_none()); + } } #[allow(clippy::missing_panics_doc)] diff --git a/crates/lanspread-tauri-deno-ts/src/components/grid/GameCard.tsx b/crates/lanspread-tauri-deno-ts/src/components/grid/GameCard.tsx index a79a8be..52ce025 100644 --- a/crates/lanspread-tauri-deno-ts/src/components/grid/GameCard.tsx +++ b/crates/lanspread-tauri-deno-ts/src/components/grid/GameCard.tsx @@ -3,6 +3,7 @@ import { JSX, KeyboardEvent } from 'react'; import { Game } from '../../lib/types'; import { CoverAspect } from '../../hooks/useSettings'; import { formatBytes } from '../../lib/format'; +import { hasNewerLocalVersion } from '../../lib/gameState'; import { GameCover } from './GameCover'; import { StateChip } from '../StateChip'; @@ -42,6 +43,14 @@ export const GameCard = ({ onOpen(game); } }; + const newerThanExpected = hasNewerLocalVersion(game); + const hasOutbound = game.active_outbound_transfers !== undefined && game.active_outbound_transfers > 0; + const statusMessage = hasOutbound + ? `Sharing to ${game.active_outbound_transfers} peer${game.active_outbound_transfers === 1 ? '' : 's'}` + : (game.status_message ?? (newerThanExpected ? 'Newer than expected' : '')); + const statusLevel = hasOutbound + ? 'info' + : (game.status_level ?? (newerThanExpected ? 'warning' : undefined)); return (