diff --git a/README.md b/README.md index 8bcf462..16cccdf 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ Public relay binary and relay-owned room state: - stable effective room MTU chosen before Ethernet datagrams flow - live Ethernet datagram forwarding with no ingress reflection - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames +- client broadcast/multicast burst limiting - malformed peer datagram disconnect threshold - peer leave cleanup for room membership and MAC indexes diff --git a/crates/lanparty-relay/src/lib.rs b/crates/lanparty-relay/src/lib.rs index c829074..1e2bf39 100644 --- a/crates/lanparty-relay/src/lib.rs +++ b/crates/lanparty-relay/src/lib.rs @@ -7,7 +7,7 @@ mod config; mod server; -use std::collections::HashMap; +use std::{collections::HashMap, time::Instant}; use lanparty_ctrl::{ ControlError, EndpointHello, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, @@ -20,6 +20,8 @@ pub use config::{ConfigError, DEFAULT_RELAY_PORT, ListenEndpoint, RelayArgs, Rel pub use server::RelayServer; pub const DEFAULT_MAX_CLIENTS_PER_ROOM: usize = 16; +const CLIENT_MULTICAST_BURST_FRAMES: u32 = 64; +const CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND: u32 = 32; const ETHERTYPE_IPV4: u16 = 0x0800; const ETHERTYPE_EAPOL: u16 = 0x888e; const ETHERTYPE_SLOW_PROTOCOLS: u16 = 0x8809; @@ -133,6 +135,15 @@ impl ForwardingDecision { } } + #[must_use] + fn rate_limited() -> Self { + Self { + targets: Vec::new(), + action: FrameAction::RateLimited, + drop_reason: Some(DropReason::RateLimit), + } + } + #[must_use] pub fn targets(&self) -> &[u32] { &self.targets @@ -233,15 +244,25 @@ impl RoomRegistry { } pub fn forward_ethernet( - &self, + &mut self, room: &RoomCode, ingress_peer_id: u32, frame_bytes: &[u8], + ) -> Result { + self.forward_ethernet_at(room, ingress_peer_id, frame_bytes, Instant::now()) + } + + fn forward_ethernet_at( + &mut self, + room: &RoomCode, + ingress_peer_id: u32, + frame_bytes: &[u8], + now: Instant, ) -> Result { self.rooms - .get(room) + .get_mut(room) .ok_or_else(|| ForwardingError::UnknownRoom(room.clone()))? - .forward_ethernet(room, ingress_peer_id, frame_bytes) + .forward_ethernet(room, ingress_peer_id, frame_bytes, now) } fn allocate_room_id(&mut self) -> Result { @@ -264,6 +285,7 @@ struct Room { gateway: Option, clients: HashMap, clients_by_mac: HashMap, + client_multicast_limits: HashMap, } impl Room { @@ -276,6 +298,7 @@ impl Room { gateway: None, clients: HashMap::new(), clients_by_mac: HashMap::new(), + client_multicast_limits: HashMap::new(), } } @@ -311,6 +334,8 @@ impl Room { let mac = peer.mac().expect("client peer info has MAC"); self.clients_by_mac.insert(mac, peer.peer_id()); self.clients.insert(peer.peer_id(), peer.clone()); + self.client_multicast_limits + .insert(peer.peer_id(), FrameRateLimit::new()); } } @@ -398,6 +423,7 @@ impl Room { })?; let mac = peer.mac().expect("client peer info has MAC"); self.clients_by_mac.remove(&mac); + self.client_multicast_limits.remove(&peer.peer_id()); Ok(peer) } @@ -407,10 +433,11 @@ impl Room { } fn forward_ethernet( - &self, + &mut self, room_code: &RoomCode, ingress_peer_id: u32, frame_bytes: &[u8], + now: Instant, ) -> Result { let ingress = self.peer(ingress_peer_id) @@ -418,12 +445,13 @@ impl Room { room: room_code.clone(), peer_id: ingress_peer_id, })?; + let ingress_role = ingress.role(); let frame = match EthernetFrame::parse(frame_bytes) { Ok(frame) => frame, Err(_) => return Ok(ForwardingDecision::dropped(DropReason::Malformed)), }; - if ingress.role() == Role::Client { + if ingress_role == Role::Client { let expected_source = ingress.mac().expect("client peers have MAC addresses"); if frame.source() != expected_source { return Ok(ForwardingDecision::dropped( @@ -432,10 +460,17 @@ impl Room { } } - if let Some(drop_reason) = safety_drop_reason(ingress.role(), frame) { + if let Some(drop_reason) = safety_drop_reason(ingress_role, frame) { return Ok(ForwardingDecision::dropped(drop_reason)); } + if ingress_role == Role::Client + && frame.destination().is_multicast() + && !self.allow_client_multicast(ingress_peer_id, now) + { + return Ok(ForwardingDecision::rate_limited()); + } + let targets = if frame.destination().is_multicast() { self.all_peer_ids_except(ingress_peer_id) } else if let Some(client_peer_id) = self.clients_by_mac.get(&frame.destination()) { @@ -455,6 +490,13 @@ impl Room { Ok(ForwardingDecision::forwarded(targets)) } + fn allow_client_multicast(&mut self, peer_id: u32, now: Instant) -> bool { + self.client_multicast_limits + .entry(peer_id) + .or_insert_with(FrameRateLimit::new) + .allow(now) + } + fn peer(&self, peer_id: u32) -> Option<&PeerInfo> { self.gateway .as_ref() @@ -483,6 +525,47 @@ impl Room { } } +#[derive(Debug, Clone)] +struct FrameRateLimit { + tokens: u32, + last_refill: Instant, +} + +impl FrameRateLimit { + fn new() -> Self { + Self { + tokens: CLIENT_MULTICAST_BURST_FRAMES, + last_refill: Instant::now(), + } + } + + fn allow(&mut self, now: Instant) -> bool { + self.refill(now); + if self.tokens == 0 { + false + } else { + self.tokens -= 1; + true + } + } + + fn refill(&mut self, now: Instant) { + let elapsed_secs = now.saturating_duration_since(self.last_refill).as_secs(); + if elapsed_secs == 0 { + return; + } + + let refill = + elapsed_secs.saturating_mul(u64::from(CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND)); + let refill = refill.min(u64::from(u32::MAX)) as u32; + self.tokens = self + .tokens + .saturating_add(refill) + .min(CLIENT_MULTICAST_BURST_FRAMES); + self.last_refill = now; + } +} + fn safety_drop_reason(ingress_role: Role, frame: EthernetFrame<'_>) -> Option { if frame.is_jumbo() { return Some(DropReason::JumboFrame); @@ -574,6 +657,8 @@ fn reject_control_error(error: ControlError) -> Reject { #[cfg(test)] mod tests { + use std::time::{Duration, Instant}; + use super::*; use lanparty_proto::MAX_STANDARD_ETHERNET_FRAME_LEN; @@ -638,6 +723,12 @@ mod tests { assert!(decision.targets().is_empty()); } + fn assert_rate_limited(decision: &ForwardingDecision) { + assert_eq!(decision.action(), FrameAction::RateLimited); + assert_eq!(decision.drop_reason(), Some(DropReason::RateLimit)); + assert!(decision.targets().is_empty()); + } + #[test] fn accepts_gateway_and_client_into_room() { let mut registry = RoomRegistry::default(); @@ -823,6 +914,42 @@ mod tests { ); } + #[test] + fn rate_limits_client_broadcast_after_burst() { + let mut registry = RoomRegistry::default(); + let gateway = registry.join(gateway_hello()).unwrap(); + let client_one = registry.join(client_hello(1)).unwrap(); + let client_two = registry.join(client_hello(2)).unwrap(); + let frame = ethernet(MacAddr::BROADCAST, mac(1)); + let now = Instant::now(); + + for _ in 0..CLIENT_MULTICAST_BURST_FRAMES { + let decision = registry + .forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now) + .unwrap(); + assert_eq!(decision.action(), FrameAction::Forwarded); + assert_eq!( + decision.targets(), + &[gateway.peer().peer_id(), client_two.peer().peer_id()] + ); + } + + let decision = registry + .forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now) + .unwrap(); + assert_rate_limited(&decision); + + let decision = registry + .forward_ethernet_at( + &room(), + client_one.peer().peer_id(), + &frame, + now + Duration::from_secs(1), + ) + .unwrap(); + assert_eq!(decision.action(), FrameAction::Forwarded); + } + #[test] fn drops_client_frames_with_forged_source_mac() { let mut registry = RoomRegistry::default();