#![allow(clippy::missing_errors_doc)] mod path_validation; mod peer; use std::{ collections::{HashMap, VecDeque}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant}, }; use bytes::BytesMut; use if_addrs::{IfAddr, Interface, get_if_addrs}; use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; use lanspread_proto::{Message, Request, Response}; use s2n_quic::{ Client as QuicClient, Connection, Server, client::Connect, provider::limits::Limits, stream::BidirectionalStream, }; use tokio::{ fs::OpenOptions, io::{AsyncSeekExt, AsyncWriteExt}, sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, }, }; use uuid::Uuid; use crate::{ path_validation::validate_game_file_path, peer::{send_game_file_chunk, send_game_file_data}, }; /// Custom error types for peer operations #[derive(Debug)] pub enum PeerError { FileSizeDetermination { path: String, source: std::io::Error, }, GameDirNotSet, Other(eyre::Report), } impl std::fmt::Display for PeerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { PeerError::FileSizeDetermination { path, source } => { write!(f, "Failed to determine file size for {path}: {source}") } PeerError::GameDirNotSet => write!(f, "Game directory not set"), PeerError::Other(err) => write!(f, "General error: {err}"), } } } impl std::error::Error for PeerError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { PeerError::FileSizeDetermination { source, .. 
} => Some(source),
            // `root_cause()` yields the innermost error of the report's chain.
            PeerError::Other(err) => Some(err.root_cause()),
            PeerError::GameDirNotSet => None,
        }
    }
}

impl From<eyre::Report> for PeerError {
    /// Wraps any `eyre::Report` in [`PeerError::Other`].
    fn from(err: eyre::Report) -> Self {
        PeerError::Other(err)
    }
}

/// Initialize and start the peer system
/// This function replaces the main.rs entry point and allows the peer to be started from other crates
///
/// Spawns [`run_peer`] as a background task, queues a [`PeerCommand::SetGameDir`] for
/// `game_dir`, and returns the sender through which further commands can be issued.
/// Events produced by the peer are delivered on `tx_notify_ui`.
///
/// # Errors
///
/// Returns an error if the initial `SetGameDir` command cannot be queued.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn start_peer(
    game_dir: String,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<UnboundedSender<PeerCommand>> {
    log::info!("Starting peer system with game directory: {game_dir}");

    let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel();

    // Start the peer in a background task
    tokio::spawn(async move {
        if let Err(e) = run_peer(rx_control, tx_notify_ui).await {
            log::error!("Peer system failed: {e}");
        }
    });

    // Set the game directory, then hand the very same sender to the caller
    // (no extra clone is needed just to return it).
    tx_control.send(PeerCommand::SetGameDir(game_dir))?;

    Ok(tx_control)
}

/// TLS certificate (PEM), embedded at compile time.
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
/// TLS private key (PEM), embedded at compile time.
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));

/// Events the peer system reports over the UI notification channel.
#[derive(Debug)]
pub enum PeerEvent {
    ListGames(Vec<Game>),
    GotGameFiles {
        id: String,
        file_descriptions: Vec<GameFileDescription>,
    },
    DownloadGameFilesBegin {
        id: String,
    },
    DownloadGameFilesFinished {
        id: String,
    },
    DownloadGameFilesFailed {
        id: String,
    },
    NoPeersHaveGame {
        id: String,
    },
    PeerConnected(SocketAddr),
    PeerDisconnected(SocketAddr),
    PeerDiscovered(SocketAddr),
    PeerLost(SocketAddr),
}

/// Everything recorded about one remote peer.
#[derive(Clone, Debug)]
pub struct PeerInfo {
    pub addr: SocketAddr,
    /// Refreshed whenever the peer's data is updated.
    pub last_seen: Instant,
    /// Games offered by the peer, keyed by game id.
    pub games: HashMap<String, Game>,
    /// File listings received from the peer, keyed by game id.
    pub files: HashMap<String, Vec<GameFileDescription>>,
}

/// In-memory database of known peers, keyed by socket address.
#[derive(Debug)]
pub struct PeerGameDB {
    peers: HashMap<SocketAddr, PeerInfo>,
}

impl Default for PeerGameDB {
    fn default() -> Self {
        Self::new()
    }
}

impl PeerGameDB {
    /// Creates an empty database.
    #[must_use]
    pub fn new() -> Self {
        Self {
            peers: HashMap::new(),
        }
    }

    /// Registers `addr` with empty game and file tables.
    ///
    /// An existing record for the same address is replaced.
    pub fn add_peer(&mut self, addr: SocketAddr) {
        let peer_info = PeerInfo {
            addr,
            last_seen: Instant::now(),
            games: HashMap::new(),
            files: HashMap::new(),
        };
        self.peers.insert(addr, peer_info);
log::info!("Added peer: {addr}");
    }

    /// Removes `addr` from the database, returning its record if it was present.
    pub fn remove_peer(&mut self, addr: &SocketAddr) -> Option<PeerInfo> {
        self.peers.remove(addr)
    }

    /// Replaces the game table of a known peer and refreshes its `last_seen`.
    ///
    /// Games are keyed by their `id`. Unknown peers are ignored.
    pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec<Game>) {
        let Some(peer) = self.peers.get_mut(&addr) else {
            return;
        };
        peer.games = games
            .into_iter()
            .map(|game| (game.id.clone(), game))
            .collect();
        peer.last_seen = Instant::now();
        log::info!("Updated games for peer: {addr}");
    }

    /// Stores the file listing a known peer reported for `game_id` and refreshes its
    /// `last_seen`. Unknown peers are ignored.
    pub fn update_peer_game_files(
        &mut self,
        addr: SocketAddr,
        game_id: &str,
        files: Vec<GameFileDescription>,
    ) {
        if let Some(peer) = self.peers.get_mut(&addr) {
            peer.files.insert(game_id.to_string(), files);
            peer.last_seen = Instant::now();
        }
    }

    /// Marks a known peer as seen right now. Unknown peers are ignored.
    pub fn update_last_seen(&mut self, addr: &SocketAddr) {
        if let Some(peer) = self.peers.get_mut(addr) {
            peer.last_seen = Instant::now();
        }
    }

    /// Merges the game tables of all peers into one list sorted by game name.
    ///
    /// Each game appears once; its `peer_count` is the number of peers offering it and its
    /// `eti_game_version` is the greatest version any peer reported.
    #[must_use]
    pub fn get_all_games(&self) -> Vec<Game> {
        // Count peers per game. Counting by `game.id` - the same key the lookups below
        // use - guarantees that `peer_counts[&game.id]` can never miss and panic.
        let mut peer_counts: HashMap<String, _> = HashMap::new();
        for game in self.peers.values().flat_map(|peer| peer.games.values()) {
            *peer_counts.entry(game.id.clone()).or_insert(0) += 1;
        }

        // Aggregate games with peer counts
        let mut aggregated: HashMap<String, Game> = HashMap::new();
        for game in self.peers.values().flat_map(|peer| peer.games.values()) {
            let count = peer_counts[&game.id];
            aggregated
                .entry(game.id.clone())
                .and_modify(|existing| {
                    if let (Some(new_version), Some(current)) =
                        (&game.eti_game_version, &existing.eti_game_version)
                    {
                        if new_version > current {
                            existing.eti_game_version = Some(new_version.clone());
                        }
                    } else if existing.eti_game_version.is_none() {
                        existing.eti_game_version.clone_from(&game.eti_game_version);
                    }
                    // Update peer count
                    existing.peer_count = count;
                })
                .or_insert_with(|| {
                    let mut game_clone = game.clone();
                    game_clone.peer_count = count;
                    game_clone
                });
        }

        let mut games: Vec<Game> = aggregated.into_values().collect();
        games.sort_by(|a, b| a.name.cmp(&b.name));
        games
    }

    /// Returns the greatest `eti_game_version` any peer reports for `game_id`, if any.
    #[must_use]
    pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
        let mut latest_version: Option<String> = None;

        for peer in
