use std::{
    collections::VecDeque,
    net::SocketAddr,
    path::{Path, PathBuf},
    sync::Arc,
};

use futures::{SinkExt, StreamExt, stream::FuturesUnordered};
use s2n_quic::{Connection, stream::ReceiveStream};
use tokio::{
    fs::OpenOptions,
    io::{AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{
    codec::{FramedWrite, LengthDelimitedCodec},
    sync::CancellationToken,
};

use super::{
    planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan},
    progress::DownloadProgressTracker,
    version_ini::VersionIniBuffer,
};
use crate::{
    config::PEER_DOWNLOAD_STREAM_WINDOW,
    network::connect_to_peer,
    path_validation::validate_game_file_path,
};

/// Returns an error if `cancel_token` has already been cancelled.
///
/// `game_id` is only used to build the error message.
fn ensure_download_not_cancelled(
    cancel_token: &CancellationToken,
    game_id: &str,
) -> eyre::Result<()> {
    if cancel_token.is_cancelled() {
        eyre::bail!("download cancelled for game {game_id}");
    }
    Ok(())
}

/// Opens a bidirectional stream on `conn` and requests one file chunk.
///
/// A `Request::GetGameFileChunk` built from `game_id` and `chunk` is sent
/// length-delimited on the send half, which is then closed. The receive half
/// is returned so the caller can read the peer's response.
///
/// # Errors
///
/// Fails if the stream cannot be opened, or if sending the request or closing
/// the send half fails.
async fn open_chunk_stream(
    conn: &mut Connection,
    game_id: &str,
    chunk: &DownloadChunk,
) -> eyre::Result<ReceiveStream> {
    use lanspread_proto::{Message, Request};

    let stream = conn.open_bidirectional_stream().await?;
    let (rx, tx) = stream.split();
    let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());

    let request = Request::GetGameFileChunk {
        game_id: game_id.to_string(),
        relative_path: chunk.relative_path.clone(),
        offset: chunk.offset,
        length: chunk.length,
    };

    framed_tx.send(request.encode()).await?;
    framed_tx.close().await?;

    Ok(rx)
}

