274b9d2fd4
Add deeper peer CLI coverage for file-transfer integrity and multi-peer chunking. The alpha fixture now carries a real renamed RAR archive larger than 100 MB for alienswarm, which gives the chunk planner enough work to split a single game archive across multiple peers. Expose completed chunk source details as a peer event and have the CLI print that event as JSONL. This keeps transfer behavior in lanspread-peer while the CLI remains a harness that reports what the peer runtime did. The Tauri shell logs the event at debug level so the shared PeerEvent enum stays exhaustive. Document the new S13/S14 scenarios and record the manual run evidence, including SHA-256 manifests and the per-peer byte split for the large archive. Test Plan: - just fmt - just test - just peer-cli-build - just clippy - just peer-cli-image - unrar t -idq crates/lanspread-peer-cli/fixtures/fixture-alpha/alienswarm/alienswarm.eti - Manual peer CLI: bravo -> deep-small-client bfbc2 download with matching SHA-256 manifests - Manual peer CLI: alpha -> deep-stage-b alienswarm download with matching SHA-256 manifests - Manual peer CLI: alpha + deep-stage-b -> deep-stage-c alienswarm download with chunk events from both peers and matching SHA-256 manifests Refs: PEER_CLI_SCENARIOS.md S13 S14
408 lines
13 KiB
Rust
408 lines
13 KiB
Rust
//! Peer-to-peer game distribution system.
//!
//! This crate provides the core P2P networking engine for `LanSpread`, handling:
//! - Peer discovery via mDNS
//! - QUIC-based communication
//! - Game file synchronization with chunked transfers
//! - Consensus validation for file integrity
|
|
|
|
#![allow(clippy::missing_errors_doc)]
|
|
|
|
// =============================================================================
|
|
// Module declarations
|
|
// =============================================================================
|
|
|
|
mod config;
|
|
mod context;
|
|
mod download;
|
|
mod error;
|
|
mod events;
|
|
mod handlers;
|
|
mod identity;
|
|
mod install;
|
|
mod library;
|
|
mod local_games;
|
|
mod network;
|
|
mod path_validation;
|
|
mod peer;
|
|
mod peer_db;
|
|
mod remote_peer;
|
|
mod services;
|
|
mod startup;
|
|
#[cfg(test)]
|
|
mod test_support;
|
|
|
|
// =============================================================================
|
|
// Public re-exports
|
|
// =============================================================================
|
|
|
|
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc};
|
|
|
|
pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT};
|
|
pub use error::PeerError;
|
|
pub use install::{UnpackFuture, Unpacker};
|
|
use lanspread_db::db::{Game, GameFileDescription};
|
|
pub use peer_db::{
|
|
MajorityValidationResult,
|
|
PeerGameDB,
|
|
PeerId,
|
|
PeerInfo,
|
|
PeerSnapshot,
|
|
PeerUpsert,
|
|
};
|
|
use tokio::sync::{
|
|
RwLock,
|
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
|
};
|
|
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
|
|
|
pub use crate::startup::PeerRuntimeHandle;
|
|
use crate::{
|
|
context::Ctx,
|
|
handlers::{
|
|
GameDetailSource,
|
|
handle_connect_peer_command,
|
|
handle_download_game_files_command,
|
|
handle_get_game_command,
|
|
handle_get_peer_count_command,
|
|
handle_install_game_command,
|
|
handle_list_games_command,
|
|
handle_set_game_dir_command,
|
|
handle_uninstall_game_command,
|
|
load_local_library,
|
|
},
|
|
};
|
|
|
|
// =============================================================================
|
|
// Public API types
|
|
// =============================================================================
|
|
|
|
/// Events sent from the peer system to the UI.
|
|
#[derive(Debug, strum::IntoStaticStr)]
|
|
pub enum PeerEvent {
|
|
/// The local QUIC server is listening and ready to accept peer connections.
|
|
LocalPeerReady { peer_id: String, addr: SocketAddr },
|
|
/// List of available games from peers.
|
|
ListGames(Vec<Game>),
|
|
/// File descriptions for a specific game.
|
|
GotGameFiles {
|
|
id: String,
|
|
file_descriptions: Vec<GameFileDescription>,
|
|
},
|
|
/// Download has started for a game.
|
|
DownloadGameFilesBegin { id: String },
|
|
/// A file chunk has been downloaded from a peer.
|
|
DownloadGameFileChunkFinished {
|
|
id: String,
|
|
peer_addr: SocketAddr,
|
|
relative_path: String,
|
|
offset: u64,
|
|
length: u64,
|
|
},
|
|
/// Download has completed successfully.
|
|
DownloadGameFilesFinished { id: String },
|
|
/// Download has failed.
|
|
DownloadGameFilesFailed { id: String },
|
|
/// All peers with the game have disconnected during download.
|
|
DownloadGameFilesAllPeersGone { id: String },
|
|
/// Install or update transaction has started for a game.
|
|
InstallGameBegin {
|
|
id: String,
|
|
operation: InstallOperation,
|
|
},
|
|
/// Install or update transaction has completed successfully.
|
|
InstallGameFinished { id: String },
|
|
/// Install or update transaction has failed after rollback.
|
|
InstallGameFailed { id: String },
|
|
/// Uninstall transaction has started for a game.
|
|
UninstallGameBegin { id: String },
|
|
/// Uninstall transaction has completed successfully.
|
|
UninstallGameFinished { id: String },
|
|
/// Uninstall transaction has failed after rollback.
|
|
UninstallGameFailed { id: String },
|
|
/// No peers have the requested game.
|
|
NoPeersHaveGame { id: String },
|
|
/// A peer has connected.
|
|
PeerConnected(SocketAddr),
|
|
/// A peer has disconnected.
|
|
PeerDisconnected(SocketAddr),
|
|
/// A new peer was discovered via mDNS.
|
|
PeerDiscovered(SocketAddr),
|
|
/// A peer was lost (timed out or disconnected).
|
|
PeerLost(SocketAddr),
|
|
/// The total peer count has changed.
|
|
PeerCountUpdated(usize),
|
|
/// Local games have been scanned, with authoritative in-progress work.
|
|
LocalGamesUpdated {
|
|
games: Vec<Game>,
|
|
active_operations: Vec<ActiveOperation>,
|
|
},
|
|
/// A required peer runtime component failed.
|
|
RuntimeFailed {
|
|
component: PeerRuntimeComponent,
|
|
error: String,
|
|
},
|
|
}
|
|
|
|
/// Long-running peer runtime components reported in failure events.
|
|
#[derive(Clone, Copy, Debug, strum::IntoStaticStr)]
|
|
pub enum PeerRuntimeComponent {
|
|
/// Command/control message loop.
|
|
CommandLoop,
|
|
/// Inbound QUIC server and its mDNS advertisement.
|
|
QuicServer,
|
|
/// mDNS peer discovery.
|
|
Discovery,
|
|
/// Peer liveness monitoring.
|
|
Liveness,
|
|
/// Local game directory monitor.
|
|
LocalMonitor,
|
|
}
|
|
|
|
/// Install-side operation represented in lifecycle events.
|
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)]
|
|
pub enum InstallOperation {
|
|
/// Fresh install into a missing `local/` directory.
|
|
Installing,
|
|
/// Update that replaces an existing `local/` directory.
|
|
Updating,
|
|
}
|
|
|
|
/// In-progress operation snapshot attached to local library updates.
|
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
|
pub struct ActiveOperation {
|
|
pub id: String,
|
|
pub operation: ActiveOperationKind,
|
|
}
|
|
|
|
/// Operation kinds visible to UI reconciliation.
|
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)]
|
|
pub enum ActiveOperationKind {
|
|
/// Downloading or replacing archive files.
|
|
Downloading,
|
|
/// Extracting into a previously uninstalled game root.
|
|
Installing,
|
|
/// Replacing an existing `local/` install.
|
|
Updating,
|
|
/// Removing an existing `local/` install.
|
|
Uninstalling,
|
|
}
|
|
|
|
/// Commands sent to the peer system from the UI.
|
|
#[derive(Clone, Debug)]
|
|
pub enum PeerCommand {
|
|
/// Request a list of all available games.
|
|
ListGames,
|
|
/// Request file details for a specific game, serving local files when available.
|
|
GetGame(String),
|
|
/// Request the latest peer-advertised file details for an update.
|
|
FetchLatestFromPeers { id: String },
|
|
/// Download game files.
|
|
DownloadGameFiles {
|
|
id: String,
|
|
file_descriptions: Vec<GameFileDescription>,
|
|
},
|
|
/// Download game files with an explicit install policy.
|
|
DownloadGameFilesWithOptions {
|
|
id: String,
|
|
file_descriptions: Vec<GameFileDescription>,
|
|
install_after_download: bool,
|
|
},
|
|
/// Install already-downloaded archives into `local/`.
|
|
InstallGame { id: String },
|
|
/// Remove only the `local/` install for a game.
|
|
UninstallGame { id: String },
|
|
/// Set the local game directory.
|
|
SetGameDir(PathBuf),
|
|
/// Request the current peer count.
|
|
GetPeerCount,
|
|
/// Connect directly to a peer address without waiting for mDNS discovery.
|
|
ConnectPeer(SocketAddr),
|
|
}
|
|
|
|
/// Optional startup settings for non-GUI callers and tests.
///
/// `PeerStartOptions::default()` leaves every setting unset.
#[derive(Clone, Debug, Default)]
pub struct PeerStartOptions {
    /// Directory used for peer identity and other state.
    ///
    /// `None` defers the choice of location to the identity loader.
    pub state_dir: Option<PathBuf>,
}
|
|
|
|
// =============================================================================
|
|
// Public API functions
|
|
// =============================================================================
|
|
|
|
/// Initialize and start the peer system.
|
|
///
|
|
/// This is the main entry point for the peer system. It starts all background
|
|
/// services (server, discovery, ping, local monitor) and returns a handle that
|
|
/// owns the command sender plus a shutdown signal callers can use for clean
|
|
/// teardown.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `game_dir` - Path to the local game directory
|
|
/// * `tx_notify_ui` - Channel for sending events to the UI
|
|
/// * `peer_game_db` - Shared peer game database
|
|
#[allow(clippy::implicit_hasher)]
|
|
pub fn start_peer(
|
|
game_dir: impl Into<PathBuf>,
|
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
unpacker: Arc<dyn Unpacker>,
|
|
catalog: Arc<RwLock<HashSet<String>>>,
|
|
) -> eyre::Result<PeerRuntimeHandle> {
|
|
start_peer_with_options(
|
|
game_dir,
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
unpacker,
|
|
catalog,
|
|
PeerStartOptions::default(),
|
|
)
|
|
}
|
|
|
|
/// Initialize and start the peer system with explicit startup settings.
|
|
#[allow(clippy::implicit_hasher)]
|
|
pub fn start_peer_with_options(
|
|
game_dir: impl Into<PathBuf>,
|
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
unpacker: Arc<dyn Unpacker>,
|
|
catalog: Arc<RwLock<HashSet<String>>>,
|
|
options: PeerStartOptions,
|
|
) -> eyre::Result<PeerRuntimeHandle> {
|
|
let PeerStartOptions { state_dir } = options;
|
|
let game_dir = game_dir.into();
|
|
log::info!(
|
|
"Starting peer system with game directory: {}",
|
|
game_dir.display()
|
|
);
|
|
let peer_id = identity::load_or_create_peer_id(state_dir.as_deref())?;
|
|
|
|
let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel();
|
|
|
|
Ok(startup::spawn_peer_runtime(
|
|
tx_control,
|
|
rx_control,
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
peer_id,
|
|
game_dir,
|
|
unpacker,
|
|
catalog,
|
|
))
|
|
}
|
|
|
|
/// Main peer execution loop that handles peer commands and manages the peer system.
|
|
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
|
|
async fn run_peer(
|
|
mut rx_control: UnboundedReceiver<PeerCommand>,
|
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
peer_id: String,
|
|
game_dir: PathBuf,
|
|
unpacker: Arc<dyn Unpacker>,
|
|
shutdown: CancellationToken,
|
|
task_tracker: TaskTracker,
|
|
catalog: Arc<RwLock<HashSet<String>>>,
|
|
) -> eyre::Result<()> {
|
|
let ctx = Ctx::new(
|
|
peer_game_db,
|
|
peer_id,
|
|
game_dir,
|
|
unpacker,
|
|
shutdown,
|
|
task_tracker,
|
|
catalog,
|
|
);
|
|
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
|
|
log::error!("Failed to load initial local game database: {err}");
|
|
}
|
|
startup::spawn_startup_services(&ctx, &tx_notify_ui);
|
|
if let Err(err) = handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await {
|
|
let error = err.to_string();
|
|
log::error!("Peer command loop failed: {error}");
|
|
events::send(
|
|
&tx_notify_ui,
|
|
PeerEvent::RuntimeFailed {
|
|
component: PeerRuntimeComponent::CommandLoop,
|
|
error,
|
|
},
|
|
);
|
|
ctx.shutdown.cancel();
|
|
}
|
|
startup::send_goodbye_notifications(&ctx).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_peer_commands(
|
|
ctx: &Ctx,
|
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
rx_control: &mut UnboundedReceiver<PeerCommand>,
|
|
) -> eyre::Result<()> {
|
|
loop {
|
|
let cmd = tokio::select! {
|
|
() = ctx.shutdown.cancelled() => return Ok(()),
|
|
cmd = rx_control.recv() => cmd,
|
|
};
|
|
|
|
let Some(cmd) = cmd else {
|
|
if ctx.shutdown.is_cancelled() {
|
|
return Ok(());
|
|
}
|
|
eyre::bail!("peer command channel closed unexpectedly");
|
|
};
|
|
|
|
match cmd {
|
|
PeerCommand::ListGames => {
|
|
handle_list_games_command(ctx, tx_notify_ui).await;
|
|
}
|
|
PeerCommand::GetGame(id) => {
|
|
handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LocalOrPeers)
|
|
.await;
|
|
}
|
|
PeerCommand::FetchLatestFromPeers { id } => {
|
|
handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LatestPeersOnly)
|
|
.await;
|
|
}
|
|
PeerCommand::DownloadGameFiles {
|
|
id,
|
|
file_descriptions,
|
|
} => {
|
|
handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions, true)
|
|
.await;
|
|
}
|
|
PeerCommand::DownloadGameFilesWithOptions {
|
|
id,
|
|
file_descriptions,
|
|
install_after_download,
|
|
} => {
|
|
handle_download_game_files_command(
|
|
ctx,
|
|
tx_notify_ui,
|
|
id,
|
|
file_descriptions,
|
|
install_after_download,
|
|
)
|
|
.await;
|
|
}
|
|
PeerCommand::InstallGame { id } => {
|
|
handle_install_game_command(ctx, tx_notify_ui, id).await;
|
|
}
|
|
PeerCommand::UninstallGame { id } => {
|
|
handle_uninstall_game_command(ctx, tx_notify_ui, id).await;
|
|
}
|
|
PeerCommand::SetGameDir(game_dir) => {
|
|
handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await;
|
|
}
|
|
PeerCommand::GetPeerCount => {
|
|
handle_get_peer_count_command(ctx, tx_notify_ui).await;
|
|
}
|
|
PeerCommand::ConnectPeer(addr) => {
|
|
handle_connect_peer_command(ctx, tx_notify_ui, addr).await;
|
|
}
|
|
}
|
|
}
|
|
}
|