diff --git a/README.md b/README.md index 552627a..c9505a0 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,7 @@ Public relay binary and relay-owned room state: - client broadcast/multicast, unknown-unicast, and total bandwidth limiting - malformed peer datagram disconnect threshold - peer stats control events retained for relay diagnostics +- graceful disconnect control events propagated as peer-leave reasons - peer leave cleanup for room membership and MAC indexes ## Build diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 45edce3..5c8a53a 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -342,7 +342,10 @@ async fn run_peer_io( match handle_peer_control_stream(sessions, accepted, stream).await { Ok(PeerControlOutcome::Continue) => {} - Ok(PeerControlOutcome::Close(close)) => return close, + Ok(PeerControlOutcome::Close(close)) => { + connection.close(0_u32.into(), close.message.as_bytes()); + return close; + } Err(error) => { let reason = format!("invalid peer control stream: {error:#}"); connection.close(0_u32.into(), reason.as_bytes()); @@ -1112,6 +1115,82 @@ mod tests { assert!(sessions.lock().await.is_empty()); } + #[tokio::test] + async fn forwards_graceful_disconnect_reason_to_remaining_peers() { + let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM); + let rooms = Arc::clone(&server.rooms); + let sessions = Arc::clone(&server.sessions); + let server_addr = server.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let handles = server.accept_many_for_test(2).await.unwrap(); + let mut accepted = Vec::with_capacity(handles.len()); + + for handle in handles { + accepted.push(handle.await.unwrap().unwrap().unwrap()); + } + + server.shutdown("test complete").await; + accepted + }); + + let first_endpoint = client_endpoint(certificate.clone()).unwrap(); + let first_connection = first_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + let second_endpoint = client_endpoint(certificate).unwrap(); + let second_connection = second_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + + let first_welcome = welcome_for_client(&first_connection, client_mac(1)).await; + let second_welcome = welcome_for_client(&second_connection, client_mac(2)).await; + + let ControlMessage::PeerJoined(peer) = read_control_event(&first_connection).await else { + panic!("expected live peer joined event"); + }; + assert_eq!(peer.peer_id(), second_welcome.peer_id()); + let ControlMessage::PeerJoined(peer) = read_control_event(&second_connection).await else { + panic!("expected peer joined catch-up event"); + }; + assert_eq!(peer.peer_id(), first_welcome.peer_id()); + + send_peer_control_event( + &first_connection, + ControlMessage::Disconnect { + reason: DisconnectReason::TimedOut, + message: "idle timeout".to_owned(), + }, + ) + .await; + tokio::time::timeout(Duration::from_secs(5), first_connection.closed()) + .await + .unwrap(); + + let ControlMessage::PeerLeft { peer_id, reason } = + read_control_event(&second_connection).await + else { + panic!("expected peer left event"); + }; + assert_eq!(peer_id, first_welcome.peer_id()); + assert_eq!(reason, DisconnectReason::TimedOut); + + second_connection.close(0_u32.into(), b"test complete"); + first_endpoint.wait_idle().await; + second_endpoint.wait_idle().await; + + let accepted = tokio::time::timeout(Duration::from_secs(5), server_task) + .await + .unwrap() + .unwrap(); + assert_eq!(accepted.len(), 2); + assert_eq!(rooms.lock().await.room_count(), 0); + assert!(sessions.lock().await.is_empty()); + } + fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) { let (server_config, certificate) = development_server_config_with_certificate().unwrap(); let endpoint = Endpoint::server(