diff --git a/README.md b/README.md index 3b6a7d2..7fe4f81 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ Public relay binary and relay-owned room state: - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames - client broadcast/multicast, unknown-unicast, and total bandwidth limiting - malformed peer datagram disconnect threshold +- peer stats control events retained for relay diagnostics - peer leave cleanup for room membership and MAC indexes ## Build diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 9dd6084..45edce3 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,16 +1,16 @@ use std::{fs, net::SocketAddr, path::Path, sync::Arc}; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result, anyhow, bail}; use bytes::Bytes; use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, DisconnectReason, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; -use lanparty_obs::{DropReason, FrameDirection, FrameLog}; +use lanparty_obs::{DropReason, FrameDirection, FrameLog, TunnelStats}; use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram}; use quinn::crypto::rustls::QuicServerConfig; -use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; +use quinn::{Endpoint, Incoming, RecvStream, SendStream, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use std::collections::HashMap; use tokio::sync::Mutex; @@ -57,6 +57,7 @@ impl PeerKey { struct PeerSession { connection: quinn::Connection, max_datagram_size: usize, + latest_stats: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -65,6 +66,12 @@ enum PeerDatagramOutcome { Malformed, } +#[derive(Debug, Clone, PartialEq, Eq)] +enum PeerControlOutcome { + Continue, + Close(PeerClose), +} + #[derive(Debug, Clone, PartialEq, Eq)] struct PeerClose { reason: DisconnectReason, @@ -225,7 +232,7 @@ async fn handle_incoming_connection( accepted.welcome.effective_tap_mtu() ); - let close = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; + let close = run_peer_io(&rooms, &sessions, &accepted, &connection).await; let leave = leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; notify_peer_left( &sessions, @@ -289,11 +296,12 @@ async fn register_peer( PeerSession { connection, max_datagram_size: accepted.max_datagram_size, + latest_stats: None, }, ); } -async fn run_peer_datagrams( +async fn run_peer_io( rooms: &Arc>, sessions: &Arc>>, accepted: &AcceptedPeer, @@ -302,8 +310,13 @@ async fn run_peer_datagrams( let mut malformed_tracker = MalformedDatagramTracker::default(); loop { - match connection.read_datagram().await { - Ok(datagram) => { + tokio::select! { + datagram = connection.read_datagram() => { + let datagram = match datagram { + Ok(datagram) => datagram, + Err(error) => return PeerClose::normal(error.to_string()), + }; + match forward_peer_datagram(rooms, sessions, accepted, datagram).await { Ok(PeerDatagramOutcome::Accepted) => {} Ok(PeerDatagramOutcome::Malformed) => { @@ -321,7 +334,22 @@ async fn run_peer_datagrams( } } } - Err(error) => return PeerClose::normal(error.to_string()), + stream = connection.accept_uni() => { + let stream = match stream { + Ok(stream) => stream, + Err(error) => return PeerClose::normal(error.to_string()), + }; + + match handle_peer_control_stream(sessions, accepted, stream).await { + Ok(PeerControlOutcome::Continue) => {} + Ok(PeerControlOutcome::Close(close)) => return close, + Err(error) => { + let reason = format!("invalid peer control stream: {error:#}"); + connection.close(0_u32.into(), reason.as_bytes()); + return PeerClose::protocol_error(reason); + } + } + } } } } @@ -396,6 +424,64 @@ async fn forward_peer_datagram( Ok(PeerDatagramOutcome::Accepted) } +async fn handle_peer_control_stream( + sessions: &Arc>>, + accepted: &AcceptedPeer, + mut recv: RecvStream, +) -> Result { + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read peer control stream")?; + let message = decode_control_frame(&frame).context("failed to decode peer control stream")?; + + handle_peer_control_message(sessions, accepted, message).await +} + +async fn handle_peer_control_message( + sessions: &Arc>>, + accepted: &AcceptedPeer, + message: ControlMessage, +) -> Result { + match message { + ControlMessage::Stats(stats) => { + record_peer_stats( + sessions, + &accepted.room, + accepted.peer.peer_id(), + stats.clone(), + ) + .await; + println!("{}", peer_stats_log_line(accepted, &stats)); + + Ok(PeerControlOutcome::Continue) + } + ControlMessage::Disconnect { reason, message } => { + Ok(PeerControlOutcome::Close(PeerClose { reason, message })) + } + other => bail!( + "unexpected post-handshake control message from peer {} in room {}: {other:?}", + accepted.peer.peer_id(), + accepted.room + ), + } +} + +async fn record_peer_stats( + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, + stats: TunnelStats, +) { + if let Some(session) = sessions + .lock() + .await + .get_mut(&PeerKey::new(room.clone(), peer_id)) + { + session.latest_stats = Some(stats); + } +} + fn relay_frame_log_line( room: &RoomCode, ingress_peer_id: u32, @@ -448,6 +534,21 @@ fn relay_frame_log_line( ) } +fn peer_stats_log_line(accepted: &AcceptedPeer, stats: &TunnelStats) -> String { + format!( + "peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}", + accepted.room, + accepted.peer.peer_id(), + accepted.peer.role(), + stats.ethernet_frames_tx(), + stats.ethernet_frames_rx(), + stats.datagrams_tx(), + stats.datagrams_rx(), + stats.dropped_frames(), + stats.malformed_frames() + ) +} + async fn collect_target_sessions( sessions: &Arc>>, room: &RoomCode, @@ -953,6 +1054,16 @@ mod tests { assert_eq!(peer.role(), Role::Client); assert_eq!(peer.mac(), Some(first_mac)); + let stats = TunnelStats::new(1, 2, 3, 4, 5, 6); + send_peer_control_event(&first_connection, ControlMessage::Stats(stats.clone())).await; + wait_for_peer_stats( + &sessions, + &RoomCode::new("TESTROOM").unwrap(), + first_welcome.peer_id(), + &stats, + ) + .await; + let ethernet = ethernet_frame(second_mac, first_mac); let datagram = encode_datagram( FrameType::Ethernet, @@ -1060,6 +1171,37 @@ mod tests { decode_control_frame(&frame).unwrap() } + async fn send_peer_control_event(connection: &quinn::Connection, message: ControlMessage) { + let mut send = connection.open_uni().await.unwrap(); + let frame = encode_control_message(&message).unwrap(); + send.write_all(&frame).await.unwrap(); + send.finish().unwrap(); + } + + async fn wait_for_peer_stats( + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, + expected: &TunnelStats, + ) { + tokio::time::timeout(Duration::from_secs(5), async { + loop { + let latest_stats = sessions + .lock() + .await + .get(&PeerKey::new(room.clone(), peer_id)) + .and_then(|session| session.latest_stats.clone()); + if latest_stats.as_ref() == Some(expected) { + return; + } + + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + } + async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome { let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap(); let response = request_control_message(connection, ControlMessage::Hello(hello))