diff --git a/Cargo.lock b/Cargo.lock index 782182d..2965411 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "lanparty-client-core" version = "0.1.0" @@ -17,6 +23,7 @@ dependencies = [ "lanparty-obs", "lanparty-proto", "serde", + "serde_json", "thiserror", ] @@ -48,6 +55,12 @@ dependencies = [ "lanparty-proto", ] +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -96,6 +109,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "syn" version = "2.0.117" @@ -132,3 +158,9 @@ name = "unicode-ident" version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 3cd6483..7b379e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,5 +18,6 @@ edition = "2024" anyhow = "1" bytes = "1" serde = { version = "1", features = ["derive"] } +serde_json = "1" thiserror = "2" tracing = "0.1" diff --git a/README.md b/README.md index de72bdb..cc530f4 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Reliable control-plane schema shared by the QUIC stream handlers: - endpoint hello messages with role, room, MAC, and datagram budget - server welcome, reject, peer lifecycle, stats, and disconnect messages - room-code, role/MAC, peer-id, and effective-MTU validation +- length-prefixed JSON control frames for reliable QUIC streams ### `lanparty-obs` diff --git a/crates/lanparty-ctrl/Cargo.toml b/crates/lanparty-ctrl/Cargo.toml index 0ef8b01..1c580a4 100644 --- a/crates/lanparty-ctrl/Cargo.toml +++ b/crates/lanparty-ctrl/Cargo.toml @@ -7,4 +7,5 @@ edition.workspace = true lanparty-obs = { path = "../lanparty-obs" } lanparty-proto = { path = "../lanparty-proto" } serde.workspace = true +serde_json.workspace = true thiserror.workspace = true diff --git a/crates/lanparty-ctrl/src/codec.rs b/crates/lanparty-ctrl/src/codec.rs new file mode 100644 index 0000000..db57691 --- /dev/null +++ b/crates/lanparty-ctrl/src/codec.rs @@ -0,0 +1,200 @@ +use thiserror::Error; + +use crate::{ControlError, ControlMessage}; + +pub const CONTROL_LENGTH_PREFIX_LEN: usize = 4; +pub const MAX_CONTROL_MESSAGE_LEN: usize = 64 * 1024; + +#[derive(Debug, Error)] +pub enum ControlCodecError { + #[error("control frame is too short: got {actual} bytes, need at least {minimum}")] + FrameTooShort { actual: usize, minimum: usize }, + #[error("control message length {len} exceeds maximum {max}")] + MessageTooLarge { len: usize, max: usize }, + #[error( + "control frame payload is incomplete: declared {declared} bytes, available {available}" + )] + IncompletePayload { declared: usize, available: usize }, + #[error("control frame has {trailing} trailing bytes after one message")] + TrailingBytes { trailing: usize }, + #[error("control message JSON is invalid: {0}")] + Json(#[from] serde_json::Error), + #[error("control message failed validation: {0}")] + InvalidMessage(#[from] ControlError), +} + +pub fn encode_control_message(message: &ControlMessage) -> Result, ControlCodecError> { + message.validate()?; + + let payload = serde_json::to_vec(message)?; + let payload_len = payload.len(); + if payload_len > MAX_CONTROL_MESSAGE_LEN { + return Err(ControlCodecError::MessageTooLarge { + len: payload_len, + max: MAX_CONTROL_MESSAGE_LEN, + }); + } + + let mut frame = Vec::with_capacity(CONTROL_LENGTH_PREFIX_LEN + payload_len); + frame.extend_from_slice(&(payload_len as u32).to_be_bytes()); + frame.extend_from_slice(&payload); + Ok(frame) +} + +pub fn decode_control_frame(frame: &[u8]) -> Result { + let Some(total_len) = complete_control_frame_len(frame)? else { + return Err(incomplete_frame_error(frame)); + }; + + if frame.len() > total_len { + return Err(ControlCodecError::TrailingBytes { + trailing: frame.len() - total_len, + }); + } + + let payload = &frame[CONTROL_LENGTH_PREFIX_LEN..total_len]; + let message: ControlMessage = serde_json::from_slice(payload)?; + message.validate()?; + + Ok(message) +} + +pub fn complete_control_frame_len(buffer: &[u8]) -> Result, ControlCodecError> { + if buffer.len() < CONTROL_LENGTH_PREFIX_LEN { + return Ok(None); + } + + let declared = declared_payload_len(buffer)?; + if declared > MAX_CONTROL_MESSAGE_LEN { + return Err(ControlCodecError::MessageTooLarge { + len: declared, + max: MAX_CONTROL_MESSAGE_LEN, + }); + } + + let total = CONTROL_LENGTH_PREFIX_LEN + declared; + if buffer.len() < total { + return Ok(None); + } + + Ok(Some(total)) +} + +fn declared_payload_len(buffer: &[u8]) -> Result { + if buffer.len() < CONTROL_LENGTH_PREFIX_LEN { + return Err(ControlCodecError::FrameTooShort { + actual: buffer.len(), + minimum: CONTROL_LENGTH_PREFIX_LEN, + }); + } + + Ok(u32::from_be_bytes( + buffer[0..CONTROL_LENGTH_PREFIX_LEN] + .try_into() + .expect("length prefix slice has exact size"), + ) as usize) +} + +fn incomplete_frame_error(frame: &[u8]) -> ControlCodecError { + if frame.len() < CONTROL_LENGTH_PREFIX_LEN { + return ControlCodecError::FrameTooShort { + actual: frame.len(), + minimum: CONTROL_LENGTH_PREFIX_LEN, + }; + } + + let declared = declared_payload_len(frame).expect("frame has length prefix"); + ControlCodecError::IncompletePayload { + declared, + available: frame.len() - CONTROL_LENGTH_PREFIX_LEN, + } +} + +#[cfg(test)] +mod tests { + use lanparty_proto::{MIN_USEFUL_TAP_MTU, MacAddr}; + + use super::*; + use crate::{ControlMessage, DisconnectReason, EndpointHello, PeerInfo, Role, RoomCode}; + + fn room() -> RoomCode { + RoomCode::new("ROOM_1").unwrap() + } + + fn mac() -> MacAddr { + MacAddr::new([0x02, 0, 0, 0, 0, 1]) + } + + #[test] + fn encodes_and_decodes_control_messages() { + let message = ControlMessage::Hello(EndpointHello::client(room(), mac(), 1400).unwrap()); + + let frame = encode_control_message(&message).unwrap(); + let declared = u32::from_be_bytes(frame[0..4].try_into().unwrap()) as usize; + + assert_eq!(declared, frame.len() - CONTROL_LENGTH_PREFIX_LEN); + assert_eq!( + complete_control_frame_len(&frame).unwrap(), + Some(frame.len()) + ); + assert_eq!(decode_control_frame(&frame).unwrap(), message); + } + + #[test] + fn reports_incomplete_frames_for_stream_buffering() { + assert_eq!(complete_control_frame_len(&[0, 0, 0]).unwrap(), None); + + let frame = encode_control_message(&ControlMessage::PeerLeft { + peer_id: 1, + reason: DisconnectReason::Normal, + }) + .unwrap(); + + assert_eq!(complete_control_frame_len(&frame[..8]).unwrap(), None); + assert!(matches!( + decode_control_frame(&frame[..8]).unwrap_err(), + ControlCodecError::IncompletePayload { .. } + )); + } + + #[test] + fn rejects_trailing_bytes_after_one_frame() { + let mut frame = encode_control_message(&ControlMessage::PeerJoined( + PeerInfo::new(1, Role::Client, Some(mac())).unwrap(), + )) + .unwrap(); + frame.push(0); + + assert!(matches!( + decode_control_frame(&frame).unwrap_err(), + ControlCodecError::TrailingBytes { trailing: 1 } + )); + } + + #[test] + fn rejects_oversized_declared_length() { + let mut frame = [0; CONTROL_LENGTH_PREFIX_LEN]; + frame.copy_from_slice(&((MAX_CONTROL_MESSAGE_LEN as u32) + 1).to_be_bytes()); + + assert!(matches!( + complete_control_frame_len(&frame).unwrap_err(), + ControlCodecError::MessageTooLarge { .. } + )); + } + + #[test] + fn validates_decoded_messages() { + let json = format!( + r#"{{"type":"welcome","payload":{{"protocol_version":1,"room_id":1,"peer_id":0,"effective_tap_mtu":{}}}}}"#, + MIN_USEFUL_TAP_MTU + ); + let mut frame = Vec::new(); + frame.extend_from_slice(&(json.len() as u32).to_be_bytes()); + frame.extend_from_slice(json.as_bytes()); + + assert!(matches!( + decode_control_frame(&frame).unwrap_err(), + ControlCodecError::InvalidMessage(ControlError::InvalidPeerId) + )); + } +} diff --git a/crates/lanparty-ctrl/src/lib.rs b/crates/lanparty-ctrl/src/lib.rs index 32b0a02..98dc360 100644 --- a/crates/lanparty-ctrl/src/lib.rs +++ b/crates/lanparty-ctrl/src/lib.rs @@ -1,11 +1,17 @@ //! Reliable control-plane messages for the LAN party tunnel. //! -//! QUIC streams will carry these messages. The crate does not choose a stream -//! codec yet; it defines the typed handshake and status model that client, -//! relay, and gateway must validate consistently. +//! QUIC streams carry these messages as length-prefixed JSON frames. The crate +//! defines the typed handshake/status model and the small framing layer needed +//! by client, relay, and gateway stream handlers. use std::{fmt, str::FromStr}; +mod codec; + +pub use codec::{ + CONTROL_LENGTH_PREFIX_LEN, ControlCodecError, MAX_CONTROL_MESSAGE_LEN, + complete_control_frame_len, decode_control_frame, encode_control_message, +}; pub use lanparty_obs::TunnelStats; use lanparty_proto::{MIN_USEFUL_TAP_MTU, MacAddr, MtuError, recommended_tap_mtu}; use thiserror::Error;