diff --git a/README.md b/README.md index c5bbfc7..200cc3e 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,9 @@ MACs seen from relay traffic and periodically emits small CAM refresh frames so the physical switch keeps those MACs associated with the gateway port. Gateway frame logs include direction, peer id when present, MACs, ethertype/length, frame length, action, and drop reason. The gateway also tracks frame/datagram -counters and periodically sends stats snapshots to the relay. +counters and periodically sends stats snapshots to the relay. Relay lifecycle +events seed and retire remote-client MACs for CAM refresh even before that +client sends traffic. ## Windows Client diff --git a/crates/lanparty-gateway/src/lib.rs b/crates/lanparty-gateway/src/lib.rs index 93f4309..fcf4cbd 100644 --- a/crates/lanparty-gateway/src/lib.rs +++ b/crates/lanparty-gateway/src/lib.rs @@ -7,7 +7,7 @@ mod packet; #[cfg(target_os = "linux")] -use std::{collections::BTreeSet, time::Duration}; +use std::{collections::BTreeMap, time::Duration}; use std::{ fs, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, @@ -22,8 +22,8 @@ use anyhow::{Context, Result, bail}; use bytes::Bytes; use clap::Parser; use lanparty_ctrl::{ - CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN, - RoomCode, ServerWelcome, decode_control_frame, encode_control_message, + CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo, + RELAY_ALPN, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; use lanparty_obs::TunnelStats; #[cfg(target_os = "linux")] @@ -227,6 +227,10 @@ impl GatewayConnection { recv_gateway_ethernet(&self.connection, &self.welcome, &self.stats).await } + pub async fn recv_control_event(&self) -> Result { + recv_gateway_control_event(&self.connection).await + } + #[must_use] pub fn stats_snapshot(&self) -> TunnelStats { self.stats.snapshot() @@ -284,7 +288,8 @@ impl GatewayConnection { } relay_frame = recv_gateway_ethernet(&connection, &welcome, &stats) => { let relay_frame = relay_frame?; - cam_refresh.observe_remote_frame(relay_frame.payload())?; + cam_refresh + .observe_remote_frame(relay_frame.source_peer_id(), relay_frame.payload())?; write_lan_ethernet(&packet_socket, relay_frame.payload()).await?; println!( "{}", @@ -308,6 +313,13 @@ impl GatewayConnection { eprintln!("failed to send gateway stats to relay: {error:#}"); } } + control_stream = connection.accept_uni() => { + let control_stream = control_stream + .context("failed to accept gateway control event stream")?; + let control_event = read_gateway_control_event(control_stream).await?; + cam_refresh.observe_control_event(&control_event); + println!("{}", format_gateway_control_event(&control_event)); + } } } } @@ -438,6 +450,53 @@ impl GatewayTunnelStats { } } +async fn recv_gateway_control_event(connection: &quinn::Connection) -> Result { + let recv = connection + .accept_uni() + .await + .context("failed to accept gateway control event stream")?; + + read_gateway_control_event(recv).await +} + +async fn read_gateway_control_event(mut recv: quinn::RecvStream) -> Result { + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read gateway control event")?; + + decode_control_frame(&frame).context("failed to decode gateway control event") +} + +fn format_gateway_control_event(event: &ControlMessage) -> String { + match event { + ControlMessage::PeerJoined(peer) if peer.role() == Role::Client => { + let mac = peer + .mac() + .map(|mac| mac.to_string()) + .unwrap_or_else(|| "unknown".to_string()); + format!( + "gateway control event: client peer {} joined with MAC {}", + peer.peer_id(), + mac + ) + } + ControlMessage::PeerJoined(peer) if peer.role() == Role::Gateway => { + format!( + "gateway control event: LAN gateway peer {} joined", + peer.peer_id() + ) + } + ControlMessage::PeerJoined(peer) => { + format!("gateway control event: peer {} joined", peer.peer_id()) + } + ControlMessage::PeerLeft { peer_id, reason } => { + format!("gateway control event: peer {peer_id} left ({reason:?})") + } + _ => format!("gateway control event: {event:?}"), + } +} + #[cfg(target_os = "linux")] fn gateway_frame_log_line( interface: &str, @@ -528,7 +587,7 @@ async fn write_lan_ethernet(packet_socket: &AsyncFd, frame: &[u8]) #[derive(Debug, Clone)] struct CamRefresh { gateway_mac: MacAddr, - remote_macs: BTreeSet, + remote_clients: BTreeMap, } #[cfg(target_os = "linux")] @@ -536,30 +595,50 @@ impl CamRefresh { fn new(gateway_mac: MacAddr) -> Self { Self { gateway_mac, - remote_macs: BTreeSet::new(), + remote_clients: BTreeMap::new(), } } - fn observe_remote_frame(&mut self, frame: &[u8]) -> Result<()> { + fn observe_remote_frame(&mut self, peer_id: u32, frame: &[u8]) -> Result<()> { let frame = EthernetFrame::parse(frame).context("relay Ethernet frame is malformed")?; let source = frame.source(); if source.is_valid_client_identity() { - self.remote_macs.insert(source); + self.remote_clients.insert(peer_id, source); } Ok(()) } + fn observe_control_event(&mut self, event: &ControlMessage) { + match event { + ControlMessage::PeerJoined(peer) => self.observe_peer_joined(peer), + ControlMessage::PeerLeft { peer_id, .. } => self.observe_peer_left(*peer_id), + _ => {} + } + } + + fn observe_peer_joined(&mut self, peer: &PeerInfo) { + if peer.role() == Role::Client + && let Some(mac) = peer.mac() + { + self.remote_clients.insert(peer.peer_id(), mac); + } + } + + fn observe_peer_left(&mut self, peer_id: u32) { + self.remote_clients.remove(&peer_id); + } + fn refresh_frames(&self) -> Vec> { - self.remote_macs - .iter() + self.remote_clients + .values() .map(|source| cam_refresh_frame(*source, self.gateway_mac)) .collect() } #[cfg(test)] fn remote_mac_count(&self) -> usize { - self.remote_macs.len() + self.remote_clients.len() } } @@ -653,7 +732,7 @@ mod tests { use std::time::Duration; use bytes::Bytes; - use lanparty_ctrl::Role; + use lanparty_ctrl::DisconnectReason; use quinn::{ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig}; use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; @@ -736,6 +815,14 @@ mod tests { send.write_all(&response).await.unwrap(); send.finish().unwrap(); + let joined = encode_control_message(&ControlMessage::PeerJoined( + PeerInfo::new(99, Role::Client, Some(MacAddr::new([0x02, 0, 0, 0, 0, 9]))).unwrap(), + )) + .unwrap(); + let mut event_send = connection.open_uni().await.unwrap(); + event_send.write_all(&joined).await.unwrap(); + event_send.finish().unwrap(); + let datagram = connection.read_datagram().await.unwrap(); let packet = decode_datagram(&datagram).unwrap(); let header = packet.header(); @@ -783,6 +870,17 @@ mod tests { assert_eq!(gateway.welcome().room_id(), 7); assert_eq!(gateway.welcome().peer_id(), 1); + let event = tokio::time::timeout(Duration::from_secs(5), gateway.recv_control_event()) + .await + .unwrap() + .unwrap(); + let ControlMessage::PeerJoined(peer) = event else { + panic!("expected gateway lifecycle event"); + }; + assert_eq!(peer.peer_id(), 99); + assert_eq!(peer.role(), Role::Client); + assert_eq!(peer.mac(), Some(MacAddr::new([0x02, 0, 0, 0, 0, 9]))); + gateway.send_ethernet(ðernet_frame(b"to relay")).unwrap(); let received = tokio::time::timeout(Duration::from_secs(5), gateway.recv_ethernet()) .await @@ -832,10 +930,10 @@ mod tests { let mut refresh = CamRefresh::new(gateway_mac); refresh - .observe_remote_frame(ðernet_frame_from(remote_mac, b"remote")) + .observe_remote_frame(7, ðernet_frame_from(remote_mac, b"remote")) .unwrap(); refresh - .observe_remote_frame(ðernet_frame_from(invalid_remote_mac, b"ignored")) + .observe_remote_frame(8, ðernet_frame_from(invalid_remote_mac, b"ignored")) .unwrap(); let frames = refresh.refresh_frames(); @@ -847,6 +945,32 @@ mod tests { assert_eq!(refresh_frame.destination(), gateway_mac); } + #[cfg(target_os = "linux")] + #[test] + fn updates_cam_refresh_from_lifecycle_events() { + let gateway_mac = MacAddr::new([0x0a, 0, 0, 0, 0, 1]); + let remote_mac = MacAddr::new([0x02, 0, 0, 0, 0, 2]); + let mut refresh = CamRefresh::new(gateway_mac); + + refresh.observe_control_event(&ControlMessage::PeerJoined( + PeerInfo::new(7, Role::Client, Some(remote_mac)).unwrap(), + )); + assert_eq!(refresh.remote_mac_count(), 1); + assert_eq!( + EthernetFrame::parse(&refresh.refresh_frames()[0]) + .unwrap() + .source(), + remote_mac + ); + + refresh.observe_control_event(&ControlMessage::PeerLeft { + peer_id: 7, + reason: DisconnectReason::Normal, + }); + assert_eq!(refresh.remote_mac_count(), 0); + assert!(refresh.refresh_frames().is_empty()); + } + #[cfg(target_os = "linux")] #[test] fn formats_gateway_frame_log_lines() {