use std::{ collections::HashMap, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, }; use lanspread_db::db::GameFileDescription; use tokio::sync::mpsc::UnboundedSender; use tokio_util::sync::CancellationToken; use super::{ planning::{ChunkDownloadResult, DownloadChunk, build_peer_plans, extract_version_descriptor}, progress::{DownloadProgressTracker, sample_download_progress}, retry::{RetryContext, retry_failed_chunks}, storage::{discard_cancelled_download, prepare_game_storage}, transport::download_from_peer, version_ini::{ VersionIniBuffer, begin_version_ini_transaction, commit_version_ini_buffer, rollback_version_ini_transaction, }, }; use crate::{PeerEvent, config::MAX_RETRY_COUNT}; /// Downloads all game files from available peers. #[allow(clippy::too_many_lines)] pub async fn download_game_files( game_id: &str, game_file_descs: Vec, games_folder: PathBuf, peers: Vec, file_peer_map: HashMap>, tx_notify_ui: UnboundedSender, cancel_token: CancellationToken, ) -> eyre::Result<()> { if peers.is_empty() { eyre::bail!("no peers available for game {game_id}"); } if cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {game_id}"); } let (version_desc, transfer_descs) = extract_version_descriptor(game_id, game_file_descs)?; let version_buffer = match VersionIniBuffer::new(&version_desc) { Ok(buffer) => Arc::new(buffer), Err(err) => return Err(err), }; let game_root = games_folder.join(game_id); begin_version_ini_transaction(&game_root).await?; if cancel_token.is_cancelled() { rollback_version_ini_transaction(&game_root).await; discard_cancelled_download_best_effort(&games_folder, game_id).await; eyre::bail!("download cancelled for game {game_id}"); } if let Err(err) = prepare_game_storage(&games_folder, &transfer_descs).await { rollback_version_ini_transaction(&game_root).await; if cancel_token.is_cancelled() { discard_cancelled_download_best_effort(&games_folder, game_id).await; eyre::bail!("download cancelled for game {game_id}"); } return Err(err); } if cancel_token.is_cancelled() { rollback_version_ini_transaction(&game_root).await; discard_cancelled_download_best_effort(&games_folder, game_id).await; eyre::bail!("download cancelled for game {game_id}"); } tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: game_id.to_string(), })?; let progress_tracker = DownloadProgressTracker::new(total_download_bytes(&transfer_descs)); let transfer_ctx = TransferContext { game_id, games_folder: &games_folder, peers: &peers, file_peer_map: &file_peer_map, tx_notify_ui: &tx_notify_ui, cancel_token: &cancel_token, version_buffer: version_buffer.clone(), progress_tracker: progress_tracker.clone(), }; let transfer_result = sample_download_progress( game_id, progress_tracker, tx_notify_ui.clone(), download_transfer_chunks(&transfer_ctx, &transfer_descs), ) .await; if let Err(err) = transfer_result { rollback_version_ini_transaction(&game_root).await; if cancel_token.is_cancelled() { discard_cancelled_download_best_effort(&games_folder, game_id).await; } return Err(err); } if cancel_token.is_cancelled() { rollback_version_ini_transaction(&game_root).await; discard_cancelled_download_best_effort(&games_folder, game_id).await; eyre::bail!("download cancelled for game {game_id}"); } if let Err(err) = commit_version_ini_buffer(&game_root, &version_buffer).await { rollback_version_ini_transaction(&game_root).await; return Err(err); } log::info!("all files downloaded for game: {game_id}"); Ok(()) } async fn discard_cancelled_download_best_effort(games_folder: &Path, game_id: &str) { if let Err(err) = discard_cancelled_download(games_folder, game_id).await { log::warn!("Failed to discard cancelled download payload for {game_id}: {err}"); } } struct TransferContext<'a> { game_id: &'a str, games_folder: &'a Path, peers: &'a [SocketAddr], file_peer_map: &'a HashMap>, tx_notify_ui: &'a UnboundedSender, cancel_token: &'a CancellationToken, version_buffer: Arc, progress_tracker: Arc, } async fn download_transfer_chunks( ctx: &TransferContext<'_>, transfer_descs: &[GameFileDescription], ) -> eyre::Result<()> { let plans = build_peer_plans(ctx.peers, transfer_descs, ctx.file_peer_map); let mut tasks = Vec::new(); for (peer_addr, plan) in plans { let base_dir = ctx.games_folder.to_path_buf(); let game_id = ctx.game_id.to_string(); let cancel_token = ctx.cancel_token.clone(); let version_buffer = ctx.version_buffer.clone(); let progress_tracker = ctx.progress_tracker.clone(); tasks.push(tokio::spawn(async move { download_from_peer( peer_addr, &game_id, plan, base_dir, &cancel_token, Some(version_buffer), progress_tracker, ) .await })); } let mut failed_chunks: Vec = Vec::new(); let mut last_err: Option = None; for handle in tasks { if ctx.cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {}", ctx.game_id); } match handle.await { Ok(Ok(results)) => { if ctx.cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {}", ctx.game_id); } collect_chunk_results( ctx.game_id, ctx.tx_notify_ui, results, &mut failed_chunks, &mut last_err, ); } Ok(Err(_)) | Err(_) if ctx.cancel_token.is_cancelled() => { eyre::bail!("download cancelled for game {}", ctx.game_id); } Ok(Err(e)) => last_err = Some(e), Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), } } if !failed_chunks.is_empty() && !ctx.peers.is_empty() { retry_chunks(ctx, failed_chunks, &mut last_err).await?; } if ctx.cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {}", ctx.game_id); } if let Some(err) = last_err { return Err(err); } Ok(()) } fn collect_chunk_results( game_id: &str, tx_notify_ui: &UnboundedSender, results: Vec, failed_chunks: &mut Vec, last_err: &mut Option, ) { for chunk_result in results { match chunk_result.result { Ok(()) => { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { id: game_id.to_string(), peer_addr: chunk_result.peer_addr, relative_path: chunk_result.chunk.relative_path, offset: chunk_result.chunk.offset, length: chunk_result.chunk.length, }); } Err(e) => { log::warn!( "Failed to download chunk from {}: {e}", chunk_result.peer_addr ); if chunk_result.chunk.retry_count < MAX_RETRY_COUNT { let mut retry_chunk = chunk_result.chunk; retry_chunk.retry_count += 1; retry_chunk.last_peer = Some(chunk_result.peer_addr); failed_chunks.push(retry_chunk); } else { *last_err = Some(eyre::eyre!( "Max retries exceeded for chunk: {}", chunk_result.chunk.relative_path )); } } } } } async fn retry_chunks( ctx: &TransferContext<'_>, failed_chunks: Vec, last_err: &mut Option, ) -> eyre::Result<()> { if ctx.cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {}", ctx.game_id); } log::info!("Retrying {} failed chunks", failed_chunks.len()); let retry_ctx = RetryContext { peers: ctx.peers, base_dir: ctx.games_folder, game_id: ctx.game_id, file_peer_map: ctx.file_peer_map, cancel_token: ctx.cancel_token, version_buffer: Some(ctx.version_buffer.clone()), progress_tracker: ctx.progress_tracker.clone(), }; let retry_results = match retry_failed_chunks(failed_chunks, &retry_ctx).await { Ok(results) => results, Err(_) if ctx.cancel_token.is_cancelled() => { eyre::bail!("download cancelled for game {}", ctx.game_id); } Err(err) => { *last_err = Some(err); Vec::new() } }; for chunk_result in retry_results { if ctx.cancel_token.is_cancelled() { eyre::bail!("download cancelled for game {}", ctx.game_id); } match chunk_result.result { Ok(()) => { let _ = ctx .tx_notify_ui .send(PeerEvent::DownloadGameFileChunkFinished { id: ctx.game_id.to_string(), peer_addr: chunk_result.peer_addr, relative_path: chunk_result.chunk.relative_path, offset: chunk_result.chunk.offset, length: chunk_result.chunk.length, }); } Err(e) => { log::error!("Retry failed for chunk: {e}"); *last_err = Some(e); } } } Ok(()) } fn total_download_bytes(file_descs: &[GameFileDescription]) -> u64 { file_descs .iter() .filter(|desc| !desc.is_dir) .fold(0u64, |total, desc| total.saturating_add(desc.file_size())) }