//! Peer-to-peer game distribution system. //! //! This crate provides the core P2P networking engine for `LanSpread`, handling: //! - Peer discovery via mDNS //! - QUIC-based communication //! - Game file synchronization with chunked transfers //! - Consensus validation for file integrity #![allow(clippy::missing_errors_doc)] // ============================================================================= // Module declarations // ============================================================================= mod config; mod context; mod download; mod error; mod events; mod handlers; mod identity; mod install; mod library; mod local_games; mod migration; mod network; mod path_validation; mod peer; mod peer_db; mod remote_peer; mod services; mod startup; mod state_paths; #[cfg(test)] mod test_support; // ============================================================================= // Public re-exports // ============================================================================= use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc}; pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use error::PeerError; pub use install::{UnpackFuture, Unpacker}; use lanspread_db::db::{Game, GameFileDescription}; pub use migration::{MigrationReport, migrate_legacy_state}; pub use peer_db::{ MajorityValidationResult, PeerGameDB, PeerId, PeerInfo, PeerSnapshot, PeerUpsert, }; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, }; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::{ context::Ctx, handlers::{ GameDetailSource, handle_cancel_download_command, handle_connect_peer_command, handle_download_game_files_command, handle_get_game_command, handle_get_peer_count_command, handle_install_game_command, handle_list_games_command, handle_remove_downloaded_game_command, handle_set_game_dir_command, handle_uninstall_game_command, load_local_library, }, state_paths::resolve_state_dir, }; pub use crate::{startup::PeerRuntimeHandle, state_paths::setup_done_path}; // ============================================================================= // Public API types // ============================================================================= /// Events sent from the peer system to the UI. #[derive(Debug, strum::IntoStaticStr)] pub enum PeerEvent { /// The local QUIC server is listening and ready to accept peer connections. LocalPeerReady { peer_id: String, addr: SocketAddr }, /// List of available games from peers. ListGames(Vec), /// File descriptions for a specific game. GotGameFiles { id: String, file_descriptions: Vec, }, /// Download has started for a game. DownloadGameFilesBegin { id: String }, /// A file chunk has been downloaded from a peer. DownloadGameFileChunkFinished { id: String, peer_addr: SocketAddr, relative_path: String, offset: u64, length: u64, }, /// Download progress sampled while game files are being received. DownloadGameFilesProgress(DownloadProgress), /// Download has completed successfully. DownloadGameFilesFinished { id: String }, /// Download has failed. DownloadGameFilesFailed { id: String }, /// All peers with the game have disconnected during download. DownloadGameFilesAllPeersGone { id: String }, /// Install or update transaction has started for a game. InstallGameBegin { id: String, operation: InstallOperation, }, /// Install or update transaction has completed successfully. InstallGameFinished { id: String }, /// Install or update transaction has failed after rollback. InstallGameFailed { id: String }, /// Uninstall transaction has started for a game. UninstallGameBegin { id: String }, /// Uninstall transaction has completed successfully. UninstallGameFinished { id: String }, /// Uninstall transaction has failed after rollback. UninstallGameFailed { id: String }, /// Downloaded archive removal has started for an uninstalled game. RemoveDownloadedGameBegin { id: String }, /// Downloaded archive removal has completed successfully. RemoveDownloadedGameFinished { id: String }, /// Downloaded archive removal has failed before deleting the game root. RemoveDownloadedGameFailed { id: String }, /// No peers have the requested game. NoPeersHaveGame { id: String }, /// A peer has connected. PeerConnected(SocketAddr), /// A peer has disconnected. PeerDisconnected(SocketAddr), /// A new peer was discovered via mDNS. PeerDiscovered(SocketAddr), /// A peer was lost (timed out or disconnected). PeerLost(SocketAddr), /// The total peer count has changed. PeerCountUpdated(usize), /// The local library contents changed after a scan. LocalLibraryChanged { games: Vec }, /// The set of in-progress local operations changed. ActiveOperationsChanged { active_operations: Vec, }, /// A required peer runtime component failed. RuntimeFailed { component: PeerRuntimeComponent, error: String, }, } /// Sampled byte progress for one active game download. #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] pub struct DownloadProgress { pub id: String, pub downloaded_bytes: u64, pub total_bytes: u64, pub bytes_per_second: u64, /// Unique peers currently streaming at least one chunk for this download. pub active_peer_count: usize, } /// Long-running peer runtime components reported in failure events. #[derive(Clone, Copy, Debug, strum::IntoStaticStr)] pub enum PeerRuntimeComponent { /// Command/control message loop. CommandLoop, /// Inbound QUIC server and its mDNS advertisement. QuicServer, /// mDNS peer discovery. Discovery, /// Peer liveness monitoring. Liveness, /// Local game directory monitor. LocalMonitor, } /// Install-side operation represented in lifecycle events. #[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)] pub enum InstallOperation { /// Fresh install into a missing `local/` directory. Installing, /// Update that replaces an existing `local/` directory. Updating, } /// In-progress operation snapshot sent when operation state changes. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ActiveOperation { pub id: String, pub operation: ActiveOperationKind, } /// Operation kinds visible to UI reconciliation. #[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)] pub enum ActiveOperationKind { /// Downloading or replacing archive files. Downloading, /// Extracting into a previously uninstalled game root. Installing, /// Replacing an existing `local/` install. Updating, /// Removing an existing `local/` install. Uninstalling, /// Removing downloaded archive files for an uninstalled game. RemovingDownload, } /// Commands sent to the peer system from the UI. #[derive(Clone, Debug)] pub enum PeerCommand { /// Request a list of all available games. ListGames, /// Request file details for a specific game, serving local files when available. GetGame(String), /// Request the latest peer-advertised file details for an update. FetchLatestFromPeers { id: String }, /// Download game files. DownloadGameFiles { id: String, file_descriptions: Vec, account_name: Option, language: Option, }, /// Download game files with an explicit install policy. DownloadGameFilesWithOptions { id: String, file_descriptions: Vec, install_after_download: bool, account_name: Option, language: Option, }, /// Install already-downloaded archives into `local/`. InstallGame { id: String, account_name: Option, language: Option, }, /// Remove only the `local/` install for a game. UninstallGame { id: String }, /// Remove downloaded archive files for an uninstalled game. RemoveDownloadedGame { id: String }, /// Cancel an active peer download without emitting a user-facing failure. CancelDownload { id: String }, /// Set the local game directory. SetGameDir(PathBuf), /// Request the current peer count. GetPeerCount, /// Connect directly to a peer address without waiting for mDNS discovery. ConnectPeer(SocketAddr), } /// Optional startup settings for non-GUI callers and tests. #[derive(Clone, Debug, Default)] pub struct PeerStartOptions { /// Directory used for peer identity and other state. pub state_dir: Option, } // ============================================================================= // Public API functions // ============================================================================= /// Initialize and start the peer system. /// /// This is the main entry point for the peer system. It starts all background /// services (server, discovery, ping, local monitor) and returns a handle that /// owns the command sender plus a shutdown signal callers can use for clean /// teardown. /// /// # Arguments /// /// * `game_dir` - Path to the local game directory /// * `tx_notify_ui` - Channel for sending events to the UI /// * `peer_game_db` - Shared peer game database #[allow(clippy::implicit_hasher)] pub fn start_peer( game_dir: impl Into, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, unpacker: Arc, catalog: Arc>>, ) -> eyre::Result { start_peer_with_options( game_dir, tx_notify_ui, peer_game_db, unpacker, catalog, PeerStartOptions::default(), ) } /// Initialize and start the peer system with explicit startup settings. #[allow(clippy::implicit_hasher)] pub fn start_peer_with_options( game_dir: impl Into, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, unpacker: Arc, catalog: Arc>>, options: PeerStartOptions, ) -> eyre::Result { let PeerStartOptions { state_dir } = options; let state_dir = resolve_state_dir(state_dir.as_deref()); let game_dir = game_dir.into(); log::info!( "Starting peer system with game directory: {}", game_dir.display() ); let peer_id = identity::load_or_create_peer_id(&state_dir)?; let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); Ok(startup::spawn_peer_runtime( tx_control, rx_control, tx_notify_ui, peer_game_db, peer_id, game_dir, state_dir, unpacker, catalog, )) } /// Main peer execution loop that handles peer commands and manages the peer system. #[allow(clippy::too_many_arguments, clippy::implicit_hasher)] async fn run_peer( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, peer_id: String, game_dir: PathBuf, state_dir: PathBuf, unpacker: Arc, shutdown: CancellationToken, task_tracker: TaskTracker, catalog: Arc>>, ) -> eyre::Result<()> { let ctx = Ctx::new( peer_game_db, peer_id, game_dir, state_dir, unpacker, shutdown, task_tracker, catalog, ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); } startup::spawn_startup_services(&ctx, &tx_notify_ui); if let Err(err) = handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await { let error = err.to_string(); log::error!("Peer command loop failed: {error}"); events::send( &tx_notify_ui, PeerEvent::RuntimeFailed { component: PeerRuntimeComponent::CommandLoop, error, }, ); ctx.shutdown.cancel(); } startup::send_goodbye_notifications(&ctx).await; Ok(()) } async fn handle_peer_commands( ctx: &Ctx, tx_notify_ui: &UnboundedSender, rx_control: &mut UnboundedReceiver, ) -> eyre::Result<()> { loop { let cmd = tokio::select! { () = ctx.shutdown.cancelled() => return Ok(()), cmd = rx_control.recv() => cmd, }; let Some(cmd) = cmd else { if ctx.shutdown.is_cancelled() { return Ok(()); } eyre::bail!("peer command channel closed unexpectedly"); }; match cmd { PeerCommand::ListGames => { handle_list_games_command(ctx, tx_notify_ui).await; } PeerCommand::GetGame(id) => { handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LocalOrPeers) .await; } PeerCommand::FetchLatestFromPeers { id } => { handle_get_game_command(ctx, tx_notify_ui, id, GameDetailSource::LatestPeersOnly) .await; } PeerCommand::DownloadGameFiles { id, file_descriptions, account_name, language, } => { handle_download_game_files_command( ctx, tx_notify_ui, id, file_descriptions, true, account_name, language, ) .await; } PeerCommand::DownloadGameFilesWithOptions { id, file_descriptions, install_after_download, account_name, language, } => { handle_download_game_files_command( ctx, tx_notify_ui, id, file_descriptions, install_after_download, account_name, language, ) .await; } PeerCommand::InstallGame { id, account_name, language, } => { handle_install_game_command(ctx, tx_notify_ui, id, account_name, language).await; } PeerCommand::UninstallGame { id } => { handle_uninstall_game_command(ctx, tx_notify_ui, id).await; } PeerCommand::RemoveDownloadedGame { id } => { handle_remove_downloaded_game_command(ctx, tx_notify_ui, id).await; } PeerCommand::CancelDownload { id } => { handle_cancel_download_command(ctx, tx_notify_ui, id).await; } PeerCommand::SetGameDir(game_dir) => { handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await; } PeerCommand::GetPeerCount => { handle_get_peer_count_command(ctx, tx_notify_ui).await; } PeerCommand::ConnectPeer(addr) => { handle_connect_peer_command(ctx, tx_notify_ui, addr).await; } } } }