From 41e9a0efc11ec99701fdc3e8d1758c9f02149cd2 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Mon, 18 May 2026 21:25:20 +0200 Subject: [PATCH] refactor(peer): split local library and operation UI events Replace the `a9f9845` local-update dedup cache with explicit peer event semantics. Local scans now emit `LocalLibraryChanged` when the library changes, while operation mutations emit `ActiveOperationsChanged` from the mutation path. Tauri keeps joining those facts into the existing `games-list-updated` payload, so the frontend contract stays stable. This removes the cache/invalidation coupling between scan emission and operation state. The remaining forced local snapshot is explicit: accepted game directory changes can refresh the UI for an equivalent new path without sending a peer library delta. Operation guard cleanup and liveness cancellation now publish the same active operation snapshot as normal command-handler transitions. The peer CLI JSONL events follow the same split with `local-library-changed` and `active-operations-changed`. Test Plan: - `just fmt` - `CARGO_BUILD_RUSTC_WRAPPER= just test` - `CARGO_BUILD_RUSTC_WRAPPER= just clippy` - `git diff --check` Refs: CLEAN_CODE_PLAN_1.md --- CLEAN_CODE.md | 76 +++ CLEAN_CODE_PLAN_1.md | 70 +++ IMPL_DECISIONS.md | 2 +- PEER_CLI_SCENARIOS.md | 2 +- crates/lanspread-peer-cli/src/main.rs | 16 +- crates/lanspread-peer/ARCHITECTURE.md | 5 + crates/lanspread-peer/README.md | 3 +- crates/lanspread-peer/src/context.rs | 82 +++- crates/lanspread-peer/src/events.rs | 53 ++- crates/lanspread-peer/src/handlers.rs | 434 ++++++++++++------ crates/lanspread-peer/src/lib.rs | 9 +- .../lanspread-peer/src/services/liveness.rs | 43 +- .../src/services/local_monitor.rs | 26 +- .../src-tauri/src/lib.rs | 91 +--- 14 files changed, 657 insertions(+), 255 deletions(-) create mode 100644 CLEAN_CODE.md create mode 100644 CLEAN_CODE_PLAN_1.md diff --git a/CLEAN_CODE.md b/CLEAN_CODE.md new file mode 100644 index 0000000..df92790 --- /dev/null +++ b/CLEAN_CODE.md @@ -0,0 +1,76 @@ +# Clean code notes + +Running notes on architectural smells in the codebase: things that work today +but are shaped wrong, and what a cleaner design would look like. Each entry +should explain the smell, why the current shape exists, and the warning sign +that says "now is the time to refactor." + +--- + +## Resolved: local library and operation UI signals are split + +**Context.** Commit `a9f9845` ("fix(peer): suppress duplicate local game +updates") added a `last_local_update_key` cache in `Ctx`, keyed on +`(revision, digest, active_operations)`. That worked, but it made scan emission +responsible for deduplicating operation-state changes that were actually owned +by command handlers. + +### The root issue + +`update_and_announce_games` already knows whether the library actually changed: +`LocalLibraryState::update_from_scan` returns `Option`, where +`None` means "nothing changed." That fact gates peer `LibraryDelta` +announcements. Active operation status has a different source of truth: +`Ctx::active_operations`, mutated by operation start, handoff, end, liveness +cancellation, and guard cleanup. + +The old `LocalGamesUpdated { games, active_operations }` event mixed those two +sources of truth. The dedup key was a symptom of that overload. + +### Current shape + +The peer runtime now emits two separate facts: + +- `LocalLibraryChanged { games }` is emitted from scans when the local library + state changes. A real `SetGameDir` path change may force one local snapshot + for the UI even when the library digest matches the previous path, because + Tauri has already cleared local flags for the old path. +- `ActiveOperationsChanged { active_operations }` is emitted when the operation + table changes. Normal mutations go through `begin_operation`, + `transition_download_to_install`, and `end_operation`; liveness cancellation + and `OperationGuard` cleanup publish the same snapshot when they clear state. + +Tauri is the join boundary. It stores the latest game DB and latest active +operation snapshot, then keeps emitting the existing frontend +`games-list-updated` payload. The frontend did not need to learn the peer +runtime's internal event split. + +### Invariants to protect + +Do not reintroduce a scan-level dedup cache for operation state. If a new path +mutates `Ctx::active_operations`, route it through the operation publisher or +explicitly document why it is not UI-visible. If a new local scan reason needs a +UI snapshot without a peer delta, model that as an explicit scan policy like the +path-change forced snapshot, not as cache invalidation. + +--- + +## How to add to this file + +When a code review uncovers a "this works but it's bolted on" pattern, write it +up here. Structure: + +1. **Context** — what commit/PR introduced the pattern, one-paragraph summary. +2. **Root issue** — the underlying invariant the code is working around. +3. **Why the obvious fix doesn't work** — what constraints forced the current + shape. +4. **The tell** — concrete code shapes (scattered invalidations, repeated + special cases, dedup keys that re-derive existing facts) that signal the + smell. +5. **Clean shape** — what the code would look like without the constraint. +6. **Warning signs** — what observations in future work mean "do the + refactor now." + +Keep entries narrative, not bulleted to death. The point is to preserve the +_reasoning_ so future contributors can decide whether the trade-off still +holds. diff --git a/CLEAN_CODE_PLAN_1.md b/CLEAN_CODE_PLAN_1.md new file mode 100644 index 0000000..567b3eb --- /dev/null +++ b/CLEAN_CODE_PLAN_1.md @@ -0,0 +1,70 @@ +# Clean Code Plan 1: Split Local Library and Operation UI Signals + +## Goal + +Replace the `a9f9845` local-update dedup cache with explicit event semantics. +The peer runtime should report local library changes and active operation +changes as separate facts, while the Tauri layer keeps joining those facts into +the existing `games-list-updated` payload for the frontend. + +## Architectural Picture + +The current overloaded shape makes `LocalGamesUpdated` carry two independent +signals: + +- local library contents, whose source of truth is `LocalLibraryState` and + `LocalLibraryState::update_from_scan`; +- active operation status, whose source of truth is `Ctx::active_operations`. + +The clean boundary is: + +- peer scanning emits a local-library event when the scanned library state + changes, with an explicit force policy for accepted path changes where the UI + needs a fresh snapshot but peers do not need a delta; +- operation-state mutation emits an operation snapshot when the mutation + happens; +- Tauri owns UI joining: it stores the latest catalog/local games and latest + operation snapshot, then emits `games-list-updated` for the frontend. + +That keeps the frontend contract stable while removing the cross-cutting cache +and every manual invalidation call. + +## Implementation Steps + +1. Remove commit `a9f9845` from the local branch history before implementing + the replacement, so the final code is not built on the band-aid. +2. Replace `PeerEvent::LocalGamesUpdated { games, active_operations }` with: + - `PeerEvent::LocalLibraryChanged { games }`; + - `PeerEvent::ActiveOperationsChanged { active_operations }`. +3. Add one operation-snapshot publisher near the peer event helpers. All normal + operation mutations must go through helpers that mutate + `Ctx::active_operations` and then emit `ActiveOperationsChanged`. +4. Make `OperationGuard` publish an operation snapshot when it performs + exceptional cleanup on drop, so cancellation or aborted tasks do not leave UI + state stale. +5. Keep the existing scan behavior that freezes active game summaries while an + operation is running, but emit `LocalLibraryChanged` only when + `update_from_scan` returns a real delta or the scan was explicitly forced by + an accepted path change. +6. Update the Tauri event loop to reconcile `ActiveOperationsChanged` + independently, and call `emit_games_list` after both library and operation + state changes. +7. Update focused tests in peer handlers, local monitor, liveness, context guard, + and Tauri reconciliation to prove: + - unchanged settled scans do not emit local-library events; + - operation starts/transitions/ends emit authoritative snapshots; + - exceptional guard cleanup clears the operation snapshot; + - Tauri still emits the same `games-list-updated` UI payload. +8. Update `CLEAN_CODE.md`, `crates/lanspread-peer/ARCHITECTURE.md`, and + `crates/lanspread-peer/README.md` so the docs describe the new shape rather + than the dedup warning. + +## Review Gates + +- No `last_local_update_key`, `LocalUpdateKey`, or invalidate helper remains. +- No operation-state mutation that should be visible to the UI bypasses the + snapshot publisher. +- The peer event names reflect domain facts, not UI implementation details. +- Tauri remains the compatibility boundary for the frontend payload. +- Verification runs through `just fmt`, `just test`, `just clippy`, and + `git diff --check`. diff --git a/IMPL_DECISIONS.md b/IMPL_DECISIONS.md index 56cbe23..6dfc546 100644 --- a/IMPL_DECISIONS.md +++ b/IMPL_DECISIONS.md @@ -32,5 +32,5 @@ rendering follows backend state instead of reverse-engineering it from `installed && !downloaded`. - Removed Tauri's parallel whole-library filesystem scan. The UI database keeps - bundled catalog metadata, while peer `LocalGamesUpdated` snapshots now own + bundled catalog metadata, while peer `LocalLibraryChanged` snapshots now own `downloaded`, `installed`, `local_version`, and `availability`. diff --git a/PEER_CLI_SCENARIOS.md b/PEER_CLI_SCENARIOS.md index be5b627..f89e984 100644 --- a/PEER_CLI_SCENARIOS.md +++ b/PEER_CLI_SCENARIOS.md @@ -8,7 +8,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path. | ID | Scenario | Setup | Expected result | | --- | --- | --- | --- | -| S1 | Startup scan | Start one peer with `fixture-alpha`. | Peer emits `local-peer-ready` and `local-games-updated`; catalog fixture games are `downloaded=true`, `installed=false`, `availability=Ready`. | +| S1 | Startup scan | Start one peer with `fixture-alpha`. | Peer emits `local-peer-ready` and `local-library-changed`; catalog fixture games are `downloaded=true`, `installed=false`, `availability=Ready`. | | S2 | Direct connect handshake | Start alpha and bravo, send alpha `connect` to bravo's ready address. | Both peers record one remote peer, no self-peer entry appears, and each peer receives the other's library. | | S3 | Remote aggregation | Empty client connects to alpha and bravo. | `list-games` shows remote-only games once; shared `ggoo` has `peer_count=2`, unique games have `peer_count=1`. | | S4 | Single-source download, no install | Empty client connected to bravo downloads `bfbc2` with `install=false`. | Client emits `got-game-files`, `download-begin`, `download-finished`, then local `bfbc2` is `downloaded=true`, `installed=false`; root files exist and `local/` does not. | diff --git a/crates/lanspread-peer-cli/src/main.rs b/crates/lanspread-peer-cli/src/main.rs index 8e1c038..34a1a7d 100644 --- a/crates/lanspread-peer-cli/src/main.rs +++ b/crates/lanspread-peer-cli/src/main.rs @@ -359,19 +359,17 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s shared.state.write().await.remote_games = games.clone(); ("list-games", json!({ "games": games })) } - PeerEvent::LocalGamesUpdated { - games, - active_operations, - } => { + PeerEvent::LocalLibraryChanged { games } => { let mut state = shared.state.write().await; state.local_games.clone_from(&games); + ("local-library-changed", json!({ "games": games })) + } + PeerEvent::ActiveOperationsChanged { active_operations } => { + let mut state = shared.state.write().await; state.active_operations.clone_from(&active_operations); ( - "local-games-updated", - json!({ - "games": games, - "active_operations": active_operations_json(&active_operations), - }), + "active-operations-changed", + json!({ "active_operations": active_operations_json(&active_operations) }), ) } PeerEvent::GotGameFiles { diff --git a/crates/lanspread-peer/ARCHITECTURE.md b/crates/lanspread-peer/ARCHITECTURE.md index 6758f21..e2accc7 100644 --- a/crates/lanspread-peer/ARCHITECTURE.md +++ b/crates/lanspread-peer/ARCHITECTURE.md @@ -77,6 +77,11 @@ When a peer is discovered: - an active operation lock drops events for that game; - a rescan already running for the ID sets a rescan-pending flag; - the running rescan loops once more when that flag was set. +- Local library scans emit `LocalLibraryChanged` only for real library changes, + except that accepted game-directory changes can force a UI snapshot for the + new path without sending a peer delta. +- Active operation mutations emit `ActiveOperationsChanged` from the mutation + path instead of riding on local library scans. - Send `LibraryDelta` to known peers; send `LibrarySummary` on new connections. ## Local game scanning: fast and low cost diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index fbb54dd..552d2e0 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -37,7 +37,8 @@ lifetime of the process: to keep peer liveness up to date and prunes stale entries from `PeerGameDB`. 4. **Local game monitor** (`run_local_game_monitor`) – watches the configured game directory and each game root non-recursively, gates per-ID rescans while - operations are active, and runs a 300-second fallback scan for missed events. + operations are active, emits local-library changes separately from active + operation snapshots, and runs a 300-second fallback scan for missed events. `scan_local_library` maintains a lightweight on-disk index and produces both a `GameDB` and protocol summaries. A game is downloaded only when its root-level diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 63eb582..9dd936f 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -8,10 +8,10 @@ use std::{ }; use lanspread_db::db::GameDB; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::{PeerEvent, Unpacker, library::LocalLibraryState, peer_db::PeerGameDB}; +use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB}; /// Mutating filesystem operation currently in flight for a game root. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -124,6 +124,7 @@ pub(crate) struct OperationGuard { id: String, active_operations: Arc>>, active_downloads: Arc>>, + tx_notify_ui: UnboundedSender, clears_download: bool, armed: bool, } @@ -132,11 +133,13 @@ impl OperationGuard { pub(crate) fn new( id: String, active_operations: Arc>>, + tx_notify_ui: UnboundedSender, ) -> Self { Self { id, active_operations, active_downloads: Arc::new(RwLock::new(HashMap::new())), + tx_notify_ui, clears_download: false, armed: true, } @@ -146,11 +149,13 @@ impl OperationGuard { id: String, active_operations: Arc>>, active_downloads: Arc>>, + tx_notify_ui: UnboundedSender, ) -> Self { Self { id, active_operations, active_downloads, + tx_notify_ui, clears_download: true, armed: true, } @@ -173,13 +178,19 @@ impl Drop for OperationGuard { ); if let Ok(mut guard) = self.active_operations.try_write() { - guard.remove(&id); + if guard.remove(&id).is_some() { + events::send_active_operations_snapshot(&self.tx_notify_ui, &guard); + } } else if let Ok(handle) = tokio::runtime::Handle::try_current() { let active_operations = self.active_operations.clone(); + let tx_notify_ui = self.tx_notify_ui.clone(); handle.spawn({ let id = id.clone(); async move { - active_operations.write().await.remove(&id); + let mut active_operations = active_operations.write().await; + if active_operations.remove(&id).is_some() { + events::send_active_operations_snapshot(&tx_notify_ui, &active_operations); + } } }); } else { @@ -210,10 +221,11 @@ impl Drop for OperationGuard { mod tests { use std::{collections::HashMap, sync::Arc, time::Duration}; - use tokio::sync::RwLock; + use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; use super::{OperationGuard, OperationKind}; + use crate::{ActiveOperation, ActiveOperationKind, PeerEvent}; type OperationTracking = ( Arc>>, @@ -253,18 +265,34 @@ mod tests { (active_operations, active_downloads, cancel) } + async fn recv_active_operations( + rx: &mut mpsc::UnboundedReceiver, + ) -> Vec { + let event = tokio::time::timeout(Duration::from_secs(1), rx.recv()) + .await + .expect("active operation event should arrive") + .expect("event channel should remain open"); + let PeerEvent::ActiveOperationsChanged { active_operations } = event else { + panic!("expected ActiveOperationsChanged"); + }; + active_operations + } + #[tokio::test] async fn operation_guard_cleans_tracking_when_not_disarmed() { let id = "game-complete"; let (active_operations, active_downloads, _) = tracked_download_state(id); + let (tx, mut rx) = mpsc::unbounded_channel(); drop(OperationGuard::download( id.to_string(), active_operations.clone(), active_downloads.clone(), + tx, )); wait_for_tracking_clear(id, &active_operations, &active_downloads).await; + assert!(recv_active_operations(&mut rx).await.is_empty()); } #[tokio::test] @@ -272,25 +300,30 @@ mod tests { let id = "game-cancelled"; let (active_operations, active_downloads, cancel) = tracked_download_state(id); cancel.cancel(); + let (tx, mut rx) = mpsc::unbounded_channel(); drop(OperationGuard::download( id.to_string(), active_operations.clone(), active_downloads.clone(), + tx, )); wait_for_tracking_clear(id, &active_operations, &active_downloads).await; + assert!(recv_active_operations(&mut rx).await.is_empty()); } #[tokio::test] async fn disarmed_operation_guard_does_not_clean_tracking() { let id = "game-finished"; let (active_operations, active_downloads, _) = tracked_download_state(id); + let (tx, _rx) = mpsc::unbounded_channel(); OperationGuard::download( id.to_string(), active_operations.clone(), active_downloads.clone(), + tx, ) .disarm(); @@ -303,13 +336,19 @@ mod tests { let id = "game-aborted"; let (active_operations, active_downloads, _) = tracked_download_state(id); let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); let handle = tokio::spawn({ let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); + let tx = tx.clone(); async move { - let _guard = - OperationGuard::download(id.to_string(), active_operations, active_downloads); + let _guard = OperationGuard::download( + id.to_string(), + active_operations, + active_downloads, + tx, + ); let _ = ready_tx.send(()); std::future::pending::<()>().await; } @@ -320,5 +359,34 @@ mod tests { let _ = handle.await; wait_for_tracking_clear(id, &active_operations, &active_downloads).await; + assert_eq!( + recv_active_operations(&mut rx).await, + Vec::::new() + ); + } + + #[tokio::test] + async fn operation_guard_cleanup_snapshot_keeps_other_operations() { + let active_operations = Arc::new(RwLock::new(HashMap::from([ + ("aborted".to_string(), OperationKind::Downloading), + ("other".to_string(), OperationKind::Installing), + ]))); + let active_downloads = Arc::new(RwLock::new(HashMap::new())); + let (tx, mut rx) = mpsc::unbounded_channel(); + + drop(OperationGuard::download( + "aborted".to_string(), + active_operations, + active_downloads, + tx, + )); + + assert_eq!( + recv_active_operations(&mut rx).await, + vec![ActiveOperation { + id: "other".to_string(), + operation: ActiveOperationKind::Installing, + }] + ); } } diff --git a/crates/lanspread-peer/src/events.rs b/crates/lanspread-peer/src/events.rs index 6166cae..32bd4f0 100644 --- a/crates/lanspread-peer/src/events.rs +++ b/crates/lanspread-peer/src/events.rs @@ -1,10 +1,16 @@ //! UI event helpers used by peer command and service code. -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; -use crate::{PeerEvent, peer_db::PeerGameDB}; +use crate::{ + ActiveOperation, + ActiveOperationKind, + PeerEvent, + context::OperationKind, + peer_db::PeerGameDB, +}; pub fn send(tx_notify_ui: &UnboundedSender, event: PeerEvent) { if let Err(err) = tx_notify_ui.send(event) { @@ -13,6 +19,49 @@ pub fn send(tx_notify_ui: &UnboundedSender, event: PeerEvent) { } } +pub(crate) fn active_operation_snapshot_from_map( + active_operations: &HashMap, +) -> Vec { + let mut snapshot = active_operations + .iter() + .map(|(id, operation)| ActiveOperation { + id: id.clone(), + operation: active_operation_kind(*operation), + }) + .collect::>(); + snapshot.sort_by(|left, right| left.id.cmp(&right.id)); + snapshot +} + +pub(crate) fn send_active_operations_snapshot( + tx_notify_ui: &UnboundedSender, + active_operations: &HashMap, +) { + send( + tx_notify_ui, + PeerEvent::ActiveOperationsChanged { + active_operations: active_operation_snapshot_from_map(active_operations), + }, + ); +} + +pub(crate) async fn emit_active_operations( + active_operations: &Arc>>, + tx_notify_ui: &UnboundedSender, +) { + let active_operations = active_operations.read().await; + send_active_operations_snapshot(tx_notify_ui, &active_operations); +} + +fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind { + match operation { + OperationKind::Downloading => ActiveOperationKind::Downloading, + OperationKind::Installing => ActiveOperationKind::Installing, + OperationKind::Updating => ActiveOperationKind::Updating, + OperationKind::Uninstalling => ActiveOperationKind::Uninstalling, + } +} + pub async fn emit_peer_game_list( peer_game_db: &Arc>, tx_notify_ui: &UnboundedSender, diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 16a6a5a..1443ef9 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -12,8 +12,6 @@ use lanspread_db::db::{GameDB, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ - ActiveOperation, - ActiveOperationKind, InstallOperation, PeerEvent, context::{Ctx, OperationGuard, OperationKind}, @@ -272,17 +270,9 @@ pub async fn handle_download_game_files_command( return; } - { - let mut in_progress = ctx.active_operations.write().await; - match in_progress.entry(id.clone()) { - Entry::Vacant(entry) => { - entry.insert(OperationKind::Downloading); - } - Entry::Occupied(_) => { - log::warn!("Operation for {id} already in progress; ignoring new download request"); - return; - } - } + if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { + log::warn!("Operation for {id} already in progress; ignoring new download request"); + return; } let active_operations = ctx.active_operations.clone(); @@ -298,8 +288,12 @@ pub async fn handle_download_game_files_command( .insert(id, cancel_token.clone()); ctx.task_tracker.spawn(async move { - let download_state_guard = - OperationGuard::download(download_id.clone(), active_operations, active_downloads); + let download_state_guard = OperationGuard::download( + download_id.clone(), + active_operations, + active_downloads, + tx_notify_ui_clone.clone(), + ); let result = download_game_files( &download_id, @@ -317,7 +311,7 @@ pub async fn handle_download_game_files_command( let Some(prepared) = prepare_install_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await else { - end_download_operation(&ctx_clone, &download_id).await; + end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await; download_state_guard.disarm(); return; }; @@ -325,6 +319,7 @@ pub async fn handle_download_game_files_command( if install_after_download { if transition_download_to_install( &ctx_clone, + &tx_notify_ui_clone, &download_id, prepared.operation_kind, ) @@ -342,7 +337,7 @@ pub async fn handle_download_game_files_command( clear_active_download(&ctx_clone, &download_id).await; } } else { - end_download_operation(&ctx_clone, &download_id).await; + end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await; if let Err(err) = refresh_local_game(&ctx_clone, &tx_notify_ui_clone, &download_id).await { @@ -352,7 +347,7 @@ pub async fn handle_download_game_files_command( download_state_guard.disarm(); } Err(e) => { - end_download_operation(&ctx_clone, &download_id).await; + end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await; download_state_guard.disarm(); log::error!("Download failed for {download_id}: {e}"); } @@ -395,7 +390,7 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender bool { - let mut active_operations = ctx.active_operations.write().await; - match active_operations.entry(id.to_string()) { - Entry::Vacant(entry) => { - entry.insert(operation); - true +async fn begin_operation( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + operation: OperationKind, +) -> bool { + let started = { + let mut active_operations = ctx.active_operations.write().await; + match active_operations.entry(id.to_string()) { + Entry::Vacant(entry) => { + entry.insert(operation); + true + } + Entry::Occupied(_) => false, } - Entry::Occupied(_) => false, + }; + + if started { + events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; } + + started } -async fn transition_download_to_install(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { - let mut active_operations = ctx.active_operations.write().await; - match active_operations.get_mut(id) { - Some(current) if *current == OperationKind::Downloading => { - *current = operation; - true - } - Some(current) => { - log::warn!( - "Cannot transition {id} from download to install; current operation is {current:?}" - ); - false - } - None => { - log::warn!("Cannot transition {id} from download to install; operation is not active"); - false +async fn transition_download_to_install( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: &str, + operation: OperationKind, +) -> bool { + let transitioned = { + let mut active_operations = ctx.active_operations.write().await; + match active_operations.get_mut(id) { + Some(current) if *current == OperationKind::Downloading => { + *current = operation; + true + } + Some(current) => { + log::warn!( + "Cannot transition {id} from download to install; current operation is {current:?}" + ); + false + } + None => { + log::warn!( + "Cannot transition {id} from download to install; operation is not active" + ); + false + } } + }; + + if transitioned { + events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; } + + transitioned } -async fn end_operation(ctx: &Ctx, id: &str) { - ctx.active_operations.write().await.remove(id); +async fn end_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str) { + if ctx.active_operations.write().await.remove(id).is_some() { + events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; + } } async fn clear_active_download(ctx: &Ctx, id: &str) { ctx.active_downloads.write().await.remove(id); } -async fn end_download_operation(ctx: &Ctx, id: &str) { - end_operation(ctx, id).await; +async fn end_download_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: &str) { + end_operation(ctx, tx_notify_ui, id).await; clear_active_download(ctx, id).await; } @@ -636,7 +669,13 @@ pub async fn handle_set_game_dir_command( let ctx_clone = ctx.clone(); ctx.task_tracker.spawn(async move { - match load_local_library(&ctx_clone, &tx_notify_ui).await { + match load_local_library_with_policy( + &ctx_clone, + &tx_notify_ui, + LocalLibraryEventPolicy::ForceSnapshot, + ) + .await + { Ok(()) => log::info!("Local game database loaded successfully"), Err(e) => { log::error!("Failed to load local game database: {e}"); @@ -649,11 +688,19 @@ pub async fn handle_set_game_dir_command( pub async fn load_local_library( ctx: &Ctx, tx_notify_ui: &UnboundedSender, +) -> eyre::Result<()> { + load_local_library_with_policy(ctx, tx_notify_ui, LocalLibraryEventPolicy::OnChange).await +} + +async fn load_local_library_with_policy( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + event_policy: LocalLibraryEventPolicy, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; let active_ids = active_operation_ids(ctx).await; install::recover_on_startup(&game_dir, &active_ids).await?; - scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await + scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir, event_policy).await } async fn refresh_local_library( @@ -661,17 +708,24 @@ async fn refresh_local_library( tx_notify_ui: &UnboundedSender, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; - scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await + scan_and_announce_local_library( + ctx, + tx_notify_ui, + &game_dir, + LocalLibraryEventPolicy::OnChange, + ) + .await } async fn scan_and_announce_local_library( ctx: &Ctx, tx_notify_ui: &UnboundedSender, game_dir: &Path, + event_policy: LocalLibraryEventPolicy, ) -> eyre::Result<()> { let catalog = ctx.catalog.read().await.clone(); let scan = scan_local_library(game_dir, &catalog).await?; - update_and_announce_games(ctx, tx_notify_ui, scan).await; + update_and_announce_games_with_policy(ctx, tx_notify_ui, scan, event_policy).await; Ok(()) } @@ -683,10 +737,22 @@ async fn refresh_local_game( let game_dir = { ctx.game_dir.read().await.clone() }; let catalog = ctx.catalog.read().await.clone(); let scan = rescan_local_game(&game_dir, &catalog, id).await?; - update_and_announce_games(ctx, tx_notify_ui, scan).await; + update_and_announce_games_with_policy( + ctx, + tx_notify_ui, + scan, + LocalLibraryEventPolicy::OnChange, + ) + .await; Ok(()) } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum LocalLibraryEventPolicy { + OnChange, + ForceSnapshot, +} + async fn active_operation_ids(ctx: &Ctx) -> HashSet { ctx.active_operations.read().await.keys().cloned().collect() } @@ -722,6 +788,21 @@ pub async fn update_and_announce_games( ctx: &Ctx, tx_notify_ui: &UnboundedSender, scan: LocalLibraryScan, +) { + update_and_announce_games_with_policy( + ctx, + tx_notify_ui, + scan, + LocalLibraryEventPolicy::OnChange, + ) + .await; +} + +async fn update_and_announce_games_with_policy( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + scan: LocalLibraryScan, + event_policy: LocalLibraryEventPolicy, ) { let LocalLibraryScan { mut game_db, @@ -729,11 +810,11 @@ pub async fn update_and_announce_games( revision, } = scan; - let active_operations = active_operation_snapshot(ctx).await; - if !active_operations.is_empty() { + let active_operation_ids = active_operation_ids(ctx).await; + if !active_operation_ids.is_empty() { let previous = ctx.local_library.read().await.games.clone(); - for id in active_operations.iter().map(|operation| &operation.id) { - if let Some(summary) = previous.get(id) { + for id in &active_operation_ids { + if let Some(summary) = previous.get(id.as_str()) { summaries.insert(id.clone(), summary.clone()); } else { summaries.remove(id); @@ -754,11 +835,15 @@ pub async fn update_and_announce_games( let all_games = game_db.all_games().into_iter().cloned().collect::>(); - if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated { - games: all_games.clone(), - active_operations, - }) { - log::error!("Failed to send LocalGamesUpdated event: {e}"); + if delta.is_some() || event_policy == LocalLibraryEventPolicy::ForceSnapshot { + events::send( + tx_notify_ui, + PeerEvent::LocalLibraryChanged { + games: all_games.clone(), + }, + ); + } else { + log::debug!("Skipping unchanged local library event"); } let Some(delta) = delta else { @@ -784,28 +869,6 @@ pub async fn update_and_announce_games( } } -async fn active_operation_snapshot(ctx: &Ctx) -> Vec { - let active_operations = ctx.active_operations.read().await; - let mut snapshot = active_operations - .iter() - .map(|(id, operation)| ActiveOperation { - id: id.clone(), - operation: active_operation_kind(*operation), - }) - .collect::>(); - snapshot.sort_by(|left, right| left.id.cmp(&right.id)); - snapshot -} - -fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind { - match operation { - OperationKind::Downloading => ActiveOperationKind::Downloading, - OperationKind::Installing => ActiveOperationKind::Installing, - OperationKind::Updating => ActiveOperationKind::Updating, - OperationKind::Uninstalling => ActiveOperationKind::Uninstalling, - } -} - #[cfg(test)] mod tests { use std::{ @@ -821,7 +884,13 @@ mod tests { use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::*; - use crate::{UnpackFuture, Unpacker, test_support::TempDir}; + use crate::{ + ActiveOperation, + ActiveOperationKind, + UnpackFuture, + Unpacker, + test_support::TempDir, + }; struct FakeUnpacker; @@ -860,6 +929,15 @@ mod tests { .expect("event channel should remain open") } + async fn assert_no_event(rx: &mut mpsc::UnboundedReceiver) { + assert!( + tokio::time::timeout(Duration::from_millis(50), rx.recv()) + .await + .is_err(), + "event channel should stay quiet" + ); + } + fn addr(port: u16) -> SocketAddr { SocketAddr::from(([127, 0, 0, 1], port)) } @@ -895,17 +973,9 @@ mod tests { installed: bool, downloaded: bool, ) -> lanspread_db::db::Game { - let PeerEvent::LocalGamesUpdated { - games, - active_operations, - } = event - else { - panic!("expected LocalGamesUpdated"); + let PeerEvent::LocalLibraryChanged { games } = event else { + panic!("expected LocalLibraryChanged"); }; - assert!( - active_operations.is_empty(), - "settled local update should not report active operations" - ); let game = games .into_iter() .find(|game| game.id == "game") @@ -915,6 +985,20 @@ mod tests { game } + fn assert_active_update(event: PeerEvent, expected: Vec) { + let PeerEvent::ActiveOperationsChanged { active_operations } = event else { + panic!("expected ActiveOperationsChanged"); + }; + assert_eq!(active_operations, expected); + } + + fn active_update(id: &str, operation: ActiveOperationKind) -> Vec { + vec![ActiveOperation { + id: id.to_string(), + operation, + }] + } + #[test] fn update_source_selects_latest_ready_peer_manifest() { let old_addr = addr(12_000); @@ -1039,8 +1123,8 @@ mod tests { } #[tokio::test] - async fn local_games_update_reports_authoritative_active_operations() { - let temp = TempDir::new("lanspread-handler-active-snapshot"); + async fn local_library_scan_freezes_active_game_state() { + let temp = TempDir::new("lanspread-handler-active-freeze"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); @@ -1058,29 +1142,38 @@ mod tests { update_and_announce_games(&ctx, &tx, scan).await; - let PeerEvent::LocalGamesUpdated { - games, - active_operations, - } = recv_event(&mut rx).await - else { - panic!("expected LocalGamesUpdated"); + let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else { + panic!("expected LocalLibraryChanged"); }; assert!( games.is_empty(), "active game should keep its previous announced state" ); - assert_eq!( - active_operations, + } + + #[tokio::test] + async fn begin_operation_reports_authoritative_active_operation_snapshot() { + let temp = TempDir::new("lanspread-handler-active-begin"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"archive"); + + let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + + assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await); + assert_active_update( + recv_event(&mut rx).await, vec![ActiveOperation { id: "game".to_string(), - operation: ActiveOperationKind::Installing, - }] + operation: ActiveOperationKind::Updating, + }], ); } #[tokio::test] - async fn unchanged_scan_still_reports_active_operation_snapshot() { - let temp = TempDir::new("lanspread-handler-active-unchanged"); + async fn unchanged_settled_scan_is_not_reemitted() { + let temp = TempDir::new("lanspread-handler-settled-unchanged"); let root = temp.game_root(); write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("game.eti"), b"archive"); @@ -1095,28 +1188,50 @@ mod tests { update_and_announce_games(&ctx, &tx, scan).await; assert_local_update(recv_event(&mut rx).await, false, true); - ctx.active_operations - .write() - .await - .insert("game".to_string(), OperationKind::Updating); let scan = scan_local_library(temp.path(), &catalog) .await .expect("second scan should succeed"); update_and_announce_games(&ctx, &tx, scan).await; - let PeerEvent::LocalGamesUpdated { - active_operations, .. - } = recv_event(&mut rx).await - else { - panic!("expected LocalGamesUpdated"); - }; - assert_eq!( - active_operations, - vec![ActiveOperation { - id: "game".to_string(), - operation: ActiveOperationKind::Updating, - }] + assert_no_event(&mut rx).await; + } + + #[tokio::test] + async fn unchanged_operation_refresh_still_reports_settled_snapshot() { + let temp = TempDir::new("lanspread-handler-operation-unchanged"); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"archive"); + write_file(&root.join("local").join("old.txt"), b"old"); + + let ctx = test_ctx(temp.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let catalog = ctx.catalog.read().await.clone(); + let scan = scan_local_library(temp.path(), &catalog) + .await + .expect("initial scan should succeed"); + update_and_announce_games(&ctx, &tx, scan).await; + assert_local_update(recv_event(&mut rx).await, true, true); + + run_install_operation(&ctx, &tx, "game".to_string()).await; + + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Updating), ); + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameBegin { + id, + operation: InstallOperation::Updating + } if id == "game" + )); + assert_active_update(recv_event(&mut rx).await, Vec::new()); + assert!(matches!( + recv_event(&mut rx).await, + PeerEvent::InstallGameFinished { id } if id == "game" + )); + assert_no_event(&mut rx).await; } #[tokio::test] @@ -1131,6 +1246,10 @@ mod tests { run_install_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Installing), + ); match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); @@ -1138,6 +1257,7 @@ mod tests { } _ => panic!("expected InstallGameBegin"), } + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" @@ -1174,7 +1294,8 @@ mod tests { let tx = tx.clone(); async move { assert!( - transition_download_to_install(&ctx, "game", prepared.operation_kind).await + transition_download_to_install(&ctx, &tx, "game", prepared.operation_kind) + .await ); clear_active_download(&ctx, "game").await; run_started_install_operation(&ctx, &tx, "game".to_string(), prepared).await; @@ -1186,6 +1307,10 @@ mod tests { drop(read_guard); install_task.await.expect("handoff task should finish"); + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Installing), + ); match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); @@ -1193,6 +1318,7 @@ mod tests { } _ => panic!("expected InstallGameBegin"), } + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" @@ -1215,6 +1341,10 @@ mod tests { run_install_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Updating), + ); match recv_event(&mut rx).await { PeerEvent::InstallGameBegin { id, operation } => { assert_eq!(id, "game"); @@ -1222,6 +1352,7 @@ mod tests { } _ => panic!("expected InstallGameBegin"), } + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" @@ -1241,6 +1372,10 @@ mod tests { let (tx, mut rx) = mpsc::unbounded_channel(); run_install_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Installing), + ); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameBegin { @@ -1248,6 +1383,7 @@ mod tests { operation: InstallOperation::Installing } if id == "game" )); + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" @@ -1259,6 +1395,10 @@ mod tests { write_file(&root.join("game.eti"), b"new archive"); run_install_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Updating), + ); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameBegin { @@ -1266,6 +1406,7 @@ mod tests { operation: InstallOperation::Updating } if id == "game" )); + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::InstallGameFinished { id } if id == "game" @@ -1274,10 +1415,15 @@ mod tests { assert_eq!(game.local_version.as_deref(), Some("20250101")); run_uninstall_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Uninstalling), + ); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameBegin { id } if id == "game" )); + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameFinished { id } if id == "game" @@ -1300,10 +1446,15 @@ mod tests { run_uninstall_operation(&ctx, &tx, "game".to_string()).await; + assert_active_update( + recv_event(&mut rx).await, + active_update("game", ActiveOperationKind::Uninstalling), + ); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameBegin { id } if id == "game" )); + assert_active_update(recv_event(&mut rx).await, Vec::new()); assert!(matches!( recv_event(&mut rx).await, PeerEvent::UninstallGameFinished { id } if id == "game" @@ -1356,4 +1507,29 @@ mod tests { assert!(!next.game_root().join(".version.ini.tmp").exists()); } + + #[tokio::test] + async fn path_changing_set_game_dir_emits_equivalent_snapshot() { + let current = TempDir::new("lanspread-handler-old-equivalent-dir"); + let next = TempDir::new("lanspread-handler-new-equivalent-dir"); + for root in [current.game_root(), next.game_root()] { + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("game.eti"), b"archive"); + } + + let ctx = test_ctx(current.path().to_path_buf()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let catalog = ctx.catalog.read().await.clone(); + let scan = scan_local_library(current.path(), &catalog) + .await + .expect("initial scan should succeed"); + update_and_announce_games(&ctx, &tx, scan).await; + assert_local_update(recv_event(&mut rx).await, false, true); + + handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await; + ctx.task_tracker.close(); + ctx.task_tracker.wait().await; + + assert_local_update(recv_event(&mut rx).await, false, true); + } } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index e6c26ea..9c6996b 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -132,9 +132,10 @@ pub enum PeerEvent { PeerLost(SocketAddr), /// The total peer count has changed. PeerCountUpdated(usize), - /// Local games have been scanned, with authoritative in-progress work. - LocalGamesUpdated { - games: Vec, + /// The local library contents changed after a scan. + LocalLibraryChanged { games: Vec }, + /// The set of in-progress local operations changed. + ActiveOperationsChanged { active_operations: Vec, }, /// A required peer runtime component failed. @@ -168,7 +169,7 @@ pub enum InstallOperation { Updating, } -/// In-progress operation snapshot attached to local library updates. +/// In-progress operation snapshot sent when operation state changes. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ActiveOperation { pub id: String, diff --git a/crates/lanspread-peer/src/services/liveness.rs b/crates/lanspread-peer/src/services/liveness.rs index 408ff1e..f9dcc5b 100644 --- a/crates/lanspread-peer/src/services/liveness.rs +++ b/crates/lanspread-peer/src/services/liveness.rs @@ -202,12 +202,13 @@ async fn handle_active_downloads_without_peers( return; } + let mut changed = false; for id in active_ids { if peers_still_have_game(peer_game_db, &id).await { continue; } - active_operations.write().await.remove(&id); + changed |= active_operations.write().await.remove(&id).is_some(); let Some(cancel_token) = active_downloads.write().await.remove(&id) else { continue; }; @@ -215,9 +216,13 @@ async fn handle_active_downloads_without_peers( events::send( tx_notify_ui, - PeerEvent::DownloadGameFilesAllPeersGone { id }, + PeerEvent::DownloadGameFilesAllPeersGone { id: id.clone() }, ); } + + if changed { + events::emit_active_operations(active_operations, tx_notify_ui).await; + } } async fn peers_still_have_game(peer_game_db: &Arc>, game_id: &str) -> bool { @@ -233,10 +238,16 @@ mod tests { use tokio_util::sync::CancellationToken; use super::handle_active_downloads_without_peers; - use crate::{PeerEvent, context::OperationKind, peer_db::PeerGameDB}; + use crate::{ + ActiveOperation, + ActiveOperationKind, + PeerEvent, + context::OperationKind, + peer_db::PeerGameDB, + }; #[tokio::test] - async fn all_peers_gone_cancels_download_and_emits_only_peers_gone() { + async fn all_peers_gone_cancels_download_and_emits_peers_gone_then_active_snapshot() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let active_operations = Arc::new(RwLock::new(HashMap::from([( "game".to_string(), @@ -266,9 +277,17 @@ mod tests { event, PeerEvent::DownloadGameFilesAllPeersGone { id } if id == "game" )); + let event = rx + .recv() + .await + .expect("active operation snapshot should be emitted"); + assert!(matches!( + event, + PeerEvent::ActiveOperationsChanged { active_operations } if active_operations.is_empty() + )); assert!( rx.try_recv().is_err(), - "peers-gone cancellation must not emit a duplicate failure event" + "peers-gone cancellation must not emit extra events" ); } @@ -318,9 +337,21 @@ mod tests { } cancelled_ids.sort(); assert_eq!(cancelled_ids, vec!["first", "second"]); + let event = rx + .recv() + .await + .expect("active operation snapshot should be emitted"); + assert!(matches!( + event, + PeerEvent::ActiveOperationsChanged { active_operations } + if active_operations == vec![ActiveOperation { + id: "installing".to_string(), + operation: ActiveOperationKind::Installing, + }] + )); assert!( rx.try_recv().is_err(), - "multiple peers-gone cancellations must not emit duplicate failure events" + "multiple peers-gone cancellations must not emit extra events" ); } } diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index acf83c6..541fcce 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -391,19 +391,15 @@ mod tests { async fn recv_local_update( rx: &mut mpsc::UnboundedReceiver, - ) -> (Vec, Vec) { + ) -> Vec { let event = tokio::time::timeout(Duration::from_secs(1), rx.recv()) .await .expect("local update event should arrive") .expect("event channel should stay open"); - let PeerEvent::LocalGamesUpdated { - games, - active_operations, - } = event - else { - panic!("expected LocalGamesUpdated"); + let PeerEvent::LocalLibraryChanged { games } = event else { + panic!("expected LocalLibraryChanged"); }; - (games, active_operations) + games } #[test] @@ -537,7 +533,7 @@ mod tests { ctx.task_tracker.wait().await; let mut update_count = 0; - while let Ok(Some(PeerEvent::LocalGamesUpdated { .. })) = + while let Ok(Some(PeerEvent::LocalLibraryChanged { .. })) = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await { update_count += 1; @@ -560,8 +556,7 @@ mod tests { run_fallback_scan(&ctx, &tx).await; - let (games, active_operations) = recv_local_update(&mut rx).await; - assert!(active_operations.is_empty()); + let games = recv_local_update(&mut rx).await; let game = games .iter() .find(|game| game.id == "game") @@ -585,9 +580,12 @@ mod tests { run_fallback_scan(&ctx, &tx).await; - let (games, active_operations) = recv_local_update(&mut rx).await; - assert!(games.is_empty()); - assert!(active_operations.is_empty()); + assert!( + tokio::time::timeout(Duration::from_millis(50), rx.recv()) + .await + .is_err(), + "non-catalog scan should not emit a local library event" + ); let library = ctx.local_library.read().await; assert!(library.games.is_empty()); assert!(library.recent_deltas.is_empty()); diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index 0a5aa22..f4264b1 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -13,7 +13,6 @@ use lanspread_db::db::{Availability, Game, GameDB, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, - InstallOperation, PeerCommand, PeerEvent, PeerGameDB, @@ -747,17 +746,18 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { log::info!("PeerEvent::ListGames received"); update_game_db(games, app_handle.clone()).await; } - PeerEvent::LocalGamesUpdated { - games: local_games, - active_operations, - } => { - log::info!("PeerEvent::LocalGamesUpdated received"); + PeerEvent::LocalLibraryChanged { games: local_games } => { + log::info!("PeerEvent::LocalLibraryChanged received"); + update_local_games_in_db(local_games, app_handle.clone()).await; + } + PeerEvent::ActiveOperationsChanged { active_operations } => { + log::info!("PeerEvent::ActiveOperationsChanged received"); + let state = app_handle.state::(); { - let state = app_handle.state::(); let mut ui_active_operations = state.active_operations.write().await; reconcile_active_operations(&mut ui_active_operations, &active_operations); } - update_local_games_in_db(local_games, app_handle.clone()).await; + emit_games_list(app_handle).await; } PeerEvent::GotGameFiles { id, @@ -773,21 +773,9 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { &id, "PeerEvent::NoPeersHaveGame", ); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); } PeerEvent::DownloadGameFilesBegin { id } => { log::info!("PeerEvent::DownloadGameFilesBegin received"); - app_handle - .state::() - .active_operations - .write() - .await - .insert(id.clone(), UiOperationKind::Downloading); emit_game_id_event( app_handle, "game-download-begin", @@ -808,7 +796,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { ); } PeerEvent::DownloadGameFilesFinished { id } => { - handle_download_finished(app_handle, id).await; + handle_download_finished(app_handle, id); } PeerEvent::DownloadGameFilesFailed { id } => { log::warn!("PeerEvent::DownloadGameFilesFailed received"); @@ -818,12 +806,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { &id, "PeerEvent::DownloadGameFilesFailed", ); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); } PeerEvent::DownloadGameFilesAllPeersGone { id } => { log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); @@ -833,26 +815,10 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { &id, "PeerEvent::DownloadGameFilesAllPeersGone", ); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); } PeerEvent::InstallGameBegin { id, operation } => { let operation_name: &'static str = (&operation).into(); log::info!("PeerEvent::InstallGameBegin received for {id}: {operation_name}"); - let ui_operation = match operation { - InstallOperation::Installing => UiOperationKind::Installing, - InstallOperation::Updating => UiOperationKind::Updating, - }; - app_handle - .state::() - .active_operations - .write() - .await - .insert(id.clone(), ui_operation); emit_game_id_event( app_handle, "game-install-begin", @@ -862,12 +828,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::InstallGameFinished { id } => { log::info!("PeerEvent::InstallGameFinished received for {id}"); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); emit_game_id_event( app_handle, "game-install-finished", @@ -877,12 +837,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::InstallGameFailed { id } => { log::warn!("PeerEvent::InstallGameFailed received for {id}"); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); emit_game_id_event( app_handle, "game-install-failed", @@ -892,12 +846,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::UninstallGameBegin { id } => { log::info!("PeerEvent::UninstallGameBegin received for {id}"); - app_handle - .state::() - .active_operations - .write() - .await - .insert(id.clone(), UiOperationKind::Uninstalling); emit_game_id_event( app_handle, "game-uninstall-begin", @@ -907,12 +855,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::UninstallGameFinished { id } => { log::info!("PeerEvent::UninstallGameFinished received for {id}"); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); emit_game_id_event( app_handle, "game-uninstall-finished", @@ -922,12 +864,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { } PeerEvent::UninstallGameFailed { id } => { log::warn!("PeerEvent::UninstallGameFailed received for {id}"); - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); emit_game_id_event( app_handle, "game-uninstall-failed", @@ -995,7 +931,7 @@ async fn handle_got_game_files( } } -async fn handle_download_finished(app_handle: &AppHandle, id: String) { +fn handle_download_finished(app_handle: &AppHandle, id: String) { log::info!("PeerEvent::DownloadGameFilesFinished received"); emit_game_id_event( app_handle, @@ -1003,13 +939,6 @@ async fn handle_download_finished(app_handle: &AppHandle, id: String) { &id, "PeerEvent::DownloadGameFilesFinished", ); - - app_handle - .state::() - .active_operations - .write() - .await - .remove(&id); } #[cfg(test)]