diff --git a/Cargo.lock b/Cargo.lock index 66cdfb4..28daab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -473,6 +473,7 @@ name = "lanparty-relay" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "clap", "lanparty-ctrl", "lanparty-obs", diff --git a/README.md b/README.md index b92936c..28a0c58 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Public relay binary and relay-owned room state: - room admission for clients and gateways - one gateway per room, duplicate client MAC rejection, and room limits - stable effective room MTU chosen before Ethernet datagrams flow -- Ethernet datagram forwarding decisions with no ingress reflection +- live Ethernet datagram forwarding with no ingress reflection - peer leave cleanup for room membership and MAC indexes ## Build @@ -63,7 +63,7 @@ cargo run -p lanparty-relay -- --listen 443/udp `--listen` accepts either a socket address or a UDP port shorthand such as `443/udp`. The relay binds a QUIC endpoint, accepts a control-stream `hello`, -and replies with `welcome` or `reject`. Ethernet datagram forwarding is still -implemented as relay-owned decisions but not yet wired to live QUIC datagrams. -It currently uses a generated self-signed development certificate; production -certificate and client trust handling remain future work. +replies with `welcome` or `reject`, and forwards live Ethernet QUIC datagrams +between accepted peers in the same room. It currently uses a generated +self-signed development certificate; production certificate and client trust +handling remain future work. diff --git a/crates/lanparty-relay/Cargo.toml b/crates/lanparty-relay/Cargo.toml index 3e92a88..91871e9 100644 --- a/crates/lanparty-relay/Cargo.toml +++ b/crates/lanparty-relay/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true +bytes.workspace = true clap.workspace = true lanparty-ctrl = { path = "../lanparty-ctrl" } lanparty-obs = { path = "../lanparty-obs" } diff --git a/crates/lanparty-relay/src/server.rs b/crates/lanparty-relay/src/server.rs index 4352606..254311a 100644 --- a/crates/lanparty-relay/src/server.rs +++ b/crates/lanparty-relay/src/server.rs @@ -1,14 +1,17 @@ use std::{net::SocketAddr, sync::Arc}; use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; +use lanparty_proto::{FrameType, decode_datagram, encode_datagram}; use quinn::crypto::rustls::QuicServerConfig; use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use std::collections::HashMap; use tokio::sync::Mutex; use crate::{RelayConfig, RoomRegistry}; @@ -21,6 +24,7 @@ const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MES pub struct RelayServer { endpoint: Endpoint, rooms: Arc>, + sessions: Arc>>, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -32,6 +36,28 @@ struct AcceptedPeer { max_datagram_size: usize, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct PeerKey { + room: RoomCode, + peer_id: u32, +} + +impl PeerKey { + fn new(room: RoomCode, peer_id: u32) -> Self { + Self { room, peer_id } + } + + fn from_accepted(accepted: &AcceptedPeer) -> Self { + Self::new(accepted.room.clone(), accepted.peer.peer_id()) + } +} + +#[derive(Debug, Clone)] +struct PeerSession { + connection: quinn::Connection, + max_datagram_size: usize, +} + impl RelayServer { pub fn bind(config: &RelayConfig) -> Result { let server_config = development_server_config()?; @@ -45,6 +71,7 @@ impl RelayServer { Self { endpoint, rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))), + sessions: Arc::new(Mutex::new(HashMap::new())), } } @@ -57,6 +84,7 @@ impl RelayServer { pub async fn run_until_shutdown(self) -> Result<()> { let endpoint = self.endpoint.clone(); let rooms = Arc::clone(&self.rooms); + let sessions = Arc::clone(&self.sessions); loop { tokio::select! { @@ -70,9 +98,10 @@ impl RelayServer { return Ok(()); }; let rooms = Arc::clone(&rooms); + let sessions = Arc::clone(&sessions); tokio::spawn(async move { - if let Err(error) = handle_incoming_connection(rooms, incoming).await { + if let Err(error) = handle_incoming_connection(rooms, sessions, incoming).await { eprintln!("relay connection failed: {error:#}"); } }); @@ -87,7 +116,33 @@ impl RelayServer { return Ok(None); }; - handle_incoming_connection(Arc::clone(&self.rooms), incoming).await + handle_incoming_connection( + Arc::clone(&self.rooms), + Arc::clone(&self.sessions), + incoming, + ) + .await + } + + #[cfg(test)] + async fn accept_many_for_test( + &self, + count: usize, + ) -> Result>>>> { + let mut handles = Vec::with_capacity(count); + + for _ in 0..count { + let Some(incoming) = self.endpoint.accept().await else { + return Err(anyhow!("relay endpoint stopped while accepting test peer")); + }; + let rooms = Arc::clone(&self.rooms); + let sessions = Arc::clone(&self.sessions); + handles.push(tokio::spawn(async move { + handle_incoming_connection(rooms, sessions, incoming).await + })); + } + + Ok(handles) } pub async fn shutdown(self, reason: &str) { @@ -98,13 +153,14 @@ impl RelayServer { async fn handle_incoming_connection( rooms: Arc>, + sessions: Arc>>, incoming: Incoming, ) -> Result> { let remote_addr = incoming.remote_address(); let connection = incoming .await .with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?; - let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else { + let Some(accepted) = accept_control_handshake(&rooms, &sessions, &connection).await? else { return Ok(None); }; @@ -118,8 +174,8 @@ async fn handle_incoming_connection( accepted.welcome.effective_tap_mtu() ); - let close_reason = connection.closed().await; - leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?; + let close_reason = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await; + leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?; println!( "peer {} left room {}: {}", accepted.peer.peer_id(), @@ -132,6 +188,7 @@ async fn handle_incoming_connection( async fn accept_control_handshake( rooms: &Arc>, + sessions: &Arc>>, connection: &quinn::Connection, ) -> Result> { let (mut send, mut recv) = connection @@ -144,10 +201,13 @@ async fn accept_control_handshake( .context("failed to read relay control hello")?; let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await; + if let Some(accepted) = &accepted { + register_peer(sessions, accepted, connection.clone()).await; + } if let Err(error) = send_control_message(&mut send, &response).await { if let Some(accepted) = &accepted { - leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?; + leave_peer(rooms, sessions, &accepted.room, accepted.peer.peer_id()).await?; } return Err(error); @@ -156,6 +216,113 @@ async fn accept_control_handshake( Ok(accepted) } +async fn register_peer( + sessions: &Arc>>, + accepted: &AcceptedPeer, + connection: quinn::Connection, +) { + sessions.lock().await.insert( + PeerKey::from_accepted(accepted), + PeerSession { + connection, + max_datagram_size: accepted.max_datagram_size, + }, + ); +} + +async fn run_peer_datagrams( + rooms: &Arc>, + sessions: &Arc>>, + accepted: &AcceptedPeer, + connection: &quinn::Connection, +) -> quinn::ConnectionError { + loop { + match connection.read_datagram().await { + Ok(datagram) => { + if let Err(error) = forward_peer_datagram(rooms, sessions, accepted, datagram).await + { + eprintln!( + "failed to forward datagram from peer {} in room {}: {error:#}", + accepted.peer.peer_id(), + accepted.room + ); + } + } + Err(error) => return error, + } + } +} + +async fn forward_peer_datagram( + rooms: &Arc>, + sessions: &Arc>>, + accepted: &AcceptedPeer, + datagram: Bytes, +) -> Result<()> { + let Ok(packet) = decode_datagram(&datagram) else { + return Ok(()); + }; + let header = packet.header(); + + if header.frame_type() != FrameType::Ethernet + || header.room_id() != accepted.welcome.room_id() + || header.peer_id() != accepted.peer.peer_id() + { + return Ok(()); + } + + let decision = rooms.lock().await.forward_ethernet( + &accepted.room, + accepted.peer.peer_id(), + packet.payload(), + )?; + let target_peer_ids = decision.targets().to_vec(); + if target_peer_ids.is_empty() { + return Ok(()); + } + + let outgoing = encode_datagram( + FrameType::Ethernet, + accepted.welcome.room_id(), + accepted.peer.peer_id(), + header.flags(), + packet.payload(), + )?; + let target_sessions = collect_target_sessions(sessions, &accepted.room, &target_peer_ids).await; + + for target in target_sessions { + if outgoing.len() > target.max_datagram_size { + continue; + } + + if let Err(error) = target + .connection + .send_datagram(Bytes::from(outgoing.clone())) + { + eprintln!( + "failed to send datagram from peer {} in room {}: {error}", + accepted.peer.peer_id(), + accepted.room + ); + } + } + + Ok(()) +} + +async fn collect_target_sessions( + sessions: &Arc>>, + room: &RoomCode, + target_peer_ids: &[u32], +) -> Vec { + let sessions = sessions.lock().await; + + target_peer_ids + .iter() + .filter_map(|peer_id| sessions.get(&PeerKey::new(room.clone(), *peer_id)).cloned()) + .collect() +} + async fn build_handshake_response( rooms: &Arc>, connection: &quinn::Connection, @@ -255,7 +422,16 @@ async fn send_control_message(send: &mut SendStream, message: &ControlMessage) - Ok(()) } -async fn leave_peer(rooms: &Arc>, room: &RoomCode, peer_id: u32) -> Result<()> { +async fn leave_peer( + rooms: &Arc>, + sessions: &Arc>>, + room: &RoomCode, + peer_id: u32, +) -> Result<()> { + sessions + .lock() + .await + .remove(&PeerKey::new(room.clone(), peer_id)); rooms .lock() .await @@ -305,8 +481,9 @@ mod tests { time::Duration, }; + use bytes::Bytes; use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message}; - use lanparty_proto::MacAddr; + use lanparty_proto::{FrameType, MacAddr, decode_datagram, encode_datagram}; use quinn::{ClientConfig, crypto::rustls::QuicClientConfig}; use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint}; @@ -382,6 +559,83 @@ mod tests { assert_eq!(rooms.lock().await.room_count(), 0); } + #[tokio::test] + async fn forwards_ethernet_datagrams_between_joined_peers() { + let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM); + let rooms = Arc::clone(&server.rooms); + let sessions = Arc::clone(&server.sessions); + let server_addr = server.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let handles = server.accept_many_for_test(2).await.unwrap(); + let mut accepted = Vec::with_capacity(handles.len()); + + for handle in handles { + accepted.push(handle.await.unwrap().unwrap().unwrap()); + } + + server.shutdown("test complete").await; + accepted + }); + + let first_endpoint = client_endpoint(certificate.clone()).unwrap(); + let first_connection = first_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + let second_endpoint = client_endpoint(certificate).unwrap(); + let second_connection = second_endpoint + .connect(server_addr, "lanparty-relay.local") + .unwrap() + .await + .unwrap(); + + let first_mac = client_mac(1); + let second_mac = client_mac(2); + let first_welcome = welcome_for_client(&first_connection, first_mac).await; + let second_welcome = welcome_for_client(&second_connection, second_mac).await; + + let ethernet = ethernet_frame(second_mac, first_mac); + let datagram = encode_datagram( + FrameType::Ethernet, + first_welcome.room_id(), + first_welcome.peer_id(), + 0, + ðernet, + ) + .unwrap(); + first_connection + .send_datagram(Bytes::from(datagram)) + .unwrap(); + + let received = + tokio::time::timeout(Duration::from_secs(5), second_connection.read_datagram()) + .await + .unwrap() + .unwrap(); + let packet = decode_datagram(&received).unwrap(); + let header = packet.header(); + + assert_eq!(header.frame_type(), FrameType::Ethernet); + assert_eq!(header.room_id(), first_welcome.room_id()); + assert_eq!(header.peer_id(), first_welcome.peer_id()); + assert_eq!(packet.payload(), ethernet.as_slice()); + assert_eq!(first_welcome.room_id(), second_welcome.room_id()); + + first_connection.close(0_u32.into(), b"test complete"); + second_connection.close(0_u32.into(), b"test complete"); + first_endpoint.wait_idle().await; + second_endpoint.wait_idle().await; + + let accepted = tokio::time::timeout(Duration::from_secs(5), server_task) + .await + .unwrap() + .unwrap(); + assert_eq!(accepted.len(), 2); + assert_eq!(rooms.lock().await.room_count(), 0); + assert!(sessions.lock().await.is_empty()); + } + fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) { let (server_config, certificate) = development_server_config_with_certificate().unwrap(); let endpoint = Endpoint::server( @@ -430,4 +684,30 @@ mod tests { Ok(decode_control_frame(&response)?) } + + async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome { + let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap(); + let response = request_control_message(connection, ControlMessage::Hello(hello)) + .await + .unwrap(); + + let ControlMessage::Welcome(welcome) = response else { + panic!("expected welcome response"); + }; + + welcome + } + + fn client_mac(last: u8) -> MacAddr { + MacAddr::new([0x02, 0, 0, 0, 0, last]) + } + + fn ethernet_frame(destination: MacAddr, source: MacAddr) -> Vec { + let mut frame = Vec::new(); + frame.extend_from_slice(&destination.octets()); + frame.extend_from_slice(&source.octets()); + frame.extend_from_slice(&0x0800_u16.to_be_bytes()); + frame.extend_from_slice(b"payload"); + frame + } }