From ce51d92df00b9d4058ea5b6609a69a6ab87ed7f4 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Mon, 18 May 2026 18:21:19 +0200 Subject: [PATCH] refactor(peer): tighten listener-addr handshake invariant Follow-up hardening for 348a02c, where `listen_addr` was added to Hello and HelloAck as `Option`. Code review surfaced three concrete problems that the previous commit left open: 1. Cold-start asymmetry. Discovery and the QUIC/mDNS advertiser are spawned concurrently. If discovery saw a cached peer advertisement before our own advertiser had written `ctx.local_peer_addr`, our outbound Hello carried `listen_addr: None`. The receiver's `peer_record_addr` then returned `None` and silently dropped the Hello while we still recorded their HelloAck, so peer A learned about peer B but B never learned about A until a later handshake happened to win the race. 2. Duplicate game-list pipeline. The previous commit added `refresh_peer_games`, which post-handshake issued a `ListGames` to fetch `peer.games`. The library-sync path (`LibrarySnapshot`) already populates the same field. Both could race on first contact and overwrite each other. Worse, `refresh_peer_games` was misnamed: a `peer_game_count > 0` guard turned it into a fetch-once-then-no-op helper, while `handle_library_summary` independently re-triggered a full handshake when `previous_count == 0` was observed, producing a redundant ping-pong on every first contact. 3. Argument explosion. `perform_handshake_with_peer`, `spawn_library_resync`, and `after_peer_library_recorded` had grown to 6-8 individual parameters and acquired `#[allow(clippy::too_many_arguments)]` opt-outs. Every caller was destructuring the same fields out of `Ctx`/`PeerCtx`. Changes (all in one commit because they jointly enforce the same invariant: "a peer is only ever recorded by its listener address, and the local listener address must exist before we participate in the protocol"): - `Hello.listen_addr` and `HelloAck.listen_addr` are now `SocketAddr`, not `Option`. Wire-incompatible, but PROTOCOL_VERSION already moved to 3 in 348a02c so no additional version bump is needed. - `required_listen_addr` reads `ctx.local_peer_addr` and returns an `eyre::Result`; `build_hello_from_state` and `build_hello_ack` both call it, so an outbound or inbound Hello can no longer be constructed before the local QUIC listener is bound. The inbound path maps this into a `Response::InternalPeerError` so the remote peer fails cleanly instead of seeing a malformed HelloAck. - `run_peer_discovery` blocks on `wait_for_local_peer_addr` (25 ms poll, shutdown-aware) before subscribing to the mDNS browser. This closes the cold-start race for outbound handshakes at the source. - `refresh_peer_games`, `request_game_list_from_peer`, and the `previous_count == 0` re-handshake trigger are removed. The post-handshake flow now relies solely on `LibrarySummary`/`LibrarySnapshot`/`LibraryDelta` for peer-library state; `ListGames` survives only for the `request_game_details_*` paths that fetch per-game file descriptions on demand. - New `HandshakeCtx` (with `from_ctx` and `from_peer_ctx` constructors) replaces the long argument lists. All `too_many_arguments` allow-attrs in `handshake.rs` are gone, and call sites in `handlers.rs`, `discovery.rs`, and `stream.rs` collapse to a single clone. - `handle_library_delta` no longer acquires a read lock on the apply path: the `peer_addr` lookup moved into the `else` resync branch where it is actually needed. - `accept_inbound_hello`'s `remote_addr` parameter is renamed to `transport_addr`. It is now used only for warn-log formatting, and the new name signals that this is the ephemeral QUIC source port, never the authoritative listener address that gets recorded. User-visible effect: on cold start, peers can no longer end up with an asymmetric view of each other ("A sees B but B never sees A"). First-contact library sync now does one handshake plus one snapshot/delta exchange instead of the previous handshake + ListGames + redundant follow-up handshake. The direct-connect CLI path (`handle_connect_peer_command`) now fails fast with "local peer listener address is not ready" if invoked before the QUIC server has bound; this is intentional - the previous behaviour would have sent a Hello that the receiver had to silently discard. Test Plan: - just fmt - just clippy - just test (80 peer + 3 cli + 5 tauri tests pass) - just build - Manual: bring up `just peer-cli-alpha`/`bravo`/`charlie`, confirm symmetric peer discovery and that games show up on every side after one library digest cycle, with no duplicated ListGames traffic in trace logs. Refs: Review feedback on commit 348a02c (listener-address handshake fix). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/lanspread-peer/ARCHITECTURE.md | 8 +- crates/lanspread-peer/src/handlers.rs | 20 +- crates/lanspread-peer/src/network.rs | 29 +- crates/lanspread-peer/src/remote_peer.rs | 42 +-- crates/lanspread-peer/src/services.rs | 2 +- .../lanspread-peer/src/services/discovery.rs | 39 +-- .../lanspread-peer/src/services/handshake.rs | 278 ++++++++---------- crates/lanspread-peer/src/services/stream.rs | 74 ++--- crates/lanspread-proto/src/lib.rs | 4 +- 9 files changed, 182 insertions(+), 314 deletions(-) diff --git a/crates/lanspread-peer/ARCHITECTURE.md b/crates/lanspread-peer/ARCHITECTURE.md index 548269d..6758f21 100644 --- a/crates/lanspread-peer/ARCHITECTURE.md +++ b/crates/lanspread-peer/ARCHITECTURE.md @@ -26,10 +26,10 @@ chunked file transfers. When a peer is discovered: -1. Connect and send `Hello { peer_id, proto_ver, library_rev, library_digest, features }`. - `Hello` also carries the sender's advertised `listen_addr`; the QUIC source - port is only a temporary transport port and must not be recorded as the - peer's listener. +1. Connect and send `Hello { peer_id, proto_ver, listen_addr, library_rev, + library_digest, features }`. `listen_addr` is mandatory; the QUIC source port + is only a temporary transport port and must not be recorded as the peer's + listener. 2. Receive `HelloAck { peer_id, proto_ver, listen_addr, library_rev, library_digest, features }`. 3. If the remote `peer_id` is already known but the address changed, update it. diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 050998e..16a6a5a 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -33,7 +33,7 @@ use crate::{ network::{request_game_details_from_peer, send_library_delta}, peer_db::PeerGameDB, remote_peer::ensure_peer_id_for_addr, - services::perform_handshake_with_peer, + services::{HandshakeCtx, perform_handshake_with_peer}, }; // ============================================================================= @@ -704,24 +704,10 @@ pub async fn handle_connect_peer_command( addr: SocketAddr, ) { log::info!("Direct connect command received for {addr}"); - let peer_id = ctx.peer_id.clone(); - let local_peer_addr = ctx.local_peer_addr.clone(); - let local_library = ctx.local_library.clone(); - let peer_game_db = ctx.peer_game_db.clone(); - let tx_notify_ui = tx_notify_ui.clone(); + let handshake_ctx = HandshakeCtx::from_ctx(ctx, tx_notify_ui); ctx.task_tracker.spawn(async move { - if let Err(err) = perform_handshake_with_peer( - peer_id, - local_peer_addr, - local_library, - peer_game_db, - tx_notify_ui, - addr, - None, - ) - .await - { + if let Err(err) = perform_handshake_with_peer(handshake_ctx, addr, None).await { log::warn!("Failed direct connect to {addr}: {err}"); } }); diff --git a/crates/lanspread-peer/src/network.rs b/crates/lanspread-peer/src/network.rs index 673bf93..e1d3247 100644 --- a/crates/lanspread-peer/src/network.rs +++ b/crates/lanspread-peer/src/network.rs @@ -8,7 +8,7 @@ use std::{ use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use if_addrs::{IfAddr, Interface, get_if_addrs}; -use lanspread_db::db::{Game, GameFileDescription}; +use lanspread_db::db::GameFileDescription; use lanspread_proto::{ Hello, HelloAck, @@ -172,33 +172,6 @@ pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Resul send_oneway_request(peer_addr, Request::Goodbye { peer_id }).await } -/// Requests the current game list from a peer. -pub async fn request_game_list_from_peer(peer_addr: SocketAddr) -> eyre::Result> { - let mut conn = connect_to_peer(peer_addr).await?; - - let stream = conn.open_bidirectional_stream().await?; - let (rx, tx) = stream.split(); - let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - - framed_tx.send(Request::ListGames.encode()).await?; - framed_tx.close().await?; - - let mut data = BytesMut::new(); - while let Some(Ok(bytes)) = framed_rx.next().await { - data.extend_from_slice(&bytes); - } - - let response = Response::decode(data.freeze()); - match response { - Response::ListGames(games) => Ok(games), - Response::InternalPeerError(error_msg) => { - eyre::bail!("peer {peer_addr} reported internal error: {error_msg}") - } - other => eyre::bail!("unexpected response from {peer_addr}: {other:?}"), - } -} - /// Requests game file details from a peer. pub async fn request_game_details_from_peer( peer_addr: SocketAddr, diff --git a/crates/lanspread-peer/src/remote_peer.rs b/crates/lanspread-peer/src/remote_peer.rs index e55fde5..f90d6cd 100644 --- a/crates/lanspread-peer/src/remote_peer.rs +++ b/crates/lanspread-peer/src/remote_peer.rs @@ -2,15 +2,9 @@ use std::{net::SocketAddr, sync::Arc}; -use lanspread_db::db::Game; -use lanspread_proto::GameSummary; use tokio::sync::RwLock; -use crate::{ - events, - network::request_game_list_from_peer, - peer_db::{PeerGameDB, PeerId}, -}; +use crate::peer_db::{PeerGameDB, PeerId}; pub async fn ensure_peer_id_for_addr( peer_game_db: &Arc>, @@ -25,37 +19,3 @@ pub async fn ensure_peer_id_for_addr( db.upsert_peer(addr_id.clone(), peer_addr); addr_id } - -pub async fn refresh_peer_games( - peer_game_db: &Arc>, - tx_notify_ui: &tokio::sync::mpsc::UnboundedSender, - peer_addr: SocketAddr, - peer_id: &PeerId, -) -> eyre::Result<()> { - if peer_game_db.read().await.peer_game_count(peer_id) > 0 { - return Ok(()); - } - - let games = request_game_list_from_peer(peer_addr).await?; - let summaries = games.into_iter().map(game_to_summary).collect::>(); - { - let mut db = peer_game_db.write().await; - db.update_peer_games(peer_id, summaries); - } - events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; - Ok(()) -} - -fn game_to_summary(game: Game) -> GameSummary { - let availability = game.normalized_availability(); - GameSummary { - id: game.id, - name: game.name, - size: game.size, - downloaded: game.downloaded, - installed: game.installed, - eti_version: game.eti_game_version, - manifest_hash: 0, - availability, - } -} diff --git a/crates/lanspread-peer/src/services.rs b/crates/lanspread-peer/src/services.rs index 8292b62..1bfd6c7 100644 --- a/crates/lanspread-peer/src/services.rs +++ b/crates/lanspread-peer/src/services.rs @@ -13,7 +13,7 @@ mod server; mod stream; pub use discovery::run_peer_discovery; -pub(crate) use handshake::perform_handshake_with_peer; +pub(crate) use handshake::{HandshakeCtx, perform_handshake_with_peer}; pub use liveness::run_ping_service; pub use local_monitor::run_local_game_monitor; pub use server::run_server_component; diff --git a/crates/lanspread-peer/src/services/discovery.rs b/crates/lanspread-peer/src/services/discovery.rs index 53a1dba..2c81f62 100644 --- a/crates/lanspread-peer/src/services/discovery.rs +++ b/crates/lanspread-peer/src/services/discovery.rs @@ -11,7 +11,7 @@ use crate::{ context::Ctx, events, peer_db::PeerId, - services::handshake::perform_handshake_with_peer, + services::handshake::{HandshakeCtx, perform_handshake_with_peer}, }; struct MdnsPeerInfo { @@ -29,6 +29,10 @@ pub async fn run_peer_discovery( ) -> eyre::Result<()> { log::info!("Starting peer discovery task"); + if !wait_for_local_peer_addr(&ctx).await { + return Ok(()); + } + let service_type = LANSPREAD_SERVICE_TYPE.to_string(); let (service_tx, mut service_rx) = tokio::sync::mpsc::unbounded_channel(); let worker_shutdown = ctx.shutdown.clone(); @@ -93,6 +97,19 @@ pub async fn run_peer_discovery( } } +async fn wait_for_local_peer_addr(ctx: &Ctx) -> bool { + loop { + if ctx.local_peer_addr.read().await.is_some() { + return true; + } + + tokio::select! { + () = ctx.shutdown.cancelled() => return false, + () = tokio::time::sleep(Duration::from_millis(25)) => {} + } + } +} + fn parse_mdns_peer(service: &MdnsService) -> MdnsPeerInfo { MdnsPeerInfo { addr: service.addr, @@ -161,33 +178,21 @@ async fn handle_discovered_peer( } if upsert.is_new || upsert.addr_changed { - spawn_protocol_negotiation(&info, ctx, tx_notify_ui.clone(), peer_id); + spawn_protocol_negotiation(&info, ctx, tx_notify_ui, peer_id); } } fn spawn_protocol_negotiation( info: &MdnsPeerInfo, ctx: &Ctx, - tx_notify_ui: UnboundedSender, + tx_notify_ui: &UnboundedSender, peer_id: PeerId, ) { let peer_addr = info.addr; - let peer_id_arc = ctx.peer_id.clone(); - let local_peer_addr = ctx.local_peer_addr.clone(); - let local_library = ctx.local_library.clone(); - let peer_game_db = ctx.peer_game_db.clone(); + let handshake_ctx = HandshakeCtx::from_ctx(ctx, tx_notify_ui); ctx.task_tracker.spawn(async move { - if let Err(err) = perform_handshake_with_peer( - peer_id_arc, - local_peer_addr, - local_library, - peer_game_db, - tx_notify_ui, - peer_addr, - Some(peer_id), - ) - .await + if let Err(err) = perform_handshake_with_peer(handshake_ctx, peer_addr, Some(peer_id)).await { log::warn!("Failed to negotiate protocol with peer {peer_addr}: {err}"); } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index 96a16f2..c3eb0d3 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -7,13 +7,12 @@ use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ PeerEvent, - context::PeerCtx, + context::{Ctx, PeerCtx}, events, identity::default_features, library::{LocalLibraryState, build_library_snapshot, build_library_summary}, network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary}, peer_db::{PeerGameDB, PeerId, PeerUpsert}, - remote_peer::refresh_peer_games, }; enum LibraryUpdate { @@ -21,46 +20,76 @@ enum LibraryUpdate { Snapshot(LibrarySnapshot), } -pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck { +#[derive(Clone)] +pub(crate) struct HandshakeCtx { + peer_id: Arc, + local_peer_addr: Arc>>, + local_library: Arc>, + peer_game_db: Arc>, + tx_notify_ui: UnboundedSender, +} + +impl HandshakeCtx { + pub(crate) fn from_ctx(ctx: &Ctx, tx_notify_ui: &UnboundedSender) -> Self { + Self { + peer_id: ctx.peer_id.clone(), + local_peer_addr: ctx.local_peer_addr.clone(), + local_library: ctx.local_library.clone(), + peer_game_db: ctx.peer_game_db.clone(), + tx_notify_ui: tx_notify_ui.clone(), + } + } + + pub(crate) fn from_peer_ctx(ctx: &PeerCtx) -> Self { + Self { + peer_id: ctx.peer_id.clone(), + local_peer_addr: ctx.local_peer_addr.clone(), + local_library: ctx.local_library.clone(), + peer_game_db: ctx.peer_game_db.clone(), + tx_notify_ui: ctx.tx_notify_ui.clone(), + } + } +} + +async fn required_listen_addr( + local_peer_addr: &Arc>>, +) -> eyre::Result { + (*local_peer_addr.read().await) + .ok_or_else(|| eyre::eyre!("local peer listener address is not ready")) +} + +pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result { let library_guard = ctx.local_library.read().await; - let listen_addr = *ctx.local_peer_addr.read().await; - HelloAck { + let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?; + Ok(HelloAck { peer_id: ctx.peer_id.as_ref().clone(), proto_ver: PROTOCOL_VERSION, listen_addr, library_rev: library_guard.revision, library_digest: library_guard.digest, features: default_features(), - } + }) } -async fn build_hello_from_state( - peer_id: &str, - local_peer_addr: &Arc>>, - local_library: &Arc>, -) -> Hello { - let library_guard = local_library.read().await; - let listen_addr = *local_peer_addr.read().await; - Hello { - peer_id: peer_id.to_string(), +async fn build_hello_from_state(ctx: &HandshakeCtx) -> eyre::Result { + let library_guard = ctx.local_library.read().await; + let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?; + Ok(Hello { + peer_id: ctx.peer_id.as_ref().clone(), proto_ver: PROTOCOL_VERSION, listen_addr, library_rev: library_guard.revision, library_digest: library_guard.digest, features: default_features(), - } + }) } pub(crate) async fn perform_handshake_with_peer( - peer_id: Arc, - local_peer_addr: Arc>>, - local_library: Arc>, - peer_game_db: Arc>, - tx_notify_ui: UnboundedSender, + ctx: HandshakeCtx, peer_addr: SocketAddr, peer_id_hint: Option, ) -> eyre::Result<()> { - let hello = build_hello_from_state(peer_id.as_ref(), &local_peer_addr, &local_library).await; + let hello = build_hello_from_state(&ctx).await?; let ack = exchange_hello(peer_addr, hello).await?; if ack.proto_ver != PROTOCOL_VERSION { @@ -71,7 +100,7 @@ pub(crate) async fn perform_handshake_with_peer( return Ok(()); } - if ack.peer_id == *peer_id { + if ack.peer_id == *ctx.peer_id { log::trace!("Ignoring handshake with self for {peer_addr}"); return Ok(()); } @@ -83,12 +112,12 @@ pub(crate) async fn perform_handshake_with_peer( "Peer {peer_addr} id mismatch: mDNS advertised {expected}, hello ack returned {}", ack.peer_id ); - let _ = peer_game_db.write().await.remove_peer(expected); + let _ = ctx.peer_game_db.write().await.remove_peer(expected); } - let record_addr = ack.listen_addr.unwrap_or(peer_addr); + let record_addr = ack.listen_addr; let upsert = record_remote_library( - &peer_game_db, + &ctx.peer_game_db, ack.peer_id.clone(), record_addr, ack.library_rev, @@ -98,31 +127,22 @@ pub(crate) async fn perform_handshake_with_peer( .await; after_peer_library_recorded( + &ctx, upsert, record_addr, - peer_id.as_ref(), ack.library_rev, ack.library_digest, - &local_library, - &peer_game_db, - &tx_notify_ui, ) .await; - if let Err(err) = - refresh_peer_games(&peer_game_db, &tx_notify_ui, record_addr, &ack.peer_id).await - { - log::warn!("Failed to refresh peer games from {record_addr}: {err}"); - } - Ok(()) } pub(super) async fn accept_inbound_hello( ctx: &PeerCtx, - remote_addr: Option, + transport_addr: Option, hello: Hello, -) -> HelloAck { +) -> eyre::Result { if hello.peer_id == *ctx.peer_id { log::trace!("Ignoring hello from self"); return build_hello_ack(ctx).await; @@ -130,91 +150,44 @@ pub(super) async fn accept_inbound_hello( if hello.proto_ver != PROTOCOL_VERSION { log::warn!( - "Incompatible protocol from {remote_addr:?}: {}", + "Incompatible protocol from {transport_addr:?}: {}", hello.proto_ver ); return build_hello_ack(ctx).await; } - if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, hello.listen_addr).await - { - let upsert = record_remote_library( - &ctx.peer_game_db, - hello.peer_id.clone(), - addr, - hello.library_rev, - hello.library_digest, - hello.features.clone(), - ) - .await; + let addr = hello.listen_addr; + let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx); + let upsert = record_remote_library( + &ctx.peer_game_db, + hello.peer_id.clone(), + addr, + hello.library_rev, + hello.library_digest, + hello.features.clone(), + ) + .await; - after_peer_library_recorded( - upsert, - addr, - ctx.peer_id.as_ref(), - hello.library_rev, - hello.library_digest, - &ctx.local_library, - &ctx.peer_game_db, - &ctx.tx_notify_ui, - ) - .await; - - spawn_peer_game_refresh(ctx, addr, hello.peer_id.clone()); - } else { - log::debug!( - "Ignoring inbound hello from {} without a known listener address", - hello.peer_id - ); - } + after_peer_library_recorded( + &handshake_ctx, + upsert, + addr, + hello.library_rev, + hello.library_digest, + ) + .await; build_hello_ack(ctx).await } -async fn peer_record_addr( - peer_game_db: &Arc>, - peer_id: &PeerId, - listen_addr: Option, -) -> Option { - let db = peer_game_db.read().await; - listen_addr.or_else(|| db.peer_addr(peer_id)) -} - -fn spawn_peer_game_refresh(ctx: &PeerCtx, peer_addr: SocketAddr, peer_id: PeerId) { - let peer_game_db = ctx.peer_game_db.clone(); - let tx_notify_ui = ctx.tx_notify_ui.clone(); - ctx.task_tracker.spawn(async move { - if let Err(err) = - refresh_peer_games(&peer_game_db, &tx_notify_ui, peer_addr, &peer_id).await - { - log::warn!("Failed to refresh peer games from {peer_addr}: {err}"); - } - }); -} - -#[allow(clippy::too_many_arguments)] pub(super) fn spawn_library_resync( - peer_id: Arc, - local_peer_addr: Arc>>, - local_library: Arc>, - peer_game_db: Arc>, - tx_notify_ui: UnboundedSender, + ctx: HandshakeCtx, peer_addr: SocketAddr, peer_id_hint: PeerId, reason: &'static str, ) { tokio::spawn(async move { - if let Err(err) = perform_handshake_with_peer( - peer_id, - local_peer_addr, - local_library, - peer_game_db, - tx_notify_ui, - peer_addr, - Some(peer_id_hint), - ) - .await - { + if let Err(err) = perform_handshake_with_peer(ctx, peer_addr, Some(peer_id_hint)).await { log::warn!("Failed to {reason} library from {peer_addr}: {err}"); } }); @@ -234,42 +207,28 @@ async fn record_remote_library( upsert } -#[allow(clippy::too_many_arguments)] async fn after_peer_library_recorded( + ctx: &HandshakeCtx, upsert: PeerUpsert, peer_addr: SocketAddr, - local_peer_id: &str, remote_library_rev: u64, remote_library_digest: u64, - local_library: &Arc>, - peer_game_db: &Arc>, - tx_notify_ui: &UnboundedSender, ) { if upsert.is_new { - events::emit_peer_discovered(peer_game_db, tx_notify_ui, peer_addr).await; - send_local_library_summary(peer_addr, local_peer_id, local_library).await; + events::emit_peer_discovered(&ctx.peer_game_db, &ctx.tx_notify_ui, peer_addr).await; + send_local_library_summary(peer_addr, ctx).await; } - send_local_library_update_if_needed( - peer_addr, - local_peer_id, - local_library, - remote_library_rev, - remote_library_digest, - ) - .await; + send_local_library_update_if_needed(peer_addr, ctx, remote_library_rev, remote_library_digest) + .await; } -async fn send_local_library_summary( - peer_addr: SocketAddr, - local_peer_id: &str, - local_library: &Arc>, -) { +async fn send_local_library_summary(peer_addr: SocketAddr, ctx: &HandshakeCtx) { let summary = { - let library_guard = local_library.read().await; + let library_guard = ctx.local_library.read().await; build_library_summary(&library_guard) }; - let local_peer_id = local_peer_id.to_string(); + let local_peer_id = ctx.peer_id.as_ref().clone(); tokio::spawn(async move { if let Err(err) = send_library_summary(peer_addr, &local_peer_id, summary).await { @@ -280,13 +239,13 @@ async fn send_local_library_summary( async fn send_local_library_update_if_needed( peer_addr: SocketAddr, - local_peer_id: &str, - local_library: &Arc>, + ctx: &HandshakeCtx, remote_rev: u64, remote_digest: u64, ) { - if let Some(update) = select_library_update(local_library, remote_rev, remote_digest).await { - let local_peer_id = local_peer_id.to_string(); + if let Some(update) = select_library_update(&ctx.local_library, remote_rev, remote_digest).await + { + let local_peer_id = ctx.peer_id.as_ref().clone(); tokio::spawn(async move { let result = match update { LibraryUpdate::Delta(delta) => { @@ -327,46 +286,47 @@ async fn select_library_update( mod tests { use std::{net::SocketAddr, sync::Arc}; - use tokio::sync::RwLock; + use tokio::sync::{RwLock, mpsc}; - use super::peer_record_addr; - use crate::peer_db::PeerGameDB; + use super::{HandshakeCtx, build_hello_from_state}; + use crate::{library::LocalLibraryState, peer_db::PeerGameDB}; fn addr(ip: [u8; 4], port: u16) -> SocketAddr { SocketAddr::from((ip, port)) } - #[tokio::test] - async fn inbound_hello_keeps_existing_listening_addr() { + fn test_handshake_ctx(local_peer_addr: Option) -> HandshakeCtx { + let (tx_notify_ui, _rx_notify_ui) = mpsc::unbounded_channel(); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); - let advertised = addr([10, 66, 0, 2], 40000); - peer_game_db - .write() + HandshakeCtx { + peer_id: Arc::new("local-peer".to_string()), + local_peer_addr: Arc::new(RwLock::new(local_peer_addr)), + local_library: Arc::new(RwLock::new(LocalLibraryState::empty())), + peer_game_db, + tx_notify_ui, + } + } + + #[tokio::test] + async fn outbound_hello_requires_local_listener_addr() { + let ctx = test_handshake_ctx(None); + + let err = build_hello_from_state(&ctx) .await - .upsert_peer("peer".to_string(), advertised); + .expect_err("hello without listener must fail"); - let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await; - - assert_eq!(record_addr, Some(advertised)); + assert_eq!(err.to_string(), "local peer listener address is not ready"); } #[tokio::test] - async fn inbound_hello_prefers_reported_listening_addr() { - let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + async fn outbound_hello_carries_local_listener_addr() { let advertised = addr([10, 66, 0, 2], 40000); + let ctx = test_handshake_ctx(Some(advertised)); - let record_addr = - peer_record_addr(&peer_game_db, &"peer".to_string(), Some(advertised)).await; + let hello = build_hello_from_state(&ctx) + .await + .expect("listener address is present"); - assert_eq!(record_addr, Some(advertised)); - } - - #[tokio::test] - async fn inbound_hello_without_known_listener_is_not_recorded() { - let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); - - let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await; - - assert_eq!(record_addr, None); + assert_eq!(hello.listen_addr, advertised); } } diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 23b48d8..34a9297 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -15,6 +15,7 @@ use crate::{ local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, peer::{send_game_file_chunk, send_game_file_data}, services::handshake::{ + HandshakeCtx, accept_inbound_hello, perform_handshake_with_peer, spawn_library_resync, @@ -76,10 +77,18 @@ async fn dispatch_request( ) -> ResponseWriter { match request { Request::Ping => send_response(framed_tx, Response::Pong, "pong").await, - Request::Hello(hello) => { - let ack = accept_inbound_hello(ctx, remote_addr, hello).await; - send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await - } + Request::Hello(hello) => match accept_inbound_hello(ctx, remote_addr, hello).await { + Ok(ack) => send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await, + Err(err) => { + log::error!("Failed to accept inbound hello: {err}"); + send_response( + framed_tx, + Response::InternalPeerError(err.to_string()), + "HelloAck", + ) + .await + } + }, Request::ListGames => handle_list_games(ctx, framed_tx).await, Request::LibrarySummary { peer_id, summary } => { handle_library_summary(ctx, peer_id, summary).await; @@ -160,19 +169,14 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response } async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: LibrarySummary) { - let (addr, previous_digest, previous_count, features) = { + let (addr, previous_digest, features) = { let db = ctx.peer_game_db.read().await; let Some(addr) = db.peer_addr(&peer_id) else { log::debug!("Ignoring library summary from unknown peer {peer_id}"); return; }; let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0)); - ( - addr, - digest, - db.peer_game_count(&peer_id), - db.peer_features(&peer_id), - ) + (addr, digest, db.peer_features(&peer_id)) }; { @@ -185,24 +189,12 @@ async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: Library ); } - if summary.library_digest != previous_digest || previous_count == 0 { + if summary.library_digest != previous_digest { ctx.task_tracker.spawn({ - let peer_id_arc = ctx.peer_id.clone(); - let local_peer_addr = ctx.local_peer_addr.clone(); - let local_library = ctx.local_library.clone(); - let peer_game_db = ctx.peer_game_db.clone(); - let tx_notify_ui = ctx.tx_notify_ui.clone(); + let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx); async move { - if let Err(err) = perform_handshake_with_peer( - peer_id_arc, - local_peer_addr, - local_library, - peer_game_db, - tx_notify_ui, - addr, - Some(peer_id), - ) - .await + if let Err(err) = + perform_handshake_with_peer(handshake_ctx, addr, Some(peer_id)).await { log::warn!("Failed to refresh library from {addr}: {err}"); } @@ -229,14 +221,6 @@ async fn handle_library_snapshot(ctx: &PeerCtx, peer_id: String, snapshot: Libra } async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) { - let Some(addr) = ({ - let db = ctx.peer_game_db.read().await; - db.peer_addr(&peer_id) - }) else { - log::debug!("Ignoring library delta from unknown peer {peer_id}"); - return; - }; - let applied = { let mut db = ctx.peer_game_db.write().await; db.apply_library_delta(&peer_id, delta) @@ -245,16 +229,16 @@ async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelt if applied { events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; } else { - spawn_library_resync( - ctx.peer_id.clone(), - ctx.local_peer_addr.clone(), - ctx.local_library.clone(), - ctx.peer_game_db.clone(), - ctx.tx_notify_ui.clone(), - addr, - peer_id, - "resync", - ); + let addr = { + let db = ctx.peer_game_db.read().await; + db.peer_addr(&peer_id) + }; + let Some(addr) = addr else { + log::debug!("Ignoring library delta from unknown peer {peer_id}"); + return; + }; + + spawn_library_resync(HandshakeCtx::from_peer_ctx(ctx), addr, peer_id, "resync"); } } diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 9f854aa..0581bc6 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -24,7 +24,7 @@ pub struct GameSummary { pub struct Hello { pub peer_id: String, pub proto_ver: u32, - pub listen_addr: Option, + pub listen_addr: SocketAddr, pub library_rev: u64, pub library_digest: u64, pub features: Vec, @@ -34,7 +34,7 @@ pub struct Hello { pub struct HelloAck { pub peer_id: String, pub proto_ver: u32, - pub listen_addr: Option, + pub listen_addr: SocketAddr, pub library_rev: u64, pub library_digest: u64, pub features: Vec,