diff --git a/README.md b/README.md index db7ab93..c5bbfc7 100644 --- a/README.md +++ b/README.md @@ -137,7 +137,8 @@ between the relay and wired LAN until shutdown. It tracks remote-client source MACs seen from relay traffic and periodically emits small CAM refresh frames so the physical switch keeps those MACs associated with the gateway port. Gateway frame logs include direction, peer id when present, MACs, ethertype/length, -frame length, action, and drop reason. +frame length, action, and drop reason. The gateway also tracks frame/datagram +counters and periodically sends stats snapshots to the relay. ## Windows Client diff --git a/crates/lanparty-gateway/src/lib.rs b/crates/lanparty-gateway/src/lib.rs index 32a0449..93f4309 100644 --- a/crates/lanparty-gateway/src/lib.rs +++ b/crates/lanparty-gateway/src/lib.rs @@ -12,7 +12,10 @@ use std::{ fs, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, path::PathBuf, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, }; use anyhow::{Context, Result, bail}; @@ -22,6 +25,7 @@ use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; +use lanparty_obs::TunnelStats; #[cfg(target_os = "linux")] use lanparty_obs::{FrameAction, FrameDirection, FrameLog}; use lanparty_proto::{ @@ -40,6 +44,8 @@ const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MES #[cfg(target_os = "linux")] const CAM_REFRESH_INTERVAL: Duration = Duration::from_secs(60); #[cfg(target_os = "linux")] +const GATEWAY_STATS_INTERVAL: Duration = Duration::from_secs(10); +#[cfg(target_os = "linux")] // Local experimental EtherType for frames whose only job is switch MAC learning. const CAM_REFRESH_ETHERTYPE: u16 = 0x88b5; #[cfg(target_os = "linux")] @@ -181,6 +187,7 @@ pub struct GatewayConnection { connection: quinn::Connection, config: GatewayConfig, welcome: ServerWelcome, + stats: Arc, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -213,11 +220,20 @@ impl GatewayConnection { } pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> { - send_gateway_ethernet(&self.connection, &self.welcome, frame) + send_gateway_ethernet(&self.connection, &self.welcome, &self.stats, frame) } pub async fn recv_ethernet(&self) -> Result { - recv_gateway_ethernet(&self.connection, &self.welcome).await + recv_gateway_ethernet(&self.connection, &self.welcome, &self.stats).await + } + + #[must_use] + pub fn stats_snapshot(&self) -> TunnelStats { + self.stats.snapshot() + } + + pub async fn send_stats_snapshot(&self) -> Result<()> { + send_gateway_stats(&self.connection, self.stats.snapshot()).await } #[cfg(target_os = "linux")] @@ -234,8 +250,14 @@ impl GatewayConnection { endpoint, connection, welcome, + stats, .. } = self; + let mut stats_tick = tokio::time::interval_at( + tokio::time::Instant::now() + GATEWAY_STATS_INTERVAL, + GATEWAY_STATS_INTERVAL, + ); + stats_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { @@ -247,7 +269,7 @@ impl GatewayConnection { } lan_frame = read_lan_ethernet(&packet_socket) => { let lan_frame = lan_frame?; - send_gateway_ethernet(&connection, &welcome, &lan_frame)?; + send_gateway_ethernet(&connection, &welcome, &stats, &lan_frame)?; println!( "{}", gateway_frame_log_line( @@ -260,7 +282,7 @@ impl GatewayConnection { ) ); } - relay_frame = recv_gateway_ethernet(&connection, &welcome) => { + relay_frame = recv_gateway_ethernet(&connection, &welcome, &stats) => { let relay_frame = relay_frame?; cam_refresh.observe_remote_frame(relay_frame.payload())?; write_lan_ethernet(&packet_socket, relay_frame.payload()).await?; @@ -281,6 +303,11 @@ impl GatewayConnection { write_lan_ethernet(&packet_socket, &frame).await?; } } + _ = stats_tick.tick() => { + if let Err(error) = send_gateway_stats(&connection, stats.snapshot()).await { + eprintln!("failed to send gateway stats to relay: {error:#}"); + } + } } } } @@ -294,9 +321,13 @@ impl GatewayConnection { fn send_gateway_ethernet( connection: &quinn::Connection, welcome: &ServerWelcome, + stats: &GatewayTunnelStats, frame: &[u8], ) -> Result<()> { - EthernetFrame::parse(frame).context("gateway Ethernet frame is malformed")?; + if let Err(error) = EthernetFrame::parse(frame) { + stats.record_malformed_frame(); + return Err(error).context("gateway Ethernet frame is malformed"); + } let datagram = encode_datagram( FrameType::Ethernet, welcome.room_id(), @@ -309,6 +340,7 @@ fn send_gateway_ethernet( connection .send_datagram(Bytes::from(datagram)) .context("failed to send gateway Ethernet datagram")?; + stats.record_ethernet_tx(); Ok(()) } @@ -316,10 +348,13 @@ fn send_gateway_ethernet( async fn recv_gateway_ethernet( connection: &quinn::Connection, welcome: &ServerWelcome, + stats: &GatewayTunnelStats, ) -> Result { loop { let datagram = connection.read_datagram().await?; + stats.record_datagram_rx(); let Ok(packet) = decode_datagram(&datagram) else { + stats.record_malformed_frame(); continue; }; let header = packet.header(); @@ -327,12 +362,15 @@ async fn recv_gateway_ethernet( || header.room_id() != welcome.room_id() || header.peer_id() == welcome.peer_id() { + stats.record_dropped_frame(); continue; } if EthernetFrame::parse(packet.payload()).is_err() { + stats.record_malformed_frame(); continue; } + stats.record_ethernet_rx(); return Ok(ReceivedEthernetFrame { source_peer_id: header.peer_id(), payload: Bytes::copy_from_slice(packet.payload()), @@ -340,6 +378,66 @@ async fn recv_gateway_ethernet( } } +async fn send_gateway_stats(connection: &quinn::Connection, stats: TunnelStats) -> Result<()> { + let mut send = connection + .open_uni() + .await + .context("failed to open gateway stats stream")?; + let frame = encode_control_message(&ControlMessage::Stats(stats)) + .context("failed to encode gateway stats")?; + send.write_all(&frame) + .await + .context("failed to write gateway stats")?; + send.finish()?; + + Ok(()) +} + +#[derive(Debug, Default)] +struct GatewayTunnelStats { + ethernet_frames_tx: AtomicU64, + ethernet_frames_rx: AtomicU64, + datagrams_tx: AtomicU64, + datagrams_rx: AtomicU64, + dropped_frames: AtomicU64, + malformed_frames: AtomicU64, +} + +impl GatewayTunnelStats { + fn record_ethernet_tx(&self) { + self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed); + self.datagrams_tx.fetch_add(1, Ordering::Relaxed); + } + + fn record_ethernet_rx(&self) { + self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed); + } + + fn record_datagram_rx(&self) { + self.datagrams_rx.fetch_add(1, Ordering::Relaxed); + } + + fn record_dropped_frame(&self) { + self.dropped_frames.fetch_add(1, Ordering::Relaxed); + } + + fn record_malformed_frame(&self) { + self.dropped_frames.fetch_add(1, Ordering::Relaxed); + self.malformed_frames.fetch_add(1, Ordering::Relaxed); + } + + fn snapshot(&self) -> TunnelStats { + TunnelStats::new( + self.ethernet_frames_tx.load(Ordering::Relaxed), + self.ethernet_frames_rx.load(Ordering::Relaxed), + self.datagrams_tx.load(Ordering::Relaxed), + self.datagrams_rx.load(Ordering::Relaxed), + self.dropped_frames.load(Ordering::Relaxed), + self.malformed_frames.load(Ordering::Relaxed), + ) + } +} + #[cfg(target_os = "linux")] fn gateway_frame_log_line( interface: &str, @@ -502,6 +600,7 @@ pub async fn connect_gateway(config: GatewayConfig) -> Result connection, config, welcome, + stats: Arc::default(), }), ControlMessage::Reject(reject) => bail!( "relay rejected gateway hello: {:?}: {}", @@ -616,6 +715,7 @@ mod tests { let (server_config, certificate) = test_server_config(); let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()).unwrap(); let server_addr = endpoint.local_addr().unwrap(); + let (stats_received_tx, stats_received_rx) = tokio::sync::oneshot::channel(); let server_task = tokio::spawn(async move { let incoming = endpoint.accept().await.unwrap(); let connection = incoming.await.unwrap(); @@ -654,6 +754,15 @@ mod tests { .unwrap(); connection.send_datagram(Bytes::from(response)).unwrap(); + let mut stats_recv = connection.accept_uni().await.unwrap(); + let stats_frame = stats_recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap(); + let stats_message = decode_control_frame(&stats_frame).unwrap(); + let ControlMessage::Stats(stats) = stats_message else { + panic!("expected gateway stats event"); + }; + assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1)); + stats_received_tx.send(()).unwrap(); + connection.closed().await; endpoint.close(0_u32.into(), b"test complete"); endpoint.wait_idle().await; @@ -682,6 +791,15 @@ mod tests { assert_eq!(received.source_peer_id(), 99); assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice()); + assert!(gateway.send_ethernet(&[0; 4]).is_err()); + let stats = gateway.stats_snapshot(); + assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1)); + + gateway.send_stats_snapshot().await.unwrap(); + tokio::time::timeout(Duration::from_secs(5), stats_received_rx) + .await + .unwrap() + .unwrap(); gateway.shutdown("test complete").await; tokio::time::timeout(Duration::from_secs(5), server_task) .await