feat(relay): forward live Ethernet datagrams

The relay now keeps active peer sessions alongside room admission state. After
a successful hello/welcome handshake, the connection enters a datagram loop
and stays registered until the QUIC connection closes.

Incoming datagrams are only considered for forwarding when their overlay room
id, peer id, and Ethernet frame type match the peer assigned by the relay.
The relay then reuses the existing room forwarding decision logic, clones the
matching live target sessions, and sends a relay-stamped Ethernet datagram to
each connected target that can carry the frame.

This keeps spoofable wire metadata out of the trust boundary: clients can put
whatever they want in an overlay header, but the relay forwards using the
room and peer identity established during the control handshake.

Test Plan:
- cargo fmt --check
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings

Refs: PLAN.md QUIC DATAGRAM Ethernet forwarding path
This commit is contained in:
2026-05-21 17:55:58 +02:00
parent b8ae95a911
commit 756523927a
4 changed files with 295 additions and 13 deletions
+288 -8
View File
@@ -1,14 +1,17 @@
use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, Result, anyhow};
use bytes::Bytes;
use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, ControlMessage, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, PeerInfo, Reject, RejectReason, Role, RoomCode, ServerWelcome,
decode_control_frame, encode_control_message,
};
use lanparty_proto::{FrameType, decode_datagram, encode_datagram};
use quinn::crypto::rustls::QuicServerConfig;
use quinn::{Endpoint, Incoming, SendStream, ServerConfig, TransportConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::collections::HashMap;
use tokio::sync::Mutex;
use crate::{RelayConfig, RoomRegistry};
@@ -21,6 +24,7 @@ const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MES
pub struct RelayServer {
endpoint: Endpoint,
rooms: Arc<Mutex<RoomRegistry>>,
sessions: Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -32,6 +36,28 @@ struct AcceptedPeer {
max_datagram_size: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PeerKey {
room: RoomCode,
peer_id: u32,
}
impl PeerKey {
fn new(room: RoomCode, peer_id: u32) -> Self {
Self { room, peer_id }
}
fn from_accepted(accepted: &AcceptedPeer) -> Self {
Self::new(accepted.room.clone(), accepted.peer.peer_id())
}
}
#[derive(Debug, Clone)]
struct PeerSession {
connection: quinn::Connection,
max_datagram_size: usize,
}
impl RelayServer {
pub fn bind(config: &RelayConfig) -> Result<Self> {
let server_config = development_server_config()?;
@@ -45,6 +71,7 @@ impl RelayServer {
Self {
endpoint,
rooms: Arc::new(Mutex::new(RoomRegistry::new(max_clients_per_room))),
sessions: Arc::new(Mutex::new(HashMap::new())),
}
}
@@ -57,6 +84,7 @@ impl RelayServer {
pub async fn run_until_shutdown(self) -> Result<()> {
let endpoint = self.endpoint.clone();
let rooms = Arc::clone(&self.rooms);
let sessions = Arc::clone(&self.sessions);
loop {
tokio::select! {
@@ -70,9 +98,10 @@ impl RelayServer {
return Ok(());
};
let rooms = Arc::clone(&rooms);
let sessions = Arc::clone(&sessions);
tokio::spawn(async move {
if let Err(error) = handle_incoming_connection(rooms, incoming).await {
if let Err(error) = handle_incoming_connection(rooms, sessions, incoming).await {
eprintln!("relay connection failed: {error:#}");
}
});
@@ -87,7 +116,33 @@ impl RelayServer {
return Ok(None);
};
handle_incoming_connection(Arc::clone(&self.rooms), incoming).await
handle_incoming_connection(
Arc::clone(&self.rooms),
Arc::clone(&self.sessions),
incoming,
)
.await
}
#[cfg(test)]
async fn accept_many_for_test(
&self,
count: usize,
) -> Result<Vec<tokio::task::JoinHandle<Result<Option<AcceptedPeer>>>>> {
let mut handles = Vec::with_capacity(count);
for _ in 0..count {
let Some(incoming) = self.endpoint.accept().await else {
return Err(anyhow!("relay endpoint stopped while accepting test peer"));
};
let rooms = Arc::clone(&self.rooms);
let sessions = Arc::clone(&self.sessions);
handles.push(tokio::spawn(async move {
handle_incoming_connection(rooms, sessions, incoming).await
}));
}
Ok(handles)
}
pub async fn shutdown(self, reason: &str) {
@@ -98,13 +153,14 @@ impl RelayServer {
async fn handle_incoming_connection(
rooms: Arc<Mutex<RoomRegistry>>,
sessions: Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
incoming: Incoming,
) -> Result<Option<AcceptedPeer>> {
let remote_addr = incoming.remote_address();
let connection = incoming
.await
.with_context(|| format!("failed to establish QUIC connection from {remote_addr}"))?;
let Some(accepted) = accept_control_handshake(&rooms, &connection).await? else {
let Some(accepted) = accept_control_handshake(&rooms, &sessions, &connection).await? else {
return Ok(None);
};
@@ -118,8 +174,8 @@ async fn handle_incoming_connection(
accepted.welcome.effective_tap_mtu()
);
let close_reason = connection.closed().await;
leave_peer(&rooms, &accepted.room, accepted.peer.peer_id()).await?;
let close_reason = run_peer_datagrams(&rooms, &sessions, &accepted, &connection).await;
leave_peer(&rooms, &sessions, &accepted.room, accepted.peer.peer_id()).await?;
println!(
"peer {} left room {}: {}",
accepted.peer.peer_id(),
@@ -132,6 +188,7 @@ async fn handle_incoming_connection(
async fn accept_control_handshake(
rooms: &Arc<Mutex<RoomRegistry>>,
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
connection: &quinn::Connection,
) -> Result<Option<AcceptedPeer>> {
let (mut send, mut recv) = connection
@@ -144,10 +201,13 @@ async fn accept_control_handshake(
.context("failed to read relay control hello")?;
let (accepted, response) = build_handshake_response(rooms, connection, frame.as_slice()).await;
if let Some(accepted) = &accepted {
register_peer(sessions, accepted, connection.clone()).await;
}
if let Err(error) = send_control_message(&mut send, &response).await {
if let Some(accepted) = &accepted {
leave_peer(rooms, &accepted.room, accepted.peer.peer_id()).await?;
leave_peer(rooms, sessions, &accepted.room, accepted.peer.peer_id()).await?;
}
return Err(error);
@@ -156,6 +216,113 @@ async fn accept_control_handshake(
Ok(accepted)
}
async fn register_peer(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
connection: quinn::Connection,
) {
sessions.lock().await.insert(
PeerKey::from_accepted(accepted),
PeerSession {
connection,
max_datagram_size: accepted.max_datagram_size,
},
);
}
async fn run_peer_datagrams(
rooms: &Arc<Mutex<RoomRegistry>>,
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
connection: &quinn::Connection,
) -> quinn::ConnectionError {
loop {
match connection.read_datagram().await {
Ok(datagram) => {
if let Err(error) = forward_peer_datagram(rooms, sessions, accepted, datagram).await
{
eprintln!(
"failed to forward datagram from peer {} in room {}: {error:#}",
accepted.peer.peer_id(),
accepted.room
);
}
}
Err(error) => return error,
}
}
}
async fn forward_peer_datagram(
rooms: &Arc<Mutex<RoomRegistry>>,
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
accepted: &AcceptedPeer,
datagram: Bytes,
) -> Result<()> {
let Ok(packet) = decode_datagram(&datagram) else {
return Ok(());
};
let header = packet.header();
if header.frame_type() != FrameType::Ethernet
|| header.room_id() != accepted.welcome.room_id()
|| header.peer_id() != accepted.peer.peer_id()
{
return Ok(());
}
let decision = rooms.lock().await.forward_ethernet(
&accepted.room,
accepted.peer.peer_id(),
packet.payload(),
)?;
let target_peer_ids = decision.targets().to_vec();
if target_peer_ids.is_empty() {
return Ok(());
}
let outgoing = encode_datagram(
FrameType::Ethernet,
accepted.welcome.room_id(),
accepted.peer.peer_id(),
header.flags(),
packet.payload(),
)?;
let target_sessions = collect_target_sessions(sessions, &accepted.room, &target_peer_ids).await;
for target in target_sessions {
if outgoing.len() > target.max_datagram_size {
continue;
}
if let Err(error) = target
.connection
.send_datagram(Bytes::from(outgoing.clone()))
{
eprintln!(
"failed to send datagram from peer {} in room {}: {error}",
accepted.peer.peer_id(),
accepted.room
);
}
}
Ok(())
}
async fn collect_target_sessions(
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
target_peer_ids: &[u32],
) -> Vec<PeerSession> {
let sessions = sessions.lock().await;
target_peer_ids
.iter()
.filter_map(|peer_id| sessions.get(&PeerKey::new(room.clone(), *peer_id)).cloned())
.collect()
}
async fn build_handshake_response(
rooms: &Arc<Mutex<RoomRegistry>>,
connection: &quinn::Connection,
@@ -255,7 +422,16 @@ async fn send_control_message(send: &mut SendStream, message: &ControlMessage) -
Ok(())
}
async fn leave_peer(rooms: &Arc<Mutex<RoomRegistry>>, room: &RoomCode, peer_id: u32) -> Result<()> {
async fn leave_peer(
rooms: &Arc<Mutex<RoomRegistry>>,
sessions: &Arc<Mutex<HashMap<PeerKey, PeerSession>>>,
room: &RoomCode,
peer_id: u32,
) -> Result<()> {
sessions
.lock()
.await
.remove(&PeerKey::new(room.clone(), peer_id));
rooms
.lock()
.await
@@ -305,8 +481,9 @@ mod tests {
time::Duration,
};
use bytes::Bytes;
use lanparty_ctrl::{RoomCode, decode_control_frame, encode_control_message};
use lanparty_proto::MacAddr;
use lanparty_proto::{FrameType, MacAddr, decode_datagram, encode_datagram};
use quinn::{ClientConfig, crypto::rustls::QuicClientConfig};
use crate::{DEFAULT_MAX_CLIENTS_PER_ROOM, ListenEndpoint};
@@ -382,6 +559,83 @@ mod tests {
assert_eq!(rooms.lock().await.room_count(), 0);
}
#[tokio::test]
async fn forwards_ethernet_datagrams_between_joined_peers() {
let (server, certificate) = bind_test_server(DEFAULT_MAX_CLIENTS_PER_ROOM);
let rooms = Arc::clone(&server.rooms);
let sessions = Arc::clone(&server.sessions);
let server_addr = server.local_addr().unwrap();
let server_task = tokio::spawn(async move {
let handles = server.accept_many_for_test(2).await.unwrap();
let mut accepted = Vec::with_capacity(handles.len());
for handle in handles {
accepted.push(handle.await.unwrap().unwrap().unwrap());
}
server.shutdown("test complete").await;
accepted
});
let first_endpoint = client_endpoint(certificate.clone()).unwrap();
let first_connection = first_endpoint
.connect(server_addr, "lanparty-relay.local")
.unwrap()
.await
.unwrap();
let second_endpoint = client_endpoint(certificate).unwrap();
let second_connection = second_endpoint
.connect(server_addr, "lanparty-relay.local")
.unwrap()
.await
.unwrap();
let first_mac = client_mac(1);
let second_mac = client_mac(2);
let first_welcome = welcome_for_client(&first_connection, first_mac).await;
let second_welcome = welcome_for_client(&second_connection, second_mac).await;
let ethernet = ethernet_frame(second_mac, first_mac);
let datagram = encode_datagram(
FrameType::Ethernet,
first_welcome.room_id(),
first_welcome.peer_id(),
0,
&ethernet,
)
.unwrap();
first_connection
.send_datagram(Bytes::from(datagram))
.unwrap();
let received =
tokio::time::timeout(Duration::from_secs(5), second_connection.read_datagram())
.await
.unwrap()
.unwrap();
let packet = decode_datagram(&received).unwrap();
let header = packet.header();
assert_eq!(header.frame_type(), FrameType::Ethernet);
assert_eq!(header.room_id(), first_welcome.room_id());
assert_eq!(header.peer_id(), first_welcome.peer_id());
assert_eq!(packet.payload(), ethernet.as_slice());
assert_eq!(first_welcome.room_id(), second_welcome.room_id());
first_connection.close(0_u32.into(), b"test complete");
second_connection.close(0_u32.into(), b"test complete");
first_endpoint.wait_idle().await;
second_endpoint.wait_idle().await;
let accepted = tokio::time::timeout(Duration::from_secs(5), server_task)
.await
.unwrap()
.unwrap();
assert_eq!(accepted.len(), 2);
assert_eq!(rooms.lock().await.room_count(), 0);
assert!(sessions.lock().await.is_empty());
}
fn bind_test_server(max_clients_per_room: usize) -> (RelayServer, CertificateDer<'static>) {
let (server_config, certificate) = development_server_config_with_certificate().unwrap();
let endpoint = Endpoint::server(
@@ -430,4 +684,30 @@ mod tests {
Ok(decode_control_frame(&response)?)
}
async fn welcome_for_client(connection: &quinn::Connection, mac: MacAddr) -> ServerWelcome {
let hello = EndpointHello::client(RoomCode::new("TESTROOM").unwrap(), mac, 1400).unwrap();
let response = request_control_message(connection, ControlMessage::Hello(hello))
.await
.unwrap();
let ControlMessage::Welcome(welcome) = response else {
panic!("expected welcome response");
};
welcome
}
fn client_mac(last: u8) -> MacAddr {
MacAddr::new([0x02, 0, 0, 0, 0, last])
}
fn ethernet_frame(destination: MacAddr, source: MacAddr) -> Vec<u8> {
let mut frame = Vec::new();
frame.extend_from_slice(&destination.octets());
frame.extend_from_slice(&source.octets());
frame.extend_from_slice(&0x0800_u16.to_be_bytes());
frame.extend_from_slice(b"payload");
frame
}
}