fix(relay): require peer catch-up during joins
Catch-up lifecycle events to the newly joined peer are part of the accepted handshake. They seed the peer's view of room membership before the rest of the room can rely on that peer being ready. The gateway-join ordering already sends the current client list to a new gateway before notifying clients that a gateway is available. Make that delivery mandatory: if the relay cannot send the catch-up event stream, clean up the accepted peer and fail the handshake instead of publishing an incompletely seeded gateway. The same required catch-up behavior applies to newly joined clients after their welcome. Existing-peer join/leave notifications remain best effort; a stale existing peer should not prevent the relay from accepting healthy joins. Test Plan: - cargo fmt --check - cargo test -p lanparty-relay - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check - git diff --cached --check Refs: MVP lifecycle ordering
This commit is contained in:
@@ -322,7 +322,19 @@ async fn accept_control_handshake(
|
||||
}
|
||||
}
|
||||
AcceptedHandshakeStep::NotifyExistingPeersToJoined => {
|
||||
notify_existing_peers_to_joined_peer(rooms, connection, &accepted).await;
|
||||
if let Err(error) =
|
||||
notify_existing_peers_to_joined_peer(rooms, connection, &accepted).await
|
||||
{
|
||||
cleanup_accepted_handshake(
|
||||
rooms,
|
||||
sessions,
|
||||
&accepted,
|
||||
accepted.peer.role() == Role::Client,
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -734,19 +746,23 @@ async fn notify_existing_peers_to_joined_peer(
|
||||
rooms: &Arc<Mutex<RoomRegistry>>,
|
||||
connection: &quinn::Connection,
|
||||
accepted: &AcceptedPeer,
|
||||
) {
|
||||
) -> Result<()> {
|
||||
let peers = room_peers_except(rooms, &accepted.room, accepted.peer.peer_id()).await;
|
||||
|
||||
for peer in peers {
|
||||
let message = ControlMessage::PeerJoined(peer);
|
||||
if let Err(error) = send_control_event(connection, &message).await {
|
||||
eprintln!(
|
||||
"failed to send existing room peer to peer {} in room {}: {error:#}",
|
||||
accepted.peer.peer_id(),
|
||||
accepted.room
|
||||
);
|
||||
}
|
||||
send_control_event(connection, &message)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to send existing room peer to peer {} in room {}",
|
||||
accepted.peer.peer_id(),
|
||||
accepted.room
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn notify_peer_left(
|
||||
@@ -1217,6 +1233,45 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fails_catchup_when_joined_peer_connection_is_closed() {
|
||||
let rooms = Arc::new(Mutex::new(RoomRegistry::default()));
|
||||
let _client = accepted_client_for_forwarding(&rooms, client_mac(1)).await;
|
||||
let gateway = accepted_gateway_for_forwarding(&rooms).await;
|
||||
let (server_config, certificate) = development_server_config_with_certificate().unwrap();
|
||||
let server_endpoint = Endpoint::server(
|
||||
server_config,
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
|
||||
)
|
||||
.unwrap();
|
||||
let server_addr = server_endpoint.local_addr().unwrap();
|
||||
let client_endpoint = client_endpoint(certificate).unwrap();
|
||||
let client_connecting = client_endpoint
|
||||
.connect(server_addr, "lanparty-relay.local")
|
||||
.unwrap();
|
||||
let incoming = server_endpoint.accept().await.unwrap();
|
||||
let (client_connection, server_connection) =
|
||||
tokio::try_join!(client_connecting, incoming).unwrap();
|
||||
|
||||
client_connection.close(0_u32.into(), b"test complete");
|
||||
tokio::time::timeout(Duration::from_secs(5), server_connection.closed())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let error = notify_existing_peers_to_joined_peer(&rooms, &server_connection, &gateway)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
assert!(
|
||||
format!("{error:#}").contains("failed to send existing room peer to peer"),
|
||||
"{error:#}"
|
||||
);
|
||||
|
||||
server_endpoint.close(0_u32.into(), b"test complete");
|
||||
client_endpoint.wait_idle().await;
|
||||
server_endpoint.wait_idle().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn formats_malformed_datagram_log_line() {
|
||||
let rooms = Arc::new(Mutex::new(RoomRegistry::default()));
|
||||
@@ -1779,6 +1834,19 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn accepted_gateway_for_forwarding(rooms: &Arc<Mutex<RoomRegistry>>) -> AcceptedPeer {
|
||||
let hello = EndpointHello::gateway(RoomCode::new("TESTROOM").unwrap(), 1400).unwrap();
|
||||
let join = rooms.lock().await.join(hello).unwrap();
|
||||
|
||||
AcceptedPeer {
|
||||
room: RoomCode::new("TESTROOM").unwrap(),
|
||||
welcome: join.welcome().clone(),
|
||||
peer: join.peer().clone(),
|
||||
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345),
|
||||
max_datagram_size: 1400,
|
||||
}
|
||||
}
|
||||
|
||||
fn client_mac(last: u8) -> MacAddr {
|
||||
MacAddr::new([0x02, 0, 0, 0, 0, last])
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user