From 21a69626e04901a19be716b443024668726d79b6 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 21:54:35 +0200 Subject: [PATCH] feat(obs): report broadcast frame counters PLAN.md calls out "Broadcast traffic flowing" as a user-facing diagnostic. The tunnel stats only reported total Ethernet frame counts, so the client could not distinguish whether broadcast traffic was actually crossing the tunnel. Add defaulted broadcast tx/rx counters to TunnelStats while preserving the existing constructor and old JSON compatibility. Client and gateway accounting now increments those counters from validated Ethernet frames, and the client diagnostics line reports the broadcast flow next to total frame counts. Relay peer stats logs include the new counters so operators can see broadcast activity from forwarded stats snapshots too. Test Plan: - cargo fmt --check - cargo test -p lanparty-obs -p lanparty-client-core -p lanparty-gateway \ -p lanparty-client-win -p lanparty-relay - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md --- Cargo.lock | 1 + README.md | 12 ++-- crates/lanparty-client-core/src/lib.rs | 63 ++++++++++++++++----- crates/lanparty-client-win/src/main.rs | 10 ++-- crates/lanparty-gateway/src/lib.rs | 76 ++++++++++++++++++++++---- crates/lanparty-obs/Cargo.toml | 3 + crates/lanparty-obs/src/lib.rs | 49 ++++++++++++++++- crates/lanparty-relay/src/server.rs | 4 +- 8 files changed, 180 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6035328..9d6719c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -522,6 +522,7 @@ version = "0.1.0" dependencies = [ "lanparty-proto", "serde", + "serde_json", ] [[package]] diff --git a/README.md b/README.md index 1147dd1..bec00f4 100644 --- a/README.md +++ b/README.md @@ -203,9 +203,9 @@ adapter may need to be disabled/enabled or reinstalled before it reloads the configured `NetworkAddress`. It prints and reports client diagnostics snapshots with relay reachability, LAN-gateway presence, route-pinning, QUIC datagram budget, TAP status/IP, -frame/datagram counters, and drops. The periodic diagnostics refresh the TAP -unicast IP so DHCP results that arrive after bridging starts become visible in -later status lines. Relay lifecycle events are logged as they arrive, including -gateway joins and peer leaves. The client remembers peer identities from join -and catch-up events so later leave logs can identify a disconnected LAN gateway -or client MAC when that peer was known. +broadcast frame flow, frame/datagram counters, and drops. The periodic +diagnostics refresh the TAP unicast IP so DHCP results that arrive after +bridging starts become visible in later status lines. Relay lifecycle events +are logged as they arrive, including gateway joins and peer leaves. The client +remembers peer identities from join and catch-up events so later leave logs can +identify a disconnected LAN gateway or client MAC when that peer was known. diff --git a/crates/lanparty-client-core/src/lib.rs b/crates/lanparty-client-core/src/lib.rs index da8fd5d..4c98f7b 100644 --- a/crates/lanparty-client-core/src/lib.rs +++ b/crates/lanparty-client-core/src/lib.rs @@ -328,10 +328,13 @@ impl ClientRelayIo { } pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> { - if let Err(error) = EthernetFrame::parse(frame) { - self.stats.record_malformed_frame(); - return Err(error).context("client Ethernet frame is malformed"); - } + let ethernet_frame = match EthernetFrame::parse(frame) { + Ok(frame) => frame, + Err(error) => { + self.stats.record_malformed_frame(); + return Err(error).context("client Ethernet frame is malformed"); + } + }; let datagram = encode_datagram( FrameType::Ethernet, self.welcome.room_id(), @@ -344,7 +347,7 @@ impl ClientRelayIo { self.connection .send_datagram(Bytes::from(datagram)) .context("failed to send client Ethernet datagram")?; - self.stats.record_ethernet_tx(); + self.stats.record_ethernet_tx(ethernet_frame); Ok(()) } @@ -365,12 +368,15 @@ impl ClientRelayIo { self.stats.record_dropped_frame(); continue; } - if EthernetFrame::parse(packet.payload()).is_err() { - self.stats.record_malformed_frame(); - continue; - } + let ethernet_frame = match EthernetFrame::parse(packet.payload()) { + Ok(frame) => frame, + Err(_) => { + self.stats.record_malformed_frame(); + continue; + } + }; - self.stats.record_ethernet_rx(); + self.stats.record_ethernet_rx(ethernet_frame); return Ok(ReceivedEthernetFrame { source_peer_id: header.peer_id(), payload: Bytes::copy_from_slice(packet.payload()), @@ -393,6 +399,8 @@ impl ClientRelayIo { struct ClientTunnelStats { ethernet_frames_tx: AtomicU64, ethernet_frames_rx: AtomicU64, + broadcast_frames_tx: AtomicU64, + broadcast_frames_rx: AtomicU64, datagrams_tx: AtomicU64, datagrams_rx: AtomicU64, dropped_frames: AtomicU64, @@ -400,13 +408,19 @@ struct ClientTunnelStats { } impl ClientTunnelStats { - fn record_ethernet_tx(&self) { + fn record_ethernet_tx(&self, frame: EthernetFrame<'_>) { self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed); + if frame.is_broadcast() { + self.broadcast_frames_tx.fetch_add(1, Ordering::Relaxed); + } self.datagrams_tx.fetch_add(1, Ordering::Relaxed); } - fn record_ethernet_rx(&self) { + fn record_ethernet_rx(&self, frame: EthernetFrame<'_>) { self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed); + if frame.is_broadcast() { + self.broadcast_frames_rx.fetch_add(1, Ordering::Relaxed); + } } fn record_datagram_rx(&self) { @@ -431,6 +445,10 @@ impl ClientTunnelStats { self.dropped_frames.load(Ordering::Relaxed), self.malformed_frames.load(Ordering::Relaxed), ) + .with_broadcast_frames( + self.broadcast_frames_tx.load(Ordering::Relaxed), + self.broadcast_frames_rx.load(Ordering::Relaxed), + ) } } @@ -806,6 +824,8 @@ mod tests { let stats = relay_io.stats_snapshot(); assert_eq!(stats.ethernet_frames_tx(), 1); assert_eq!(stats.ethernet_frames_rx(), 1); + assert_eq!(stats.broadcast_frames_tx(), 0); + assert_eq!(stats.broadcast_frames_rx(), 0); assert_eq!(stats.datagrams_tx(), 1); assert_eq!(stats.datagrams_rx(), 1); assert_eq!(stats.dropped_frames(), 1); @@ -827,16 +847,22 @@ mod tests { #[test] fn snapshots_client_tunnel_stats() { let stats = ClientTunnelStats::default(); + let broadcast_tx_bytes = broadcast_ethernet_frame(b"broadcast tx"); + let broadcast_rx_bytes = broadcast_ethernet_frame(b"broadcast rx"); + let broadcast_tx = EthernetFrame::parse(&broadcast_tx_bytes).unwrap(); + let broadcast_rx = EthernetFrame::parse(&broadcast_rx_bytes).unwrap(); - stats.record_ethernet_tx(); + stats.record_ethernet_tx(broadcast_tx); stats.record_datagram_rx(); - stats.record_ethernet_rx(); + stats.record_ethernet_rx(broadcast_rx); stats.record_dropped_frame(); stats.record_malformed_frame(); let snapshot = stats.snapshot(); assert_eq!(snapshot.ethernet_frames_tx(), 1); assert_eq!(snapshot.ethernet_frames_rx(), 1); + assert_eq!(snapshot.broadcast_frames_tx(), 1); + assert_eq!(snapshot.broadcast_frames_rx(), 1); assert_eq!(snapshot.datagrams_tx(), 1); assert_eq!(snapshot.datagrams_rx(), 1); assert_eq!(snapshot.dropped_frames(), 2); @@ -876,6 +902,15 @@ mod tests { frame } + fn broadcast_ethernet_frame(payload: &[u8]) -> Vec { + let mut frame = Vec::new(); + frame.extend_from_slice(&MacAddr::BROADCAST.octets()); + frame.extend_from_slice(&[0x02, 0, 0, 0, 0, 1]); + frame.extend_from_slice(&0x0800_u16.to_be_bytes()); + frame.extend_from_slice(payload); + frame + } + fn unique_temp_identity_path() -> std::path::PathBuf { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/lanparty-client-win/src/main.rs b/crates/lanparty-client-win/src/main.rs index a914ab6..620a686 100644 --- a/crates/lanparty-client-win/src/main.rs +++ b/crates/lanparty-client-win/src/main.rs @@ -453,7 +453,7 @@ async fn print_and_report_client_diagnostics( fn format_client_diagnostics(diagnostics: &ClientDiagnostics) -> String { let stats = diagnostics.stats(); format!( - "client diagnostics: relay reachable {} gateway connected {} route pinned {}; QUIC datagrams {} max {}; TAP found {} MAC {} MTU {} IP {}; frames tx {} rx {} datagrams tx {} rx {} drops {} malformed {}", + "client diagnostics: relay reachable {} gateway connected {} route pinned {}; QUIC datagrams {} max {}; TAP found {} MAC {} MTU {} IP {}; frames tx {} rx {} broadcast tx {} rx {} datagrams tx {} rx {} drops {} malformed {}", yes_no(diagnostics.relay().reachable()), yes_no(diagnostics.relay().gateway_connected()), yes_no(diagnostics.relay().route_pinned()), @@ -465,6 +465,8 @@ fn format_client_diagnostics(diagnostics: &ClientDiagnostics) -> String { optional_label(diagnostics.tap().ip()), stats.ethernet_frames_tx(), stats.ethernet_frames_rx(), + stats.broadcast_frames_tx(), + stats.broadcast_frames_rx(), stats.datagrams_tx(), stats.datagrams_rx(), stats.dropped_frames(), @@ -851,12 +853,12 @@ mod tests { Some(1200), Some("10.73.42.51".parse().unwrap()), ), - TunnelStats::new(1, 2, 3, 4, 5, 6), + TunnelStats::new(1, 2, 3, 4, 5, 6).with_broadcast_frames(7, 8), ); assert_eq!( format_client_diagnostics(&diagnostics), - "client diagnostics: relay reachable yes gateway connected yes route pinned yes; QUIC datagrams yes max 1400; TAP found yes MAC 02:00:00:00:00:01 MTU 1200 IP 10.73.42.51; frames tx 1 rx 2 datagrams tx 3 rx 4 drops 5 malformed 6" + "client diagnostics: relay reachable yes gateway connected yes route pinned yes; QUIC datagrams yes max 1400; TAP found yes MAC 02:00:00:00:00:01 MTU 1200 IP 10.73.42.51; frames tx 1 rx 2 broadcast tx 7 rx 8 datagrams tx 3 rx 4 drops 5 malformed 6" ); } @@ -871,7 +873,7 @@ mod tests { assert_eq!( format_client_diagnostics(&diagnostics), - "client diagnostics: relay reachable yes gateway connected no route pinned no; QUIC datagrams no max unknown; TAP found no MAC unknown MTU unknown IP unknown; frames tx 0 rx 0 datagrams tx 0 rx 0 drops 0 malformed 0" + "client diagnostics: relay reachable yes gateway connected no route pinned no; QUIC datagrams no max unknown; TAP found no MAC unknown MTU unknown IP unknown; frames tx 0 rx 0 broadcast tx 0 rx 0 datagrams tx 0 rx 0 drops 0 malformed 0" ); } diff --git a/crates/lanparty-gateway/src/lib.rs b/crates/lanparty-gateway/src/lib.rs index e3afce1..80ff6e0 100644 --- a/crates/lanparty-gateway/src/lib.rs +++ b/crates/lanparty-gateway/src/lib.rs @@ -366,10 +366,13 @@ fn send_gateway_ethernet( stats: &GatewayTunnelStats, frame: &[u8], ) -> Result<()> { - if let Err(error) = EthernetFrame::parse(frame) { - stats.record_malformed_frame(); - return Err(error).context("gateway Ethernet frame is malformed"); - } + let ethernet_frame = match EthernetFrame::parse(frame) { + Ok(frame) => frame, + Err(error) => { + stats.record_malformed_frame(); + return Err(error).context("gateway Ethernet frame is malformed"); + } + }; let datagram = encode_datagram( FrameType::Ethernet, welcome.room_id(), @@ -382,7 +385,7 @@ fn send_gateway_ethernet( connection .send_datagram(Bytes::from(datagram)) .context("failed to send gateway Ethernet datagram")?; - stats.record_ethernet_tx(); + stats.record_ethernet_tx(ethernet_frame); Ok(()) } @@ -407,12 +410,15 @@ async fn recv_gateway_ethernet( stats.record_dropped_frame(); continue; } - if EthernetFrame::parse(packet.payload()).is_err() { - stats.record_malformed_frame(); - continue; - } + let ethernet_frame = match EthernetFrame::parse(packet.payload()) { + Ok(frame) => frame, + Err(_) => { + stats.record_malformed_frame(); + continue; + } + }; - stats.record_ethernet_rx(); + stats.record_ethernet_rx(ethernet_frame); return Ok(ReceivedEthernetFrame { source_peer_id: header.peer_id(), payload: Bytes::copy_from_slice(packet.payload()), @@ -459,6 +465,8 @@ async fn send_gateway_control_event( struct GatewayTunnelStats { ethernet_frames_tx: AtomicU64, ethernet_frames_rx: AtomicU64, + broadcast_frames_tx: AtomicU64, + broadcast_frames_rx: AtomicU64, datagrams_tx: AtomicU64, datagrams_rx: AtomicU64, dropped_frames: AtomicU64, @@ -466,13 +474,19 @@ struct GatewayTunnelStats { } impl GatewayTunnelStats { - fn record_ethernet_tx(&self) { + fn record_ethernet_tx(&self, frame: EthernetFrame<'_>) { self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed); + if frame.is_broadcast() { + self.broadcast_frames_tx.fetch_add(1, Ordering::Relaxed); + } self.datagrams_tx.fetch_add(1, Ordering::Relaxed); } - fn record_ethernet_rx(&self) { + fn record_ethernet_rx(&self, frame: EthernetFrame<'_>) { self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed); + if frame.is_broadcast() { + self.broadcast_frames_rx.fetch_add(1, Ordering::Relaxed); + } } fn record_datagram_rx(&self) { @@ -497,6 +511,10 @@ impl GatewayTunnelStats { self.dropped_frames.load(Ordering::Relaxed), self.malformed_frames.load(Ordering::Relaxed), ) + .with_broadcast_frames( + self.broadcast_frames_tx.load(Ordering::Relaxed), + self.broadcast_frames_rx.load(Ordering::Relaxed), + ) } } @@ -986,6 +1004,31 @@ mod tests { .unwrap(); } + #[test] + fn snapshots_gateway_broadcast_stats() { + let stats = GatewayTunnelStats::default(); + let broadcast_tx_bytes = broadcast_ethernet_frame(b"broadcast tx"); + let broadcast_rx_bytes = broadcast_ethernet_frame(b"broadcast rx"); + let broadcast_tx = EthernetFrame::parse(&broadcast_tx_bytes).unwrap(); + let broadcast_rx = EthernetFrame::parse(&broadcast_rx_bytes).unwrap(); + + stats.record_ethernet_tx(broadcast_tx); + stats.record_datagram_rx(); + stats.record_ethernet_rx(broadcast_rx); + stats.record_dropped_frame(); + stats.record_malformed_frame(); + let snapshot = stats.snapshot(); + + assert_eq!(snapshot.ethernet_frames_tx(), 1); + assert_eq!(snapshot.ethernet_frames_rx(), 1); + assert_eq!(snapshot.broadcast_frames_tx(), 1); + assert_eq!(snapshot.broadcast_frames_rx(), 1); + assert_eq!(snapshot.datagrams_tx(), 1); + assert_eq!(snapshot.datagrams_rx(), 1); + assert_eq!(snapshot.dropped_frames(), 2); + assert_eq!(snapshot.malformed_frames(), 1); + } + #[cfg(target_os = "linux")] #[test] fn builds_padded_cam_refresh_frame() { @@ -1106,4 +1149,13 @@ mod tests { frame.extend_from_slice(payload); frame } + + fn broadcast_ethernet_frame(payload: &[u8]) -> Vec { + let mut frame = Vec::new(); + frame.extend_from_slice(&MacAddr::BROADCAST.octets()); + frame.extend_from_slice(&[0x02, 0, 0, 0, 0, 1]); + frame.extend_from_slice(&0x0800_u16.to_be_bytes()); + frame.extend_from_slice(payload); + frame + } } diff --git a/crates/lanparty-obs/Cargo.toml b/crates/lanparty-obs/Cargo.toml index c7ca86d..3af2440 100644 --- a/crates/lanparty-obs/Cargo.toml +++ b/crates/lanparty-obs/Cargo.toml @@ -6,3 +6,6 @@ edition.workspace = true [dependencies] lanparty-proto = { path = "../lanparty-proto" } serde.workspace = true + +[dev-dependencies] +serde_json.workspace = true diff --git a/crates/lanparty-obs/src/lib.rs b/crates/lanparty-obs/src/lib.rs index e9e1b75..237dd4d 100644 --- a/crates/lanparty-obs/src/lib.rs +++ b/crates/lanparty-obs/src/lib.rs @@ -135,6 +135,10 @@ impl FrameLog { pub struct TunnelStats { ethernet_frames_tx: u64, ethernet_frames_rx: u64, + #[serde(default)] + broadcast_frames_tx: u64, + #[serde(default)] + broadcast_frames_rx: u64, datagrams_tx: u64, datagrams_rx: u64, dropped_frames: u64, @@ -154,6 +158,8 @@ impl TunnelStats { Self { ethernet_frames_tx, ethernet_frames_rx, + broadcast_frames_tx: 0, + broadcast_frames_rx: 0, datagrams_tx, datagrams_rx, dropped_frames, @@ -161,6 +167,17 @@ impl TunnelStats { } } + #[must_use] + pub const fn with_broadcast_frames( + mut self, + broadcast_frames_tx: u64, + broadcast_frames_rx: u64, + ) -> Self { + self.broadcast_frames_tx = broadcast_frames_tx; + self.broadcast_frames_rx = broadcast_frames_rx; + self + } + #[must_use] pub const fn ethernet_frames_tx(&self) -> u64 { self.ethernet_frames_tx @@ -171,6 +188,16 @@ impl TunnelStats { self.ethernet_frames_rx } + #[must_use] + pub const fn broadcast_frames_tx(&self) -> u64 { + self.broadcast_frames_tx + } + + #[must_use] + pub const fn broadcast_frames_rx(&self) -> u64 { + self.broadcast_frames_rx + } + #[must_use] pub const fn datagrams_tx(&self) -> u64 { self.datagrams_tx @@ -433,7 +460,7 @@ mod tests { #[test] fn exposes_client_diagnostics() { let mac = MacAddr::new([0x02, 1, 2, 3, 4, 5]); - let stats = TunnelStats::new(1, 2, 3, 4, 5, 6); + let stats = TunnelStats::new(1, 2, 3, 4, 5, 6).with_broadcast_frames(7, 8); let diagnostics = ClientDiagnostics::new( RelayDiagnostics::new(true, true, true), QuicDiagnostics::new(true, Some(1400)), @@ -455,6 +482,26 @@ mod tests { assert_eq!(diagnostics.tap().mac(), Some(mac)); assert_eq!(diagnostics.tap().mtu(), Some(1200)); assert_eq!(diagnostics.tap().ip().unwrap().to_string(), "10.73.42.51"); + assert_eq!(diagnostics.stats().broadcast_frames_tx(), 7); + assert_eq!(diagnostics.stats().broadcast_frames_rx(), 8); assert_eq!(diagnostics.stats().dropped_frames(), 5); } + + #[test] + fn defaults_missing_broadcast_stats_to_zero() { + let stats: TunnelStats = serde_json::from_str( + r#"{ + "ethernet_frames_tx": 1, + "ethernet_frames_rx": 2, + "datagrams_tx": 3, + "datagrams_rx": 4, + "dropped_frames": 5, + "malformed_frames": 6 + }"#, + ) + .unwrap(); + + assert_eq!(stats.broadcast_frames_tx(), 0); + assert_eq!(stats.broadcast_frames_rx(), 0); + } } diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 5c8a53a..4a09139 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -539,12 +539,14 @@ fn relay_frame_log_line( fn peer_stats_log_line(accepted: &AcceptedPeer, stats: &TunnelStats) -> String { format!( - "peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}", + "peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} broadcast_tx={} broadcast_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}", accepted.room, accepted.peer.peer_id(), accepted.peer.role(), stats.ethernet_frames_tx(), stats.ethernet_frames_rx(), + stats.broadcast_frames_tx(), + stats.broadcast_frames_rx(), stats.datagrams_tx(), stats.datagrams_rx(), stats.dropped_frames(),