refactor(peer): tighten listener-addr handshake invariant

Follow-up hardening for 348a02c, where `listen_addr` was added to Hello and
HelloAck as `Option<SocketAddr>`. Code review surfaced three concrete problems
that the previous commit left open:

1. Cold-start asymmetry. Discovery and the QUIC/mDNS advertiser are spawned
   concurrently. If discovery saw a cached peer advertisement before our own
   advertiser had written `ctx.local_peer_addr`, our outbound Hello carried
   `listen_addr: None`. The receiver's `peer_record_addr` then returned `None`
   and silently dropped the Hello while we still recorded their HelloAck, so
   peer A learned about peer B but B never learned about A until a later
   handshake happened to win the race.

2. Duplicate game-list pipeline. The previous commit added
   `refresh_peer_games`, which post-handshake issued a `ListGames` to fetch
   `peer.games`. The library-sync path (`LibrarySnapshot`) already populates
   the same field. Both could race on first contact and overwrite each other.
   Worse, `refresh_peer_games` was misnamed: a `peer_game_count > 0` guard
   turned it into a fetch-once-then-no-op helper, while
   `handle_library_summary` independently re-triggered a full handshake when
   `previous_count == 0` was observed, producing a redundant ping-pong on
   every first contact.

3. Argument explosion. `perform_handshake_with_peer`, `spawn_library_resync`,
   and `after_peer_library_recorded` had grown to 6-8 individual parameters
   and acquired `#[allow(clippy::too_many_arguments)]` opt-outs. Every caller
   was destructuring the same fields out of `Ctx`/`PeerCtx`.

