Files
lanspread/crates/lanspread-peer/src/lib.rs
T
ddidderr e8e7d7a93e feat: store launcher state outside game dirs
Move launcher-owned metadata from game roots into the configured peer state
area. Peer identity, the local library index, install intent logs, and setup
markers now live under app/CLI state instead of being written beside games.
The Tauri shell passes its app data directory into the peer, and the peer CLI
runs the same path through its explicit --state-dir.

Add a dedicated pre-start migration phase for legacy files. It migrates the
old global library index, per-game install intents, and the old first-start
marker into app state, then deletes legacy files only after the replacement
write succeeds. Normal scan, install, recovery, and transfer paths no longer
read legacy state files.

Rename the old first-start meaning to setup_done and only set it after
launching game_setup.cmd. Start/setup scripts keep the shared argument shape,
while server_start.cmd now uses cmd /k and a visible window so server logs stay
open for inspection.

While validating the Docker scenario matrix, make download terminal events
come from the handler after local state refresh and operation cleanup. This
makes download-finished/download-failed safe points for immediate follow-up CLI
commands. Also update the multi-peer chunking scenario to use a sparse archive
large enough to actually span multiple production chunks.

Test Plan:
- just fmt
- just test
- just frontend-test
- just build
- just clippy
- git diff --check
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py

Refs: local app-state migration discussion
2026-05-21 17:04:00 +02:00

450 lines
15 KiB
Rust

