refactor(peer): split download pipeline into modules

The download pipeline had grown into one large file that mixed sentinel
transaction handling, peer planning, transport, retry, and top-level
orchestration. Split it into a download/ module tree with one file per
concern so future lifecycle changes can be reviewed at the right boundary.

The public crate surface remains download::download_game_files. Helper types
and functions are kept pub(super) or private so the refactor does not widen
the API or encourage new callers to depend on internals. The version.ini
transaction helpers stay local to version_ini.rs; the proposed fs_util
extraction is intentionally left for the later atomic-index work, where a
second caller exists.

There is no intended runtime behavior change.

Test Plan:
- just fmt
- just test
- just clippy
- just build

Refs: none
This commit is contained in:
2026-05-16 12:16:08 +02:00
parent 504ee1bc02
commit a251233653
7 changed files with 1220 additions and 1191 deletions
@@ -0,0 +1,9 @@
//! Download pipeline for game files from peers.
mod orchestrator;
mod planning;
mod retry;
mod transport;
mod version_ini;
pub use orchestrator::download_game_files;
@@ -0,0 +1,209 @@
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
use lanspread_db::db::GameFileDescription;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use super::{
planning::{DownloadChunk, build_peer_plans, extract_version_descriptor, prepare_game_storage},
retry::retry_failed_chunks,
transport::download_from_peer,
version_ini::{
VersionIniBuffer,
begin_version_ini_transaction,
commit_version_ini_buffer,
rollback_version_ini_transaction,
},
};
use crate::{PeerEvent, config::MAX_RETRY_COUNT};
/// Downloads all game files from available peers.
#[allow(clippy::too_many_lines)]
pub async fn download_game_files(
game_id: &str,
game_file_descs: Vec<GameFileDescription>,
games_folder: PathBuf,
peers: Vec<SocketAddr>,
file_peer_map: HashMap<String, Vec<SocketAddr>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
cancel_token: CancellationToken,
) -> eyre::Result<()> {
if peers.is_empty() {
eyre::bail!("no peers available for game {game_id}");
}
if cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {game_id}");
}
let (version_desc, transfer_descs) =
extract_version_descriptor(game_id, game_file_descs, &tx_notify_ui)?;
let version_buffer = match VersionIniBuffer::new(&version_desc) {
Ok(buffer) => Arc::new(buffer),
Err(err) => {
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
};
let game_root = games_folder.join(game_id);
if let Err(err) = begin_version_ini_transaction(&game_root).await {
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
if let Err(err) = prepare_game_storage(&games_folder, &transfer_descs).await {
rollback_version_ini_transaction(&game_root).await;
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin {
id: game_id.to_string(),
})?;
let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map);
let mut tasks = Vec::new();
for (peer_addr, plan) in plans {
let base_dir = games_folder.clone();
let game_id = game_id.to_string();
let cancel_token = cancel_token.clone();
let version_buffer = version_buffer.clone();
tasks.push(tokio::spawn(async move {
download_from_peer(
peer_addr,
&game_id,
plan,
base_dir,
&cancel_token,
Some(version_buffer),
)
.await
}));
}
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
let mut last_err: Option<eyre::Report> = None;
for handle in tasks {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
match handle.await {
Ok(Ok(results)) => {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
for chunk_result in results {
if let Err(e) = chunk_result.result {
log::warn!(
"Failed to download chunk from {}: {e}",
chunk_result.peer_addr
);
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
retry_chunk.last_peer = Some(chunk_result.peer_addr);
failed_chunks.push(retry_chunk);
} else {
last_err = Some(eyre::eyre!(
"Max retries exceeded for chunk: {}",
chunk_result.chunk.relative_path
));
}
}
}
}
Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
Ok(Err(e)) => last_err = Some(e),
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
}
}
// Retry failed chunks if any
if !failed_chunks.is_empty() && !peers.is_empty() {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
log::info!("Retrying {} failed chunks", failed_chunks.len());
let retry_results = match retry_failed_chunks(
failed_chunks,
&peers,
&games_folder,
game_id,
&file_peer_map,
&cancel_token,
Some(version_buffer.clone()),
)
.await
{
Ok(results) => results,
Err(_) if cancel_token.is_cancelled() => {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
Err(err) => {
last_err = Some(err);
Vec::new()
}
};
for chunk_result in retry_results {
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
if let Err(e) = chunk_result.result {
log::error!("Retry failed for chunk: {e}");
last_err = Some(e);
}
}
}
if cancel_token.is_cancelled() {
rollback_version_ini_transaction(&game_root).await;
eyre::bail!("download cancelled for game {game_id}");
}
if let Some(err) = last_err {
rollback_version_ini_transaction(&game_root).await;
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
if let Err(err) = commit_version_ini_buffer(&game_root, &version_buffer).await {
rollback_version_ini_transaction(&game_root).await;
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
log::info!("all files downloaded for game: {game_id}");
tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished {
id: game_id.to_string(),
})?;
Ok(())
}
@@ -0,0 +1,350 @@
use std::{collections::HashMap, net::SocketAddr, path::Path};
use lanspread_db::db::GameFileDescription;
use tokio::{fs::OpenOptions, sync::mpsc::UnboundedSender};
use crate::{PeerEvent, config::CHUNK_SIZE, path_validation::validate_game_file_path};
/// Represents a chunk of a file to be downloaded.
#[derive(Debug, Clone)]
pub(super) struct DownloadChunk {
pub(super) relative_path: String,
pub(super) offset: u64,
pub(super) length: u64,
pub(super) retry_count: usize,
pub(super) last_peer: Option<SocketAddr>,
}
/// Download plan for a single peer.
#[derive(Debug, Default)]
pub(super) struct PeerDownloadPlan {
pub(super) chunks: Vec<DownloadChunk>,
pub(super) whole_files: Vec<GameFileDescription>,
}
/// Result of downloading a chunk.
#[derive(Debug)]
pub(super) struct ChunkDownloadResult {
pub(super) chunk: DownloadChunk,
pub(super) result: eyre::Result<()>,
pub(super) peer_addr: SocketAddr,
}
/// Extracts the root `version.ini` descriptor while keeping every descriptor in
/// the transfer list. The chunk writer diverts the sentinel bytes into memory.
pub(super) fn extract_version_descriptor(
game_id: &str,
game_file_descs: Vec<GameFileDescription>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) -> eyre::Result<(GameFileDescription, Vec<GameFileDescription>)> {
let mut version_descs = Vec::new();
let mut transfer_descs = Vec::new();
for desc in game_file_descs {
if desc.is_version_ini() {
version_descs.push(desc.clone());
}
transfer_descs.push(desc);
}
if version_descs.len() != 1 {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
});
eyre::bail!(
"expected exactly one root-level version.ini sentinel for {game_id}, found {}",
version_descs.len()
);
}
let version_desc = version_descs.remove(0);
Ok((version_desc, transfer_descs))
}
/// Prepares storage for game files by creating directories and pre-allocating files.
pub(super) async fn prepare_game_storage(
games_folder: &Path,
file_descs: &[GameFileDescription],
) -> eyre::Result<()> {
for desc in file_descs {
if desc.is_version_ini() {
continue;
}
// Validate the path to prevent directory traversal
let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?;
if desc.is_dir {
tokio::fs::create_dir_all(&validated_path).await?;
} else {
if let Some(parent) = validated_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// Create and pre-allocate the file with the expected size
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&validated_path)
.await?;
// Pre-allocate the file with the expected size
let size = desc.size;
if let Err(e) = file.set_len(size).await {
log::warn!(
"Failed to pre-allocate file {} (size: {}): {}",
desc.relative_path,
size,
e
);
// Continue without pre-allocation - the file will grow as chunks are written
} else {
log::debug!(
"Pre-allocated file {} with {} bytes",
desc.relative_path,
size
);
}
}
}
Ok(())
}
/// Resolves which peers have a specific file.
pub(super) fn resolve_file_peers<'a>(
relative_path: &str,
file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
fallback: &'a [SocketAddr],
) -> &'a [SocketAddr] {
if let Some(peers) = file_peer_map.get(relative_path)
&& !peers.is_empty()
{
return peers;
}
fallback
}
/// Builds download plans distributing files across peers.
pub(super) fn build_peer_plans(
peers: &[SocketAddr],
file_descs: &[GameFileDescription],
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> HashMap<SocketAddr, PeerDownloadPlan> {
let mut plans: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();
if peers.is_empty() {
return plans;
}
let mut peer_index = 0usize;
for desc in file_descs.iter().filter(|d| !d.is_dir) {
let size = desc.file_size();
let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers);
if eligible_peers.is_empty() {
continue;
}
if size == 0 {
let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0,
retry_count: 0,
last_peer: Some(peer),
});
continue;
}
let mut offset = 0u64;
while offset < size {
let length = std::cmp::min(CHUNK_SIZE, size - offset);
let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(),
offset,
length,
retry_count: 0,
last_peer: Some(peer),
});
offset += length;
}
}
plans
}
#[cfg(test)]
mod tests {
use lanspread_db::db::GameFileDescription;
use super::*;
use crate::test_support::TempDir;
fn loopback_addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
#[test]
fn build_peer_plans_handles_partial_final_chunk() {
let peers = vec![loopback_addr(12000), loopback_addr(12001)];
let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4;
let mut file_peer_map = HashMap::new();
file_peer_map.insert("game/file.dat".to_string(), peers.clone());
let file_descs = vec![GameFileDescription {
game_id: "test".to_string(),
relative_path: "game/file.dat".to_string(),
is_dir: false,
size: file_size,
}];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect();
assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks");
chunks.sort_by_key(|chunk| chunk.offset);
let last_chunk = chunks.last().expect("last chunk exists");
assert_eq!(last_chunk.offset, CHUNK_SIZE * 2);
assert_eq!(last_chunk.length, file_size - last_chunk.offset);
assert_eq!(last_chunk.length, CHUNK_SIZE / 4);
assert_eq!(
last_chunk.offset + last_chunk.length,
file_size,
"last chunk should finish the file"
);
}
#[test]
fn build_peer_plans_respects_file_peer_map() {
let shared_a = loopback_addr(12010);
let shared_b = loopback_addr(12011);
let exclusive = loopback_addr(12012);
let peers = vec![shared_a, shared_b, exclusive];
let mut file_peer_map = HashMap::new();
file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]);
file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]);
let file_descs = vec![
GameFileDescription {
game_id: "test".to_string(),
relative_path: "shared.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE * 2,
},
GameFileDescription {
game_id: "test".to_string(),
relative_path: "exclusive.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE,
},
];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let exclusive_plan = plans
.get(&exclusive)
.expect("exclusive peer should have a plan");
assert!(
exclusive_plan
.chunks
.iter()
.all(|chunk| chunk.relative_path == "exclusive.bin"),
"exclusive peer should only receive exclusive.bin chunks"
);
for (peer, plan) in plans {
for chunk in plan.chunks {
match chunk.relative_path.as_str() {
"exclusive.bin" => assert_eq!(
peer, exclusive,
"exclusive.bin chunks should only be assigned to the exclusive peer"
),
"shared.bin" => assert!(
peer == shared_a || peer == shared_b,
"shared.bin chunks must stay within shared peers"
),
other => panic!("unexpected file in plan: {other}"),
}
}
}
}
#[tokio::test]
async fn prepare_game_storage_skips_version_ini_sentinel() {
let temp = TempDir::new("lanspread-download");
let descs = vec![GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
}];
prepare_game_storage(temp.path(), &descs)
.await
.expect("storage preparation should succeed");
assert!(!temp.path().join("game").join("version.ini").exists());
}
#[test]
fn version_descriptor_extraction_keeps_nested_decoy_in_transfer_list() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let nested_decoy = vec![
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/local/version.ini".to_string(),
is_dir: false,
size: 8,
},
];
let (version, transfer) =
extract_version_descriptor("game", nested_decoy, &tx).expect("only one root sentinel");
assert_eq!(version.relative_path, "game/version.ini");
assert_eq!(transfer.len(), 2);
}
#[test]
fn version_descriptor_extraction_requires_a_root_version_ini() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let missing = vec![GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/archive.eti".to_string(),
is_dir: false,
size: 1,
}];
assert!(extract_version_descriptor("game", missing, &tx).is_err());
}
#[test]
fn version_descriptor_extraction_rejects_duplicate_root_version_ini() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let multiple = vec![
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
];
assert!(extract_version_descriptor("game", multiple, &tx).is_err());
}
}
+164
View File
@@ -0,0 +1,164 @@
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
path::Path,
sync::Arc,
};
use tokio_util::sync::CancellationToken;
use super::{
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan, resolve_file_peers},
transport::download_from_peer,
version_ini::VersionIniBuffer,
};
use crate::config::MAX_RETRY_COUNT;
/// Selects a peer for retrying a failed chunk.
fn select_retry_peer(
peers: &[SocketAddr],
last_peer: Option<SocketAddr>,
attempt_offset: usize,
) -> Option<SocketAddr> {
if peers.is_empty() {
return None;
}
if peers.len() > 1
&& let Some(last) = last_peer
&& let Some(pos) = peers.iter().position(|addr| *addr == last)
{
let next_index = (pos + 1 + attempt_offset) % peers.len();
return Some(peers[next_index]);
}
Some(peers[attempt_offset % peers.len()])
}
/// Returns a fallback peer address for error reporting.
fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option<SocketAddr>) -> SocketAddr {
last_peer
.or_else(|| peers.first().copied())
.unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], 0)))
}
/// Retries downloading failed chunks.
pub(super) async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
peers: &[SocketAddr],
base_dir: &Path,
game_id: &str,
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
let mut exhausted = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while let Some(mut chunk) = queue.pop_front() {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"Retry budget exhausted for chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
continue;
}
let retry_offset = chunk.retry_count.saturating_sub(1);
let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset)
else {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"No peers available to retry chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
continue;
};
let mut attempt_chunk = chunk.clone();
attempt_chunk.last_peer = Some(peer_addr);
let plan = PeerDownloadPlan {
chunks: vec![attempt_chunk.clone()],
whole_files: Vec::new(),
};
match download_from_peer(
peer_addr,
game_id,
plan,
base_dir.to_path_buf(),
cancel_token,
version_buffer.clone(),
)
.await
{
Ok(results) => {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
for result in results {
match result.result {
Ok(()) => {}
Err(e) => {
let mut retry_chunk = result.chunk.clone();
retry_chunk.retry_count = chunk.retry_count + 1;
retry_chunk.last_peer = Some(result.peer_addr);
if retry_chunk.retry_count >= MAX_RETRY_COUNT {
let context = format!(
"Retry budget exhausted for chunk: {}",
result.chunk.relative_path
);
exhausted.push(ChunkDownloadResult {
chunk: retry_chunk,
result: Err(e.wrap_err(context)),
peer_addr: result.peer_addr,
});
} else {
queue.push_back(retry_chunk);
}
}
}
}
}
Err(e) => {
if cancel_token.is_cancelled() {
return Ok(exhausted);
}
chunk.retry_count += 1;
chunk.last_peer = Some(peer_addr);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(e.wrap_err(format!(
"Retry budget exhausted for chunk after connection failure: {}",
chunk.relative_path
))),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
} else {
queue.push_back(chunk);
}
}
}
}
Ok(exhausted)
}
@@ -0,0 +1,259 @@
use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
};
use lanspread_db::db::GameFileDescription;
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::{
codec::{FramedWrite, LengthDelimitedCodec},
sync::CancellationToken,
};
use super::{
planning::{ChunkDownloadResult, DownloadChunk, PeerDownloadPlan},
version_ini::VersionIniBuffer,
};
use crate::{network::connect_to_peer, path_validation::validate_game_file_path};
fn ensure_download_not_cancelled(
cancel_token: &CancellationToken,
game_id: &str,
) -> eyre::Result<()> {
if cancel_token.is_cancelled() {
eyre::bail!("download cancelled for game {game_id}");
}
Ok(())
}
/// Downloads a single chunk from a peer.
async fn download_chunk(
conn: &mut s2n_quic::Connection,
base_dir: &Path,
game_id: &str,
chunk: &DownloadChunk,
version_buffer: Option<Arc<VersionIniBuffer>>,
) -> eyre::Result<()> {
use futures::SinkExt;
use lanspread_proto::{Message, Request};
let stream = conn.open_bidirectional_stream().await?;
let (mut rx, tx) = stream.split();
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
let request = Request::GetGameFileChunk {
game_id: game_id.to_string(),
relative_path: chunk.relative_path.clone(),
offset: chunk.offset,
length: chunk.length,
};
framed_tx.send(request.encode()).await?;
framed_tx.close().await?;
if let Some(buffer) = version_buffer
&& buffer.matches(&chunk.relative_path)
{
return download_version_ini_chunk(&mut rx, chunk, &buffer).await;
}
// Validate the path to prevent directory traversal
let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?;
let mut file = OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&validated_path)
.await?;
if chunk.length == 0 && chunk.offset == 0 {
// fallback-to-whole-file path replaces any existing partial data
file.set_len(0).await?;
}
file.seek(std::io::SeekFrom::Start(chunk.offset)).await?;
let mut remaining = chunk.length;
let mut received_bytes = 0u64;
while let Some(bytes) = rx.receive().await? {
file.write_all(&bytes).await?;
received_bytes += bytes.len() as u64;
if remaining == 0 {
continue;
}
remaining = remaining.saturating_sub(bytes.len() as u64);
if remaining == 0 {
break;
}
}
// Verify we received the expected amount of data
if chunk.length > 0 && received_bytes != chunk.length {
eyre::bail!(
"Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}",
chunk.length,
received_bytes,
chunk.relative_path,
chunk.offset
);
}
file.flush().await?;
// Verify file integrity by checking the file size
verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?;
Ok(())
}
async fn download_version_ini_chunk(
rx: &mut s2n_quic::stream::ReceiveStream,
chunk: &DownloadChunk,
buffer: &VersionIniBuffer,
) -> eyre::Result<()> {
let mut received = Vec::new();
while let Some(bytes) = rx.receive().await? {
received.extend_from_slice(&bytes);
}
if chunk.length > 0 && u64::try_from(received.len())? != chunk.length {
eyre::bail!(
"Incomplete version.ini chunk download: expected {} bytes, received {} bytes at offset {}",
chunk.length,
received.len(),
chunk.offset
);
}
buffer.write_at(chunk.offset, &received).await
}
/// Verifies that a chunk was written correctly.
async fn verify_chunk_integrity(
file_path: &Path,
offset: u64,
expected_length: u64,
) -> eyre::Result<()> {
if expected_length == 0 {
return Ok(()); // Skip verification for whole files or zero-length chunks
}
let metadata = tokio::fs::metadata(file_path).await?;
let file_size = metadata.len();
if file_size < offset + expected_length {
eyre::bail!(
"File integrity check failed: file size {} is less than expected {} (offset: {})",
file_size,
offset + expected_length,
offset
);
}
Ok(())
}
/// Downloads a whole file from a peer.
async fn download_whole_file(
conn: &mut s2n_quic::Connection,
base_dir: &Path,
desc: &GameFileDescription,
) -> eyre::Result<()> {
use futures::SinkExt;
use lanspread_proto::{Message, Request};
let stream = conn.open_bidirectional_stream().await?;
let (mut rx, tx) = stream.split();
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
framed_tx
.send(Request::GetGameFileData(desc.clone()).encode())
.await?;
framed_tx.close().await?;
// Validate the path to prevent directory traversal
let validated_path = validate_game_file_path(base_dir, &desc.relative_path)?;
let mut file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&validated_path)
.await?;
file.seek(std::io::SeekFrom::Start(0)).await?;
while let Some(bytes) = rx.receive().await? {
file.write_all(&bytes).await?;
}
file.flush().await?;
Ok(())
}
/// Downloads all assigned chunks and files from a single peer.
pub(super) async fn download_from_peer(
peer_addr: SocketAddr,
game_id: &str,
plan: PeerDownloadPlan,
games_folder: PathBuf,
cancel_token: &CancellationToken,
version_buffer: Option<Arc<VersionIniBuffer>>,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
if plan.chunks.is_empty() && plan.whole_files.is_empty() {
return Ok(Vec::new());
}
ensure_download_not_cancelled(cancel_token, game_id)?;
let mut conn = connect_to_peer(peer_addr).await?;
conn.keep_alive(true)?;
conn.keep_alive(true)?;
let base_dir = games_folder;
let mut results = Vec::new();
// Download chunks with error handling
for chunk in &plan.chunks {
ensure_download_not_cancelled(cancel_token, game_id)?;
log::info!(
"Downloading chunk {} (offset {}, length {}) from {}",
chunk.relative_path,
chunk.offset,
chunk.length,
peer_addr
);
let result =
download_chunk(&mut conn, &base_dir, game_id, chunk, version_buffer.clone()).await;
results.push(ChunkDownloadResult {
chunk: chunk.clone(),
result,
peer_addr,
});
}
// Download whole files
for desc in &plan.whole_files {
ensure_download_not_cancelled(cancel_token, game_id)?;
let chunk = DownloadChunk {
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0, // Indicates whole file
retry_count: 0,
last_peer: Some(peer_addr),
};
let result = download_whole_file(&mut conn, &base_dir, desc).await;
results.push(ChunkDownloadResult {
chunk,
result,
peer_addr,
});
}
Ok(results)
}
@@ -0,0 +1,229 @@
use std::path::Path;
use lanspread_db::db::GameFileDescription;
use tokio::{io::AsyncWriteExt, sync::Mutex};
#[derive(Debug)]
pub(super) struct VersionIniBuffer {
relative_path: String,
bytes: Mutex<Vec<u8>>,
}
impl VersionIniBuffer {
pub(super) fn new(desc: &GameFileDescription) -> eyre::Result<Self> {
if desc.is_dir {
eyre::bail!("version.ini sentinel cannot be a directory");
}
let size = usize::try_from(desc.size)?;
Ok(Self {
relative_path: desc.relative_path.clone(),
bytes: Mutex::new(vec![0; size]),
})
}
pub(super) fn matches(&self, relative_path: &str) -> bool {
self.relative_path == relative_path
}
pub(super) async fn write_at(&self, offset: u64, bytes: &[u8]) -> eyre::Result<()> {
let offset = usize::try_from(offset)?;
let mut buffer = self.bytes.lock().await;
let end = offset
.checked_add(bytes.len())
.ok_or_else(|| eyre::eyre!("version.ini chunk offset overflow"))?;
if end > buffer.len() {
eyre::bail!(
"version.ini chunk exceeds buffer: end {end}, buffer {}",
buffer.len()
);
}
buffer[offset..end].copy_from_slice(bytes);
Ok(())
}
async fn snapshot(&self) -> Vec<u8> {
self.bytes.lock().await.clone()
}
}
pub(super) async fn begin_version_ini_transaction(game_root: &Path) -> eyre::Result<()> {
tokio::fs::create_dir_all(game_root).await?;
remove_file_if_exists(&game_root.join(".version.ini.tmp")).await?;
remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?;
let version_path = game_root.join("version.ini");
if tokio::fs::metadata(&version_path)
.await
.is_ok_and(|metadata| metadata.is_file())
{
tokio::fs::rename(version_path, game_root.join(".version.ini.discarded")).await?;
}
Ok(())
}
pub(super) async fn rollback_version_ini_transaction(game_root: &Path) {
if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.tmp")).await {
log::warn!(
"Failed to sweep partial version.ini tmp in {}: {err}",
game_root.display()
);
}
if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.discarded")).await {
log::warn!(
"Failed to sweep discarded version.ini in {}: {err}",
game_root.display()
);
}
}
pub(super) async fn commit_version_ini_buffer(
game_root: &Path,
buffer: &VersionIniBuffer,
) -> eyre::Result<()> {
let tmp_path = game_root.join(".version.ini.tmp");
let version_path = game_root.join("version.ini");
let bytes = buffer.snapshot().await;
let mut file = tokio::fs::File::create(&tmp_path).await?;
file.write_all(&bytes).await?;
file.sync_all().await?;
drop(file);
tokio::fs::rename(&tmp_path, &version_path).await?;
sync_parent_dir(&version_path)?;
remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?;
Ok(())
}
#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
if let Some(parent) = path.parent() {
std::fs::File::open(parent)?.sync_all()?;
}
Ok(())
}
#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> std::io::Result<()> {
Ok(())
}
async fn remove_file_if_exists(path: &Path) -> eyre::Result<()> {
match tokio::fs::remove_file(path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err.into()),
}
}
#[cfg(test)]
mod tests {
use lanspread_db::db::GameFileDescription;
use super::*;
use crate::test_support::TempDir;
#[tokio::test]
async fn version_ini_buffer_accepts_out_of_order_chunks() {
let desc = GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
};
let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created");
buffer
.write_at(4, b"0101")
.await
.expect("second chunk should write");
buffer
.write_at(0, b"2025")
.await
.expect("first chunk should write");
assert_eq!(buffer.snapshot().await, b"20250101");
}
#[tokio::test]
async fn commit_version_ini_writes_sentinel_last_and_sweeps_discarded() {
let temp = TempDir::new("lanspread-download");
let game_root = temp.path().join("game");
tokio::fs::create_dir_all(&game_root)
.await
.expect("game root should be created");
tokio::fs::write(game_root.join(".version.ini.discarded"), b"old")
.await
.expect("discarded sentinel should be written");
let desc = GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
};
let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created");
buffer
.write_at(0, b"20250101")
.await
.expect("version should be buffered");
commit_version_ini_buffer(&game_root, &buffer)
.await
.expect("version sentinel should commit");
assert_eq!(
std::fs::read(game_root.join("version.ini")).expect("version.ini should exist"),
b"20250101"
);
assert!(!game_root.join(".version.ini.tmp").exists());
assert!(!game_root.join(".version.ini.discarded").exists());
}
#[tokio::test]
async fn begin_version_ini_transaction_parks_existing_sentinel() {
let temp = TempDir::new("lanspread-download");
let game_root = temp.path().join("game");
tokio::fs::create_dir_all(&game_root)
.await
.expect("game root should be created");
tokio::fs::write(game_root.join("version.ini"), b"20240101")
.await
.expect("version sentinel should be written");
tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial")
.await
.expect("tmp sentinel should be written");
begin_version_ini_transaction(&game_root)
.await
.expect("transaction should begin");
assert!(!game_root.join("version.ini").exists());
assert!(!game_root.join(".version.ini.tmp").exists());
assert_eq!(
std::fs::read(game_root.join(".version.ini.discarded"))
.expect("discarded sentinel should exist"),
b"20240101"
);
}
#[tokio::test]
async fn rollback_version_ini_transaction_sweeps_transients() {
let temp = TempDir::new("lanspread-download");
let game_root = temp.path().join("game");
tokio::fs::create_dir_all(&game_root)
.await
.expect("game root should be created");
tokio::fs::write(game_root.join(".version.ini.tmp"), b"partial")
.await
.expect("tmp sentinel should be written");
tokio::fs::write(game_root.join(".version.ini.discarded"), b"old")
.await
.expect("discarded sentinel should be written");
rollback_version_ini_transaction(&game_root).await;
assert!(!game_root.join(".version.ini.tmp").exists());
assert!(!game_root.join(".version.ini.discarded").exists());
}
}