Files
softlan-vpn/crates/lanparty-gateway/src/lib.rs
T
ddidderr bdb571799a feat(net): accept relay hostnames
PLAN.md describes the first client flow as entering a relay domain and room
code, but the client and gateway CLIs only accepted socket-address literals.
Add a small shared RelayEndpoint parser so bare hosts default to UDP/443 while
IP literals and explicit host:port values stay supported.

The runtime configs still store resolved SocketAddr values. That keeps the
Windows route-pinning path on a concrete relay IP before TAP activation while
avoiding duplicated endpoint grammar between client and gateway. The relay
listen config reuses the same default port constant so UDP/443 has one source.

README examples now use lanparty-relay.local and document the shared endpoint
syntax.

Test Plan:
- cargo fmt --check
- cargo test -p lanparty-net
- cargo test -p lanparty-client-win \
  accepts_relay_domain_with_default_port -- --nocapture
- cargo test -p lanparty-gateway \
  accepts_iface_alias_for_gateway_interface -- --nocapture
- cargo test -p lanparty-net -p lanparty-client-win -p lanparty-gateway
- cargo clippy -p lanparty-net -p lanparty-client-win -p lanparty-gateway \
  --all-targets -- -D warnings
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings
- git diff --check

Refs: PLAN.md
2026-05-21 21:40:00 +02:00

1110 lines
37 KiB
Rust

//! Linux LAN gateway control-plane connection.
//!
//! This crate owns the gateway binary's relay connection and Linux AF_PACKET
//! bridge loop that moves Ethernet frames between the relay and wired LAN.
#[cfg(target_os = "linux")]
mod packet;
#[cfg(target_os = "linux")]
use std::collections::BTreeMap;
use std::{
fs,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::PathBuf,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use anyhow::{Context, Result, bail};
use bytes::Bytes;
use clap::Parser;
use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, DisconnectReason, EndpointHello,
MAX_CONTROL_MESSAGE_LEN, PeerInfo, RELAY_ALPN, Role, RoomCode, ServerWelcome,
decode_control_frame, encode_control_message,
};
use lanparty_net::RelayEndpoint;
use lanparty_obs::TunnelStats;
#[cfg(target_os = "linux")]
use lanparty_obs::{FrameAction, FrameDirection, FrameLog};
use lanparty_proto::{
EthernetFrame, FrameType, MAX_STANDARD_ETHERNET_FRAME_LEN, MacAddr, decode_datagram,
encode_datagram,
};
use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig};
use rustls::pki_types::CertificateDer;
#[cfg(target_os = "linux")]
use tokio::io::unix::AsyncFd;
#[cfg(target_os = "linux")]
pub use packet::PacketSocket;
// Read limit for one control frame taken from a stream: the length prefix plus
// the largest control message.
const MAX_CONTROL_FRAME_LEN: usize = CONTROL_LENGTH_PREFIX_LEN + MAX_CONTROL_MESSAGE_LEN;
// How long shutdown waits for the connection to report closed after a
// disconnect event was sent, before the gateway closes it locally.
const DISCONNECT_DRAIN_TIMEOUT: Duration = Duration::from_millis(250);
#[cfg(target_os = "linux")]
// Period of the timer that writes CAM refresh frames to the LAN.
const CAM_REFRESH_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(target_os = "linux")]
// Period of the timer that sends a stats snapshot to the relay.
const GATEWAY_STATS_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(target_os = "linux")]
// Local experimental EtherType for frames whose only job is switch MAC learning.
const CAM_REFRESH_ETHERTYPE: u16 = 0x88b5;
#[cfg(target_os = "linux")]
// Payload bytes placed directly after the EtherType in every CAM refresh frame.
const CAM_REFRESH_PAYLOAD: &[u8] = b"lanparty-cam-refresh";
#[cfg(target_os = "linux")]
// Length CAM refresh frames are zero-padded to (header and payload, no FCS).
const MIN_ETHERNET_FRAME_WITHOUT_FCS: usize = 60;
// Command-line arguments of the `lanparty-gateway` binary, parsed by clap.
// With clap's derive API the `///` comments on the fields below become each
// option's help text, so treat them as user-facing strings.
// Convert the parsed arguments with `GatewayArgs::into_config`.
#[derive(Debug, Parser)]
#[command(
name = "lanparty-gateway",
about = "Linux LAN gateway for the LAN party L2 tunnel"
)]
pub struct GatewayArgs {
/// Relay DNS name or UDP socket address; bare hosts default to UDP/443.
#[arg(long, value_name = "HOST[:PORT]")]
relay: RelayEndpoint,
/// TLS server name expected in the relay certificate.
#[arg(long, default_value = "lanparty-relay.local")]
server_name: String,
/// DER-encoded relay CA/certificate to trust.
#[arg(long, value_name = "PATH")]
relay_ca_cert: PathBuf,
/// Room code to join as the LAN gateway.
#[arg(long)]
room: RoomCode,
/// Wired LAN interface that will later be opened with AF_PACKET.
// `--iface` is accepted as an alias for `--interface`.
#[arg(long, alias = "iface")]
interface: String,
/// Gateway's advertised QUIC datagram budget before relay clamping.
#[arg(long, default_value_t = 1400)]
max_datagram_size: u16,
}
impl GatewayArgs {
pub fn into_config(self) -> Result<GatewayConfig> {
let relay_ca_cert = fs::read(&self.relay_ca_cert).with_context(|| {
format!(
"failed to read relay CA certificate {}",
self.relay_ca_cert.display()
)
})?;
let relay_addr = self
.relay
.resolve()
.with_context(|| format!("failed to resolve relay endpoint {}", self.relay))?;
GatewayConfig::new(
relay_addr,
self.server_name,
relay_ca_cert,
self.room,
self.interface,
self.max_datagram_size,
)
}
}
/// Validated settings for one gateway-to-relay session.
///
/// Values are checked by [`GatewayConfig::new`] and exposed through read-only
/// accessors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GatewayConfig {
/// Resolved relay socket address the gateway connects to.
relay_addr: SocketAddr,
/// Server name passed to the QUIC connect call; never blank.
server_name: String,
/// DER bytes of the relay CA/certificate to trust; never empty.
relay_ca_cert_der: Vec<u8>,
/// Room code sent in the gateway hello.
room: RoomCode,
/// LAN interface name; never blank.
interface: String,
/// Datagram budget advertised in the gateway hello (capped by what the
/// connection reports at connect time).
max_datagram_size: u16,
}
impl GatewayConfig {
pub fn new(
relay_addr: SocketAddr,
server_name: impl Into<String>,
relay_ca_cert_der: Vec<u8>,
room: RoomCode,
interface: impl Into<String>,
max_datagram_size: u16,
) -> Result<Self> {
let server_name = server_name.into();
if server_name.trim().is_empty() {
bail!("relay server name cannot be empty");
}
if relay_ca_cert_der.is_empty() {
bail!("relay CA certificate cannot be empty");
}
let interface = interface.into();
if interface.trim().is_empty() {
bail!("gateway interface cannot be empty");
}
EndpointHello::gateway(room.clone(), max_datagram_size)
.context("invalid gateway datagram budget")?;
Ok(Self {
relay_addr,
server_name,
relay_ca_cert_der,
room,
interface,
max_datagram_size,
})
}
#[must_use]
pub const fn relay_addr(&self) -> SocketAddr {
self.relay_addr
}
#[must_use]
pub fn server_name(&self) -> &str {
&self.server_name
}
#[must_use]
pub fn relay_ca_cert_der(&self) -> &[u8] {
&self.relay_ca_cert_der
}
#[must_use]
pub const fn room(&self) -> &RoomCode {
&self.room
}
#[must_use]
pub fn interface(&self) -> &str {
&self.interface
}
#[must_use]
pub const fn max_datagram_size(&self) -> u16 {
self.max_datagram_size
}
}
/// An established gateway session with the relay, as returned by
/// `connect_gateway` after the relay answered the hello with a welcome.
#[derive(Debug)]
pub struct GatewayConnection {
// QUIC endpoint the connection was opened from; awaited with `wait_idle`
// during shutdown.
endpoint: Endpoint,
// The QUIC connection to the relay.
connection: quinn::Connection,
// Configuration the connection was created from.
config: GatewayConfig,
// Welcome message from the relay; supplies the room id and peer id used
// when encoding and filtering datagrams.
welcome: ServerWelcome,
// Tunnel counters, reported through `stats_snapshot`.
stats: Arc<GatewayTunnelStats>,
}
/// An Ethernet frame received from the relay together with the id of the peer
/// named in the datagram header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceivedEthernetFrame {
/// Peer id taken from the datagram header.
source_peer_id: u32,
/// Raw Ethernet frame bytes copied out of the datagram payload.
payload: Bytes,
}
impl ReceivedEthernetFrame {
    /// Id of the peer named in the datagram header that carried this frame.
    #[must_use]
    pub const fn source_peer_id(&self) -> u32 {
        let Self { source_peer_id, .. } = self;
        *source_peer_id
    }

