[code][fix] improvements for LAN 202503

- more robust client <-> server connection
  - new client event: DownloadGameFilesFailed
  - 3 seconds to reconnect
  - retry forever if server is gone and never lose a UI request

- code cleanup here and there (mostly server)
This commit is contained in:
2025-03-20 19:39:32 +01:00
parent 19434cd1b1
commit 765447e6d1
9 changed files with 405 additions and 239 deletions

View File

@ -21,8 +21,10 @@ lanspread-utils = { path = "../lanspread-utils" }
# external
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
eyre = { workspace = true }
gethostname = { workspace = true }
itertools = { workspace = true }
s2n-quic = { workspace = true }
serde_json = { workspace = true }
@ -30,4 +32,5 @@ semver = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
uuid = { workspace = true }
walkdir = { workspace = true }

View File

@ -2,27 +2,61 @@ mod cli;
mod quic;
mod req;
use std::{convert::Into, net::SocketAddr};
use std::{convert::Into, net::SocketAddr, time::Duration};
use chrono::{DateTime, Local};
use clap::Parser as _;
use cli::Cli;
use gethostname::gethostname;
use lanspread_compat::eti;
use lanspread_db::db::{Game, GameDB};
use lanspread_mdns::{
DaemonEvent, LANSPREAD_INSTANCE_NAME, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser,
DaemonEvent,
LANSPREAD_INSTANCE_NAME,
LANSPREAD_SERVICE_TYPE,
MdnsAdvertiser,
};
use quic::run_server;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
fn spawn_mdns_task(server_addr: SocketAddr) -> eyre::Result<()> {
let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, LANSPREAD_INSTANCE_NAME, server_addr)?;
let combined_str = if 1 == 2 {
let peer_id = Uuid::now_v7().simple().to_string();
let uidddd = Uuid::now_v7();
// TODO
let uidddd = uidddd
.get_timestamp()
.expect("failed to get timestamp from UUID")
.to_unix();
let local_datetime: DateTime<Local> =
DateTime::from_timestamp(i64::try_from(uidddd.0).unwrap_or(0), uidddd.1)
.expect("Failed to create DateTime from uuid unix timestamp")
.into();
dbg!(local_datetime);
let hostname = gethostname();
let mut hostname = hostname.to_str().unwrap_or("");
if hostname.len() + peer_id.len() > 63 {
hostname = &hostname[..63 - peer_id.len()];
}
format!("{hostname}-{peer_id}")
} else {
String::from(LANSPREAD_INSTANCE_NAME)
};
let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, server_addr)?;
tokio::spawn(async move {
while let Ok(event) = mdns.monitor.recv() {
tracing::info!("mDNS: {:?}", &event);
tracing::debug!("mDNS: {:?}", &event);
if let DaemonEvent::Error(e) = event {
tracing::error!("mDNS: {e}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
@ -46,6 +80,9 @@ async fn prepare_game_db(cli: &Cli) -> eyre::Result<GameDB> {
game_db.add_thumbnails(&cli.thumbs_dir);
game_db.all_games().iter().for_each(|game| {
tracing::debug!("Found game: {game}");
});
tracing::info!("Prepared game database with {} games", game_db.games.len());
Ok(game_db)
@ -73,5 +110,5 @@ async fn main() -> eyre::Result<()> {
let game_db = prepare_game_db(&cli).await?;
tracing::info!("Server listening on {server_addr}");
run_server(server_addr, game_db, cli.game_dir).await
crate::quic::run_server(server_addr, game_db, cli.game_dir).await
}

View File

@ -3,12 +3,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use lanspread_db::db::GameDB;
use lanspread_proto::{Message as _, Request};
use lanspread_utils::maybe_addr;
use s2n_quic::{
Connection, Server,
provider::limits::Limits,
stream::{ReceiveStream, SendStream},
};
use tokio::io::AsyncWriteExt as _;
use s2n_quic::{Connection, Server, provider::limits::Limits, stream::BidirectionalStream};
use crate::req::{RequestHandler, send_game_file_data};
@ -21,92 +16,65 @@ struct ServerCtx {
games_folder: PathBuf,
}
#[derive(Clone, Debug)]
struct ConnectionCtx {
server_ctx: Arc<ServerCtx>,
}
async fn handle_bidi_stream(stream: BidirectionalStream, ctx: Arc<ServerCtx>) -> eyre::Result<()> {
let (mut rx, mut tx) = stream.split();
#[derive(Clone, Debug)]
struct StreamCtx {
conn_ctx: Arc<ConnectionCtx>,
}
async fn handle_bidi_stream(
mut rx: ReceiveStream,
mut tx: SendStream,
ctx: Arc<StreamCtx>,
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(rx.connection().remote_addr());
tracing::trace!("{remote_addr} stream opened");
// handle streams
while let Ok(Some(data)) = rx.receive().await {
tracing::trace!(
"{remote_addr} msg: (raw): {}",
String::from_utf8_lossy(&data)
);
loop {
match rx.receive().await {
Ok(Some(data)) => {
tracing::trace!(
"{remote_addr} msg: (raw): {}",
String::from_utf8_lossy(&data)
);
let request = Request::decode(data);
tracing::debug!("{remote_addr} msg: {request:?}");
let request = Request::decode(data);
tracing::debug!("{remote_addr} msg: {request:?}");
// special case for now (send game file data to client)
if let Request::GetGameFileData(game_file_desc) = &request {
send_game_file_data(
game_file_desc,
&mut tx,
&ctx.conn_ctx.server_ctx.games_folder,
)
.await;
continue;
}
// special case for now (send game file data to client)
if let Request::GetGameFileData(game_file_desc) = &request {
send_game_file_data(game_file_desc, &mut tx, &ctx.games_folder).await;
continue;
}
let response = ctx
.conn_ctx
.server_ctx
.handler
.handle_request(request, &ctx.conn_ctx.server_ctx.games_folder)
.await;
tracing::trace!("{remote_addr} server response: {response:?}");
let raw_response = response.encode();
tracing::trace!(
"{remote_addr} server response (raw): {}",
String::from_utf8_lossy(&raw_response)
);
// write response back to client
if let Err(e) = tx.write_all(&raw_response).await {
tracing::error!(?e);
}
// close the stream
if let Err(e) = tx.close().await {
tracing::error!(?e);
// normal case (handle request)
if let Err(e) = ctx
.handler
.handle_request(request, &ctx.games_folder, &mut tx)
.await
{
tracing::error!(?e, "{remote_addr} error handling request");
}
}
Ok(None) => {
tracing::trace!("{remote_addr} stream closed");
break;
}
Err(e) => {
tracing::error!("{remote_addr} stream error: {e}");
break;
}
}
}
Ok(())
}
async fn handle_connection(
mut connection: Connection,
ctx: Arc<ConnectionCtx>,
) -> eyre::Result<()> {
async fn handle_connection(mut connection: Connection, ctx: Arc<ServerCtx>) -> eyre::Result<()> {
let remote_addr = maybe_addr!(connection.remote_addr());
tracing::info!("{remote_addr} connected");
// handle streams
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
let ctx = ctx.clone();
let remote_addr = remote_addr.clone();
let (rx, tx) = stream.split();
let ctx = Arc::new(StreamCtx {
conn_ctx: ctx.clone(),
});
// spawn a new task for the stream
tokio::spawn(async move {
if let Err(e) = handle_bidi_stream(rx, tx, ctx).await {
if let Err(e) = handle_bidi_stream(stream, ctx).await {
tracing::error!("{remote_addr} stream error: {e}");
}
});
@ -121,8 +89,8 @@ pub(crate) async fn run_server(
games_folder: PathBuf,
) -> eyre::Result<()> {
let limits = Limits::default()
.with_max_idle_timeout(Duration::ZERO)?
.with_max_handshake_duration(Duration::from_secs(3))?;
.with_max_handshake_duration(Duration::from_secs(3))?
.with_max_idle_timeout(Duration::from_secs(3))?;
let mut server = Server::builder()
.with_tls((CERT_PEM, KEY_PEM))?
@ -130,23 +98,22 @@ pub(crate) async fn run_server(
.with_limits(limits)?
.start()?;
let server_ctx = Arc::new(ServerCtx {
let ctx = Arc::new(ServerCtx {
handler: RequestHandler::new(db),
games_folder,
});
while let Some(connection) = server.accept().await {
let conn_ctx = Arc::new(ConnectionCtx {
server_ctx: server_ctx.clone(),
});
let ctx = ctx.clone();
// spawn a new task for the connection
tokio::spawn(async move {
if let Err(e) = handle_connection(connection, conn_ctx).await {
if let Err(e) = handle_connection(connection, ctx).await {
tracing::error!("Connection error: {}", e);
}
});
}
tracing::info!("Server shutting down");
Ok(())
}

View File

@ -3,12 +3,12 @@ use std::{
sync::Arc,
};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use lanspread_db::db::{GameDB, GameFileDescription};
use lanspread_proto::{Request, Response};
use lanspread_proto::{Message as _, Request, Response};
use lanspread_utils::maybe_addr;
use s2n_quic::stream::SendStream;
use tokio::sync::RwLock;
use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant};
use walkdir::WalkDir;
#[derive(Clone, Debug)]
@ -23,12 +23,33 @@ impl RequestHandler {
}
}
pub(crate) async fn handle_request(&self, request: Request, games_folder: &Path) -> Response {
pub(crate) async fn handle_request(
&self,
request: Request,
games_folder: &Path,
tx: &mut SendStream,
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
// process request and generate response
let response = self.process_request(request, games_folder).await;
tracing::trace!("{remote_addr} server response: {response:?}");
// write response back to client
tx.send(response.encode()).await?;
// close the stream
tx.close().await?;
Ok(())
}
pub(crate) async fn process_request(&self, request: Request, games_folder: &Path) -> Response {
match request {
Request::Ping => Response::Pong,
Request::ListGames => {
let db = self.db.read().await;
Response::Games(db.all_games().into_iter().cloned().collect())
Response::ListGames(db.all_games().into_iter().cloned().collect())
}
Request::GetGame { id } => {
if self.db.read().await.get_game_by_id(&id).is_none() {
@ -71,19 +92,15 @@ impl RequestHandler {
}
}
Response::GetGame(game_files_descs)
Response::GetGame {
id,
file_descriptions: game_files_descs,
}
}
Request::GetGameFileData(_) => {
Response::InvalidRequest(Bytes::new(), "Not implemented".to_string())
}
Request::Invalid(data, err_msg) => {
tracing::error!(
"got invalid request from client (error: {}): {}",
err_msg,
String::from_utf8_lossy(&data)
);
Response::InvalidRequest(data, err_msg)
}
Request::Invalid(data, err_msg) => Response::InvalidRequest(data, err_msg),
}
}
}
@ -98,14 +115,48 @@ pub(crate) async fn send_game_file_data(
tracing::debug!("{remote_addr} client requested game file data: {game_file_desc:?}",);
// deliver file data to client
let path = game_dir.join(&game_file_desc.relative_path);
let game_file = game_dir.join(&game_file_desc.relative_path);
if let Ok(mut file) = tokio::fs::File::open(&path).await {
if let Err(e) = tokio::io::copy(&mut file, tx).await {
tracing::error!("{remote_addr} failed to send file data: {e}",);
let mut total_bytes = 0;
let mut last_total_bytes = 0;
let mut timestamp = Instant::now();
if let Ok(mut f) = tokio::fs::File::open(&game_file).await {
let mut buf = BytesMut::with_capacity(64 * 1024);
while let Ok(bytes_read) = f.read_buf(&mut buf).await {
if bytes_read == 0 {
break;
}
total_bytes += bytes_read;
if last_total_bytes + 10_000_000 < total_bytes {
let elapsed = timestamp.elapsed();
let diff_bytes = total_bytes - last_total_bytes;
if elapsed.as_secs_f64() >= 1.0 {
#[allow(clippy::cast_precision_loss)]
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0);
tracing::debug!(
"{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}",
);
last_total_bytes = total_bytes;
timestamp = Instant::now();
}
}
if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await {
tracing::error!("{remote_addr} failed to send file data: {e}",);
break;
}
}
tracing::debug!(
"{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}",
);
} else {
tracing::error!("{remote_addr} failed to open file: {}", path.display());
tracing::error!("{remote_addr} failed to open file: {}", game_file.display());
}
if let Err(e) = tx.close().await {