From 756523927a5ad51b35d043f65b23772bffb57d3c Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 17:55:58 +0200 Subject: [PATCH] feat(relay): forward live Ethernet datagrams The relay now keeps active peer sessions alongside room admission state. After a successful hello/welcome handshake, the connection enters a datagram loop and stays registered until the QUIC connection closes. Incoming datagrams are only considered for forwarding when their overlay room id, peer id, and Ethernet frame type match the peer assigned by the relay. The relay then reuses the existing room forwarding decision logic, clones the matching live target sessions, and sends a relay-stamped Ethernet datagram to each connected target that can carry the frame. This keeps spoofable wire metadata out of the trust boundary: clients can put whatever they want in an overlay header, but the relay forwards using the room and peer identity established during the control handshake. Test Plan: - cargo fmt --check - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings Refs: PLAN.md QUIC DATAGRAM Ethernet forwarding path --- Cargo.lock | 1 + README.md | 10 +- crates/lanparty-relay/Cargo.toml | 1 + crates/lanparty-relay/src/server.rs | 296 +++++++++++++++++++++++++++- 4 files changed, 295 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66cdfb4..28daab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,6 +473,7 @@ name = "lanparty-relay" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "clap", "lanparty-ctrl", "lanparty-obs", diff --git a/README.md b/README.md index b92936c..28a0c58 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Public relay binary and relay-owned room state: - room admission for clients and gateways - one gateway per room, duplicate client MAC rejection, and room limits - stable effective room MTU chosen before Ethernet datagrams flow -- Ethernet datagram forwarding decisions with no ingress reflection +- live Ethernet datagram forwarding with no ingress reflection - peer leave cleanup for room membership and MAC indexes ## Build @@ -63,7 +63,7 @@ cargo run -p lanparty-relay -- --listen 443/udp `--listen` accepts either a socket address or a UDP port shorthand such as `443/udp`. The relay binds a QUIC endpoint, accepts a control-stream `hello`, -and replies with `welcome` or `reject`. Ethernet datagram forwarding is still -implemented as relay-owned decisions but not yet wired to live QUIC datagrams. -It currently uses a generated self-signed development certificate; production -certificate and client trust handling remain future work. +replies with `welcome` or `reject`, and forwards live Ethernet QUIC datagrams +between accepted peers in the same room. It currently uses a generated +self-signed development certificate; production certificate and client trust +handling remain future work. diff --git a/crates/lanparty-relay/Cargo.toml b/crates/lanparty-relay/Cargo.toml index 3e92a88..91871e9 100644 --- a/crates/lanparty-relay/Cargo.toml +++ b/crates/lanparty-relay/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true +bytes.workspace = true clap.workspace = true lanparty-ctrl = { path = "../lanparty-ctrl" } lanparty-obs = { path = "../lanparty-obs" } diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 4352606..254311a 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,14 +1,17 @@ use std::{net::SocketAddr, sync::Arc}; use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; +use lanparty_proto::{FrameType, decode_datagram, encode_datagram}; use quinn::crypto::rustls::QuicServerConfig; use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use std::collections::HashMap; use tokio::sync::Mutex; use crate::{RelayConfig, RoomRegistry}; @@ -21,6 +24,7 @@ const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MES pub struct RelayServer { endpoint: Endpoint, rooms: Arc>, + sessions: Arc>>, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -32,6 +36,28 @@ struct AcceptedPeer { max_datagram_size: usize, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct PeerKey { + room: RoomCode, + peer_id: u32, +} + +impl PeerKey { + fn new(room: RoomCode, peer_id: u32) -> Self { + Self { room, peer_id } + } + + fn from_accepted(accepted: &AcceptedPeer) -> Self { + Self::new(accepted.room.clone(), accepted.peer.peer_id()) + } +} + +#[derive(Debug, Clone)] +struct PeerSession { + connection: quinn::Connection, + max_datagram_size: usize, +} + impl RelayServer { pub fn bind(config: &RelayConfig) -> Result { let server_config = development_server_config()?; @@ -45,6 +71,7 @@ impl RelayServer { Self { endpoint, rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))), + sessions: Arc::new(Mutex::new(HashMap::new())), } } @@ -57,6 +84,7 @@ impl RelayServer { pub async fn run_until_shutdown(self) -> Result<()> { let endpoint = self.endpoint.clone(); let rooms = Arc::clone(&self.rooms); + let sessions = Arc::clone(&self.sessions); loop { tokio::select! { @@ -70,9 +98,10 @@ impl RelayServer { return Ok(()); }; let rooms = Arc::clone(&rooms); + let sessions = Arc::clone(&sessions); tokio::spawn(async move { - if let Err(error) = handle_incoming_connection(rooms, incoming).await { + if let Err(error) = handle_incoming_connection(rooms, sessions, incoming).await { eprintln!("relay connection failed: {error:#}"); } }); @@ -87,7 +116,33 @@ impl RelayServer { return Ok(None); }; - handle_incoming_connection(Arc::clone(&self.rooms), incoming).await + handle_incoming_connection( + Arc::clone(&self.rooms), + Arc::clone(&self.sessions), + incoming, + ) + .await + } + + #[cfg(test)] + async fn accept_many_for_test( + &self, + count: usize, + ) -> Result>>>> { + let mut handles = Vec::with_capacity(count); + + for _ in 0..count { + let Some(incoming) = self.endpoint.accept().await else { + return Err(anyhow!("relay endpoint stopped while accepting test peer")); + }; + let rooms = Arc::clone(&self.rooms); + let sessions = Arc::clone(&self.sessions); + handles.push(tokio::spawn(async move { + handle_incoming_connection(rooms, sessions, incoming).await + })); + } + + Ok(handles) } pub async fn shutdown(self, reason: &str) { @@ -98,13 +153,14 @@ impl RelayServer { async fn handle_incoming_connection( rooms: Arc>, + sessions: Arc>>, incoming: Incoming, ) -> Result> { let remote_addr = incoming.remote_address(); let connection = incoming .await .with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?; - let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else { + let Some(accepted) = accept_control_handshake(&rooms, &sessions, &connection).await? else { return Ok(None); }; @@ -118,8 +174,8 @@ async fn handle_incoming_connection( accepted.welcome.effective_tap_mtu() ); - let close_reason = connection.closed().await; - leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?; + let close_reason = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; + leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; println!( "peer {} left room {}: {}", accepted.peer.peer_id(), @@ -132,6 +188,7 @@ async fn handle_incoming_connection( async fn accept_control_handshake( rooms: &Arc>, + sessions: &Arc>>, connection: &quinn::Connection, ) -> Result> { let (mut send, mut recv) = connection @@ -144,10 +201,13 @@ async fn accept_control_handshake( .context("failed to read relay control hello")?; let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await; + if let Some(accepted) = &accepted { + register_peer(sessions, accepted, connection.clone()).await; + } if let Err(error) = send_control_message(&mut send, &response).await { if let Some(accepted) = &accepted { - leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?; + leave_peer(rooms, sessions, &accepted.room, accepted.peer.peer_id()).await?; } return Err(error); @@ -156,6 +216,113 @@ async fn accept_control_handshake( Ok(accepted) } +async fn register_peer( + sessions: &Arc>>, + accepted: &AcceptedPeer, + connection: quinn::Connection, +) { + sessions.lock().await.insert( + PeerKey::from_accepted(accepted), + PeerSession { + connection, + max_datagram_size: accepted.max_datagram_size, + }, + ); +} + +async fn run_peer_datagrams( + rooms: &Arc>, + sessions: &Arc>>, + accepted: &AcceptedPeer, + connection: &quinn::Connection, +) -> quinn::ConnectionError { + loop { + match connection.read_datagram().await { + Ok(datagram) => { + if let Err(error) = forward_peer_datagram(rooms, sessions, accepted, datagram).await + { + eprintln!( + "failed to forward datagram from peer {} in room {}: {error:#}", + accepted.peer.peer_id(), + accepted.room + ); + } + } + Err(error) => return error, + } + } +} + +async fn forward_peer_datagram( + rooms: &Arc>, + sessions: &Arc>>, + accepted: &AcceptedPeer, + datagram: Bytes, +) -> Result<()> { + let Ok(packet) = decode_datagram(&datagram) else { + return Ok(()); + }; + let header = packet.header(); + + if header.frame_type() != FrameType::Ethernet + || header.room_id() != accepted.welcome.room_id() + || header.peer_id() != accepted.peer.peer_id() + { + return Ok(()); + } + + let decision = rooms.lock().await.forward_ethernet( + &accepted.room, + accepted.peer.peer_id(), + packet.payload(), + )?; + let target_peer_ids = decision.targets().to_vec(); + if target_peer_ids.is_empty() { + return Ok(()); + } + + let outgoing = encode_datagram( + FrameType::Ethernet, + accepted.welcome.room_id(), + accepted.peer.peer_id(), + header.flags(), + packet.payload(), + )?; + let target_sessions = collect_target_sessions(sessions, &accepted.room, &target_peer_ids).await; + + for target in target_sessions { + if outgoing.len() > target.max_datagram_size { + continue; + } + + if let Err(error) = target + .connection + .send_datagram(Bytes::from(outgoing.clone())) + { + eprintln!( + "failed to send datagram from peer {} in room {}: {error}", + accepted.peer.peer_id(), + accepted.room + ); + } + } + + Ok(()) +} + +async fn collect_target_sessions( + sessions: &Arc>>, + room: &RoomCode, + target_peer_ids: &[u32], +) -> Vec { + let sessions = sessions.lock().await; + + target_peer_ids + .iter() + .filter_map(|peer_id| sessions.get(&PeerKey::new(room.clone(), *peer_id)).cloned()) + .collect() +} + async fn build_handshake_response( rooms: &Arc>, connection: &quinn::Connection, @@ -255,7 +422,16 @@ async fn send_control_message(send: &mut SendStream, message: &ControlMessage) - Ok(()) } -async fn leave_peer(rooms: &Arc>, room: &RoomCode, peer_id: u32) -> Result<()> { +async fn leave_peer( + rooms: &Arc>, + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, +) -> Result<()> { + sessions + .lock() + .await + .remove(&PeerKey::new(room.clone(), peer_id)); rooms .lock() .await @@ -305,8 +481,9 @@ mod tests { time::Duration, }; + use bytes::Bytes; use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message}; - use lanparty_proto::MacAddr; + use lanparty_proto::{FrameType, MacAddr, decode_datagram, encode_datagram}; use quinn::{ClientConfig, crypto::rustls::QuicClientConfig}; use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint}; @@ -382,6 +559,83 @@ mod tests { assert_eq!(rooms.lock().await.room_count(), 0); } + #[tokio::test] + async fn forwards_ethernet_datagrams_between_joined_peers() { + let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM); + let rooms = Arc::clone(&server.rooms); + let sessions = Arc::clone(&server.sessions); + let server_addr = server.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let handles = server.accept_many_for_test(2).await.unwrap(); + let mut accepted = Vec::with_capacity(handles.len()); + + for handle in handles { + accepted.push(handle.await.unwrap().unwrap().unwrap()); + } + + server.shutdown("test complete").await; + accepted + }); + + let first_endpoint = client_endpoint(certificate.clone()).unwrap(); + let first_connection = first_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + let second_endpoint = client_endpoint(certificate).unwrap(); + let second_connection = second_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + + let first_mac = client_mac(1); + let second_mac = client_mac(2); + let first_welcome = welcome_for_client(&first_connection, first_mac).await; + let second_welcome = welcome_for_client(&second_connection, second_mac).await; + + let ethernet = ethernet_frame(second_mac, first_mac); + let datagram = encode_datagram( + FrameType::Ethernet, + first_welcome.room_id(), + first_welcome.peer_id(), + 0, + ðernet, + ) + .unwrap(); + first_connection + .send_datagram(Bytes::from(datagram)) + .unwrap(); + + let received = + tokio::time::timeout(Duration::from_secs(5), second_connection.read_datagram()) + .await + .unwrap() + .unwrap(); + let packet = decode_datagram(&received).unwrap(); + let header = packet.header(); + + assert_eq!(header.frame_type(), FrameType::Ethernet); + assert_eq!(header.room_id(), first_welcome.room_id()); + assert_eq!(header.peer_id(), first_welcome.peer_id()); + assert_eq!(packet.payload(), ethernet.as_slice()); + assert_eq!(first_welcome.room_id(), second_welcome.room_id()); + + first_connection.close(0_u32.into(), b"test complete"); + second_connection.close(0_u32.into(), b"test complete"); + first_endpoint.wait_idle().await; + second_endpoint.wait_idle().await; + + let accepted = tokio::time::timeout(Duration::from_secs(5), server_task) + .await + .unwrap() + .unwrap(); + assert_eq!(accepted.len(), 2); + assert_eq!(rooms.lock().await.room_count(), 0); + assert!(sessions.lock().await.is_empty()); + } + fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) { let (server_config, certificate) = development_server_config_with_certificate().unwrap(); let endpoint = Endpoint::server( @@ -430,4 +684,30 @@ mod tests { Ok(decode_control_frame(&response)?) } + + async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome { + let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap(); + let response = request_control_message(connection, ControlMessage::Hello(hello)) + .await + .unwrap(); + + let ControlMessage::Welcome(welcome) = response else { + panic!("expected welcome response"); + }; + + welcome + } + + fn client_mac(last: u8) -> MacAddr { + MacAddr::new([0x02, 0, 0, 0, 0, last]) + } + + fn ethernet_frame(destination: MacAddr, source: MacAddr) -> Vec { + let mut frame = Vec::new(); + frame.extend_from_slice(&destination.octets()); + frame.extend_from_slice(&source.octets()); + frame.extend_from_slice(&0x0800_u16.to_be_bytes()); + frame.extend_from_slice(b"payload"); + frame + } }