    /// Raw Ethernet frame bytes.
    #[must_use]
    pub fn payload(&self) -> &[u8] {
        &self.payload[..]
    }
}
impl GatewayConnection {
    /// Returns the configuration this connection was created from.
    #[must_use]
    pub const fn config(&self) -> &GatewayConfig {
        &self.config
    }

    /// Returns the welcome message the relay sent during the handshake.
    #[must_use]
    pub const fn welcome(&self) -> &ServerWelcome {
        &self.welcome
    }

    /// Validates `frame` as Ethernet and sends it to the relay as one datagram.
    ///
    /// # Errors
    ///
    /// Fails when the frame is malformed, cannot be encoded, or the datagram
    /// cannot be sent.
    pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
        send_gateway_ethernet(&self.connection, &self.welcome, &self.stats, frame)
    }

    /// Waits for the next acceptable Ethernet frame from the relay.
    ///
    /// Datagrams that are malformed, of another frame type, for another room,
    /// or carrying this gateway's own peer id are counted and skipped.
    ///
    /// # Errors
    ///
    /// Fails when reading a datagram from the connection fails.
    pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
        recv_gateway_ethernet(&self.connection, &self.welcome, &self.stats).await
    }

    /// Accepts the next unidirectional stream from the relay and decodes the
    /// control message it carries.
    ///
    /// # Errors
    ///
    /// Fails when no stream can be accepted, read, or decoded.
    pub async fn recv_control_event(&self) -> Result<ControlMessage> {
        recv_gateway_control_event(&self.connection).await
    }

    /// Returns the current tunnel counters.
    #[must_use]
    pub fn stats_snapshot(&self) -> TunnelStats {
        self.stats.snapshot()
    }

    /// Sends the current tunnel counters to the relay as a stats control event.
    ///
    /// # Errors
    ///
    /// Fails when the control stream cannot be opened or written.
    pub async fn send_stats_snapshot(&self) -> Result<()> {
        send_gateway_stats(&self.connection, self.stats.snapshot()).await
    }

    /// Bridges Ethernet frames between `packet_socket` and the relay until
    /// Ctrl-C is received or an error occurs.
    ///
    /// Besides forwarding frames in both directions, the loop periodically
    /// writes CAM refresh frames for known remote clients to the LAN, sends
    /// stats snapshots to the relay, and logs control events from the relay.
    /// On Ctrl-C it sends a disconnect event, waits up to
    /// `DISCONNECT_DRAIN_TIMEOUT` for the connection to close, closes it, and
    /// waits for the endpoint to go idle.
    ///
    /// # Errors
    ///
    /// Returns the first error from registering the socket, waiting for
    /// Ctrl-C, reading or writing LAN frames, sending or receiving relay
    /// frames, or reading a control event. Failures while sending stats or the
    /// final disconnect are only logged to stderr.
    #[cfg(target_os = "linux")]
    pub async fn bridge_until_shutdown(self, packet_socket: PacketSocket) -> Result<()> {
        let mut cam_refresh = CamRefresh::new(packet_socket.interface_mac());
        let mut cam_refresh_tick = tokio::time::interval_at(
            tokio::time::Instant::now() + CAM_REFRESH_INTERVAL,
            CAM_REFRESH_INTERVAL,
        );
        cam_refresh_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        let packet_socket = AsyncFd::new(packet_socket)
            .context("failed to register AF_PACKET socket with Tokio")?;
        let Self {
            endpoint,
            connection,
            welcome,
            stats,
            ..
        } = self;
        let mut stats_tick = tokio::time::interval_at(
            tokio::time::Instant::now() + GATEWAY_STATS_INTERVAL,
            GATEWAY_STATS_INTERVAL,
        );
        stats_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        // Create the Ctrl-C future once and poll the same instance on every
        // iteration. A future created anew inside `select!` only observes
        // signals delivered after its own first poll, so a Ctrl-C arriving
        // while another branch body was still awaiting would be lost.
        let mut shutdown_signal = std::pin::pin!(tokio::signal::ctrl_c());

        loop {
            tokio::select! {
                shutdown = &mut shutdown_signal => {
                    shutdown.context("failed to wait for Ctrl-C")?;
                    if let Err(error) =
                        send_gateway_disconnect(&connection, "gateway shutting down").await
                    {
                        eprintln!("failed to send gateway disconnect to relay: {error:#}");
                    }
                    // Give the relay a moment to close the connection itself;
                    // the timeout result is irrelevant because we close below
                    // either way.
                    let _ = tokio::time::timeout(
                        DISCONNECT_DRAIN_TIMEOUT,
                        connection.closed(),
                    )
                    .await;
                    connection.close(0_u32.into(), b"gateway shutting down");
                    endpoint.wait_idle().await;
                    return Ok(());
                }
                lan_frame = read_lan_ethernet(&packet_socket) => {
                    let lan_frame = lan_frame?;
                    // NOTE(review): any send failure ends the bridge loop,
                    // including a frame the connection refuses to carry as a
                    // datagram. Confirm that is intended.
                    send_gateway_ethernet(&connection, &welcome, &stats, &lan_frame)?;
                    println!(
                        "{}",
                        gateway_frame_log_line(
                            packet_socket.get_ref().interface(),
                            FrameDirection::LanToRemote,
                            Some(welcome.peer_id()),
                            &lan_frame,
                            FrameAction::Forwarded,
                            None,
                        )
                    );
                }
                relay_frame = recv_gateway_ethernet(&connection, &welcome, &stats) => {
                    let relay_frame = relay_frame?;
                    // Remember the sender's MAC so later refresh rounds cover it.
                    cam_refresh
                        .observe_remote_frame(relay_frame.source_peer_id(), relay_frame.payload())?;
                    write_lan_ethernet(&packet_socket, relay_frame.payload()).await?;
                    println!(
                        "{}",
                        gateway_frame_log_line(
                            packet_socket.get_ref().interface(),
                            FrameDirection::RemoteToLan,
                            Some(relay_frame.source_peer_id()),
                            relay_frame.payload(),
                            FrameAction::Forwarded,
                            None,
                        )
                    );
                }
                _ = cam_refresh_tick.tick() => {
                    for frame in cam_refresh.refresh_frames() {
                        write_lan_ethernet(&packet_socket, &frame).await?;
                    }
                }
                _ = stats_tick.tick() => {
                    if let Err(error) = send_gateway_stats(&connection, stats.snapshot()).await {
                        eprintln!("failed to send gateway stats to relay: {error:#}");
                    }
                }
                control_stream = connection.accept_uni() => {
                    let control_stream = control_stream
                        .context("failed to accept gateway control event stream")?;
                    let control_event = read_gateway_control_event(control_stream).await?;
                    cam_refresh.observe_control_event(&control_event);
                    println!("{}", format_gateway_control_event(&control_event));
                }
            }
        }
    }

    /// Shuts the connection down, preferring a relay-acknowledged close.
    ///
    /// Sends a disconnect event carrying `reason`. If that succeeds and the
    /// connection closes within `DISCONNECT_DRAIN_TIMEOUT`, only the endpoint
    /// is drained. Otherwise the connection is closed locally with `reason`
    /// before draining.
    pub async fn shutdown(self, reason: &str) {
        if send_gateway_disconnect(&self.connection, reason)
            .await
            .is_ok()
            && tokio::time::timeout(DISCONNECT_DRAIN_TIMEOUT, self.connection.closed())
                .await
                .is_ok()
        {
            self.endpoint.wait_idle().await;
            return;
        }
        self.connection.close(0_u32.into(), reason.as_bytes());
        self.endpoint.wait_idle().await;
    }
}
/// Validates `frame`, wraps it in an Ethernet datagram addressed with the
/// room and peer ids from `welcome`, and sends it over `connection`.
///
/// A frame that fails to parse is counted as malformed. A successful send is
/// counted as one Ethernet frame and one datagram transmitted.
///
/// # Errors
///
/// Fails when the frame is malformed, encoding fails, or the datagram cannot
/// be sent.
fn send_gateway_ethernet(
    connection: &quinn::Connection,
    welcome: &ServerWelcome,
    stats: &GatewayTunnelStats,
    frame: &[u8],
) -> Result<()> {
    EthernetFrame::parse(frame)
        .map_err(|parse_error| {
            stats.record_malformed_frame();
            parse_error
        })
        .context("gateway Ethernet frame is malformed")?;

    let room_id = welcome.room_id();
    let peer_id = welcome.peer_id();
    let encoded = encode_datagram(FrameType::Ethernet, room_id, peer_id, 0, frame)
        .context("failed to encode gateway Ethernet datagram")?;

    let outgoing = Bytes::from(encoded);
    connection
        .send_datagram(outgoing)
        .context("failed to send gateway Ethernet datagram")?;

    stats.record_ethernet_tx();
    Ok(())
}
/// Reads datagrams from `connection` until one carries an acceptable
/// Ethernet frame, then returns that frame with its sender's peer id.
///
/// Every datagram read is counted. Datagrams that fail to decode or whose
/// payload is not a valid Ethernet frame are counted as malformed. Datagrams
/// with another frame type, another room id, or this gateway's own peer id
/// are counted as dropped.
///
/// # Errors
///
/// Fails when reading a datagram from the connection fails.
async fn recv_gateway_ethernet(
    connection: &quinn::Connection,
    welcome: &ServerWelcome,
    stats: &GatewayTunnelStats,
) -> Result<ReceivedEthernetFrame> {
    loop {
        let raw = connection.read_datagram().await?;
        stats.record_datagram_rx();

        let packet = match decode_datagram(&raw) {
            Ok(packet) => packet,
            Err(_) => {
                stats.record_malformed_frame();
                continue;
            }
        };

        let header = packet.header();
        let from_other_peer_in_room = header.frame_type() == FrameType::Ethernet
            && header.room_id() == welcome.room_id()
            && header.peer_id() != welcome.peer_id();
        if !from_other_peer_in_room {
            stats.record_dropped_frame();
            continue;
        }

        let payload = packet.payload();
        if EthernetFrame::parse(payload).is_err() {
            stats.record_malformed_frame();
            continue;
        }

        stats.record_ethernet_rx();
        return Ok(ReceivedEthernetFrame {
            source_peer_id: header.peer_id(),
            payload: Bytes::copy_from_slice(payload),
        });
    }
}
/// Sends `stats` to the relay as a stats control event on a new
/// unidirectional stream.
async fn send_gateway_stats(connection: &quinn::Connection, stats: TunnelStats) -> Result<()> {
    let event = ControlMessage::Stats(stats);
    send_gateway_control_event(connection, event, "gateway stats").await
}
/// Sends a normal-disconnect control event carrying `message` to the relay.
async fn send_gateway_disconnect(connection: &quinn::Connection, message: &str) -> Result<()> {
    let event = ControlMessage::Disconnect {
        reason: DisconnectReason::Normal,
        message: message.to_owned(),
    };
    send_gateway_control_event(connection, event, "gateway disconnect").await
}
/// Sends one control message to the relay on a fresh unidirectional stream.
///
/// `context` is a short label for the message kind (for example
/// `"gateway stats"`) that is woven into every error message.
///
/// # Errors
///
/// Fails when the stream cannot be opened, the message cannot be encoded,
/// the frame cannot be written, or the stream cannot be finished.
async fn send_gateway_control_event(
    connection: &quinn::Connection,
    message: ControlMessage,
    context: &str,
) -> Result<()> {
    let mut send = connection
        .open_uni()
        .await
        .with_context(|| format!("failed to open {context} stream"))?;
    let frame =
        encode_control_message(&message).with_context(|| format!("failed to encode {context}"))?;
    send.write_all(&frame)
        .await
        .with_context(|| format!("failed to write {context}"))?;
    // Label this step like the others so a failure names the message kind.
    send.finish()
        .with_context(|| format!("failed to finish {context} stream"))?;
    Ok(())
}
/// Counters describing the gateway's tunnel traffic.
///
/// Each counter is an atomic, so the struct is updated through `&self`.
#[derive(Debug, Default)]
struct GatewayTunnelStats {
// Ethernet frames sent to the relay.
ethernet_frames_tx: AtomicU64,
// Ethernet frames accepted from the relay.
ethernet_frames_rx: AtomicU64,
// Datagrams sent to the relay; incremented together with `ethernet_frames_tx`.
datagrams_tx: AtomicU64,
// Datagrams read from the relay, whether or not they were accepted.
datagrams_rx: AtomicU64,
// Frames discarded for any reason; malformed frames are included here too.
dropped_frames: AtomicU64,
// Frames or datagrams that failed to parse.
malformed_frames: AtomicU64,
}
impl GatewayTunnelStats {
fn record_ethernet_tx(&self) {
self.ethernet_frames_tx.fetch_add(1, Ordering::Relaxed);
self.datagrams_tx.fetch_add(1, Ordering::Relaxed);
}
fn record_ethernet_rx(&self) {
self.ethernet_frames_rx.fetch_add(1, Ordering::Relaxed);
}
fn record_datagram_rx(&self) {
self.datagrams_rx.fetch_add(1, Ordering::Relaxed);
}
fn record_dropped_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
}
fn record_malformed_frame(&self) {
self.dropped_frames.fetch_add(1, Ordering::Relaxed);
self.malformed_frames.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> TunnelStats {
TunnelStats::new(
self.ethernet_frames_tx.load(Ordering::Relaxed),
self.ethernet_frames_rx.load(Ordering::Relaxed),
self.datagrams_tx.load(Ordering::Relaxed),
self.datagrams_rx.load(Ordering::Relaxed),
self.dropped_frames.load(Ordering::Relaxed),
self.malformed_frames.load(Ordering::Relaxed),
)
}
}
/// Accepts the next unidirectional stream on `connection` and decodes the
/// control message it carries.
///
/// # Errors
///
/// Fails when no stream can be accepted or its content cannot be read or
/// decoded.
async fn recv_gateway_control_event(connection: &quinn::Connection) -> Result<ControlMessage> {
    let incoming = connection.accept_uni().await;
    let stream = incoming.context("failed to accept gateway control event stream")?;
    read_gateway_control_event(stream).await
}
/// Reads `recv` to its end (at most `MAX_CONTROL_FRAME_LEN` bytes) and
/// decodes the bytes as one control frame.
///
/// # Errors
///
/// Fails when the stream cannot be read within the size limit or the bytes
/// do not decode.
async fn read_gateway_control_event(mut recv: quinn::RecvStream) -> Result<ControlMessage> {
    let read_result = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await;
    let raw_frame = read_result.context("failed to read gateway control event")?;
    decode_control_frame(&raw_frame).context("failed to decode gateway control event")
}
/// Renders a control event as one human-readable log line.
///
/// Peer-joined events are worded by role (a client line includes its MAC, or
/// `unknown` when none was announced). Peer-left events include the reason.
/// Anything else falls back to the event's `Debug` output.
fn format_gateway_control_event(event: &ControlMessage) -> String {
    match event {
        ControlMessage::PeerJoined(peer) => {
            if peer.role() == Role::Client {
                let mac = peer
                    .mac()
                    .map_or_else(|| "unknown".to_string(), |mac| mac.to_string());
                let peer_id = peer.peer_id();
                format!("gateway control event: client peer {peer_id} joined with MAC {mac}")
            } else if peer.role() == Role::Gateway {
                let peer_id = peer.peer_id();
                format!("gateway control event: LAN gateway peer {peer_id} joined")
            } else {
                let peer_id = peer.peer_id();
                format!("gateway control event: peer {peer_id} joined")
            }
        }
        ControlMessage::PeerLeft { peer_id, reason } => {
            format!("gateway control event: peer {peer_id} left ({reason:?})")
        }
        _ => format!("gateway control event: {event:?}"),
    }
}
#[cfg(target_os = "linux")]
/// Builds the single-line log entry describing one frame crossing the gateway.
///
/// The frame is parsed to extract MAC addresses and EtherType. If parsing
/// fails, a malformed-frame log entry is built instead, and in that case
/// `action` and `drop_reason` are not passed on. Fields the entry does not
/// have are printed as `-`.
fn gateway_frame_log_line(
    interface: &str,
    direction: FrameDirection,
    peer_id: Option<u32>,
    frame_bytes: &[u8],
    action: FrameAction,
    drop_reason: Option<lanparty_obs::DropReason>,
) -> String {
    let missing = || "-".to_owned();

    let log = if let Ok(frame) = EthernetFrame::parse(frame_bytes) {
        FrameLog::from_ethernet(direction, peer_id, action, drop_reason, frame)
    } else {
        FrameLog::malformed(direction, peer_id, frame_bytes.len())
    };

    let source_mac = log.source_mac().map_or_else(missing, |mac| mac.to_string());
    let destination_mac = log
        .destination_mac()
        .map_or_else(missing, |mac| mac.to_string());
    let ethertype_or_len = log
        .ethertype_or_len()
        .map_or_else(missing, |value| format!("0x{value:04x}"));
    let peer_id = log
        .peer_id()
        .map_or_else(missing, |peer_id| peer_id.to_string());
    let drop_reason = log
        .drop_reason()
        .map_or_else(missing, |reason| format!("{reason:?}"));

    format!(
        "gateway frame interface={interface} direction={:?} peer_id={peer_id} src={source_mac} dst={destination_mac} ethertype_or_len={ethertype_or_len} len={} action={:?} drop_reason={drop_reason}",
        log.direction(),
        log.frame_len(),
        log.action(),
    )
}
#[cfg(target_os = "linux")]
/// Waits for the next frame from the LAN socket that parses as Ethernet and
/// returns its bytes.
///
/// Frames that fail to parse are skipped without being reported. A would-block
/// result from the socket simply waits for readiness again.
///
/// # Errors
///
/// Fails when waiting for readiness or receiving from the socket fails.
async fn read_lan_ethernet(packet_socket: &AsyncFd<PacketSocket>) -> Result<Bytes> {
    loop {
        // A fresh, full-size buffer for every attempt.
        let mut frame = vec![0; MAX_STANDARD_ETHERNET_FRAME_LEN];
        let mut readiness = packet_socket
            .readable()
            .await
            .context("failed to wait for LAN Ethernet frame")?;

        let received = match readiness.try_io(|socket| socket.get_ref().recv_frame(&mut frame)) {
            Err(_would_block) => continue,
            Ok(outcome) => outcome.context("failed to read LAN Ethernet frame")?,
        };

        frame.truncate(received);
        if EthernetFrame::parse(&frame).is_ok() {
            return Ok(Bytes::from(frame));
        }
    }
}
#[cfg(target_os = "linux")]
/// Writes one Ethernet frame to the LAN socket, waiting for writability as
/// needed.
///
/// # Errors
///
/// Fails when `frame` does not parse as Ethernet, waiting for writability
/// fails, the socket write fails, or fewer bytes than the whole frame were
/// written.
async fn write_lan_ethernet(packet_socket: &AsyncFd<PacketSocket>, frame: &[u8]) -> Result<()> {
    EthernetFrame::parse(frame).context("relay Ethernet frame is malformed")?;
    loop {
        let mut readiness = packet_socket
            .writable()
            .await
            .context("failed to wait for writable LAN Ethernet socket")?;

        // A would-block result means readiness was stale; wait again.
        let Ok(outcome) = readiness.try_io(|socket| socket.get_ref().send_frame(frame)) else {
            continue;
        };

        let written = outcome.context("failed to write LAN Ethernet frame")?;
        if written != frame.len() {
            bail!("partial LAN Ethernet frame write: {written}/{}", frame.len());
        }
        return Ok(());
    }
}
#[cfg(target_os = "linux")]
// Tracks the MAC address of each known remote client, keyed by peer id, so
// the gateway can periodically emit one refresh frame per client on the LAN.
#[derive(Debug, Clone)]
struct CamRefresh {
// Destination MAC written into every refresh frame.
gateway_mac: MacAddr,
// Remote client MACs by peer id; a later observation for the same peer
// replaces the earlier one.
remote_clients: BTreeMap<u32, MacAddr>,
}
#[cfg(target_os = "linux")]
impl CamRefresh {
    /// Creates an empty tracker whose refresh frames are addressed to
    /// `gateway_mac`.
    fn new(gateway_mac: MacAddr) -> Self {
        let remote_clients = BTreeMap::new();
        Self {
            gateway_mac,
            remote_clients,
        }
    }

