From 3600b3ba6f018962867d8cf1e4129942b0dc8e2d Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 8 Nov 2025 21:03:58 +0100 Subject: [PATCH] wip --- crates/lanspread-peer/src/lib.rs | 433 +++++++++++++++++++++++++++++- crates/lanspread-peer/src/main.rs | 6 + 2 files changed, 426 insertions(+), 13 deletions(-) diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 401b084..957e709 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -1,8 +1,19 @@ #![allow(clippy::missing_errors_doc)] -use std::{fs::File, io::Write, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + fs::File, + io::Write, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; -use lanspread_db::db::{Game, GameFileDescription}; +use bytes::BytesMut; +use gethostname::gethostname; +use lanspread_db::db::{Game, GameDB, GameFileDescription}; +use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; use lanspread_proto::{Message, Request, Response}; use s2n_quic::{ Client as QuicClient, @@ -19,6 +30,7 @@ use tokio::{ mpsc::{UnboundedReceiver, UnboundedSender}, }, }; +use uuid::Uuid; static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem")); @@ -41,6 +53,98 @@ pub enum PeerEvent { }, PeerConnected(SocketAddr), PeerDisconnected(SocketAddr), + PeerDiscovered(SocketAddr), + PeerLost(SocketAddr), +} + +#[derive(Clone, Debug)] +pub struct PeerInfo { + pub addr: SocketAddr, + pub last_seen: Instant, + pub games: Vec, +} + +#[derive(Debug)] +pub struct PeerGameDB { + peers: HashMap, +} + +impl PeerGameDB { + pub fn new() -> Self { + Self { + peers: HashMap::new(), + } + } + + pub fn add_peer(&mut self, addr: SocketAddr) { + let peer_info = PeerInfo { + addr, + last_seen: Instant::now(), + games: Vec::new(), + }; + self.peers.insert(addr, peer_info); + log::info!("Added peer: {addr}"); + } + + pub fn remove_peer(&mut self, addr: &SocketAddr) -> Option { + self.peers.remove(addr) + } + + pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec) { + if let Some(peer) = self.peers.get_mut(&addr) { + peer.games = games; + peer.last_seen = Instant::now(); + log::info!("Updated games for peer: {addr}"); + } + } + + pub fn update_last_seen(&mut self, addr: &SocketAddr) { + if let Some(peer) = self.peers.get_mut(addr) { + peer.last_seen = Instant::now(); + } + } + + pub fn get_all_games(&self) -> Vec { + let mut all_games = Vec::new(); + for peer in self.peers.values() { + all_games.extend(peer.games.clone()); + } + all_games + } + + pub fn get_latest_version_for_game(&self, game_id: &str) -> Option { + let mut latest_version: Option = None; + + for peer in self.peers.values() { + if let Some(game) = peer.games.iter().find(|g| g.id == game_id) { + if let Some(ref version) = game.eti_game_version { + match &latest_version { + None => latest_version = Some(version.clone()), + Some(current_latest) => { + // Simple string comparison for now - could use semver + if version > current_latest { + latest_version = Some(version.clone()); + } + } + } + } + } + } + + latest_version + } + + pub fn get_peer_addresses(&self) -> Vec { + self.peers.keys().copied().collect() + } + + pub fn get_stale_peers(&self, timeout: Duration) -> Vec { + self.peers + .iter() + .filter(|(_, peer)| peer.last_seen.elapsed() > timeout) + .map(|(addr, _)| *addr) + .collect() + } } #[derive(Debug)] @@ -174,11 +278,15 @@ async fn download_game_files( struct Ctx { game_dir: Arc>>, + local_game_db: Arc>>, + peer_game_db: Arc>, } #[derive(Clone, Debug)] struct PeerCtx { game_dir: Arc>>, + local_game_db: Arc>>, + peer_game_db: Arc>, } pub async fn run_peer( @@ -188,10 +296,14 @@ pub async fn run_peer( // peer context let ctx = Ctx { game_dir: Arc::new(RwLock::new(None)), + local_game_db: Arc::new(RwLock::new(None)), + peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())), }; let peer_ctx = PeerCtx { game_dir: ctx.game_dir.clone(), + local_game_db: ctx.local_game_db.clone(), + peer_game_db: ctx.peer_game_db.clone(), }; // Start server component @@ -206,6 +318,20 @@ pub async fn run_peer( } }); + // Start peer discovery task + let tx_notify_ui_discovery = tx_notify_ui.clone(); + let peer_game_db_discovery = ctx.peer_game_db.clone(); + tokio::spawn(async move { + run_peer_discovery(tx_notify_ui_discovery, peer_game_db_discovery).await; + }); + + // Start ping service task + let tx_notify_ui_ping = tx_notify_ui.clone(); + let peer_game_db_ping = ctx.peer_game_db.clone(); + tokio::spawn(async move { + run_ping_service(tx_notify_ui_ping, peer_game_db_ping).await; + }); + // Handle client commands loop { let Some(cmd) = rx_control.recv().await else { @@ -214,11 +340,14 @@ pub async fn run_peer( match cmd { PeerCommand::ListGames => { - // TODO: Implement peer discovery and game listing log::info!("ListGames command received"); + let all_games = { ctx.peer_game_db.read().await.get_all_games() }; + if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(all_games)) { + log::error!("Failed to send ListGames event: {e}"); + } } PeerCommand::GetGame(id) => { - log::info!("Requesting game from peer: {id}"); + log::info!("Requesting game from peers: {id}"); // TODO: Implement game fetching from peers } PeerCommand::DownloadGameFiles { @@ -237,10 +366,11 @@ pub async fn run_peer( PeerCommand::SetGameDir(game_dir) => { *ctx.game_dir.write().await = Some(game_dir.clone()); log::info!("Game directory set to: {game_dir}"); + // TODO: Load local game database when game directory is set } PeerCommand::ConnectToPeer(peer_addr) => { log::info!("Connecting to peer: {peer_addr}"); - // TODO: Implement peer connection + // TODO: Implement direct peer connection } } } @@ -266,7 +396,42 @@ async fn run_server_component( let server_addr = server.local_addr()?; log::info!("Peer server listening on {server_addr}"); - // TODO: Implement mDNS advertising for peer discovery + // Start mDNS advertising for peer discovery + let peer_id = Uuid::now_v7().simple().to_string(); + let hostname = gethostname::gethostname(); + let hostname_str = hostname.to_str().unwrap_or(""); + + // Calculate maximum hostname length that fits with UUID in 63 char limit + let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1); + let truncated_hostname = if hostname_str.len() > max_hostname_len { + hostname_str.get(..max_hostname_len).unwrap_or(hostname_str) + } else { + hostname_str + }; + + let combined_str = if truncated_hostname.is_empty() { + peer_id + } else { + format!("{truncated_hostname}-{peer_id}") + }; + + let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, server_addr)?; + + // Monitor mDNS events + let _tx_notify_ui_mdns = tx_notify_ui.clone(); + tokio::spawn(async move { + while let Ok(event) = mdns.monitor.recv() { + match event { + lanspread_mdns::DaemonEvent::Error(e) => { + log::error!("mDNS error: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + _ => { + log::trace!("mDNS event: {:?}", event); + } + } + } + }); while let Some(connection) = server.accept().await { let ctx = ctx.clone(); @@ -320,19 +485,20 @@ async fn handle_peer_stream( ) -> eyre::Result<()> { let (mut rx, mut tx) = stream.split(); - log::trace!("{remote_addr:?} peer stream opened"); + log::trace!("{:?} peer stream opened", remote_addr); // handle streams loop { match rx.receive().await { Ok(Some(data)) => { log::trace!( - "{remote_addr:?} msg: (raw): {}", + "{:?} msg: (raw): {}", + remote_addr, String::from_utf8_lossy(&data) ); let request = Request::decode(data); - log::debug!("{remote_addr:?} msg: {request:?}"); + log::debug!("{:?} msg: {request:?}", remote_addr); match request { Request::Ping => { @@ -342,12 +508,38 @@ async fn handle_peer_stream( } } Request::ListGames => { - // TODO: Return list of games from this peer + // Return list of games from this peer log::info!("Received ListGames request from peer"); + let games = if let Some(ref db) = *ctx.local_game_db.read().await { + db.all_games().into_iter().cloned().collect() + } else { + Vec::new() + }; + + if let Err(e) = tx.send(Response::ListGames(games).encode()).await { + log::error!("Failed to send ListGames response: {e}"); + } } Request::GetGame { id } => { log::info!("Received GetGame request for {id} from peer"); - // TODO: Handle game request + // TODO: Handle game request using local game DB + let response = if let Some(ref db) = *ctx.local_game_db.read().await { + if db.get_game_by_id(&id).is_some() { + // TODO: Return actual game file descriptions + Response::GetGame { + id, + file_descriptions: Vec::new(), + } + } else { + Response::GameNotFound(id) + } + } else { + Response::GameNotFound(id) + }; + + if let Err(e) = tx.send(response.encode()).await { + log::error!("Failed to send GetGame response: {e}"); + } } Request::GetGameFileData(desc) => { log::info!( @@ -355,6 +547,18 @@ async fn handle_peer_stream( desc.relative_path ); // TODO: Handle file data request + if let Err(e) = tx + .send( + Response::InvalidRequest( + desc.relative_path.as_bytes().to_vec().into(), + "File transfer not implemented yet".to_string(), + ) + .encode(), + ) + .await + { + log::error!("Failed to send GetGameFileData response: {e}"); + } } Request::Invalid(_, _) => { log::error!("Received invalid request from peer"); @@ -362,11 +566,11 @@ async fn handle_peer_stream( } } Ok(None) => { - log::trace!("{remote_addr:?} peer stream closed"); + log::trace!("{:?} peer stream closed", remote_addr); break; } Err(e) => { - log::error!("{remote_addr:?} peer stream error: {e}"); + log::error!("{:?} peer stream error: {e}", remote_addr); break; } } @@ -374,3 +578,206 @@ async fn handle_peer_stream( Ok(()) } + +async fn run_peer_discovery( + tx_notify_ui: UnboundedSender, + peer_game_db: Arc>, +) { + log::info!("Starting peer discovery task"); + + loop { + match discover_service(LANSPREAD_SERVICE_TYPE) { + Ok(peer_addr) => { + log::info!("Discovered peer at: {peer_addr}"); + + // Add peer to database + let is_new_peer = { + let mut db = peer_game_db.write().await; + let peer_addresses = db.get_peer_addresses(); + if !peer_addresses.contains(&peer_addr) { + db.add_peer(peer_addr); + true + } else { + false + } + }; + + if is_new_peer { + // Notify UI about new peer + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { + log::error!("Failed to send PeerDiscovered event: {e}"); + } + + // Request games from this peer + let tx_notify_ui_clone = tx_notify_ui.clone(); + let peer_game_db_clone = peer_game_db.clone(); + tokio::spawn(async move { + if let Err(e) = request_games_from_peer( + peer_addr, + tx_notify_ui_clone, + peer_game_db_clone, + ) + .await + { + log::error!("Failed to request games from peer {peer_addr}: {e}"); + } + }); + } + } + Err(e) => { + log::debug!("Peer discovery error: {e}"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + // Wait before next discovery cycle + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +async fn request_games_from_peer( + peer_addr: SocketAddr, + tx_notify_ui: UnboundedSender, + peer_game_db: Arc>, +) -> eyre::Result<()> { + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; + + let conn = Connect::new(peer_addr).with_server_name("localhost"); + let mut conn = client.connect(conn).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, mut tx) = stream.split(); + + // Send ListGames request + tx.send(Request::ListGames.encode()).await?; + let _ = tx.close().await; + + // Receive response + let mut data = BytesMut::new(); + while let Ok(Some(bytes)) = rx.receive().await { + data.extend_from_slice(&bytes); + } + + let response = Response::decode(data.freeze()); + match response { + Response::ListGames(games) => { + log::info!("Received {} games from peer {peer_addr}", games.len()); + + // Update peer games in database + { + let mut db = peer_game_db.write().await; + db.update_peer_games(peer_addr, games.clone()); + } + + // Notify UI about updated games + if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(games)) { + log::error!("Failed to send ListGames event: {e}"); + } + } + _ => { + log::warn!("Unexpected response from peer {peer_addr}: {response:?}"); + } + } + + Ok(()) +} + +async fn run_ping_service( + tx_notify_ui: UnboundedSender, + peer_game_db: Arc>, +) { + log::info!("Starting ping service (10s interval)"); + + let mut interval = tokio::time::interval(Duration::from_secs(10)); + + loop { + interval.tick().await; + + let peer_addresses = { peer_game_db.read().await.get_peer_addresses() }; + + for peer_addr in peer_addresses { + let tx_notify_ui_clone = tx_notify_ui.clone(); + let peer_game_db_clone = peer_game_db.clone(); + + tokio::spawn(async move { + match ping_peer(peer_addr).await { + Ok(is_alive) => { + if is_alive { + // Update last seen time + peer_game_db_clone + .write() + .await + .update_last_seen(&peer_addr); + } else { + log::warn!("Peer {peer_addr} failed ping check"); + + // Remove stale peer + let removed_peer = + peer_game_db_clone.write().await.remove_peer(&peer_addr); + if removed_peer.is_some() { + log::info!("Removed stale peer: {peer_addr}"); + if let Err(e) = + tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) + { + log::error!("Failed to send PeerLost event: {e}"); + } + } + } + } + Err(e) => { + log::error!("Failed to ping peer {peer_addr}: {e}"); + + // Remove peer on error + let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr); + if removed_peer.is_some() { + log::info!("Removed peer due to ping error: {peer_addr}"); + if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) + { + log::error!("Failed to send PeerLost event: {e}"); + } + } + } + } + }); + } + + // Also clean up stale peers + let stale_peers = { + peer_game_db + .read() + .await + .get_stale_peers(Duration::from_secs(30)) + }; + for stale_addr in stale_peers { + let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr); + if removed_peer.is_some() { + log::info!("Removed stale peer: {stale_addr}"); + if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) { + log::error!("Failed to send PeerLost event: {e}"); + } + } + } + } +} + +async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; + + let conn = Connect::new(peer_addr).with_server_name("localhost"); + let mut conn = client.connect(conn).await?; + + initial_peer_alive_check(&mut conn).await; + Ok(true) +} diff --git a/crates/lanspread-peer/src/main.rs b/crates/lanspread-peer/src/main.rs index 2c76244..c4a322f 100644 --- a/crates/lanspread-peer/src/main.rs +++ b/crates/lanspread-peer/src/main.rs @@ -152,6 +152,12 @@ async fn main() -> eyre::Result<()> { PeerEvent::PeerDisconnected(addr) => { tracing::info!("Peer disconnected: {}", addr); } + PeerEvent::PeerDiscovered(addr) => { + tracing::info!("Peer discovered: {}", addr); + } + PeerEvent::PeerLost(addr) => { + tracing::info!("Peer lost: {}", addr); + } } } });