feat(obs): report broadcast frame counters

PLAN.md calls out "Broadcast traffic flowing" as a user-facing diagnostic.
The tunnel stats only reported total Ethernet frame counts, so the client
could not distinguish whether broadcast traffic was actually crossing the
tunnel.

Add defaulted broadcast tx/rx counters to TunnelStats while preserving the
existing constructor and old JSON compatibility. Client and gateway accounting
now increments those counters from validated Ethernet frames, and the client
diagnostics line reports the broadcast flow next to total frame counts.

Relay peer stats logs include the new counters so operators can see broadcast
activity from forwarded stats snapshots too.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-obs -p lanparty-client-core -p lanparty-gateway \
  -p lanparty-client-win -p lanparty-relay
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 21:54:35 +02:00
parent 9722adbd70
commit 21a69626e0
8 changed files with 180 additions and 38 deletions
+49 -14
View File
@@ -328,10 +328,13 @@ impl ClientRelayIo {
}
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
if let Err(error) = EthernetFrame::parse(frame) {
self.stats.record_malformed_frame();
return Err(error).context("client Ethernet frame is malformed");
}
let ethernet_frame = match EthernetFrame::parse(frame) {
Ok(frame) => frame,
Err(error) => {
self.stats.record_malformed_frame();
return Err(error).context("client Ethernet frame is malformed");
}
};
let datagram = encode_datagram(
FrameType::Ethernet,
self.welcome.room_id(),
@@ -344,7 +347,7 @@ impl ClientRelayIo {
self.connection
.send_datagram(Bytes::from(datagram))
.context("failed to send client Ethernet datagram")?;
self.stats.record_ethernet_tx();
self.stats.record_ethernet_tx(ethernet_frame);
Ok(())
}
@@ -365,12 +368,15 @@ impl ClientRelayIo {
self.stats.record_dropped_frame();
continue;
}
if EthernetFrame::parse(packet.payload()).is_err() {
self.stats.record_malformed_frame();
continue;
}
let ethernet_frame = match EthernetFrame::parse(packet.payload()) {
Ok(frame) => frame,
Err(_) => {
self.stats.record_malformed_frame();
continue;
}
};
self.stats.record_ethernet_rx();
self.stats.record_ethernet_rx(ethernet_frame);
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
@@ -393,6 +399,8 @@ impl ClientRelayIo {
struct ClientTunnelStats {
ethernet_frames_tx: AtomicU64,
ethernet_frames_rx: AtomicU64,
broadcast_frames_tx: AtomicU64,
broadcast_frames_rx: AtomicU64,
datagrams_tx: AtomicU64,
datagrams_rx: AtomicU64,
dropped_frames: AtomicU64,
@@ -400,13 +408,19 @@ struct ClientTunnelStats {
}
impl ClientTunnelStats {
fn record_ethernet_tx(&self) {
fn record_ethernet_tx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_tx.fetch_add(1, Ordering::Relaxed);
}
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
}
fn record_ethernet_rx(&self) {
fn record_ethernet_rx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_rx.fetch_add(1, Ordering::Relaxed);
}
}
fn record_datagram_rx(&self) {
@@ -431,6 +445,10 @@ impl ClientTunnelStats {
self.dropped_frames.load(Ordering::Relaxed),
self.malformed_frames.load(Ordering::Relaxed),
)
.with_broadcast_frames(
self.broadcast_frames_tx.load(Ordering::Relaxed),
self.broadcast_frames_rx.load(Ordering::Relaxed),
)
}
}
@@ -806,6 +824,8 @@ mod tests {
let stats = relay_io.stats_snapshot();
assert_eq!(stats.ethernet_frames_tx(), 1);
assert_eq!(stats.ethernet_frames_rx(), 1);
assert_eq!(stats.broadcast_frames_tx(), 0);
assert_eq!(stats.broadcast_frames_rx(), 0);
assert_eq!(stats.datagrams_tx(), 1);
assert_eq!(stats.datagrams_rx(), 1);
assert_eq!(stats.dropped_frames(), 1);
@@ -827,16 +847,22 @@ mod tests {
#[test]
fn snapshots_client_tunnel_stats() {
let stats = ClientTunnelStats::default();
let broadcast_tx_bytes = broadcast_ethernet_frame(b"broadcast tx");
let broadcast_rx_bytes = broadcast_ethernet_frame(b"broadcast rx");
let broadcast_tx = EthernetFrame::parse(&broadcast_tx_bytes).unwrap();
let broadcast_rx = EthernetFrame::parse(&broadcast_rx_bytes).unwrap();
stats.record_ethernet_tx();
stats.record_ethernet_tx(broadcast_tx);
stats.record_datagram_rx();
stats.record_ethernet_rx();
stats.record_ethernet_rx(broadcast_rx);
stats.record_dropped_frame();
stats.record_malformed_frame();
let snapshot = stats.snapshot();
assert_eq!(snapshot.ethernet_frames_tx(), 1);
assert_eq!(snapshot.ethernet_frames_rx(), 1);
assert_eq!(snapshot.broadcast_frames_tx(), 1);
assert_eq!(snapshot.broadcast_frames_rx(), 1);
assert_eq!(snapshot.datagrams_tx(), 1);
assert_eq!(snapshot.datagrams_rx(), 1);
assert_eq!(snapshot.dropped_frames(), 2);
@@ -876,6 +902,15 @@ mod tests {
frame
}
fn broadcast_ethernet_frame(payload: &[u8]) -> Vec<u8> {
let mut frame = Vec::new();
frame.extend_from_slice(&MacAddr::BROADCAST.octets());
frame.extend_from_slice(&[0x02, 0, 0, 0, 0, 1]);
frame.extend_from_slice(&0x0800_u16.to_be_bytes());
frame.extend_from_slice(payload);
frame
}
fn unique_temp_identity_path() -> std::path::PathBuf {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
+6 -4
View File
@@ -453,7 +453,7 @@ async fn print_and_report_client_diagnostics(
fn format_client_diagnostics(diagnostics: &ClientDiagnostics) -> String {
let stats = diagnostics.stats();
format!(
"client diagnostics: relay reachable {} gateway connected {} route pinned {}; QUIC datagrams {} max {}; TAP found {} MAC {} MTU {} IP {}; frames tx {} rx {} datagrams tx {} rx {} drops {} malformed {}",
"client diagnostics: relay reachable {} gateway connected {} route pinned {}; QUIC datagrams {} max {}; TAP found {} MAC {} MTU {} IP {}; frames tx {} rx {} broadcast tx {} rx {} datagrams tx {} rx {} drops {} malformed {}",
yes_no(diagnostics.relay().reachable()),
yes_no(diagnostics.relay().gateway_connected()),
yes_no(diagnostics.relay().route_pinned()),
@@ -465,6 +465,8 @@ fn format_client_diagnostics(diagnostics: &ClientDiagnostics) -> String {
optional_label(diagnostics.tap().ip()),
stats.ethernet_frames_tx(),
stats.ethernet_frames_rx(),
stats.broadcast_frames_tx(),
stats.broadcast_frames_rx(),
stats.datagrams_tx(),
stats.datagrams_rx(),
stats.dropped_frames(),
@@ -851,12 +853,12 @@ mod tests {
Some(1200),
Some("10.73.42.51".parse().unwrap()),
),
TunnelStats::new(1, 2, 3, 4, 5, 6),
TunnelStats::new(1, 2, 3, 4, 5, 6).with_broadcast_frames(7, 8),
);
assert_eq!(
format_client_diagnostics(&diagnostics),
"client diagnostics: relay reachable yes gateway connected yes route pinned yes; QUIC datagrams yes max 1400; TAP found yes MAC 02:00:00:00:00:01 MTU 1200 IP 10.73.42.51; frames tx 1 rx 2 datagrams tx 3 rx 4 drops 5 malformed 6"
"client diagnostics: relay reachable yes gateway connected yes route pinned yes; QUIC datagrams yes max 1400; TAP found yes MAC 02:00:00:00:00:01 MTU 1200 IP 10.73.42.51; frames tx 1 rx 2 broadcast tx 7 rx 8 datagrams tx 3 rx 4 drops 5 malformed 6"
);
}
@@ -871,7 +873,7 @@ mod tests {
assert_eq!(
format_client_diagnostics(&diagnostics),
"client diagnostics: relay reachable yes gateway connected no route pinned no; QUIC datagrams no max unknown; TAP found no MAC unknown MTU unknown IP unknown; frames tx 0 rx 0 datagrams tx 0 rx 0 drops 0 malformed 0"
"client diagnostics: relay reachable yes gateway connected no route pinned no; QUIC datagrams no max unknown; TAP found no MAC unknown MTU unknown IP unknown; frames tx 0 rx 0 broadcast tx 0 rx 0 datagrams tx 0 rx 0 drops 0 malformed 0"
);
}
+64 -12
View File
@@ -366,10 +366,13 @@ fn send_gateway_ethernet(
stats: &GatewayTunnelStats,
frame: &[u8],
) -> Result<()> {
if let Err(error) = EthernetFrame::parse(frame) {
stats.record_malformed_frame();
return Err(error).context("gateway Ethernet frame is malformed");
}
let ethernet_frame = match EthernetFrame::parse(frame) {
Ok(frame) => frame,
Err(error) => {
stats.record_malformed_frame();
return Err(error).context("gateway Ethernet frame is malformed");
}
};
let datagram = encode_datagram(
FrameType::Ethernet,
welcome.room_id(),
@@ -382,7 +385,7 @@ fn send_gateway_ethernet(
connection
.send_datagram(Bytes::from(datagram))
.context("failed to send gateway Ethernet datagram")?;
stats.record_ethernet_tx();
stats.record_ethernet_tx(ethernet_frame);
Ok(())
}
@@ -407,12 +410,15 @@ async fn recv_gateway_ethernet(
stats.record_dropped_frame();
continue;
}
if EthernetFrame::parse(packet.payload()).is_err() {
stats.record_malformed_frame();
continue;
}
let ethernet_frame = match EthernetFrame::parse(packet.payload()) {
Ok(frame) => frame,
Err(_) => {
stats.record_malformed_frame();
continue;
}
};
stats.record_ethernet_rx();
stats.record_ethernet_rx(ethernet_frame);
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
@@ -459,6 +465,8 @@ async fn send_gateway_control_event(
struct GatewayTunnelStats {
ethernet_frames_tx: AtomicU64,
ethernet_frames_rx: AtomicU64,
broadcast_frames_tx: AtomicU64,
broadcast_frames_rx: AtomicU64,
datagrams_tx: AtomicU64,
datagrams_rx: AtomicU64,
dropped_frames: AtomicU64,
@@ -466,13 +474,19 @@ struct GatewayTunnelStats {
}
impl GatewayTunnelStats {
fn record_ethernet_tx(&self) {
fn record_ethernet_tx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_tx.fetch_add(1, Ordering::Relaxed);
}
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
}
fn record_ethernet_rx(&self) {
fn record_ethernet_rx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_rx.fetch_add(1, Ordering::Relaxed);
}
}
fn record_datagram_rx(&self) {
@@ -497,6 +511,10 @@ impl GatewayTunnelStats {
self.dropped_frames.load(Ordering::Relaxed),
self.malformed_frames.load(Ordering::Relaxed),
)
.with_broadcast_frames(
self.broadcast_frames_tx.load(Ordering::Relaxed),
self.broadcast_frames_rx.load(Ordering::Relaxed),
)
}
}
@@ -986,6 +1004,31 @@ mod tests {
.unwrap();
}
#[test]
fn snapshots_gateway_broadcast_stats() {
let stats = GatewayTunnelStats::default();
let broadcast_tx_bytes = broadcast_ethernet_frame(b"broadcast tx");
let broadcast_rx_bytes = broadcast_ethernet_frame(b"broadcast rx");
let broadcast_tx = EthernetFrame::parse(&broadcast_tx_bytes).unwrap();
let broadcast_rx = EthernetFrame::parse(&broadcast_rx_bytes).unwrap();
stats.record_ethernet_tx(broadcast_tx);
stats.record_datagram_rx();
stats.record_ethernet_rx(broadcast_rx);
stats.record_dropped_frame();
stats.record_malformed_frame();
let snapshot = stats.snapshot();
assert_eq!(snapshot.ethernet_frames_tx(), 1);
assert_eq!(snapshot.ethernet_frames_rx(), 1);
assert_eq!(snapshot.broadcast_frames_tx(), 1);
assert_eq!(snapshot.broadcast_frames_rx(), 1);
assert_eq!(snapshot.datagrams_tx(), 1);
assert_eq!(snapshot.datagrams_rx(), 1);
assert_eq!(snapshot.dropped_frames(), 2);
assert_eq!(snapshot.malformed_frames(), 1);
}
#[cfg(target_os = "linux")]
#[test]
fn builds_padded_cam_refresh_frame() {
@@ -1106,4 +1149,13 @@ mod tests {
frame.extend_from_slice(payload);
frame
}
fn broadcast_ethernet_frame(payload: &[u8]) -> Vec<u8> {
let mut frame = Vec::new();
frame.extend_from_slice(&MacAddr::BROADCAST.octets());
frame.extend_from_slice(&[0x02, 0, 0, 0, 0, 1]);
frame.extend_from_slice(&0x0800_u16.to_be_bytes());
frame.extend_from_slice(payload);
frame
}
}
+3
View File
@@ -6,3 +6,6 @@ edition.workspace = true
[dependencies]
lanparty-proto = { path = "../lanparty-proto" }
serde.workspace = true
[dev-dependencies]
serde_json.workspace = true
+48 -1
View File
@@ -135,6 +135,10 @@ impl FrameLog {
pub struct TunnelStats {
ethernet_frames_tx: u64,
ethernet_frames_rx: u64,
#[serde(default)]
broadcast_frames_tx: u64,
#[serde(default)]
broadcast_frames_rx: u64,
datagrams_tx: u64,
datagrams_rx: u64,
dropped_frames: u64,
@@ -154,6 +158,8 @@ impl TunnelStats {
Self {
ethernet_frames_tx,
ethernet_frames_rx,
broadcast_frames_tx: 0,
broadcast_frames_rx: 0,
datagrams_tx,
datagrams_rx,
dropped_frames,
@@ -161,6 +167,17 @@ impl TunnelStats {
}
}
#[must_use]
pub const fn with_broadcast_frames(
mut self,
broadcast_frames_tx: u64,
broadcast_frames_rx: u64,
) -> Self {
self.broadcast_frames_tx = broadcast_frames_tx;
self.broadcast_frames_rx = broadcast_frames_rx;
self
}
#[must_use]
pub const fn ethernet_frames_tx(&self) -> u64 {
self.ethernet_frames_tx
@@ -171,6 +188,16 @@ impl TunnelStats {
self.ethernet_frames_rx
}
#[must_use]
pub const fn broadcast_frames_tx(&self) -> u64 {
self.broadcast_frames_tx
}
#[must_use]
pub const fn broadcast_frames_rx(&self) -> u64 {
self.broadcast_frames_rx
}
#[must_use]
pub const fn datagrams_tx(&self) -> u64 {
self.datagrams_tx
@@ -433,7 +460,7 @@ mod tests {
#[test]
fn exposes_client_diagnostics() {
let mac = MacAddr::new([0x02, 1, 2, 3, 4, 5]);
let stats = TunnelStats::new(1, 2, 3, 4, 5, 6);
let stats = TunnelStats::new(1, 2, 3, 4, 5, 6).with_broadcast_frames(7, 8);
let diagnostics = ClientDiagnostics::new(
RelayDiagnostics::new(true, true, true),
QuicDiagnostics::new(true, Some(1400)),
@@ -455,6 +482,26 @@ mod tests {
assert_eq!(diagnostics.tap().mac(), Some(mac));
assert_eq!(diagnostics.tap().mtu(), Some(1200));
assert_eq!(diagnostics.tap().ip().unwrap().to_string(), "10.73.42.51");
assert_eq!(diagnostics.stats().broadcast_frames_tx(), 7);
assert_eq!(diagnostics.stats().broadcast_frames_rx(), 8);
assert_eq!(diagnostics.stats().dropped_frames(), 5);
}
#[test]
fn defaults_missing_broadcast_stats_to_zero() {
let stats: TunnelStats = serde_json::from_str(
r#"{
"ethernet_frames_tx": 1,
"ethernet_frames_rx": 2,
"datagrams_tx": 3,
"datagrams_rx": 4,
"dropped_frames": 5,
"malformed_frames": 6
}"#,
)
.unwrap();
assert_eq!(stats.broadcast_frames_tx(), 0);
assert_eq!(stats.broadcast_frames_rx(), 0);
}
}
+3 -1
View File
@@ -539,12 +539,14 @@ fn relay_frame_log_line(
fn peer_stats_log_line(accepted: &AcceptedPeer, stats: &TunnelStats) -> String {
format!(
"peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}",
"peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} broadcast_tx={} broadcast_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}",
accepted.room,
accepted.peer.peer_id(),
accepted.peer.role(),
stats.ethernet_frames_tx(),
stats.ethernet_frames_rx(),
stats.broadcast_frames_tx(),
stats.broadcast_frames_rx(),
stats.datagrams_tx(),
stats.datagrams_rx(),
stats.dropped_frames(),