From 879cb689a40dca685ac89c46e04b3c94b0230c6a Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 17:25:26 +0200 Subject: [PATCH] feat(ctrl): add control stream frame codec Add the framing layer that turns typed control messages into bytes for reliable QUIC streams. This keeps the codec next to the control schema while leaving the actual QUIC read/write loops for a later relay/client/gateway slice. The codec uses a four-byte big-endian length prefix followed by JSON. JSON is a phase-1 choice for inspectability during manual tunnel bring-up; the explicit length prefix keeps stream parsing deterministic and the 64 KiB cap prevents a peer from announcing unbounded control payloads. Decoding validates the message after deserialization so forged stream bytes cannot bypass constructor checks. The next networking slice can use complete_control_frame_len to split a stream buffer and decode_control_frame once a complete frame is available. Test Plan: - cargo fmt --check - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings Refs: PLAN.md reliable QUIC control stream requirements --- Cargo.lock | 32 +++++ Cargo.toml | 1 + README.md | 1 + crates/lanparty-ctrl/Cargo.toml | 1 + crates/lanparty-ctrl/src/codec.rs | 200 ++++++++++++++++++++++++++++++ crates/lanparty-ctrl/src/lib.rs | 12 +- 6 files changed, 244 insertions(+), 3 deletions(-) create mode 100644 crates/lanparty-ctrl/src/codec.rs diff --git a/Cargo.lock b/Cargo.lock index 782182d..2965411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "lanparty-client-core" version = "0.1.0" @@ -17,6 +23,7 @@ dependencies = [ "lanparty-obs", "lanparty-proto", "serde", + "serde_json", "thiserror", ] @@ -48,6 +55,12 @@ dependencies = [ "lanparty-proto", ] +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -96,6 +109,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "syn" version = "2.0.117" @@ -132,3 +158,9 @@ name = "unicode-ident" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 3cd6483..7b379e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,5 +18,6 @@ edition = "2024" anyhow = "1" bytes = "1" serde = { version = "1", features = ["derive"] } +serde_json = "1" thiserror = "2" tracing = "0.1" diff --git a/README.md b/README.md index de72bdb..cc530f4 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Reliable control-plane schema shared by the QUIC stream handlers: - endpoint hello messages with role, room, MAC, and datagram budget - server welcome, reject, peer lifecycle, stats, and disconnect messages - room-code, role/MAC, peer-id, and effective-MTU validation +- length-prefixed JSON control frames for reliable QUIC streams ### `lanparty-obs` diff --git a/crates/lanparty-ctrl/Cargo.toml b/crates/lanparty-ctrl/Cargo.toml index 0ef8b01..1c580a4 100644 --- a/crates/lanparty-ctrl/Cargo.toml +++ b/crates/lanparty-ctrl/Cargo.toml @@ -7,4 +7,5 @@ edition.workspace = true lanparty-obs = { path = "../lanparty-obs" } lanparty-proto = { path = "../lanparty-proto" } serde.workspace = true +serde_json.workspace = true thiserror.workspace = true diff --git a/crates/lanparty-ctrl/src/codec.rs b/crates/lanparty-ctrl/src/codec.rs new file mode 100644 index 0000000..db57691 --- /dev/null +++ b/crates/lanparty-ctrl/src/codec.rs @@ -0,0 +1,200 @@ +use thiserror::Error; + +use crate::{ControlError, ControlMessage}; + +pub const CONTROL_LENGTH_PREFIX_LEN: usize = 4; +pub const MAX_CONTROL_MESSAGE_LEN: usize = 64 * 1024; + +#[derive(Debug, Error)] +pub enum ControlCodecError { + #[error("control frame is too short: got {actual} bytes, need at least {minimum}")] + FrameTooShort { actual: usize, minimum: usize }, + #[error("control message length {len} exceeds maximum {max}")] + MessageTooLarge { len: usize, max: usize }, + #[error( + "control frame payload is incomplete: declared {declared} bytes, available {available}" + )] + IncompletePayload { declared: usize, available: usize }, + #[error("control frame has {trailing} trailing bytes after one message")] + TrailingBytes { trailing: usize }, + #[error("control message JSON is invalid: {0}")] + Json(#[from] serde_json::Error), + #[error("control message failed validation: {0}")] + InvalidMessage(#[from] ControlError), +} + +pub fn encode_control_message(message: &ControlMessage) -> Result, ControlCodecError> { + message.validate()?; + + let payload = serde_json::to_vec(message)?; + let payload_len = payload.len(); + if payload_len > MAX_CONTROL_MESSAGE_LEN { + return Err(ControlCodecError::MessageTooLarge { + len: payload_len, + max: MAX_CONTROL_MESSAGE_LEN, + }); + } + + let mut frame = Vec::with_capacity(CONTROL_LENGTH_PREFIX_LEN + payload_len); + frame.extend_from_slice(&(payload_len as u32).to_be_bytes()); + frame.extend_from_slice(&payload); + Ok(frame) +} + +pub fn decode_control_frame(frame: &[u8]) -> Result { + let Some(total_len) = complete_control_frame_len(frame)? else { + return Err(incomplete_frame_error(frame)); + }; + + if frame.len() > total_len { + return Err(ControlCodecError::TrailingBytes { + trailing: frame.len() - total_len, + }); + } + + let payload = &frame[CONTROL_LENGTH_PREFIX_LEN..total_len]; + let message: ControlMessage = serde_json::from_slice(payload)?; + message.validate()?; + + Ok(message) +} + +pub fn complete_control_frame_len(buffer: &[u8]) -> Result, ControlCodecError> { + if buffer.len() < CONTROL_LENGTH_PREFIX_LEN { + return Ok(None); + } + + let declared = declared_payload_len(buffer)?; + if declared > MAX_CONTROL_MESSAGE_LEN { + return Err(ControlCodecError::MessageTooLarge { + len: declared, + max: MAX_CONTROL_MESSAGE_LEN, + }); + } + + let total = CONTROL_LENGTH_PREFIX_LEN + declared; + if buffer.len() < total { + return Ok(None); + } + + Ok(Some(total)) +} + +fn declared_payload_len(buffer: &[u8]) -> Result { + if buffer.len() < CONTROL_LENGTH_PREFIX_LEN { + return Err(ControlCodecError::FrameTooShort { + actual: buffer.len(), + minimum: CONTROL_LENGTH_PREFIX_LEN, + }); + } + + Ok(u32::from_be_bytes( + buffer[0..CONTROL_LENGTH_PREFIX_LEN] + .try_into() + .expect("length prefix slice has exact size"), + ) as usize) +} + +fn incomplete_frame_error(frame: &[u8]) -> ControlCodecError { + if frame.len() < CONTROL_LENGTH_PREFIX_LEN { + return ControlCodecError::FrameTooShort { + actual: frame.len(), + minimum: CONTROL_LENGTH_PREFIX_LEN, + }; + } + + let declared = declared_payload_len(frame).expect("frame has length prefix"); + ControlCodecError::IncompletePayload { + declared, + available: frame.len() - CONTROL_LENGTH_PREFIX_LEN, + } +} + +#[cfg(test)] +mod tests { + use lanparty_proto::{MIN_USEFUL_TAP_MTU, MacAddr}; + + use super::*; + use crate::{ControlMessage, DisconnectReason, EndpointHello, PeerInfo, Role, RoomCode}; + + fn room() -> RoomCode { + RoomCode::new("ROOM_1").unwrap() + } + + fn mac() -> MacAddr { + MacAddr::new([0x02, 0, 0, 0, 0, 1]) + } + + #[test] + fn encodes_and_decodes_control_messages() { + let message = ControlMessage::Hello(EndpointHello::client(room(), mac(), 1400).unwrap()); + + let frame = encode_control_message(&message).unwrap(); + let declared = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize; + + assert_eq!(declared, frame.len() - CONTROL_LENGTH_PREFIX_LEN); + assert_eq!( + complete_control_frame_len(&frame).unwrap(), + Some(frame.len()) + ); + assert_eq!(decode_control_frame(&frame).unwrap(), message); + } + + #[test] + fn reports_incomplete_frames_for_stream_buffering() { + assert_eq!(complete_control_frame_len(&[0, 0, 0]).unwrap(), None); + + let frame = encode_control_message(&ControlMessage::PeerLeft { + peer_id: 1, + reason: DisconnectReason::Normal, + }) + .unwrap(); + + assert_eq!(complete_control_frame_len(&frame[..8]).unwrap(), None); + assert!(matches!( + decode_control_frame(&frame[..8]).unwrap_err(), + ControlCodecError::IncompletePayload { .. } + )); + } + + #[test] + fn rejects_trailing_bytes_after_one_frame() { + let mut frame = encode_control_message(&ControlMessage::PeerJoined( + PeerInfo::new(1, Role::Client, Some(mac())).unwrap(), + )) + .unwrap(); + frame.push(0); + + assert!(matches!( + decode_control_frame(&frame).unwrap_err(), + ControlCodecError::TrailingBytes { trailing: 1 } + )); + } + + #[test] + fn rejects_oversized_declared_length() { + let mut frame = [0; CONTROL_LENGTH_PREFIX_LEN]; + frame.copy_from_slice(&((MAX_CONTROL_MESSAGE_LEN as u32) + 1).to_be_bytes()); + + assert!(matches!( + complete_control_frame_len(&frame).unwrap_err(), + ControlCodecError::MessageTooLarge { .. } + )); + } + + #[test] + fn validates_decoded_messages() { + let json = format!( + r#"{{"type":"welcome","payload":{{"protocol_version":1,"room_id":1,"peer_id":0,"effective_tap_mtu":{}}}}}"#, + MIN_USEFUL_TAP_MTU + ); + let mut frame = Vec::new(); + frame.extend_from_slice(&(json.len() as u32).to_be_bytes()); + frame.extend_from_slice(json.as_bytes()); + + assert!(matches!( + decode_control_frame(&frame).unwrap_err(), + ControlCodecError::InvalidMessage(ControlError::InvalidPeerId) + )); + } +} diff --git a/crates/lanparty-ctrl/src/lib.rs b/crates/lanparty-ctrl/src/lib.rs index 32b0a02..98dc360 100644 --- a/crates/lanparty-ctrl/src/lib.rs +++ b/crates/lanparty-ctrl/src/lib.rs @@ -1,11 +1,17 @@ //! Reliable control-plane messages for the LAN party tunnel. //! -//! QUIC streams will carry these messages. The crate does not choose a stream -//! codec yet; it defines the typed handshake and status model that client, -//! relay, and gateway must validate consistently. +//! QUIC streams carry these messages as length-prefixed JSON frames. The crate +//! defines the typed handshake/status model and the small framing layer needed +//! by client, relay, and gateway stream handlers. use std::{fmt, str::FromStr}; +mod codec; + +pub use codec::{ + CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, MAX_CONTROL_MESSAGE_LEN, + complete_control_frame_len, decode_control_frame, encode_control_message, +}; pub use lanparty_obs::TunnelStats; use lanparty_proto::{MIN_USEFUL_TAP_MTU, MacAddr, MtuError, recommended_tap_mtu}; use thiserror::Error;