diff --git a/crates/lanspread-peer/src/download.rs b/crates/lanspread-peer/src/download.rs deleted file mode 100644 index 830acb9..0000000 --- a/crates/lanspread-peer/src/download.rs +++ /dev/null @@ -1,1191 +0,0 @@ -//! Download pipeline for game files from peers. - -use std::{ - collections::{HashMap, VecDeque}, - net::SocketAddr, - path::{Path, PathBuf}, - sync::Arc, -}; - -use lanspread_db::db::GameFileDescription; -use tokio::{ - fs::OpenOptions, - io::{AsyncSeekExt, AsyncWriteExt}, - sync::{Mutex, mpsc::UnboundedSender}, -}; -use tokio_util::{ - codec::{FramedWrite, LengthDelimitedCodec}, - sync::CancellationToken, -}; - -use crate::{ - PeerEvent, - config::{CHUNK_SIZE, MAX_RETRY_COUNT}, - network::connect_to_peer, - path_validation::validate_game_file_path, -}; - -// ============================================================================= -// Download data structures -// ============================================================================= - -/// Represents a chunk of a file to be downloaded. -#[derive(Debug, Clone)] -pub struct DownloadChunk { - pub relative_path: String, - pub offset: u64, - pub length: u64, - pub retry_count: usize, - pub last_peer: Option, -} - -/// Download plan for a single peer. -#[derive(Debug, Default)] -pub struct PeerDownloadPlan { - pub chunks: Vec, - pub whole_files: Vec, -} - -/// Result of downloading a chunk. -#[derive(Debug)] -pub struct ChunkDownloadResult { - pub chunk: DownloadChunk, - pub result: eyre::Result<()>, - pub peer_addr: SocketAddr, -} - -#[derive(Debug)] -pub struct VersionIniBuffer { - relative_path: String, - bytes: Mutex>, -} - -impl VersionIniBuffer { - fn new(desc: &GameFileDescription) -> eyre::Result { - if desc.is_dir { - eyre::bail!("version.ini sentinel cannot be a directory"); - } - let size = usize::try_from(desc.size)?; - Ok(Self { - relative_path: desc.relative_path.clone(), - bytes: Mutex::new(vec![0; size]), - }) - } - - fn matches(&self, relative_path: &str) -> bool { - self.relative_path == relative_path - } - - async fn write_at(&self, offset: u64, bytes: &[u8]) -> eyre::Result<()> { - let offset = usize::try_from(offset)?; - let mut buffer = self.bytes.lock().await; - let end = offset - .checked_add(bytes.len()) - .ok_or_else(|| eyre::eyre!("version.ini chunk offset overflow"))?; - if end > buffer.len() { - eyre::bail!( - "version.ini chunk exceeds buffer: end {end}, buffer {}", - buffer.len() - ); - } - buffer[offset..end].copy_from_slice(bytes); - Ok(()) - } - - async fn snapshot(&self) -> Vec { - self.bytes.lock().await.clone() - } -} - -fn ensure_download_not_cancelled( - cancel_token: &CancellationToken, - game_id: &str, -) -> eyre::Result<()> { - if cancel_token.is_cancelled() { - eyre::bail!("download cancelled for game {game_id}"); - } - Ok(()) -} - -/// Extracts the root `version.ini` descriptor while keeping every descriptor in -/// the transfer list. The chunk writer diverts the sentinel bytes into memory. -fn extract_version_descriptor( - game_id: &str, - game_file_descs: Vec, - tx_notify_ui: &UnboundedSender, -) -> eyre::Result<(GameFileDescription, Vec)> { - let mut version_descs = Vec::new(); - let mut transfer_descs = Vec::new(); - - for desc in game_file_descs { - if desc.is_version_ini() { - version_descs.push(desc.clone()); - } - transfer_descs.push(desc); - } - - if version_descs.len() != 1 { - let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - }); - eyre::bail!( - "expected exactly one root-level version.ini sentinel for {game_id}, found {}", - version_descs.len() - ); - } - - let version_desc = version_descs.remove(0); - Ok((version_desc, transfer_descs)) -} - -// ============================================================================= -// Storage preparation -// ============================================================================= - -/// Prepares storage for game files by creating directories and pre-allocating files. -pub async fn prepare_game_storage( - games_folder: &Path, - file_descs: &[GameFileDescription], -) -> eyre::Result<()> { - for desc in file_descs { - if desc.is_version_ini() { - continue; - } - - // Validate the path to prevent directory traversal - let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?; - - if desc.is_dir { - tokio::fs::create_dir_all(&validated_path).await?; - } else { - if let Some(parent) = validated_path.parent() { - tokio::fs::create_dir_all(parent).await?; - } - - // Create and pre-allocate the file with the expected size - let file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&validated_path) - .await?; - - // Pre-allocate the file with the expected size - let size = desc.size; - if let Err(e) = file.set_len(size).await { - log::warn!( - "Failed to pre-allocate file {} (size: {}): {}", - desc.relative_path, - size, - e - ); - // Continue without pre-allocation - the file will grow as chunks are written - } else { - log::debug!( - "Pre-allocated file {} with {} bytes", - desc.relative_path, - size - ); - } - } - } - Ok(()) -} - -async fn begin_version_ini_transaction(game_root: &Path) -> eyre::Result<()> { - tokio::fs::create_dir_all(game_root).await?; - remove_file_if_exists(&game_root.join(".version.ini.tmp")).await?; - remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; - - let version_path = game_root.join("version.ini"); - if tokio::fs::metadata(&version_path) - .await - .is_ok_and(|metadata| metadata.is_file()) - { - tokio::fs::rename(version_path, game_root.join(".version.ini.discarded")).await?; - } - Ok(()) -} - -async fn rollback_version_ini_transaction(game_root: &Path) { - if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.tmp")).await { - log::warn!( - "Failed to sweep partial version.ini tmp in {}: {err}", - game_root.display() - ); - } - if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.discarded")).await { - log::warn!( - "Failed to sweep discarded version.ini in {}: {err}", - game_root.display() - ); - } -} - -async fn commit_version_ini_buffer( - game_root: &Path, - buffer: &VersionIniBuffer, -) -> eyre::Result<()> { - let tmp_path = game_root.join(".version.ini.tmp"); - let version_path = game_root.join("version.ini"); - let bytes = buffer.snapshot().await; - - let mut file = tokio::fs::File::create(&tmp_path).await?; - file.write_all(&bytes).await?; - file.sync_all().await?; - drop(file); - - tokio::fs::rename(&tmp_path, &version_path).await?; - sync_parent_dir(&version_path)?; - remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; - Ok(()) -} - -#[cfg(unix)] -fn sync_parent_dir(path: &Path) -> std::io::Result<()> { - if let Some(parent) = path.parent() { - std::fs::File::open(parent)?.sync_all()?; - } - Ok(()) -} - -#[cfg(not(unix))] -fn sync_parent_dir(_path: &Path) -> std::io::Result<()> { - Ok(()) -} - -async fn remove_file_if_exists(path: &Path) -> eyre::Result<()> { - match tokio::fs::remove_file(path).await { - Ok(()) => Ok(()), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(err) => Err(err.into()), - } -} - -// ============================================================================= -// Peer plan building -// ============================================================================= - -/// Resolves which peers have a specific file. -pub fn resolve_file_peers<'a>( - relative_path: &str, - file_peer_map: &'a HashMap>, - fallback: &'a [SocketAddr], -) -> &'a [SocketAddr] { - if let Some(peers) = file_peer_map.get(relative_path) - && !peers.is_empty() - { - return peers; - } - - fallback -} - -/// Builds download plans distributing files across peers. -pub fn build_peer_plans( - peers: &[SocketAddr], - file_descs: &[GameFileDescription], - file_peer_map: &HashMap>, -) -> HashMap { - let mut plans: HashMap = HashMap::new(); - if peers.is_empty() { - return plans; - } - - let mut peer_index = 0usize; - - for desc in file_descs.iter().filter(|d| !d.is_dir) { - let size = desc.file_size(); - let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers); - if eligible_peers.is_empty() { - continue; - } - - if size == 0 { - let peer = eligible_peers[peer_index % eligible_peers.len()]; - peer_index += 1; - plans.entry(peer).or_default().chunks.push(DownloadChunk { - relative_path: desc.relative_path.clone(), - offset: 0, - length: 0, - retry_count: 0, - last_peer: Some(peer), - }); - continue; - } - - let mut offset = 0u64; - while offset < size { - let length = std::cmp::min(CHUNK_SIZE, size - offset); - let peer = eligible_peers[peer_index % eligible_peers.len()]; - peer_index += 1; - plans.entry(peer).or_default().chunks.push(DownloadChunk { - relative_path: desc.relative_path.clone(), - offset, - length, - retry_count: 0, - last_peer: Some(peer), - }); - offset += length; - } - } - - plans -} - -// ============================================================================= -// Chunk downloading -// ============================================================================= - -/// Downloads a single chunk from a peer. -pub async fn download_chunk( - conn: &mut s2n_quic::Connection, - base_dir: &Path, - game_id: &str, - chunk: &DownloadChunk, - version_buffer: Option>, -) -> eyre::Result<()> { - use futures::SinkExt; - use lanspread_proto::{Message, Request}; - - let stream = conn.open_bidirectional_stream().await?; - let (mut rx, tx) = stream.split(); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - - let request = Request::GetGameFileChunk { - game_id: game_id.to_string(), - relative_path: chunk.relative_path.clone(), - offset: chunk.offset, - length: chunk.length, - }; - framed_tx.send(request.encode()).await?; - - framed_tx.close().await?; - - if let Some(buffer) = version_buffer - && buffer.matches(&chunk.relative_path) - { - return download_version_ini_chunk(&mut rx, chunk, &buffer).await; - } - - // Validate the path to prevent directory traversal - let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?; - let mut file = OpenOptions::new() - .create(true) - .write(true) - .truncate(false) - .open(&validated_path) - .await?; - if chunk.length == 0 && chunk.offset == 0 { - // fallback-to-whole-file path replaces any existing partial data - file.set_len(0).await?; - } - file.seek(std::io::SeekFrom::Start(chunk.offset)).await?; - - let mut remaining = chunk.length; - let mut received_bytes = 0u64; - - while let Some(bytes) = rx.receive().await? { - file.write_all(&bytes).await?; - received_bytes += bytes.len() as u64; - - if remaining == 0 { - continue; - } - remaining = remaining.saturating_sub(bytes.len() as u64); - if remaining == 0 { - break; - } - } - - // Verify we received the expected amount of data - if chunk.length > 0 && received_bytes != chunk.length { - eyre::bail!( - "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}", - chunk.length, - received_bytes, - chunk.relative_path, - chunk.offset - ); - } - - file.flush().await?; - - // Verify file integrity by checking the file size - verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?; - - Ok(()) -} - -async fn download_version_ini_chunk( - rx: &mut s2n_quic::stream::ReceiveStream, - chunk: &DownloadChunk, - buffer: &VersionIniBuffer, -) -> eyre::Result<()> { - let mut received = Vec::new(); - while let Some(bytes) = rx.receive().await? { - received.extend_from_slice(&bytes); - } - - if chunk.length > 0 && u64::try_from(received.len())? != chunk.length { - eyre::bail!( - "Incomplete version.ini chunk download: expected {} bytes, received {} bytes at offset {}", - chunk.length, - received.len(), - chunk.offset - ); - } - - buffer.write_at(chunk.offset, &received).await -} - -/// Verifies that a chunk was written correctly. -async fn verify_chunk_integrity( - file_path: &Path, - offset: u64, - expected_length: u64, -) -> eyre::Result<()> { - if expected_length == 0 { - return Ok(()); // Skip verification for whole files or zero-length chunks - } - - let metadata = tokio::fs::metadata(file_path).await?; - let file_size = metadata.len(); - - if file_size < offset + expected_length { - eyre::bail!( - "File integrity check failed: file size {} is less than expected {} (offset: {})", - file_size, - offset + expected_length, - offset - ); - } - - Ok(()) -} - -/// Downloads a whole file from a peer. -pub async fn download_whole_file( - conn: &mut s2n_quic::Connection, - base_dir: &Path, - desc: &GameFileDescription, -) -> eyre::Result<()> { - use futures::SinkExt; - use lanspread_proto::{Message, Request}; - - let stream = conn.open_bidirectional_stream().await?; - let (mut rx, tx) = stream.split(); - let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - - framed_tx - .send(Request::GetGameFileData(desc.clone()).encode()) - .await?; - framed_tx.close().await?; - - // Validate the path to prevent directory traversal - let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?; - let mut file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&validated_path) - .await?; - file.seek(std::io::SeekFrom::Start(0)).await?; - - while let Some(bytes) = rx.receive().await? { - file.write_all(&bytes).await?; - } - - file.flush().await?; - Ok(()) -} - -/// Downloads all assigned chunks and files from a single peer. -pub async fn download_from_peer( - peer_addr: SocketAddr, - game_id: &str, - plan: PeerDownloadPlan, - games_folder: PathBuf, - cancel_token: &CancellationToken, - version_buffer: Option>, -) -> eyre::Result> { - if plan.chunks.is_empty() && plan.whole_files.is_empty() { - return Ok(Vec::new()); - } - - ensure_download_not_cancelled(cancel_token, game_id)?; - - let mut conn = connect_to_peer(peer_addr).await?; - conn.keep_alive(true)?; - conn.keep_alive(true)?; - - let base_dir = games_folder; - let mut results = Vec::new(); - - // Download chunks with error handling - for chunk in &plan.chunks { - ensure_download_not_cancelled(cancel_token, game_id)?; - - log::info!( - "Downloading chunk {} (offset {}, length {}) from {}", - chunk.relative_path, - chunk.offset, - chunk.length, - peer_addr - ); - let result = - download_chunk(&mut conn, &base_dir, game_id, chunk, version_buffer.clone()).await; - results.push(ChunkDownloadResult { - chunk: chunk.clone(), - result, - peer_addr, - }); - } - - // Download whole files - for desc in &plan.whole_files { - ensure_download_not_cancelled(cancel_token, game_id)?; - - let chunk = DownloadChunk { - relative_path: desc.relative_path.clone(), - offset: 0, - length: 0, // Indicates whole file - retry_count: 0, - last_peer: Some(peer_addr), - }; - - let result = download_whole_file(&mut conn, &base_dir, desc).await; - results.push(ChunkDownloadResult { - chunk, - result, - peer_addr, - }); - } - - Ok(results) -} - -// ============================================================================= -// Retry logic -// ============================================================================= - -/// Selects a peer for retrying a failed chunk. -fn select_retry_peer( - peers: &[SocketAddr], - last_peer: Option, - attempt_offset: usize, -) -> Option { - if peers.is_empty() { - return None; - } - - if peers.len() > 1 - && let Some(last) = last_peer - && let Some(pos) = peers.iter().position(|addr| *addr == last) - { - let next_index = (pos + 1 + attempt_offset) % peers.len(); - return Some(peers[next_index]); - } - - Some(peers[attempt_offset % peers.len()]) -} - -/// Returns a fallback peer address for error reporting. -fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option) -> SocketAddr { - last_peer - .or_else(|| peers.first().copied()) - .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))) -} - -/// Retries downloading failed chunks. -pub async fn retry_failed_chunks( - failed_chunks: Vec, - peers: &[SocketAddr], - base_dir: &Path, - game_id: &str, - file_peer_map: &HashMap>, - cancel_token: &CancellationToken, - version_buffer: Option>, -) -> eyre::Result> { - let mut exhausted = Vec::new(); - let mut queue: VecDeque = failed_chunks.into_iter().collect(); - - while let Some(mut chunk) = queue.pop_front() { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } - - let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); - - if chunk.retry_count >= MAX_RETRY_COUNT { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(eyre::eyre!( - "Retry budget exhausted for chunk: {}", - chunk.relative_path - )), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); - continue; - } - - let retry_offset = chunk.retry_count.saturating_sub(1); - let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset) - else { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(eyre::eyre!( - "No peers available to retry chunk: {}", - chunk.relative_path - )), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); - continue; - }; - - let mut attempt_chunk = chunk.clone(); - attempt_chunk.last_peer = Some(peer_addr); - - let plan = PeerDownloadPlan { - chunks: vec![attempt_chunk.clone()], - whole_files: Vec::new(), - }; - - match download_from_peer( - peer_addr, - game_id, - plan, - base_dir.to_path_buf(), - cancel_token, - version_buffer.clone(), - ) - .await - { - Ok(results) => { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } - - for result in results { - match result.result { - Ok(()) => {} - Err(e) => { - let mut retry_chunk = result.chunk.clone(); - retry_chunk.retry_count = chunk.retry_count + 1; - retry_chunk.last_peer = Some(result.peer_addr); - - if retry_chunk.retry_count >= MAX_RETRY_COUNT { - let context = format!( - "Retry budget exhausted for chunk: {}", - result.chunk.relative_path - ); - exhausted.push(ChunkDownloadResult { - chunk: retry_chunk, - result: Err(e.wrap_err(context)), - peer_addr: result.peer_addr, - }); - } else { - queue.push_back(retry_chunk); - } - } - } - } - } - Err(e) => { - if cancel_token.is_cancelled() { - return Ok(exhausted); - } - - chunk.retry_count += 1; - chunk.last_peer = Some(peer_addr); - - if chunk.retry_count >= MAX_RETRY_COUNT { - exhausted.push(ChunkDownloadResult { - chunk: chunk.clone(), - result: Err(e.wrap_err(format!( - "Retry budget exhausted for chunk after connection failure: {}", - chunk.relative_path - ))), - peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), - }); - } else { - queue.push_back(chunk); - } - } - } - } - - Ok(exhausted) -} - -// ============================================================================= -// Main download orchestration -// ============================================================================= - -/// Downloads all game files from available peers. -#[allow(clippy::too_many_lines)] -pub async fn download_game_files( - game_id: &str, - game_file_descs: Vec, - games_folder: PathBuf, - peers: Vec, - file_peer_map: HashMap>, - tx_notify_ui: UnboundedSender, - cancel_token: CancellationToken, -) -> eyre::Result<()> { - if peers.is_empty() { - eyre::bail!("no peers available for game {game_id}"); - } - - if cancel_token.is_cancelled() { - eyre::bail!("download cancelled for game {game_id}"); - } - - let (version_desc, transfer_descs) = - extract_version_descriptor(game_id, game_file_descs, &tx_notify_ui)?; - let version_buffer = match VersionIniBuffer::new(&version_desc) { - Ok(buffer) => Arc::new(buffer), - Err(err) => { - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - })?; - return Err(err); - } - }; - let game_root = games_folder.join(game_id); - - if let Err(err) = begin_version_ini_transaction(&game_root).await { - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - })?; - return Err(err); - } - if let Err(err) = prepare_game_storage(&games_folder, &transfer_descs).await { - rollback_version_ini_transaction(&game_root).await; - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - })?; - return Err(err); - } - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { - id: game_id.to_string(), - })?; - - let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map); - - let mut tasks = Vec::new(); - for (peer_addr, plan) in plans { - let base_dir = games_folder.clone(); - let game_id = game_id.to_string(); - let cancel_token = cancel_token.clone(); - let version_buffer = version_buffer.clone(); - tasks.push(tokio::spawn(async move { - download_from_peer( - peer_addr, - &game_id, - plan, - base_dir, - &cancel_token, - Some(version_buffer), - ) - .await - })); - } - - let mut failed_chunks: Vec = Vec::new(); - let mut last_err: Option = None; - - for handle in tasks { - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - match handle.await { - Ok(Ok(results)) => { - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - for chunk_result in results { - if let Err(e) = chunk_result.result { - log::warn!( - "Failed to download chunk from {}: {e}", - chunk_result.peer_addr - ); - if chunk_result.chunk.retry_count < MAX_RETRY_COUNT { - let mut retry_chunk = chunk_result.chunk; - retry_chunk.retry_count += 1; - retry_chunk.last_peer = Some(chunk_result.peer_addr); - failed_chunks.push(retry_chunk); - } else { - last_err = Some(eyre::eyre!( - "Max retries exceeded for chunk: {}", - chunk_result.chunk.relative_path - )); - } - } - } - } - Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - Ok(Err(e)) => last_err = Some(e), - Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), - } - } - - // Retry failed chunks if any - if !failed_chunks.is_empty() && !peers.is_empty() { - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - log::info!("Retrying {} failed chunks", failed_chunks.len()); - - let retry_results = match retry_failed_chunks( - failed_chunks, - &peers, - &games_folder, - game_id, - &file_peer_map, - &cancel_token, - Some(version_buffer.clone()), - ) - .await - { - Ok(results) => results, - Err(_) if cancel_token.is_cancelled() => { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - Err(err) => { - last_err = Some(err); - Vec::new() - } - }; - - for chunk_result in retry_results { - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - if let Err(e) = chunk_result.result { - log::error!("Retry failed for chunk: {e}"); - last_err = Some(e); - } - } - } - - if cancel_token.is_cancelled() { - rollback_version_ini_transaction(&game_root).await; - eyre::bail!("download cancelled for game {game_id}"); - } - - if let Some(err) = last_err { - rollback_version_ini_transaction(&game_root).await; - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - })?; - return Err(err); - } - - if let Err(err) = commit_version_ini_buffer(&game_root, &version_buffer).await { - rollback_version_ini_transaction(&game_root).await; - tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { - id: game_id.to_string(), - })?; - return Err(err); - } - log::info!("all files downloaded for game: {game_id}"); - tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { - id: game_id.to_string(), - })?; - Ok(()) -} - -// ============================================================================= -// Tests -// ============================================================================= - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_support::TempDir; - - fn loopback_addr(port: u16) -> SocketAddr { - SocketAddr::from(([127, 0, 0, 1], port)) - } - - #[test] - fn build_peer_plans_handles_partial_final_chunk() { - let peers = vec![loopback_addr(12000), loopback_addr(12001)]; - let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4; - let mut file_peer_map = HashMap::new(); - file_peer_map.insert("game/file.dat".to_string(), peers.clone()); - let file_descs = vec![GameFileDescription { - game_id: "test".to_string(), - relative_path: "game/file.dat".to_string(), - is_dir: false, - size: file_size, - }]; - - let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); - let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect(); - - assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks"); - - chunks.sort_by_key(|chunk| chunk.offset); - let last_chunk = chunks.last().expect("last chunk exists"); - - assert_eq!(last_chunk.offset, CHUNK_SIZE * 2); - assert_eq!(last_chunk.length, file_size - last_chunk.offset); - assert_eq!(last_chunk.length, CHUNK_SIZE / 4); - assert_eq!( - last_chunk.offset + last_chunk.length, - file_size, - "last chunk should finish the file" - ); - } - - #[test] - fn build_peer_plans_respects_file_peer_map() { - let shared_a = loopback_addr(12010); - let shared_b = loopback_addr(12011); - let exclusive = loopback_addr(12012); - let peers = vec![shared_a, shared_b, exclusive]; - - let mut file_peer_map = HashMap::new(); - file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]); - file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]); - - let file_descs = vec![ - GameFileDescription { - game_id: "test".to_string(), - relative_path: "shared.bin".to_string(), - is_dir: false, - size: CHUNK_SIZE * 2, - }, - GameFileDescription { - game_id: "test".to_string(), - relative_path: "exclusive.bin".to_string(), - is_dir: false, - size: CHUNK_SIZE, - }, - ]; - - let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); - let exclusive_plan = plans - .get(&exclusive) - .expect("exclusive peer should have a plan"); - assert!( - exclusive_plan - .chunks - .iter() - .all(|chunk| chunk.relative_path == "exclusive.bin"), - "exclusive peer should only receive exclusive.bin chunks" - ); - - for (peer, plan) in plans { - for chunk in plan.chunks { - match chunk.relative_path.as_str() { - "exclusive.bin" => assert_eq!( - peer, exclusive, - "exclusive.bin chunks should only be assigned to the exclusive peer" - ), - "shared.bin" => assert!( - peer == shared_a || peer == shared_b, - "shared.bin chunks must stay within shared peers" - ), - other => panic!("unexpected file in plan: {other}"), - } - } - } - } - - #[tokio::test] - async fn prepare_game_storage_skips_version_ini_sentinel() { - let temp = TempDir::new("lanspread-download"); - let descs = vec![GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }]; - - prepare_game_storage(temp.path(), &descs) - .await - .expect("storage preparation should succeed"); - - assert!(!temp.path().join("game").join("version.ini").exists()); - } - - #[tokio::test] - async fn version_ini_buffer_accepts_out_of_order_chunks() { - let desc = GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }; - let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); - - buffer - .write_at(4, b"0101") - .await - .expect("second chunk should write"); - buffer - .write_at(0, b"2025") - .await - .expect("first chunk should write"); - - assert_eq!(buffer.snapshot().await, b"20250101"); - } - - #[tokio::test] - async fn commit_version_ini_writes_sentinel_last_and_sweeps_discarded() { - let temp = TempDir::new("lanspread-download"); - let game_root = temp.path().join("game"); - tokio::fs::create_dir_all(&game_root) - .await - .expect("game root should be created"); - tokio::fs::write(game_root.join(".version.ini.discarded"), b"old") - .await - .expect("discarded sentinel should be written"); - - let desc = GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }; - let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); - buffer - .write_at(0, b"20250101") - .await - .expect("version should be buffered"); - - commit_version_ini_buffer(&game_root, &buffer) - .await - .expect("version sentinel should commit"); - - assert_eq!( - std::fs::read(game_root.join("version.ini")).expect("version.ini should exist"), - b"20250101" - ); - assert!(!game_root.join(".version.ini.tmp").exists()); - assert!(!game_root.join(".version.ini.discarded").exists()); - } - - #[tokio::test] - async fn begin_version_ini_transaction_parks_existing_sentinel() { - let temp = TempDir::new("lanspread-download"); - let game_root = temp.path().join("game"); - tokio::fs::create_dir_all(&game_root) - .await - .expect("game root should be created"); - tokio::fs::write(game_root.join("version.ini"), b"20240101") - .await - .expect("version sentinel should be written"); - tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial") - .await - .expect("tmp sentinel should be written"); - - begin_version_ini_transaction(&game_root) - .await - .expect("transaction should begin"); - - assert!(!game_root.join("version.ini").exists()); - assert!(!game_root.join(".version.ini.tmp").exists()); - assert_eq!( - std::fs::read(game_root.join(".version.ini.discarded")) - .expect("discarded sentinel should exist"), - b"20240101" - ); - } - - #[tokio::test] - async fn rollback_version_ini_transaction_sweeps_transients() { - let temp = TempDir::new("lanspread-download"); - let game_root = temp.path().join("game"); - tokio::fs::create_dir_all(&game_root) - .await - .expect("game root should be created"); - tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial") - .await - .expect("tmp sentinel should be written"); - tokio::fs::write(game_root.join(".version.ini.discarded"), b"old") - .await - .expect("discarded sentinel should be written"); - - rollback_version_ini_transaction(&game_root).await; - - assert!(!game_root.join(".version.ini.tmp").exists()); - assert!(!game_root.join(".version.ini.discarded").exists()); - } - - #[test] - fn version_descriptor_extraction_keeps_nested_decoy_in_transfer_list() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); - let nested_decoy = vec![ - GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }, - GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/local/version.ini".to_string(), - is_dir: false, - size: 8, - }, - ]; - - let (version, transfer) = - extract_version_descriptor("game", nested_decoy, &tx).expect("only one root sentinel"); - assert_eq!(version.relative_path, "game/version.ini"); - assert_eq!(transfer.len(), 2); - } - - #[test] - fn version_descriptor_extraction_requires_a_root_version_ini() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); - let missing = vec![GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/archive.eti".to_string(), - is_dir: false, - size: 1, - }]; - assert!(extract_version_descriptor("game", missing, &tx).is_err()); - } - - #[test] - fn version_descriptor_extraction_rejects_duplicate_root_version_ini() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); - let multiple = vec![ - GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }, - GameFileDescription { - game_id: "game".to_string(), - relative_path: "game/version.ini".to_string(), - is_dir: false, - size: 8, - }, - ]; - assert!(extract_version_descriptor("game", multiple, &tx).is_err()); - } -} diff --git a/crates/lanspread-peer/src/download/mod.rs b/crates/lanspread-peer/src/download/mod.rs new file mode 100644 index 0000000..6ba854d --- /dev/null +++ b/crates/lanspread-peer/src/download/mod.rs @@ -0,0 +1,9 @@ +//! Download pipeline for game files from peers. + +mod orchestrator; +mod planning; +mod retry; +mod transport; +mod version_ini; + +pub use orchestrator::download_game_files; diff --git a/crates/lanspread-peer/src/download/orchestrator.rs b/crates/lanspread-peer/src/download/orchestrator.rs new file mode 100644 index 0000000..3e4f0ce --- /dev/null +++ b/crates/lanspread-peer/src/download/orchestrator.rs @@ -0,0 +1,209 @@ +use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc}; + +use lanspread_db::db::GameFileDescription; +use tokio::sync::mpsc::UnboundedSender; +use tokio_util::sync::CancellationToken; + +use super::{ + planning::{DownloadChunk, build_peer_plans, extract_version_descriptor, prepare_game_storage}, + retry::retry_failed_chunks, + transport::download_from_peer, + version_ini::{ + VersionIniBuffer, + begin_version_ini_transaction, + commit_version_ini_buffer, + rollback_version_ini_transaction, + }, +}; +use crate::{PeerEvent, config::MAX_RETRY_COUNT}; + +/// Downloads all game files from available peers. +#[allow(clippy::too_many_lines)] +pub async fn download_game_files( + game_id: &str, + game_file_descs: Vec, + games_folder: PathBuf, + peers: Vec, + file_peer_map: HashMap>, + tx_notify_ui: UnboundedSender, + cancel_token: CancellationToken, +) -> eyre::Result<()> { + if peers.is_empty() { + eyre::bail!("no peers available for game {game_id}"); + } + + if cancel_token.is_cancelled() { + eyre::bail!("download cancelled for game {game_id}"); + } + + let (version_desc, transfer_descs) = + extract_version_descriptor(game_id, game_file_descs, &tx_notify_ui)?; + let version_buffer = match VersionIniBuffer::new(&version_desc) { + Ok(buffer) => Arc::new(buffer), + Err(err) => { + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + }; + let game_root = games_folder.join(game_id); + + if let Err(err) = begin_version_ini_transaction(&game_root).await { + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + if let Err(err) = prepare_game_storage(&games_folder, &transfer_descs).await { + rollback_version_ini_transaction(&game_root).await; + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { + id: game_id.to_string(), + })?; + + let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map); + + let mut tasks = Vec::new(); + for (peer_addr, plan) in plans { + let base_dir = games_folder.clone(); + let game_id = game_id.to_string(); + let cancel_token = cancel_token.clone(); + let version_buffer = version_buffer.clone(); + tasks.push(tokio::spawn(async move { + download_from_peer( + peer_addr, + &game_id, + plan, + base_dir, + &cancel_token, + Some(version_buffer), + ) + .await + })); + } + + let mut failed_chunks: Vec = Vec::new(); + let mut last_err: Option = None; + + for handle in tasks { + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + match handle.await { + Ok(Ok(results)) => { + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + for chunk_result in results { + if let Err(e) = chunk_result.result { + log::warn!( + "Failed to download chunk from {}: {e}", + chunk_result.peer_addr + ); + if chunk_result.chunk.retry_count < MAX_RETRY_COUNT { + let mut retry_chunk = chunk_result.chunk; + retry_chunk.retry_count += 1; + retry_chunk.last_peer = Some(chunk_result.peer_addr); + failed_chunks.push(retry_chunk); + } else { + last_err = Some(eyre::eyre!( + "Max retries exceeded for chunk: {}", + chunk_result.chunk.relative_path + )); + } + } + } + } + Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + Ok(Err(e)) => last_err = Some(e), + Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), + } + } + + // Retry failed chunks if any + if !failed_chunks.is_empty() && !peers.is_empty() { + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + log::info!("Retrying {} failed chunks", failed_chunks.len()); + + let retry_results = match retry_failed_chunks( + failed_chunks, + &peers, + &games_folder, + game_id, + &file_peer_map, + &cancel_token, + Some(version_buffer.clone()), + ) + .await + { + Ok(results) => results, + Err(_) if cancel_token.is_cancelled() => { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + Err(err) => { + last_err = Some(err); + Vec::new() + } + }; + + for chunk_result in retry_results { + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + if let Err(e) = chunk_result.result { + log::error!("Retry failed for chunk: {e}"); + last_err = Some(e); + } + } + } + + if cancel_token.is_cancelled() { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } + + if let Some(err) = last_err { + rollback_version_ini_transaction(&game_root).await; + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + + if let Err(err) = commit_version_ini_buffer(&game_root, &version_buffer).await { + rollback_version_ini_transaction(&game_root).await; + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + log::info!("all files downloaded for game: {game_id}"); + tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { + id: game_id.to_string(), + })?; + Ok(()) +} diff --git a/crates/lanspread-peer/src/download/planning.rs b/crates/lanspread-peer/src/download/planning.rs new file mode 100644 index 0000000..aa91351 --- /dev/null +++ b/crates/lanspread-peer/src/download/planning.rs @@ -0,0 +1,350 @@ +use std::{collections::HashMap, net::SocketAddr, path::Path}; + +use lanspread_db::db::GameFileDescription; +use tokio::{fs::OpenOptions, sync::mpsc::UnboundedSender}; + +use crate::{PeerEvent, config::CHUNK_SIZE, path_validation::validate_game_file_path}; + +/// Represents a chunk of a file to be downloaded. +#[derive(Debug, Clone)] +pub(super) struct DownloadChunk { + pub(super) relative_path: String, + pub(super) offset: u64, + pub(super) length: u64, + pub(super) retry_count: usize, + pub(super) last_peer: Option, +} + +/// Download plan for a single peer. +#[derive(Debug, Default)] +pub(super) struct PeerDownloadPlan { + pub(super) chunks: Vec, + pub(super) whole_files: Vec, +} + +/// Result of downloading a chunk. +#[derive(Debug)] +pub(super) struct ChunkDownloadResult { + pub(super) chunk: DownloadChunk, + pub(super) result: eyre::Result<()>, + pub(super) peer_addr: SocketAddr, +} + +/// Extracts the root `version.ini` descriptor while keeping every descriptor in +/// the transfer list. The chunk writer diverts the sentinel bytes into memory. +pub(super) fn extract_version_descriptor( + game_id: &str, + game_file_descs: Vec, + tx_notify_ui: &UnboundedSender, +) -> eyre::Result<(GameFileDescription, Vec)> { + let mut version_descs = Vec::new(); + let mut transfer_descs = Vec::new(); + + for desc in game_file_descs { + if desc.is_version_ini() { + version_descs.push(desc.clone()); + } + transfer_descs.push(desc); + } + + if version_descs.len() != 1 { + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + }); + eyre::bail!( + "expected exactly one root-level version.ini sentinel for {game_id}, found {}", + version_descs.len() + ); + } + + let version_desc = version_descs.remove(0); + Ok((version_desc, transfer_descs)) +} + +/// Prepares storage for game files by creating directories and pre-allocating files. +pub(super) async fn prepare_game_storage( + games_folder: &Path, + file_descs: &[GameFileDescription], +) -> eyre::Result<()> { + for desc in file_descs { + if desc.is_version_ini() { + continue; + } + + // Validate the path to prevent directory traversal + let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?; + + if desc.is_dir { + tokio::fs::create_dir_all(&validated_path).await?; + } else { + if let Some(parent) = validated_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + // Create and pre-allocate the file with the expected size + let file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&validated_path) + .await?; + + // Pre-allocate the file with the expected size + let size = desc.size; + if let Err(e) = file.set_len(size).await { + log::warn!( + "Failed to pre-allocate file {} (size: {}): {}", + desc.relative_path, + size, + e + ); + // Continue without pre-allocation - the file will grow as chunks are written + } else { + log::debug!( + "Pre-allocated file {} with {} bytes", + desc.relative_path, + size + ); + } + } + } + Ok(()) +} + +/// Resolves which peers have a specific file. +pub(super) fn resolve_file_peers<'a>( + relative_path: &str, + file_peer_map: &'a HashMap>, + fallback: &'a [SocketAddr], +) -> &'a [SocketAddr] { + if let Some(peers) = file_peer_map.get(relative_path) + && !peers.is_empty() + { + return peers; + } + + fallback +} + +/// Builds download plans distributing files across peers. +pub(super) fn build_peer_plans( + peers: &[SocketAddr], + file_descs: &[GameFileDescription], + file_peer_map: &HashMap>, +) -> HashMap { + let mut plans: HashMap = HashMap::new(); + if peers.is_empty() { + return plans; + } + + let mut peer_index = 0usize; + + for desc in file_descs.iter().filter(|d| !d.is_dir) { + let size = desc.file_size(); + let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers); + if eligible_peers.is_empty() { + continue; + } + + if size == 0 { + let peer = eligible_peers[peer_index % eligible_peers.len()]; + peer_index += 1; + plans.entry(peer).or_default().chunks.push(DownloadChunk { + relative_path: desc.relative_path.clone(), + offset: 0, + length: 0, + retry_count: 0, + last_peer: Some(peer), + }); + continue; + } + + let mut offset = 0u64; + while offset < size { + let length = std::cmp::min(CHUNK_SIZE, size - offset); + let peer = eligible_peers[peer_index % eligible_peers.len()]; + peer_index += 1; + plans.entry(peer).or_default().chunks.push(DownloadChunk { + relative_path: desc.relative_path.clone(), + offset, + length, + retry_count: 0, + last_peer: Some(peer), + }); + offset += length; + } + } + + plans +} + +#[cfg(test)] +mod tests { + use lanspread_db::db::GameFileDescription; + + use super::*; + use crate::test_support::TempDir; + + fn loopback_addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + #[test] + fn build_peer_plans_handles_partial_final_chunk() { + let peers = vec![loopback_addr(12000), loopback_addr(12001)]; + let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4; + let mut file_peer_map = HashMap::new(); + file_peer_map.insert("game/file.dat".to_string(), peers.clone()); + let file_descs = vec![GameFileDescription { + game_id: "test".to_string(), + relative_path: "game/file.dat".to_string(), + is_dir: false, + size: file_size, + }]; + + let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); + let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect(); + + assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks"); + + chunks.sort_by_key(|chunk| chunk.offset); + let last_chunk = chunks.last().expect("last chunk exists"); + + assert_eq!(last_chunk.offset, CHUNK_SIZE * 2); + assert_eq!(last_chunk.length, file_size - last_chunk.offset); + assert_eq!(last_chunk.length, CHUNK_SIZE / 4); + assert_eq!( + last_chunk.offset + last_chunk.length, + file_size, + "last chunk should finish the file" + ); + } + + #[test] + fn build_peer_plans_respects_file_peer_map() { + let shared_a = loopback_addr(12010); + let shared_b = loopback_addr(12011); + let exclusive = loopback_addr(12012); + let peers = vec![shared_a, shared_b, exclusive]; + + let mut file_peer_map = HashMap::new(); + file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]); + file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]); + + let file_descs = vec![ + GameFileDescription { + game_id: "test".to_string(), + relative_path: "shared.bin".to_string(), + is_dir: false, + size: CHUNK_SIZE * 2, + }, + GameFileDescription { + game_id: "test".to_string(), + relative_path: "exclusive.bin".to_string(), + is_dir: false, + size: CHUNK_SIZE, + }, + ]; + + let plans = build_peer_plans(&peers, &file_descs, &file_peer_map); + let exclusive_plan = plans + .get(&exclusive) + .expect("exclusive peer should have a plan"); + assert!( + exclusive_plan + .chunks + .iter() + .all(|chunk| chunk.relative_path == "exclusive.bin"), + "exclusive peer should only receive exclusive.bin chunks" + ); + + for (peer, plan) in plans { + for chunk in plan.chunks { + match chunk.relative_path.as_str() { + "exclusive.bin" => assert_eq!( + peer, exclusive, + "exclusive.bin chunks should only be assigned to the exclusive peer" + ), + "shared.bin" => assert!( + peer == shared_a || peer == shared_b, + "shared.bin chunks must stay within shared peers" + ), + other => panic!("unexpected file in plan: {other}"), + } + } + } + } + + #[tokio::test] + async fn prepare_game_storage_skips_version_ini_sentinel() { + let temp = TempDir::new("lanspread-download"); + let descs = vec![GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }]; + + prepare_game_storage(temp.path(), &descs) + .await + .expect("storage preparation should succeed"); + + assert!(!temp.path().join("game").join("version.ini").exists()); + } + + #[test] + fn version_descriptor_extraction_keeps_nested_decoy_in_transfer_list() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); + let nested_decoy = vec![ + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/local/version.ini".to_string(), + is_dir: false, + size: 8, + }, + ]; + + let (version, transfer) = + extract_version_descriptor("game", nested_decoy, &tx).expect("only one root sentinel"); + assert_eq!(version.relative_path, "game/version.ini"); + assert_eq!(transfer.len(), 2); + } + + #[test] + fn version_descriptor_extraction_requires_a_root_version_ini() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); + let missing = vec![GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/archive.eti".to_string(), + is_dir: false, + size: 1, + }]; + assert!(extract_version_descriptor("game", missing, &tx).is_err()); + } + + #[test] + fn version_descriptor_extraction_rejects_duplicate_root_version_ini() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); + let multiple = vec![ + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + ]; + assert!(extract_version_descriptor("game", multiple, &tx).is_err()); + } +} diff --git a/crates/lanspread-peer/src/download/retry.rs b/crates/lanspread-peer/src/download/retry.rs new file mode 100644 index 0000000..22bd0fb --- /dev/null +++ b/crates/lanspread-peer/src/download/retry.rs @@ -0,0 +1,164 @@ +use std::{ + collections::{HashMap, VecDeque}, + net::SocketAddr, + path::Path, + sync::Arc, +}; + +use tokio_util::sync::CancellationToken; + +use super::{ + planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers}, + transport::download_from_peer, + version_ini::VersionIniBuffer, +}; +use crate::config::MAX_RETRY_COUNT; + +/// Selects a peer for retrying a failed chunk. +fn select_retry_peer( + peers: &[SocketAddr], + last_peer: Option, + attempt_offset: usize, +) -> Option { + if peers.is_empty() { + return None; + } + + if peers.len() > 1 + && let Some(last) = last_peer + && let Some(pos) = peers.iter().position(|addr| *addr == last) + { + let next_index = (pos + 1 + attempt_offset) % peers.len(); + return Some(peers[next_index]); + } + + Some(peers[attempt_offset % peers.len()]) +} + +/// Returns a fallback peer address for error reporting. +fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option) -> SocketAddr { + last_peer + .or_else(|| peers.first().copied()) + .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0))) +} + +/// Retries downloading failed chunks. +pub(super) async fn retry_failed_chunks( + failed_chunks: Vec, + peers: &[SocketAddr], + base_dir: &Path, + game_id: &str, + file_peer_map: &HashMap>, + cancel_token: &CancellationToken, + version_buffer: Option>, +) -> eyre::Result> { + let mut exhausted = Vec::new(); + let mut queue: VecDeque = failed_chunks.into_iter().collect(); + + while let Some(mut chunk) = queue.pop_front() { + if cancel_token.is_cancelled() { + return Ok(exhausted); + } + + let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers); + + if chunk.retry_count >= MAX_RETRY_COUNT { + exhausted.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(eyre::eyre!( + "Retry budget exhausted for chunk: {}", + chunk.relative_path + )), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), + }); + continue; + } + + let retry_offset = chunk.retry_count.saturating_sub(1); + let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset) + else { + exhausted.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(eyre::eyre!( + "No peers available to retry chunk: {}", + chunk.relative_path + )), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), + }); + continue; + }; + + let mut attempt_chunk = chunk.clone(); + attempt_chunk.last_peer = Some(peer_addr); + + let plan = PeerDownloadPlan { + chunks: vec![attempt_chunk.clone()], + whole_files: Vec::new(), + }; + + match download_from_peer( + peer_addr, + game_id, + plan, + base_dir.to_path_buf(), + cancel_token, + version_buffer.clone(), + ) + .await + { + Ok(results) => { + if cancel_token.is_cancelled() { + return Ok(exhausted); + } + + for result in results { + match result.result { + Ok(()) => {} + Err(e) => { + let mut retry_chunk = result.chunk.clone(); + retry_chunk.retry_count = chunk.retry_count + 1; + retry_chunk.last_peer = Some(result.peer_addr); + + if retry_chunk.retry_count >= MAX_RETRY_COUNT { + let context = format!( + "Retry budget exhausted for chunk: {}", + result.chunk.relative_path + ); + exhausted.push(ChunkDownloadResult { + chunk: retry_chunk, + result: Err(e.wrap_err(context)), + peer_addr: result.peer_addr, + }); + } else { + queue.push_back(retry_chunk); + } + } + } + } + } + Err(e) => { + if cancel_token.is_cancelled() { + return Ok(exhausted); + } + + chunk.retry_count += 1; + chunk.last_peer = Some(peer_addr); + + if chunk.retry_count >= MAX_RETRY_COUNT { + exhausted.push(ChunkDownloadResult { + chunk: chunk.clone(), + result: Err(e.wrap_err(format!( + "Retry budget exhausted for chunk after connection failure: {}", + chunk.relative_path + ))), + peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer), + }); + } else { + queue.push_back(chunk); + } + } + } + } + + Ok(exhausted) +} diff --git a/crates/lanspread-peer/src/download/transport.rs b/crates/lanspread-peer/src/download/transport.rs new file mode 100644 index 0000000..482e0eb --- /dev/null +++ b/crates/lanspread-peer/src/download/transport.rs @@ -0,0 +1,259 @@ +use std::{ + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, +}; + +use lanspread_db::db::GameFileDescription; +use tokio::{ + fs::OpenOptions, + io::{AsyncSeekExt, AsyncWriteExt}, +}; +use tokio_util::{ + codec::{FramedWrite, LengthDelimitedCodec}, + sync::CancellationToken, +}; + +use super::{ + planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan}, + version_ini::VersionIniBuffer, +}; +use crate::{network::connect_to_peer, path_validation::validate_game_file_path}; + +fn ensure_download_not_cancelled( + cancel_token: &CancellationToken, + game_id: &str, +) -> eyre::Result<()> { + if cancel_token.is_cancelled() { + eyre::bail!("download cancelled for game {game_id}"); + } + Ok(()) +} + +/// Downloads a single chunk from a peer. +async fn download_chunk( + conn: &mut s2n_quic::Connection, + base_dir: &Path, + game_id: &str, + chunk: &DownloadChunk, + version_buffer: Option>, +) -> eyre::Result<()> { + use futures::SinkExt; + use lanspread_proto::{Message, Request}; + + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + let request = Request::GetGameFileChunk { + game_id: game_id.to_string(), + relative_path: chunk.relative_path.clone(), + offset: chunk.offset, + length: chunk.length, + }; + framed_tx.send(request.encode()).await?; + + framed_tx.close().await?; + + if let Some(buffer) = version_buffer + && buffer.matches(&chunk.relative_path) + { + return download_version_ini_chunk(&mut rx, chunk, &buffer).await; + } + + // Validate the path to prevent directory traversal + let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?; + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(false) + .open(&validated_path) + .await?; + if chunk.length == 0 && chunk.offset == 0 { + // fallback-to-whole-file path replaces any existing partial data + file.set_len(0).await?; + } + file.seek(std::io::SeekFrom::Start(chunk.offset)).await?; + + let mut remaining = chunk.length; + let mut received_bytes = 0u64; + + while let Some(bytes) = rx.receive().await? { + file.write_all(&bytes).await?; + received_bytes += bytes.len() as u64; + + if remaining == 0 { + continue; + } + remaining = remaining.saturating_sub(bytes.len() as u64); + if remaining == 0 { + break; + } + } + + // Verify we received the expected amount of data + if chunk.length > 0 && received_bytes != chunk.length { + eyre::bail!( + "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}", + chunk.length, + received_bytes, + chunk.relative_path, + chunk.offset + ); + } + + file.flush().await?; + + // Verify file integrity by checking the file size + verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?; + + Ok(()) +} + +async fn download_version_ini_chunk( + rx: &mut s2n_quic::stream::ReceiveStream, + chunk: &DownloadChunk, + buffer: &VersionIniBuffer, +) -> eyre::Result<()> { + let mut received = Vec::new(); + while let Some(bytes) = rx.receive().await? { + received.extend_from_slice(&bytes); + } + + if chunk.length > 0 && u64::try_from(received.len())? != chunk.length { + eyre::bail!( + "Incomplete version.ini chunk download: expected {} bytes, received {} bytes at offset {}", + chunk.length, + received.len(), + chunk.offset + ); + } + + buffer.write_at(chunk.offset, &received).await +} + +/// Verifies that a chunk was written correctly. +async fn verify_chunk_integrity( + file_path: &Path, + offset: u64, + expected_length: u64, +) -> eyre::Result<()> { + if expected_length == 0 { + return Ok(()); // Skip verification for whole files or zero-length chunks + } + + let metadata = tokio::fs::metadata(file_path).await?; + let file_size = metadata.len(); + + if file_size < offset + expected_length { + eyre::bail!( + "File integrity check failed: file size {} is less than expected {} (offset: {})", + file_size, + offset + expected_length, + offset + ); + } + + Ok(()) +} + +/// Downloads a whole file from a peer. +async fn download_whole_file( + conn: &mut s2n_quic::Connection, + base_dir: &Path, + desc: &GameFileDescription, +) -> eyre::Result<()> { + use futures::SinkExt; + use lanspread_proto::{Message, Request}; + + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + + framed_tx + .send(Request::GetGameFileData(desc.clone()).encode()) + .await?; + framed_tx.close().await?; + + // Validate the path to prevent directory traversal + let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?; + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&validated_path) + .await?; + file.seek(std::io::SeekFrom::Start(0)).await?; + + while let Some(bytes) = rx.receive().await? { + file.write_all(&bytes).await?; + } + + file.flush().await?; + Ok(()) +} + +/// Downloads all assigned chunks and files from a single peer. +pub(super) async fn download_from_peer( + peer_addr: SocketAddr, + game_id: &str, + plan: PeerDownloadPlan, + games_folder: PathBuf, + cancel_token: &CancellationToken, + version_buffer: Option>, +) -> eyre::Result> { + if plan.chunks.is_empty() && plan.whole_files.is_empty() { + return Ok(Vec::new()); + } + + ensure_download_not_cancelled(cancel_token, game_id)?; + + let mut conn = connect_to_peer(peer_addr).await?; + conn.keep_alive(true)?; + conn.keep_alive(true)?; + + let base_dir = games_folder; + let mut results = Vec::new(); + + // Download chunks with error handling + for chunk in &plan.chunks { + ensure_download_not_cancelled(cancel_token, game_id)?; + + log::info!( + "Downloading chunk {} (offset {}, length {}) from {}", + chunk.relative_path, + chunk.offset, + chunk.length, + peer_addr + ); + let result = + download_chunk(&mut conn, &base_dir, game_id, chunk, version_buffer.clone()).await; + results.push(ChunkDownloadResult { + chunk: chunk.clone(), + result, + peer_addr, + }); + } + + // Download whole files + for desc in &plan.whole_files { + ensure_download_not_cancelled(cancel_token, game_id)?; + + let chunk = DownloadChunk { + relative_path: desc.relative_path.clone(), + offset: 0, + length: 0, // Indicates whole file + retry_count: 0, + last_peer: Some(peer_addr), + }; + + let result = download_whole_file(&mut conn, &base_dir, desc).await; + results.push(ChunkDownloadResult { + chunk, + result, + peer_addr, + }); + } + + Ok(results) +} diff --git a/crates/lanspread-peer/src/download/version_ini.rs b/crates/lanspread-peer/src/download/version_ini.rs new file mode 100644 index 0000000..6d703f8 --- /dev/null +++ b/crates/lanspread-peer/src/download/version_ini.rs @@ -0,0 +1,229 @@ +use std::path::Path; + +use lanspread_db::db::GameFileDescription; +use tokio::{io::AsyncWriteExt, sync::Mutex}; + +#[derive(Debug)] +pub(super) struct VersionIniBuffer { + relative_path: String, + bytes: Mutex>, +} + +impl VersionIniBuffer { + pub(super) fn new(desc: &GameFileDescription) -> eyre::Result { + if desc.is_dir { + eyre::bail!("version.ini sentinel cannot be a directory"); + } + let size = usize::try_from(desc.size)?; + Ok(Self { + relative_path: desc.relative_path.clone(), + bytes: Mutex::new(vec![0; size]), + }) + } + + pub(super) fn matches(&self, relative_path: &str) -> bool { + self.relative_path == relative_path + } + + pub(super) async fn write_at(&self, offset: u64, bytes: &[u8]) -> eyre::Result<()> { + let offset = usize::try_from(offset)?; + let mut buffer = self.bytes.lock().await; + let end = offset + .checked_add(bytes.len()) + .ok_or_else(|| eyre::eyre!("version.ini chunk offset overflow"))?; + if end > buffer.len() { + eyre::bail!( + "version.ini chunk exceeds buffer: end {end}, buffer {}", + buffer.len() + ); + } + buffer[offset..end].copy_from_slice(bytes); + Ok(()) + } + + async fn snapshot(&self) -> Vec { + self.bytes.lock().await.clone() + } +} + +pub(super) async fn begin_version_ini_transaction(game_root: &Path) -> eyre::Result<()> { + tokio::fs::create_dir_all(game_root).await?; + remove_file_if_exists(&game_root.join(".version.ini.tmp")).await?; + remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; + + let version_path = game_root.join("version.ini"); + if tokio::fs::metadata(&version_path) + .await + .is_ok_and(|metadata| metadata.is_file()) + { + tokio::fs::rename(version_path, game_root.join(".version.ini.discarded")).await?; + } + Ok(()) +} + +pub(super) async fn rollback_version_ini_transaction(game_root: &Path) { + if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.tmp")).await { + log::warn!( + "Failed to sweep partial version.ini tmp in {}: {err}", + game_root.display() + ); + } + if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.discarded")).await { + log::warn!( + "Failed to sweep discarded version.ini in {}: {err}", + game_root.display() + ); + } +} + +pub(super) async fn commit_version_ini_buffer( + game_root: &Path, + buffer: &VersionIniBuffer, +) -> eyre::Result<()> { + let tmp_path = game_root.join(".version.ini.tmp"); + let version_path = game_root.join("version.ini"); + let bytes = buffer.snapshot().await; + + let mut file = tokio::fs::File::create(&tmp_path).await?; + file.write_all(&bytes).await?; + file.sync_all().await?; + drop(file); + + tokio::fs::rename(&tmp_path, &version_path).await?; + sync_parent_dir(&version_path)?; + remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; + Ok(()) +} + +#[cfg(unix)] +fn sync_parent_dir(path: &Path) -> std::io::Result<()> { + if let Some(parent) = path.parent() { + std::fs::File::open(parent)?.sync_all()?; + } + Ok(()) +} + +#[cfg(not(unix))] +fn sync_parent_dir(_path: &Path) -> std::io::Result<()> { + Ok(()) +} + +async fn remove_file_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_file(path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } +} + +#[cfg(test)] +mod tests { + use lanspread_db::db::GameFileDescription; + + use super::*; + use crate::test_support::TempDir; + + #[tokio::test] + async fn version_ini_buffer_accepts_out_of_order_chunks() { + let desc = GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }; + let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); + + buffer + .write_at(4, b"0101") + .await + .expect("second chunk should write"); + buffer + .write_at(0, b"2025") + .await + .expect("first chunk should write"); + + assert_eq!(buffer.snapshot().await, b"20250101"); + } + + #[tokio::test] + async fn commit_version_ini_writes_sentinel_last_and_sweeps_discarded() { + let temp = TempDir::new("lanspread-download"); + let game_root = temp.path().join("game"); + tokio::fs::create_dir_all(&game_root) + .await + .expect("game root should be created"); + tokio::fs::write(game_root.join(".version.ini.discarded"), b"old") + .await + .expect("discarded sentinel should be written"); + + let desc = GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }; + let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); + buffer + .write_at(0, b"20250101") + .await + .expect("version should be buffered"); + + commit_version_ini_buffer(&game_root, &buffer) + .await + .expect("version sentinel should commit"); + + assert_eq!( + std::fs::read(game_root.join("version.ini")).expect("version.ini should exist"), + b"20250101" + ); + assert!(!game_root.join(".version.ini.tmp").exists()); + assert!(!game_root.join(".version.ini.discarded").exists()); + } + + #[tokio::test] + async fn begin_version_ini_transaction_parks_existing_sentinel() { + let temp = TempDir::new("lanspread-download"); + let game_root = temp.path().join("game"); + tokio::fs::create_dir_all(&game_root) + .await + .expect("game root should be created"); + tokio::fs::write(game_root.join("version.ini"), b"20240101") + .await + .expect("version sentinel should be written"); + tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial") + .await + .expect("tmp sentinel should be written"); + + begin_version_ini_transaction(&game_root) + .await + .expect("transaction should begin"); + + assert!(!game_root.join("version.ini").exists()); + assert!(!game_root.join(".version.ini.tmp").exists()); + assert_eq!( + std::fs::read(game_root.join(".version.ini.discarded")) + .expect("discarded sentinel should exist"), + b"20240101" + ); + } + + #[tokio::test] + async fn rollback_version_ini_transaction_sweeps_transients() { + let temp = TempDir::new("lanspread-download"); + let game_root = temp.path().join("game"); + tokio::fs::create_dir_all(&game_root) + .await + .expect("game root should be created"); + tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial") + .await + .expect("tmp sentinel should be written"); + tokio::fs::write(game_root.join(".version.ini.discarded"), b"old") + .await + .expect("discarded sentinel should be written"); + + rollback_version_ini_transaction(&game_root).await; + + assert!(!game_root.join(".version.ini.tmp").exists()); + assert!(!game_root.join(".version.ini.discarded").exists()); + } +}