feat(relay): accept control handshakes

The relay now keeps a shared room registry behind the QUIC endpoint and
runs an accept loop instead of only binding the socket. Each accepted
connection must open its first bidirectional control stream with a hello
frame; the relay joins the room registry and replies with welcome or reject.

Admission clamps the hello datagram budget to Quinn's negotiated peer
datagram size before choosing the effective room MTU, so room state is based
on what the connection can actually carry. Accepted peers remain present
until the QUIC connection closes, then the relay removes them through the
existing leave cleanup path.

The development self-signed certificate helper now exposes the certificate
to tests so a loopback Quinn client can trust the relay and exercise the real
stream codec path.

Test Plan:
- cargo fmt --check
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings

Refs: PLAN.md relay QUIC control-stream startup flow
This commit is contained in:
2026-05-21 17:51:40 +02:00
parent 81ad7abe84
commit b8ae95a911
5 changed files with 361 additions and 23 deletions
+1 -1
View File
@@ -24,5 +24,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
thiserror = "2" thiserror = "2"
tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal"] } tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tracing = "0.1" tracing = "0.1"
+6 -4
View File
@@ -42,6 +42,7 @@ Shared diagnostics and structured logging vocabulary:
Public relay binary and relay-owned room state: Public relay binary and relay-owned room state:
- QUIC endpoint binding and first-stream hello/welcome admission
- room admission for clients and gateways - room admission for clients and gateways
- one gateway per room, duplicate client MAC rejection, and room limits - one gateway per room, duplicate client MAC rejection, and room limits
- stable effective room MTU chosen before Ethernet datagrams flow - stable effective room MTU chosen before Ethernet datagrams flow
@@ -61,7 +62,8 @@ cargo run -p lanparty-relay -- --listen 443/udp
``` ```
`--listen` accepts either a socket address or a UDP port shorthand such as `--listen` accepts either a socket address or a UDP port shorthand such as
`443/udp`. The relay binds a QUIC endpoint and waits for shutdown, but `443/udp`. The relay binds a QUIC endpoint, accepts a control-stream `hello`,
connection handling is not wired yet. It currently uses a generated self-signed and replies with `welcome` or `reject`. Ethernet datagram forwarding is still
development certificate; production certificate and client trust handling remain implemented as relay-owned decisions but not yet wired to live QUIC datagrams.
future work. It currently uses a generated self-signed development certificate; production
certificate and client trust handling remain future work.
+3 -2
View File
@@ -1,7 +1,8 @@
//! Relay room state and admission control. //! Relay room state and admission control.
//! //!
//! QUIC accept loops will sit above this crate layer. Keeping room admission //! The QUIC server loop admits peers through this room registry, while the
//! separate makes the important relay invariants testable without sockets. //! registry itself stays socket-free so the relay invariants remain directly
//! testable.
mod config; mod config;
mod server; mod server;
+1 -2
View File
@@ -15,6 +15,5 @@ async fn run(config: RelayConfig) -> anyhow::Result<()> {
); );
let server = RelayServer::bind(&config)?; let server = RelayServer::bind(&config)?;
println!("lanparty-relay listening on {}", server.local_addr()?); println!("lanparty-relay listening on {}", server.local_addr()?);
println!("connection handling is not wired yet; press Ctrl-C to stop"); server.run_until_shutdown().await
server.wait_for_shutdown().await
} }
+350 -14
View File
@@ -1,17 +1,35 @@
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, Result}; use anyhow::{Context, Result, anyhow};
use quinn::{Endpoint, ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig}; use lanparty_ctrl::{
use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}; CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome,
decode_control_frame, encode_control_message,
};
use quinn::crypto::rustls::QuicServerConfig;
use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use tokio::sync::Mutex;
use crate::RelayConfig; use crate::{RelayConfig, RoomRegistry};
const RELAY_ALPN: &[u8] = b"lanparty-l2/1"; const RELAY_ALPN: &[u8] = b"lanparty-l2/1";
const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024; const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024;
const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN;
#[derive(Debug)] #[derive(Debug)]
pub struct RelayServer { pub struct RelayServer {
endpoint: Endpoint, endpoint: Endpoint,
rooms: Arc<Mutex<RoomRegistry>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct AcceptedPeer {
room: RoomCode,
welcome: ServerWelcome,
peer: PeerInfo,
remote_addr: SocketAddr,
max_datagram_size: usize,
} }
impl RelayServer { impl RelayServer {
@@ -20,7 +38,14 @@ impl RelayServer {
let endpoint = Endpoint::server(server_config, config.listen().socket_addr()) let endpoint = Endpoint::server(server_config, config.listen().socket_addr())
.context("failed to bind QUIC relay endpoint")?; .context("failed to bind QUIC relay endpoint")?;
Ok(Self { endpoint }) Ok(Self::from_endpoint(endpoint, config.max_clients_per_room()))
}
fn from_endpoint(endpoint: Endpoint, max_clients_per_room: usize) -> Self {
Self {
endpoint,
rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))),
}
} }
pub fn local_addr(&self) -> Result<SocketAddr> { pub fn local_addr(&self) -> Result<SocketAddr> {
@@ -29,13 +54,40 @@ impl RelayServer {
.context("failed to read relay local address") .context("failed to read relay local address")
} }
pub async fn wait_for_shutdown(self) -> Result<()> { pub async fn run_until_shutdown(self) -> Result<()> {
tokio::signal::ctrl_c() let endpoint = self.endpoint.clone();
.await let rooms = Arc::clone(&self.rooms);
.context("failed to wait for Ctrl-C")?;
self.shutdown("relay shutting down").await;
Ok(()) loop {
tokio::select! {
shutdown = tokio::signal::ctrl_c() => {
shutdown.context("failed to wait for Ctrl-C")?;
self.shutdown("relay shutting down").await;
return Ok(());
}
incoming = endpoint.accept() => {
let Some(incoming) = incoming else {
return Ok(());
};
let rooms = Arc::clone(&rooms);
tokio::spawn(async move {
if let Err(error) = handle_incoming_connection(rooms, incoming).await {
eprintln!("relay connection failed: {error:#}");
}
});
}
}
}
}
#[cfg(test)]
async fn accept_once(&self) -> Result<Option<AcceptedPeer>> {
let Some(incoming) = self.endpoint.accept().await else {
return Ok(None);
};
handle_incoming_connection(Arc::clone(&self.rooms), incoming).await
} }
pub async fn shutdown(self, reason: &str) { pub async fn shutdown(self, reason: &str) {
@@ -44,10 +96,186 @@ impl RelayServer {
} }
} }
async fn handle_incoming_connection(
rooms: Arc<Mutex<RoomRegistry>>,
incoming: Incoming,
) -> Result<Option<AcceptedPeer>> {
let remote_addr = incoming.remote_address();
let connection = incoming
.await
.with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?;
let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else {
return Ok(None);
};
println!(
"accepted {:?} peer {} in room {} from {} with peer datagram budget {} and TAP MTU {}",
accepted.peer.role(),
accepted.peer.peer_id(),
accepted.room,
accepted.remote_addr,
accepted.max_datagram_size,
accepted.welcome.effective_tap_mtu()
);
let close_reason = connection.closed().await;
leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?;
println!(
"peer {} left room {}: {}",
accepted.peer.peer_id(),
accepted.room,
close_reason
);
Ok(Some(accepted))
}
async fn accept_control_handshake(
rooms: &Arc<Mutex<RoomRegistry>>,
connection: &quinn::Connection,
) -> Result<Option<AcceptedPeer>> {
let (mut send, mut recv) = connection
.accept_bi()
.await
.context("failed to accept relay control stream")?;
let frame = recv
.read_to_end(MAX_CONTROL_FRAME_LEN)
.await
.context("failed to read relay control hello")?;
let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await;
if let Err(error) = send_control_message(&mut send, &response).await {
if let Some(accepted) = &accepted {
leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?;
}
return Err(error);
}
Ok(accepted)
}
async fn build_handshake_response(
rooms: &Arc<Mutex<RoomRegistry>>,
connection: &quinn::Connection,
frame: &[u8],
) -> (Option<AcceptedPeer>, ControlMessage) {
let Some(connection_max_datagram_size) = connection.max_datagram_size() else {
return reject(Reject::new(
RejectReason::MtuTooSmall,
"QUIC DATAGRAM support was not negotiated",
));
};
let message = match decode_control_frame(frame) {
Ok(message) => message,
Err(error) => return reject(reject_codec_error(error)),
};
let ControlMessage::Hello(hello) = message else {
return reject(Reject::new(
RejectReason::MalformedHello,
"first relay control frame must be hello",
));
};
let room = hello.room().clone();
let hello = match limit_hello_to_connection(hello, connection_max_datagram_size) {
Ok(hello) => hello,
Err(reject) => return (None, ControlMessage::Reject(reject)),
};
let join = rooms.lock().await.join(hello);
match join {
Ok(join) => {
let accepted = AcceptedPeer {
room,
welcome: join.welcome().clone(),
peer: join.peer().clone(),
remote_addr: connection.remote_address(),
max_datagram_size: connection_max_datagram_size,
};
(
Some(accepted),
ControlMessage::Welcome(join.welcome().clone()),
)
}
Err(reject) => (None, ControlMessage::Reject(reject)),
}
}
fn limit_hello_to_connection(
hello: EndpointHello,
connection_max_datagram_size: usize,
) -> Result<EndpointHello, Reject> {
let max_datagram_size = usize::from(hello.max_datagram_size())
.min(connection_max_datagram_size)
.min(usize::from(u16::MAX)) as u16;
match hello.role() {
Role::Client => EndpointHello::client(
hello.room().clone(),
hello
.announced_mac()
.expect("validated client hello has MAC address"),
max_datagram_size,
),
Role::Gateway => EndpointHello::gateway(hello.room().clone(), max_datagram_size),
}
.map_err(crate::reject_control_error)
}
fn reject(reject: Reject) -> (Option<AcceptedPeer>, ControlMessage) {
(None, ControlMessage::Reject(reject))
}
fn reject_codec_error(error: ControlCodecError) -> Reject {
match error {
ControlCodecError::InvalidMessage(error) => crate::reject_control_error(error),
ControlCodecError::FrameTooShort { .. }
| ControlCodecError::MessageTooLarge { .. }
| ControlCodecError::IncompletePayload { .. }
| ControlCodecError::TrailingBytes { .. }
| ControlCodecError::Json(_) => {
Reject::new(RejectReason::MalformedHello, error.to_string())
}
}
}
async fn send_control_message(send: &mut SendStream, message: &ControlMessage) -> Result<()> {
let response = encode_control_message(message).context("failed to encode control response")?;
send.write_all(&response)
.await
.context("failed to write control response")?;
send.finish()
.map_err(|error| anyhow!("failed to finish control response stream: {error}"))?;
Ok(())
}
async fn leave_peer(rooms: &Arc<Mutex<RoomRegistry>>, room: &RoomCode, peer_id: u32) -> Result<()> {
rooms
.lock()
.await
.leave(room, peer_id)
.with_context(|| format!("failed to remove peer {peer_id} from room {room}"))?;
Ok(())
}
fn development_server_config() -> Result<ServerConfig> { fn development_server_config() -> Result<ServerConfig> {
let (server_config, _) = development_server_config_with_certificate()?;
Ok(server_config)
}
fn development_server_config_with_certificate() -> Result<(ServerConfig, CertificateDer<'static>)> {
let certified_key = rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]) let certified_key = rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()])
.context("failed to generate development relay certificate")?; .context("failed to generate development relay certificate")?;
let cert_chain = vec![certified_key.cert.der().clone()]; let certificate = certified_key.cert.der().clone();
let cert_chain = vec![certificate.clone()];
let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from( let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(
certified_key.signing_key.serialize_der(), certified_key.signing_key.serialize_der(),
)); ));
@@ -67,12 +295,19 @@ fn development_server_config() -> Result<ServerConfig> {
transport.datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES); transport.datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES);
server_config.transport_config(Arc::new(transport)); server_config.transport_config(Arc::new(transport));
Ok(server_config) Ok((server_config, certificate))
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message};
use lanparty_proto::MacAddr;
use quinn::{ClientConfig, crypto::rustls::QuicClientConfig};
use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint}; use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint};
@@ -94,4 +329,105 @@ mod tests {
server.shutdown("test complete").await; server.shutdown("test complete").await;
} }
#[tokio::test]
async fn accepts_client_hello_and_replies_welcome() {
let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM);
let rooms = Arc::clone(&server.rooms);
let server_addr = server.local_addr().unwrap();
let server_task = tokio::spawn(async move {
let accepted = server
.accept_once()
.await
.unwrap()
.expect("connection should be accepted");
server.shutdown("test complete").await;
accepted
});
let client = client_endpoint(certificate).unwrap();
let connection = client
.connect(server_addr, "lanparty-relay.local")
.unwrap()
.await
.unwrap();
let hello = EndpointHello::client(
RoomCode::new("TESTROOM").unwrap(),
MacAddr::new([0x02, 0, 0, 0, 0, 1]),
1400,
)
.unwrap();
let response = request_control_message(&connection, ControlMessage::Hello(hello))
.await
.unwrap();
let ControlMessage::Welcome(welcome) = response else {
panic!("expected welcome response");
};
assert_eq!(welcome.room_id(), 1);
assert_eq!(welcome.peer_id(), 1);
assert!(welcome.effective_tap_mtu() <= 1400);
connection.close(0_u32.into(), b"test complete");
client.wait_idle().await;
let accepted = tokio::time::timeout(Duration::from_secs(5), server_task)
.await
.unwrap()
.unwrap();
assert_eq!(accepted.room.as_str(), "TESTROOM");
assert_eq!(accepted.peer.peer_id(), 1);
assert_eq!(accepted.welcome, welcome);
assert_eq!(rooms.lock().await.room_count(), 0);
}
fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) {
let (server_config, certificate) = development_server_config_with_certificate().unwrap();
let endpoint = Endpoint::server(
server_config,
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
)
.unwrap();
(
RelayServer::from_endpoint(endpoint, max_clients_per_room),
certificate,
)
}
fn client_endpoint(certificate: CertificateDer<'static>) -> Result<Endpoint> {
let mut roots = rustls::RootCertStore::empty();
roots
.add(certificate)
.context("failed to trust relay test certificate")?;
let mut client_crypto = rustls::ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
client_crypto.alpn_protocols = vec![RELAY_ALPN.to_vec()];
let client_config = ClientConfig::new(Arc::new(
QuicClientConfig::try_from(client_crypto).context("failed to build client config")?,
));
let mut endpoint = Endpoint::client(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.context("failed to bind client endpoint")?;
endpoint.set_default_client_config(client_config);
Ok(endpoint)
}
async fn request_control_message(
connection: &quinn::Connection,
message: ControlMessage,
) -> Result<ControlMessage> {
let (mut send, mut recv) = connection.open_bi().await?;
let request = encode_control_message(&message)?;
send.write_all(&request).await?;
send.finish()
.map_err(|error| anyhow!("failed to finish request stream: {error}"))?;
let response = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await?;
Ok(decode_control_frame(&response)?)
}
} }