codec review

This commit is contained in:
2025-11-12 21:36:13 +01:00
parent d9f8a342e6
commit bdfb461efc
2 changed files with 233 additions and 103 deletions
+107 -24
View File
@@ -4,7 +4,7 @@ mod path_validation;
mod peer;
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
@@ -332,6 +332,7 @@ struct DownloadChunk {
offset: u64,
length: u64,
retry_count: usize,
last_peer: Option<SocketAddr>,
}
#[derive(Debug, Default)]
@@ -361,14 +362,12 @@ async fn prepare_game_storage(
if let Some(parent) = validated_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file_size = desc.file_size().unwrap_or(0);
let file = OpenOptions::new()
OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&validated_path)
.await?;
file.set_len(file_size).await?;
}
}
Ok(())
@@ -395,6 +394,7 @@ fn build_peer_plans(
offset: 0,
length: 0,
retry_count: 0,
last_peer: Some(peer),
});
continue;
}
@@ -409,6 +409,7 @@ fn build_peer_plans(
offset,
length,
retry_count: 0,
last_peer: Some(peer),
});
offset += length;
}
@@ -601,6 +602,7 @@ async fn download_from_peer(
offset: 0,
length: 0, // Indicates whole file
retry_count: 0,
last_peer: Some(peer_addr),
};
let result = download_whole_file(&mut conn, &base_dir, desc).await;
@@ -658,6 +660,7 @@ async fn download_game_files(
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
retry_chunk.last_peer = Some(chunk_result.peer_addr);
failed_chunks.push(retry_chunk);
} else {
last_err = Some(eyre::eyre!(
@@ -701,43 +704,123 @@ async fn download_game_files(
Ok(())
}
fn select_retry_peer(
peers: &[SocketAddr],
last_peer: Option<SocketAddr>,
attempt_offset: usize,
) -> Option<SocketAddr> {
if peers.is_empty() {
return None;
}
if peers.len() > 1
&& let Some(last) = last_peer
&& let Some(pos) = peers.iter().position(|addr| *addr == last)
{
let next_index = (pos + 1 + attempt_offset) % peers.len();
return Some(peers[next_index]);
}
Some(peers[attempt_offset % peers.len()])
}
fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option<SocketAddr>) -> SocketAddr {
last_peer
.or_else(|| peers.first().copied())
.unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0)))
}
async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
peers: &[SocketAddr],
base_dir: &Path,
game_id: &str,
) -> Vec<ChunkDownloadResult> {
let mut results = Vec::new();
let mut exhausted = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while let Some(mut chunk) = queue.pop_front() {
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"Retry budget exhausted for chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(peers, chunk.last_peer),
});
continue;
}
let retry_offset = chunk.retry_count.saturating_sub(1);
let Some(peer_addr) = select_retry_peer(peers, chunk.last_peer, retry_offset) else {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"No peers available to retry chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(peers, chunk.last_peer),
});
continue;
};
let mut attempt_chunk = chunk.clone();
attempt_chunk.last_peer = Some(peer_addr);
// Redistribute failed chunks among available peers
let _retry_plans = build_peer_plans(peers, &[]);
for (i, chunk) in failed_chunks.into_iter().enumerate() {
let peer_addr = peers[i % peers.len()];
let plan = PeerDownloadPlan {
chunks: vec![chunk],
chunks: vec![attempt_chunk.clone()],
whole_files: Vec::new(),
};
match download_from_peer(peer_addr, game_id, plan, base_dir.to_path_buf()).await {
Ok(chunk_results) => results.extend(chunk_results),
Ok(results) => {
for result in results {
match result.result {
Ok(()) => {}
Err(e) => {
let mut retry_chunk = result.chunk.clone();
retry_chunk.retry_count = chunk.retry_count + 1;
retry_chunk.last_peer = Some(result.peer_addr);
if retry_chunk.retry_count >= MAX_RETRY_COUNT {
let context = format!(
"Retry budget exhausted for chunk: {}",
result.chunk.relative_path
);
exhausted.push(ChunkDownloadResult {
chunk: retry_chunk,
result: Err(e.wrap_err(context)),
peer_addr: result.peer_addr,
});
} else {
queue.push_back(retry_chunk);
}
}
}
}
}
Err(e) => {
log::error!("Failed to retry chunk: {e}");
// Add empty failure result
results.push(ChunkDownloadResult {
chunk: DownloadChunk {
relative_path: "unknown".to_string(),
offset: 0,
length: 0,
retry_count: MAX_RETRY_COUNT,
},
result: Err(e),
peer_addr,
});
chunk.retry_count += 1;
chunk.last_peer = Some(peer_addr);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(e.wrap_err(format!(
"Retry budget exhausted for chunk after connection failure: {}",
chunk.relative_path
))),
peer_addr: fallback_peer_addr(peers, chunk.last_peer),
});
} else {
queue.push_back(chunk);
}
}
}
}
results
exhausted
}
async fn load_local_game_db(game_dir: &str) -> eyre::Result<GameDB> {