//! Protocol handshakes and library synchronization between peers. use std::{net::SocketAddr, sync::Arc}; use lanspread_proto::{Hello, HelloAck, LibraryDelta, LibrarySnapshot, PROTOCOL_VERSION}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ PeerEvent, context::PeerCtx, events, identity::default_features, library::{LocalLibraryState, build_library_snapshot, build_library_summary}, network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary}, peer_db::{PeerGameDB, PeerId, PeerUpsert}, }; enum LibraryUpdate { Delta(LibraryDelta), Snapshot(LibrarySnapshot), } pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck { let library_guard = ctx.local_library.read().await; HelloAck { peer_id: ctx.peer_id.as_ref().clone(), proto_ver: PROTOCOL_VERSION, library_rev: library_guard.revision, library_digest: library_guard.digest, features: default_features(), } } async fn build_hello_from_state( peer_id: &str, local_library: &Arc>, ) -> Hello { let library_guard = local_library.read().await; Hello { peer_id: peer_id.to_string(), proto_ver: PROTOCOL_VERSION, library_rev: library_guard.revision, library_digest: library_guard.digest, features: default_features(), } } pub(crate) async fn perform_handshake_with_peer( peer_id: Arc, local_library: Arc>, peer_game_db: Arc>, tx_notify_ui: UnboundedSender, peer_addr: SocketAddr, peer_id_hint: Option, ) -> eyre::Result<()> { let hello = build_hello_from_state(peer_id.as_ref(), &local_library).await; let ack = exchange_hello(peer_addr, hello).await?; if ack.proto_ver != PROTOCOL_VERSION { log::warn!( "Peer {peer_addr} uses incompatible protocol {} (expected {PROTOCOL_VERSION})", ack.proto_ver ); return Ok(()); } if ack.peer_id == *peer_id { log::trace!("Ignoring handshake with self for {peer_addr}"); return Ok(()); } if let Some(expected) = peer_id_hint.as_ref() && expected != &ack.peer_id { log::warn!( "Peer {peer_addr} id mismatch: mDNS advertised {expected}, hello ack returned {}", ack.peer_id ); let _ = peer_game_db.write().await.remove_peer(expected); } let upsert = record_remote_library( &peer_game_db, ack.peer_id.clone(), peer_addr, ack.library_rev, ack.library_digest, ack.features.clone(), ) .await; after_peer_library_recorded( upsert, peer_addr, ack.library_rev, ack.library_digest, &local_library, &peer_game_db, &tx_notify_ui, ) .await; Ok(()) } pub(super) async fn accept_inbound_hello( ctx: &PeerCtx, remote_addr: Option, hello: Hello, ) -> HelloAck { if hello.peer_id == *ctx.peer_id { log::trace!("Ignoring hello from self"); return build_hello_ack(ctx).await; } if hello.proto_ver != PROTOCOL_VERSION { log::warn!( "Incompatible protocol from {remote_addr:?}: {}", hello.proto_ver ); return build_hello_ack(ctx).await; } if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, remote_addr).await { let upsert = record_remote_library( &ctx.peer_game_db, hello.peer_id.clone(), addr, hello.library_rev, hello.library_digest, hello.features.clone(), ) .await; after_peer_library_recorded( upsert, addr, hello.library_rev, hello.library_digest, &ctx.local_library, &ctx.peer_game_db, &ctx.tx_notify_ui, ) .await; } build_hello_ack(ctx).await } async fn peer_record_addr( peer_game_db: &Arc>, peer_id: &PeerId, remote_addr: Option, ) -> Option { let remote_addr = remote_addr?; let db = peer_game_db.read().await; Some(db.peer_addr(peer_id).unwrap_or(remote_addr)) } pub(super) fn spawn_library_resync( peer_id: Arc, local_library: Arc>, peer_game_db: Arc>, tx_notify_ui: UnboundedSender, peer_addr: SocketAddr, peer_id_hint: PeerId, reason: &'static str, ) { tokio::spawn(async move { if let Err(err) = perform_handshake_with_peer( peer_id, local_library, peer_game_db, tx_notify_ui, peer_addr, Some(peer_id_hint), ) .await { log::warn!("Failed to {reason} library from {peer_addr}: {err}"); } }); } async fn record_remote_library( peer_game_db: &Arc>, peer_id: PeerId, peer_addr: SocketAddr, library_rev: u64, library_digest: u64, features: Vec, ) -> PeerUpsert { let mut db = peer_game_db.write().await; let upsert = db.upsert_peer(peer_id.clone(), peer_addr); db.update_peer_library(&peer_id, library_rev, library_digest, features); upsert } async fn after_peer_library_recorded( upsert: PeerUpsert, peer_addr: SocketAddr, remote_library_rev: u64, remote_library_digest: u64, local_library: &Arc>, peer_game_db: &Arc>, tx_notify_ui: &UnboundedSender, ) { if upsert.is_new { events::emit_peer_discovered(peer_game_db, tx_notify_ui, peer_addr).await; send_local_library_summary(peer_addr, local_library).await; } send_local_library_update_if_needed( peer_addr, local_library, remote_library_rev, remote_library_digest, ) .await; } async fn send_local_library_summary( peer_addr: SocketAddr, local_library: &Arc>, ) { let summary = { let library_guard = local_library.read().await; build_library_summary(&library_guard) }; tokio::spawn(async move { if let Err(err) = send_library_summary(peer_addr, summary).await { log::warn!("Failed to send library summary to {peer_addr}: {err}"); } }); } async fn send_local_library_update_if_needed( peer_addr: SocketAddr, local_library: &Arc>, remote_rev: u64, remote_digest: u64, ) { if let Some(update) = select_library_update(local_library, remote_rev, remote_digest).await { tokio::spawn(async move { let result = match update { LibraryUpdate::Delta(delta) => send_library_delta(peer_addr, delta).await, LibraryUpdate::Snapshot(snapshot) => { send_library_snapshot(peer_addr, snapshot).await } }; if let Err(err) = result { log::warn!("Failed to send library update to {peer_addr}: {err}"); } }); } } async fn select_library_update( local_library: &Arc>, remote_rev: u64, remote_digest: u64, ) -> Option { let library_guard = local_library.read().await; if library_guard.digest == remote_digest || remote_rev > library_guard.revision { return None; } if let Some(delta) = library_guard.delta_since(remote_rev) { return Some(LibraryUpdate::Delta(delta)); } Some(LibraryUpdate::Snapshot(build_library_snapshot( &library_guard, ))) } #[cfg(test)] mod tests { use std::{net::SocketAddr, sync::Arc}; use tokio::sync::RwLock; use super::peer_record_addr; use crate::peer_db::PeerGameDB; fn addr(ip: [u8; 4], port: u16) -> SocketAddr { SocketAddr::from((ip, port)) } #[tokio::test] async fn inbound_hello_keeps_existing_listening_addr() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let advertised = addr([10, 66, 0, 2], 40000); let transport_source = addr([10, 66, 0, 2], 52000); peer_game_db .write() .await .upsert_peer("peer".to_string(), advertised); let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), Some(transport_source)).await; assert_eq!(record_addr, Some(advertised)); } }