feat(peer): pipeline chunk downloads over QUIC

Keep several chunk streams in flight per peer connection so a fast LAN download
is no longer forced through a serial request-wait-request loop. The transport
still uses the current GetGameFileChunk request on normal QUIC bidirectional
streams, so this improves throughput without adding another wire message or
compatibility path.

The peer planner now assigns chunks to the least-loaded eligible peer by planned
bytes. This keeps shared large files balanced across the latest valid sources,
while still respecting per-file source eligibility. Retries are batched by peer
and use the same pipelined transport instead of opening a new connection for one
failed chunk at a time.

Initial peer connection failures are converted into per-chunk failures so the
existing retry logic can move those chunks to another validated source. The dead
whole-file branch was removed from PeerDownloadPlan because nothing populated it
and retrying those entries as zero-length chunks would be a future data-loss
trap.

Test Plan:
- RUSTC_WRAPPER= just fmt
- RUSTC_WRAPPER= just test
- RUSTC_WRAPPER= just clippy
- RUSTC_WRAPPER= just peer-cli-build
- RUSTC_WRAPPER= just peer-cli-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \
  S13 S14 S16 S18 S19 S20 S24 S25 S26 S36
- git diff --cached --check

Refs: PEER_CLI_SCENARIOS.md
Review-Notes: addressed Claude review on whole-file retry cleanup
This commit is contained in:
2026-05-20 07:46:44 +02:00
parent e078b12dcf
commit 6a90ca951d
4 changed files with 413 additions and 186 deletions
+235 -97
View File
@@ -5,6 +5,7 @@ use std::{
sync::Arc,
};
use futures::{StreamExt, stream::FuturesUnordered};
use tokio_util::sync::CancellationToken;
use super::{
@@ -15,11 +16,7 @@ use super::{
use crate::config::MAX_RETRY_COUNT;
/// Selects a peer for retrying a failed chunk.
fn select_retry_peer(
peers: &[SocketAddr],
last_peer: Option<SocketAddr>,
attempt_offset: usize,
) -> Option<SocketAddr> {
fn select_retry_peer(peers: &[SocketAddr], last_peer: Option<SocketAddr>) -> Option<SocketAddr> {
if peers.is_empty() {
return None;
}
@@ -28,11 +25,11 @@ fn select_retry_peer(
&& let Some(last) = last_peer
&& let Some(pos) = peers.iter().position(|addr| *addr == last)
{
let next_index = (pos + 1 + attempt_offset) % peers.len();
let next_index = (pos + 1) % peers.len();
return Some(peers[next_index]);
}
Some(peers[attempt_offset % peers.len()])
peers.first().copied()
}
/// Returns a fallback peer address for error reporting.
@@ -42,6 +39,172 @@ fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option<SocketAddr>) -> So
.unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0)))
}
/// Fails with a cancellation error once `cancel_token` has been triggered.
///
/// Returns `Ok(())` while the token is still live; `game_id` is only used to
/// label the error message.
fn ensure_not_cancelled(cancel_token: &CancellationToken, game_id: &str) -> eyre::Result<()> {
    if !cancel_token.is_cancelled() {
        return Ok(());
    }
    Err(eyre::eyre!("download cancelled for game {game_id}"))
}
/// Outcome of one retry download attempt against a single peer.
struct RetryAttempt {
/// Peer the attempt was sent to.
peer_addr: SocketAddr,
/// Chunks that were handed to the peer for this attempt.
chunks: Vec<DownloadChunk>,
/// Per-chunk results, or the error that failed the attempt as a whole.
result: eyre::Result<Vec<ChunkDownloadResult>>,
}
/// Empties `queue` and groups the retryable chunks into one plan per target peer.
///
/// A chunk is not planned when it has already reached `MAX_RETRY_COUNT` or when
/// no peer can be selected for its file; such chunks are appended to
/// `final_results` as failures instead. Every planned chunk has its `last_peer`
/// set to the peer it was assigned to.
fn plan_retry_batch(
    queue: &mut VecDeque<DownloadChunk>,
    peers: &[SocketAddr],
    file_peer_map: &HashMap<String, Vec<SocketAddr>>,
    final_results: &mut Vec<ChunkDownloadResult>,
) -> HashMap<SocketAddr, PeerDownloadPlan> {
    let mut plans_by_peer: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();

    for mut chunk in queue.drain(..) {
        let candidates = resolve_file_peers(&chunk.relative_path, file_peer_map, peers);

        // Decide first: either a peer to retry against, or the reason the chunk is given up.
        let target = if chunk.retry_count >= MAX_RETRY_COUNT {
            Err(eyre::eyre!(
                "Retry budget exhausted for chunk: {}",
                chunk.relative_path
            ))
        } else {
            select_retry_peer(candidates, chunk.last_peer).ok_or_else(|| {
                eyre::eyre!(
                    "No peers available to retry chunk: {}",
                    chunk.relative_path
                )
            })
        };

        match target {
            Ok(peer_addr) => {
                chunk.last_peer = Some(peer_addr);
                plans_by_peer
                    .entry(peer_addr)
                    .or_default()
                    .chunks
                    .push(chunk);
            }
            Err(reason) => {
                // Resolve the reporting address before the chunk is moved into the result.
                let peer_addr = fallback_peer_addr(candidates, chunk.last_peer);
                final_results.push(ChunkDownloadResult {
                    chunk,
                    result: Err(reason),
                    peer_addr,
                });
            }
        }
    }

    plans_by_peer
}
async fn run_retry_batch(
retry_plans: HashMap<SocketAddr, PeerDownloadPlan>,
base_dir: &Path,
game_id: &str,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
) -> eyre::Result<Vec<RetryAttempt>> {
let mut attempts = FuturesUnordered::new();
for (peer_addr, plan) in retry_plans {
let retry_chunks = plan.chunks.clone();
let base_dir = base_dir.to_path_buf();
let game_id = game_id.to_string();
let cancel_token = cancel_token.clone();
let version_buffer = version_buffer.clone();
attempts.push(async move {
let result = download_from_peer(
peer_addr,
&game_id,
plan,
base_dir,
&cancel_token,
version_buffer,
)
.await;
RetryAttempt {
peer_addr,
chunks: retry_chunks,
result,
}
});
}
let mut results = Vec::new();
while !attempts.is_empty() {
let result = tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("download cancelled for game {game_id}");
}
result = attempts.next() => result.expect("retry attempt should exist"),
};
results.push(result);
}
Ok(results)
}
/// Routes the result of one retried chunk.
///
/// A success is appended to `final_results`. A failure increments the chunk's
/// retry count and records the failing peer; the chunk is then pushed back onto
/// `queue`, or, once `MAX_RETRY_COUNT` is reached, appended to `final_results`
/// with the error wrapped in a "retry budget exhausted" context.
fn handle_retry_chunk_result(
    result: ChunkDownloadResult,
    queue: &mut VecDeque<DownloadChunk>,
    final_results: &mut Vec<ChunkDownloadResult>,
) {
    let ChunkDownloadResult {
        mut chunk,
        result: outcome,
        peer_addr,
    } = result;

    let Err(err) = outcome else {
        final_results.push(ChunkDownloadResult {
            chunk,
            result: Ok(()),
            peer_addr,
        });
        return;
    };

    chunk.retry_count += 1;
    chunk.last_peer = Some(peer_addr);

    if chunk.retry_count < MAX_RETRY_COUNT {
        queue.push_back(chunk);
        return;
    }

    let context = format!("Retry budget exhausted for chunk: {}", chunk.relative_path);
    final_results.push(ChunkDownloadResult {
        chunk,
        result: Err(err.wrap_err(context)),
        peer_addr,
    });
}
/// Applies a whole-attempt failure against `peer_addr` to every chunk of that attempt.
///
/// Each chunk's retry count is incremented and its `last_peer` set to
/// `peer_addr`. Chunks still under `MAX_RETRY_COUNT` are pushed back onto
/// `queue`; the rest are appended to `final_results` with an error that embeds
/// the attempt failure.
fn handle_retry_attempt_error(
    peer_addr: SocketAddr,
    chunks: Vec<DownloadChunk>,
    err: &eyre::Report,
    queue: &mut VecDeque<DownloadChunk>,
    final_results: &mut Vec<ChunkDownloadResult>,
) {
    // The report is only borrowed and is shared by all chunks, so render it once.
    // The alternate form (`{:#}`) includes the chain of causes; a plain
    // `to_string()` keeps only the outermost message and hides the root cause.
    let error = format!("{err:#}");

    for mut chunk in chunks {
        chunk.retry_count += 1;
        chunk.last_peer = Some(peer_addr);

        if chunk.retry_count < MAX_RETRY_COUNT {
            queue.push_back(chunk);
            continue;
        }

        // Build the report before moving the chunk so no clone is needed.
        let report = eyre::eyre!(
            "Retry budget exhausted for chunk after connection failure: {}: {error}",
            chunk.relative_path
        );
        final_results.push(ChunkDownloadResult {
            chunk,
            result: Err(report),
            peer_addr,
        });
    }
}
/// Retries downloading failed chunks.
pub(super) async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
@@ -52,113 +215,88 @@ pub(super) async fn retry_failed_chunks(
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
let mut exhausted = Vec::new();
let mut final_results = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while let Some(mut chunk) = queue.pop_front() {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
while !queue.is_empty() {
ensure_not_cancelled(cancel_token, game_id)?;
let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"Retry budget exhausted for chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
let retry_plans = plan_retry_batch(&mut queue, peers, file_peer_map, &mut final_results);
if retry_plans.is_empty() {
continue;
}
let retry_offset = chunk.retry_count.saturating_sub(1);
let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset)
else {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"No peers available to retry chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
continue;
};
let mut attempt_chunk = chunk.clone();
attempt_chunk.last_peer = Some(peer_addr);
let plan = PeerDownloadPlan {
chunks: vec![attempt_chunk.clone()],
whole_files: Vec::new(),
};
match download_from_peer(
peer_addr,
let attempts = run_retry_batch(
retry_plans,
base_dir,
game_id,
plan,
base_dir.to_path_buf(),
cancel_token,
version_buffer.clone(),
)
.await
{
Ok(results) => {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
.await?;
for result in results {
match result.result {
Ok(()) => {}
Err(e) => {
let mut retry_chunk = result.chunk.clone();
retry_chunk.retry_count = chunk.retry_count + 1;
retry_chunk.last_peer = Some(result.peer_addr);
for attempt in attempts {
let RetryAttempt {
peer_addr,
chunks,
result,
} = attempt;
if retry_chunk.retry_count >= MAX_RETRY_COUNT {
let context = format!(
"Retry budget exhausted for chunk: {}",
result.chunk.relative_path
);
exhausted.push(ChunkDownloadResult {
chunk: retry_chunk,
result: Err(e.wrap_err(context)),
peer_addr: result.peer_addr,
});
} else {
queue.push_back(retry_chunk);
}
}
match result {
Ok(results) => {
for result in results {
handle_retry_chunk_result(result, &mut queue, &mut final_results);
}
}
}
Err(e) => {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
chunk.retry_count += 1;
chunk.last_peer = Some(peer_addr);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(e.wrap_err(format!(
"Retry budget exhausted for chunk after connection failure: {}",
chunk.relative_path
))),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
} else {
queue.push_back(chunk);
Err(err) => {
handle_retry_attempt_error(
peer_addr,
chunks,
&err,
&mut queue,
&mut final_results,
);
}
}
}
}
Ok(exhausted)
Ok(final_results)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `127.0.0.1` socket address on the given port.
    fn loopback_addr(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    #[test]
    fn retry_peer_selection_cycles_after_last_failed_peer() {
        let [first, second, third] = [12000, 12001, 12002].map(loopback_addr);
        let peers = [first, second, third];

        // (peer that failed last, peer expected for the retry)
        for (last, expected) in [(first, second), (second, third), (third, first)] {
            assert_eq!(select_retry_peer(&peers, Some(last)), Some(expected));
        }
    }

    #[test]
    fn retry_peer_selection_uses_first_peer_without_prior_failure() {
        let peers = [loopback_addr(12000), loopback_addr(12001)];

        assert_eq!(select_retry_peer(&peers, None), Some(peers[0]));
    }

    #[test]
    fn retry_peer_selection_wraps_between_two_peers() {
        let [first, second] = [12000, 12001].map(loopback_addr);
        let peers = [first, second];

        for (last, expected) in [(first, second), (second, first)] {
            assert_eq!(select_retry_peer(&peers, Some(last)), Some(expected));
        }
    }
}