feat: Implement length-delimited framing for QUIC stream communication using tokio-util and futures.
This commit is contained in:
@@ -19,11 +19,13 @@ lanspread-utils = { path = "../lanspread-utils" }
|
||||
# external
|
||||
bytes = { workspace = true }
|
||||
eyre = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
gethostname = { workspace = true }
|
||||
if-addrs = { workspace = true }
|
||||
log = { workspace = true }
|
||||
s2n-quic = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
if-addrs = { workspace = true }
|
||||
|
||||
@@ -14,6 +14,7 @@ use std::{
|
||||
};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use if_addrs::{IfAddr, Interface, get_if_addrs};
|
||||
use lanspread_db::db::{Game, GameDB, GameFileDescription};
|
||||
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser};
|
||||
@@ -35,6 +36,7 @@ use tokio::{
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
@@ -616,6 +618,20 @@ pub enum PeerCommand {
|
||||
GetPeerCount,
|
||||
}
|
||||
|
||||
async fn connect_to_peer(addr: SocketAddr) -> eyre::Result<Connection> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(addr).with_server_name("localhost");
|
||||
let conn = client.connect(conn).await?;
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
|
||||
let remote_addr = conn.remote_addr().ok();
|
||||
|
||||
@@ -627,18 +643,20 @@ async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
|
||||
}
|
||||
};
|
||||
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
// send ping
|
||||
if let Err(e) = tx.send(Request::Ping.encode()).await {
|
||||
if let Err(e) = framed_tx.send(Request::Ping.encode()).await {
|
||||
log::error!("{remote_addr:?} failed to send ping to peer: {e}");
|
||||
return false;
|
||||
}
|
||||
let _ = tx.close().await;
|
||||
let _ = framed_tx.close().await;
|
||||
|
||||
// receive pong
|
||||
if let Ok(Some(response)) = rx.receive().await {
|
||||
let response = Response::decode(response);
|
||||
if let Some(Ok(response_bytes)) = framed_rx.next().await {
|
||||
let response = Response::decode(response_bytes.freeze());
|
||||
match response {
|
||||
Response::Pong => {
|
||||
log::trace!("{remote_addr:?} peer is alive");
|
||||
@@ -795,7 +813,8 @@ async fn download_chunk(
|
||||
chunk: &DownloadChunk,
|
||||
) -> eyre::Result<()> {
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (mut rx, tx) = stream.split();
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
let request = Request::GetGameFileChunk {
|
||||
game_id: game_id.to_string(),
|
||||
@@ -803,9 +822,9 @@ async fn download_chunk(
|
||||
offset: chunk.offset,
|
||||
length: chunk.length,
|
||||
};
|
||||
tx.write_all(&request.encode()).await?;
|
||||
framed_tx.send(request.encode()).await?;
|
||||
|
||||
tx.close().await?;
|
||||
framed_tx.close().await?;
|
||||
|
||||
// Validate the path to prevent directory traversal
|
||||
let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?;
|
||||
@@ -886,11 +905,13 @@ async fn download_whole_file(
|
||||
desc: &GameFileDescription,
|
||||
) -> eyre::Result<()> {
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (mut rx, tx) = stream.split();
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
tx.write_all(&Request::GetGameFileData(desc.clone()).encode())
|
||||
framed_tx
|
||||
.send(Request::GetGameFileData(desc.clone()).encode())
|
||||
.await?;
|
||||
tx.close().await?;
|
||||
framed_tx.close().await?;
|
||||
|
||||
// Validate the path to prevent directory traversal
|
||||
let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?;
|
||||
@@ -920,16 +941,8 @@ async fn download_from_peer(
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
conn.keep_alive(true)?;
|
||||
conn.keep_alive(true)?;
|
||||
|
||||
let base_dir = games_folder;
|
||||
@@ -1838,27 +1851,29 @@ async fn handle_peer_stream(
|
||||
ctx: PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> eyre::Result<()> {
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
log::trace!("{remote_addr:?} peer stream opened");
|
||||
|
||||
// handle streams
|
||||
loop {
|
||||
match rx.receive().await {
|
||||
Ok(Some(data)) => {
|
||||
match framed_rx.next().await {
|
||||
Some(Ok(data)) => {
|
||||
log::trace!(
|
||||
"{:?} msg: (raw): {}",
|
||||
remote_addr,
|
||||
String::from_utf8_lossy(&data)
|
||||
);
|
||||
|
||||
let request = Request::decode(data);
|
||||
let request = Request::decode(data.freeze());
|
||||
log::debug!("{remote_addr:?} msg: {request:?}");
|
||||
|
||||
match request {
|
||||
Request::Ping => {
|
||||
// Respond with pong
|
||||
if let Err(e) = tx.send(Response::Pong.encode()).await {
|
||||
if let Err(e) = framed_tx.send(Response::Pong.encode()).await {
|
||||
log::error!("Failed to send pong: {e}");
|
||||
}
|
||||
}
|
||||
@@ -1887,7 +1902,7 @@ async fn handle_peer_stream(
|
||||
.collect()
|
||||
};
|
||||
|
||||
if let Err(e) = tx.send(Response::ListGames(games).encode()).await {
|
||||
if let Err(e) = framed_tx.send(Response::ListGames(games).encode()).await {
|
||||
log::error!("Failed to send ListGames response: {e}");
|
||||
}
|
||||
}
|
||||
@@ -1933,7 +1948,7 @@ async fn handle_peer_stream(
|
||||
Response::GameNotFound(id)
|
||||
};
|
||||
|
||||
if let Err(e) = tx.send(response.encode()).await {
|
||||
if let Err(e) = framed_tx.send(response.encode()).await {
|
||||
log::error!("Failed to send GetGame response: {e}");
|
||||
}
|
||||
}
|
||||
@@ -1946,8 +1961,12 @@ async fn handle_peer_stream(
|
||||
let maybe_game_dir = ctx.game_dir.read().await.clone();
|
||||
if let Some(game_dir) = maybe_game_dir {
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
// For file data, we need the raw stream, so we unwrap the FramedWrite
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_data(&desc, &mut tx, &base_dir).await;
|
||||
} else if let Err(e) = tx
|
||||
// Re-wrap for next iteration (though usually stream closes after file transfer)
|
||||
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
} else if let Err(e) = framed_tx
|
||||
.send(
|
||||
Response::InvalidRequest(
|
||||
desc.relative_path.as_bytes().to_vec().into(),
|
||||
@@ -1973,6 +1992,8 @@ async fn handle_peer_stream(
|
||||
let maybe_game_dir = ctx.game_dir.read().await.clone();
|
||||
if let Some(game_dir) = maybe_game_dir {
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
// For file data, we need the raw stream, so we unwrap the FramedWrite
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_chunk(
|
||||
&game_id,
|
||||
&relative_path,
|
||||
@@ -1982,7 +2003,9 @@ async fn handle_peer_stream(
|
||||
&base_dir,
|
||||
)
|
||||
.await;
|
||||
} else if let Err(e) = tx
|
||||
// Re-wrap for next iteration
|
||||
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
} else if let Err(e) = framed_tx
|
||||
.send(
|
||||
Response::InvalidRequest(
|
||||
relative_path.as_bytes().to_vec().into(),
|
||||
@@ -2020,12 +2043,13 @@ async fn handle_peer_stream(
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
log::trace!("{remote_addr:?} peer stream closed");
|
||||
|
||||
Some(Err(e)) => {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
None => {
|
||||
log::trace!("{remote_addr:?} peer stream closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -2131,27 +2155,20 @@ async fn request_games_from_peer(
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
) -> eyre::Result<()> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
// Send ListGames request
|
||||
tx.send(Request::ListGames.encode()).await?;
|
||||
let _ = tx.close().await;
|
||||
framed_tx.send(Request::ListGames.encode()).await?;
|
||||
let _ = framed_tx.close().await;
|
||||
|
||||
// Receive response
|
||||
let mut data = BytesMut::new();
|
||||
while let Ok(Some(bytes)) = rx.receive().await {
|
||||
while let Some(Ok(bytes)) = framed_rx.next().await {
|
||||
data.extend_from_slice(&bytes);
|
||||
}
|
||||
|
||||
@@ -2179,23 +2196,17 @@ async fn request_games_from_peer(
|
||||
}
|
||||
|
||||
async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec<Game>) -> eyre::Result<()> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (_, mut tx) = stream.split();
|
||||
let (_, tx) = stream.split();
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
// Send AnnounceGames request
|
||||
tx.send(Request::AnnounceGames(games).encode()).await?;
|
||||
let _ = tx.close().await;
|
||||
framed_tx
|
||||
.send(Request::AnnounceGames(games).encode())
|
||||
.await?;
|
||||
let _ = framed_tx.close().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2205,31 +2216,25 @@ async fn request_game_details_from_peer(
|
||||
game_id: &str,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
) -> eyre::Result<Vec<GameFileDescription>> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
tx.send(
|
||||
Request::GetGame {
|
||||
id: game_id.to_string(),
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await?;
|
||||
tx.close().await?;
|
||||
framed_tx
|
||||
.send(
|
||||
Request::GetGame {
|
||||
id: game_id.to_string(),
|
||||
}
|
||||
.encode(),
|
||||
)
|
||||
.await?;
|
||||
framed_tx.close().await?;
|
||||
|
||||
let mut data = BytesMut::new();
|
||||
while let Ok(Some(bytes)) = rx.receive().await {
|
||||
while let Some(Ok(bytes)) = framed_rx.next().await {
|
||||
data.extend_from_slice(&bytes);
|
||||
}
|
||||
|
||||
@@ -2555,16 +2560,7 @@ async fn scan_local_games(game_dir: &str) -> eyre::Result<GameDB> {
|
||||
}
|
||||
|
||||
async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result<bool> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
let mut conn = connect_to_peer(peer_addr).await?;
|
||||
|
||||
let is_alive = initial_peer_alive_check(&mut conn).await;
|
||||
Ok(is_alive)
|
||||
|
||||
Reference in New Issue
Block a user