From 1b2b2cf8c086582dc4c0cb69c83877b2221e2b89 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Fri, 14 Nov 2025 00:47:02 +0100 Subject: [PATCH] file transfer: improve / fix --- crates/lanspread-peer/src/lib.rs | 158 ++++++++++++++++++++++++------ crates/lanspread-peer/src/peer.rs | 33 ++++++- 2 files changed, 160 insertions(+), 31 deletions(-) diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index f94bc95..86adebc 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -4,6 +4,7 @@ mod path_validation; mod peer; use std::{ + cmp::Reverse, collections::{HashMap, VecDeque}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, @@ -333,23 +334,24 @@ impl PeerGameDB { } /// Validates file sizes across all peers and returns only the files with majority consensus - /// Returns a tuple of (`validated_files`, `peer_whitelist`) where `peer_whitelist` contains - /// only peers that have the majority-approved file sizes + /// Returns a tuple of (`validated_files`, `peer_whitelist`, `file_peer_map`) where + /// `peer_whitelist` contains peers that have at least one majority-approved file and + /// `file_peer_map` lists which peers were validated for each file pub fn validate_file_sizes_majority( &self, game_id: &str, - ) -> eyre::Result<(Vec, Vec)> { + ) -> eyre::Result { let game_files = self.game_files_for(game_id); if game_files.is_empty() { - return Ok((Vec::new(), Vec::new())); + return Ok((Vec::new(), Vec::new(), HashMap::new())); } let (file_size_map, _peer_files) = collect_file_sizes(&game_files); - let (validated_files, peer_scores) = + let (validated_files, peer_scores, file_peer_map) = self.validate_each_file_consensus(game_id, file_size_map)?; let peer_whitelist = create_peer_whitelist(peer_scores); - Ok((validated_files, peer_whitelist)) + Ok((validated_files, peer_whitelist, file_peer_map)) } /// Validates consensus for each file and returns validated files with peer scores @@ -357,9 +359,10 @@ impl PeerGameDB { &self, game_id: &str, file_size_map: FileSizeMap, - ) -> eyre::Result<(Vec, HashMap)> { + ) -> eyre::Result { let mut validated_files = Vec::new(); let mut peer_whitelist_scores: HashMap = HashMap::new(); + let mut file_peer_map: HashMap> = HashMap::new(); for (relative_path, size_map) in file_size_map { let total_peers: usize = size_map.values().map(Vec::len).sum(); @@ -376,11 +379,12 @@ impl PeerGameDB { && let Some(file_desc) = self.create_validated_file_description(game_id, &relative_path, size, &peers) { + file_peer_map.insert(relative_path.clone(), peers.clone()); validated_files.push(file_desc); } } - Ok((validated_files, peer_whitelist_scores)) + Ok((validated_files, peer_whitelist_scores, file_peer_map)) } /// Determines the consensus size for a file based on peer reports @@ -482,6 +486,20 @@ type PeerFileMap = HashMap>; /// Type alias for consensus result: (size, peers) or None type ConsensusResult = Option<(u64, Vec)>; +/// Type alias for the aggregated majority validation result +type MajorityValidationResult = ( + Vec, + Vec, + HashMap>, +); + +/// Type alias for per-file consensus aggregation results +type FileConsensusAggregation = ( + Vec, + HashMap, + HashMap>, +); + /// Collects file sizes from all peers and organizes them by path and size fn collect_file_sizes( game_files: &[(SocketAddr, Vec)], @@ -545,16 +563,14 @@ fn create_peer_whitelist(peer_scores: HashMap) -> Vec = peer_scores .into_iter() - .filter_map(|(peer, score)| if score >= threshold { Some(peer) } else { None }) - .collect() + .filter_map(|(peer, score)| (score > 0).then_some((peer, score))) + .collect(); + + peers.sort_by_key(|(peer, score)| (Reverse(*score), *peer)); + + peers.into_iter().map(|(peer, _)| peer).collect() } #[derive(Debug)] @@ -676,9 +692,24 @@ async fn prepare_game_storage( Ok(()) } +fn resolve_file_peers<'a>( + relative_path: &str, + file_peer_map: &'a HashMap>, + fallback: &'a [SocketAddr], +) -> &'a [SocketAddr] { + if let Some(peers) = file_peer_map.get(relative_path) + && !peers.is_empty() + { + return peers; + } + + fallback +} + fn build_peer_plans( peers: &[SocketAddr], file_descs: &[GameFileDescription], + file_peer_map: &HashMap>, ) -> HashMap { let mut plans: HashMap = HashMap::new(); if peers.is_empty() { @@ -689,8 +720,13 @@ fn build_peer_plans( for desc in file_descs.iter().filter(|d| !d.is_dir) { let size = desc.file_size(); + let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers); + if eligible_peers.is_empty() { + continue; + } + if size == 0 { - let peer = peers[peer_index % peers.len()]; + let peer = eligible_peers[peer_index % eligible_peers.len()]; peer_index += 1; plans.entry(peer).or_default().chunks.push(DownloadChunk { relative_path: desc.relative_path.clone(), @@ -705,7 +741,7 @@ fn build_peer_plans( let mut offset = 0u64; while offset < size { let length = std::cmp::min(CHUNK_SIZE, size - offset); - let peer = peers[peer_index % peers.len()]; + let peer = eligible_peers[peer_index % eligible_peers.len()]; peer_index += 1; plans.entry(peer).or_default().chunks.push(DownloadChunk { relative_path: desc.relative_path.clone(), @@ -904,6 +940,7 @@ async fn download_game_files( game_file_descs: Vec, games_folder: String, peers: Vec, + file_peer_map: HashMap>, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { if peers.is_empty() { @@ -917,7 +954,7 @@ async fn download_game_files( id: game_id.to_string(), })?; - let plans = build_peer_plans(&peers, &game_file_descs); + let plans = build_peer_plans(&peers, &game_file_descs, &file_peer_map); let mut tasks = Vec::new(); for (peer_addr, plan) in plans { @@ -963,7 +1000,8 @@ async fn download_game_files( if !failed_chunks.is_empty() && !peers.is_empty() { log::info!("Retrying {} failed chunks", failed_chunks.len()); - let retry_results = retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id).await; + let retry_results = + retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id, &file_peer_map).await; for chunk_result in retry_results { if let Err(e) = chunk_result.result { @@ -1018,11 +1056,14 @@ async fn retry_failed_chunks( peers: &[SocketAddr], base_dir: &Path, game_id: &str, + file_peer_map: &HashMap>, ) -> Vec { let mut exhausted = Vec::new(); let mut queue: VecDeque = failed_chunks.into_iter().collect(); while let Some(mut chunk) = queue.pop_front() { + let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); + if chunk.retry_count >= MAX_RETRY_COUNT { exhausted.push(ChunkDownloadResult { chunk: chunk.clone(), @@ -1030,20 +1071,21 @@ async fn retry_failed_chunks( "Retry budget exhausted for chunk: {}", chunk.relative_path )), - peer_addr: fallback_peer_addr(peers, chunk.last_peer), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), }); continue; } let retry_offset = chunk.retry_count.saturating_sub(1); - let Some(peer_addr) = select_retry_peer(peers, chunk.last_peer, retry_offset) else { + let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset) + else { exhausted.push(ChunkDownloadResult { chunk: chunk.clone(), result: Err(eyre::eyre!( "No peers available to retry chunk: {}", chunk.relative_path )), - peer_addr: fallback_peer_addr(peers, chunk.last_peer), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), }); continue; }; @@ -1094,7 +1136,7 @@ async fn retry_failed_chunks( "Retry budget exhausted for chunk after connection failure: {}", chunk.relative_path ))), - peer_addr: fallback_peer_addr(peers, chunk.last_peer), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), }); } else { queue.push_back(chunk); @@ -1511,20 +1553,20 @@ async fn handle_download_game_files_command( let games_folder = games_folder.expect("checked above"); // Use majority validation to get trusted file descriptions and peer whitelist - let (validated_descriptions, peer_whitelist) = { + let (validated_descriptions, peer_whitelist, file_peer_map) = { match ctx .peer_game_db .read() .await .validate_file_sizes_majority(&id) { - Ok((files, peers)) => { + Ok((files, peers, file_peer_map)) => { log::info!( "Majority validation: {} validated files, {} trusted peers for game {id}", files.len(), peers.len() ); - (files, peers) + (files, peers, file_peer_map) } Err(e) => { log::error!("File size majority validation failed for {id}: {e}"); @@ -1578,6 +1620,7 @@ async fn handle_download_game_files_command( resolved_descriptions, games_folder, peer_whitelist, + file_peer_map, tx_notify_ui.clone(), ) .await @@ -2293,6 +2336,8 @@ mod tests { fn build_peer_plans_handles_partial_final_chunk() { let peers = vec![loopback_addr(12000), loopback_addr(12001)]; let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4; + let mut file_peer_map = HashMap::new(); + file_peer_map.insert("game/file.dat".to_string(), peers.clone()); let file_descs = vec![GameFileDescription { game_id: "test".to_string(), relative_path: "game/file.dat".to_string(), @@ -2300,7 +2345,7 @@ mod tests { size: file_size, }]; - let plans = build_peer_plans(&peers, &file_descs); + let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect(); assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks"); @@ -2317,4 +2362,59 @@ mod tests { "last chunk should finish the file" ); } + + #[test] + fn build_peer_plans_respects_file_peer_map() { + let shared_a = loopback_addr(12010); + let shared_b = loopback_addr(12011); + let exclusive = loopback_addr(12012); + let peers = vec![shared_a, shared_b, exclusive]; + + let mut file_peer_map = HashMap::new(); + file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]); + file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]); + + let file_descs = vec![ + GameFileDescription { + game_id: "test".to_string(), + relative_path: "shared.bin".to_string(), + is_dir: false, + size: CHUNK_SIZE * 2, + }, + GameFileDescription { + game_id: "test".to_string(), + relative_path: "exclusive.bin".to_string(), + is_dir: false, + size: CHUNK_SIZE, + }, + ]; + + let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); + let exclusive_plan = plans + .get(&exclusive) + .expect("exclusive peer should have a plan"); + assert!( + exclusive_plan + .chunks + .iter() + .all(|chunk| chunk.relative_path == "exclusive.bin"), + "exclusive peer should only receive exclusive.bin chunks" + ); + + for (peer, plan) in plans { + for chunk in plan.chunks { + match chunk.relative_path.as_str() { + "exclusive.bin" => assert_eq!( + peer, exclusive, + "exclusive.bin chunks should only be assigned to the exclusive peer" + ), + "shared.bin" => assert!( + peer == shared_a || peer == shared_b, + "shared.bin chunks must stay within shared peers" + ), + other => panic!("unexpected file in plan: {other}"), + } + } + } + } } diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index fd0e235..b1d46a8 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -3,7 +3,10 @@ use std::{convert::TryInto, path::Path}; use bytes::Bytes; use lanspread_db::db::GameFileDescription; use lanspread_utils::maybe_addr; -use s2n_quic::stream::SendStream; +use s2n_quic::{ + connection, + stream::{Error as StreamError, SendStream}, +}; use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, time::Instant, @@ -33,6 +36,8 @@ async fn stream_file_bytes( } let mut remaining = length.unwrap_or(u64::MAX); + let expect_exact = length.is_some(); + let mut transfer_complete = matches!(length, Some(0)); let mut total_bytes = 0u64; let mut last_total_bytes = 0u64; let mut timestamp = Instant::now(); @@ -47,6 +52,9 @@ async fn stream_file_bytes( let bytes_read = file.read(&mut buf[..read_len]).await?; if bytes_read == 0 { + if !expect_exact { + transfer_complete = true; + } break; } @@ -54,6 +62,11 @@ async fn stream_file_bytes( remaining = remaining.saturating_sub(bytes_read as u64); total_bytes += bytes_read as u64; + if expect_exact && remaining == 0 { + transfer_complete = true; + break; + } + if last_total_bytes + 10_000_000 < total_bytes { let elapsed = timestamp.elapsed(); let diff_bytes = total_bytes - last_total_bytes; @@ -76,10 +89,26 @@ async fn stream_file_bytes( validated_path.display() ); - tx.close().await?; + match tx.close().await { + Ok(()) => {} + Err(err) if transfer_complete && is_clean_remote_close(&err) => { + log::debug!("{remote_addr} closed stream after transfer completion: {err}"); + } + Err(err) => return Err(err.into()), + } Ok(()) } +fn is_clean_remote_close(err: &StreamError) -> bool { + matches!( + err, + StreamError::ConnectionError { + error: connection::Error::Closed { .. }, + .. + } + ) +} + pub async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream,