use std::{convert::TryInto, path::Path}; use bytes::Bytes; use lanspread_db::db::GameFileDescription; use lanspread_utils::maybe_addr; use s2n_quic::{ application, connection, stream::{Error as StreamError, SendStream}, }; use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, time::Instant, }; use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path}; fn cancel_send_stream(tx: &mut SendStream, remote_addr: impl std::fmt::Display, path: &Path) { // Reset instead of finishing so truncated whole-file transfers cannot look like EOF. if let Err(err) = tx.reset(application::Error::UNKNOWN) { log::debug!( "{remote_addr} failed to reset cancelled transfer for {}: {err}", path.display() ); } } #[allow(clippy::too_many_lines)] async fn stream_file_bytes( tx: &mut SendStream, base_dir: &Path, relative_path: &str, offset: u64, length: Option, cancel_token: tokio_util::sync::CancellationToken, ) -> eyre::Result<()> { let remote_addr = maybe_addr!(tx.connection().remote_addr()); // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, relative_path)?; log::debug!( "{remote_addr} streaming file bytes for peer: {}, offset: {offset}, length: {length:?}", validated_path.display() ); let mut file = tokio::fs::File::open(&validated_path).await?; if offset > 0 { file.seek(std::io::SeekFrom::Start(offset)).await?; } let mut remaining = length.unwrap_or(u64::MAX); let expect_exact = length.is_some(); let mut transfer_complete = matches!(length, Some(0)); let mut total_bytes = 0u64; let mut last_total_bytes = 0u64; let started = Instant::now(); let mut timestamp = Instant::now(); let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE]; while remaining > 0 { if cancel_token.is_cancelled() { log::info!( "{remote_addr} transfer cancelled for {}", validated_path.display() ); cancel_send_stream(tx, remote_addr, &validated_path); return Err(eyre::eyre!("File transfer cancelled by user")); } let read_len = std::cmp::min(remaining, buf.len() as u64); let read_len: usize = read_len.try_into().unwrap_or(usize::MAX); if read_len == 0 { break; } let bytes_read = tokio::select! { () = cancel_token.cancelled() => { log::info!( "{remote_addr} transfer cancelled for {}", validated_path.display() ); cancel_send_stream(tx, remote_addr, &validated_path); return Err(eyre::eyre!("File transfer cancelled by user")); } res = file.read(&mut buf[..read_len]) => { res? } }; if bytes_read == 0 { if !expect_exact { transfer_complete = true; } break; } tokio::select! { () = cancel_token.cancelled() => { log::info!( "{remote_addr} transfer cancelled for {}", validated_path.display() ); cancel_send_stream(tx, remote_addr, &validated_path); return Err(eyre::eyre!("File transfer cancelled by user")); } res = tx.send(Bytes::copy_from_slice(&buf[..bytes_read])) => { res?; } } remaining = remaining.saturating_sub(bytes_read as u64); total_bytes += bytes_read as u64; if expect_exact && remaining == 0 { transfer_complete = true; break; } if last_total_bytes + 10_000_000 < total_bytes { let elapsed = timestamp.elapsed(); let diff_bytes = total_bytes - last_total_bytes; if elapsed.as_secs_f64() >= 1.0 { #[allow(clippy::cast_precision_loss)] let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); log::debug!( "{remote_addr} sending file data: {}, MB/s: {mb_per_s:.2}", validated_path.display() ); last_total_bytes = total_bytes; timestamp = Instant::now(); } } } let elapsed = started.elapsed(); #[allow(clippy::cast_precision_loss)] let mib_per_s = if elapsed.as_secs_f64() > 0.0 { total_bytes as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0) } else { 0.0 }; log::info!( "{remote_addr} finished streaming file bytes: {}, total_bytes: {total_bytes}, MiB/s: {mib_per_s:.2}", validated_path.display() ); tokio::select! { () = cancel_token.cancelled() => { log::info!("{remote_addr} transfer cancelled while closing stream"); cancel_send_stream(tx, remote_addr, &validated_path); return Err(eyre::eyre!("File transfer cancelled by user")); } res = tx.close() => { match res { Ok(()) => {} Err(err) if transfer_complete && is_clean_remote_close(&err) => { log::debug!("{remote_addr} closed stream after transfer completion: {err}"); } Err(err) => return Err(err.into()), } } } Ok(()) } fn is_clean_remote_close(err: &StreamError) -> bool { matches!( err, StreamError::ConnectionError { error: connection::Error::Closed { .. }, .. } ) } pub async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, cancel_token: tokio_util::sync::CancellationToken, ) { if let Err(e) = stream_file_bytes( tx, game_dir, &game_file_desc.relative_path, 0, None, cancel_token, ) .await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream file {}: {e}", game_file_desc.relative_path ); } } pub async fn send_game_file_chunk( game_id: &str, relative_path: &str, offset: u64, length: u64, tx: &mut SendStream, game_dir: &Path, cancel_token: tokio_util::sync::CancellationToken, ) { if let Err(e) = stream_file_bytes( tx, game_dir, relative_path, offset, Some(length), cancel_token, ) .await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" ); } }