Files
lanspread/crates/lanspread-peer/src/download/planning.rs
T
ddidderr 274b9d2fd4 test(peer-cli): add large exact-transfer coverage
Add deeper peer CLI coverage for file-transfer integrity and multi-peer
chunking. The alpha fixture now carries a real renamed RAR archive larger
than 100 MB for alienswarm, which gives the chunk planner enough work to
split a single game archive across multiple peers.

Expose completed chunk source details as a peer event and have the CLI print
that event as JSONL. This keeps transfer behavior in lanspread-peer while the
CLI remains a harness that reports what the peer runtime did. The Tauri shell
logs the event at debug level so the shared PeerEvent enum stays exhaustive.

Document the new S13/S14 scenarios and record the manual run evidence,
including SHA-256 manifests and the per-peer byte split for the large archive.

Test Plan:
- just fmt
- just test
- just peer-cli-build
- just clippy
- just peer-cli-image
- unrar t -idq crates/lanspread-peer-cli/fixtures/fixture-alpha/alienswarm/alienswarm.eti
- Manual peer CLI: bravo -> deep-small-client bfbc2 download with matching SHA-256 manifests
- Manual peer CLI: alpha -> deep-stage-b alienswarm download with matching SHA-256 manifests
- Manual peer CLI: alpha + deep-stage-b -> deep-stage-c alienswarm download with chunk events from both peers and matching SHA-256 manifests

Refs: PEER_CLI_SCENARIOS.md S13 S14
2026-05-17 10:25:26 +02:00

399 lines
13 KiB
Rust

