This commit is contained in:
2025-11-11 21:30:26 +01:00
parent d831179783
commit 9c1b94fa6a
9 changed files with 191 additions and 450 deletions
+7 -149
View File
@@ -1,145 +1,13 @@
use std::{
convert::TryInto,
path::{Path, PathBuf},
sync::Arc,
};
use std::{convert::TryInto, path::Path};
use bytes::Bytes;
use lanspread_db::db::{GameDB, GameFileDescription};
use lanspread_proto::{Message as _, Request, Response};
use lanspread_db::db::GameFileDescription;
use lanspread_utils::maybe_addr;
use s2n_quic::stream::SendStream;
use tokio::{
io::{AsyncReadExt, AsyncSeekExt},
sync::RwLock,
time::Instant,
};
use walkdir::WalkDir;
#[derive(Clone, Debug)]
pub struct PeerRequestHandler {
db: Arc<RwLock<GameDB>>,
}
impl PeerRequestHandler {
pub fn new(games: GameDB) -> PeerRequestHandler {
PeerRequestHandler {
db: Arc::new(RwLock::new(games)),
}
}
pub async fn handle_request(
&self,
request: Request,
games_folder: &Path,
tx: &mut SendStream,
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
// process request and generate response
let response = self.process_request(request, games_folder).await;
tracing::trace!("{remote_addr} peer response: {response:?}");
// write response back to client
tx.send(response.encode()).await?;
// close the stream
tx.close().await?;
Ok(())
}
fn handle_ping() -> Response {
Response::Pong
}
async fn handle_list_games(&self) -> Response {
let db = self.db.read().await;
Response::ListGames(db.all_games().into_iter().cloned().collect())
}
async fn handle_get_game(&self, id: String, games_folder: &Path) -> Response {
if self.db.read().await.get_game_by_id(&id).is_none() {
tracing::error!("Game not found in DB: {id}");
return Response::GameNotFound(id);
}
let game_dir = games_folder.join(&id);
if !game_dir.exists() {
tracing::error!("Game folder does not exist: {}", game_dir.display());
return Response::GameNotFound(id);
}
let mut game_files_descs: Vec<GameFileDescription> = vec![];
for entry in WalkDir::new(&game_dir)
.into_iter()
.filter_map(std::result::Result::ok)
{
match get_relative_path(games_folder, entry.path()) {
Ok(relative_path) => match relative_path.to_str() {
Some(relative_path) => {
let is_dir = entry.file_type().is_dir();
let size = if is_dir {
None
} else {
match entry.metadata() {
Ok(metadata) => Some(metadata.len()),
Err(e) => {
tracing::error!(
"Failed to read metadata for {}: {e}",
relative_path
);
None
}
}
};
let game_file_description = GameFileDescription {
game_id: id.clone(),
relative_path: relative_path.to_string(),
is_dir,
size,
};
tracing::debug!("Found game file: {:?}", game_file_description);
game_files_descs.push(game_file_description);
}
None => {
tracing::error!("Failed to get relative path: {relative_path:?}",);
}
},
Err(e) => {
tracing::error!("Failed to get relative path: {e}");
}
}
}
Response::GetGame {
id,
file_descriptions: game_files_descs,
}
}
fn handle_get_game_file_data() -> Response {
Response::InvalidRequest(Bytes::new(), "Not implemented".to_string())
}
fn handle_invalid(data: Bytes, err_msg: String) -> Response {
Response::InvalidRequest(data, err_msg)
}
pub async fn process_request(&self, request: Request, games_folder: &Path) -> Response {
match request {
Request::Ping => PeerRequestHandler::handle_ping(),
Request::ListGames => self.handle_list_games().await,
Request::GetGame { id } => self.handle_get_game(id, games_folder).await,
Request::GetGameFileData(_) => PeerRequestHandler::handle_get_game_file_data(),
Request::GetGameFileChunk { .. } => PeerRequestHandler::handle_get_game_file_data(),
Request::Invalid(data, err_msg) => PeerRequestHandler::handle_invalid(data, err_msg),
}
}
}
async fn stream_file_bytes(
tx: &mut SendStream,
@@ -150,7 +18,7 @@ async fn stream_file_bytes(
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
let game_file = base_dir.join(relative_path);
tracing::debug!(
log::debug!(
"{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {length:?}",
game_file
);
@@ -189,7 +57,7 @@ async fn stream_file_bytes(
if elapsed.as_secs_f64() >= 1.0 {
#[allow(clippy::cast_precision_loss)]
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0);
tracing::debug!(
log::debug!(
"{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}",
game_file
);
@@ -199,7 +67,7 @@ async fn stream_file_bytes(
}
}
tracing::debug!(
log::debug!(
"{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}",
game_file
);
@@ -215,7 +83,7 @@ pub async fn send_game_file_data(
) {
if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
tracing::error!(
log::error!(
"{remote_addr} failed to stream file {}: {e}",
game_file_desc.relative_path
);
@@ -232,18 +100,8 @@ pub async fn send_game_file_chunk(
) {
if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
tracing::error!(
log::error!(
"{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}"
);
}
}
fn get_relative_path(base: &Path, deep_path: &Path) -> std::io::Result<PathBuf> {
let base_canonical = base.canonicalize()?;
let full_canonical = deep_path.canonicalize()?;
full_canonical
.strip_prefix(&base_canonical)
.map(std::path::Path::to_path_buf)
.map_err(|_| std::io::Error::other("Path is not within base directory"))
}