From 9f8c6d3417a9ef5b3a55886e70219adc2da3ef2f Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 10 Nov 2024 15:45:55 +0100 Subject: [PATCH] [feat] client robust against server disconnects and better logging on the client --- Cargo.lock | 1 + crates/lanspread-client/Cargo.toml | 1 + crates/lanspread-client/src/lib.rs | 173 +++++++++++++++------------- crates/lanspread-server/src/main.rs | 4 +- 4 files changed, 95 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42413f0..da29ffe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2389,6 +2389,7 @@ dependencies = [ "lanspread-db", "lanspread-proto", "lanspread-utils", + "log", "s2n-quic", "serde_json", "tokio", diff --git a/crates/lanspread-client/Cargo.toml b/crates/lanspread-client/Cargo.toml index 3c63275..f63ed05 100644 --- a/crates/lanspread-client/Cargo.toml +++ b/crates/lanspread-client/Cargo.toml @@ -19,6 +19,7 @@ lanspread-utils = { path = "../lanspread-utils" } # external clap = { workspace = true } eyre = { workspace = true } +log = "0.4" s2n-quic = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } diff --git a/crates/lanspread-client/src/lib.rs b/crates/lanspread-client/src/lib.rs index 6dc8a34..fc671fe 100644 --- a/crates/lanspread-client/src/lib.rs +++ b/crates/lanspread-client/src/lib.rs @@ -4,11 +4,7 @@ use lanspread_db::db::Game; use lanspread_proto::{Message as _, Request, Response}; use lanspread_utils::maybe_addr; use s2n_quic::{client::Connect, provider::limits::Limits, Client as QuicClient}; -use tokio::{ - io::AsyncWriteExt as _, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, -}; -// use tracing_subscriber::EnvFilter; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); @@ -29,103 +25,118 @@ pub async fn run( mut rx_control: UnboundedReceiver, tx_event: UnboundedSender, ) -> eyre::Result<()> { - // tracing_subscriber::fmt() - // .with_env_filter(EnvFilter::from_default_env()) - // .init(); - // blocking wait for remote address - tracing::debug!("waiting for server address"); + log::debug!("waiting for server address"); let server_addr = loop { if let Some(ClientCommand::ServerAddr(addr)) = rx_control.recv().await { - tracing::info!("got server address: {addr}"); + log::info!("got server address: {addr}"); break addr; } }; - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + loop { + let limits = Limits::default() + .with_max_handshake_duration(Duration::from_secs(3))? + .with_max_idle_timeout(Duration::from_secs(1))?; - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; - let conn = Connect::new(server_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; - conn.keep_alive(true)?; - - tracing::info!( - "connected: (server: {}) (client: {})", - maybe_addr!(conn.remote_addr()), - maybe_addr!(conn.local_addr()) - ); - - // tx - while let Some(cmd) = rx_control.recv().await { - let request = match cmd { - ClientCommand::ListGames => Request::ListGames, - ClientCommand::GetGame(id) => Request::GetGame { id }, - ClientCommand::ServerAddr(_) => Request::Invalid( - [].into(), - "invalid control message (ServerAddr), should not happen".into(), - ), + let conn = Connect::new(server_addr).with_server_name("localhost"); + let mut conn = match client.connect(conn).await { + Ok(conn) => conn, + Err(e) => { + log::error!("failed to connect to server: {e}"); + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + } }; - let data = request.encode(); - tracing::trace!("encoded data: {}", String::from_utf8_lossy(&data)); + conn.keep_alive(true)?; - let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + log::info!( + "connected: (server: {}) (client: {})", + maybe_addr!(conn.remote_addr()), + maybe_addr!(conn.local_addr()) + ); - if let Err(e) = tx.write_all(&data).await { - tracing::error!(?e, "failed to send request to server"); - } - - if let Ok(Some(data)) = rx.receive().await { - tracing::trace!("server response (raw): {}", String::from_utf8_lossy(&data)); - - let response = Response::decode(&data); - tracing::trace!( - "server response (decoded): {}", - String::from_utf8_lossy(&data) - ); - match response { - Response::Games(games) => { - for game in &games { - tracing::debug!(?game); - } - - if let Err(e) = tx_event.send(ClientEvent::ListGames(games)) { - tracing::error!(?e, "failed to send ClientEvent::ListGames to client"); - } - } - Response::Game(game) => tracing::debug!(?game, "game received"), - Response::GameNotFound(id) => tracing::debug!(?id, "game not found"), - Response::InvalidRequest(request_bytes, err) => tracing::error!( - "server says our request was invalid (error: {}): {}", - err, - String::from_utf8_lossy(&request_bytes) + // tx + while let Some(cmd) = rx_control.recv().await { + let request = match cmd { + ClientCommand::ListGames => Request::ListGames, + ClientCommand::GetGame(id) => Request::GetGame { id }, + ClientCommand::ServerAddr(_) => Request::Invalid( + [].into(), + "invalid control message (ServerAddr), should not happen".into(), ), - Response::EncodingError(err) => { - tracing::error!("server encoding error: {err}"); - } - Response::DecodingError(data, err) => { - tracing::error!( - "response decoding error: {} (data: {})", - err, - String::from_utf8_lossy(&data) - ); + }; + + let data = request.encode(); + log::error!("encoded data: {}", String::from_utf8_lossy(&data)); + + let stream = match conn.open_bidirectional_stream().await { + Ok(stream) => stream, + Err(e) => { + log::error!("failed to open stream: {e}"); + break; } + }; + + let (mut rx, mut tx) = stream.split(); + + if let Err(e) = tx.send(data).await { + log::error!("failed to send request to server {:?}", e); } - if let Err(err) = tx.close().await { - tracing::error!("failed to close stream: {err}"); + if let Ok(Some(data)) = rx.receive().await { + log::trace!("server response (raw): {}", String::from_utf8_lossy(&data)); + + let response = Response::decode(&data); + log::trace!( + "server response (decoded): {}", + String::from_utf8_lossy(&data) + ); + match response { + Response::Games(games) => { + for game in &games { + log::debug!("{game:?}"); + } + + if let Err(e) = tx_event.send(ClientEvent::ListGames(games)) { + log::error!("failed to send ClientEvent::ListGames to client {e:?}"); + } + } + Response::Game(game) => log::debug!("game received: {game:?}"), + Response::GameNotFound(id) => log::debug!("game not found {id}"), + Response::InvalidRequest(request_bytes, err) => log::error!( + "server says our request was invalid (error: {}): {}", + err, + String::from_utf8_lossy(&request_bytes) + ), + Response::EncodingError(err) => { + log::error!("server encoding error: {err}"); + } + Response::DecodingError(data, err) => { + log::error!( + "response decoding error: {} (data: {})", + err, + String::from_utf8_lossy(&data) + ); + } + } + + if let Err(err) = tx.close().await { + log::error!("failed to close stream: {err}"); + } } } } - tracing::info!("server closed connection"); - Ok(()) + // log::info!("server closed connection"); + // Ok(()) } // #[derive(Debug, Parser)] diff --git a/crates/lanspread-server/src/main.rs b/crates/lanspread-server/src/main.rs index 650011d..6cdbf6a 100644 --- a/crates/lanspread-server/src/main.rs +++ b/crates/lanspread-server/src/main.rs @@ -66,14 +66,12 @@ impl Server { tracing::info!("{} connected", conn_ctx.remote_addr); while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { - tracing::debug!("{} stream opened: {:?}", conn_ctx.remote_addr, stream); - let (mut rx, mut tx) = stream.split(); let conn_ctx = conn_ctx.clone(); // spawn a new task for the stream tokio::spawn(async move { - tracing::debug!("{} stream opened", conn_ctx.remote_addr); + tracing::trace!("{} stream opened", conn_ctx.remote_addr); // handle streams while let Ok(Some(data)) = rx.receive().await {