use std::{collections::HashMap, net::SocketAddr, path::Path};
use lanspread_db::db::GameFileDescription;
use tokio::{fs::OpenOptions, sync::mpsc::UnboundedSender};
use crate::{PeerEvent, config::CHUNK_SIZE, path_validation::validate_game_file_path};
/// Represents a chunk of a file to be downloaded.
#[derive(Debug, Clone)]
pub(super) struct DownloadChunk {
pub(super) relative_path: String,
pub(super) offset: u64,
pub(super) length: u64,
pub(super) retry_count: usize,
pub(super) last_peer: Option<SocketAddr>,
}
/// Download plan for a single peer.
#[derive(Debug, Default)]
pub(super) struct PeerDownloadPlan {
pub(super) chunks: Vec<DownloadChunk>,
pub(super) whole_files: Vec<GameFileDescription>,
}
/// Result of downloading a chunk.
#[derive(Debug)]
pub(super) struct ChunkDownloadResult {
pub(super) chunk: DownloadChunk,
pub(super) result: eyre::Result<()>,
pub(super) peer_addr: SocketAddr,
}
/// Extracts the root `version.ini` descriptor while keeping every descriptor in
/// the transfer list. The chunk writer diverts the sentinel bytes into memory.
pub(super) fn extract_version_descriptor(
game_id: &str,
game_file_descs: Vec<GameFileDescription>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) -> eyre::Result<(GameFileDescription, Vec<GameFileDescription>)> {
let mut version_descs = Vec::new();
let mut transfer_descs = Vec::new();
for desc in game_file_descs {
if desc.is_version_ini() {
version_descs.push(desc.clone());
}
transfer_descs.push(desc);
}
if version_descs.len() != 1 {
let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
});
eyre::bail!(
"expected exactly one root-level version.ini sentinel for {game_id}, found {}",
version_descs.len()
);
}
let version_desc = version_descs.remove(0);
Ok((version_desc, transfer_descs))
}
/// Prepares storage for game files by creating directories and pre-allocating files.
pub(super) async fn prepare_game_storage(
games_folder: &Path,
file_descs: &[GameFileDescription],
) -> eyre::Result<()> {
for desc in file_descs {
if desc.is_version_ini() {
continue;
}
// Validate the path to prevent directory traversal
let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?;
if desc.is_dir {
tokio::fs::create_dir_all(&validated_path).await?;
} else {
if let Some(parent) = validated_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// Create and pre-allocate the file with the expected size
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&validated_path)
.await?;
// Pre-allocate the file with the expected size
let size = desc.size;
if let Err(e) = file.set_len(size).await {
log::warn!(
"Failed to pre-allocate file {} (size: {}): {}",
desc.relative_path,
size,
e
);
// Continue without pre-allocation - the file will grow as chunks are written
} else {
log::debug!(
"Pre-allocated file {} with {} bytes",
desc.relative_path,
size
);
}
}
}
Ok(())
}
/// Resolves which peers have a specific file.
pub(super) fn resolve_file_peers<'a>(
relative_path: &str,
file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
fallback: &'a [SocketAddr],
) -> &'a [SocketAddr] {
if let Some(peers) = file_peer_map.get(relative_path)
&& !peers.is_empty()
{
return peers;
}
fallback
}
/// Builds download plans distributing files across peers.
pub(super) fn build_peer_plans(
peers: &[SocketAddr],
file_descs: &[GameFileDescription],
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> HashMap<SocketAddr, PeerDownloadPlan> {
let mut plans: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();
if peers.is_empty() {
return plans;
}
let mut peer_index = 0usize;
for desc in file_descs.iter().filter(|d| !d.is_dir) {
let size = desc.file_size();
let eligible_peers = resolve_file_peers(&desc.relative_path, file_peer_map, peers);
if eligible_peers.is_empty() {
continue;
}
if size == 0 {
let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0,
retry_count: 0,
last_peer: Some(peer),
});
continue;
}
let mut offset = 0u64;
while offset < size {
let length = std::cmp::min(CHUNK_SIZE, size - offset);
let peer = eligible_peers[peer_index % eligible_peers.len()];
peer_index += 1;
plans.entry(peer).or_default().chunks.push(DownloadChunk {
relative_path: desc.relative_path.clone(),
offset,
length,
retry_count: 0,
last_peer: Some(peer),
});
offset += length;
}
}
plans
}
#[cfg(test)]
mod tests {
use lanspread_db::db::GameFileDescription;
use super::*;
use crate::test_support::TempDir;
fn loopback_addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
#[test]
fn build_peer_plans_handles_partial_final_chunk() {
let peers = vec![loopback_addr(12000), loopback_addr(12001)];
let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4;
let mut file_peer_map = HashMap::new();
file_peer_map.insert("game/file.dat".to_string(), peers.clone());
let file_descs = vec![GameFileDescription {
game_id: "test".to_string(),
relative_path: "game/file.dat".to_string(),
is_dir: false,
size: file_size,
}];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect();
assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks");
chunks.sort_by_key(|chunk| chunk.offset);
let last_chunk = chunks.last().expect("last chunk exists");
assert_eq!(last_chunk.offset, CHUNK_SIZE * 2);
assert_eq!(last_chunk.length, file_size - last_chunk.offset);
assert_eq!(last_chunk.length, CHUNK_SIZE / 4);
assert_eq!(
last_chunk.offset + last_chunk.length,
file_size,
"last chunk should finish the file"
);
}
#[test]
fn build_peer_plans_spreads_large_file_chunks_across_shared_peers() {
let peers = vec![loopback_addr(12000), loopback_addr(12001)];
let large_file = "game/large.eti";
let file_size = 120 * 1024 * 1024;
let mut file_peer_map = HashMap::new();
file_peer_map.insert("game/version.ini".to_string(), peers.clone());
file_peer_map.insert(large_file.to_string(), peers.clone());
let file_descs = vec![
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 9,
},
GameFileDescription {
game_id: "game".to_string(),
relative_path: large_file.to_string(),
is_dir: false,
size: file_size,
},
];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let mut chunk_counts = HashMap::new();
let mut byte_counts = HashMap::new();
for (peer, plan) in plans {
for chunk in plan.chunks {
if chunk.relative_path == large_file {
*chunk_counts.entry(peer).or_insert(0usize) += 1;
*byte_counts.entry(peer).or_insert(0u64) += chunk.length;
}
}
}
assert_eq!(chunk_counts.get(&peers[0]), Some(&2));
assert_eq!(chunk_counts.get(&peers[1]), Some(&2));
let peer_a_bytes = byte_counts.get(&peers[0]).copied().unwrap_or_default();
let peer_b_bytes = byte_counts.get(&peers[1]).copied().unwrap_or_default();
assert_eq!(peer_a_bytes + peer_b_bytes, file_size);
assert!(
peer_a_bytes.abs_diff(peer_b_bytes) <= CHUNK_SIZE,
"large file bytes should be balanced within one chunk: {peer_a_bytes} vs {peer_b_bytes}"
);
}
#[test]
fn build_peer_plans_respects_file_peer_map() {
let shared_a = loopback_addr(12010);
let shared_b = loopback_addr(12011);
let exclusive = loopback_addr(12012);
let peers = vec![shared_a, shared_b, exclusive];
let mut file_peer_map = HashMap::new();
file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]);
file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]);
let file_descs = vec![
GameFileDescription {
game_id: "test".to_string(),
relative_path: "shared.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE * 2,
},
GameFileDescription {
game_id: "test".to_string(),
relative_path: "exclusive.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE,
},
];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
let exclusive_plan = plans
.get(&exclusive)
.expect("exclusive peer should have a plan");
assert!(
exclusive_plan
.chunks
.iter()
.all(|chunk| chunk.relative_path == "exclusive.bin"),
"exclusive peer should only receive exclusive.bin chunks"
);
for (peer, plan) in plans {
for chunk in plan.chunks {
match chunk.relative_path.as_str() {
"exclusive.bin" => assert_eq!(
peer, exclusive,
"exclusive.bin chunks should only be assigned to the exclusive peer"
),
"shared.bin" => assert!(
peer == shared_a || peer == shared_b,
"shared.bin chunks must stay within shared peers"
),
other => panic!("unexpected file in plan: {other}"),
}
}
}
}
#[tokio::test]
async fn prepare_game_storage_skips_version_ini_sentinel() {
let temp = TempDir::new("lanspread-download");
let descs = vec![GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
}];
prepare_game_storage(temp.path(), &descs)
.await
.expect("storage preparation should succeed");
assert!(!temp.path().join("game").join("version.ini").exists());
}
#[test]
fn version_descriptor_extraction_keeps_nested_decoy_in_transfer_list() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let nested_decoy = vec![
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/local/version.ini".to_string(),
is_dir: false,
size: 8,
},
];
let (version, transfer) =
extract_version_descriptor("game", nested_decoy, &tx).expect("only one root sentinel");
assert_eq!(version.relative_path, "game/version.ini");
assert_eq!(transfer.len(), 2);
}
#[test]
fn version_descriptor_extraction_requires_a_root_version_ini() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let missing = vec![GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/archive.eti".to_string(),
is_dir: false,
size: 1,
}];
assert!(extract_version_descriptor("game", missing, &tx).is_err());
}
#[test]
fn version_descriptor_extraction_rejects_duplicate_root_version_ini() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let multiple = vec![
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
GameFileDescription {
game_id: "game".to_string(),
relative_path: "game/version.ini".to_string(),
is_dir: false,
size: 8,
},
];
assert!(extract_version_descriptor("game", multiple, &tx).is_err());
}
}