diff --git a/FINDINGS.md b/FINDINGS.md index cc6a916..1d2eb8b 100644 --- a/FINDINGS.md +++ b/FINDINGS.md @@ -2,6 +2,12 @@ No open pre-merge findings are currently tracked here. +## Claude Review Scope Triage + +No out-of-scope code smells or issues were identified in Claude's review. All +four points were direct follow-up cleanup for the current protocol change and +were handled in code. + The previous three findings have landed in code and tests: - `update_game` now uses `PeerCommand::FetchLatestFromPeers` to skip local diff --git a/crates/lanspread-peer/src/library.rs b/crates/lanspread-peer/src/library.rs index 4932d38..033b238 100644 --- a/crates/lanspread-peer/src/library.rs +++ b/crates/lanspread-peer/src/library.rs @@ -3,7 +3,7 @@ use std::{ hash::{Hash, Hasher}, }; -use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot, LibrarySummary}; +use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; const MAX_DELTA_HISTORY: usize = 8; @@ -50,13 +50,6 @@ impl LocalLibraryState { } Some(delta) } - - pub fn delta_since(&self, from_rev: u64) -> Option { - self.recent_deltas - .iter() - .find(|delta| delta.from_rev == from_rev) - .cloned() - } } pub fn compute_library_digest(games: &HashMap) -> u64 { @@ -77,14 +70,6 @@ pub fn compute_library_digest(games: &HashMap) -> u64 { hasher.finish() } -pub fn build_library_summary(state: &LocalLibraryState) -> LibrarySummary { - LibrarySummary { - library_rev: state.revision, - library_digest: state.digest, - game_count: state.games.len(), - } -} - pub fn build_library_snapshot(state: &LocalLibraryState) -> LibrarySnapshot { let mut games: Vec = state.games.values().cloned().collect(); games.sort_by(|a, b| a.id.cmp(&b.id)); diff --git a/crates/lanspread-peer/src/network.rs b/crates/lanspread-peer/src/network.rs index e1d3247..e30bc5b 100644 --- a/crates/lanspread-peer/src/network.rs +++ b/crates/lanspread-peer/src/network.rs @@ -9,16 +9,7 @@ use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use if_addrs::{IfAddr, Interface, get_if_addrs}; use lanspread_db::db::GameFileDescription; -use lanspread_proto::{ - Hello, - HelloAck, - LibraryDelta, - LibrarySnapshot, - LibrarySummary, - Message, - Request, - Response, -}; +use lanspread_proto::{Hello, HelloAck, LibraryDelta, Message, Request, Response}; use s2n_quic::{Client as QuicClient, Connection, client::Connect, provider::limits::Limits}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; @@ -123,36 +114,6 @@ pub async fn exchange_hello(peer_addr: SocketAddr, hello: Hello) -> eyre::Result } } -pub async fn send_library_summary( - peer_addr: SocketAddr, - peer_id: &str, - summary: LibrarySummary, -) -> eyre::Result<()> { - send_oneway_request( - peer_addr, - Request::LibrarySummary { - peer_id: peer_id.to_string(), - summary, - }, - ) - .await -} - -pub async fn send_library_snapshot( - peer_addr: SocketAddr, - peer_id: &str, - snapshot: LibrarySnapshot, -) -> eyre::Result<()> { - send_oneway_request( - peer_addr, - Request::LibrarySnapshot { - peer_id: peer_id.to_string(), - snapshot, - }, - ) - .await -} - pub async fn send_library_delta( peer_addr: SocketAddr, peer_id: &str, diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index 92e2e32..f90743d 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -248,6 +248,14 @@ impl PeerGameDB { } } + /// Updates the advertised feature list for a peer. + pub fn update_peer_features(&mut self, peer_id: &PeerId, features: Vec) { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.features = features; + peer.last_seen = Instant::now(); + } + } + /// Applies a full library snapshot for a peer. pub fn apply_library_snapshot(&mut self, peer_id: &PeerId, snapshot: LibrarySnapshot) { if let Some(peer) = self.peers.get_mut(peer_id) { diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index c3eb0d3..6056f71 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -2,7 +2,7 @@ use std::{net::SocketAddr, sync::Arc}; -use lanspread_proto::{Hello, HelloAck, LibraryDelta, LibrarySnapshot, PROTOCOL_VERSION}; +use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ @@ -10,16 +10,11 @@ use crate::{ context::{Ctx, PeerCtx}, events, identity::default_features, - library::{LocalLibraryState, build_library_snapshot, build_library_summary}, - network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary}, + library::{LocalLibraryState, build_library_snapshot}, + network::exchange_hello, peer_db::{PeerGameDB, PeerId, PeerUpsert}, }; -enum LibraryUpdate { - Delta(LibraryDelta), - Snapshot(LibrarySnapshot), -} - #[derive(Clone)] pub(crate) struct HandshakeCtx { peer_id: Arc, @@ -61,12 +56,12 @@ async fn required_listen_addr( pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result { let library_guard = ctx.local_library.read().await; let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?; + let library = build_library_snapshot(&library_guard); Ok(HelloAck { peer_id: ctx.peer_id.as_ref().clone(), proto_ver: PROTOCOL_VERSION, listen_addr, - library_rev: library_guard.revision, - library_digest: library_guard.digest, + library, features: default_features(), }) } @@ -74,12 +69,12 @@ pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result { async fn build_hello_from_state(ctx: &HandshakeCtx) -> eyre::Result { let library_guard = ctx.local_library.read().await; let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?; + let library = build_library_snapshot(&library_guard); Ok(Hello { peer_id: ctx.peer_id.as_ref().clone(), proto_ver: PROTOCOL_VERSION, listen_addr, - library_rev: library_guard.revision, - library_digest: library_guard.digest, + library, features: default_features(), }) } @@ -120,20 +115,13 @@ pub(crate) async fn perform_handshake_with_peer( &ctx.peer_game_db, ack.peer_id.clone(), record_addr, - ack.library_rev, - ack.library_digest, ack.features.clone(), + ack.library, ) .await; - after_peer_library_recorded( - &ctx, - upsert, - record_addr, - ack.library_rev, - ack.library_digest, - ) - .await; + after_peer_library_recorded(&ctx, upsert, record_addr).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; Ok(()) } @@ -162,20 +150,13 @@ pub(super) async fn accept_inbound_hello( &ctx.peer_game_db, hello.peer_id.clone(), addr, - hello.library_rev, - hello.library_digest, hello.features.clone(), + hello.library, ) .await; - after_peer_library_recorded( - &handshake_ctx, - upsert, - addr, - hello.library_rev, - hello.library_digest, - ) - .await; + after_peer_library_recorded(&handshake_ctx, upsert, addr).await; + events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; build_hello_ack(ctx).await } @@ -197,13 +178,13 @@ async fn record_remote_library( peer_game_db: &Arc>, peer_id: PeerId, peer_addr: SocketAddr, - library_rev: u64, - library_digest: u64, features: Vec, + snapshot: lanspread_proto::LibrarySnapshot, ) -> PeerUpsert { let mut db = peer_game_db.write().await; let upsert = db.upsert_peer(peer_id.clone(), peer_addr); - db.update_peer_library(&peer_id, library_rev, library_digest, features); + db.apply_library_snapshot(&peer_id, snapshot); + db.update_peer_features(&peer_id, features); upsert } @@ -211,85 +192,42 @@ async fn after_peer_library_recorded( ctx: &HandshakeCtx, upsert: PeerUpsert, peer_addr: SocketAddr, - remote_library_rev: u64, - remote_library_digest: u64, ) { if upsert.is_new { events::emit_peer_discovered(&ctx.peer_game_db, &ctx.tx_notify_ui, peer_addr).await; - send_local_library_summary(peer_addr, ctx).await; } - - send_local_library_update_if_needed(peer_addr, ctx, remote_library_rev, remote_library_digest) - .await; -} - -async fn send_local_library_summary(peer_addr: SocketAddr, ctx: &HandshakeCtx) { - let summary = { - let library_guard = ctx.local_library.read().await; - build_library_summary(&library_guard) - }; - let local_peer_id = ctx.peer_id.as_ref().clone(); - - tokio::spawn(async move { - if let Err(err) = send_library_summary(peer_addr, &local_peer_id, summary).await { - log::warn!("Failed to send library summary to {peer_addr}: {err}"); - } - }); -} - -async fn send_local_library_update_if_needed( - peer_addr: SocketAddr, - ctx: &HandshakeCtx, - remote_rev: u64, - remote_digest: u64, -) { - if let Some(update) = select_library_update(&ctx.local_library, remote_rev, remote_digest).await - { - let local_peer_id = ctx.peer_id.as_ref().clone(); - tokio::spawn(async move { - let result = match update { - LibraryUpdate::Delta(delta) => { - send_library_delta(peer_addr, &local_peer_id, delta).await - } - LibraryUpdate::Snapshot(snapshot) => { - send_library_snapshot(peer_addr, &local_peer_id, snapshot).await - } - }; - - if let Err(err) = result { - log::warn!("Failed to send library update to {peer_addr}: {err}"); - } - }); - } -} - -async fn select_library_update( - local_library: &Arc>, - remote_rev: u64, - remote_digest: u64, -) -> Option { - let library_guard = local_library.read().await; - if library_guard.digest == remote_digest || remote_rev > library_guard.revision { - return None; - } - - if let Some(delta) = library_guard.delta_since(remote_rev) { - return Some(LibraryUpdate::Delta(delta)); - } - - Some(LibraryUpdate::Snapshot(build_library_snapshot( - &library_guard, - ))) } #[cfg(test)] mod tests { - use std::{net::SocketAddr, sync::Arc}; + use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + }; + use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION}; use tokio::sync::{RwLock, mpsc}; + use tokio_util::{sync::CancellationToken, task::TaskTracker}; - use super::{HandshakeCtx, build_hello_from_state}; - use crate::{library::LocalLibraryState, peer_db::PeerGameDB}; + use super::{HandshakeCtx, accept_inbound_hello, build_hello_from_state}; + use crate::{ + PeerEvent, + UnpackFuture, + Unpacker, + context::Ctx, + library::LocalLibraryState, + peer_db::PeerGameDB, + }; + + struct NoopUnpacker; + + impl Unpacker for NoopUnpacker { + fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> { + Box::pin(async { Ok(()) }) + } + } fn addr(ip: [u8; 4], port: u16) -> SocketAddr { SocketAddr::from((ip, port)) @@ -307,6 +245,19 @@ mod tests { } } + fn summary(id: &str) -> GameSummary { + GameSummary { + id: id.to_string(), + name: id.to_string(), + size: 42, + downloaded: true, + installed: true, + eti_version: Some("20250101".to_string()), + manifest_hash: 7, + availability: Availability::Ready, + } + } + #[tokio::test] async fn outbound_hello_requires_local_listener_addr() { let ctx = test_handshake_ctx(None); @@ -329,4 +280,86 @@ mod tests { assert_eq!(hello.listen_addr, advertised); } + + #[tokio::test] + async fn outbound_hello_carries_local_library_snapshot() { + let ctx = test_handshake_ctx(Some(addr([10, 66, 0, 2], 40000))); + ctx.local_library + .write() + .await + .update_from_scan(HashMap::from([("game".to_string(), summary("game"))]), 7); + + let hello = build_hello_from_state(&ctx) + .await + .expect("listener address is present"); + + assert_eq!(hello.library.library_rev, 7); + assert_eq!(hello.library.games.len(), 1); + assert_eq!(hello.library.games[0].id, "game"); + } + + #[tokio::test] + async fn inbound_hello_applies_remote_library_snapshot() { + let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + let ctx = Ctx::new( + peer_game_db.clone(), + "local-peer".to_string(), + PathBuf::new(), + Arc::new(NoopUnpacker), + CancellationToken::new(), + TaskTracker::new(), + Arc::new(RwLock::new(HashSet::new())), + ); + *ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000)); + + let (tx_notify_ui, mut rx_notify_ui) = mpsc::unbounded_channel(); + let peer_ctx = ctx.to_peer_ctx(tx_notify_ui); + let remote_addr = addr([127, 0, 0, 1], 5000); + let hello = Hello { + peer_id: "remote-peer".to_string(), + proto_ver: PROTOCOL_VERSION, + listen_addr: remote_addr, + library: LibrarySnapshot { + library_rev: 3, + games: vec![summary("remote-game")], + }, + features: Vec::new(), + }; + + let ack = accept_inbound_hello(&peer_ctx, None, hello) + .await + .expect("current protocol hello should be accepted"); + + assert_eq!(ack.peer_id, "local-peer"); + let snapshots = peer_game_db.read().await.peer_snapshots(); + assert_eq!(snapshots.len(), 1); + assert_eq!(snapshots[0].addr, remote_addr); + assert_eq!(snapshots[0].game_count, 1); + assert_eq!(snapshots[0].games[0].id, "remote-game"); + + assert!(matches!( + rx_notify_ui + .recv() + .await + .expect("peer discovery event should be emitted"), + PeerEvent::PeerDiscovered(addr) if addr == remote_addr + )); + assert!(matches!( + rx_notify_ui + .recv() + .await + .expect("peer count event should be emitted"), + PeerEvent::PeerCountUpdated(1) + )); + let PeerEvent::ListGames(games) = rx_notify_ui + .recv() + .await + .expect("peer game list should be emitted") + else { + panic!("expected ListGames"); + }; + assert_eq!(games.len(), 1); + assert_eq!(games[0].id, "remote-game"); + assert_eq!(games[0].peer_count, 1); + } } diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 34a9297..1399809 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use futures::{SinkExt, StreamExt}; use lanspread_db::db::{Game, GameFileDescription}; -use lanspread_proto::{LibraryDelta, LibrarySnapshot, LibrarySummary, Message, Request, Response}; +use lanspread_proto::{LibraryDelta, Message, Request, Response}; use s2n_quic::stream::{BidirectionalStream, SendStream}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; @@ -14,12 +14,7 @@ use crate::{ events, local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, peer::{send_game_file_chunk, send_game_file_data}, - services::handshake::{ - HandshakeCtx, - accept_inbound_hello, - perform_handshake_with_peer, - spawn_library_resync, - }, + services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, }; type ResponseWriter = FramedWrite; @@ -90,14 +85,6 @@ async fn dispatch_request( } }, Request::ListGames => handle_list_games(ctx, framed_tx).await, - Request::LibrarySummary { peer_id, summary } => { - handle_library_summary(ctx, peer_id, summary).await; - framed_tx - } - Request::LibrarySnapshot { peer_id, snapshot } => { - handle_library_snapshot(ctx, peer_id, snapshot).await; - framed_tx - } Request::LibraryDelta { peer_id, delta } => { handle_library_delta(ctx, peer_id, delta).await; framed_tx @@ -168,58 +155,6 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response send_response(framed_tx, Response::ListGames(games), "ListGames").await } -async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: LibrarySummary) { - let (addr, previous_digest, features) = { - let db = ctx.peer_game_db.read().await; - let Some(addr) = db.peer_addr(&peer_id) else { - log::debug!("Ignoring library summary from unknown peer {peer_id}"); - return; - }; - let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0)); - (addr, digest, db.peer_features(&peer_id)) - }; - - { - let mut db = ctx.peer_game_db.write().await; - db.update_peer_library( - &peer_id, - summary.library_rev, - summary.library_digest, - features, - ); - } - - if summary.library_digest != previous_digest { - ctx.task_tracker.spawn({ - let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx); - async move { - if let Err(err) = - perform_handshake_with_peer(handshake_ctx, addr, Some(peer_id)).await - { - log::warn!("Failed to refresh library from {addr}: {err}"); - } - } - }); - } -} - -async fn handle_library_snapshot(ctx: &PeerCtx, peer_id: String, snapshot: LibrarySnapshot) { - let applied = { - let mut db = ctx.peer_game_db.write().await; - if db.peer_addr(&peer_id).is_some() { - db.apply_library_snapshot(&peer_id, snapshot); - true - } else { - log::debug!("Ignoring library snapshot from unknown peer {peer_id}"); - false - } - }; - - if applied { - events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; - } -} - async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) { let applied = { let mut db = ctx.peer_game_db.write().await; diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 0581bc6..ea8f2f7 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use lanspread_db::db::{Game, GameFileDescription}; use serde::{Deserialize, Serialize}; -pub const PROTOCOL_VERSION: u32 = 3; +pub const PROTOCOL_VERSION: u32 = 4; pub use lanspread_db::db::Availability; @@ -25,8 +25,7 @@ pub struct Hello { pub peer_id: String, pub proto_ver: u32, pub listen_addr: SocketAddr, - pub library_rev: u64, - pub library_digest: u64, + pub library: LibrarySnapshot, pub features: Vec, } @@ -35,18 +34,10 @@ pub struct HelloAck { pub peer_id: String, pub proto_ver: u32, pub listen_addr: SocketAddr, - pub library_rev: u64, - pub library_digest: u64, + pub library: LibrarySnapshot, pub features: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LibrarySummary { - pub library_rev: u64, - pub library_digest: u64, - pub game_count: usize, -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LibrarySnapshot { pub library_rev: u64, @@ -77,14 +68,6 @@ pub enum Request { length: u64, }, Hello(Hello), - LibrarySummary { - peer_id: String, - summary: LibrarySummary, - }, - LibrarySnapshot { - peer_id: String, - snapshot: LibrarySnapshot, - }, LibraryDelta { peer_id: String, delta: LibraryDelta,