use std::{convert::TryInto, path::Path}; use bytes::Bytes; use lanspread_db::db::GameFileDescription; use lanspread_utils::maybe_addr; use s2n_quic::stream::SendStream; use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, time::Instant, }; use crate::path_validation::validate_game_file_path; async fn stream_file_bytes( tx: &mut SendStream, base_dir: &Path, relative_path: &str, offset: u64, length: Option, ) -> eyre::Result<()> { let remote_addr = maybe_addr!(tx.connection().remote_addr()); // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, relative_path)?; log::debug!( "{remote_addr} streaming file bytes for peer: {}, offset: {offset}, length: {length:?}", validated_path.display() ); let mut file = tokio::fs::File::open(&validated_path).await?; if offset > 0 { file.seek(std::io::SeekFrom::Start(offset)).await?; } let mut remaining = length.unwrap_or(u64::MAX); let mut total_bytes = 0u64; let mut last_total_bytes = 0u64; let mut timestamp = Instant::now(); let mut buf = vec![0u8; 64 * 1024]; while remaining > 0 { let read_len = std::cmp::min(remaining, buf.len() as u64); let read_len: usize = read_len.try_into().unwrap_or(usize::MAX); if read_len == 0 { break; } let bytes_read = file.read(&mut buf[..read_len]).await?; if bytes_read == 0 { break; } tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?; remaining = remaining.saturating_sub(bytes_read as u64); total_bytes += bytes_read as u64; if last_total_bytes + 10_000_000 < total_bytes { let elapsed = timestamp.elapsed(); let diff_bytes = total_bytes - last_total_bytes; if elapsed.as_secs_f64() >= 1.0 { #[allow(clippy::cast_precision_loss)] let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); log::debug!( "{remote_addr} sending file data: {}, MB/s: {mb_per_s:.2}", validated_path.display() ); last_total_bytes = total_bytes; timestamp = Instant::now(); } } } log::debug!( "{remote_addr} finished streaming file bytes: {}, total_bytes: {total_bytes}", validated_path.display() ); tx.close().await?; Ok(()) } pub async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, ) { if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream file {}: {e}", game_file_desc.relative_path ); } } pub async fn send_game_file_chunk( game_id: &str, relative_path: &str, offset: u64, length: u64, tx: &mut SendStream, game_dir: &Path, ) { if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); log::error!( "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" ); } }