From 802fe3d082376086c1d393aa2de6483687e66564 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 20:11:11 +0200 Subject: [PATCH] feat(client): expose tunnel traffic counters Client relay I/O now shares atomic tunnel counters across cloned ClientRelayIo handles and the owning ClientSession. The counters cover successful Ethernet frame rx/tx, relay datagram rx/tx, and dropped or malformed frames so client diagnostics have the traffic totals called out in PLAN.md. The counters live in client-core because that crate owns relay datagram classification and Ethernet payload validation. The Windows TAP runner can later sample this snapshot without duplicating protocol decisions. Test Plan: - cargo fmt --check - cargo test -p lanparty-client-core - cargo clippy -p lanparty-client-core --all-targets -- -D warnings - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md --- Cargo.lock | 1 + README.md | 1 + crates/lanparty-client-core/Cargo.toml | 1 + crates/lanparty-client-core/src/lib.rs | 117 ++++++++++++++++++++++++- 4 files changed, 116 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7da62d3..faed813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,6 +437,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lanparty-ctrl", + "lanparty-obs", "lanparty-proto", "quinn", "rcgen", diff --git a/README.md b/README.md index 6a1eac8..fc7dc82 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ Platform-neutral remote client relay session: - client hello with room, virtual MAC, and datagram budget - welcome/reject handling with assigned peer id and effective TAP MTU - Ethernet frame send/receive helpers over QUIC DATAGRAM +- client tunnel statistics for frame/datagram rx/tx and drops ### `lanparty-client-route` diff --git a/crates/lanparty-client-core/Cargo.toml b/crates/lanparty-client-core/Cargo.toml index 67c2a39..679cabc 100644 --- a/crates/lanparty-client-core/Cargo.toml +++ b/crates/lanparty-client-core/Cargo.toml @@ -8,6 +8,7 @@ anyhow.workspace = true bytes.workspace = true getrandom.workspace = true lanparty-ctrl = { path = "../lanparty-ctrl" } +lanparty-obs = { path = "../lanparty-obs" } lanparty-proto = { path = "../lanparty-proto" } quinn.workspace = true rustls.workspace = true diff --git a/crates/lanparty-client-core/src/lib.rs b/crates/lanparty-client-core/src/lib.rs index 70789bf..2d0c073 100644 --- a/crates/lanparty-client-core/src/lib.rs +++ b/crates/lanparty-client-core/src/lib.rs @@ -10,7 +10,10 @@ use std::{ io::ErrorKind, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, path::{Path, PathBuf}, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, }; use anyhow::{Context, Result, bail}; @@ -19,6 +22,7 @@ use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; +use lanparty_obs::TunnelStats; use lanparty_proto::{EthernetFrame, FrameType, MacAddr, decode_datagram, encode_datagram}; use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig}; use rustls::pki_types::CertificateDer; @@ -210,6 +214,7 @@ pub struct ClientSession { connection: quinn::Connection, config: ClientSessionConfig, welcome: ServerWelcome, + stats: Arc, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -243,7 +248,11 @@ impl ClientSession { #[must_use] pub fn relay_io(&self) -> ClientRelayIo { - ClientRelayIo::new(self.connection.clone(), self.welcome.clone()) + ClientRelayIo::new( + self.connection.clone(), + self.welcome.clone(), + Arc::clone(&self.stats), + ) } pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> { @@ -254,6 +263,11 @@ impl ClientSession { self.relay_io().recv_ethernet().await } + #[must_use] + pub fn stats_snapshot(&self) -> TunnelStats { + self.stats.snapshot() + } + pub async fn shutdown(self, reason: &str) { self.connection.close(0_u32.into(), reason.as_bytes()); self.endpoint.wait_idle().await; @@ -264,14 +278,20 @@ impl ClientSession { pub struct ClientRelayIo { connection: quinn::Connection, welcome: ServerWelcome, + stats: Arc, } impl ClientRelayIo { #[must_use] - fn new(connection: quinn::Connection, welcome: ServerWelcome) -> Self { + fn new( + connection: quinn::Connection, + welcome: ServerWelcome, + stats: Arc, + ) -> Self { Self { connection, welcome, + stats, } } @@ -281,7 +301,10 @@ impl ClientRelayIo { } pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> { - EthernetFrame::parse(frame).context("client Ethernet frame is malformed")?; + if let Err(error) = EthernetFrame::parse(frame) { + self.stats.record_malformed_frame(); + return Err(error).context("client Ethernet frame is malformed"); + } let datagram = encode_datagram( FrameType::Ethernet, self.welcome.room_id(), @@ -294,6 +317,7 @@ impl ClientRelayIo { self.connection .send_datagram(Bytes::from(datagram)) .context("failed to send client Ethernet datagram")?; + self.stats.record_ethernet_tx(); Ok(()) } @@ -301,7 +325,9 @@ impl ClientRelayIo { pub async fn recv_ethernet(&self) -> Result { loop { let datagram = self.connection.read_datagram().await?; + self.stats.record_datagram_rx(); let Ok(packet) = decode_datagram(&datagram) else { + self.stats.record_malformed_frame(); continue; }; let header = packet.header(); @@ -309,18 +335,71 @@ impl ClientRelayIo { || header.room_id() != self.welcome.room_id() || header.peer_id() == self.welcome.peer_id() { + self.stats.record_dropped_frame(); continue; } if EthernetFrame::parse(packet.payload()).is_err() { + self.stats.record_malformed_frame(); continue; } + self.stats.record_ethernet_rx(); return Ok(ReceivedEthernetFrame { source_peer_id: header.peer_id(), payload: Bytes::copy_from_slice(packet.payload()), }); } } + + #[must_use] + pub fn stats_snapshot(&self) -> TunnelStats { + self.stats.snapshot() + } +} + +#[derive(Debug, Default)] +struct ClientTunnelStats { + ethernet_frames_tx: AtomicU64, + ethernet_frames_rx: AtomicU64, + datagrams_tx: AtomicU64, + datagrams_rx: AtomicU64, + dropped_frames: AtomicU64, + malformed_frames: AtomicU64, +} + +impl ClientTunnelStats { + fn record_ethernet_tx(&self) { + self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed); + self.datagrams_tx.fetch_add(1, Ordering::Relaxed); + } + + fn record_ethernet_rx(&self) { + self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed); + } + + fn record_datagram_rx(&self) { + self.datagrams_rx.fetch_add(1, Ordering::Relaxed); + } + + fn record_dropped_frame(&self) { + self.dropped_frames.fetch_add(1, Ordering::Relaxed); + } + + fn record_malformed_frame(&self) { + self.dropped_frames.fetch_add(1, Ordering::Relaxed); + self.malformed_frames.fetch_add(1, Ordering::Relaxed); + } + + fn snapshot(&self) -> TunnelStats { + TunnelStats::new( + self.ethernet_frames_tx.load(Ordering::Relaxed), + self.ethernet_frames_rx.load(Ordering::Relaxed), + self.datagrams_tx.load(Ordering::Relaxed), + self.datagrams_rx.load(Ordering::Relaxed), + self.dropped_frames.load(Ordering::Relaxed), + self.malformed_frames.load(Ordering::Relaxed), + ) + } } pub async fn connect_client(config: ClientSessionConfig) -> Result { @@ -353,6 +432,7 @@ pub async fn connect_client(config: ClientSessionConfig) -> Result bail!( "relay rejected client hello: {:?}: {}", @@ -581,6 +661,16 @@ mod tests { assert_eq!(received.source_peer_id(), 1); assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice()); + assert!(relay_io.send_ethernet(&[0; 4]).is_err()); + let stats = relay_io.stats_snapshot(); + assert_eq!(stats.ethernet_frames_tx(), 1); + assert_eq!(stats.ethernet_frames_rx(), 1); + assert_eq!(stats.datagrams_tx(), 1); + assert_eq!(stats.datagrams_rx(), 1); + assert_eq!(stats.dropped_frames(), 1); + assert_eq!(stats.malformed_frames(), 1); + assert_eq!(client.stats_snapshot(), stats); + client.shutdown("test complete").await; tokio::time::timeout(Duration::from_secs(5), server_task) .await @@ -588,6 +678,25 @@ mod tests { .unwrap(); } + #[test] + fn snapshots_client_tunnel_stats() { + let stats = ClientTunnelStats::default(); + + stats.record_ethernet_tx(); + stats.record_datagram_rx(); + stats.record_ethernet_rx(); + stats.record_dropped_frame(); + stats.record_malformed_frame(); + let snapshot = stats.snapshot(); + + assert_eq!(snapshot.ethernet_frames_tx(), 1); + assert_eq!(snapshot.ethernet_frames_rx(), 1); + assert_eq!(snapshot.datagrams_tx(), 1); + assert_eq!(snapshot.datagrams_rx(), 1); + assert_eq!(snapshot.dropped_frames(), 2); + assert_eq!(snapshot.malformed_frames(), 1); + } + fn test_server_config() -> (ServerConfig, CertificateDer<'static>) { let certified_key = rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]).unwrap();