feat(client): expose tunnel traffic counters

Client relay I/O now shares atomic tunnel counters across cloned
ClientRelayIo handles and the owning ClientSession. The counters cover
successful Ethernet frame rx/tx, relay datagram rx/tx, and dropped or
malformed frames so client diagnostics have the traffic totals called
out in PLAN.md.

The counters live in client-core because that crate owns relay datagram
classification and Ethernet payload validation. The Windows TAP runner can
later sample this snapshot without duplicating protocol decisions.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-client-core
- cargo clippy -p lanparty-client-core --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 20:11:11 +02:00
parent aa9105541f
commit 802fe3d082
4 changed files with 116 additions and 4 deletions
+113 -4
View File
@@ -10,7 +10,10 @@ use std::{
io::ErrorKind,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::{Path, PathBuf},
sync::Arc,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use anyhow::{Context, Result, bail};
@@ -19,6 +22,7 @@ use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN,
RoomCode, ServerWelcome, decode_control_frame, encode_control_message,
};
use lanparty_obs::TunnelStats;
use lanparty_proto::{EthernetFrame, FrameType, MacAddr, decode_datagram, encode_datagram};
use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig};
use rustls::pki_types::CertificateDer;
@@ -210,6 +214,7 @@ pub struct ClientSession {
connection: quinn::Connection,
config: ClientSessionConfig,
welcome: ServerWelcome,
stats: Arc<ClientTunnelStats>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -243,7 +248,11 @@ impl ClientSession {
#[must_use]
pub fn relay_io(&self) -> ClientRelayIo {
ClientRelayIo::new(self.connection.clone(), self.welcome.clone())
ClientRelayIo::new(
self.connection.clone(),
self.welcome.clone(),
Arc::clone(&self.stats),
)
}
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
@@ -254,6 +263,11 @@ impl ClientSession {
self.relay_io().recv_ethernet().await
}
#[must_use]
pub fn stats_snapshot(&self) -> TunnelStats {
self.stats.snapshot()
}
pub async fn shutdown(self, reason: &str) {
self.connection.close(0_u32.into(), reason.as_bytes());
self.endpoint.wait_idle().await;
@@ -264,14 +278,20 @@ impl ClientSession {
pub struct ClientRelayIo {
connection: quinn::Connection,
welcome: ServerWelcome,
stats: Arc<ClientTunnelStats>,
}
impl ClientRelayIo {
#[must_use]
fn new(connection: quinn::Connection, welcome: ServerWelcome) -> Self {
fn new(
connection: quinn::Connection,
welcome: ServerWelcome,
stats: Arc<ClientTunnelStats>,
) -> Self {
Self {
connection,
welcome,
stats,
}
}
@@ -281,7 +301,10 @@ impl ClientRelayIo {
}
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
EthernetFrame::parse(frame).context("client Ethernet frame is malformed")?;
if let Err(error) = EthernetFrame::parse(frame) {
self.stats.record_malformed_frame();
return Err(error).context("client Ethernet frame is malformed");
}
let datagram = encode_datagram(
FrameType::Ethernet,
self.welcome.room_id(),
@@ -294,6 +317,7 @@ impl ClientRelayIo {
self.connection
.send_datagram(Bytes::from(datagram))
.context("failed to send client Ethernet datagram")?;
self.stats.record_ethernet_tx();
Ok(())
}
@@ -301,7 +325,9 @@ impl ClientRelayIo {
pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
loop {
let datagram = self.connection.read_datagram().await?;
self.stats.record_datagram_rx();
let Ok(packet) = decode_datagram(&datagram) else {
self.stats.record_malformed_frame();
continue;
};
let header = packet.header();
@@ -309,18 +335,71 @@ impl ClientRelayIo {
|| header.room_id() != self.welcome.room_id()
|| header.peer_id() == self.welcome.peer_id()
{
self.stats.record_dropped_frame();
continue;
}
if EthernetFrame::parse(packet.payload()).is_err() {
self.stats.record_malformed_frame();
continue;
}
self.stats.record_ethernet_rx();
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
});
}
}
#[must_use]
pub fn stats_snapshot(&self) -> TunnelStats {
self.stats.snapshot()
}
}
#[derive(Debug, Default)]
struct ClientTunnelStats {
ethernet_frames_tx: AtomicU64,
ethernet_frames_rx: AtomicU64,
datagrams_tx: AtomicU64,
datagrams_rx: AtomicU64,
dropped_frames: AtomicU64,
malformed_frames: AtomicU64,
}
impl ClientTunnelStats {
fn record_ethernet_tx(&self) {
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
}
fn record_ethernet_rx(&self) {
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
}
fn record_datagram_rx(&self) {
self.datagrams_rx.fetch_add(1, Ordering::Relaxed);
}
fn record_dropped_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
}
fn record_malformed_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
self.malformed_frames.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> TunnelStats {
TunnelStats::new(
self.ethernet_frames_tx.load(Ordering::Relaxed),
self.ethernet_frames_rx.load(Ordering::Relaxed),
self.datagrams_tx.load(Ordering::Relaxed),
self.datagrams_rx.load(Ordering::Relaxed),
self.dropped_frames.load(Ordering::Relaxed),
self.malformed_frames.load(Ordering::Relaxed),
)
}
}
pub async fn connect_client(config: ClientSessionConfig) -> Result<ClientSession> {
@@ -353,6 +432,7 @@ pub async fn connect_client(config: ClientSessionConfig) -> Result<ClientSession
connection,
config,
welcome,
stats: Arc::default(),
}),
ControlMessage::Reject(reject) => bail!(
"relay rejected client hello: {:?}: {}",
@@ -581,6 +661,16 @@ mod tests {
assert_eq!(received.source_peer_id(), 1);
assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice());
assert!(relay_io.send_ethernet(&[0; 4]).is_err());
let stats = relay_io.stats_snapshot();
assert_eq!(stats.ethernet_frames_tx(), 1);
assert_eq!(stats.ethernet_frames_rx(), 1);
assert_eq!(stats.datagrams_tx(), 1);
assert_eq!(stats.datagrams_rx(), 1);
assert_eq!(stats.dropped_frames(), 1);
assert_eq!(stats.malformed_frames(), 1);
assert_eq!(client.stats_snapshot(), stats);
client.shutdown("test complete").await;
tokio::time::timeout(Duration::from_secs(5), server_task)
.await
@@ -588,6 +678,25 @@ mod tests {
.unwrap();
}
#[test]
fn snapshots_client_tunnel_stats() {
let stats = ClientTunnelStats::default();
stats.record_ethernet_tx();
stats.record_datagram_rx();
stats.record_ethernet_rx();
stats.record_dropped_frame();
stats.record_malformed_frame();
let snapshot = stats.snapshot();
assert_eq!(snapshot.ethernet_frames_tx(), 1);
assert_eq!(snapshot.ethernet_frames_rx(), 1);
assert_eq!(snapshot.datagrams_tx(), 1);
assert_eq!(snapshot.datagrams_rx(), 1);
assert_eq!(snapshot.dropped_frames(), 2);
assert_eq!(snapshot.malformed_frames(), 1);
}
fn test_server_config() -> (ServerConfig, CertificateDer<'static>) {
let certified_key =
rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]).unwrap();