self.peers.values() {
            if let Some(game) = peer.games.get(game_id)
                && let Some(ref version) = game.eti_game_version
            {
                match &latest_version {
                    None => latest_version = Some(version.clone()),
                    Some(current_latest) => {
                        if version > current_latest {
                            latest_version = Some(version.clone());
                        }
                    }
                }
            }
        }

        latest_version
    }

    /// Returns the addresses of all known peers.
    #[must_use]
    pub fn get_peer_addresses(&self) -> Vec<SocketAddr> {
        self.peers.keys().copied().collect()
    }

    /// Returns the addresses of all peers whose game table contains `game_id`.
    #[must_use]
    pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> {
        self.peers
            .iter()
            .filter_map(|(addr, peer)| peer.games.contains_key(game_id).then_some(*addr))
            .collect()
    }

    /// Returns the peers that offer `game_id` at the latest known version.
    ///
    /// If no version info is available, falls back to all peers with the game.
    #[must_use]
    pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
        let Some(latest) = self.get_latest_version_for_game(game_id) else {
            return self.peers_with_game(game_id);
        };

        self.peers
            .iter()
            .filter(|(_, peer)| {
                // A peer qualifies only if it has the game AND reports exactly `latest`.
                peer.games
                    .get(game_id)
                    .and_then(|game| game.eti_game_version.as_ref())
                    == Some(&latest)
            })
            .map(|(addr, _)| *addr)
            .collect()
    }

    /// Returns, for every peer that sent a file listing for `game_id`, the peer's address
    /// together with a copy of that listing.
    #[must_use]
    pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
        self.peers
            .iter()
            .filter_map(|(addr, peer)| peer.files.get(game_id).map(|files| (*addr, files.clone())))
            .collect()
    }

    /// Returns one file description per distinct `relative_path` across all peers'
    /// listings for `game_id`. When several peers list the same path, the first one
    /// encountered wins.
    #[must_use]
    pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> {
        let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
        // Walk the listings by reference and clone only the entries that are kept,
        // instead of cloning every peer's full listing up front.
        let listed = self
            .peers
            .values()
            .filter_map(|peer| peer.files.get(game_id))
            .flatten();
        for file in listed {
            seen.entry(file.relative_path.clone())
                .or_insert_with(|| file.clone());
        }
        seen.into_values().collect()
    }

    /// Validates file sizes across all peers and returns only the files with majority consensus
    /// Returns a tuple of (`validated_files`, `peer_whitelist`) where `peer_whitelist` contains
    /// only peers that have the majority-approved file sizes
    pub fn validate_file_sizes_majority(
        &self,
        game_id: &str,
    ) -> eyre::Result<(Vec<GameFileDescription>, Vec<SocketAddr>)> {
        let game_files = self.game_files_for(game_id);

        if game_files.is_empty() {
return Ok((Vec::new(), Vec::new()));
        }

        let (file_size_map, _peer_files) = collect_file_sizes(&game_files);
        let (validated_files, peer_scores) =
            self.validate_each_file_consensus(game_id, file_size_map)?;
        let peer_whitelist = create_peer_whitelist(peer_scores);

        Ok((validated_files, peer_whitelist))
    }

    /// Validates consensus for each file and returns validated files with peer scores
    ///
    /// A peer's score is the number of files for which it was part of the consensus group.
    /// Fails as soon as one file has no determinable consensus size.
    fn validate_each_file_consensus(
        &self,
        game_id: &str,
        file_size_map: FileSizeMap,
    ) -> eyre::Result<(Vec<GameFileDescription>, HashMap<SocketAddr, usize>)> {
        let mut validated_files = Vec::new();
        let mut peer_whitelist_scores: HashMap<SocketAddr, usize> = HashMap::new();

        for (relative_path, size_map) in file_size_map {
            let total_peers: usize = size_map.values().map(Vec::len).sum();

            if total_peers == 0 {
                continue; // Skip files with no size information
            }

            let (consensus_size, consensus_peers) =
                self.determine_size_consensus(&size_map, total_peers, &relative_path)?;

            update_peer_scores(&consensus_peers, &mut peer_whitelist_scores);

            if let Some((size, peers)) = consensus_size
                && let Some(file_desc) =
                    self.create_validated_file_description(game_id, &relative_path, size, &peers)
            {
                validated_files.push(file_desc);
            }
        }

        Ok((validated_files, peer_whitelist_scores))
    }

    /// Determines the consensus size for a file based on peer reports
    ///
    /// # Panics
    ///
    /// Panics if `size_map.iter().next()` returns None when `total_peers` == 1
    #[allow(clippy::unused_self)]
    fn determine_size_consensus(
        &self,
        size_map: &HashMap<u64, Vec<SocketAddr>>,
        total_peers: usize,
        relative_path: &str,
    ) -> eyre::Result<(ConsensusResult, Vec<SocketAddr>)> {
        if total_peers == 1 {
            // Only one peer has this file - trust it
            let (&size, peers) = size_map
                .iter()
                .next()
                .expect("size_map should have at least one entry when total_peers == 1");
            return Ok((Some((size, peers.clone())), peers.clone()));
        }

        let (majority_size, _majority_count) = find_majority_size(size_map);

        if let Some(size) = majority_size {
            let majority_peers = &size_map[&size];
            let is_majority = majority_peers.len() > total_peers / 2;

            if is_majority {
                // We
// have a clear majority
                Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
            } else if total_peers == 2 {
                // Two peers with different sizes - ambiguous, fail
                eyre::bail!(
                    "File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
                    relative_path
                );
            }
            // If no majority and more than 2 peers, we fall back to plurality (largest group)
            else {
                Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
            }
        } else {
            // No clear majority and it's a tie between different sizes
            if total_peers == 2 {
                eyre::bail!(
                    "File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
                    relative_path
                );
            }
            // For more than 2 peers, we could fall back to plurality, but for now let's be strict
            eyre::bail!(
                "File size ambiguity for '{}': no clear majority among {} peers",
                relative_path,
                total_peers
            );
        }
    }

    /// Creates a validated file description from consensus data
    ///
    /// Looks only at the first peer in `peers` and returns a copy of its entry for
    /// `relative_path` whose size equals `size`; `None` if `peers` is empty, the peer or
    /// its listing for `game_id` is unknown, or no such entry exists.
    fn create_validated_file_description(
        &self,
        game_id: &str,
        relative_path: &str,
        size: u64,
        peers: &[SocketAddr],
    ) -> Option<GameFileDescription> {
        let first_peer = peers.first()?;
        let files = self.peers.get(first_peer)?.files.get(game_id)?;
        files
            .iter()
            .find(|f| f.relative_path == relative_path && f.size == size)
            .cloned()
    }

    /// Returns the peers whose `last_seen` is older than `timeout`.
    #[must_use]
    pub fn get_stale_peers(&self, timeout: Duration) -> Vec<SocketAddr> {
        self.peers
            .iter()
            .filter_map(|(addr, peer)| (peer.last_seen.elapsed() > timeout).then_some(*addr))
            .collect()
    }
}

