From 5dd356eca8cb56bc8a1a8b9ae93d681a4bc30ba5 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sun, 7 Jun 2026 21:12:15 +0200 Subject: [PATCH] fix(stream-install)!: stream archive payloads as raw frames Streamed installs were sending FileChunk payloads through the shared JSON Message impl. serde_json serializes bytes as arrays of integers, which bloats wire traffic and burns CPU on large archives. Replace StreamInstallFrame encoding with tagged frames: JSON control frames keep their shape under tag 0, while file chunks carry raw bytes under tag 1. The stream install metadata now carries unpacked archive size and mandatory CRC32. The CLI unrar provider validates CRCs up front, runs one archive-wide unrar p stream, splits stdout by listed file sizes, and refuses trailing or missing bytes. That avoids solid archive re-decompression and sidesteps unrar wildcard masks for path arguments. Receivers now sample existing download progress events for streamed installs, report staging-relative chunk paths, and retry trusted peers with a fresh streamed-install transaction after a failed attempt. The current protocol policy does not preserve compatibility with older stream-install builds. Test Plan: - just fmt - just test - just clippy - git diff --check - git diff --cached --check BREAKING CHANGE: StreamInstallFrame now uses tagged frames with raw chunk payloads and requires current peers on both sides of streamed installs. Refs: NEXT_STEPS_CLAUDES_REVIEW.md --- crates/lanspread-peer-cli/src/lib.rs | 260 ++++++++++++------ crates/lanspread-peer/src/handlers.rs | 135 +++++---- crates/lanspread-peer/src/stream_install.rs | 105 ++++++- crates/lanspread-proto/src/lib.rs | 73 +++-- .../tests/stream_install_frame.rs | 42 +++ 5 files changed, 450 insertions(+), 165 deletions(-) create mode 100644 crates/lanspread-proto/tests/stream_install_frame.rs diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index b05eefa..38fab43 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -15,7 +15,10 @@ use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, U use lanspread_proto::StreamInstallFrame; use serde::Serialize; use serde_json::{Value, json}; -use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + sync::mpsc, +}; use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; @@ -298,53 +301,19 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, + unpacked_size: listing.unpacked_size(), }, ) .await?; - for entry in listing.entries { - if cancel_token.is_cancelled() { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - - match entry.kind { - RarEntryKind::Directory => { - send_stream_frame( - &frames, - StreamInstallFrame::Directory { - relative_path: entry.relative_path, - }, - ) - .await?; - } - RarEntryKind::File => { - send_stream_frame( - &frames, - StreamInstallFrame::FileBegin { - relative_path: entry.relative_path.clone(), - size: entry.size, - crc32: entry.crc32, - }, - ) - .await?; - stream_unrar_file( - &self.program, - archive, - &entry.relative_path, - &frames, - cancel_token.clone(), - ) - .await?; - send_stream_frame( - &frames, - StreamInstallFrame::FileEnd { - relative_path: entry.relative_path, - }, - ) - .await?; - } - } - } + stream_unrar_entries( + &self.program, + archive, + &listing.entries, + &frames, + cancel_token.clone(), + ) + .await?; send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await }) @@ -357,6 +326,16 @@ struct RarListing { entries: Vec, } +impl RarListing { + fn unpacked_size(&self) -> u64 { + self.entries + .iter() + .filter(|entry| entry.kind == RarEntryKind::File) + .map(|entry| entry.size) + .sum() + } +} + #[derive(Debug, Clone, PartialEq, Eq)] struct RarEntry { relative_path: String, @@ -448,26 +427,32 @@ fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Re return Ok(()); }; - let size = match kind { - RarEntryKind::File => draft - .size - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, - RarEntryKind::Directory => 0, + let (size, crc32) = match kind { + RarEntryKind::File => { + let size = draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; + let crc32 = draft + .crc32 + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + (size, Some(crc32)) + } + RarEntryKind::Directory => (0, None), }; entries.push(RarEntry { relative_path, kind, size, - crc32: draft.crc32, + crc32, }); Ok(()) } -async fn stream_unrar_file( +async fn stream_unrar_entries( program: &Path, archive: &Path, - relative_path: &str, + entries: &[RarEntry], frames: &mpsc::Sender, cancel_token: CancellationToken, ) -> eyre::Result<()> { @@ -476,28 +461,112 @@ async fn stream_unrar_file( .arg("-inul") .arg("-cfg-") .arg(archive) - .arg(relative_path) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn()?; - let mut stdout = child - .stdout - .take() - .ok_or_eyre("unrar stdout was not captured")?; - let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + let result = async { + let mut stdout = child + .stdout + .take() + .ok_or_eyre("unrar stdout was not captured")?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; - loop { - let read = tokio::select! { - () = cancel_token.cancelled() => { - let _ = child.kill().await; - eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display()); + for entry in entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); } - read = stdout.read(&mut buffer) => read?, - }; + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + RarEntryKind::File => { + let Some(crc32) = entry.crc32 else { + eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); + }; + send_stream_frame( + frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32, + }, + ) + .await?; + stream_unrar_file_from_stdout( + &mut stdout, + archive, + entry, + frames, + &mut buffer, + &cancel_token, + ) + .await?; + send_stream_frame( + frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + } + } + + let extra = + read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; + if extra != 0 { + eyre::bail!( + "unrar produced bytes after listed entries for {}", + archive.display() + ); + } + + let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {} with status {status}", + archive.display() + ); + } + + Ok(()) + } + .await; + + if result.is_err() { + let _ = child.kill().await; + } + + result +} + +async fn stream_unrar_file_from_stdout( + stdout: &mut (impl AsyncRead + Unpin), + archive: &Path, + entry: &RarEntry, + frames: &mpsc::Sender, + buffer: &mut [u8], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut remaining = entry.size; + while remaining > 0 { + let read_len = usize::try_from(remaining.min(buffer.len() as u64))?; + let read = + read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; if read == 0 { - break; + eyre::bail!( + "unrar ended while streaming {} from {}; {remaining} bytes missing", + entry.relative_path, + archive.display() + ); } send_stream_frame( @@ -507,20 +576,40 @@ async fn stream_unrar_file( }, ) .await?; - } - - let status = child.wait().await?; - if !status.success() { - eyre::bail!( - "unrar p failed for {}:{} with status {status}", - archive.display(), - relative_path - ); + remaining = remaining.saturating_sub(u64::try_from(read)?); } Ok(()) } +async fn read_unrar_stdout( + stdout: &mut (impl AsyncRead + Unpin), + buffer: &mut [u8], + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + read = stdout.read(buffer) => Ok(read?), + } +} + +async fn wait_unrar_child( + child: &mut tokio::process::Child, + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + status = child.wait() => Ok(status?), + } +} + async fn send_stream_frame( frames: &mpsc::Sender, frame: StreamInstallFrame, @@ -639,7 +728,7 @@ mod tests { let listing = parse_unrar_listing( r#" Archive: game.eti -Details: RAR 5 +Details: RAR 5, solid Name: bin/payload.bin Type: File @@ -652,7 +741,7 @@ Details: RAR 5 ) .expect("listing should parse"); - assert!(!listing.solid); + assert!(listing.solid); assert_eq!( listing.entries, vec![ @@ -672,6 +761,23 @@ Details: RAR 5 ); } + #[test] + fn rejects_unrar_file_entries_without_crc32() { + let err = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 +"#, + ) + .expect_err("file entries without CRC32 should be rejected"); + + assert!(err.to_string().contains("has no CRC32")); + } + #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index c5851c5..8985f74 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -497,11 +497,11 @@ pub async fn handle_stream_install_game_command( } }; peers.sort(); - let Some(peer_addr) = peers.into_iter().next() else { + if peers.is_empty() { log::error!("No peer selected for streamed install of {id}"); send_download_failed(tx_notify_ui, &id); return; - }; + } match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { BeginOperationResult::Started => {} @@ -525,15 +525,8 @@ pub async fn handle_stream_install_game_command( let ctx_clone = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { - run_stream_install_operation( - ctx_clone, - tx_notify_ui, - id, - game_root, - peer_addr, - cancel_token, - ) - .await; + run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token) + .await; }); } @@ -582,7 +575,7 @@ async fn run_stream_install_operation( tx_notify_ui: UnboundedSender, id: String, game_root: PathBuf, - peer_addr: SocketAddr, + peer_addrs: Vec, cancel_token: CancellationToken, ) { let download_guard = OperationGuard::download( @@ -597,63 +590,93 @@ async fn run_stream_install_operation( PeerEvent::DownloadGameFilesBegin { id: id.clone() }, ); - let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) - .await - { - Ok(transaction) => transaction, - Err(err) => { - log::error!("Failed to prepare streamed install for {id}: {err}"); - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; - return; + let mut last_receive_error = None; + for peer_addr in peer_addrs { + if cancel_token.is_cancelled() { + last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled")); + break; } - }; - let receive_result = receive_streamed_install( - peer_addr, - &id, - transaction.staging_dir(), - tx_notify_ui.clone(), - cancel_token.clone(), - ) - .await; + let transaction = + match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await { + Ok(transaction) => transaction, + Err(err) => { + log::error!("Failed to prepare streamed install for {id}: {err}"); + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) + .await; + return; + } + }; - match receive_result { - Ok(()) => { - if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + let receive_result = receive_streamed_install( + peer_addr, + &id, + transaction.staging_dir(), + tx_notify_ui.clone(), + cancel_token.clone(), + ) + .await; + + match receive_result { + Ok(()) => { + if transition_download_to_install( + &ctx, + &tx_notify_ui, + &id, + OperationKind::Installing, + ) .await - { - clear_active_download(&ctx, &id).await; - send_download_finished(&tx_notify_ui, &id); - download_guard.disarm(); - commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; - } else { + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + return; + } + if let Err(err) = transaction.rollback().await { log::error!("Failed to roll back streamed install for {id}: {err}"); } finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) .await; + return; } - } - Err(err) => { - if let Err(rollback_err) = transaction.rollback().await { - log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + Err(err) => { + if let Err(rollback_err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + } + if cancel_token.is_cancelled() { + log::info!("Streamed install download cancelled for {id}: {err}"); + last_receive_error = Some(err); + break; + } + + log::warn!( + "Streamed install attempt from {peer_addr} failed for {id}; trying another peer if available: {err}" + ); + last_receive_error = Some(err); } - let download_was_cancelled = cancel_token.is_cancelled(); - if download_was_cancelled { - log::info!("Streamed install download cancelled for {id}: {err}"); - } else { - log::error!("Streamed install download failed for {id}: {err}"); - } - finish_failed_stream_download( - &ctx, - &tx_notify_ui, - &id, - download_guard, - download_was_cancelled, - ) - .await; } } + + let download_was_cancelled = cancel_token.is_cancelled(); + if let Some(err) = last_receive_error { + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + } else { + log::error!("Streamed install download failed for {id}: no peer attempts were made"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; } async fn finish_failed_stream_download( diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index 3d2e168..f6c5df5 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::Arc, + time::{Duration, Instant}, }; use bytes::Bytes; @@ -15,6 +16,7 @@ use tokio::{ fs::File, io::AsyncWriteExt, sync::{mpsc, mpsc::UnboundedSender}, + time::{self, MissedTickBehavior}, }; use tokio_util::{ codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, @@ -22,6 +24,7 @@ use tokio_util::{ }; use crate::{ + DownloadProgress, PeerEvent, install::root_eti_archives, network::connect_to_peer, @@ -29,6 +32,7 @@ use crate::{ }; const FRAME_CHANNEL_DEPTH: usize = 16; +const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; @@ -182,10 +186,18 @@ pub(crate) async fn receive_streamed_install( let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut current_file: Option = None; + let mut progress = StreamInstallProgress::new(game_id.to_string()); + let mut progress_interval = time::interval(STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL); + progress_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + progress_interval.tick().await; loop { let next = tokio::select! { () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), + _ = progress_interval.tick() => { + progress.emit_current(&tx_notify_ui); + continue; + } next = framed_rx.next() => next, }; @@ -199,9 +211,13 @@ pub(crate) async fn receive_streamed_install( StreamInstallFrame::ArchiveBegin { archive_name, solid, + unpacked_size, } => { + progress.add_total(unpacked_size); + progress.emit_snapshot(&tx_notify_ui, 0); log::info!( - "Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" + "Receiving streamed install archive {archive_name} for {game_id} \ + (solid={solid}, unpacked_size={unpacked_size})" ); } StreamInstallFrame::Directory { relative_path } => { @@ -227,8 +243,10 @@ pub(crate) async fn receive_streamed_install( let Some(file) = current_file.as_mut() else { eyre::bail!("received FileChunk without FileBegin"); }; - file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) + let length = file + .write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) .await?; + progress.record_bytes(length); } StreamInstallFrame::FileEnd { relative_path } => { let Some(file) = current_file.take() else { @@ -243,6 +261,7 @@ pub(crate) async fn receive_streamed_install( if current_file.is_some() { eyre::bail!("streamed install completed with an open file"); } + progress.emit_snapshot(&tx_notify_ui, 0); return Ok(()); } StreamInstallFrame::Error { message } => { @@ -252,11 +271,68 @@ pub(crate) async fn receive_streamed_install( } } +struct StreamInstallProgress { + id: String, + total_bytes: u64, + downloaded_bytes: u64, + last_downloaded_bytes: u64, + last_at: Instant, +} + +impl StreamInstallProgress { + fn new(id: String) -> Self { + Self { + id, + total_bytes: 0, + downloaded_bytes: 0, + last_downloaded_bytes: 0, + last_at: Instant::now(), + } + } + + fn add_total(&mut self, bytes: u64) { + self.total_bytes = self.total_bytes.saturating_add(bytes); + } + + fn record_bytes(&mut self, bytes: u64) { + self.downloaded_bytes = self.downloaded_bytes.saturating_add(bytes); + } + + fn emit_current(&mut self, tx_notify_ui: &UnboundedSender) { + let now = Instant::now(); + let speed = bytes_per_second( + self.downloaded_bytes + .saturating_sub(self.last_downloaded_bytes), + now.duration_since(self.last_at), + ); + + self.last_downloaded_bytes = self.downloaded_bytes; + self.last_at = now; + self.emit_snapshot(tx_notify_ui, speed); + } + + fn emit_snapshot(&self, tx_notify_ui: &UnboundedSender, bytes_per_second: u64) { + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesProgress(DownloadProgress { + id: self.id.clone(), + downloaded_bytes: self.downloaded_bytes, + total_bytes: self.total_bytes, + bytes_per_second, + active_peer_count: 1, + })); + } +} + +fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 { + let millis = elapsed.as_millis().max(1); + let rate = u128::from(bytes).saturating_mul(1_000) / millis; + u64::try_from(rate).unwrap_or(u64::MAX) +} + struct IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, received: u64, hasher: Hasher, file: File, @@ -267,7 +343,7 @@ impl IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, file: File, ) -> Self { Self { @@ -287,7 +363,7 @@ impl IncomingFile { peer_addr: SocketAddr, tx_notify_ui: &UnboundedSender, bytes: Bytes, - ) -> eyre::Result<()> { + ) -> eyre::Result { let offset = self.received; let length = u64::try_from(bytes.len())?; if offset.saturating_add(length) > self.expected_size { @@ -304,11 +380,11 @@ impl IncomingFile { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { id: game_id.to_string(), peer_addr, - relative_path: format!("{game_id}/local/{}", self.relative_path), + relative_path: format!("{game_id}/.local.installing/{}", self.relative_path), offset, length, }); - Ok(()) + Ok(length) } async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { @@ -329,14 +405,13 @@ impl IncomingFile { ); } - if let Some(expected) = self.expected_crc32 { - let actual = self.hasher.finalize(); - if actual != expected { - eyre::bail!( - "streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", - self.relative_path - ); - } + let actual = self.hasher.finalize(); + if actual != self.expected_crc32 { + eyre::bail!( + "streamed file {} CRC32 mismatch: got {actual:08X}, expected {:08X}", + self.relative_path, + self.expected_crc32 + ); } log::debug!( diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 27609c5..3aad2dc 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -97,11 +97,17 @@ pub enum Response { InternalPeerError(String), } -#[derive(Clone, Debug, Serialize, Deserialize)] +const STREAM_INSTALL_CONTROL_FRAME_TAG: u8 = 0; +const STREAM_INSTALL_FILE_CHUNK_FRAME_TAG: u8 = 1; +const STREAM_INSTALL_ENCODE_ERROR_FRAME: &[u8] = + b"\0{\"Error\":{\"message\":\"stream install frame encoding error\"}}"; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum StreamInstallFrame { ArchiveBegin { archive_name: String, solid: bool, + unpacked_size: u64, }, Directory { relative_path: String, @@ -109,7 +115,7 @@ pub enum StreamInstallFrame { FileBegin { relative_path: String, size: u64, - crc32: Option, + crc32: u32, }, FileChunk { bytes: Bytes, @@ -180,26 +186,59 @@ impl Message for Response { impl Message for StreamInstallFrame { fn decode(bytes: Bytes) -> Self { - match serde_json::from_slice(&bytes) { - Ok(t) => t, - Err(e) => { - tracing::error!(?e, "StreamInstallFrame decoding error"); - StreamInstallFrame::Error { - message: format!("stream install frame decoding error: {e}"), - } - } + if bytes.is_empty() { + return stream_install_decode_error("stream install frame is empty"); + } + + let tag = bytes[0]; + let payload = bytes.slice(1..); + match tag { + STREAM_INSTALL_CONTROL_FRAME_TAG => decode_stream_install_control_frame(&payload), + STREAM_INSTALL_FILE_CHUNK_FRAME_TAG => StreamInstallFrame::FileChunk { bytes: payload }, + _ => stream_install_decode_error(format!("unknown stream install frame tag {tag}")), } } fn encode(&self) -> Bytes { - match serde_json::to_vec(self) { - Ok(s) => Bytes::from(s), - Err(e) => { - tracing::error!(?e, "StreamInstallFrame encoding error"); - Bytes::from(format!( - r#"{{"Error": {{"message": "encoding error: {e}"}}}}"# - )) + match self { + StreamInstallFrame::FileChunk { bytes } => { + tagged_stream_install_frame(STREAM_INSTALL_FILE_CHUNK_FRAME_TAG, bytes) } + _ => match serde_json::to_vec(self) { + Ok(payload) => { + tagged_stream_install_frame(STREAM_INSTALL_CONTROL_FRAME_TAG, &payload) + } + Err(e) => { + tracing::error!(?e, "StreamInstallFrame encoding error"); + Bytes::from_static(STREAM_INSTALL_ENCODE_ERROR_FRAME) + } + }, } } } + +fn decode_stream_install_control_frame(payload: &[u8]) -> StreamInstallFrame { + match serde_json::from_slice(payload) { + Ok(StreamInstallFrame::FileChunk { .. }) => { + stream_install_decode_error("stream install control frame cannot contain file bytes") + } + Ok(frame) => frame, + Err(e) => { + tracing::error!(?e, "StreamInstallFrame decoding error"); + stream_install_decode_error(format!("stream install frame decoding error: {e}")) + } + } +} + +fn tagged_stream_install_frame(tag: u8, payload: &[u8]) -> Bytes { + let mut frame = Vec::with_capacity(1 + payload.len()); + frame.push(tag); + frame.extend_from_slice(payload); + Bytes::from(frame) +} + +fn stream_install_decode_error(message: impl Into) -> StreamInstallFrame { + StreamInstallFrame::Error { + message: message.into(), + } +} diff --git a/crates/lanspread-proto/tests/stream_install_frame.rs b/crates/lanspread-proto/tests/stream_install_frame.rs new file mode 100644 index 0000000..ed44eb7 --- /dev/null +++ b/crates/lanspread-proto/tests/stream_install_frame.rs @@ -0,0 +1,42 @@ +use bytes::Bytes; +use lanspread_proto::{Message, StreamInstallFrame}; + +#[test] +fn file_chunks_encode_raw_bytes() { + let bytes = Bytes::from_static(&[0, 1, 2, 255]); + let encoded = StreamInstallFrame::FileChunk { + bytes: bytes.clone(), + } + .encode(); + + assert_eq!(&encoded[..], &[1, 0, 1, 2, 255]); + assert_eq!( + StreamInstallFrame::decode(encoded), + StreamInstallFrame::FileChunk { bytes } + ); +} + +#[test] +fn control_frames_are_tagged_json() { + let frame = StreamInstallFrame::FileBegin { + relative_path: "bin/game.exe".to_string(), + size: 42, + crc32: 0x38B4_88A7, + }; + let encoded = frame.encode(); + + assert_eq!(encoded[0], 0); + assert_eq!(StreamInstallFrame::decode(encoded), frame); +} + +#[test] +fn empty_frames_decode_as_errors() { + match StreamInstallFrame::decode(Bytes::new()) { + StreamInstallFrame::Error { message } => { + assert!(message.contains("empty")); + } + other => { + panic!("expected error frame, got {other:?}"); + } + } +}