feat(relay): notify peers when rooms change

The relay now sends a reliable PeerJoined control event to peers that were
already present in the room after a new peer completes the hello/welcome
handshake. Events are sent on one-frame unidirectional QUIC streams, reusing the
existing control codec without keeping the room/session lock across I/O.

Delivery is best-effort for this first lifecycle slice: a notification failure
is logged, but the newly accepted peer remains joined. PeerLeft delivery and
client-side event consumption remain separate follow-up work.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-relay forwards_ethernet_datagrams_between_joined_peers \
  -- --nocapture
- cargo test -p lanparty-relay
- cargo clippy -p lanparty-relay --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
This commit is contained in:
2026-05-21 20:28:54 +02:00
parent 0824f60548
commit 7f4b22d5f4
2 changed files with 72 additions and 4 deletions
+69 -4
View File
@@ -242,6 +242,10 @@ async fn accept_control_handshake(
return Err(error);
}
if let Some(accepted) = &accepted {
notify_peer_joined(sessions, accepted).await;
}
Ok(accepted)
}
@@ -427,6 +431,50 @@ async fn collect_target_sessions(
.collect()
}
async fn notify_peer_joined(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
) {
let target_sessions =
collect_room_sessions_except(sessions, &accepted.room, accepted.peer.peer_id()).await;
let message = ControlMessage::PeerJoined(accepted.peer.clone());
for target in target_sessions {
if let Err(error) = send_control_event(&target.connection, &message).await {
eprintln!(
"failed to notify room {} about peer {} joining: {error:#}",
accepted.room,
accepted.peer.peer_id()
);
}
}
}
async fn collect_room_sessions_except(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
excluded_peer_id: u32,
) -> Vec<PeerSession> {
let sessions = sessions.lock().await;
sessions
.iter()
.filter(|(key, _)| key.room == *room && key.peer_id != excluded_peer_id)
.map(|(_, session)| session.clone())
.collect()
}
async fn send_control_event(
connection: &quinn::Connection,
message: &ControlMessage,
) -> Result<()> {
let mut send = connection
.open_uni()
.await
.context("failed to open relay control event stream")?;
send_control_message(&mut send, message).await
}
async fn build_handshake_response(
rooms: &Arc<Mutex<RoomRegistry>>,
connection: &quinn::Connection,
@@ -516,12 +564,12 @@ fn reject_codec_error(error: ControlCodecError) -> Reject {
}
async fn send_control_message(send: &mut SendStream, message: &ControlMessage) -> Result<()> {
let response = encode_control_message(message).context("failed to encode control response")?;
send.write_all(&response)
let frame = encode_control_message(message).context("failed to encode control message")?;
send.write_all(&frame)
.await
.context("failed to write control response")?;
.context("failed to write control message")?;
send.finish()
.map_err(|error| anyhow!("failed to finish control response stream: {error}"))?;
.map_err(|error| anyhow!("failed to finish control message stream: {error}"))?;
Ok(())
}
@@ -799,6 +847,13 @@ mod tests {
let second_mac = client_mac(2);
let first_welcome = welcome_for_client(&first_connection, first_mac).await;
let second_welcome = welcome_for_client(&second_connection, second_mac).await;
let event = read_control_event(&first_connection).await;
let ControlMessage::PeerJoined(peer) = event else {
panic!("expected peer joined event");
};
assert_eq!(peer.peer_id(), second_welcome.peer_id());
assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(second_mac));
let ethernet = ethernet_frame(second_mac, first_mac);
let datagram = encode_datagram(
@@ -890,6 +945,16 @@ mod tests {
Ok(decode_control_frame(&response)?)
}
async fn read_control_event(connection: &quinn::Connection) -> ControlMessage {
let mut recv = tokio::time::timeout(Duration::from_secs(5), connection.accept_uni())
.await
.unwrap()
.unwrap();
let frame = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
decode_control_frame(&frame).unwrap()
}
async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome {
let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap();
let response = request_control_message(connection, ControlMessage::Hello(hello))