Compare commits
2 Commits
0f10108438
...
51216b7281
| Author | SHA1 | Date | |
|---|---|---|---|
|
51216b7281
|
|||
|
01712f248b
|
+15
-1
@@ -1,6 +1,20 @@
|
|||||||
# Findings
|
# Findings
|
||||||
|
|
||||||
No open pre-merge findings are currently tracked here.
|
## Open
|
||||||
|
|
||||||
|
### `handleErrorEvent` still writes status fields directly
|
||||||
|
|
||||||
|
`crates/lanspread-tauri-deno-ts/src/hooks/useGames.ts:80-89` — the error
|
||||||
|
handler writes `install_status`, `status_message`, `status_level`, and
|
||||||
|
`download_progress` from a lifecycle event, which is the same "two sources of
|
||||||
|
truth" pattern that commit `5df82aa` ("fix(ui): derive operation status from
|
||||||
|
snapshots") removed everywhere else. That commit explicitly carved out error
|
||||||
|
messages as a preserved side effect, so this is a documented exception rather
|
||||||
|
than a regression — but if we want strict snapshot-is-truth, the error handler
|
||||||
|
should stop writing status fields and let the next snapshot reconcile the card,
|
||||||
|
keeping only the error message overlay (which the snapshot does not carry).
|
||||||
|
|
||||||
|
Not blocking. Captured here for a future cleanup pass.
|
||||||
|
|
||||||
## Claude Review Scope Triage
|
## Claude Review Scope Triage
|
||||||
|
|
||||||
|
|||||||
@@ -461,6 +461,15 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s
|
|||||||
download_chunk_finished_event(shared, id, peer_addr, relative_path, offset, length)
|
download_chunk_finished_event(shared, id, peer_addr, relative_path, offset, length)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
PeerEvent::DownloadGameFilesProgress(progress) => (
|
||||||
|
"download-progress",
|
||||||
|
json!({
|
||||||
|
"game_id": progress.id,
|
||||||
|
"downloaded_bytes": progress.downloaded_bytes,
|
||||||
|
"total_bytes": progress.total_bytes,
|
||||||
|
"bytes_per_second": progress.bytes_per_second,
|
||||||
|
}),
|
||||||
|
),
|
||||||
PeerEvent::DownloadGameFilesFinished { id } => {
|
PeerEvent::DownloadGameFilesFinished { id } => {
|
||||||
download_terminal_event(shared, "download-finished", id).await
|
download_terminal_event(shared, "download-finished", id).await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
mod orchestrator;
|
mod orchestrator;
|
||||||
mod planning;
|
mod planning;
|
||||||
|
mod progress;
|
||||||
mod retry;
|
mod retry;
|
||||||
mod transport;
|
mod transport;
|
||||||
mod version_ini;
|
mod version_ini;
|
||||||
|
|||||||
@@ -1,12 +1,24 @@
|
|||||||
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
net::SocketAddr,
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use lanspread_db::db::GameFileDescription;
|
use lanspread_db::db::GameFileDescription;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
planning::{DownloadChunk, build_peer_plans, extract_version_descriptor, prepare_game_storage},
|
planning::{
|
||||||
retry::retry_failed_chunks,
|
ChunkDownloadResult,
|
||||||
|
DownloadChunk,
|
||||||
|
build_peer_plans,
|
||||||
|
extract_version_descriptor,
|
||||||
|
prepare_game_storage,
|
||||||
|
},
|
||||||
|
progress::{DownloadProgressTracker, sample_download_progress},
|
||||||
|
retry::{RetryContext, retry_failed_chunks},
|
||||||
transport::download_from_peer,
|
transport::download_from_peer,
|
||||||
version_ini::{
|
version_ini::{
|
||||||
VersionIniBuffer,
|
VersionIniBuffer,
|
||||||
@@ -71,148 +83,32 @@ pub async fn download_game_files(
|
|||||||
id: game_id.to_string(),
|
id: game_id.to_string(),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map);
|
let progress_tracker = DownloadProgressTracker::new(total_download_bytes(&transfer_descs));
|
||||||
|
let transfer_ctx = TransferContext {
|
||||||
|
game_id,
|
||||||
|
games_folder: &games_folder,
|
||||||
|
peers: &peers,
|
||||||
|
file_peer_map: &file_peer_map,
|
||||||
|
tx_notify_ui: &tx_notify_ui,
|
||||||
|
cancel_token: &cancel_token,
|
||||||
|
version_buffer: version_buffer.clone(),
|
||||||
|
progress_tracker: progress_tracker.clone(),
|
||||||
|
};
|
||||||
|
let transfer_result = sample_download_progress(
|
||||||
|
game_id,
|
||||||
|
progress_tracker,
|
||||||
|
tx_notify_ui.clone(),
|
||||||
|
download_transfer_chunks(&transfer_ctx, &transfer_descs),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
let mut tasks = Vec::new();
|
if let Err(err) = transfer_result {
|
||||||
for (peer_addr, plan) in plans {
|
|
||||||
let base_dir = games_folder.clone();
|
|
||||||
let game_id = game_id.to_string();
|
|
||||||
let cancel_token = cancel_token.clone();
|
|
||||||
let version_buffer = version_buffer.clone();
|
|
||||||
tasks.push(tokio::spawn(async move {
|
|
||||||
download_from_peer(
|
|
||||||
peer_addr,
|
|
||||||
&game_id,
|
|
||||||
plan,
|
|
||||||
base_dir,
|
|
||||||
&cancel_token,
|
|
||||||
Some(version_buffer),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
|
|
||||||
let mut last_err: Option<eyre::Report> = None;
|
|
||||||
|
|
||||||
for handle in tasks {
|
|
||||||
if cancel_token.is_cancelled() {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
|
|
||||||
match handle.await {
|
|
||||||
Ok(Ok(results)) => {
|
|
||||||
if cancel_token.is_cancelled() {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
|
|
||||||
for chunk_result in results {
|
|
||||||
match chunk_result.result {
|
|
||||||
Ok(()) => {
|
|
||||||
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
|
|
||||||
id: game_id.to_string(),
|
|
||||||
peer_addr: chunk_result.peer_addr,
|
|
||||||
relative_path: chunk_result.chunk.relative_path,
|
|
||||||
offset: chunk_result.chunk.offset,
|
|
||||||
length: chunk_result.chunk.length,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!(
|
|
||||||
"Failed to download chunk from {}: {e}",
|
|
||||||
chunk_result.peer_addr
|
|
||||||
);
|
|
||||||
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
|
|
||||||
let mut retry_chunk = chunk_result.chunk;
|
|
||||||
retry_chunk.retry_count += 1;
|
|
||||||
retry_chunk.last_peer = Some(chunk_result.peer_addr);
|
|
||||||
failed_chunks.push(retry_chunk);
|
|
||||||
} else {
|
|
||||||
last_err = Some(eyre::eyre!(
|
|
||||||
"Max retries exceeded for chunk: {}",
|
|
||||||
chunk_result.chunk.relative_path
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
Ok(Err(e)) => last_err = Some(e),
|
|
||||||
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retry failed chunks if any
|
|
||||||
if !failed_chunks.is_empty() && !peers.is_empty() {
|
|
||||||
if cancel_token.is_cancelled() {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
|
|
||||||
log::info!("Retrying {} failed chunks", failed_chunks.len());
|
|
||||||
|
|
||||||
let retry_results = match retry_failed_chunks(
|
|
||||||
failed_chunks,
|
|
||||||
&peers,
|
|
||||||
&games_folder,
|
|
||||||
game_id,
|
|
||||||
&file_peer_map,
|
|
||||||
&cancel_token,
|
|
||||||
Some(version_buffer.clone()),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(results) => results,
|
|
||||||
Err(_) if cancel_token.is_cancelled() => {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
last_err = Some(err);
|
|
||||||
Vec::new()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for chunk_result in retry_results {
|
|
||||||
if cancel_token.is_cancelled() {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
|
||||||
}
|
|
||||||
|
|
||||||
match chunk_result.result {
|
|
||||||
Ok(()) => {
|
|
||||||
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
|
|
||||||
id: game_id.to_string(),
|
|
||||||
peer_addr: chunk_result.peer_addr,
|
|
||||||
relative_path: chunk_result.chunk.relative_path,
|
|
||||||
offset: chunk_result.chunk.offset,
|
|
||||||
length: chunk_result.chunk.length,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::error!("Retry failed for chunk: {e}");
|
|
||||||
last_err = Some(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if cancel_token.is_cancelled() {
|
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
rollback_version_ini_transaction(&game_root).await;
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
if !cancel_token.is_cancelled() {
|
||||||
}
|
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
|
||||||
|
id: game_id.to_string(),
|
||||||
if let Some(err) = last_err {
|
})?;
|
||||||
rollback_version_ini_transaction(&game_root).await;
|
}
|
||||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
|
|
||||||
id: game_id.to_string(),
|
|
||||||
})?;
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,3 +125,190 @@ pub async fn download_game_files(
|
|||||||
})?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct TransferContext<'a> {
|
||||||
|
game_id: &'a str,
|
||||||
|
games_folder: &'a Path,
|
||||||
|
peers: &'a [SocketAddr],
|
||||||
|
file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
|
||||||
|
tx_notify_ui: &'a UnboundedSender<PeerEvent>,
|
||||||
|
cancel_token: &'a CancellationToken,
|
||||||
|
version_buffer: Arc<VersionIniBuffer>,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn download_transfer_chunks(
|
||||||
|
ctx: &TransferContext<'_>,
|
||||||
|
transfer_descs: &[GameFileDescription],
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
let plans = build_peer_plans(ctx.peers, transfer_descs, ctx.file_peer_map);
|
||||||
|
|
||||||
|
let mut tasks = Vec::new();
|
||||||
|
for (peer_addr, plan) in plans {
|
||||||
|
let base_dir = ctx.games_folder.to_path_buf();
|
||||||
|
let game_id = ctx.game_id.to_string();
|
||||||
|
let cancel_token = ctx.cancel_token.clone();
|
||||||
|
let version_buffer = ctx.version_buffer.clone();
|
||||||
|
let progress_tracker = ctx.progress_tracker.clone();
|
||||||
|
tasks.push(tokio::spawn(async move {
|
||||||
|
download_from_peer(
|
||||||
|
peer_addr,
|
||||||
|
&game_id,
|
||||||
|
plan,
|
||||||
|
base_dir,
|
||||||
|
&cancel_token,
|
||||||
|
Some(version_buffer),
|
||||||
|
progress_tracker,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
|
||||||
|
let mut last_err: Option<eyre::Report> = None;
|
||||||
|
|
||||||
|
for handle in tasks {
|
||||||
|
if ctx.cancel_token.is_cancelled() {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
match handle.await {
|
||||||
|
Ok(Ok(results)) => {
|
||||||
|
if ctx.cancel_token.is_cancelled() {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
collect_chunk_results(
|
||||||
|
ctx.game_id,
|
||||||
|
ctx.tx_notify_ui,
|
||||||
|
results,
|
||||||
|
&mut failed_chunks,
|
||||||
|
&mut last_err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(Err(_)) | Err(_) if ctx.cancel_token.is_cancelled() => {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => last_err = Some(e),
|
||||||
|
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !failed_chunks.is_empty() && !ctx.peers.is_empty() {
|
||||||
|
retry_chunks(ctx, failed_chunks, &mut last_err).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.cancel_token.is_cancelled() {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(err) = last_err {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect_chunk_results(
|
||||||
|
game_id: &str,
|
||||||
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||||
|
results: Vec<ChunkDownloadResult>,
|
||||||
|
failed_chunks: &mut Vec<DownloadChunk>,
|
||||||
|
last_err: &mut Option<eyre::Report>,
|
||||||
|
) {
|
||||||
|
for chunk_result in results {
|
||||||
|
match chunk_result.result {
|
||||||
|
Ok(()) => {
|
||||||
|
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFileChunkFinished {
|
||||||
|
id: game_id.to_string(),
|
||||||
|
peer_addr: chunk_result.peer_addr,
|
||||||
|
relative_path: chunk_result.chunk.relative_path,
|
||||||
|
offset: chunk_result.chunk.offset,
|
||||||
|
length: chunk_result.chunk.length,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!(
|
||||||
|
"Failed to download chunk from {}: {e}",
|
||||||
|
chunk_result.peer_addr
|
||||||
|
);
|
||||||
|
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
|
||||||
|
let mut retry_chunk = chunk_result.chunk;
|
||||||
|
retry_chunk.retry_count += 1;
|
||||||
|
retry_chunk.last_peer = Some(chunk_result.peer_addr);
|
||||||
|
failed_chunks.push(retry_chunk);
|
||||||
|
} else {
|
||||||
|
*last_err = Some(eyre::eyre!(
|
||||||
|
"Max retries exceeded for chunk: {}",
|
||||||
|
chunk_result.chunk.relative_path
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn retry_chunks(
|
||||||
|
ctx: &TransferContext<'_>,
|
||||||
|
failed_chunks: Vec<DownloadChunk>,
|
||||||
|
last_err: &mut Option<eyre::Report>,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
if ctx.cancel_token.is_cancelled() {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Retrying {} failed chunks", failed_chunks.len());
|
||||||
|
|
||||||
|
let retry_ctx = RetryContext {
|
||||||
|
peers: ctx.peers,
|
||||||
|
base_dir: ctx.games_folder,
|
||||||
|
game_id: ctx.game_id,
|
||||||
|
file_peer_map: ctx.file_peer_map,
|
||||||
|
cancel_token: ctx.cancel_token,
|
||||||
|
version_buffer: Some(ctx.version_buffer.clone()),
|
||||||
|
progress_tracker: ctx.progress_tracker.clone(),
|
||||||
|
};
|
||||||
|
let retry_results = match retry_failed_chunks(failed_chunks, &retry_ctx).await {
|
||||||
|
Ok(results) => results,
|
||||||
|
Err(_) if ctx.cancel_token.is_cancelled() => {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
*last_err = Some(err);
|
||||||
|
Vec::new()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for chunk_result in retry_results {
|
||||||
|
if ctx.cancel_token.is_cancelled() {
|
||||||
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
match chunk_result.result {
|
||||||
|
Ok(()) => {
|
||||||
|
let _ = ctx
|
||||||
|
.tx_notify_ui
|
||||||
|
.send(PeerEvent::DownloadGameFileChunkFinished {
|
||||||
|
id: ctx.game_id.to_string(),
|
||||||
|
peer_addr: chunk_result.peer_addr,
|
||||||
|
relative_path: chunk_result.chunk.relative_path,
|
||||||
|
offset: chunk_result.chunk.offset,
|
||||||
|
length: chunk_result.chunk.length,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Retry failed for chunk: {e}");
|
||||||
|
*last_err = Some(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn total_download_bytes(file_descs: &[GameFileDescription]) -> u64 {
|
||||||
|
file_descs
|
||||||
|
.iter()
|
||||||
|
.filter(|desc| !desc.is_dir)
|
||||||
|
.fold(0u64, |total, desc| total.saturating_add(desc.file_size()))
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,257 @@
|
|||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
future::Future,
|
||||||
|
sync::{
|
||||||
|
Arc,
|
||||||
|
Mutex,
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
},
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
sync::mpsc::UnboundedSender,
|
||||||
|
time::{self, MissedTickBehavior},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{DownloadProgress, PeerEvent, events};
|
||||||
|
|
||||||
|
const DOWNLOAD_PROGRESS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||||
|
struct ChunkProgressKey {
|
||||||
|
relative_path: String,
|
||||||
|
offset: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) struct DownloadProgressTracker {
|
||||||
|
total_bytes: u64,
|
||||||
|
downloaded_bytes: AtomicU64,
|
||||||
|
transferred_bytes: AtomicU64,
|
||||||
|
chunks: Mutex<HashMap<ChunkProgressKey, u64>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DownloadProgressTracker {
|
||||||
|
pub(super) fn new(total_bytes: u64) -> Arc<Self> {
|
||||||
|
Arc::new(Self {
|
||||||
|
total_bytes,
|
||||||
|
downloaded_bytes: AtomicU64::new(0),
|
||||||
|
transferred_bytes: AtomicU64::new(0),
|
||||||
|
chunks: Mutex::new(HashMap::new()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn track_chunk(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
relative_path: &str,
|
||||||
|
offset: u64,
|
||||||
|
expected_bytes: u64,
|
||||||
|
) -> ChunkProgress {
|
||||||
|
ChunkProgress {
|
||||||
|
tracker: self.clone(),
|
||||||
|
key: ChunkProgressKey {
|
||||||
|
relative_path: relative_path.to_string(),
|
||||||
|
offset,
|
||||||
|
},
|
||||||
|
expected_bytes,
|
||||||
|
received_bytes: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn raw_downloaded_bytes(&self) -> u64 {
|
||||||
|
self.downloaded_bytes.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn raw_transferred_bytes(&self) -> u64 {
|
||||||
|
self.transferred_bytes.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reported_downloaded_bytes(&self) -> u64 {
|
||||||
|
let downloaded = self.raw_downloaded_bytes();
|
||||||
|
if self.total_bytes == 0 {
|
||||||
|
downloaded
|
||||||
|
} else {
|
||||||
|
downloaded.min(self.total_bytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_transferred_bytes(&self, byte_count: u64) {
|
||||||
|
add_saturating(&self.transferred_bytes, byte_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn record_chunk_bytes(&self, key: &ChunkProgressKey, received_bytes: u64) {
|
||||||
|
let mut chunks = self
|
||||||
|
.chunks
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||||
|
|
||||||
|
let delta = match chunks.get_mut(key) {
|
||||||
|
Some(previous) if received_bytes > *previous => {
|
||||||
|
let delta = received_bytes - *previous;
|
||||||
|
*previous = received_bytes;
|
||||||
|
delta
|
||||||
|
}
|
||||||
|
Some(_) => 0,
|
||||||
|
None => {
|
||||||
|
chunks.insert(key.clone(), received_bytes);
|
||||||
|
received_bytes
|
||||||
|
}
|
||||||
|
};
|
||||||
|
drop(chunks);
|
||||||
|
|
||||||
|
if delta > 0 {
|
||||||
|
add_saturating(&self.downloaded_bytes, delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self, id: &str, bytes_per_second: u64) -> DownloadProgress {
|
||||||
|
DownloadProgress {
|
||||||
|
id: id.to_string(),
|
||||||
|
downloaded_bytes: self.reported_downloaded_bytes(),
|
||||||
|
total_bytes: self.total_bytes,
|
||||||
|
bytes_per_second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) struct ChunkProgress {
|
||||||
|
tracker: Arc<DownloadProgressTracker>,
|
||||||
|
key: ChunkProgressKey,
|
||||||
|
expected_bytes: u64,
|
||||||
|
received_bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChunkProgress {
|
||||||
|
pub(super) fn record_bytes(&mut self, byte_count: usize) {
|
||||||
|
let byte_count = u64::try_from(byte_count).unwrap_or(u64::MAX);
|
||||||
|
self.tracker.record_transferred_bytes(byte_count);
|
||||||
|
self.received_bytes = self.received_bytes.saturating_add(byte_count);
|
||||||
|
|
||||||
|
let reportable_bytes = if self.expected_bytes == 0 {
|
||||||
|
self.received_bytes
|
||||||
|
} else {
|
||||||
|
self.received_bytes.min(self.expected_bytes)
|
||||||
|
};
|
||||||
|
self.tracker.record_chunk_bytes(&self.key, reportable_bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_saturating(counter: &AtomicU64, delta: u64) {
|
||||||
|
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
|
||||||
|
Some(current.saturating_add(delta))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ProgressSampler {
|
||||||
|
id: String,
|
||||||
|
tracker: Arc<DownloadProgressTracker>,
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
last_bytes: u64,
|
||||||
|
last_at: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProgressSampler {
|
||||||
|
fn new(
|
||||||
|
id: String,
|
||||||
|
tracker: Arc<DownloadProgressTracker>,
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
tracker,
|
||||||
|
tx_notify_ui,
|
||||||
|
last_bytes: 0,
|
||||||
|
last_at: Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_initial(&mut self) {
|
||||||
|
self.last_bytes = self.tracker.raw_transferred_bytes();
|
||||||
|
self.last_at = Instant::now();
|
||||||
|
self.emit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_current(&mut self) {
|
||||||
|
let now = Instant::now();
|
||||||
|
let bytes = self.tracker.raw_transferred_bytes();
|
||||||
|
let speed = bytes_per_second(
|
||||||
|
bytes.saturating_sub(self.last_bytes),
|
||||||
|
now.duration_since(self.last_at),
|
||||||
|
);
|
||||||
|
|
||||||
|
self.last_bytes = bytes;
|
||||||
|
self.last_at = now;
|
||||||
|
self.emit(speed);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit(&self, bytes_per_second: u64) {
|
||||||
|
events::send(
|
||||||
|
&self.tx_notify_ui,
|
||||||
|
PeerEvent::DownloadGameFilesProgress(self.tracker.snapshot(&self.id, bytes_per_second)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bytes_per_second(bytes: u64, elapsed: Duration) -> u64 {
|
||||||
|
let millis = elapsed.as_millis().max(1);
|
||||||
|
let rate = u128::from(bytes).saturating_mul(1_000) / millis;
|
||||||
|
u64::try_from(rate).unwrap_or(u64::MAX)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn sample_download_progress<F, T>(
|
||||||
|
id: &str,
|
||||||
|
tracker: Arc<DownloadProgressTracker>,
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
future: F,
|
||||||
|
) -> T
|
||||||
|
where
|
||||||
|
F: Future<Output = T>,
|
||||||
|
{
|
||||||
|
let mut sampler = ProgressSampler::new(id.to_string(), tracker, tx_notify_ui);
|
||||||
|
sampler.emit_initial();
|
||||||
|
|
||||||
|
let mut interval = time::interval(DOWNLOAD_PROGRESS_UPDATE_INTERVAL);
|
||||||
|
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
tokio::pin!(future);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
result = &mut future => {
|
||||||
|
sampler.emit_current();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => sampler.emit_current(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tracker_counts_only_new_bytes_for_a_retried_chunk() {
|
||||||
|
let tracker = DownloadProgressTracker::new(100);
|
||||||
|
let mut first_attempt = tracker.track_chunk("game/file.bin", 0, 100);
|
||||||
|
first_attempt.record_bytes(40);
|
||||||
|
first_attempt.record_bytes(10);
|
||||||
|
|
||||||
|
let mut retry = tracker.track_chunk("game/file.bin", 0, 100);
|
||||||
|
retry.record_bytes(25);
|
||||||
|
retry.record_bytes(50);
|
||||||
|
|
||||||
|
assert_eq!(tracker.reported_downloaded_bytes(), 75);
|
||||||
|
assert_eq!(tracker.raw_transferred_bytes(), 125);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tracker_clamps_reported_bytes_to_total() {
|
||||||
|
let tracker = DownloadProgressTracker::new(10);
|
||||||
|
let mut chunk = tracker.track_chunk("game/file.bin", 0, 0);
|
||||||
|
chunk.record_bytes(25);
|
||||||
|
|
||||||
|
assert_eq!(tracker.raw_downloaded_bytes(), 25);
|
||||||
|
assert_eq!(tracker.reported_downloaded_bytes(), 10);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers},
|
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers},
|
||||||
|
progress::DownloadProgressTracker,
|
||||||
transport::download_from_peer,
|
transport::download_from_peer,
|
||||||
version_ini::VersionIniBuffer,
|
version_ini::VersionIniBuffer,
|
||||||
};
|
};
|
||||||
@@ -52,6 +53,16 @@ struct RetryAttempt {
|
|||||||
result: eyre::Result<Vec<ChunkDownloadResult>>,
|
result: eyre::Result<Vec<ChunkDownloadResult>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) struct RetryContext<'a> {
|
||||||
|
pub(super) peers: &'a [SocketAddr],
|
||||||
|
pub(super) base_dir: &'a Path,
|
||||||
|
pub(super) game_id: &'a str,
|
||||||
|
pub(super) file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
|
||||||
|
pub(super) cancel_token: &'a CancellationToken,
|
||||||
|
pub(super) version_buffer: Option<Arc<VersionIniBuffer>>,
|
||||||
|
pub(super) progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
|
}
|
||||||
|
|
||||||
fn plan_retry_batch(
|
fn plan_retry_batch(
|
||||||
queue: &mut VecDeque<DownloadChunk>,
|
queue: &mut VecDeque<DownloadChunk>,
|
||||||
peers: &[SocketAddr],
|
peers: &[SocketAddr],
|
||||||
@@ -96,19 +107,17 @@ fn plan_retry_batch(
|
|||||||
|
|
||||||
async fn run_retry_batch(
|
async fn run_retry_batch(
|
||||||
retry_plans: HashMap<SocketAddr, PeerDownloadPlan>,
|
retry_plans: HashMap<SocketAddr, PeerDownloadPlan>,
|
||||||
base_dir: &Path,
|
ctx: &RetryContext<'_>,
|
||||||
game_id: &str,
|
|
||||||
cancel_token: &CancellationToken,
|
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
|
||||||
) -> eyre::Result<Vec<RetryAttempt>> {
|
) -> eyre::Result<Vec<RetryAttempt>> {
|
||||||
let mut attempts = FuturesUnordered::new();
|
let mut attempts = FuturesUnordered::new();
|
||||||
|
|
||||||
for (peer_addr, plan) in retry_plans {
|
for (peer_addr, plan) in retry_plans {
|
||||||
let retry_chunks = plan.chunks.clone();
|
let retry_chunks = plan.chunks.clone();
|
||||||
let base_dir = base_dir.to_path_buf();
|
let base_dir = ctx.base_dir.to_path_buf();
|
||||||
let game_id = game_id.to_string();
|
let game_id = ctx.game_id.to_string();
|
||||||
let cancel_token = cancel_token.clone();
|
let cancel_token = ctx.cancel_token.clone();
|
||||||
let version_buffer = version_buffer.clone();
|
let version_buffer = ctx.version_buffer.clone();
|
||||||
|
let progress_tracker = ctx.progress_tracker.clone();
|
||||||
|
|
||||||
attempts.push(async move {
|
attempts.push(async move {
|
||||||
let result = download_from_peer(
|
let result = download_from_peer(
|
||||||
@@ -118,6 +127,7 @@ async fn run_retry_batch(
|
|||||||
base_dir,
|
base_dir,
|
||||||
&cancel_token,
|
&cancel_token,
|
||||||
version_buffer,
|
version_buffer,
|
||||||
|
progress_tracker,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
RetryAttempt {
|
RetryAttempt {
|
||||||
@@ -131,8 +141,8 @@ async fn run_retry_batch(
|
|||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
while !attempts.is_empty() {
|
while !attempts.is_empty() {
|
||||||
let result = tokio::select! {
|
let result = tokio::select! {
|
||||||
() = cancel_token.cancelled() => {
|
() = ctx.cancel_token.cancelled() => {
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
}
|
}
|
||||||
result = attempts.next() => result.expect("retry attempt should exist"),
|
result = attempts.next() => result.expect("retry attempt should exist"),
|
||||||
};
|
};
|
||||||
@@ -208,32 +218,21 @@ fn handle_retry_attempt_error(
|
|||||||
/// Retries downloading failed chunks.
|
/// Retries downloading failed chunks.
|
||||||
pub(super) async fn retry_failed_chunks(
|
pub(super) async fn retry_failed_chunks(
|
||||||
failed_chunks: Vec<DownloadChunk>,
|
failed_chunks: Vec<DownloadChunk>,
|
||||||
peers: &[SocketAddr],
|
ctx: &RetryContext<'_>,
|
||||||
base_dir: &Path,
|
|
||||||
game_id: &str,
|
|
||||||
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
|
|
||||||
cancel_token: &CancellationToken,
|
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
|
||||||
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
||||||
let mut final_results = Vec::new();
|
let mut final_results = Vec::new();
|
||||||
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
|
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
|
||||||
|
|
||||||
while !queue.is_empty() {
|
while !queue.is_empty() {
|
||||||
ensure_not_cancelled(cancel_token, game_id)?;
|
ensure_not_cancelled(ctx.cancel_token, ctx.game_id)?;
|
||||||
|
|
||||||
let retry_plans = plan_retry_batch(&mut queue, peers, file_peer_map, &mut final_results);
|
let retry_plans =
|
||||||
|
plan_retry_batch(&mut queue, ctx.peers, ctx.file_peer_map, &mut final_results);
|
||||||
if retry_plans.is_empty() {
|
if retry_plans.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let attempts = run_retry_batch(
|
let attempts = run_retry_batch(retry_plans, ctx).await?;
|
||||||
retry_plans,
|
|
||||||
base_dir,
|
|
||||||
game_id,
|
|
||||||
cancel_token,
|
|
||||||
version_buffer.clone(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for attempt in attempts {
|
for attempt in attempts {
|
||||||
let RetryAttempt {
|
let RetryAttempt {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ use tokio_util::{
|
|||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan},
|
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan},
|
||||||
|
progress::DownloadProgressTracker,
|
||||||
version_ini::VersionIniBuffer,
|
version_ini::VersionIniBuffer,
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -65,11 +66,12 @@ async fn receive_chunk(
|
|||||||
base_dir: &Path,
|
base_dir: &Path,
|
||||||
chunk: &DownloadChunk,
|
chunk: &DownloadChunk,
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
version_buffer: Option<Arc<VersionIniBuffer>>,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
if let Some(buffer) = version_buffer
|
if let Some(buffer) = version_buffer
|
||||||
&& buffer.matches(&chunk.relative_path)
|
&& buffer.matches(&chunk.relative_path)
|
||||||
{
|
{
|
||||||
return download_version_ini_chunk(rx, chunk, &buffer).await;
|
return download_version_ini_chunk(rx, chunk, &buffer, progress_tracker).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate the path to prevent directory traversal
|
// Validate the path to prevent directory traversal
|
||||||
@@ -88,15 +90,19 @@ async fn receive_chunk(
|
|||||||
|
|
||||||
let mut remaining = chunk.length;
|
let mut remaining = chunk.length;
|
||||||
let mut received_bytes = 0u64;
|
let mut received_bytes = 0u64;
|
||||||
|
let mut progress =
|
||||||
|
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
|
||||||
|
|
||||||
while let Some(bytes) = rx.receive().await? {
|
while let Some(bytes) = rx.receive().await? {
|
||||||
file.write_all(&bytes).await?;
|
file.write_all(&bytes).await?;
|
||||||
received_bytes += bytes.len() as u64;
|
progress.record_bytes(bytes.len());
|
||||||
|
let byte_count = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
|
||||||
|
received_bytes = received_bytes.saturating_add(byte_count);
|
||||||
|
|
||||||
if remaining == 0 {
|
if remaining == 0 {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
remaining = remaining.saturating_sub(bytes.len() as u64);
|
remaining = remaining.saturating_sub(byte_count);
|
||||||
if remaining == 0 {
|
if remaining == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -127,8 +133,9 @@ async fn receive_chunk_result(
|
|||||||
chunk: DownloadChunk,
|
chunk: DownloadChunk,
|
||||||
rx: ReceiveStream,
|
rx: ReceiveStream,
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
version_buffer: Option<Arc<VersionIniBuffer>>,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
) -> ChunkDownloadResult {
|
) -> ChunkDownloadResult {
|
||||||
let result = receive_chunk(rx, &base_dir, &chunk, version_buffer).await;
|
let result = receive_chunk(rx, &base_dir, &chunk, version_buffer, progress_tracker).await;
|
||||||
ChunkDownloadResult {
|
ChunkDownloadResult {
|
||||||
chunk,
|
chunk,
|
||||||
result,
|
result,
|
||||||
@@ -140,9 +147,13 @@ async fn download_version_ini_chunk(
|
|||||||
mut rx: ReceiveStream,
|
mut rx: ReceiveStream,
|
||||||
chunk: &DownloadChunk,
|
chunk: &DownloadChunk,
|
||||||
buffer: &VersionIniBuffer,
|
buffer: &VersionIniBuffer,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let mut received = Vec::new();
|
let mut received = Vec::new();
|
||||||
|
let mut progress =
|
||||||
|
progress_tracker.track_chunk(&chunk.relative_path, chunk.offset, chunk.length);
|
||||||
while let Some(bytes) = rx.receive().await? {
|
while let Some(bytes) = rx.receive().await? {
|
||||||
|
progress.record_bytes(bytes.len());
|
||||||
received.extend_from_slice(&bytes);
|
received.extend_from_slice(&bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,53 +218,59 @@ fn failed_plan_results(
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ChunkPlanContext<'a> {
|
||||||
|
peer_addr: SocketAddr,
|
||||||
|
game_id: &'a str,
|
||||||
|
base_dir: &'a Path,
|
||||||
|
cancel_token: &'a CancellationToken,
|
||||||
|
version_buffer: Option<Arc<VersionIniBuffer>>,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
|
}
|
||||||
|
|
||||||
async fn download_chunk_plan(
|
async fn download_chunk_plan(
|
||||||
conn: &mut Connection,
|
conn: &mut Connection,
|
||||||
peer_addr: SocketAddr,
|
|
||||||
game_id: &str,
|
|
||||||
chunks: Vec<DownloadChunk>,
|
chunks: Vec<DownloadChunk>,
|
||||||
base_dir: &Path,
|
ctx: &ChunkPlanContext<'_>,
|
||||||
cancel_token: &CancellationToken,
|
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
|
||||||
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
||||||
let mut pending: VecDeque<DownloadChunk> = chunks.into();
|
let mut pending: VecDeque<DownloadChunk> = chunks.into();
|
||||||
let mut in_flight = FuturesUnordered::new();
|
let mut in_flight = FuturesUnordered::new();
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
let window = PEER_DOWNLOAD_STREAM_WINDOW.max(1);
|
let window = PEER_DOWNLOAD_STREAM_WINDOW.max(1);
|
||||||
let base_dir = base_dir.to_path_buf();
|
let base_dir = ctx.base_dir.to_path_buf();
|
||||||
|
|
||||||
while !pending.is_empty() || !in_flight.is_empty() {
|
while !pending.is_empty() || !in_flight.is_empty() {
|
||||||
while in_flight.len() < window {
|
while in_flight.len() < window {
|
||||||
let Some(chunk) = pending.pop_front() else {
|
let Some(chunk) = pending.pop_front() else {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
ensure_download_not_cancelled(cancel_token, game_id)?;
|
ensure_download_not_cancelled(ctx.cancel_token, ctx.game_id)?;
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"Downloading chunk {} (offset {}, length {}) from {}",
|
"Downloading chunk {} (offset {}, length {}) from {}",
|
||||||
chunk.relative_path,
|
chunk.relative_path,
|
||||||
chunk.offset,
|
chunk.offset,
|
||||||
chunk.length,
|
chunk.length,
|
||||||
peer_addr
|
ctx.peer_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
match open_chunk_stream(conn, game_id, &chunk).await {
|
match open_chunk_stream(conn, ctx.game_id, &chunk).await {
|
||||||
Ok(rx) => {
|
Ok(rx) => {
|
||||||
in_flight.push(receive_chunk_result(
|
in_flight.push(receive_chunk_result(
|
||||||
peer_addr,
|
ctx.peer_addr,
|
||||||
base_dir.clone(),
|
base_dir.clone(),
|
||||||
chunk,
|
chunk,
|
||||||
rx,
|
rx,
|
||||||
version_buffer.clone(),
|
ctx.version_buffer.clone(),
|
||||||
|
ctx.progress_tracker.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let reason = format!("failed to open chunk stream: {err}");
|
let reason = format!("failed to open chunk stream: {err}");
|
||||||
results.push(failed_chunk_result(chunk, peer_addr, reason.clone()));
|
results.push(failed_chunk_result(chunk, ctx.peer_addr, reason.clone()));
|
||||||
while let Some(chunk) = pending.pop_front() {
|
while let Some(chunk) = pending.pop_front() {
|
||||||
results.push(failed_chunk_result(
|
results.push(failed_chunk_result(
|
||||||
chunk,
|
chunk,
|
||||||
peer_addr,
|
ctx.peer_addr,
|
||||||
format!("peer stream unavailable after earlier open failure: {reason}"),
|
format!("peer stream unavailable after earlier open failure: {reason}"),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@@ -267,8 +284,8 @@ async fn download_chunk_plan(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let result = tokio::select! {
|
let result = tokio::select! {
|
||||||
() = cancel_token.cancelled() => {
|
() = ctx.cancel_token.cancelled() => {
|
||||||
eyre::bail!("download cancelled for game {game_id}");
|
eyre::bail!("download cancelled for game {}", ctx.game_id);
|
||||||
}
|
}
|
||||||
result = in_flight.next() => result.expect("in-flight chunk stream should exist"),
|
result = in_flight.next() => result.expect("in-flight chunk stream should exist"),
|
||||||
};
|
};
|
||||||
@@ -286,6 +303,7 @@ pub(super) async fn download_from_peer(
|
|||||||
games_folder: PathBuf,
|
games_folder: PathBuf,
|
||||||
cancel_token: &CancellationToken,
|
cancel_token: &CancellationToken,
|
||||||
version_buffer: Option<Arc<VersionIniBuffer>>,
|
version_buffer: Option<Arc<VersionIniBuffer>>,
|
||||||
|
progress_tracker: Arc<DownloadProgressTracker>,
|
||||||
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
) -> eyre::Result<Vec<ChunkDownloadResult>> {
|
||||||
if plan.chunks.is_empty() {
|
if plan.chunks.is_empty() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
@@ -303,17 +321,16 @@ pub(super) async fn download_from_peer(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let base_dir = games_folder;
|
let base_dir = games_folder;
|
||||||
|
let chunk_ctx = ChunkPlanContext {
|
||||||
let results = download_chunk_plan(
|
|
||||||
&mut conn,
|
|
||||||
peer_addr,
|
peer_addr,
|
||||||
game_id,
|
game_id,
|
||||||
plan.chunks,
|
base_dir: &base_dir,
|
||||||
&base_dir,
|
|
||||||
cancel_token,
|
cancel_token,
|
||||||
version_buffer,
|
version_buffer,
|
||||||
)
|
progress_tracker,
|
||||||
.await?;
|
};
|
||||||
|
|
||||||
|
let results = download_chunk_plan(&mut conn, plan.chunks, &chunk_ctx).await?;
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,6 +100,8 @@ pub enum PeerEvent {
|
|||||||
offset: u64,
|
offset: u64,
|
||||||
length: u64,
|
length: u64,
|
||||||
},
|
},
|
||||||
|
/// Download progress sampled while game files are being received.
|
||||||
|
DownloadGameFilesProgress(DownloadProgress),
|
||||||
/// Download has completed successfully.
|
/// Download has completed successfully.
|
||||||
DownloadGameFilesFinished { id: String },
|
DownloadGameFilesFinished { id: String },
|
||||||
/// Download has failed.
|
/// Download has failed.
|
||||||
@@ -152,6 +154,15 @@ pub enum PeerEvent {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sampled byte progress for one active game download.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
|
||||||
|
pub struct DownloadProgress {
|
||||||
|
pub id: String,
|
||||||
|
pub downloaded_bytes: u64,
|
||||||
|
pub total_bytes: u64,
|
||||||
|
pub bytes_per_second: u64,
|
||||||
|
}
|
||||||
|
|
||||||
/// Long-running peer runtime components reported in failure events.
|
/// Long-running peer runtime components reported in failure events.
|
||||||
#[derive(Clone, Copy, Debug, strum::IntoStaticStr)]
|
#[derive(Clone, Copy, Debug, strum::IntoStaticStr)]
|
||||||
pub enum PeerRuntimeComponent {
|
pub enum PeerRuntimeComponent {
|
||||||
|
|||||||
@@ -1038,6 +1038,11 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
|
|||||||
{relative_path} offset {offset} length {length} from {peer_addr}"
|
{relative_path} offset {offset} length {length} from {peer_addr}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
PeerEvent::DownloadGameFilesProgress(progress) => {
|
||||||
|
if let Err(e) = app_handle.emit("game-download-progress", Some(progress)) {
|
||||||
|
log::error!("Failed to emit game-download-progress event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
PeerEvent::DownloadGameFilesFinished { id } => {
|
PeerEvent::DownloadGameFilesFinished { id } => {
|
||||||
handle_download_finished(app_handle, id);
|
handle_download_finished(app_handle, id);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { JSX, MouseEvent } from 'react';
|
import { CSSProperties, JSX, MouseEvent } from 'react';
|
||||||
|
|
||||||
import { Icon } from './Icon';
|
import { Icon } from './Icon';
|
||||||
import { Game } from '../lib/types';
|
import { Game, InstallStatus } from '../lib/types';
|
||||||
import { actionLabel, primaryActionFor, PrimaryAction } from '../lib/gameState';
|
import { actionLabel, primaryActionFor, PrimaryAction } from '../lib/gameState';
|
||||||
|
|
||||||
interface Props {
|
interface Props {
|
||||||
@@ -18,16 +18,29 @@ const ICON_FOR_ACTION: Partial<Record<PrimaryAction, JSX.Element>> = {
|
|||||||
download: <Icon.download />,
|
download: <Icon.download />,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const downloadProgressPercent = (game: Game): number | undefined => {
|
||||||
|
const progress = game.download_progress;
|
||||||
|
if (!progress || progress.total_bytes <= 0) return undefined;
|
||||||
|
|
||||||
|
return Math.max(0, Math.min(100, (progress.downloaded_bytes / progress.total_bytes) * 100));
|
||||||
|
};
|
||||||
|
|
||||||
/** Color-coded primary action: Play / Install / Update / Download / busy. */
|
/** Color-coded primary action: Play / Install / Update / Download / busy. */
|
||||||
export const ActionButton = ({ game, size = 'md', full = false, onClick }: Props) => {
|
export const ActionButton = ({ game, size = 'md', full = false, onClick }: Props) => {
|
||||||
const action = primaryActionFor(game);
|
const action = primaryActionFor(game);
|
||||||
|
const isDownloading = game.install_status === InstallStatus.Downloading;
|
||||||
|
const progressPercent = downloadProgressPercent(game);
|
||||||
const cls = [
|
const cls = [
|
||||||
'act-btn',
|
'act-btn',
|
||||||
`act-${action}`,
|
`act-${action}`,
|
||||||
|
isDownloading ? 'act-downloading' : '',
|
||||||
size === 'lg' ? 'act-lg' : '',
|
size === 'lg' ? 'act-lg' : '',
|
||||||
full ? 'act-full' : '',
|
full ? 'act-full' : '',
|
||||||
].filter(Boolean).join(' ');
|
].filter(Boolean).join(' ');
|
||||||
const disabled = action === 'busy' || action === 'disabled';
|
const disabled = action === 'busy' || action === 'disabled';
|
||||||
|
const style = progressPercent === undefined
|
||||||
|
? undefined
|
||||||
|
: ({ '--download-progress': `${progressPercent}%` } as CSSProperties);
|
||||||
|
|
||||||
const handle = (e: MouseEvent<HTMLButtonElement>) => {
|
const handle = (e: MouseEvent<HTMLButtonElement>) => {
|
||||||
e.stopPropagation();
|
e.stopPropagation();
|
||||||
@@ -36,9 +49,10 @@ export const ActionButton = ({ game, size = 'md', full = false, onClick }: Props
|
|||||||
};
|
};
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<button className={cls} onClick={handle} disabled={disabled}>
|
<button className={cls} onClick={handle} disabled={disabled} style={style}>
|
||||||
|
{isDownloading && <span className="act-progress-fill" aria-hidden />}
|
||||||
{ICON_FOR_ACTION[action]}
|
{ICON_FOR_ACTION[action]}
|
||||||
<span>{actionLabel(game)}</span>
|
<span className="act-label">{actionLabel(game)}</span>
|
||||||
</button>
|
</button>
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { invoke } from '@tauri-apps/api/core';
|
|||||||
import { listen, UnlistenFn } from '@tauri-apps/api/event';
|
import { listen, UnlistenFn } from '@tauri-apps/api/event';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
DownloadProgressPayload,
|
||||||
Game,
|
Game,
|
||||||
GamesListPayload,
|
GamesListPayload,
|
||||||
InstallStatus,
|
InstallStatus,
|
||||||
@@ -51,7 +52,10 @@ export const useGames = (rescanGameDir: () => void): UseGamesResult => {
|
|||||||
const markChecking = useCallback((id: string) => {
|
const markChecking = useCallback((id: string) => {
|
||||||
setGames(prev => prev.map(item =>
|
setGames(prev => prev.map(item =>
|
||||||
item.id === id && !isInProgress(item.install_status)
|
item.id === id && !isInProgress(item.install_status)
|
||||||
? applyPatch(item, { install_status: InstallStatus.CheckingPeers, clearStatus: true })
|
? applyPatch(item, {
|
||||||
|
install_status: InstallStatus.CheckingPeers,
|
||||||
|
clearStatus: true,
|
||||||
|
})
|
||||||
: item
|
: item
|
||||||
));
|
));
|
||||||
}, []);
|
}, []);
|
||||||
@@ -81,6 +85,7 @@ export const useGames = (rescanGameDir: () => void): UseGamesResult => {
|
|||||||
: InstallStatus.NotInstalled,
|
: InstallStatus.NotInstalled,
|
||||||
status_message: message,
|
status_message: message,
|
||||||
status_level: 'error',
|
status_level: 'error',
|
||||||
|
download_progress: undefined,
|
||||||
}
|
}
|
||||||
: item));
|
: item));
|
||||||
if (triggerRescan) rescanRef.current();
|
if (triggerRescan) rescanRef.current();
|
||||||
@@ -115,6 +120,16 @@ export const useGames = (rescanGameDir: () => void): UseGamesResult => {
|
|||||||
});
|
});
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
unlisteners.push(await listen('game-download-progress', (e) => {
|
||||||
|
const { id, ...download_progress } = e.payload as DownloadProgressPayload;
|
||||||
|
setGames(prev => prev.map(item => item.id === id
|
||||||
|
? {
|
||||||
|
...item,
|
||||||
|
download_progress,
|
||||||
|
}
|
||||||
|
: item));
|
||||||
|
}));
|
||||||
|
|
||||||
unlisteners.push(await listen('game-no-peers', (e) => {
|
unlisteners.push(await listen('game-no-peers', (e) => {
|
||||||
handleErrorEvent(e.payload as string, 'No peers currently have this game.');
|
handleErrorEvent(e.payload as string, 'No peers currently have this game.');
|
||||||
}));
|
}));
|
||||||
|
|||||||
@@ -66,6 +66,9 @@ export const mergeGameUpdate = (
|
|||||||
install_status: installStatus,
|
install_status: installStatus,
|
||||||
status_message: clearStatus ? undefined : previous?.status_message,
|
status_message: clearStatus ? undefined : previous?.status_message,
|
||||||
status_level: clearStatus ? undefined : previous?.status_level,
|
status_level: clearStatus ? undefined : previous?.status_level,
|
||||||
|
download_progress: installStatus === InstallStatus.Downloading
|
||||||
|
? previous?.download_progress
|
||||||
|
: undefined,
|
||||||
peer_count: incoming.peer_count ?? 0,
|
peer_count: incoming.peer_count ?? 0,
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
@@ -108,12 +111,29 @@ export const primaryActionFor = (game: Game): PrimaryAction => {
|
|||||||
return 'play';
|
return 'play';
|
||||||
};
|
};
|
||||||
|
|
||||||
export const inProgressLabel = (status: InstallStatus): string | undefined => {
|
export const formatBytesPerSecond = (bytesPerSecond: number): string => {
|
||||||
switch (status) {
|
const units = ['B/s', 'KB/s', 'MB/s', 'GB/s'];
|
||||||
|
let value = Math.max(0, bytesPerSecond);
|
||||||
|
let unitIndex = 0;
|
||||||
|
|
||||||
|
while (value >= 1000 && unitIndex < units.length - 1) {
|
||||||
|
value /= 1000;
|
||||||
|
unitIndex += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (unitIndex === 0) return `${Math.round(value)} ${units[unitIndex]}`;
|
||||||
|
const precision = value >= 100 ? 0 : value >= 10 ? 1 : 2;
|
||||||
|
return `${value.toFixed(precision)} ${units[unitIndex]}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const inProgressLabel = (game: Game): string | undefined => {
|
||||||
|
switch (game.install_status) {
|
||||||
case InstallStatus.CheckingPeers:
|
case InstallStatus.CheckingPeers:
|
||||||
return 'Checking peers…';
|
return 'Checking peers…';
|
||||||
case InstallStatus.Downloading:
|
case InstallStatus.Downloading:
|
||||||
return 'Downloading…';
|
return game.download_progress
|
||||||
|
? `Downloading… ${formatBytesPerSecond(game.download_progress.bytes_per_second)}`
|
||||||
|
: 'Downloading…';
|
||||||
case InstallStatus.Installing:
|
case InstallStatus.Installing:
|
||||||
return 'Installing…';
|
return 'Installing…';
|
||||||
case InstallStatus.Uninstalling:
|
case InstallStatus.Uninstalling:
|
||||||
@@ -126,7 +146,7 @@ export const inProgressLabel = (status: InstallStatus): string | undefined => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export const actionLabel = (game: Game): string => {
|
export const actionLabel = (game: Game): string => {
|
||||||
const busy = inProgressLabel(game.install_status);
|
const busy = inProgressLabel(game);
|
||||||
if (busy) return busy;
|
if (busy) return busy;
|
||||||
if (isUnavailable(game)) return 'Unavailable';
|
if (isUnavailable(game)) return 'Unavailable';
|
||||||
if (!game.installed) return game.downloaded ? 'Install' : 'Download';
|
if (!game.installed) return game.downloaded ? 'Install' : 'Download';
|
||||||
|
|||||||
@@ -23,6 +23,16 @@ export enum ActiveOperationKind {
|
|||||||
|
|
||||||
export type StatusLevel = 'info' | 'error';
|
export type StatusLevel = 'info' | 'error';
|
||||||
|
|
||||||
|
export interface DownloadProgress {
|
||||||
|
downloaded_bytes: number;
|
||||||
|
total_bytes: number;
|
||||||
|
bytes_per_second: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface DownloadProgressPayload extends DownloadProgress {
|
||||||
|
id: string;
|
||||||
|
}
|
||||||
|
|
||||||
export interface Game {
|
export interface Game {
|
||||||
id: string;
|
id: string;
|
||||||
name: string;
|
name: string;
|
||||||
@@ -45,6 +55,7 @@ export interface Game {
|
|||||||
genre?: string;
|
genre?: string;
|
||||||
status_message?: string;
|
status_message?: string;
|
||||||
status_level?: StatusLevel;
|
status_level?: StatusLevel;
|
||||||
|
download_progress?: DownloadProgress;
|
||||||
peer_count: number;
|
peer_count: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -737,14 +737,21 @@
|
|||||||
font: inherit;
|
font: inherit;
|
||||||
font-weight: 600;
|
font-weight: 600;
|
||||||
font-size: 12.5px;
|
font-size: 12.5px;
|
||||||
letter-spacing: 0.005em;
|
letter-spacing: 0;
|
||||||
cursor: pointer;
|
cursor: pointer;
|
||||||
|
position: relative;
|
||||||
|
overflow: hidden;
|
||||||
transition:
|
transition:
|
||||||
transform 0.12s,
|
transform 0.12s,
|
||||||
filter 0.12s,
|
filter 0.12s,
|
||||||
background 0.15s;
|
background 0.15s;
|
||||||
white-space: nowrap;
|
white-space: nowrap;
|
||||||
}
|
}
|
||||||
|
.act-btn > svg,
|
||||||
|
.act-btn > .act-label {
|
||||||
|
position: relative;
|
||||||
|
z-index: 1;
|
||||||
|
}
|
||||||
.act-btn:hover:not(:disabled) {
|
.act-btn:hover:not(:disabled) {
|
||||||
filter: brightness(1.12);
|
filter: brightness(1.12);
|
||||||
}
|
}
|
||||||
@@ -800,6 +807,20 @@
|
|||||||
background: rgba(255, 255, 255, 0.06);
|
background: rgba(255, 255, 255, 0.06);
|
||||||
border: 1px solid var(--bd-1);
|
border: 1px solid var(--bd-1);
|
||||||
}
|
}
|
||||||
|
.act-downloading {
|
||||||
|
min-width: 148px;
|
||||||
|
font-variant-numeric: tabular-nums;
|
||||||
|
}
|
||||||
|
.act-lg.act-downloading {
|
||||||
|
min-width: 174px;
|
||||||
|
}
|
||||||
|
.act-progress-fill {
|
||||||
|
position: absolute;
|
||||||
|
inset: 0 auto 0 0;
|
||||||
|
width: var(--download-progress, 0%);
|
||||||
|
background: color-mix(in srgb, var(--accent) 28%, transparent);
|
||||||
|
transition: width 0.45s linear;
|
||||||
|
}
|
||||||
.act-busy::before {
|
.act-busy::before {
|
||||||
content: "";
|
content: "";
|
||||||
display: inline-block;
|
display: inline-block;
|
||||||
@@ -809,6 +830,9 @@
|
|||||||
border: 1.6px solid color-mix(in srgb, var(--accent) 60%, transparent);
|
border: 1.6px solid color-mix(in srgb, var(--accent) 60%, transparent);
|
||||||
border-top-color: var(--accent);
|
border-top-color: var(--accent);
|
||||||
animation: spin 0.9s linear infinite;
|
animation: spin 0.9s linear infinite;
|
||||||
|
position: relative;
|
||||||
|
z-index: 1;
|
||||||
|
flex: 0 0 auto;
|
||||||
}
|
}
|
||||||
@keyframes spin {
|
@keyframes spin {
|
||||||
to {
|
to {
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
import {
|
import {
|
||||||
|
actionLabel,
|
||||||
activeStatusById,
|
activeStatusById,
|
||||||
|
formatBytesPerSecond,
|
||||||
mergeGameUpdate,
|
mergeGameUpdate,
|
||||||
} from '../src/lib/gameState.ts';
|
} from '../src/lib/gameState.ts';
|
||||||
import {
|
import {
|
||||||
@@ -81,3 +83,54 @@ Deno.test('active operation snapshot is the source of busy status', () => {
|
|||||||
'update operation should render Installing',
|
'update operation should render Installing',
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Deno.test('download progress is preserved only while actively downloading', () => {
|
||||||
|
const downloading = game({
|
||||||
|
install_status: InstallStatus.Downloading,
|
||||||
|
download_progress: {
|
||||||
|
downloaded_bytes: 50,
|
||||||
|
total_bytes: 100,
|
||||||
|
bytes_per_second: 12_500_000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const stillDownloading = mergeGameUpdate(
|
||||||
|
game(),
|
||||||
|
downloading,
|
||||||
|
InstallStatus.Downloading,
|
||||||
|
);
|
||||||
|
const settled = mergeGameUpdate(game({ downloaded: true }), stillDownloading);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
stillDownloading.download_progress?.downloaded_bytes,
|
||||||
|
50,
|
||||||
|
'active download snapshot should keep progress',
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
settled.download_progress,
|
||||||
|
undefined,
|
||||||
|
'settled snapshot should clear progress',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.test('downloading action label includes current speed', () => {
|
||||||
|
const downloading = game({
|
||||||
|
install_status: InstallStatus.Downloading,
|
||||||
|
download_progress: {
|
||||||
|
downloaded_bytes: 50,
|
||||||
|
total_bytes: 100,
|
||||||
|
bytes_per_second: 12_500_000,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
formatBytesPerSecond(12_500_000),
|
||||||
|
'12.5 MB/s',
|
||||||
|
'speed formatter should use compact decimal units',
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
actionLabel(downloading),
|
||||||
|
'Downloading… 12.5 MB/s',
|
||||||
|
'download label should include speed',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user