feat(relay): disconnect peers after malformed datagrams

The relay now tracks malformed datagrams per accepted peer and closes the QUIC
connection after a small threshold. Malformed overlay bytes, datagrams with the
wrong room/peer/type header, and malformed Ethernet payloads all count toward
that threshold.

This implements the malformed-packet disconnect part of PLAN.md without mixing
in broader bandwidth or broadcast rate limiting. Ordinary safety-filter drops
still remain non-fatal; this only targets peers that repeatedly send packets the
relay cannot treat as valid tunnel Ethernet traffic.

The threshold state lives in the relay server loop, while the forwarding helper
returns a small outcome enum so malformed classification stays testable without
running a full QUIC server. The room registry remains responsible for Ethernet
policy decisions such as unauthorized source MACs, jumbo frames, and control
plane filters.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 19:43:08 +02:00
parent 4033b7c2d2
commit f29d0b755c
2 changed files with 121 additions and 15 deletions
+1
View File
@@ -80,6 +80,7 @@ Public relay binary and relay-owned room state:
- stable effective room MTU chosen before Ethernet datagrams flow
- live Ethernet datagram forwarding with no ingress reflection
- L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames
- malformed peer datagram disconnect threshold
- peer leave cleanup for room membership and MAC indexes
## Build
+120 -15
View File
@@ -7,7 +7,7 @@ use lanparty_ctrl::{
MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode,
ServerWelcome, decode_control_frame, encode_control_message,
};
use lanparty_obs::{FrameDirection, FrameLog};
use lanparty_obs::{DropReason, FrameDirection, FrameLog};
use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram};
use quinn::crypto::rustls::QuicServerConfig;
use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig};
@@ -19,6 +19,7 @@ use crate::{ForwardingDecision, RelayConfig, RoomRegistry};
const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024;
const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN;
const MAX_MALFORMED_DATAGRAMS_PER_PEER: usize = 8;
#[derive(Debug)]
pub struct RelayServer {
@@ -58,6 +59,31 @@ struct PeerSession {
max_datagram_size: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerDatagramOutcome {
Accepted,
Malformed,
}
#[derive(Debug, Default)]
struct MalformedDatagramTracker {
count: usize,
}
impl MalformedDatagramTracker {
fn record_malformed(&mut self) -> Option<String> {
self.count += 1;
if self.count >= MAX_MALFORMED_DATAGRAMS_PER_PEER {
Some(format!(
"peer sent {} malformed datagrams; disconnecting",
self.count
))
} else {
None
}
}
}
impl RelayServer {
pub fn bind(config: &RelayConfig) -> Result<Self> {
let (server_config, certificate) = development_server_config_with_certificate()?;
@@ -238,20 +264,30 @@ async fn run_peer_datagrams(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
connection: &quinn::Connection,
) -> quinn::ConnectionError {
) -> String {
let mut malformed_tracker = MalformedDatagramTracker::default();
loop {
match connection.read_datagram().await {
Ok(datagram) => {
if let Err(error) = forward_peer_datagram(rooms, sessions, accepted, datagram).await
{
eprintln!(
"failed to forward datagram from peer {} in room {}: {error:#}",
accepted.peer.peer_id(),
accepted.room
);
match forward_peer_datagram(rooms, sessions, accepted, datagram).await {
Ok(PeerDatagramOutcome::Accepted) => {}
Ok(PeerDatagramOutcome::Malformed) => {
if let Some(reason) = malformed_tracker.record_malformed() {
connection.close(0_u32.into(), reason.as_bytes());
return reason;
}
}
Err(error) => {
eprintln!(
"failed to forward datagram from peer {} in room {}: {error:#}",
accepted.peer.peer_id(),
accepted.room
);
}
}
}
Err(error) => return error,
Err(error) => return error.to_string(),
}
}
}
@@ -261,9 +297,9 @@ async fn forward_peer_datagram(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
datagram: Bytes,
) -> Result<()> {
) -> Result<PeerDatagramOutcome> {
let Ok(packet) = decode_datagram(&datagram) else {
return Ok(());
return Ok(PeerDatagramOutcome::Malformed);
};
let header = packet.header();
@@ -271,7 +307,7 @@ async fn forward_peer_datagram(
|| header.room_id() != accepted.welcome.room_id()
|| header.peer_id() != accepted.peer.peer_id()
{
return Ok(());
return Ok(PeerDatagramOutcome::Malformed);
}
let decision = rooms.lock().await.forward_ethernet(
@@ -288,9 +324,13 @@ async fn forward_peer_datagram(
&decision
)
);
if decision.drop_reason() == Some(DropReason::Malformed) {
return Ok(PeerDatagramOutcome::Malformed);
}
let target_peer_ids = decision.targets().to_vec();
if target_peer_ids.is_empty() {
return Ok(());
return Ok(PeerDatagramOutcome::Accepted);
}
let outgoing = encode_datagram(
@@ -319,7 +359,7 @@ async fn forward_peer_datagram(
}
}
Ok(())
Ok(PeerDatagramOutcome::Accepted)
}
fn relay_frame_log_line(
@@ -675,6 +715,55 @@ mod tests {
assert!(line.contains("targets=2"));
}
#[test]
fn tracks_malformed_datagram_disconnect_threshold() {
let mut tracker = MalformedDatagramTracker::default();
for _ in 1..MAX_MALFORMED_DATAGRAMS_PER_PEER {
assert_eq!(tracker.record_malformed(), None);
}
let reason = tracker
.record_malformed()
.expect("threshold should disconnect peer");
assert!(reason.contains("malformed datagrams"));
}
#[tokio::test]
async fn classifies_bad_peer_datagrams_as_malformed() {
let rooms = Arc::new(Mutex::new(RoomRegistry::default()));
let sessions = Arc::new(Mutex::new(HashMap::new()));
let accepted = accepted_client_for_forwarding(&rooms, client_mac(1)).await;
let outcome = forward_peer_datagram(
&rooms,
&sessions,
&accepted,
Bytes::from_static(b"not an overlay datagram"),
)
.await
.unwrap();
assert_eq!(outcome, PeerDatagramOutcome::Malformed);
let malformed_ethernet = encode_datagram(
FrameType::Ethernet,
accepted.welcome.room_id(),
accepted.peer.peer_id(),
0,
&[0; 4],
)
.unwrap();
let outcome = forward_peer_datagram(
&rooms,
&sessions,
&accepted,
Bytes::from(malformed_ethernet),
)
.await
.unwrap();
assert_eq!(outcome, PeerDatagramOutcome::Malformed);
}
#[tokio::test]
async fn forwards_ethernet_datagrams_between_joined_peers() {
let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM);
@@ -814,6 +903,22 @@ mod tests {
welcome
}
async fn accepted_client_for_forwarding(
rooms: &Arc<Mutex<RoomRegistry>>,
mac: MacAddr,
) -> AcceptedPeer {
let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap();
let join = rooms.lock().await.join(hello).unwrap();
AcceptedPeer {
room: RoomCode::new("TESTROOM").unwrap(),
welcome: join.welcome().clone(),
peer: join.peer().clone(),
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345),
max_datagram_size: 1400,
}
}
fn client_mac(last: u8) -> MacAddr {
MacAddr::new([0x02, 0, 0, 0, 0, last])
}