use std::{ collections::{HashMap, VecDeque}, net::SocketAddr, path::Path, sync::Arc, }; use futures::{StreamExt, stream::FuturesUnordered}; use tokio_util::sync::CancellationToken; use super::{ planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers}, progress::DownloadProgressTracker, transport::download_from_peer, version_ini::VersionIniBuffer, }; use crate::config::MAX_RETRY_COUNT; /// Selects a peer for retrying a failed chunk. fn select_retry_peer(peers: &[SocketAddr], last_peer: Option) -> Option { if peers.is_empty() { return None; } if peers.len() > 1 && let Some(last) = last_peer && let Some(pos) = peers.iter().position(|addr| *addr == last) { let next_index = (pos + 1) % peers.len(); return Some(peers[next_index]); } peers.first().copied() } /// Returns a fallback peer address for error reporting. fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option) -> SocketAddr { last_peer .or_else(|| peers.first().copied()) .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))) } fn ensure_not_cancelled(cancel_token: &CancellationToken, game_id: &str) -> eyre::Result<()> { if cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {game_id}"); } Ok(()) } struct RetryAttempt { peer_addr: SocketAddr, chunks: Vec, result: eyre::Result>, } pub(super) struct RetryContext<'a> { pub(super) peers: &'a [SocketAddr], pub(super) base_dir: &'a Path, pub(super) game_id: &'a str, pub(super) file_peer_map: &'a HashMap>, pub(super) cancel_token: &'a CancellationToken, pub(super) version_buffer: Option>, pub(super) progress_tracker: Arc, } fn plan_retry_batch( queue: &mut VecDeque, peers: &[SocketAddr], file_peer_map: &HashMap>, final_results: &mut Vec, ) -> HashMap { let mut retry_plans: HashMap = HashMap::new(); while let Some(mut chunk) = queue.pop_front() { let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); if chunk.retry_count >= MAX_RETRY_COUNT { final_results.push(ChunkDownloadResult { chunk: chunk.clone(), result: Err(eyre::eyre!( "Retry budget exhausted for chunk: {}", chunk.relative_path )), peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), }); continue; } let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer) else { final_results.push(ChunkDownloadResult { chunk: chunk.clone(), result: Err(eyre::eyre!( "No peers available to retry chunk: {}", chunk.relative_path )), peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), }); continue; }; chunk.last_peer = Some(peer_addr); retry_plans.entry(peer_addr).or_default().chunks.push(chunk); } retry_plans } async fn run_retry_batch( retry_plans: HashMap, ctx: &RetryContext<'_>, ) -> eyre::Result> { let mut attempts = FuturesUnordered::new(); for (peer_addr, plan) in retry_plans { let retry_chunks = plan.chunks.clone(); let base_dir = ctx.base_dir.to_path_buf(); let game_id = ctx.game_id.to_string(); let cancel_token = ctx.cancel_token.clone(); let version_buffer = ctx.version_buffer.clone(); let progress_tracker = ctx.progress_tracker.clone(); attempts.push(async move { let result = download_from_peer( peer_addr, &game_id, plan, base_dir, &cancel_token, version_buffer, progress_tracker, ) .await; RetryAttempt { peer_addr, chunks: retry_chunks, result, } }); } let mut results = Vec::new(); while !attempts.is_empty() { let result = tokio::select! { () = ctx.cancel_token.cancelled() => { eyre::bail!("download cancelled for game {}", ctx.game_id); } result = attempts.next() => result.expect("retry attempt should exist"), }; results.push(result); } Ok(results) } fn handle_retry_chunk_result( result: ChunkDownloadResult, queue: &mut VecDeque, final_results: &mut Vec, ) { let ChunkDownloadResult { mut chunk, result, peer_addr, } = result; match result { Ok(()) => final_results.push(ChunkDownloadResult { chunk, result: Ok(()), peer_addr, }), Err(err) => { chunk.retry_count += 1; chunk.last_peer = Some(peer_addr); if chunk.retry_count >= MAX_RETRY_COUNT { let context = format!("Retry budget exhausted for chunk: {}", chunk.relative_path); final_results.push(ChunkDownloadResult { chunk, result: Err(err.wrap_err(context)), peer_addr, }); } else { queue.push_back(chunk); } } } } fn handle_retry_attempt_error( peer_addr: SocketAddr, chunks: Vec, err: &eyre::Report, queue: &mut VecDeque, final_results: &mut Vec, ) { let error = err.to_string(); for mut chunk in chunks { chunk.retry_count += 1; chunk.last_peer = Some(peer_addr); if chunk.retry_count >= MAX_RETRY_COUNT { final_results.push(ChunkDownloadResult { chunk: chunk.clone(), result: Err(eyre::eyre!( "Retry budget exhausted for chunk after connection failure: {}: {error}", chunk.relative_path )), peer_addr, }); } else { queue.push_back(chunk); } } } /// Retries downloading failed chunks. pub(super) async fn retry_failed_chunks( failed_chunks: Vec, ctx: &RetryContext<'_>, ) -> eyre::Result> { let mut final_results = Vec::new(); let mut queue: VecDeque = failed_chunks.into_iter().collect(); while !queue.is_empty() { ensure_not_cancelled(ctx.cancel_token, ctx.game_id)?; let retry_plans = plan_retry_batch(&mut queue, ctx.peers, ctx.file_peer_map, &mut final_results); if retry_plans.is_empty() { continue; } let attempts = run_retry_batch(retry_plans, ctx).await?; for attempt in attempts { let RetryAttempt { peer_addr, chunks, result, } = attempt; match result { Ok(results) => { for result in results { handle_retry_chunk_result(result, &mut queue, &mut final_results); } } Err(err) => { handle_retry_attempt_error( peer_addr, chunks, &err, &mut queue, &mut final_results, ); } } } } Ok(final_results) } #[cfg(test)] mod tests { use super::*; fn loopback_addr(port: u16) -> SocketAddr { SocketAddr::from(([127, 0, 0, 1], port)) } #[test] fn retry_peer_selection_cycles_after_last_failed_peer() { let peers = vec![ loopback_addr(12000), loopback_addr(12001), loopback_addr(12002), ]; assert_eq!(select_retry_peer(&peers, Some(peers[0])), Some(peers[1])); assert_eq!(select_retry_peer(&peers, Some(peers[1])), Some(peers[2])); assert_eq!(select_retry_peer(&peers, Some(peers[2])), Some(peers[0])); } #[test] fn retry_peer_selection_uses_first_peer_without_prior_failure() { let peers = vec![loopback_addr(12000), loopback_addr(12001)]; assert_eq!(select_retry_peer(&peers, None), Some(peers[0])); } #[test] fn retry_peer_selection_wraps_between_two_peers() { let peers = vec![loopback_addr(12000), loopback_addr(12001)]; assert_eq!(select_retry_peer(&peers, Some(peers[0])), Some(peers[1])); assert_eq!(select_retry_peer(&peers, Some(peers[1])), Some(peers[0])); } }