Changes (all in one commit because they jointly enforce the same invariant:
"a peer is only ever recorded by its listener address, and the local
listener address must exist before we participate in the protocol"):

- `Hello.listen_addr` and `HelloAck.listen_addr` are now `SocketAddr`, not
  `Option<SocketAddr>`. Wire-incompatible, but PROTOCOL_VERSION already moved
  to 3 in 348a02c so no additional version bump is needed.
- `required_listen_addr` reads `ctx.local_peer_addr` and returns an
  `eyre::Result`; `build_hello_from_state` and `build_hello_ack` both call
  it, so an outbound or inbound Hello can no longer be constructed before
  the local QUIC listener is bound. The inbound path maps this into a
  `Response::InternalPeerError` so the remote peer fails cleanly instead of
  seeing a malformed HelloAck.
- `run_peer_discovery` blocks on `wait_for_local_peer_addr` (25 ms poll,
  shutdown-aware) before subscribing to the mDNS browser. This closes the
  cold-start race for outbound handshakes at the source.
- `refresh_peer_games`, `request_game_list_from_peer`, and the
  `previous_count == 0` re-handshake trigger are removed. The post-handshake
  flow now relies solely on `LibrarySummary`/`LibrarySnapshot`/`LibraryDelta`
  for peer-library state; `ListGames` survives only for the
  `request_game_details_*` paths that fetch per-game file descriptions on
  demand.
- New `HandshakeCtx` (with `from_ctx` and `from_peer_ctx` constructors)
  replaces the long argument lists. All `too_many_arguments` allow-attrs in
  `handshake.rs` are gone, and call sites in `handlers.rs`, `discovery.rs`,
  and `stream.rs` collapse to a single clone.
- `handle_library_delta` no longer acquires a read lock on the apply path:
  the `peer_addr` lookup moved into the `else` resync branch where it is
  actually needed.
- `accept_inbound_hello`'s `remote_addr` parameter is renamed to
  `transport_addr`. It is now used only for warn-log formatting, and the
  new name signals that this is the ephemeral QUIC source port, never the
  authoritative listener address that gets recorded.

User-visible effect: on cold start, peers can no longer end up with an
asymmetric view of each other ("A sees B but B never sees A"). First-contact
library sync now does one handshake plus one snapshot/delta exchange instead
of the previous handshake + ListGames + redundant follow-up handshake. The
direct-connect CLI path (`handle_connect_peer_command`) now fails fast with
"local peer listener address is not ready" if invoked before the QUIC server
has bound; this is intentional - the previous behaviour would have sent a
Hello that the receiver had to silently discard.

Test Plan:
- just fmt
- just clippy
- just test (80 peer + 3 cli + 5 tauri tests pass)
- just build
- Manual: bring up `just peer-cli-alpha`/`bravo`/`charlie`, confirm symmetric
  peer discovery and that games show up on every side after one library
  digest cycle, with no duplicated ListGames traffic in trace logs.

Refs: Review feedback on commit 348a02c (listener-address handshake fix).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-18 18:21:19 +02:00
parent 348a02c35f
commit ce51d92df0
9 changed files with 182 additions and 314 deletions
+4 -4
View File
@@ -26,10 +26,10 @@ chunked file transfers.
When a peer is discovered:
1. Connect and send `Hello { peer_id, proto_ver, library_rev, library_digest, features }`.
`Hello` also carries the sender's advertised `listen_addr`; the QUIC source
port is only a temporary transport port and must not be recorded as the
peer's listener.
1. Connect and send `Hello { peer_id, proto_ver, listen_addr, library_rev,
library_digest, features }`. `listen_addr` is mandatory; the QUIC source port
is only a temporary transport port and must not be recorded as the peer's
listener.
2. Receive `HelloAck { peer_id, proto_ver, listen_addr, library_rev,
library_digest, features }`.
3. If the remote `peer_id` is already known but the address changed, update it.
+3 -17
View File
@@ -33,7 +33,7 @@ use crate::{
network::{request_game_details_from_peer, send_library_delta},
peer_db::PeerGameDB,
remote_peer::ensure_peer_id_for_addr,
services::perform_handshake_with_peer,
services::{HandshakeCtx, perform_handshake_with_peer},
};
// =============================================================================
@@ -704,24 +704,10 @@ pub async fn handle_connect_peer_command(
addr: SocketAddr,
) {
log::info!("Direct connect command received for {addr}");
let peer_id = ctx.peer_id.clone();
let local_peer_addr = ctx.local_peer_addr.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = tx_notify_ui.clone();
let handshake_ctx = HandshakeCtx::from_ctx(ctx, tx_notify_ui);
ctx.task_tracker.spawn(async move {
if let Err(err) = perform_handshake_with_peer(
peer_id,
local_peer_addr,
local_library,
peer_game_db,
tx_notify_ui,
addr,
None,
)
.await
{
if let Err(err) = perform_handshake_with_peer(handshake_ctx, addr, None).await {
log::warn!("Failed direct connect to {addr}: {err}");
}
});
+1 -28
View File
@@ -8,7 +8,7 @@ use std::{
use bytes::BytesMut;
use futures::{SinkExt, StreamExt};
use if_addrs::{IfAddr, Interface, get_if_addrs};
use lanspread_db::db::{Game, GameFileDescription};
use lanspread_db::db::GameFileDescription;
use lanspread_proto::{
Hello,
HelloAck,
@@ -172,33 +172,6 @@ pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Resul
send_oneway_request(peer_addr, Request::Goodbye { peer_id }).await
}
/// Requests the current game list from a peer.
pub async fn request_game_list_from_peer(peer_addr: SocketAddr) -> eyre::Result<Vec<Game>> {
let mut conn = connect_to_peer(peer_addr).await?;
let stream = conn.open_bidirectional_stream().await?;
let (rx, tx) = stream.split();
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
framed_tx.send(Request::ListGames.encode()).await?;
framed_tx.close().await?;
let mut data = BytesMut::new();
while let Some(Ok(bytes)) = framed_rx.next().await {
data.extend_from_slice(&bytes);
}
let response = Response::decode(data.freeze());
match response {
Response::ListGames(games) => Ok(games),
Response::InternalPeerError(error_msg) => {
eyre::bail!("peer {peer_addr} reported internal error: {error_msg}")
}
other => eyre::bail!("unexpected response from {peer_addr}: {other:?}"),
}
}
/// Requests game file details from a peer.
pub async fn request_game_details_from_peer(
peer_addr: SocketAddr,
+1 -41
View File
@@ -2,15 +2,9 @@
use std::{net::SocketAddr, sync::Arc};
use lanspread_db::db::Game;
use lanspread_proto::GameSummary;
use tokio::sync::RwLock;
use crate::{
events,
network::request_game_list_from_peer,
peer_db::{PeerGameDB, PeerId},
};
use crate::peer_db::{PeerGameDB, PeerId};
pub async fn ensure_peer_id_for_addr(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
@@ -25,37 +19,3 @@ pub async fn ensure_peer_id_for_addr(
db.upsert_peer(addr_id.clone(), peer_addr);
addr_id
}
pub async fn refresh_peer_games(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
tx_notify_ui: &tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
peer_addr: SocketAddr,
peer_id: &PeerId,
) -> eyre::Result<()> {
if peer_game_db.read().await.peer_game_count(peer_id) > 0 {
return Ok(());
}
let games = request_game_list_from_peer(peer_addr).await?;
let summaries = games.into_iter().map(game_to_summary).collect::<Vec<_>>();
{
let mut db = peer_game_db.write().await;
db.update_peer_games(peer_id, summaries);
}
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
Ok(())
}
fn game_to_summary(game: Game) -> GameSummary {
let availability = game.normalized_availability();
GameSummary {
id: game.id,
name: game.name,
size: game.size,
downloaded: game.downloaded,
installed: game.installed,
eti_version: game.eti_game_version,
manifest_hash: 0,
availability,
}
}
+1 -1
View File
@@ -13,7 +13,7 @@ mod server;
mod stream;
pub use discovery::run_peer_discovery;
pub(crate) use handshake::perform_handshake_with_peer;
pub(crate) use handshake::{HandshakeCtx, perform_handshake_with_peer};
pub use liveness::run_ping_service;
pub use local_monitor::run_local_game_monitor;
pub use server::run_server_component;
+22 -17
View File
@@ -11,7 +11,7 @@ use crate::{
context::Ctx,
events,
peer_db::PeerId,
services::handshake::perform_handshake_with_peer,
services::handshake::{HandshakeCtx, perform_handshake_with_peer},
};
struct MdnsPeerInfo {
@@ -29,6 +29,10 @@ pub async fn run_peer_discovery(
) -> eyre::Result<()> {
log::info!("Starting peer discovery task");
if !wait_for_local_peer_addr(&ctx).await {
return Ok(());
}
let service_type = LANSPREAD_SERVICE_TYPE.to_string();
let (service_tx, mut service_rx) = tokio::sync::mpsc::unbounded_channel();
let worker_shutdown = ctx.shutdown.clone();
@@ -93,6 +97,19 @@ pub async fn run_peer_discovery(
}
}
async fn wait_for_local_peer_addr(ctx: &Ctx) -> bool {
loop {
if ctx.local_peer_addr.read().await.is_some() {
return true;
}
tokio::select! {
() = ctx.shutdown.cancelled() => return false,
() = tokio::time::sleep(Duration::from_millis(25)) => {}
}
}
}
fn parse_mdns_peer(service: &MdnsService) -> MdnsPeerInfo {
MdnsPeerInfo {
addr: service.addr,
@@ -161,33 +178,21 @@ async fn handle_discovered_peer(
}
if upsert.is_new || upsert.addr_changed {
spawn_protocol_negotiation(&info, ctx, tx_notify_ui.clone(), peer_id);
spawn_protocol_negotiation(&info, ctx, tx_notify_ui, peer_id);
}
}
fn spawn_protocol_negotiation(
info: &MdnsPeerInfo,
ctx: &Ctx,
tx_notify_ui: UnboundedSender<PeerEvent>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
peer_id: PeerId,
) {
let peer_addr = info.addr;
let peer_id_arc = ctx.peer_id.clone();
let local_peer_addr = ctx.local_peer_addr.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let handshake_ctx = HandshakeCtx::from_ctx(ctx, tx_notify_ui);
ctx.task_tracker.spawn(async move {
if let Err(err) = perform_handshake_with_peer(
peer_id_arc,
local_peer_addr,
local_library,
peer_game_db,
tx_notify_ui,
peer_addr,
Some(peer_id),
)
.await
if let Err(err) = perform_handshake_with_peer(handshake_ctx, peer_addr, Some(peer_id)).await
{
log::warn!("Failed to negotiate protocol with peer {peer_addr}: {err}");
}
+119 -159
View File
@@ -7,13 +7,12 @@ use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{
PeerEvent,
context::PeerCtx,
context::{Ctx, PeerCtx},
events,
identity::default_features,
library::{LocalLibraryState, build_library_snapshot, build_library_summary},
network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary},
peer_db::{PeerGameDB, PeerId, PeerUpsert},
remote_peer::refresh_peer_games,
};
enum LibraryUpdate {
@@ -21,46 +20,76 @@ enum LibraryUpdate {
Snapshot(LibrarySnapshot),
}
pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck {
#[derive(Clone)]
pub(crate) struct HandshakeCtx {
peer_id: Arc<String>,
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
local_library: Arc<RwLock<LocalLibraryState>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
}
impl HandshakeCtx {
pub(crate) fn from_ctx(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) -> Self {
Self {
peer_id: ctx.peer_id.clone(),
local_peer_addr: ctx.local_peer_addr.clone(),
local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: tx_notify_ui.clone(),
}
}
pub(crate) fn from_peer_ctx(ctx: &PeerCtx) -> Self {
Self {
peer_id: ctx.peer_id.clone(),
local_peer_addr: ctx.local_peer_addr.clone(),
local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: ctx.tx_notify_ui.clone(),
}
}
}
async fn required_listen_addr(
local_peer_addr: &Arc<RwLock<Option<SocketAddr>>>,
) -> eyre::Result<SocketAddr> {
(*local_peer_addr.read().await)
.ok_or_else(|| eyre::eyre!("local peer listener address is not ready"))
}
pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result<HelloAck> {
let library_guard = ctx.local_library.read().await;
let listen_addr = *ctx.local_peer_addr.read().await;
HelloAck {
let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?;
Ok(HelloAck {
peer_id: ctx.peer_id.as_ref().clone(),
proto_ver: PROTOCOL_VERSION,
listen_addr,
library_rev: library_guard.revision,
library_digest: library_guard.digest,
features: default_features(),
}
})
}
async fn build_hello_from_state(
peer_id: &str,
local_peer_addr: &Arc<RwLock<Option<SocketAddr>>>,
local_library: &Arc<RwLock<LocalLibraryState>>,
) -> Hello {
let library_guard = local_library.read().await;
let listen_addr = *local_peer_addr.read().await;
Hello {
peer_id: peer_id.to_string(),
async fn build_hello_from_state(ctx: &HandshakeCtx) -> eyre::Result<Hello> {
let library_guard = ctx.local_library.read().await;
let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?;
Ok(Hello {
peer_id: ctx.peer_id.as_ref().clone(),
proto_ver: PROTOCOL_VERSION,
listen_addr,
library_rev: library_guard.revision,
library_digest: library_guard.digest,
features: default_features(),
}
})
}
pub(crate) async fn perform_handshake_with_peer(
peer_id: Arc<String>,
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
local_library: Arc<RwLock<LocalLibraryState>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
ctx: HandshakeCtx,
peer_addr: SocketAddr,
peer_id_hint: Option<PeerId>,
) -> eyre::Result<()> {
let hello = build_hello_from_state(peer_id.as_ref(), &local_peer_addr, &local_library).await;
let hello = build_hello_from_state(&ctx).await?;
let ack = exchange_hello(peer_addr, hello).await?;
if ack.proto_ver != PROTOCOL_VERSION {
@@ -71,7 +100,7 @@ pub(crate) async fn perform_handshake_with_peer(
return Ok(());
}
if ack.peer_id == *peer_id {
if ack.peer_id == *ctx.peer_id {
log::trace!("Ignoring handshake with self for {peer_addr}");
return Ok(());
}
@@ -83,12 +112,12 @@ pub(crate) async fn perform_handshake_with_peer(
"Peer {peer_addr} id mismatch: mDNS advertised {expected}, hello ack returned {}",
ack.peer_id
);
let _ = peer_game_db.write().await.remove_peer(expected);
let _ = ctx.peer_game_db.write().await.remove_peer(expected);
}
let record_addr = ack.listen_addr.unwrap_or(peer_addr);
let record_addr = ack.listen_addr;
let upsert = record_remote_library(
&peer_game_db,
&ctx.peer_game_db,
ack.peer_id.clone(),
record_addr,
ack.library_rev,
@@ -98,31 +127,22 @@ pub(crate) async fn perform_handshake_with_peer(
.await;
after_peer_library_recorded(
&ctx,
upsert,
record_addr,
peer_id.as_ref(),
ack.library_rev,
ack.library_digest,
&local_library,
&peer_game_db,
&tx_notify_ui,
)
.await;
if let Err(err) =
refresh_peer_games(&peer_game_db, &tx_notify_ui, record_addr, &ack.peer_id).await
{
log::warn!("Failed to refresh peer games from {record_addr}: {err}");
}
Ok(())
}
pub(super) async fn accept_inbound_hello(
ctx: &PeerCtx,
remote_addr: Option<SocketAddr>,
transport_addr: Option<SocketAddr>,
hello: Hello,
) -> HelloAck {
) -> eyre::Result<HelloAck> {
if hello.peer_id == *ctx.peer_id {
log::trace!("Ignoring hello from self");
return build_hello_ack(ctx).await;
@@ -130,91 +150,44 @@ pub(super) async fn accept_inbound_hello(
if hello.proto_ver != PROTOCOL_VERSION {
log::warn!(
"Incompatible protocol from {remote_addr:?}: {}",
"Incompatible protocol from {transport_addr:?}: {}",
hello.proto_ver
);
return build_hello_ack(ctx).await;
}
if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, hello.listen_addr).await
{
let upsert = record_remote_library(
&ctx.peer_game_db,
hello.peer_id.clone(),
addr,
hello.library_rev,
hello.library_digest,
hello.features.clone(),
)
.await;
let addr = hello.listen_addr;
let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx);
let upsert = record_remote_library(
&ctx.peer_game_db,
hello.peer_id.clone(),
addr,
hello.library_rev,
hello.library_digest,
hello.features.clone(),
)
.await;
after_peer_library_recorded(
upsert,
addr,
ctx.peer_id.as_ref(),
hello.library_rev,
hello.library_digest,
&ctx.local_library,
&ctx.peer_game_db,
&ctx.tx_notify_ui,
)
.await;
spawn_peer_game_refresh(ctx, addr, hello.peer_id.clone());
} else {
log::debug!(
"Ignoring inbound hello from {} without a known listener address",
hello.peer_id
);
}
after_peer_library_recorded(
&handshake_ctx,
upsert,
addr,
hello.library_rev,
hello.library_digest,
)
.await;
build_hello_ack(ctx).await
}
async fn peer_record_addr(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
peer_id: &PeerId,
listen_addr: Option<SocketAddr>,
) -> Option<SocketAddr> {
let db = peer_game_db.read().await;
listen_addr.or_else(|| db.peer_addr(peer_id))
}
fn spawn_peer_game_refresh(ctx: &PeerCtx, peer_addr: SocketAddr, peer_id: PeerId) {
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = ctx.tx_notify_ui.clone();
ctx.task_tracker.spawn(async move {
if let Err(err) =
refresh_peer_games(&peer_game_db, &tx_notify_ui, peer_addr, &peer_id).await
{
log::warn!("Failed to refresh peer games from {peer_addr}: {err}");
}
});
}
#[allow(clippy::too_many_arguments)]
pub(super) fn spawn_library_resync(
peer_id: Arc<String>,
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
local_library: Arc<RwLock<LocalLibraryState>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
ctx: HandshakeCtx,
peer_addr: SocketAddr,
peer_id_hint: PeerId,
reason: &'static str,
) {
tokio::spawn(async move {
if let Err(err) = perform_handshake_with_peer(
peer_id,
local_peer_addr,
local_library,
peer_game_db,
tx_notify_ui,
peer_addr,
Some(peer_id_hint),
)
.await
{
if let Err(err) = perform_handshake_with_peer(ctx, peer_addr, Some(peer_id_hint)).await {
log::warn!("Failed to {reason} library from {peer_addr}: {err}");
}
});
@@ -234,42 +207,28 @@ async fn record_remote_library(
upsert
}
#[allow(clippy::too_many_arguments)]
async fn after_peer_library_recorded(
ctx: &HandshakeCtx,
upsert: PeerUpsert,
peer_addr: SocketAddr,
local_peer_id: &str,
remote_library_rev: u64,
remote_library_digest: u64,
local_library: &Arc<RwLock<LocalLibraryState>>,
peer_game_db: &Arc<RwLock<PeerGameDB>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
if upsert.is_new {
events::emit_peer_discovered(peer_game_db, tx_notify_ui, peer_addr).await;
send_local_library_summary(peer_addr, local_peer_id, local_library).await;
events::emit_peer_discovered(&ctx.peer_game_db, &ctx.tx_notify_ui, peer_addr).await;
send_local_library_summary(peer_addr, ctx).await;
}
send_local_library_update_if_needed(
peer_addr,
local_peer_id,
local_library,
remote_library_rev,
remote_library_digest,
)
.await;
send_local_library_update_if_needed(peer_addr, ctx, remote_library_rev, remote_library_digest)
.await;
}
async fn send_local_library_summary(
peer_addr: SocketAddr,
local_peer_id: &str,
local_library: &Arc<RwLock<LocalLibraryState>>,
) {
async fn send_local_library_summary(peer_addr: SocketAddr, ctx: &HandshakeCtx) {
let summary = {
let library_guard = local_library.read().await;
let library_guard = ctx.local_library.read().await;
build_library_summary(&library_guard)
};
let local_peer_id = local_peer_id.to_string();
let local_peer_id = ctx.peer_id.as_ref().clone();
tokio::spawn(async move {
if let Err(err) = send_library_summary(peer_addr, &local_peer_id, summary).await {
@@ -280,13 +239,13 @@ async fn send_local_library_summary(
async fn send_local_library_update_if_needed(
peer_addr: SocketAddr,
local_peer_id: &str,
local_library: &Arc<RwLock<LocalLibraryState>>,
ctx: &HandshakeCtx,
remote_rev: u64,
remote_digest: u64,
) {
if let Some(update) = select_library_update(local_library, remote_rev, remote_digest).await {
let local_peer_id = local_peer_id.to_string();
if let Some(update) = select_library_update(&ctx.local_library, remote_rev, remote_digest).await
{
let local_peer_id = ctx.peer_id.as_ref().clone();
tokio::spawn(async move {
let result = match update {
LibraryUpdate::Delta(delta) => {
@@ -327,46 +286,47 @@ async fn select_library_update(
mod tests {
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::RwLock;
use tokio::sync::{RwLock, mpsc};
use super::peer_record_addr;
use crate::peer_db::PeerGameDB;
use super::{HandshakeCtx, build_hello_from_state};
use crate::{library::LocalLibraryState, peer_db::PeerGameDB};
fn addr(ip: [u8; 4], port: u16) -> SocketAddr {
SocketAddr::from((ip, port))
}
#[tokio::test]
async fn inbound_hello_keeps_existing_listening_addr() {
fn test_handshake_ctx(local_peer_addr: Option<SocketAddr>) -> HandshakeCtx {
let (tx_notify_ui, _rx_notify_ui) = mpsc::unbounded_channel();
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let advertised = addr([10, 66, 0, 2], 40000);
peer_game_db
.write()
HandshakeCtx {
peer_id: Arc::new("local-peer".to_string()),
local_peer_addr: Arc::new(RwLock::new(local_peer_addr)),
local_library: Arc::new(RwLock::new(LocalLibraryState::empty())),
peer_game_db,
tx_notify_ui,
}
}
#[tokio::test]
async fn outbound_hello_requires_local_listener_addr() {
let ctx = test_handshake_ctx(None);
let err = build_hello_from_state(&ctx)
.await
.upsert_peer("peer".to_string(), advertised);
.expect_err("hello without listener must fail");
let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await;
assert_eq!(record_addr, Some(advertised));
assert_eq!(err.to_string(), "local peer listener address is not ready");
}
#[tokio::test]
async fn inbound_hello_prefers_reported_listening_addr() {
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
async fn outbound_hello_carries_local_listener_addr() {
let advertised = addr([10, 66, 0, 2], 40000);
let ctx = test_handshake_ctx(Some(advertised));
let record_addr =
peer_record_addr(&peer_game_db, &"peer".to_string(), Some(advertised)).await;
let hello = build_hello_from_state(&ctx)
.await
.expect("listener address is present");
assert_eq!(record_addr, Some(advertised));
}
#[tokio::test]
async fn inbound_hello_without_known_listener_is_not_recorded() {
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await;
assert_eq!(record_addr, None);
assert_eq!(hello.listen_addr, advertised);
}
}
+29 -45
View File
@@ -15,6 +15,7 @@ use crate::{
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
peer::{send_game_file_chunk, send_game_file_data},
services::handshake::{
HandshakeCtx,
accept_inbound_hello,
perform_handshake_with_peer,
spawn_library_resync,
@@ -76,10 +77,18 @@ async fn dispatch_request(
) -> ResponseWriter {
match request {
Request::Ping => send_response(framed_tx, Response::Pong, "pong").await,
Request::Hello(hello) => {
let ack = accept_inbound_hello(ctx, remote_addr, hello).await;
send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
}
Request::Hello(hello) => match accept_inbound_hello(ctx, remote_addr, hello).await {
Ok(ack) => send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await,
Err(err) => {
log::error!("Failed to accept inbound hello: {err}");
send_response(
framed_tx,
Response::InternalPeerError(err.to_string()),
"HelloAck",
)
.await
}
},
Request::ListGames => handle_list_games(ctx, framed_tx).await,
Request::LibrarySummary { peer_id, summary } => {
handle_library_summary(ctx, peer_id, summary).await;
@@ -160,19 +169,14 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response
}
async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: LibrarySummary) {
let (addr, previous_digest, previous_count, features) = {
let (addr, previous_digest, features) = {
let db = ctx.peer_game_db.read().await;
let Some(addr) = db.peer_addr(&peer_id) else {
log::debug!("Ignoring library summary from unknown peer {peer_id}");
return;
};
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
(
addr,
digest,
db.peer_game_count(&peer_id),
db.peer_features(&peer_id),
)
(addr, digest, db.peer_features(&peer_id))
};
{
@@ -185,24 +189,12 @@ async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: Library
);
}
if summary.library_digest != previous_digest || previous_count == 0 {
if summary.library_digest != previous_digest {
ctx.task_tracker.spawn({
let peer_id_arc = ctx.peer_id.clone();
let local_peer_addr = ctx.local_peer_addr.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = ctx.tx_notify_ui.clone();
let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx);
async move {
if let Err(err) = perform_handshake_with_peer(
peer_id_arc,
local_peer_addr,
local_library,
peer_game_db,
tx_notify_ui,
addr,
Some(peer_id),
)
.await
if let Err(err) =
perform_handshake_with_peer(handshake_ctx, addr, Some(peer_id)).await
{
log::warn!("Failed to refresh library from {addr}: {err}");
}
@@ -229,14 +221,6 @@ async fn handle_library_snapshot(ctx: &PeerCtx, peer_id: String, snapshot: Libra
}
async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) {
let Some(addr) = ({
let db = ctx.peer_game_db.read().await;
db.peer_addr(&peer_id)
}) else {
log::debug!("Ignoring library delta from unknown peer {peer_id}");
return;
};
let applied = {
let mut db = ctx.peer_game_db.write().await;
db.apply_library_delta(&peer_id, delta)
@@ -245,16 +229,16 @@ async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelt
if applied {
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
} else {
spawn_library_resync(
ctx.peer_id.clone(),
ctx.local_peer_addr.clone(),
ctx.local_library.clone(),
ctx.peer_game_db.clone(),
ctx.tx_notify_ui.clone(),
addr,
peer_id,
"resync",
);
let addr = {
let db = ctx.peer_game_db.read().await;
db.peer_addr(&peer_id)
};
let Some(addr) = addr else {
log::debug!("Ignoring library delta from unknown peer {peer_id}");
return;
};
spawn_library_resync(HandshakeCtx::from_peer_ctx(ctx), addr, peer_id, "resync");
}
}