file transfer: improve / fix

This commit is contained in:
2025-11-14 00:47:02 +01:00
parent 4e9707dd51
commit 1b2b2cf8c0
2 changed files with 160 additions and 31 deletions
+129 -29
View File
@@ -4,6 +4,7 @@ mod path_validation;
mod peer; mod peer;
use std::{ use std::{
cmp::Reverse,
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
path::{Path, PathBuf}, path::{Path, PathBuf},
@@ -333,23 +334,24 @@ impl PeerGameDB {
} }
/// Validates file sizes across all peers and returns only the files with majority consensus /// Validates file sizes across all peers and returns only the files with majority consensus
/// Returns a tuple of (`validated_files`, `peer_whitelist`) where `peer_whitelist` contains /// Returns a tuple of (`validated_files`, `peer_whitelist`, `file_peer_map`) where
/// only peers that have the majority-approved file sizes /// `peer_whitelist` contains peers that have at least one majority-approved file and
/// `file_peer_map` lists which peers were validated for each file
pub fn validate_file_sizes_majority( pub fn validate_file_sizes_majority(
&self, &self,
game_id: &str, game_id: &str,
) -> eyre::Result<(Vec<GameFileDescription>, Vec<SocketAddr>)> { ) -> eyre::Result<MajorityValidationResult> {
let game_files = self.game_files_for(game_id); let game_files = self.game_files_for(game_id);
if game_files.is_empty() { if game_files.is_empty() {
return Ok((Vec::new(), Vec::new())); return Ok((Vec::new(), Vec::new(), HashMap::new()));
} }
let (file_size_map, _peer_files) = collect_file_sizes(&game_files); let (file_size_map, _peer_files) = collect_file_sizes(&game_files);
let (validated_files, peer_scores) = let (validated_files, peer_scores, file_peer_map) =
self.validate_each_file_consensus(game_id, file_size_map)?; self.validate_each_file_consensus(game_id, file_size_map)?;
let peer_whitelist = create_peer_whitelist(peer_scores); let peer_whitelist = create_peer_whitelist(peer_scores);
Ok((validated_files, peer_whitelist)) Ok((validated_files, peer_whitelist, file_peer_map))
} }
/// Validates consensus for each file and returns validated files with peer scores /// Validates consensus for each file and returns validated files with peer scores
@@ -357,9 +359,10 @@ impl PeerGameDB {
&self, &self,
game_id: &str, game_id: &str,
file_size_map: FileSizeMap, file_size_map: FileSizeMap,
) -> eyre::Result<(Vec<GameFileDescription>, HashMap<SocketAddr, usize>)> { ) -> eyre::Result<FileConsensusAggregation> {
let mut validated_files = Vec::new(); let mut validated_files = Vec::new();
let mut peer_whitelist_scores: HashMap<SocketAddr, usize> = HashMap::new(); let mut peer_whitelist_scores: HashMap<SocketAddr, usize> = HashMap::new();
let mut file_peer_map: HashMap<String, Vec<SocketAddr>> = HashMap::new();
for (relative_path, size_map) in file_size_map { for (relative_path, size_map) in file_size_map {
let total_peers: usize = size_map.values().map(Vec::len).sum(); let total_peers: usize = size_map.values().map(Vec::len).sum();
@@ -376,11 +379,12 @@ impl PeerGameDB {
&& let Some(file_desc) = && let Some(file_desc) =
self.create_validated_file_description(game_id, &relative_path, size, &peers) self.create_validated_file_description(game_id, &relative_path, size, &peers)
{ {
file_peer_map.insert(relative_path.clone(), peers.clone());
validated_files.push(file_desc); validated_files.push(file_desc);
} }
} }
Ok((validated_files, peer_whitelist_scores)) Ok((validated_files, peer_whitelist_scores, file_peer_map))
} }
/// Determines the consensus size for a file based on peer reports /// Determines the consensus size for a file based on peer reports
@@ -482,6 +486,20 @@ type PeerFileMap = HashMap<SocketAddr, HashMap<String, u64>>;
/// Type alias for consensus result: (size, peers) or None /// Type alias for consensus result: (size, peers) or None
type ConsensusResult = Option<(u64, Vec<SocketAddr>)>; type ConsensusResult = Option<(u64, Vec<SocketAddr>)>;
/// Type alias for the aggregated majority validation result
type MajorityValidationResult = (
Vec<GameFileDescription>,
Vec<SocketAddr>,
HashMap<String, Vec<SocketAddr>>,
);
/// Type alias for per-file consensus aggregation results
type FileConsensusAggregation = (
Vec<GameFileDescription>,
HashMap<SocketAddr, usize>,
HashMap<String, Vec<SocketAddr>>,
);
/// Collects file sizes from all peers and organizes them by path and size /// Collects file sizes from all peers and organizes them by path and size
fn collect_file_sizes( fn collect_file_sizes(
game_files: &[(SocketAddr, Vec<GameFileDescription>)], game_files: &[(SocketAddr, Vec<GameFileDescription>)],
@@ -545,16 +563,14 @@ fn create_peer_whitelist(peer_scores: HashMap<SocketAddr, usize>) -> Vec<SocketA
return Vec::new(); return Vec::new();
} }
let max_score = *peer_scores let mut peers: Vec<_> = peer_scores
.values()
.max()
.expect("peer_scores should not be empty here");
let threshold = max_score.max(1); // At least 1 file, or match the highest score
peer_scores
.into_iter() .into_iter()
.filter_map(|(peer, score)| if score >= threshold { Some(peer) } else { None }) .filter_map(|(peer, score)| (score > 0).then_some((peer, score)))
.collect() .collect();
peers.sort_by_key(|(peer, score)| (Reverse(*score), *peer));
peers.into_iter().map(|(peer, _)| peer).collect()
} }
#[derive(Debug)] #[derive(Debug)]
@@ -676,9 +692,24 @@ async fn prepare_game_storage(
Ok(()) Ok(())
} }
fn resolve_file_peers<'a>(
relative_path: &str,
file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
fallback: &'a [SocketAddr],
) -> &'a [SocketAddr] {
if let Some(peers) = file_peer_map.get(relative_path)
&& !peers.is_empty()
{
return peers;
}
fallback
}
fn build_peer_plans( fn build_peer_plans(
peers: &[SocketAddr], peers: &[SocketAddr],
file_descs: &[GameFileDescription], file_descs: &[GameFileDescription],
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> HashMap<SocketAddr, PeerDownloadPlan> { ) -> HashMap<SocketAddr, PeerDownloadPlan> {
let mut plans: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new(); let mut plans: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();
if peers.is_empty() { if peers.is_empty() {
@@ -689,8 +720,13 @@ fn build_peer_plans(
for desc in file_descs.iter().filter(|d| !d.is_dir) { for desc in file_descs.iter().filter(|d| !d.is_dir) {
let size = desc.file_size(); let size = desc.file_size();
let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers);
if eligible_peers.is_empty() {
continue;
}
if size == 0 { if size == 0 {
let peer = peers[peer_index % peers.len()]; let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1; peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk { plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(), relative_path: desc.relative_path.clone(),
@@ -705,7 +741,7 @@ fn build_peer_plans(
let mut offset = 0u64; let mut offset = 0u64;
while offset < size { while offset < size {
let length = std::cmp::min(CHUNK_SIZE, size - offset); let length = std::cmp::min(CHUNK_SIZE, size - offset);
let peer = peers[peer_index % peers.len()]; let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1; peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk { plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(), relative_path: desc.relative_path.clone(),
@@ -904,6 +940,7 @@ async fn download_game_files(
game_file_descs: Vec<GameFileDescription>, game_file_descs: Vec<GameFileDescription>,
games_folder: String, games_folder: String,
peers: Vec<SocketAddr>, peers: Vec<SocketAddr>,
file_peer_map: HashMap<String, Vec<SocketAddr>>,
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
if peers.is_empty() { if peers.is_empty() {
@@ -917,7 +954,7 @@ async fn download_game_files(
id: game_id.to_string(), id: game_id.to_string(),
})?; })?;
let plans = build_peer_plans(&peers, &game_file_descs); let plans = build_peer_plans(&peers, &game_file_descs, &file_peer_map);
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for (peer_addr, plan) in plans { for (peer_addr, plan) in plans {
@@ -963,7 +1000,8 @@ async fn download_game_files(
if !failed_chunks.is_empty() && !peers.is_empty() { if !failed_chunks.is_empty() && !peers.is_empty() {
log::info!("Retrying {} failed chunks", failed_chunks.len()); log::info!("Retrying {} failed chunks", failed_chunks.len());
let retry_results = retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id).await; let retry_results =
retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id, &file_peer_map).await;
for chunk_result in retry_results { for chunk_result in retry_results {
if let Err(e) = chunk_result.result { if let Err(e) = chunk_result.result {
@@ -1018,11 +1056,14 @@ async fn retry_failed_chunks(
peers: &[SocketAddr], peers: &[SocketAddr],
base_dir: &Path, base_dir: &Path,
game_id: &str, game_id: &str,
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> Vec<ChunkDownloadResult> { ) -> Vec<ChunkDownloadResult> {
let mut exhausted = Vec::new(); let mut exhausted = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect(); let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while let Some(mut chunk) = queue.pop_front() { while let Some(mut chunk) = queue.pop_front() {
let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers);
if chunk.retry_count >= MAX_RETRY_COUNT { if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult { exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(), chunk: chunk.clone(),
@@ -1030,20 +1071,21 @@ async fn retry_failed_chunks(
"Retry budget exhausted for chunk: {}", "Retry budget exhausted for chunk: {}",
chunk.relative_path chunk.relative_path
)), )),
peer_addr: fallback_peer_addr(peers, chunk.last_peer), peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
}); });
continue; continue;
} }
let retry_offset = chunk.retry_count.saturating_sub(1); let retry_offset = chunk.retry_count.saturating_sub(1);
let Some(peer_addr) = select_retry_peer(peers, chunk.last_peer, retry_offset) else { let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset)
else {
exhausted.push(ChunkDownloadResult { exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(), chunk: chunk.clone(),
result: Err(eyre::eyre!( result: Err(eyre::eyre!(
"No peers available to retry chunk: {}", "No peers available to retry chunk: {}",
chunk.relative_path chunk.relative_path
)), )),
peer_addr: fallback_peer_addr(peers, chunk.last_peer), peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
}); });
continue; continue;
}; };
@@ -1094,7 +1136,7 @@ async fn retry_failed_chunks(
"Retry budget exhausted for chunk after connection failure: {}", "Retry budget exhausted for chunk after connection failure: {}",
chunk.relative_path chunk.relative_path
))), ))),
peer_addr: fallback_peer_addr(peers, chunk.last_peer), peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
}); });
} else { } else {
queue.push_back(chunk); queue.push_back(chunk);
@@ -1511,20 +1553,20 @@ async fn handle_download_game_files_command(
let games_folder = games_folder.expect("checked above"); let games_folder = games_folder.expect("checked above");
// Use majority validation to get trusted file descriptions and peer whitelist // Use majority validation to get trusted file descriptions and peer whitelist
let (validated_descriptions, peer_whitelist) = { let (validated_descriptions, peer_whitelist, file_peer_map) = {
match ctx match ctx
.peer_game_db .peer_game_db
.read() .read()
.await .await
.validate_file_sizes_majority(&id) .validate_file_sizes_majority(&id)
{ {
Ok((files, peers)) => { Ok((files, peers, file_peer_map)) => {
log::info!( log::info!(
"Majority validation: {} validated files, {} trusted peers for game {id}", "Majority validation: {} validated files, {} trusted peers for game {id}",
files.len(), files.len(),
peers.len() peers.len()
); );
(files, peers) (files, peers, file_peer_map)
} }
Err(e) => { Err(e) => {
log::error!("File size majority validation failed for {id}: {e}"); log::error!("File size majority validation failed for {id}: {e}");
@@ -1578,6 +1620,7 @@ async fn handle_download_game_files_command(
resolved_descriptions, resolved_descriptions,
games_folder, games_folder,
peer_whitelist, peer_whitelist,
file_peer_map,
tx_notify_ui.clone(), tx_notify_ui.clone(),
) )
.await .await
@@ -2293,6 +2336,8 @@ mod tests {
fn build_peer_plans_handles_partial_final_chunk() { fn build_peer_plans_handles_partial_final_chunk() {
let peers = vec![loopback_addr(12000), loopback_addr(12001)]; let peers = vec![loopback_addr(12000), loopback_addr(12001)];
let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4; let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4;
let mut file_peer_map = HashMap::new();
file_peer_map.insert("game/file.dat".to_string(), peers.clone());
let file_descs = vec![GameFileDescription { let file_descs = vec![GameFileDescription {
game_id: "test".to_string(), game_id: "test".to_string(),
relative_path: "game/file.dat".to_string(), relative_path: "game/file.dat".to_string(),
@@ -2300,7 +2345,7 @@ mod tests {
size: file_size, size: file_size,
}]; }];
let plans = build_peer_plans(&peers, &file_descs); let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect(); let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect();
assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks"); assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks");
@@ -2317,4 +2362,59 @@ mod tests {
"last chunk should finish the file" "last chunk should finish the file"
); );
} }
#[test]
fn build_peer_plans_respects_file_peer_map() {
let shared_a = loopback_addr(12010);
let shared_b = loopback_addr(12011);
let exclusive = loopback_addr(12012);
let peers = vec![shared_a, shared_b, exclusive];
let mut file_peer_map = HashMap::new();
file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]);
file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]);
let file_descs = vec![
GameFileDescription {
game_id: "test".to_string(),
relative_path: "shared.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE * 2,
},
GameFileDescription {
game_id: "test".to_string(),
relative_path: "exclusive.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE,
},
];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let exclusive_plan = plans
.get(&exclusive)
.expect("exclusive peer should have a plan");
assert!(
exclusive_plan
.chunks
.iter()
.all(|chunk| chunk.relative_path == "exclusive.bin"),
"exclusive peer should only receive exclusive.bin chunks"
);
for (peer, plan) in plans {
for chunk in plan.chunks {
match chunk.relative_path.as_str() {
"exclusive.bin" => assert_eq!(
peer, exclusive,
"exclusive.bin chunks should only be assigned to the exclusive peer"
),
"shared.bin" => assert!(
peer == shared_a || peer == shared_b,
"shared.bin chunks must stay within shared peers"
),
other => panic!("unexpected file in plan: {other}"),
}
}
}
}
} }
+31 -2
View File
@@ -3,7 +3,10 @@ use std::{convert::TryInto, path::Path};
use bytes::Bytes; use bytes::Bytes;
use lanspread_db::db::GameFileDescription; use lanspread_db::db::GameFileDescription;
use lanspread_utils::maybe_addr; use lanspread_utils::maybe_addr;
use s2n_quic::stream::SendStream; use s2n_quic::{
connection,
stream::{Error as StreamError, SendStream},
};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncSeekExt}, io::{AsyncReadExt, AsyncSeekExt},
time::Instant, time::Instant,
@@ -33,6 +36,8 @@ async fn stream_file_bytes(
} }
let mut remaining = length.unwrap_or(u64::MAX); let mut remaining = length.unwrap_or(u64::MAX);
let expect_exact = length.is_some();
let mut transfer_complete = matches!(length, Some(0));
let mut total_bytes = 0u64; let mut total_bytes = 0u64;
let mut last_total_bytes = 0u64; let mut last_total_bytes = 0u64;
let mut timestamp = Instant::now(); let mut timestamp = Instant::now();
@@ -47,6 +52,9 @@ async fn stream_file_bytes(
let bytes_read = file.read(&mut buf[..read_len]).await?; let bytes_read = file.read(&mut buf[..read_len]).await?;
if bytes_read == 0 { if bytes_read == 0 {
if !expect_exact {
transfer_complete = true;
}
break; break;
} }
@@ -54,6 +62,11 @@ async fn stream_file_bytes(
remaining = remaining.saturating_sub(bytes_read as u64); remaining = remaining.saturating_sub(bytes_read as u64);
total_bytes += bytes_read as u64; total_bytes += bytes_read as u64;
if expect_exact && remaining == 0 {
transfer_complete = true;
break;
}
if last_total_bytes + 10_000_000 < total_bytes { if last_total_bytes + 10_000_000 < total_bytes {
let elapsed = timestamp.elapsed(); let elapsed = timestamp.elapsed();
let diff_bytes = total_bytes - last_total_bytes; let diff_bytes = total_bytes - last_total_bytes;
@@ -76,10 +89,26 @@ async fn stream_file_bytes(
validated_path.display() validated_path.display()
); );
tx.close().await?; match tx.close().await {
Ok(()) => {}
Err(err) if transfer_complete && is_clean_remote_close(&err) => {
log::debug!("{remote_addr} closed stream after transfer completion: {err}");
}
Err(err) => return Err(err.into()),
}
Ok(()) Ok(())
} }
fn is_clean_remote_close(err: &StreamError) -> bool {
matches!(
err,
StreamError::ConnectionError {
error: connection::Error::Closed { .. },
..
}
)
}
pub async fn send_game_file_data( pub async fn send_game_file_data(
game_file_desc: &GameFileDescription, game_file_desc: &GameFileDescription,
tx: &mut SendStream, tx: &mut SendStream,