diff --git a/crates/lanspread-peer/src/library.rs b/crates/lanspread-peer/src/library.rs index cf0038b..4932d38 100644 --- a/crates/lanspread-peer/src/library.rs +++ b/crates/lanspread-peer/src/library.rs @@ -17,10 +17,12 @@ pub struct LocalLibraryState { impl LocalLibraryState { pub fn empty() -> Self { + let games = HashMap::new(); + let digest = compute_library_digest(&games); Self { revision: 0, - digest: 0, - games: HashMap::new(), + digest, + games, recent_deltas: VecDeque::new(), } } diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index 24b5b60..56cbe49 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -331,7 +331,102 @@ fn should_ignore_game_child(name: &str) -> bool { #[cfg(test)] mod tests { - use super::{game_id_from_event_path, should_ignore_game_child}; + use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, + }; + + use notify::EventKind; + use tokio::sync::{RwLock, mpsc}; + use tokio_util::{sync::CancellationToken, task::TaskTracker}; + + use super::*; + use crate::{UnpackFuture, Unpacker, context::OperationKind, peer_db::PeerGameDB}; + + struct TempDir(PathBuf); + + static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0); + + impl TempDir { + fn new() -> Self { + let mut path = std::env::temp_dir(); + let unique_id = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed); + path.push(format!( + "lanspread-local-monitor-{}-{}-{}", + std::process::id(), + unique_id, + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + std::fs::create_dir_all(&path).expect("temp dir should be created"); + Self(path) + } + + fn path(&self) -> &Path { + &self.0 + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + struct NoopUnpacker; + + impl Unpacker for NoopUnpacker { + fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> { + Box::pin(async { Ok(()) }) + } + } + + fn write_file(path: &Path, bytes: &[u8]) { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).expect("parent dir should be created"); + } + std::fs::write(path, bytes).expect("file should be written"); + } + + fn test_ctx(game_dir: PathBuf, catalog: HashSet) -> Ctx { + Ctx::new( + Arc::new(RwLock::new(PeerGameDB::new())), + "peer".to_string(), + game_dir, + Arc::new(NoopUnpacker), + CancellationToken::new(), + TaskTracker::new(), + Arc::new(RwLock::new(catalog)), + ) + } + + fn watch_event(path: PathBuf) -> notify::Result { + Ok(Event::new(EventKind::Any).add_path(path)) + } + + async fn recv_local_update( + rx: &mut mpsc::UnboundedReceiver, + ) -> (Vec, Vec) { + let event = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("local update event should arrive") + .expect("event channel should stay open"); + let PeerEvent::LocalGamesUpdated { + games, + active_operations, + } = event + else { + panic!("expected LocalGamesUpdated"); + }; + (games, active_operations) + } #[test] fn event_paths_map_to_top_level_game_id() { @@ -369,4 +464,127 @@ mod tests { assert!(!should_ignore_game_child("version.ini")); assert!(!should_ignore_game_child("game.eti")); } + + #[tokio::test] + async fn watch_event_for_active_game_is_dropped() { + let temp = TempDir::new(); + let ctx = test_ctx( + temp.path().to_path_buf(), + HashSet::from(["game".to_string()]), + ); + ctx.active_operations + .write() + .await + .insert("game".to_string(), OperationKind::Downloading); + let gate = RescanGate::default(); + let (tx, mut rx) = mpsc::unbounded_channel(); + + handle_watch_event( + &ctx, + &tx, + &gate, + watch_event(temp.path().join("game").join("version.ini")), + ) + .await; + ctx.task_tracker.close(); + ctx.task_tracker.wait().await; + + assert!( + tokio::time::timeout(Duration::from_millis(50), rx.recv()) + .await + .is_err(), + "active game event should not schedule a UI update" + ); + assert!(gate.running.read().await.is_empty()); + assert!(gate.pending.read().await.is_empty()); + } + + #[tokio::test] + async fn burst_watch_events_collapse_to_two_rescans_for_same_game() { + let temp = TempDir::new(); + let game_root = temp.path().join("game"); + write_file(&game_root.join("version.ini"), b"20250101"); + let ctx = test_ctx( + temp.path().to_path_buf(), + HashSet::from(["game".to_string()]), + ); + let gate = RescanGate::default(); + let (tx, mut rx) = mpsc::unbounded_channel(); + + let library_guard = ctx.local_library.write().await; + queue_rescan(&ctx, &tx, &gate, "game".to_string()).await; + tokio::time::sleep(Duration::from_millis(20)).await; + + for _ in 0..5 { + queue_rescan(&ctx, &tx, &gate, "game".to_string()).await; + } + + assert_eq!(gate.pending.read().await.len(), 1); + drop(library_guard); + ctx.task_tracker.close(); + ctx.task_tracker.wait().await; + + let mut update_count = 0; + while let Ok(Some(PeerEvent::LocalGamesUpdated { .. })) = + tokio::time::timeout(Duration::from_millis(50), rx.recv()).await + { + update_count += 1; + } + assert!( + (1..=2).contains(&update_count), + "expected one initial rescan plus at most one pending rescan, got {update_count}" + ); + } + + #[tokio::test] + async fn fallback_scan_picks_up_sideloaded_catalog_game() { + let temp = TempDir::new(); + write_file(&temp.path().join("game").join("version.ini"), b"20250101"); + let ctx = test_ctx( + temp.path().to_path_buf(), + HashSet::from(["game".to_string()]), + ); + let (tx, mut rx) = mpsc::unbounded_channel(); + + run_fallback_scan(&ctx, &tx).await; + + let (games, active_operations) = recv_local_update(&mut rx).await; + assert!(active_operations.is_empty()); + let game = games + .iter() + .find(|game| game.id == "game") + .expect("sideloaded catalog game should be emitted"); + assert!(game.downloaded); + assert!(!game.installed); + } + + #[tokio::test] + async fn fallback_scan_ignores_non_catalog_game_without_library_delta() { + let temp = TempDir::new(); + write_file( + &temp.path().join("non-catalog").join("version.ini"), + b"20250101", + ); + let ctx = test_ctx( + temp.path().to_path_buf(), + HashSet::from(["game".to_string()]), + ); + let (tx, mut rx) = mpsc::unbounded_channel(); + + run_fallback_scan(&ctx, &tx).await; + + let (games, active_operations) = recv_local_update(&mut rx).await; + assert!(games.is_empty()); + assert!(active_operations.is_empty()); + let library = ctx.local_library.read().await; + assert!(library.games.is_empty()); + assert!(library.recent_deltas.is_empty()); + assert!( + !temp + .path() + .join(".lanspread") + .join("library_index.json") + .exists() + ); + } }