//! Request dispatch for a single bidirectional QUIC stream. use std::net::SocketAddr; use futures::{SinkExt, StreamExt}; use lanspread_db::db::{Game, GameFileDescription}; use lanspread_proto::{LibraryDelta, Message, Request, Response}; use s2n_quic::stream::{BidirectionalStream, SendStream}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use crate::{ context::PeerCtx, error::PeerError, events, local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog}, peer::{send_game_file_chunk, send_game_file_data}, services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, }; type ResponseWriter = FramedWrite; /// Handles a bidirectional stream from a peer. pub(super) async fn handle_peer_stream( stream: BidirectionalStream, ctx: PeerCtx, remote_addr: Option, ) -> eyre::Result<()> { let (rx, tx) = stream.split(); let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); log::trace!("{remote_addr:?} peer stream opened"); loop { let next_message = tokio::select! { () = ctx.shutdown.cancelled() => break, next_message = framed_rx.next() => next_message, }; match next_message { Some(Ok(data)) => { log::trace!( "{:?} msg: (raw): {}", remote_addr, String::from_utf8_lossy(&data) ); let request = Request::decode(data.freeze()); log::debug!("{remote_addr:?} msg: {request:?}"); note_peer_activity(&ctx, remote_addr).await; framed_tx = dispatch_request(&ctx, remote_addr, request, framed_tx).await; } Some(Err(err)) => { log::error!("{remote_addr:?} peer stream error: {err}"); break; } None => { log::trace!("{remote_addr:?} peer stream closed"); break; } } } Ok(()) } async fn dispatch_request( ctx: &PeerCtx, remote_addr: Option, request: Request, framed_tx: ResponseWriter, ) -> ResponseWriter { match request { Request::Ping => send_response(framed_tx, Response::Pong, "pong").await, Request::Hello(hello) => match accept_inbound_hello(ctx, remote_addr, hello).await { Ok(ack) => send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await, Err(err) => { log::error!("Failed to accept inbound hello: {err}"); send_response( framed_tx, Response::InternalPeerError(err.to_string()), "HelloAck", ) .await } }, Request::ListGames => handle_list_games(ctx, framed_tx).await, Request::LibraryDelta { peer_id, delta } => { handle_library_delta(ctx, peer_id, delta).await; framed_tx } Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await, Request::GetGameFileData(desc) => handle_file_data_request(ctx, desc, framed_tx).await, Request::GetGameFileChunk { game_id, relative_path, offset, length, } => { handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await } Request::Goodbye { peer_id } => { handle_goodbye(ctx, remote_addr, peer_id).await; framed_tx } Request::Invalid(_, _) => { log::error!("Received invalid request from peer"); framed_tx } } } async fn note_peer_activity(ctx: &PeerCtx, remote_addr: Option) { if let Some(addr) = remote_addr { ctx.peer_game_db .write() .await .update_last_seen_by_addr(&addr); } } async fn send_response( mut framed_tx: ResponseWriter, response: Response, label: &str, ) -> ResponseWriter { if let Err(err) = framed_tx.send(response.encode()).await { log::error!("Failed to send {label} response: {err}"); } framed_tx } async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> ResponseWriter { log::info!("Received ListGames request from peer"); let snapshot = { let db_guard = ctx.local_game_db.read().await; if let Some(db) = db_guard.as_ref() { db.all_games().into_iter().cloned().collect::>() } else { log::info!("Local game database not yet loaded, responding with empty game list"); Vec::new() } }; let games = if snapshot.is_empty() { snapshot } else { let active_operations = ctx.active_operations.read().await; snapshot .into_iter() .filter(|game| !active_operations.contains_key(&game.id)) .collect() }; send_response(framed_tx, Response::ListGames(games), "ListGames").await } async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) { let applied = { let mut db = ctx.peer_game_db.write().await; db.apply_library_delta(&peer_id, delta) }; if applied { events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; } else { let addr = { let db = ctx.peer_game_db.read().await; db.peer_addr(&peer_id) }; let Some(addr) = addr else { log::debug!("Ignoring library delta from unknown peer {peer_id}"); return; }; spawn_library_resync(HandshakeCtx::from_peer_ctx(ctx), addr, peer_id, "resync"); } } async fn handle_get_game(ctx: &PeerCtx, id: String, framed_tx: ResponseWriter) -> ResponseWriter { log::info!("Received GetGame request for {id} from peer"); let response = get_game_response(ctx, id).await; send_response(framed_tx, response, "GetGame").await } async fn get_game_response(ctx: &PeerCtx, id: String) -> Response { let game_dir = ctx.game_dir.read().await.clone(); if !can_serve_game(ctx, &game_dir, &id).await { return Response::GameNotFound(id); } match get_game_file_descriptions(&id, &game_dir).await { Ok(file_descriptions) => Response::GetGame { id, file_descriptions, }, Err(PeerError::FileSizeDetermination { path, source }) => { let error_msg = format!("Failed to determine file size for {path}: {source}"); log::error!("File size determination error for game {id}: {error_msg}"); Response::InternalPeerError(error_msg) } Err(err) => { log::error!("Failed to get game file descriptions for {id}: {err}"); Response::GameNotFound(id) } } } async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool { let active_operations = ctx.active_operations.read().await; let catalog = ctx.catalog.read().await; local_download_matches_catalog(game_dir, game_id, &active_operations, &catalog).await } async fn can_dispatch_file_transfer( ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str, relative_path: &str, ) -> bool { !path_points_inside_local(game_id, relative_path) && can_serve_game(ctx, game_dir, game_id).await } fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool { let normalised = relative_path.replace('\\', "/"); let mut parts = normalised.split('/').filter(|part| !part.is_empty()); match (parts.next(), parts.next()) { (Some(first), _) if is_local_dir_name(first) => true, (Some(first), Some(second)) if first == game_id && is_local_dir_name(second) => true, _ => false, } } use std::sync::atomic::{AtomicU64, Ordering}; static NEXT_TRANSFER_ID: AtomicU64 = AtomicU64::new(1); struct TransferGuard { game_id: String, id: u64, active_outbound_transfers: crate::context::OutboundTransfers, tx_notify_ui: tokio::sync::mpsc::UnboundedSender, } impl TransferGuard { async fn new( game_id: String, active_outbound_transfers: crate::context::OutboundTransfers, tx_notify_ui: tokio::sync::mpsc::UnboundedSender, shutdown: &tokio_util::sync::CancellationToken, ) -> (Self, tokio_util::sync::CancellationToken) { let id = NEXT_TRANSFER_ID.fetch_add(1, Ordering::SeqCst); let token = shutdown.child_token(); { let mut active = active_outbound_transfers.write().await; active .entry(game_id.clone()) .or_default() .push((id, token.clone())); } let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged); ( Self { game_id, id, active_outbound_transfers, tx_notify_ui, }, token, ) } } impl Drop for TransferGuard { fn drop(&mut self) { let game_id = self.game_id.clone(); let id = self.id; let active_outbound_transfers = self.active_outbound_transfers.clone(); let tx_notify_ui = self.tx_notify_ui.clone(); tokio::spawn(async move { { let mut active = active_outbound_transfers.write().await; if let Some(tokens) = active.get_mut(&game_id) { tokens.retain(|(tid, _)| *tid != id); if tokens.is_empty() { active.remove(&game_id); } } } let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged); }); } } async fn handle_file_data_request( ctx: &PeerCtx, desc: GameFileDescription, framed_tx: ResponseWriter, ) -> ResponseWriter { log::info!( "Received GetGameFileData request for {} from peer", desc.relative_path ); let (guard, cancel_token) = TransferGuard::new( desc.game_id.clone(), ctx.active_outbound_transfers.clone(), ctx.tx_notify_ui.clone(), &ctx.shutdown, ) .await; let mut tx = framed_tx.into_inner(); let game_dir = ctx.game_dir.read().await.clone(); if !can_dispatch_file_transfer(ctx, &game_dir, &desc.game_id, &desc.relative_path).await { log::info!( "Declining GetGameFileData for {} because the game is not currently transferable", desc.relative_path ); drop(guard); let _ = tx.close().await; return FramedWrite::new(tx, LengthDelimitedCodec::new()); } send_game_file_data(&desc, &mut tx, &game_dir, cancel_token).await; drop(guard); FramedWrite::new(tx, LengthDelimitedCodec::new()) } async fn handle_file_chunk_request( ctx: &PeerCtx, game_id: String, relative_path: String, offset: u64, length: u64, framed_tx: ResponseWriter, ) -> ResponseWriter { log::info!( "Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})" ); let (guard, cancel_token) = TransferGuard::new( game_id.clone(), ctx.active_outbound_transfers.clone(), ctx.tx_notify_ui.clone(), &ctx.shutdown, ) .await; let mut tx = framed_tx.into_inner(); let game_dir = ctx.game_dir.read().await.clone(); if !can_dispatch_file_transfer(ctx, &game_dir, &game_id, &relative_path).await { log::info!( "Declining GetGameFileChunk for {relative_path} because the game is not currently transferable" ); drop(guard); let _ = tx.close().await; return FramedWrite::new(tx, LengthDelimitedCodec::new()); } send_game_file_chunk( &game_id, &relative_path, offset, length, &mut tx, &game_dir, cancel_token, ) .await; drop(guard); FramedWrite::new(tx, LengthDelimitedCodec::new()) } async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option, peer_id: String) { log::info!("Received Goodbye from peer {peer_id}"); let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) }; let Some(peer) = removed else { return }; events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await; } #[cfg(test)] mod tests { use std::{ path::{Path, PathBuf}, sync::Arc, }; use lanspread_db::db::GameCatalog; use tokio::sync::{RwLock, mpsc}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::*; use crate::{ UnpackFuture, Unpacker, context::{Ctx, OperationKind}, peer_db::PeerGameDB, test_support::TempDir, }; struct NoopUnpacker; impl Unpacker for NoopUnpacker { fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> { Box::pin(async { Ok(()) }) } } fn write_file(path: &Path, bytes: &[u8]) { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).expect("parent dir should be created"); } std::fs::write(path, bytes).expect("file should be written"); } fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> PeerCtx { let (tx_notify_ui, _rx) = mpsc::unbounded_channel(); Ctx::new( Arc::new(RwLock::new(PeerGameDB::new())), "peer".to_string(), game_dir.clone(), game_dir.join(".test-state"), Arc::new(NoopUnpacker), CancellationToken::new(), TaskTracker::new(), Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(std::collections::HashMap::new())), ) .to_peer_ctx(tx_notify_ui) } #[test] fn local_relative_paths_are_never_transferable() { assert!(path_points_inside_local("game", "game/local/save.dat")); assert!(path_points_inside_local("game", "local/save.dat")); assert!(path_points_inside_local("game", "game\\local\\save.dat")); assert!(!path_points_inside_local("game", "game/version.ini")); assert!(!path_points_inside_local("game", "game/archive.eti")); } #[tokio::test] async fn get_game_response_respects_serve_gates() { let temp = TempDir::new("lanspread-stream"); write_file(&temp.path().join("ready").join("version.ini"), b"20250101"); write_file( &temp.path().join("non-catalog").join("version.ini"), b"20250101", ); write_file(&temp.path().join("active").join("version.ini"), b"20250101"); write_file( &temp.path().join("wrong-version").join("version.ini"), b"20260101", ); std::fs::create_dir_all(temp.path().join("missing-sentinel")) .expect("missing sentinel root should be created"); let mut catalog = GameCatalog::empty(); catalog.insert("ready".to_string(), Some("20250101".to_string())); catalog.insert("active".to_string(), Some("20250101".to_string())); catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string())); catalog.insert("wrong-version".to_string(), Some("20250101".to_string())); let ctx = test_ctx(temp.path().to_path_buf(), catalog); ctx.active_operations .write() .await .insert("active".to_string(), OperationKind::Downloading); assert!(matches!( get_game_response(&ctx, "ready".to_string()).await, Response::GetGame { id, .. } if id == "ready" )); assert!(matches!( get_game_response(&ctx, "non-catalog".to_string()).await, Response::GameNotFound(id) if id == "non-catalog" )); assert!(matches!( get_game_response(&ctx, "active".to_string()).await, Response::GameNotFound(id) if id == "active" )); assert!(matches!( get_game_response(&ctx, "wrong-version".to_string()).await, Response::GameNotFound(id) if id == "wrong-version" )); assert!(matches!( get_game_response(&ctx, "missing-sentinel".to_string()).await, Response::GameNotFound(id) if id == "missing-sentinel" )); } #[tokio::test] async fn file_transfer_dispatch_respects_serve_gates() { let temp = TempDir::new("lanspread-stream"); write_file(&temp.path().join("ready").join("version.ini"), b"20250101"); write_file( &temp.path().join("non-catalog").join("version.ini"), b"20250101", ); write_file(&temp.path().join("active").join("version.ini"), b"20250101"); write_file( &temp.path().join("wrong-version").join("version.ini"), b"20260101", ); std::fs::create_dir_all(temp.path().join("missing-sentinel")) .expect("missing sentinel root should be created"); let mut catalog = GameCatalog::empty(); catalog.insert("ready".to_string(), Some("20250101".to_string())); catalog.insert("active".to_string(), Some("20250101".to_string())); catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string())); catalog.insert("wrong-version".to_string(), Some("20250101".to_string())); let ctx = test_ctx(temp.path().to_path_buf(), catalog); ctx.active_operations .write() .await .insert("active".to_string(), OperationKind::Downloading); assert!(can_dispatch_file_transfer(&ctx, temp.path(), "ready", "ready/version.ini").await); assert!( !can_dispatch_file_transfer( &ctx, temp.path(), "non-catalog", "non-catalog/version.ini", ) .await ); assert!( !can_dispatch_file_transfer(&ctx, temp.path(), "active", "active/version.ini").await ); assert!( !can_dispatch_file_transfer( &ctx, temp.path(), "wrong-version", "wrong-version/version.ini", ) .await ); assert!( !can_dispatch_file_transfer( &ctx, temp.path(), "missing-sentinel", "missing-sentinel/archive.eti", ) .await ); assert!( !can_dispatch_file_transfer(&ctx, temp.path(), "ready", "ready/local/save.dat").await ); } }