use std::{ future::Future, net::SocketAddr, path::{Path, PathBuf}, pin::Pin, process::Stdio, sync::Arc, time::{Duration, Instant}, }; use bytes::Bytes; use crc32fast::Hasher; use futures::{SinkExt, StreamExt}; use lanspread_proto::{Message, Request, StreamInstallFrame}; use s2n_quic::stream::SendStream; use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, process::Command, sync::{mpsc, mpsc::UnboundedSender}, time::{self, MissedTickBehavior}, }; use tokio_util::{ codec::{FramedRead, FramedWrite, LengthDelimitedCodec}, sync::CancellationToken, }; use crate::{ DownloadProgress, PeerEvent, install::root_eti_archives, network::connect_to_peer, path_validation::validate_game_file_path, }; const FRAME_CHANNEL_DEPTH: usize = 16; const STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500); const STREAM_CHUNK_SIZE: usize = 256 * 1024; /// Integrity metadata advertised by the sender's RAR archive. /// /// This catches transport corruption, truncation, and provider bugs. It is not /// a trusted-content guarantee because a malicious peer controls both the bytes /// and the archive metadata. Trusted content would need catalog-owned hashes. #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct SenderArchiveIntegrity { expected_size: u64, expected_crc32: u32, } impl SenderArchiveIntegrity { fn new(expected_size: u64, expected_crc32: u32) -> Self { Self { expected_size, expected_crc32, } } fn verify(self, relative_path: &str, received: u64, actual_crc32: u32) -> eyre::Result<()> { if received != self.expected_size { eyre::bail!( "streamed file {relative_path} size mismatch: got {received}, expected {}", self.expected_size ); } if actual_crc32 != self.expected_crc32 { eyre::bail!( "streamed file {relative_path} sender RAR CRC32 mismatch: got {actual_crc32:08X}, expected {:08X}", self.expected_crc32 ); } Ok(()) } } pub type StreamInstallFuture<'a> = Pin> + Send + 'a>>; pub trait StreamInstallProvider: Send + Sync { fn stream_archive<'a>( &'a self, archive: &'a Path, frames: mpsc::Sender, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a>; } #[derive(Debug, Default)] pub struct NoopStreamInstallProvider; impl StreamInstallProvider for NoopStreamInstallProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, _frames: mpsc::Sender, _cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { eyre::bail!( "streamed install provider is not configured for {}", archive.display() ) }) } } #[derive(Debug)] pub struct ExternalUnrarStreamProvider { program: PathBuf, } impl ExternalUnrarStreamProvider { #[must_use] pub fn new(program: PathBuf) -> Self { Self { program } } } impl StreamInstallProvider for ExternalUnrarStreamProvider { fn stream_archive<'a>( &'a self, archive: &'a Path, frames: mpsc::Sender, cancel_token: CancellationToken, ) -> StreamInstallFuture<'a> { Box::pin(async move { let listing = unrar_listing(&self.program, archive).await?; let archive_name = archive .file_name() .and_then(|name| name.to_str()) .unwrap_or("archive.eti") .to_string(); send_stream_frame( &frames, StreamInstallFrame::ArchiveBegin { archive_name: archive_name.clone(), solid: listing.solid, unpacked_size: listing.unpacked_size(), }, ) .await?; stream_unrar_entries( &self.program, archive, &listing.entries, &frames, cancel_token.clone(), ) .await?; send_stream_frame(&frames, StreamInstallFrame::ArchiveEnd { archive_name }).await }) } } #[derive(Debug, Clone, PartialEq, Eq)] struct RarListing { solid: bool, entries: Vec, } impl RarListing { fn unpacked_size(&self) -> u64 { self.entries .iter() .filter(|entry| entry.kind == RarEntryKind::File) .map(|entry| entry.size) .sum() } } #[derive(Debug, Clone, PartialEq, Eq)] struct RarEntry { relative_path: String, kind: RarEntryKind, size: u64, crc32: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum RarEntryKind { File, Directory, } #[derive(Default)] struct RarEntryDraft { relative_path: Option, kind: Option, size: Option, crc32: Option, } async fn unrar_listing(program: &Path, archive: &Path) -> eyre::Result { let output = Command::new(program) .arg("lt") .arg("-cfg-") .arg(archive) .output() .await?; if !output.status.success() { eyre::bail!( "unrar lt failed for {} with status {}: {}", archive.display(), output.status, String::from_utf8_lossy(&output.stderr) ); } parse_unrar_listing(&String::from_utf8_lossy(&output.stdout)) } fn parse_unrar_listing(output: &str) -> eyre::Result { let mut solid = false; let mut entries = Vec::new(); let mut current = RarEntryDraft::default(); for line in output.lines() { let trimmed = line.trim(); if let Some(details) = trimmed.strip_prefix("Details:") { solid = details.to_ascii_lowercase().contains("solid"); continue; } if let Some(name) = trimmed.strip_prefix("Name:") { push_rar_entry(&mut entries, std::mem::take(&mut current))?; current.relative_path = Some(name.trim().to_string()); continue; } if let Some(kind) = trimmed.strip_prefix("Type:") { current.kind = match kind.trim() { "File" => Some(RarEntryKind::File), "Directory" => Some(RarEntryKind::Directory), _ => None, }; continue; } if let Some(size) = trimmed.strip_prefix("Size:") { current.size = Some(size.trim().parse()?); continue; } if let Some(crc) = trimmed.strip_prefix("CRC32:") { current.crc32 = Some(u32::from_str_radix(crc.trim(), 16)?); } } push_rar_entry(&mut entries, current)?; Ok(RarListing { solid, entries }) } fn push_rar_entry(entries: &mut Vec, draft: RarEntryDraft) -> eyre::Result<()> { let Some(relative_path) = draft.relative_path else { return Ok(()); }; let Some(kind) = draft.kind else { return Ok(()); }; let (size, crc32) = match kind { RarEntryKind::File => { let size = draft .size .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no Size"))?; let crc32 = draft .crc32 .ok_or_else(|| eyre::eyre!("RAR file entry {relative_path} has no CRC32"))?; (size, Some(crc32)) } RarEntryKind::Directory => (0, None), }; entries.push(RarEntry { relative_path, kind, size, crc32, }); Ok(()) } async fn stream_unrar_entries( program: &Path, archive: &Path, entries: &[RarEntry], frames: &mpsc::Sender, cancel_token: CancellationToken, ) -> eyre::Result<()> { let mut child = Command::new(program) .arg("p") .arg("-inul") .arg("-cfg-") .arg(archive) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn()?; let result = async { let mut stdout = child .stdout .take() .ok_or_else(|| eyre::eyre!("unrar stdout was not captured"))?; let mut buffer = vec![0_u8; STREAM_CHUNK_SIZE]; for entry in entries { if cancel_token.is_cancelled() { eyre::bail!("streamed archive {} was cancelled", archive.display()); } match entry.kind { RarEntryKind::Directory => { send_stream_frame( frames, StreamInstallFrame::Directory { relative_path: entry.relative_path.clone(), }, ) .await?; } RarEntryKind::File => { let Some(crc32) = entry.crc32 else { eyre::bail!("RAR file entry {} has no CRC32", entry.relative_path); }; send_stream_frame( frames, StreamInstallFrame::FileBegin { relative_path: entry.relative_path.clone(), size: entry.size, crc32, }, ) .await?; stream_unrar_file_from_stdout( &mut stdout, archive, entry, frames, &mut buffer, &cancel_token, ) .await?; send_stream_frame( frames, StreamInstallFrame::FileEnd { relative_path: entry.relative_path.clone(), }, ) .await?; } } } let extra = read_unrar_stdout(&mut stdout, &mut buffer[..1], &cancel_token, archive).await?; if extra != 0 { eyre::bail!( "unrar produced bytes after listed entries for {}", archive.display() ); } let status = wait_unrar_child(&mut child, &cancel_token, archive).await?; if !status.success() { eyre::bail!( "unrar p failed for {} with status {status}", archive.display() ); } Ok(()) } .await; if result.is_err() { let _ = child.kill().await; } result } async fn stream_unrar_file_from_stdout( stdout: &mut (impl AsyncRead + Unpin), archive: &Path, entry: &RarEntry, frames: &mpsc::Sender, buffer: &mut [u8], cancel_token: &CancellationToken, ) -> eyre::Result<()> { let mut remaining = entry.size; while remaining > 0 { let read_len = usize::try_from(remaining.min(u64::try_from(buffer.len())?))?; let read = read_unrar_stdout(stdout, &mut buffer[..read_len], cancel_token, archive).await?; if read == 0 { eyre::bail!( "unrar ended while streaming {} from {}; {remaining} bytes missing", entry.relative_path, archive.display() ); } send_stream_frame( frames, StreamInstallFrame::FileChunk { bytes: Bytes::copy_from_slice(&buffer[..read]), }, ) .await?; remaining = remaining.saturating_sub(u64::try_from(read)?); } Ok(()) } async fn read_unrar_stdout( stdout: &mut (impl AsyncRead + Unpin), buffer: &mut [u8], cancel_token: &CancellationToken, archive: &Path, ) -> eyre::Result { tokio::select! { () = cancel_token.cancelled() => { eyre::bail!("streamed archive {} was cancelled", archive.display()); } read = stdout.read(buffer) => Ok(read?), } } async fn wait_unrar_child( child: &mut tokio::process::Child, cancel_token: &CancellationToken, archive: &Path, ) -> eyre::Result { tokio::select! { () = cancel_token.cancelled() => { let _ = child.kill().await; eyre::bail!("streamed archive {} was cancelled", archive.display()); } status = child.wait() => Ok(status?), } } async fn send_stream_frame( frames: &mpsc::Sender, frame: StreamInstallFrame, ) -> eyre::Result<()> { frames .send(frame) .await .map_err(|_| eyre::eyre!("streamed install frame receiver closed")) } pub(crate) async fn send_stream_install_error( tx: SendStream, message: impl Into, ) -> SendStream { let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); if let Err(err) = framed_tx .send( StreamInstallFrame::Error { message: message.into(), } .encode(), ) .await { log::warn!("Failed to send streamed install error frame: {err}"); } if let Err(err) = framed_tx.close().await { log::debug!("Failed to close streamed install error response: {err}"); } framed_tx.into_inner() } pub(crate) async fn send_game_install_stream( provider: Arc, tx: SendStream, game_root: &Path, game_id: &str, cancel_token: CancellationToken, ) -> (SendStream, eyre::Result<()>) { let archives = match root_eti_archives(game_root).await { Ok(archives) => archives, Err(err) => { let message = err.to_string(); let tx = send_stream_install_error(tx, message.clone()).await; return (tx, Err(eyre::eyre!(message))); } }; if archives.is_empty() { let message = format!("no .eti archives found for {game_id}"); let tx = send_stream_install_error(tx, message.clone()).await; return (tx, Err(eyre::eyre!(message))); } let (frame_tx, mut frame_rx) = mpsc::channel(FRAME_CHANNEL_DEPTH); let producer_cancel = cancel_token.child_token(); let game_id_for_producer = game_id.to_string(); let producer = tokio::spawn({ let provider = provider.clone(); let producer_cancel = producer_cancel.clone(); async move { for archive in archives { if producer_cancel.is_cancelled() { eyre::bail!("streamed install for {game_id_for_producer} was cancelled"); } if let Err(err) = provider .stream_archive(&archive, frame_tx.clone(), producer_cancel.clone()) .await { let message = err.to_string(); let _ = frame_tx.send(StreamInstallFrame::Error { message }).await; return Err(err); } } let _ = frame_tx.send(StreamInstallFrame::Complete).await; Ok(()) } }); let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); let mut send_result = Ok(()); while let Some(frame) = frame_rx.recv().await { if let Err(err) = framed_tx.send(frame.encode()).await { producer_cancel.cancel(); send_result = Err(eyre::eyre!("failed to send streamed install frame: {err}")); break; } } let close_result = framed_tx .close() .await .map_err(|err| eyre::eyre!("failed to close streamed install stream: {err}")); let tx = framed_tx.into_inner(); let producer_result = match producer.await { Ok(result) => result, Err(err) => Err(eyre::eyre!("streamed install producer task failed: {err}")), }; let result = send_result.and(producer_result).and(close_result); (tx, result) } pub(crate) async fn receive_streamed_install( peer_addr: SocketAddr, game_id: &str, staging_dir: &Path, tx_notify_ui: UnboundedSender, cancel_token: CancellationToken, ) -> eyre::Result<()> { let staging_dir = tokio::fs::canonicalize(staging_dir) .await .unwrap_or_else(|_| staging_dir.to_path_buf()); let mut conn = connect_to_peer(peer_addr).await?; let stream = conn.open_bidirectional_stream().await?; let (rx, tx) = stream.split(); let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new()); framed_tx .send( Request::StreamInstall { game_id: game_id.to_string(), } .encode(), ) .await?; framed_tx.close().await?; let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new()); let mut current_file: Option = None; let mut progress = StreamInstallProgress::new(game_id.to_string()); let mut progress_interval = time::interval(STREAM_INSTALL_PROGRESS_UPDATE_INTERVAL); progress_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); progress_interval.tick().await; loop { let next = tokio::select! { () = cancel_token.cancelled() => eyre::bail!("streamed install for {game_id} was cancelled"), _ = progress_interval.tick() => { progress.emit_current(&tx_notify_ui); continue; } next = framed_rx.next() => next, }; let Some(frame) = next else { eyre::bail!("streamed install ended before Complete"); }; let frame = frame?.freeze(); let frame = StreamInstallFrame::decode(frame); match frame { StreamInstallFrame::ArchiveBegin { archive_name, solid, unpacked_size, } => { progress.add_total(unpacked_size); progress.emit_snapshot(&tx_notify_ui, 0); log::info!( "Receiving streamed install archive {archive_name} for {game_id} \ (solid={solid}, unpacked_size={unpacked_size})" ); } StreamInstallFrame::Directory { relative_path } => { let path = resolve_stream_path(&staging_dir, &relative_path)?; tokio::fs::create_dir_all(path).await?; } StreamInstallFrame::FileBegin { relative_path, size, crc32, } => { if current_file.is_some() { eyre::bail!("received FileBegin for {relative_path} before previous FileEnd"); } let path = resolve_stream_path(&staging_dir, &relative_path)?; if let Some(parent) = path.parent() { tokio::fs::create_dir_all(parent).await?; } let file = File::create(&path).await?; current_file = Some(IncomingFile::new(relative_path, path, size, crc32, file)); } StreamInstallFrame::FileChunk { bytes } => { let Some(file) = current_file.as_mut() else { eyre::bail!("received FileChunk without FileBegin"); }; let length = file .write_chunk(game_id, peer_addr, &tx_notify_ui, bytes) .await?; progress.record_bytes(length); } StreamInstallFrame::FileEnd { relative_path } => { let Some(file) = current_file.take() else { eyre::bail!("received FileEnd for {relative_path} without FileBegin"); }; file.finish(&relative_path).await?; } StreamInstallFrame::ArchiveEnd { archive_name } => { log::info!("Finished streamed install archive {archive_name} for {game_id}"); } StreamInstallFrame::Complete => { if current_file.is_some() { eyre::bail!("streamed install completed with an open file"); } progress.emit_snapshot(&tx_notify_ui, 0); return Ok(()); } StreamInstallFrame::Error { message } => { eyre::bail!("streamed install sender failed: {message}"); } } } } struct StreamInstallProgress { id: String, total_bytes: u64, downloaded_bytes: u64, last_downloaded_bytes: u64, last_at: Instant, } impl StreamInstallProgress { fn new(id: String) -> Self { Self { id, total_bytes: 0, downloaded_bytes: 0, last_downloaded_bytes: 0, last_at: Instant::now(), } } fn add_total(&mut self, bytes: u64) { self.total_bytes = self.total_bytes.saturating_add(bytes); } fn record_bytes(&mut self, bytes: u64) { self.downloaded_bytes = self.downloaded_bytes.saturating_add(bytes); } fn emit_current(&mut self, tx_notify_ui: &UnboundedSender) { let now = Instant::now(); let speed = bytes_per_second( self.downloaded_bytes .saturating_sub(self.last_downloaded_bytes), now.duration_since(self.last_at), ); self.last_downloaded_bytes = self.downloaded_bytes; self.last_at = now; self.emit_snapshot(tx_notify_ui, speed); } fn emit_snapshot(&self, tx_notify_ui: &UnboundedSender, bytes_per_second: u64) { let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesProgress(DownloadProgress { id: self.id.clone(), downloaded_bytes: self.downloaded_bytes, total_bytes: self.total_bytes, bytes_per_second, active_peer_count: 1, })); } } fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 { let millis = elapsed.as_millis().max(1); let rate = u128::from(bytes).saturating_mul(1_000) / millis; u64::try_from(rate).unwrap_or(u64::MAX) } struct IncomingFile { relative_path: String, path: PathBuf, integrity: SenderArchiveIntegrity, received: u64, crc32: Hasher, file: File, } impl IncomingFile { fn new( relative_path: String, path: PathBuf, expected_size: u64, expected_crc32: u32, file: File, ) -> Self { Self { relative_path, path, integrity: SenderArchiveIntegrity::new(expected_size, expected_crc32), received: 0, crc32: Hasher::new(), file, } } async fn write_chunk( &mut self, game_id: &str, peer_addr: SocketAddr, tx_notify_ui: &UnboundedSender, bytes: Bytes, ) -> eyre::Result { let offset = self.received; let length = u64::try_from(bytes.len())?; if offset.saturating_add(length) > self.integrity.expected_size { eyre::bail!( "streamed file {} exceeded expected size {}", self.relative_path, self.integrity.expected_size ); } self.file.write_all(&bytes).await?; self.crc32.update(&bytes); self.received = self.received.saturating_add(length); let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished { id: game_id.to_string(), peer_addr, relative_path: format!("{game_id}/.local.installing/{}", self.relative_path), offset, length, }); Ok(length) } async fn finish(mut self, relative_path: &str) -> eyre::Result<()> { if self.relative_path != relative_path { eyre::bail!( "streamed file end mismatch: began {}, ended {relative_path}", self.relative_path ); } self.file.flush().await?; let actual_crc32 = self.crc32.finalize(); self.integrity .verify(&self.relative_path, self.received, actual_crc32)?; log::debug!( "Received streamed file {} -> {}", self.relative_path, self.path.display() ); Ok(()) } } fn resolve_stream_path(staging_dir: &Path, relative_path: &str) -> eyre::Result { validate_game_file_path(staging_dir, relative_path) } #[cfg(test)] mod tests { use super::*; use crate::test_support::TempDir; #[test] fn stream_paths_stay_inside_staging_dir() { let temp = TempDir::new("lanspread-stream-install-path"); let staging = temp.path().join("staging"); std::fs::create_dir_all(&staging).expect("staging should be created"); let staging = std::fs::canonicalize(staging).expect("staging should canonicalize"); assert!(resolve_stream_path(&staging, "bin/game.exe").is_ok()); assert!(resolve_stream_path(&staging, "../outside").is_err()); assert!(resolve_stream_path(&staging, "/absolute").is_err()); assert!(resolve_stream_path(&staging, "C:/windows").is_err()); } #[test] fn parses_unrar_technical_listing() { let listing = parse_unrar_listing( r#" Archive: game.eti Details: RAR 5, solid Name: bin/payload.bin Type: File Size: 123 CRC32: 38B488A7 Name: bin Type: Directory "#, ) .expect("listing should parse"); assert!(listing.solid); assert_eq!( listing.entries, vec![ RarEntry { relative_path: "bin/payload.bin".to_string(), kind: RarEntryKind::File, size: 123, crc32: Some(0x38B4_88A7), }, RarEntry { relative_path: "bin".to_string(), kind: RarEntryKind::Directory, size: 0, crc32: None, }, ] ); } #[test] fn rejects_unrar_file_entries_without_crc32() { let err = parse_unrar_listing( r#" Archive: game.eti Details: RAR 5 Name: bin/payload.bin Type: File Size: 123 "#, ) .expect_err("file entries without CRC32 should be rejected"); assert!(err.to_string().contains("has no CRC32")); } #[test] fn sender_archive_integrity_accepts_matching_size_and_crc32() { let bytes = b"payload"; let integrity = SenderArchiveIntegrity::new(u64::try_from(bytes.len()).unwrap(), crc32_of(bytes)); integrity .verify( "bin/payload.bin", u64::try_from(bytes.len()).unwrap(), crc32_of(bytes), ) .expect("matching sender archive metadata should verify"); } #[test] fn sender_archive_integrity_rejects_size_mismatch() { let integrity = SenderArchiveIntegrity::new(7, crc32_of(b"payload")); let err = integrity .verify("bin/payload.bin", 6, crc32_of(b"payload")) .expect_err("truncated file should fail sender archive integrity"); assert!(err.to_string().contains("size mismatch")); } #[test] fn sender_archive_integrity_rejects_crc32_mismatch() { let integrity = SenderArchiveIntegrity::new(7, crc32_of(b"payload")); let err = integrity .verify("bin/payload.bin", 7, crc32_of(b"paylord")) .expect_err("mutated file should fail sender archive integrity"); assert!(err.to_string().contains("sender RAR CRC32 mismatch")); } fn crc32_of(bytes: &[u8]) -> u32 { let mut hasher = Hasher::new(); hasher.update(bytes); hasher.finalize() } }