feat(ui): show download progress and speed in the action button

Previously the action button only said "Downloading…" with no indication of
how far along the transfer was or how fast it was going. With multi-gigabyte
game payloads on a LAN this gave the user no signal whether the download had
stalled, was hitting the wire fast, or was about to finish.

Wire a sampled byte-level progress channel from the download pipeline up to
the action button:

- New `DownloadProgressTracker` in `crates/lanspread-peer/src/download/progress.rs`
  holds the total expected bytes plus two atomic counters: `downloaded_bytes`
  (deduplicated per `(relative_path, offset)` chunk key, used for the bar) and
  `transferred_bytes` (raw cumulative, used for the speed sample). The dedup
  prevents a retried chunk from double-counting toward completion while still
  letting speed reflect actual wire activity including retry waste, which is
  the more useful metric for "is the link doing anything right now?".
- `sample_download_progress` wraps the transfer future, emits an initial 0 B/s
  snapshot, then samples on a 500 ms interval (`MissedTickBehavior::Skip` so a
  stalled downloader does not generate a thundering herd of catch-up ticks)
  and emits one final snapshot when the future resolves, so the UI sees the
  closing state before `DownloadGameFilesFinished` arrives.
- New `PeerEvent::DownloadGameFilesProgress(DownloadProgress)` variant carries
  `{ id, downloaded_bytes, total_bytes, bytes_per_second }`. The Tauri shell
  forwards it as `game-download-progress`; the JSONL harness emits it as
  `download-progress`.
- Orchestrator and retry paths refactored to thread a single shared
  `Arc<DownloadProgressTracker>` through both the initial transfer and any
  retry attempts. New `TransferContext`, `RetryContext`, and `ChunkPlanContext`
  structs absorb the parameter-list growth that came with adding the tracker.

Frontend rendering honors the snapshot-is-authoritative decision from commit
`5df82aa` ("fix(ui): derive operation status from snapshots"):

- `Game.download_progress` is an ephemeral overlay carried alongside the card,
  not a status field. `mergeGameUpdate` preserves it only while
  `install_status === Downloading` and otherwise clears it on the next
  snapshot, so the games-list snapshot remains the single authority for when
  the bar should disappear.
- The `game-download-progress` listener writes ONLY `download_progress` — it
  does not touch `install_status`, `status_message`, or `status_level`. This
  preserves the rule that lifecycle events never mutate card status.
- No `game-download-finished` listener; snapshot reconciliation clears the
  overlay automatically when status leaves Downloading.
- `ActionButton` renders a percentage fill behind the icon/label via a
  `--download-progress` CSS custom property; the existing `.act-busy` spinner
  is layered above the fill with `z-index: 1`. `act-downloading` widens the
  button to avoid label jitter as the speed number changes (tabular-nums).
- `actionLabel` for the Downloading status now appends a formatted speed
  ("Downloading… 12.5 MB/s") via the new `formatBytesPerSecond` helper.

Test Plan:
- `just test` — Rust workspace tests including new progress tracker unit tests
  (`tracker_counts_only_new_bytes_for_a_retried_chunk`,
  `tracker_clamps_reported_bytes_to_total`).
- `just frontend-test` — Deno tests including
  `download progress is preserved only while actively downloading` and
  `downloading action label includes current speed`.
- `just clippy` — clean.
- Manual: download a multi-GB game from a peer and watch the action button
  fill, speed update on the half-second, and reset cleanly on completion.

