Files
softlan-vpn/crates/lanparty-client-core/src/lib.rs
T
ddidderr 3c1a35ea00 fix(client): enforce virtual MAC before relay send
The relay already rejects client frames whose source MAC does not match the
announced virtual MAC. The Windows bridge can still see those frames from TAP,
though, and sending them to the relay wastes datagram budget and makes the
client-side counters less useful during manual tests.

Carry the configured virtual MAC into ClientRelayIo and drop invalid or
unauthorized TAP source MACs before QUIC DATAGRAM encoding. The relay keeps the
same checks as the trust boundary, but client diagnostics now account for these
drops locally.

Document the local source-MAC check and list InvalidSourceMac as a suspicious
manual-test drop reason.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-client-core connects_to_relay_control_stream_as_client
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md source-MAC authorization and safety filters
2026-05-22 04:58:12 +02:00

1044 lines
34 KiB
Rust

//! Platform-neutral remote client relay session.
//!
//! The Windows binary will own TAP discovery, routing, and adapter I/O. This
//! crate owns the shared relay-facing state machine: connect to the relay,
//! announce the client's virtual MAC, and exchange Ethernet frames as QUIC
//! datagrams after the control-plane welcome.
use std::{
fs,
future::poll_fn,
io::ErrorKind,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, DisconnectReason, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN, RoomCode, ServerWelcome, decode_control_frame,
encode_control_message,
};
use lanparty_obs::{DropReason, QuicDiagnostics, TunnelStats};
use lanparty_proto::{
EthernetFrame, FrameType, MacAddr, decode_datagram, encode_datagram, validate_datagram_budget,
};
use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig};
use rustls::pki_types::CertificateDer;
/// Largest encoded control frame this client reads from a stream: the length
/// prefix plus the largest allowed message body.
const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN;
/// How long `ClientSession::shutdown` waits after sending the disconnect event
/// before it closes the connection.
const DISCONNECT_DRAIN_TIMEOUT: Duration = Duration::from_millis(250);
/// The virtual MAC address a remote client announces to the relay.
///
/// Values built through `ClientIdentity::new` have passed
/// `MacAddr::is_valid_client_identity`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ClientIdentity {
/// MAC address used as this client's identity.
virtual_mac: MacAddr,
}
impl ClientIdentity {
    /// Wraps `virtual_mac` once it has been accepted as a client identity.
    ///
    /// # Errors
    ///
    /// Fails when `MacAddr::is_valid_client_identity` rejects the address.
    pub fn new(virtual_mac: MacAddr) -> Result<Self> {
        if virtual_mac.is_valid_client_identity() {
            Ok(Self { virtual_mac })
        } else {
            bail!("client virtual MAC must be locally administered unicast")
        }
    }

    /// Builds an identity from six random octets.
    ///
    /// The first octet is adjusted so that bit `0x02` is set and bit `0x01`
    /// is cleared before the address is validated by [`ClientIdentity::new`].
    ///
    /// # Errors
    ///
    /// Fails when random bytes cannot be obtained or validation rejects the
    /// resulting address.
    pub fn generate() -> Result<Self> {
        const LOCALLY_ADMINISTERED_BIT: u8 = 0x02;
        const GROUP_BIT: u8 = 0x01;

        let mut random = [0_u8; 6];
        getrandom::fill(&mut random).context("failed to generate client virtual MAC")?;
        random[0] |= LOCALLY_ADMINISTERED_BIT;
        random[0] &= !GROUP_BIT;
        Self::new(MacAddr::new(random))
    }

