From e711cf3454aeab4450144b9aa737e3f4bdbd3fc1 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 16 May 2026 18:32:24 +0200 Subject: [PATCH] fix(peer): settle current-protocol local state cleanup The follow-up backlog had drifted into three settled peer/runtime issues: the legacy game-list fallback contradicted the one-wire-version policy, the Tauri shell still re-derived local install state from disk after peer snapshots, and `Availability::Downloading` existed even though active operations are already reported through a separate operation table. Remove the legacy `AnnounceGames` request and fallback service. Discovery now ignores peers that do not advertise the current protocol and a peer id, and library changes are sent through the current delta path only. This keeps the runtime aligned with the documented current-build-only interoperability model. Make peer `LocalGamesUpdated` snapshots authoritative for local fields in the Tauri database. The GUI-side catalog still owns static metadata such as names, sizes, and descriptions, but downloaded, installed, local version, and availability now come from the peer runtime instead of a second whole-library filesystem scan. Snapshot reconciliation also pins the missing-begin and missing-finish lifecycle cases in tests. Collapse availability back to the settled `Ready` and `LocalOnly` states. Aggregation now counts only `Ready` peers as download sources, and the frontend no longer carries a dead `Downloading` enum value. The core peer also exposes the small non-GUI hooks needed by scripted callers: startup options for state and mDNS, a local-ready event, direct connection, peer snapshots, and an explicit post-download install policy. Those hooks reuse the same current protocol path and do not add compatibility shims. Test Plan: - `git diff --check` - `just fmt` - `just clippy` - `just test` Refs: BACKLOG.md, FINDINGS.md, IMPL_DECISIONS.md --- BACKLOG.md | 131 +------ FINDINGS.md | 172 +-------- IMPL_DECISIONS.md | 9 +- crates/lanspread-db/src/db.rs | 31 +- crates/lanspread-peer/ARCHITECTURE.md | 5 +- crates/lanspread-peer/src/context.rs | 6 + crates/lanspread-peer/src/handlers.rs | 142 +++++-- crates/lanspread-peer/src/identity.rs | 12 +- crates/lanspread-peer/src/lib.rs | 87 ++++- crates/lanspread-peer/src/network.rs | 60 ++- crates/lanspread-peer/src/peer_db.rs | 44 ++- crates/lanspread-peer/src/remote_peer.rs | 50 +-- crates/lanspread-peer/src/services.rs | 2 +- .../lanspread-peer/src/services/discovery.rs | 50 +-- .../lanspread-peer/src/services/handshake.rs | 2 +- crates/lanspread-peer/src/services/legacy.rs | 37 -- .../src/services/local_monitor.rs | 1 + crates/lanspread-peer/src/services/server.rs | 37 +- crates/lanspread-peer/src/services/stream.rs | 20 +- crates/lanspread-peer/src/startup.rs | 6 +- crates/lanspread-proto/src/lib.rs | 1 - .../src-tauri/src/lib.rs | 348 ++++++++---------- crates/lanspread-tauri-deno-ts/src/App.tsx | 1 - 23 files changed, 531 insertions(+), 723 deletions(-) delete mode 100644 crates/lanspread-peer/src/services/legacy.rs diff --git a/BACKLOG.md b/BACKLOG.md index 105d21b..026f130 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -11,136 +11,7 @@ here" cleanups that grow beyond the in-scope change. --- -## Legacy peer protocol fallback contradicts the wire policy - -CLAUDE.md / AGENTS.md: *"There is only one wire version — the current one. -No legacy peers, no compatibility shims, no fallback paths for older -builds."* - -Live legacy paths: - -- `crates/lanspread-peer/src/services/legacy.rs` exists and is called from - `discovery.rs:183-188` when the `Hello` handshake fails. -- `discovery.rs:134` synthesizes `legacy-{addr}` peer IDs when `proto_ver` - is absent from mDNS TXT records. -- `discovery.rs:169` treats `proto_ver.is_none()` as handshake-eligible. -- `update_and_announce_games` (`handlers.rs:605-624`) branches on - `FEATURE_LIBRARY_DELTA` and falls back to `announce_games_to_peer` - (sending `Request::AnnounceGames`) for peers that don't advertise the - feature. -- `Request::AnnounceGames` is still defined in - `lanspread-proto/src/lib.rs:75` and handled in - `services/stream.rs:116`. - -Functionally inert today — current-build peers don't drop `Hello` — but -code and stated policy disagree. Either delete the paths or revert the -policy. - ---- - -## Tauri keeps a parallel filesystem-derived scan - -The peer now owns the install state machine (per PLAN.md:11), but Tauri -still re-derives local install/download state from disk on every event: - -- `refresh_games_list` (`src-tauri/src/lib.rs:489`) fires after every - `update_game_db`, `update_local_games_in_db`, and - `update_game_directory`. It calls `set_all_uninstalled()` and re-runs - `update_game_installation_state` over every bundled-DB entry, - re-reading `version.ini`, re-checking `local/`, re-parsing version - strings. -- `update_local_games_in_db` (`src-tauri/src/lib.rs:667-704`) just merged - the peer's authoritative `Game` values into the Tauri-side `GameDB`. - Immediately after, `refresh_games_list` re-derives the same fields - from disk and overwrites the merged result. -- The per-ID rescan optimization in `local_monitor.rs` is completely - undone on the Tauri side: every peer event triggers a whole-library - disk walk. - -Today both paths reach the same conclusion. The risk is forward-looking: -the moment one of the two derivation rules changes (a new `availability` -rule, a new sentinel, a new ignore name), the two scanners can disagree -silently with no rule for which wins. - -**Fix when convenient:** `refresh_games_list` accepts the peer's `Game` -slice and trusts it for local fields. Tauri's bundled DB stays as the -source of truth for static metadata (name, description, max_players, -thumbnail mapping), but `downloaded`/`installed`/`local_version`/ -`availability` come from the peer. `update_game_installation_state` and -`set_all_uninstalled` go away. The dead log branch at -`src-tauri/src/lib.rs:397-402` is obviated naturally by this. - ---- - -## `Availability::Downloading` is wire-defined but unreachable - -`crates/lanspread-db/src/db.rs:36-41`. The variant exists and serializes -but `build_game_summary` only emits `Ready` or `LocalOnly`. -Operation-table gating handles the in-progress case instead. - -`peer_db::get_all_games` has a code path that lets a remote-advertised -`Downloading` summary contribute `eti_version` to aggregation. If a -future maintainer re-enables emitting `Downloading` from -`build_game_summary`, aggregation will treat such peers as -not-downloadable but still pull their version info. - -**Decide-and-document task:** either remove the variant (matches the -"current wire only" policy) or add a comment in the proto enum naming -the contract. - ---- - -## `update_game_installation_state` dead log branch - -`src-tauri/src/lib.rs:397-402`: - -```rust -if eti_package_exists(&game_path, &game.id) && !downloaded { - log::debug!("Game ... has archives but no version.ini sentinel; treating as not downloaded"); -} -``` - -Side-effect-only log line. Either delete or wire to a UI affordance -("partial download — retry?"). Obviated naturally if/when the Tauri -parallel scan goes away. - ---- - -## Untested edge: Tauri reconciliation with dropped lifecycle events - -The Rust `reconcile_active_operations` test -(`src-tauri/src/lib.rs:1117-1153`) covers map replacement but not the -realistic case of an event sequence with a missing `begin` or `finish`. -The TS side merges in `App.tsx`. Real failure mode: a missing `finish` -followed by a `LocalGamesUpdated` snapshot should clear the spinner, -and today's code does — but it's not pinned by a test. - -Add a test if the spinner ever gets stuck in practice. - ---- - -## Documentation drift - -`FOLLOW_UP_2.md` still lists two items as "Still open" that have -landed: - -- **#10 `save_library_index` non-atomic** — landed in `fdad162`, atomic - temp+fsync+rename at `local_games.rs:169-184`. -- **#11 Split `download.rs`** — landed in `a251233`, split under - `crates/lanspread-peer/src/download/`. - -Either mark the doc complete or delete it. Anyone reading it as status -will be misled. - -The collection of plan/follow-up/review docs in the repo root -(`PLAN.md`, `PLAN_AVAILABILITY.md`, `PLAN_ATOMIC_INDEX.md`, -`PLAN_DOWNLOAD_SPLIT.md`, `FOLLOW_UP_PLAN.md`, `FOLLOW_UP_2.md`, -`REVIEW_STEP_1..4.md`, `IMPL_DECISIONS.md`) is also getting noisy. A -single retrospective archive folder for completed plans would help; a -future PLAN.md should be self-terminating with explicit acceptance -criteria so it doesn't spawn this many trailing docs. - ---- +No open backlog items. ## How items leave this file diff --git a/FINDINGS.md b/FINDINGS.md index da10fea..cc6a916 100644 --- a/FINDINGS.md +++ b/FINDINGS.md @@ -1,161 +1,19 @@ -# Findings — Bugs to Fix Before Merging +# Findings -Three bugs found in the post-PLAN.md implementation. Fix these, then merge. -Everything else lives in `BACKLOG.md` and does not block. +No open pre-merge findings are currently tracked here. ---- +The previous three findings have landed in code and tests: -## 1. `update_game` never fetches a fresh manifest from peers +- `update_game` now uses `PeerCommand::FetchLatestFromPeers` to skip local + manifest serving and fetch fresh peer metadata. Covered by + `update_fetch_emits_fresh_manifest_from_latest_peer` and + `update_request_skips_local_manifest_even_when_download_exists`. +- Download-to-install handoff no longer relies on `OperationGuard::Drop` for + ordered state transitions. Covered by + `download_handoff_waits_for_readers_and_auto_installs` and the liveness + cancellation tests. +- Library index reads and writes are serialized by `LIBRARY_INDEX_LOCK`. + Covered by `concurrent_rescans_preserve_both_index_updates`. -PLAN.md:357 calls for `update_game` to send `GetGame`, *fetch fresh remote -archives*, and trigger an auto-install as a transactional update. The -implementation makes that path unreachable. - -Trace: - -1. Tauri `update_game` sends `PeerCommand::GetGame` - (`crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs:162`). -2. Peer's `handle_get_game_command` calls `try_serve_local_game` first - (`crates/lanspread-peer/src/handlers.rs:88`). -3. `try_serve_local_game` consults `local_download_available`, which returns - true whenever `version.ini` is present locally and the ID is in the catalog - (`crates/lanspread-peer/src/local_games.rs:48-66`). For any game the user - has already downloaded, this is *always* true. -4. The **local** file descriptions are returned via `GotGameFiles`. Tauri - routes those into `DownloadGameFiles`. -5. `handle_download_game_files_command:204-227` consults `peer_game_db` via - `validate_file_sizes_majority`, so cached remote metadata *is* read. But - the descriptions actually used for chunk planning are the local ones - (`handlers.rs:189-195`). When peers advertise a newer version with - different file sizes, the whitelist is empty and the path falls into - "instant install of local archives." When sizes happen to match, we - plan chunks against local descriptions and request those offsets from - peers — which works only when peer-side files are identical to local. - Either way, peers' current manifests are never read. - -Net effect: "update" = re-extract whatever archives are on disk into -`local/`. The flow PLAN.md described — fetch the newer archive from peers, -then auto-install — does not exist. - -**Fix candidates:** - -- New `PeerCommand::FetchLatestFromPeers { id }` that skips the local-serve - gate and asks one peer for its current manifest. -- `PeerCommand::GetGame { id, force_peer: true }` flag honored by - `try_serve_local_game`. -- `try_serve_local_game` short-circuits only when local `eti_version` is ≥ - `peer_db.get_latest_version_for_game(id)`. The aggregation function - already exists in `peer_db.rs:320`; nothing calls it for this purpose. - -**Tests to add:** `update_game` actually pulls the newer manifest from a -peer when one exists. Today this can't be tested because the code path -doesn't exist. - ---- - -## 2. `OperationGuard::Drop` is doing ordered state transitions - -`crates/lanspread-peer/src/handlers.rs:254-279`: - -```rust -ctx.task_tracker.spawn(async move { - let result = { - let _download_state_guard = OperationGuard::download(...); - download_game_files(...).await - }; // guard drops here - match result { - Ok(()) => run_install_operation(&ctx_clone, ..., download_id).await, - ... - } -}); -``` - -`OperationGuard::Drop` (`context.rs:156-191`) tries `try_write` first, then -falls back to `tokio::spawn(async { ... .write().await.remove(...) })` if the -lock is contended. The contention happens because `active_operations` is read -on every watcher tick, every `list_games`, every `can_serve_game`, every -liveness sweep, every `update_and_announce_games` snapshot. - -This is the wrong shape for the state transition. Drop is fire-and-forget; -the synchronous code after the guard scope keeps running before the deferred -removal lands. Two distinct symptoms of the same root cause: - -1. **Install rejected:** `run_install_operation` calls `begin_operation` - (`handlers.rs:336-339`) which does `Entry::Vacant` on the same map. If - `begin_operation` wins the lock before the spawned remove task does, it - sees the leftover `Downloading` entry and rejects the install. User sees - `version.ini` on disk, no `local/`, no `InstallGameBegin`, no - explanation. -2. **Stale snapshot:** Post-finish refresh calls - `active_operation_snapshot` (`handlers.rs:558`) before the deferred - removal runs. UI receives one final snapshot saying the operation is - active even though `InstallGameFinished` was already sent. - -**Fix:** Explicit `async end_operation(...)` call before finish/refresh, -under a single write lock. The same write that removes `Downloading` -should insert `Installing`/`Updating` for the auto-install path, making -the handoff atomic. Demote `OperationGuard` to crash-safety: only fires -when the task panics or is aborted, and logs loudly when it does. - -**Tests to add:** - -- Hold a read lock on `active_operations` while `download_game_files` - returns; assert the auto-install still proceeds. -- Liveness path cancellation while multiple downloads are in flight; - assert no duplicate failure events and no stuck operation-table - entries. - ---- - -## 3. Uncoordinated library-index writes - -`scan_local_library` (`local_games.rs:533-615`) and `rescan_local_game` -(`local_games.rs:617-639`) both load `library_index.json`, mutate the -deserialized state, and save. Nothing serializes the two paths. - -Call sites: - -- `run_fallback_scan` (`local_monitor.rs:289`) → `scan_local_library`. -- `run_gated_rescan` (`local_monitor.rs:261`) → `rescan_local_game`, - spawned on the task tracker (line 253-258). -- `load_local_library` (`handlers.rs:491`) → `scan_local_library`. -- `refresh_local_game` (`handlers.rs:520`) → `rescan_local_game`. - -A fallback-scan tick can land between a gated-rescan's load and save (or -vice versa). Last writer wins; intermediate updates are silently dropped. - -The piece of state that drifts in a user-visible way is `revision`: both -writers compute `old.saturating_add(1)` and save `old+1`, while the -in-memory `LocalLibraryState.revision` bumps independently in -`update_from_scan`. After a restart, disk-revision can be lower than -peers expect, breaking `LibraryDelta.from_rev` matching — peers will -fall back to snapshots and the delta optimization is undone. - -**Fix candidates:** `tokio::Mutex` around index I/O, or move the index -ownership into the same actor that owns `LocalLibraryState` so all -mutations go through one channel. - ---- - -## What's *not* in this file - -Everything else found during review is in `BACKLOG.md`. Notable items -include: Tauri-side parallel scanning, legacy peer protocol fallback, -unreachable `Availability::Downloading` variant, stale FOLLOW_UP_2.md. -None of those block merging. - ---- - -## Definition of done for this branch - -- Fixes for #1, #2, #3 land. -- Tests listed under each fix land. -- `just test`, `just clippy`, `just build` clean. -- Manual: install a game, then update it while a peer advertises a newer - version, then uninstall it. Verify the version actually changes after - update (covers #1) and that the UI doesn't get stuck on a spinner - after operations complete (covers #2). - -Once those are green, this branch is done. Re-reviewing will surface -more smells; don't run another review unless something behaves wrong -when tested manually. +Manual install/update/uninstall smoke testing is still a useful release check, +but there are no known blocking findings left in this file. diff --git a/IMPL_DECISIONS.md b/IMPL_DECISIONS.md index c24cc60..56cbe23 100644 --- a/IMPL_DECISIONS.md +++ b/IMPL_DECISIONS.md @@ -25,9 +25,12 @@ now sends `FetchLatestFromPeers`, which skips local manifest serving and asks latest-version peers for fresh file metadata before the normal download and update transaction runs. -- Kept `Availability::Downloading` in the wire protocol for compatibility, but - local summaries do not emit it today because active operations are gated out - of scans and serving decisions. +- Removed the unreachable `Availability::Downloading` protocol value. Active + operations are reported separately, and local summaries emit only settled + availability. - Threaded availability through the UI-facing `Game` payload so `LocalOnly` rendering follows backend state instead of reverse-engineering it from `installed && !downloaded`. +- Removed Tauri's parallel whole-library filesystem scan. The UI database keeps + bundled catalog metadata, while peer `LocalGamesUpdated` snapshots now own + `downloaded`, `installed`, `local_version`, and `availability`. diff --git a/crates/lanspread-db/src/db.rs b/crates/lanspread-db/src/db.rs index 97468f0..b0ec6cd 100644 --- a/crates/lanspread-db/src/db.rs +++ b/crates/lanspread-db/src/db.rs @@ -33,9 +33,6 @@ pub fn read_version_from_ini(game_dir: &Path) -> eyre::Result> { #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum Availability { Ready, - /// Wire-compatible transitional state. Local library summaries currently - /// suppress active operations instead of advertising this value. - Downloading, #[default] LocalOnly, } @@ -49,11 +46,6 @@ impl Availability { Self::LocalOnly } } - - #[must_use] - pub fn is_downloaded(&self) -> bool { - matches!(self, Self::Ready) - } } /// A game @@ -95,8 +87,7 @@ pub struct Game { } impl Game { - /// Sets sentinel-derived download state and collapses any non-ready - /// availability, including `Downloading`, back to `LocalOnly`. + /// Sets sentinel-derived download state and matching availability. pub fn set_downloaded(&mut self, downloaded: bool) { self.downloaded = downloaded; self.availability = Availability::from_downloaded(downloaded); @@ -106,10 +97,8 @@ impl Game { pub fn normalized_availability(&self) -> Availability { if self.downloaded { Availability::Ready - } else if self.availability.is_downloaded() { - Availability::LocalOnly } else { - self.availability.clone() + Availability::LocalOnly } } } @@ -201,14 +190,6 @@ impl GameDB { games.sort_by(|a, b| a.name.cmp(&b.name)); games } - - pub fn set_all_uninstalled(&mut self) { - for game in self.games.values_mut() { - game.set_downloaded(false); - game.installed = false; - game.local_version = None; - } - } } impl Default for GameDB { @@ -326,14 +307,6 @@ mod tests { game.availability = Availability::Ready; assert_eq!(game.normalized_availability(), Availability::LocalOnly); - game.availability = Availability::Downloading; - game.set_downloaded(false); - assert!(!game.downloaded); - assert_eq!(game.availability, Availability::LocalOnly); - - game.availability = Availability::Downloading; - assert_eq!(game.normalized_availability(), Availability::Downloading); - game.downloaded = true; assert_eq!(game.normalized_availability(), Availability::Ready); } diff --git a/crates/lanspread-peer/ARCHITECTURE.md b/crates/lanspread-peer/ARCHITECTURE.md index cda26cb..cb2c65b 100644 --- a/crates/lanspread-peer/ARCHITECTURE.md +++ b/crates/lanspread-peer/ARCHITECTURE.md @@ -165,7 +165,7 @@ Most scans become O(number of game dirs), with full recursion only when needed. immediate TCP/QUIC roundtrips when nothing changed. - Add a lightweight handshake in `run_peer_discovery` that exchanges `Hello`/`HelloAck` before any library sync. - - Keep a fallback path that uses `ListGames` when `Hello` is unsupported. + - Ignore peers that do not advertise the current protocol version. 4. Library revisioning: - Store a monotonic `library_rev` locally and increment only after a successful index refresh completes. @@ -180,8 +180,7 @@ Most scans become O(number of game dirs), with full recursion only when needed. incrementally update the cache. - Schedule a low-frequency full scan to reconcile missed watcher events. 6. Announce updates: - - Replace `AnnounceGames` with `LibraryDelta` broadcasts keyed by - `library_rev`. + - Broadcast `LibraryDelta` updates keyed by `library_rev`. - Send `LibrarySummary` on new connections to seed the delta flow. 7. File manifest caching: - Store per-game `manifest_hash` and only fetch details when changed. diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 85e8bb6..da4f92f 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -39,6 +39,7 @@ pub struct Ctx { pub unpacker: Arc, pub catalog: Arc>>, pub peer_id: Arc, + pub enable_mdns: bool, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, } @@ -54,6 +55,7 @@ pub struct PeerCtx { pub peer_game_db: Arc>, pub catalog: Arc>>, pub peer_id: Arc, + pub enable_mdns: bool, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, @@ -72,6 +74,7 @@ impl std::fmt::Debug for PeerCtx { impl Ctx { /// Creates a new context with the given peer game database. + #[allow(clippy::too_many_arguments)] pub fn new( peer_game_db: Arc>, peer_id: String, @@ -80,6 +83,7 @@ impl Ctx { shutdown: CancellationToken, task_tracker: TaskTracker, catalog: Arc>>, + enable_mdns: bool, ) -> Self { Self { game_dir: Arc::new(RwLock::new(game_dir)), @@ -92,6 +96,7 @@ impl Ctx { unpacker, catalog, peer_id: Arc::new(peer_id), + enable_mdns, shutdown, task_tracker, } @@ -111,6 +116,7 @@ impl Ctx { peer_game_db: self.peer_game_db.clone(), catalog: self.catalog.clone(), peer_id: self.peer_id.clone(), + enable_mdns: self.enable_mdns, tx_notify_ui, shutdown: self.shutdown.clone(), task_tracker: self.task_tracker.clone(), diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 28490f6..15cb145 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -8,7 +8,8 @@ use std::{ sync::Arc, }; -use lanspread_db::db::{GameDB, GameFileDescription}; +use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_proto::GameSummary; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ @@ -19,7 +20,6 @@ use crate::{ context::{Ctx, OperationGuard, OperationKind}, download::download_game_files, events, - identity::FEATURE_LIBRARY_DELTA, install, local_games::{ LocalLibraryScan, @@ -31,9 +31,10 @@ use crate::{ scan_local_library, version_ini_is_regular_file, }, - network::{announce_games_to_peer, request_game_details_from_peer, send_library_delta}, + network::{request_game_details_from_peer, request_game_list_from_peer, send_library_delta}, peer_db::PeerGameDB, remote_peer::ensure_peer_id_for_addr, + services::perform_handshake_with_peer, }; // ============================================================================= @@ -197,6 +198,7 @@ pub async fn handle_download_game_files_command( tx_notify_ui: &UnboundedSender, id: String, file_descriptions: Vec, + install_after_download: bool, ) { log::info!("Got PeerCommand::DownloadGameFiles"); let games_folder = { ctx.game_dir.read().await.clone() }; @@ -262,7 +264,9 @@ pub async fn handle_download_game_files_command( { log::error!("Failed to send DownloadGameFilesFinished event: {e}"); } - spawn_install_operation(ctx, tx_notify_ui, id.clone()); + if install_after_download { + spawn_install_operation(ctx, tx_notify_ui, id.clone()); + } } else { log::error!("No trusted peers available after majority validation for game {id}"); } @@ -319,19 +323,32 @@ pub async fn handle_download_game_files_command( return; }; - if transition_download_to_install(&ctx_clone, &download_id, prepared.operation_kind) - .await - { - clear_active_download(&ctx_clone, &download_id).await; - run_started_install_operation( + if install_after_download { + if transition_download_to_install( &ctx_clone, - &tx_notify_ui_clone, - download_id, - prepared, + &download_id, + prepared.operation_kind, ) - .await; + .await + { + clear_active_download(&ctx_clone, &download_id).await; + run_started_install_operation( + &ctx_clone, + &tx_notify_ui_clone, + download_id, + prepared, + ) + .await; + } else { + clear_active_download(&ctx_clone, &download_id).await; + } } else { - clear_active_download(&ctx_clone, &download_id).await; + end_download_operation(&ctx_clone, &download_id).await; + if let Err(err) = + refresh_local_game(&ctx_clone, &tx_notify_ui_clone, &download_id).await + { + log::error!("Failed to refresh local library after download: {err}"); + } } download_state_guard.disarm(); } @@ -681,6 +698,69 @@ pub async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSe events::emit_peer_count(&ctx.peer_game_db, tx_notify_ui).await; } +/// Connects to a peer directly, bypassing mDNS discovery. +pub async fn handle_connect_peer_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + addr: SocketAddr, +) { + log::info!("Direct connect command received for {addr}"); + let peer_id = ctx.peer_id.clone(); + let local_library = ctx.local_library.clone(); + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + + ctx.task_tracker.spawn(async move { + if let Err(err) = perform_handshake_with_peer( + peer_id, + local_library, + peer_game_db.clone(), + tx_notify_ui.clone(), + addr, + None, + ) + .await + { + log::warn!("Failed direct connect to {addr}: {err}"); + return; + } + + if let Err(err) = refresh_direct_peer_games(&peer_game_db, &tx_notify_ui, addr).await { + log::warn!("Failed to refresh direct peer games from {addr}: {err}"); + } + }); +} + +async fn refresh_direct_peer_games( + peer_game_db: &Arc>, + tx_notify_ui: &UnboundedSender, + addr: SocketAddr, +) -> eyre::Result<()> { + let games = request_game_list_from_peer(addr).await?; + let summaries = games.into_iter().map(game_to_summary).collect::>(); + let peer_id = ensure_peer_id_for_addr(peer_game_db, addr).await; + { + let mut db = peer_game_db.write().await; + db.update_peer_games(&peer_id, summaries); + } + events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; + Ok(()) +} + +fn game_to_summary(game: Game) -> GameSummary { + let availability = game.normalized_availability(); + GameSummary { + id: game.id, + name: game.name, + size: game.size, + downloaded: game.downloaded, + installed: game.installed, + eti_version: game.eti_game_version, + manifest_hash: 0, + availability, + } +} + // ============================================================================= // Game announcement helpers // ============================================================================= @@ -737,32 +817,17 @@ pub async fn update_and_announce_games( let db = ctx.peer_game_db.read().await; db.peer_identities() .into_iter() - .map(|(peer_id, addr)| { - let features = db.peer_features(&peer_id); - (peer_id, addr, features) - }) + .map(|(_peer_id, addr)| addr) .collect::>() }; - for (_peer_id, peer_addr, features) in peer_targets { - if features - .iter() - .any(|feature| feature == FEATURE_LIBRARY_DELTA) - { - let delta = delta.clone(); - ctx.task_tracker.spawn(async move { - if let Err(e) = send_library_delta(peer_addr, delta).await { - log::warn!("Failed to send library delta to {peer_addr}: {e}"); - } - }); - } else { - let games_clone = all_games.clone(); - ctx.task_tracker.spawn(async move { - if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await { - log::warn!("Failed to announce games to {peer_addr}: {e}"); - } - }); - } + for peer_addr in peer_targets { + let delta = delta.clone(); + ctx.task_tracker.spawn(async move { + if let Err(e) = send_library_delta(peer_addr, delta).await { + log::warn!("Failed to send library delta to {peer_addr}: {e}"); + } + }); } } @@ -832,6 +897,7 @@ mod tests { CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(HashSet::from(["game".to_string()]))), + true, ) } @@ -851,7 +917,7 @@ mod tests { id: id.to_string(), name: id.to_string(), size: 42, - downloaded: availability.is_downloaded(), + downloaded: availability == Availability::Ready, installed: true, eti_version: Some(version.to_string()), manifest_hash: 7, diff --git a/crates/lanspread-peer/src/identity.rs b/crates/lanspread-peer/src/identity.rs index 33d889f..7a06abf 100644 --- a/crates/lanspread-peer/src/identity.rs +++ b/crates/lanspread-peer/src/identity.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use uuid::Uuid; @@ -7,8 +7,8 @@ const PEER_ID_FILE: &str = "peer_id"; pub const FEATURE_LIBRARY_DELTA: &str = "library-delta-v1"; pub const FEATURE_LIBRARY_SNAPSHOT: &str = "library-snapshot-v1"; -pub fn load_or_create_peer_id() -> eyre::Result { - let path = peer_id_path(); +pub fn load_or_create_peer_id(state_dir: Option<&Path>) -> eyre::Result { + let path = peer_id_path(state_dir); if let Ok(existing) = std::fs::read_to_string(&path) { let trimmed = existing.trim(); if !trimmed.is_empty() { @@ -31,7 +31,11 @@ pub fn default_features() -> Vec { ] } -fn peer_id_path() -> PathBuf { +fn peer_id_path(state_dir: Option<&Path>) -> PathBuf { + if let Some(dir) = state_dir { + return dir.join(PEER_ID_FILE); + } + if let Some(dir) = std::env::var_os("LANSPREAD_STATE_DIR") { return PathBuf::from(dir).join(PEER_ID_FILE); } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index ef5d9af..ab97312 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -42,7 +42,14 @@ pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use error::PeerError; pub use install::{UnpackFuture, Unpacker}; use lanspread_db::db::{Game, GameFileDescription}; -pub use peer_db::{MajorityValidationResult, PeerGameDB, PeerId, PeerInfo, PeerUpsert}; +pub use peer_db::{ + MajorityValidationResult, + PeerGameDB, + PeerId, + PeerInfo, + PeerSnapshot, + PeerUpsert, +}; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, @@ -54,6 +61,7 @@ use crate::{ context::Ctx, handlers::{ GameDetailSource, + handle_connect_peer_command, handle_download_game_files_command, handle_get_game_command, handle_get_peer_count_command, @@ -72,6 +80,8 @@ use crate::{ /// Events sent from the peer system to the UI. #[derive(Debug, strum::IntoStaticStr)] pub enum PeerEvent { + /// The local QUIC server is listening and ready to accept peer connections. + LocalPeerReady { peer_id: String, addr: SocketAddr }, /// List of available games from peers. ListGames(Vec), /// File descriptions for a specific game. @@ -184,6 +194,12 @@ pub enum PeerCommand { id: String, file_descriptions: Vec, }, + /// Download game files with an explicit install policy. + DownloadGameFilesWithOptions { + id: String, + file_descriptions: Vec, + install_after_download: bool, + }, /// Install already-downloaded archives into `local/`. InstallGame { id: String }, /// Remove only the `local/` install for a game. @@ -192,6 +208,26 @@ pub enum PeerCommand { SetGameDir(PathBuf), /// Request the current peer count. GetPeerCount, + /// Connect directly to a peer address without waiting for mDNS discovery. + ConnectPeer(SocketAddr), +} + +/// Optional startup settings for non-GUI callers and tests. +#[derive(Clone, Debug)] +pub struct PeerStartOptions { + /// Directory used for peer identity and other state. + pub state_dir: Option, + /// Whether to advertise and discover peers via mDNS. + pub enable_mdns: bool, +} + +impl Default for PeerStartOptions { + fn default() -> Self { + Self { + state_dir: None, + enable_mdns: true, + } + } } // ============================================================================= @@ -218,12 +254,36 @@ pub fn start_peer( unpacker: Arc, catalog: Arc>>, ) -> eyre::Result { + start_peer_with_options( + game_dir, + tx_notify_ui, + peer_game_db, + unpacker, + catalog, + PeerStartOptions::default(), + ) +} + +/// Initialize and start the peer system with explicit startup settings. +#[allow(clippy::implicit_hasher)] +pub fn start_peer_with_options( + game_dir: impl Into, + tx_notify_ui: UnboundedSender, + peer_game_db: Arc>, + unpacker: Arc, + catalog: Arc>>, + options: PeerStartOptions, +) -> eyre::Result { + let PeerStartOptions { + state_dir, + enable_mdns, + } = options; let game_dir = game_dir.into(); log::info!( "Starting peer system with game directory: {}", game_dir.display() ); - let peer_id = identity::load_or_create_peer_id()?; + let peer_id = identity::load_or_create_peer_id(state_dir.as_deref())?; let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); @@ -236,6 +296,7 @@ pub fn start_peer( game_dir, unpacker, catalog, + enable_mdns, )) } @@ -251,6 +312,7 @@ async fn run_peer( shutdown: CancellationToken, task_tracker: TaskTracker, catalog: Arc>>, + enable_mdns: bool, ) -> eyre::Result<()> { let ctx = Ctx::new( peer_game_db, @@ -260,6 +322,7 @@ async fn run_peer( shutdown, task_tracker, catalog, + enable_mdns, ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); @@ -316,7 +379,22 @@ async fn handle_peer_commands( id, file_descriptions, } => { - handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions).await; + handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions, true) + .await; + } + PeerCommand::DownloadGameFilesWithOptions { + id, + file_descriptions, + install_after_download, + } => { + handle_download_game_files_command( + ctx, + tx_notify_ui, + id, + file_descriptions, + install_after_download, + ) + .await; } PeerCommand::InstallGame { id } => { handle_install_game_command(ctx, tx_notify_ui, id).await; @@ -330,6 +408,9 @@ async fn handle_peer_commands( PeerCommand::GetPeerCount => { handle_get_peer_count_command(ctx, tx_notify_ui).await; } + PeerCommand::ConnectPeer(addr) => { + handle_connect_peer_command(ctx, tx_notify_ui, addr).await; + } } } } diff --git a/crates/lanspread-peer/src/network.rs b/crates/lanspread-peer/src/network.rs index c115efa..0de6324 100644 --- a/crates/lanspread-peer/src/network.rs +++ b/crates/lanspread-peer/src/network.rs @@ -123,39 +123,6 @@ pub async fn exchange_hello(peer_addr: SocketAddr, hello: Hello) -> eyre::Result } } -/// Fetches the list of games from a peer. -pub async fn fetch_games_from_peer(peer_addr: SocketAddr) -> eyre::Result> { - let mut conn = connect_to_peer(peer_addr).await?; - - let stream = conn.open_bidirectional_stream().await?; - let (rx, tx) = stream.split(); - let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - - // Send ListGames request - framed_tx.send(Request::ListGames.encode()).await?; - let _ = framed_tx.close().await; - - // Receive response - let mut data = BytesMut::new(); - while let Some(Ok(bytes)) = framed_rx.next().await { - data.extend_from_slice(&bytes); - } - - let response = Response::decode(data.freeze()); - if let Response::ListGames(games) = response { - Ok(games) - } else { - log::warn!("Unexpected response from peer {peer_addr}: {response:?}"); - Ok(Vec::new()) - } -} - -/// Announces local games to a peer. -pub async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec) -> eyre::Result<()> { - send_oneway_request(peer_addr, Request::AnnounceGames(games)).await -} - pub async fn send_library_summary( peer_addr: SocketAddr, summary: LibrarySummary, @@ -178,6 +145,33 @@ pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Resul send_oneway_request(peer_addr, Request::Goodbye { peer_id }).await } +/// Requests the current game list from a peer. +pub async fn request_game_list_from_peer(peer_addr: SocketAddr) -> eyre::Result> { + let mut conn = connect_to_peer(peer_addr).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx.send(Request::ListGames.encode()).await?; + framed_tx.close().await?; + + let mut data = BytesMut::new(); + while let Some(Ok(bytes)) = framed_rx.next().await { + data.extend_from_slice(&bytes); + } + + let response = Response::decode(data.freeze()); + match response { + Response::ListGames(games) => Ok(games), + Response::InternalPeerError(error_msg) => { + eyre::bail!("peer {peer_addr} reported internal error: {error_msg}") + } + other => eyre::bail!("unexpected response from {peer_addr}: {other:?}"), + } +} + /// Requests game file details from a peer. pub async fn request_game_details_from_peer( peer_addr: SocketAddr, diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index a93e427..f5d6b6d 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -7,9 +7,7 @@ use std::{ time::{Duration, Instant}, }; -#[cfg(test)] -use lanspread_db::db::Availability; -use lanspread_db::db::{Game, GameFileDescription}; +use lanspread_db::db::{Availability, Game, GameFileDescription}; use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; use crate::library::compute_library_digest; @@ -36,6 +34,18 @@ pub struct PeerInfo { pub files: HashMap>, } +/// Immutable peer state suitable for CLI assertions and tests. +#[derive(Clone, Debug)] +pub struct PeerSnapshot { + pub peer_id: PeerId, + pub addr: SocketAddr, + pub library_rev: u64, + pub library_digest: u64, + pub features: Vec, + pub game_count: usize, + pub games: Vec, +} + /// Database tracking all discovered peers and their games. #[derive(Debug)] pub struct PeerGameDB { @@ -363,6 +373,30 @@ impl PeerGameDB { .collect() } + /// Returns immutable snapshots for all known peers. + #[must_use] + pub fn peer_snapshots(&self) -> Vec { + let mut peers = self + .peers + .values() + .map(|peer| { + let mut games = peer.games.values().cloned().collect::>(); + games.sort_by(|a, b| a.id.cmp(&b.id)); + PeerSnapshot { + peer_id: peer.peer_id.clone(), + addr: peer.addr, + library_rev: peer.library_rev, + library_digest: peer.library_digest, + features: peer.features.clone(), + game_count: games.len(), + games, + } + }) + .collect::>(); + peers.sort_by(|a, b| a.peer_id.cmp(&b.peer_id)); + peers + } + /// Checks if a peer is in the database. #[must_use] pub fn contains_peer(&self, peer_id: &PeerId) -> bool { @@ -744,7 +778,7 @@ fn create_peer_whitelist(peer_scores: HashMap) -> Vec bool { - summary.availability.is_downloaded() + summary.availability == Availability::Ready } fn summary_to_game(summary: &GameSummary) -> Game { @@ -762,7 +796,7 @@ fn summary_to_game(summary: &GameSummary) -> Game { version: "1.0".to_string(), genre: String::new(), size: summary.size, - downloaded: summary.availability.is_downloaded(), + downloaded: game_is_ready(summary), installed: summary.installed, availability: summary.availability.clone(), eti_game_version, diff --git a/crates/lanspread-peer/src/remote_peer.rs b/crates/lanspread-peer/src/remote_peer.rs index 548f207..b6942c6 100644 --- a/crates/lanspread-peer/src/remote_peer.rs +++ b/crates/lanspread-peer/src/remote_peer.rs @@ -1,15 +1,10 @@ -//! Shared helpers for remote peer identity and legacy game announcements. +//! Shared helpers for remote peer identity. -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; -use lanspread_db::db::Game; -use lanspread_proto::GameSummary; use tokio::sync::RwLock; -use crate::{ - library::compute_library_digest, - peer_db::{PeerGameDB, PeerId}, -}; +use crate::peer_db::{PeerGameDB, PeerId}; pub async fn ensure_peer_id_for_addr( peer_game_db: &Arc>, @@ -20,40 +15,7 @@ pub async fn ensure_peer_id_for_addr( return peer_id; } - let legacy_id = format!("legacy-{peer_addr}"); - db.upsert_peer(legacy_id.clone(), peer_addr); - legacy_id -} - -pub fn summary_from_game(game: &Game) -> GameSummary { - GameSummary { - id: game.id.clone(), - name: game.name.clone(), - size: game.size, - downloaded: game.downloaded, - installed: game.installed, - eti_version: game.eti_game_version.clone(), - manifest_hash: 0, - availability: game.normalized_availability(), - } -} - -pub async fn update_peer_from_game_list( - peer_game_db: &Arc>, - peer_addr: SocketAddr, - games: &[Game], -) -> Vec { - let summaries = games.iter().map(summary_from_game).collect::>(); - let mut by_id = HashMap::with_capacity(summaries.len()); - for summary in &summaries { - by_id.insert(summary.id.clone(), summary.clone()); - } - let digest = compute_library_digest(&by_id); - let peer_id = ensure_peer_id_for_addr(peer_game_db, peer_addr).await; - - let mut db = peer_game_db.write().await; - db.update_peer_games(&peer_id, summaries); - let features = db.peer_features(&peer_id); - db.update_peer_library(&peer_id, 0, digest, features); - db.get_all_games() + let addr_id = format!("addr-{peer_addr}"); + db.upsert_peer(addr_id.clone(), peer_addr); + addr_id } diff --git a/crates/lanspread-peer/src/services.rs b/crates/lanspread-peer/src/services.rs index 1c43686..8292b62 100644 --- a/crates/lanspread-peer/src/services.rs +++ b/crates/lanspread-peer/src/services.rs @@ -7,13 +7,13 @@ mod advertise; mod discovery; mod handshake; -mod legacy; mod liveness; mod local_monitor; mod server; mod stream; pub use discovery::run_peer_discovery; +pub(crate) use handshake::perform_handshake_with_peer; pub use liveness::run_ping_service; pub use local_monitor::run_local_game_monitor; pub use server::run_server_component; diff --git a/crates/lanspread-peer/src/services/discovery.rs b/crates/lanspread-peer/src/services/discovery.rs index 407f238..e25fc26 100644 --- a/crates/lanspread-peer/src/services/discovery.rs +++ b/crates/lanspread-peer/src/services/discovery.rs @@ -11,7 +11,7 @@ use crate::{ context::Ctx, events, peer_db::PeerId, - services::{handshake::perform_handshake_with_peer, legacy::request_games_from_peer}, + services::handshake::perform_handshake_with_peer, }; struct MdnsPeerInfo { @@ -128,10 +128,22 @@ async fn handle_discovered_peer( ctx: &Ctx, tx_notify_ui: &UnboundedSender, ) { - let peer_id = info - .peer_id - .clone() - .unwrap_or_else(|| format!("legacy-{}", info.addr)); + if info.proto_ver != Some(PROTOCOL_VERSION) { + log::debug!( + "Ignoring peer at {} with protocol {:?}; expected {PROTOCOL_VERSION}", + info.addr, + info.proto_ver + ); + return; + } + + let Some(peer_id) = info.peer_id.clone() else { + log::debug!( + "Ignoring current-protocol peer at {} without a peer_id TXT record", + info.addr + ); + return; + }; let upsert = { let mut db = ctx.peer_game_db.write().await; @@ -160,30 +172,22 @@ fn spawn_protocol_negotiation( peer_id: PeerId, ) { let peer_addr = info.addr; - let proto_ver = info.proto_ver; let peer_id_arc = ctx.peer_id.clone(); let local_library = ctx.local_library.clone(); let peer_game_db = ctx.peer_game_db.clone(); ctx.task_tracker.spawn(async move { - let handshake_result = if proto_ver.is_none() || proto_ver == Some(PROTOCOL_VERSION) { - perform_handshake_with_peer( - peer_id_arc, - local_library, - peer_game_db.clone(), - tx_notify_ui.clone(), - peer_addr, - Some(peer_id), - ) - .await - } else { - Err(eyre::eyre!("Skipping hello for legacy peer")) - }; - - if handshake_result.is_err() - && let Err(err) = request_games_from_peer(peer_addr, tx_notify_ui, peer_game_db).await + if let Err(err) = perform_handshake_with_peer( + peer_id_arc, + local_library, + peer_game_db, + tx_notify_ui, + peer_addr, + Some(peer_id), + ) + .await { - log::error!("Failed to request games from peer {peer_addr}: {err}"); + log::warn!("Failed to negotiate protocol with peer {peer_addr}: {err}"); } }); } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index 9fcbbb2..295eacd 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -45,7 +45,7 @@ async fn build_hello_from_state( } } -pub(super) async fn perform_handshake_with_peer( +pub(crate) async fn perform_handshake_with_peer( peer_id: Arc, local_library: Arc>, peer_game_db: Arc>, diff --git a/crates/lanspread-peer/src/services/legacy.rs b/crates/lanspread-peer/src/services/legacy.rs deleted file mode 100644 index e1de4a5..0000000 --- a/crates/lanspread-peer/src/services/legacy.rs +++ /dev/null @@ -1,37 +0,0 @@ -//! Compatibility path for peers that only support the original game-list protocol. - -use std::{net::SocketAddr, sync::Arc, time::Duration}; - -use tokio::sync::{RwLock, mpsc::UnboundedSender}; - -use crate::{ - PeerEvent, - events, - network::fetch_games_from_peer, - peer_db::PeerGameDB, - remote_peer::update_peer_from_game_list, -}; - -pub(super) async fn request_games_from_peer( - peer_addr: SocketAddr, - tx_notify_ui: UnboundedSender, - peer_game_db: Arc>, -) -> eyre::Result<()> { - let mut retry_count = 0; - - loop { - let games = fetch_games_from_peer(peer_addr).await?; - log::info!("Received {} games from peer {peer_addr}", games.len()); - - if games.is_empty() && retry_count < 1 { - log::info!("Received 0 games from peer {peer_addr}, scheduling retry in 5s"); - tokio::time::sleep(Duration::from_secs(5)).await; - retry_count += 1; - continue; - } - - let aggregated_games = update_peer_from_game_list(&peer_game_db, peer_addr, &games).await; - events::send(&tx_notify_ui, PeerEvent::ListGames(aggregated_games)); - return Ok(()); - } -} diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index f1ef263..f3be000 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -375,6 +375,7 @@ mod tests { CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(catalog)), + true, ) } diff --git a/crates/lanspread-peer/src/services/server.rs b/crates/lanspread-peer/src/services/server.rs index c4200d5..3eb64af 100644 --- a/crates/lanspread-peer/src/services/server.rs +++ b/crates/lanspread-peer/src/services/server.rs @@ -35,12 +35,30 @@ pub async fn run_server_component( let server_addr = server.local_addr()?; log::info!("Peer server listening on {server_addr}"); - let mdns_advertiser = start_mdns_advertiser(&ctx, server_addr).await?; - let mdns_monitor = mdns_advertiser.monitor.clone(); - let mdns_shutdown = ctx.shutdown.clone(); - ctx.task_tracker.spawn(async move { - monitor_mdns_events(mdns_monitor, mdns_shutdown).await; - }); + let (ready_addr, _mdns_advertiser) = if ctx.enable_mdns { + let mdns_advertiser = start_mdns_advertiser(&ctx, server_addr).await?; + let mdns_monitor = mdns_advertiser.monitor.clone(); + let mdns_shutdown = ctx.shutdown.clone(); + ctx.task_tracker.spawn(async move { + monitor_mdns_events(mdns_monitor, mdns_shutdown).await; + }); + let ready_addr = + (*ctx.local_peer_addr.read().await).unwrap_or_else(|| direct_connect_addr(server_addr)); + (ready_addr, Some(mdns_advertiser)) + } else { + let addr = direct_connect_addr(server_addr); + *ctx.local_peer_addr.write().await = Some(addr); + log::info!("mDNS disabled; direct peer address is {addr}"); + (addr, None) + }; + + events::send( + &tx_notify_ui, + PeerEvent::LocalPeerReady { + peer_id: ctx.peer_id.as_ref().clone(), + addr: ready_addr, + }, + ); loop { let connection = tokio::select! { @@ -64,6 +82,13 @@ pub async fn run_server_component( } } +fn direct_connect_addr(server_addr: SocketAddr) -> SocketAddr { + if server_addr.ip().is_unspecified() { + return SocketAddr::from(([127, 0, 0, 1], server_addr.port())); + } + server_addr +} + async fn handle_peer_connection( mut connection: Connection, ctx: PeerCtx, diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 82b77f8..5502026 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -9,13 +9,12 @@ use s2n_quic::stream::{BidirectionalStream, SendStream}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use crate::{ - PeerEvent, context::PeerCtx, error::PeerError, events, local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, peer::{send_game_file_chunk, send_game_file_data}, - remote_peer::{ensure_peer_id_for_addr, update_peer_from_game_list}, + remote_peer::ensure_peer_id_for_addr, services::handshake::{ accept_inbound_hello, perform_handshake_with_peer, @@ -113,10 +112,6 @@ async fn dispatch_request( log::error!("Received invalid request from peer"); framed_tx } - Request::AnnounceGames(games) => { - handle_announce_games(ctx, remote_addr, games).await; - framed_tx - } } } @@ -381,18 +376,6 @@ async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option, peer_id: events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; } -async fn handle_announce_games(ctx: &PeerCtx, remote_addr: Option, games: Vec) { - log::info!( - "Received {} announced games from peer {remote_addr:?}", - games.len() - ); - - if let Some(addr) = remote_addr { - let aggregated_games = update_peer_from_game_list(&ctx.peer_game_db, addr, &games).await; - events::send(&ctx.tx_notify_ui, PeerEvent::ListGames(aggregated_games)); - } -} - #[cfg(test)] mod tests { use std::{ @@ -438,6 +421,7 @@ mod tests { CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(catalog)), + true, ) .to_peer_ctx(tx_notify_ui) } diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs index a6c95f9..d44b991 100644 --- a/crates/lanspread-peer/src/startup.rs +++ b/crates/lanspread-peer/src/startup.rs @@ -84,6 +84,7 @@ pub(crate) fn spawn_peer_runtime( game_dir: PathBuf, unpacker: Arc, catalog: Arc>>, + enable_mdns: bool, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); @@ -102,6 +103,7 @@ pub(crate) fn spawn_peer_runtime( runtime_shutdown.clone(), runtime_tracker.clone(), catalog, + enable_mdns, ) .await { @@ -125,7 +127,9 @@ pub(crate) fn spawn_peer_runtime( pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { spawn_quic_server(ctx, tx_notify_ui); - spawn_peer_discovery_service(ctx, tx_notify_ui); + if ctx.enable_mdns { + spawn_peer_discovery_service(ctx, tx_notify_ui); + } spawn_peer_liveness_service(ctx, tx_notify_ui); spawn_local_library_monitor(ctx, tx_notify_ui); } diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 966e25e..7295015 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -72,7 +72,6 @@ pub enum Request { offset: u64, length: u64, }, - AnnounceGames(Vec), Hello(Hello), LibrarySummary(LibrarySummary), LibrarySnapshot(LibrarySnapshot), diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index d54a53b..1f8df9a 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -9,7 +9,7 @@ use std::{ use eyre::bail; use lanspread_compat::eti::get_games; -use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_db::db::{Availability, Game, GameDB, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, @@ -114,10 +114,17 @@ async fn install_game(id: String, state: tauri::State<'_, LanSpreadState>) -> ta let peer_ctrl_arc = state.inner().peer_ctrl.clone(); let peer_ctrl = peer_ctrl_arc.read().await.clone(); - let games_folder = state.inner().games_folder.read().await.clone(); - let game_path = PathBuf::from(games_folder).join(&id); - let downloaded = game_path.join("version.ini").is_file(); - let installed = local_install_is_present(&game_path); + let Some((downloaded, installed)) = state + .inner() + .games + .read() + .await + .get_game_by_id(&id) + .map(|game| (game.downloaded, game.installed)) + else { + log::warn!("Ignoring install request for unknown game: {id}"); + return Ok(false); + }; let handled = if let Some(peer_ctrl) = peer_ctrl { let command = if !downloaded { @@ -351,177 +358,72 @@ async fn run_game(id: String, state: tauri::State<'_, LanSpreadState>) -> tauri: Ok(()) } -fn eti_package_exists(game_path: &Path, game_id: &str) -> bool { - game_path.is_dir() && game_path.join(format!("{game_id}.eti")).is_file() -} - +#[cfg(target_os = "windows")] fn local_install_is_present(game_path: &Path) -> bool { game_path.join("local").is_dir() } -fn update_game_installation_state(game: &mut Game, games_root: &Path) { - let game_path = games_root.join(&game.id); - if !game_path.is_dir() { - return; - } - - let downloaded = game_path.join("version.ini").is_file(); - game.set_downloaded(downloaded); - - let installed = local_install_is_present(&game_path); - game.installed = installed; - - // Size stays anchored to bundled game.db; skip expensive recalculation. - - if downloaded { - match lanspread_db::db::read_version_from_ini(&game_path) { - Ok(version) => { - game.local_version = version; - if let Some(ref version) = game.local_version { - log::debug!("Read local version for game {}: {}", game.id, version); - } - } - Err(e) => { - log::warn!("Failed to read local version.ini for game {}: {e}", game.id); - game.local_version = None; - } - } - } else { - game.local_version = None; - } - - if installed { - log::debug!("Set {game} to installed"); - } - - if eti_package_exists(&game_path, &game.id) && !downloaded { - log::debug!( - "Game {} has archives but no version.ini sentinel; treating as not downloaded", - game.id - ); - } +fn clear_local_game_state(game: &mut Game) { + game.set_downloaded(false); + game.installed = false; + game.local_version = None; } -/// Left in place for potential re-enablement. Currently not invoked to avoid expensive IO. -#[allow(dead_code)] -fn calculate_directory_size_sync(dir: &Path) -> eyre::Result { - let mut total_size = 0u64; - - for entry in walkdir::WalkDir::new(dir) { - let entry = entry?; - let path = entry.path(); - - if path.is_file() { - let metadata = std::fs::metadata(path)?; - total_size += metadata.len(); - } - } - - Ok(total_size) +fn has_local_game_state(game: &Game) -> bool { + game.downloaded + || game.installed + || game.local_version.is_some() + || game.availability != Availability::LocalOnly } -/// Used for peer-majority calculations but currently disabled. -#[allow(dead_code)] -fn calculate_size_from_file_descriptions( - file_descriptions: &[lanspread_db::db::GameFileDescription], -) -> u64 { - file_descriptions +fn apply_peer_local_state(existing: &mut Game, local_game: &Game) { + existing.set_downloaded(local_game.downloaded); + existing.installed = local_game.installed; + existing.local_version.clone_from(&local_game.local_version); + existing.availability = local_game.normalized_availability(); +} + +fn apply_peer_local_games(game_db: &mut GameDB, local_games: &[Game]) { + let local_game_ids = local_games .iter() - .filter(|desc| !desc.is_dir) - .map(|desc| desc.size) - .sum() -} + .map(|game| game.id.clone()) + .collect::>(); -/// Future hook for reintroducing peer-driven size updates. -#[allow(dead_code)] -async fn update_game_sizes_from_peers( - games: &mut std::collections::HashMap, - peer_game_db: &Arc>, -) { - log::debug!("Updating game sizes from peer data where local files are not available"); + for local_game in local_games { + if let Some(existing_game) = game_db.get_mut_game_by_id(&local_game.id) { + apply_peer_local_state(existing_game, local_game); + log::debug!("Updated local game status for: {}", local_game.id); + } + } - let peer_db = peer_game_db.read().await; - - for game in games.values_mut() { - if !game.downloaded && !game.installed { - let peer_files_for_game = peer_db.aggregated_game_files(&game.id); - - if peer_files_for_game.is_empty() { - if let Some(peer_size) = peer_db.majority_game_size(&game.id) { - if peer_size > 0 { - game.size = peer_size; - log::debug!( - "Updated size for game {} from peer totals: {} bytes", - game.id, - peer_size - ); - } else { - log::debug!( - "Peer-reported size for game {} is 0; keeping previous value", - game.id - ); - } - } else { - log::debug!("No peer size data available for game {}", game.id); - } - } else { - let peer_size = calculate_size_from_file_descriptions(&peer_files_for_game); - - if peer_size > 0 { - game.size = peer_size; - log::debug!( - "Updated size for game {} from peer files: {} bytes ({} files)", - game.id, - peer_size, - peer_files_for_game.len() - ); - } else { - log::debug!( - "Peer files for game {} exist but calculated size is 0", - game.id - ); - } - } + for game in game_db.games.values_mut() { + if !local_game_ids.contains(&game.id) && has_local_game_state(game) { + log::info!( + "Game {} missing from peer local snapshot; marking as unavailable locally", + game.id + ); + clear_local_game_state(game); } } } -async fn refresh_games_list(app_handle: &AppHandle) { +fn clear_all_local_game_states(game_db: &mut GameDB) { + for game in game_db.games.values_mut() { + clear_local_game_state(game); + } +} + +async fn emit_games_list(app_handle: &AppHandle) { let state = app_handle.state::(); - let games_folder_lock = state.games_folder.clone(); let games_db_lock = state.games.clone(); - - let games_folder = games_folder_lock.read().await.clone(); - - let path = if games_folder.is_empty() { - log::debug!("Games folder not set; emitting current game list without rescan"); - None - } else { - Some(PathBuf::from(&games_folder)) - }; - - let mut game_db = games_db_lock.write().await; + let game_db = games_db_lock.read().await; if game_db.games.is_empty() { - log::debug!("Game database empty during refresh; skipping emit"); + log::debug!("Game database empty; skipping emit"); return; } - if let Some(ref path) = path { - if path.exists() { - game_db.set_all_uninstalled(); - for game in game_db.games.values_mut() { - update_game_installation_state(game, path); - } - } else { - log::error!( - "game dir {} does not exist; keeping last known installation state", - path.display() - ); - } - } - let games_to_emit = game_db .all_games() .into_iter() @@ -611,10 +513,16 @@ async fn update_game_directory(app_handle: tauri::AppHandle, path: String) -> ta return Ok(()); } + let path_changed = current_path != path; *state.games_folder.write().await = path; ensure_bundled_game_db_loaded(&app_handle).await; - refresh_games_list(&app_handle).await; + if path_changed { + let mut game_db = state.games.write().await; + clear_all_local_game_states(&mut game_db); + } + + emit_games_list(&app_handle).await; ensure_peer_started(&app_handle, &games_folder).await; Ok(()) @@ -661,46 +569,18 @@ async fn update_game_db(games: Vec, app: AppHandle) { } } - refresh_games_list(&app).await; + emit_games_list(&app).await; } async fn update_local_games_in_db(local_games: Vec, app: AppHandle) { let state = app.state::(); - // Collect local game IDs first to avoid move issues - let local_game_ids: HashSet = local_games.iter().map(|g| g.id.clone()).collect(); - { let mut game_db = state.games.write().await; - - // Update installation status for games that exist locally - for local_game in &local_games { - if let Some(existing_game) = game_db.get_mut_game_by_id(&local_game.id) { - existing_game.set_downloaded(local_game.downloaded); - existing_game.installed = local_game.installed; - existing_game - .local_version - .clone_from(&local_game.local_version); - log::debug!("Updated local game status for: {}", local_game.id); - } - } - - // For games in the main DB that are not in the local list, - // mark them as not downloaded/installed (they were deleted) - for game in game_db.games.values_mut() { - if !local_game_ids.contains(&game.id) && (game.downloaded || game.installed) { - log::info!( - "Game {} no longer exists locally, marking as uninstalled", - game.id - ); - game.set_downloaded(false); - game.installed = false; - game.local_version = None; - } - } + apply_peer_local_games(&mut game_db, &local_games); } - refresh_games_list(&app).await; + emit_games_list(&app).await; } fn add_final_slash(path: &str) -> String { @@ -857,6 +737,12 @@ fn spawn_peer_event_loop(app_handle: AppHandle, mut rx_peer_event: UnboundedRece #[allow(clippy::too_many_lines)] async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { match event { + PeerEvent::LocalPeerReady { peer_id, addr } => { + log::info!("Local peer ready: {peer_id} at {addr}"); + if let Err(e) = app_handle.emit("peer-local-ready", Some((peer_id, addr.to_string()))) { + log::error!("Failed to emit peer-local-ready event: {e}"); + } + } PeerEvent::ListGames(games) => { log::info!("PeerEvent::ListGames received"); update_game_db(games, app_handle.clone()).await; @@ -1118,6 +1004,26 @@ async fn handle_download_finished(app_handle: &AppHandle, id: String) { mod tests { use super::*; + fn game_fixture(id: &str, name: &str) -> Game { + Game { + id: id.to_string(), + name: name.to_string(), + description: format!("{name} description"), + release_year: "2000".to_string(), + publisher: "publisher".to_string(), + max_players: 4, + version: "1.0".to_string(), + genre: "genre".to_string(), + size: 123, + downloaded: false, + installed: false, + availability: Availability::LocalOnly, + eti_game_version: None, + local_version: None, + peer_count: 0, + } + } + #[test] fn active_operation_reconciliation_replaces_stale_ui_history() { let mut active_operations = HashMap::from([ @@ -1152,6 +1058,36 @@ mod tests { ); } + #[test] + fn local_snapshot_without_finish_clears_stale_ui_operation() { + let mut active_operations = + HashMap::from([("game".to_string(), UiOperationKind::Downloading)]); + + reconcile_active_operations(&mut active_operations, &[]); + + assert!( + active_operations.is_empty(), + "an authoritative snapshot without the game should clear a missed finish event" + ); + } + + #[test] + fn local_snapshot_without_begin_restores_active_ui_operation() { + let mut active_operations = HashMap::new(); + let snapshot = vec![ActiveOperation { + id: "game".to_string(), + operation: ActiveOperationKind::Installing, + }]; + + reconcile_active_operations(&mut active_operations, &snapshot); + + assert_eq!( + active_operations.get("game"), + Some(&UiOperationKind::Installing), + "an authoritative snapshot should recover a missed begin event" + ); + } + #[test] fn active_operation_payload_is_sorted_for_stable_ui_updates() { let active_operations = HashMap::from([ @@ -1175,6 +1111,48 @@ mod tests { ] ); } + + #[test] + fn peer_local_snapshot_replaces_local_state_without_overwriting_catalog_metadata() { + let mut alpha = game_fixture("alpha", "Catalog Alpha"); + alpha.size = 999; + alpha.peer_count = 3; + + let mut beta = game_fixture("beta", "Catalog Beta"); + beta.set_downloaded(true); + beta.installed = true; + beta.local_version = Some("20240101".to_string()); + + let mut game_db = GameDB::from(vec![alpha, beta]); + + let mut local_alpha = game_fixture("alpha", "Peer Alpha"); + local_alpha.size = 42; + local_alpha.set_downloaded(true); + local_alpha.local_version = Some("20240202".to_string()); + + let mut unknown = game_fixture("unknown", "Unknown"); + unknown.set_downloaded(true); + unknown.installed = true; + + apply_peer_local_games(&mut game_db, &[local_alpha, unknown]); + + let alpha = game_db.get_game_by_id("alpha").expect("alpha remains"); + assert_eq!(alpha.name, "Catalog Alpha"); + assert_eq!(alpha.size, 999); + assert_eq!(alpha.peer_count, 3); + assert!(alpha.downloaded); + assert!(!alpha.installed); + assert_eq!(alpha.availability, Availability::Ready); + assert_eq!(alpha.local_version.as_deref(), Some("20240202")); + + let beta = game_db.get_game_by_id("beta").expect("beta remains"); + assert!(!beta.downloaded); + assert!(!beta.installed); + assert_eq!(beta.availability, Availability::LocalOnly); + assert_eq!(beta.local_version, None); + + assert!(game_db.get_game_by_id("unknown").is_none()); + } } #[allow(clippy::missing_panics_doc)] diff --git a/crates/lanspread-tauri-deno-ts/src/App.tsx b/crates/lanspread-tauri-deno-ts/src/App.tsx index 50af46c..81f0730 100644 --- a/crates/lanspread-tauri-deno-ts/src/App.tsx +++ b/crates/lanspread-tauri-deno-ts/src/App.tsx @@ -51,7 +51,6 @@ interface Game { enum GameAvailability { Ready = 'Ready', - Downloading = 'Downloading', LocalOnly = 'LocalOnly', }