/// Type alias for file size mapping: path -> size -> peers
type FileSizeMap = HashMap<String, HashMap<u64, Vec<SocketAddr>>>;

/// Type alias for peer file mapping: peer -> path -> size
type PeerFileMap = HashMap<SocketAddr, HashMap<String, u64>>;

/// Type alias for consensus result: (size, peers) or None
type ConsensusResult = Option<(u64, Vec<SocketAddr>)>;

/// Collects file sizes from all peers and organizes them by path and size
fn collect_file_sizes(
    game_files: &[(SocketAddr, Vec<GameFileDescription>)],
) -> (FileSizeMap, PeerFileMap) {
    let mut file_size_map: FileSizeMap =
HashMap::new();
    let mut peer_files: PeerFileMap = HashMap::new();

    for (peer_addr, files) in game_files {
        let mut peer_file_sizes = HashMap::new();

        for file in files {
            // Directories carry no size to compare.
            if !file.is_dir {
                let size = file.size;
                file_size_map
                    .entry(file.relative_path.clone())
                    .or_default()
                    .entry(size)
                    .or_default()
                    .push(*peer_addr);
                peer_file_sizes.insert(file.relative_path.clone(), size);
            }
        }

        peer_files.insert(*peer_addr, peer_file_sizes);
    }

    (file_size_map, peer_files)
}

/// Finds the majority size from a map of sizes to peer lists
///
/// Returns `(Some(size), count)` when exactly one size is reported by the largest number
/// of peers, and `(None, count)` when several sizes tie for the largest group (or the map
/// is empty). `count` is the size of the largest group.
///
/// The whole map is always scanned: stopping at the first tie would make the answer
/// depend on `HashMap` iteration order and could hide a larger group that comes later.
fn find_majority_size(size_map: &HashMap<u64, Vec<SocketAddr>>) -> (Option<u64>, usize) {
    let mut best_size = None;
    let mut best_count = 0usize;
    let mut tied = false;

    for (&size, peers) in size_map {
        let count = peers.len();
        if count > best_count {
            // New strict leader; any earlier tie was between smaller groups.
            best_count = count;
            best_size = Some(size);
            tied = false;
        } else if count == best_count {
            tied = true;
        }
    }

    if tied {
        // Tie between different sizes - ambiguous
        (None, best_count)
    } else {
        (best_size, best_count)
    }
}

/// Updates peer scores based on consensus participation
///
/// Adds one point to the score of every peer in `peers`.
fn update_peer_scores(
    peers: &[SocketAddr],
    peer_whitelist_scores: &mut HashMap<SocketAddr, usize>,
) {
    for &peer in peers {
        *peer_whitelist_scores.entry(peer).or_insert(0) += 1;
    }
}

/// Creates a peer whitelist from scores, including peers with the highest scores
///
/// Returns an empty list when `peer_scores` is empty.
fn create_peer_whitelist(peer_scores: HashMap<SocketAddr, usize>) -> Vec<SocketAddr> {
    let Some(&max_score) = peer_scores.values().max() else {
        return Vec::new();
    };

    let threshold = max_score.max(1); // At least 1 file, or match the highest score

    peer_scores
        .into_iter()
        .filter(|&(_, score)| score >= threshold)
        .map(|(peer, _)| peer)
        .collect()
}

/// Commands accepted by the peer system on its control channel.
#[derive(Debug)]
pub enum PeerCommand {
    ListGames,
    GetGame(String),
    DownloadGameFiles {
        id: String,
        file_descriptions: Vec<GameFileDescription>,
    },
    SetGameDir(String),
}

/// Sends a ping over a fresh bidirectional stream and reports whether the peer answered
/// with a pong.
async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
    let stream = match conn.open_bidirectional_stream().await {
        Ok(stream) => stream,
        Err(e) => {
            log::error!("failed to open stream: {e}");
            return
false;
        }
    };

    let (mut rx, mut tx) = stream.split();

    // send ping
    if let Err(e) = tx.send(Request::Ping.encode()).await {
        log::error!("failed to send ping to peer: {e}");
        return false;
    }
    // A failure to close the send side is deliberately ignored.
    let _ = tx.close().await;

    // receive pong
    if let Ok(Some(response)) = rx.receive().await {
        let response = Response::decode(response);
        match response {
            Response::Pong => {
                log::info!("peer is alive");
                return true;
            }
            _ => {
                log::error!("peer sent invalid response to ping: {response:?}");
            }
        }
    }

    false
}

/// Maximum number of bytes requested per chunk (512 KiB).
const CHUNK_SIZE: u64 = 512 * 1024;
/// Upper bound on `DownloadChunk::retry_count` before a chunk is given up.
const MAX_RETRY_COUNT: usize = 3;

/// One byte range of one file to fetch from a peer.
#[derive(Debug, Clone)]
struct DownloadChunk {
    relative_path: String,
    offset: u64,
    /// Number of bytes to fetch; `0` is used for empty files and whole-file transfers.
    length: u64,
    retry_count: usize,
    last_peer: Option<SocketAddr>,
}

/// The work assigned to a single peer.
#[derive(Debug, Default)]
struct PeerDownloadPlan {
    chunks: Vec<DownloadChunk>,
    whole_files: Vec<GameFileDescription>,
}

/// Outcome of one chunk (or whole-file) transfer attempt against `peer_addr`.
#[derive(Debug)]
struct ChunkDownloadResult {
    chunk: DownloadChunk,
    result: eyre::Result<()>,
    peer_addr: SocketAddr,
}

/// Creates the directory tree and (truncated, pre-sized) files described by `file_descs`
/// below `games_folder`.
///
/// Every path is validated first. Failing to pre-size a file is only logged.
async fn prepare_game_storage(
    games_folder: &Path,
    file_descs: &[GameFileDescription],
) -> eyre::Result<()> {
    for desc in file_descs {
        // Validate the path to prevent directory traversal
        let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?;

        if desc.is_dir {
            tokio::fs::create_dir_all(&validated_path).await?;
            continue;
        }

        if let Some(parent) = validated_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        // Create (or truncate) the file, then pre-allocate it with the expected size
        let file = OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&validated_path)
            .await?;

        let size = desc.size;
        match file.set_len(size).await {
            Ok(()) => log::debug!(
                "Pre-allocated file {} with {} bytes",
                desc.relative_path,
                size
            ),
            // Continue without pre-allocation - the file will grow as chunks are written
            Err(e) => log::warn!(
                "Failed to pre-allocate file {} (size: {}): {}",
                desc.relative_path,
                size,
                e
            ),
        }
    }

    Ok(())
}

