feat(gateway): bridge relay and LAN frames

The gateway now runs the actual frame bridge after relay admission. It registers
the AF_PACKET socket with Tokio using AsyncFd, reads valid LAN Ethernet frames
and forwards them as relay datagrams, and writes valid relay Ethernet datagrams
back to the LAN socket.

The packet socket is opened nonblocking so the bridge can shut down cleanly on
Ctrl-C without leaving a blocking recv thread behind. Existing send_ethernet and
recv_ethernet helpers now share the same validation and encoding helpers used by
the bridge.

This still needs a privileged LAN-host smoke test with a real wired interface,
but the compile-time and loopback coverage now include the gateway relay side of
the bridge and the non-root-safe packet-socket validation.

Test Plan:
- cargo fmt --check
- cargo test --workspace
- cargo clippy --workspace --all-targets -- -D warnings

Refs: PLAN.md gateway AF_PACKET to relay bridge loop
This commit is contained in:
2026-05-21 18:16:04 +02:00
parent 128903c312
commit 63c829183f
5 changed files with 137 additions and 47 deletions
+1 -1
View File
@@ -25,5 +25,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "2"
tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tokio = { version = "1.52.3", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
tracing = "0.1"
+3 -3
View File
@@ -80,7 +80,7 @@ cargo run -p lanparty-gateway -- \
--interface eth0
```
The gateway currently connects to the relay as `role = gateway`, completes the
The gateway connects to the relay as `role = gateway`, completes the
control-stream hello/welcome handshake, opens an AF_PACKET socket on the LAN
interface, and has relay Ethernet datagram send/receive helpers. The frame
bridge loop is not wired yet.
interface, and bridges Ethernet frames between the relay and wired LAN until
shutdown.
+121 -34
View File
@@ -21,9 +21,13 @@ use lanparty_ctrl::{
CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN,
RoomCode, ServerWelcome, decode_control_frame, encode_control_message,
};
use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram};
use lanparty_proto::{
EthernetFrame, FrameType, MAX_STANDARD_ETHERNET_FRAME_LEN, decode_datagram, encode_datagram,
};
use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig};
use rustls::pki_types::CertificateDer;
#[cfg(target_os = "linux")]
use tokio::io::unix::AsyncFd;
#[cfg(target_os = "linux")]
pub use packet::PacketSocket;
@@ -196,43 +200,39 @@ impl GatewayConnection {
}
pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> {
let datagram = encode_datagram(
FrameType::Ethernet,
self.welcome.room_id(),
self.welcome.peer_id(),
0,
frame,
)
.context("failed to encode gateway Ethernet datagram")?;
self.connection
.send_datagram(Bytes::from(datagram))
.context("failed to send gateway Ethernet datagram")?;
Ok(())
send_gateway_ethernet(&self.connection, &self.welcome, frame)
}
pub async fn recv_ethernet(&self) -> Result<ReceivedEthernetFrame> {
loop {
let datagram = self.connection.read_datagram().await?;
let Ok(packet) = decode_datagram(&datagram) else {
continue;
};
let header = packet.header();
if header.frame_type() != FrameType::Ethernet
|| header.room_id() != self.welcome.room_id()
|| header.peer_id() == self.welcome.peer_id()
{
continue;
}
if EthernetFrame::parse(packet.payload()).is_err() {
continue;
}
recv_gateway_ethernet(&self.connection, &self.welcome).await
}
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
});
#[cfg(target_os = "linux")]
pub async fn bridge_until_shutdown(self, packet_socket: PacketSocket) -> Result<()> {
let packet_socket = AsyncFd::new(packet_socket)
.context("failed to register AF_PACKET socket with Tokio")?;
let Self {
endpoint,
connection,
welcome,
..
} = self;
loop {
tokio::select! {
shutdown = tokio::signal::ctrl_c() => {
shutdown.context("failed to wait for Ctrl-C")?;
connection.close(0_u32.into(), b"gateway shutting down");
endpoint.wait_idle().await;
return Ok(());
}
lan_frame = read_lan_ethernet(&packet_socket) => {
send_gateway_ethernet(&connection, &welcome, &lan_frame?)?;
}
relay_frame = recv_gateway_ethernet(&connection, &welcome) => {
write_lan_ethernet(&packet_socket, relay_frame?.payload()).await?;
}
}
}
}
@@ -242,6 +242,93 @@ impl GatewayConnection {
}
}
fn send_gateway_ethernet(
connection: &quinn::Connection,
welcome: &ServerWelcome,
frame: &[u8],
) -> Result<()> {
EthernetFrame::parse(frame).context("gateway Ethernet frame is malformed")?;
let datagram = encode_datagram(
FrameType::Ethernet,
welcome.room_id(),
welcome.peer_id(),
0,
frame,
)
.context("failed to encode gateway Ethernet datagram")?;
connection
.send_datagram(Bytes::from(datagram))
.context("failed to send gateway Ethernet datagram")?;
Ok(())
}
async fn recv_gateway_ethernet(
connection: &quinn::Connection,
welcome: &ServerWelcome,
) -> Result<ReceivedEthernetFrame> {
loop {
let datagram = connection.read_datagram().await?;
let Ok(packet) = decode_datagram(&datagram) else {
continue;
};
let header = packet.header();
if header.frame_type() != FrameType::Ethernet
|| header.room_id() != welcome.room_id()
|| header.peer_id() == welcome.peer_id()
{
continue;
}
if EthernetFrame::parse(packet.payload()).is_err() {
continue;
}
return Ok(ReceivedEthernetFrame {
source_peer_id: header.peer_id(),
payload: Bytes::copy_from_slice(packet.payload()),
});
}
}
#[cfg(target_os = "linux")]
async fn read_lan_ethernet(packet_socket: &AsyncFd<PacketSocket>) -> Result<Bytes> {
loop {
let mut buffer = vec![0; MAX_STANDARD_ETHERNET_FRAME_LEN];
let mut guard = packet_socket
.readable()
.await
.context("failed to wait for LAN Ethernet frame")?;
match guard.try_io(|inner| inner.get_ref().recv_frame(&mut buffer)) {
Ok(Ok(len)) => {
buffer.truncate(len);
if EthernetFrame::parse(&buffer).is_ok() {
return Ok(Bytes::from(buffer));
}
}
Ok(Err(error)) => return Err(error).context("failed to read LAN Ethernet frame"),
Err(_would_block) => continue,
}
}
}
#[cfg(target_os = "linux")]
async fn write_lan_ethernet(packet_socket: &AsyncFd<PacketSocket>, frame: &[u8]) -> Result<()> {
EthernetFrame::parse(frame).context("relay Ethernet frame is malformed")?;
loop {
let mut guard = packet_socket
.writable()
.await
.context("failed to wait for writable LAN Ethernet socket")?;
match guard.try_io(|inner| inner.get_ref().send_frame(frame)) {
Ok(Ok(sent)) if sent == frame.len() => return Ok(()),
Ok(Ok(sent)) => bail!("partial LAN Ethernet frame write: {sent}/{}", frame.len()),
Ok(Err(error)) => return Err(error).context("failed to write LAN Ethernet frame"),
Err(_would_block) => continue,
}
}
}
pub async fn connect_gateway(config: GatewayConfig) -> Result<GatewayConnection> {
let client_config = relay_client_config(config.relay_ca_cert_der())?;
let mut endpoint = Endpoint::client(client_bind_addr(config.relay_addr()))
+4 -7
View File
@@ -20,22 +20,19 @@ async fn main() -> anyhow::Result<()> {
gateway.welcome().room_id(),
gateway.welcome().effective_tap_mtu()
);
println!("AF_PACKET bridging is not wired yet; press Ctrl-C to stop");
#[cfg(target_os = "linux")]
let _packet_socket = {
{
let socket = PacketSocket::open(gateway.config().interface())?;
println!(
"lanparty-gateway opened AF_PACKET socket on {} (ifindex {})",
socket.interface(),
socket.interface_index()
);
socket
};
println!("lanparty-gateway bridging frames; press Ctrl-C to stop");
gateway.bridge_until_shutdown(socket).await?;
}
#[cfg(not(target_os = "linux"))]
anyhow::bail!("lanparty-gateway requires Linux AF_PACKET support");
tokio::signal::ctrl_c().await?;
gateway.shutdown("gateway shutting down").await;
Ok(())
}
+8 -2
View File
@@ -1,7 +1,7 @@
use std::{
ffi::CString,
io,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
};
const ETH_P_ALL: u16 = libc::ETH_P_ALL as u16;
@@ -22,7 +22,7 @@ impl PacketSocket {
// a new file descriptor or -1 without aliasing Rust-owned memory.
libc::socket(
libc::AF_PACKET,
libc::SOCK_RAW | libc::SOCK_CLOEXEC,
libc::SOCK_RAW | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
protocol,
)
};
@@ -110,6 +110,12 @@ impl PacketSocket {
}
}
impl AsRawFd for PacketSocket {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
pub fn interface_index(interface: &str) -> io::Result<u32> {
if interface.trim().is_empty() {
return Err(io::Error::new(