mdns improved peer discovery

This commit is contained in:
2025-11-14 00:24:04 +01:00
parent b9e3e760d9
commit 4e9707dd51
2 changed files with 163 additions and 108 deletions
+82 -48
View File
@@ -47,57 +47,91 @@ impl Drop for MdnsAdvertiser {
} }
} }
pub struct MdnsBrowser {
daemon: ServiceDaemon,
receiver: Receiver<ServiceEvent>,
service_type: String,
}
impl MdnsBrowser {
pub fn new(service_type: &str) -> eyre::Result<Self> {
let daemon = ServiceDaemon::new()?;
let receiver = daemon.browse(service_type)?;
Ok(Self {
daemon,
receiver,
service_type: service_type.to_string(),
})
}
pub fn next_address(
&self,
ignore_addr: Option<SocketAddr>,
) -> eyre::Result<Option<SocketAddr>> {
loop {
match self.receiver.recv() {
Ok(ServiceEvent::ServiceResolved(info)) => {
log::trace!("mdns ServiceResolved event: {info:?}");
if info.ty_domain != self.service_type {
log::trace!(
"Got mDNS with uninteresting service type: {} (expected: {})",
info.ty_domain,
self.service_type,
);
continue;
}
let mut ignored_match = false;
for address in info.get_addresses() {
let addr = SocketAddr::new(address.to_ip_addr(), info.get_port());
if ignore_addr.is_some_and(|ignore| ignore == addr) {
ignored_match = true;
log::trace!("Ignoring mDNS advertisement for local server at {addr}");
continue;
}
log::info!("Found server at {addr}");
return Ok(Some(addr));
}
if ignored_match {
log::trace!(
"Only saw ignored mDNS advertisements (probably ourselves) for {:?}",
info.get_fullname()
);
continue;
}
log::error!("No address found in mDNS response: {info:?}");
}
Ok(other_event) => {
log::trace!("mdns unrelated event: {other_event:?}");
}
Err(err) => {
log::error!("mDNS browse channel closed: {err}");
return Ok(None);
}
}
}
}
}
impl Drop for MdnsBrowser {
fn drop(&mut self) {
let _ = self.daemon.shutdown();
}
}
pub fn discover_service( pub fn discover_service(
service_type: &str, service_type: &str,
ignore_addr: Option<SocketAddr>, ignore_addr: Option<SocketAddr>,
) -> eyre::Result<SocketAddr> { ) -> eyre::Result<SocketAddr> {
let mdns = ServiceDaemon::new().expect("Failed to create mDNS daemon."); // Currently unused; kept for potential one-off discovery callers that just need a single address.
let browser = MdnsBrowser::new(service_type)?;
let receiver = mdns.browse(service_type)?; match browser.next_address(ignore_addr)? {
Some(addr) => Ok(addr),
while let Ok(event) = receiver.recv() { None => bail!("No server found."),
match event {
ServiceEvent::ServiceResolved(info) => {
log::trace!("mdns ServiceResolved event: {info:?}");
// Check if this service matches our expected service type
if info.ty_domain != service_type {
log::trace!(
"Got mDNS with uninteresting service type: {} (expected: {})",
info.ty_domain,
service_type,
);
continue;
}
let mut ignored_match = false;
for address in info.get_addresses() {
let addr = SocketAddr::new(address.to_ip_addr(), info.get_port());
if ignore_addr.is_some_and(|ignore| ignore == addr) {
ignored_match = true;
log::trace!("Ignoring mDNS advertisement for local server at {addr}");
continue;
}
log::info!("Found server at {addr}");
return Ok(addr);
}
if ignored_match {
log::trace!(
"Only saw ignored mDNS advertisements (probably ourselves) for {:?}",
info.get_fullname()
);
continue;
}
log::error!("No address found in mDNS response: {info:?}");
}
other_event => {
log::trace!("mdns unrelated event: {other_event:?}");
}
}
} }
bail!("No server found.")
} }
+81 -60
View File
@@ -14,7 +14,7 @@ use std::{
use bytes::BytesMut; use bytes::BytesMut;
use if_addrs::{IfAddr, Interface, get_if_addrs}; use if_addrs::{IfAddr, Interface, get_if_addrs};
use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_db::db::{Game, GameDB, GameFileDescription};
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser};
use lanspread_proto::{Message, Request, Response}; use lanspread_proto::{Message, Request, Response};
use s2n_quic::{ use s2n_quic::{
Client as QuicClient, Client as QuicClient,
@@ -273,6 +273,11 @@ impl PeerGameDB {
self.peers.keys().copied().collect() self.peers.keys().copied().collect()
} }
#[must_use]
pub fn contains_peer(&self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr)
}
#[must_use] #[must_use]
pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> { pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> {
self.peers self.peers
@@ -1816,72 +1821,88 @@ async fn run_peer_discovery(
) { ) {
log::info!("Starting peer discovery task"); log::info!("Starting peer discovery task");
let service_type = LANSPREAD_SERVICE_TYPE.to_string();
loop { loop {
let ignored_addr = { *local_peer_addr.read().await }; let (addr_tx, mut addr_rx) = tokio::sync::mpsc::unbounded_channel();
let discovery_result = discover_service(LANSPREAD_SERVICE_TYPE, ignored_addr); let service_type_clone = service_type.clone();
match discovery_result { let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> {
Ok(peer_addr) => { let browser = MdnsBrowser::new(&service_type_clone)?;
let is_self = { loop {
let guard = local_peer_addr.read().await; if let Some(addr) = browser.next_address(None)? {
guard.as_ref().is_some_and(|addr| *addr == peer_addr) if addr_tx.send(addr).is_err() {
}; log::debug!("Peer discovery consumer dropped; stopping worker");
if is_self { break;
log::trace!("Ignoring self advertisement at {peer_addr}");
} else {
log::info!("Discovered peer at: {peer_addr}");
// Add peer to database
let is_new_peer = {
let mut db = peer_game_db.write().await;
let peer_addresses = db.get_peer_addresses();
if peer_addresses.contains(&peer_addr) {
false
} else {
db.add_peer(peer_addr);
true
}
};
if is_new_peer {
// Notify UI about new peer
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
log::error!("Failed to send PeerDiscovered event: {e}");
}
// Send updated peer count
let current_peer_count =
{ peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) =
tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
// Request games from this peer
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone();
tokio::spawn(async move {
if let Err(e) = request_games_from_peer(
peer_addr,
tx_notify_ui_clone,
peer_game_db_clone,
)
.await
{
log::error!("Failed to request games from peer {peer_addr}: {e}");
}
});
} }
} else {
log::warn!("mDNS browser closed; stopping peer discovery worker");
break;
} }
} }
Err(e) => { Ok(())
log::debug!("Peer discovery error: {e}"); });
tokio::time::sleep(Duration::from_secs(5)).await;
while let Some(peer_addr) = addr_rx.recv().await {
let is_self = {
let guard = local_peer_addr.read().await;
guard.as_ref().is_some_and(|addr| *addr == peer_addr)
};
if is_self {
log::trace!("Ignoring self advertisement at {peer_addr}");
continue;
}
let is_new_peer = {
let mut db = peer_game_db.write().await;
if db.contains_peer(&peer_addr) {
db.update_last_seen(&peer_addr);
false
} else {
db.add_peer(peer_addr);
true
}
};
if is_new_peer {
log::info!("Discovered peer at: {peer_addr}");
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
log::error!("Failed to send PeerDiscovered event: {e}");
}
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
log::error!("Failed to send PeerCountUpdated event: {e}");
}
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone();
tokio::spawn(async move {
if let Err(e) =
request_games_from_peer(peer_addr, tx_notify_ui_clone, peer_game_db_clone)
.await
{
log::error!("Failed to request games from peer {peer_addr}: {e}");
}
});
} }
} }
// Wait before next discovery cycle
tokio::time::sleep(Duration::from_secs(10)).await; match worker_handle.await {
Ok(Ok(())) => {
log::warn!("Peer discovery worker exited; restarting shortly");
}
Ok(Err(e)) => {
log::error!("Peer discovery worker failed: {e}");
}
Err(e) => {
log::error!("Peer discovery worker join error: {e}");
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
} }
} }