diff --git a/README.md b/README.md index 64f65f0..677bee9 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ Platform-neutral remote client relay session: - client hello with room, virtual MAC, and datagram budget - welcome/reject handling with assigned peer id and effective TAP MTU - QUIC DATAGRAM support and negotiated datagram budget diagnostics +- reliable relay control-event reads for peer lifecycle messages - Ethernet frame send/receive helpers over QUIC DATAGRAM - client tunnel statistics for frame/datagram rx/tx and drops diff --git a/crates/lanparty-client-core/src/lib.rs b/crates/lanparty-client-core/src/lib.rs index 973bcf8..56e7a3a 100644 --- a/crates/lanparty-client-core/src/lib.rs +++ b/crates/lanparty-client-core/src/lib.rs @@ -274,6 +274,10 @@ impl ClientSession { self.relay_io().recv_ethernet().await } + pub async fn recv_control_event(&self) -> Result { + recv_control_event(&self.connection).await + } + #[must_use] pub fn stats_snapshot(&self) -> TunnelStats { self.stats.snapshot() @@ -496,12 +500,25 @@ async fn request_control_message( Ok(decode_control_frame(&response)?) } +async fn recv_control_event(connection: &quinn::Connection) -> Result { + let mut recv = connection + .accept_uni() + .await + .context("failed to accept relay control event stream")?; + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read relay control event")?; + + decode_control_frame(&frame).context("failed to decode relay control event") +} + #[cfg(test)] mod tests { use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bytes::Bytes; - use lanparty_ctrl::Role; + use lanparty_ctrl::{PeerInfo, Role}; use quinn::{ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig}; use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; @@ -652,6 +669,14 @@ mod tests { .unwrap(); connection.send_datagram(Bytes::from(response)).unwrap(); + let event = encode_control_message(&ControlMessage::PeerJoined( + PeerInfo::new(1, Role::Gateway, None).unwrap(), + )) + .unwrap(); + let mut event_send = connection.open_uni().await.unwrap(); + event_send.write_all(&event).await.unwrap(); + event_send.finish().unwrap(); + connection.closed().await; endpoint.close(0_u32.into(), b"test complete"); endpoint.wait_idle().await; @@ -694,6 +719,16 @@ mod tests { assert_eq!(received.source_peer_id(), 1); assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice()); + let event = tokio::time::timeout(Duration::from_secs(5), client.recv_control_event()) + .await + .unwrap() + .unwrap(); + let ControlMessage::PeerJoined(peer) = event else { + panic!("expected peer joined event"); + }; + assert_eq!(peer.peer_id(), 1); + assert_eq!(peer.role(), Role::Gateway); + assert!(relay_io.send_ethernet(&[0; 4]).is_err()); let stats = relay_io.stats_snapshot(); assert_eq!(stats.ethernet_frames_tx(), 1);