From b8ae95a911a00e5f50b9efcfc931b3ce62ff42d1 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 17:51:40 +0200 Subject: [PATCH] feat(relay): accept control handshakes The relay now keeps a shared room registry behind the QUIC endpoint and runs an accept loop instead of only binding the socket. Each accepted connection must open its first bidirectional control stream with a hello frame; the relay joins the room registry and replies with welcome or reject. Admission clamps the hello datagram budget to Quinn's negotiated peer datagram size before choosing the effective room MTU, so room state is based on what the connection can actually carry. Accepted peers remain present until the QUIC connection closes, then the relay removes them through the existing leave cleanup path. The development self-signed certificate helper now exposes the certificate to tests so a loopback Quinn client can trust the relay and exercise the real stream codec path. Test Plan: - cargo fmt --check - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings Refs: PLAN.md relay QUIC control-stream startup flow --- Cargo.toml | 2 +- README.md | 10 +- crates/lanparty-relay/src/lib.rs | 5 +- crates/lanparty-relay/src/main.rs | 3 +- crates/lanparty-relay/src/server.rs | 364 ++++++++++++++++++++++++++-- 5 files changed, 361 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 122a48d..2757b3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,5 +24,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std" serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "2" -tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal"] } +tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tracing = "0.1" diff --git a/README.md b/README.md index 8f38555..b92936c 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ Shared diagnostics and 
structured logging vocabulary: Public relay binary and relay-owned room state: +- QUIC endpoint binding and first-stream hello/welcome admission - room admission for clients and gateways - one gateway per room, duplicate client MAC rejection, and room limits - stable effective room MTU chosen before Ethernet datagrams flow @@ -61,7 +62,8 @@ cargo run -p lanparty-relay -- --listen 443/udp ``` `--listen` accepts either a socket address or a UDP port shorthand such as -`443/udp`. The relay binds a QUIC endpoint and waits for shutdown, but -connection handling is not wired yet. It currently uses a generated self-signed -development certificate; production certificate and client trust handling remain -future work. +`443/udp`. The relay binds a QUIC endpoint, accepts a control-stream `hello`, +and replies with `welcome` or `reject`. Ethernet datagram forwarding exists as +relay-owned decisions but is not yet wired to live QUIC datagrams. The relay +currently uses a generated self-signed development certificate; production +certificate and client trust handling remain future work. diff --git a/crates/lanparty-relay/src/lib.rs b/crates/lanparty-relay/src/lib.rs index 85fdae4..13886b1 100644 --- a/crates/lanparty-relay/src/lib.rs +++ b/crates/lanparty-relay/src/lib.rs @@ -1,7 +1,8 @@ //! Relay room state and admission control. //! -//! QUIC accept loops will sit above this crate layer. Keeping room admission -//! separate makes the important relay invariants testable without sockets. +//! The QUIC server loop admits peers through this room registry, while the +//! registry itself stays socket-free so the relay invariants remain directly +//! testable. 
mod config; mod server; diff --git a/crates/lanparty-relay/src/main.rs b/crates/lanparty-relay/src/main.rs index 0094d70..0150daf 100644 --- a/crates/lanparty-relay/src/main.rs +++ b/crates/lanparty-relay/src/main.rs @@ -15,6 +15,5 @@ async fn run(config: RelayConfig) -> anyhow::Result<()> { ); let server = RelayServer::bind(&config)?; println!("lanparty-relay listening on {}", server.local_addr()?); - println!("connection handling is not wired yet; press Ctrl-C to stop"); - server.wait_for_shutdown().await + server.run_until_shutdown().await } diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index dcff340..4352606 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,17 +1,35 @@ use std::{net::SocketAddr, sync::Arc}; -use anyhow::{Context, Result}; -use quinn::{Endpoint, ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig}; -use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; +use anyhow::{Context, Result, anyhow}; +use lanparty_ctrl::{ + CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello, + MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, + decode_control_frame, encode_control_message, +}; +use quinn::crypto::rustls::QuicServerConfig; +use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use tokio::sync::Mutex; -use crate::RelayConfig; +use crate::{RelayConfig, RoomRegistry}; const RELAY_ALPN: &[u8] = b"lanparty-l2/1"; const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024; +const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN; #[derive(Debug)] pub struct RelayServer { endpoint: Endpoint, + rooms: Arc>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct AcceptedPeer { + room: RoomCode, + welcome: ServerWelcome, + peer: PeerInfo, + remote_addr: SocketAddr, + 
max_datagram_size: usize, } impl RelayServer { @@ -20,7 +38,14 @@ impl RelayServer { let endpoint = Endpoint::server(server_config, config.listen().socket_addr()) .context("failed to bind QUIC relay endpoint")?; - Ok(Self { endpoint }) + Ok(Self::from_endpoint(endpoint, config.max_clients_per_room())) + } + + fn from_endpoint(endpoint: Endpoint, max_clients_per_room: usize) -> Self { + Self { + endpoint, + rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))), + } } pub fn local_addr(&self) -> Result { @@ -29,13 +54,40 @@ impl RelayServer { .context("failed to read relay local address") } - pub async fn wait_for_shutdown(self) -> Result<()> { - tokio::signal::ctrl_c() - .await - .context("failed to wait for Ctrl-C")?; - self.shutdown("relay shutting down").await; + pub async fn run_until_shutdown(self) -> Result<()> { + let endpoint = self.endpoint.clone(); + let rooms = Arc::clone(&self.rooms); - Ok(()) + loop { + tokio::select! { + shutdown = tokio::signal::ctrl_c() => { + shutdown.context("failed to wait for Ctrl-C")?; + self.shutdown("relay shutting down").await; + return Ok(()); + } + incoming = endpoint.accept() => { + let Some(incoming) = incoming else { + return Ok(()); + }; + let rooms = Arc::clone(&rooms); + + tokio::spawn(async move { + if let Err(error) = handle_incoming_connection(rooms, incoming).await { + eprintln!("relay connection failed: {error:#}"); + } + }); + } + } + } + } + + #[cfg(test)] + async fn accept_once(&self) -> Result> { + let Some(incoming) = self.endpoint.accept().await else { + return Ok(None); + }; + + handle_incoming_connection(Arc::clone(&self.rooms), incoming).await } pub async fn shutdown(self, reason: &str) { @@ -44,10 +96,186 @@ impl RelayServer { } } +async fn handle_incoming_connection( + rooms: Arc>, + incoming: Incoming, +) -> Result> { + let remote_addr = incoming.remote_address(); + let connection = incoming + .await + .with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?; 
+ let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else { + return Ok(None); + }; + + println!( + "accepted {:?} peer {} in room {} from {} with peer datagram budget {} and TAP MTU {}", + accepted.peer.role(), + accepted.peer.peer_id(), + accepted.room, + accepted.remote_addr, + accepted.max_datagram_size, + accepted.welcome.effective_tap_mtu() + ); + + let close_reason = connection.closed().await; + leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?; + println!( + "peer {} left room {}: {}", + accepted.peer.peer_id(), + accepted.room, + close_reason + ); + + Ok(Some(accepted)) +} + +async fn accept_control_handshake( + rooms: &Arc>, + connection: &quinn::Connection, +) -> Result> { + let (mut send, mut recv) = connection + .accept_bi() + .await + .context("failed to accept relay control stream")?; + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read relay control hello")?; + + let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await; + + if let Err(error) = send_control_message(&mut send, &response).await { + if let Some(accepted) = &accepted { + leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?; + } + + return Err(error); + } + + Ok(accepted) +} + +async fn build_handshake_response( + rooms: &Arc>, + connection: &quinn::Connection, + frame: &[u8], +) -> (Option, ControlMessage) { + let Some(connection_max_datagram_size) = connection.max_datagram_size() else { + return reject(Reject::new( + RejectReason::MtuTooSmall, + "QUIC DATAGRAM support was not negotiated", + )); + }; + + let message = match decode_control_frame(frame) { + Ok(message) => message, + Err(error) => return reject(reject_codec_error(error)), + }; + + let ControlMessage::Hello(hello) = message else { + return reject(Reject::new( + RejectReason::MalformedHello, + "first relay control frame must be hello", + )); + }; + + let room = hello.room().clone(); + let hello = 
match limit_hello_to_connection(hello, connection_max_datagram_size) { + Ok(hello) => hello, + Err(reject) => return (None, ControlMessage::Reject(reject)), + }; + let join = rooms.lock().await.join(hello); + + match join { + Ok(join) => { + let accepted = AcceptedPeer { + room, + welcome: join.welcome().clone(), + peer: join.peer().clone(), + remote_addr: connection.remote_address(), + max_datagram_size: connection_max_datagram_size, + }; + + ( + Some(accepted), + ControlMessage::Welcome(join.welcome().clone()), + ) + } + Err(reject) => (None, ControlMessage::Reject(reject)), + } +} + +fn limit_hello_to_connection( + hello: EndpointHello, + connection_max_datagram_size: usize, +) -> Result { + let max_datagram_size = usize::from(hello.max_datagram_size()) + .min(connection_max_datagram_size) + .min(usize::from(u16::MAX)) as u16; + + match hello.role() { + Role::Client => EndpointHello::client( + hello.room().clone(), + hello + .announced_mac() + .expect("validated client hello has MAC address"), + max_datagram_size, + ), + Role::Gateway => EndpointHello::gateway(hello.room().clone(), max_datagram_size), + } + .map_err(crate::reject_control_error) +} + +fn reject(reject: Reject) -> (Option, ControlMessage) { + (None, ControlMessage::Reject(reject)) +} + +fn reject_codec_error(error: ControlCodecError) -> Reject { + match error { + ControlCodecError::InvalidMessage(error) => crate::reject_control_error(error), + ControlCodecError::FrameTooShort { .. } + | ControlCodecError::MessageTooLarge { .. } + | ControlCodecError::IncompletePayload { .. } + | ControlCodecError::TrailingBytes { .. 
} + | ControlCodecError::Json(_) => { + Reject::new(RejectReason::MalformedHello, error.to_string()) + } + } +} + +async fn send_control_message(send: &mut SendStream, message: &ControlMessage) -> Result<()> { + let response = encode_control_message(message).context("failed to encode control response")?; + send.write_all(&response) + .await + .context("failed to write control response")?; + send.finish() + .map_err(|error| anyhow!("failed to finish control response stream: {error}"))?; + + Ok(()) +} + +async fn leave_peer(rooms: &Arc>, room: &RoomCode, peer_id: u32) -> Result<()> { + rooms + .lock() + .await + .leave(room, peer_id) + .with_context(|| format!("failed to remove peer {peer_id} from room {room}"))?; + + Ok(()) +} + fn development_server_config() -> Result { + let (server_config, _) = development_server_config_with_certificate()?; + + Ok(server_config) +} + +fn development_server_config_with_certificate() -> Result<(ServerConfig, CertificateDer<'static>)> { let certified_key = rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]) .context("failed to generate development relay certificate")?; - let cert_chain = vec![certified_key.cert.der().clone()]; + let certificate = certified_key.cert.der().clone(); + let cert_chain = vec![certificate.clone()]; let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from( certified_key.signing_key.serialize_der(), )); @@ -67,12 +295,19 @@ fn development_server_config() -> Result { transport.datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES); server_config.transport_config(Arc::new(transport)); - Ok(server_config) + Ok((server_config, certificate)) } #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, + }; + + use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message}; + use lanparty_proto::MacAddr; + use quinn::{ClientConfig, crypto::rustls::QuicClientConfig}; use 
crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint}; @@ -94,4 +329,105 @@ mod tests { server.shutdown("test complete").await; } + + #[tokio::test] + async fn accepts_client_hello_and_replies_welcome() { + let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM); + let rooms = Arc::clone(&server.rooms); + let server_addr = server.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let accepted = server + .accept_once() + .await + .unwrap() + .expect("connection should be accepted"); + server.shutdown("test complete").await; + accepted + }); + + let client = client_endpoint(certificate).unwrap(); + let connection = client + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + let hello = EndpointHello::client( + RoomCode::new("TESTROOM").unwrap(), + MacAddr::new([0x02, 0, 0, 0, 0, 1]), + 1400, + ) + .unwrap(); + let response = request_control_message(&connection, ControlMessage::Hello(hello)) + .await + .unwrap(); + + let ControlMessage::Welcome(welcome) = response else { + panic!("expected welcome response"); + }; + + assert_eq!(welcome.room_id(), 1); + assert_eq!(welcome.peer_id(), 1); + assert!(welcome.effective_tap_mtu() <= 1400); + + connection.close(0_u32.into(), b"test complete"); + client.wait_idle().await; + + let accepted = tokio::time::timeout(Duration::from_secs(5), server_task) + .await + .unwrap() + .unwrap(); + assert_eq!(accepted.room.as_str(), "TESTROOM"); + assert_eq!(accepted.peer.peer_id(), 1); + assert_eq!(accepted.welcome, welcome); + assert_eq!(rooms.lock().await.room_count(), 0); + } + + fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) { + let (server_config, certificate) = development_server_config_with_certificate().unwrap(); + let endpoint = Endpoint::server( + server_config, + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), + ) + .unwrap(); + + ( + RelayServer::from_endpoint(endpoint, max_clients_per_room), + certificate, + ) 
+ } + + fn client_endpoint(certificate: CertificateDer<'static>) -> Result { + let mut roots = rustls::RootCertStore::empty(); + roots + .add(certificate) + .context("failed to trust relay test certificate")?; + let mut client_crypto = rustls::ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth(); + client_crypto.alpn_protocols = vec![RELAY_ALPN.to_vec()]; + + let client_config = ClientConfig::new(Arc::new( + QuicClientConfig::try_from(client_crypto).context("failed to build client config")?, + )); + let mut endpoint = Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)) + .context("failed to bind client endpoint")?; + endpoint.set_default_client_config(client_config); + + Ok(endpoint) + } + + async fn request_control_message( + connection: &quinn::Connection, + message: ControlMessage, + ) -> Result { + let (mut send, mut recv) = connection.open_bi().await?; + let request = encode_control_message(&message)?; + send.write_all(&request).await?; + send.finish() + .map_err(|error| anyhow!("failed to finish request stream: {error}"))?; + + let response = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await?; + + Ok(decode_control_frame(&response)?) + } }