From b56f4e27578f12eb162540e00822f64dd5df9f12 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Thu, 21 May 2026 00:28:08 +0200 Subject: [PATCH] feat(peer): expose active download peer count The launcher needs design work later for showing how many peers are currently feeding an active download. Surface that data now on the existing progress payload so UI state can consume it without a separate event stream or rendering change. The peer download tracker now treats each live chunk receive as peer activity and reports the number of unique peers with in-flight streams. This is a live transfer count, not the number of peers that advertised the game or received a plan. Multiple chunks from one peer count once, and the count falls as chunk streams finish. Tauri already forwards DownloadGameFilesProgress, so no bridge event was added. The TypeScript model accepts active_peer_count under download_progress and preserves it with the same reducer path that keeps bytes and speed while the backend says the game is still downloading. Test Plan: - just fmt - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test - just frontend-test - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy - git diff --check - git diff --cached --check Refs: none --- crates/lanspread-peer/README.md | 7 +- .../lanspread-peer/src/download/progress.rs | 84 ++++++++++++++++++- .../lanspread-peer/src/download/transport.rs | 18 +++- crates/lanspread-peer/src/lib.rs | 2 + .../lanspread-tauri-deno-ts/src/lib/types.ts | 1 + .../tests/gameState.test.ts | 8 ++ 6 files changed, 111 insertions(+), 9 deletions(-) diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 32ff7ef..235b57c 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -75,13 +75,16 @@ When the UI asks to download a game: keeps existing data intact and only truncates when we intentionally fall back to a full file transfer, which prevents corruption when multiple peers fill different regions of the same file. -5. `version.ini` chunks are buffered in memory and committed last via +5. `DownloadProgressTracker` samples byte counters, transfer speed, and the + number of unique peers that are actively streaming chunks. The Tauri UI sees + those values together through the regular download-progress event. +6. `version.ini` chunks are buffered in memory and committed last via `.version.ini.tmp` followed by an atomic rename. Failures are accumulated and retried (up to `MAX_RETRY_COUNT`) via `retry_failed_chunks`; failed downloads sweep `.version.ini.tmp` and `.version.ini.discarded` without restoring the previous sentinel. Cancelled downloads also discard the peer-owned download payload while preserving `local/` and install transaction metadata. -6. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` +7. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` is emitted and the peer auto-runs the install transaction. `PeerCommand::CancelDownload` cancels the tracked download token for an active diff --git a/crates/lanspread-peer/src/download/progress.rs b/crates/lanspread-peer/src/download/progress.rs index 113b984..43d4c05 100644 --- a/crates/lanspread-peer/src/download/progress.rs +++ b/crates/lanspread-peer/src/download/progress.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, future::Future, + net::SocketAddr, sync::{ Arc, Mutex, @@ -29,6 +30,7 @@ pub(super) struct DownloadProgressTracker { downloaded_bytes: AtomicU64, transferred_bytes: AtomicU64, chunks: Mutex>, + active_peers: Mutex>, } impl DownloadProgressTracker { @@ -38,11 +40,13 @@ impl DownloadProgressTracker { downloaded_bytes: AtomicU64::new(0), transferred_bytes: AtomicU64::new(0), chunks: Mutex::new(HashMap::new()), + active_peers: Mutex::new(HashMap::new()), }) } pub(super) fn track_chunk( self: &Arc, + peer_addr: SocketAddr, relative_path: &str, offset: u64, expected_bytes: u64, @@ -53,6 +57,7 @@ impl DownloadProgressTracker { relative_path: relative_path.to_string(), offset, }, + _peer_activity: self.track_active_peer(peer_addr), expected_bytes, received_bytes: 0, } @@ -104,12 +109,51 @@ impl DownloadProgressTracker { } } + fn track_active_peer(self: &Arc, peer_addr: SocketAddr) -> ActivePeerDownload { + { + let mut active_peers = self + .active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *active_peers.entry(peer_addr).or_default() += 1; + } + + ActivePeerDownload { + tracker: self.clone(), + peer_addr, + } + } + + fn finish_active_peer(&self, peer_addr: SocketAddr) { + let mut active_peers = self + .active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + let Some(count) = active_peers.get_mut(&peer_addr) else { + return; + }; + if *count <= 1 { + active_peers.remove(&peer_addr); + } else { + *count -= 1; + } + } + + fn active_peer_count(&self) -> usize { + self.active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .len() + } + fn snapshot(&self, id: &str, bytes_per_second: u64) -> DownloadProgress { DownloadProgress { id: id.to_string(), downloaded_bytes: self.reported_downloaded_bytes(), total_bytes: self.total_bytes, bytes_per_second, + active_peer_count: self.active_peer_count(), } } } @@ -117,6 +161,7 @@ impl DownloadProgressTracker { pub(super) struct ChunkProgress { tracker: Arc, key: ChunkProgressKey, + _peer_activity: ActivePeerDownload, expected_bytes: u64, received_bytes: u64, } @@ -136,6 +181,17 @@ impl ChunkProgress { } } +struct ActivePeerDownload { + tracker: Arc, + peer_addr: SocketAddr, +} + +impl Drop for ActivePeerDownload { + fn drop(&mut self) { + self.tracker.finish_active_peer(self.peer_addr); + } +} + fn add_saturating(counter: &AtomicU64, delta: u64) { let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { Some(current.saturating_add(delta)) @@ -230,14 +286,19 @@ where mod tests { use super::*; + fn loopback_addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + #[test] fn tracker_counts_only_new_bytes_for_a_retried_chunk() { let tracker = DownloadProgressTracker::new(100); - let mut first_attempt = tracker.track_chunk("game/file.bin", 0, 100); + let peer = loopback_addr(12000); + let mut first_attempt = tracker.track_chunk(peer, "game/file.bin", 0, 100); first_attempt.record_bytes(40); first_attempt.record_bytes(10); - let mut retry = tracker.track_chunk("game/file.bin", 0, 100); + let mut retry = tracker.track_chunk(peer, "game/file.bin", 0, 100); retry.record_bytes(25); retry.record_bytes(50); @@ -248,10 +309,27 @@ mod tests { #[test] fn tracker_clamps_reported_bytes_to_total() { let tracker = DownloadProgressTracker::new(10); - let mut chunk = tracker.track_chunk("game/file.bin", 0, 0); + let mut chunk = tracker.track_chunk(loopback_addr(12000), "game/file.bin", 0, 0); chunk.record_bytes(25); assert_eq!(tracker.raw_downloaded_bytes(), 25); assert_eq!(tracker.reported_downloaded_bytes(), 10); } + + #[test] + fn tracker_reports_unique_active_peer_count() { + let tracker = DownloadProgressTracker::new(100); + let first_peer = loopback_addr(12000); + let second_peer = loopback_addr(12001); + + { + let _first_chunk = tracker.track_chunk(first_peer, "game/file.bin", 0, 50); + let _second_chunk = tracker.track_chunk(first_peer, "game/file.bin", 50, 50); + let _third_chunk = tracker.track_chunk(second_peer, "game/other.bin", 0, 10); + + assert_eq!(tracker.snapshot("game", 0).active_peer_count, 2); + } + + assert_eq!(tracker.snapshot("game", 0).active_peer_count, 0); + } } diff --git a/crates/lanspread-peer/src/download/transport.rs b/crates/lanspread-peer/src/download/transport.rs index 8329e1b..31ea90c 100644 --- a/crates/lanspread-peer/src/download/transport.rs +++ b/crates/lanspread-peer/src/download/transport.rs @@ -62,6 +62,7 @@ async fn open_chunk_stream( /// Receives one requested chunk from a peer stream. async fn receive_chunk( + peer_addr: SocketAddr, mut rx: ReceiveStream, base_dir: &Path, chunk: &DownloadChunk, @@ -71,7 +72,7 @@ async fn receive_chunk( if let Some(buffer) = version_buffer && buffer.matches(&chunk.relative_path) { - return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await; + return download_version_ini_chunk(peer_addr, rx, chunk, &buffer, progress_tracker).await; } // Validate the path to prevent directory traversal @@ -91,7 +92,7 @@ async fn receive_chunk( let mut remaining = chunk.length; let mut received_bytes = 0u64; let mut progress = - progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); + progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { file.write_all(&bytes).await?; @@ -135,7 +136,15 @@ async fn receive_chunk_result( version_buffer: Option>, progress_tracker: Arc, ) -> ChunkDownloadResult { - let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await; + let result = receive_chunk( + peer_addr, + rx, + &base_dir, + &chunk, + version_buffer, + progress_tracker, + ) + .await; ChunkDownloadResult { chunk, result, @@ -144,6 +153,7 @@ async fn receive_chunk_result( } async fn download_version_ini_chunk( + peer_addr: SocketAddr, mut rx: ReceiveStream, chunk: &DownloadChunk, buffer: &VersionIniBuffer, @@ -151,7 +161,7 @@ async fn download_version_ini_chunk( ) -> eyre::Result<()> { let mut received = Vec::new(); let mut progress = - progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); + progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { progress.record_bytes(bytes.len()); received.extend_from_slice(&bytes); diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 99e333a..bdff499 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -162,6 +162,8 @@ pub struct DownloadProgress { pub downloaded_bytes: u64, pub total_bytes: u64, pub bytes_per_second: u64, + /// Unique peers currently streaming at least one chunk for this download. + pub active_peer_count: usize, } /// Long-running peer runtime components reported in failure events. diff --git a/crates/lanspread-tauri-deno-ts/src/lib/types.ts b/crates/lanspread-tauri-deno-ts/src/lib/types.ts index 1b17847..608c14a 100644 --- a/crates/lanspread-tauri-deno-ts/src/lib/types.ts +++ b/crates/lanspread-tauri-deno-ts/src/lib/types.ts @@ -27,6 +27,7 @@ export interface DownloadProgress { downloaded_bytes: number; total_bytes: number; bytes_per_second: number; + active_peer_count: number; } export interface DownloadProgressPayload extends DownloadProgress { diff --git a/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts b/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts index 5fcc41f..7b433dd 100644 --- a/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts +++ b/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts @@ -99,6 +99,7 @@ Deno.test('download progress is preserved only while actively downloading', () = downloaded_bytes: 50, total_bytes: 100, bytes_per_second: 12_500_000, + active_peer_count: 2, }, }); @@ -114,6 +115,11 @@ Deno.test('download progress is preserved only while actively downloading', () = 50, 'active download snapshot should keep progress', ); + assertEquals( + stillDownloading.download_progress?.active_peer_count, + 2, + 'active download snapshot should keep live peer count', + ); assertEquals( settled.download_progress, undefined, @@ -128,6 +134,7 @@ Deno.test('downloading action label includes current speed', () => { downloaded_bytes: 50, total_bytes: 100, bytes_per_second: 12_500_000, + active_peer_count: 2, }, }); @@ -184,6 +191,7 @@ Deno.test('download progress formatting matches the progress-bar layouts', () => downloaded_bytes: 12 * 1024 * 1024 * 1024, total_bytes: 35 * 1024 * 1024 * 1024, bytes_per_second: 49_400_000, + active_peer_count: 3, }, });