[feat] client robust against server disconnects and better logging on the client

This commit is contained in:
ddidderr 2024-11-10 15:45:55 +01:00
parent 89af1f9176
commit 9f8c6d3417
Signed by: ddidderr
GPG Key ID: 3841F1C27E6F0E14
4 changed files with 95 additions and 84 deletions

1
Cargo.lock generated
View File

@ -2389,6 +2389,7 @@ dependencies = [
"lanspread-db", "lanspread-db",
"lanspread-proto", "lanspread-proto",
"lanspread-utils", "lanspread-utils",
"log",
"s2n-quic", "s2n-quic",
"serde_json", "serde_json",
"tokio", "tokio",

View File

@ -19,6 +19,7 @@ lanspread-utils = { path = "../lanspread-utils" }
# external # external
clap = { workspace = true } clap = { workspace = true }
eyre = { workspace = true } eyre = { workspace = true }
log = "0.4"
s2n-quic = { workspace = true } s2n-quic = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }

View File

@ -4,11 +4,7 @@ use lanspread_db::db::Game;
use lanspread_proto::{Message as _, Request, Response}; use lanspread_proto::{Message as _, Request, Response};
use lanspread_utils::maybe_addr; use lanspread_utils::maybe_addr;
use s2n_quic::{client::Connect, provider::limits::Limits, Client as QuicClient}; use s2n_quic::{client::Connect, provider::limits::Limits, Client as QuicClient};
use tokio::{ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
io::AsyncWriteExt as _,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
// use tracing_subscriber::EnvFilter;
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
@ -29,20 +25,19 @@ pub async fn run(
mut rx_control: UnboundedReceiver<ClientCommand>, mut rx_control: UnboundedReceiver<ClientCommand>,
tx_event: UnboundedSender<ClientEvent>, tx_event: UnboundedSender<ClientEvent>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
// tracing_subscriber::fmt()
// .with_env_filter(EnvFilter::from_default_env())
// .init();
// blocking wait for remote address // blocking wait for remote address
tracing::debug!("waiting for server address"); log::debug!("waiting for server address");
let server_addr = loop { let server_addr = loop {
if let Some(ClientCommand::ServerAddr(addr)) = rx_control.recv().await { if let Some(ClientCommand::ServerAddr(addr)) = rx_control.recv().await {
tracing::info!("got server address: {addr}"); log::info!("got server address: {addr}");
break addr; break addr;
} }
}; };
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; loop {
let limits = Limits::default()
.with_max_handshake_duration(Duration::from_secs(3))?
.with_max_idle_timeout(Duration::from_secs(1))?;
let client = QuicClient::builder() let client = QuicClient::builder()
.with_tls(CERT_PEM)? .with_tls(CERT_PEM)?
@ -51,10 +46,18 @@ pub async fn run(
.start()?; .start()?;
let conn = Connect::new(server_addr).with_server_name("localhost"); let conn = Connect::new(server_addr).with_server_name("localhost");
let mut conn = client.connect(conn).await?; let mut conn = match client.connect(conn).await {
Ok(conn) => conn,
Err(e) => {
log::error!("failed to connect to server: {e}");
tokio::time::sleep(Duration::from_secs(3)).await;
continue;
}
};
conn.keep_alive(true)?; conn.keep_alive(true)?;
tracing::info!( log::info!(
"connected: (server: {}) (client: {})", "connected: (server: {}) (client: {})",
maybe_addr!(conn.remote_addr()), maybe_addr!(conn.remote_addr()),
maybe_addr!(conn.local_addr()) maybe_addr!(conn.local_addr())
@ -72,45 +75,52 @@ pub async fn run(
}; };
let data = request.encode(); let data = request.encode();
tracing::trace!("encoded data: {}", String::from_utf8_lossy(&data)); log::error!("encoded data: {}", String::from_utf8_lossy(&data));
let stream = match conn.open_bidirectional_stream().await {
Ok(stream) => stream,
Err(e) => {
log::error!("failed to open stream: {e}");
break;
}
};
let stream = conn.open_bidirectional_stream().await?;
let (mut rx, mut tx) = stream.split(); let (mut rx, mut tx) = stream.split();
if let Err(e) = tx.write_all(&data).await { if let Err(e) = tx.send(data).await {
tracing::error!(?e, "failed to send request to server"); log::error!("failed to send request to server {:?}", e);
} }
if let Ok(Some(data)) = rx.receive().await { if let Ok(Some(data)) = rx.receive().await {
tracing::trace!("server response (raw): {}", String::from_utf8_lossy(&data)); log::trace!("server response (raw): {}", String::from_utf8_lossy(&data));
let response = Response::decode(&data); let response = Response::decode(&data);
tracing::trace!( log::trace!(
"server response (decoded): {}", "server response (decoded): {}",
String::from_utf8_lossy(&data) String::from_utf8_lossy(&data)
); );
match response { match response {
Response::Games(games) => { Response::Games(games) => {
for game in &games { for game in &games {
tracing::debug!(?game); log::debug!("{game:?}");
} }
if let Err(e) = tx_event.send(ClientEvent::ListGames(games)) { if let Err(e) = tx_event.send(ClientEvent::ListGames(games)) {
tracing::error!(?e, "failed to send ClientEvent::ListGames to client"); log::error!("failed to send ClientEvent::ListGames to client {e:?}");
} }
} }
Response::Game(game) => tracing::debug!(?game, "game received"), Response::Game(game) => log::debug!("game received: {game:?}"),
Response::GameNotFound(id) => tracing::debug!(?id, "game not found"), Response::GameNotFound(id) => log::debug!("game not found {id}"),
Response::InvalidRequest(request_bytes, err) => tracing::error!( Response::InvalidRequest(request_bytes, err) => log::error!(
"server says our request was invalid (error: {}): {}", "server says our request was invalid (error: {}): {}",
err, err,
String::from_utf8_lossy(&request_bytes) String::from_utf8_lossy(&request_bytes)
), ),
Response::EncodingError(err) => { Response::EncodingError(err) => {
tracing::error!("server encoding error: {err}"); log::error!("server encoding error: {err}");
} }
Response::DecodingError(data, err) => { Response::DecodingError(data, err) => {
tracing::error!( log::error!(
"response decoding error: {} (data: {})", "response decoding error: {} (data: {})",
err, err,
String::from_utf8_lossy(&data) String::from_utf8_lossy(&data)
@ -119,13 +129,14 @@ pub async fn run(
} }
if let Err(err) = tx.close().await { if let Err(err) = tx.close().await {
tracing::error!("failed to close stream: {err}"); log::error!("failed to close stream: {err}");
}
} }
} }
} }
tracing::info!("server closed connection"); // log::info!("server closed connection");
Ok(()) // Ok(())
} }
// #[derive(Debug, Parser)] // #[derive(Debug, Parser)]

View File

@ -66,14 +66,12 @@ impl Server {
tracing::info!("{} connected", conn_ctx.remote_addr); tracing::info!("{} connected", conn_ctx.remote_addr);
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
tracing::debug!("{} stream opened: {:?}", conn_ctx.remote_addr, stream);
let (mut rx, mut tx) = stream.split(); let (mut rx, mut tx) = stream.split();
let conn_ctx = conn_ctx.clone(); let conn_ctx = conn_ctx.clone();
// spawn a new task for the stream // spawn a new task for the stream
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("{} stream opened", conn_ctx.remote_addr); tracing::trace!("{} stream opened", conn_ctx.remote_addr);
// handle streams // handle streams
while let Ok(Some(data)) = rx.receive().await { while let Ok(Some(data)) = rx.receive().await {