/// Splits every non-directory file into chunks of at most `CHUNK_SIZE` bytes and assigns
/// them to `peers` in round-robin order.
fn build_peer_plans(
    peers: &[SocketAddr],
    file_descs: &[GameFileDescription],
) -> HashMap<SocketAddr, PeerDownloadPlan> {
    let mut plans:
HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();
    if peers.is_empty() {
        return plans;
    }

    // Advances once per chunk so consecutive chunks go to consecutive peers.
    let mut peer_index = 0usize;
    for desc in file_descs.iter().filter(|d| !d.is_dir) {
        let size = desc.file_size();
        if size == 0 {
            // Empty files still get one zero-length chunk.
            let peer = peers[peer_index % peers.len()];
            peer_index += 1;
            plans.entry(peer).or_default().chunks.push(DownloadChunk {
                relative_path: desc.relative_path.clone(),
                offset: 0,
                length: 0,
                retry_count: 0,
                last_peer: Some(peer),
            });
            continue;
        }

        let mut offset = 0u64;
        while offset < size {
            // The final chunk of a file may be shorter than CHUNK_SIZE.
            let length = std::cmp::min(CHUNK_SIZE, size - offset);
            let peer = peers[peer_index % peers.len()];
            peer_index += 1;
            plans.entry(peer).or_default().chunks.push(DownloadChunk {
                relative_path: desc.relative_path.clone(),
                offset,
                length,
                retry_count: 0,
                last_peer: Some(peer),
            });
            offset += length;
        }
    }

    plans
}

/// Requests one chunk over a fresh stream on `conn` and writes the received bytes into
/// the target file below `base_dir`, starting at `chunk.offset`.
async fn download_chunk(
    conn: &mut Connection,
    base_dir: &Path,
    game_id: &str,
    chunk: &DownloadChunk,
) -> eyre::Result<()> {
    let stream = conn.open_bidirectional_stream().await?;
    let (mut rx, mut tx) = stream.split();

    let request = Request::GetGameFileChunk {
        game_id: game_id.to_string(),
        relative_path: chunk.relative_path.clone(),
        offset: chunk.offset,
        length: chunk.length,
    };

    tx.write_all(&request.encode()).await?;
    tx.close().await?;

    // Validate the path to prevent directory traversal
    let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?;
    // `truncate(false)`: other chunks of the same file must survive this open.
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false)
        .open(&validated_path)
        .await?;

    if chunk.length == 0 && chunk.offset == 0 {
        // fallback-to-whole-file path replaces any existing partial data
        file.set_len(0).await?;
    }

    file.seek(std::io::SeekFrom::Start(chunk.offset)).await?;

    let mut remaining = chunk.length;
    let mut received_bytes = 0u64;
    while let Some(bytes) = rx.receive().await?
{ file.write_all(&bytes).await?; received_bytes += bytes.len() as u64; if remaining == 0 { continue; } remaining = remaining.saturating_sub(bytes.len() as u64); if remaining == 0 { break; } } // Verify we received the expected amount of data if chunk.length > 0 && received_bytes != chunk.length { eyre::bail!( "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}", chunk.length, received_bytes, chunk.relative_path, chunk.offset ); } file.flush().await?; // Verify file integrity by checking the file size verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?; Ok(()) } async fn verify_chunk_integrity( file_path: &Path, offset: u64, expected_length: u64, ) -> eyre::Result<()> { if expected_length == 0 { return Ok(()); // Skip verification for whole files or zero-length chunks } let metadata = tokio::fs::metadata(file_path).await?; let file_size = metadata.len(); if file_size < offset + expected_length { eyre::bail!( "File integrity check failed: file size {} is less than expected {} (offset: {})", file_size, offset + expected_length, offset ); } Ok(()) } async fn download_whole_file( conn: &mut Connection, base_dir: &Path, desc: &GameFileDescription, ) -> eyre::Result<()> { let stream = conn.open_bidirectional_stream().await?; let (mut rx, mut tx) = stream.split(); tx.write_all(&Request::GetGameFileData(desc.clone()).encode()) .await?; tx.close().await?; // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?; let mut file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(&validated_path) .await?; file.seek(std::io::SeekFrom::Start(0)).await?; while let Some(bytes) = rx.receive().await? 
{
        file.write_all(&bytes).await?;
    }

    file.flush().await?;

    Ok(())
}

/// Connects to `peer_addr` and executes `plan` sequentially on that one connection.
///
/// Returns one [`ChunkDownloadResult`] per chunk and per whole file, in plan order;
/// individual transfer failures are reported inside the results rather than as `Err`.
///
/// # Errors
///
/// Fails if the QUIC client cannot be configured or started, or the connection cannot be
/// established.
async fn download_from_peer(
    peer_addr: SocketAddr,
    game_id: &str,
    plan: PeerDownloadPlan,
    games_folder: PathBuf,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
    if plan.chunks.is_empty() && plan.whole_files.is_empty() {
        return Ok(Vec::new());
    }

    let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;

    let client = QuicClient::builder()
        .with_tls(CERT_PEM)?
        .with_io("0.0.0.0:0")?
        .with_limits(limits)?
        .start()?;

    let conn = Connect::new(peer_addr).with_server_name("localhost");
    let mut conn = client.connect(conn).await?;
    conn.keep_alive(true)?;

    let base_dir = games_folder;
    // The plan is owned, so chunks can be moved into their results without cloning.
    let PeerDownloadPlan {
        chunks,
        whole_files,
    } = plan;
    let mut results = Vec::with_capacity(chunks.len() + whole_files.len());

    // Download chunks with error handling
    for chunk in chunks {
        let result = download_chunk(&mut conn, &base_dir, game_id, &chunk).await;
        results.push(ChunkDownloadResult {
            chunk,
            result,
            peer_addr,
        });
    }

    // Download whole files
    for desc in &whole_files {
        let chunk = DownloadChunk {
            relative_path: desc.relative_path.clone(),
            offset: 0,
            length: 0, // Indicates whole file
            retry_count: 0,
            last_peer: Some(peer_addr),
        };
        let result = download_whole_file(&mut conn, &base_dir, desc).await;
        results.push(ChunkDownloadResult {
            chunk,
            result,
            peer_addr,
        });
    }

    Ok(results)
}

/// Downloads all files of `game_id` into `games_folder`, spreading chunks over `peers`
/// and retrying failed chunks; progress is reported on `tx_notify_ui`.
async fn download_game_files(
    game_id: &str,
    game_file_descs: Vec<GameFileDescription>,
    games_folder: String,
    peers: Vec<SocketAddr>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
    if peers.is_empty() {
        eyre::bail!("no peers available for game {game_id}");
    }

    let base_dir = PathBuf::from(&games_folder);
    prepare_game_storage(&base_dir, &game_file_descs).await?;

    tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin {
        id: game_id.to_string(),
    })?;

    let plans = build_peer_plans(&peers, &game_file_descs);

    // One task per peer; each task works through its own plan.
    let mut tasks = Vec::new();
    for (peer_addr, plan) in plans {
        let base_dir = base_dir.clone();
        let game_id = game_id.to_string();
        tasks.push(tokio::spawn(async move {
            download_from_peer(peer_addr, &game_id, plan, base_dir).await
        }));
    }

    let mut failed_chunks:
