//! Command handlers for peer commands. use std::{ collections::{HashSet, hash_map::Entry}, future::Future, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, }; use lanspread_db::db::{GameDB, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ ActiveOperation, ActiveOperationKind, InstallOperation, PeerEvent, context::{Ctx, OperationGuard, OperationKind}, download::download_game_files, events, install, local_games::{ LocalLibraryScan, game_from_summary, get_game_file_descriptions, local_dir_is_directory, local_download_available, rescan_local_game, scan_local_library, version_ini_is_regular_file, }, network::{request_game_details_from_peer, send_library_delta}, peer_db::PeerGameDB, remote_peer::ensure_peer_id_for_addr, services::perform_handshake_with_peer, }; // ============================================================================= // Command handlers // ============================================================================= /// Handles the `ListGames` command. pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("ListGames command received"); events::emit_peer_game_list(&ctx.peer_game_db, tx_notify_ui).await; } /// Tries to serve a game from local files. async fn try_serve_local_game( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str, ) -> bool { let game_dir = { ctx.game_dir.read().await.clone() }; let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; if !local_download_available(&game_dir, id, &active_operations, &catalog).await { return false; } drop(active_operations); drop(catalog); match get_game_file_descriptions(id, &game_dir).await { Ok(file_descriptions) => { log::info!("Serving game {id} from local files"); if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { id: id.to_string(), file_descriptions, }) { log::error!("Failed to send GotGameFiles event: {e}"); } true } Err(e) => { log::error!("Failed to enumerate local file descriptions for {id}: {e}"); false } } } /// Handles the `GetGame` command. pub(crate) async fn handle_get_game_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, source: GameDetailSource, ) { if source.allows_local() && try_serve_local_game(ctx, tx_notify_ui, &id).await { return; } log::info!("Requesting game from peers: {id}"); let peers = { let peer_game_db = ctx.peer_game_db.read().await; source.select_peers(&peer_game_db, &id) }; if peers.is_empty() { log::warn!("No peers have game {id}"); if let Err(e) = tx_notify_ui.send(PeerEvent::NoPeersHaveGame { id: id.clone() }) { log::error!("Failed to send NoPeersHaveGame event: {e}"); } return; } let peer_game_db = ctx.peer_game_db.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(fetch_game_details_from_peers( peers, id, peer_game_db, tx_notify_ui, |peer_addr, game_id, peer_game_db| async move { request_game_details_and_update(peer_addr, &game_id, peer_game_db).await }, )); } #[derive(Clone, Copy, Debug)] pub(crate) enum GameDetailSource { LocalOrPeers, LatestPeersOnly, } impl GameDetailSource { fn allows_local(self) -> bool { matches!(self, Self::LocalOrPeers) } fn select_peers(self, peer_game_db: &PeerGameDB, id: &str) -> Vec { match self { Self::LocalOrPeers => peer_game_db.peers_with_game(id), Self::LatestPeersOnly => peer_game_db.peers_with_latest_version(id), } } } /// Requests game details from a peer and updates the peer game database. async fn request_game_details_and_update( peer_addr: SocketAddr, game_id: &str, peer_game_db: Arc>, ) -> eyre::Result> { let (file_descriptions, _) = request_game_details_from_peer(peer_addr, game_id).await?; let peer_id = ensure_peer_id_for_addr(&peer_game_db, peer_addr).await; { let mut db = peer_game_db.write().await; db.update_peer_game_files(&peer_id, game_id, file_descriptions.clone()); } Ok(file_descriptions) } async fn fetch_game_details_from_peers( peers: Vec, id: String, peer_game_db: Arc>, tx_notify_ui: UnboundedSender, mut fetch_details: F, ) where F: FnMut(SocketAddr, String, Arc>) -> Fut + Send + 'static, Fut: Future>> + Send, { let mut fetched_any = false; for peer_addr in peers { match fetch_details(peer_addr, id.clone(), peer_game_db.clone()).await { Ok(_) => { log::info!("Fetched game file list for {id} from peer {peer_addr}"); fetched_any = true; } Err(e) => { log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}"); } } } if fetched_any { let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { id: id.clone(), file_descriptions: aggregated_files, }) { log::error!("Failed to send GotGameFiles event: {e}"); } } else { log::warn!("Failed to retrieve game files for {id} from any peer"); } } /// Handles the `DownloadGameFiles` command. #[allow(clippy::too_many_lines)] pub async fn handle_download_game_files_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, file_descriptions: Vec, install_after_download: bool, ) { log::info!("Got PeerCommand::DownloadGameFiles"); let games_folder = { ctx.game_dir.read().await.clone() }; // Use majority validation to get trusted file descriptions and peer whitelist let (validated_descriptions, peer_whitelist, file_peer_map) = { match ctx .peer_game_db .read() .await .validate_file_sizes_majority(&id) { Ok((files, peers, file_peer_map)) => { log::info!( "Majority validation: {} validated files, {} trusted peers for game {id}", files.len(), peers.len() ); (files, peers, file_peer_map) } Err(e) => { log::error!("File size majority validation failed for {id}: {e}"); if let Err(send_err) = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) { log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); } return; } } }; let resolved_descriptions = if file_descriptions.is_empty() { validated_descriptions } else { // If user provided specific descriptions, still validate them against majority // but keep user's selection (they might want specific files) file_descriptions }; if resolved_descriptions.is_empty() { log::error!( "No validated file descriptions available to download game {id}; request metadata first" ); return; } let local_dl_available = { let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; local_download_available(&games_folder, &id, &active_operations, &catalog).await }; if peer_whitelist.is_empty() { if local_dl_available { log::info!("Using locally downloaded files for game {id}; skipping peer transfer"); if let Err(e) = tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: id.clone() }) { log::error!("Failed to send DownloadGameFilesBegin event: {e}"); } if let Err(e) = tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { id: id.clone() }) { log::error!("Failed to send DownloadGameFilesFinished event: {e}"); } if install_after_download { spawn_install_operation(ctx, tx_notify_ui, id.clone()); } } else { log::error!("No trusted peers available after majority validation for game {id}"); } return; } { let mut in_progress = ctx.active_operations.write().await; match in_progress.entry(id.clone()) { Entry::Vacant(entry) => { entry.insert(OperationKind::Downloading); } Entry::Occupied(_) => { log::warn!("Operation for {id} already in progress; ignoring new download request"); return; } } } let active_operations = ctx.active_operations.clone(); let active_downloads = ctx.active_downloads.clone(); let tx_notify_ui_clone = tx_notify_ui.clone(); let download_id = id.clone(); let cancel_token = ctx.shutdown.child_token(); let ctx_clone = ctx.clone(); ctx.active_downloads .write() .await .insert(id, cancel_token.clone()); ctx.task_tracker.spawn(async move { let download_state_guard = OperationGuard::download(download_id.clone(), active_operations, active_downloads); let result = download_game_files( &download_id, resolved_descriptions, games_folder, peer_whitelist, file_peer_map, tx_notify_ui_clone.clone(), cancel_token, ) .await; match result { Ok(()) => { let Some(prepared) = prepare_install_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await else { end_download_operation(&ctx_clone, &download_id).await; download_state_guard.disarm(); return; }; if install_after_download { if transition_download_to_install( &ctx_clone, &download_id, prepared.operation_kind, ) .await { clear_active_download(&ctx_clone, &download_id).await; run_started_install_operation( &ctx_clone, &tx_notify_ui_clone, download_id, prepared, ) .await; } else { clear_active_download(&ctx_clone, &download_id).await; } } else { end_download_operation(&ctx_clone, &download_id).await; if let Err(err) = refresh_local_game(&ctx_clone, &tx_notify_ui_clone, &download_id).await { log::error!("Failed to refresh local library after download: {err}"); } } download_state_guard.disarm(); } Err(e) => { end_download_operation(&ctx_clone, &download_id).await; download_state_guard.disarm(); log::error!("Download failed for {download_id}: {e}"); } } }); } /// Handles the `InstallGame` command. pub async fn handle_install_game_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, ) { spawn_install_operation(ctx, tx_notify_ui, id); } /// Handles the `UninstallGame` command. pub async fn handle_uninstall_game_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, ) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.clone().spawn(async move { run_uninstall_operation(&ctx, &tx_notify_ui, id).await; }); } fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.clone().spawn(async move { run_install_operation(&ctx, &tx_notify_ui, id).await; }); } async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { let Some(prepared) = prepare_install_operation(ctx, tx_notify_ui, &id).await else { return; }; if !begin_operation(ctx, &id, prepared.operation_kind).await { log::warn!("Operation for {id} already in progress; ignoring install command"); return; } run_started_install_operation(ctx, tx_notify_ui, id, prepared).await; } struct PreparedInstallOperation { game_root: PathBuf, operation: InstallOperation, operation_kind: OperationKind, } async fn prepare_install_operation( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str, ) -> Option { if !catalog_contains(ctx, id).await { log::warn!("Ignoring install command for non-catalog game {id}"); return None; } let game_root = { ctx.game_dir.read().await.join(id) }; if !version_ini_is_regular_file(&game_root).await { log::warn!("Ignoring install command for {id}: version.ini sentinel is absent"); events::send( tx_notify_ui, PeerEvent::InstallGameFailed { id: id.to_string() }, ); return None; } let local_present = local_dir_is_directory(&game_root).await; let operation = if local_present { InstallOperation::Updating } else { InstallOperation::Installing }; let operation_kind = match operation { InstallOperation::Installing => OperationKind::Installing, InstallOperation::Updating => OperationKind::Updating, }; Some(PreparedInstallOperation { game_root, operation, operation_kind, }) } async fn run_started_install_operation( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, prepared: PreparedInstallOperation, ) { let PreparedInstallOperation { game_root, operation, .. } = prepared; let operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); let result = { events::send( tx_notify_ui, PeerEvent::InstallGameBegin { id: id.clone(), operation, }, ); match operation { InstallOperation::Installing => { install::install(&game_root, &id, ctx.unpacker.clone()).await } InstallOperation::Updating => { install::update(&game_root, &id, ctx.unpacker.clone()).await } } }; end_operation(ctx, &id).await; operation_guard.disarm(); match result { Ok(()) => { events::send( tx_notify_ui, PeerEvent::InstallGameFinished { id: id.clone() }, ); if let Err(err) = refresh_local_game(ctx, tx_notify_ui, &id).await { log::error!("Failed to refresh local library after install: {err}"); } } Err(err) => { log::error!("Install operation failed for {id}: {err}"); events::send( tx_notify_ui, PeerEvent::InstallGameFailed { id: id.clone() }, ); if let Err(refresh_err) = refresh_local_game(ctx, tx_notify_ui, &id).await { log::error!("Failed to refresh local library after install failure: {refresh_err}"); } } } } async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { if !catalog_contains(ctx, &id).await { log::warn!("Ignoring uninstall command for non-catalog game {id}"); return; } if !begin_operation(ctx, &id, OperationKind::Uninstalling).await { log::warn!("Operation for {id} already in progress; ignoring uninstall command"); return; } let game_root = { ctx.game_dir.read().await.join(&id) }; let operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); let result = { events::send( tx_notify_ui, PeerEvent::UninstallGameBegin { id: id.clone() }, ); install::uninstall(&game_root, &id).await }; end_operation(ctx, &id).await; operation_guard.disarm(); match result { Ok(()) => { events::send( tx_notify_ui, PeerEvent::UninstallGameFinished { id: id.clone() }, ); } Err(err) => { log::error!("Uninstall operation failed for {id}: {err}"); events::send( tx_notify_ui, PeerEvent::UninstallGameFailed { id: id.clone() }, ); } } if let Err(err) = refresh_local_game(ctx, tx_notify_ui, &id).await { log::error!("Failed to refresh local library after uninstall: {err}"); } } async fn begin_operation(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { let mut active_operations = ctx.active_operations.write().await; match active_operations.entry(id.to_string()) { Entry::Vacant(entry) => { entry.insert(operation); true } Entry::Occupied(_) => false, } } async fn transition_download_to_install(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { let mut active_operations = ctx.active_operations.write().await; match active_operations.get_mut(id) { Some(current) if *current == OperationKind::Downloading => { *current = operation; true } Some(current) => { log::warn!( "Cannot transition {id} from download to install; current operation is {current:?}" ); false } None => { log::warn!("Cannot transition {id} from download to install; operation is not active"); false } } } async fn end_operation(ctx: &Ctx, id: &str) { ctx.active_operations.write().await.remove(id); } async fn clear_active_download(ctx: &Ctx, id: &str) { ctx.active_downloads.write().await.remove(id); } async fn end_download_operation(ctx: &Ctx, id: &str) { end_operation(ctx, id).await; clear_active_download(ctx, id).await; } async fn catalog_contains(ctx: &Ctx, id: &str) -> bool { ctx.catalog.read().await.contains(id) } /// Handles the `SetGameDir` command. pub async fn handle_set_game_dir_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, game_dir: PathBuf, ) { let current_game_dir = ctx.game_dir.read().await.clone(); if current_game_dir == game_dir { log::info!( "Game directory {} unchanged; refreshing without recovery", game_dir.display() ); let tx_notify_ui = tx_notify_ui.clone(); let ctx_clone = ctx.clone(); ctx.task_tracker.spawn(async move { if let Err(err) = refresh_local_library(&ctx_clone, &tx_notify_ui).await { log::error!("Failed to refresh local game database: {err}"); } }); return; } let active_ids = active_operation_ids(ctx).await; if !active_ids.is_empty() { log::warn!( "Rejecting game directory change to {} while operations are active for: {}", game_dir.display(), active_ids.into_iter().collect::>().join(", ") ); return; } *ctx.game_dir.write().await = game_dir.clone(); log::info!("Game directory set to: {}", game_dir.display()); let tx_notify_ui = tx_notify_ui.clone(); let ctx_clone = ctx.clone(); ctx.task_tracker.spawn(async move { match load_local_library(&ctx_clone, &tx_notify_ui).await { Ok(()) => log::info!("Local game database loaded successfully"), Err(e) => { log::error!("Failed to load local game database: {e}"); } } }); } /// Loads the configured local library and announces the result. pub async fn load_local_library( ctx: &Ctx, tx_notify_ui: &UnboundedSender, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; let active_ids = active_operation_ids(ctx).await; install::recover_on_startup(&game_dir, &active_ids).await?; scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await } async fn refresh_local_library( ctx: &Ctx, tx_notify_ui: &UnboundedSender, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await } async fn scan_and_announce_local_library( ctx: &Ctx, tx_notify_ui: &UnboundedSender, game_dir: &Path, ) -> eyre::Result<()> { let catalog = ctx.catalog.read().await.clone(); let scan = scan_local_library(game_dir, &catalog).await?; update_and_announce_games(ctx, tx_notify_ui, scan).await; Ok(()) } async fn refresh_local_game( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; let catalog = ctx.catalog.read().await.clone(); let scan = rescan_local_game(&game_dir, &catalog, id).await?; update_and_announce_games(ctx, tx_notify_ui, scan).await; Ok(()) } async fn active_operation_ids(ctx: &Ctx) -> HashSet { ctx.active_operations.read().await.keys().cloned().collect() } /// Handles the `GetPeerCount` command. pub async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("GetPeerCount command received"); events::emit_peer_count(&ctx.peer_game_db, tx_notify_ui).await; } /// Connects to a peer directly, bypassing mDNS discovery. pub async fn handle_connect_peer_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, addr: SocketAddr, ) { log::info!("Direct connect command received for {addr}"); let peer_id = ctx.peer_id.clone(); let local_peer_addr = ctx.local_peer_addr.clone(); let local_library = ctx.local_library.clone(); let peer_game_db = ctx.peer_game_db.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { if let Err(err) = perform_handshake_with_peer( peer_id, local_peer_addr, local_library, peer_game_db, tx_notify_ui, addr, None, ) .await { log::warn!("Failed direct connect to {addr}: {err}"); } }); } // ============================================================================= // Game announcement helpers // ============================================================================= /// Updates the local game database and announces changes to peers. pub async fn update_and_announce_games( ctx: &Ctx, tx_notify_ui: &UnboundedSender, scan: LocalLibraryScan, ) { let LocalLibraryScan { mut game_db, mut summaries, revision, } = scan; let active_operations = active_operation_snapshot(ctx).await; if !active_operations.is_empty() { let previous = ctx.local_library.read().await.games.clone(); for id in active_operations.iter().map(|operation| &operation.id) { if let Some(summary) = previous.get(id) { summaries.insert(id.clone(), summary.clone()); } else { summaries.remove(id); } } game_db = GameDB::from(summaries.values().map(game_from_summary).collect()); } let delta = { let mut library_guard = ctx.local_library.write().await; library_guard.update_from_scan(summaries, revision) }; { let mut db_guard = ctx.local_game_db.write().await; *db_guard = Some(game_db.clone()); } let all_games = game_db.all_games().into_iter().cloned().collect::>(); if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated { games: all_games.clone(), active_operations, }) { log::error!("Failed to send LocalGamesUpdated event: {e}"); } let Some(delta) = delta else { return; }; let peer_targets = { let db = ctx.peer_game_db.read().await; db.peer_identities() .into_iter() .map(|(_peer_id, addr)| addr) .collect::>() }; for peer_addr in peer_targets { let delta = delta.clone(); let peer_id = ctx.peer_id.as_ref().clone(); ctx.task_tracker.spawn(async move { if let Err(e) = send_library_delta(peer_addr, &peer_id, delta).await { log::warn!("Failed to send library delta to {peer_addr}: {e}"); } }); } } async fn active_operation_snapshot(ctx: &Ctx) -> Vec { let active_operations = ctx.active_operations.read().await; let mut snapshot = active_operations .iter() .map(|(id, operation)| ActiveOperation { id: id.clone(), operation: active_operation_kind(*operation), }) .collect::>(); snapshot.sort_by(|left, right| left.id.cmp(&right.id)); snapshot } fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind { match operation { OperationKind::Downloading => ActiveOperationKind::Downloading, OperationKind::Installing => ActiveOperationKind::Installing, OperationKind::Updating => ActiveOperationKind::Updating, OperationKind::Uninstalling => ActiveOperationKind::Uninstalling, } } #[cfg(test)] mod tests { use std::{ collections::HashSet, net::SocketAddr, path::{Path, PathBuf}, sync::{Arc, Mutex}, time::Duration, }; use lanspread_proto::{Availability, GameSummary}; use tokio::sync::mpsc; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::*; use crate::{UnpackFuture, Unpacker, test_support::TempDir}; struct FakeUnpacker; impl Unpacker for FakeUnpacker { fn unpack<'a>(&'a self, _archive: &'a Path, dest: &'a Path) -> UnpackFuture<'a> { Box::pin(async move { tokio::fs::write(dest.join("payload.txt"), b"installed").await?; Ok(()) }) } } fn write_file(path: &Path, bytes: &[u8]) { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).expect("parent dir should be created"); } std::fs::write(path, bytes).expect("file should be written"); } fn test_ctx(game_dir: PathBuf) -> Ctx { Ctx::new( Arc::new(RwLock::new(PeerGameDB::new())), "peer".to_string(), game_dir, Arc::new(FakeUnpacker), CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(HashSet::from(["game".to_string()]))), ) } async fn recv_event(rx: &mut mpsc::UnboundedReceiver) -> PeerEvent { tokio::time::timeout(Duration::from_secs(1), rx.recv()) .await .expect("event should arrive") .expect("event channel should remain open") } fn addr(port: u16) -> SocketAddr { SocketAddr::from(([127, 0, 0, 1], port)) } fn summary(id: &str, version: &str, availability: Availability) -> GameSummary { GameSummary { id: id.to_string(), name: id.to_string(), size: 42, downloaded: availability == Availability::Ready, installed: true, eti_version: Some(version.to_string()), manifest_hash: 7, availability, } } fn file_desc(game_id: &str, relative_path: &str, size: u64) -> GameFileDescription { GameFileDescription { game_id: game_id.to_string(), relative_path: relative_path.to_string(), is_dir: false, size, } } fn assert_local_update(event: PeerEvent, installed: bool, downloaded: bool) { let _ = local_update_game(event, installed, downloaded); } fn local_update_game( event: PeerEvent, installed: bool, downloaded: bool, ) -> lanspread_db::db::Game { let PeerEvent::LocalGamesUpdated { games, active_operations, } = event else { panic!("expected LocalGamesUpdated"); }; assert!( active_operations.is_empty(), "settled local update should not report active operations" ); let game = games .into_iter() .find(|game| game.id == "game") .expect("game should be announced"); assert_eq!(game.installed, installed); assert_eq!(game.downloaded, downloaded); game } #[test] fn update_source_selects_latest_ready_peer_manifest() { let old_addr = addr(12_000); let new_addr = addr(12_001); let local_only_addr = addr(12_002); let mut db = PeerGameDB::new(); db.upsert_peer("old".to_string(), old_addr); db.upsert_peer("new".to_string(), new_addr); db.upsert_peer("local-only".to_string(), local_only_addr); db.update_peer_games( &"old".to_string(), vec![summary("game", "20240101", Availability::Ready)], ); db.update_peer_games( &"new".to_string(), vec![summary("game", "20250101", Availability::Ready)], ); db.update_peer_games( &"local-only".to_string(), vec![summary("game", "20990101", Availability::LocalOnly)], ); assert_eq!( GameDetailSource::LatestPeersOnly.select_peers(&db, "game"), vec![new_addr] ); } #[tokio::test] async fn update_fetch_emits_fresh_manifest_from_latest_peer() { let old_addr = addr(12_010); let new_addr = addr(12_011); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); { let mut db = peer_game_db.write().await; db.upsert_peer("old".to_string(), old_addr); db.upsert_peer("new".to_string(), new_addr); db.update_peer_games( &"old".to_string(), vec![summary("game", "20240101", Availability::Ready)], ); db.update_peer_games( &"new".to_string(), vec![summary("game", "20250101", Availability::Ready)], ); } let peers = { let db = peer_game_db.read().await; GameDetailSource::LatestPeersOnly.select_peers(&db, "game") }; let (tx, mut rx) = mpsc::unbounded_channel(); let fetched_peers = Arc::new(Mutex::new(Vec::new())); fetch_game_details_from_peers(peers, "game".to_string(), peer_game_db.clone(), tx, { let fetched_peers = fetched_peers.clone(); move |peer_addr, game_id, peer_game_db| { let fetched_peers = fetched_peers.clone(); async move { fetched_peers .lock() .expect("fetched peer list should not be poisoned") .push(peer_addr); let files = vec![ file_desc(&game_id, "game/version.ini", 8), file_desc(&game_id, "game/new.eti", 11), ]; peer_game_db.write().await.update_peer_game_files( &"new".to_string(), &game_id, files.clone(), ); Ok(files) } } }) .await; assert_eq!( *fetched_peers .lock() .expect("fetched peer list should not be poisoned"), vec![new_addr] ); let PeerEvent::GotGameFiles { id, file_descriptions, } = recv_event(&mut rx).await else { panic!("expected GotGameFiles"); }; assert_eq!(id, "game"); assert!( file_descriptions .iter() .any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11), "latest peer manifest should be emitted to the download path" ); } #[tokio::test] async fn update_request_skips_local_manifest_even_when_download_exists() { let temp = TempDir::new("lanspread-handler-latest-peer"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20240101"); write_file(&root.join("game.eti"), b"old archive"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); handle_get_game_command( &ctx, &tx, "game".to_string(), GameDetailSource::LatestPeersOnly, ) .await; assert!(matches!( recv_event(&mut rx).await, PeerEvent::NoPeersHaveGame { id } if id == "game" )); } #[tokio::test] async fn local_games_update_reports_authoritative_active_operations() { let temp = TempDir::new("lanspread-handler-active-snapshot"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); let ctx = test_ctx(temp.path().to_path_buf()); ctx.active_operations .write() .await .insert("game".to_string(), OperationKind::Installing); let (tx, mut rx) = mpsc::unbounded_channel(); let catalog = ctx.catalog.read().await.clone(); let scan = scan_local_library(temp.path(), &catalog) .await .expect("scan should succeed"); update_and_announce_games(&ctx, &tx, scan).await; let PeerEvent::LocalGamesUpdated { games, active_operations, } = recv_event(&mut rx).await else { panic!("expected LocalGamesUpdated"); }; assert!( games.is_empty(), "active game should keep its previous announced state" ); assert_eq!( active_operations, vec![ActiveOperation { id: "game".to_string(), operation: ActiveOperationKind::Installing, }] ); } #[tokio::test] async fn unchanged_scan_still_reports_active_operation_snapshot() { let temp = TempDir::new("lanspread-handler-active-unchanged"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); let catalog = ctx.catalog.read().await.clone(); let scan = scan_local_library(temp.path(), &catalog) .await .expect("first scan should succeed"); update_and_announce_games(&ctx, &tx, scan).await; assert_local_update(recv_event(&mut rx).await, false, true); ctx.active_operations .write() .await .insert("game".to_string(), OperationKind::Updating); let scan = scan_local_library(temp.path(), &catalog) .await .expect("second scan should succeed"); update_and_announce_games(&ctx, &tx, scan).await; let PeerEvent::LocalGamesUpdated { active_operations, .. } = recv_event(&mut rx).await else { panic!("expected LocalGamesUpdated"); }; assert_eq!( active_operations, vec![ActiveOperation { id: "game".to_string(), operation: ActiveOperationKind::Updating, }] ); } #[tokio::test] async fn install_refreshes_settled_state_after_guard_release() { let temp = TempDir::new("lanspread-handler-install"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); run_install_operation(&ctx, &tx, "game".to_string()).await; match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); assert_eq!(operation, InstallOperation::Installing); } _ => panic!("expected InstallGameBegin"), } assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" )); assert!(ctx.active_operations.read().await.is_empty()); assert_local_update(recv_event(&mut rx).await, true, true); } #[tokio::test] async fn download_handoff_waits_for_readers_and_auto_installs() { let temp = TempDir::new("lanspread-handler-download-handoff"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); let ctx = test_ctx(temp.path().to_path_buf()); ctx.active_operations .write() .await .insert("game".to_string(), OperationKind::Downloading); ctx.active_downloads .write() .await .insert("game".to_string(), CancellationToken::new()); let (prepare_tx, _prepare_rx) = mpsc::unbounded_channel(); let prepared = prepare_install_operation(&ctx, &prepare_tx, "game") .await .expect("downloaded game should be installable"); let read_guard = ctx.active_operations.read().await; let (tx, mut rx) = mpsc::unbounded_channel(); let install_task = tokio::spawn({ let ctx = ctx.clone(); let tx = tx.clone(); async move { assert!( transition_download_to_install(&ctx, "game", prepared.operation_kind).await ); clear_active_download(&ctx, "game").await; run_started_install_operation(&ctx, &tx, "game".to_string(), prepared).await; } }); tokio::task::yield_now().await; assert_eq!(read_guard.get("game"), Some(&OperationKind::Downloading)); drop(read_guard); install_task.await.expect("handoff task should finish"); match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); assert_eq!(operation, InstallOperation::Installing); } _ => panic!("expected InstallGameBegin"), } assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" )); assert!(ctx.active_operations.read().await.is_empty()); assert!(ctx.active_downloads.read().await.is_empty()); assert_local_update(recv_event(&mut rx).await, true, true); } #[tokio::test] async fn update_refreshes_settled_state_after_guard_release() { let temp = TempDir::new("lanspread-handler-update"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); write_file(&root.join("local").join("old.txt"), b"old"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); run_install_operation(&ctx, &tx, "game".to_string()).await; match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); assert_eq!(operation, InstallOperation::Updating); } _ => panic!("expected InstallGameBegin"), } assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" )); assert!(ctx.active_operations.read().await.is_empty()); assert_local_update(recv_event(&mut rx).await, true, true); } #[tokio::test] async fn install_update_uninstall_sequence_reports_new_version_and_settled_state() { let temp = TempDir::new("lanspread-handler-sequence"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20240101"); write_file(&root.join("game.eti"), b"old archive"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); run_install_operation(&ctx, &tx, "game".to_string()).await; assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameBegin { id, operation: InstallOperation::Installing } if id == "game" )); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" )); let game = local_update_game(recv_event(&mut rx).await, true, true); assert_eq!(game.local_version.as_deref(), Some("20240101")); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"new archive"); run_install_operation(&ctx, &tx, "game".to_string()).await; assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameBegin { id, operation: InstallOperation::Updating } if id == "game" )); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" )); let game = local_update_game(recv_event(&mut rx).await, true, true); assert_eq!(game.local_version.as_deref(), Some("20250101")); run_uninstall_operation(&ctx, &tx, "game".to_string()).await; assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameBegin { id } if id == "game" )); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameFinished { id } if id == "game" )); let game = local_update_game(recv_event(&mut rx).await, false, true); assert_eq!(game.local_version.as_deref(), Some("20250101")); assert!(ctx.active_operations.read().await.is_empty()); } #[tokio::test] async fn uninstall_refreshes_settled_state_after_guard_release() { let temp = TempDir::new("lanspread-handler-uninstall"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); write_file(&root.join("local").join("old.txt"), b"old"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, mut rx) = mpsc::unbounded_channel(); run_uninstall_operation(&ctx, &tx, "game".to_string()).await; assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameBegin { id } if id == "game" )); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameFinished { id } if id == "game" )); assert!(ctx.active_operations.read().await.is_empty()); assert_local_update(recv_event(&mut rx).await, false, true); } #[tokio::test] async fn path_changing_set_game_dir_is_rejected_while_operations_are_active() { let current = TempDir::new("lanspread-handler-current-dir"); let next = TempDir::new("lanspread-handler-next-dir"); let ctx = test_ctx(current.path().to_path_buf()); ctx.active_operations .write() .await .insert("game".to_string(), OperationKind::Downloading); let (tx, _rx) = mpsc::unbounded_channel(); handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await; assert_eq!(*ctx.game_dir.read().await, current.path()); } #[tokio::test] async fn same_path_set_game_dir_refreshes_without_recovery() { let temp = TempDir::new("lanspread-handler-same-dir"); write_file(&temp.game_root().join(".version.ini.tmp"), b"tmp"); let ctx = test_ctx(temp.path().to_path_buf()); let (tx, _rx) = mpsc::unbounded_channel(); handle_set_game_dir_command(&ctx, &tx, temp.path().to_path_buf()).await; ctx.task_tracker.close(); ctx.task_tracker.wait().await; assert!(temp.game_root().join(".version.ini.tmp").is_file()); } #[tokio::test] async fn path_changing_set_game_dir_runs_recovery() { let current = TempDir::new("lanspread-handler-old-dir"); let next = TempDir::new("lanspread-handler-new-dir"); write_file(&next.game_root().join(".version.ini.tmp"), b"tmp"); let ctx = test_ctx(current.path().to_path_buf()); let (tx, _rx) = mpsc::unbounded_channel(); handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await; ctx.task_tracker.close(); ctx.task_tracker.wait().await; assert!(!next.game_root().join(".version.ini.tmp").exists()); } }