feat(gateway): report tunnel stats to relay
Gateway traffic now contributes to the shared TunnelStats model. The gateway counts Ethernet frames sent to the relay, Ethernet frames received from the relay, incoming relay datagrams, dropped datagrams, and malformed frames.

Expose a gateway stats snapshot and send it to the relay over the same reliable post-handshake Stats control stream used by clients. The Linux bridge loop sends periodic stats snapshots as diagnostics; send failures are logged instead of bringing down the bridge because stats are observational.

The gateway relay-session test now has the server decode a Stats event before shutdown, so the stats control path is covered without a close race.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-gateway \
  connects_to_relay_control_stream_as_gateway -- --nocapture
- cargo test -p lanparty-gateway
- cargo clippy -p lanparty-gateway --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
@@ -12,7 +12,10 @@ use std::{
|
||||
fs,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
@@ -22,6 +25,7 @@ use lanparty_ctrl::{
|
||||
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN,
|
||||
RoomCode, ServerWelcome, decode_control_frame, encode_control_message,
|
||||
};
|
||||
use lanparty_obs::TunnelStats;
|
||||
#[cfg(target_os = "linux")]
|
||||
use lanparty_obs::{FrameAction, FrameDirection, FrameLog};
|
||||
use lanparty_proto::{
|
||||
@@ -40,6 +44,8 @@ const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MES
|
||||
#[cfg(target_os = "linux")]
|
||||
const CAM_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
|
||||
#[cfg(target_os = "linux")]
|
||||
const GATEWAY_STATS_INTERVAL: Duration = Duration::from_secs(10);
|
||||
#[cfg(target_os = "linux")]
|
||||
// Local experimental EtherType for frames whose only job is switch MAC learning.
|
||||
const CAM_REFRESH_ETHERTYPE: u16 = 0x88b5;
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -181,6 +187,7 @@ pub struct GatewayConnection {
|
||||
connection: quinn::Connection,
|
||||
config: GatewayConfig,
|
||||
welcome: ServerWelcome,
|
||||
stats: Arc<GatewayTunnelStats>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -213,11 +220,20 @@ impl GatewayConnection {
|
||||
}
|
||||
|
||||
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
|
||||
send_gateway_ethernet(&self.connection, &self.welcome, frame)
|
||||
send_gateway_ethernet(&self.connection, &self.welcome, &self.stats, frame)
|
||||
}
|
||||
|
||||
pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
|
||||
recv_gateway_ethernet(&self.connection, &self.welcome).await
|
||||
recv_gateway_ethernet(&self.connection, &self.welcome, &self.stats).await
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn stats_snapshot(&self) -> TunnelStats {
|
||||
self.stats.snapshot()
|
||||
}
|
||||
|
||||
pub async fn send_stats_snapshot(&self) -> Result<()> {
|
||||
send_gateway_stats(&self.connection, self.stats.snapshot()).await
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -234,8 +250,14 @@ impl GatewayConnection {
|
||||
endpoint,
|
||||
connection,
|
||||
welcome,
|
||||
stats,
|
||||
..
|
||||
} = self;
|
||||
let mut stats_tick = tokio::time::interval_at(
|
||||
tokio::time::Instant::now() + GATEWAY_STATS_INTERVAL,
|
||||
GATEWAY_STATS_INTERVAL,
|
||||
);
|
||||
stats_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -247,7 +269,7 @@ impl GatewayConnection {
|
||||
}
|
||||
lan_frame = read_lan_ethernet(&packet_socket) => {
|
||||
let lan_frame = lan_frame?;
|
||||
send_gateway_ethernet(&connection, &welcome, &lan_frame)?;
|
||||
send_gateway_ethernet(&connection, &welcome, &stats, &lan_frame)?;
|
||||
println!(
|
||||
"{}",
|
||||
gateway_frame_log_line(
|
||||
@@ -260,7 +282,7 @@ impl GatewayConnection {
|
||||
)
|
||||
);
|
||||
}
|
||||
relay_frame = recv_gateway_ethernet(&connection, &welcome) => {
|
||||
relay_frame = recv_gateway_ethernet(&connection, &welcome, &stats) => {
|
||||
let relay_frame = relay_frame?;
|
||||
cam_refresh.observe_remote_frame(relay_frame.payload())?;
|
||||
write_lan_ethernet(&packet_socket, relay_frame.payload()).await?;
|
||||
@@ -281,6 +303,11 @@ impl GatewayConnection {
|
||||
write_lan_ethernet(&packet_socket, &frame).await?;
|
||||
}
|
||||
}
|
||||
_ = stats_tick.tick() => {
|
||||
if let Err(error) = send_gateway_stats(&connection, stats.snapshot()).await {
|
||||
eprintln!("failed to send gateway stats to relay: {error:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -294,9 +321,13 @@ impl GatewayConnection {
|
||||
fn send_gateway_ethernet(
|
||||
connection: &quinn::Connection,
|
||||
welcome: &ServerWelcome,
|
||||
stats: &GatewayTunnelStats,
|
||||
frame: &[u8],
|
||||
) -> Result<()> {
|
||||
EthernetFrame::parse(frame).context("gateway Ethernet frame is malformed")?;
|
||||
if let Err(error) = EthernetFrame::parse(frame) {
|
||||
stats.record_malformed_frame();
|
||||
return Err(error).context("gateway Ethernet frame is malformed");
|
||||
}
|
||||
let datagram = encode_datagram(
|
||||
FrameType::Ethernet,
|
||||
welcome.room_id(),
|
||||
@@ -309,6 +340,7 @@ fn send_gateway_ethernet(
|
||||
connection
|
||||
.send_datagram(Bytes::from(datagram))
|
||||
.context("failed to send gateway Ethernet datagram")?;
|
||||
stats.record_ethernet_tx();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -316,10 +348,13 @@ fn send_gateway_ethernet(
|
||||
async fn recv_gateway_ethernet(
|
||||
connection: &quinn::Connection,
|
||||
welcome: &ServerWelcome,
|
||||
stats: &GatewayTunnelStats,
|
||||
) -> Result<ReceivedEthernetFrame> {
|
||||
loop {
|
||||
let datagram = connection.read_datagram().await?;
|
||||
stats.record_datagram_rx();
|
||||
let Ok(packet) = decode_datagram(&datagram) else {
|
||||
stats.record_malformed_frame();
|
||||
continue;
|
||||
};
|
||||
let header = packet.header();
|
||||
@@ -327,12 +362,15 @@ async fn recv_gateway_ethernet(
|
||||
|| header.room_id() != welcome.room_id()
|
||||
|| header.peer_id() == welcome.peer_id()
|
||||
{
|
||||
stats.record_dropped_frame();
|
||||
continue;
|
||||
}
|
||||
if EthernetFrame::parse(packet.payload()).is_err() {
|
||||
stats.record_malformed_frame();
|
||||
continue;
|
||||
}
|
||||
|
||||
stats.record_ethernet_rx();
|
||||
return Ok(ReceivedEthernetFrame {
|
||||
source_peer_id: header.peer_id(),
|
||||
payload: Bytes::copy_from_slice(packet.payload()),
|
||||
@@ -340,6 +378,66 @@ async fn recv_gateway_ethernet(
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_gateway_stats(connection: &quinn::Connection, stats: TunnelStats) -> Result<()> {
|
||||
let mut send = connection
|
||||
.open_uni()
|
||||
.await
|
||||
.context("failed to open gateway stats stream")?;
|
||||
let frame = encode_control_message(&ControlMessage::Stats(stats))
|
||||
.context("failed to encode gateway stats")?;
|
||||
send.write_all(&frame)
|
||||
.await
|
||||
.context("failed to write gateway stats")?;
|
||||
send.finish()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct GatewayTunnelStats {
|
||||
ethernet_frames_tx: AtomicU64,
|
||||
ethernet_frames_rx: AtomicU64,
|
||||
datagrams_tx: AtomicU64,
|
||||
datagrams_rx: AtomicU64,
|
||||
dropped_frames: AtomicU64,
|
||||
malformed_frames: AtomicU64,
|
||||
}
|
||||
|
||||
impl GatewayTunnelStats {
|
||||
fn record_ethernet_tx(&self) {
|
||||
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
|
||||
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn record_ethernet_rx(&self) {
|
||||
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn record_datagram_rx(&self) {
|
||||
self.datagrams_rx.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn record_dropped_frame(&self) {
|
||||
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn record_malformed_frame(&self) {
|
||||
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
|
||||
self.malformed_frames.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> TunnelStats {
|
||||
TunnelStats::new(
|
||||
self.ethernet_frames_tx.load(Ordering::Relaxed),
|
||||
self.ethernet_frames_rx.load(Ordering::Relaxed),
|
||||
self.datagrams_tx.load(Ordering::Relaxed),
|
||||
self.datagrams_rx.load(Ordering::Relaxed),
|
||||
self.dropped_frames.load(Ordering::Relaxed),
|
||||
self.malformed_frames.load(Ordering::Relaxed),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn gateway_frame_log_line(
|
||||
interface: &str,
|
||||
@@ -502,6 +600,7 @@ pub async fn connect_gateway(config: GatewayConfig) -> Result<GatewayConnection>
|
||||
connection,
|
||||
config,
|
||||
welcome,
|
||||
stats: Arc::default(),
|
||||
}),
|
||||
ControlMessage::Reject(reject) => bail!(
|
||||
"relay rejected gateway hello: {:?}: {}",
|
||||
@@ -616,6 +715,7 @@ mod tests {
|
||||
let (server_config, certificate) = test_server_config();
|
||||
let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()).unwrap();
|
||||
let server_addr = endpoint.local_addr().unwrap();
|
||||
let (stats_received_tx, stats_received_rx) = tokio::sync::oneshot::channel();
|
||||
let server_task = tokio::spawn(async move {
|
||||
let incoming = endpoint.accept().await.unwrap();
|
||||
let connection = incoming.await.unwrap();
|
||||
@@ -654,6 +754,15 @@ mod tests {
|
||||
.unwrap();
|
||||
connection.send_datagram(Bytes::from(response)).unwrap();
|
||||
|
||||
let mut stats_recv = connection.accept_uni().await.unwrap();
|
||||
let stats_frame = stats_recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
|
||||
let stats_message = decode_control_frame(&stats_frame).unwrap();
|
||||
let ControlMessage::Stats(stats) = stats_message else {
|
||||
panic!("expected gateway stats event");
|
||||
};
|
||||
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1));
|
||||
stats_received_tx.send(()).unwrap();
|
||||
|
||||
connection.closed().await;
|
||||
endpoint.close(0_u32.into(), b"test complete");
|
||||
endpoint.wait_idle().await;
|
||||
@@ -682,6 +791,15 @@ mod tests {
|
||||
assert_eq!(received.source_peer_id(), 99);
|
||||
assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice());
|
||||
|
||||
assert!(gateway.send_ethernet(&[0; 4]).is_err());
|
||||
let stats = gateway.stats_snapshot();
|
||||
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1));
|
||||
|
||||
gateway.send_stats_snapshot().await.unwrap();
|
||||
tokio::time::timeout(Duration::from_secs(5), stats_received_rx)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
gateway.shutdown("test complete").await;
|
||||
tokio::time::timeout(Duration::from_secs(5), server_task)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user