Vec = Vec::new(); let mut last_err: Option = None; for handle in tasks { match handle.await { Ok(Ok(results)) => { for chunk_result in results { if let Err(e) = chunk_result.result { log::warn!( "Failed to download chunk from {}: {e}", chunk_result.peer_addr ); if chunk_result.chunk.retry_count < MAX_RETRY_COUNT { let mut retry_chunk = chunk_result.chunk; retry_chunk.retry_count += 1; retry_chunk.last_peer = Some(chunk_result.peer_addr); failed_chunks.push(retry_chunk); } else { last_err = Some(eyre::eyre!( "Max retries exceeded for chunk: {}", chunk_result.chunk.relative_path )); } } } } Ok(Err(e)) => last_err = Some(e), Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), } } // Retry failed chunks if any if !failed_chunks.is_empty() && !peers.is_empty() { log::info!("Retrying {} failed chunks", failed_chunks.len()); let retry_results = retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id).await; for chunk_result in retry_results { if let Err(e) = chunk_result.result { log::error!("Retry failed for chunk: {e}"); last_err = Some(e); } } } if let Some(err) = last_err { tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: game_id.to_string(), })?; return Err(err); } log::info!("all files downloaded for game: {game_id}"); tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { id: game_id.to_string(), })?; Ok(()) } fn select_retry_peer( peers: &[SocketAddr], last_peer: Option, attempt_offset: usize, ) -> Option { if peers.is_empty() { return None; } if peers.len() > 1 && let Some(last) = last_peer && let Some(pos) = peers.iter().position(|addr| *addr == last) { let next_index = (pos + 1 + attempt_offset) % peers.len(); return Some(peers[next_index]); } Some(peers[attempt_offset % peers.len()]) } fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option) -> SocketAddr { last_peer .or_else(|| peers.first().copied()) .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))) } async fn retry_failed_chunks( failed_chunks: Vec, peers: 
&[SocketAddr],
    base_dir: &Path,
    game_id: &str,
) -> Vec<ChunkDownloadResult> {
    // Holds only chunks that ultimately failed; successes produce no entry.
    let mut exhausted = Vec::new();
    let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();

    while let Some(mut chunk) = queue.pop_front() {
        if chunk.retry_count >= MAX_RETRY_COUNT {
            exhausted.push(ChunkDownloadResult {
                chunk: chunk.clone(),
                result: Err(eyre::eyre!(
                    "Retry budget exhausted for chunk: {}",
                    chunk.relative_path
                )),
                peer_addr: fallback_peer_addr(peers, chunk.last_peer),
            });
            continue;
        }

        // The rotation offset is 0 for retry counts 0 and 1, then grows by one per retry.
        let retry_offset = chunk.retry_count.saturating_sub(1);
        let Some(peer_addr) = select_retry_peer(peers, chunk.last_peer, retry_offset) else {
            exhausted.push(ChunkDownloadResult {
                chunk: chunk.clone(),
                result: Err(eyre::eyre!(
                    "No peers available to retry chunk: {}",
                    chunk.relative_path
                )),
                peer_addr: fallback_peer_addr(peers, chunk.last_peer),
            });
            continue;
        };

        let mut attempt_chunk = chunk.clone();
        attempt_chunk.last_peer = Some(peer_addr);

        // Each retry runs as a single-chunk plan against the selected peer.
        let plan = PeerDownloadPlan {
            chunks: vec![attempt_chunk.clone()],
            whole_files: Vec::new(),
        };

        match download_from_peer(peer_addr, game_id, plan, base_dir.to_path_buf()).await {
            Ok(results) => {
                for result in results {
                    match result.result {
                        Ok(()) => {}
                        Err(e) => {
                            // The transfer itself failed: requeue, or give up once the
                            // budget is spent.
                            let mut retry_chunk = result.chunk.clone();
                            retry_chunk.retry_count = chunk.retry_count + 1;
                            retry_chunk.last_peer = Some(result.peer_addr);

                            if retry_chunk.retry_count >= MAX_RETRY_COUNT {
                                let context = format!(
                                    "Retry budget exhausted for chunk: {}",
                                    result.chunk.relative_path
                                );
                                exhausted.push(ChunkDownloadResult {
                                    chunk: retry_chunk,
                                    result: Err(e.wrap_err(context)),
                                    peer_addr: result.peer_addr,
                                });
                            } else {
                                queue.push_back(retry_chunk);
                            }
                        }
                    }
                }
            }
            Err(e) => {
                // `download_from_peer` itself failed, so no per-chunk result exists.
                chunk.retry_count += 1;
                chunk.last_peer = Some(peer_addr);
                if chunk.retry_count >= MAX_RETRY_COUNT {
                    exhausted.push(ChunkDownloadResult {
                        chunk: chunk.clone(),
                        result: Err(e.wrap_err(format!(
                            "Retry budget exhausted for chunk after connection failure: {}",
                            chunk.relative_path
                        ))),
                        peer_addr: fallback_peer_addr(peers, chunk.last_peer),
                    });
                } else {
queue.push_back(chunk);
                }
            }
        }
    }

    exhausted
}

/// Load local game database combining locally installed games
///
/// Every direct sub-directory of `game_dir` from which a version can be read becomes one
/// installed [`Game`] whose id and name are the directory name and whose size is the
/// total size of the directory's contents.
///
/// # Errors
///
/// Fails if `game_dir` or any game directory below it cannot be read.
async fn load_local_game_db(game_dir: &str) -> eyre::Result<GameDB> {
    let game_path = PathBuf::from(game_dir);
    let mut games = Vec::new();

    // Scan game directory and create entries for installed games
    let mut entries = tokio::fs::read_dir(&game_path).await?;
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        // Same semantics as `Path::is_dir` (follows symlinks, `false` on error), but
        // without a blocking filesystem call inside async code.
        let is_dir = tokio::fs::metadata(&path)
            .await
            .is_ok_and(|metadata| metadata.is_dir());
        if is_dir && let Some(game_id) = path.file_name().and_then(|n| n.to_str()) {
            // Check if this game has a version.ini file
            // NOTE(review): `read_version_from_ini` is called synchronously - confirm
            // that it does not block.
            if let Ok(version) = lanspread_db::db::read_version_from_ini(&path) {
                let size = calculate_directory_size(&path).await?;
                let game = Game {
                    id: game_id.to_string(),
                    name: game_id.to_string(),
                    description: String::new(),
                    release_year: String::new(),
                    publisher: String::new(),
                    max_players: 1,
                    version: "1.0".to_string(),
                    genre: String::new(),
                    size,
                    thumbnail: None,
                    installed: true,
                    eti_game_version: version.clone(),
                    local_version: version,
                    peer_count: 0, // Local games start with 0 peers
                };
                games.push(game);
            }
        }
    }

    Ok(GameDB::from(games))
}

