diff --git a/crates/lanspread-db/src/db.rs b/crates/lanspread-db/src/db.rs index 2094e0f..4582332 100644 --- a/crates/lanspread-db/src/db.rs +++ b/crates/lanspread-db/src/db.rs @@ -179,6 +179,8 @@ pub struct GameFileDescription { pub game_id: String, pub relative_path: String, pub is_dir: bool, + #[serde(default)] + pub size: Option, } impl GameFileDescription { @@ -186,6 +188,11 @@ impl GameFileDescription { pub fn is_version_ini(&self) -> bool { self.relative_path.ends_with("/version.ini") } + + #[must_use] + pub fn file_size(&self) -> Option { + if self.is_dir { None } else { self.size } + } } impl fmt::Debug for GameFileDescription { @@ -193,10 +200,11 @@ impl fmt::Debug for GameFileDescription { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{}: [{}] path:{}", + "{}: [{}] path:{} size:{}", self.game_id, if self.is_dir { 'D' } else { 'F' }, self.relative_path, + self.size.unwrap_or_default(), ) } } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 957e709..04e718b 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -1,30 +1,28 @@ #![allow(clippy::missing_errors_doc)] +mod peer; + use std::{ collections::HashMap, - fs::File, - io::Write, net::SocketAddr, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant}, }; +use crate::peer::{send_game_file_chunk, send_game_file_data}; use bytes::BytesMut; use gethostname::gethostname; use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service}; use lanspread_proto::{Message, Request, Response}; use s2n_quic::{ - Client as QuicClient, - Connection, - Server, - client::Connect, - provider::limits::Limits, + Client as QuicClient, Connection, Server, client::Connect, provider::limits::Limits, stream::BidirectionalStream, }; use tokio::{ - io::AsyncWriteExt, + fs::OpenOptions, + io::{AsyncSeekExt, AsyncWriteExt}, sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, @@ -61,7 +59,8 @@ pub enum PeerEvent { pub struct PeerInfo { pub addr: SocketAddr, pub last_seen: Instant, - pub games: Vec, + pub games: HashMap, + pub files: HashMap>, } #[derive(Debug)] @@ -80,7 +79,8 @@ impl PeerGameDB { let peer_info = PeerInfo { addr, last_seen: Instant::now(), - games: Vec::new(), + games: HashMap::new(), + files: HashMap::new(), }; self.peers.insert(addr, peer_info); log::info!("Added peer: {addr}"); @@ -92,12 +92,28 @@ impl PeerGameDB { pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec) { if let Some(peer) = self.peers.get_mut(&addr) { - peer.games = games; + let mut map = HashMap::with_capacity(games.len()); + for game in games { + map.insert(game.id.clone(), game); + } + peer.games = map; peer.last_seen = Instant::now(); log::info!("Updated games for peer: {addr}"); } } + pub fn update_peer_game_files( + &mut self, + addr: SocketAddr, + game_id: &str, + files: Vec, + ) { + if let Some(peer) = self.peers.get_mut(&addr) { + peer.files.insert(game_id.to_string(), files); + peer.last_seen = Instant::now(); + } + } + pub fn update_last_seen(&mut self, addr: &SocketAddr) { if let Some(peer) = self.peers.get_mut(addr) { peer.last_seen = Instant::now(); @@ -105,23 +121,40 @@ impl PeerGameDB { } pub fn get_all_games(&self) -> Vec { - let mut all_games = Vec::new(); + let mut aggregated: HashMap = HashMap::new(); for peer in self.peers.values() { - all_games.extend(peer.games.clone()); + for game in peer.games.values() { + aggregated + .entry(game.id.clone()) + .and_modify(|existing| { + if let (Some(ref new_version), Some(ref current)) = + (&game.eti_game_version, &existing.eti_game_version) + { + if new_version > current { + existing.eti_game_version = Some(new_version.clone()); + } + } else if existing.eti_game_version.is_none() { + existing.eti_game_version = game.eti_game_version.clone(); + } + }) + .or_insert_with(|| game.clone()); + } } - all_games + + let mut games: Vec = aggregated.into_values().collect(); + games.sort_by(|a, b| a.name.cmp(&b.name)); + games } pub fn get_latest_version_for_game(&self, game_id: &str) -> Option { let mut latest_version: Option = None; for peer in self.peers.values() { - if let Some(game) = peer.games.iter().find(|g| g.id == game_id) { + if let Some(game) = peer.games.get(game_id) { if let Some(ref version) = game.eti_game_version { match &latest_version { None => latest_version = Some(version.clone()), Some(current_latest) => { - // Simple string comparison for now - could use semver if version > current_latest { latest_version = Some(version.clone()); } @@ -138,6 +171,31 @@ impl PeerGameDB { self.peers.keys().copied().collect() } + pub fn peers_with_game(&self, game_id: &str) -> Vec { + self.peers + .iter() + .filter(|(_, peer)| peer.games.contains_key(game_id)) + .map(|(addr, _)| *addr) + .collect() + } + + pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec)> { + self.peers + .iter() + .filter_map(|(addr, peer)| peer.files.get(game_id).cloned().map(|files| (*addr, files))) + .collect() + } + + pub fn aggregated_game_files(&self, game_id: &str) -> Vec { + let mut seen: HashMap = HashMap::new(); + for (_, files) in self.game_files_for(game_id) { + for file in files { + seen.entry(file.relative_path.clone()).or_insert(file); + } + } + seen.into_values().collect() + } + pub fn get_stale_peers(&self, timeout: Duration) -> Vec { self.peers .iter() @@ -194,42 +252,187 @@ async fn initial_peer_alive_check(conn: &mut Connection) -> bool { false } -async fn receive_game_file( - conn: &mut Connection, - desc: &GameFileDescription, - games_folder: &str, +const CHUNK_SIZE: u64 = 512 * 1024; + +#[derive(Debug, Clone)] +struct DownloadChunk { + relative_path: String, + offset: u64, + length: u64, +} + +#[derive(Debug, Default)] +struct PeerDownloadPlan { + chunks: Vec, + whole_files: Vec, +} + +async fn prepare_game_storage( + games_folder: &Path, + file_descs: &[GameFileDescription], ) -> eyre::Result<()> { - log::info!("downloading: {desc:?}"); - - let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); - - let request = Request::GetGameFileData(desc.clone()); - - // request file - tx.write_all(&request.encode()).await?; - - // create file - let path = PathBuf::from(&games_folder).join(&desc.relative_path); - let mut file = File::create(&path)?; - - // receive file contents - while let Some(data) = rx.receive().await? { - file.write_all(&data)?; + for desc in file_descs { + let path = games_folder.join(&desc.relative_path); + if desc.is_dir { + tokio::fs::create_dir_all(&path).await?; + } else { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file_size = desc.file_size().unwrap_or(0); + let mut file = OpenOptions::new() + .create(true) + .write(true) + .open(&path) + .await?; + file.set_len(file_size).await?; + } } - log::debug!("file download complete: {}", path.display()); - - tx.close().await?; Ok(()) } -async fn download_game_files( +fn build_peer_plans( + peers: &[SocketAddr], + file_descs: &[GameFileDescription], +) -> HashMap { + let mut plans: HashMap = HashMap::new(); + if peers.is_empty() { + return plans; + } + + let mut peer_index = 0usize; + + for desc in file_descs.iter().filter(|d| !d.is_dir) { + if let Some(size) = desc.file_size() { + if size == 0 { + let peer = peers[peer_index % peers.len()]; + peer_index += 1; + plans.entry(peer).or_default().chunks.push(DownloadChunk { + relative_path: desc.relative_path.clone(), + offset: 0, + length: 0, + }); + continue; + } + + let mut offset = 0u64; + while offset < size { + let length = std::cmp::min(CHUNK_SIZE, size - offset); + let peer = peers[peer_index % peers.len()]; + peer_index += 1; + plans.entry(peer).or_default().chunks.push(DownloadChunk { + relative_path: desc.relative_path.clone(), + offset, + length, + }); + offset += length; + } + } else { + let peer = peers[peer_index % peers.len()]; + peer_index += 1; + plans + .entry(peer) + .or_default() + .whole_files + .push(desc.clone()); + } + } + + plans +} + +async fn download_chunk( + conn: &mut Connection, + base_dir: &Path, game_id: &str, - game_file_descs: Vec, - games_folder: String, - peer_addr: SocketAddr, - tx_notify_ui: UnboundedSender, + chunk: &DownloadChunk, ) -> eyre::Result<()> { + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, mut tx) = stream.split(); + + if chunk.length == 0 { + // fall back to whole file download when size is unknown + let request = Request::GetGameFileData(GameFileDescription { + game_id: game_id.to_string(), + relative_path: chunk.relative_path.clone(), + is_dir: false, + size: None, + }); + tx.write_all(&request.encode()).await?; + } else { + let request = Request::GetGameFileChunk { + game_id: game_id.to_string(), + relative_path: chunk.relative_path.clone(), + offset: chunk.offset, + length: chunk.length, + }; + tx.write_all(&request.encode()).await?; + } + + tx.close().await?; + + let path = base_dir.join(&chunk.relative_path); + let mut file = OpenOptions::new() + .create(true) + .write(true) + .open(&path) + .await?; + file.seek(std::io::SeekFrom::Start(chunk.offset)).await?; + + let mut remaining = chunk.length; + while let Some(bytes) = rx.receive().await? { + file.write_all(&bytes).await?; + if remaining == 0 { + continue; + } + remaining = remaining.saturating_sub(bytes.len() as u64); + if remaining == 0 { + break; + } + } + + file.flush().await?; + Ok(()) +} + +async fn download_whole_file( + conn: &mut Connection, + base_dir: &Path, + desc: &GameFileDescription, +) -> eyre::Result<()> { + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, mut tx) = stream.split(); + + tx.write_all(&Request::GetGameFileData(desc.clone()).encode()) + .await?; + tx.close().await?; + + let path = base_dir.join(&desc.relative_path); + let mut file = OpenOptions::new() + .create(true) + .write(true) + .open(&path) + .await?; + file.seek(std::io::SeekFrom::Start(0)).await?; + + while let Some(bytes) = rx.receive().await? { + file.write_all(&bytes).await?; + } + + file.flush().await?; + Ok(()) +} + +async fn download_from_peer( + peer_addr: SocketAddr, + game_id: &str, + plan: PeerDownloadPlan, + games_folder: PathBuf, +) -> eyre::Result<()> { + if plan.chunks.is_empty() && plan.whole_files.is_empty() { + return Ok(()); + } + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; let client = QuicClient::builder() @@ -242,32 +445,60 @@ async fn download_game_files( let mut conn = client.connect(conn).await?; conn.keep_alive(true)?; - let game_files = game_file_descs - .iter() - .filter(|desc| !desc.is_dir) - .filter(|desc| !desc.is_version_ini()) - .collect::>(); + let base_dir = games_folder; - if game_files.is_empty() { - eyre::bail!("game_file_descs empty: no game files to download"); + for chunk in &plan.chunks { + download_chunk(&mut conn, &base_dir, game_id, chunk).await?; } + for desc in &plan.whole_files { + download_whole_file(&mut conn, &base_dir, desc).await?; + } + + Ok(()) +} + +async fn download_game_files( + game_id: &str, + game_file_descs: Vec, + games_folder: String, + peers: Vec, + tx_notify_ui: UnboundedSender, +) -> eyre::Result<()> { + if peers.is_empty() { + eyre::bail!("no peers available for game {game_id}"); + } + + let base_dir = PathBuf::from(&games_folder); + prepare_game_storage(&base_dir, &game_file_descs).await?; + tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: game_id.to_string(), })?; - // receive all game files - for file_desc in game_files { - receive_game_file(&mut conn, file_desc, &games_folder).await?; + let plans = build_peer_plans(&peers, &game_file_descs); + + let mut tasks = Vec::new(); + for (peer_addr, plan) in plans { + let base_dir = base_dir.clone(); + let game_id = game_id.to_string(); + tasks.push(tokio::spawn(async move { + download_from_peer(peer_addr, &game_id, plan, base_dir).await + })); } - let version_file_desc = game_file_descs - .iter() - .find(|desc| desc.is_version_ini()) - .ok_or_else(|| eyre::eyre!("version.ini not found"))?; + let mut last_err: Option = None; + for handle in tasks { + match handle.await { + Ok(Ok(())) => {} + Ok(Err(e)) => last_err = Some(e), + Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), + } + } - // receive version.ini - receive_game_file(&mut conn, version_file_desc, &games_folder).await?; + if let Some(err) = last_err { + return Err(err); + } log::info!("all files downloaded for game: {game_id}"); tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { @@ -348,20 +579,102 @@ pub async fn run_peer( } PeerCommand::GetGame(id) => { log::info!("Requesting game from peers: {id}"); - // TODO: Implement game fetching from peers + let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; + if peers.is_empty() { + log::warn!("No peers have game {id}"); + continue; + } + + let peer_game_db = ctx.peer_game_db.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + tokio::spawn(async move { + let mut fetched_any = false; + for peer_addr in peers { + match request_game_details_from_peer(peer_addr, &id, peer_game_db.clone()) + .await + { + Ok(_) => { + log::info!("Fetched game file list for {id} from peer {peer_addr}"); + fetched_any = true; + } + Err(e) => { + log::error!( + "Failed to fetch game files for {id} from {peer_addr}: {e}" + ); + } + } + } + + if fetched_any { + let aggregated_files = + { peer_game_db.read().await.aggregated_game_files(&id) }; + + if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { + id: id.clone(), + file_descriptions: aggregated_files, + }) { + log::error!("Failed to send GotGameFiles event: {e}"); + } + } else { + log::warn!("Failed to retrieve game files for {id} from any peer"); + } + }); } PeerCommand::DownloadGameFiles { id, - file_descriptions: _, + file_descriptions, } => { log::info!("Got PeerCommand::DownloadGameFiles"); let games_folder = { ctx.game_dir.read().await.clone() }; - if let Some(_games_folder) = games_folder { - // TODO: Implement peer file downloading - log::info!("Would download game files for {id}"); - } else { + if games_folder.is_none() { log::error!("Cannot handle game file descriptions: games_folder is not set"); + continue; } + + let games_folder = games_folder.expect("checked above"); + let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) }; + if peers.is_empty() { + log::error!("No peers available to download game {id}"); + continue; + } + + let resolved_descriptions = if file_descriptions.is_empty() { + ctx.peer_game_db.read().await.aggregated_game_files(&id) + } else { + file_descriptions + }; + + if resolved_descriptions.is_empty() { + log::error!( + "No file descriptions available to download game {id}; request metadata first" + ); + continue; + } + + let tx_notify_ui = tx_notify_ui.clone(); + tokio::spawn(async move { + match download_game_files( + &id, + resolved_descriptions, + games_folder, + peers, + tx_notify_ui.clone(), + ) + .await + { + Ok(()) => {} + Err(e) => { + log::error!("Download failed for {id}: {e}"); + if let Err(send_err) = tx_notify_ui + .send(PeerEvent::DownloadGameFilesFailed { id: id.clone() }) + { + log::error!( + "Failed to send DownloadGameFilesFailed event: {send_err}" + ); + } + } + } + }); } PeerCommand::SetGameDir(game_dir) => { *ctx.game_dir.write().await = Some(game_dir.clone()); @@ -546,18 +859,57 @@ async fn handle_peer_stream( "Received GetGameFileData request for {} from peer", desc.relative_path ); - // TODO: Handle file data request - if let Err(e) = tx + + let maybe_game_dir = ctx.game_dir.read().await.clone(); + if let Some(game_dir) = maybe_game_dir { + let base_dir = PathBuf::from(game_dir); + send_game_file_data(&desc, &mut tx, &base_dir).await; + } else if let Err(e) = tx .send( Response::InvalidRequest( desc.relative_path.as_bytes().to_vec().into(), - "File transfer not implemented yet".to_string(), + "Game directory not set".to_string(), ) .encode(), ) .await { - log::error!("Failed to send GetGameFileData response: {e}"); + log::error!("Failed to send GetGameFileData error: {e}"); + } + } + Request::GetGameFileChunk { + game_id, + relative_path, + offset, + length, + } => { + log::info!( + "Received GetGameFileChunk request for {relative_path} from peer" + ); + + let maybe_game_dir = ctx.game_dir.read().await.clone(); + if let Some(game_dir) = maybe_game_dir { + let base_dir = PathBuf::from(game_dir); + send_game_file_chunk( + &game_id, + &relative_path, + offset, + length, + &mut tx, + &base_dir, + ) + .await; + } else if let Err(e) = tx + .send( + Response::InvalidRequest( + relative_path.as_bytes().to_vec().into(), + "Game directory not set".to_string(), + ) + .encode(), + ) + .await + { + log::error!("Failed to send GetGameFileChunk error: {e}"); } } Request::Invalid(_, _) => { @@ -669,14 +1021,13 @@ async fn request_games_from_peer( Response::ListGames(games) => { log::info!("Received {} games from peer {peer_addr}", games.len()); - // Update peer games in database - { + let aggregated_games = { let mut db = peer_game_db.write().await; - db.update_peer_games(peer_addr, games.clone()); - } + db.update_peer_games(peer_addr, games); + db.get_all_games() + }; - // Notify UI about updated games - if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(games)) { + if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) { log::error!("Failed to send ListGames event: {e}"); } } @@ -688,6 +1039,63 @@ async fn request_games_from_peer( Ok(()) } +async fn request_game_details_from_peer( + peer_addr: SocketAddr, + game_id: &str, + peer_game_db: Arc>, +) -> eyre::Result> { + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; + + let conn = Connect::new(peer_addr).with_server_name("localhost"); + let mut conn = client.connect(conn).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, mut tx) = stream.split(); + + tx.send( + Request::GetGame { + id: game_id.to_string(), + } + .encode(), + ) + .await?; + tx.close().await?; + + let mut data = BytesMut::new(); + while let Ok(Some(bytes)) = rx.receive().await { + data.extend_from_slice(&bytes); + } + + let response = Response::decode(data.freeze()); + match response { + Response::GetGame { + id, + file_descriptions, + } => { + if id != game_id { + eyre::bail!("peer {peer_addr} responded with mismatched game id {id}"); + } + + { + let mut db = peer_game_db.write().await; + db.update_peer_game_files(peer_addr, game_id, file_descriptions.clone()); + } + + Ok(file_descriptions) + } + Response::GameNotFound(_) => { + eyre::bail!("peer {peer_addr} does not have game {game_id}") + } + _ => eyre::bail!("unexpected response from {peer_addr}: {response:?}"), + } +} + async fn run_ping_service( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, @@ -778,6 +1186,6 @@ async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { let conn = Connect::new(peer_addr).with_server_name("localhost"); let mut conn = client.connect(conn).await?; - initial_peer_alive_check(&mut conn).await; - Ok(true) + let is_alive = initial_peer_alive_check(&mut conn).await; + Ok(is_alive) } diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index 9193014..4f3ad36 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -3,12 +3,16 @@ use std::{ sync::Arc, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use lanspread_db::db::{GameDB, GameFileDescription}; use lanspread_proto::{Message as _, Request, Response}; use lanspread_utils::maybe_addr; use s2n_quic::stream::SendStream; -use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant}; +use tokio::{ + io::{AsyncReadExt, AsyncSeekExt}, + sync::RwLock, + time::Instant, +}; use walkdir::WalkDir; #[derive(Clone, Debug)] @@ -74,10 +78,26 @@ impl PeerRequestHandler { match get_relative_path(games_folder, entry.path()) { Ok(relative_path) => match relative_path.to_str() { Some(relative_path) => { + let is_dir = entry.file_type().is_dir(); + let size = if is_dir { + None + } else { + match entry.metadata() { + Ok(metadata) => Some(metadata.len()), + Err(e) => { + tracing::error!( + "Failed to read metadata for {}: {e}", + relative_path + ); + None + } + } + }; let game_file_description = GameFileDescription { game_id: id.clone(), relative_path: relative_path.to_string(), - is_dir: entry.file_type().is_dir(), + is_dir, + size, }; tracing::debug!("Found game file: {:?}", game_file_description); @@ -114,67 +134,105 @@ impl PeerRequestHandler { Request::ListGames => self.handle_list_games().await, Request::GetGame { id } => self.handle_get_game(id, games_folder).await, Request::GetGameFileData(_) => PeerRequestHandler::handle_get_game_file_data(), + Request::GetGameFileChunk { .. } => PeerRequestHandler::handle_get_game_file_data(), Request::Invalid(data, err_msg) => PeerRequestHandler::handle_invalid(data, err_msg), } } } +async fn stream_file_bytes( + tx: &mut SendStream, + base_dir: &Path, + relative_path: &str, + offset: u64, + length: Option, +) -> eyre::Result<()> { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + let game_file = base_dir.join(relative_path); + tracing::debug!( + "{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {:?}", + game_file + ); + + let mut file = tokio::fs::File::open(&game_file).await?; + if offset > 0 { + file.seek(std::io::SeekFrom::Start(offset)).await?; + } + + let mut remaining = length.unwrap_or(u64::MAX); + let mut total_bytes = 0u64; + let mut last_total_bytes = 0u64; + let mut timestamp = Instant::now(); + let mut buf = vec![0u8; 64 * 1024]; + + while remaining > 0 { + let read_len = std::cmp::min(remaining, buf.len() as u64) as usize; + if read_len == 0 { + break; + } + + let bytes_read = file.read(&mut buf[..read_len]).await?; + if bytes_read == 0 { + break; + } + + tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?; + remaining = remaining.saturating_sub(bytes_read as u64); + total_bytes += bytes_read as u64; + + if last_total_bytes + 10_000_000 < total_bytes { + let elapsed = timestamp.elapsed(); + let diff_bytes = total_bytes - last_total_bytes; + + if elapsed.as_secs_f64() >= 1.0 { + #[allow(clippy::cast_precision_loss)] + let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); + tracing::debug!( + "{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}", + game_file + ); + last_total_bytes = total_bytes; + timestamp = Instant::now(); + } + } + } + + tracing::debug!( + "{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}", + game_file + ); + + tx.close().await?; + Ok(()) +} + pub async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, ) { - let remote_addr = maybe_addr!(tx.connection().remote_addr()); - - tracing::debug!("{remote_addr} peer requested game file data: {game_file_desc:?}",); - - // deliver file data to client - let game_file = game_dir.join(&game_file_desc.relative_path); - - let mut total_bytes = 0; - let mut last_total_bytes = 0; - let mut timestamp = Instant::now(); - - if let Ok(mut f) = tokio::fs::File::open(&game_file).await { - let mut buf = BytesMut::with_capacity(64 * 1024); - while let Ok(bytes_read) = f.read_buf(&mut buf).await { - if bytes_read == 0 { - break; - } - - total_bytes += bytes_read; - - if last_total_bytes + 10_000_000 < total_bytes { - let elapsed = timestamp.elapsed(); - let diff_bytes = total_bytes - last_total_bytes; - - if elapsed.as_secs_f64() >= 1.0 { - #[allow(clippy::cast_precision_loss)] - let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0); - - tracing::debug!( - "{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}", - ); - last_total_bytes = total_bytes; - timestamp = Instant::now(); - } - } - - if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await { - tracing::error!("{remote_addr} failed to send file data: {e}",); - break; - } - } - - tracing::debug!( - "{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}", + if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + tracing::error!( + "{remote_addr} failed to stream file {}: {e}", + game_file_desc.relative_path ); - } else { - tracing::error!("{remote_addr} failed to open file: {}", game_file.display()); } +} - if let Err(e) = tx.close().await { - tracing::error!("{remote_addr} failed to close stream: {e}"); +pub async fn send_game_file_chunk( + game_id: &str, + relative_path: &str, + offset: u64, + length: u64, + tx: &mut SendStream, + game_dir: &Path, +) { + if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + tracing::error!( + "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" + ); } } diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index d92e2c2..9ae1375 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -6,8 +6,16 @@ use serde::{Deserialize, Serialize}; pub enum Request { Ping, ListGames, - GetGame { id: String }, + GetGame { + id: String, + }, GetGameFileData(GameFileDescription), + GetGameFileChunk { + game_id: String, + relative_path: String, + offset: u64, + length: u64, + }, Invalid(Bytes, String), } diff --git a/crates/lanspread-server/src/quic.rs b/crates/lanspread-server/src/quic.rs index 65eefc5..7c077e0 100644 --- a/crates/lanspread-server/src/quic.rs +++ b/crates/lanspread-server/src/quic.rs @@ -5,7 +5,7 @@ use lanspread_proto::{Message as _, Request}; use lanspread_utils::maybe_addr; use s2n_quic::{Connection, Server, provider::limits::Limits, stream::BidirectionalStream}; -use crate::req::{RequestHandler, send_game_file_data}; +use crate::req::{RequestHandler, send_game_file_chunk, send_game_file_data}; static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem")); static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); @@ -35,9 +35,29 @@ async fn handle_bidi_stream(stream: BidirectionalStream, ctx: Arc) -> tracing::debug!("{remote_addr} msg: {request:?}"); // special case for now (send game file data to client) - if let Request::GetGameFileData(game_file_desc) = &request { - send_game_file_data(game_file_desc, &mut tx, &ctx.games_folder).await; - continue; + match &request { + Request::GetGameFileData(game_file_desc) => { + send_game_file_data(game_file_desc, &mut tx, &ctx.games_folder).await; + continue; + } + Request::GetGameFileChunk { + game_id, + relative_path, + offset, + length, + } => { + send_game_file_chunk( + game_id, + relative_path, + *offset, + *length, + &mut tx, + &ctx.games_folder, + ) + .await; + continue; + } + _ => {} } // normal case (handle request) diff --git a/crates/lanspread-server/src/req.rs b/crates/lanspread-server/src/req.rs index a3a93a3..7434f29 100644 --- a/crates/lanspread-server/src/req.rs +++ b/crates/lanspread-server/src/req.rs @@ -3,12 +3,16 @@ use std::{ sync::Arc, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use lanspread_db::db::{GameDB, GameFileDescription}; use lanspread_proto::{Message as _, Request, Response}; use lanspread_utils::maybe_addr; use s2n_quic::stream::SendStream; -use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant}; +use tokio::{ + io::{AsyncReadExt, AsyncSeekExt}, + sync::RwLock, + time::Instant, +}; use walkdir::WalkDir; #[derive(Clone, Debug)] @@ -74,10 +78,26 @@ impl RequestHandler { match get_relative_path(games_folder, entry.path()) { Ok(relative_path) => match relative_path.to_str() { Some(relative_path) => { + let is_dir = entry.file_type().is_dir(); + let size = if is_dir { + None + } else { + match entry.metadata() { + Ok(metadata) => Some(metadata.len()), + Err(e) => { + tracing::error!( + "Failed to read metadata for {}: {e}", + relative_path + ); + None + } + } + }; let game_file_description = GameFileDescription { game_id: id.clone(), relative_path: relative_path.to_string(), - is_dir: entry.file_type().is_dir(), + is_dir, + size, }; tracing::debug!("Found game file: {:?}", game_file_description); @@ -114,67 +134,105 @@ impl RequestHandler { Request::ListGames => self.handle_list_games().await, Request::GetGame { id } => self.handle_get_game(id, games_folder).await, Request::GetGameFileData(_) => RequestHandler::handle_get_game_file_data(), + Request::GetGameFileChunk { .. } => RequestHandler::handle_get_game_file_data(), Request::Invalid(data, err_msg) => RequestHandler::handle_invalid(data, err_msg), } } } +async fn stream_file_bytes( + tx: &mut SendStream, + base_dir: &Path, + relative_path: &str, + offset: u64, + length: Option, +) -> eyre::Result<()> { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + let game_file = base_dir.join(relative_path); + tracing::debug!( + "{remote_addr} streaming file bytes: {:?}, offset: {offset}, length: {:?}", + game_file + ); + + let mut file = tokio::fs::File::open(&game_file).await?; + if offset > 0 { + file.seek(std::io::SeekFrom::Start(offset)).await?; + } + + let mut remaining = length.unwrap_or(u64::MAX); + let mut total_bytes = 0u64; + let mut last_total_bytes = 0u64; + let mut timestamp = Instant::now(); + let mut buf = vec![0u8; 64 * 1024]; + + while remaining > 0 { + let read_len = std::cmp::min(remaining, buf.len() as u64) as usize; + if read_len == 0 { + break; + } + + let bytes_read = file.read(&mut buf[..read_len]).await?; + if bytes_read == 0 { + break; + } + + tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?; + remaining = remaining.saturating_sub(bytes_read as u64); + total_bytes += bytes_read as u64; + + if last_total_bytes + 10_000_000 < total_bytes { + let elapsed = timestamp.elapsed(); + let diff_bytes = total_bytes - last_total_bytes; + + if elapsed.as_secs_f64() >= 1.0 { + #[allow(clippy::cast_precision_loss)] + let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); + tracing::debug!( + "{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}", + game_file + ); + last_total_bytes = total_bytes; + timestamp = Instant::now(); + } + } + } + + tracing::debug!( + "{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}", + game_file + ); + + tx.close().await?; + Ok(()) +} + pub(crate) async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, ) { - let remote_addr = maybe_addr!(tx.connection().remote_addr()); - - tracing::debug!("{remote_addr} client requested game file data: {game_file_desc:?}",); - - // deliver file data to client - let game_file = game_dir.join(&game_file_desc.relative_path); - - let mut total_bytes = 0; - let mut last_total_bytes = 0; - let mut timestamp = Instant::now(); - - if let Ok(mut f) = tokio::fs::File::open(&game_file).await { - let mut buf = BytesMut::with_capacity(64 * 1024); - while let Ok(bytes_read) = f.read_buf(&mut buf).await { - if bytes_read == 0 { - break; - } - - total_bytes += bytes_read; - - if last_total_bytes + 10_000_000 < total_bytes { - let elapsed = timestamp.elapsed(); - let diff_bytes = total_bytes - last_total_bytes; - - if elapsed.as_secs_f64() >= 1.0 { - #[allow(clippy::cast_precision_loss)] - let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0); - - tracing::debug!( - "{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}", - ); - last_total_bytes = total_bytes; - timestamp = Instant::now(); - } - } - - if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await { - tracing::error!("{remote_addr} failed to send file data: {e}",); - break; - } - } - - tracing::debug!( - "{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}", + if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + tracing::error!( + "{remote_addr} failed to stream file {}: {e}", + game_file_desc.relative_path ); - } else { - tracing::error!("{remote_addr} failed to open file: {}", game_file.display()); } +} - if let Err(e) = tx.close().await { - tracing::error!("{remote_addr} failed to close stream: {e}"); +pub(crate) async fn send_game_file_chunk( + game_id: &str, + relative_path: &str, + offset: u64, + length: u64, + tx: &mut SendStream, + game_dir: &Path, +) { + if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { + let remote_addr = maybe_addr!(tx.connection().remote_addr()); + tracing::error!( + "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" + ); } }