Files
lanspread/crates/lanspread-peer/src/services/local_monitor.rs
T
ddidderr 738095235f feat(peer): coordinate outbound transfers with local game mutations
Updating or removing a local game rewrites its on-disk files. Peers that
were mid-download of that game would keep streaming bytes from files that
are being deleted or replaced, handing them a corrupt or stale copy.
There was also no authoritative notion of which game version a peer
should serve or accept, so a peer could serve whatever happened to be on
disk and downloaders could aggregate files from peers running mismatched
versions.

This introduces a reader-writer coordination scheme between outbound file
transfers (readers) and local mutation operations (writers), and gates
both serving and downloading on an authoritative game catalog version.

Reader-writer coordination:
- Track active outbound transfers per game in a shared `OutboundTransfers`
  map of (id, CancellationToken), threaded through `Ctx`/`PeerCtx` and
  registered by a `TransferGuard` in the stream service. The guard is
  registered *before* the serve-eligibility check to close a TOCTOU window
  where a writer could miss an in-flight reader.
- `stream_file_bytes` now honors a cancellation token at every await point
  (file read, network send, stream close) via `tokio::select!`, so a
  transfer aborts promptly instead of hanging on a stalled receiver.
- `begin_operation` marks a game active first, then cancels its outbound
  transfers and waits for the count to reach zero before any
  Updating/RemovingDownload work touches the filesystem.
- Active games are now hidden from library snapshots entirely while an
  operation is in flight, instead of freezing their last announced state,
  so peers stop discovering a game that is being mutated.

Authoritative version catalog:
- Replace the `HashSet<String>` catalog with `GameCatalog`, mapping each
  game id to its expected version (from the bundled game.db / ETI data).
- Serving requires the local `version.ini` to match the catalog version
  (`local_download_matches_catalog`); peer selection, file aggregation,
  and majority size validation all filter on the expected version
  (`peers_with_expected_version`, `aggregated_game_files`, and friends).

User-visible changes:
- The GUI shows confirmation dialogs before Update and Remove, and
  surfaces a sharing-status indicator on game cards and the detail modal.
- A new `OutboundTransferCountChanged` event lets the UI reflect live
  outbound transfer activity.

Test Plan:
- just test
- just frontend-test
- just clippy
2026-05-30 16:36:58 +02:00

603 lines
18 KiB
Rust

