From f9923bd61e333efb9d304bb0d92b1f50b531dcf9 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Tue, 18 Nov 2025 20:37:46 +0100 Subject: [PATCH] feat: Implement length-delimited framing for QUIC stream communication using `tokio-util` and `futures`. --- Cargo.lock | 27 ++--- Cargo.toml | 4 +- crates/lanspread-peer/Cargo.toml | 4 +- crates/lanspread-peer/src/lib.rs | 178 +++++++++++++++---------------- 4 files changed, 108 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33914a7..7536cf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,9 +393,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.10.1" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" dependencies = [ "serde", ] @@ -469,9 +469,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.45" +version = "1.2.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe" +checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36" dependencies = [ "find-msvc-tools", "jobserver", @@ -1116,9 +1116,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" [[package]] name = "flate2" @@ -1225,6 +1225,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -2215,6 +2216,7 @@ version = "0.1.0" dependencies = [ "bytes", "eyre", + "futures", "gethostname", "if-addrs 0.11.1", "lanspread-compat", @@ -2225,6 +2227,7 @@ dependencies = [ "log", "s2n-quic", "tokio", + "tokio-util", "tracing", "uuid", "walkdir", @@ -2927,9 +2930,9 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "open" -version = "5.3.2" +version = "5.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2483562e62ea94312f3576a7aca397306df7990b8d89033e18766744377ef95" +checksum = "43bb73a7fa3799b198970490a51174027ba0d4ec504b03cd08caf513d40024bc" dependencies = [ "dunce", "is-wsl", @@ -4145,9 +4148,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.15.1" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa66c845eee442168b2c8134fec70ac50dc20e760769c8ba0ad1319ca1959b04" +checksum = "10574371d41b0d9b2cff89418eda27da52bcaff2cc8741db26382a77c29131f1" dependencies = [ "base64 0.22.1", "chrono", @@ -4164,9 +4167,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.15.1" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91a903660542fced4e99881aa481bdbaec1634568ee02e0b8bd57c64cb38955" +checksum = "08a72d8216842fdd57820dc78d840bef99248e35fb2554ff923319e60f2d686b" dependencies = [ "darling", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index e5d5bae..560cc2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,9 @@ bytes = { version = "1", features = ["serde"] } chrono = "0.4" clap = { version = "4", features = ["derive"] } eyre = "0.6" +futures = "0.3" gethostname = "1" +if-addrs = "0.11" itertools = "0.14" log = "0.4" mdns-sd = "0.17" @@ -36,11 +38,11 @@ tauri-plugin-shell = "2" tauri-plugin-dialog = "2" tauri-plugin-store = "2" tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1", features = ["v7"] } walkdir = "2" -if-addrs = "0.11" windows = { version = "0.62", features = [ "Win32", "Win32_UI", diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index 0f62655..3d79ea1 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -19,11 +19,13 @@ lanspread-utils = { path = "../lanspread-utils" } # external bytes = { workspace = true } eyre = { workspace = true } +futures = { workspace = true } gethostname = { workspace = true } +if-addrs = { workspace = true } log = { workspace = true } s2n-quic = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } -if-addrs = { workspace = true } diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 089ce6a..06c8d5a 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -14,6 +14,7 @@ use std::{ }; use bytes::BytesMut; +use futures::{SinkExt, StreamExt}; use if_addrs::{IfAddr, Interface, get_if_addrs}; use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser}; @@ -35,6 +36,7 @@ use tokio::{ }, task::JoinHandle, }; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use uuid::Uuid; use crate::{ @@ -616,6 +618,20 @@ pub enum PeerCommand { GetPeerCount, } +async fn connect_to_peer(addr: SocketAddr) -> eyre::Result { + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; + + let conn = Connect::new(addr).with_server_name("localhost"); + let conn = client.connect(conn).await?; + Ok(conn) +} + async fn initial_peer_alive_check(conn: &mut Connection) -> bool { let remote_addr = conn.remote_addr().ok(); @@ -627,18 +643,20 @@ async fn initial_peer_alive_check(conn: &mut Connection) -> bool { } }; - let (mut rx, mut tx) = stream.split(); + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); // send ping - if let Err(e) = tx.send(Request::Ping.encode()).await { + if let Err(e) = framed_tx.send(Request::Ping.encode()).await { log::error!("{remote_addr:?} failed to send ping to peer: {e}"); return false; } - let _ = tx.close().await; + let _ = framed_tx.close().await; // receive pong - if let Ok(Some(response)) = rx.receive().await { - let response = Response::decode(response); + if let Some(Ok(response_bytes)) = framed_rx.next().await { + let response = Response::decode(response_bytes.freeze()); match response { Response::Pong => { log::trace!("{remote_addr:?} peer is alive"); @@ -795,7 +813,8 @@ async fn download_chunk( chunk: &DownloadChunk, ) -> eyre::Result<()> { let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + let (mut rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); let request = Request::GetGameFileChunk { game_id: game_id.to_string(), @@ -803,9 +822,9 @@ async fn download_chunk( offset: chunk.offset, length: chunk.length, }; - tx.write_all(&request.encode()).await?; + framed_tx.send(request.encode()).await?; - tx.close().await?; + framed_tx.close().await?; // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?; @@ -886,11 +905,13 @@ async fn download_whole_file( desc: &GameFileDescription, ) -> eyre::Result<()> { let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + let (mut rx, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - tx.write_all(&Request::GetGameFileData(desc.clone()).encode()) + framed_tx + .send(Request::GetGameFileData(desc.clone()).encode()) .await?; - tx.close().await?; + framed_tx.close().await?; // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?; @@ -920,16 +941,8 @@ async fn download_from_peer( return Ok(Vec::new()); } - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; - - let conn = Connect::new(peer_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; + let mut conn = connect_to_peer(peer_addr).await?; + conn.keep_alive(true)?; conn.keep_alive(true)?; let base_dir = games_folder; @@ -1838,27 +1851,29 @@ async fn handle_peer_stream( ctx: PeerCtx, remote_addr: Option, ) -> eyre::Result<()> { - let (mut rx, mut tx) = stream.split(); + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); log::trace!("{remote_addr:?} peer stream opened"); // handle streams loop { - match rx.receive().await { - Ok(Some(data)) => { + match framed_rx.next().await { + Some(Ok(data)) => { log::trace!( "{:?} msg: (raw): {}", remote_addr, String::from_utf8_lossy(&data) ); - let request = Request::decode(data); + let request = Request::decode(data.freeze()); log::debug!("{remote_addr:?} msg: {request:?}"); match request { Request::Ping => { // Respond with pong - if let Err(e) = tx.send(Response::Pong.encode()).await { + if let Err(e) = framed_tx.send(Response::Pong.encode()).await { log::error!("Failed to send pong: {e}"); } } @@ -1887,7 +1902,7 @@ async fn handle_peer_stream( .collect() }; - if let Err(e) = tx.send(Response::ListGames(games).encode()).await { + if let Err(e) = framed_tx.send(Response::ListGames(games).encode()).await { log::error!("Failed to send ListGames response: {e}"); } } @@ -1933,7 +1948,7 @@ async fn handle_peer_stream( Response::GameNotFound(id) }; - if let Err(e) = tx.send(response.encode()).await { + if let Err(e) = framed_tx.send(response.encode()).await { log::error!("Failed to send GetGame response: {e}"); } } @@ -1946,8 +1961,12 @@ async fn handle_peer_stream( let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); + // For file data, we need the raw stream, so we unwrap the FramedWrite + let mut tx = framed_tx.into_inner(); send_game_file_data(&desc, &mut tx, &base_dir).await; - } else if let Err(e) = tx + // Re-wrap for next iteration (though usually stream closes after file transfer) + framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + } else if let Err(e) = framed_tx .send( Response::InvalidRequest( desc.relative_path.as_bytes().to_vec().into(), @@ -1973,6 +1992,8 @@ async fn handle_peer_stream( let maybe_game_dir = ctx.game_dir.read().await.clone(); if let Some(game_dir) = maybe_game_dir { let base_dir = PathBuf::from(game_dir); + // For file data, we need the raw stream, so we unwrap the FramedWrite + let mut tx = framed_tx.into_inner(); send_game_file_chunk( &game_id, &relative_path, @@ -1982,7 +2003,9 @@ async fn handle_peer_stream( &base_dir, ) .await; - } else if let Err(e) = tx + // Re-wrap for next iteration + framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); + } else if let Err(e) = framed_tx .send( Response::InvalidRequest( relative_path.as_bytes().to_vec().into(), @@ -2020,12 +2043,13 @@ async fn handle_peer_stream( } } } - Ok(None) => { - log::trace!("{remote_addr:?} peer stream closed"); + + Some(Err(e)) => { + log::error!("{remote_addr:?} peer stream error: {e}"); break; } - Err(e) => { - log::error!("{remote_addr:?} peer stream error: {e}"); + None => { + log::trace!("{remote_addr:?} peer stream closed"); break; } } @@ -2131,27 +2155,20 @@ async fn request_games_from_peer( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, ) -> eyre::Result<()> { - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; - - let conn = Connect::new(peer_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; + let mut conn = connect_to_peer(peer_addr).await?; let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); // Send ListGames request - tx.send(Request::ListGames.encode()).await?; - let _ = tx.close().await; + framed_tx.send(Request::ListGames.encode()).await?; + let _ = framed_tx.close().await; // Receive response let mut data = BytesMut::new(); - while let Ok(Some(bytes)) = rx.receive().await { + while let Some(Ok(bytes)) = framed_rx.next().await { data.extend_from_slice(&bytes); } @@ -2179,23 +2196,17 @@ async fn request_games_from_peer( } async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec) -> eyre::Result<()> { - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; - - let conn = Connect::new(peer_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; + let mut conn = connect_to_peer(peer_addr).await?; let stream = conn.open_bidirectional_stream().await?; - let (_, mut tx) = stream.split(); + let (_, tx) = stream.split(); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); // Send AnnounceGames request - tx.send(Request::AnnounceGames(games).encode()).await?; - let _ = tx.close().await; + framed_tx + .send(Request::AnnounceGames(games).encode()) + .await?; + let _ = framed_tx.close().await; Ok(()) } @@ -2205,31 +2216,25 @@ async fn request_game_details_from_peer( game_id: &str, peer_game_db: Arc>, ) -> eyre::Result> { - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; - - let conn = Connect::new(peer_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; + let mut conn = connect_to_peer(peer_addr).await?; let stream = conn.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + let (rx, tx) = stream.split(); + let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); + let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); - tx.send( - Request::GetGame { - id: game_id.to_string(), - } - .encode(), - ) - .await?; - tx.close().await?; + framed_tx + .send( + Request::GetGame { + id: game_id.to_string(), + } + .encode(), + ) + .await?; + framed_tx.close().await?; let mut data = BytesMut::new(); - while let Ok(Some(bytes)) = rx.receive().await { + while let Some(Ok(bytes)) = framed_rx.next().await { data.extend_from_slice(&bytes); } @@ -2555,16 +2560,7 @@ async fn scan_local_games(game_dir: &str) -> eyre::Result { } async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result { - let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - - let client = QuicClient::builder() - .with_tls(CERT_PEM)? - .with_io("0.0.0.0:0")? - .with_limits(limits)? - .start()?; - - let conn = Connect::new(peer_addr).with_server_name("localhost"); - let mut conn = client.connect(conn).await?; + let mut conn = connect_to_peer(peer_addr).await?; let is_alive = initial_peer_alive_check(&mut conn).await; Ok(is_alive)