diff --git a/Cargo.toml b/Cargo.toml index 8015eb8..bb97e4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,5 +25,5 @@ rustls = { version = "0.23", default-features = false, features = ["ring", "std" serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "2" -tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } +tokio = { version = "1.52.3", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] } tracing = "0.1" diff --git a/README.md b/README.md index 2cec12b..4d793b4 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ cargo run -p lanparty-gateway -- \ --interface eth0 ``` -The gateway currently connects to the relay as `role = gateway`, completes the +The gateway connects to the relay as `role = gateway`, completes the control-stream hello/welcome handshake, opens an AF_PACKET socket on the LAN -interface, and has relay Ethernet datagram send/receive helpers. The frame -bridge loop is not wired yet. +interface, and bridges Ethernet frames between the relay and wired LAN until +shutdown. diff --git a/crates/lanparty-gateway/src/lib.rs b/crates/lanparty-gateway/src/lib.rs index 7eb24aa..61a6779 100644 --- a/crates/lanparty-gateway/src/lib.rs +++ b/crates/lanparty-gateway/src/lib.rs @@ -21,9 +21,13 @@ use lanparty_ctrl::{ CONTROL_LENGTH_PREFIX_LEN, ControlMessage, EndpointHello, MAX_CONTROL_MESSAGE_LEN, RELAY_ALPN, RoomCode, ServerWelcome, decode_control_frame, encode_control_message, }; -use lanparty_proto::{EthernetFrame, FrameType, decode_datagram, encode_datagram}; +use lanparty_proto::{ + EthernetFrame, FrameType, MAX_STANDARD_ETHERNET_FRAME_LEN, decode_datagram, encode_datagram, +}; use quinn::{ClientConfig, Endpoint, crypto::rustls::QuicClientConfig}; use rustls::pki_types::CertificateDer; +#[cfg(target_os = "linux")] +use tokio::io::unix::AsyncFd; #[cfg(target_os = "linux")] pub use packet::PacketSocket; @@ -196,43 +200,39 @@ impl GatewayConnection { } pub fn send_ethernet(&self, frame: &[u8]) -> Result<()> { - let datagram = encode_datagram( - FrameType::Ethernet, - self.welcome.room_id(), - self.welcome.peer_id(), - 0, - frame, - ) - .context("failed to encode gateway Ethernet datagram")?; - - self.connection - .send_datagram(Bytes::from(datagram)) - .context("failed to send gateway Ethernet datagram")?; - - Ok(()) + send_gateway_ethernet(&self.connection, &self.welcome, frame) } pub async fn recv_ethernet(&self) -> Result { - loop { - let datagram = self.connection.read_datagram().await?; - let Ok(packet) = decode_datagram(&datagram) else { - continue; - }; - let header = packet.header(); - if header.frame_type() != FrameType::Ethernet - || header.room_id() != self.welcome.room_id() - || header.peer_id() == self.welcome.peer_id() - { - continue; - } - if EthernetFrame::parse(packet.payload()).is_err() { - continue; - } + recv_gateway_ethernet(&self.connection, &self.welcome).await + } - return Ok(ReceivedEthernetFrame { - source_peer_id: header.peer_id(), - payload: Bytes::copy_from_slice(packet.payload()), - }); + #[cfg(target_os = "linux")] + pub async fn bridge_until_shutdown(self, packet_socket: PacketSocket) -> Result<()> { + let packet_socket = AsyncFd::new(packet_socket) + .context("failed to register AF_PACKET socket with Tokio")?; + let Self { + endpoint, + connection, + welcome, + .. + } = self; + + loop { + tokio::select! { + shutdown = tokio::signal::ctrl_c() => { + shutdown.context("failed to wait for Ctrl-C")?; + connection.close(0_u32.into(), b"gateway shutting down"); + endpoint.wait_idle().await; + return Ok(()); + } + lan_frame = read_lan_ethernet(&packet_socket) => { + send_gateway_ethernet(&connection, &welcome, &lan_frame?)?; + } + relay_frame = recv_gateway_ethernet(&connection, &welcome) => { + write_lan_ethernet(&packet_socket, relay_frame?.payload()).await?; + } + } } } @@ -242,6 +242,93 @@ impl GatewayConnection { } } +fn send_gateway_ethernet( + connection: &quinn::Connection, + welcome: &ServerWelcome, + frame: &[u8], +) -> Result<()> { + EthernetFrame::parse(frame).context("gateway Ethernet frame is malformed")?; + let datagram = encode_datagram( + FrameType::Ethernet, + welcome.room_id(), + welcome.peer_id(), + 0, + frame, + ) + .context("failed to encode gateway Ethernet datagram")?; + + connection + .send_datagram(Bytes::from(datagram)) + .context("failed to send gateway Ethernet datagram")?; + + Ok(()) +} + +async fn recv_gateway_ethernet( + connection: &quinn::Connection, + welcome: &ServerWelcome, +) -> Result { + loop { + let datagram = connection.read_datagram().await?; + let Ok(packet) = decode_datagram(&datagram) else { + continue; + }; + let header = packet.header(); + if header.frame_type() != FrameType::Ethernet + || header.room_id() != welcome.room_id() + || header.peer_id() == welcome.peer_id() + { + continue; + } + if EthernetFrame::parse(packet.payload()).is_err() { + continue; + } + + return Ok(ReceivedEthernetFrame { + source_peer_id: header.peer_id(), + payload: Bytes::copy_from_slice(packet.payload()), + }); + } +} + +#[cfg(target_os = "linux")] +async fn read_lan_ethernet(packet_socket: &AsyncFd) -> Result { + loop { + let mut buffer = vec![0; MAX_STANDARD_ETHERNET_FRAME_LEN]; + let mut guard = packet_socket + .readable() + .await + .context("failed to wait for LAN Ethernet frame")?; + match guard.try_io(|inner| inner.get_ref().recv_frame(&mut buffer)) { + Ok(Ok(len)) => { + buffer.truncate(len); + if EthernetFrame::parse(&buffer).is_ok() { + return Ok(Bytes::from(buffer)); + } + } + Ok(Err(error)) => return Err(error).context("failed to read LAN Ethernet frame"), + Err(_would_block) => continue, + } + } +} + +#[cfg(target_os = "linux")] +async fn write_lan_ethernet(packet_socket: &AsyncFd, frame: &[u8]) -> Result<()> { + EthernetFrame::parse(frame).context("relay Ethernet frame is malformed")?; + loop { + let mut guard = packet_socket + .writable() + .await + .context("failed to wait for writable LAN Ethernet socket")?; + match guard.try_io(|inner| inner.get_ref().send_frame(frame)) { + Ok(Ok(sent)) if sent == frame.len() => return Ok(()), + Ok(Ok(sent)) => bail!("partial LAN Ethernet frame write: {sent}/{}", frame.len()), + Ok(Err(error)) => return Err(error).context("failed to write LAN Ethernet frame"), + Err(_would_block) => continue, + } + } +} + pub async fn connect_gateway(config: GatewayConfig) -> Result { let client_config = relay_client_config(config.relay_ca_cert_der())?; let mut endpoint = Endpoint::client(client_bind_addr(config.relay_addr())) diff --git a/crates/lanparty-gateway/src/main.rs b/crates/lanparty-gateway/src/main.rs index eb9c319..e8ad3dd 100644 --- a/crates/lanparty-gateway/src/main.rs +++ b/crates/lanparty-gateway/src/main.rs @@ -20,22 +20,19 @@ async fn main() -> anyhow::Result<()> { gateway.welcome().room_id(), gateway.welcome().effective_tap_mtu() ); - println!("AF_PACKET bridging is not wired yet; press Ctrl-C to stop"); #[cfg(target_os = "linux")] - let _packet_socket = { + { let socket = PacketSocket::open(gateway.config().interface())?; println!( "lanparty-gateway opened AF_PACKET socket on {} (ifindex {})", socket.interface(), socket.interface_index() ); - socket - }; + println!("lanparty-gateway bridging frames; press Ctrl-C to stop"); + gateway.bridge_until_shutdown(socket).await?; + } #[cfg(not(target_os = "linux"))] anyhow::bail!("lanparty-gateway requires Linux AF_PACKET support"); - tokio::signal::ctrl_c().await?; - gateway.shutdown("gateway shutting down").await; - Ok(()) } diff --git a/crates/lanparty-gateway/src/packet.rs b/crates/lanparty-gateway/src/packet.rs index c710b3a..f80d673 100644 --- a/crates/lanparty-gateway/src/packet.rs +++ b/crates/lanparty-gateway/src/packet.rs @@ -1,7 +1,7 @@ use std::{ ffi::CString, io, - os::fd::{AsRawFd, FromRawFd, OwnedFd}, + os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, }; const ETH_P_ALL: u16 = libc::ETH_P_ALL as u16; @@ -22,7 +22,7 @@ impl PacketSocket { // a new file descriptor or -1 without aliasing Rust-owned memory. libc::socket( libc::AF_PACKET, - libc::SOCK_RAW | libc::SOCK_CLOEXEC, + libc::SOCK_RAW | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, protocol, ) }; @@ -110,6 +110,12 @@ impl PacketSocket { } } +impl AsRawFd for PacketSocket { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + pub fn interface_index(interface: &str) -> io::Result { if interface.trim().is_empty() { return Err(io::Error::new(