//! Local game directory monitor.
use std::{
collections::HashSet,
path::{Component, Path, PathBuf},
sync::Arc,
time::Duration,
};
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{
PeerEvent,
config::LOCAL_GAME_FALLBACK_SCAN_SECS,
context::Ctx,
handlers::update_and_announce_games,
local_games::{
is_ignored_game_root_name,
is_local_dir_name,
rescan_local_game,
scan_local_library,
},
};
/// Live filesystem-watcher state for a single game directory.
///
/// Built by `build_watch_state` and kept in sync by
/// `reconcile_game_root_watches`.
struct WatchState {
/// The `notify` watcher on which all paths in `watched` are registered.
watcher: RecommendedWatcher,
/// Game directory this state was built for; compared against the current
/// `ctx.game_dir` to decide whether the watcher must be rebuilt.
game_dir: PathBuf,
/// Paths currently registered with `watcher` (the game directory itself
/// plus each listed game root).
watched: HashSet<PathBuf>,
}
/// Per-game bookkeeping used to coalesce rescan requests.
///
/// Cloning shares the same underlying sets, so the monitor loop and the
/// spawned rescan tasks all observe the same state.
#[derive(Clone, Default)]
struct RescanGate {
/// Game ids that currently have a rescan task registered via `queue_rescan`.
running: Arc<RwLock<HashSet<String>>>,
/// Game ids for which another rescan was requested while one was running.
pending: Arc<RwLock<HashSet<String>>>,
}
/// Monitors the local game directory for changes.
pub async fn run_local_game_monitor(
tx_notify_ui: UnboundedSender<PeerEvent>,
ctx: Ctx,
) -> eyre::Result<()> {
log::info!("Starting notify-based local game directory monitor");
let (watch_tx, mut watch_rx) = tokio::sync::mpsc::unbounded_channel::<notify::Result<Event>>();
let mut watch_state = build_watch_state(&ctx, watch_tx.clone()).await;
let gate = RescanGate::default();
let mut fallback_interval =
tokio::time::interval(Duration::from_secs(LOCAL_GAME_FALLBACK_SCAN_SECS));
loop {
tokio::select! {
() = ctx.shutdown.cancelled() => return Ok(()),
_ = fallback_interval.tick() => {
run_fallback_scan(&ctx, &tx_notify_ui).await;
reconcile_watch_state(&ctx, &mut watch_state, watch_tx.clone()).await;
}
Some(event) = watch_rx.recv() => {
handle_watch_event(
&ctx,
&tx_notify_ui,
&gate,
event,
).await;
reconcile_watch_state(&ctx, &mut watch_state, watch_tx.clone()).await;
}
}
}
}
/// Creates a watcher for the current game directory and registers the
/// directory plus every game root beneath it.
///
/// Watcher events are forwarded into `watch_tx`. Returns `None` (after
/// logging a warning) when the watcher cannot be created or the initial
/// watches cannot be registered, leaving only the periodic scans.
async fn build_watch_state(
    ctx: &Ctx,
    watch_tx: tokio::sync::mpsc::UnboundedSender<notify::Result<Event>>,
) -> Option<WatchState> {
    let game_dir = ctx.game_dir.read().await.clone();

    // A send error only means the receiving side is gone; nothing to do then.
    let forward_event = move |result: notify::Result<Event>| {
        let _ = watch_tx.send(result);
    };

    let mut fs_watcher = RecommendedWatcher::new(forward_event, Config::default())
        .inspect_err(|err| {
            log::warn!("Filesystem watcher unavailable; falling back to periodic scans: {err}");
        })
        .ok()?;

    let watched = watch_game_roots(&mut fs_watcher, &game_dir)
        .await
        .inspect_err(|err| {
            log::warn!(
                "Failed to initialize filesystem watcher for {}: {err}; falling back to periodic scans",
                game_dir.display()
            );
        })
        .ok()?;

    Some(WatchState {
        watcher: fs_watcher,
        game_dir,
        watched,
    })
}
/// Brings `watch_state` in line with the currently configured game directory.
///
/// When there is no watcher yet, or the game directory has changed since the
/// watcher was built, the state is rebuilt from scratch. Otherwise only the
/// per-game-root watches are reconciled, and a failure there is logged as a
/// warning.
async fn reconcile_watch_state(
    ctx: &Ctx,
    watch_state: &mut Option<WatchState>,
    watch_tx: tokio::sync::mpsc::UnboundedSender<notify::Result<Event>>,
) {
    let current_game_dir = ctx.game_dir.read().await.clone();

    let needs_rebuild = match watch_state.as_ref() {
        Some(existing) => existing.game_dir != current_game_dir,
        None => true,
    };
    if needs_rebuild {
        *watch_state = build_watch_state(ctx, watch_tx).await;
        return;
    }

    let Some(state) = watch_state.as_mut() else {
        return;
    };
    if let Err(err) = reconcile_game_root_watches(state).await {
        log::warn!(
            "Failed to reconcile filesystem watches for {}: {err}",
            state.game_dir.display()
        );
    }
}
/// Registers `game_dir` and each game root listed beneath it with `watcher`.
///
/// Returns the set of paths that were registered. The first failure aborts
/// the call and is returned to the caller.
async fn watch_game_roots(
    watcher: &mut RecommendedWatcher,
    game_dir: &Path,
) -> eyre::Result<HashSet<PathBuf>> {
    let mut registered = HashSet::new();
    watch_path(watcher, game_dir, &mut registered)?;

    let roots = list_game_roots(game_dir).await?;
    roots
        .iter()
        .try_for_each(|root| watch_path(watcher, root, &mut registered))?;

    Ok(registered)
}
async fn reconcile_game_root_watches(state: &mut WatchState) -> eyre::Result<()> {
let desired = {
let mut desired = HashSet::from([state.game_dir.clone()]);
desired.extend(list_game_roots(&state.game_dir).await?);
desired
};
let stale_paths = state
.watched
.difference(&desired)
.cloned()
.collect::<Vec<_>>();
for path in stale_paths {
if let Err(err) = state.watcher.unwatch(&path) {
log::debug!("Failed to unwatch {}: {err}", path.display());
}
state.watched.remove(&path);
}
let new_paths = desired
.difference(&state.watched)
.cloned()
.collect::<Vec<_>>();
for path in new_paths {
watch_path(&mut state.watcher, &path, &mut state.watched)?;
}
Ok(())
}
/// Registers a non-recursive watch on `path` and, on success, records it in
/// `watched_paths`.
///
/// On failure the watcher error is returned and `watched_paths` is untouched.
fn watch_path(
    watcher: &mut RecommendedWatcher,
    path: &Path,
    watched_paths: &mut HashSet<PathBuf>,
) -> notify::Result<()> {
    watcher
        .watch(path, RecursiveMode::NonRecursive)
        .map(|()| {
            watched_paths.insert(path.to_path_buf());
        })
}
/// Lists the game root directories directly beneath `game_dir`.
///
/// Only directories are returned. Entries whose name is not valid UTF-8, or
/// for which `is_ignored_game_root_name` returns true, are skipped. A missing
/// `game_dir` yields an empty list; any other I/O error is returned.
async fn list_game_roots(game_dir: &Path) -> eyre::Result<Vec<PathBuf>> {
    let mut dir = match tokio::fs::read_dir(game_dir).await {
        Ok(dir) => dir,
        Err(err) => {
            return if err.kind() == std::io::ErrorKind::NotFound {
                Ok(Vec::new())
            } else {
                Err(err.into())
            };
        }
    };

    let mut roots = Vec::new();
    while let Some(entry) = dir.next_entry().await? {
        let is_dir = entry.file_type().await?.is_dir();
        let file_name = entry.file_name();
        let is_game_root = is_dir
            && file_name
                .to_str()
                .is_some_and(|name| !is_ignored_game_root_name(name));
        if is_game_root {
            roots.push(entry.path());
        }
    }
    Ok(roots)
}
async fn handle_watch_event(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
gate: &RescanGate,
event: notify::Result<Event>,
) {
let event = match event {
Ok(event) => event,
Err(err) => {
log::warn!("Filesystem watcher event error: {err}");
return;
}
};
if matches!(event.kind, EventKind::Access(_)) {
return;
}
let game_dir = ctx.game_dir.read().await.clone();
let ids = event
.paths
.iter()
.filter_map(|path| game_id_from_event_path(&game_dir, path))
.collect::<HashSet<_>>();
for id in ids {
if ctx.active_operations.read().await.contains_key(&id) {
log::debug!("Dropping filesystem event for {id}: operation active");
continue;
}
queue_rescan(ctx, tx_notify_ui, gate, id).await;
}
}
/// Requests a rescan of game `id`, coalescing with one already in flight.
///
/// If a rescan task is already registered for `id`, the request is recorded
/// in `gate.pending` and the call returns. Otherwise `id` is marked running
/// and a `run_gated_rescan` task is spawned on the context's task tracker.
async fn queue_rescan(
    ctx: &Ctx,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
    gate: &RescanGate,
    id: String,
) {
    {
        let mut running = gate.running.write().await;
        // `insert` returns false when the id was already present.
        if !running.insert(id.clone()) {
            gate.pending.write().await.insert(id);
            return;
        }
    }

    let tracker = ctx.task_tracker.clone();
    let rescan = run_gated_rescan(ctx.clone(), tx_notify_ui.clone(), gate.clone(), id);
    tracker.spawn(rescan);
}
/// Rescans game `id` repeatedly until no further request is pending, then
/// releases the gate entry that `queue_rescan` created.
///
/// Each iteration first clears the pending marker (the scan about to run
/// covers every request made so far), then rescans the game and announces
/// the result. If an operation is active for `id` the scan is skipped and
/// the task exits, dropping any pending request, just as
/// `handle_watch_event` drops events for active games.
///
/// The decision to exit is made while holding the `running` write lock.
/// `queue_rescan` inserts into `pending` only while holding that same lock,
/// so a request can no longer slip in between "nothing pending" and
/// "no longer running" and be left in `pending` with no task to service it.
async fn run_gated_rescan(
    ctx: Ctx,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    gate: RescanGate,
    id: String,
) {
    loop {
        gate.pending.write().await.remove(&id);

        let operation_active = ctx.active_operations.read().await.contains_key(&id);
        if !operation_active {
            let game_dir = ctx.game_dir.read().await.clone();
            let catalog = ctx.catalog.read().await.clone();
            match rescan_local_game(&game_dir, ctx.state_dir.as_ref(), &catalog, &id).await {
                Ok(scan) => update_and_announce_games(&ctx, &tx_notify_ui, scan).await,
                Err(err) => log::error!("Failed to rescan local game {id}: {err}"),
            }
        }

        // Lock order is `running` then `pending`, matching `queue_rescan`.
        let mut running = gate.running.write().await;
        let rescan_requested = gate.pending.write().await.remove(&id);
        if operation_active || !rescan_requested {
            running.remove(&id);
            return;
        }
        // A request arrived during the scan: release `running` and go again.
    }
}
async fn run_fallback_scan(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
let game_dir = ctx.game_dir.read().await.clone();
let catalog = ctx.catalog.read().await.clone();
match scan_local_library(&game_dir, ctx.state_dir.as_ref(), &catalog).await {
Ok(scan) => update_and_announce_games(ctx, tx_notify_ui, scan).await,
Err(err) => log::error!("Failed to scan local games directory: {err}"),
}
}
fn game_id_from_event_path(game_dir: &Path, path: &Path) -> Option<String> {
let relative = path.strip_prefix(game_dir).ok()?;
let mut components = relative.components();
let game_id = component_name(components.next()?)?;
if is_ignored_game_root_name(game_id) {
return None;
}
if let Some(second) = components.next().and_then(component_name)
&& should_ignore_game_child(second)
{
return None;
}
Some(game_id.to_string())
}
/// Returns the UTF-8 name of a normal path component.
///
/// Yields `None` for root, prefix, `.` and `..` components, and for normal
/// components whose name is not valid UTF-8.
fn component_name(component: Component<'_>) -> Option<&str> {
    if let Component::Normal(os_name) = component {
        os_name.to_str()
    } else {
        None
    }
}
/// Reports whether changes to a direct child of a game root should not
/// trigger a rescan.
///
/// A child is ignored when `is_local_dir_name` accepts it, when it starts
/// with one of the reserved prefixes, or when it equals one of the reserved
/// names below.
fn should_ignore_game_child(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = [".local.", ".version.ini."];
    const RESERVED_NAMES: [&str; 4] = [
        ".lanspread",
        ".lanspread.json",
        ".sync",
        ".softlan_game_installed",
    ];

    is_local_dir_name(name)
        || RESERVED_PREFIXES
            .iter()
            .any(|prefix| name.starts_with(prefix))
        || RESERVED_NAMES.contains(&name)
}
// Unit tests for the local game monitor: event-path mapping, the child
// ignore list, event gating, rescan coalescing and the fallback scan.
#[cfg(test)]
mod tests {
use std::{
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use lanspread_db::db::GameCatalog;
use notify::{
EventKind,
event::{AccessKind, AccessMode},
};
use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use super::*;
use crate::{
UnpackFuture,
Unpacker,
context::OperationKind,
peer_db::PeerGameDB,
test_support::TempDir,
};
// Unpacker stub that reports success without doing anything.
struct NoopUnpacker;
impl Unpacker for NoopUnpacker {
fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> {
Box::pin(async { Ok(()) })
}
}
// Writes `bytes` to `path`, creating parent directories as needed.
fn write_file(path: &Path, bytes: &[u8]) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).expect("parent dir should be created");
}
std::fs::write(path, bytes).expect("file should be written");
}
// Builds a `Ctx` rooted at `game_dir`, with its state directory at
// `<game_dir>/.test-state` and the given catalog.
fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> Ctx {
Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())),
"peer".to_string(),
game_dir.clone(),
game_dir.join(".test-state"),
Arc::new(NoopUnpacker),
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
)
}
// Wraps `path` in a successful watcher event of kind `Any`.
fn watch_event(path: PathBuf) -> notify::Result<Event> {
Ok(Event::new(EventKind::Any).add_path(path))
}
// Waits up to one second for the next UI event and returns its games;
// panics if nothing arrives or the event is not `LocalLibraryChanged`.
async fn recv_local_update(
rx: &mut mpsc::UnboundedReceiver<PeerEvent>,
) -> Vec<lanspread_db::db::Game> {
let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("local update event should arrive")
.expect("event channel should stay open");
let PeerEvent::LocalLibraryChanged { games } = event else {
panic!("expected LocalLibraryChanged");
};
games
}
// A path inside a game maps to that game's id, while ignored children and
// ignored top-level names map to nothing.
#[test]
fn event_paths_map_to_top_level_game_id() {
let root = std::path::Path::new("/games");
assert_eq!(
game_id_from_event_path(root, std::path::Path::new("/games/aoe2/version.ini"))
.as_deref(),
Some("aoe2")
);
assert_eq!(
game_id_from_event_path(root, std::path::Path::new("/games/aoe2/local/save.dat")),
None
);
assert_eq!(
game_id_from_event_path(root, std::path::Path::new("/games/.lanspread/index.json")),
None
);
}
// Every reserved child name is ignored; ordinary game files are not.
#[test]
fn event_ignore_list_covers_reserved_names() {
for name in [
"local",
".local.installing",
".local.backup",
".version.ini.tmp",
".version.ini.discarded",
".lanspread",
".lanspread.json",
".sync",
".softlan_game_installed",
] {
assert!(should_ignore_game_child(name));
}
assert!(!should_ignore_game_child("version.ini"));
assert!(!should_ignore_game_child("game.eti"));
}
// An event for a game with an active operation must not queue a rescan or
// reach the UI channel, and must leave the gate empty.
#[tokio::test]
async fn watch_event_for_active_game_is_dropped() {
let temp = TempDir::new("lanspread-local-monitor");
let ctx = test_ctx(
temp.path().to_path_buf(),
GameCatalog::from_ids(["game".to_string()]),
);
ctx.active_operations
.write()
.await
.insert("game".to_string(), OperationKind::Downloading);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
handle_watch_event(
&ctx,
&tx,
&gate,
watch_event(temp.path().join("game").join("version.ini")),
)
.await;
// Close and drain the tracker so any spawned rescan would have finished.
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
assert!(
tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.is_err(),
"active game event should not schedule a UI update"
);
assert!(gate.running.read().await.is_empty());
assert!(gate.pending.read().await.is_empty());
}
// Access-only events are ignored even when the game exists on disk.
#[tokio::test]
async fn access_watch_event_is_ignored() {
let temp = TempDir::new("lanspread-local-monitor");
write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
GameCatalog::from_ids(["game".to_string()]),
);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
handle_watch_event(
&ctx,
&tx,
&gate,
Ok(
Event::new(EventKind::Access(AccessKind::Close(AccessMode::Read)))
.add_path(temp.path().join("game").join("version.ini")),
),
)
.await;
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
assert!(
tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.is_err(),
"access events should not schedule a UI update"
);
assert!(gate.running.read().await.is_empty());
assert!(gate.pending.read().await.is_empty());
}
// Several requests made while a rescan is in flight collapse into a single
// pending entry, so at most one follow-up rescan runs.
#[tokio::test]
async fn burst_watch_events_collapse_to_two_rescans_for_same_game() {
let temp = TempDir::new("lanspread-local-monitor");
let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
GameCatalog::from_ids(["game".to_string()]),
);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
// NOTE(review): holding the library write lock presumably stalls the first
// rescan inside `update_and_announce_games` so that it is still "running"
// when the burst arrives; confirm against that function.
let library_guard = ctx.local_library.write().await;
queue_rescan(&ctx, &tx, &gate, "game".to_string()).await;
// Give the spawned rescan task a chance to start before the burst.
tokio::time::sleep(Duration::from_millis(20)).await;
for _ in 0..5 {
queue_rescan(&ctx, &tx, &gate, "game".to_string()).await;
}
assert_eq!(gate.pending.read().await.len(), 1);
drop(library_guard);
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
// Count the library-changed events that were emitted.
let mut update_count = 0;
while let Ok(Some(PeerEvent::LocalLibraryChanged { .. })) =
tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
{
update_count += 1;
}
assert!(
(1..=2).contains(&update_count),
"expected one initial rescan plus at most one pending rescan, got {update_count}"
);
}
// A catalog game copied into the game directory by hand is reported as
// downloaded but not installed by the fallback scan.
#[tokio::test]
async fn fallback_scan_picks_up_sideloaded_catalog_game() {
let temp = TempDir::new("lanspread-local-monitor");
write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
GameCatalog::from_ids(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
run_fallback_scan(&ctx, &tx).await;
let games = recv_local_update(&mut rx).await;
let game = games
.iter()
.find(|game| game.id == "game")
.expect("sideloaded catalog game should be emitted");
assert!(game.downloaded);
assert!(!game.installed);
}
// A directory that is not in the catalog produces no UI event, no library
// entry and no recorded delta.
#[tokio::test]
async fn fallback_scan_ignores_non_catalog_game_without_library_delta() {
let temp = TempDir::new("lanspread-local-monitor");
write_file(
&temp.path().join("non-catalog").join("version.ini"),
b"20250101",
);
let ctx = test_ctx(
temp.path().to_path_buf(),
GameCatalog::from_ids(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
run_fallback_scan(&ctx, &tx).await;
assert!(
tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.is_err(),
"non-catalog scan should not emit a local library event"
);
let library = ctx.local_library.read().await;
assert!(library.games.is_empty());
assert!(library.recent_deltas.is_empty());
// NOTE(review): `test_ctx` places the state directory at `.test-state`, yet
// this checks `.lanspread/library_index.json`; confirm this is the path the
// index would actually be written to, otherwise the assertion is vacuous.
assert!(
!temp
.path()
.join(".lanspread")
.join("library_index.json")
.exists()
);
}
}