    /// Returns the virtual MAC address of this identity.
    #[must_use]
    pub const fn virtual_mac(self) -> MacAddr {
        self.virtual_mac
    }
}
/// File-backed storage location for a persisted `ClientIdentity`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientIdentityStore {
/// Path of the JSON identity file; never empty when built via `new`.
path: PathBuf,
}
impl ClientIdentityStore {
pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
if path.as_os_str().is_empty() {
bail!("client identity path cannot be empty");
}
Ok(Self { path })
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
pub fn load_or_create(&self) -> Result<ClientIdentity> {
match fs::read(&self.path) {
Ok(bytes) => read_identity(&bytes)
.with_context(|| format!("failed to read client identity {}", self.path.display())),
Err(error) if error.kind() == ErrorKind::NotFound => {
let identity = ClientIdentity::generate()?;
write_identity(&self.path, identity)?;
Ok(identity)
}
Err(error) => Err(error)
.with_context(|| format!("failed to read client identity {}", self.path.display())),
}
}
}
/// On-disk JSON representation of a client identity.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct ClientIdentityFile {
/// Textual form of the virtual MAC, as produced by `MacAddr::to_string`.
virtual_mac: String,
}
fn read_identity(bytes: &[u8]) -> Result<ClientIdentity> {
let file: ClientIdentityFile =
serde_json::from_slice(bytes).context("failed to parse client identity JSON")?;
let virtual_mac = file
.virtual_mac
.parse()
.context("failed to parse client identity virtual MAC")?;
ClientIdentity::new(virtual_mac)
}
/// Persists `identity` as pretty-printed JSON at `path`.
///
/// The contents are first written to a sibling file named `<file name>.tmp`
/// and then renamed over `path`, so a reader never observes a partially
/// written identity file. Missing parent directories are created.
///
/// If writing or renaming fails, the temporary file is removed on a
/// best-effort basis so failed attempts do not leave stray `.tmp` files.
///
/// # Errors
///
/// Fails when `path` has no file name, the parent directory cannot be
/// created, the identity cannot be encoded, or the temporary file cannot be
/// written or renamed into place.
fn write_identity(path: &Path, identity: ClientIdentity) -> Result<()> {
    let file_name = path
        .file_name()
        .context("client identity path must include a file name")?;
    if let Some(parent) = path
        .parent()
        .filter(|parent| !parent.as_os_str().is_empty())
    {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create identity directory {}", parent.display()))?;
    }

    let mut temp_file_name = file_name.to_os_string();
    temp_file_name.push(".tmp");
    let temp_path = path.with_file_name(temp_file_name);

    let contents = serde_json::to_vec_pretty(&ClientIdentityFile {
        virtual_mac: identity.virtual_mac().to_string(),
    })
    .context("failed to encode client identity JSON")?;

    let persisted = fs::write(&temp_path, contents)
        .with_context(|| format!("failed to write client identity {}", temp_path.display()))
        .and_then(|()| {
            fs::rename(&temp_path, path)
                .with_context(|| format!("failed to persist client identity {}", path.display()))
        });
    if persisted.is_err() {
        // Best-effort cleanup: the original error is what the caller needs to
        // see, so a failure to remove the temporary file is ignored.
        let _ = fs::remove_file(&temp_path);
    }
    persisted
}
/// Validated parameters for one relay session.
///
/// `ClientSessionConfig::new` rejects an empty server name, an empty CA
/// certificate, and any room / MAC / datagram-size combination that
/// `EndpointHello::client` refuses.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSessionConfig {
/// Socket address of the relay to connect to.
relay_addr: SocketAddr,
/// Server name passed to the QUIC endpoint when connecting.
server_name: String,
/// DER bytes of the certificate added as the trusted root for the relay.
relay_ca_cert_der: Vec<u8>,
/// Room announced in the client hello.
room: RoomCode,
/// Virtual MAC announced in the client hello.
virtual_mac: MacAddr,
/// Upper bound the client is willing to use for QUIC datagrams.
max_datagram_size: u16,
}
impl ClientSessionConfig {
    /// Validates and stores the parameters for a relay session.
    ///
    /// # Errors
    ///
    /// Fails when `server_name` is empty or whitespace only, when
    /// `relay_ca_cert_der` is empty, or when `EndpointHello::client` rejects
    /// the room, virtual MAC, and datagram size.
    pub fn new(
        relay_addr: SocketAddr,
        server_name: impl Into<String>,
        relay_ca_cert_der: Vec<u8>,
        room: RoomCode,
        virtual_mac: MacAddr,
        max_datagram_size: u16,
    ) -> Result<Self> {
        let server_name: String = server_name.into();
        let name_is_blank = server_name.trim().is_empty();
        if name_is_blank {
            bail!("relay server name cannot be empty");
        }
        if relay_ca_cert_der.is_empty() {
            bail!("relay CA certificate cannot be empty");
        }
        // Build a throwaway hello purely to reuse its validation of the
        // identity and datagram budget; the value itself is discarded.
        EndpointHello::client(room.clone(), virtual_mac, max_datagram_size)
            .context("invalid client identity or datagram budget")?;

        let config = Self {
            relay_addr,
            server_name,
            relay_ca_cert_der,
            room,
            virtual_mac,
            max_datagram_size,
        };
        Ok(config)
    }

    /// Socket address of the relay.
    #[must_use]
    pub const fn relay_addr(&self) -> SocketAddr {
        self.relay_addr
    }

    /// Server name used when connecting to the relay.
    #[must_use]
    pub fn server_name(&self) -> &str {
        self.server_name.as_str()
    }

    /// DER bytes of the trusted relay CA certificate.
    #[must_use]
    pub fn relay_ca_cert_der(&self) -> &[u8] {
        self.relay_ca_cert_der.as_slice()
    }

    /// Room this client joins.
    #[must_use]
    pub const fn room(&self) -> &RoomCode {
        &self.room
    }

    /// Virtual MAC this client announces.
    #[must_use]
    pub const fn virtual_mac(&self) -> MacAddr {
        self.virtual_mac
    }

    /// Configured upper bound for QUIC datagram size.
    #[must_use]
    pub const fn max_datagram_size(&self) -> u16 {
        self.max_datagram_size
    }
}
/// An established relay session, created by `connect_client` after the relay
/// answered the client hello with a welcome.
#[derive(Debug)]
pub struct ClientSession {
/// Local QUIC endpoint; `shutdown` waits for it to become idle.
endpoint: Endpoint,
/// QUIC connection to the relay.
connection: quinn::Connection,
/// Configuration the session was created from.
config: ClientSessionConfig,
/// Welcome message returned by the relay.
welcome: ServerWelcome,
/// Datagram size that was sent in the client hello.
quic_max_datagram_size: u16,
/// Counters shared with every `ClientRelayIo` handle of this session.
stats: Arc<ClientTunnelStats>,
}
/// An Ethernet frame received from the relay, together with the peer id taken
/// from the datagram header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceivedEthernetFrame {
/// Peer id carried in the datagram header.
source_peer_id: u32,
/// Raw Ethernet frame bytes.
payload: Bytes,
}
impl ReceivedEthernetFrame {
    /// Peer id carried in the header of the datagram that delivered the frame.
    #[must_use]
    pub const fn source_peer_id(&self) -> u32 {
        self.source_peer_id
    }

    /// Raw bytes of the received Ethernet frame.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        self.payload.as_ref()
    }
}
/// Result of trying to forward one Ethernet frame to the relay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientSendOutcome {
/// The frame was handed to the QUIC connection as a datagram.
Sent,
/// The frame was dropped locally for the given reason and never sent.
Dropped(DropReason),
}
impl ClientSession {
    /// Configuration this session was created from.
    #[must_use]
    pub const fn config(&self) -> &ClientSessionConfig {
        &self.config
    }

