feat(relay): send peer catch-up on join

The relay now sends PeerJoined catch-up events to a newly accepted peer for
peers that were already present in the room. This makes lifecycle delivery
symmetric enough for clients and gateways to learn the current room membership
after welcome, not only future joins.

The catch-up list is built from a cloned room snapshot before opening control
event streams, so room state is not locked across QUIC I/O. Delivery remains
best-effort and uses the same one-frame unidirectional control stream path as
live PeerJoined and PeerLeft notifications.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay \
  forwards_ethernet_datagrams_between_joined_peers -- --nocapture
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 20:42:21 +02:00
parent f4f617ea01
commit eedd03b98c
2 changed files with 55 additions and 1 deletions
+53
View File
@@ -273,6 +273,7 @@ async fn accept_control_handshake(
if let Some(accepted) = &accepted {
notify_peer_joined(sessions, accepted).await;
notify_existing_peers_to_joined_peer(rooms, connection, accepted).await;
}
Ok(accepted)
@@ -479,6 +480,25 @@ async fn notify_peer_joined(
}
}
async fn notify_existing_peers_to_joined_peer(
rooms: &Arc<Mutex<RoomRegistry>>,
connection: &quinn::Connection,
accepted: &AcceptedPeer,
) {
let peers = room_peers_except(rooms, &accepted.room, accepted.peer.peer_id()).await;
for peer in peers {
let message = ControlMessage::PeerJoined(peer);
if let Err(error) = send_control_event(connection, &message).await {
eprintln!(
"failed to send existing room peer to peer {} in room {}: {error:#}",
accepted.peer.peer_id(),
accepted.room
);
}
}
}
async fn notify_peer_left(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
@@ -495,6 +515,32 @@ async fn notify_peer_left(
}
}
async fn room_peers_except(
rooms: &Arc<Mutex<RoomRegistry>>,
room: &RoomCode,
excluded_peer_id: u32,
) -> Vec<PeerInfo> {
let Some(snapshot) = rooms.lock().await.snapshot(room) else {
return Vec::new();
};
let mut peers =
Vec::with_capacity(snapshot.clients().len() + usize::from(snapshot.gateway().is_some()));
if let Some(gateway) = snapshot.gateway()
&& gateway.peer_id() != excluded_peer_id
{
peers.push(gateway.clone());
}
peers.extend(
snapshot
.clients()
.iter()
.filter(|peer| peer.peer_id() != excluded_peer_id)
.cloned(),
);
peers
}
async fn collect_room_sessions_except(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
@@ -899,6 +945,13 @@ mod tests {
assert_eq!(peer.peer_id(), second_welcome.peer_id());
assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(second_mac));
let event = read_control_event(&second_connection).await;
let ControlMessage::PeerJoined(peer) = event else {
panic!("expected peer joined catch-up event");
};
assert_eq!(peer.peer_id(), first_welcome.peer_id());
assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(first_mac));
let ethernet = ethernet_frame(second_mac, first_mac);
let datagram = encode_datagram(