feat(relay): receive peer stats streams

Peers can now send post-handshake Stats messages over unidirectional control
streams. The relay listens for those streams alongside Ethernet datagrams,
stores the latest snapshot on the peer session, and logs the counters with
room, peer, and role context.

Unexpected post-handshake control messages now close the peer as a protocol
error. Peer-requested Disconnect messages leave the room through the normal
cleanup path, which keeps lifecycle notifications centralized.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay \
  forwards_ethernet_datagrams_between_joined_peers \
  -- --nocapture
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 20:50:15 +02:00
parent ffdcbf8d16
commit dffb490afe
2 changed files with 151 additions and 8 deletions
+1
View File
@@ -89,6 +89,7 @@ Public relay binary and relay-owned room state:
- L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames
- client broadcast/multicast, unknown-unicast, and total bandwidth limiting - client broadcast/multicast, unknown-unicast, and total bandwidth limiting
- malformed peer datagram disconnect threshold - malformed peer datagram disconnect threshold
- peer stats control events retained for relay diagnostics
- peer leave cleanup for room membership and MAC indexes - peer leave cleanup for room membership and MAC indexes
## Build ## Build
+149 -7
View File
@@ -1,16 +1,16 @@
use std::{fs, net::SocketAddr, path::Path, sync::Arc}; use std::{fs, net::SocketAddr, path::Path, sync::Arc};
use anyhow::{Context, Result, anyhow}; use anyhow::{Context, Result, anyhow, bail};
use bytes::Bytes; use bytes::Bytes;
use lanparty_ctrl::{ use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, DisconnectReason, EndpointHello, CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, DisconnectReason, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode, MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode,
ServerWelcome, decode_control_frame, encode_control_message, ServerWelcome, decode_control_frame, encode_control_message,
}; };
use lanparty_obs::{DropReason, FrameDirection, FrameLog}; use lanparty_obs::{DropReason, FrameDirection, FrameLog, TunnelStats};
use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram}; use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram};
use quinn::crypto::rustls::QuicServerConfig; use quinn::crypto::rustls::QuicServerConfig;
use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; use quinn::{Endpoint, Incoming, RecvStream, SendStream, ServerConfig, TransportConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@@ -57,6 +57,7 @@ impl PeerKey {
struct PeerSession { struct PeerSession {
connection: quinn::Connection, connection: quinn::Connection,
max_datagram_size: usize, max_datagram_size: usize,
latest_stats: Option<TunnelStats>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -65,6 +66,12 @@ enum PeerDatagramOutcome {
Malformed, Malformed,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
enum PeerControlOutcome {
Continue,
Close(PeerClose),
}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
struct PeerClose { struct PeerClose {
reason: DisconnectReason, reason: DisconnectReason,
@@ -225,7 +232,7 @@ async fn handle_incoming_connection(
accepted.welcome.effective_tap_mtu() accepted.welcome.effective_tap_mtu()
); );
let close = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; let close = run_peer_io(&rooms, &sessions, &accepted, &connection).await;
let leave = leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; let leave = leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?;
notify_peer_left( notify_peer_left(
&sessions, &sessions,
@@ -289,11 +296,12 @@ async fn register_peer(
PeerSession { PeerSession {
connection, connection,
max_datagram_size: accepted.max_datagram_size, max_datagram_size: accepted.max_datagram_size,
latest_stats: None,
}, },
); );
} }
async fn run_peer_datagrams( async fn run_peer_io(
rooms: &Arc<Mutex<RoomRegistry>>, rooms: &Arc<Mutex<RoomRegistry>>,
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>, sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer, accepted: &AcceptedPeer,
@@ -302,8 +310,13 @@ async fn run_peer_datagrams(
let mut malformed_tracker = MalformedDatagramTracker::default(); let mut malformed_tracker = MalformedDatagramTracker::default();
loop { loop {
match connection.read_datagram().await { tokio::select! {
Ok(datagram) => { datagram = connection.read_datagram() => {
let datagram = match datagram {
Ok(datagram) => datagram,
Err(error) => return PeerClose::normal(error.to_string()),
};
match forward_peer_datagram(rooms, sessions, accepted, datagram).await { match forward_peer_datagram(rooms, sessions, accepted, datagram).await {
Ok(PeerDatagramOutcome::Accepted) => {} Ok(PeerDatagramOutcome::Accepted) => {}
Ok(PeerDatagramOutcome::Malformed) => { Ok(PeerDatagramOutcome::Malformed) => {
@@ -321,7 +334,22 @@ async fn run_peer_datagrams(
} }
} }
} }
stream = connection.accept_uni() => {
let stream = match stream {
Ok(stream) => stream,
Err(error) => return PeerClose::normal(error.to_string()), Err(error) => return PeerClose::normal(error.to_string()),
};
match handle_peer_control_stream(sessions, accepted, stream).await {
Ok(PeerControlOutcome::Continue) => {}
Ok(PeerControlOutcome::Close(close)) => return close,
Err(error) => {
let reason = format!("invalid peer control stream: {error:#}");
connection.close(0_u32.into(), reason.as_bytes());
return PeerClose::protocol_error(reason);
}
}
}
} }
} }
} }
@@ -396,6 +424,64 @@ async fn forward_peer_datagram(
Ok(PeerDatagramOutcome::Accepted) Ok(PeerDatagramOutcome::Accepted)
} }
async fn handle_peer_control_stream(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
mut recv: RecvStream,
) -> Result<PeerControlOutcome> {
let frame = recv
.read_to_end(MAX_CONTROL_FRAME_LEN)
.await
.context("failed to read peer control stream")?;
let message = decode_control_frame(&frame).context("failed to decode peer control stream")?;
handle_peer_control_message(sessions, accepted, message).await
}
async fn handle_peer_control_message(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
message: ControlMessage,
) -> Result<PeerControlOutcome> {
match message {
ControlMessage::Stats(stats) => {
record_peer_stats(
sessions,
&accepted.room,
accepted.peer.peer_id(),
stats.clone(),
)
.await;
println!("{}", peer_stats_log_line(accepted, &stats));
Ok(PeerControlOutcome::Continue)
}
ControlMessage::Disconnect { reason, message } => {
Ok(PeerControlOutcome::Close(PeerClose { reason, message }))
}
other => bail!(
"unexpected post-handshake control message from peer {} in room {}: {other:?}",
accepted.peer.peer_id(),
accepted.room
),
}
}
async fn record_peer_stats(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
peer_id: u32,
stats: TunnelStats,
) {
if let Some(session) = sessions
.lock()
.await
.get_mut(&PeerKey::new(room.clone(), peer_id))
{
session.latest_stats = Some(stats);
}
}
fn relay_frame_log_line( fn relay_frame_log_line(
room: &RoomCode, room: &RoomCode,
ingress_peer_id: u32, ingress_peer_id: u32,
@@ -448,6 +534,21 @@ fn relay_frame_log_line(
) )
} }
fn peer_stats_log_line(accepted: &AcceptedPeer, stats: &TunnelStats) -> String {
format!(
"peer stats room={} peer_id={} role={:?} frames_tx={} frames_rx={} datagrams_tx={} datagrams_rx={} drops={} malformed={}",
accepted.room,
accepted.peer.peer_id(),
accepted.peer.role(),
stats.ethernet_frames_tx(),
stats.ethernet_frames_rx(),
stats.datagrams_tx(),
stats.datagrams_rx(),
stats.dropped_frames(),
stats.malformed_frames()
)
}
async fn collect_target_sessions( async fn collect_target_sessions(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>, sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode, room: &RoomCode,
@@ -953,6 +1054,16 @@ mod tests {
assert_eq!(peer.role(), Role::Client); assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(first_mac)); assert_eq!(peer.mac(), Some(first_mac));
let stats = TunnelStats::new(1, 2, 3, 4, 5, 6);
send_peer_control_event(&first_connection, ControlMessage::Stats(stats.clone())).await;
wait_for_peer_stats(
&sessions,
&RoomCode::new("TESTROOM").unwrap(),
first_welcome.peer_id(),
&stats,
)
.await;
let ethernet = ethernet_frame(second_mac, first_mac); let ethernet = ethernet_frame(second_mac, first_mac);
let datagram = encode_datagram( let datagram = encode_datagram(
FrameType::Ethernet, FrameType::Ethernet,
@@ -1060,6 +1171,37 @@ mod tests {
decode_control_frame(&frame).unwrap() decode_control_frame(&frame).unwrap()
} }
async fn send_peer_control_event(connection: &quinn::Connection, message: ControlMessage) {
let mut send = connection.open_uni().await.unwrap();
let frame = encode_control_message(&message).unwrap();
send.write_all(&frame).await.unwrap();
send.finish().unwrap();
}
async fn wait_for_peer_stats(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
peer_id: u32,
expected: &TunnelStats,
) {
tokio::time::timeout(Duration::from_secs(5), async {
loop {
let latest_stats = sessions
.lock()
.await
.get(&PeerKey::new(room.clone(), peer_id))
.and_then(|session| session.latest_stats.clone());
if latest_stats.as_ref() == Some(expected) {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}
async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome { async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome {
let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap(); let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap();
let response = request_control_message(connection, ControlMessage::Hello(hello)) let response = request_control_message(connection, ControlMessage::Hello(hello))