//! Peer-to-peer game distribution system.
//!
//! This crate provides the core P2P networking engine for `LanSpread`, handling:
//! - Peer discovery via mDNS
//! - QUIC-based communication
//! - Game file synchronization with chunked transfers
//! - Consensus validation for file integrity
#![allow(clippy::missing_errors_doc)]
// =============================================================================
// Module declarations
// =============================================================================
mod config;
mod context;
mod download;
mod error;
mod events;
mod handlers;
mod identity;
mod install;
mod library;
mod local_games;
mod migration;
mod network;
mod path_validation;
mod peer;
mod peer_db;
mod remote_peer;
mod services;
mod startup;
mod state_paths;
#[cfg(test)]
mod test_support;
// =============================================================================
// Imports and public re-exports
// =============================================================================
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc};
pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT};
pub use error::PeerError;
pub use install::{UnpackFuture, Unpacker};
use lanspread_db::db::{Game, GameFileDescription};
pub use migration::{MigrationReport, migrate_legacy_state};
pub use peer_db::{
MajorityValidationResult,
PeerGameDB,
PeerId,
PeerInfo,
PeerSnapshot,
PeerUpsert,
};
use tokio::sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{
context::Ctx,
handlers::{
GameDetailSource,
handle_cancel_download_command,
handle_connect_peer_command,
handle_download_game_files_command,
handle_get_game_command,
handle_get_peer_count_command,
handle_install_game_command,
handle_list_games_command,
handle_remove_downloaded_game_command,
handle_set_game_dir_command,
handle_uninstall_game_command,
load_local_library,
},
state_paths::resolve_state_dir,
};
pub use crate::{startup::PeerRuntimeHandle, state_paths::setup_done_path};
// =============================================================================
// Public API types
// =============================================================================
/// Events sent from the peer system to the UI.
///
/// The enum derives `strum::IntoStaticStr`, which provides a conversion from
/// an event to a `&'static str` naming its variant.
// NOTE(review): only `RuntimeFailed` is constructed in this file (by
// `run_peer`); every other variant is emitted from modules not shown here, so
// the per-variant timing described below is taken from the existing comments.
#[derive(Debug, strum::IntoStaticStr)]
pub enum PeerEvent {
/// The local QUIC server is listening and ready to accept peer connections.
///
/// Carries the local peer id and the socket address reported for the server.
LocalPeerReady { peer_id: String, addr: SocketAddr },
/// List of available games from peers.
ListGames(Vec<Game>),
/// File descriptions for a specific game.
GotGameFiles {
id: String,
file_descriptions: Vec<GameFileDescription>,
},
/// Download has started for a game.
DownloadGameFilesBegin { id: String },
/// A file chunk has been downloaded from a peer.
// NOTE(review): `offset`/`length` presumably describe the chunk's byte range
// inside `relative_path` — confirm against the download module.
DownloadGameFileChunkFinished {
id: String,
peer_addr: SocketAddr,
relative_path: String,
offset: u64,
length: u64,
},
/// Download progress sampled while game files are being received.
DownloadGameFilesProgress(DownloadProgress),
/// Download has completed successfully.
DownloadGameFilesFinished { id: String },
/// Download has failed.
DownloadGameFilesFailed { id: String },
/// All peers with the game have disconnected during download.
DownloadGameFilesAllPeersGone { id: String },
/// Install or update transaction has started for a game.
///
/// `operation` tells the receiver which of the two transactions it is.
InstallGameBegin {
id: String,
operation: InstallOperation,
},
/// Install or update transaction has completed successfully.
InstallGameFinished { id: String },
/// Install or update transaction has failed after rollback.
InstallGameFailed { id: String },
/// Uninstall transaction has started for a game.
UninstallGameBegin { id: String },
/// Uninstall transaction has completed successfully.
UninstallGameFinished { id: String },
/// Uninstall transaction has failed after rollback.
UninstallGameFailed { id: String },
/// Downloaded archive removal has started for an uninstalled game.
RemoveDownloadedGameBegin { id: String },
/// Downloaded archive removal has completed successfully.
RemoveDownloadedGameFinished { id: String },
/// Downloaded archive removal has failed before deleting the game root.
RemoveDownloadedGameFailed { id: String },
/// No peers have the requested game.
NoPeersHaveGame { id: String },
/// A peer has connected.
PeerConnected(SocketAddr),
/// A peer has disconnected.
PeerDisconnected(SocketAddr),
/// A new peer was discovered via mDNS.
PeerDiscovered(SocketAddr),
/// A peer was lost (timed out or disconnected).
PeerLost(SocketAddr),
/// The total peer count has changed.
PeerCountUpdated(usize),
/// The local library contents changed after a scan.
LocalLibraryChanged { games: Vec<Game> },
/// The set of in-progress local operations changed.
ActiveOperationsChanged {
active_operations: Vec<ActiveOperation>,
},
/// A required peer runtime component failed.
///
/// `run_peer` sends this with `PeerRuntimeComponent::CommandLoop` when the
/// command loop returns an error; `error` is that error's `to_string()` text.
RuntimeFailed {
component: PeerRuntimeComponent,
error: String,
},
}
/// Sampled byte progress for one active game download.
///
/// This is the payload of `PeerEvent::DownloadGameFilesProgress`. It derives
/// `serde::Serialize` in addition to the comparison and clone traits.
// NOTE(review): the sampling code lives outside this file; the field meanings
// below follow the field names and should be confirmed against the producer.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)]
pub struct DownloadProgress {
/// Identifier of the download this sample belongs to.
pub id: String,
/// Bytes received so far.
pub downloaded_bytes: u64,
/// Total bytes of the download.
pub total_bytes: u64,
/// Transfer rate in bytes per second.
pub bytes_per_second: u64,
/// Unique peers currently streaming at least one chunk for this download.
pub active_peer_count: usize,
}
/// Long-running peer runtime components reported in failure events.
///
/// Used as the `component` field of `PeerEvent::RuntimeFailed`. The enum is
/// `Copy` and derives `strum::IntoStaticStr` for a static variant name.
#[derive(Clone, Copy, Debug, strum::IntoStaticStr)]
pub enum PeerRuntimeComponent {
/// Command/control message loop.
///
/// Reported by `run_peer` when `handle_peer_commands` returns an error.
CommandLoop,
/// Inbound QUIC server and its mDNS advertisement.
QuicServer,
/// mDNS peer discovery.
Discovery,
/// Peer liveness monitoring.
Liveness,
/// Local game directory monitor.
LocalMonitor,
}
/// Install-side operation represented in lifecycle events.
///
/// Carried by `PeerEvent::InstallGameBegin` so the receiver can tell a fresh
/// install from an update.
#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)]
pub enum InstallOperation {
/// Fresh install into a missing `local/` directory.
Installing,
/// Update that replaces an existing `local/` directory.
Updating,
}
/// In-progress operation snapshot sent when operation state changes.
///
/// A list of these is the payload of `PeerEvent::ActiveOperationsChanged`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveOperation {
/// Identifier the operation applies to.
// NOTE(review): presumably the same game id used by the other events — confirm.
pub id: String,
/// Which kind of operation is currently running for `id`.
pub operation: ActiveOperationKind,
}
/// Operation kinds visible to UI reconciliation.
///
/// `Installing` and `Updating` share their names with the `InstallOperation`
/// variants; the remaining kinds cover download, uninstall and archive removal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)]
pub enum ActiveOperationKind {
/// Downloading or replacing archive files.
Downloading,
/// Extracting into a previously uninstalled game root.
Installing,
/// Replacing an existing `local/` install.
Updating,
/// Removing an existing `local/` install.
Uninstalling,
/// Removing downloaded archive files for an uninstalled game.
RemovingDownload,
}
/// Commands sent to the peer system from the UI.
///
/// Each command is received by `handle_peer_commands`, which awaits exactly
/// one handler per command before reading the next one.
#[derive(Clone, Debug)]
pub enum PeerCommand {
/// Request a list of all available games.
ListGames,
/// Request file details for a specific game, serving local files when available.
///
/// Dispatched to the get-game handler with `GameDetailSource::LocalOrPeers`.
GetGame(String),
/// Request the latest peer-advertised file details for an update.
///
/// Dispatched to the get-game handler with `GameDetailSource::LatestPeersOnly`.
FetchLatestFromPeers { id: String },
/// Download game files.
///
/// Dispatched to the same handler as `DownloadGameFilesWithOptions`, with
/// `true` passed where that variant passes `install_after_download`.
DownloadGameFiles {
id: String,
file_descriptions: Vec<GameFileDescription>,
},
/// Download game files with an explicit install policy.
///
/// `install_after_download` is forwarded unchanged to the download handler.
DownloadGameFilesWithOptions {
id: String,
file_descriptions: Vec<GameFileDescription>,
install_after_download: bool,
},
/// Install already-downloaded archives into `local/`.
InstallGame { id: String },
/// Remove only the `local/` install for a game.
UninstallGame { id: String },
/// Remove downloaded archive files for an uninstalled game.
RemoveDownloadedGame { id: String },
/// Cancel an active peer download without emitting a user-facing failure.
CancelDownload { id: String },
/// Set the local game directory.
SetGameDir(PathBuf),
/// Request the current peer count.
GetPeerCount,
/// Connect directly to a peer address without waiting for mDNS discovery.
ConnectPeer(SocketAddr),
}
/// Optional startup settings for non-GUI callers and tests.
///
/// `Default` yields `state_dir: None`, which is what `start_peer` passes.
#[derive(Clone, Debug, Default)]
pub struct PeerStartOptions {
/// Directory used for peer identity and other state.
///
/// `start_peer_with_options` hands this value to `resolve_state_dir` and
/// uses the resolved path for the peer id and the runtime.
// NOTE(review): what `resolve_state_dir` picks for `None` is not visible in
// this file — confirm in `state_paths`.
pub state_dir: Option<PathBuf>,
}
// =============================================================================
// Public API functions
// =============================================================================
/// Start the peer system with default startup options.
///
/// This is the main entry point for the peer system. It is a thin wrapper
/// that forwards every argument to [`start_peer_with_options`] together with
/// `PeerStartOptions::default()`, i.e. without an explicit state directory.
/// The runtime starts its background services (server, discovery, ping,
/// local monitor) and the returned handle owns the command sender plus a
/// shutdown signal callers can use for clean teardown.
///
/// # Arguments
///
/// * `game_dir` - Path to the local game directory
/// * `tx_notify_ui` - Channel for sending events to the UI
/// * `peer_game_db` - Shared peer game database
/// * `unpacker` - Shared [`Unpacker`] implementation, forwarded unchanged
/// * `catalog` - Shared set of string entries, forwarded unchanged
///
/// # Errors
///
/// Propagates any error returned by [`start_peer_with_options`].
#[allow(clippy::implicit_hasher)]
pub fn start_peer(
    game_dir: impl Into<PathBuf>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
    unpacker: Arc<dyn Unpacker>,
    catalog: Arc<RwLock<HashSet<String>>>,
) -> eyre::Result<PeerRuntimeHandle> {
    let default_options = PeerStartOptions::default();

    start_peer_with_options(
        game_dir,
        tx_notify_ui,
        peer_game_db,
        unpacker,
        catalog,
        default_options,
    )
}
/// Start the peer system with caller-supplied startup settings.
///
/// In order, this function:
/// 1. resolves the state directory from `options.state_dir` via
///    `resolve_state_dir`;
/// 2. converts `game_dir` into a `PathBuf` and logs it;
/// 3. loads or creates the peer id under the resolved state directory;
/// 4. creates the unbounded command channel;
/// 5. passes everything to `startup::spawn_peer_runtime` and returns the
///    handle it produces.
///
/// # Errors
///
/// Returns the error from `identity::load_or_create_peer_id` if the peer
/// identity cannot be loaded or created.
#[allow(clippy::implicit_hasher)]
pub fn start_peer_with_options(
    game_dir: impl Into<PathBuf>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
    unpacker: Arc<dyn Unpacker>,
    catalog: Arc<RwLock<HashSet<String>>>,
    options: PeerStartOptions,
) -> eyre::Result<PeerRuntimeHandle> {
    // The pattern names every field, so adding a field to `PeerStartOptions`
    // stops this line from compiling until it is handled here.
    let PeerStartOptions {
        state_dir: requested_state_dir,
    } = options;
    let state_dir = resolve_state_dir(requested_state_dir.as_deref());

    let game_dir: PathBuf = game_dir.into();
    log::info!(
        "Starting peer system with game directory: {}",
        game_dir.display()
    );

    let peer_id = identity::load_or_create_peer_id(&state_dir)?;
    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();

    let runtime_handle = startup::spawn_peer_runtime(
        command_tx,
        command_rx,
        tx_notify_ui,
        peer_game_db,
        peer_id,
        game_dir,
        state_dir,
        unpacker,
        catalog,
    );
    Ok(runtime_handle)
}
/// Drive the peer runtime: build the shared context, load the local library,
/// start the startup services, then process commands until shutdown.
///
/// A failure of the initial library load is logged and otherwise ignored. If
/// the command loop returns an error, the error is logged, reported to the UI
/// as [`PeerEvent::RuntimeFailed`] for
/// [`PeerRuntimeComponent::CommandLoop`], and the shutdown token is
/// cancelled. Goodbye notifications are sent in every case before returning.
///
/// The function itself always returns `Ok(())`.
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
async fn run_peer(
    mut rx_control: UnboundedReceiver<PeerCommand>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
    peer_id: String,
    game_dir: PathBuf,
    state_dir: PathBuf,
    unpacker: Arc<dyn Unpacker>,
    shutdown: CancellationToken,
    task_tracker: TaskTracker,
    catalog: Arc<RwLock<HashSet<String>>>,
) -> eyre::Result<()> {
    let ctx = Ctx::new(
        peer_game_db,
        peer_id,
        game_dir,
        state_dir,
        unpacker,
        shutdown,
        task_tracker,
        catalog,
    );

    // Best effort: the runtime keeps going even if the initial load fails.
    if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
        log::error!("Failed to load initial local game database: {err}");
    }

    startup::spawn_startup_services(&ctx, &tx_notify_ui);

    let command_loop_result = handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await;
    if let Err(err) = command_loop_result {
        let error = err.to_string();
        log::error!("Peer command loop failed: {error}");

        let failure_event = PeerEvent::RuntimeFailed {
            component: PeerRuntimeComponent::CommandLoop,
            error,
        };
        events::send(&tx_notify_ui, failure_event);

        // Cancel the shared token so the rest of the runtime sees the shutdown.
        ctx.shutdown.cancel();
    }

    startup::send_goodbye_notifications(&ctx).await;
    Ok(())
}
async fn handle_peer_commands(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
rx_control: &mut UnboundedReceiver<PeerCommand>,
) -> eyre::Result<()> {
loop {
let cmd = tokio::select! {
() = ctx.shutdown.cancelled() => return Ok(()),
cmd = rx_control.recv() => cmd,
};
let Some(cmd) = cmd else {
if ctx.shutdown.is_cancelled() {
return Ok(());
}
eyre::bail!("peer command channel closed unexpectedly");
};
match cmd {
PeerCommand::ListGames => {
handle_list_games_command(ctx, tx_notify_ui).await;
}
PeerCommand::GetGame(id) => {
handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LocalOrPeers)
.await;
}
PeerCommand::FetchLatestFromPeers { id } => {
handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LatestPeersOnly)
.await;
}
PeerCommand::DownloadGameFiles {
id,
file_descriptions,
} => {
handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions, true)
.await;
}
PeerCommand::DownloadGameFilesWithOptions {
id,
file_descriptions,
install_after_download,
} => {
handle_download_game_files_command(
ctx,
tx_notify_ui,
id,
file_descriptions,
install_after_download,
)
.await;
}
PeerCommand::InstallGame { id } => {
handle_install_game_command(ctx, tx_notify_ui, id).await;
}
PeerCommand::UninstallGame { id } => {
handle_uninstall_game_command(ctx, tx_notify_ui, id).await;
}
PeerCommand::RemoveDownloadedGame { id } => {
handle_remove_downloaded_game_command(ctx, tx_notify_ui, id).await;
}
PeerCommand::CancelDownload { id } => {
handle_cancel_download_command(ctx, tx_notify_ui, id).await;
}
PeerCommand::SetGameDir(game_dir) => {
handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await;
}
PeerCommand::GetPeerCount => {
handle_get_peer_count_command(ctx, tx_notify_ui).await;
}
PeerCommand::ConnectPeer(addr) => {
handle_connect_peer_command(ctx, tx_notify_ui, addr).await;
}
}
}
}