diff --git a/IMPL_DECISIONS.md b/IMPL_DECISIONS.md index 01c23cb..c24cc60 100644 --- a/IMPL_DECISIONS.md +++ b/IMPL_DECISIONS.md @@ -22,8 +22,9 @@ Tauri by sending `PeerCommand::InstallGame` directly. A not-downloaded game still uses `GetGame`, and the peer auto-installs after the sentinel commit. - Removed the dead internal `PeerCommand::UpdateGame` path. The UI update button - intentionally sends `GetGame`, and the peer infers install versus update from - the presence of `local/` after archives are available. + now sends `FetchLatestFromPeers`, which skips local manifest serving and asks + latest-version peers for fresh file metadata before the normal download and + update transaction runs. - Kept `Availability::Downloading` in the wire protocol for compatibility, but local summaries do not emit it today because active operations are gated out of scans and serving decisions. diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 658469a..fbb54dd 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -13,8 +13,8 @@ It is designed to run headless – other crates (most notably of the peer crate's platform layer, and the catalog set gates which local game roots are announced or served. - `PeerCommand` represents the small control surface exposed to the UI layer: - `ListGames`, `GetGame`, `DownloadGameFiles`, `InstallGame`, - `UninstallGame`, and `SetGameDir`. + `ListGames`, `GetGame`, `FetchLatestFromPeers`, `DownloadGameFiles`, + `InstallGame`, `UninstallGame`, `SetGameDir`, and `GetPeerCount`. - `PeerEvent` enumerates everything the peer runtime reports back to the UI: library snapshots, download/install/uninstall lifecycle updates, runtime failures, and peer membership changes. @@ -57,9 +57,10 @@ lifetime of the process: When the UI asks to download a game: -1. The UI first issues `PeerCommand::GetGame`. Each peer that still reports the - game is queried via `request_game_details_from_peer`, and their file - manifests are merged inside `PeerGameDB`. +1. The UI first issues `PeerCommand::GetGame` for a new download, or + `PeerCommand::FetchLatestFromPeers` for an update that must bypass local + archives. The selected peers are queried via `request_game_details_from_peer`, + and their file manifests are merged inside `PeerGameDB`. 2. Once the UI receives `PeerEvent::GotGameFiles`, it forwards the selected file list back with `PeerCommand::DownloadGameFiles`. 3. `download_game_files` starts a version-sentinel transaction, parks any old diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 02d0dba..85e8bb6 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -124,6 +124,7 @@ pub(crate) struct OperationGuard { active_operations: Arc>>, active_downloads: Arc>>, clears_download: bool, + armed: bool, } impl OperationGuard { @@ -136,6 +137,7 @@ impl OperationGuard { active_operations, active_downloads: Arc::new(RwLock::new(HashMap::new())), clears_download: false, + armed: true, } } @@ -149,13 +151,26 @@ impl OperationGuard { active_operations, active_downloads, clears_download: true, + armed: true, } } + + pub(crate) fn disarm(mut self) { + self.armed = false; + } } impl Drop for OperationGuard { fn drop(&mut self) { + if !self.armed { + return; + } + let id = self.id.clone(); + log::error!( + "Operation guard is cleaning up {id}; operation ended without explicit state cleanup" + ); + if let Ok(mut guard) = self.active_operations.try_write() { guard.remove(&id); } else if let Ok(handle) = tokio::runtime::Handle::try_current() { @@ -238,7 +253,7 @@ mod tests { } #[tokio::test] - async fn operation_guard_clears_tracking_on_completion() { + async fn operation_guard_cleans_tracking_when_not_disarmed() { let id = "game-complete"; let (active_operations, active_downloads, _) = tracked_download_state(id); @@ -252,7 +267,7 @@ mod tests { } #[tokio::test] - async fn operation_guard_clears_tracking_after_cancellation() { + async fn operation_guard_cleans_tracking_after_cancellation() { let id = "game-cancelled"; let (active_operations, active_downloads, cancel) = tracked_download_state(id); cancel.cancel(); @@ -267,7 +282,23 @@ mod tests { } #[tokio::test] - async fn operation_guard_clears_tracking_when_task_is_dropped() { + async fn disarmed_operation_guard_does_not_clean_tracking() { + let id = "game-finished"; + let (active_operations, active_downloads, _) = tracked_download_state(id); + + OperationGuard::download( + id.to_string(), + active_operations.clone(), + active_downloads.clone(), + ) + .disarm(); + + assert!(active_operations.read().await.contains_key(id)); + assert!(active_downloads.read().await.contains_key(id)); + } + + #[tokio::test] + async fn operation_guard_cleans_tracking_when_task_is_dropped() { let id = "game-aborted"; let (active_operations, active_downloads, _) = tracked_download_state(id); let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index e58b94e..28490f6 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashSet, hash_map::Entry}, + future::Future, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -80,17 +81,21 @@ async fn try_serve_local_game( } /// Handles the `GetGame` command. -pub async fn handle_get_game_command( +pub(crate) async fn handle_get_game_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, + source: GameDetailSource, ) { - if try_serve_local_game(ctx, tx_notify_ui, &id).await { + if source.allows_local() && try_serve_local_game(ctx, tx_notify_ui, &id).await { return; } log::info!("Requesting game from peers: {id}"); - let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; + let peers = { + let peer_game_db = ctx.peer_game_db.read().await; + source.select_peers(&peer_game_db, &id) + }; if peers.is_empty() { log::warn!("No peers have game {id}"); if let Err(e) = tx_notify_ui.send(PeerEvent::NoPeersHaveGame { id: id.clone() }) { @@ -101,33 +106,34 @@ pub async fn handle_get_game_command( let peer_game_db = ctx.peer_game_db.clone(); let tx_notify_ui = tx_notify_ui.clone(); - ctx.task_tracker.spawn(async move { - let mut fetched_any = false; - for peer_addr in peers { - match request_game_details_and_update(peer_addr, &id, peer_game_db.clone()).await { - Ok(_) => { - log::info!("Fetched game file list for {id} from peer {peer_addr}"); - fetched_any = true; - } - Err(e) => { - log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}"); - } - } - } + ctx.task_tracker.spawn(fetch_game_details_from_peers( + peers, + id, + peer_game_db, + tx_notify_ui, + |peer_addr, game_id, peer_game_db| async move { + request_game_details_and_update(peer_addr, &game_id, peer_game_db).await + }, + )); +} - if fetched_any { - let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; +#[derive(Clone, Copy, Debug)] +pub(crate) enum GameDetailSource { + LocalOrPeers, + LatestPeersOnly, +} - if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { - id: id.clone(), - file_descriptions: aggregated_files, - }) { - log::error!("Failed to send GotGameFiles event: {e}"); - } - } else { - log::warn!("Failed to retrieve game files for {id} from any peer"); +impl GameDetailSource { + fn allows_local(self) -> bool { + matches!(self, Self::LocalOrPeers) + } + + fn select_peers(self, peer_game_db: &PeerGameDB, id: &str) -> Vec { + match self { + Self::LocalOrPeers => peer_game_db.peers_with_game(id), + Self::LatestPeersOnly => peer_game_db.peers_with_latest_version(id), } - }); + } } /// Requests game details from a peer and updates the peer game database. @@ -147,6 +153,43 @@ async fn request_game_details_and_update( Ok(file_descriptions) } +async fn fetch_game_details_from_peers( + peers: Vec, + id: String, + peer_game_db: Arc>, + tx_notify_ui: UnboundedSender, + mut fetch_details: F, +) where + F: FnMut(SocketAddr, String, Arc>) -> Fut + Send + 'static, + Fut: Future>> + Send, +{ + let mut fetched_any = false; + for peer_addr in peers { + match fetch_details(peer_addr, id.clone(), peer_game_db.clone()).await { + Ok(_) => { + log::info!("Fetched game file list for {id} from peer {peer_addr}"); + fetched_any = true; + } + Err(e) => { + log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}"); + } + } + } + + if fetched_any { + let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; + + if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { + id: id.clone(), + file_descriptions: aggregated_files, + }) { + log::error!("Failed to send GotGameFiles event: {e}"); + } + } else { + log::warn!("Failed to retrieve game files for {id} from any peer"); + } +} + /// Handles the `DownloadGameFiles` command. #[allow(clippy::too_many_lines)] pub async fn handle_download_game_files_command( @@ -252,27 +295,49 @@ pub async fn handle_download_game_files_command( .insert(id, cancel_token.clone()); ctx.task_tracker.spawn(async move { - let result = { - let _download_state_guard = - OperationGuard::download(download_id.clone(), active_operations, active_downloads); + let download_state_guard = + OperationGuard::download(download_id.clone(), active_operations, active_downloads); - download_game_files( - &download_id, - resolved_descriptions, - games_folder, - peer_whitelist, - file_peer_map, - tx_notify_ui_clone.clone(), - cancel_token, - ) - .await - }; + let result = download_game_files( + &download_id, + resolved_descriptions, + games_folder, + peer_whitelist, + file_peer_map, + tx_notify_ui_clone.clone(), + cancel_token, + ) + .await; match result { Ok(()) => { - run_install_operation(&ctx_clone, &tx_notify_ui_clone, download_id).await; + let Some(prepared) = + prepare_install_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await + else { + end_download_operation(&ctx_clone, &download_id).await; + download_state_guard.disarm(); + return; + }; + + if transition_download_to_install(&ctx_clone, &download_id, prepared.operation_kind) + .await + { + clear_active_download(&ctx_clone, &download_id).await; + run_started_install_operation( + &ctx_clone, + &tx_notify_ui_clone, + download_id, + prepared, + ) + .await; + } else { + clear_active_download(&ctx_clone, &download_id).await; + } + download_state_guard.disarm(); } Err(e) => { + end_download_operation(&ctx_clone, &download_id).await; + download_state_guard.disarm(); log::error!("Download failed for {download_id}: {e}"); } } @@ -310,16 +375,42 @@ fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, } async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { - if !catalog_contains(ctx, &id).await { - log::warn!("Ignoring install command for non-catalog game {id}"); + let Some(prepared) = prepare_install_operation(ctx, tx_notify_ui, &id).await else { + return; + }; + + if !begin_operation(ctx, &id, prepared.operation_kind).await { + log::warn!("Operation for {id} already in progress; ignoring install command"); return; } - let game_root = { ctx.game_dir.read().await.join(&id) }; + run_started_install_operation(ctx, tx_notify_ui, id, prepared).await; +} + +struct PreparedInstallOperation { + game_root: PathBuf, + operation: InstallOperation, + operation_kind: OperationKind, +} + +async fn prepare_install_operation( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, +) -> Option { + if !catalog_contains(ctx, id).await { + log::warn!("Ignoring install command for non-catalog game {id}"); + return None; + } + + let game_root = { ctx.game_dir.read().await.join(id) }; if !version_ini_is_regular_file(&game_root).await { log::warn!("Ignoring install command for {id}: version.ini sentinel is absent"); - events::send(tx_notify_ui, PeerEvent::InstallGameFailed { id }); - return; + events::send( + tx_notify_ui, + PeerEvent::InstallGameFailed { id: id.to_string() }, + ); + return None; } let local_present = local_dir_is_directory(&game_root).await; @@ -333,13 +424,27 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender OperationKind::Updating, }; - if !begin_operation(ctx, &id, operation_kind).await { - log::warn!("Operation for {id} already in progress; ignoring install command"); - return; - } + Some(PreparedInstallOperation { + game_root, + operation, + operation_kind, + }) +} +async fn run_started_install_operation( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + prepared: PreparedInstallOperation, +) { + let PreparedInstallOperation { + game_root, + operation, + .. + } = prepared; + + let operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); let result = { - let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); events::send( tx_notify_ui, PeerEvent::InstallGameBegin { @@ -357,6 +462,8 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender { @@ -393,8 +500,8 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender { @@ -435,6 +544,39 @@ async fn begin_operation(ctx: &Ctx, id: &str, operation: OperationKind) -> bool } } +async fn transition_download_to_install(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { + let mut active_operations = ctx.active_operations.write().await; + match active_operations.get_mut(id) { + Some(current) if *current == OperationKind::Downloading => { + *current = operation; + true + } + Some(current) => { + log::warn!( + "Cannot transition {id} from download to install; current operation is {current:?}" + ); + false + } + None => { + log::warn!("Cannot transition {id} from download to install; operation is not active"); + false + } + } +} + +async fn end_operation(ctx: &Ctx, id: &str) { + ctx.active_operations.write().await.remove(id); +} + +async fn clear_active_download(ctx: &Ctx, id: &str) { + ctx.active_downloads.write().await.remove(id); +} + +async fn end_download_operation(ctx: &Ctx, id: &str) { + end_operation(ctx, id).await; + clear_active_download(ctx, id).await; +} + async fn catalog_contains(ctx: &Ctx, id: &str) -> bool { ctx.catalog.read().await.contains(id) } @@ -650,11 +792,13 @@ fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind { mod tests { use std::{ collections::HashSet, + net::SocketAddr, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, time::Duration, }; + use lanspread_proto::{Availability, GameSummary}; use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -698,7 +842,41 @@ mod tests { .expect("event channel should remain open") } + fn addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + fn summary(id: &str, version: &str, availability: Availability) -> GameSummary { + GameSummary { + id: id.to_string(), + name: id.to_string(), + size: 42, + downloaded: availability.is_downloaded(), + installed: true, + eti_version: Some(version.to_string()), + manifest_hash: 7, + availability, + } + } + + fn file_desc(game_id: &str, relative_path: &str, size: u64) -> GameFileDescription { + GameFileDescription { + game_id: game_id.to_string(), + relative_path: relative_path.to_string(), + is_dir: false, + size, + } + } + fn assert_local_update(event: PeerEvent, installed: bool, downloaded: bool) { + let _ = local_update_game(event, installed, downloaded); + } + + fn local_update_game( + event: PeerEvent, + installed: bool, + downloaded: bool, + ) -> lanspread_db::db::Game { let PeerEvent::LocalGamesUpdated { games, active_operations, @@ -711,11 +889,135 @@ mod tests { "settled local update should not report active operations" ); let game = games - .iter() + .into_iter() .find(|game| game.id == "game") .expect("game should be announced"); assert_eq!(game.installed, installed); assert_eq!(game.downloaded, downloaded); + game + } + + #[test] + fn update_source_selects_latest_ready_peer_manifest() { + let old_addr = addr(12_000); + let new_addr = addr(12_001); + let local_only_addr = addr(12_002); + let mut db = PeerGameDB::new(); + db.upsert_peer("old".to_string(), old_addr); + db.upsert_peer("new".to_string(), new_addr); + db.upsert_peer("local-only".to_string(), local_only_addr); + db.update_peer_games( + &"old".to_string(), + vec![summary("game", "20240101", Availability::Ready)], + ); + db.update_peer_games( + &"new".to_string(), + vec![summary("game", "20250101", Availability::Ready)], + ); + db.update_peer_games( + &"local-only".to_string(), + vec![summary("game", "20990101", Availability::LocalOnly)], + ); + + assert_eq!( + GameDetailSource::LatestPeersOnly.select_peers(&db, "game"), + vec![new_addr] + ); + } + + #[tokio::test] + async fn update_fetch_emits_fresh_manifest_from_latest_peer() { + let old_addr = addr(12_010); + let new_addr = addr(12_011); + let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + { + let mut db = peer_game_db.write().await; + db.upsert_peer("old".to_string(), old_addr); + db.upsert_peer("new".to_string(), new_addr); + db.update_peer_games( + &"old".to_string(), + vec![summary("game", "20240101", Availability::Ready)], + ); + db.update_peer_games( + &"new".to_string(), + vec![summary("game", "20250101", Availability::Ready)], + ); + } + let peers = { + let db = peer_game_db.read().await; + GameDetailSource::LatestPeersOnly.select_peers(&db, "game") + }; + let (tx, mut rx) = mpsc::unbounded_channel(); + let fetched_peers = Arc::new(Mutex::new(Vec::new())); + + fetch_game_details_from_peers(peers, "game".to_string(), peer_game_db.clone(), tx, { + let fetched_peers = fetched_peers.clone(); + move |peer_addr, game_id, peer_game_db| { + let fetched_peers = fetched_peers.clone(); + async move { + fetched_peers + .lock() + .expect("fetched peer list should not be poisoned") + .push(peer_addr); + let files = vec![ + file_desc(&game_id, "game/version.ini", 8), + file_desc(&game_id, "game/new.eti", 11), + ]; + peer_game_db.write().await.update_peer_game_files( + &"new".to_string(), + &game_id, + files.clone(), + ); + Ok(files) + } + } + }) + .await; + + assert_eq!( + *fetched_peers + .lock() + .expect("fetched peer list should not be poisoned"), + vec![new_addr] + ); + let PeerEvent::GotGameFiles { + id, + file_descriptions, + } = recv_event(&mut rx).await + else { + panic!("expected GotGameFiles"); + }; + assert_eq!(id, "game"); + assert!( + file_descriptions + .iter() + .any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11), + "latest peer manifest should be emitted to the download path" + ); + } + + #[tokio::test] + async fn update_request_skips_local_manifest_even_when_download_exists() { + let temp = TempDir::new("lanspread-handler-latest-peer"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20240101"); + write_file(&root.join("game.eti"), b"old archive"); + + let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + + handle_get_game_command( + &ctx, + &tx, + "game".to_string(), + GameDetailSource::LatestPeersOnly, + ) + .await; + + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::NoPeersHaveGame { id } if id == "game" + )); } #[tokio::test] @@ -826,6 +1128,62 @@ mod tests { assert_local_update(recv_event(&mut rx).await, true, true); } + #[tokio::test] + async fn download_handoff_waits_for_readers_and_auto_installs() { + let temp = TempDir::new("lanspread-handler-download-handoff"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"archive"); + + let ctx = test_ctx(temp.path().to_path_buf()); + ctx.active_operations + .write() + .await + .insert("game".to_string(), OperationKind::Downloading); + ctx.active_downloads + .write() + .await + .insert("game".to_string(), CancellationToken::new()); + let (prepare_tx, _prepare_rx) = mpsc::unbounded_channel(); + let prepared = prepare_install_operation(&ctx, &prepare_tx, "game") + .await + .expect("downloaded game should be installable"); + let read_guard = ctx.active_operations.read().await; + let (tx, mut rx) = mpsc::unbounded_channel(); + + let install_task = tokio::spawn({ + let ctx = ctx.clone(); + let tx = tx.clone(); + async move { + assert!( + transition_download_to_install(&ctx, "game", prepared.operation_kind).await + ); + clear_active_download(&ctx, "game").await; + run_started_install_operation(&ctx, &tx, "game".to_string(), prepared).await; + } + }); + + tokio::task::yield_now().await; + assert_eq!(read_guard.get("game"), Some(&OperationKind::Downloading)); + drop(read_guard); + install_task.await.expect("handoff task should finish"); + + match recv_event(&mut rx).await { + PeerEvent::InstallGameBegin { id, operation } => { + assert_eq!(id, "game"); + assert_eq!(operation, InstallOperation::Installing); + } + _ => panic!("expected InstallGameBegin"), + } + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameFinished { id } if id == "game" + )); + assert!(ctx.active_operations.read().await.is_empty()); + assert!(ctx.active_downloads.read().await.is_empty()); + assert_local_update(recv_event(&mut rx).await, true, true); + } + #[tokio::test] async fn update_refreshes_settled_state_after_guard_release() { let temp = TempDir::new("lanspread-handler-update"); @@ -854,6 +1212,63 @@ mod tests { assert_local_update(recv_event(&mut rx).await, true, true); } + #[tokio::test] + async fn install_update_uninstall_sequence_reports_new_version_and_settled_state() { + let temp = TempDir::new("lanspread-handler-sequence"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20240101"); + write_file(&root.join("game.eti"), b"old archive"); + + let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + + run_install_operation(&ctx, &tx, "game".to_string()).await; + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameBegin { + id, + operation: InstallOperation::Installing + } if id == "game" + )); + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameFinished { id } if id == "game" + )); + let game = local_update_game(recv_event(&mut rx).await, true, true); + assert_eq!(game.local_version.as_deref(), Some("20240101")); + + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"new archive"); + + run_install_operation(&ctx, &tx, "game".to_string()).await; + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameBegin { + id, + operation: InstallOperation::Updating + } if id == "game" + )); + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameFinished { id } if id == "game" + )); + let game = local_update_game(recv_event(&mut rx).await, true, true); + assert_eq!(game.local_version.as_deref(), Some("20250101")); + + run_uninstall_operation(&ctx, &tx, "game".to_string()).await; + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::UninstallGameBegin { id } if id == "game" + )); + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::UninstallGameFinished { id } if id == "game" + )); + let game = local_update_game(recv_event(&mut rx).await, false, true); + assert_eq!(game.local_version.as_deref(), Some("20250101")); + assert!(ctx.active_operations.read().await.is_empty()); + } + #[tokio::test] async fn uninstall_refreshes_settled_state_after_guard_release() { let temp = TempDir::new("lanspread-handler-uninstall"); diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 64e69ca..ef5d9af 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -53,6 +53,7 @@ pub use crate::startup::PeerRuntimeHandle; use crate::{ context::Ctx, handlers::{ + GameDetailSource, handle_download_game_files_command, handle_get_game_command, handle_get_peer_count_command, @@ -174,8 +175,10 @@ pub enum ActiveOperationKind { pub enum PeerCommand { /// Request a list of all available games. ListGames, - /// Request file details for a specific game. + /// Request file details for a specific game, serving local files when available. GetGame(String), + /// Request the latest peer-advertised file details for an update. + FetchLatestFromPeers { id: String }, /// Download game files. DownloadGameFiles { id: String, @@ -302,7 +305,12 @@ async fn handle_peer_commands( handle_list_games_command(ctx, tx_notify_ui).await; } PeerCommand::GetGame(id) => { - handle_get_game_command(ctx, tx_notify_ui, id).await; + handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LocalOrPeers) + .await; + } + PeerCommand::FetchLatestFromPeers { id } => { + handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LatestPeersOnly) + .await; } PeerCommand::DownloadGameFiles { id, diff --git a/crates/lanspread-peer/src/local_games.rs b/crates/lanspread-peer/src/local_games.rs index 7217cfd..85691e2 100644 --- a/crates/lanspread-peer/src/local_games.rs +++ b/crates/lanspread-peer/src/local_games.rs @@ -5,13 +5,14 @@ use std::{ hash::{Hash, Hasher}, io::ErrorKind, path::{Path, PathBuf}, + sync::LazyLock, time::{SystemTime, UNIX_EPOCH}, }; use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_proto::{Availability, GameSummary}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncWriteExt; +use tokio::{io::AsyncWriteExt, sync::Mutex}; use crate::{context::OperationKind, error::PeerError}; @@ -76,6 +77,8 @@ const INTENT_LOG_FILE: &str = ".lanspread.json"; const VERSION_TMP_FILE: &str = ".version.ini.tmp"; const VERSION_DISCARDED_FILE: &str = ".version.ini.discarded"; +static LIBRARY_INDEX_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + #[derive(Debug, Clone, Serialize, Deserialize)] struct LibraryIndex { revision: u64, @@ -92,6 +95,8 @@ struct GameIndexEntry { struct GameFingerprint { eti_files: Vec, version_mtime: Option, + #[serde(default)] + version_contents: Option, local_dir_present: bool, } @@ -245,9 +250,21 @@ async fn fingerprint_game_dir(game_path: &Path) -> eyre::Result let eti_files = root_eti_fingerprints(game_path).await?; let version_path = game_path.join("version.ini"); - let version_mtime = match tokio::fs::metadata(&version_path).await { - Ok(metadata) if metadata.is_file() => metadata.modified().ok().map(system_time_to_secs), - Err(_) | Ok(_) => None, + let (version_mtime, version_contents) = match tokio::fs::metadata(&version_path).await { + Ok(metadata) if metadata.is_file() => { + let contents = match tokio::fs::read_to_string(&version_path).await { + Ok(contents) => Some(contents.trim().to_string()), + Err(err) => { + log::warn!( + "Failed to read {} for fingerprinting: {err}", + version_path.display() + ); + None + } + }; + (metadata.modified().ok().map(system_time_to_secs), contents) + } + Err(_) | Ok(_) => (None, None), }; let local_dir_present = local_dir_is_directory(game_path).await; @@ -255,6 +272,7 @@ async fn fingerprint_game_dir(game_path: &Path) -> eyre::Result Ok(GameFingerprint { eti_files, version_mtime, + version_contents, local_dir_present, }) } @@ -558,6 +576,7 @@ pub async fn scan_local_library( return Ok(empty_scan()); } + let _index_guard = LIBRARY_INDEX_LOCK.lock().await; let index_path = library_index_path(game_path); let mut index = load_library_index(&index_path).await; let mut seen_ids = HashSet::new(); @@ -621,6 +640,7 @@ pub async fn rescan_local_game( game_id: &str, ) -> eyre::Result { let game_path = game_dir.as_ref(); + let _index_guard = LIBRARY_INDEX_LOCK.lock().await; let index_path = library_index_path(game_path); let mut index = load_library_index(&index_path).await; @@ -688,6 +708,7 @@ mod tests { fingerprint: GameFingerprint { eti_files: Vec::new(), version_mtime: Some(manifest_hash), + version_contents: Some("20250101".to_string()), local_dir_present: false, }, }, @@ -827,6 +848,48 @@ mod tests { assert_eq!(ready.availability, Availability::Ready); } + #[tokio::test] + async fn concurrent_rescans_preserve_both_index_updates() { + let temp = TempDir::new("lanspread-local-games-concurrent"); + let catalog = HashSet::from(["game-a".to_string(), "game-b".to_string()]); + write_file(&temp.path().join("game-a").join("version.ini"), b"20250101"); + write_file(&temp.path().join("game-b").join("version.ini"), b"20250101"); + + let initial = scan_local_library(temp.path(), &catalog) + .await + .expect("initial scan should succeed"); + assert_eq!(initial.revision, 1); + + write_file(&temp.path().join("game-a").join("game-a.eti"), b"archive-a"); + write_file(&temp.path().join("game-b").join("game-b.eti"), b"archive-b"); + + let (scan_a, scan_b) = tokio::join!( + rescan_local_game(temp.path(), &catalog, "game-a"), + rescan_local_game(temp.path(), &catalog, "game-b") + ); + scan_a.expect("game-a rescan should succeed"); + scan_b.expect("game-b rescan should succeed"); + + let index = load_library_index(&library_index_path(temp.path())).await; + assert_eq!(index.revision, 3); + let game_a = index + .games + .get("game-a") + .expect("game-a update should remain in index"); + let game_b = index + .games + .get("game-b") + .expect("game-b update should remain in index"); + assert!( + game_a.summary.size > 8, + "game-a rescan should persist the new archive" + ); + assert!( + game_b.summary.size > 8, + "game-b rescan should persist the new archive" + ); + } + #[tokio::test] async fn local_download_available_gates_on_catalog_operation_and_sentinel() { let temp = TempDir::new("lanspread-local-games"); diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index 0c71b42..a93e427 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -431,11 +431,28 @@ impl PeerGameDB { .collect() } + /// Returns file descriptions from peers that advertise the latest game version. + #[must_use] + pub fn latest_game_files_for( + &self, + game_id: &str, + ) -> Vec<(SocketAddr, Vec)> { + let latest_peers = self.peers_with_latest_version(game_id); + if latest_peers.is_empty() { + return Vec::new(); + } + + self.game_files_for(game_id) + .into_iter() + .filter(|(addr, _)| latest_peers.contains(addr)) + .collect() + } + /// Returns aggregated file descriptions for a game across all peers. #[must_use] pub fn aggregated_game_files(&self, game_id: &str) -> Vec { let mut seen: HashMap = HashMap::new(); - for (_, files) in self.game_files_for(game_id) { + for (_, files) in self.latest_game_files_for(game_id) { for file in files { seen.entry(file.relative_path.clone()).or_insert(file); } @@ -477,7 +494,7 @@ impl PeerGameDB { &self, game_id: &str, ) -> eyre::Result { - let game_files = self.game_files_for(game_id); + let game_files = self.latest_game_files_for(game_id); if game_files.is_empty() { return Ok((Vec::new(), Vec::new(), HashMap::new())); } @@ -777,6 +794,15 @@ mod tests { } } + fn file_desc(game_id: &str, relative_path: &str, size: u64) -> GameFileDescription { + GameFileDescription { + game_id: game_id.to_string(), + relative_path: relative_path.to_string(), + is_dir: false, + size, + } + } + #[test] fn aggregation_counts_only_ready_peers_as_download_sources() { let ready_addr = addr(12000); @@ -828,4 +854,55 @@ mod tests { assert_eq!(db.get_latest_version_for_game("game"), None); assert!(db.peers_with_latest_version("game").is_empty()); } + + #[test] + fn validation_uses_latest_version_file_metadata() { + let old_addr = addr(12003); + let new_addr = addr(12004); + let mut db = PeerGameDB::new(); + db.upsert_peer("old".to_string(), old_addr); + db.upsert_peer("new".to_string(), new_addr); + db.update_peer_games( + &"old".to_string(), + vec![summary("game", "20240101", Availability::Ready)], + ); + db.update_peer_games( + &"new".to_string(), + vec![summary("game", "20250101", Availability::Ready)], + ); + db.update_peer_game_files( + &"old".to_string(), + "game", + vec![ + file_desc("game", "game/version.ini", 8), + file_desc("game", "game/archive.eti", 10), + ], + ); + db.update_peer_game_files( + &"new".to_string(), + "game", + vec![ + file_desc("game", "game/version.ini", 8), + file_desc("game", "game/archive.eti", 20), + ], + ); + + let aggregated = db.aggregated_game_files("game"); + let archive = aggregated + .iter() + .find(|desc| desc.relative_path == "game/archive.eti") + .expect("latest archive should be present"); + assert_eq!(archive.size, 20); + + let (validated, peers, file_peer_map) = db + .validate_file_sizes_majority("game") + .expect("old-version file metadata should not create ambiguity"); + assert_eq!(peers, vec![new_addr]); + let archive = validated + .iter() + .find(|desc| desc.relative_path == "game/archive.eti") + .expect("latest archive should validate"); + assert_eq!(archive.size, 20); + assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr])); + } } diff --git a/crates/lanspread-peer/src/services/liveness.rs b/crates/lanspread-peer/src/services/liveness.rs index a52977c..408ff1e 100644 --- a/crates/lanspread-peer/src/services/liveness.rs +++ b/crates/lanspread-peer/src/services/liveness.rs @@ -271,4 +271,56 @@ mod tests { "peers-gone cancellation must not emit a duplicate failure event" ); } + + #[tokio::test] + async fn all_peers_gone_cancels_multiple_downloads_without_stuck_entries() { + let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + let first_cancel = CancellationToken::new(); + let second_cancel = CancellationToken::new(); + let active_operations = Arc::new(RwLock::new(HashMap::from([ + ("first".to_string(), OperationKind::Downloading), + ("second".to_string(), OperationKind::Downloading), + ("installing".to_string(), OperationKind::Installing), + ]))); + let active_downloads = Arc::new(RwLock::new(HashMap::from([ + ("first".to_string(), first_cancel.clone()), + ("second".to_string(), second_cancel.clone()), + ]))); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + handle_active_downloads_without_peers( + &peer_game_db, + &active_operations, + &active_downloads, + &tx, + ) + .await; + + assert!(first_cancel.is_cancelled()); + assert!(second_cancel.is_cancelled()); + let operations = active_operations.read().await; + assert!(!operations.contains_key("first")); + assert!(!operations.contains_key("second")); + assert_eq!( + operations.get("installing"), + Some(&OperationKind::Installing) + ); + drop(operations); + assert!(active_downloads.read().await.is_empty()); + + let mut cancelled_ids = Vec::new(); + for _ in 0..2 { + let event = rx.recv().await.expect("peers-gone event should be emitted"); + let PeerEvent::DownloadGameFilesAllPeersGone { id } = event else { + panic!("expected peers-gone event"); + }; + cancelled_ids.push(id); + } + cancelled_ids.sort(); + assert_eq!(cancelled_ids, vec!["first", "second"]); + assert!( + rx.try_recv().is_err(), + "multiple peers-gone cancellations must not emit duplicate failure events" + ); + } } diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 864f557..d54a53b 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -159,7 +159,7 @@ async fn update_game(id: String, state: tauri::State<'_, LanSpreadState>) -> tau let peer_ctrl = peer_ctrl_arc.read().await.clone(); if let Some(peer_ctrl) = peer_ctrl { - if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id)) { + if let Err(e) = peer_ctrl.send(PeerCommand::FetchLatestFromPeers { id }) { log::error!("Failed to send message to peer: {e:?}"); return Ok(false); }