refactor(peer): split local library and operation UI events

Replace the `a9f9845` local-update dedup cache with explicit peer event
semantics. Local scans now emit `LocalLibraryChanged` when the library changes,
while operation mutations emit `ActiveOperationsChanged` from the mutation
path. Tauri keeps joining those facts into the existing `games-list-updated`
payload, so the frontend contract stays stable.

This removes the cache/invalidation coupling between scan emission and
operation state. The remaining forced local snapshot is explicit: accepted game
directory changes can refresh the UI for an equivalent new path without sending
a peer library delta.

Operation guard cleanup and liveness cancellation now publish the same active
operation snapshot as normal command-handler transitions. The peer CLI JSONL
events follow the same split with `local-library-changed` and
`active-operations-changed`.

Test Plan:
- `just fmt`
- `CARGO_BUILD_RUSTC_WRAPPER= just test`
- `CARGO_BUILD_RUSTC_WRAPPER= just clippy`
- `git diff --check`

Refs: CLEAN_CODE_PLAN_1.md
This commit is contained in:
2026-05-18 21:25:20 +02:00
parent be00a7a298
commit 41e9a0efc1
14 changed files with 657 additions and 255 deletions
+76
View File
@@ -0,0 +1,76 @@
# Clean code notes
Running notes on architectural smells in the codebase: things that work today
but are shaped wrong, and what a cleaner design would look like. Each entry
should explain the smell, why the current shape exists, and the warning sign
that says "now is the time to refactor."
---
## Resolved: local library and operation UI signals are split
**Context.** Commit `a9f9845` ("fix(peer): suppress duplicate local game
updates") added a `last_local_update_key` cache in `Ctx`, keyed on
`(revision, digest, active_operations)`. That worked, but it made scan emission
responsible for deduplicating operation-state changes that were actually owned
by command handlers.
### The root issue
`update_and_announce_games` already knows whether the library actually changed:
`LocalLibraryState::update_from_scan` returns `Option<LibraryDelta>`, where
`None` means "nothing changed." That fact gates peer `LibraryDelta`
announcements. Active operation status has a different source of truth:
`Ctx::active_operations`, mutated by operation start, handoff, end, liveness
cancellation, and guard cleanup.
The old `LocalGamesUpdated { games, active_operations }` event mixed those two
sources of truth. The dedup key was a symptom of that overload.
### Current shape
The peer runtime now emits two separate facts:
- `LocalLibraryChanged { games }` is emitted from scans when the local library
state changes. A real `SetGameDir` path change may force one local snapshot
for the UI even when the library digest matches the previous path, because
Tauri has already cleared local flags for the old path.
- `ActiveOperationsChanged { active_operations }` is emitted when the operation
table changes. Normal mutations go through `begin_operation`,
`transition_download_to_install`, and `end_operation`; liveness cancellation
and `OperationGuard` cleanup publish the same snapshot when they clear state.
Tauri is the join boundary. It stores the latest game DB and latest active
operation snapshot, then keeps emitting the existing frontend
`games-list-updated` payload. The frontend did not need to learn the peer
runtime's internal event split.
### Invariants to protect
Do not reintroduce a scan-level dedup cache for operation state. If a new path
mutates `Ctx::active_operations`, route it through the operation publisher or
explicitly document why it is not UI-visible. If a new local scan reason needs a
UI snapshot without a peer delta, model that as an explicit scan policy like the
path-change forced snapshot, not as cache invalidation.
---
## How to add to this file
When a code review uncovers a "this works but it's bolted on" pattern, write it
up here. Structure:
1. **Context** — what commit/PR introduced the pattern, one-paragraph summary.
2. **Root issue** — the underlying invariant the code is working around.
3. **Why the obvious fix doesn't work** — what constraints forced the current
shape.
4. **The tell** — concrete code shapes (scattered invalidations, repeated
special cases, dedup keys that re-derive existing facts) that signal the
smell.
5. **Clean shape** — what the code would look like without the constraint.
6. **Warning signs** — what observations in future work mean "do the
refactor now."
Keep entries narrative, not bulleted to death. The point is to preserve the
_reasoning_ so future contributors can decide whether the trade-off still
holds.
+70
View File
@@ -0,0 +1,70 @@
# Clean Code Plan 1: Split Local Library and Operation UI Signals
## Goal
Replace the `a9f9845` local-update dedup cache with explicit event semantics.
The peer runtime should report local library changes and active operation
changes as separate facts, while the Tauri layer keeps joining those facts into
the existing `games-list-updated` payload for the frontend.
## Architectural Picture
The current overloaded shape makes `LocalGamesUpdated` carry two independent
signals:
- local library contents, whose source of truth is `LocalLibraryState` and
`LocalLibraryState::update_from_scan`;
- active operation status, whose source of truth is `Ctx::active_operations`.
The clean boundary is:
- peer scanning emits a local-library event when the scanned library state
changes, with an explicit force policy for accepted path changes where the UI
needs a fresh snapshot but peers do not need a delta;
- operation-state mutation emits an operation snapshot when the mutation
happens;
- Tauri owns UI joining: it stores the latest catalog/local games and latest
operation snapshot, then emits `games-list-updated` for the frontend.
That keeps the frontend contract stable while removing the cross-cutting cache
and every manual invalidation call.
## Implementation Steps
1. Remove commit `a9f9845` from the local branch history before implementing
the replacement, so the final code is not built on the band-aid.
2. Replace `PeerEvent::LocalGamesUpdated { games, active_operations }` with:
- `PeerEvent::LocalLibraryChanged { games }`;
- `PeerEvent::ActiveOperationsChanged { active_operations }`.
3. Add one operation-snapshot publisher near the peer event helpers. All normal
operation mutations must go through helpers that mutate
`Ctx::active_operations` and then emit `ActiveOperationsChanged`.
4. Make `OperationGuard` publish an operation snapshot when it performs
exceptional cleanup on drop, so cancellation or aborted tasks do not leave UI
state stale.
5. Keep the existing scan behavior that freezes active game summaries while an
operation is running, but emit `LocalLibraryChanged` only when
`update_from_scan` returns a real delta or the scan was explicitly forced by
an accepted path change.
6. Update the Tauri event loop to reconcile `ActiveOperationsChanged`
independently, and call `emit_games_list` after both library and operation
state changes.
7. Update focused tests in peer handlers, local monitor, liveness, context guard,
and Tauri reconciliation to prove:
- unchanged settled scans do not emit local-library events;
- operation starts/transitions/ends emit authoritative snapshots;
- exceptional guard cleanup clears the operation snapshot;
- Tauri still emits the same `games-list-updated` UI payload.
8. Update `CLEAN_CODE.md`, `crates/lanspread-peer/ARCHITECTURE.md`, and
`crates/lanspread-peer/README.md` so the docs describe the new shape rather
than the dedup warning.
## Review Gates
- No `last_local_update_key`, `LocalUpdateKey`, or invalidate helper remains.
- No operation-state mutation that should be visible to the UI bypasses the
snapshot publisher.
- The peer event names reflect domain facts, not UI implementation details.
- Tauri remains the compatibility boundary for the frontend payload.
- Verification runs through `just fmt`, `just test`, `just clippy`, and
`git diff --check`.
+1 -1
View File
@@ -32,5 +32,5 @@
rendering follows backend state instead of reverse-engineering it from rendering follows backend state instead of reverse-engineering it from
`installed && !downloaded`. `installed && !downloaded`.
- Removed Tauri's parallel whole-library filesystem scan. The UI database keeps - Removed Tauri's parallel whole-library filesystem scan. The UI database keeps
bundled catalog metadata, while peer `LocalGamesUpdated` snapshots now own bundled catalog metadata, while peer `LocalLibraryChanged` snapshots now own
`downloaded`, `installed`, `local_version`, and `availability`. `downloaded`, `installed`, `local_version`, and `availability`.
+1 -1
View File
@@ -8,7 +8,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
| ID | Scenario | Setup | Expected result | | ID | Scenario | Setup | Expected result |
| --- | --- | --- | --- | | --- | --- | --- | --- |
| S1 | Startup scan | Start one peer with `fixture-alpha`. | Peer emits `local-peer-ready` and `local-games-updated`; catalog fixture games are `downloaded=true`, `installed=false`, `availability=Ready`. | | S1 | Startup scan | Start one peer with `fixture-alpha`. | Peer emits `local-peer-ready` and `local-library-changed`; catalog fixture games are `downloaded=true`, `installed=false`, `availability=Ready`. |
| S2 | Direct connect handshake | Start alpha and bravo, send alpha `connect` to bravo's ready address. | Both peers record one remote peer, no self-peer entry appears, and each peer receives the other's library. | | S2 | Direct connect handshake | Start alpha and bravo, send alpha `connect` to bravo's ready address. | Both peers record one remote peer, no self-peer entry appears, and each peer receives the other's library. |
| S3 | Remote aggregation | Empty client connects to alpha and bravo. | `list-games` shows remote-only games once; shared `ggoo` has `peer_count=2`, unique games have `peer_count=1`. | | S3 | Remote aggregation | Empty client connects to alpha and bravo. | `list-games` shows remote-only games once; shared `ggoo` has `peer_count=2`, unique games have `peer_count=1`. |
| S4 | Single-source download, no install | Empty client connected to bravo downloads `bfbc2` with `install=false`. | Client emits `got-game-files`, `download-begin`, `download-finished`, then local `bfbc2` is `downloaded=true`, `installed=false`; root files exist and `local/` does not. | | S4 | Single-source download, no install | Empty client connected to bravo downloads `bfbc2` with `install=false`. | Client emits `got-game-files`, `download-begin`, `download-finished`, then local `bfbc2` is `downloaded=true`, `installed=false`; root files exist and `local/` does not. |
+7 -9
View File
@@ -359,19 +359,17 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s
shared.state.write().await.remote_games = games.clone(); shared.state.write().await.remote_games = games.clone();
("list-games", json!({ "games": games })) ("list-games", json!({ "games": games }))
} }
PeerEvent::LocalGamesUpdated { PeerEvent::LocalLibraryChanged { games } => {
games,
active_operations,
} => {
let mut state = shared.state.write().await; let mut state = shared.state.write().await;
state.local_games.clone_from(&games); state.local_games.clone_from(&games);
("local-library-changed", json!({ "games": games }))
}
PeerEvent::ActiveOperationsChanged { active_operations } => {
let mut state = shared.state.write().await;
state.active_operations.clone_from(&active_operations); state.active_operations.clone_from(&active_operations);
( (
"local-games-updated", "active-operations-changed",
json!({ json!({ "active_operations": active_operations_json(&active_operations) }),
"games": games,
"active_operations": active_operations_json(&active_operations),
}),
) )
} }
PeerEvent::GotGameFiles { PeerEvent::GotGameFiles {
+5
View File
@@ -77,6 +77,11 @@ When a peer is discovered:
- an active operation lock drops events for that game; - an active operation lock drops events for that game;
- a rescan already running for the ID sets a rescan-pending flag; - a rescan already running for the ID sets a rescan-pending flag;
- the running rescan loops once more when that flag was set. - the running rescan loops once more when that flag was set.
- Local library scans emit `LocalLibraryChanged` only for real library changes,
except that accepted game-directory changes can force a UI snapshot for the
new path without sending a peer delta.
- Active operation mutations emit `ActiveOperationsChanged` from the mutation
path instead of riding on local library scans.
- Send `LibraryDelta` to known peers; send `LibrarySummary` on new connections. - Send `LibraryDelta` to known peers; send `LibrarySummary` on new connections.
## Local game scanning: fast and low cost ## Local game scanning: fast and low cost
+2 -1
View File
@@ -37,7 +37,8 @@ lifetime of the process:
to keep peer liveness up to date and prunes stale entries from `PeerGameDB`. to keep peer liveness up to date and prunes stale entries from `PeerGameDB`.
4. **Local game monitor** (`run_local_game_monitor`) watches the configured 4. **Local game monitor** (`run_local_game_monitor`) watches the configured
game directory and each game root non-recursively, gates per-ID rescans while game directory and each game root non-recursively, gates per-ID rescans while
operations are active, and runs a 300-second fallback scan for missed events. operations are active, emits local-library changes separately from active
operation snapshots, and runs a 300-second fallback scan for missed events.
`scan_local_library` maintains a lightweight on-disk index and produces both a `scan_local_library` maintains a lightweight on-disk index and produces both a
`GameDB` and protocol summaries. A game is downloaded only when its root-level `GameDB` and protocol summaries. A game is downloaded only when its root-level
+75 -7
View File
@@ -8,10 +8,10 @@ use std::{
}; };
use lanspread_db::db::GameDB; use lanspread_db::db::GameDB;
use tokio::sync::RwLock; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{PeerEvent, Unpacker, library::LocalLibraryState, peer_db::PeerGameDB}; use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB};
/// Mutating filesystem operation currently in flight for a game root. /// Mutating filesystem operation currently in flight for a game root.
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -124,6 +124,7 @@ pub(crate) struct OperationGuard {
id: String, id: String,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
clears_download: bool, clears_download: bool,
armed: bool, armed: bool,
} }
@@ -132,11 +133,13 @@ impl OperationGuard {
pub(crate) fn new( pub(crate) fn new(
id: String, id: String,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> Self { ) -> Self {
Self { Self {
id, id,
active_operations, active_operations,
active_downloads: Arc::new(RwLock::new(HashMap::new())), active_downloads: Arc::new(RwLock::new(HashMap::new())),
tx_notify_ui,
clears_download: false, clears_download: false,
armed: true, armed: true,
} }
@@ -146,11 +149,13 @@ impl OperationGuard {
id: String, id: String,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> Self { ) -> Self {
Self { Self {
id, id,
active_operations, active_operations,
active_downloads, active_downloads,
tx_notify_ui,
clears_download: true, clears_download: true,
armed: true, armed: true,
} }
@@ -173,13 +178,19 @@ impl Drop for OperationGuard {
); );
if let Ok(mut guard) = self.active_operations.try_write() { if let Ok(mut guard) = self.active_operations.try_write() {
guard.remove(&id); if guard.remove(&id).is_some() {
events::send_active_operations_snapshot(&self.tx_notify_ui, &guard);
}
} else if let Ok(handle) = tokio::runtime::Handle::try_current() { } else if let Ok(handle) = tokio::runtime::Handle::try_current() {
let active_operations = self.active_operations.clone(); let active_operations = self.active_operations.clone();
let tx_notify_ui = self.tx_notify_ui.clone();
handle.spawn({ handle.spawn({
let id = id.clone(); let id = id.clone();
async move { async move {
active_operations.write().await.remove(&id); let mut active_operations = active_operations.write().await;
if active_operations.remove(&id).is_some() {
events::send_active_operations_snapshot(&tx_notify_ui, &active_operations);
}
} }
}); });
} else { } else {
@@ -210,10 +221,11 @@ impl Drop for OperationGuard {
mod tests { mod tests {
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock; use tokio::sync::{RwLock, mpsc};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use super::{OperationGuard, OperationKind}; use super::{OperationGuard, OperationKind};
use crate::{ActiveOperation, ActiveOperationKind, PeerEvent};
type OperationTracking = ( type OperationTracking = (
Arc<RwLock<HashMap<String, OperationKind>>>, Arc<RwLock<HashMap<String, OperationKind>>>,
@@ -253,18 +265,34 @@ mod tests {
(active_operations, active_downloads, cancel) (active_operations, active_downloads, cancel)
} }
async fn recv_active_operations(
rx: &mut mpsc::UnboundedReceiver<PeerEvent>,
) -> Vec<ActiveOperation> {
let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.expect("active operation event should arrive")
.expect("event channel should remain open");
let PeerEvent::ActiveOperationsChanged { active_operations } = event else {
panic!("expected ActiveOperationsChanged");
};
active_operations
}
#[tokio::test] #[tokio::test]
async fn operation_guard_cleans_tracking_when_not_disarmed() { async fn operation_guard_cleans_tracking_when_not_disarmed() {
let id = "game-complete"; let id = "game-complete";
let (active_operations, active_downloads, _) = tracked_download_state(id); let (active_operations, active_downloads, _) = tracked_download_state(id);
let (tx, mut rx) = mpsc::unbounded_channel();
drop(OperationGuard::download( drop(OperationGuard::download(
id.to_string(), id.to_string(),
active_operations.clone(), active_operations.clone(),
active_downloads.clone(), active_downloads.clone(),
tx,
)); ));
wait_for_tracking_clear(id, &active_operations, &active_downloads).await; wait_for_tracking_clear(id, &active_operations, &active_downloads).await;
assert!(recv_active_operations(&mut rx).await.is_empty());
} }
#[tokio::test] #[tokio::test]
@@ -272,25 +300,30 @@ mod tests {
let id = "game-cancelled"; let id = "game-cancelled";
let (active_operations, active_downloads, cancel) = tracked_download_state(id); let (active_operations, active_downloads, cancel) = tracked_download_state(id);
cancel.cancel(); cancel.cancel();
let (tx, mut rx) = mpsc::unbounded_channel();
drop(OperationGuard::download( drop(OperationGuard::download(
id.to_string(), id.to_string(),
active_operations.clone(), active_operations.clone(),
active_downloads.clone(), active_downloads.clone(),
tx,
)); ));
wait_for_tracking_clear(id, &active_operations, &active_downloads).await; wait_for_tracking_clear(id, &active_operations, &active_downloads).await;
assert!(recv_active_operations(&mut rx).await.is_empty());
} }
#[tokio::test] #[tokio::test]
async fn disarmed_operation_guard_does_not_clean_tracking() { async fn disarmed_operation_guard_does_not_clean_tracking() {
let id = "game-finished"; let id = "game-finished";
let (active_operations, active_downloads, _) = tracked_download_state(id); let (active_operations, active_downloads, _) = tracked_download_state(id);
let (tx, _rx) = mpsc::unbounded_channel();
OperationGuard::download( OperationGuard::download(
id.to_string(), id.to_string(),
active_operations.clone(), active_operations.clone(),
active_downloads.clone(), active_downloads.clone(),
tx,
) )
.disarm(); .disarm();
@@ -303,13 +336,19 @@ mod tests {
let id = "game-aborted"; let id = "game-aborted";
let (active_operations, active_downloads, _) = tracked_download_state(id); let (active_operations, active_downloads, _) = tracked_download_state(id);
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
let (tx, mut rx) = mpsc::unbounded_channel();
let handle = tokio::spawn({ let handle = tokio::spawn({
let active_operations = active_operations.clone(); let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone(); let active_downloads = active_downloads.clone();
let tx = tx.clone();
async move { async move {
let _guard = let _guard = OperationGuard::download(
OperationGuard::download(id.to_string(), active_operations, active_downloads); id.to_string(),
active_operations,
active_downloads,
tx,
);
let _ = ready_tx.send(()); let _ = ready_tx.send(());
std::future::pending::<()>().await; std::future::pending::<()>().await;
} }
@@ -320,5 +359,34 @@ mod tests {
let _ = handle.await; let _ = handle.await;
wait_for_tracking_clear(id, &active_operations, &active_downloads).await; wait_for_tracking_clear(id, &active_operations, &active_downloads).await;
assert_eq!(
recv_active_operations(&mut rx).await,
Vec::<ActiveOperation>::new()
);
}
#[tokio::test]
async fn operation_guard_cleanup_snapshot_keeps_other_operations() {
let active_operations = Arc::new(RwLock::new(HashMap::from([
("aborted".to_string(), OperationKind::Downloading),
("other".to_string(), OperationKind::Installing),
])));
let active_downloads = Arc::new(RwLock::new(HashMap::new()));
let (tx, mut rx) = mpsc::unbounded_channel();
drop(OperationGuard::download(
"aborted".to_string(),
active_operations,
active_downloads,
tx,
));
assert_eq!(
recv_active_operations(&mut rx).await,
vec![ActiveOperation {
id: "other".to_string(),
operation: ActiveOperationKind::Installing,
}]
);
} }
} }
+51 -2
View File
@@ -1,10 +1,16 @@
//! UI event helpers used by peer command and service code. //! UI event helpers used by peer command and service code.
use std::{net::SocketAddr, sync::Arc}; use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{PeerEvent, peer_db::PeerGameDB}; use crate::{
ActiveOperation,
ActiveOperationKind,
PeerEvent,
context::OperationKind,
peer_db::PeerGameDB,
};
pub fn send(tx_notify_ui: &UnboundedSender<PeerEvent>, event: PeerEvent) { pub fn send(tx_notify_ui: &UnboundedSender<PeerEvent>, event: PeerEvent) {
if let Err(err) = tx_notify_ui.send(event) { if let Err(err) = tx_notify_ui.send(event) {
@@ -13,6 +19,49 @@ pub fn send(tx_notify_ui: &UnboundedSender<PeerEvent>, event: PeerEvent) {
} }
} }
pub(crate) fn active_operation_snapshot_from_map(
active_operations: &HashMap<String, OperationKind>,
) -> Vec<ActiveOperation> {
let mut snapshot = active_operations
.iter()
.map(|(id, operation)| ActiveOperation {
id: id.clone(),
operation: active_operation_kind(*operation),
})
.collect::<Vec<_>>();
snapshot.sort_by(|left, right| left.id.cmp(&right.id));
snapshot
}
pub(crate) fn send_active_operations_snapshot(
tx_notify_ui: &UnboundedSender<PeerEvent>,
active_operations: &HashMap<String, OperationKind>,
) {
send(
tx_notify_ui,
PeerEvent::ActiveOperationsChanged {
active_operations: active_operation_snapshot_from_map(active_operations),
},
);
}
pub(crate) async fn emit_active_operations(
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
let active_operations = active_operations.read().await;
send_active_operations_snapshot(tx_notify_ui, &active_operations);
}
fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind {
match operation {
OperationKind::Downloading => ActiveOperationKind::Downloading,
OperationKind::Installing => ActiveOperationKind::Installing,
OperationKind::Updating => ActiveOperationKind::Updating,
OperationKind::Uninstalling => ActiveOperationKind::Uninstalling,
}
}
pub async fn emit_peer_game_list( pub async fn emit_peer_game_list(
peer_game_db: &Arc<RwLock<PeerGameDB>>, peer_game_db: &Arc<RwLock<PeerGameDB>>,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
+282 -106
View File
@@ -12,8 +12,6 @@ use lanspread_db::db::{GameDB, GameFileDescription};
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{ use crate::{
ActiveOperation,
ActiveOperationKind,
InstallOperation, InstallOperation,
PeerEvent, PeerEvent,
context::{Ctx, OperationGuard, OperationKind}, context::{Ctx, OperationGuard, OperationKind},
@@ -272,18 +270,10 @@ pub async fn handle_download_game_files_command(
return; return;
} }
{ if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
let mut in_progress = ctx.active_operations.write().await;
match in_progress.entry(id.clone()) {
Entry::Vacant(entry) => {
entry.insert(OperationKind::Downloading);
}
Entry::Occupied(_) => {
log::warn!("Operation for {id} already in progress; ignoring new download request"); log::warn!("Operation for {id} already in progress; ignoring new download request");
return; return;
} }
}
}
let active_operations = ctx.active_operations.clone(); let active_operations = ctx.active_operations.clone();
let active_downloads = ctx.active_downloads.clone(); let active_downloads = ctx.active_downloads.clone();
@@ -298,8 +288,12 @@ pub async fn handle_download_game_files_command(
.insert(id, cancel_token.clone()); .insert(id, cancel_token.clone());
ctx.task_tracker.spawn(async move { ctx.task_tracker.spawn(async move {
let download_state_guard = let download_state_guard = OperationGuard::download(
OperationGuard::download(download_id.clone(), active_operations, active_downloads); download_id.clone(),
active_operations,
active_downloads,
tx_notify_ui_clone.clone(),
);
let result = download_game_files( let result = download_game_files(
&download_id, &download_id,
@@ -317,7 +311,7 @@ pub async fn handle_download_game_files_command(
let Some(prepared) = let Some(prepared) =
prepare_install_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await prepare_install_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await
else { else {
end_download_operation(&ctx_clone, &download_id).await; end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await;
download_state_guard.disarm(); download_state_guard.disarm();
return; return;
}; };
@@ -325,6 +319,7 @@ pub async fn handle_download_game_files_command(
if install_after_download { if install_after_download {
if transition_download_to_install( if transition_download_to_install(
&ctx_clone, &ctx_clone,
&tx_notify_ui_clone,
&download_id, &download_id,
prepared.operation_kind, prepared.operation_kind,
) )
@@ -342,7 +337,7 @@ pub async fn handle_download_game_files_command(
clear_active_download(&ctx_clone, &download_id).await; clear_active_download(&ctx_clone, &download_id).await;
} }
} else { } else {
end_download_operation(&ctx_clone, &download_id).await; end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await;
if let Err(err) = if let Err(err) =
refresh_local_game(&ctx_clone, &tx_notify_ui_clone, &download_id).await refresh_local_game(&ctx_clone, &tx_notify_ui_clone, &download_id).await
{ {
@@ -352,7 +347,7 @@ pub async fn handle_download_game_files_command(
download_state_guard.disarm(); download_state_guard.disarm();
} }
Err(e) => { Err(e) => {
end_download_operation(&ctx_clone, &download_id).await; end_download_operation(&ctx_clone, &tx_notify_ui_clone, &download_id).await;
download_state_guard.disarm(); download_state_guard.disarm();
log::error!("Download failed for {download_id}: {e}"); log::error!("Download failed for {download_id}: {e}");
} }
@@ -395,7 +390,7 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
return; return;
}; };
if !begin_operation(ctx, &id, prepared.operation_kind).await { if !begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await {
log::warn!("Operation for {id} already in progress; ignoring install command"); log::warn!("Operation for {id} already in progress; ignoring install command");
return; return;
} }
@@ -459,7 +454,11 @@ async fn run_started_install_operation(
.. ..
} = prepared; } = prepared;
let operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); let operation_guard = OperationGuard::new(
id.clone(),
ctx.active_operations.clone(),
tx_notify_ui.clone(),
);
let result = { let result = {
events::send( events::send(
tx_notify_ui, tx_notify_ui,
@@ -478,7 +477,7 @@ async fn run_started_install_operation(
} }
} }
}; };
end_operation(ctx, &id).await; end_operation(ctx, tx_notify_ui, &id).await;
operation_guard.disarm(); operation_guard.disarm();
match result { match result {
@@ -510,13 +509,17 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
return; return;
} }
if !begin_operation(ctx, &id, OperationKind::Uninstalling).await { if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await {
log::warn!("Operation for {id} already in progress; ignoring uninstall command"); log::warn!("Operation for {id} already in progress; ignoring uninstall command");
return; return;
} }
let game_root = { ctx.game_dir.read().await.join(&id) }; let game_root = { ctx.game_dir.read().await.join(&id) };
let operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); let operation_guard = OperationGuard::new(
id.clone(),
ctx.active_operations.clone(),
tx_notify_ui.clone(),
);
let result = { let result = {
events::send( events::send(
tx_notify_ui, tx_notify_ui,
@@ -525,7 +528,7 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
install::uninstall(&game_root, &id).await install::uninstall(&game_root, &id).await
}; };
end_operation(ctx, &id).await; end_operation(ctx, tx_notify_ui, &id).await;
operation_guard.disarm(); operation_guard.disarm();
match result { match result {
@@ -549,7 +552,13 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
} }
} }
async fn begin_operation(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { async fn begin_operation(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
) -> bool {
let started = {
let mut active_operations = ctx.active_operations.write().await; let mut active_operations = ctx.active_operations.write().await;
match active_operations.entry(id.to_string()) { match active_operations.entry(id.to_string()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
@@ -558,9 +567,22 @@ async fn begin_operation(ctx: &Ctx, id: &str, operation: OperationKind) -> bool
} }
Entry::Occupied(_) => false, Entry::Occupied(_) => false,
} }
};
if started {
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
}
started
} }
async fn transition_download_to_install(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { async fn transition_download_to_install(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
) -> bool {
let transitioned = {
let mut active_operations = ctx.active_operations.write().await; let mut active_operations = ctx.active_operations.write().await;
match active_operations.get_mut(id) { match active_operations.get_mut(id) {
Some(current) if *current == OperationKind::Downloading => { Some(current) if *current == OperationKind::Downloading => {
@@ -574,22 +596,33 @@ async fn transition_download_to_install(ctx: &Ctx, id: &str, operation: Operatio
false false
} }
None => { None => {
log::warn!("Cannot transition {id} from download to install; operation is not active"); log::warn!(
"Cannot transition {id} from download to install; operation is not active"
);
false false
} }
} }
};
if transitioned {
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
}
transitioned
} }
async fn end_operation(ctx: &Ctx, id: &str) { async fn end_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: &str) {
ctx.active_operations.write().await.remove(id); if ctx.active_operations.write().await.remove(id).is_some() {
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
}
} }
async fn clear_active_download(ctx: &Ctx, id: &str) { async fn clear_active_download(ctx: &Ctx, id: &str) {
ctx.active_downloads.write().await.remove(id); ctx.active_downloads.write().await.remove(id);
} }
async fn end_download_operation(ctx: &Ctx, id: &str) { async fn end_download_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: &str) {
end_operation(ctx, id).await; end_operation(ctx, tx_notify_ui, id).await;
clear_active_download(ctx, id).await; clear_active_download(ctx, id).await;
} }
@@ -636,7 +669,13 @@ pub async fn handle_set_game_dir_command(
let ctx_clone = ctx.clone(); let ctx_clone = ctx.clone();
ctx.task_tracker.spawn(async move { ctx.task_tracker.spawn(async move {
match load_local_library(&ctx_clone, &tx_notify_ui).await { match load_local_library_with_policy(
&ctx_clone,
&tx_notify_ui,
LocalLibraryEventPolicy::ForceSnapshot,
)
.await
{
Ok(()) => log::info!("Local game database loaded successfully"), Ok(()) => log::info!("Local game database loaded successfully"),
Err(e) => { Err(e) => {
log::error!("Failed to load local game database: {e}"); log::error!("Failed to load local game database: {e}");
@@ -649,11 +688,19 @@ pub async fn handle_set_game_dir_command(
pub async fn load_local_library( pub async fn load_local_library(
ctx: &Ctx, ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
load_local_library_with_policy(ctx, tx_notify_ui, LocalLibraryEventPolicy::OnChange).await
}
async fn load_local_library_with_policy(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
event_policy: LocalLibraryEventPolicy,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let game_dir = { ctx.game_dir.read().await.clone() }; let game_dir = { ctx.game_dir.read().await.clone() };
let active_ids = active_operation_ids(ctx).await; let active_ids = active_operation_ids(ctx).await;
install::recover_on_startup(&game_dir, &active_ids).await?; install::recover_on_startup(&game_dir, &active_ids).await?;
scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir, event_policy).await
} }
async fn refresh_local_library( async fn refresh_local_library(
@@ -661,17 +708,24 @@ async fn refresh_local_library(
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let game_dir = { ctx.game_dir.read().await.clone() }; let game_dir = { ctx.game_dir.read().await.clone() };
scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await scan_and_announce_local_library(
ctx,
tx_notify_ui,
&game_dir,
LocalLibraryEventPolicy::OnChange,
)
.await
} }
async fn scan_and_announce_local_library( async fn scan_and_announce_local_library(
ctx: &Ctx, ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
game_dir: &Path, game_dir: &Path,
event_policy: LocalLibraryEventPolicy,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let catalog = ctx.catalog.read().await.clone(); let catalog = ctx.catalog.read().await.clone();
let scan = scan_local_library(game_dir, &catalog).await?; let scan = scan_local_library(game_dir, &catalog).await?;
update_and_announce_games(ctx, tx_notify_ui, scan).await; update_and_announce_games_with_policy(ctx, tx_notify_ui, scan, event_policy).await;
Ok(()) Ok(())
} }
@@ -683,10 +737,22 @@ async fn refresh_local_game(
let game_dir = { ctx.game_dir.read().await.clone() }; let game_dir = { ctx.game_dir.read().await.clone() };
let catalog = ctx.catalog.read().await.clone(); let catalog = ctx.catalog.read().await.clone();
let scan = rescan_local_game(&game_dir, &catalog, id).await?; let scan = rescan_local_game(&game_dir, &catalog, id).await?;
update_and_announce_games(ctx, tx_notify_ui, scan).await; update_and_announce_games_with_policy(
ctx,
tx_notify_ui,
scan,
LocalLibraryEventPolicy::OnChange,
)
.await;
Ok(()) Ok(())
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LocalLibraryEventPolicy {
OnChange,
ForceSnapshot,
}
async fn active_operation_ids(ctx: &Ctx) -> HashSet<String> { async fn active_operation_ids(ctx: &Ctx) -> HashSet<String> {
ctx.active_operations.read().await.keys().cloned().collect() ctx.active_operations.read().await.keys().cloned().collect()
} }
@@ -722,6 +788,21 @@ pub async fn update_and_announce_games(
ctx: &Ctx, ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
scan: LocalLibraryScan, scan: LocalLibraryScan,
) {
update_and_announce_games_with_policy(
ctx,
tx_notify_ui,
scan,
LocalLibraryEventPolicy::OnChange,
)
.await;
}
async fn update_and_announce_games_with_policy(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
scan: LocalLibraryScan,
event_policy: LocalLibraryEventPolicy,
) { ) {
let LocalLibraryScan { let LocalLibraryScan {
mut game_db, mut game_db,
@@ -729,11 +810,11 @@ pub async fn update_and_announce_games(
revision, revision,
} = scan; } = scan;
let active_operations = active_operation_snapshot(ctx).await; let active_operation_ids = active_operation_ids(ctx).await;
if !active_operations.is_empty() { if !active_operation_ids.is_empty() {
let previous = ctx.local_library.read().await.games.clone(); let previous = ctx.local_library.read().await.games.clone();
for id in active_operations.iter().map(|operation| &operation.id) { for id in &active_operation_ids {
if let Some(summary) = previous.get(id) { if let Some(summary) = previous.get(id.as_str()) {
summaries.insert(id.clone(), summary.clone()); summaries.insert(id.clone(), summary.clone());
} else { } else {
summaries.remove(id); summaries.remove(id);
@@ -754,11 +835,15 @@ pub async fn update_and_announce_games(
let all_games = game_db.all_games().into_iter().cloned().collect::<Vec<_>>(); let all_games = game_db.all_games().into_iter().cloned().collect::<Vec<_>>();
if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated { if delta.is_some() || event_policy == LocalLibraryEventPolicy::ForceSnapshot {
events::send(
tx_notify_ui,
PeerEvent::LocalLibraryChanged {
games: all_games.clone(), games: all_games.clone(),
active_operations, },
}) { );
log::error!("Failed to send LocalGamesUpdated event: {e}"); } else {
log::debug!("Skipping unchanged local library event");
} }
let Some(delta) = delta else { let Some(delta) = delta else {
@@ -784,28 +869,6 @@ pub async fn update_and_announce_games(
} }
} }
async fn active_operation_snapshot(ctx: &Ctx) -> Vec<ActiveOperation> {
let active_operations = ctx.active_operations.read().await;
let mut snapshot = active_operations
.iter()
.map(|(id, operation)| ActiveOperation {
id: id.clone(),
operation: active_operation_kind(*operation),
})
.collect::<Vec<_>>();
snapshot.sort_by(|left, right| left.id.cmp(&right.id));
snapshot
}
fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind {
match operation {
OperationKind::Downloading => ActiveOperationKind::Downloading,
OperationKind::Installing => ActiveOperationKind::Installing,
OperationKind::Updating => ActiveOperationKind::Updating,
OperationKind::Uninstalling => ActiveOperationKind::Uninstalling,
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
@@ -821,7 +884,13 @@ mod tests {
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
use super::*; use super::*;
use crate::{UnpackFuture, Unpacker, test_support::TempDir}; use crate::{
ActiveOperation,
ActiveOperationKind,
UnpackFuture,
Unpacker,
test_support::TempDir,
};
struct FakeUnpacker; struct FakeUnpacker;
@@ -860,6 +929,15 @@ mod tests {
.expect("event channel should remain open") .expect("event channel should remain open")
} }
async fn assert_no_event(rx: &mut mpsc::UnboundedReceiver<PeerEvent>) {
assert!(
tokio::time::timeout(Duration::from_millis(50), rx.recv())
.await
.is_err(),
"event channel should stay quiet"
);
}
fn addr(port: u16) -> SocketAddr { fn addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port)) SocketAddr::from(([127, 0, 0, 1], port))
} }
@@ -895,17 +973,9 @@ mod tests {
installed: bool, installed: bool,
downloaded: bool, downloaded: bool,
) -> lanspread_db::db::Game { ) -> lanspread_db::db::Game {
let PeerEvent::LocalGamesUpdated { let PeerEvent::LocalLibraryChanged { games } = event else {
games, panic!("expected LocalLibraryChanged");
active_operations,
} = event
else {
panic!("expected LocalGamesUpdated");
}; };
assert!(
active_operations.is_empty(),
"settled local update should not report active operations"
);
let game = games let game = games
.into_iter() .into_iter()
.find(|game| game.id == "game") .find(|game| game.id == "game")
@@ -915,6 +985,20 @@ mod tests {
game game
} }
fn assert_active_update(event: PeerEvent, expected: Vec<ActiveOperation>) {
let PeerEvent::ActiveOperationsChanged { active_operations } = event else {
panic!("expected ActiveOperationsChanged");
};
assert_eq!(active_operations, expected);
}
fn active_update(id: &str, operation: ActiveOperationKind) -> Vec<ActiveOperation> {
vec![ActiveOperation {
id: id.to_string(),
operation,
}]
}
#[test] #[test]
fn update_source_selects_latest_ready_peer_manifest() { fn update_source_selects_latest_ready_peer_manifest() {
let old_addr = addr(12_000); let old_addr = addr(12_000);
@@ -1039,8 +1123,8 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn local_games_update_reports_authoritative_active_operations() { async fn local_library_scan_freezes_active_game_state() {
let temp = TempDir::new("lanspread-handler-active-snapshot"); let temp = TempDir::new("lanspread-handler-active-freeze");
let root = temp.game_root(); let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive"); write_file(&root.join("game.eti"), b"archive");
@@ -1058,29 +1142,38 @@ mod tests {
update_and_announce_games(&ctx, &tx, scan).await; update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalGamesUpdated { let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else {
games, panic!("expected LocalLibraryChanged");
active_operations,
} = recv_event(&mut rx).await
else {
panic!("expected LocalGamesUpdated");
}; };
assert!( assert!(
games.is_empty(), games.is_empty(),
"active game should keep its previous announced state" "active game should keep its previous announced state"
); );
assert_eq!( }
active_operations,
#[tokio::test]
async fn begin_operation_reports_authoritative_active_operation_snapshot() {
let temp = TempDir::new("lanspread-handler-active-begin");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await);
assert_active_update(
recv_event(&mut rx).await,
vec![ActiveOperation { vec![ActiveOperation {
id: "game".to_string(), id: "game".to_string(),
operation: ActiveOperationKind::Installing, operation: ActiveOperationKind::Updating,
}] }],
); );
} }
#[tokio::test] #[tokio::test]
async fn unchanged_scan_still_reports_active_operation_snapshot() { async fn unchanged_settled_scan_is_not_reemitted() {
let temp = TempDir::new("lanspread-handler-active-unchanged"); let temp = TempDir::new("lanspread-handler-settled-unchanged");
let root = temp.game_root(); let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive"); write_file(&root.join("game.eti"), b"archive");
@@ -1095,28 +1188,50 @@ mod tests {
update_and_announce_games(&ctx, &tx, scan).await; update_and_announce_games(&ctx, &tx, scan).await;
assert_local_update(recv_event(&mut rx).await, false, true); assert_local_update(recv_event(&mut rx).await, false, true);
ctx.active_operations
.write()
.await
.insert("game".to_string(), OperationKind::Updating);
let scan = scan_local_library(temp.path(), &catalog) let scan = scan_local_library(temp.path(), &catalog)
.await .await
.expect("second scan should succeed"); .expect("second scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await; update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalGamesUpdated { assert_no_event(&mut rx).await;
active_operations, .. }
} = recv_event(&mut rx).await
else { #[tokio::test]
panic!("expected LocalGamesUpdated"); async fn unchanged_operation_refresh_still_reports_settled_snapshot() {
}; let temp = TempDir::new("lanspread-handler-operation-unchanged");
assert_eq!( let root = temp.game_root();
active_operations, write_file(&root.join("version.ini"), b"20250101");
vec![ActiveOperation { write_file(&root.join("game.eti"), b"archive");
id: "game".to_string(), write_file(&root.join("local").join("old.txt"), b"old");
operation: ActiveOperationKind::Updating,
}] let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
let scan = scan_local_library(temp.path(), &catalog)
.await
.expect("initial scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await;
assert_local_update(recv_event(&mut rx).await, true, true);
run_install_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Updating),
); );
assert!(matches!(
recv_event(&mut rx).await,
PeerEvent::InstallGameBegin {
id,
operation: InstallOperation::Updating
} if id == "game"
));
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!(
recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game"
));
assert_no_event(&mut rx).await;
} }
#[tokio::test] #[tokio::test]
@@ -1131,6 +1246,10 @@ mod tests {
run_install_operation(&ctx, &tx, "game".to_string()).await; run_install_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Installing),
);
match recv_event(&mut rx).await { match recv_event(&mut rx).await {
PeerEvent::InstallGameBegin { id, operation } => { PeerEvent::InstallGameBegin { id, operation } => {
assert_eq!(id, "game"); assert_eq!(id, "game");
@@ -1138,6 +1257,7 @@ mod tests {
} }
_ => panic!("expected InstallGameBegin"), _ => panic!("expected InstallGameBegin"),
} }
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game" PeerEvent::InstallGameFinished { id } if id == "game"
@@ -1174,7 +1294,8 @@ mod tests {
let tx = tx.clone(); let tx = tx.clone();
async move { async move {
assert!( assert!(
transition_download_to_install(&ctx, "game", prepared.operation_kind).await transition_download_to_install(&ctx, &tx, "game", prepared.operation_kind)
.await
); );
clear_active_download(&ctx, "game").await; clear_active_download(&ctx, "game").await;
run_started_install_operation(&ctx, &tx, "game".to_string(), prepared).await; run_started_install_operation(&ctx, &tx, "game".to_string(), prepared).await;
@@ -1186,6 +1307,10 @@ mod tests {
drop(read_guard); drop(read_guard);
install_task.await.expect("handoff task should finish"); install_task.await.expect("handoff task should finish");
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Installing),
);
match recv_event(&mut rx).await { match recv_event(&mut rx).await {
PeerEvent::InstallGameBegin { id, operation } => { PeerEvent::InstallGameBegin { id, operation } => {
assert_eq!(id, "game"); assert_eq!(id, "game");
@@ -1193,6 +1318,7 @@ mod tests {
} }
_ => panic!("expected InstallGameBegin"), _ => panic!("expected InstallGameBegin"),
} }
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game" PeerEvent::InstallGameFinished { id } if id == "game"
@@ -1215,6 +1341,10 @@ mod tests {
run_install_operation(&ctx, &tx, "game".to_string()).await; run_install_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Updating),
);
match recv_event(&mut rx).await { match recv_event(&mut rx).await {
PeerEvent::InstallGameBegin { id, operation } => { PeerEvent::InstallGameBegin { id, operation } => {
assert_eq!(id, "game"); assert_eq!(id, "game");
@@ -1222,6 +1352,7 @@ mod tests {
} }
_ => panic!("expected InstallGameBegin"), _ => panic!("expected InstallGameBegin"),
} }
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game" PeerEvent::InstallGameFinished { id } if id == "game"
@@ -1241,6 +1372,10 @@ mod tests {
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
run_install_operation(&ctx, &tx, "game".to_string()).await; run_install_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Installing),
);
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameBegin { PeerEvent::InstallGameBegin {
@@ -1248,6 +1383,7 @@ mod tests {
operation: InstallOperation::Installing operation: InstallOperation::Installing
} if id == "game" } if id == "game"
)); ));
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game" PeerEvent::InstallGameFinished { id } if id == "game"
@@ -1259,6 +1395,10 @@ mod tests {
write_file(&root.join("game.eti"), b"new archive"); write_file(&root.join("game.eti"), b"new archive");
run_install_operation(&ctx, &tx, "game".to_string()).await; run_install_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Updating),
);
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameBegin { PeerEvent::InstallGameBegin {
@@ -1266,6 +1406,7 @@ mod tests {
operation: InstallOperation::Updating operation: InstallOperation::Updating
} if id == "game" } if id == "game"
)); ));
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::InstallGameFinished { id } if id == "game" PeerEvent::InstallGameFinished { id } if id == "game"
@@ -1274,10 +1415,15 @@ mod tests {
assert_eq!(game.local_version.as_deref(), Some("20250101")); assert_eq!(game.local_version.as_deref(), Some("20250101"));
run_uninstall_operation(&ctx, &tx, "game".to_string()).await; run_uninstall_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Uninstalling),
);
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::UninstallGameBegin { id } if id == "game" PeerEvent::UninstallGameBegin { id } if id == "game"
)); ));
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::UninstallGameFinished { id } if id == "game" PeerEvent::UninstallGameFinished { id } if id == "game"
@@ -1300,10 +1446,15 @@ mod tests {
run_uninstall_operation(&ctx, &tx, "game".to_string()).await; run_uninstall_operation(&ctx, &tx, "game".to_string()).await;
assert_active_update(
recv_event(&mut rx).await,
active_update("game", ActiveOperationKind::Uninstalling),
);
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::UninstallGameBegin { id } if id == "game" PeerEvent::UninstallGameBegin { id } if id == "game"
)); ));
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(matches!( assert!(matches!(
recv_event(&mut rx).await, recv_event(&mut rx).await,
PeerEvent::UninstallGameFinished { id } if id == "game" PeerEvent::UninstallGameFinished { id } if id == "game"
@@ -1356,4 +1507,29 @@ mod tests {
assert!(!next.game_root().join(".version.ini.tmp").exists()); assert!(!next.game_root().join(".version.ini.tmp").exists());
} }
#[tokio::test]
async fn path_changing_set_game_dir_emits_equivalent_snapshot() {
let current = TempDir::new("lanspread-handler-old-equivalent-dir");
let next = TempDir::new("lanspread-handler-new-equivalent-dir");
for root in [current.game_root(), next.game_root()] {
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
}
let ctx = test_ctx(current.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
let scan = scan_local_library(current.path(), &catalog)
.await
.expect("initial scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await;
assert_local_update(recv_event(&mut rx).await, false, true);
handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await;
ctx.task_tracker.close();
ctx.task_tracker.wait().await;
assert_local_update(recv_event(&mut rx).await, false, true);
}
} }
+5 -4
View File
@@ -132,9 +132,10 @@ pub enum PeerEvent {
PeerLost(SocketAddr), PeerLost(SocketAddr),
/// The total peer count has changed. /// The total peer count has changed.
PeerCountUpdated(usize), PeerCountUpdated(usize),
/// Local games have been scanned, with authoritative in-progress work. /// The local library contents changed after a scan.
LocalGamesUpdated { LocalLibraryChanged { games: Vec<Game> },
games: Vec<Game>, /// The set of in-progress local operations changed.
ActiveOperationsChanged {
active_operations: Vec<ActiveOperation>, active_operations: Vec<ActiveOperation>,
}, },
/// A required peer runtime component failed. /// A required peer runtime component failed.
@@ -168,7 +169,7 @@ pub enum InstallOperation {
Updating, Updating,
} }
/// In-progress operation snapshot attached to local library updates. /// In-progress operation snapshot sent when operation state changes.
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveOperation { pub struct ActiveOperation {
pub id: String, pub id: String,
+37 -6
View File
@@ -202,12 +202,13 @@ async fn handle_active_downloads_without_peers(
return; return;
} }
let mut changed = false;
for id in active_ids { for id in active_ids {
if peers_still_have_game(peer_game_db, &id).await { if peers_still_have_game(peer_game_db, &id).await {
continue; continue;
} }
active_operations.write().await.remove(&id); changed |= active_operations.write().await.remove(&id).is_some();
let Some(cancel_token) = active_downloads.write().await.remove(&id) else { let Some(cancel_token) = active_downloads.write().await.remove(&id) else {
continue; continue;
}; };
@@ -215,9 +216,13 @@ async fn handle_active_downloads_without_peers(
events::send( events::send(
tx_notify_ui, tx_notify_ui,
PeerEvent::DownloadGameFilesAllPeersGone { id }, PeerEvent::DownloadGameFilesAllPeersGone { id: id.clone() },
); );
} }
if changed {
events::emit_active_operations(active_operations, tx_notify_ui).await;
}
} }
async fn peers_still_have_game(peer_game_db: &Arc<RwLock<PeerGameDB>>, game_id: &str) -> bool { async fn peers_still_have_game(peer_game_db: &Arc<RwLock<PeerGameDB>>, game_id: &str) -> bool {
@@ -233,10 +238,16 @@ mod tests {
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use super::handle_active_downloads_without_peers; use super::handle_active_downloads_without_peers;
use crate::{PeerEvent, context::OperationKind, peer_db::PeerGameDB}; use crate::{
ActiveOperation,
ActiveOperationKind,
PeerEvent,
context::OperationKind,
peer_db::PeerGameDB,
};
#[tokio::test] #[tokio::test]
async fn all_peers_gone_cancels_download_and_emits_only_peers_gone() { async fn all_peers_gone_cancels_download_and_emits_peers_gone_then_active_snapshot() {
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let active_operations = Arc::new(RwLock::new(HashMap::from([( let active_operations = Arc::new(RwLock::new(HashMap::from([(
"game".to_string(), "game".to_string(),
@@ -266,9 +277,17 @@ mod tests {
event, event,
PeerEvent::DownloadGameFilesAllPeersGone { id } if id == "game" PeerEvent::DownloadGameFilesAllPeersGone { id } if id == "game"
)); ));
let event = rx
.recv()
.await
.expect("active operation snapshot should be emitted");
assert!(matches!(
event,
PeerEvent::ActiveOperationsChanged { active_operations } if active_operations.is_empty()
));
assert!( assert!(
rx.try_recv().is_err(), rx.try_recv().is_err(),
"peers-gone cancellation must not emit a duplicate failure event" "peers-gone cancellation must not emit extra events"
); );
} }
@@ -318,9 +337,21 @@ mod tests {
} }
cancelled_ids.sort(); cancelled_ids.sort();
assert_eq!(cancelled_ids, vec!["first", "second"]); assert_eq!(cancelled_ids, vec!["first", "second"]);
let event = rx
.recv()
.await
.expect("active operation snapshot should be emitted");
assert!(matches!(
event,
PeerEvent::ActiveOperationsChanged { active_operations }
if active_operations == vec![ActiveOperation {
id: "installing".to_string(),
operation: ActiveOperationKind::Installing,
}]
));
assert!( assert!(
rx.try_recv().is_err(), rx.try_recv().is_err(),
"multiple peers-gone cancellations must not emit duplicate failure events" "multiple peers-gone cancellations must not emit extra events"
); );
} }
} }
@@ -391,19 +391,15 @@ mod tests {
async fn recv_local_update( async fn recv_local_update(
rx: &mut mpsc::UnboundedReceiver<PeerEvent>, rx: &mut mpsc::UnboundedReceiver<PeerEvent>,
) -> (Vec<lanspread_db::db::Game>, Vec<crate::ActiveOperation>) { ) -> Vec<lanspread_db::db::Game> {
let event = tokio::time::timeout(Duration::from_secs(1), rx.recv()) let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await .await
.expect("local update event should arrive") .expect("local update event should arrive")
.expect("event channel should stay open"); .expect("event channel should stay open");
let PeerEvent::LocalGamesUpdated { let PeerEvent::LocalLibraryChanged { games } = event else {
games, panic!("expected LocalLibraryChanged");
active_operations,
} = event
else {
panic!("expected LocalGamesUpdated");
}; };
(games, active_operations) games
} }
#[test] #[test]
@@ -537,7 +533,7 @@ mod tests {
ctx.task_tracker.wait().await; ctx.task_tracker.wait().await;
let mut update_count = 0; let mut update_count = 0;
while let Ok(Some(PeerEvent::LocalGamesUpdated { .. })) = while let Ok(Some(PeerEvent::LocalLibraryChanged { .. })) =
tokio::time::timeout(Duration::from_millis(50), rx.recv()).await tokio::time::timeout(Duration::from_millis(50), rx.recv()).await
{ {
update_count += 1; update_count += 1;
@@ -560,8 +556,7 @@ mod tests {
run_fallback_scan(&ctx, &tx).await; run_fallback_scan(&ctx, &tx).await;
let (games, active_operations) = recv_local_update(&mut rx).await; let games = recv_local_update(&mut rx).await;
assert!(active_operations.is_empty());
let game = games let game = games
.iter() .iter()
.find(|game| game.id == "game") .find(|game| game.id == "game")
@@ -585,9 +580,12 @@ mod tests {
run_fallback_scan(&ctx, &tx).await; run_fallback_scan(&ctx, &tx).await;
let (games, active_operations) = recv_local_update(&mut rx).await; assert!(
assert!(games.is_empty()); tokio::time::timeout(Duration::from_millis(50), rx.recv())
assert!(active_operations.is_empty()); .await
.is_err(),
"non-catalog scan should not emit a local library event"
);
let library = ctx.local_library.read().await; let library = ctx.local_library.read().await;
assert!(library.games.is_empty()); assert!(library.games.is_empty());
assert!(library.recent_deltas.is_empty()); assert!(library.recent_deltas.is_empty());
@@ -13,7 +13,6 @@ use lanspread_db::db::{Availability, Game, GameDB, GameFileDescription};
use lanspread_peer::{ use lanspread_peer::{
ActiveOperation, ActiveOperation,
ActiveOperationKind, ActiveOperationKind,
InstallOperation,
PeerCommand, PeerCommand,
PeerEvent, PeerEvent,
PeerGameDB, PeerGameDB,
@@ -747,17 +746,18 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
log::info!("PeerEvent::ListGames received"); log::info!("PeerEvent::ListGames received");
update_game_db(games, app_handle.clone()).await; update_game_db(games, app_handle.clone()).await;
} }
PeerEvent::LocalGamesUpdated { PeerEvent::LocalLibraryChanged { games: local_games } => {
games: local_games, log::info!("PeerEvent::LocalLibraryChanged received");
active_operations, update_local_games_in_db(local_games, app_handle.clone()).await;
} => { }
log::info!("PeerEvent::LocalGamesUpdated received"); PeerEvent::ActiveOperationsChanged { active_operations } => {
{ log::info!("PeerEvent::ActiveOperationsChanged received");
let state = app_handle.state::<LanSpreadState>(); let state = app_handle.state::<LanSpreadState>();
{
let mut ui_active_operations = state.active_operations.write().await; let mut ui_active_operations = state.active_operations.write().await;
reconcile_active_operations(&mut ui_active_operations, &active_operations); reconcile_active_operations(&mut ui_active_operations, &active_operations);
} }
update_local_games_in_db(local_games, app_handle.clone()).await; emit_games_list(app_handle).await;
} }
PeerEvent::GotGameFiles { PeerEvent::GotGameFiles {
id, id,
@@ -773,21 +773,9 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
&id, &id,
"PeerEvent::NoPeersHaveGame", "PeerEvent::NoPeersHaveGame",
); );
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
} }
PeerEvent::DownloadGameFilesBegin { id } => { PeerEvent::DownloadGameFilesBegin { id } => {
log::info!("PeerEvent::DownloadGameFilesBegin received"); log::info!("PeerEvent::DownloadGameFilesBegin received");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.insert(id.clone(), UiOperationKind::Downloading);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-download-begin", "game-download-begin",
@@ -808,7 +796,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
); );
} }
PeerEvent::DownloadGameFilesFinished { id } => { PeerEvent::DownloadGameFilesFinished { id } => {
handle_download_finished(app_handle, id).await; handle_download_finished(app_handle, id);
} }
PeerEvent::DownloadGameFilesFailed { id } => { PeerEvent::DownloadGameFilesFailed { id } => {
log::warn!("PeerEvent::DownloadGameFilesFailed received"); log::warn!("PeerEvent::DownloadGameFilesFailed received");
@@ -818,12 +806,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
&id, &id,
"PeerEvent::DownloadGameFilesFailed", "PeerEvent::DownloadGameFilesFailed",
); );
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
} }
PeerEvent::DownloadGameFilesAllPeersGone { id } => { PeerEvent::DownloadGameFilesAllPeersGone { id } => {
log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}"); log::warn!("PeerEvent::DownloadGameFilesAllPeersGone received for {id}");
@@ -833,26 +815,10 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
&id, &id,
"PeerEvent::DownloadGameFilesAllPeersGone", "PeerEvent::DownloadGameFilesAllPeersGone",
); );
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
} }
PeerEvent::InstallGameBegin { id, operation } => { PeerEvent::InstallGameBegin { id, operation } => {
let operation_name: &'static str = (&operation).into(); let operation_name: &'static str = (&operation).into();
log::info!("PeerEvent::InstallGameBegin received for {id}: {operation_name}"); log::info!("PeerEvent::InstallGameBegin received for {id}: {operation_name}");
let ui_operation = match operation {
InstallOperation::Installing => UiOperationKind::Installing,
InstallOperation::Updating => UiOperationKind::Updating,
};
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.insert(id.clone(), ui_operation);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-install-begin", "game-install-begin",
@@ -862,12 +828,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::InstallGameFinished { id } => { PeerEvent::InstallGameFinished { id } => {
log::info!("PeerEvent::InstallGameFinished received for {id}"); log::info!("PeerEvent::InstallGameFinished received for {id}");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-install-finished", "game-install-finished",
@@ -877,12 +837,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::InstallGameFailed { id } => { PeerEvent::InstallGameFailed { id } => {
log::warn!("PeerEvent::InstallGameFailed received for {id}"); log::warn!("PeerEvent::InstallGameFailed received for {id}");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-install-failed", "game-install-failed",
@@ -892,12 +846,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::UninstallGameBegin { id } => { PeerEvent::UninstallGameBegin { id } => {
log::info!("PeerEvent::UninstallGameBegin received for {id}"); log::info!("PeerEvent::UninstallGameBegin received for {id}");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.insert(id.clone(), UiOperationKind::Uninstalling);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-uninstall-begin", "game-uninstall-begin",
@@ -907,12 +855,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::UninstallGameFinished { id } => { PeerEvent::UninstallGameFinished { id } => {
log::info!("PeerEvent::UninstallGameFinished received for {id}"); log::info!("PeerEvent::UninstallGameFinished received for {id}");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-uninstall-finished", "game-uninstall-finished",
@@ -922,12 +864,6 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
PeerEvent::UninstallGameFailed { id } => { PeerEvent::UninstallGameFailed { id } => {
log::warn!("PeerEvent::UninstallGameFailed received for {id}"); log::warn!("PeerEvent::UninstallGameFailed received for {id}");
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
"game-uninstall-failed", "game-uninstall-failed",
@@ -995,7 +931,7 @@ async fn handle_got_game_files(
} }
} }
async fn handle_download_finished(app_handle: &AppHandle, id: String) { fn handle_download_finished(app_handle: &AppHandle, id: String) {
log::info!("PeerEvent::DownloadGameFilesFinished received"); log::info!("PeerEvent::DownloadGameFilesFinished received");
emit_game_id_event( emit_game_id_event(
app_handle, app_handle,
@@ -1003,13 +939,6 @@ async fn handle_download_finished(app_handle: &AppHandle, id: String) {
&id, &id,
"PeerEvent::DownloadGameFilesFinished", "PeerEvent::DownloadGameFilesFinished",
); );
app_handle
.state::<LanSpreadState>()
.active_operations
.write()
.await
.remove(&id);
} }
#[cfg(test)] #[cfg(test)]