diff --git a/Cargo.toml b/Cargo.toml index 122a48d..2757b3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,5 +24,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std" serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "2" -tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal"] } +tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tracing = "0.1" diff --git a/README.md b/README.md index 8f38555..b92936c 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ Shared diagnostics and structured logging vocabulary: Public relay binary and relay-owned room state: +- QUIC endpoint binding and first-stream hello/welcome admission - room admission for clients and gateways - one gateway per room, duplicate client MAC rejection, and room limits - stable effective room MTU chosen before Ethernet datagrams flow @@ -61,7 +62,8 @@ cargo run -p lanparty-relay -- --listen 443/udp ``` `--listen` accepts either a socket address or a UDP port shorthand such as -`443/udp`. The relay binds a QUIC endpoint and waits for shutdown, but -connection handling is not wired yet. It currently uses a generated self-signed -development certificate; production certificate and client trust handling remain -future work. +`443/udp`. The relay binds a QUIC endpoint, accepts a control-stream `hello`, +and replies with `welcome` or `reject`. Ethernet datagram forwarding is still +implemented as relay-owned decisions but not yet wired to live QUIC datagrams. +It currently uses a generated self-signed development certificate; production +certificate and client trust handling remain future work. diff --git a/crates/lanparty-relay/src/lib.rs b/crates/lanparty-relay/src/lib.rs index 85fdae4..13886b1 100644 --- a/crates/lanparty-relay/src/lib.rs +++ b/crates/lanparty-relay/src/lib.rs @@ -1,7 +1,8 @@ //! Relay room state and admission control. //! -//! QUIC accept loops will sit above this crate layer. Keeping room admission -//! separate makes the important relay invariants testable without sockets. +//! The QUIC server loop admits peers through this room registry, while the +//! registry itself stays socket-free so the relay invariants remain directly +//! testable. mod config; mod server; diff --git a/crates/lanparty-relay/src/main.rs b/crates/lanparty-relay/src/main.rs index 0094d70..0150daf 100644 --- a/crates/lanparty-relay/src/main.rs +++ b/crates/lanparty-relay/src/main.rs @@ -15,6 +15,5 @@ async fn run(config: RelayConfig) -> anyhow::Result<()> { ); let server = RelayServer::bind(&config)?; println!("lanparty-relay listening on {}", server.local_addr()?); - println!("connection handling is not wired yet; press Ctrl-C to stop"); - server.wait_for_shutdown().await + server.run_until_shutdown().await } diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index dcff340..4352606 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,17 +1,35 @@ use std::{net::SocketAddr, sync::Arc}; -use anyhow::{Context, Result}; -use quinn::{Endpoint, ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig}; -use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; +use anyhow::{Context, Result, anyhow}; +use lanparty_ctrl::{ + CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello, + MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, + decode_control_frame, encode_control_message, +}; +use quinn::crypto::rustls::QuicServerConfig; +use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use tokio::sync::Mutex; -use crate::RelayConfig; +use crate::{RelayConfig, RoomRegistry}; const RELAY_ALPN: &[u8] = b"lanparty-l2/1"; const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024; +const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN; #[derive(Debug)] pub struct RelayServer { endpoint: Endpoint, + rooms: Arc>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct AcceptedPeer { + room: RoomCode, + welcome: ServerWelcome, + peer: PeerInfo, + remote_addr: SocketAddr, + max_datagram_size: usize, } impl RelayServer { @@ -20,7 +38,14 @@ impl RelayServer { let endpoint = Endpoint::server(server_config, config.listen().socket_addr()) .context("failed to bind QUIC relay endpoint")?; - Ok(Self { endpoint }) + Ok(Self::from_endpoint(endpoint, config.max_clients_per_room())) + } + + fn from_endpoint(endpoint: Endpoint, max_clients_per_room: usize) -> Self { + Self { + endpoint, + rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))), + } } pub fn local_addr(&self) -> Result { @@ -29,13 +54,40 @@ impl RelayServer { .context("failed to read relay local address") } - pub async fn wait_for_shutdown(self) -> Result<()> { - tokio::signal::ctrl_c() - .await - .context("failed to wait for Ctrl-C")?; - self.shutdown("relay shutting down").await; + pub async fn run_until_shutdown(self) -> Result<()> { + let endpoint = self.endpoint.clone(); + let rooms = Arc::clone(&self.rooms); - Ok(()) + loop { + tokio::select! { + shutdown = tokio::signal::ctrl_c() => { + shutdown.context("failed to wait for Ctrl-C")?; + self.shutdown("relay shutting down").await; + return Ok(()); + } + incoming = endpoint.accept() => { + let Some(incoming) = incoming else { + return Ok(()); + }; + let rooms = Arc::clone(&rooms); + + tokio::spawn(async move { + if let Err(error) = handle_incoming_connection(rooms, incoming).await { + eprintln!("relay connection failed: {error:#}"); + } + }); + } + } + } + } + + #[cfg(test)] + async fn accept_once(&self) -> Result> { + let Some(incoming) = self.endpoint.accept().await else { + return Ok(None); + }; + + handle_incoming_connection(Arc::clone(&self.rooms), incoming).await } pub async fn shutdown(self, reason: &str) { @@ -44,10 +96,186 @@ impl RelayServer { } } +async fn handle_incoming_connection( + rooms: Arc>, + incoming: Incoming, +) -> Result> { + let remote_addr = incoming.remote_address(); + let connection = incoming + .await + .with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?; + let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else { + return Ok(None); + }; + + println!( + "accepted {:?} peer {} in room {} from {} with peer datagram budget {} and TAP MTU {}", + accepted.peer.role(), + accepted.peer.peer_id(), + accepted.room, + accepted.remote_addr, + accepted.max_datagram_size, + accepted.welcome.effective_tap_mtu() + ); + + let close_reason = connection.closed().await; + leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?; + println!( + "peer {} left room {}: {}", + accepted.peer.peer_id(), + accepted.room, + close_reason + ); + + Ok(Some(accepted)) +} + +async fn accept_control_handshake( + rooms: &Arc>, + connection: &quinn::Connection, +) -> Result> { + let (mut send, mut recv) = connection + .accept_bi() + .await + .context("failed to accept relay control stream")?; + let frame = recv + .read_to_end(MAX_CONTROL_FRAME_LEN) + .await + .context("failed to read relay control hello")?; + + let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await; + + if let Err(error) = send_control_message(&mut send, &response).await { + if let Some(accepted) = &accepted { + leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?; + } + + return Err(error); + } + + Ok(accepted) +} + +async fn build_handshake_response( + rooms: &Arc>, + connection: &quinn::Connection, + frame: &[u8], +) -> (Option, ControlMessage) { + let Some(connection_max_datagram_size) = connection.max_datagram_size() else { + return reject(Reject::new( + RejectReason::MtuTooSmall, + "QUIC DATAGRAM support was not negotiated", + )); + }; + + let message = match decode_control_frame(frame) { + Ok(message) => message, + Err(error) => return reject(reject_codec_error(error)), + }; + + let ControlMessage::Hello(hello) = message else { + return reject(Reject::new( + RejectReason::MalformedHello, + "first relay control frame must be hello", + )); + }; + + let room = hello.room().clone(); + let hello = match limit_hello_to_connection(hello, connection_max_datagram_size) { + Ok(hello) => hello, + Err(reject) => return (None, ControlMessage::Reject(reject)), + }; + let join = rooms.lock().await.join(hello); + + match join { + Ok(join) => { + let accepted = AcceptedPeer { + room, + welcome: join.welcome().clone(), + peer: join.peer().clone(), + remote_addr: connection.remote_address(), + max_datagram_size: connection_max_datagram_size, + }; + + ( + Some(accepted), + ControlMessage::Welcome(join.welcome().clone()), + ) + } + Err(reject) => (None, ControlMessage::Reject(reject)), + } +} + +fn limit_hello_to_connection( + hello: EndpointHello, + connection_max_datagram_size: usize, +) -> Result { + let max_datagram_size = usize::from(hello.max_datagram_size()) + .min(connection_max_datagram_size) + .min(usize::from(u16::MAX)) as u16; + + match hello.role() { + Role::Client => EndpointHello::client( + hello.room().clone(), + hello + .announced_mac() + .expect("validated client hello has MAC address"), + max_datagram_size, + ), + Role::Gateway => EndpointHello::gateway(hello.room().clone(), max_datagram_size), + } + .map_err(crate::reject_control_error) +} + +fn reject(reject: Reject) -> (Option, ControlMessage) { + (None, ControlMessage::Reject(reject)) +} + +fn reject_codec_error(error: ControlCodecError) -> Reject { + match error { + ControlCodecError::InvalidMessage(error) => crate::reject_control_error(error), + ControlCodecError::FrameTooShort { .. } + | ControlCodecError::MessageTooLarge { .. } + | ControlCodecError::IncompletePayload { .. } + | ControlCodecError::TrailingBytes { .. } + | ControlCodecError::Json(_) => { + Reject::new(RejectReason::MalformedHello, error.to_string()) + } + } +} + +async fn send_control_message(send: &mut SendStream, message: &ControlMessage) -> Result<()> { + let response = encode_control_message(message).context("failed to encode control response")?; + send.write_all(&response) + .await + .context("failed to write control response")?; + send.finish() + .map_err(|error| anyhow!("failed to finish control response stream: {error}"))?; + + Ok(()) +} + +async fn leave_peer(rooms: &Arc>, room: &RoomCode, peer_id: u32) -> Result<()> { + rooms + .lock() + .await + .leave(room, peer_id) + .with_context(|| format!("failed to remove peer {peer_id} from room {room}"))?; + + Ok(()) +} + fn development_server_config() -> Result { + let (server_config, _) = development_server_config_with_certificate()?; + + Ok(server_config) +} + +fn development_server_config_with_certificate() -> Result<(ServerConfig, CertificateDer<'static>)> { let certified_key = rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]) .context("failed to generate development relay certificate")?; - let cert_chain = vec![certified_key.cert.der().clone()]; + let certificate = certified_key.cert.der().clone(); + let cert_chain = vec![certificate.clone()]; let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from( certified_key.signing_key.serialize_der(), )); @@ -67,12 +295,19 @@ fn development_server_config() -> Result { transport.datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES); server_config.transport_config(Arc::new(transport)); - Ok(server_config) + Ok((server_config, certificate)) } #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, + }; + + use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message}; + use lanparty_proto::MacAddr; + use quinn::{ClientConfig, crypto::rustls::QuicClientConfig}; use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint}; @@ -94,4 +329,105 @@ mod tests { server.shutdown("test complete").await; } + + #[tokio::test] + async fn accepts_client_hello_and_replies_welcome() { + let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM); + let rooms = Arc::clone(&server.rooms); + let server_addr = server.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let accepted = server + .accept_once() + .await + .unwrap() + .expect("connection should be accepted"); + server.shutdown("test complete").await; + accepted + }); + + let client = client_endpoint(certificate).unwrap(); + let connection = client + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + let hello = EndpointHello::client( + RoomCode::new("TESTROOM").unwrap(), + MacAddr::new([0x02, 0, 0, 0, 0, 1]), + 1400, + ) + .unwrap(); + let response = request_control_message(&connection, ControlMessage::Hello(hello)) + .await + .unwrap(); + + let ControlMessage::Welcome(welcome) = response else { + panic!("expected welcome response"); + }; + + assert_eq!(welcome.room_id(), 1); + assert_eq!(welcome.peer_id(), 1); + assert!(welcome.effective_tap_mtu() <= 1400); + + connection.close(0_u32.into(), b"test complete"); + client.wait_idle().await; + + let accepted = tokio::time::timeout(Duration::from_secs(5), server_task) + .await + .unwrap() + .unwrap(); + assert_eq!(accepted.room.as_str(), "TESTROOM"); + assert_eq!(accepted.peer.peer_id(), 1); + assert_eq!(accepted.welcome, welcome); + assert_eq!(rooms.lock().await.room_count(), 0); + } + + fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) { + let (server_config, certificate) = development_server_config_with_certificate().unwrap(); + let endpoint = Endpoint::server( + server_config, + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), + ) + .unwrap(); + + ( + RelayServer::from_endpoint(endpoint, max_clients_per_room), + certificate, + ) + } + + fn client_endpoint(certificate: CertificateDer<'static>) -> Result { + let mut roots = rustls::RootCertStore::empty(); + roots + .add(certificate) + .context("failed to trust relay test certificate")?; + let mut client_crypto = rustls::ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth(); + client_crypto.alpn_protocols = vec![RELAY_ALPN.to_vec()]; + + let client_config = ClientConfig::new(Arc::new( + QuicClientConfig::try_from(client_crypto).context("failed to build client config")?, + )); + let mut endpoint = Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)) + .context("failed to bind client endpoint")?; + endpoint.set_default_client_config(client_config); + + Ok(endpoint) + } + + async fn request_control_message( + connection: &quinn::Connection, + message: ControlMessage, + ) -> Result { + let (mut send, mut recv) = connection.open_bi().await?; + let request = encode_control_message(&message)?; + send.write_all(&request).await?; + send.finish() + .map_err(|error| anyhow!("failed to finish request stream: {error}"))?; + + let response = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await?; + + Ok(decode_control_frame(&response)?) + } }