From 8f35a197a9b1229ece6461d17edb7625f3ddb092 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 2 May 2026 16:02:37 +0200 Subject: [PATCH] refactor(peer): extract peer startup task spawning The peer runtime used to spawn each long-running service inline inside run_peer. That made the startup path harder to scan because service names, clone setup, and task error handling were interleaved with the command loop. Move the task wrappers into a startup module and leave run_peer as the lifecycle overview: create shared context, start services, handle commands, then send shutdown goodbyes. The spawned services and their error handling are unchanged; only the ownership plumbing moved into named helpers. Test Plan: - cargo clippy - cargo clippy --benches - cargo clippy --tests - cargo +nightly fmt Refs: none --- crates/lanspread-peer/src/lib.rs | 92 ++++---------------- crates/lanspread-peer/src/services.rs | 3 +- crates/lanspread-peer/src/startup.rs | 116 ++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 76 deletions(-) create mode 100644 crates/lanspread-peer/src/startup.rs diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index c271a15..e4f5e56 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -27,6 +27,7 @@ mod peer; mod peer_db; mod remote_peer; mod services; +mod startup; // ============================================================================= // Public re-exports @@ -52,13 +53,6 @@ use crate::{ handle_list_games_command, handle_set_game_dir_command, }, - network::send_goodbye, - services::{ - run_local_game_monitor, - run_peer_discovery, - run_ping_service, - run_server_component, - }, }; // ============================================================================= @@ -146,13 +140,8 @@ pub fn start_peer( let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); - // Start the peer in a background task let tx_control_clone = tx_control.clone(); - tokio::spawn(async move { - if let Err(e) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await { - log::error!("Peer system failed: {e}"); - } - }); + startup::spawn_peer_runtime(rx_control, tx_notify_ui, peer_game_db, peer_id); // Set the game directory tx_control.send(PeerCommand::SetGameDir(game_dir))?; @@ -167,52 +156,19 @@ async fn run_peer( peer_game_db: Arc>, peer_id: String, ) -> eyre::Result<()> { - // Create the shared context let ctx = Ctx::new(peer_game_db.clone(), peer_id); - let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone()); + startup::spawn_startup_services(&ctx, &tx_notify_ui)?; + handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await; + startup::spawn_goodbye_notifications(&ctx).await; - // Start server component - let server_addr = "0.0.0.0:0".parse::()?; - let tx_notify_ui_clone = tx_notify_ui.clone(); - let peer_ctx_clone = peer_ctx.clone(); + Ok(()) +} - tokio::spawn(async move { - if let Err(e) = run_server_component(server_addr, peer_ctx_clone, tx_notify_ui_clone).await - { - log::error!("Server component error: {e}"); - } - }); - - // Start peer discovery task - let tx_notify_ui_discovery = tx_notify_ui.clone(); - let ctx_discovery = ctx.clone(); - tokio::spawn(async move { - run_peer_discovery(tx_notify_ui_discovery, ctx_discovery).await; - }); - - // Start ping service task - let tx_notify_ui_ping = tx_notify_ui.clone(); - let peer_game_db_ping = ctx.peer_game_db.clone(); - let downloading_games_ping = ctx.downloading_games.clone(); - let active_downloads_ping = ctx.active_downloads.clone(); - tokio::spawn(async move { - run_ping_service( - tx_notify_ui_ping, - peer_game_db_ping, - downloading_games_ping, - active_downloads_ping, - ) - .await; - }); - - // Start local game directory monitoring task - let tx_notify_ui_monitor = tx_notify_ui.clone(); - let ctx_monitor = ctx.clone(); - tokio::spawn(async move { - run_local_game_monitor(tx_notify_ui_monitor, ctx_monitor).await; - }); - - // Handle client commands +async fn handle_peer_commands( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + rx_control: &mut UnboundedReceiver, +) { loop { let Some(cmd) = rx_control.recv().await else { break; @@ -220,37 +176,23 @@ async fn run_peer( match cmd { PeerCommand::ListGames => { - handle_list_games_command(&ctx, &tx_notify_ui).await; + handle_list_games_command(ctx, tx_notify_ui).await; } PeerCommand::GetGame(id) => { - handle_get_game_command(&ctx, &tx_notify_ui, id).await; + handle_get_game_command(ctx, tx_notify_ui, id).await; } PeerCommand::DownloadGameFiles { id, file_descriptions, } => { - handle_download_game_files_command(&ctx, &tx_notify_ui, id, file_descriptions) - .await; + handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions).await; } PeerCommand::SetGameDir(game_dir) => { - handle_set_game_dir_command(&ctx, &tx_notify_ui, game_dir).await; + handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await; } PeerCommand::GetPeerCount => { - handle_get_peer_count_command(&ctx, &tx_notify_ui).await; + handle_get_peer_count_command(ctx, tx_notify_ui).await; } } } - - let peer_id = ctx.peer_id.as_ref().clone(); - let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; - for peer_addr in peer_addresses { - let peer_id = peer_id.clone(); - tokio::spawn(async move { - if let Err(e) = send_goodbye(peer_addr, peer_id).await { - log::warn!("Failed to send Goodbye to {peer_addr}: {e}"); - } - }); - } - - Ok(()) } diff --git a/crates/lanspread-peer/src/services.rs b/crates/lanspread-peer/src/services.rs index 676276e..1c43686 100644 --- a/crates/lanspread-peer/src/services.rs +++ b/crates/lanspread-peer/src/services.rs @@ -1,7 +1,8 @@ //! Long-running peer services. //! //! Each submodule owns one runtime concern. The public surface intentionally -//! stays small because `lib.rs` only needs to start these four background tasks. +//! stays small because peer startup only needs to start these four background +//! tasks. mod advertise; mod discovery; diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs new file mode 100644 index 0000000..f4ce58c --- /dev/null +++ b/crates/lanspread-peer/src/startup.rs @@ -0,0 +1,116 @@ +//! Peer runtime task startup and shutdown orchestration. + +use std::{net::SocketAddr, sync::Arc}; + +use tokio::sync::{ + RwLock, + mpsc::{UnboundedReceiver, UnboundedSender}, +}; + +use crate::{ + PeerCommand, + PeerEvent, + context::Ctx, + network::send_goodbye, + peer_db::PeerGameDB, + run_peer, + services::{ + run_local_game_monitor, + run_peer_discovery, + run_ping_service, + run_server_component, + }, +}; + +const EPHEMERAL_SERVER_ADDR: &str = "0.0.0.0:0"; + +pub(crate) fn spawn_peer_runtime( + rx_control: UnboundedReceiver, + tx_notify_ui: UnboundedSender, + peer_game_db: Arc>, + peer_id: String, +) { + tokio::spawn(async move { + if let Err(err) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await { + log::error!("Peer system failed: {err}"); + } + }); +} + +pub(crate) fn spawn_startup_services( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, +) -> eyre::Result<()> { + spawn_quic_server(ctx, tx_notify_ui)?; + spawn_peer_discovery_service(ctx, tx_notify_ui); + spawn_peer_liveness_service(ctx, tx_notify_ui); + spawn_local_library_monitor(ctx, tx_notify_ui); + + Ok(()) +} + +pub(crate) async fn spawn_goodbye_notifications(ctx: &Ctx) { + let peer_id = ctx.peer_id.as_ref().clone(); + let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; + + for peer_addr in peer_addresses { + spawn_goodbye_notification(peer_addr, peer_id.clone()); + } +} + +fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender) -> eyre::Result<()> { + let server_addr = EPHEMERAL_SERVER_ADDR.parse::()?; + let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone()); + let tx_notify_ui = tx_notify_ui.clone(); + + tokio::spawn(async move { + if let Err(err) = run_server_component(server_addr, peer_ctx, tx_notify_ui).await { + log::error!("Server component error: {err}"); + } + }); + + Ok(()) +} + +fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { + let ctx = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + + tokio::spawn(async move { + run_peer_discovery(tx_notify_ui, ctx).await; + }); +} + +fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { + let tx_notify_ui = tx_notify_ui.clone(); + let peer_game_db = ctx.peer_game_db.clone(); + let downloading_games = ctx.downloading_games.clone(); + let active_downloads = ctx.active_downloads.clone(); + + tokio::spawn(async move { + run_ping_service( + tx_notify_ui, + peer_game_db, + downloading_games, + active_downloads, + ) + .await; + }); +} + +fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { + let ctx = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + + tokio::spawn(async move { + run_local_game_monitor(tx_notify_ui, ctx).await; + }); +} + +fn spawn_goodbye_notification(peer_addr: SocketAddr, peer_id: String) { + tokio::spawn(async move { + if let Err(err) = send_goodbye(peer_addr, peer_id).await { + log::warn!("Failed to send Goodbye to {peer_addr}: {err}"); + } + }); +}