diff --git a/README.md b/README.md index c44a505..ba8344a 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ Public relay binary and relay-owned room state: - stable effective room MTU chosen before Ethernet datagrams flow - live Ethernet datagram forwarding with no ingress reflection - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames -- client broadcast/multicast and unknown-unicast burst limiting +- client broadcast/multicast, unknown-unicast, and total bandwidth limiting - malformed peer datagram disconnect threshold - peer leave cleanup for room membership and MAC indexes diff --git a/crates/lanparty-relay/src/lib.rs b/crates/lanparty-relay/src/lib.rs index 9dacc54..069a296 100644 --- a/crates/lanparty-relay/src/lib.rs +++ b/crates/lanparty-relay/src/lib.rs @@ -20,10 +20,13 @@ pub use config::{ConfigError, DEFAULT_RELAY_PORT, ListenEndpoint, RelayArgs, Rel pub use server::RelayServer; pub const DEFAULT_MAX_CLIENTS_PER_ROOM: usize = 16; +const MEBIBYTE: u64 = 1024 * 1024; const CLIENT_MULTICAST_BURST_FRAMES: u32 = 64; const CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND: u32 = 32; const CLIENT_UNKNOWN_UNICAST_BURST_FRAMES: u32 = 64; const CLIENT_UNKNOWN_UNICAST_REFILL_FRAMES_PER_SECOND: u32 = 32; +const CLIENT_TOTAL_BANDWIDTH_BURST_BYTES: u64 = 4 * MEBIBYTE; +const CLIENT_TOTAL_BANDWIDTH_REFILL_BYTES_PER_SECOND: u64 = 2 * MEBIBYTE; const ETHERTYPE_IPV4: u16 = 0x0800; const ETHERTYPE_EAPOL: u16 = 0x888e; const ETHERTYPE_SLOW_PROTOCOLS: u16 = 0x8809; @@ -287,8 +290,9 @@ struct Room { gateway: Option, clients: HashMap, clients_by_mac: HashMap, - client_multicast_limits: HashMap, - client_unknown_unicast_limits: HashMap, + client_multicast_limits: HashMap, + client_unknown_unicast_limits: HashMap, + client_total_bandwidth_limits: HashMap, } impl Room { @@ -303,6 +307,7 @@ impl Room { clients_by_mac: HashMap::new(), client_multicast_limits: HashMap::new(), client_unknown_unicast_limits: HashMap::new(), + client_total_bandwidth_limits: HashMap::new(), } } @@ -342,6 +347,8 @@ impl Room { .insert(peer.peer_id(), client_multicast_limit()); self.client_unknown_unicast_limits .insert(peer.peer_id(), client_unknown_unicast_limit()); + self.client_total_bandwidth_limits + .insert(peer.peer_id(), client_total_bandwidth_limit()); } } @@ -431,6 +438,7 @@ impl Room { self.clients_by_mac.remove(&mac); self.client_multicast_limits.remove(&peer.peer_id()); self.client_unknown_unicast_limits.remove(&peer.peer_id()); + self.client_total_bandwidth_limits.remove(&peer.peer_id()); Ok(peer) } @@ -471,6 +479,12 @@ impl Room { return Ok(ForwardingDecision::dropped(drop_reason)); } + if ingress_role == Role::Client + && !self.allow_client_total_bandwidth(ingress_peer_id, frame.len() as u64, now) + { + return Ok(ForwardingDecision::rate_limited()); + } + if ingress_role == Role::Client && frame.destination().is_multicast() && !self.allow_client_multicast(ingress_peer_id, now) @@ -507,14 +521,21 @@ impl Room { self.client_multicast_limits .entry(peer_id) .or_insert_with(client_multicast_limit) - .allow(now) + .allow(1, now) } fn allow_client_unknown_unicast(&mut self, peer_id: u32, now: Instant) -> bool { self.client_unknown_unicast_limits .entry(peer_id) .or_insert_with(client_unknown_unicast_limit) - .allow(now) + .allow(1, now) + } + + fn allow_client_total_bandwidth(&mut self, peer_id: u32, bytes: u64, now: Instant) -> bool { + self.client_total_bandwidth_limits + .entry(peer_id) + .or_insert_with(client_total_bandwidth_limit) + .allow(bytes, now) } fn peer(&self, peer_id: u32) -> Option<&PeerInfo> { @@ -546,15 +567,15 @@ impl Room { } #[derive(Debug, Clone)] -struct FrameRateLimit { - tokens: u32, - capacity: u32, - refill_per_second: u32, +struct TokenBucket { + tokens: u64, + capacity: u64, + refill_per_second: u64, last_refill: Instant, } -impl FrameRateLimit { - fn new(capacity: u32, refill_per_second: u32) -> Self { +impl TokenBucket { + fn new(capacity: u64, refill_per_second: u64) -> Self { assert!(capacity > 0, "rate-limit capacity must be nonzero"); assert!( refill_per_second > 0, @@ -569,12 +590,12 @@ impl FrameRateLimit { } } - fn allow(&mut self, now: Instant) -> bool { + fn allow(&mut self, cost: u64, now: Instant) -> bool { self.refill(now); - if self.tokens == 0 { + if self.tokens < cost { false } else { - self.tokens -= 1; + self.tokens -= cost; true } } @@ -585,24 +606,30 @@ impl FrameRateLimit { return; } - let refill = elapsed_secs.saturating_mul(u64::from(self.refill_per_second)); - let refill = refill.min(u64::from(u32::MAX)) as u32; + let refill = elapsed_secs.saturating_mul(self.refill_per_second); self.tokens = self.tokens.saturating_add(refill).min(self.capacity); self.last_refill = now; } } -fn client_multicast_limit() -> FrameRateLimit { - FrameRateLimit::new( - CLIENT_MULTICAST_BURST_FRAMES, - CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND, +fn client_multicast_limit() -> TokenBucket { + TokenBucket::new( + u64::from(CLIENT_MULTICAST_BURST_FRAMES), + u64::from(CLIENT_MULTICAST_REFILL_FRAMES_PER_SECOND), ) } -fn client_unknown_unicast_limit() -> FrameRateLimit { - FrameRateLimit::new( - CLIENT_UNKNOWN_UNICAST_BURST_FRAMES, - CLIENT_UNKNOWN_UNICAST_REFILL_FRAMES_PER_SECOND, +fn client_unknown_unicast_limit() -> TokenBucket { + TokenBucket::new( + u64::from(CLIENT_UNKNOWN_UNICAST_BURST_FRAMES), + u64::from(CLIENT_UNKNOWN_UNICAST_REFILL_FRAMES_PER_SECOND), + ) +} + +fn client_total_bandwidth_limit() -> TokenBucket { + TokenBucket::new( + CLIENT_TOTAL_BANDWIDTH_BURST_BYTES, + CLIENT_TOTAL_BANDWIDTH_REFILL_BYTES_PER_SECOND, ) } @@ -1033,6 +1060,54 @@ mod tests { assert_eq!(decision.action(), FrameAction::Forwarded); } + #[test] + fn rate_limits_client_total_bandwidth_after_burst() { + let mut registry = RoomRegistry::default(); + let client_one = registry.join(client_hello(1)).unwrap(); + let client_two = registry.join(client_hello(2)).unwrap(); + let payload = vec![0; MAX_STANDARD_ETHERNET_FRAME_LEN - ETHERNET_HEADER_LEN]; + let frame = ethernet_with_payload(mac(2), mac(1), ETHERTYPE_IPV4, &payload); + let frame_len = frame.len() as u64; + let burst_frames = CLIENT_TOTAL_BANDWIDTH_BURST_BYTES / frame_len; + let now = Instant::now(); + + assert!(burst_frames > 0); + + for _ in 0..burst_frames { + let decision = registry + .forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now) + .unwrap(); + assert_eq!(decision.action(), FrameAction::Forwarded); + assert_eq!(decision.targets(), &[client_two.peer().peer_id()]); + } + + let decision = registry + .forward_ethernet_at(&room(), client_one.peer().peer_id(), &frame, now) + .unwrap(); + assert_rate_limited(&decision); + + let decision = registry + .forward_ethernet_at( + &room(), + client_two.peer().peer_id(), + ðernet(mac(1), mac(2)), + now, + ) + .unwrap(); + assert_eq!(decision.action(), FrameAction::Forwarded); + assert_eq!(decision.targets(), &[client_one.peer().peer_id()]); + + let decision = registry + .forward_ethernet_at( + &room(), + client_one.peer().peer_id(), + &frame, + now + Duration::from_secs(1), + ) + .unwrap(); + assert_eq!(decision.action(), FrameAction::Forwarded); + } + #[test] fn drops_client_frames_with_forged_source_mac() { let mut registry = RoomRegistry::default();