    /// Learns the source MAC of a frame received from `peer_id`.
    ///
    /// The MAC is stored only when it is a valid client identity.
    ///
    /// # Errors
    ///
    /// Fails when `frame` does not parse as Ethernet.
    fn observe_remote_frame(&mut self, peer_id: u32, frame: &[u8]) -> Result<()> {
        let source = EthernetFrame::parse(frame)
            .context("relay Ethernet frame is malformed")?
            .source();
        if source.is_valid_client_identity() {
            self.remote_clients.insert(peer_id, source);
        }
        Ok(())
    }

    /// Updates the tracker from a relay control event. Only peer-joined and
    /// peer-left events have an effect.
    fn observe_control_event(&mut self, event: &ControlMessage) {
        if let ControlMessage::PeerJoined(peer) = event {
            self.observe_peer_joined(peer);
        } else if let ControlMessage::PeerLeft { peer_id, .. } = event {
            self.observe_peer_left(*peer_id);
        }
    }

    /// Records the MAC of a joining peer when it is a client that announced
    /// one.
    fn observe_peer_joined(&mut self, peer: &PeerInfo) {
        if peer.role() != Role::Client {
            return;
        }
        if let Some(mac) = peer.mac() {
            self.remote_clients.insert(peer.peer_id(), mac);
        }
    }

