diff --git a/crates/lanspread-peer-cli/src/lib.rs b/crates/lanspread-peer-cli/src/lib.rs index b05eefa..38fab43 100644 --- a/crates/lanspread-peer-cli/src/lib.rs +++ b/crates/lanspread-peer-cli/src/lib.rs @@ -15,7 +15,10 @@ use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, U use lanspread_proto::StreamInstallFrame; use serde::Serialize; use serde_json::{Value, json}; -use tokio::{io::AsyncReadExt, sync::mpsc}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + sync::mpsc, +}; use tokio_util::sync::CancellationToken; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; @@ -298,53 +301,19 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider { StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, + unpacked_size: listing.unpacked_size(), }, ) .await?; - for entry in listing.entries { - if cancel_token.is_cancelled() { - eyre::bail!("streamed archive {} was cancelled", archive.display()); - } - - match entry.kind { - RarEntryKind::Directory => { - send_stream_frame( - &frames, - StreamInstallFrame::Directory { - relative_path: entry.relative_path, - }, - ) - .await?; - } - RarEntryKind::File => { - send_stream_frame( - &frames, - StreamInstallFrame::FileBegin { - relative_path: entry.relative_path.clone(), - size: entry.size, - crc32: entry.crc32, - }, - ) - .await?; - stream_unrar_file( - &self.program, - archive, - &entry.relative_path, - &frames, - cancel_token.clone(), - ) - .await?; - send_stream_frame( - &frames, - StreamInstallFrame::FileEnd { - relative_path: entry.relative_path, - }, - ) - .await?; - } - } - } + stream_unrar_entries( + &self.program, + archive, + &listing.entries, + &frames, + cancel_token.clone(), + ) + .await?; send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await }) @@ -357,6 +326,16 @@ struct RarListing { entries: Vec, } +impl RarListing { + fn unpacked_size(&self) -> u64 { + self.entries + .iter() + .filter(|entry| entry.kind == RarEntryKind::File) + .map(|entry| entry.size) + .sum() + } +} + #[derive(Debug, Clone, PartialEq, Eq)] struct RarEntry { relative_path: String, @@ -448,26 +427,32 @@ fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Re return Ok(()); }; - let size = match kind { - RarEntryKind::File => draft - .size - .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, - RarEntryKind::Directory => 0, + let (size, crc32) = match kind { + RarEntryKind::File => { + let size = draft + .size + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; + let crc32 = draft + .crc32 + .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; + (size, Some(crc32)) + } + RarEntryKind::Directory => (0, None), }; entries.push(RarEntry { relative_path, kind, size, - crc32: draft.crc32, + crc32, }); Ok(()) } -async fn stream_unrar_file( +async fn stream_unrar_entries( program: &Path, archive: &Path, - relative_path: &str, + entries: &[RarEntry], frames: &mpsc::Sender, cancel_token: CancellationToken, ) -> eyre::Result<()> { @@ -476,28 +461,112 @@ async fn stream_unrar_file( .arg("-inul") .arg("-cfg-") .arg(archive) - .arg(relative_path) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn()?; - let mut stdout = child - .stdout - .take() - .ok_or_eyre("unrar stdout was not captured")?; - let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; + let result = async { + let mut stdout = child + .stdout + .take() + .ok_or_eyre("unrar stdout was not captured")?; + let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; - loop { - let read = tokio::select! { - () = cancel_token.cancelled() => { - let _ = child.kill().await; - eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display()); + for entry in entries { + if cancel_token.is_cancelled() { + eyre::bail!("streamed archive {} was cancelled", archive.display()); } - read = stdout.read(&mut buffer) => read?, - }; + match entry.kind { + RarEntryKind::Directory => { + send_stream_frame( + frames, + StreamInstallFrame::Directory { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + RarEntryKind::File => { + let Some(crc32) = entry.crc32 else { + eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); + }; + send_stream_frame( + frames, + StreamInstallFrame::FileBegin { + relative_path: entry.relative_path.clone(), + size: entry.size, + crc32, + }, + ) + .await?; + stream_unrar_file_from_stdout( + &mut stdout, + archive, + entry, + frames, + &mut buffer, + &cancel_token, + ) + .await?; + send_stream_frame( + frames, + StreamInstallFrame::FileEnd { + relative_path: entry.relative_path.clone(), + }, + ) + .await?; + } + } + } + + let extra = + read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; + if extra != 0 { + eyre::bail!( + "unrar produced bytes after listed entries for {}", + archive.display() + ); + } + + let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; + if !status.success() { + eyre::bail!( + "unrar p failed for {} with status {status}", + archive.display() + ); + } + + Ok(()) + } + .await; + + if result.is_err() { + let _ = child.kill().await; + } + + result +} + +async fn stream_unrar_file_from_stdout( + stdout: &mut (impl AsyncRead + Unpin), + archive: &Path, + entry: &RarEntry, + frames: &mpsc::Sender, + buffer: &mut [u8], + cancel_token: &CancellationToken, +) -> eyre::Result<()> { + let mut remaining = entry.size; + while remaining > 0 { + let read_len = usize::try_from(remaining.min(buffer.len() as u64))?; + let read = + read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; if read == 0 { - break; + eyre::bail!( + "unrar ended while streaming {} from {}; {remaining} bytes missing", + entry.relative_path, + archive.display() + ); } send_stream_frame( @@ -507,20 +576,40 @@ async fn stream_unrar_file( }, ) .await?; - } - - let status = child.wait().await?; - if !status.success() { - eyre::bail!( - "unrar p failed for {}:{} with status {status}", - archive.display(), - relative_path - ); + remaining = remaining.saturating_sub(u64::try_from(read)?); } Ok(()) } +async fn read_unrar_stdout( + stdout: &mut (impl AsyncRead + Unpin), + buffer: &mut [u8], + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + read = stdout.read(buffer) => Ok(read?), + } +} + +async fn wait_unrar_child( + child: &mut tokio::process::Child, + cancel_token: &CancellationToken, + archive: &Path, +) -> eyre::Result { + tokio::select! { + () = cancel_token.cancelled() => { + let _ = child.kill().await; + eyre::bail!("streamed archive {} was cancelled", archive.display()); + } + status = child.wait() => Ok(status?), + } +} + async fn send_stream_frame( frames: &mpsc::Sender, frame: StreamInstallFrame, @@ -639,7 +728,7 @@ mod tests { let listing = parse_unrar_listing( r#" Archive: game.eti -Details: RAR 5 +Details: RAR 5, solid Name: bin/payload.bin Type: File @@ -652,7 +741,7 @@ Details: RAR 5 ) .expect("listing should parse"); - assert!(!listing.solid); + assert!(listing.solid); assert_eq!( listing.entries, vec![ @@ -672,6 +761,23 @@ Details: RAR 5 ); } + #[test] + fn rejects_unrar_file_entries_without_crc32() { + let err = parse_unrar_listing( + r#" +Archive: game.eti +Details: RAR 5 + + Name: bin/payload.bin + Type: File + Size: 123 +"#, + ) + .expect_err("file entries without CRC32 should be rejected"); + + assert!(err.to_string().contains("has no CRC32")); + } + #[tokio::test] async fn fixture_unpacker_creates_install_payload() { let temp = TempDir::new("lanspread-peer-cli-fixture"); diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index c5851c5..8985f74 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -497,11 +497,11 @@ pub async fn handle_stream_install_game_command( } }; peers.sort(); - let Some(peer_addr) = peers.into_iter().next() else { + if peers.is_empty() { log::error!("No peer selected for streamed install of {id}"); send_download_failed(tx_notify_ui, &id); return; - }; + } match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { BeginOperationResult::Started => {} @@ -525,15 +525,8 @@ pub async fn handle_stream_install_game_command( let ctx_clone = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); ctx.task_tracker.spawn(async move { - run_stream_install_operation( - ctx_clone, - tx_notify_ui, - id, - game_root, - peer_addr, - cancel_token, - ) - .await; + run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token) + .await; }); } @@ -582,7 +575,7 @@ async fn run_stream_install_operation( tx_notify_ui: UnboundedSender, id: String, game_root: PathBuf, - peer_addr: SocketAddr, + peer_addrs: Vec, cancel_token: CancellationToken, ) { let download_guard = OperationGuard::download( @@ -597,63 +590,93 @@ async fn run_stream_install_operation( PeerEvent::DownloadGameFilesBegin { id: id.clone() }, ); - let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) - .await - { - Ok(transaction) => transaction, - Err(err) => { - log::error!("Failed to prepare streamed install for {id}: {err}"); - finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; - return; + let mut last_receive_error = None; + for peer_addr in peer_addrs { + if cancel_token.is_cancelled() { + last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled")); + break; } - }; - let receive_result = receive_streamed_install( - peer_addr, - &id, - transaction.staging_dir(), - tx_notify_ui.clone(), - cancel_token.clone(), - ) - .await; + let transaction = + match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await { + Ok(transaction) => transaction, + Err(err) => { + log::error!("Failed to prepare streamed install for {id}: {err}"); + finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) + .await; + return; + } + }; - match receive_result { - Ok(()) => { - if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) + let receive_result = receive_streamed_install( + peer_addr, + &id, + transaction.staging_dir(), + tx_notify_ui.clone(), + cancel_token.clone(), + ) + .await; + + match receive_result { + Ok(()) => { + if transition_download_to_install( + &ctx, + &tx_notify_ui, + &id, + OperationKind::Installing, + ) .await - { - clear_active_download(&ctx, &id).await; - send_download_finished(&tx_notify_ui, &id); - download_guard.disarm(); - commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; - } else { + { + clear_active_download(&ctx, &id).await; + send_download_finished(&tx_notify_ui, &id); + download_guard.disarm(); + commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; + return; + } + if let Err(err) = transaction.rollback().await { log::error!("Failed to roll back streamed install for {id}: {err}"); } finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) .await; + return; } - } - Err(err) => { - if let Err(rollback_err) = transaction.rollback().await { - log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + Err(err) => { + if let Err(rollback_err) = transaction.rollback().await { + log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); + } + if cancel_token.is_cancelled() { + log::info!("Streamed install download cancelled for {id}: {err}"); + last_receive_error = Some(err); + break; + } + + log::warn!( + "Streamed install attempt from {peer_addr} failed for {id}; trying another peer if available: {err}" + ); + last_receive_error = Some(err); } - let download_was_cancelled = cancel_token.is_cancelled(); - if download_was_cancelled { - log::info!("Streamed install download cancelled for {id}: {err}"); - } else { - log::error!("Streamed install download failed for {id}: {err}"); - } - finish_failed_stream_download( - &ctx, - &tx_notify_ui, - &id, - download_guard, - download_was_cancelled, - ) - .await; } } + + let download_was_cancelled = cancel_token.is_cancelled(); + if let Some(err) = last_receive_error { + if download_was_cancelled { + log::info!("Streamed install download cancelled for {id}: {err}"); + } else { + log::error!("Streamed install download failed for {id}: {err}"); + } + } else { + log::error!("Streamed install download failed for {id}: no peer attempts were made"); + } + finish_failed_stream_download( + &ctx, + &tx_notify_ui, + &id, + download_guard, + download_was_cancelled, + ) + .await; } async fn finish_failed_stream_download( diff --git a/crates/lanspread-peer/src/stream_install.rs b/crates/lanspread-peer/src/stream_install.rs index 3d2e168..f6c5df5 100644 --- a/crates/lanspread-peer/src/stream_install.rs +++ b/crates/lanspread-peer/src/stream_install.rs @@ -4,6 +4,7 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::Arc, + time::{Duration, Instant}, }; use bytes::Bytes; @@ -15,6 +16,7 @@ use tokio::{ fs::File, io::AsyncWriteExt, sync::{mpsc, mpsc::UnboundedSender}, + time::{self, MissedTickBehavior}, }; use tokio_util::{ codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, @@ -22,6 +24,7 @@ use tokio_util::{ }; use crate::{ + DownloadProgress, PeerEvent, install::root_eti_archives, network::connect_to_peer, @@ -29,6 +32,7 @@ use crate::{ }; const FRAME_CHANNEL_DEPTH: usize = 16; +const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; @@ -182,10 +186,18 @@ pub(crate) async fn receive_streamed_install( let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut current_file: Option = None; + let mut progress = StreamInstallProgress::new(game_id.to_string()); + let mut progress_interval = time::interval(STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL); + progress_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + progress_interval.tick().await; loop { let next = tokio::select! { () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), + _ = progress_interval.tick() => { + progress.emit_current(&tx_notify_ui); + continue; + } next = framed_rx.next() => next, }; @@ -199,9 +211,13 @@ pub(crate) async fn receive_streamed_install( StreamInstallFrame::ArchiveBegin { archive_name, solid, + unpacked_size, } => { + progress.add_total(unpacked_size); + progress.emit_snapshot(&tx_notify_ui, 0); log::info!( - "Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" + "Receiving streamed install archive {archive_name} for {game_id} \ + (solid={solid}, unpacked_size={unpacked_size})" ); } StreamInstallFrame::Directory { relative_path } => { @@ -227,8 +243,10 @@ pub(crate) async fn receive_streamed_install( let Some(file) = current_file.as_mut() else { eyre::bail!("received FileChunk without FileBegin"); }; - file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) + let length = file + .write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) .await?; + progress.record_bytes(length); } StreamInstallFrame::FileEnd { relative_path } => { let Some(file) = current_file.take() else { @@ -243,6 +261,7 @@ pub(crate) async fn receive_streamed_install( if current_file.is_some() { eyre::bail!("streamed install completed with an open file"); } + progress.emit_snapshot(&tx_notify_ui, 0); return Ok(()); } StreamInstallFrame::Error { message } => { @@ -252,11 +271,68 @@ pub(crate) async fn receive_streamed_install( } } +struct StreamInstallProgress { + id: String, + total_bytes: u64, + downloaded_bytes: u64, + last_downloaded_bytes: u64, + last_at: Instant, +} + +impl StreamInstallProgress { + fn new(id: String) -> Self { + Self { + id, + total_bytes: 0, + downloaded_bytes: 0, + last_downloaded_bytes: 0, + last_at: Instant::now(), + } + } + + fn add_total(&mut self, bytes: u64) { + self.total_bytes = self.total_bytes.saturating_add(bytes); + } + + fn record_bytes(&mut self, bytes: u64) { + self.downloaded_bytes = self.downloaded_bytes.saturating_add(bytes); + } + + fn emit_current(&mut self, tx_notify_ui: &UnboundedSender) { + let now = Instant::now(); + let speed = bytes_per_second( + self.downloaded_bytes + .saturating_sub(self.last_downloaded_bytes), + now.duration_since(self.last_at), + ); + + self.last_downloaded_bytes = self.downloaded_bytes; + self.last_at = now; + self.emit_snapshot(tx_notify_ui, speed); + } + + fn emit_snapshot(&self, tx_notify_ui: &UnboundedSender, bytes_per_second: u64) { + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesProgress(DownloadProgress { + id: self.id.clone(), + downloaded_bytes: self.downloaded_bytes, + total_bytes: self.total_bytes, + bytes_per_second, + active_peer_count: 1, + })); + } +} + +fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 { + let millis = elapsed.as_millis().max(1); + let rate = u128::from(bytes).saturating_mul(1_000) / millis; + u64::try_from(rate).unwrap_or(u64::MAX) +} + struct IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, received: u64, hasher: Hasher, file: File, @@ -267,7 +343,7 @@ impl IncomingFile { relative_path: String, path: PathBuf, expected_size: u64, - expected_crc32: Option, + expected_crc32: u32, file: File, ) -> Self { Self { @@ -287,7 +363,7 @@ impl IncomingFile { peer_addr: SocketAddr, tx_notify_ui: &UnboundedSender, bytes: Bytes, - ) -> eyre::Result<()> { + ) -> eyre::Result { let offset = self.received; let length = u64::try_from(bytes.len())?; if offset.saturating_add(length) > self.expected_size { @@ -304,11 +380,11 @@ impl IncomingFile { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { id: game_id.to_string(), peer_addr, - relative_path: format!("{game_id}/local/{}", self.relative_path), + relative_path: format!("{game_id}/.local.installing/{}", self.relative_path), offset, length, }); - Ok(()) + Ok(length) } async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { @@ -329,14 +405,13 @@ impl IncomingFile { ); } - if let Some(expected) = self.expected_crc32 { - let actual = self.hasher.finalize(); - if actual != expected { - eyre::bail!( - "streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", - self.relative_path - ); - } + let actual = self.hasher.finalize(); + if actual != self.expected_crc32 { + eyre::bail!( + "streamed file {} CRC32 mismatch: got {actual:08X}, expected {:08X}", + self.relative_path, + self.expected_crc32 + ); } log::debug!( diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 27609c5..3aad2dc 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -97,11 +97,17 @@ pub enum Response { InternalPeerError(String), } -#[derive(Clone, Debug, Serialize, Deserialize)] +const STREAM_INSTALL_CONTROL_FRAME_TAG: u8 = 0; +const STREAM_INSTALL_FILE_CHUNK_FRAME_TAG: u8 = 1; +const STREAM_INSTALL_ENCODE_ERROR_FRAME: &[u8] = + b"\0{\"Error\":{\"message\":\"stream install frame encoding error\"}}"; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum StreamInstallFrame { ArchiveBegin { archive_name: String, solid: bool, + unpacked_size: u64, }, Directory { relative_path: String, @@ -109,7 +115,7 @@ pub enum StreamInstallFrame { FileBegin { relative_path: String, size: u64, - crc32: Option, + crc32: u32, }, FileChunk { bytes: Bytes, @@ -180,26 +186,59 @@ impl Message for Response { impl Message for StreamInstallFrame { fn decode(bytes: Bytes) -> Self { - match serde_json::from_slice(&bytes) { - Ok(t) => t, - Err(e) => { - tracing::error!(?e, "StreamInstallFrame decoding error"); - StreamInstallFrame::Error { - message: format!("stream install frame decoding error: {e}"), - } - } + if bytes.is_empty() { + return stream_install_decode_error("stream install frame is empty"); + } + + let tag = bytes[0]; + let payload = bytes.slice(1..); + match tag { + STREAM_INSTALL_CONTROL_FRAME_TAG => decode_stream_install_control_frame(&payload), + STREAM_INSTALL_FILE_CHUNK_FRAME_TAG => StreamInstallFrame::FileChunk { bytes: payload }, + _ => stream_install_decode_error(format!("unknown stream install frame tag {tag}")), } } fn encode(&self) -> Bytes { - match serde_json::to_vec(self) { - Ok(s) => Bytes::from(s), - Err(e) => { - tracing::error!(?e, "StreamInstallFrame encoding error"); - Bytes::from(format!( - r#"{{"Error": {{"message": "encoding error: {e}"}}}}"# - )) + match self { + StreamInstallFrame::FileChunk { bytes } => { + tagged_stream_install_frame(STREAM_INSTALL_FILE_CHUNK_FRAME_TAG, bytes) } + _ => match serde_json::to_vec(self) { + Ok(payload) => { + tagged_stream_install_frame(STREAM_INSTALL_CONTROL_FRAME_TAG, &payload) + } + Err(e) => { + tracing::error!(?e, "StreamInstallFrame encoding error"); + Bytes::from_static(STREAM_INSTALL_ENCODE_ERROR_FRAME) + } + }, } } } + +fn decode_stream_install_control_frame(payload: &[u8]) -> StreamInstallFrame { + match serde_json::from_slice(payload) { + Ok(StreamInstallFrame::FileChunk { .. }) => { + stream_install_decode_error("stream install control frame cannot contain file bytes") + } + Ok(frame) => frame, + Err(e) => { + tracing::error!(?e, "StreamInstallFrame decoding error"); + stream_install_decode_error(format!("stream install frame decoding error: {e}")) + } + } +} + +fn tagged_stream_install_frame(tag: u8, payload: &[u8]) -> Bytes { + let mut frame = Vec::with_capacity(1 + payload.len()); + frame.push(tag); + frame.extend_from_slice(payload); + Bytes::from(frame) +} + +fn stream_install_decode_error(message: impl Into) -> StreamInstallFrame { + StreamInstallFrame::Error { + message: message.into(), + } +} diff --git a/crates/lanspread-proto/tests/stream_install_frame.rs b/crates/lanspread-proto/tests/stream_install_frame.rs new file mode 100644 index 0000000..ed44eb7 --- /dev/null +++ b/crates/lanspread-proto/tests/stream_install_frame.rs @@ -0,0 +1,42 @@ +use bytes::Bytes; +use lanspread_proto::{Message, StreamInstallFrame}; + +#[test] +fn file_chunks_encode_raw_bytes() { + let bytes = Bytes::from_static(&[0, 1, 2, 255]); + let encoded = StreamInstallFrame::FileChunk { + bytes: bytes.clone(), + } + .encode(); + + assert_eq!(&encoded[..], &[1, 0, 1, 2, 255]); + assert_eq!( + StreamInstallFrame::decode(encoded), + StreamInstallFrame::FileChunk { bytes } + ); +} + +#[test] +fn control_frames_are_tagged_json() { + let frame = StreamInstallFrame::FileBegin { + relative_path: "bin/game.exe".to_string(), + size: 42, + crc32: 0x38B4_88A7, + }; + let encoded = frame.encode(); + + assert_eq!(encoded[0], 0); + assert_eq!(StreamInstallFrame::decode(encoded), frame); +} + +#[test] +fn empty_frames_decode_as_errors() { + match StreamInstallFrame::decode(Bytes::new()) { + StreamInstallFrame::Error { message } => { + assert!(message.contains("empty")); + } + other => { + panic!("expected error frame, got {other:?}"); + } + } +}