From 4e9707dd512b6574fd96c5997484e12cad0d6ef7 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Fri, 14 Nov 2025 00:24:04 +0100 Subject: [PATCH] mdns improved peer discovery --- crates/lanspread-mdns/src/lib.rs | 130 +++++++++++++++++----------- crates/lanspread-peer/src/lib.rs | 141 ++++++++++++++++++------------- 2 files changed, 163 insertions(+), 108 deletions(-) diff --git a/crates/lanspread-mdns/src/lib.rs b/crates/lanspread-mdns/src/lib.rs index b351a40..e69db19 100644 --- a/crates/lanspread-mdns/src/lib.rs +++ b/crates/lanspread-mdns/src/lib.rs @@ -47,57 +47,91 @@ impl Drop for MdnsAdvertiser { } } +pub struct MdnsBrowser { + daemon: ServiceDaemon, + receiver: Receiver, + service_type: String, +} + +impl MdnsBrowser { + pub fn new(service_type: &str) -> eyre::Result { + let daemon = ServiceDaemon::new()?; + let receiver = daemon.browse(service_type)?; + Ok(Self { + daemon, + receiver, + service_type: service_type.to_string(), + }) + } + + pub fn next_address( + &self, + ignore_addr: Option, + ) -> eyre::Result> { + loop { + match self.receiver.recv() { + Ok(ServiceEvent::ServiceResolved(info)) => { + log::trace!("mdns ServiceResolved event: {info:?}"); + + if info.ty_domain != self.service_type { + log::trace!( + "Got mDNS with uninteresting service type: {} (expected: {})", + info.ty_domain, + self.service_type, + ); + continue; + } + + let mut ignored_match = false; + for address in info.get_addresses() { + let addr = SocketAddr::new(address.to_ip_addr(), info.get_port()); + + if ignore_addr.is_some_and(|ignore| ignore == addr) { + ignored_match = true; + log::trace!("Ignoring mDNS advertisement for local server at {addr}"); + continue; + } + + log::info!("Found server at {addr}"); + return Ok(Some(addr)); + } + + if ignored_match { + log::trace!( + "Only saw ignored mDNS advertisements (probably ourselves) for {:?}", + info.get_fullname() + ); + continue; + } + + log::error!("No address found in mDNS response: {info:?}"); + } + Ok(other_event) => { + log::trace!("mdns unrelated event: {other_event:?}"); + } + Err(err) => { + log::error!("mDNS browse channel closed: {err}"); + return Ok(None); + } + } + } + } +} + +impl Drop for MdnsBrowser { + fn drop(&mut self) { + let _ = self.daemon.shutdown(); + } +} + pub fn discover_service( service_type: &str, ignore_addr: Option, ) -> eyre::Result { - let mdns = ServiceDaemon::new().expect("Failed to create mDNS daemon."); - - let receiver = mdns.browse(service_type)?; - - while let Ok(event) = receiver.recv() { - match event { - ServiceEvent::ServiceResolved(info) => { - log::trace!("mdns ServiceResolved event: {info:?}"); - - // Check if this service matches our expected service type - if info.ty_domain != service_type { - log::trace!( - "Got mDNS with uninteresting service type: {} (expected: {})", - info.ty_domain, - service_type, - ); - continue; - } - - let mut ignored_match = false; - for address in info.get_addresses() { - let addr = SocketAddr::new(address.to_ip_addr(), info.get_port()); - - if ignore_addr.is_some_and(|ignore| ignore == addr) { - ignored_match = true; - log::trace!("Ignoring mDNS advertisement for local server at {addr}"); - continue; - } - - log::info!("Found server at {addr}"); - return Ok(addr); - } - - if ignored_match { - log::trace!( - "Only saw ignored mDNS advertisements (probably ourselves) for {:?}", - info.get_fullname() - ); - continue; - } - - log::error!("No address found in mDNS response: {info:?}"); - } - other_event => { - log::trace!("mdns unrelated event: {other_event:?}"); - } - } + // Currently unused; kept for potential one-off discovery callers that just need a single address. + let browser = MdnsBrowser::new(service_type)?; + match browser.next_address(ignore_addr)? { + Some(addr) => Ok(addr), + None => bail!("No server found."), } - bail!("No server found.") } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index b5f9a53..f94bc95 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -14,7 +14,7 @@ use std::{ use bytes::BytesMut; use if_addrs::{IfAddr, Interface, get_if_addrs}; use lanspread_db::db::{Game, GameDB, GameFileDescription}; -use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; +use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser}; use lanspread_proto::{Message, Request, Response}; use s2n_quic::{ Client as QuicClient, @@ -273,6 +273,11 @@ impl PeerGameDB { self.peers.keys().copied().collect() } + #[must_use] + pub fn contains_peer(&self, addr: &SocketAddr) -> bool { + self.peers.contains_key(addr) + } + #[must_use] pub fn peers_with_game(&self, game_id: &str) -> Vec { self.peers @@ -1816,72 +1821,88 @@ async fn run_peer_discovery( ) { log::info!("Starting peer discovery task"); + let service_type = LANSPREAD_SERVICE_TYPE.to_string(); + loop { - let ignored_addr = { *local_peer_addr.read().await }; - let discovery_result = discover_service(LANSPREAD_SERVICE_TYPE, ignored_addr); + let (addr_tx, mut addr_rx) = tokio::sync::mpsc::unbounded_channel(); + let service_type_clone = service_type.clone(); - match discovery_result { - Ok(peer_addr) => { - let is_self = { - let guard = local_peer_addr.read().await; - guard.as_ref().is_some_and(|addr| *addr == peer_addr) - }; - if is_self { - log::trace!("Ignoring self advertisement at {peer_addr}"); - } else { - log::info!("Discovered peer at: {peer_addr}"); - - // Add peer to database - let is_new_peer = { - let mut db = peer_game_db.write().await; - let peer_addresses = db.get_peer_addresses(); - if peer_addresses.contains(&peer_addr) { - false - } else { - db.add_peer(peer_addr); - true - } - }; - - if is_new_peer { - // Notify UI about new peer - if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { - log::error!("Failed to send PeerDiscovered event: {e}"); - } - - // Send updated peer count - let current_peer_count = - { peer_game_db.read().await.get_peer_addresses().len() }; - if let Err(e) = - tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) - { - log::error!("Failed to send PeerCountUpdated event: {e}"); - } - - // Request games from this peer - let tx_notify_ui_clone = tx_notify_ui.clone(); - let peer_game_db_clone = peer_game_db.clone(); - tokio::spawn(async move { - if let Err(e) = request_games_from_peer( - peer_addr, - tx_notify_ui_clone, - peer_game_db_clone, - ) - .await - { - log::error!("Failed to request games from peer {peer_addr}: {e}"); - } - }); + let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> { + let browser = MdnsBrowser::new(&service_type_clone)?; + loop { + if let Some(addr) = browser.next_address(None)? { + if addr_tx.send(addr).is_err() { + log::debug!("Peer discovery consumer dropped; stopping worker"); + break; } + } else { + log::warn!("mDNS browser closed; stopping peer discovery worker"); + break; } } - Err(e) => { - log::debug!("Peer discovery error: {e}"); - tokio::time::sleep(Duration::from_secs(5)).await; + Ok(()) + }); + + while let Some(peer_addr) = addr_rx.recv().await { + let is_self = { + let guard = local_peer_addr.read().await; + guard.as_ref().is_some_and(|addr| *addr == peer_addr) + }; + + if is_self { + log::trace!("Ignoring self advertisement at {peer_addr}"); + continue; + } + + let is_new_peer = { + let mut db = peer_game_db.write().await; + if db.contains_peer(&peer_addr) { + db.update_last_seen(&peer_addr); + false + } else { + db.add_peer(peer_addr); + true + } + }; + + if is_new_peer { + log::info!("Discovered peer at: {peer_addr}"); + + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { + log::error!("Failed to send PeerDiscovered event: {e}"); + } + + let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() }; + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) { + log::error!("Failed to send PeerCountUpdated event: {e}"); + } + + let tx_notify_ui_clone = tx_notify_ui.clone(); + let peer_game_db_clone = peer_game_db.clone(); + tokio::spawn(async move { + if let Err(e) = + request_games_from_peer(peer_addr, tx_notify_ui_clone, peer_game_db_clone) + .await + { + log::error!("Failed to request games from peer {peer_addr}: {e}"); + } + }); } } - // Wait before next discovery cycle - tokio::time::sleep(Duration::from_secs(10)).await; + + match worker_handle.await { + Ok(Ok(())) => { + log::warn!("Peer discovery worker exited; restarting shortly"); + } + Ok(Err(e)) => { + log::error!("Peer discovery worker failed: {e}"); + } + Err(e) => { + log::error!("Peer discovery worker join error: {e}"); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; } }