diff --git a/crates/lanspread-peer/README.md b/crates/lanspread-peer/README.md index 32ff7ef..235b57c 100644 --- a/crates/lanspread-peer/README.md +++ b/crates/lanspread-peer/README.md @@ -75,13 +75,16 @@ When the UI asks to download a game: keeps existing data intact and only truncates when we intentionally fall back to a full file transfer, which prevents corruption when multiple peers fill different regions of the same file. -5. `version.ini` chunks are buffered in memory and committed last via +5. `DownloadProgressTracker` samples byte counters, transfer speed, and the + number of unique peers that are actively streaming chunks. The Tauri UI sees + those values together through the regular download-progress event. +6. `version.ini` chunks are buffered in memory and committed last via `.version.ini.tmp` followed by an atomic rename. Failures are accumulated and retried (up to `MAX_RETRY_COUNT`) via `retry_failed_chunks`; failed downloads sweep `.version.ini.tmp` and `.version.ini.discarded` without restoring the previous sentinel. Cancelled downloads also discard the peer-owned download payload while preserving `local/` and install transaction metadata. -6. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` +7. After a successful sentinel commit, `PeerEvent::DownloadGameFilesFinished` is emitted and the peer auto-runs the install transaction. `PeerCommand::CancelDownload` cancels the tracked download token for an active diff --git a/crates/lanspread-peer/src/download/progress.rs b/crates/lanspread-peer/src/download/progress.rs index 113b984..43d4c05 100644 --- a/crates/lanspread-peer/src/download/progress.rs +++ b/crates/lanspread-peer/src/download/progress.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, future::Future, + net::SocketAddr, sync::{ Arc, Mutex, @@ -29,6 +30,7 @@ pub(super) struct DownloadProgressTracker { downloaded_bytes: AtomicU64, transferred_bytes: AtomicU64, chunks: Mutex>, + active_peers: Mutex>, } impl DownloadProgressTracker { @@ -38,11 +40,13 @@ impl DownloadProgressTracker { downloaded_bytes: AtomicU64::new(0), transferred_bytes: AtomicU64::new(0), chunks: Mutex::new(HashMap::new()), + active_peers: Mutex::new(HashMap::new()), }) } pub(super) fn track_chunk( self: &Arc, + peer_addr: SocketAddr, relative_path: &str, offset: u64, expected_bytes: u64, @@ -53,6 +57,7 @@ impl DownloadProgressTracker { relative_path: relative_path.to_string(), offset, }, + _peer_activity: self.track_active_peer(peer_addr), expected_bytes, received_bytes: 0, } @@ -104,12 +109,51 @@ impl DownloadProgressTracker { } } + fn track_active_peer(self: &Arc, peer_addr: SocketAddr) -> ActivePeerDownload { + { + let mut active_peers = self + .active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *active_peers.entry(peer_addr).or_default() += 1; + } + + ActivePeerDownload { + tracker: self.clone(), + peer_addr, + } + } + + fn finish_active_peer(&self, peer_addr: SocketAddr) { + let mut active_peers = self + .active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + let Some(count) = active_peers.get_mut(&peer_addr) else { + return; + }; + if *count <= 1 { + active_peers.remove(&peer_addr); + } else { + *count -= 1; + } + } + + fn active_peer_count(&self) -> usize { + self.active_peers + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .len() + } + fn snapshot(&self, id: &str, bytes_per_second: u64) -> DownloadProgress { DownloadProgress { id: id.to_string(), downloaded_bytes: self.reported_downloaded_bytes(), total_bytes: self.total_bytes, bytes_per_second, + active_peer_count: self.active_peer_count(), } } } @@ -117,6 +161,7 @@ impl DownloadProgressTracker { pub(super) struct ChunkProgress { tracker: Arc, key: ChunkProgressKey, + _peer_activity: ActivePeerDownload, expected_bytes: u64, received_bytes: u64, } @@ -136,6 +181,17 @@ impl ChunkProgress { } } +struct ActivePeerDownload { + tracker: Arc, + peer_addr: SocketAddr, +} + +impl Drop for ActivePeerDownload { + fn drop(&mut self) { + self.tracker.finish_active_peer(self.peer_addr); + } +} + fn add_saturating(counter: &AtomicU64, delta: u64) { let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { Some(current.saturating_add(delta)) @@ -230,14 +286,19 @@ where mod tests { use super::*; + fn loopback_addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + #[test] fn tracker_counts_only_new_bytes_for_a_retried_chunk() { let tracker = DownloadProgressTracker::new(100); - let mut first_attempt = tracker.track_chunk("game/file.bin", 0, 100); + let peer = loopback_addr(12000); + let mut first_attempt = tracker.track_chunk(peer, "game/file.bin", 0, 100); first_attempt.record_bytes(40); first_attempt.record_bytes(10); - let mut retry = tracker.track_chunk("game/file.bin", 0, 100); + let mut retry = tracker.track_chunk(peer, "game/file.bin", 0, 100); retry.record_bytes(25); retry.record_bytes(50); @@ -248,10 +309,27 @@ mod tests { #[test] fn tracker_clamps_reported_bytes_to_total() { let tracker = DownloadProgressTracker::new(10); - let mut chunk = tracker.track_chunk("game/file.bin", 0, 0); + let mut chunk = tracker.track_chunk(loopback_addr(12000), "game/file.bin", 0, 0); chunk.record_bytes(25); assert_eq!(tracker.raw_downloaded_bytes(), 25); assert_eq!(tracker.reported_downloaded_bytes(), 10); } + + #[test] + fn tracker_reports_unique_active_peer_count() { + let tracker = DownloadProgressTracker::new(100); + let first_peer = loopback_addr(12000); + let second_peer = loopback_addr(12001); + + { + let _first_chunk = tracker.track_chunk(first_peer, "game/file.bin", 0, 50); + let _second_chunk = tracker.track_chunk(first_peer, "game/file.bin", 50, 50); + let _third_chunk = tracker.track_chunk(second_peer, "game/other.bin", 0, 10); + + assert_eq!(tracker.snapshot("game", 0).active_peer_count, 2); + } + + assert_eq!(tracker.snapshot("game", 0).active_peer_count, 0); + } } diff --git a/crates/lanspread-peer/src/download/transport.rs b/crates/lanspread-peer/src/download/transport.rs index 8329e1b..31ea90c 100644 --- a/crates/lanspread-peer/src/download/transport.rs +++ b/crates/lanspread-peer/src/download/transport.rs @@ -62,6 +62,7 @@ async fn open_chunk_stream( /// Receives one requested chunk from a peer stream. async fn receive_chunk( + peer_addr: SocketAddr, mut rx: ReceiveStream, base_dir: &Path, chunk: &DownloadChunk, @@ -71,7 +72,7 @@ async fn receive_chunk( if let Some(buffer) = version_buffer && buffer.matches(&chunk.relative_path) { - return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await; + return download_version_ini_chunk(peer_addr, rx, chunk, &buffer, progress_tracker).await; } // Validate the path to prevent directory traversal @@ -91,7 +92,7 @@ async fn receive_chunk( let mut remaining = chunk.length; let mut received_bytes = 0u64; let mut progress = - progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); + progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { file.write_all(&bytes).await?; @@ -135,7 +136,15 @@ async fn receive_chunk_result( version_buffer: Option>, progress_tracker: Arc, ) -> ChunkDownloadResult { - let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await; + let result = receive_chunk( + peer_addr, + rx, + &base_dir, + &chunk, + version_buffer, + progress_tracker, + ) + .await; ChunkDownloadResult { chunk, result, @@ -144,6 +153,7 @@ async fn receive_chunk_result( } async fn download_version_ini_chunk( + peer_addr: SocketAddr, mut rx: ReceiveStream, chunk: &DownloadChunk, buffer: &VersionIniBuffer, @@ -151,7 +161,7 @@ async fn download_version_ini_chunk( ) -> eyre::Result<()> { let mut received = Vec::new(); let mut progress = - progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length); + progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length); while let Some(bytes) = rx.receive().await? { progress.record_bytes(bytes.len()); received.extend_from_slice(&bytes); diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 99e333a..bdff499 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -162,6 +162,8 @@ pub struct DownloadProgress { pub downloaded_bytes: u64, pub total_bytes: u64, pub bytes_per_second: u64, + /// Unique peers currently streaming at least one chunk for this download. + pub active_peer_count: usize, } /// Long-running peer runtime components reported in failure events. diff --git a/crates/lanspread-tauri-deno-ts/src/lib/types.ts b/crates/lanspread-tauri-deno-ts/src/lib/types.ts index 1b17847..608c14a 100644 --- a/crates/lanspread-tauri-deno-ts/src/lib/types.ts +++ b/crates/lanspread-tauri-deno-ts/src/lib/types.ts @@ -27,6 +27,7 @@ export interface DownloadProgress { downloaded_bytes: number; total_bytes: number; bytes_per_second: number; + active_peer_count: number; } export interface DownloadProgressPayload extends DownloadProgress { diff --git a/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts b/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts index 5fcc41f..7b433dd 100644 --- a/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts +++ b/crates/lanspread-tauri-deno-ts/tests/gameState.test.ts @@ -99,6 +99,7 @@ Deno.test('download progress is preserved only while actively downloading', () = downloaded_bytes: 50, total_bytes: 100, bytes_per_second: 12_500_000, + active_peer_count: 2, }, }); @@ -114,6 +115,11 @@ Deno.test('download progress is preserved only while actively downloading', () = 50, 'active download snapshot should keep progress', ); + assertEquals( + stillDownloading.download_progress?.active_peer_count, + 2, + 'active download snapshot should keep live peer count', + ); assertEquals( settled.download_progress, undefined, @@ -128,6 +134,7 @@ Deno.test('downloading action label includes current speed', () => { downloaded_bytes: 50, total_bytes: 100, bytes_per_second: 12_500_000, + active_peer_count: 2, }, }); @@ -184,6 +191,7 @@ Deno.test('download progress formatting matches the progress-bar layouts', () => downloaded_bytes: 12 * 1024 * 1024 * 1024, total_bytes: 35 * 1024 * 1024 * 1024, bytes_per_second: 49_400_000, + active_peer_count: 3, }, });