feat(relay): close graceful disconnect peers
Peers can now end their relay session with a post-handshake Disconnect control message, but the relay previously only returned from the peer loop. Explicitly close that peer connection before leaving the room so graceful disconnects are finite from both sides. The relay still runs the normal leave path after the close. That keeps room cleanup, session removal, and PeerLeft notification centralized, while the PeerLeft reason now comes from the peer's Disconnect message. The integration test covers two connected clients. One sends a TimedOut Disconnect, the relay closes that client connection, and the remaining peer receives PeerLeft with the TimedOut reason. Test Plan: - cargo fmt --check - cargo test -p lanparty-relay \ forwards_graceful_disconnect_reason_to_remaining_peers \ -- --nocapture - cargo test -p lanparty-relay - cargo clippy -p lanparty-relay --all-targets -- -D warnings - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md
This commit is contained in:
@@ -92,6 +92,7 @@ Public relay binary and relay-owned room state:
|
|||||||
- client broadcast/multicast, unknown-unicast, and total bandwidth limiting
|
- client broadcast/multicast, unknown-unicast, and total bandwidth limiting
|
||||||
- malformed peer datagram disconnect threshold
|
- malformed peer datagram disconnect threshold
|
||||||
- peer stats control events retained for relay diagnostics
|
- peer stats control events retained for relay diagnostics
|
||||||
|
- graceful disconnect control events propagated as peer-leave reasons
|
||||||
- peer leave cleanup for room membership and MAC indexes
|
- peer leave cleanup for room membership and MAC indexes
|
||||||
|
|
||||||
## Build
|
## Build
|
||||||
|
|||||||
@@ -342,7 +342,10 @@ async fn run_peer_io(
|
|||||||
|
|
||||||
match handle_peer_control_stream(sessions, accepted, stream).await {
|
match handle_peer_control_stream(sessions, accepted, stream).await {
|
||||||
Ok(PeerControlOutcome::Continue) => {}
|
Ok(PeerControlOutcome::Continue) => {}
|
||||||
Ok(PeerControlOutcome::Close(close)) => return close,
|
Ok(PeerControlOutcome::Close(close)) => {
|
||||||
|
connection.close(0_u32.into(), close.message.as_bytes());
|
||||||
|
return close;
|
||||||
|
}
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
let reason = format!("invalid peer control stream: {error:#}");
|
let reason = format!("invalid peer control stream: {error:#}");
|
||||||
connection.close(0_u32.into(), reason.as_bytes());
|
connection.close(0_u32.into(), reason.as_bytes());
|
||||||
@@ -1112,6 +1115,82 @@ mod tests {
|
|||||||
assert!(sessions.lock().await.is_empty());
|
assert!(sessions.lock().await.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn forwards_graceful_disconnect_reason_to_remaining_peers() {
|
||||||
|
let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM);
|
||||||
|
let rooms = Arc::clone(&server.rooms);
|
||||||
|
let sessions = Arc::clone(&server.sessions);
|
||||||
|
let server_addr = server.local_addr().unwrap();
|
||||||
|
let server_task = tokio::spawn(async move {
|
||||||
|
let handles = server.accept_many_for_test(2).await.unwrap();
|
||||||
|
let mut accepted = Vec::with_capacity(handles.len());
|
||||||
|
|
||||||
|
for handle in handles {
|
||||||
|
accepted.push(handle.await.unwrap().unwrap().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
server.shutdown("test complete").await;
|
||||||
|
accepted
|
||||||
|
});
|
||||||
|
|
||||||
|
let first_endpoint = client_endpoint(certificate.clone()).unwrap();
|
||||||
|
let first_connection = first_endpoint
|
||||||
|
.connect(server_addr, "lanparty-relay.local")
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let second_endpoint = client_endpoint(certificate).unwrap();
|
||||||
|
let second_connection = second_endpoint
|
||||||
|
.connect(server_addr, "lanparty-relay.local")
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let first_welcome = welcome_for_client(&first_connection, client_mac(1)).await;
|
||||||
|
let second_welcome = welcome_for_client(&second_connection, client_mac(2)).await;
|
||||||
|
|
||||||
|
let ControlMessage::PeerJoined(peer) = read_control_event(&first_connection).await else {
|
||||||
|
panic!("expected live peer joined event");
|
||||||
|
};
|
||||||
|
assert_eq!(peer.peer_id(), second_welcome.peer_id());
|
||||||
|
let ControlMessage::PeerJoined(peer) = read_control_event(&second_connection).await else {
|
||||||
|
panic!("expected peer joined catch-up event");
|
||||||
|
};
|
||||||
|
assert_eq!(peer.peer_id(), first_welcome.peer_id());
|
||||||
|
|
||||||
|
send_peer_control_event(
|
||||||
|
&first_connection,
|
||||||
|
ControlMessage::Disconnect {
|
||||||
|
reason: DisconnectReason::TimedOut,
|
||||||
|
message: "idle timeout".to_owned(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
tokio::time::timeout(Duration::from_secs(5), first_connection.closed())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let ControlMessage::PeerLeft { peer_id, reason } =
|
||||||
|
read_control_event(&second_connection).await
|
||||||
|
else {
|
||||||
|
panic!("expected peer left event");
|
||||||
|
};
|
||||||
|
assert_eq!(peer_id, first_welcome.peer_id());
|
||||||
|
assert_eq!(reason, DisconnectReason::TimedOut);
|
||||||
|
|
||||||
|
second_connection.close(0_u32.into(), b"test complete");
|
||||||
|
first_endpoint.wait_idle().await;
|
||||||
|
second_endpoint.wait_idle().await;
|
||||||
|
|
||||||
|
let accepted = tokio::time::timeout(Duration::from_secs(5), server_task)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(accepted.len(), 2);
|
||||||
|
assert_eq!(rooms.lock().await.room_count(), 0);
|
||||||
|
assert!(sessions.lock().await.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) {
|
fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) {
|
||||||
let (server_config, certificate) = development_server_config_with_certificate().unwrap();
|
let (server_config, certificate) = development_server_config_with_certificate().unwrap();
|
||||||
let endpoint = Endpoint::server(
|
let endpoint = Endpoint::server(
|
||||||
|
|||||||
Reference in New Issue
Block a user