/// Returns the total size in bytes of all non-directory entries below `dir`.
///
/// Sub-directories are walked with an explicit work list rather than recursive boxed
/// futures, so only one directory handle is open at a time.
///
/// # Errors
///
/// Fails if any directory cannot be read or any entry's metadata cannot be fetched.
async fn calculate_directory_size(dir: &Path) -> eyre::Result<u64> {
    let mut total_size = 0u64;
    let mut pending = vec![dir.to_path_buf()];

    while let Some(current) = pending.pop() {
        let mut entries = tokio::fs::read_dir(&current).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            let metadata = tokio::fs::metadata(&path).await?;

            if metadata.is_dir() {
                pending.push(path);
            } else {
                total_size += metadata.len();
            }
        }
    }

    Ok(total_size)
}

/// Shared state of the peer system.
struct Ctx {
    game_dir: Arc<RwLock<Option<String>>>,
    local_game_db: Arc<RwLock<Option<GameDB>>>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
}

/// The subset of [`Ctx`] handed to the server component.
#[derive(Clone)]
struct PeerCtx {
    game_dir: Arc<RwLock<Option<String>>>,
    local_game_db: Arc<RwLock<Option<GameDB>>>,
}

impl std::fmt::Debug for PeerCtx {
    /// Prints placeholders instead of the lock-protected contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PeerCtx")
            .field("game_dir", &"...")
            .field("local_game_db", &"...")
            .finish()
    }
}

