feat(peer): expose active download peer count

The launcher needs design work later for showing how many peers are currently
feeding an active download. Surface that data now on the existing progress
payload so UI state can consume it without a separate event stream or rendering
change.

The peer download tracker now treats each live chunk receive as peer activity
and reports the number of unique peers with in-flight streams. This is a live
transfer count, not the number of peers that advertised the game or received a
plan. Multiple chunks from one peer count once, and the count falls as chunk
streams finish.

Tauri already forwards DownloadGameFilesProgress, so no bridge event was added.
The TypeScript model accepts active_peer_count under download_progress and
preserves it with the same reducer path that keeps bytes and speed while the
backend says the game is still downloading.

Test Plan:
- just fmt
- RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test
- just frontend-test
- RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy
- git diff --check
- git diff --cached --check

Refs: none
This commit is contained in:
2026-05-21 00:28:08 +02:00
parent 7e97d6a83a
commit b56f4e2757
6 changed files with 111 additions and 9 deletions
+81 -3
View File
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
future::Future,
net::SocketAddr,
sync::{
Arc,
Mutex,
@@ -29,6 +30,7 @@ pub(super) struct DownloadProgressTracker {
downloaded_bytes: AtomicU64,
transferred_bytes: AtomicU64,
chunks: Mutex<HashMap<ChunkProgressKey, u64>>,
active_peers: Mutex<HashMap<SocketAddr, usize>>,
}
impl DownloadProgressTracker {
@@ -38,11 +40,13 @@ impl DownloadProgressTracker {
downloaded_bytes: AtomicU64::new(0),
transferred_bytes: AtomicU64::new(0),
chunks: Mutex::new(HashMap::new()),
active_peers: Mutex::new(HashMap::new()),
})
}
pub(super) fn track_chunk(
self: &Arc<Self>,
peer_addr: SocketAddr,
relative_path: &str,
offset: u64,
expected_bytes: u64,
@@ -53,6 +57,7 @@ impl DownloadProgressTracker {
relative_path: relative_path.to_string(),
offset,
},
_peer_activity: self.track_active_peer(peer_addr),
expected_bytes,
received_bytes: 0,
}
@@ -104,12 +109,51 @@ impl DownloadProgressTracker {
}
}
fn track_active_peer(self: &Arc<Self>, peer_addr: SocketAddr) -> ActivePeerDownload {
{
let mut active_peers = self
.active_peers
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*active_peers.entry(peer_addr).or_default() += 1;
}
ActivePeerDownload {
tracker: self.clone(),
peer_addr,
}
}
fn finish_active_peer(&self, peer_addr: SocketAddr) {
let mut active_peers = self
.active_peers
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let Some(count) = active_peers.get_mut(&peer_addr) else {
return;
};
if *count <= 1 {
active_peers.remove(&peer_addr);
} else {
*count -= 1;
}
}
fn active_peer_count(&self) -> usize {
self.active_peers
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.len()
}
fn snapshot(&self, id: &str, bytes_per_second: u64) -> DownloadProgress {
DownloadProgress {
id: id.to_string(),
downloaded_bytes: self.reported_downloaded_bytes(),
total_bytes: self.total_bytes,
bytes_per_second,
active_peer_count: self.active_peer_count(),
}
}
}
@@ -117,6 +161,7 @@ impl DownloadProgressTracker {
pub(super) struct ChunkProgress {
tracker: Arc<DownloadProgressTracker>,
key: ChunkProgressKey,
_peer_activity: ActivePeerDownload,
expected_bytes: u64,
received_bytes: u64,
}
@@ -136,6 +181,17 @@ impl ChunkProgress {
}
}
struct ActivePeerDownload {
tracker: Arc<DownloadProgressTracker>,
peer_addr: SocketAddr,
}
impl Drop for ActivePeerDownload {
fn drop(&mut self) {
self.tracker.finish_active_peer(self.peer_addr);
}
}
fn add_saturating(counter: &AtomicU64, delta: u64) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some(current.saturating_add(delta))
@@ -230,14 +286,19 @@ where
mod tests {
use super::*;
fn loopback_addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
#[test]
fn tracker_counts_only_new_bytes_for_a_retried_chunk() {
let tracker = DownloadProgressTracker::new(100);
let mut first_attempt = tracker.track_chunk("game/file.bin", 0, 100);
let peer = loopback_addr(12000);
let mut first_attempt = tracker.track_chunk(peer, "game/file.bin", 0, 100);
first_attempt.record_bytes(40);
first_attempt.record_bytes(10);
let mut retry = tracker.track_chunk("game/file.bin", 0, 100);
let mut retry = tracker.track_chunk(peer, "game/file.bin", 0, 100);
retry.record_bytes(25);
retry.record_bytes(50);
@@ -248,10 +309,27 @@ mod tests {
#[test]
fn tracker_clamps_reported_bytes_to_total() {
let tracker = DownloadProgressTracker::new(10);
let mut chunk = tracker.track_chunk("game/file.bin", 0, 0);
let mut chunk = tracker.track_chunk(loopback_addr(12000), "game/file.bin", 0, 0);
chunk.record_bytes(25);
assert_eq!(tracker.raw_downloaded_bytes(), 25);
assert_eq!(tracker.reported_downloaded_bytes(), 10);
}
#[test]
fn tracker_reports_unique_active_peer_count() {
let tracker = DownloadProgressTracker::new(100);
let first_peer = loopback_addr(12000);
let second_peer = loopback_addr(12001);
{
let _first_chunk = tracker.track_chunk(first_peer, "game/file.bin", 0, 50);
let _second_chunk = tracker.track_chunk(first_peer, "game/file.bin", 50, 50);
let _third_chunk = tracker.track_chunk(second_peer, "game/other.bin", 0, 10);
assert_eq!(tracker.snapshot("game", 0).active_peer_count, 2);
}
assert_eq!(tracker.snapshot("game", 0).active_peer_count, 0);
}
}
@@ -62,6 +62,7 @@ async fn open_chunk_stream(
/// Receives one requested chunk from a peer stream.
async fn receive_chunk(
peer_addr: SocketAddr,
mut rx: ReceiveStream,
base_dir: &Path,
chunk: &DownloadChunk,
@@ -71,7 +72,7 @@ async fn receive_chunk(
if let Some(buffer) = version_buffer
&& buffer.matches(&chunk.relative_path)
{
return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await;
return download_version_ini_chunk(peer_addr, rx, chunk, &buffer, progress_tracker).await;
}
// Validate the path to prevent directory traversal
@@ -91,7 +92,7 @@ async fn receive_chunk(
let mut remaining = chunk.length;
let mut received_bytes = 0u64;
let mut progress =
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length);
while let Some(bytes) = rx.receive().await? {
file.write_all(&bytes).await?;
@@ -135,7 +136,15 @@ async fn receive_chunk_result(
version_buffer: Option<Arc<VersionIniBuffer>>,
progress_tracker: Arc<DownloadProgressTracker>,
) -> ChunkDownloadResult {
let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await;
let result = receive_chunk(
peer_addr,
rx,
&base_dir,
&chunk,
version_buffer,
progress_tracker,
)
.await;
ChunkDownloadResult {
chunk,
result,
@@ -144,6 +153,7 @@ async fn receive_chunk_result(
}
async fn download_version_ini_chunk(
peer_addr: SocketAddr,
mut rx: ReceiveStream,
chunk: &DownloadChunk,
buffer: &VersionIniBuffer,
@@ -151,7 +161,7 @@ async fn download_version_ini_chunk(
) -> eyre::Result<()> {
let mut received = Vec::new();
let mut progress =
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
progress_tracker.track_chunk(peer_addr, &chunk.relative_path, chunk.offset, chunk.length);
while let Some(bytes) = rx.receive().await? {
progress.record_bytes(bytes.len());
received.extend_from_slice(&bytes);