test(peer): cover local monitor rescan gating

Add dispatch-level tests for the local game monitor paths called out in
FOLLOW_UP_2.md. The new coverage verifies watcher events are dropped while a
game has an active operation, burst events for one game collapse through the
pending set to at most one extra rescan, fallback scans pick up sideloaded
catalog games, and non-catalog roots stay invisible to the library state.

The non-catalog test exposed that an empty local library initialized with
digest zero, while the computed digest for an empty map is nonzero. That made
the first empty scan produce a meaningless empty LibraryDelta. Initialize the
empty state with the computed empty digest so a non-catalog-only scan leaves no
delta behind.

Test Plan:
- git diff --check
- just fmt
- just clippy
- just test

Follow-up-Plan: FOLLOW_UP_2.md
This commit is contained in:
2026-05-16 09:13:38 +02:00
parent c7b7ab7576
commit 2a94445391
2 changed files with 223 additions and 3 deletions
+4 -2
View File
@@ -17,10 +17,12 @@ pub struct LocalLibraryState {
impl LocalLibraryState { impl LocalLibraryState {
pub fn empty() -> Self { pub fn empty() -> Self {
let games = HashMap::new();
let digest = compute_library_digest(&games);
Self { Self {
revision: 0, revision: 0,
digest: 0, digest,
games: HashMap::new(), games,
recent_deltas: VecDeque::new(), recent_deltas: VecDeque::new(),
} }
} }
@@ -331,7 +331,102 @@ fn should_ignore_game_child(name: &str) -> bool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{game_id_from_event_path, should_ignore_game_child}; use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use notify::EventKind;
use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use super::*;
use crate::{UnpackFuture, Unpacker, context::OperationKind, peer_db::PeerGameDB};
struct TempDir(PathBuf);
static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0);
impl TempDir {
fn new() -> Self {
let mut path = std::env::temp_dir();
let unique_id = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed);
path.push(format!(
"lanspread-local-monitor-{}-{}-{}",
std::process::id(),
unique_id,
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
));
std::fs::create_dir_all(&path).expect("temp dir should be created");
Self(path)
}
fn path(&self) -> &Path {
&self.0
}
}
impl Drop for TempDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.0);
}
}
struct NoopUnpacker;
impl Unpacker for NoopUnpacker {
fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> {
Box::pin(async { Ok(()) })
}
}
fn write_file(path: &Path, bytes: &[u8]) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).expect("parent dir should be created");
}
std::fs::write(path, bytes).expect("file should be written");
}
fn test_ctx(game_dir: PathBuf, catalog: HashSet<String>) -> Ctx {
Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())),
"peer".to_string(),
game_dir,
Arc::new(NoopUnpacker),
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
)
}
fn watch_event(path: PathBuf) -> notify::Result<Event> {
Ok(Event::new(EventKind::Any).add_path(path))
}
async fn recv_local_update(
rx: &mut mpsc::UnboundedReceiver<PeerEvent>,
) -> (Vec<lanspread_db::db::Game>, Vec<crate::ActiveOperation>) {
let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("local update event should arrive")
.expect("event channel should stay open");
let PeerEvent::LocalGamesUpdated {
games,
active_operations,
} = event
else {
panic!("expected LocalGamesUpdated");
};
(games, active_operations)
}
#[test] #[test]
fn event_paths_map_to_top_level_game_id() { fn event_paths_map_to_top_level_game_id() {
@@ -369,4 +464,127 @@ mod tests {
assert!(!should_ignore_game_child("version.ini")); assert!(!should_ignore_game_child("version.ini"));
assert!(!should_ignore_game_child("game.eti")); assert!(!should_ignore_game_child("game.eti"));
} }
#[tokio::test]
async fn watch_event_for_active_game_is_dropped() {
let temp = TempDir::new();
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
);
ctx.active_operations
.write()
.await
.insert("game".to_string(), OperationKind::Downloading);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
handle_watch_event(
&ctx,
&tx,
&gate,
watch_event(temp.path().join("game").join("version.ini")),
)
.await;
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
assert!(
tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.is_err(),
"active game event should not schedule a UI update"
);
assert!(gate.running.read().await.is_empty());
assert!(gate.pending.read().await.is_empty());
}
#[tokio::test]
async fn burst_watch_events_collapse_to_two_rescans_for_same_game() {
let temp = TempDir::new();
let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
let library_guard = ctx.local_library.write().await;
queue_rescan(&ctx, &tx, &gate, "game".to_string()).await;
tokio::time::sleep(Duration::from_millis(20)).await;
for _ in 0..5 {
queue_rescan(&ctx, &tx, &gate, "game".to_string()).await;
}
assert_eq!(gate.pending.read().await.len(), 1);
drop(library_guard);
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
let mut update_count = 0;
while let Ok(Some(PeerEvent::LocalGamesUpdated { .. })) =
tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
{
update_count += 1;
}
assert!(
(1..=2).contains(&update_count),
"expected one initial rescan plus at most one pending rescan, got {update_count}"
);
}
#[tokio::test]
async fn fallback_scan_picks_up_sideloaded_catalog_game() {
let temp = TempDir::new();
write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
run_fallback_scan(&ctx, &tx).await;
let (games, active_operations) = recv_local_update(&mut rx).await;
assert!(active_operations.is_empty());
let game = games
.iter()
.find(|game| game.id == "game")
.expect("sideloaded catalog game should be emitted");
assert!(game.downloaded);
assert!(!game.installed);
}
#[tokio::test]
async fn fallback_scan_ignores_non_catalog_game_without_library_delta() {
let temp = TempDir::new();
write_file(
&temp.path().join("non-catalog").join("version.ini"),
b"20250101",
);
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
run_fallback_scan(&ctx, &tx).await;
let (games, active_operations) = recv_local_update(&mut rx).await;
assert!(games.is_empty());
assert!(active_operations.is_empty());
let library = ctx.local_library.read().await;
assert!(library.games.is_empty());
assert!(library.recent_deltas.is_empty());
assert!(
!temp
.path()
.join(".lanspread")
.join("library_index.json")
.exists()
);
}
} }