Refs: download progress visibility, snapshot-authoritative UI architecture
This commit is contained in:
2026-05-20 22:11:09 +02:00
parent 0f10108438
commit 01712f248b
14 changed files with 724 additions and 205 deletions
@@ -2,6 +2,7 @@
mod orchestrator;
mod planning;
mod progress;
mod retry;
mod transport;
mod version_ini;
+226 -143
View File
@@ -1,12 +1,24 @@
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{
collections::HashMap,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
};
use lanspread_db::db::GameFileDescription;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use super::{
planning::{DownloadChunk, build_peer_plans, extract_version_descriptor, prepare_game_storage},
retry::retry_failed_chunks,
planning::{
ChunkDownloadResult,
DownloadChunk,
build_peer_plans,
extract_version_descriptor,
prepare_game_storage,
},
progress::{DownloadProgressTracker, sample_download_progress},
retry::{RetryContext, retry_failed_chunks},
transport::download_from_peer,
version_ini::{
VersionIniBuffer,
@@ -71,148 +83,32 @@ pub async fn download_game_files(
id: game_id.to_string(),
})?;
let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map);
let progress_tracker = DownloadProgressTracker::new(total_download_bytes(&transfer_descs));
let transfer_ctx = TransferContext {
game_id,
games_folder: &games_folder,
peers: &peers,
file_peer_map: &file_peer_map,
tx_notify_ui: &tx_notify_ui,
cancel_token: &cancel_token,
version_buffer: version_buffer.clone(),
progress_tracker: progress_tracker.clone(),
};
let transfer_result = sample_download_progress(
game_id,
progress_tracker,
tx_notify_ui.clone(),
download_transfer_chunks(&transfer_ctx, &transfer_descs),
)
.await;
let mut tasks = Vec::new();
for (peer_addr, plan) in plans {
let base_dir = games_folder.clone();
let game_id = game_id.to_string();
let cancel_token = cancel_token.clone();
let version_buffer = version_buffer.clone();
tasks.push(tokio::spawn(async move {
download_from_peer(
peer_addr,
&game_id,
plan,
base_dir,
&cancel_token,
Some(version_buffer),
)
.await
}));
}
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
let mut last_err: Option<eyre::Report> = None;
for handle in tasks {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
match handle.await {
Ok(Ok(results)) => {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
for chunk_result in results {
match chunk_result.result {
Ok(()) => {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
id: game_id.to_string(),
peer_addr: chunk_result.peer_addr,
relative_path: chunk_result.chunk.relative_path,
offset: chunk_result.chunk.offset,
length: chunk_result.chunk.length,
});
}
Err(e) => {
log::warn!(
"Failed to download chunk from {}: {e}",
chunk_result.peer_addr
);
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
retry_chunk.last_peer = Some(chunk_result.peer_addr);
failed_chunks.push(retry_chunk);
} else {
last_err = Some(eyre::eyre!(
"Max retries exceeded for chunk: {}",
chunk_result.chunk.relative_path
));
}
}
}
}
}
Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
Ok(Err(e)) => last_err = Some(e),
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
}
}
// Retry failed chunks if any
if !failed_chunks.is_empty() && !peers.is_empty() {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
log::info!("Retrying {} failed chunks", failed_chunks.len());
let retry_results = match retry_failed_chunks(
failed_chunks,
&peers,
&games_folder,
game_id,
&file_peer_map,
&cancel_token,
Some(version_buffer.clone()),
)
.await
{
Ok(results) => results,
Err(_) if cancel_token.is_cancelled() => {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
Err(err) => {
last_err = Some(err);
Vec::new()
}
};
for chunk_result in retry_results {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
match chunk_result.result {
Ok(()) => {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
id: game_id.to_string(),
peer_addr: chunk_result.peer_addr,
relative_path: chunk_result.chunk.relative_path,
offset: chunk_result.chunk.offset,
length: chunk_result.chunk.length,
});
}
Err(e) => {
log::error!("Retry failed for chunk: {e}");
last_err = Some(e);
}
}
}
}
if cancel_token.is_cancelled() {
if let Err(err) = transfer_result {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
if let Some(err) = last_err {
rollback_version_ini_transaction(&game_root).await;
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
if !cancel_token.is_cancelled() {
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
}
return Err(err);
}
@@ -229,3 +125,190 @@ pub async fn download_game_files(
})?;
Ok(())
}
struct TransferContext<'a> {
game_id: &'a str,
games_folder: &'a Path,
peers: &'a [SocketAddr],
file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
tx_notify_ui: &'a UnboundedSender<PeerEvent>,
cancel_token: &'a CancellationToken,
version_buffer: Arc<VersionIniBuffer>,
progress_tracker: Arc<DownloadProgressTracker>,
}
async fn download_transfer_chunks(
ctx: &TransferContext<'_>,
transfer_descs: &[GameFileDescription],
) -> eyre::Result<()> {
let plans = build_peer_plans(ctx.peers, transfer_descs, ctx.file_peer_map);
let mut tasks = Vec::new();
for (peer_addr, plan) in plans {
let base_dir = ctx.games_folder.to_path_buf();
let game_id = ctx.game_id.to_string();
let cancel_token = ctx.cancel_token.clone();
let version_buffer = ctx.version_buffer.clone();
let progress_tracker = ctx.progress_tracker.clone();
tasks.push(tokio::spawn(async move {
download_from_peer(
peer_addr,
&game_id,
plan,
base_dir,
&cancel_token,
Some(version_buffer),
progress_tracker,
)
.await
}));
}
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
let mut last_err: Option<eyre::Report> = None;
for handle in tasks {
if ctx.cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
match handle.await {
Ok(Ok(results)) => {
if ctx.cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
collect_chunk_results(
ctx.game_id,
ctx.tx_notify_ui,
results,
&mut failed_chunks,
&mut last_err,
);
}
Ok(Err(_)) | Err(_) if ctx.cancel_token.is_cancelled() => {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
Ok(Err(e)) => last_err = Some(e),
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
}
}
if !failed_chunks.is_empty() && !ctx.peers.is_empty() {
retry_chunks(ctx, failed_chunks, &mut last_err).await?;
}
if ctx.cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
if let Some(err) = last_err {
return Err(err);
}
Ok(())
}
fn collect_chunk_results(
game_id: &str,
tx_notify_ui: &UnboundedSender<PeerEvent>,
results: Vec<ChunkDownloadResult>,
failed_chunks: &mut Vec<DownloadChunk>,
last_err: &mut Option<eyre::Report>,
) {
for chunk_result in results {
match chunk_result.result {
Ok(()) => {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
id: game_id.to_string(),
peer_addr: chunk_result.peer_addr,
relative_path: chunk_result.chunk.relative_path,
offset: chunk_result.chunk.offset,
length: chunk_result.chunk.length,
});
}
Err(e) => {
log::warn!(
"Failed to download chunk from {}: {e}",
chunk_result.peer_addr
);
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
retry_chunk.last_peer = Some(chunk_result.peer_addr);
failed_chunks.push(retry_chunk);
} else {
*last_err = Some(eyre::eyre!(
"Max retries exceeded for chunk: {}",
chunk_result.chunk.relative_path
));
}
}
}
}
}
async fn retry_chunks(
ctx: &TransferContext<'_>,
failed_chunks: Vec<DownloadChunk>,
last_err: &mut Option<eyre::Report>,
) -> eyre::Result<()> {
if ctx.cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
log::info!("Retrying {} failed chunks", failed_chunks.len());
let retry_ctx = RetryContext {
peers: ctx.peers,
base_dir: ctx.games_folder,
game_id: ctx.game_id,
file_peer_map: ctx.file_peer_map,
cancel_token: ctx.cancel_token,
version_buffer: Some(ctx.version_buffer.clone()),
progress_tracker: ctx.progress_tracker.clone(),
};
let retry_results = match retry_failed_chunks(failed_chunks, &retry_ctx).await {
Ok(results) => results,
Err(_) if ctx.cancel_token.is_cancelled() => {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
Err(err) => {
*last_err = Some(err);
Vec::new()
}
};
for chunk_result in retry_results {
if ctx.cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
match chunk_result.result {
Ok(()) => {
let _ = ctx
.tx_notify_ui
.send(PeerEvent::DownloadGameFileChunkFinished {
id: ctx.game_id.to_string(),
peer_addr: chunk_result.peer_addr,
relative_path: chunk_result.chunk.relative_path,
offset: chunk_result.chunk.offset,
length: chunk_result.chunk.length,
});
}
Err(e) => {
log::error!("Retry failed for chunk: {e}");
*last_err = Some(e);
}
}
}
Ok(())
}
fn total_download_bytes(file_descs: &[GameFileDescription]) -> u64 {
file_descs
.iter()
.filter(|desc| !desc.is_dir)
.fold(0u64, |total, desc| total.saturating_add(desc.file_size()))
}
@@ -0,0 +1,257 @@
use std::{
collections::HashMap,
future::Future,
sync::{
Arc,
Mutex,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use tokio::{
sync::mpsc::UnboundedSender,
time::{self, MissedTickBehavior},
};
use crate::{DownloadProgress, PeerEvent, events};
const DOWNLOAD_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ChunkProgressKey {
relative_path: String,
offset: u64,
}
pub(super) struct DownloadProgressTracker {
total_bytes: u64,
downloaded_bytes: AtomicU64,
transferred_bytes: AtomicU64,
chunks: Mutex<HashMap<ChunkProgressKey, u64>>,
}
impl DownloadProgressTracker {
pub(super) fn new(total_bytes: u64) -> Arc<Self> {
Arc::new(Self {
total_bytes,
downloaded_bytes: AtomicU64::new(0),
transferred_bytes: AtomicU64::new(0),
chunks: Mutex::new(HashMap::new()),
})
}
pub(super) fn track_chunk(
self: &Arc<Self>,
relative_path: &str,
offset: u64,
expected_bytes: u64,
) -> ChunkProgress {
ChunkProgress {
tracker: self.clone(),
key: ChunkProgressKey {
relative_path: relative_path.to_string(),
offset,
},
expected_bytes,
received_bytes: 0,
}
}
fn raw_downloaded_bytes(&self) -> u64 {
self.downloaded_bytes.load(Ordering::Relaxed)
}
fn raw_transferred_bytes(&self) -> u64 {
self.transferred_bytes.load(Ordering::Relaxed)
}
fn reported_downloaded_bytes(&self) -> u64 {
let downloaded = self.raw_downloaded_bytes();
if self.total_bytes == 0 {
downloaded
} else {
downloaded.min(self.total_bytes)
}
}
fn record_transferred_bytes(&self, byte_count: u64) {
add_saturating(&self.transferred_bytes, byte_count);
}
fn record_chunk_bytes(&self, key: &ChunkProgressKey, received_bytes: u64) {
let mut chunks = self
.chunks
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let delta = match chunks.get_mut(key) {
Some(previous) if received_bytes > *previous => {
let delta = received_bytes - *previous;
*previous = received_bytes;
delta
}
Some(_) => 0,
None => {
chunks.insert(key.clone(), received_bytes);
received_bytes
}
};
drop(chunks);
if delta > 0 {
add_saturating(&self.downloaded_bytes, delta);
}
}
fn snapshot(&self, id: &str, bytes_per_second: u64) -> DownloadProgress {
DownloadProgress {
id: id.to_string(),
downloaded_bytes: self.reported_downloaded_bytes(),
total_bytes: self.total_bytes,
bytes_per_second,
}
}
}
pub(super) struct ChunkProgress {
tracker: Arc<DownloadProgressTracker>,
key: ChunkProgressKey,
expected_bytes: u64,
received_bytes: u64,
}
impl ChunkProgress {
pub(super) fn record_bytes(&mut self, byte_count: usize) {
let byte_count = u64::try_from(byte_count).unwrap_or(u64::MAX);
self.tracker.record_transferred_bytes(byte_count);
self.received_bytes = self.received_bytes.saturating_add(byte_count);
let reportable_bytes = if self.expected_bytes == 0 {
self.received_bytes
} else {
self.received_bytes.min(self.expected_bytes)
};
self.tracker.record_chunk_bytes(&self.key, reportable_bytes);
}
}
fn add_saturating(counter: &AtomicU64, delta: u64) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some(current.saturating_add(delta))
});
}
struct ProgressSampler {
id: String,
tracker: Arc<DownloadProgressTracker>,
tx_notify_ui: UnboundedSender<PeerEvent>,
last_bytes: u64,
last_at: Instant,
}
impl ProgressSampler {
fn new(
id: String,
tracker: Arc<DownloadProgressTracker>,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> Self {
Self {
id,
tracker,
tx_notify_ui,
last_bytes: 0,
last_at: Instant::now(),
}
}
fn emit_initial(&mut self) {
self.last_bytes = self.tracker.raw_transferred_bytes();
self.last_at = Instant::now();
self.emit(0);
}
fn emit_current(&mut self) {
let now = Instant::now();
let bytes = self.tracker.raw_transferred_bytes();
let speed = bytes_per_second(
bytes.saturating_sub(self.last_bytes),
now.duration_since(self.last_at),
);
self.last_bytes = bytes;
self.last_at = now;
self.emit(speed);
}
fn emit(&self, bytes_per_second: u64) {
events::send(
&self.tx_notify_ui,
PeerEvent::DownloadGameFilesProgress(self.tracker.snapshot(&self.id, bytes_per_second)),
);
}
}
fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 {
let millis = elapsed.as_millis().max(1);
let rate = u128::from(bytes).saturating_mul(1_000) / millis;
u64::try_from(rate).unwrap_or(u64::MAX)
}
pub(super) async fn sample_download_progress<F, T>(
id: &str,
tracker: Arc<DownloadProgressTracker>,
tx_notify_ui: UnboundedSender<PeerEvent>,
future: F,
) -> T
where
F: Future<Output = T>,
{
let mut sampler = ProgressSampler::new(id.to_string(), tracker, tx_notify_ui);
sampler.emit_initial();
let mut interval = time::interval(DOWNLOAD_PROGRESS_UPDATE_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
interval.tick().await;
tokio::pin!(future);
loop {
tokio::select! {
result = &mut future => {
sampler.emit_current();
return result;
}
_ = interval.tick() => sampler.emit_current(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tracker_counts_only_new_bytes_for_a_retried_chunk() {
let tracker = DownloadProgressTracker::new(100);
let mut first_attempt = tracker.track_chunk("game/file.bin", 0, 100);
first_attempt.record_bytes(40);
first_attempt.record_bytes(10);
let mut retry = tracker.track_chunk("game/file.bin", 0, 100);
retry.record_bytes(25);
retry.record_bytes(50);
assert_eq!(tracker.reported_downloaded_bytes(), 75);
assert_eq!(tracker.raw_transferred_bytes(), 125);
}
#[test]
fn tracker_clamps_reported_bytes_to_total() {
let tracker = DownloadProgressTracker::new(10);
let mut chunk = tracker.track_chunk("game/file.bin", 0, 0);
chunk.record_bytes(25);
assert_eq!(tracker.raw_downloaded_bytes(), 25);
assert_eq!(tracker.reported_downloaded_bytes(), 10);
}
}
+25 -26
View File
@@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken;
use super::{
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers},
progress::DownloadProgressTracker,
transport::download_from_peer,
version_ini::VersionIniBuffer,
};
@@ -52,6 +53,16 @@ struct RetryAttempt {
result: eyre::Result<Vec<ChunkDownloadResult>>,
}
pub(super) struct RetryContext<'a> {
pub(super) peers: &'a [SocketAddr],
pub(super) base_dir: &'a Path,
pub(super) game_id: &'a str,
pub(super) file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
pub(super) cancel_token: &'a CancellationToken,
pub(super) version_buffer: Option<Arc<VersionIniBuffer>>,
pub(super) progress_tracker: Arc<DownloadProgressTracker>,
}
fn plan_retry_batch(
queue: &mut VecDeque<DownloadChunk>,
peers: &[SocketAddr],
@@ -96,19 +107,17 @@ fn plan_retry_batch(
async fn run_retry_batch(
retry_plans: HashMap<SocketAddr, PeerDownloadPlan>,
base_dir: &Path,
game_id: &str,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
ctx: &RetryContext<'_>,
) -> eyre::Result<Vec<RetryAttempt>> {
let mut attempts = FuturesUnordered::new();
for (peer_addr, plan) in retry_plans {
let retry_chunks = plan.chunks.clone();
let base_dir = base_dir.to_path_buf();
let game_id = game_id.to_string();
let cancel_token = cancel_token.clone();
let version_buffer = version_buffer.clone();
let base_dir = ctx.base_dir.to_path_buf();
let game_id = ctx.game_id.to_string();
let cancel_token = ctx.cancel_token.clone();
let version_buffer = ctx.version_buffer.clone();
let progress_tracker = ctx.progress_tracker.clone();
attempts.push(async move {
let result = download_from_peer(
@@ -118,6 +127,7 @@ async fn run_retry_batch(
base_dir,
&cancel_token,
version_buffer,
progress_tracker,
)
.await;
RetryAttempt {
@@ -131,8 +141,8 @@ async fn run_retry_batch(
let mut results = Vec::new();
while !attempts.is_empty() {
let result = tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("download cancelled for game {game_id}");
() = ctx.cancel_token.cancelled() => {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
result = attempts.next() => result.expect("retry attempt should exist"),
};
@@ -208,32 +218,21 @@ fn handle_retry_attempt_error(
/// Retries downloading failed chunks.
pub(super) async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
peers: &[SocketAddr],
base_dir: &Path,
game_id: &str,
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
ctx: &RetryContext<'_>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
let mut final_results = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while !queue.is_empty() {
ensure_not_cancelled(cancel_token, game_id)?;
ensure_not_cancelled(ctx.cancel_token, ctx.game_id)?;
let retry_plans = plan_retry_batch(&mut queue, peers, file_peer_map, &mut final_results);
let retry_plans =
plan_retry_batch(&mut queue, ctx.peers, ctx.file_peer_map, &mut final_results);
if retry_plans.is_empty() {
continue;
}
let attempts = run_retry_batch(
retry_plans,
base_dir,
game_id,
cancel_token,
version_buffer.clone(),
)
.await?;
let attempts = run_retry_batch(retry_plans, ctx).await?;
for attempt in attempts {
let RetryAttempt {
+43 -26
View File
@@ -18,6 +18,7 @@ use tokio_util::{
use super::{
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan},
progress::DownloadProgressTracker,
version_ini::VersionIniBuffer,
};
use crate::{
@@ -65,11 +66,12 @@ async fn receive_chunk(
base_dir: &Path,
chunk: &DownloadChunk,
version_buffer: Option<Arc<VersionIniBuffer>>,
progress_tracker: Arc<DownloadProgressTracker>,
) -> eyre::Result<()> {
if let Some(buffer) = version_buffer
&& buffer.matches(&chunk.relative_path)
{
return download_version_ini_chunk(rx, chunk, &buffer).await;
return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await;
}
// Validate the path to prevent directory traversal
@@ -88,15 +90,19 @@ async fn receive_chunk(
let mut remaining = chunk.length;
let mut received_bytes = 0u64;
let mut progress =
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
while let Some(bytes) = rx.receive().await? {
file.write_all(&bytes).await?;
received_bytes += bytes.len() as u64;
progress.record_bytes(bytes.len());
let byte_count = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
received_bytes = received_bytes.saturating_add(byte_count);
if remaining == 0 {
continue;
}
remaining = remaining.saturating_sub(bytes.len() as u64);
remaining = remaining.saturating_sub(byte_count);
if remaining == 0 {
break;
}
@@ -127,8 +133,9 @@ async fn receive_chunk_result(
chunk: DownloadChunk,
rx: ReceiveStream,
version_buffer: Option<Arc<VersionIniBuffer>>,
progress_tracker: Arc<DownloadProgressTracker>,
) -> ChunkDownloadResult {
let result = receive_chunk(rx, &base_dir, &chunk, version_buffer).await;
let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await;
ChunkDownloadResult {
chunk,
result,
@@ -140,9 +147,13 @@ async fn download_version_ini_chunk(
mut rx: ReceiveStream,
chunk: &DownloadChunk,
buffer: &VersionIniBuffer,
progress_tracker: Arc<DownloadProgressTracker>,
) -> eyre::Result<()> {
let mut received = Vec::new();
let mut progress =
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
while let Some(bytes) = rx.receive().await? {
progress.record_bytes(bytes.len());
received.extend_from_slice(&bytes);
}
@@ -207,53 +218,59 @@ fn failed_plan_results(
.collect()
}
struct ChunkPlanContext<'a> {
peer_addr: SocketAddr,
game_id: &'a str,
base_dir: &'a Path,
cancel_token: &'a CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
progress_tracker: Arc<DownloadProgressTracker>,
}
async fn download_chunk_plan(
conn: &mut Connection,
peer_addr: SocketAddr,
game_id: &str,
chunks: Vec<DownloadChunk>,
base_dir: &Path,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
ctx: &ChunkPlanContext<'_>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
let mut pending: VecDeque<DownloadChunk> = chunks.into();
let mut in_flight = FuturesUnordered::new();
let mut results = Vec::new();
let window = PEER_DOWNLOAD_STREAM_WINDOW.max(1);
let base_dir = base_dir.to_path_buf();
let base_dir = ctx.base_dir.to_path_buf();
while !pending.is_empty() || !in_flight.is_empty() {
while in_flight.len() < window {
let Some(chunk) = pending.pop_front() else {
break;
};
ensure_download_not_cancelled(cancel_token, game_id)?;
ensure_download_not_cancelled(ctx.cancel_token, ctx.game_id)?;
log::info!(
"Downloading chunk {} (offset {}, length {}) from {}",
chunk.relative_path,
chunk.offset,
chunk.length,
peer_addr
ctx.peer_addr
);
match open_chunk_stream(conn, game_id, &chunk).await {
match open_chunk_stream(conn, ctx.game_id, &chunk).await {
Ok(rx) => {
in_flight.push(receive_chunk_result(
peer_addr,
ctx.peer_addr,
base_dir.clone(),
chunk,
rx,
version_buffer.clone(),
ctx.version_buffer.clone(),
ctx.progress_tracker.clone(),
));
}
Err(err) => {
let reason = format!("failed to open chunk stream: {err}");
results.push(failed_chunk_result(chunk, peer_addr, reason.clone()));
results.push(failed_chunk_result(chunk, ctx.peer_addr, reason.clone()));
while let Some(chunk) = pending.pop_front() {
results.push(failed_chunk_result(
chunk,
peer_addr,
ctx.peer_addr,
format!("peer stream unavailable after earlier open failure: {reason}"),
));
}
@@ -267,8 +284,8 @@ async fn download_chunk_plan(
}
let result = tokio::select! {
() = cancel_token.cancelled() => {
eyre::bail!("download cancelled for game {game_id}");
() = ctx.cancel_token.cancelled() => {
eyre::bail!("download cancelled for game {}", ctx.game_id);
}
result = in_flight.next() => result.expect("in-flight chunk stream should exist"),
};
@@ -286,6 +303,7 @@ pub(super) async fn download_from_peer(
games_folder: PathBuf,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
progress_tracker: Arc<DownloadProgressTracker>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
if plan.chunks.is_empty() {
return Ok(Vec::new());
@@ -303,17 +321,16 @@ pub(super) async fn download_from_peer(
}
let base_dir = games_folder;
let results = download_chunk_plan(
&mut conn,
let chunk_ctx = ChunkPlanContext {
peer_addr,
game_id,
plan.chunks,
&base_dir,
base_dir: &base_dir,
cancel_token,
version_buffer,
)
.await?;
progress_tracker,
};
let results = download_chunk_plan(&mut conn, plan.chunks, &chunk_ctx).await?;
Ok(results)
}