use std::{ path::{Path, PathBuf}, sync::Arc, }; use bytes::{Bytes, BytesMut}; use lanspread_db::db::{GameDB, GameFileDescription}; use lanspread_proto::{Message as _, Request, Response}; use lanspread_utils::maybe_addr; use s2n_quic::stream::SendStream; use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant}; use walkdir::WalkDir; #[derive(Clone, Debug)] pub(crate) struct RequestHandler { db: Arc>, } impl RequestHandler { pub(crate) fn new(games: GameDB) -> RequestHandler { RequestHandler { db: Arc::new(RwLock::new(games)), } } pub(crate) async fn handle_request( &self, request: Request, games_folder: &Path, tx: &mut SendStream, ) -> eyre::Result<()> { let remote_addr = maybe_addr!(tx.connection().remote_addr()); // process request and generate response let response = self.process_request(request, games_folder).await; tracing::trace!("{remote_addr} server response: {response:?}"); // write response back to client tx.send(response.encode()).await?; // close the stream tx.close().await?; Ok(()) } fn handle_ping() -> Response { Response::Pong } async fn handle_list_games(&self) -> Response { let db = self.db.read().await; Response::ListGames(db.all_games().into_iter().cloned().collect()) } async fn handle_get_game(&self, id: String, games_folder: &Path) -> Response { if self.db.read().await.get_game_by_id(&id).is_none() { tracing::error!("Game not found in DB: {id}"); return Response::GameNotFound(id); } let game_dir = games_folder.join(&id); if !game_dir.exists() { tracing::error!("Game folder does not exist: {}", game_dir.display()); return Response::GameNotFound(id); } let mut game_files_descs: Vec = vec![]; for entry in WalkDir::new(&game_dir) .into_iter() .filter_map(std::result::Result::ok) { match get_relative_path(games_folder, entry.path()) { Ok(relative_path) => match relative_path.to_str() { Some(relative_path) => { let game_file_description = GameFileDescription { game_id: id.clone(), relative_path: relative_path.to_string(), is_dir: entry.file_type().is_dir(), }; tracing::debug!("Found game file: {:?}", game_file_description); game_files_descs.push(game_file_description); } None => { tracing::error!("Failed to get relative path: {relative_path:?}",); } }, Err(e) => { tracing::error!("Failed to get relative path: {e}"); } } } Response::GetGame { id, file_descriptions: game_files_descs, } } fn handle_get_game_file_data() -> Response { Response::InvalidRequest(Bytes::new(), "Not implemented".to_string()) } fn handle_invalid(data: Bytes, err_msg: String) -> Response { Response::InvalidRequest(data, err_msg) } pub(crate) async fn process_request(&self, request: Request, games_folder: &Path) -> Response { match request { Request::Ping => RequestHandler::handle_ping(), Request::ListGames => self.handle_list_games().await, Request::GetGame { id } => self.handle_get_game(id, games_folder).await, Request::GetGameFileData(_) => RequestHandler::handle_get_game_file_data(), Request::Invalid(data, err_msg) => RequestHandler::handle_invalid(data, err_msg), } } } pub(crate) async fn send_game_file_data( game_file_desc: &GameFileDescription, tx: &mut SendStream, game_dir: &Path, ) { let remote_addr = maybe_addr!(tx.connection().remote_addr()); tracing::debug!("{remote_addr} client requested game file data: {game_file_desc:?}",); // deliver file data to client let game_file = game_dir.join(&game_file_desc.relative_path); let mut total_bytes = 0; let mut last_total_bytes = 0; let mut timestamp = Instant::now(); if let Ok(mut f) = tokio::fs::File::open(&game_file).await { let mut buf = BytesMut::with_capacity(64 * 1024); while let Ok(bytes_read) = f.read_buf(&mut buf).await { if bytes_read == 0 { break; } total_bytes += bytes_read; if last_total_bytes + 10_000_000 < total_bytes { let elapsed = timestamp.elapsed(); let diff_bytes = total_bytes - last_total_bytes; if elapsed.as_secs_f64() >= 1.0 { #[allow(clippy::cast_precision_loss)] let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0); tracing::debug!( "{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}", ); last_total_bytes = total_bytes; timestamp = Instant::now(); } } if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await { tracing::error!("{remote_addr} failed to send file data: {e}",); break; } } tracing::debug!( "{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}", ); } else { tracing::error!("{remote_addr} failed to open file: {}", game_file.display()); } if let Err(e) = tx.close().await { tracing::error!("{remote_addr} failed to close stream: {e}"); } } fn get_relative_path(base: &Path, deep_path: &Path) -> std::io::Result { let base_canonical = base.canonicalize()?; let full_canonical = deep_path.canonicalize()?; full_canonical .strip_prefix(&base_canonical) .map(std::path::Path::to_path_buf) .map_err(|_| std::io::Error::other("Path is not within base directory")) }