    /// Welcome message the relay returned during the handshake.
    #[must_use]
    pub const fn welcome(&self) -> &ServerWelcome {
        &self.welcome
    }

    /// Datagram size that was announced to the relay in the client hello.
    #[must_use]
    pub const fn quic_max_datagram_size(&self) -> u16 {
        self.quic_max_datagram_size
    }

    /// Snapshot of QUIC-level diagnostics: datagram support, the datagram
    /// size, and the connection's current RTT estimate in milliseconds.
    #[must_use]
    pub fn quic_diagnostics(&self) -> QuicDiagnostics {
        let relay_rtt_ms = duration_millis_saturating(self.connection.rtt());
        let datagram_size = Some(self.quic_max_datagram_size);
        QuicDiagnostics::new(true, datagram_size).with_relay_rtt_ms(Some(relay_rtt_ms))
    }

    /// Creates a cloneable I/O handle that shares this session's connection
    /// and statistics counters.
    #[must_use]
    pub fn relay_io(&self) -> ClientRelayIo {
        let connection = self.connection.clone();
        let welcome = self.welcome.clone();
        let stats = Arc::clone(&self.stats);
        ClientRelayIo::new(
            connection,
            welcome,
            self.quic_max_datagram_size,
            self.config.virtual_mac(),
            stats,
        )
    }

