feat(relay): rate limit client multicast floods

The relay now applies a small token bucket to broadcast and multicast frames
originating from remote clients. Those frames are necessary for ARP, DHCP, and
LAN discovery, but they are also the easiest way for one remote peer to flood
every participant and the LAN gateway. When a client exceeds the burst budget,
the relay returns a rate-limited forwarding decision instead of forwarding the
frame.

This is intentionally only the first rate-limit slice from PLAN.md. Unknown
unicast limits and total bandwidth limits remain separate follow-up work. The
limiter lives in room state because forwarding policy already knows the ingress
role, destination MAC, and room membership, and tests can drive it with explicit
Instants without involving QUIC timing.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 19:46:24 +02:00
parent f29d0b755c
commit c87033f74c
2 changed files with 135 additions and 7 deletions
+1
View File
@@ -80,6 +80,7 @@ Public relay binary and relay-owned room state:
- stable effective room MTU chosen before Ethernet datagrams flow - stable effective room MTU chosen before Ethernet datagrams flow
- live Ethernet datagram forwarding with no ingress reflection - live Ethernet datagram forwarding with no ingress reflection
- L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames
- client broadcast/multicast burst limiting
- malformed peer datagram disconnect threshold - malformed peer datagram disconnect threshold
- peer leave cleanup for room membership and MAC indexes - peer leave cleanup for room membership and MAC indexes
+134 -7
View File
@@ -7,7 +7,7 @@
mod config; mod config;
mod server; mod server;
use std::collections::HashMap; use std::{collections::HashMap, time::Instant};
use lanparty_ctrl::{ use lanparty_ctrl::{
ControlError, EndpointHello, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, ControlError, EndpointHello, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome,
@@ -20,6 +20,8 @@ pub use config::{ConfigError, DEFAULT_RELAY_PORT, ListenEndpoint, RelayArgs, Rel
pub use server::RelayServer; pub use server::RelayServer;
pub const DEFAULT_MAX_CLIENTS_PER_ROOM: usize = 16; pub const DEFAULT_MAX_CLIENTS_PER_ROOM: usize = 16;
const CLIENT_MULTICAST_BURST_FRAMES: u32 = 64;
const CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND: u32 = 32;
const ETHERTYPE_IPV4: u16 = 0x0800; const ETHERTYPE_IPV4: u16 = 0x0800;
const ETHERTYPE_EAPOL: u16 = 0x888e; const ETHERTYPE_EAPOL: u16 = 0x888e;
const ETHERTYPE_SLOW_PROTOCOLS: u16 = 0x8809; const ETHERTYPE_SLOW_PROTOCOLS: u16 = 0x8809;
@@ -133,6 +135,15 @@ impl ForwardingDecision {
} }
} }
#[must_use]
fn rate_limited() -> Self {
Self {
targets: Vec::new(),
action: FrameAction::RateLimited,
drop_reason: Some(DropReason::RateLimit),
}
}
#[must_use] #[must_use]
pub fn targets(&self) -> &[u32] { pub fn targets(&self) -> &[u32] {
&self.targets &self.targets
@@ -233,15 +244,25 @@ impl RoomRegistry {
} }
pub fn forward_ethernet( pub fn forward_ethernet(
&self, &mut self,
room: &RoomCode, room: &RoomCode,
ingress_peer_id: u32, ingress_peer_id: u32,
frame_bytes: &[u8], frame_bytes: &[u8],
) -> Result<ForwardingDecision, ForwardingError> {
self.forward_ethernet_at(room, ingress_peer_id, frame_bytes, Instant::now())
}
fn forward_ethernet_at(
&mut self,
room: &RoomCode,
ingress_peer_id: u32,
frame_bytes: &[u8],
now: Instant,
) -> Result<ForwardingDecision, ForwardingError> { ) -> Result<ForwardingDecision, ForwardingError> {
self.rooms self.rooms
.get(room) .get_mut(room)
.ok_or_else(|| ForwardingError::UnknownRoom(room.clone()))? .ok_or_else(|| ForwardingError::UnknownRoom(room.clone()))?
.forward_ethernet(room, ingress_peer_id, frame_bytes) .forward_ethernet(room, ingress_peer_id, frame_bytes, now)
} }
fn allocate_room_id(&mut self) -> Result<u64, Reject> { fn allocate_room_id(&mut self) -> Result<u64, Reject> {
@@ -264,6 +285,7 @@ struct Room {
gateway: Option<PeerInfo>, gateway: Option<PeerInfo>,
clients: HashMap<u32, PeerInfo>, clients: HashMap<u32, PeerInfo>,
clients_by_mac: HashMap<MacAddr, u32>, clients_by_mac: HashMap<MacAddr, u32>,
client_multicast_limits: HashMap<u32, FrameRateLimit>,
} }
impl Room { impl Room {
@@ -276,6 +298,7 @@ impl Room {
gateway: None, gateway: None,
clients: HashMap::new(), clients: HashMap::new(),
clients_by_mac: HashMap::new(), clients_by_mac: HashMap::new(),
client_multicast_limits: HashMap::new(),
} }
} }
@@ -311,6 +334,8 @@ impl Room {
let mac = peer.mac().expect("client peer info has MAC"); let mac = peer.mac().expect("client peer info has MAC");
self.clients_by_mac.insert(mac, peer.peer_id()); self.clients_by_mac.insert(mac, peer.peer_id());
self.clients.insert(peer.peer_id(), peer.clone()); self.clients.insert(peer.peer_id(), peer.clone());
self.client_multicast_limits
.insert(peer.peer_id(), FrameRateLimit::new());
} }
} }
@@ -398,6 +423,7 @@ impl Room {
})?; })?;
let mac = peer.mac().expect("client peer info has MAC"); let mac = peer.mac().expect("client peer info has MAC");
self.clients_by_mac.remove(&mac); self.clients_by_mac.remove(&mac);
self.client_multicast_limits.remove(&peer.peer_id());
Ok(peer) Ok(peer)
} }
@@ -407,10 +433,11 @@ impl Room {
} }
fn forward_ethernet( fn forward_ethernet(
&self, &mut self,
room_code: &RoomCode, room_code: &RoomCode,
ingress_peer_id: u32, ingress_peer_id: u32,
frame_bytes: &[u8], frame_bytes: &[u8],
now: Instant,
) -> Result<ForwardingDecision, ForwardingError> { ) -> Result<ForwardingDecision, ForwardingError> {
let ingress = let ingress =
self.peer(ingress_peer_id) self.peer(ingress_peer_id)
@@ -418,12 +445,13 @@ impl Room {
room: room_code.clone(), room: room_code.clone(),
peer_id: ingress_peer_id, peer_id: ingress_peer_id,
})?; })?;
let ingress_role = ingress.role();
let frame = match EthernetFrame::parse(frame_bytes) { let frame = match EthernetFrame::parse(frame_bytes) {
Ok(frame) => frame, Ok(frame) => frame,
Err(_) => return Ok(ForwardingDecision::dropped(DropReason::Malformed)), Err(_) => return Ok(ForwardingDecision::dropped(DropReason::Malformed)),
}; };
if ingress.role() == Role::Client { if ingress_role == Role::Client {
let expected_source = ingress.mac().expect("client peers have MAC addresses"); let expected_source = ingress.mac().expect("client peers have MAC addresses");
if frame.source() != expected_source { if frame.source() != expected_source {
return Ok(ForwardingDecision::dropped( return Ok(ForwardingDecision::dropped(
@@ -432,10 +460,17 @@ impl Room {
} }
} }
if let Some(drop_reason) = safety_drop_reason(ingress.role(), frame) { if let Some(drop_reason) = safety_drop_reason(ingress_role, frame) {
return Ok(ForwardingDecision::dropped(drop_reason)); return Ok(ForwardingDecision::dropped(drop_reason));
} }
if ingress_role == Role::Client
&& frame.destination().is_multicast()
&& !self.allow_client_multicast(ingress_peer_id, now)
{
return Ok(ForwardingDecision::rate_limited());
}
let targets = if frame.destination().is_multicast() { let targets = if frame.destination().is_multicast() {
self.all_peer_ids_except(ingress_peer_id) self.all_peer_ids_except(ingress_peer_id)
} else if let Some(client_peer_id) = self.clients_by_mac.get(&frame.destination()) { } else if let Some(client_peer_id) = self.clients_by_mac.get(&frame.destination()) {
@@ -455,6 +490,13 @@ impl Room {
Ok(ForwardingDecision::forwarded(targets)) Ok(ForwardingDecision::forwarded(targets))
} }
fn allow_client_multicast(&mut self, peer_id: u32, now: Instant) -> bool {
self.client_multicast_limits
.entry(peer_id)
.or_insert_with(FrameRateLimit::new)
.allow(now)
}
fn peer(&self, peer_id: u32) -> Option<&PeerInfo> { fn peer(&self, peer_id: u32) -> Option<&PeerInfo> {
self.gateway self.gateway
.as_ref() .as_ref()
@@ -483,6 +525,47 @@ impl Room {
} }
} }
#[derive(Debug, Clone)]
struct FrameRateLimit {
tokens: u32,
last_refill: Instant,
}
impl FrameRateLimit {
fn new() -> Self {
Self {
tokens: CLIENT_MULTICAST_BURST_FRAMES,
last_refill: Instant::now(),
}
}
fn allow(&mut self, now: Instant) -> bool {
self.refill(now);
if self.tokens == 0 {
false
} else {
self.tokens -= 1;
true
}
}
fn refill(&mut self, now: Instant) {
let elapsed_secs = now.saturating_duration_since(self.last_refill).as_secs();
if elapsed_secs == 0 {
return;
}
let refill =
elapsed_secs.saturating_mul(u64::from(CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND));
let refill = refill.min(u64::from(u32::MAX)) as u32;
self.tokens = self
.tokens
.saturating_add(refill)
.min(CLIENT_MULTICAST_BURST_FRAMES);
self.last_refill = now;
}
}
fn safety_drop_reason(ingress_role: Role, frame: EthernetFrame<'_>) -> Option<DropReason> { fn safety_drop_reason(ingress_role: Role, frame: EthernetFrame<'_>) -> Option<DropReason> {
if frame.is_jumbo() { if frame.is_jumbo() {
return Some(DropReason::JumboFrame); return Some(DropReason::JumboFrame);
@@ -574,6 +657,8 @@ fn reject_control_error(error: ControlError) -> Reject {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::{Duration, Instant};
use super::*; use super::*;
use lanparty_proto::MAX_STANDARD_ETHERNET_FRAME_LEN; use lanparty_proto::MAX_STANDARD_ETHERNET_FRAME_LEN;
@@ -638,6 +723,12 @@ mod tests {
assert!(decision.targets().is_empty()); assert!(decision.targets().is_empty());
} }
fn assert_rate_limited(decision: &ForwardingDecision) {
assert_eq!(decision.action(), FrameAction::RateLimited);
assert_eq!(decision.drop_reason(), Some(DropReason::RateLimit));
assert!(decision.targets().is_empty());
}
#[test] #[test]
fn accepts_gateway_and_client_into_room() { fn accepts_gateway_and_client_into_room() {
let mut registry = RoomRegistry::default(); let mut registry = RoomRegistry::default();
@@ -823,6 +914,42 @@ mod tests {
); );
} }
#[test]
fn rate_limits_client_broadcast_after_burst() {
let mut registry = RoomRegistry::default();
let gateway = registry.join(gateway_hello()).unwrap();
let client_one = registry.join(client_hello(1)).unwrap();
let client_two = registry.join(client_hello(2)).unwrap();
let frame = ethernet(MacAddr::BROADCAST, mac(1));
let now = Instant::now();
for _ in 0..CLIENT_MULTICAST_BURST_FRAMES {
let decision = registry
.forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now)
.unwrap();
assert_eq!(decision.action(), FrameAction::Forwarded);
assert_eq!(
decision.targets(),
&[gateway.peer().peer_id(), client_two.peer().peer_id()]
);
}
let decision = registry
.forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now)
.unwrap();
assert_rate_limited(&decision);
let decision = registry
.forward_ethernet_at(
&room(),
client_one.peer().peer_id(),
&frame,
now + Duration::from_secs(1),
)
.unwrap();
assert_eq!(decision.action(), FrameAction::Forwarded);
}
#[test] #[test]
fn drops_client_frames_with_forged_source_mac() { fn drops_client_frames_with_forged_source_mac() {
let mut registry = RoomRegistry::default(); let mut registry = RoomRegistry::default();