From dffb490afef1cec8ed692c35e5e702d4af162d18 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 20:50:15 +0200 Subject: [PATCH] feat(relay): receive peer stats streams Peers can now send post-handshake Stats messages over unidirectional control streams. The relay listens for those streams alongside Ethernet datagrams, stores the latest snapshot on the peer session, and logs the counters with room, peer, and role context. Unexpected post-handshake control messages now close the peer as a protocol error. Peer-requested Disconnect messages leave the room through the normal cleanup path, which keeps lifecycle notifications centralized. Test Plan: - cargo fmt --check - cargo test -p lanparty-relay \ forwards_ethernet_datagrams_between_joined_peers \ -- --nocapture - cargo test -p lanparty-relay - cargo clippy -p lanparty-relay --all-targets -- -D warnings - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md --- README.md | 1 + crates/lanparty-relay/src/server.rs | 158 ++++++++++++++++++++++++++-- 2 files changed, 151 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 3b6a7d2..7fe4f81 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ Public relay binary and relay-owned room state: - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames - client broadcast/multicast, unknown-unicast, and total bandwidth limiting - malformed peer datagram disconnect threshold +- peer stats control events retained for relay diagnostics - peer leave cleanup for room membership and MAC indexes ## Build diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 9dd6084..45edce3 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,16 +1,16 @@ use std::{fs, net::SocketAddr, path::Path, sync::Arc}; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result, anyhow, bail}; use bytes::Bytes; use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, DisconnectReason, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; -use lanparty_obs::{DropReason, FrameDirection, FrameLog}; +use lanparty_obs::{DropReason, FrameDirection, FrameLog, TunnelStats}; use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram}; use quinn::crypto::rustls::QuicServerConfig; -use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; +use quinn::{Endpoint, Incoming, RecvStream, SendStream, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use std::collections::HashMap; use tokio::sync::Mutex; @@ -57,6 +57,7 @@ impl PeerKey { struct PeerSession { connection: quinn::Connection, max_datagram_size: usize, + latest_stats: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -65,6 +66,12 @@ enum PeerDatagramOutcome { Malformed, } +#[derive(Debug, Clone, PartialEq, Eq)] +enum PeerControlOutcome { + Continue, + Close(PeerClose), +} + #[derive(Debug, Clone, PartialEq, Eq)] struct PeerClose { reason: DisconnectReason, @@ -225,7 +232,7 @@ async fn handle_incoming_connection( accepted.welcome.effective_tap_mtu() ); - let close = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; + let close = run_peer_io(&rooms, &sessions, &accepted, &connection).await; let leave = leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; notify_peer_left( &sessions, @@ -289,11 +296,12 @@ async fn register_peer( PeerSession { connection, max_datagram_size: accepted.max_datagram_size, + latest_stats: None, }, ); } -async fn run_peer_datagrams( +async fn run_peer_io( rooms: &Arc>, sessions: &Arc>>, accepted: &AcceptedPeer, @@ -302,8 +310,13 @@ async fn run_peer_datagrams( let mut malformed_tracker = MalformedDatagramTracker::default(); loop { - match connection.read_datagram().await { - Ok(datagram) => { + tokio::select! { + datagram = connection.read_datagram() => { + let datagram = match datagram { + Ok(datagram) => datagram, + Err(error) => return PeerClose::normal(error.to_string()), + }; + match forward_peer_datagram(rooms, sessions, accepted, datagram).await { Ok(PeerDatagramOutcome::Accepted) => {} Ok(PeerDatagramOutcome::Malformed) => { @@ -321,7 +334,22 @@ async fn run_peer_datagrams( } } } - Err(error) => return PeerClose::normal(error.to_string()), + stream = connection.accept_uni() => { + let stream = match stream { + Ok(stream) => stream, + Err(error) => return PeerClose::normal(error.to_string()), + }; + + match handle_peer_control_stream(sessions, accepted, stream).await { + Ok(PeerControlOutcome::Continue) => {} + Ok(PeerControlOutcome::Close(close)) => return close, + Err(error) => { + let reason = format!("invalid peer control stream: {error:#}"); + connection.close(0_u32.into(), reason.as_bytes()); + return PeerClose::protocol_error(reason); + } + } + } } } } @@ -396,6 +424,64 @@ async fn forward_peer_datagram( Ok(PeerDatagramOutcome::Accepted) } +async fn handle_peer_control_stream( + sessions: &Arc>>, + accepted: &AcceptedPeer, + mut recv: RecvStream, +) -> Result { + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read peer control stream")?; + let message = decode_control_frame(&frame).context("failed to decode peer control stream")?; + + handle_peer_control_message(sessions, accepted, message).await +} + +async fn handle_peer_control_message( + sessions: &Arc>>, + accepted: &AcceptedPeer, + message: ControlMessage, +) -> Result { + match message { + ControlMessage::Stats(stats) => { + record_peer_stats( + sessions, + &accepted.room, + accepted.peer.peer_id(), + stats.clone(), + ) + .await; + println!("{}", peer_stats_log_line(accepted, &stats)); + + Ok(PeerControlOutcome::Continue) + } + ControlMessage::Disconnect { reason, message } => { + Ok(PeerControlOutcome::Close(PeerClose { reason, message })) + } + other => bail!( + "unexpected post-handshake control message from peer {} in room {}: {other:?}", + accepted.peer.peer_id(), + accepted.room + ), + } +} + +async fn record_peer_stats( + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, + stats: TunnelStats, +) { + if let Some(session) = sessions + .lock() + .await + .get_mut(&PeerKey::new(room.clone(), peer_id)) + { + session.latest_stats = Some(stats); + } +} + fn relay_frame_log_line( room: &RoomCode, ingress_peer_id: u32, @@ -448,6 +534,21 @@ fn relay_frame_log_line( ) } +fn peer_stats_log_line(accepted: &AcceptedPeer, stats: &TunnelStats) -> String { + format!( + "peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}", + accepted.room, + accepted.peer.peer_id(), + accepted.peer.role(), + stats.ethernet_frames_tx(), + stats.ethernet_frames_rx(), + stats.datagrams_tx(), + stats.datagrams_rx(), + stats.dropped_frames(), + stats.malformed_frames() + ) +} + async fn collect_target_sessions( sessions: &Arc>>, room: &RoomCode, @@ -953,6 +1054,16 @@ mod tests { assert_eq!(peer.role(), Role::Client); assert_eq!(peer.mac(), Some(first_mac)); + let stats = TunnelStats::new(1, 2, 3, 4, 5, 6); + send_peer_control_event(&first_connection, ControlMessage::Stats(stats.clone())).await; + wait_for_peer_stats( + &sessions, + &RoomCode::new("TESTROOM").unwrap(), + first_welcome.peer_id(), + &stats, + ) + .await; + let ethernet = ethernet_frame(second_mac, first_mac); let datagram = encode_datagram( FrameType::Ethernet, @@ -1060,6 +1171,37 @@ mod tests { decode_control_frame(&frame).unwrap() } + async fn send_peer_control_event(connection: &quinn::Connection, message: ControlMessage) { + let mut send = connection.open_uni().await.unwrap(); + let frame = encode_control_message(&message).unwrap(); + send.write_all(&frame).await.unwrap(); + send.finish().unwrap(); + } + + async fn wait_for_peer_stats( + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, + expected: &TunnelStats, + ) { + tokio::time::timeout(Duration::from_secs(5), async { + loop { + let latest_stats = sessions + .lock() + .await + .get(&PeerKey::new(room.clone(), peer_id)) + .and_then(|session| session.latest_stats.clone()); + if latest_stats.as_ref() == Some(expected) { + return; + } + + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + } + async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome { let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap(); let response = request_control_message(connection, ControlMessage::Hello(hello))