feat(relay): notify peers when room members leave

The relay now sends a reliable PeerLeft control event to remaining room peers
after removing the departed peer from session and room state. Normal connection
closure is reported as Normal, while malformed-datagram disconnects are reported
as ProtocolError.

This completes the first relay-side lifecycle event pair. Delivery remains
best-effort and client-side event consumption is intentionally left for a
separate slice.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay forwards_ethernet_datagrams_between_joined_peers \
  -- --nocapture
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 20:32:03 +02:00
parent 7f4b22d5f4
commit a24341c361
2 changed files with 65 additions and 13 deletions
+3 -3
View File
@@ -84,7 +84,7 @@ Public relay binary and relay-owned room state:
- one gateway per room, duplicate client MAC rejection, and room limits - one gateway per room, duplicate client MAC rejection, and room limits
- stable effective room MTU chosen before Ethernet datagrams flow - stable effective room MTU chosen before Ethernet datagrams flow
- live Ethernet datagram forwarding with no ingress reflection - live Ethernet datagram forwarding with no ingress reflection
- reliable `PeerJoined` notifications to existing room peers - reliable `PeerJoined`/`PeerLeft` notifications to existing room peers
- L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames - L2 safety filters for jumbo, switch-control, DHCP-server, and IPv6-RA frames
- client broadcast/multicast, unknown-unicast, and total bandwidth limiting - client broadcast/multicast, unknown-unicast, and total bandwidth limiting
- malformed peer datagram disconnect threshold - malformed peer datagram disconnect threshold
@@ -112,8 +112,8 @@ certificate handling remains future work. Ethernet forwarding decisions are
logged with room, peer, MAC, ethertype, action, drop reason, and target count. logged with room, peer, MAC, ethertype, action, drop reason, and target count.
Unknown unicast from a client is forwarded only to the gateway port; unknown Unknown unicast from a client is forwarded only to the gateway port; unknown
unicast from the gateway is dropped instead of flooded to every remote client. unicast from the gateway is dropped instead of flooded to every remote client.
When a peer joins, the relay sends a reliable `PeerJoined` control event to When a peer joins or leaves, the relay sends a reliable lifecycle control event
peers that were already present in the room. to peers that are still present in the room.
## Gateway ## Gateway
+62 -10
View File
@@ -3,7 +3,7 @@ use std::{fs, net::SocketAddr, path::Path, sync::Arc};
use anyhow::{Context, Result, anyhow}; use anyhow::{Context, Result, anyhow};
use bytes::Bytes; use bytes::Bytes;
use lanparty_ctrl::{ use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello, CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, DisconnectReason, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode, MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Reject, RejectReason, Role, RoomCode,
ServerWelcome, decode_control_frame, encode_control_message, ServerWelcome, decode_control_frame, encode_control_message,
}; };
@@ -65,6 +65,28 @@ enum PeerDatagramOutcome {
Malformed, Malformed,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
struct PeerClose {
reason: DisconnectReason,
message: String,
}
impl PeerClose {
fn normal(message: impl Into<String>) -> Self {
Self {
reason: DisconnectReason::Normal,
message: message.into(),
}
}
fn protocol_error(message: impl Into<String>) -> Self {
Self {
reason: DisconnectReason::ProtocolError,
message: message.into(),
}
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct MalformedDatagramTracker { struct MalformedDatagramTracker {
count: usize, count: usize,
@@ -203,13 +225,20 @@ async fn handle_incoming_connection(
accepted.welcome.effective_tap_mtu() accepted.welcome.effective_tap_mtu()
); );
let close_reason = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; let close = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await;
leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; let leave = leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?;
notify_peer_left(
&sessions,
&accepted.room,
leave.peer().peer_id(),
close.reason.clone(),
)
.await;
println!( println!(
"peer {} left room {}: {}", "peer {} left room {}: {}",
accepted.peer.peer_id(), accepted.peer.peer_id(),
accepted.room, accepted.room,
close_reason close.message
); );
Ok(Some(accepted)) Ok(Some(accepted))
@@ -268,7 +297,7 @@ async fn run_peer_datagrams(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>, sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer, accepted: &AcceptedPeer,
connection: &quinn::Connection, connection: &quinn::Connection,
) -> String { ) -> PeerClose {
let mut malformed_tracker = MalformedDatagramTracker::default(); let mut malformed_tracker = MalformedDatagramTracker::default();
loop { loop {
@@ -279,7 +308,7 @@ async fn run_peer_datagrams(
Ok(PeerDatagramOutcome::Malformed) => { Ok(PeerDatagramOutcome::Malformed) => {
if let Some(reason) = malformed_tracker.record_malformed() { if let Some(reason) = malformed_tracker.record_malformed() {
connection.close(0_u32.into(), reason.as_bytes()); connection.close(0_u32.into(), reason.as_bytes());
return reason; return PeerClose::protocol_error(reason);
} }
} }
Err(error) => { Err(error) => {
@@ -291,7 +320,7 @@ async fn run_peer_datagrams(
} }
} }
} }
Err(error) => return error.to_string(), Err(error) => return PeerClose::normal(error.to_string()),
} }
} }
} }
@@ -450,6 +479,22 @@ async fn notify_peer_joined(
} }
} }
async fn notify_peer_left(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
peer_id: u32,
reason: DisconnectReason,
) {
let target_sessions = collect_room_sessions_except(sessions, room, peer_id).await;
let message = ControlMessage::PeerLeft { peer_id, reason };
for target in target_sessions {
if let Err(error) = send_control_event(&target.connection, &message).await {
eprintln!("failed to notify room {room} about peer {peer_id} leaving: {error:#}");
}
}
}
async fn collect_room_sessions_except( async fn collect_room_sessions_except(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>, sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode, room: &RoomCode,
@@ -579,18 +624,18 @@ async fn leave_peer(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>, sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode, room: &RoomCode,
peer_id: u32, peer_id: u32,
) -> Result<()> { ) -> Result<crate::LeaveResult> {
sessions sessions
.lock() .lock()
.await .await
.remove(&PeerKey::new(room.clone(), peer_id)); .remove(&PeerKey::new(room.clone(), peer_id));
rooms let leave = rooms
.lock() .lock()
.await .await
.leave(room, peer_id) .leave(room, peer_id)
.with_context(|| format!("failed to remove peer {peer_id} from room {room}"))?; .with_context(|| format!("failed to remove peer {peer_id} from room {room}"))?;
Ok(()) Ok(leave)
} }
fn write_development_certificate(path: &Path, certificate: &CertificateDer<'_>) -> Result<()> { fn write_development_certificate(path: &Path, certificate: &CertificateDer<'_>) -> Result<()> {
@@ -883,6 +928,13 @@ mod tests {
assert_eq!(first_welcome.room_id(), second_welcome.room_id()); assert_eq!(first_welcome.room_id(), second_welcome.room_id());
first_connection.close(0_u32.into(), b"test complete"); first_connection.close(0_u32.into(), b"test complete");
let event = read_control_event(&second_connection).await;
let ControlMessage::PeerLeft { peer_id, reason } = event else {
panic!("expected peer left event");
};
assert_eq!(peer_id, first_welcome.peer_id());
assert_eq!(reason, DisconnectReason::Normal);
second_connection.close(0_u32.into(), b"test complete"); second_connection.close(0_u32.into(), b"test complete");
first_endpoint.wait_idle().await; first_endpoint.wait_idle().await;
second_endpoint.wait_idle().await; second_endpoint.wait_idle().await;