/// Receives one requested chunk from a peer stream.
async fn receive_chunk( mut rx: ReceiveStream, base_dir: &Path, chunk: &DownloadChunk, version_buffer: Option>, progress_tracker: Arc, ) -> eyre::Result<()> { if let Some(buffer) = version_buffer && buffer.matches(&chunk.relative_path) { return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await; } // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?; let mut file = OpenOptions::new() .create(true) .write(true) .truncate(false) .open(&validated_path) .await?; if chunk.length == 0 && chunk.offset == 0 { // fallback-to-whole-file path replaces any existing partial data file.set_len(0).await?; } file.seek(std::io::SeekFrom::Start(chunk.offset)).await?; let mut remaining = chunk.length; let mut received_bytes = 0u64; let mut progress = progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { file.write_all(&bytes).await?; progress.record_bytes(bytes.len()); let byte_count = u64::try_from(bytes.len()).unwrap_or(u64::MAX); received_bytes = received_bytes.saturating_add(byte_count); if remaining == 0 { continue; } remaining = remaining.saturating_sub(byte_count); if remaining == 0 { break; } } // Verify we received the expected amount of data if chunk.length > 0 && received_bytes != chunk.length { eyre::bail!( "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}", chunk.length, received_bytes, chunk.relative_path, chunk.offset ); } file.flush().await?; // Verify file integrity by checking the file size verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?; Ok(()) } async fn receive_chunk_result( peer_addr: SocketAddr, base_dir: PathBuf, chunk: DownloadChunk, rx: ReceiveStream, version_buffer: Option>, progress_tracker: Arc, ) -> ChunkDownloadResult { let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await; 
ChunkDownloadResult { chunk, result, peer_addr, } } async fn download_version_ini_chunk( mut rx: ReceiveStream, chunk: &DownloadChunk, buffer: &VersionIniBuffer, progress_tracker: Arc, ) -> eyre::Result<()> { let mut received = Vec::new(); let mut progress = progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { progress.record_bytes(bytes.len()); received.extend_from_slice(&bytes); } if chunk.length > 0 && u64::try_from(received.len())? != chunk.length { eyre::bail!( "Incomplete version.ini chunk download: expected {} bytes, received {} bytes at offset {}", chunk.length, received.len(), chunk.offset ); } buffer.write_at(chunk.offset, &received).await } /// Verifies that a chunk was written correctly. async fn verify_chunk_integrity( file_path: &Path, offset: u64, expected_length: u64, ) -> eyre::Result<()> { if expected_length == 0 { return Ok(()); // Skip verification for whole files or zero-length chunks } let metadata = tokio::fs::metadata(file_path).await?; let file_size = metadata.len(); if file_size < offset + expected_length { eyre::bail!( "File integrity check failed: file size {} is less than expected {} (offset: {})", file_size, offset + expected_length, offset ); } Ok(()) } fn failed_chunk_result( chunk: DownloadChunk, peer_addr: SocketAddr, reason: impl Into, ) -> ChunkDownloadResult { ChunkDownloadResult { chunk, result: Err(eyre::Report::msg(reason.into())), peer_addr, } } fn failed_plan_results( plan: PeerDownloadPlan, peer_addr: SocketAddr, reason: impl std::fmt::Display, ) -> Vec { let reason = format!("peer connection failed: {reason}"); plan.chunks .into_iter() .map(|chunk| failed_chunk_result(chunk, peer_addr, reason.clone())) .collect() } struct ChunkPlanContext<'a> { peer_addr: SocketAddr, game_id: &'a str, base_dir: &'a Path, cancel_token: &'a CancellationToken, version_buffer: Option>, progress_tracker: Arc, } async fn download_chunk_plan( conn: &mut Connection, chunks: 
Vec, ctx: &ChunkPlanContext<'_>, ) -> eyre::Result> { let mut pending: VecDeque = chunks.into(); let mut in_flight = FuturesUnordered::new(); let mut results = Vec::new(); let window = PEER_DOWNLOAD_STREAM_WINDOW.max(1); let base_dir = ctx.base_dir.to_path_buf(); while !pending.is_empty() || !in_flight.is_empty() { while in_flight.len() < window { let Some(chunk) = pending.pop_front() else { break; }; ensure_download_not_cancelled(ctx.cancel_token, ctx.game_id)?; log::info!( "Downloading chunk {} (offset {}, length {}) from {}", chunk.relative_path, chunk.offset, chunk.length, ctx.peer_addr ); match open_chunk_stream(conn, ctx.game_id, &chunk).await { Ok(rx) => { in_flight.push(receive_chunk_result( ctx.peer_addr, base_dir.clone(), chunk, rx, ctx.version_buffer.clone(), ctx.progress_tracker.clone(), )); } Err(err) => { let reason = format!("failed to open chunk stream: {err}"); results.push(failed_chunk_result(chunk, ctx.peer_addr, reason.clone())); while let Some(chunk) = pending.pop_front() { results.push(failed_chunk_result( chunk, ctx.peer_addr, format!("peer stream unavailable after earlier open failure: {reason}"), )); } break; } } } if in_flight.is_empty() { continue; } let result = tokio::select! { () = ctx.cancel_token.cancelled() => { eyre::bail!("download cancelled for game {}", ctx.game_id); } result = in_flight.next() => result.expect("in-flight chunk stream should exist"), }; results.push(result); } Ok(results) } /// Downloads all assigned chunks and files from a single peer. pub(super) async fn download_from_peer( peer_addr: SocketAddr, game_id: &str, plan: PeerDownloadPlan, games_folder: PathBuf, cancel_token: &CancellationToken, version_buffer: Option>, progress_tracker: Arc, ) -> eyre::Result> { if plan.chunks.is_empty() { return Ok(Vec::new()); } ensure_download_not_cancelled(cancel_token, game_id)?; let mut conn = match tokio::select! 
{ () = cancel_token.cancelled() => { eyre::bail!("download cancelled for game {game_id}"); } result = connect_to_peer(peer_addr) => result, } { Ok(conn) => conn, Err(err) => return Ok(failed_plan_results(plan, peer_addr, err)), }; if let Err(err) = conn.keep_alive(true) { return Ok(failed_plan_results(plan, peer_addr, err)); } let base_dir = games_folder; let chunk_ctx = ChunkPlanContext { peer_addr, game_id, base_dir: &base_dir, cancel_token, version_buffer, progress_tracker, }; let results = download_chunk_plan(&mut conn, plan.chunks, &chunk_ctx).await?; Ok(results) }