/// Main peer execution loop that handles peer commands and manages the peer system.
/// /// # Panics /// /// This function will panic if the games folder is None after being checked for None. /// The panic occurs at line 908 where `games_folder.expect("checked above")` is called. pub async fn run_peer( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { // peer context let ctx = Ctx { game_dir: Arc::new(RwLock::new(None)), local_game_db: Arc::new(RwLock::new(None)), peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())), }; let peer_ctx = PeerCtx { game_dir: ctx.game_dir.clone(), local_game_db: ctx.local_game_db.clone(), }; // Start server component let server_addr = "0.0.0.0:0".parse::()?; let tx_notify_ui_clone = tx_notify_ui.clone(); let peer_ctx_clone = peer_ctx.clone(); tokio::spawn(async move { if let Err(e) = run_server_component(server_addr, peer_ctx_clone, tx_notify_ui_clone).await { log::error!("Server component error: {e}"); } }); // Start peer discovery task let tx_notify_ui_discovery = tx_notify_ui.clone(); let peer_game_db_discovery = ctx.peer_game_db.clone(); tokio::spawn(async move { run_peer_discovery(tx_notify_ui_discovery, peer_game_db_discovery).await; }); // Start ping service task let tx_notify_ui_ping = tx_notify_ui.clone(); let peer_game_db_ping = ctx.peer_game_db.clone(); tokio::spawn(async move { run_ping_service(tx_notify_ui_ping, peer_game_db_ping).await; }); // Handle client commands loop { let Some(cmd) = rx_control.recv().await else { break; }; match cmd { PeerCommand::ListGames => { handle_list_games_command(&ctx, &tx_notify_ui).await; } PeerCommand::GetGame(id) => { handle_get_game_command(&ctx, &tx_notify_ui, id).await; } PeerCommand::DownloadGameFiles { id, file_descriptions, } => { handle_download_game_files_command(&ctx, &tx_notify_ui, id, file_descriptions) .await; } PeerCommand::SetGameDir(game_dir) => { handle_set_game_dir_command(&ctx, game_dir).await; } } } Ok(()) } async fn run_server_component( addr: SocketAddr, ctx: PeerCtx, tx_notify_ui: UnboundedSender, ) -> 
eyre::Result<()> { let limits = Limits::default() .with_max_handshake_duration(Duration::from_secs(3))? .with_max_idle_timeout(Duration::from_secs(3))?; let mut server = Server::builder() .with_tls((CERT_PEM, KEY_PEM))? .with_io(addr)? .with_limits(limits)? .start()?; let server_addr = server.local_addr()?; log::info!("Peer server listening on {server_addr}"); let advertise_ip = select_advertise_ip()?; let advertise_addr = SocketAddr::new(advertise_ip, server_addr.port()); log::info!("Advertising peer via mDNS from {advertise_addr}"); // Start mDNS advertising for peer discovery let peer_id = Uuid::now_v7().simple().to_string(); let hostname = gethostname::gethostname(); let hostname_str = hostname.to_str().unwrap_or(""); // Calculate maximum hostname length that fits with UUID in 63 char limit let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1); let truncated_hostname = if hostname_str.len() > max_hostname_len { hostname_str.get(..max_hostname_len).unwrap_or(hostname_str) } else { hostname_str }; let combined_str = if truncated_hostname.is_empty() { peer_id } else { format!("{truncated_hostname}-{peer_id}") }; let mdns = tokio::task::spawn_blocking(move || { MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, advertise_addr) }) .await??; // Monitor mDNS events let _tx_notify_ui_mdns = tx_notify_ui.clone(); let hostname = truncated_hostname.to_string(); tokio::spawn(async move { log::info!("Registering mDNS service with hostname: {hostname}"); while let Ok(event) = mdns.monitor.recv() { match event { lanspread_mdns::DaemonEvent::Error(e) => { log::error!("mDNS error: {e}"); tokio::time::sleep(Duration::from_secs(1)).await; } _ => { log::trace!("mDNS event: {event:?}"); } } } }); while let Some(connection) = server.accept().await { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); tokio::spawn(async move { if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await { log::error!("Peer connection error: {e}"); } }); } 
Ok(()) } async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { log::info!("ListGames command received"); let all_games = { ctx.peer_game_db.read().await.get_all_games() }; if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(all_games)) { log::error!("Failed to send ListGames event: {e}"); } } async fn handle_get_game_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { log::info!("Requesting game from peers: {id}"); let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; if peers.is_empty() { log::warn!("No peers have game {id}"); if let Err(e) = tx_notify_ui.send(PeerEvent::NoPeersHaveGame { id: id.clone() }) { log::error!("Failed to send NoPeersHaveGame event: {e}"); } return; } let peer_game_db = ctx.peer_game_db.clone(); let tx_notify_ui = tx_notify_ui.clone(); tokio::spawn(async move { let mut fetched_any = false; for peer_addr in peers { match request_game_details_from_peer(peer_addr, &id, peer_game_db.clone()).await { Ok(_) => { log::info!("Fetched game file list for {id} from peer {peer_addr}"); fetched_any = true; } Err(e) => { log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}"); } } } if fetched_any { let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { id: id.clone(), file_descriptions: aggregated_files, }) { log::error!("Failed to send GotGameFiles event: {e}"); } } else { log::warn!("Failed to retrieve game files for {id} from any peer"); } }); } async fn handle_download_game_files_command( ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String, file_descriptions: Vec, ) { log::info!("Got PeerCommand::DownloadGameFiles"); let games_folder = { ctx.game_dir.read().await.clone() }; if games_folder.is_none() { log::error!("Cannot handle game file descriptions: games_folder is not set"); return; } let games_folder = games_folder.expect("checked above"); // Use majority validation to get trusted file 
descriptions and peer whitelist let (validated_descriptions, peer_whitelist) = { match ctx .peer_game_db .read() .await .validate_file_sizes_majority(&id) { Ok((files, peers)) => { log::info!( "Majority validation: {} validated files, {} trusted peers for game {id}", files.len(), peers.len() ); (files, peers) } Err(e) => { log::error!("File size majority validation failed for {id}: {e}"); if let Err(send_err) = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) { log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); } return; } } }; let resolved_descriptions = if file_descriptions.is_empty() { validated_descriptions } else { // If user provided specific descriptions, still validate them against majority // but keep user's selection (they might want specific files) file_descriptions }; if resolved_descriptions.is_empty() { log::error!( "No validated file descriptions available to download game {id}; request metadata first" ); return; } if peer_whitelist.is_empty() { log::error!("No trusted peers available after majority validation for game {id}"); return; } let tx_notify_ui = tx_notify_ui.clone(); tokio::spawn(async move { match download_game_files( &id, resolved_descriptions, games_folder, peer_whitelist, tx_notify_ui.clone(), ) .await { Ok(()) => {} Err(e) => { log::error!("Download failed for {id}: {e}"); if let Err(send_err) = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) { log::error!("Failed to send DownloadGameFilesFailed event: {send_err}"); } } } }); } async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) { *ctx.game_dir.write().await = Some(game_dir.clone()); log::info!("Game directory set to: {game_dir}"); // Load local game database when game directory is set let game_dir = game_dir.clone(); let local_game_db = ctx.local_game_db.clone(); tokio::spawn(async move { match load_local_game_db(&game_dir).await { Ok(db) => { *local_game_db.write().await = Some(db); log::info!("Local 
game database loaded successfully"); } Err(e) => { log::error!("Failed to load local game database: {e}"); } } }); } async fn handle_peer_connection( mut connection: Connection, ctx: PeerCtx, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { let remote_addr = connection.remote_addr()?; log::info!("{remote_addr} peer connected"); if let Err(e) = tx_notify_ui.send(PeerEvent::PeerConnected(remote_addr)) { log::error!("Failed to send PeerConnected event: {e}"); } // handle streams while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { let ctx = ctx.clone(); let remote_addr = Some(remote_addr); tokio::spawn(async move { if let Err(e) = handle_peer_stream(stream, ctx, remote_addr).await { log::error!("{remote_addr:?} peer stream error: {e}"); } }); } if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDisconnected(remote_addr)) { log::error!("Failed to send PeerDisconnected event: {e}"); } Ok(()) } #[allow(clippy::too_many_lines)] async fn handle_peer_stream( stream: BidirectionalStream, ctx: PeerCtx, remote_addr: Option, ) -> eyre::Result<()> { let (mut rx, mut tx) = stream.split(); log::trace!("{remote_addr:?} peer stream opened"); // handle streams loop { match rx.receive().await { Ok(Some(data)) => { log::trace!( "{:?} msg: (raw): {}", remote_addr, String::from_utf8_lossy(&data) ); let request = Request::decode(data); log::debug!("{remote_addr:?} msg: {request:?}"); match request { Request::Ping => { // Respond with pong if let Err(e) = tx.send(Response::Pong.encode()).await { log::error!("Failed to send pong: {e}"); } } Request::ListGames => { // Return list of games from this peer log::info!("Received ListGames request from peer"); let games = if let Some(ref db) = *ctx.local_game_db.read().await { db.all_games().into_iter().cloned().collect() } else { Vec::new() }; if let Err(e) = tx.send(Response::ListGames(games).encode()).await { log::error!("Failed to send ListGames response: {e}"); } } Request::GetGame { id } => { log::info!("Received 
GetGame request for {id} from peer"); let response = if let Some(ref game_dir) = *ctx.game_dir.read().await { if let Some(ref db) = *ctx.local_game_db.read().await { if db.get_game_by_id(&id).is_some() { match get_game_file_descriptions(&id, game_dir).await { Ok(file_descriptions) => Response::GetGame { id, file_descriptions, }, Err(PeerError::FileSizeDetermination { path, source }) => { let error_msg = format!( "Failed to determine file size for {path}: {source}" ); log::error!( "File size determination error for game {id}: {error_msg}" ); Response::InternalPeerError(error_msg) } Err(e) => { log::error!( "Failed to get game file descriptions for {id}: {e}" ); Response::GameNotFound(id) } } } else { Response::GameNotFound(id) } } else { Response::GameNotFound(id) } } else { Response::GameNotFound(id) }; if let Err(e) = tx.send(response.encode()).await { log::error!("Failed to send GetGame response: {e}"); } } Request::GetGameFileData(desc) => { log::info!( "Received GetGameFileData request for {} from peer", desc.relative_path ); let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); send_game_file_data(&desc, &mut tx, &base_dir).await; } else if let Err(e) = tx .send( Response::InvalidRequest( desc.relative_path.as_bytes().to_vec().into(), "Game directory not set".to_string(), ) .encode(), ) .await { log::error!("Failed to send GetGameFileData error: {e}"); } } Request::GetGameFileChunk { game_id, relative_path, offset, length, } => { log::info!( "Received GetGameFileChunk request for {relative_path} from peer" ); let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); send_game_file_chunk( &game_id, &relative_path, offset, length, &mut tx, &base_dir, ) .await; } else if let Err(e) = tx .send( Response::InvalidRequest( relative_path.as_bytes().to_vec().into(), "Game directory not set".to_string(), ) 
.encode(), ) .await { log::error!("Failed to send GetGameFileChunk error: {e}"); } } Request::Invalid(_, _) => { log::error!("Received invalid request from peer"); } } } Ok(None) => { log::trace!("{remote_addr:?} peer stream closed"); break; } Err(e) => { log::error!("{remote_addr:?} peer stream error: {e}"); break; } } } Ok(()) } async fn run_peer_discovery( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, ) { log::info!("Starting peer discovery task"); loop { let discovery_result = discover_service(LANSPREAD_SERVICE_TYPE); match discovery_result { Ok(peer_addr) => { log::info!("Discovered peer at: {peer_addr}"); // Add peer to database let is_new_peer = { let mut db = peer_game_db.write().await; let peer_addresses = db.get_peer_addresses(); if peer_addresses.contains(&peer_addr) { false } else { db.add_peer(peer_addr); true } }; if is_new_peer { // Notify UI about new peer if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) { log::error!("Failed to send PeerDiscovered event: {e}"); } // Request games from this peer let tx_notify_ui_clone = tx_notify_ui.clone(); let peer_game_db_clone = peer_game_db.clone(); tokio::spawn(async move { if let Err(e) = request_games_from_peer( peer_addr, tx_notify_ui_clone, peer_game_db_clone, ) .await { log::error!("Failed to request games from peer {peer_addr}: {e}"); } }); } } Err(e) => { log::debug!("Peer discovery error: {e}"); tokio::time::sleep(Duration::from_secs(5)).await; } } // Wait before next discovery cycle tokio::time::sleep(Duration::from_secs(10)).await; } } async fn request_games_from_peer( peer_addr: SocketAddr, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, ) -> eyre::Result<()> { let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? .with_limits(limits)? 
.start()?; let conn = Connect::new(peer_addr).with_server_name("localhost"); let mut conn = client.connect(conn).await?; let stream = conn.open_bidirectional_stream().await?; let (mut rx, mut tx) = stream.split(); // Send ListGames request tx.send(Request::ListGames.encode()).await?; let _ = tx.close().await; // Receive response let mut data = BytesMut::new(); while let Ok(Some(bytes)) = rx.receive().await { data.extend_from_slice(&bytes); } let response = Response::decode(data.freeze()); match response { Response::ListGames(games) => { log::info!("Received {} games from peer {peer_addr}", games.len()); let aggregated_games = { let mut db = peer_game_db.write().await; db.update_peer_games(peer_addr, games); db.get_all_games() }; if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) { log::error!("Failed to send ListGames event: {e}"); } } _ => { log::warn!("Unexpected response from peer {peer_addr}: {response:?}"); } } Ok(()) } async fn request_game_details_from_peer( peer_addr: SocketAddr, game_id: &str, peer_game_db: Arc>, ) -> eyre::Result> { let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? .with_limits(limits)? 
.start()?; let conn = Connect::new(peer_addr).with_server_name("localhost"); let mut conn = client.connect(conn).await?; let stream = conn.open_bidirectional_stream().await?; let (mut rx, mut tx) = stream.split(); tx.send( Request::GetGame { id: game_id.to_string(), } .encode(), ) .await?; tx.close().await?; let mut data = BytesMut::new(); while let Ok(Some(bytes)) = rx.receive().await { data.extend_from_slice(&bytes); } let response = Response::decode(data.freeze()); match response { Response::GetGame { id, file_descriptions, } => { if id != game_id { eyre::bail!("peer {peer_addr} responded with mismatched game id {id}"); } { let mut db = peer_game_db.write().await; db.update_peer_game_files(peer_addr, game_id, file_descriptions.clone()); } Ok(file_descriptions) } Response::GameNotFound(_) => { eyre::bail!("peer {peer_addr} does not have game {game_id}") } Response::InternalPeerError(error_msg) => { eyre::bail!("peer {peer_addr} reported internal error: {error_msg}") } _ => eyre::bail!("unexpected response from {peer_addr}: {response:?}"), } } async fn run_ping_service( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, ) { log::info!("Starting ping service (10s interval)"); let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { interval.tick().await; let peer_addresses = { peer_game_db.read().await.get_peer_addresses() }; for peer_addr in peer_addresses { let tx_notify_ui_clone = tx_notify_ui.clone(); let peer_game_db_clone = peer_game_db.clone(); tokio::spawn(async move { match ping_peer(peer_addr).await { Ok(is_alive) => { if is_alive { // Update last seen time peer_game_db_clone .write() .await .update_last_seen(&peer_addr); } else { log::warn!("Peer {peer_addr} failed ping check"); // Remove stale peer let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr); if removed_peer.is_some() { log::info!("Removed stale peer: {peer_addr}"); if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) { 
log::error!("Failed to send PeerLost event: {e}"); } } } } Err(e) => { log::error!("Failed to ping peer {peer_addr}: {e}"); // Remove peer on error let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr); if removed_peer.is_some() { log::info!("Removed peer due to ping error: {peer_addr}"); if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr)) { log::error!("Failed to send PeerLost event: {e}"); } } } } }); } // Also clean up stale peers let stale_peers = { peer_game_db .read() .await .get_stale_peers(Duration::from_secs(30)) }; for stale_addr in stale_peers { let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr); if removed_peer.is_some() { log::info!("Removed stale peer: {stale_addr}"); if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) { log::error!("Failed to send PeerLost event: {e}"); } } } } } async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? .with_limits(limits)? .start()?; let conn = Connect::new(peer_addr).with_server_name("localhost"); let mut conn = client.connect(conn).await?; let is_alive = initial_peer_alive_check(&mut conn).await; Ok(is_alive) } fn select_advertise_ip() -> eyre::Result { let mut best_candidate: Option<(u8, IpAddr)> = None; let mut loopback_fallback = None; for interface in get_if_addrs()? 
{ if interface.is_loopback() { loopback_fallback.get_or_insert(interface.ip()); continue; } if let Some(candidate) = classify_interface(&interface) && best_candidate .as_ref() .is_none_or(|(rank, _)| candidate.0 < *rank) { best_candidate = Some(candidate); } } if let Some((_, ip)) = best_candidate { return Ok(ip); } if let Some(ip) = loopback_fallback { log::warn!( "No non-loopback interface suitable for mDNS advertisement; falling back to {ip}" ); return Ok(ip); } eyre::bail!("No usable network interface found for mDNS advertisement"); } fn classify_interface(interface: &Interface) -> Option<(u8, IpAddr)> { match interface.addr { IfAddr::V4(ref v4) => { let ip = v4.ip; if ip.is_unspecified() || ip.is_link_local() { return None; } let mut rank = if ip.is_private() { 0 } else { 2 }; if is_virtual_interface(&interface.name) { rank += 2; } Some((rank, IpAddr::V4(ip))) } IfAddr::V6(_) => None, } } fn is_virtual_interface(name: &str) -> bool { const VIRTUAL_HINTS: &[&str] = &[ "awdl", "br-", "bridge", "docker", "ham", "llw", "tap", "tailscale", "tun", "utun", "vbox", "veth", "virbr", "vmnet", "wg", "zt", ]; let lower = name.to_ascii_lowercase(); VIRTUAL_HINTS.iter().any(|hint| lower.contains(hint)) } async fn get_game_file_descriptions( game_id: &str, game_dir: &str, ) -> Result, PeerError> { let base_dir = PathBuf::from(game_dir); let game_path = base_dir.join(game_id); if !game_path.exists() { return Err(PeerError::Other(eyre::eyre!( "Game directory does not exist: {}", game_path.display() ))); } let mut file_descriptions = Vec::new(); for entry in walkdir::WalkDir::new(&game_path) .into_iter() .filter_map(std::result::Result::ok) { let relative_path = match entry.path().strip_prefix(&base_dir) { Ok(path) => path.to_string_lossy().to_string(), Err(e) => { log::error!( "Failed to get relative path for {}: {}", entry.path().display(), e ); continue; } }; let is_dir = entry.file_type().is_dir(); let size = if is_dir { 0 } else { match 
tokio::fs::metadata(entry.path()).await { Ok(metadata) => metadata.len(), Err(e) => { log::error!("Failed to read metadata for {relative_path}: {e}"); return Err(PeerError::FileSizeDetermination { path: relative_path.clone(), source: e, }); } } }; let file_desc = GameFileDescription { game_id: game_id.to_string(), relative_path, is_dir, size, }; file_descriptions.push(file_desc); } Ok(file_descriptions) }