From d831179783b5c212ac71c3c4fabaab9f4b980fc2 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Tue, 11 Nov 2025 21:20:03 +0100 Subject: [PATCH] wip --- crates/lanspread-peer/src/lib.rs | 409 ++++++++++++++++++++++++++--- crates/lanspread-peer/src/main.rs | 14 +- crates/lanspread-peer/src/peer.rs | 6 +- crates/lanspread-server/src/req.rs | 2 +- 4 files changed, 385 insertions(+), 46 deletions(-) diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 04e718b..87dec9e 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -10,14 +10,16 @@ use std::{ time::{Duration, Instant}, }; -use crate::peer::{send_game_file_chunk, send_game_file_data}; use bytes::BytesMut; -use gethostname::gethostname; use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; use lanspread_proto::{Message, Request, Response}; use s2n_quic::{ - Client as QuicClient, Connection, Server, client::Connect, provider::limits::Limits, + Client as QuicClient, + Connection, + Server, + client::Connect, + provider::limits::Limits, stream::BidirectionalStream, }; use tokio::{ @@ -30,6 +32,8 @@ use tokio::{ }; use uuid::Uuid; +use crate::peer::{send_game_file_chunk, send_game_file_data}; + static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem")); @@ -68,7 +72,14 @@ pub struct PeerGameDB { peers: HashMap, } +impl Default for PeerGameDB { + fn default() -> Self { + Self::new() + } +} + impl PeerGameDB { + #[must_use] pub fn new() -> Self { Self { peers: HashMap::new(), @@ -120,6 +131,7 @@ impl PeerGameDB { } } + #[must_use] pub fn get_all_games(&self) -> Vec { let mut aggregated: HashMap = HashMap::new(); for peer in self.peers.values() { @@ -127,14 +139,14 @@ impl PeerGameDB { aggregated .entry(game.id.clone()) .and_modify(|existing| { - if let (Some(ref new_version), Some(ref current)) = + if let (Some(new_version), Some(current)) = (&game.eti_game_version, &existing.eti_game_version) { if new_version > current { existing.eti_game_version = Some(new_version.clone()); } } else if existing.eti_game_version.is_none() { - existing.eti_game_version = game.eti_game_version.clone(); + existing.eti_game_version.clone_from(&game.eti_game_version); } }) .or_insert_with(|| game.clone()); @@ -146,18 +158,19 @@ impl PeerGameDB { games } + #[must_use] pub fn get_latest_version_for_game(&self, game_id: &str) -> Option { let mut latest_version: Option = None; for peer in self.peers.values() { - if let Some(game) = peer.games.get(game_id) { - if let Some(ref version) = game.eti_game_version { - match &latest_version { - None => latest_version = Some(version.clone()), - Some(current_latest) => { - if version > current_latest { - latest_version = Some(version.clone()); - } + if let Some(game) = peer.games.get(game_id) + && let Some(ref version) = game.eti_game_version + { + match &latest_version { + None => latest_version = Some(version.clone()), + Some(current_latest) => { + if version > current_latest { + latest_version = Some(version.clone()); } } } @@ -167,10 +180,12 @@ impl PeerGameDB { latest_version } + #[must_use] pub fn get_peer_addresses(&self) -> Vec { self.peers.keys().copied().collect() } + #[must_use] pub fn peers_with_game(&self, game_id: &str) -> Vec { self.peers .iter() @@ -179,6 +194,33 @@ impl PeerGameDB { .collect() } + #[must_use] + pub fn peers_with_latest_version(&self, game_id: &str) -> Vec { + let latest_version = self.get_latest_version_for_game(game_id); + + if let Some(ref latest) = latest_version { + self.peers + .iter() + .filter(|(_, peer)| { + if let Some(game) = peer.games.get(game_id) { + if let Some(ref version) = game.eti_game_version { + version == latest + } else { + false + } + } else { + false + } + }) + .map(|(addr, _)| *addr) + .collect() + } else { + // If no version info is available, fall back to all peers with the game + self.peers_with_game(game_id) + } + } + + #[must_use] pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec)> { self.peers .iter() @@ -186,6 +228,7 @@ impl PeerGameDB { .collect() } + #[must_use] pub fn aggregated_game_files(&self, game_id: &str) -> Vec { let mut seen: HashMap = HashMap::new(); for (_, files) in self.game_files_for(game_id) { @@ -196,6 +239,7 @@ impl PeerGameDB { seen.into_values().collect() } + #[must_use] pub fn get_stale_peers(&self, timeout: Duration) -> Vec { self.peers .iter() @@ -253,12 +297,14 @@ async fn initial_peer_alive_check(conn: &mut Connection) -> bool { } const CHUNK_SIZE: u64 = 512 * 1024; +const MAX_RETRY_COUNT: usize = 3; #[derive(Debug, Clone)] struct DownloadChunk { relative_path: String, offset: u64, length: u64, + retry_count: usize, } #[derive(Debug, Default)] @@ -267,6 +313,13 @@ struct PeerDownloadPlan { whole_files: Vec, } +#[derive(Debug)] +struct ChunkDownloadResult { + chunk: DownloadChunk, + result: eyre::Result<()>, + peer_addr: SocketAddr, +} + async fn prepare_game_storage( games_folder: &Path, file_descs: &[GameFileDescription], @@ -280,8 +333,9 @@ async fn prepare_game_storage( tokio::fs::create_dir_all(parent).await?; } let file_size = desc.file_size().unwrap_or(0); - let mut file = OpenOptions::new() + let file = OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(&path) .await?; @@ -311,6 +365,7 @@ fn build_peer_plans( relative_path: desc.relative_path.clone(), offset: 0, length: 0, + retry_count: 0, }); continue; } @@ -324,6 +379,7 @@ fn build_peer_plans( relative_path: desc.relative_path.clone(), offset, length, + retry_count: 0, }); offset += length; } @@ -380,8 +436,12 @@ async fn download_chunk( file.seek(std::io::SeekFrom::Start(chunk.offset)).await?; let mut remaining = chunk.length; + let mut received_bytes = 0u64; + while let Some(bytes) = rx.receive().await? { file.write_all(&bytes).await?; + received_bytes += bytes.len() as u64; + if remaining == 0 { continue; } @@ -391,7 +451,46 @@ async fn download_chunk( } } + // Verify we received the expected amount of data + if chunk.length > 0 && received_bytes != chunk.length { + eyre::bail!( + "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}", + chunk.length, + received_bytes, + chunk.relative_path, + chunk.offset + ); + } + file.flush().await?; + + // Verify file integrity by checking the file size + verify_chunk_integrity(&path, chunk.offset, chunk.length).await?; + + Ok(()) +} + +async fn verify_chunk_integrity( + file_path: &Path, + offset: u64, + expected_length: u64, +) -> eyre::Result<()> { + if expected_length == 0 { + return Ok(()); // Skip verification for whole files or zero-length chunks + } + + let metadata = tokio::fs::metadata(file_path).await?; + let file_size = metadata.len(); + + if file_size < offset + expected_length { + eyre::bail!( + "File integrity check failed: file size {} is less than expected {} (offset: {})", + file_size, + offset + expected_length, + offset + ); + } + Ok(()) } @@ -428,9 +527,9 @@ async fn download_from_peer( game_id: &str, plan: PeerDownloadPlan, games_folder: PathBuf, -) -> eyre::Result<()> { +) -> eyre::Result> { if plan.chunks.is_empty() && plan.whole_files.is_empty() { - return Ok(()); + return Ok(Vec::new()); } let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; @@ -446,16 +545,36 @@ async fn download_from_peer( conn.keep_alive(true)?; let base_dir = games_folder; + let mut results = Vec::new(); + // Download chunks with error handling for chunk in &plan.chunks { - download_chunk(&mut conn, &base_dir, game_id, chunk).await?; + let result = download_chunk(&mut conn, &base_dir, game_id, chunk).await; + results.push(ChunkDownloadResult { + chunk: chunk.clone(), + result, + peer_addr, + }); } + // Download whole files for desc in &plan.whole_files { - download_whole_file(&mut conn, &base_dir, desc).await?; + let chunk = DownloadChunk { + relative_path: desc.relative_path.clone(), + offset: 0, + length: 0, // Indicates whole file + retry_count: 0, + }; + + let result = download_whole_file(&mut conn, &base_dir, desc).await; + results.push(ChunkDownloadResult { + chunk, + result, + peer_addr, + }); } - Ok(()) + Ok(results) } async fn download_game_files( @@ -487,16 +606,54 @@ async fn download_game_files( })); } + let mut failed_chunks: Vec = Vec::new(); let mut last_err: Option = None; + for handle in tasks { match handle.await { - Ok(Ok(())) => {} + Ok(Ok(results)) => { + for chunk_result in results { + if let Err(e) = chunk_result.result { + log::warn!( + "Failed to download chunk from {}: {e}", + chunk_result.peer_addr + ); + if chunk_result.chunk.retry_count < MAX_RETRY_COUNT { + let mut retry_chunk = chunk_result.chunk; + retry_chunk.retry_count += 1; + failed_chunks.push(retry_chunk); + } else { + last_err = Some(eyre::eyre!( + "Max retries exceeded for chunk: {}", + chunk_result.chunk.relative_path + )); + } + } + } + } Ok(Err(e)) => last_err = Some(e), Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), } } + // Retry failed chunks if any + if !failed_chunks.is_empty() && !peers.is_empty() { + log::info!("Retrying {} failed chunks", failed_chunks.len()); + + let retry_results = retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id).await; + + for chunk_result in retry_results { + if let Err(e) = chunk_result.result { + log::error!("Retry failed for chunk: {e}"); + last_err = Some(e); + } + } + } + if let Some(err) = last_err { + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; return Err(err); } @@ -507,6 +664,101 @@ async fn download_game_files( Ok(()) } +async fn retry_failed_chunks( + failed_chunks: Vec, + peers: &[SocketAddr], + base_dir: &Path, + game_id: &str, +) -> Vec { + let mut results = Vec::new(); + + // Redistribute failed chunks among available peers + let _retry_plans = build_peer_plans(peers, &[]); + for (i, chunk) in failed_chunks.into_iter().enumerate() { + let peer_addr = peers[i % peers.len()]; + let plan = PeerDownloadPlan { + chunks: vec![chunk], + whole_files: Vec::new(), + }; + + match download_from_peer(peer_addr, game_id, plan, base_dir.to_path_buf()).await { + Ok(chunk_results) => results.extend(chunk_results), + Err(e) => { + log::error!("Failed to retry chunk: {e}"); + // Add empty failure result + results.push(ChunkDownloadResult { + chunk: DownloadChunk { + relative_path: "unknown".to_string(), + offset: 0, + length: 0, + retry_count: MAX_RETRY_COUNT, + }, + result: Err(e), + peer_addr, + }); + } + } + } + + results +} + +async fn load_local_game_db(game_dir: &str) -> eyre::Result { + let game_path = PathBuf::from(game_dir); + + // Scan game directory for game folders + let mut games = Vec::new(); + let mut entries = tokio::fs::read_dir(&game_path).await?; + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() + && let Some(game_id) = path.file_name().and_then(|n| n.to_str()) + { + // Check if this game has a version.ini file + if let Ok(version) = lanspread_db::db::read_version_from_ini(&path) { + let size = calculate_directory_size(&path).await?; + let game = Game { + id: game_id.to_string(), + name: game_id.to_string(), // Use folder name as game name for now + description: String::new(), + release_year: String::new(), + publisher: String::new(), + max_players: 1, + version: "1.0".to_string(), + genre: String::new(), + size, + thumbnail: None, + installed: true, + eti_game_version: version.clone(), + local_version: version, + }; + games.push(game); + } + } + } + + Ok(GameDB::from(games)) +} + +async fn calculate_directory_size(dir: &Path) -> eyre::Result { + let mut total_size = 0u64; + let mut entries = tokio::fs::read_dir(dir).await?; + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + let metadata = tokio::fs::metadata(&path).await?; + + if metadata.is_dir() { + total_size += Box::pin(calculate_directory_size(&path)).await?; + } else { + total_size += metadata.len(); + } + } + + Ok(total_size) +} + struct Ctx { game_dir: Arc>>, local_game_db: Arc>>, @@ -632,9 +884,9 @@ pub async fn run_peer( } let games_folder = games_folder.expect("checked above"); - let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; + let peers = { ctx.peer_game_db.read().await.peers_with_latest_version(&id) }; if peers.is_empty() { - log::error!("No peers available to download game {id}"); + log::error!("No peers with latest version available to download game {id}"); continue; } @@ -679,7 +931,21 @@ pub async fn run_peer( PeerCommand::SetGameDir(game_dir) => { *ctx.game_dir.write().await = Some(game_dir.clone()); log::info!("Game directory set to: {game_dir}"); - // TODO: Load local game database when game directory is set + + // Load local game database when game directory is set + let game_dir = game_dir.clone(); + let local_game_db = ctx.local_game_db.clone(); + tokio::spawn(async move { + match load_local_game_db(&game_dir).await { + Ok(db) => { + *local_game_db.write().await = Some(db); + log::info!("Local game database loaded successfully"); + } + Err(e) => { + log::error!("Failed to load local game database: {e}"); + } + } + }); } PeerCommand::ConnectToPeer(peer_addr) => { log::info!("Connecting to peer: {peer_addr}"); @@ -740,7 +1006,7 @@ async fn run_server_component( tokio::time::sleep(Duration::from_secs(1)).await; } _ => { - log::trace!("mDNS event: {:?}", event); + log::trace!("mDNS event: {event:?}"); } } } @@ -752,7 +1018,7 @@ async fn run_server_component( tokio::spawn(async move { if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await { - log::error!("Peer connection error: {}", e); + log::error!("Peer connection error: {e}"); } }); } @@ -798,7 +1064,7 @@ async fn handle_peer_stream( ) -> eyre::Result<()> { let (mut rx, mut tx) = stream.split(); - log::trace!("{:?} peer stream opened", remote_addr); + log::trace!("{remote_addr:?} peer stream opened"); // handle streams loop { @@ -811,7 +1077,7 @@ async fn handle_peer_stream( ); let request = Request::decode(data); - log::debug!("{:?} msg: {request:?}", remote_addr); + log::debug!("{remote_addr:?} msg: {request:?}"); match request { Request::Ping => { @@ -835,13 +1101,23 @@ async fn handle_peer_stream( } Request::GetGame { id } => { log::info!("Received GetGame request for {id} from peer"); - // TODO: Handle game request using local game DB - let response = if let Some(ref db) = *ctx.local_game_db.read().await { - if db.get_game_by_id(&id).is_some() { - // TODO: Return actual game file descriptions - Response::GetGame { - id, - file_descriptions: Vec::new(), + let response = if let Some(ref game_dir) = *ctx.game_dir.read().await { + if let Some(ref db) = *ctx.local_game_db.read().await { + if db.get_game_by_id(&id).is_some() { + match get_game_file_descriptions(&id, game_dir).await { + Ok(file_descriptions) => Response::GetGame { + id, + file_descriptions, + }, + Err(e) => { + log::error!( + "Failed to get game file descriptions for {id}: {e}" + ); + Response::GameNotFound(id) + } + } + } else { + Response::GameNotFound(id) } } else { Response::GameNotFound(id) @@ -918,11 +1194,11 @@ async fn handle_peer_stream( } } Ok(None) => { - log::trace!("{:?} peer stream closed", remote_addr); + log::trace!("{remote_addr:?} peer stream closed"); break; } Err(e) => { - log::error!("{:?} peer stream error: {e}", remote_addr); + log::error!("{remote_addr:?} peer stream error: {e}"); break; } } @@ -946,11 +1222,11 @@ async fn run_peer_discovery( let is_new_peer = { let mut db = peer_game_db.write().await; let peer_addresses = db.get_peer_addresses(); - if !peer_addresses.contains(&peer_addr) { + if peer_addresses.contains(&peer_addr) { + false + } else { db.add_peer(peer_addr); true - } else { - false } }; @@ -1189,3 +1465,58 @@ async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { let is_alive = initial_peer_alive_check(&mut conn).await; Ok(is_alive) } + +async fn get_game_file_descriptions( + game_id: &str, + game_dir: &str, +) -> eyre::Result> { + let base_dir = PathBuf::from(game_dir); + let game_path = base_dir.join(game_id); + + if !game_path.exists() { + eyre::bail!("Game directory does not exist: {}", game_path.display()); + } + + let mut file_descriptions = Vec::new(); + + for entry in walkdir::WalkDir::new(&game_path) + .into_iter() + .filter_map(std::result::Result::ok) + { + let relative_path = match entry.path().strip_prefix(&base_dir) { + Ok(path) => path.to_string_lossy().to_string(), + Err(e) => { + log::error!( + "Failed to get relative path for {}: {}", + entry.path().display(), + e + ); + continue; + } + }; + + let is_dir = entry.file_type().is_dir(); + let size = if is_dir { + None + } else { + match tokio::fs::metadata(entry.path()).await { + Ok(metadata) => Some(metadata.len()), + Err(e) => { + log::error!("Failed to read metadata for {relative_path}: {e}"); + None + } + } + }; + + let file_desc = GameFileDescription { + game_id: game_id.to_string(), + relative_path, + is_dir, + size, + }; + + file_descriptions.push(file_desc); + } + + Ok(file_descriptions) +} diff --git a/crates/lanspread-peer/src/main.rs b/crates/lanspread-peer/src/main.rs index c4a322f..12c540b 100644 --- a/crates/lanspread-peer/src/main.rs +++ b/crates/lanspread-peer/src/main.rs @@ -14,8 +14,7 @@ use gethostname::gethostname; use lanspread_compat::eti; use lanspread_db::db::{Game, GameDB}; use lanspread_mdns::{DaemonEvent, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser}; -use lanspread_peer::{PeerEvent, run_peer}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use lanspread_peer::{PeerCommand, PeerEvent, run_peer}; use tracing_subscriber::EnvFilter; use uuid::Uuid; @@ -110,7 +109,7 @@ async fn main() -> eyre::Result<()> { // spawn mDNS listener task spawn_mdns_task(server_addr)?; - let game_db = prepare_game_db(&cli).await?; + let _game_db = prepare_game_db(&cli).await?; tracing::info!("Peer listening on {server_addr}"); @@ -162,7 +161,14 @@ async fn main() -> eyre::Result<()> { } }); - // TODO: Add CLI interaction or other peer discovery logic here + // Set the game directory from CLI args + if let Err(e) = tx_control.send(PeerCommand::SetGameDir( + cli.game_dir.to_string_lossy().to_string(), + )) { + tracing::error!("Failed to send SetGameDir command: {e}"); + } + + // TODO: Add additional CLI interaction or other peer discovery logic here // Wait for tasks let (peer_result, _) = tokio::join!(peer_task, event_handler); diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index 4f3ad36..c46833b 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -1,4 +1,5 @@ use std::{ + convert::TryInto, path::{Path, PathBuf}, sync::Arc, }; @@ -150,7 +151,7 @@ async fn stream_file_bytes( let remote_addr = maybe_addr!(tx.connection().remote_addr()); let game_file = base_dir.join(relative_path); tracing::debug!( - "{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {:?}", + "{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {length:?}", game_file ); @@ -166,7 +167,8 @@ async fn stream_file_bytes( let mut buf = vec![0u8; 64 * 1024]; while remaining > 0 { - let read_len = std::cmp::min(remaining, buf.len() as u64) as usize; + let read_len = std::cmp::min(remaining, buf.len() as u64); + let read_len: usize = read_len.try_into().unwrap_or(usize::MAX); if read_len == 0 { break; } diff --git a/crates/lanspread-server/src/req.rs b/crates/lanspread-server/src/req.rs index 7434f29..4b0126a 100644 --- a/crates/lanspread-server/src/req.rs +++ b/crates/lanspread-server/src/req.rs @@ -150,7 +150,7 @@ async fn stream_file_bytes( let remote_addr = maybe_addr!(tx.connection().remote_addr()); let game_file = base_dir.join(relative_path); tracing::debug!( - "{remote_addr} streaming file bytes: {:?}, offset: {offset}, length: {:?}", + "{remote_addr} streaming file bytes: {:?}, offset: {offset}, length: {length:?}", game_file );