    /// Sends one Ethernet frame; see [`ClientRelayIo::send_ethernet`].
    pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
        let io = self.relay_io();
        io.send_ethernet(frame)
    }

    /// Sends one Ethernet frame and reports local drops as an outcome; see
    /// [`ClientRelayIo::send_ethernet_with_outcome`].
    pub fn send_ethernet_with_outcome(&self, frame: &[u8]) -> Result<ClientSendOutcome> {
        let io = self.relay_io();
        io.send_ethernet_with_outcome(frame)
    }

    /// Waits for the next acceptable Ethernet frame; see
    /// [`ClientRelayIo::recv_ethernet`].
    pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
        let io = self.relay_io();
        io.recv_ethernet().await
    }

    /// Waits for the next control event the relay sends on a unidirectional
    /// stream.
    pub async fn recv_control_event(&self) -> Result<ControlMessage> {
        recv_control_event(&self.connection).await
    }

    /// Sends the current statistics snapshot to the relay.
    pub async fn send_stats_snapshot(&self) -> Result<()> {
        let io = self.relay_io();
        io.send_stats_snapshot().await
    }

    /// Returns the current values of the session's counters.
    #[must_use]
    pub fn stats_snapshot(&self) -> TunnelStats {
        self.stats.snapshot()
    }

    /// Announces a disconnect, closes the connection with `reason`, and waits
    /// for the endpoint to become idle.
    ///
    /// The drain delay is only applied when the disconnect event was sent
    /// successfully.
    pub async fn shutdown(self, reason: &str) {
        let disconnect_sent = send_disconnect(&self.connection, reason).await.is_ok();
        if disconnect_sent {
            drain_disconnect().await;
        }
        let close_code = 0_u32.into();
        self.connection.close(close_code, reason.as_bytes());
        self.endpoint.wait_idle().await;
    }
}
/// Cloneable handle for exchanging Ethernet frames with the relay.
///
/// Handles are created by `ClientSession::relay_io` and share the session's
/// connection and statistics counters.
#[derive(Debug, Clone)]
pub struct ClientRelayIo {
/// QUIC connection to the relay.
connection: quinn::Connection,
/// Welcome message; supplies the room id and peer id used in datagram headers.
welcome: ServerWelcome,
/// Datagram budget that outgoing datagrams are validated against.
quic_max_datagram_size: u16,
/// Only frames whose source MAC equals this address are sent.
virtual_mac: MacAddr,
/// Counters shared with the owning session.
stats: Arc<ClientTunnelStats>,
}
impl ClientRelayIo {
#[must_use]
fn new(
connection: quinn::Connection,
welcome: ServerWelcome,
quic_max_datagram_size: u16,
virtual_mac: MacAddr,
stats: Arc<ClientTunnelStats>,
) -> Self {
Self {
connection,
welcome,
quic_max_datagram_size,
virtual_mac,
stats,
}
}
#[must_use]
pub const fn welcome(&self) -> &ServerWelcome {
&self.welcome
}
#[must_use]
pub const fn quic_max_datagram_size(&self) -> u16 {
self.quic_max_datagram_size
}
#[must_use]
pub const fn virtual_mac(&self) -> MacAddr {
self.virtual_mac
}
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
match self.send_ethernet_with_outcome(frame)? {
ClientSendOutcome::Sent => Ok(()),
ClientSendOutcome::Dropped(DropReason::DatagramBudget) => {
bail!("client Ethernet datagram exceeds negotiated QUIC budget")
}
ClientSendOutcome::Dropped(DropReason::Malformed) => {
bail!("client Ethernet frame is malformed")
}
ClientSendOutcome::Dropped(drop_reason) => {
bail!("client Ethernet frame was dropped: {drop_reason:?}")
}
}
}
pub fn send_ethernet_with_outcome(&self, frame: &[u8]) -> Result<ClientSendOutcome> {
let ethernet_frame = match EthernetFrame::parse(frame) {
Ok(frame) => frame,
Err(_) => {
self.stats.record_malformed_frame();
return Ok(ClientSendOutcome::Dropped(DropReason::Malformed));
}
};
if ethernet_frame.is_jumbo() {
self.stats.record_dropped_frame();
return Ok(ClientSendOutcome::Dropped(DropReason::JumboFrame));
}
if !ethernet_frame.source().is_valid_unicast() {
self.stats.record_dropped_frame();
return Ok(ClientSendOutcome::Dropped(DropReason::InvalidSourceMac));
}
if ethernet_frame.source() != self.virtual_mac {
self.stats.record_dropped_frame();
return Ok(ClientSendOutcome::Dropped(
DropReason::UnauthorizedSourceMac,
));
}
let datagram = encode_datagram(
FrameType::Ethernet,
self.welcome.room_id(),
self.welcome.peer_id(),
0,
frame,
)
.context("failed to encode client Ethernet datagram")?;
if validate_datagram_budget(datagram.len(), usize::from(self.quic_max_datagram_size))
.is_err()
{
self.stats.record_dropped_frame();
return Ok(ClientSendOutcome::Dropped(DropReason::DatagramBudget));
}
self.connection
.send_datagram(Bytes::from(datagram))
.context("failed to send client Ethernet datagram")?;
self.stats.record_ethernet_tx(ethernet_frame);
Ok(ClientSendOutcome::Sent)
}
pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
loop {
let datagram = self.connection.read_datagram().await?;
self.stats.record_datagram_rx();
let Ok(packet) = decode_datagram(&datagram) else {
self.stats.record_malformed_frame();
continue;
};
let header = packet.header();
if header.frame_type() != FrameType::Ethernet
|| header.room_id() != self.welcome.room_id()
|| header.peer_id() == self.welcome.peer_id()
{
self.stats.record_dropped_frame();
continue;
}
let ethernet_frame = match EthernetFrame::parse(packet.payload()) {
Ok(frame) => frame,
Err(_) => {
self.stats.record_malformed_frame();
continue;
}
};
self.stats.record_ethernet_rx(ethernet_frame);
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
});
}
}
#[must_use]
pub fn stats_snapshot(&self) -> TunnelStats {
self.stats.snapshot()
}
pub async fn send_stats_snapshot(&self) -> Result<()> {
let stats = self.stats.snapshot();
send_control_event(&self.connection, ControlMessage::Stats(stats)).await
}
}
/// Atomic counters describing the traffic of one client session.
///
/// All counters start at zero (via `Default`) and are only ever incremented.
#[derive(Debug, Default)]
struct ClientTunnelStats {
/// Ethernet frames sent to the relay.
ethernet_frames_tx: AtomicU64,
/// Ethernet frames accepted from the relay.
ethernet_frames_rx: AtomicU64,
/// Sent frames for which `EthernetFrame::is_broadcast` was true.
broadcast_frames_tx: AtomicU64,
/// Accepted frames for which `EthernetFrame::is_broadcast` was true.
broadcast_frames_rx: AtomicU64,
/// Datagrams sent; incremented together with `ethernet_frames_tx`.
datagrams_tx: AtomicU64,
/// Datagrams read from the connection, including ones later discarded.
datagrams_rx: AtomicU64,
/// Frames discarded for any reason, malformed ones included.
dropped_frames: AtomicU64,
/// Frames or datagrams discarded because they could not be parsed.
malformed_frames: AtomicU64,
}
impl ClientTunnelStats {
fn record_ethernet_tx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_tx.fetch_add(1, Ordering::Relaxed);
}
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
}
fn record_ethernet_rx(&self, frame: EthernetFrame<'_>) {
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
if frame.is_broadcast() {
self.broadcast_frames_rx.fetch_add(1, Ordering::Relaxed);
}
}
fn record_datagram_rx(&self) {
self.datagrams_rx.fetch_add(1, Ordering::Relaxed);
}
fn record_dropped_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
}
fn record_malformed_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
self.malformed_frames.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> TunnelStats {
TunnelStats::new(
self.ethernet_frames_tx.load(Ordering::Relaxed),
self.ethernet_frames_rx.load(Ordering::Relaxed),
self.datagrams_tx.load(Ordering::Relaxed),
self.datagrams_rx.load(Ordering::Relaxed),
self.dropped_frames.load(Ordering::Relaxed),
self.malformed_frames.load(Ordering::Relaxed),
)
.with_broadcast_frames(
self.broadcast_frames_tx.load(Ordering::Relaxed),
self.broadcast_frames_rx.load(Ordering::Relaxed),
)
}
}
/// Connects to the relay described by `config` and performs the client
/// handshake.
///
/// The hello announces the smaller of the configured datagram size and the
/// size reported by the QUIC connection.
///
/// # Errors
///
/// Fails when the TLS configuration cannot be built, the endpoint cannot be
/// bound, the connection cannot be established, the relay does not support
/// QUIC datagrams, or the relay answers the hello with anything other than a
/// welcome.
pub async fn connect_client(config: ClientSessionConfig) -> Result<ClientSession> {
    let client_config = relay_client_config(config.relay_ca_cert_der())?;
    let relay_addr = config.relay_addr();

    let mut endpoint = Endpoint::client(client_bind_addr(relay_addr))
        .context("failed to bind client QUIC endpoint")?;
    endpoint.set_default_client_config(client_config);

    let connecting = endpoint.connect(relay_addr, config.server_name())?;
    let connection = connecting
        .await
        .with_context(|| format!("failed to connect to relay {relay_addr}"))?;

    let peer_datagram_size = connection
        .max_datagram_size()
        .context("relay did not negotiate QUIC DATAGRAM support")?;
    let quic_max_datagram_size =
        negotiated_quic_datagram_size(config.max_datagram_size(), peer_datagram_size);

    let hello = EndpointHello::client(
        config.room().clone(),
        config.virtual_mac(),
        quic_max_datagram_size,
    )
    .context("failed to build client hello")?;

    let welcome = match request_control_message(&connection, ControlMessage::Hello(hello)).await? {
        ControlMessage::Welcome(welcome) => welcome,
        ControlMessage::Reject(reject) => bail!(
            "relay rejected client hello: {:?}: {}",
            reject.reason(),
            reject.message()
        ),
        other => bail!("relay sent unexpected client handshake response: {other:?}"),
    };

    Ok(ClientSession {
        endpoint,
        connection,
        config,
        welcome,
        quic_max_datagram_size,
        stats: Arc::default(),
    })
}
/// Returns the smaller of the locally configured datagram size and the size
/// reported by the peer.
///
/// A peer value that does not fit in `u16` cannot be the smaller one, so the
/// configured size is returned in that case.
#[must_use]
fn negotiated_quic_datagram_size(configured: u16, peer: usize) -> u16 {
    match u16::try_from(peer) {
        Ok(peer) => configured.min(peer),
        Err(_) => configured,
    }
}
/// Builds the QUIC client configuration that trusts `relay_ca_cert_der` as a
/// root certificate and offers the relay ALPN protocol.
///
/// # Errors
///
/// Fails when the certificate cannot be added to the root store or the TLS
/// configuration cannot be converted for QUIC use.
fn relay_client_config(relay_ca_cert_der: &[u8]) -> Result<ClientConfig> {
    let trusted_ca = CertificateDer::from(relay_ca_cert_der.to_vec());
    let mut roots = rustls::RootCertStore::empty();
    roots
        .add(trusted_ca)
        .context("failed to trust relay CA certificate")?;

    let mut tls = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    tls.alpn_protocols = vec![RELAY_ALPN.to_vec()];

    let quic_crypto =
        QuicClientConfig::try_from(tls).context("failed to build QUIC client config")?;
    Ok(ClientConfig::new(Arc::new(quic_crypto)))
}
/// Picks the local bind address for the client endpoint: the unspecified
/// address of the same IP family as `relay_addr`, with port 0.
fn client_bind_addr(relay_addr: SocketAddr) -> SocketAddr {
    let unspecified = if relay_addr.is_ipv4() {
        IpAddr::V4(Ipv4Addr::UNSPECIFIED)
    } else {
        IpAddr::V6(Ipv6Addr::UNSPECIFIED)
    };
    SocketAddr::new(unspecified, 0)
}
/// Converts `duration` to whole milliseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_millis_saturating(duration: Duration) -> u64 {
    let millis = duration.as_millis();
    match u64::try_from(millis) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
