diff --git a/README.md b/README.md index 611db6f..68e761f 100644 --- a/README.md +++ b/README.md @@ -153,11 +153,12 @@ Unknown unicast from a client is forwarded only to the gateway port; unknown unicast from the gateway is dropped instead of flooded to every remote client. When a peer joins or leaves, the relay sends a reliable lifecycle control event to peers that are still present in the room. Newly joined peers also receive -`PeerJoined` events for peers that were already present. When a client joins, -the relay notifies existing peers before the client receives its welcome, so -gateways can seed client MAC state before that client starts sending frames. -When a gateway joins, the relay gives the gateway the current client list -before notifying clients that the gateway is available. +`PeerJoined` events for peers that were already present, and catch-up delivery +is part of the accepted handshake rather than a best-effort follow-up. When a +client joins, the relay notifies existing peers before the client receives its +welcome, so gateways can seed client MAC state before that client starts +sending frames. When a gateway joins, the relay gives the gateway the current +client list before notifying clients that the gateway is available. ### MVP Trust Model diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 2184c46..b798d28 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -322,7 +322,19 @@ async fn accept_control_handshake( } } AcceptedHandshakeStep::NotifyExistingPeersToJoined => { - notify_existing_peers_to_joined_peer(rooms, connection, &accepted).await; + if let Err(error) = + notify_existing_peers_to_joined_peer(rooms, connection, &accepted).await + { + cleanup_accepted_handshake( + rooms, + sessions, + &accepted, + accepted.peer.role() == Role::Client, + ) + .await?; + + return Err(error); + } } } } @@ -734,19 +746,23 @@ async fn notify_existing_peers_to_joined_peer( rooms: &Arc>, connection: &quinn::Connection, accepted: &AcceptedPeer, -) { +) -> Result<()> { let peers = room_peers_except(rooms, &accepted.room, accepted.peer.peer_id()).await; for peer in peers { let message = ControlMessage::PeerJoined(peer); - if let Err(error) = send_control_event(connection, &message).await { - eprintln!( - "failed to send existing room peer to peer {} in room {}: {error:#}", - accepted.peer.peer_id(), - accepted.room - ); - } + send_control_event(connection, &message) + .await + .with_context(|| { + format!( + "failed to send existing room peer to peer {} in room {}", + accepted.peer.peer_id(), + accepted.room + ) + })?; } + + Ok(()) } async fn notify_peer_left( @@ -1217,6 +1233,45 @@ mod tests { ); } + #[tokio::test] + async fn fails_catchup_when_joined_peer_connection_is_closed() { + let rooms = Arc::new(Mutex::new(RoomRegistry::default())); + let _client = accepted_client_for_forwarding(&rooms, client_mac(1)).await; + let gateway = accepted_gateway_for_forwarding(&rooms).await; + let (server_config, certificate) = development_server_config_with_certificate().unwrap(); + let server_endpoint = Endpoint::server( + server_config, + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), + ) + .unwrap(); + let server_addr = server_endpoint.local_addr().unwrap(); + let client_endpoint = client_endpoint(certificate).unwrap(); + let client_connecting = client_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap(); + let incoming = server_endpoint.accept().await.unwrap(); + let (client_connection, server_connection) = + tokio::try_join!(client_connecting, incoming).unwrap(); + + client_connection.close(0_u32.into(), b"test complete"); + tokio::time::timeout(Duration::from_secs(5), server_connection.closed()) + .await + .unwrap(); + + let error = notify_existing_peers_to_joined_peer(&rooms, &server_connection, &gateway) + .await + .unwrap_err(); + + assert!( + format!("{error:#}").contains("failed to send existing room peer to peer"), + "{error:#}" + ); + + server_endpoint.close(0_u32.into(), b"test complete"); + client_endpoint.wait_idle().await; + server_endpoint.wait_idle().await; + } + #[tokio::test] async fn formats_malformed_datagram_log_line() { let rooms = Arc::new(Mutex::new(RoomRegistry::default())); @@ -1779,6 +1834,19 @@ mod tests { } } + async fn accepted_gateway_for_forwarding(rooms: &Arc>) -> AcceptedPeer { + let hello = EndpointHello::gateway(RoomCode::new("TESTROOM").unwrap(), 1400).unwrap(); + let join = rooms.lock().await.join(hello).unwrap(); + + AcceptedPeer { + room: RoomCode::new("TESTROOM").unwrap(), + welcome: join.welcome().clone(), + peer: join.peer().clone(), + remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 12345), + max_datagram_size: 1400, + } + } + fn client_mac(last: u8) -> MacAddr { MacAddr::new([0x02, 0, 0, 0, 0, last]) }