Feature/streamed install prototype #27

Merged
ddidderr merged 15 commits from feature/streamed-install-prototype into main 2026-06-11 08:52:33 +02:00
5 changed files with 450 additions and 165 deletions
Showing only changes of commit 5dd356eca8 - Show all commits
+171 -65
View File
@@ -15,7 +15,10 @@ use lanspread_peer::{StreamInstallFuture, StreamInstallProvider, UnpackFuture, U
use lanspread_proto::StreamInstallFrame; use lanspread_proto::StreamInstallFrame;
use serde::Serialize; use serde::Serialize;
use serde_json::{Value, json}; use serde_json::{Value, json};
use tokio::{io::AsyncReadExt, sync::mpsc}; use tokio::{
io::{AsyncRead, AsyncReadExt},
sync::mpsc,
};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub const DEFAULT_FIXTURE_VERSION: &str = "20250101"; pub const DEFAULT_FIXTURE_VERSION: &str = "20250101";
@@ -298,53 +301,19 @@ impl StreamInstallProvider for ExternalUnrarStreamProvider {
StreamInstallFrame::ArchiveBegin { StreamInstallFrame::ArchiveBegin {
archive_name: archive_name.clone(), archive_name: archive_name.clone(),
solid: listing.solid, solid: listing.solid,
unpacked_size: listing.unpacked_size(),
}, },
) )
.await?; .await?;
for entry in listing.entries { stream_unrar_entries(
if cancel_token.is_cancelled() {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
&frames,
StreamInstallFrame::Directory {
relative_path: entry.relative_path,
},
)
.await?;
}
RarEntryKind::File => {
send_stream_frame(
&frames,
StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32: entry.crc32,
},
)
.await?;
stream_unrar_file(
&self.program, &self.program,
archive, archive,
&entry.relative_path, &listing.entries,
&frames, &frames,
cancel_token.clone(), cancel_token.clone(),
) )
.await?; .await?;
send_stream_frame(
&frames,
StreamInstallFrame::FileEnd {
relative_path: entry.relative_path,
},
)
.await?;
}
}
}
send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await
}) })
@@ -357,6 +326,16 @@ struct RarListing {
entries: Vec<RarEntry>, entries: Vec<RarEntry>,
} }
impl RarListing {
fn unpacked_size(&self) -> u64 {
self.entries
.iter()
.filter(|entry| entry.kind == RarEntryKind::File)
.map(|entry| entry.size)
.sum()
}
}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
struct RarEntry { struct RarEntry {
relative_path: String, relative_path: String,
@@ -448,26 +427,32 @@ fn push_rar_entry(entries: &mut Vec<RarEntry>, draft: RarEntryDraft) -> eyre::Re
return Ok(()); return Ok(());
}; };
let size = match kind { let (size, crc32) = match kind {
RarEntryKind::File => draft RarEntryKind::File => {
let size = draft
.size .size
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?, .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?;
RarEntryKind::Directory => 0, let crc32 = draft
.crc32
.ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?;
(size, Some(crc32))
}
RarEntryKind::Directory => (0, None),
}; };
entries.push(RarEntry { entries.push(RarEntry {
relative_path, relative_path,
kind, kind,
size, size,
crc32: draft.crc32, crc32,
}); });
Ok(()) Ok(())
} }
async fn stream_unrar_file( async fn stream_unrar_entries(
program: &Path, program: &Path,
archive: &Path, archive: &Path,
relative_path: &str, entries: &[RarEntry],
frames: &mpsc::Sender<StreamInstallFrame>, frames: &mpsc::Sender<StreamInstallFrame>,
cancel_token: CancellationToken, cancel_token: CancellationToken,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
@@ -476,28 +461,112 @@ async fn stream_unrar_file(
.arg("-inul") .arg("-inul")
.arg("-cfg-") .arg("-cfg-")
.arg(archive) .arg(archive)
.arg(relative_path)
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::null()) .stderr(Stdio::null())
.spawn()?; .spawn()?;
let result = async {
let mut stdout = child let mut stdout = child
.stdout .stdout
.take() .take()
.ok_or_eyre("unrar stdout was not captured")?; .ok_or_eyre("unrar stdout was not captured")?;
let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE];
loop { for entry in entries {
let read = tokio::select! { if cancel_token.is_cancelled() {
() = cancel_token.cancelled() => { eyre::bail!("streamed archive {} was cancelled", archive.display());
let _ = child.kill().await;
eyre::bail!("streaming {relative_path} from {} was cancelled", archive.display());
} }
read = stdout.read(&mut buffer) => read?,
};
match entry.kind {
RarEntryKind::Directory => {
send_stream_frame(
frames,
StreamInstallFrame::Directory {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
RarEntryKind::File => {
let Some(crc32) = entry.crc32 else {
eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path);
};
send_stream_frame(
frames,
StreamInstallFrame::FileBegin {
relative_path: entry.relative_path.clone(),
size: entry.size,
crc32,
},
)
.await?;
stream_unrar_file_from_stdout(
&mut stdout,
archive,
entry,
frames,
&mut buffer,
&cancel_token,
)
.await?;
send_stream_frame(
frames,
StreamInstallFrame::FileEnd {
relative_path: entry.relative_path.clone(),
},
)
.await?;
}
}
}
let extra =
read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?;
if extra != 0 {
eyre::bail!(
"unrar produced bytes after listed entries for {}",
archive.display()
);
}
let status = wait_unrar_child(&mut child, &cancel_token, archive).await?;
if !status.success() {
eyre::bail!(
"unrar p failed for {} with status {status}",
archive.display()
);
}
Ok(())
}
.await;
if result.is_err() {
let _ = child.kill().await;
}
result
}
async fn stream_unrar_file_from_stdout(
stdout: &mut (impl AsyncRead + Unpin),
archive: &Path,
entry: &RarEntry,
frames: &mpsc::Sender<StreamInstallFrame>,
buffer: &mut [u8],
cancel_token: &CancellationToken,
) -> eyre::Result<()> {
let mut remaining = entry.size;
while remaining > 0 {
let read_len = usize::try_from(remaining.min(buffer.len() as u64))?;
let read =
read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?;
if read == 0 { if read == 0 {
break; eyre::bail!(
"unrar ended while streaming {} from {}; {remaining} bytes missing",
entry.relative_path,
archive.display()
);
} }
send_stream_frame( send_stream_frame(
@@ -507,20 +576,40 @@ async fn stream_unrar_file(
}, },
) )
.await?; .await?;
} remaining = remaining.saturating_sub(u64::try_from(read)?);
let status = child.wait().await?;
if !status.success() {
eyre::bail!(
"unrar p failed for {}:{} with status {status}",
archive.display(),
relative_path
);
} }
Ok(()) Ok(())
} }
async fn read_unrar_stdout(
stdout: &mut (impl AsyncRead + Unpin),
buffer: &mut [u8],
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<usize> {
tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
read = stdout.read(buffer) => Ok(read?),
}
}
async fn wait_unrar_child(
child: &mut tokio::process::Child,
cancel_token: &CancellationToken,
archive: &Path,
) -> eyre::Result<std::process::ExitStatus> {
tokio::select! {
() = cancel_token.cancelled() => {
let _ = child.kill().await;
eyre::bail!("streamed archive {} was cancelled", archive.display());
}
status = child.wait() => Ok(status?),
}
}
async fn send_stream_frame( async fn send_stream_frame(
frames: &mpsc::Sender<StreamInstallFrame>, frames: &mpsc::Sender<StreamInstallFrame>,
frame: StreamInstallFrame, frame: StreamInstallFrame,
@@ -639,7 +728,7 @@ mod tests {
let listing = parse_unrar_listing( let listing = parse_unrar_listing(
r#" r#"
Archive: game.eti Archive: game.eti
Details: RAR 5 Details: RAR 5, solid
Name: bin/payload.bin Name: bin/payload.bin
Type: File Type: File
@@ -652,7 +741,7 @@ Details: RAR 5
) )
.expect("listing should parse"); .expect("listing should parse");
assert!(!listing.solid); assert!(listing.solid);
assert_eq!( assert_eq!(
listing.entries, listing.entries,
vec![ vec![
@@ -672,6 +761,23 @@ Details: RAR 5
); );
} }
#[test]
fn rejects_unrar_file_entries_without_crc32() {
let err = parse_unrar_listing(
r#"
Archive: game.eti
Details: RAR 5
Name: bin/payload.bin
Type: File
Size: 123
"#,
)
.expect_err("file entries without CRC32 should be rejected");
assert!(err.to_string().contains("has no CRC32"));
}
#[tokio::test] #[tokio::test]
async fn fixture_unpacker_creates_install_payload() { async fn fixture_unpacker_creates_install_payload() {
let temp = TempDir::new("lanspread-peer-cli-fixture"); let temp = TempDir::new("lanspread-peer-cli-fixture");
+43 -20
View File
@@ -497,11 +497,11 @@ pub async fn handle_stream_install_game_command(
} }
}; };
peers.sort(); peers.sort();
let Some(peer_addr) = peers.into_iter().next() else { if peers.is_empty() {
log::error!("No peer selected for streamed install of {id}"); log::error!("No peer selected for streamed install of {id}");
send_download_failed(tx_notify_ui, &id); send_download_failed(tx_notify_ui, &id);
return; return;
}; }
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
BeginOperationResult::Started => {} BeginOperationResult::Started => {}
@@ -525,14 +525,7 @@ pub async fn handle_stream_install_game_command(
let ctx_clone = ctx.clone(); let ctx_clone = ctx.clone();
let tx_notify_ui = tx_notify_ui.clone(); let tx_notify_ui = tx_notify_ui.clone();
ctx.task_tracker.spawn(async move { ctx.task_tracker.spawn(async move {
run_stream_install_operation( run_stream_install_operation(ctx_clone, tx_notify_ui, id, game_root, peers, cancel_token)
ctx_clone,
tx_notify_ui,
id,
game_root,
peer_addr,
cancel_token,
)
.await; .await;
}); });
} }
@@ -582,7 +575,7 @@ async fn run_stream_install_operation(
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
id: String, id: String,
game_root: PathBuf, game_root: PathBuf,
peer_addr: SocketAddr, peer_addrs: Vec<SocketAddr>,
cancel_token: CancellationToken, cancel_token: CancellationToken,
) { ) {
let download_guard = OperationGuard::download( let download_guard = OperationGuard::download(
@@ -597,13 +590,20 @@ async fn run_stream_install_operation(
PeerEvent::DownloadGameFilesBegin { id: id.clone() }, PeerEvent::DownloadGameFilesBegin { id: id.clone() },
); );
let transaction = match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id) let mut last_receive_error = None;
.await for peer_addr in peer_addrs {
{ if cancel_token.is_cancelled() {
last_receive_error = Some(eyre::eyre!("streamed install for {id} was cancelled"));
break;
}
let transaction =
match install::begin_streamed_install(&game_root, ctx.state_dir.as_ref(), &id).await {
Ok(transaction) => transaction, Ok(transaction) => transaction,
Err(err) => { Err(err) => {
log::error!("Failed to prepare streamed install for {id}: {err}"); log::error!("Failed to prepare streamed install for {id}: {err}");
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false).await; finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
.await;
return; return;
} }
}; };
@@ -619,31 +619,56 @@ async fn run_stream_install_operation(
match receive_result { match receive_result {
Ok(()) => { Ok(()) => {
if transition_download_to_install(&ctx, &tx_notify_ui, &id, OperationKind::Installing) if transition_download_to_install(
&ctx,
&tx_notify_ui,
&id,
OperationKind::Installing,
)
.await .await
{ {
clear_active_download(&ctx, &id).await; clear_active_download(&ctx, &id).await;
send_download_finished(&tx_notify_ui, &id); send_download_finished(&tx_notify_ui, &id);
download_guard.disarm(); download_guard.disarm();
commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await; commit_streamed_install(&ctx, &tx_notify_ui, id, transaction).await;
} else { return;
}
if let Err(err) = transaction.rollback().await { if let Err(err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {err}"); log::error!("Failed to roll back streamed install for {id}: {err}");
} }
finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false) finish_failed_stream_download(&ctx, &tx_notify_ui, &id, download_guard, false)
.await; .await;
} return;
} }
Err(err) => { Err(err) => {
if let Err(rollback_err) = transaction.rollback().await { if let Err(rollback_err) = transaction.rollback().await {
log::error!("Failed to roll back streamed install for {id}: {rollback_err}"); log::error!("Failed to roll back streamed install for {id}: {rollback_err}");
} }
if cancel_token.is_cancelled() {
log::info!("Streamed install download cancelled for {id}: {err}");
last_receive_error = Some(err);
break;
}
log::warn!(
"Streamed install attempt from {peer_addr} failed for {id}; trying another peer if available: {err}"
);
last_receive_error = Some(err);
}
}
}
let download_was_cancelled = cancel_token.is_cancelled(); let download_was_cancelled = cancel_token.is_cancelled();
if let Some(err) = last_receive_error {
if download_was_cancelled { if download_was_cancelled {
log::info!("Streamed install download cancelled for {id}: {err}"); log::info!("Streamed install download cancelled for {id}: {err}");
} else { } else {
log::error!("Streamed install download failed for {id}: {err}"); log::error!("Streamed install download failed for {id}: {err}");
} }
} else {
log::error!("Streamed install download failed for {id}: no peer attempts were made");
}
finish_failed_stream_download( finish_failed_stream_download(
&ctx, &ctx,
&tx_notify_ui, &tx_notify_ui,
@@ -652,8 +677,6 @@ async fn run_stream_install_operation(
download_was_cancelled, download_was_cancelled,
) )
.await; .await;
}
}
} }
async fn finish_failed_stream_download( async fn finish_failed_stream_download(
+87 -12
View File
@@ -4,6 +4,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
time::{Duration, Instant},
}; };
use bytes::Bytes; use bytes::Bytes;
@@ -15,6 +16,7 @@ use tokio::{
fs::File, fs::File,
io::AsyncWriteExt, io::AsyncWriteExt,
sync::{mpsc, mpsc::UnboundedSender}, sync::{mpsc, mpsc::UnboundedSender},
time::{self, MissedTickBehavior},
}; };
use tokio_util::{ use tokio_util::{
codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, codec::{FramedRead, FramedWrite, LengthDelimitedCodec},
@@ -22,6 +24,7 @@ use tokio_util::{
}; };
use crate::{ use crate::{
DownloadProgress,
PeerEvent, PeerEvent,
install::root_eti_archives, install::root_eti_archives,
network::connect_to_peer, network::connect_to_peer,
@@ -29,6 +32,7 @@ use crate::{
}; };
const FRAME_CHANNEL_DEPTH: usize = 16; const FRAME_CHANNEL_DEPTH: usize = 16;
const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>; pub type StreamInstallFuture<'a> = Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + 'a>>;
@@ -182,10 +186,18 @@ pub(crate) async fn receive_streamed_install(
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut current_file: Option<IncomingFile> = None; let mut current_file: Option<IncomingFile> = None;
let mut progress = StreamInstallProgress::new(game_id.to_string());
let mut progress_interval = time::interval(STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL);
progress_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
progress_interval.tick().await;
loop { loop {
let next = tokio::select! { let next = tokio::select! {
() = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"),
_ = progress_interval.tick() => {
progress.emit_current(&tx_notify_ui);
continue;
}
next = framed_rx.next() => next, next = framed_rx.next() => next,
}; };
@@ -199,9 +211,13 @@ pub(crate) async fn receive_streamed_install(
StreamInstallFrame::ArchiveBegin { StreamInstallFrame::ArchiveBegin {
archive_name, archive_name,
solid, solid,
unpacked_size,
} => { } => {
progress.add_total(unpacked_size);
progress.emit_snapshot(&tx_notify_ui, 0);
log::info!( log::info!(
"Receiving streamed install archive {archive_name} for {game_id} (solid={solid})" "Receiving streamed install archive {archive_name} for {game_id} \
(solid={solid}, unpacked_size={unpacked_size})"
); );
} }
StreamInstallFrame::Directory { relative_path } => { StreamInstallFrame::Directory { relative_path } => {
@@ -227,8 +243,10 @@ pub(crate) async fn receive_streamed_install(
let Some(file) = current_file.as_mut() else { let Some(file) = current_file.as_mut() else {
eyre::bail!("received FileChunk without FileBegin"); eyre::bail!("received FileChunk without FileBegin");
}; };
file.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) let length = file
.write_chunk(game_id, peer_addr, &tx_notify_ui, bytes)
.await?; .await?;
progress.record_bytes(length);
} }
StreamInstallFrame::FileEnd { relative_path } => { StreamInstallFrame::FileEnd { relative_path } => {
let Some(file) = current_file.take() else { let Some(file) = current_file.take() else {
@@ -243,6 +261,7 @@ pub(crate) async fn receive_streamed_install(
if current_file.is_some() { if current_file.is_some() {
eyre::bail!("streamed install completed with an open file"); eyre::bail!("streamed install completed with an open file");
} }
progress.emit_snapshot(&tx_notify_ui, 0);
return Ok(()); return Ok(());
} }
StreamInstallFrame::Error { message } => { StreamInstallFrame::Error { message } => {
@@ -252,11 +271,68 @@ pub(crate) async fn receive_streamed_install(
} }
} }
struct StreamInstallProgress {
id: String,
total_bytes: u64,
downloaded_bytes: u64,
last_downloaded_bytes: u64,
last_at: Instant,
}
impl StreamInstallProgress {
fn new(id: String) -> Self {
Self {
id,
total_bytes: 0,
downloaded_bytes: 0,
last_downloaded_bytes: 0,
last_at: Instant::now(),
}
}
fn add_total(&mut self, bytes: u64) {
self.total_bytes = self.total_bytes.saturating_add(bytes);
}
fn record_bytes(&mut self, bytes: u64) {
self.downloaded_bytes = self.downloaded_bytes.saturating_add(bytes);
}
fn emit_current(&mut self, tx_notify_ui: &UnboundedSender<PeerEvent>) {
let now = Instant::now();
let speed = bytes_per_second(
self.downloaded_bytes
.saturating_sub(self.last_downloaded_bytes),
now.duration_since(self.last_at),
);
self.last_downloaded_bytes = self.downloaded_bytes;
self.last_at = now;
self.emit_snapshot(tx_notify_ui, speed);
}
fn emit_snapshot(&self, tx_notify_ui: &UnboundedSender<PeerEvent>, bytes_per_second: u64) {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesProgress(DownloadProgress {
id: self.id.clone(),
downloaded_bytes: self.downloaded_bytes,
total_bytes: self.total_bytes,
bytes_per_second,
active_peer_count: 1,
}));
}
}
fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 {
let millis = elapsed.as_millis().max(1);
let rate = u128::from(bytes).saturating_mul(1_000) / millis;
u64::try_from(rate).unwrap_or(u64::MAX)
}
struct IncomingFile { struct IncomingFile {
relative_path: String, relative_path: String,
path: PathBuf, path: PathBuf,
expected_size: u64, expected_size: u64,
expected_crc32: Option<u32>, expected_crc32: u32,
received: u64, received: u64,
hasher: Hasher, hasher: Hasher,
file: File, file: File,
@@ -267,7 +343,7 @@ impl IncomingFile {
relative_path: String, relative_path: String,
path: PathBuf, path: PathBuf,
expected_size: u64, expected_size: u64,
expected_crc32: Option<u32>, expected_crc32: u32,
file: File, file: File,
) -> Self { ) -> Self {
Self { Self {
@@ -287,7 +363,7 @@ impl IncomingFile {
peer_addr: SocketAddr, peer_addr: SocketAddr,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
bytes: Bytes, bytes: Bytes,
) -> eyre::Result<()> { ) -> eyre::Result<u64> {
let offset = self.received; let offset = self.received;
let length = u64::try_from(bytes.len())?; let length = u64::try_from(bytes.len())?;
if offset.saturating_add(length) > self.expected_size { if offset.saturating_add(length) > self.expected_size {
@@ -304,11 +380,11 @@ impl IncomingFile {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
id: game_id.to_string(), id: game_id.to_string(),
peer_addr, peer_addr,
relative_path: format!("{game_id}/local/{}", self.relative_path), relative_path: format!("{game_id}/.local.installing/{}", self.relative_path),
offset, offset,
length, length,
}); });
Ok(()) Ok(length)
} }
async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { async fn finish(mut self, relative_path: &str) -> eyre::Result<()> {
@@ -329,15 +405,14 @@ impl IncomingFile {
); );
} }
if let Some(expected) = self.expected_crc32 {
let actual = self.hasher.finalize(); let actual = self.hasher.finalize();
if actual != expected { if actual != self.expected_crc32 {
eyre::bail!( eyre::bail!(
"streamed file {} CRC32 mismatch: got {actual:08X}, expected {expected:08X}", "streamed file {} CRC32 mismatch: got {actual:08X}, expected {:08X}",
self.relative_path self.relative_path,
self.expected_crc32
); );
} }
}
log::debug!( log::debug!(
"Received streamed file {} -> {}", "Received streamed file {} -> {}",
+53 -14
View File
@@ -97,11 +97,17 @@ pub enum Response {
InternalPeerError(String), InternalPeerError(String),
} }
#[derive(Clone, Debug, Serialize, Deserialize)] const STREAM_INSTALL_CONTROL_FRAME_TAG: u8 = 0;
const STREAM_INSTALL_FILE_CHUNK_FRAME_TAG: u8 = 1;
const STREAM_INSTALL_ENCODE_ERROR_FRAME: &[u8] =
b"\0{\"Error\":{\"message\":\"stream install frame encoding error\"}}";
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum StreamInstallFrame { pub enum StreamInstallFrame {
ArchiveBegin { ArchiveBegin {
archive_name: String, archive_name: String,
solid: bool, solid: bool,
unpacked_size: u64,
}, },
Directory { Directory {
relative_path: String, relative_path: String,
@@ -109,7 +115,7 @@ pub enum StreamInstallFrame {
FileBegin { FileBegin {
relative_path: String, relative_path: String,
size: u64, size: u64,
crc32: Option<u32>, crc32: u32,
}, },
FileChunk { FileChunk {
bytes: Bytes, bytes: Bytes,
@@ -180,26 +186,59 @@ impl Message for Response {
impl Message for StreamInstallFrame { impl Message for StreamInstallFrame {
fn decode(bytes: Bytes) -> Self { fn decode(bytes: Bytes) -> Self {
match serde_json::from_slice(&bytes) { if bytes.is_empty() {
Ok(t) => t, return stream_install_decode_error("stream install frame is empty");
Err(e) => {
tracing::error!(?e, "StreamInstallFrame decoding error");
StreamInstallFrame::Error {
message: format!("stream install frame decoding error: {e}"),
}
} }
let tag = bytes[0];
let payload = bytes.slice(1..);
match tag {
STREAM_INSTALL_CONTROL_FRAME_TAG => decode_stream_install_control_frame(&payload),
STREAM_INSTALL_FILE_CHUNK_FRAME_TAG => StreamInstallFrame::FileChunk { bytes: payload },
_ => stream_install_decode_error(format!("unknown stream install frame tag {tag}")),
} }
} }
fn encode(&self) -> Bytes { fn encode(&self) -> Bytes {
match serde_json::to_vec(self) { match self {
Ok(s) => Bytes::from(s), StreamInstallFrame::FileChunk { bytes } => {
tagged_stream_install_frame(STREAM_INSTALL_FILE_CHUNK_FRAME_TAG, bytes)
}
_ => match serde_json::to_vec(self) {
Ok(payload) => {
tagged_stream_install_frame(STREAM_INSTALL_CONTROL_FRAME_TAG, &payload)
}
Err(e) => { Err(e) => {
tracing::error!(?e, "StreamInstallFrame encoding error"); tracing::error!(?e, "StreamInstallFrame encoding error");
Bytes::from(format!( Bytes::from_static(STREAM_INSTALL_ENCODE_ERROR_FRAME)
r#"{{"Error": {{"message": "encoding error: {e}"}}}}"#
))
} }
},
} }
} }
} }
fn decode_stream_install_control_frame(payload: &[u8]) -> StreamInstallFrame {
match serde_json::from_slice(payload) {
Ok(StreamInstallFrame::FileChunk { .. }) => {
stream_install_decode_error("stream install control frame cannot contain file bytes")
}
Ok(frame) => frame,
Err(e) => {
tracing::error!(?e, "StreamInstallFrame decoding error");
stream_install_decode_error(format!("stream install frame decoding error: {e}"))
}
}
}
fn tagged_stream_install_frame(tag: u8, payload: &[u8]) -> Bytes {
let mut frame = Vec::with_capacity(1 + payload.len());
frame.push(tag);
frame.extend_from_slice(payload);
Bytes::from(frame)
}
fn stream_install_decode_error(message: impl Into<String>) -> StreamInstallFrame {
StreamInstallFrame::Error {
message: message.into(),
}
}
@@ -0,0 +1,42 @@
use bytes::Bytes;
use lanspread_proto::{Message, StreamInstallFrame};
#[test]
fn file_chunks_encode_raw_bytes() {
let bytes = Bytes::from_static(&[0, 1, 2, 255]);
let encoded = StreamInstallFrame::FileChunk {
bytes: bytes.clone(),
}
.encode();
assert_eq!(&encoded[..], &[1, 0, 1, 2, 255]);
assert_eq!(
StreamInstallFrame::decode(encoded),
StreamInstallFrame::FileChunk { bytes }
);
}
#[test]
fn control_frames_are_tagged_json() {
let frame = StreamInstallFrame::FileBegin {
relative_path: "bin/game.exe".to_string(),
size: 42,
crc32: 0x38B4_88A7,
};
let encoded = frame.encode();
assert_eq!(encoded[0], 0);
assert_eq!(StreamInstallFrame::decode(encoded), frame);
}
#[test]
fn empty_frames_decode_as_errors() {
match StreamInstallFrame::decode(Bytes::new()) {
StreamInstallFrame::Error { message } => {
assert!(message.contains("empty"));
}
other => {
panic!("expected error frame, got {other:?}");
}
}
}