feat(gateway): consume lifecycle events

The relay now sends room lifecycle events to gateways, but the gateway was only
learning remote MACs after seeing relay traffic. That delayed CAM refresh for a
silent remote client and left PeerLeft unable to retire stale refresh entries.

Add a gateway control-event receive path and consume it inside the Linux bridge
loop. Client PeerJoined events seed the CAM refresh table by peer id and MAC,
and PeerLeft removes that peer. Relay traffic can still refresh or correct the
same table from observed source MACs.

The bridge loop selects on accepting a control stream, then reads the selected
stream inside the branch. That avoids dropping an already accepted control
stream if another select branch wins while the stream body is still pending.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-gateway \
  connects_to_relay_control_stream_as_gateway -- --nocapture
- cargo test -p lanparty-gateway updates_cam_refresh_from_lifecycle_events \
  -- --nocapture
- cargo test -p lanparty-gateway
- cargo clippy -p lanparty-gateway --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 21:00:43 +02:00
parent e65831686c
commit 66d6601d21
2 changed files with 141 additions and 15 deletions
+3 -1
View File
@@ -138,7 +138,9 @@ MACs seen from relay traffic and periodically emits small CAM refresh frames so
the physical switch keeps those MACs associated with the gateway port. Gateway
frame logs include direction, peer id when present, MACs, ethertype/length,
frame length, action, and drop reason. The gateway also tracks frame/datagram
counters and periodically sends stats snapshots to the relay.
counters and periodically sends stats snapshots to the relay. Relay lifecycle
events seed and retire remote-client MACs for CAM refresh even before that
client sends traffic.
## Windows Client
+138 -14
View File
@@ -7,7 +7,7 @@
mod packet;
#[cfg(target_os = "linux")]
use std::{collections::BTreeSet, time::Duration};
use std::{collections::BTreeMap, time::Duration};
use std::{
fs,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
@@ -22,8 +22,8 @@ use anyhow::{Context, Result, bail};
use bytes::Bytes;
use clap::Parser;
use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN,
RoomCode, ServerWelcome, decode_control_frame, encode_control_message,
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo,
RELAY_ALPN, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message,
};
use lanparty_obs::TunnelStats;
#[cfg(target_os = "linux")]
@@ -227,6 +227,10 @@ impl GatewayConnection {
recv_gateway_ethernet(&self.connection, &self.welcome, &self.stats).await
}
pub async fn recv_control_event(&self) -> Result<ControlMessage> {
recv_gateway_control_event(&self.connection).await
}
#[must_use]
pub fn stats_snapshot(&self) -> TunnelStats {
self.stats.snapshot()
@@ -284,7 +288,8 @@ impl GatewayConnection {
}
relay_frame = recv_gateway_ethernet(&connection, &welcome, &stats) => {
let relay_frame = relay_frame?;
cam_refresh.observe_remote_frame(relay_frame.payload())?;
cam_refresh
.observe_remote_frame(relay_frame.source_peer_id(), relay_frame.payload())?;
write_lan_ethernet(&packet_socket, relay_frame.payload()).await?;
println!(
"{}",
@@ -308,6 +313,13 @@ impl GatewayConnection {
eprintln!("failed to send gateway stats to relay: {error:#}");
}
}
control_stream = connection.accept_uni() => {
let control_stream = control_stream
.context("failed to accept gateway control event stream")?;
let control_event = read_gateway_control_event(control_stream).await?;
cam_refresh.observe_control_event(&control_event);
println!("{}", format_gateway_control_event(&control_event));
}
}
}
}
@@ -438,6 +450,53 @@ impl GatewayTunnelStats {
}
}
async fn recv_gateway_control_event(connection: &quinn::Connection) -> Result<ControlMessage> {
let recv = connection
.accept_uni()
.await
.context("failed to accept gateway control event stream")?;
read_gateway_control_event(recv).await
}
async fn read_gateway_control_event(mut recv: quinn::RecvStream) -> Result<ControlMessage> {
let frame = recv
.read_to_end(MAX_CONTROL_FRAME_LEN)
.await
.context("failed to read gateway control event")?;
decode_control_frame(&frame).context("failed to decode gateway control event")
}
fn format_gateway_control_event(event: &ControlMessage) -> String {
match event {
ControlMessage::PeerJoined(peer) if peer.role() == Role::Client => {
let mac = peer
.mac()
.map(|mac| mac.to_string())
.unwrap_or_else(|| "unknown".to_string());
format!(
"gateway control event: client peer {} joined with MAC {}",
peer.peer_id(),
mac
)
}
ControlMessage::PeerJoined(peer) if peer.role() == Role::Gateway => {
format!(
"gateway control event: LAN gateway peer {} joined",
peer.peer_id()
)
}
ControlMessage::PeerJoined(peer) => {
format!("gateway control event: peer {} joined", peer.peer_id())
}
ControlMessage::PeerLeft { peer_id, reason } => {
format!("gateway control event: peer {peer_id} left ({reason:?})")
}
_ => format!("gateway control event: {event:?}"),
}
}
#[cfg(target_os = "linux")]
fn gateway_frame_log_line(
interface: &str,
@@ -528,7 +587,7 @@ async fn write_lan_ethernet(packet_socket: &AsyncFd<PacketSocket>, frame: &[u8])
#[derive(Debug, Clone)]
struct CamRefresh {
gateway_mac: MacAddr,
remote_macs: BTreeSet<MacAddr>,
remote_clients: BTreeMap<u32, MacAddr>,
}
#[cfg(target_os = "linux")]
@@ -536,30 +595,50 @@ impl CamRefresh {
fn new(gateway_mac: MacAddr) -> Self {
Self {
gateway_mac,
remote_macs: BTreeSet::new(),
remote_clients: BTreeMap::new(),
}
}
fn observe_remote_frame(&mut self, frame: &[u8]) -> Result<()> {
fn observe_remote_frame(&mut self, peer_id: u32, frame: &[u8]) -> Result<()> {
let frame = EthernetFrame::parse(frame).context("relay Ethernet frame is malformed")?;
let source = frame.source();
if source.is_valid_client_identity() {
self.remote_macs.insert(source);
self.remote_clients.insert(peer_id, source);
}
Ok(())
}
fn observe_control_event(&mut self, event: &ControlMessage) {
match event {
ControlMessage::PeerJoined(peer) => self.observe_peer_joined(peer),
ControlMessage::PeerLeft { peer_id, .. } => self.observe_peer_left(*peer_id),
_ => {}
}
}
fn observe_peer_joined(&mut self, peer: &PeerInfo) {
if peer.role() == Role::Client
&& let Some(mac) = peer.mac()
{
self.remote_clients.insert(peer.peer_id(), mac);
}
}
fn observe_peer_left(&mut self, peer_id: u32) {
self.remote_clients.remove(&peer_id);
}
fn refresh_frames(&self) -> Vec<Vec<u8>> {
self.remote_macs
.iter()
self.remote_clients
.values()
.map(|source| cam_refresh_frame(*source, self.gateway_mac))
.collect()
}
#[cfg(test)]
fn remote_mac_count(&self) -> usize {
self.remote_macs.len()
self.remote_clients.len()
}
}
@@ -653,7 +732,7 @@ mod tests {
use std::time::Duration;
use bytes::Bytes;
use lanparty_ctrl::Role;
use lanparty_ctrl::DisconnectReason;
use quinn::{ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig};
use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer};
@@ -736,6 +815,14 @@ mod tests {
send.write_all(&response).await.unwrap();
send.finish().unwrap();
let joined = encode_control_message(&ControlMessage::PeerJoined(
PeerInfo::new(99, Role::Client, Some(MacAddr::new([0x02, 0, 0, 0, 0, 9]))).unwrap(),
))
.unwrap();
let mut event_send = connection.open_uni().await.unwrap();
event_send.write_all(&joined).await.unwrap();
event_send.finish().unwrap();
let datagram = connection.read_datagram().await.unwrap();
let packet = decode_datagram(&datagram).unwrap();
let header = packet.header();
@@ -783,6 +870,17 @@ mod tests {
assert_eq!(gateway.welcome().room_id(), 7);
assert_eq!(gateway.welcome().peer_id(), 1);
let event = tokio::time::timeout(Duration::from_secs(5), gateway.recv_control_event())
.await
.unwrap()
.unwrap();
let ControlMessage::PeerJoined(peer) = event else {
panic!("expected gateway lifecycle event");
};
assert_eq!(peer.peer_id(), 99);
assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(MacAddr::new([0x02, 0, 0, 0, 0, 9])));
gateway.send_ethernet(&ethernet_frame(b"to relay")).unwrap();
let received = tokio::time::timeout(Duration::from_secs(5), gateway.recv_ethernet())
.await
@@ -832,10 +930,10 @@ mod tests {
let mut refresh = CamRefresh::new(gateway_mac);
refresh
.observe_remote_frame(&ethernet_frame_from(remote_mac, b"remote"))
.observe_remote_frame(7, &ethernet_frame_from(remote_mac, b"remote"))
.unwrap();
refresh
.observe_remote_frame(&ethernet_frame_from(invalid_remote_mac, b"ignored"))
.observe_remote_frame(8, &ethernet_frame_from(invalid_remote_mac, b"ignored"))
.unwrap();
let frames = refresh.refresh_frames();
@@ -847,6 +945,32 @@ mod tests {
assert_eq!(refresh_frame.destination(), gateway_mac);
}
#[cfg(target_os = "linux")]
#[test]
fn updates_cam_refresh_from_lifecycle_events() {
let gateway_mac = MacAddr::new([0x0a, 0, 0, 0, 0, 1]);
let remote_mac = MacAddr::new([0x02, 0, 0, 0, 0, 2]);
let mut refresh = CamRefresh::new(gateway_mac);
refresh.observe_control_event(&ControlMessage::PeerJoined(
PeerInfo::new(7, Role::Client, Some(remote_mac)).unwrap(),
));
assert_eq!(refresh.remote_mac_count(), 1);
assert_eq!(
EthernetFrame::parse(&refresh.refresh_frames()[0])
.unwrap()
.source(),
remote_mac
);
refresh.observe_control_event(&ControlMessage::PeerLeft {
peer_id: 7,
reason: DisconnectReason::Normal,
});
assert_eq!(refresh.remote_mac_count(), 0);
assert!(refresh.refresh_frames().is_empty());
}
#[cfg(target_os = "linux")]
#[test]
fn formats_gateway_frame_log_lines() {