diff --git a/README.md b/README.md index 4d0a345..52203af 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,8 @@ logged with room, peer, MAC, ethertype, action, drop reason, and target count. Unknown unicast from a client is forwarded only to the gateway port; unknown unicast from the gateway is dropped instead of flooded to every remote client. When a peer joins or leaves, the relay sends a reliable lifecycle control event -to peers that are still present in the room. +to peers that are still present in the room. Newly joined peers also receive +`PeerJoined` events for peers that were already present. ## Gateway diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 84cfd69..9dd6084 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -273,6 +273,7 @@ async fn accept_control_handshake( if let Some(accepted) = &accepted { notify_peer_joined(sessions, accepted).await; + notify_existing_peers_to_joined_peer(rooms, connection, accepted).await; } Ok(accepted) @@ -479,6 +480,25 @@ async fn notify_peer_joined( } } +async fn notify_existing_peers_to_joined_peer( + rooms: &Arc>, + connection: &quinn::Connection, + accepted: &AcceptedPeer, +) { + let peers = room_peers_except(rooms, &accepted.room, accepted.peer.peer_id()).await; + + for peer in peers { + let message = ControlMessage::PeerJoined(peer); + if let Err(error) = send_control_event(connection, &message).await { + eprintln!( + "failed to send existing room peer to peer {} in room {}: {error:#}", + accepted.peer.peer_id(), + accepted.room + ); + } + } +} + async fn notify_peer_left( sessions: &Arc>>, room: &RoomCode, @@ -495,6 +515,32 @@ async fn notify_peer_left( } } +async fn room_peers_except( + rooms: &Arc>, + room: &RoomCode, + excluded_peer_id: u32, +) -> Vec { + let Some(snapshot) = rooms.lock().await.snapshot(room) else { + return Vec::new(); + }; + let mut peers = + Vec::with_capacity(snapshot.clients().len() + usize::from(snapshot.gateway().is_some())); + if let Some(gateway) = snapshot.gateway() + && gateway.peer_id() != excluded_peer_id + { + peers.push(gateway.clone()); + } + peers.extend( + snapshot + .clients() + .iter() + .filter(|peer| peer.peer_id() != excluded_peer_id) + .cloned(), + ); + + peers +} + async fn collect_room_sessions_except( sessions: &Arc>>, room: &RoomCode, @@ -899,6 +945,13 @@ mod tests { assert_eq!(peer.peer_id(), second_welcome.peer_id()); assert_eq!(peer.role(), Role::Client); assert_eq!(peer.mac(), Some(second_mac)); + let event = read_control_event(&second_connection).await; + let ControlMessage::PeerJoined(peer) = event else { + panic!("expected peer joined catch-up event"); + }; + assert_eq!(peer.peer_id(), first_welcome.peer_id()); + assert_eq!(peer.role(), Role::Client); + assert_eq!(peer.mac(), Some(first_mac)); let ethernet = ethernet_frame(second_mac, first_mac); let datagram = encode_datagram(