diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index f5d6b6d..92e2e32 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -137,6 +137,30 @@ impl PeerGameDB { self.addr_index.get(addr) } + /// Returns the peer id for a transport source address. + /// + /// QUIC clients connect from ephemeral source ports, while peer records are + /// keyed by their advertised listening address. If the exact socket address + /// is unknown, fall back to a unique peer with the same IP address. + #[must_use] + pub fn peer_id_for_transport_addr(&self, addr: &SocketAddr) -> Option { + if let Some(peer_id) = self.addr_index.get(addr) { + return Some(peer_id.clone()); + } + + let mut matches = self + .peers + .values() + .filter(|peer| peer.addr.ip() == addr.ip()) + .map(|peer| peer.peer_id.clone()); + let peer_id = matches.next()?; + if matches.next().is_some() { + return None; + } + + Some(peer_id) + } + /// Returns the library state for a peer if known. #[must_use] pub fn peer_library_state(&self, peer_id: &PeerId) -> Option<(u64, u64)> { @@ -201,7 +225,7 @@ impl PeerGameDB { /// Updates the last seen timestamp for a peer by address. pub fn update_last_seen_by_addr(&mut self, addr: &SocketAddr) { - if let Some(peer_id) = self.addr_index.get(addr).cloned() + if let Some(peer_id) = self.peer_id_for_transport_addr(addr) && let Some(peer) = self.peers.get_mut(&peer_id) { peer.last_seen = Instant::now(); @@ -815,6 +839,10 @@ mod tests { SocketAddr::from(([127, 0, 0, 1], port)) } + fn ip_addr(ip: [u8; 4], port: u16) -> SocketAddr { + SocketAddr::from((ip, port)) + } + fn summary(id: &str, version: &str, availability: Availability) -> GameSummary { GameSummary { id: id.to_string(), @@ -889,6 +917,29 @@ mod tests { assert!(db.peers_with_latest_version("game").is_empty()); } + #[test] + fn transport_addr_matches_known_peer_on_ephemeral_port() { + let advertised = ip_addr([10, 66, 0, 2], 40000); + let transport_source = ip_addr([10, 66, 0, 2], 52000); + let mut db = PeerGameDB::new(); + db.upsert_peer("peer".to_string(), advertised); + + assert_eq!( + db.peer_id_for_transport_addr(&transport_source).as_deref(), + Some("peer") + ); + } + + #[test] + fn transport_addr_fallback_requires_unique_peer_ip() { + let source = ip_addr([10, 66, 0, 2], 52000); + let mut db = PeerGameDB::new(); + db.upsert_peer("first".to_string(), ip_addr([10, 66, 0, 2], 40000)); + db.upsert_peer("second".to_string(), ip_addr([10, 66, 0, 2], 41000)); + + assert_eq!(db.peer_id_for_transport_addr(&source), None); + } + #[test] fn validation_uses_latest_version_file_metadata() { let old_addr = addr(12003); diff --git a/crates/lanspread-peer/src/remote_peer.rs b/crates/lanspread-peer/src/remote_peer.rs index b6942c6..f90d6cd 100644 --- a/crates/lanspread-peer/src/remote_peer.rs +++ b/crates/lanspread-peer/src/remote_peer.rs @@ -11,7 +11,7 @@ pub async fn ensure_peer_id_for_addr( peer_addr: SocketAddr, ) -> PeerId { let mut db = peer_game_db.write().await; - if let Some(peer_id) = db.peer_id_for_addr(&peer_addr).cloned() { + if let Some(peer_id) = db.peer_id_for_transport_addr(&peer_addr) { return peer_id; } diff --git a/crates/lanspread-peer/src/services/handshake.rs b/crates/lanspread-peer/src/services/handshake.rs index 295eacd..107075e 100644 --- a/crates/lanspread-peer/src/services/handshake.rs +++ b/crates/lanspread-peer/src/services/handshake.rs @@ -121,7 +121,7 @@ pub(super) async fn accept_inbound_hello( return build_hello_ack(ctx).await; } - if let Some(addr) = remote_addr { + if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, remote_addr).await { let upsert = record_remote_library( &ctx.peer_game_db, hello.peer_id.clone(), @@ -147,6 +147,16 @@ pub(super) async fn accept_inbound_hello( build_hello_ack(ctx).await } +async fn peer_record_addr( + peer_game_db: &Arc>, + peer_id: &PeerId, + remote_addr: Option, +) -> Option { + let remote_addr = remote_addr?; + let db = peer_game_db.read().await; + Some(db.peer_addr(peer_id).unwrap_or(remote_addr)) +} + pub(super) fn spawn_library_resync( peer_id: Arc, local_library: Arc>, @@ -265,3 +275,33 @@ async fn select_library_update( &library_guard, ))) } + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, sync::Arc}; + + use tokio::sync::RwLock; + + use super::peer_record_addr; + use crate::peer_db::PeerGameDB; + + fn addr(ip: [u8; 4], port: u16) -> SocketAddr { + SocketAddr::from((ip, port)) + } + + #[tokio::test] + async fn inbound_hello_keeps_existing_listening_addr() { + let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); + let advertised = addr([10, 66, 0, 2], 40000); + let transport_source = addr([10, 66, 0, 2], 52000); + peer_game_db + .write() + .await + .upsert_peer("peer".to_string(), advertised); + + let record_addr = + peer_record_addr(&peer_game_db, &"peer".to_string(), Some(transport_source)).await; + + assert_eq!(record_addr, Some(advertised)); + } +} diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 52527e7..b03679f 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -362,17 +362,12 @@ async fn handle_file_chunk_request( FramedWrite::new(tx, LengthDelimitedCodec::new()) } -async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option, peer_id: String) { +async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option, peer_id: String) { log::info!("Received Goodbye from peer {peer_id}"); let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) }; - if removed.is_none() { - return; - } - - if let Some(addr) = remote_addr { - events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, addr).await; - } + let Some(peer) = removed else { return }; + events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; }