diff --git a/crates/lanspread-peer/src/config.rs b/crates/lanspread-peer/src/config.rs index 254eb26..39a81f5 100644 --- a/crates/lanspread-peer/src/config.rs +++ b/crates/lanspread-peer/src/config.rs @@ -14,6 +14,12 @@ pub const PEER_STALE_TIMEOUT_SECS: u64 = 90; /// Size of each download chunk (32 MB). pub const CHUNK_SIZE: u64 = 32 * 1024 * 1024; +/// Number of chunk streams to keep in flight on one peer connection. +/// +/// Four 32 MB chunks hide request/stream setup latency on fast LAN links without +/// opening an unbounded number of file handles or competing writes. +pub const PEER_DOWNLOAD_STREAM_WINDOW: usize = 4; + /// Maximum number of retry attempts for failed chunk downloads. pub const MAX_RETRY_COUNT: usize = 3; diff --git a/crates/lanspread-peer/src/download/planning.rs b/crates/lanspread-peer/src/download/planning.rs index ee24f99..4d2d138 100644 --- a/crates/lanspread-peer/src/download/planning.rs +++ b/crates/lanspread-peer/src/download/planning.rs @@ -19,7 +19,6 @@ pub(super) struct DownloadChunk { #[derive(Debug, Default)] pub(super) struct PeerDownloadPlan { pub(super) chunks: Vec, - pub(super) whole_files: Vec, } /// Result of downloading a chunk. @@ -137,7 +136,8 @@ pub(super) fn build_peer_plans( return plans; } - let mut peer_index = 0usize; + let mut planned_bytes: HashMap = HashMap::new(); + let mut tie_breaker = 0usize; for desc in file_descs.iter().filter(|d| !d.is_dir) { let size = desc.file_size(); @@ -147,8 +147,8 @@ pub(super) fn build_peer_plans( } if size == 0 { - let peer = eligible_peers[peer_index % eligible_peers.len()]; - peer_index += 1; + let peer = select_least_loaded_peer(eligible_peers, &planned_bytes, &mut tie_breaker); + *planned_bytes.entry(peer).or_default() += 1; plans.entry(peer).or_default().chunks.push(DownloadChunk { relative_path: desc.relative_path.clone(), offset: 0, @@ -162,8 +162,8 @@ pub(super) fn build_peer_plans( let mut offset = 0u64; while offset < size { let length = std::cmp::min(CHUNK_SIZE, size - offset); - let peer = eligible_peers[peer_index % eligible_peers.len()]; - peer_index += 1; + let peer = select_least_loaded_peer(eligible_peers, &planned_bytes, &mut tie_breaker); + *planned_bytes.entry(peer).or_default() += length; plans.entry(peer).or_default().chunks.push(DownloadChunk { relative_path: desc.relative_path.clone(), offset, @@ -178,6 +178,29 @@ pub(super) fn build_peer_plans( plans } +fn select_least_loaded_peer( + eligible_peers: &[SocketAddr], + planned_bytes: &HashMap, + tie_breaker: &mut usize, +) -> SocketAddr { + let start = *tie_breaker % eligible_peers.len(); + *tie_breaker = (*tie_breaker).wrapping_add(1); + + let mut selected = eligible_peers[start]; + let mut selected_load = planned_bytes.get(&selected).copied().unwrap_or_default(); + + for offset in 1..eligible_peers.len() { + let peer = eligible_peers[(start + offset) % eligible_peers.len()]; + let load = planned_bytes.get(&peer).copied().unwrap_or_default(); + if load < selected_load { + selected = peer; + selected_load = load; + } + } + + selected +} + #[cfg(test)] mod tests { use lanspread_db::db::GameFileDescription; diff --git a/crates/lanspread-peer/src/download/retry.rs b/crates/lanspread-peer/src/download/retry.rs index 22bd0fb..923fe53 100644 --- a/crates/lanspread-peer/src/download/retry.rs +++ b/crates/lanspread-peer/src/download/retry.rs @@ -5,6 +5,7 @@ use std::{ sync::Arc, }; +use futures::{StreamExt, stream::FuturesUnordered}; use tokio_util::sync::CancellationToken; use super::{ @@ -15,11 +16,7 @@ use super::{ use crate::config::MAX_RETRY_COUNT; /// Selects a peer for retrying a failed chunk. -fn select_retry_peer( - peers: &[SocketAddr], - last_peer: Option, - attempt_offset: usize, -) -> Option { +fn select_retry_peer(peers: &[SocketAddr], last_peer: Option) -> Option { if peers.is_empty() { return None; } @@ -28,11 +25,11 @@ fn select_retry_peer( && let Some(last) = last_peer && let Some(pos) = peers.iter().position(|addr| *addr == last) { - let next_index = (pos + 1 + attempt_offset) % peers.len(); + let next_index = (pos + 1) % peers.len(); return Some(peers[next_index]); } - Some(peers[attempt_offset % peers.len()]) + peers.first().copied() } /// Returns a fallback peer address for error reporting. @@ -42,6 +39,172 @@ fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option) -> So .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))) } +fn ensure_not_cancelled(cancel_token: &CancellationToken, game_id: &str) -> eyre::Result<()> { + if cancel_token.is_cancelled() { + eyre::bail!("download cancelled for game {game_id}"); + } + Ok(()) +} + +struct RetryAttempt { + peer_addr: SocketAddr, + chunks: Vec, + result: eyre::Result>, +} + +fn plan_retry_batch( + queue: &mut VecDeque, + peers: &[SocketAddr], + file_peer_map: &HashMap>, + final_results: &mut Vec, +) -> HashMap { + let mut retry_plans: HashMap = HashMap::new(); + + while let Some(mut chunk) = queue.pop_front() { + let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); + + if chunk.retry_count >= MAX_RETRY_COUNT { + final_results.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(eyre::eyre!( + "Retry budget exhausted for chunk: {}", + chunk.relative_path + )), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), + }); + continue; + } + + let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer) else { + final_results.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(eyre::eyre!( + "No peers available to retry chunk: {}", + chunk.relative_path + )), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), + }); + continue; + }; + + chunk.last_peer = Some(peer_addr); + retry_plans.entry(peer_addr).or_default().chunks.push(chunk); + } + + retry_plans +} + +async fn run_retry_batch( + retry_plans: HashMap, + base_dir: &Path, + game_id: &str, + cancel_token: &CancellationToken, + version_buffer: Option>, +) -> eyre::Result> { + let mut attempts = FuturesUnordered::new(); + + for (peer_addr, plan) in retry_plans { + let retry_chunks = plan.chunks.clone(); + let base_dir = base_dir.to_path_buf(); + let game_id = game_id.to_string(); + let cancel_token = cancel_token.clone(); + let version_buffer = version_buffer.clone(); + + attempts.push(async move { + let result = download_from_peer( + peer_addr, + &game_id, + plan, + base_dir, + &cancel_token, + version_buffer, + ) + .await; + RetryAttempt { + peer_addr, + chunks: retry_chunks, + result, + } + }); + } + + let mut results = Vec::new(); + while !attempts.is_empty() { + let result = tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("download cancelled for game {game_id}"); + } + result = attempts.next() => result.expect("retry attempt should exist"), + }; + results.push(result); + } + + Ok(results) +} + +fn handle_retry_chunk_result( + result: ChunkDownloadResult, + queue: &mut VecDeque, + final_results: &mut Vec, +) { + let ChunkDownloadResult { + mut chunk, + result, + peer_addr, + } = result; + + match result { + Ok(()) => final_results.push(ChunkDownloadResult { + chunk, + result: Ok(()), + peer_addr, + }), + Err(err) => { + chunk.retry_count += 1; + chunk.last_peer = Some(peer_addr); + + if chunk.retry_count >= MAX_RETRY_COUNT { + let context = format!("Retry budget exhausted for chunk: {}", chunk.relative_path); + final_results.push(ChunkDownloadResult { + chunk, + result: Err(err.wrap_err(context)), + peer_addr, + }); + } else { + queue.push_back(chunk); + } + } + } +} + +fn handle_retry_attempt_error( + peer_addr: SocketAddr, + chunks: Vec, + err: &eyre::Report, + queue: &mut VecDeque, + final_results: &mut Vec, +) { + let error = err.to_string(); + + for mut chunk in chunks { + chunk.retry_count += 1; + chunk.last_peer = Some(peer_addr); + + if chunk.retry_count >= MAX_RETRY_COUNT { + final_results.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(eyre::eyre!( + "Retry budget exhausted for chunk after connection failure: {}: {error}", + chunk.relative_path + )), + peer_addr, + }); + } else { + queue.push_back(chunk); + } + } +} + /// Retries downloading failed chunks. pub(super) async fn retry_failed_chunks( failed_chunks: Vec, @@ -52,113 +215,88 @@ pub(super) async fn retry_failed_chunks( cancel_token: &CancellationToken, version_buffer: Option>, ) -> eyre::Result> { - let mut exhausted = Vec::new(); + let mut final_results = Vec::new(); let mut queue: VecDeque = failed_chunks.into_iter().collect(); - while let Some(mut chunk) = queue.pop_front() { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } + while !queue.is_empty() { + ensure_not_cancelled(cancel_token, game_id)?; - let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); - - if chunk.retry_count >= MAX_RETRY_COUNT { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(eyre::eyre!( - "Retry budget exhausted for chunk: {}", - chunk.relative_path - )), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); + let retry_plans = plan_retry_batch(&mut queue, peers, file_peer_map, &mut final_results); + if retry_plans.is_empty() { continue; } - let retry_offset = chunk.retry_count.saturating_sub(1); - let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset) - else { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(eyre::eyre!( - "No peers available to retry chunk: {}", - chunk.relative_path - )), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); - continue; - }; - - let mut attempt_chunk = chunk.clone(); - attempt_chunk.last_peer = Some(peer_addr); - - let plan = PeerDownloadPlan { - chunks: vec![attempt_chunk.clone()], - whole_files: Vec::new(), - }; - - match download_from_peer( - peer_addr, + let attempts = run_retry_batch( + retry_plans, + base_dir, game_id, - plan, - base_dir.to_path_buf(), cancel_token, version_buffer.clone(), ) - .await - { - Ok(results) => { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } + .await?; - for result in results { - match result.result { - Ok(()) => {} - Err(e) => { - let mut retry_chunk = result.chunk.clone(); - retry_chunk.retry_count = chunk.retry_count + 1; - retry_chunk.last_peer = Some(result.peer_addr); + for attempt in attempts { + let RetryAttempt { + peer_addr, + chunks, + result, + } = attempt; - if retry_chunk.retry_count >= MAX_RETRY_COUNT { - let context = format!( - "Retry budget exhausted for chunk: {}", - result.chunk.relative_path - ); - exhausted.push(ChunkDownloadResult { - chunk: retry_chunk, - result: Err(e.wrap_err(context)), - peer_addr: result.peer_addr, - }); - } else { - queue.push_back(retry_chunk); - } - } + match result { + Ok(results) => { + for result in results { + handle_retry_chunk_result(result, &mut queue, &mut final_results); } } - } - Err(e) => { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } - - chunk.retry_count += 1; - chunk.last_peer = Some(peer_addr); - - if chunk.retry_count >= MAX_RETRY_COUNT { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(e.wrap_err(format!( - "Retry budget exhausted for chunk after connection failure: {}", - chunk.relative_path - ))), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); - } else { - queue.push_back(chunk); + Err(err) => { + handle_retry_attempt_error( + peer_addr, + chunks, + &err, + &mut queue, + &mut final_results, + ); } } } } - Ok(exhausted) + Ok(final_results) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn loopback_addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + #[test] + fn retry_peer_selection_cycles_after_last_failed_peer() { + let peers = vec![ + loopback_addr(12000), + loopback_addr(12001), + loopback_addr(12002), + ]; + + assert_eq!(select_retry_peer(&peers, Some(peers[0])), Some(peers[1])); + assert_eq!(select_retry_peer(&peers, Some(peers[1])), Some(peers[2])); + assert_eq!(select_retry_peer(&peers, Some(peers[2])), Some(peers[0])); + } + + #[test] + fn retry_peer_selection_uses_first_peer_without_prior_failure() { + let peers = vec![loopback_addr(12000), loopback_addr(12001)]; + + assert_eq!(select_retry_peer(&peers, None), Some(peers[0])); + } + + #[test] + fn retry_peer_selection_wraps_between_two_peers() { + let peers = vec![loopback_addr(12000), loopback_addr(12001)]; + + assert_eq!(select_retry_peer(&peers, Some(peers[0])), Some(peers[1])); + assert_eq!(select_retry_peer(&peers, Some(peers[1])), Some(peers[0])); + } } diff --git a/crates/lanspread-peer/src/download/transport.rs b/crates/lanspread-peer/src/download/transport.rs index 482e0eb..717b775 100644 --- a/crates/lanspread-peer/src/download/transport.rs +++ b/crates/lanspread-peer/src/download/transport.rs @@ -1,10 +1,12 @@ use std::{ + collections::VecDeque, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, }; -use lanspread_db::db::GameFileDescription; +use futures::{SinkExt, StreamExt, stream::FuturesUnordered}; +use s2n_quic::{Connection, stream::ReceiveStream}; use tokio::{ fs::OpenOptions, io::{AsyncSeekExt, AsyncWriteExt}, @@ -18,7 +20,11 @@ use super::{ planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan}, version_ini::VersionIniBuffer, }; -use crate::{network::connect_to_peer, path_validation::validate_game_file_path}; +use crate::{ + config::PEER_DOWNLOAD_STREAM_WINDOW, + network::connect_to_peer, + path_validation::validate_game_file_path, +}; fn ensure_download_not_cancelled( cancel_token: &CancellationToken, @@ -30,19 +36,15 @@ fn ensure_download_not_cancelled( Ok(()) } -/// Downloads a single chunk from a peer. -async fn download_chunk( - conn: &mut s2n_quic::Connection, - base_dir: &Path, +async fn open_chunk_stream( + conn: &mut Connection, game_id: &str, chunk: &DownloadChunk, - version_buffer: Option>, -) -> eyre::Result<()> { - use futures::SinkExt; +) -> eyre::Result { use lanspread_proto::{Message, Request}; let stream = conn.open_bidirectional_stream().await?; - let (mut rx, tx) = stream.split(); + let (rx, tx) = stream.split(); let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); let request = Request::GetGameFileChunk { @@ -54,11 +56,20 @@ async fn download_chunk( framed_tx.send(request.encode()).await?; framed_tx.close().await?; + Ok(rx) +} +/// Receives one requested chunk from a peer stream. +async fn receive_chunk( + mut rx: ReceiveStream, + base_dir: &Path, + chunk: &DownloadChunk, + version_buffer: Option>, +) -> eyre::Result<()> { if let Some(buffer) = version_buffer && buffer.matches(&chunk.relative_path) { - return download_version_ini_chunk(&mut rx, chunk, &buffer).await; + return download_version_ini_chunk(rx, chunk, &buffer).await; } // Validate the path to prevent directory traversal @@ -110,8 +121,23 @@ async fn download_chunk( Ok(()) } +async fn receive_chunk_result( + peer_addr: SocketAddr, + base_dir: PathBuf, + chunk: DownloadChunk, + rx: ReceiveStream, + version_buffer: Option>, +) -> ChunkDownloadResult { + let result = receive_chunk(rx, &base_dir, &chunk, version_buffer).await; + ChunkDownloadResult { + chunk, + result, + peer_addr, + } +} + async fn download_version_ini_chunk( - rx: &mut s2n_quic::stream::ReceiveStream, + mut rx: ReceiveStream, chunk: &DownloadChunk, buffer: &VersionIniBuffer, ) -> eyre::Result<()> { @@ -157,40 +183,99 @@ async fn verify_chunk_integrity( Ok(()) } -/// Downloads a whole file from a peer. -async fn download_whole_file( - conn: &mut s2n_quic::Connection, +fn failed_chunk_result( + chunk: DownloadChunk, + peer_addr: SocketAddr, + reason: impl Into, +) -> ChunkDownloadResult { + ChunkDownloadResult { + chunk, + result: Err(eyre::Report::msg(reason.into())), + peer_addr, + } +} + +fn failed_plan_results( + plan: PeerDownloadPlan, + peer_addr: SocketAddr, + reason: impl std::fmt::Display, +) -> Vec { + let reason = format!("peer connection failed: {reason}"); + plan.chunks + .into_iter() + .map(|chunk| failed_chunk_result(chunk, peer_addr, reason.clone())) + .collect() +} + +async fn download_chunk_plan( + conn: &mut Connection, + peer_addr: SocketAddr, + game_id: &str, + chunks: Vec, base_dir: &Path, - desc: &GameFileDescription, -) -> eyre::Result<()> { - use futures::SinkExt; - use lanspread_proto::{Message, Request}; + cancel_token: &CancellationToken, + version_buffer: Option>, +) -> eyre::Result> { + let mut pending: VecDeque = chunks.into(); + let mut in_flight = FuturesUnordered::new(); + let mut results = Vec::new(); + let window = PEER_DOWNLOAD_STREAM_WINDOW.max(1); + let base_dir = base_dir.to_path_buf(); - let stream = conn.open_bidirectional_stream().await?; - let (mut rx, tx) = stream.split(); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + while !pending.is_empty() || !in_flight.is_empty() { + while in_flight.len() < window { + let Some(chunk) = pending.pop_front() else { + break; + }; + ensure_download_not_cancelled(cancel_token, game_id)?; - framed_tx - .send(Request::GetGameFileData(desc.clone()).encode()) - .await?; - framed_tx.close().await?; + log::info!( + "Downloading chunk {} (offset {}, length {}) from {}", + chunk.relative_path, + chunk.offset, + chunk.length, + peer_addr + ); - // Validate the path to prevent directory traversal - let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?; - let mut file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&validated_path) - .await?; - file.seek(std::io::SeekFrom::Start(0)).await?; + match open_chunk_stream(conn, game_id, &chunk).await { + Ok(rx) => { + in_flight.push(receive_chunk_result( + peer_addr, + base_dir.clone(), + chunk, + rx, + version_buffer.clone(), + )); + } + Err(err) => { + let reason = format!("failed to open chunk stream: {err}"); + results.push(failed_chunk_result(chunk, peer_addr, reason.clone())); + while let Some(chunk) = pending.pop_front() { + results.push(failed_chunk_result( + chunk, + peer_addr, + format!("peer stream unavailable after earlier open failure: {reason}"), + )); + } + break; + } + } + } - while let Some(bytes) = rx.receive().await? { - file.write_all(&bytes).await?; + if in_flight.is_empty() { + continue; + } + + let result = tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("download cancelled for game {game_id}"); + } + result = in_flight.next() => result.expect("in-flight chunk stream should exist"), + }; + results.push(result); } - file.flush().await?; - Ok(()) + Ok(results) } /// Downloads all assigned chunks and files from a single peer. @@ -202,58 +287,33 @@ pub(super) async fn download_from_peer( cancel_token: &CancellationToken, version_buffer: Option>, ) -> eyre::Result> { - if plan.chunks.is_empty() && plan.whole_files.is_empty() { + if plan.chunks.is_empty() { return Ok(Vec::new()); } ensure_download_not_cancelled(cancel_token, game_id)?; - let mut conn = connect_to_peer(peer_addr).await?; - conn.keep_alive(true)?; - conn.keep_alive(true)?; + let mut conn = match connect_to_peer(peer_addr).await { + Ok(conn) => conn, + Err(err) => return Ok(failed_plan_results(plan, peer_addr, err)), + }; + + if let Err(err) = conn.keep_alive(true) { + return Ok(failed_plan_results(plan, peer_addr, err)); + } let base_dir = games_folder; - let mut results = Vec::new(); - // Download chunks with error handling - for chunk in &plan.chunks { - ensure_download_not_cancelled(cancel_token, game_id)?; - - log::info!( - "Downloading chunk {} (offset {}, length {}) from {}", - chunk.relative_path, - chunk.offset, - chunk.length, - peer_addr - ); - let result = - download_chunk(&mut conn, &base_dir, game_id, chunk, version_buffer.clone()).await; - results.push(ChunkDownloadResult { - chunk: chunk.clone(), - result, - peer_addr, - }); - } - - // Download whole files - for desc in &plan.whole_files { - ensure_download_not_cancelled(cancel_token, game_id)?; - - let chunk = DownloadChunk { - relative_path: desc.relative_path.clone(), - offset: 0, - length: 0, // Indicates whole file - retry_count: 0, - last_peer: Some(peer_addr), - }; - - let result = download_whole_file(&mut conn, &base_dir, desc).await; - results.push(ChunkDownloadResult { - chunk, - result, - peer_addr, - }); - } + let results = download_chunk_plan( + &mut conn, + peer_addr, + game_id, + plan.chunks, + &base_dir, + cancel_token, + version_buffer, + ) + .await?; Ok(results) }