    /// Forgets the MAC recorded for `peer_id`, if any.
    fn observe_peer_left(&mut self, peer_id: u32) {
        self.remote_clients.remove(&peer_id);
    }

    /// Builds one refresh frame per tracked client, in peer-id order.
    fn refresh_frames(&self) -> Vec<Vec<u8>> {
        let gateway_mac = self.gateway_mac;
        self.remote_clients
            .values()
            .copied()
            .map(|client_mac| cam_refresh_frame(client_mac, gateway_mac))
            .collect()
    }

    /// Number of tracked client MACs (test helper).
    #[cfg(test)]
    fn remote_mac_count(&self) -> usize {
        self.remote_clients.len()
    }
}
#[cfg(target_os = "linux")]
/// Builds a CAM refresh frame from `source` to `destination`.
///
/// Layout: destination MAC, source MAC, `CAM_REFRESH_ETHERTYPE` in big-endian,
/// `CAM_REFRESH_PAYLOAD`, then zero padding up to
/// `MIN_ETHERNET_FRAME_WITHOUT_FCS` bytes.
fn cam_refresh_frame(source: MacAddr, destination: MacAddr) -> Vec<u8> {
    let destination_octets = destination.octets();
    let source_octets = source.octets();
    let ethertype = CAM_REFRESH_ETHERTYPE.to_be_bytes();

    let mut frame = Vec::with_capacity(MIN_ETHERNET_FRAME_WITHOUT_FCS);
    for part in [
        &destination_octets[..],
        &source_octets[..],
        &ethertype[..],
        CAM_REFRESH_PAYLOAD,
    ] {
        frame.extend_from_slice(part);
    }
    frame.resize(MIN_ETHERNET_FRAME_WITHOUT_FCS, 0);
    frame
}
/// Connects to the relay described by `config` and performs the gateway
/// handshake.
///
/// A client endpoint is bound on the unspecified address of the relay's IP
/// family. After the QUIC connection is up, a gateway hello is sent on a
/// bidirectional stream, advertising the smaller of the configured datagram
/// budget and the size the connection reports. A welcome reply yields the
/// connection.
///
/// # Errors
///
/// Fails when the TLS configuration cannot be built, the endpoint cannot be
/// bound, the connection fails, the connection reports no datagram size, the
/// hello is invalid, the control exchange fails, or the relay answers with
/// a reject or any other message.
pub async fn connect_gateway(config: GatewayConfig) -> Result<GatewayConnection> {
    let relay_addr = config.relay_addr();
    let client_config = relay_client_config(config.relay_ca_cert_der())?;

    let mut endpoint = Endpoint::client(client_bind_addr(relay_addr))
        .context("failed to bind gateway QUIC endpoint")?;
    endpoint.set_default_client_config(client_config);

    let connecting = endpoint.connect(relay_addr, config.server_name())?;
    let connection = connecting
        .await
        .with_context(|| format!("failed to connect to relay {relay_addr}"))?;

    let peer_datagram_size = connection
        .max_datagram_size()
        .context("relay did not negotiate QUIC DATAGRAM support")?;
    // A reported size that does not fit in u16 cannot be the smaller value, so
    // the configured budget wins in that case.
    let configured_budget = config.max_datagram_size();
    let hello_datagram_size = u16::try_from(peer_datagram_size)
        .map_or(configured_budget, |peer| peer.min(configured_budget));

    let hello = EndpointHello::gateway(config.room().clone(), hello_datagram_size)
        .context("failed to build gateway hello")?;

    match request_control_message(&connection, ControlMessage::Hello(hello)).await? {
        ControlMessage::Welcome(welcome) => {
            let stats = Arc::default();
            Ok(GatewayConnection {
                endpoint,
                connection,
                config,
                welcome,
                stats,
            })
        }
        ControlMessage::Reject(reject) => bail!(
            "relay rejected gateway hello: {:?}: {}",
            reject.reason(),
            reject.message()
        ),
        other => bail!("relay sent unexpected gateway handshake response: {other:?}"),
    }
}
/// Builds the QUIC client configuration that trusts exactly the given
/// DER-encoded certificate and offers the relay ALPN protocol.
///
/// # Errors
///
/// Fails when the certificate cannot be added to the root store or the QUIC
/// client configuration cannot be derived from the TLS configuration.
fn relay_client_config(relay_ca_cert_der: &[u8]) -> Result<ClientConfig> {
    let trusted_cert = CertificateDer::from(relay_ca_cert_der.to_vec());
    let mut roots = rustls::RootCertStore::empty();
    roots
        .add(trusted_cert)
        .context("failed to trust relay CA certificate")?;

    let mut tls = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    tls.alpn_protocols = vec![RELAY_ALPN.to_vec()];

    let quic_tls =
        QuicClientConfig::try_from(tls).context("failed to build QUIC client config")?;
    Ok(ClientConfig::new(Arc::new(quic_tls)))
}
/// Picks the local bind address for the client endpoint: the unspecified
/// address of the same IP family as `relay_addr`, with port 0.
fn client_bind_addr(relay_addr: SocketAddr) -> SocketAddr {
    let wildcard = if relay_addr.is_ipv4() {
        IpAddr::V4(Ipv4Addr::UNSPECIFIED)
    } else {
        IpAddr::V6(Ipv6Addr::UNSPECIFIED)
    };
    SocketAddr::new(wildcard, 0)
}
/// Sends `message` on a new bidirectional stream and returns the decoded
/// reply.
///
/// The request side of the stream is finished after writing. The reply is
/// then read to its end, limited to `MAX_CONTROL_FRAME_LEN` bytes.
///
/// # Errors
///
/// Fails when the stream cannot be opened, the request cannot be encoded,
/// written, or finished, or the reply cannot be read or decoded. Each step
/// adds its own context so the failing step is identifiable.
async fn request_control_message(
    connection: &quinn::Connection,
    message: ControlMessage,
) -> Result<ControlMessage> {
    let (mut send, mut recv) = connection
        .open_bi()
        .await
        .context("failed to open relay control stream")?;
    let request =
        encode_control_message(&message).context("failed to encode relay control request")?;
    send.write_all(&request)
        .await
        .context("failed to write relay control request")?;
    send.finish()
        .context("failed to finish relay control request stream")?;
    let response = recv
        .read_to_end(MAX_CONTROL_FRAME_LEN)
        .await
        .context("failed to read relay control response")?;
    decode_control_frame(&response).context("failed to decode relay control response")
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use bytes::Bytes;
use lanparty_net::DEFAULT_RELAY_PORT;
use quinn::{ServerConfig, TransportConfig, crypto::rustls::QuicServerConfig};
use rustls::pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer};
use super::*;
#[test]
// Checks that a fully populated configuration is accepted and that a blank
// server name, an empty certificate, and a blank interface are each rejected.
fn validates_gateway_config() {
    let relay: SocketAddr = "127.0.0.1:443".parse().unwrap();
    let room = RoomCode::new("ROOM1").unwrap();
    let cert = vec![1, 2, 3];
    let build = |server_name: &str, cert_der: Vec<u8>, interface: &str| {
        GatewayConfig::new(relay, server_name, cert_der, room.clone(), interface, 1400)
    };

    assert!(build("relay.local", cert.clone(), "eth0").is_ok());
    assert!(build("", cert.clone(), "eth0").is_err());
    assert!(build("relay.local", Vec::new(), "eth0").is_err());
    assert!(build("relay.local", cert.clone(), "").is_err());
}
#[test]
// Checks that `--iface` fills the interface field and that a bare relay host
// is parsed with the default relay port.
fn accepts_iface_alias_for_gateway_interface() {
    let argv = [
        "lanparty-gateway",
        "--relay",
        "relay.example.test",
        "--relay-ca-cert",
        "relay-cert.der",
        "--room",
        "ROOM1",
        "--iface",
        "eth0",
    ];

    let GatewayArgs {
        relay, interface, ..
    } = GatewayArgs::parse_from(argv);

    assert_eq!(relay.host(), "relay.example.test");
    assert_eq!(relay.port(), DEFAULT_RELAY_PORT);
    assert_eq!(interface, "eth0");
}
// End-to-end check of the gateway client against an in-process QUIC server
// that plays the relay. The server task and the client code below run in
// lock-step: handshake, lifecycle event, one frame in each direction, stats
// report, disconnect.
#[tokio::test]
async fn connects_to_relay_control_stream_as_gateway() {
let (server_config, certificate) = test_server_config();
let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()).unwrap();
let server_addr = endpoint.local_addr().unwrap();
// Lets the client wait until the server has received and checked the stats event.
let (stats_received_tx, stats_received_rx) = tokio::sync::oneshot::channel();
let server_task = tokio::spawn(async move {
let incoming = endpoint.accept().await.unwrap();
let connection = incoming.await.unwrap();
// Handshake: expect a gateway hello for ROOM1 on a bidirectional stream.
let (mut send, mut recv) = connection.accept_bi().await.unwrap();
let request = recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
let message = decode_control_frame(&request).unwrap();
let ControlMessage::Hello(hello) = message else {
panic!("expected gateway hello");
};
assert_eq!(hello.role(), Role::Gateway);
assert_eq!(hello.room().as_str(), "ROOM1");
// Reply with a welcome; the client later sees room id 7 and peer id 1.
let response = encode_control_message(&ControlMessage::Welcome(
ServerWelcome::new(7, 1, 1200).unwrap(),
))
.unwrap();
send.write_all(&response).await.unwrap();
send.finish().unwrap();
// Announce a client peer on a unidirectional stream.
let joined = encode_control_message(&ControlMessage::PeerJoined(
PeerInfo::new(99, Role::Client, Some(MacAddr::new([0x02, 0, 0, 0, 0, 9]))).unwrap(),
))
.unwrap();
let mut event_send = connection.open_uni().await.unwrap();
event_send.write_all(&joined).await.unwrap();
event_send.finish().unwrap();
// Expect the gateway's Ethernet datagram, addressed with the welcome's ids.
let datagram = connection.read_datagram().await.unwrap();
let packet = decode_datagram(&datagram).unwrap();
let header = packet.header();
assert_eq!(header.frame_type(), FrameType::Ethernet);
assert_eq!(header.room_id(), 7);
assert_eq!(header.peer_id(), 1);
assert_eq!(packet.payload(), ethernet_frame(b"to relay").as_slice());
// Send a frame back as if it came from peer 99 in the same room.
let response = encode_datagram(
FrameType::Ethernet,
7,
99,
0,
&ethernet_frame(b"from relay"),
)
.unwrap();
connection.send_datagram(Bytes::from(response)).unwrap();
// Expect the stats event; every counter is 1 after the client steps below.
let mut stats_recv = connection.accept_uni().await.unwrap();
let stats_frame = stats_recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
let stats_message = decode_control_frame(&stats_frame).unwrap();
let ControlMessage::Stats(stats) = stats_message else {
panic!("expected gateway stats event");
};
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1));
stats_received_tx.send(()).unwrap();
// Expect the disconnect event sent by the client's shutdown call.
let mut disconnect_recv = connection.accept_uni().await.unwrap();
let disconnect_frame = disconnect_recv
.read_to_end(MAX_CONTROL_FRAME_LEN)
.await
.unwrap();
let disconnect_message = decode_control_frame(&disconnect_frame).unwrap();
let ControlMessage::Disconnect { reason, message } = disconnect_message else {
panic!("expected gateway disconnect event");
};
assert_eq!(reason, DisconnectReason::Normal);
assert_eq!(message, "test complete");
// Wait for the connection to close, then tear the server endpoint down.
connection.closed().await;
endpoint.close(0_u32.into(), b"test complete");
endpoint.wait_idle().await;
});
// Client side: connect with the server's self-signed certificate as the trust root.
let config = GatewayConfig::new(
server_addr,
"lanparty-relay.local",
certificate.as_ref().to_vec(),
RoomCode::new("ROOM1").unwrap(),
"eth0",
1400,
)
.unwrap();
let gateway = connect_gateway(config).await.unwrap();
assert_eq!(gateway.config().interface(), "eth0");
assert_eq!(gateway.welcome().room_id(), 7);
assert_eq!(gateway.welcome().peer_id(), 1);
// Receive the peer-joined lifecycle event announced by the server.
let event = tokio::time::timeout(Duration::from_secs(5), gateway.recv_control_event())
.await
.unwrap()
.unwrap();
let ControlMessage::PeerJoined(peer) = event else {
panic!("expected gateway lifecycle event");
};
assert_eq!(peer.peer_id(), 99);
assert_eq!(peer.role(), Role::Client);
assert_eq!(peer.mac(), Some(MacAddr::new([0x02, 0, 0, 0, 0, 9])));
// One valid frame out, one valid frame in.
gateway.send_ethernet(&ethernet_frame(b"to relay")).unwrap();
let received = tokio::time::timeout(Duration::from_secs(5), gateway.recv_ethernet())
.await
.unwrap()
.unwrap();
assert_eq!(received.source_peer_id(), 99);
assert_eq!(received.payload(), ethernet_frame(b"from relay").as_slice());
// A 4-byte buffer is rejected as malformed; this accounts for the dropped
// and malformed counters in the snapshot below.
assert!(gateway.send_ethernet(&[0; 4]).is_err());
let stats = gateway.stats_snapshot();
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1));
gateway.send_stats_snapshot().await.unwrap();
// Wait until the server confirmed the stats before shutting down.
tokio::time::timeout(Duration::from_secs(5), stats_received_rx)
.await
.unwrap()
.unwrap();
gateway.shutdown("test complete").await;
// The server task finishing without panic means all its assertions held.
tokio::time::timeout(Duration::from_secs(5), server_task)
.await
.unwrap()
.unwrap();
}
#[cfg(target_os = "linux")]
#[test]
// Checks the layout of a refresh frame: padded length, addresses, EtherType,
// and the payload directly after the 14-byte header.
fn builds_padded_cam_refresh_frame() {
    let remote_mac = MacAddr::new([0x02, 0, 0, 0, 0, 2]);
    let gateway_mac = MacAddr::new([0x0a, 0, 0, 0, 0, 1]);

    let frame = cam_refresh_frame(remote_mac, gateway_mac);
    let parsed = EthernetFrame::parse(&frame).unwrap();

    assert_eq!(frame.len(), MIN_ETHERNET_FRAME_WITHOUT_FCS);
    assert_eq!(parsed.destination(), gateway_mac);
    assert_eq!(parsed.source(), remote_mac);
    assert_eq!(parsed.ethertype_or_len(), CAM_REFRESH_ETHERTYPE);
    let after_header = &frame[14..];
    assert!(after_header.starts_with(CAM_REFRESH_PAYLOAD));
}
#[cfg(target_os = "linux")]
#[test]
// Checks that a frame from a valid client MAC is tracked while a frame whose
// source is the broadcast address is ignored.
fn tracks_valid_remote_macs_for_cam_refresh() {
    let gateway_mac = MacAddr::new([0x0a, 0, 0, 0, 0, 1]);
    let remote_mac = MacAddr::new([0x02, 0, 0, 0, 0, 2]);
    let invalid_remote_mac = MacAddr::BROADCAST;

    let mut refresh = CamRefresh::new(gateway_mac);
    let observations = [
        (7, ethernet_frame_from(remote_mac, b"remote")),
        (8, ethernet_frame_from(invalid_remote_mac, b"ignored")),
    ];
    for (peer_id, frame) in &observations {
        refresh.observe_remote_frame(*peer_id, frame).unwrap();
    }

    let frames = refresh.refresh_frames();
    assert_eq!(refresh.remote_mac_count(), 1);
    assert_eq!(frames.len(), 1);

    let refresh_frame = EthernetFrame::parse(&frames[0]).unwrap();
    assert_eq!(refresh_frame.source(), remote_mac);
    assert_eq!(refresh_frame.destination(), gateway_mac);
}
#[cfg(target_os = "linux")]
#[test]
// Checks that a peer-joined event adds the client's MAC and a peer-left event
// for the same peer removes it again.
fn updates_cam_refresh_from_lifecycle_events() {
    let gateway_mac = MacAddr::new([0x0a, 0, 0, 0, 0, 1]);
    let remote_mac = MacAddr::new([0x02, 0, 0, 0, 0, 2]);
    let mut refresh = CamRefresh::new(gateway_mac);

    let joined =
        ControlMessage::PeerJoined(PeerInfo::new(7, Role::Client, Some(remote_mac)).unwrap());
    refresh.observe_control_event(&joined);
    assert_eq!(refresh.remote_mac_count(), 1);
    let frames_after_join = refresh.refresh_frames();
    let first_source = EthernetFrame::parse(&frames_after_join[0])
        .unwrap()
        .source();
    assert_eq!(first_source, remote_mac);

    let left = ControlMessage::PeerLeft {
        peer_id: 7,
        reason: DisconnectReason::Normal,
    };
    refresh.observe_control_event(&left);
    assert_eq!(refresh.remote_mac_count(), 0);
    assert!(refresh.refresh_frames().is_empty());
}
#[cfg(target_os = "linux")]
#[test]
// Pins the exact text of a log line for a forwarded remote-to-LAN frame.
fn formats_gateway_frame_log_lines() {
    let frame = ethernet_frame(b"payload");
    let expected = "gateway frame interface=eth0 direction=RemoteToLan peer_id=7 src=02:00:00:00:00:01 dst=02:00:00:00:00:02 ethertype_or_len=0x0800 len=21 action=Forwarded drop_reason=-";

    let line = gateway_frame_log_line(
        "eth0",
        FrameDirection::RemoteToLan,
        Some(7),
        &frame,
        FrameAction::Forwarded,
        None,
    );

    assert_eq!(line, expected);
}
// Builds a QUIC server configuration with a freshly generated self-signed
// certificate for `lanparty-relay.local`, the relay ALPN protocol, and 4 MiB
// datagram buffers. Returns the configuration together with the certificate
// so the client side can trust it.
fn test_server_config() -> (ServerConfig, CertificateDer<'static>) {
    const DATAGRAM_BUFFER_BYTES: usize = 4 * 1024 * 1024;

    let certified_key =
        rcgen::generate_simple_self_signed(vec!["lanparty-relay.local".into()]).unwrap();
    let certificate = certified_key.cert.der().clone();
    let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(
        certified_key.signing_key.serialize_der(),
    ));

    let mut tls_config = rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(vec![certificate.clone()], private_key)
        .unwrap();
    tls_config.alpn_protocols = vec![RELAY_ALPN.to_vec()];

    let mut transport = TransportConfig::default();
    transport
        .datagram_receive_buffer_size(Some(DATAGRAM_BUFFER_BYTES))
        .datagram_send_buffer_size(DATAGRAM_BUFFER_BYTES);

    let quic_tls = QuicServerConfig::try_from(tls_config).unwrap();
    let mut server_config = ServerConfig::with_crypto(Arc::new(quic_tls));
    server_config.transport_config(Arc::new(transport));
    (server_config, certificate)
}
// Builds a test Ethernet frame carrying `payload` from the default test
// source MAC 02:00:00:00:00:01.
fn ethernet_frame(payload: &[u8]) -> Vec<u8> {
    let default_source = MacAddr::new([0x02, 0, 0, 0, 0, 1]);
    ethernet_frame_from(default_source, payload)
}
// Builds a test Ethernet frame: fixed destination 02:00:00:00:00:02, the given
// source MAC, EtherType 0x0800, then `payload` with no padding.
fn ethernet_frame_from(source: MacAddr, payload: &[u8]) -> Vec<u8> {
    const DESTINATION: [u8; 6] = [0x02, 0, 0, 0, 0, 2];
    let source_octets = source.octets();
    let ethertype = 0x0800_u16.to_be_bytes();
    [
        &DESTINATION[..],
        &source_octets[..],
        &ethertype[..],
        payload,
    ]
    .concat()
}
}