/// Sends `message` on a new bidirectional stream and returns the single
/// control message the relay writes back before finishing its side.
///
/// Every step carries error context, matching the other control-stream
/// helpers in this file, so a failed handshake names the step that broke.
///
/// # Errors
///
/// Fails when the stream cannot be opened, the request cannot be encoded or
/// written, or the response cannot be read (including responses longer than
/// `MAX_CONTROL_FRAME_LEN`) or decoded.
async fn request_control_message(
    connection: &quinn::Connection,
    message: ControlMessage,
) -> Result<ControlMessage> {
    let (mut send, mut recv) = connection
        .open_bi()
        .await
        .context("failed to open relay control request stream")?;
    let request =
        encode_control_message(&message).context("failed to encode relay control request")?;
    send.write_all(&request)
        .await
        .context("failed to write relay control request")?;
    // Finishing the send side tells the relay the request is complete.
    send.finish()
        .context("failed to finish relay control request stream")?;
    let response = recv
        .read_to_end(MAX_CONTROL_FRAME_LEN)
        .await
        .context("failed to read relay control response")?;
    decode_control_frame(&response).context("failed to decode relay control response")
}
/// Accepts the next unidirectional stream from the relay and decodes the
/// control message it carries.
///
/// # Errors
///
/// Fails when no stream can be accepted, or the event cannot be read
/// (including events longer than `MAX_CONTROL_FRAME_LEN`) or decoded.
async fn recv_control_event(connection: &quinn::Connection) -> Result<ControlMessage> {
    let accepted = connection.accept_uni().await;
    let mut event_stream = accepted.context("failed to accept relay control event stream")?;

    let read = event_stream.read_to_end(MAX_CONTROL_FRAME_LEN).await;
    let event_bytes = read.context("failed to read relay control event")?;

    decode_control_frame(&event_bytes).context("failed to decode relay control event")
}
/// Sends `message` to the relay on a new unidirectional stream and finishes
/// the stream.
///
/// # Errors
///
/// Fails when the stream cannot be opened, the message cannot be encoded or
/// written, or the stream cannot be finished.
async fn send_control_event(connection: &quinn::Connection, message: ControlMessage) -> Result<()> {
    let mut send = connection
        .open_uni()
        .await
        .context("failed to open client control event stream")?;
    let frame =
        encode_control_message(&message).context("failed to encode client control event")?;
    send.write_all(&frame)
        .await
        .context("failed to write client control event")?;
    // Context added here too, so every step of this helper reports which
    // stage failed.
    send.finish()
        .context("failed to finish client control event stream")?;
    Ok(())
}
/// Tells the relay that this client is disconnecting normally, with `message`
/// as the human-readable explanation.
async fn send_disconnect(connection: &quinn::Connection, message: &str) -> Result<()> {
    let farewell = ControlMessage::Disconnect {
        reason: DisconnectReason::Normal,
        message: message.to_owned(),
    };
    send_control_event(connection, farewell).await
}
/// Waits for `DISCONNECT_DRAIN_TIMEOUT` using quinn's default runtime timer.
///
/// Returns immediately when quinn reports no default runtime.
async fn drain_disconnect() {
    if let Some(runtime) = quinn::default_runtime() {
        let deadline = runtime.now() + DISCONNECT_DRAIN_TIMEOUT;
        let mut timer = runtime.new_timer(deadline);
        poll_fn(|cx| timer.as_mut().poll(cx)).await;
    }
}
#[cfg(test)]
mod tests {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use lanparty_ctrl::{PeerInfo, Role};
use quinn::{ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig};
use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer};
use super::*;
#[test]
fn validates_client_config() {
    // One valid baseline; each later case breaks exactly one argument.
    let room = RoomCode::new("ROOM1").unwrap();
    let cert = vec![1, 2, 3];
    let mac = MacAddr::new([0x02, 0, 0, 0, 0, 1]);
    let build = |server_name: &str, cert: Vec<u8>, mac: MacAddr| {
        ClientSessionConfig::new(
            "127.0.0.1:443".parse().unwrap(),
            server_name,
            cert,
            room.clone(),
            mac,
            1400,
        )
    };

    assert!(build("relay.local", cert.clone(), mac).is_ok());
    // Empty server name.
    assert!(build("", cert.clone(), mac).is_err());
    // Empty CA certificate.
    assert!(build("relay.local", Vec::new(), mac).is_err());
    // Broadcast MAC is not a valid client identity.
    assert!(build("relay.local", cert, MacAddr::BROADCAST).is_err());
}
#[test]
fn generates_valid_client_identity() {
    // A freshly generated identity must pass the same check `new` applies.
    let generated_mac = ClientIdentity::generate().unwrap().virtual_mac();
    assert!(generated_mac.is_valid_client_identity());
}
#[test]
fn clamps_negotiated_quic_datagram_size() {
    // (configured, peer, expected)
    let cases: [(u16, usize, u16); 3] = [
        (1400, 1350, 1350),
        (1300, 1400, 1300),
        (u16::MAX, usize::MAX, u16::MAX),
    ];
    for (configured, peer, expected) in cases {
        assert_eq!(negotiated_quic_datagram_size(configured, peer), expected);
    }
}
#[test]
fn identity_store_creates_and_reuses_mac() {
    let identity_path = unique_temp_identity_path();
    let store = ClientIdentityStore::new(&identity_path).unwrap();

    // The first call creates the file; the second must load the same identity.
    let created = store.load_or_create().unwrap();
    let reloaded = store.load_or_create().unwrap();
    let file_text = std::fs::read_to_string(&identity_path).unwrap();

    assert!(identity_path.exists());
    assert!(created.virtual_mac().is_valid_client_identity());
    assert_eq!(created, reloaded);
    let mac_text = created.virtual_mac().to_string();
    assert!(file_text.contains(&mac_text));

    std::fs::remove_file(identity_path).unwrap();
}
#[test]
fn identity_store_rejects_invalid_saved_mac() {
    // A stored broadcast MAC parses as JSON but must be refused as an identity.
    let invalid_identity = r#"{
"virtual_mac": "ff:ff:ff:ff:ff:ff"
}"#;
    let identity_path = unique_temp_identity_path();
    std::fs::write(&identity_path, invalid_identity).unwrap();

    let store = ClientIdentityStore::new(&identity_path).unwrap();
    let loaded = store.load_or_create();
    assert!(loaded.is_err());

    std::fs::remove_file(identity_path).unwrap();
}
// End-to-end test: drives `connect_client` against an in-process QUIC server
// that plays the relay. It covers the hello/welcome handshake, one Ethernet
// datagram in each direction, a relay control event, local drop accounting,
// the stats upload, and the disconnect event sent by `shutdown`.
#[tokio::test]
async fn connects_to_relay_control_stream_as_client() {
let (server_config, certificate) = test_server_config();
let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()).unwrap();
let server_addr = endpoint.local_addr().unwrap();
// Signalled by the server once it has received and checked the stats event.
let (stats_received_tx, stats_received_rx) = tokio::sync::oneshot::channel();
// Fake relay: each step below mirrors one client action further down.
let server_task = tokio::spawn(async move {
let incoming = endpoint.accept().await.unwrap();
let connection = incoming.await.unwrap();
// Handshake: read the client hello from the bidirectional stream.
let (mut send, mut recv) = connection.accept_bi().await.unwrap();
let request = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
let message = decode_control_frame(&request).unwrap();
let ControlMessage::Hello(hello) = message else {
panic!("expected client hello");
};
assert_eq!(hello.role(), Role::Client);
assert_eq!(hello.room().as_str(), "ROOM1");
assert_eq!(
hello.announced_mac(),
Some(MacAddr::new([0x02, 0, 0, 0, 0, 1]))
);
// Reply with a welcome; the client assertions below expect room id 7 and
// peer id 2 from it.
let response = encode_control_message(&ControlMessage::Welcome(
ServerWelcome::new(7, 2, 1200).unwrap(),
))
.unwrap();
send.write_all(&response).await.unwrap();
send.finish().unwrap();
// The client's first Ethernet frame must arrive with the welcome's ids.
let datagram = connection.read_datagram().await.unwrap();
let packet = decode_datagram(&datagram).unwrap();
let header = packet.header();
assert_eq!(header.frame_type(), FrameType::Ethernet);
assert_eq!(header.room_id(), 7);
assert_eq!(header.peer_id(), 2);
assert_eq!(packet.payload(), ethernet_frame(b"to relay").as_slice());
// Send a frame back as peer 1 so the client does not treat it as its own.
let response =
encode_datagram(FrameType::Ethernet, 7, 1, 0, &ethernet_frame(b"from relay"))
.unwrap();
connection.send_datagram(Bytes::from(response)).unwrap();
// Push a control event on a unidirectional stream.
let event = encode_control_message(&ControlMessage::PeerJoined(
PeerInfo::new(1, Role::Gateway, None).unwrap(),
))
.unwrap();
let mut event_send = connection.open_uni().await.unwrap();
event_send.write_all(&event).await.unwrap();
event_send.finish().unwrap();
// Receive the client's stats upload.
let mut stats_recv = connection.accept_uni().await.unwrap();
let stats_frame = stats_recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
let stats_message = decode_control_frame(&stats_frame).unwrap();
let ControlMessage::Stats(stats) = stats_message else {
panic!("expected client stats event");
};
// Argument order follows `ClientTunnelStats::snapshot`: frames tx, frames rx,
// datagrams tx, datagrams rx, dropped, malformed. The four drops are the four
// rejected sends below; the malformed one is counted in both totals.
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 4, 1));
stats_received_tx.send(()).unwrap();
// Receive the disconnect event sent by `shutdown`.
let mut disconnect_recv = connection.accept_uni().await.unwrap();
let disconnect_frame = disconnect_recv
.read_to_end(MAX_CONTROL_FRAME_LEN)
.await
.unwrap();
let disconnect_message = decode_control_frame(&disconnect_frame).unwrap();
let ControlMessage::Disconnect { reason, message } = disconnect_message else {
panic!("expected client disconnect event");
};
assert_eq!(reason, DisconnectReason::Normal);
assert_eq!(message, "test complete");
connection.closed().await;
endpoint.close(0_u32.into(), b"test complete");
endpoint.wait_idle().await;
});
// Client side: connect with the MAC that `ethernet_frame` uses as source.
let config = ClientSessionConfig::new(
server_addr,
"lanparty-relay.local",
certificate.as_ref().to_vec(),
RoomCode::new("ROOM1").unwrap(),
MacAddr::new([0x02, 0, 0, 0, 0, 1]),
1400,
)
.unwrap();
let client = connect_client(config).await.unwrap();
assert_eq!(
client.config().virtual_mac(),
MacAddr::new([0x02, 0, 0, 0, 0, 1])
);
assert_eq!(client.welcome().room_id(), 7);
assert_eq!(client.welcome().peer_id(), 2);
assert!(!client.welcome().gateway_connected());
// The negotiated size can only shrink relative to the configured 1400.
assert!(client.quic_max_datagram_size() <= 1400);
assert!(client.quic_diagnostics().datagram_supported());
assert_eq!(
client.quic_diagnostics().max_datagram_size(),
Some(client.quic_max_datagram_size())
);
assert!(client.quic_diagnostics().relay_rtt_ms().is_some());
// The I/O handle must mirror the session's welcome, budget, and virtual MAC.
let relay_io = client.relay_io();
assert_eq!(relay_io.welcome().peer_id(), 2);
assert_eq!(
relay_io.quic_max_datagram_size(),
client.quic_max_datagram_size()
);
assert_eq!(relay_io.virtual_mac(), client.config().virtual_mac());
// Happy path: one frame out, one frame in, one control event.
assert_eq!(
relay_io
.send_ethernet_with_outcome(&ethernet_frame(b"to relay"))
.unwrap(),
ClientSendOutcome::Sent
);
let received = tokio::time::timeout(Duration::from_secs(5), relay_io.recv_ethernet())
.await
.unwrap()
.unwrap();
assert_eq!(received.source_peer_id(), 1);
assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice());
let event = tokio::time::timeout(Duration::from_secs(5), client.recv_control_event())
.await
.unwrap()
.unwrap();
let ControlMessage::PeerJoined(peer) = event else {
panic!("expected peer joined event");
};
assert_eq!(peer.peer_id(), 1);
assert_eq!(peer.role(), Role::Gateway);
// Drop 1: the payload alone already fills the whole datagram budget.
let oversized_payload = vec![0; usize::from(relay_io.quic_max_datagram_size())];
assert_eq!(
relay_io
.send_ethernet_with_outcome(&ethernet_frame(&oversized_payload))
.unwrap(),
ClientSendOutcome::Dropped(DropReason::DatagramBudget)
);
// Drop 2: four bytes cannot be parsed as an Ethernet frame.
assert_eq!(
relay_io.send_ethernet_with_outcome(&[0; 4]).unwrap(),
ClientSendOutcome::Dropped(DropReason::Malformed)
);
// Drop 3: source MAC rejected by `is_valid_unicast` (first octet 0x03).
assert_eq!(
relay_io
.send_ethernet_with_outcome(&ethernet_frame_from(
MacAddr::new([0x03, 0, 0, 0, 0, 9]),
b"invalid source"
))
.unwrap(),
ClientSendOutcome::Dropped(DropReason::InvalidSourceMac)
);
// Drop 4: valid unicast source that differs from the announced virtual MAC.
assert_eq!(
relay_io
.send_ethernet_with_outcome(&ethernet_frame_from(
MacAddr::new([0x02, 0, 0, 0, 0, 9]),
b"forged source"
))
.unwrap(),
ClientSendOutcome::Dropped(DropReason::UnauthorizedSourceMac)
);
// Local counters must reflect one frame each way plus the four drops.
let stats = relay_io.stats_snapshot();
assert_eq!(stats.ethernet_frames_tx(), 1);
assert_eq!(stats.ethernet_frames_rx(), 1);
assert_eq!(stats.broadcast_frames_tx(), 0);
assert_eq!(stats.broadcast_frames_rx(), 0);
assert_eq!(stats.datagrams_tx(), 1);
assert_eq!(stats.datagrams_rx(), 1);
assert_eq!(stats.dropped_frames(), 4);
assert_eq!(stats.malformed_frames(), 1);
// The session and its I/O handle share the same counters.
assert_eq!(client.stats_snapshot(), stats);
client.send_stats_snapshot().await.unwrap();
// Wait until the server has verified the stats before disconnecting.
tokio::time::timeout(Duration::from_secs(5), stats_received_rx)
.await
.unwrap()
.unwrap();
client.shutdown("test complete").await;
// Joining the server task surfaces any assertion failure on the relay side.
tokio::time::timeout(Duration::from_secs(5), server_task)
.await
.unwrap()
.unwrap();
}
#[test]
fn snapshots_client_tunnel_stats() {
    let stats = ClientTunnelStats::default();
    let tx_bytes = broadcast_ethernet_frame(b"broadcast tx");
    let rx_bytes = broadcast_ethernet_frame(b"broadcast rx");

    // One event of every kind; the malformed frame also counts as dropped.
    stats.record_ethernet_tx(EthernetFrame::parse(&tx_bytes).unwrap());
    stats.record_datagram_rx();
    stats.record_ethernet_rx(EthernetFrame::parse(&rx_bytes).unwrap());
    stats.record_dropped_frame();
    stats.record_malformed_frame();

    let totals = stats.snapshot();
    assert_eq!(totals.ethernet_frames_tx(), 1);
    assert_eq!(totals.ethernet_frames_rx(), 1);
    assert_eq!(totals.broadcast_frames_tx(), 1);
    assert_eq!(totals.broadcast_frames_rx(), 1);
    assert_eq!(totals.datagrams_tx(), 1);
    assert_eq!(totals.datagrams_rx(), 1);
    assert_eq!(totals.dropped_frames(), 2);
    assert_eq!(totals.malformed_frames(), 1);
}
#[test]
fn converts_relay_rtt_to_milliseconds() {
    // Sub-millisecond durations truncate to zero; whole milliseconds pass through.
    let cases = [
        (Duration::from_micros(999), 0_u64),
        (Duration::from_millis(23), 23_u64),
    ];
    for (duration, expected_ms) in cases {
        assert_eq!(duration_millis_saturating(duration), expected_ms);
    }
}
/// Builds a QUIC server configuration with a fresh self-signed certificate
/// for `lanparty-relay.local`, and returns that certificate so the client can
/// trust it.
fn test_server_config() -> (ServerConfig, CertificateDer<'static>) {
    const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024;

    let certified_key =
        rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]).unwrap();
    let certificate = certified_key.cert.der().clone();
    let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(
        certified_key.signing_key.serialize_der(),
    ));

    let mut tls_config = rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate.clone()], private_key)
        .unwrap();
    tls_config.alpn_protocols = vec![RELAY_ALPN.to_vec()];
    let quic_crypto = QuicServerConfig::try_from(tls_config).unwrap();

    let mut transport = TransportConfig::default();
    transport.datagram_receive_buffer_size(Some(DATAGRAM_BUFFER_BYTES));
    transport.datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES);

    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_crypto));
    server_config.transport_config(Arc::new(transport));
    (server_config, certificate)
}
/// Builds a test frame whose source is the MAC the tests announce as the
/// client's virtual MAC.
fn ethernet_frame(payload: &[u8]) -> Vec<u8> {
    let client_mac = MacAddr::new([0x02, 0, 0, 0, 0, 1]);
    ethernet_frame_from(client_mac, payload)
}
/// Builds a test frame: fixed destination `02:00:00:00:00:02`, the given
/// `source`, type field `0x0800`, then `payload`.
fn ethernet_frame_from(source: MacAddr, payload: &[u8]) -> Vec<u8> {
    const DESTINATION: [u8; 6] = [0x02, 0, 0, 0, 0, 2];
    const ETHERTYPE: u16 = 0x0800;

    let mut bytes = Vec::new();
    bytes.extend_from_slice(&DESTINATION);
    bytes.extend_from_slice(&source.octets());
    bytes.extend_from_slice(&ETHERTYPE.to_be_bytes());
    bytes.extend_from_slice(payload);
    bytes
}
/// Builds a test frame addressed to the broadcast MAC, with source
/// `02:00:00:00:00:01`, type field `0x0800`, then `payload`.
fn broadcast_ethernet_frame(payload: &[u8]) -> Vec<u8> {
    const SOURCE: [u8; 6] = [0x02, 0, 0, 0, 0, 1];
    const ETHERTYPE: u16 = 0x0800;

    let mut bytes = Vec::new();
    bytes.extend_from_slice(&MacAddr::BROADCAST.octets());
    bytes.extend_from_slice(&SOURCE);
    bytes.extend_from_slice(&ETHERTYPE.to_be_bytes());
    bytes.extend_from_slice(payload);
    bytes
}
/// Returns a path in the system temp directory whose file name combines the
/// process id with the current time in nanoseconds since the Unix epoch.
fn unique_temp_identity_path() -> std::path::PathBuf {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let file_name = format!(
        "lanparty-client-identity-{pid}-{nanos}.json",
        pid = std::process::id(),
        nanos = since_epoch.as_nanos(),
    );
    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
}