refactor(peer): extract peer startup task spawning
The peer runtime used to spawn each long-running service inline inside run_peer. That made the startup path harder to scan because service names, clone setup, and task error handling were interleaved with the command loop. Move the task wrappers into a startup module and leave run_peer as the lifecycle overview: create shared context, start services, handle commands, then send shutdown goodbyes. The spawned services and their error handling are unchanged; only the ownership plumbing moved into named helpers. Test Plan: - cargo clippy - cargo clippy --benches - cargo clippy --tests - cargo +nightly fmt Refs: none
This commit is contained in:
@@ -27,6 +27,7 @@ mod peer;
|
||||
mod peer_db;
|
||||
mod remote_peer;
|
||||
mod services;
|
||||
mod startup;
|
||||
|
||||
// =============================================================================
|
||||
// Public re-exports
|
||||
@@ -52,13 +53,6 @@ use crate::{
|
||||
handle_list_games_command,
|
||||
handle_set_game_dir_command,
|
||||
},
|
||||
network::send_goodbye,
|
||||
services::{
|
||||
run_local_game_monitor,
|
||||
run_peer_discovery,
|
||||
run_ping_service,
|
||||
run_server_component,
|
||||
},
|
||||
};
|
||||
|
||||
// =============================================================================
|
||||
@@ -146,13 +140,8 @@ pub fn start_peer(
|
||||
|
||||
let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
// Start the peer in a background task
|
||||
let tx_control_clone = tx_control.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await {
|
||||
log::error!("Peer system failed: {e}");
|
||||
}
|
||||
});
|
||||
startup::spawn_peer_runtime(rx_control, tx_notify_ui, peer_game_db, peer_id);
|
||||
|
||||
// Set the game directory
|
||||
tx_control.send(PeerCommand::SetGameDir(game_dir))?;
|
||||
@@ -167,52 +156,19 @@ async fn run_peer(
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
peer_id: String,
|
||||
) -> eyre::Result<()> {
|
||||
// Create the shared context
|
||||
let ctx = Ctx::new(peer_game_db.clone(), peer_id);
|
||||
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
|
||||
startup::spawn_startup_services(&ctx, &tx_notify_ui)?;
|
||||
handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await;
|
||||
startup::spawn_goodbye_notifications(&ctx).await;
|
||||
|
||||
// Start server component
|
||||
let server_addr = "0.0.0.0:0".parse::<SocketAddr>()?;
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let peer_ctx_clone = peer_ctx.clone();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_server_component(server_addr, peer_ctx_clone, tx_notify_ui_clone).await
|
||||
{
|
||||
log::error!("Server component error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
// Start peer discovery task
|
||||
let tx_notify_ui_discovery = tx_notify_ui.clone();
|
||||
let ctx_discovery = ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
run_peer_discovery(tx_notify_ui_discovery, ctx_discovery).await;
|
||||
});
|
||||
|
||||
// Start ping service task
|
||||
let tx_notify_ui_ping = tx_notify_ui.clone();
|
||||
let peer_game_db_ping = ctx.peer_game_db.clone();
|
||||
let downloading_games_ping = ctx.downloading_games.clone();
|
||||
let active_downloads_ping = ctx.active_downloads.clone();
|
||||
tokio::spawn(async move {
|
||||
run_ping_service(
|
||||
tx_notify_ui_ping,
|
||||
peer_game_db_ping,
|
||||
downloading_games_ping,
|
||||
active_downloads_ping,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
// Start local game directory monitoring task
|
||||
let tx_notify_ui_monitor = tx_notify_ui.clone();
|
||||
let ctx_monitor = ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
run_local_game_monitor(tx_notify_ui_monitor, ctx_monitor).await;
|
||||
});
|
||||
|
||||
// Handle client commands
|
||||
async fn handle_peer_commands(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
rx_control: &mut UnboundedReceiver<PeerCommand>,
|
||||
) {
|
||||
loop {
|
||||
let Some(cmd) = rx_control.recv().await else {
|
||||
break;
|
||||
@@ -220,37 +176,23 @@ async fn run_peer(
|
||||
|
||||
match cmd {
|
||||
PeerCommand::ListGames => {
|
||||
handle_list_games_command(&ctx, &tx_notify_ui).await;
|
||||
handle_list_games_command(ctx, tx_notify_ui).await;
|
||||
}
|
||||
PeerCommand::GetGame(id) => {
|
||||
handle_get_game_command(&ctx, &tx_notify_ui, id).await;
|
||||
handle_get_game_command(ctx, tx_notify_ui, id).await;
|
||||
}
|
||||
PeerCommand::DownloadGameFiles {
|
||||
id,
|
||||
file_descriptions,
|
||||
} => {
|
||||
handle_download_game_files_command(&ctx, &tx_notify_ui, id, file_descriptions)
|
||||
.await;
|
||||
handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions).await;
|
||||
}
|
||||
PeerCommand::SetGameDir(game_dir) => {
|
||||
handle_set_game_dir_command(&ctx, &tx_notify_ui, game_dir).await;
|
||||
handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await;
|
||||
}
|
||||
PeerCommand::GetPeerCount => {
|
||||
handle_get_peer_count_command(&ctx, &tx_notify_ui).await;
|
||||
handle_get_peer_count_command(ctx, tx_notify_ui).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let peer_id = ctx.peer_id.as_ref().clone();
|
||||
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
||||
for peer_addr in peer_addresses {
|
||||
let peer_id = peer_id.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = send_goodbye(peer_addr, peer_id).await {
|
||||
log::warn!("Failed to send Goodbye to {peer_addr}: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
//! Long-running peer services.
|
||||
//!
|
||||
//! Each submodule owns one runtime concern. The public surface intentionally
|
||||
//! stays small because `lib.rs` only needs to start these four background tasks.
|
||||
//! stays small because peer startup only needs to start these four background
|
||||
//! tasks.
|
||||
|
||||
mod advertise;
|
||||
mod discovery;
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
//! Peer runtime task startup and shutdown orchestration.
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use tokio::sync::{
|
||||
RwLock,
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
PeerCommand,
|
||||
PeerEvent,
|
||||
context::Ctx,
|
||||
network::send_goodbye,
|
||||
peer_db::PeerGameDB,
|
||||
run_peer,
|
||||
services::{
|
||||
run_local_game_monitor,
|
||||
run_peer_discovery,
|
||||
run_ping_service,
|
||||
run_server_component,
|
||||
},
|
||||
};
|
||||
|
||||
const EPHEMERAL_SERVER_ADDR: &str = "0.0.0.0:0";
|
||||
|
||||
pub(crate) fn spawn_peer_runtime(
|
||||
rx_control: UnboundedReceiver<PeerCommand>,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
peer_id: String,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await {
|
||||
log::error!("Peer system failed: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_startup_services(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
spawn_quic_server(ctx, tx_notify_ui)?;
|
||||
spawn_peer_discovery_service(ctx, tx_notify_ui);
|
||||
spawn_peer_liveness_service(ctx, tx_notify_ui);
|
||||
spawn_local_library_monitor(ctx, tx_notify_ui);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_goodbye_notifications(ctx: &Ctx) {
|
||||
let peer_id = ctx.peer_id.as_ref().clone();
|
||||
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
||||
|
||||
for peer_addr in peer_addresses {
|
||||
spawn_goodbye_notification(peer_addr, peer_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) -> eyre::Result<()> {
|
||||
let server_addr = EPHEMERAL_SERVER_ADDR.parse::<SocketAddr>()?;
|
||||
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = run_server_component(server_addr, peer_ctx, tx_notify_ui).await {
|
||||
log::error!("Server component error: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
run_peer_discovery(tx_notify_ui, ctx).await;
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
let peer_game_db = ctx.peer_game_db.clone();
|
||||
let downloading_games = ctx.downloading_games.clone();
|
||||
let active_downloads = ctx.active_downloads.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
run_ping_service(
|
||||
tx_notify_ui,
|
||||
peer_game_db,
|
||||
downloading_games,
|
||||
active_downloads,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
run_local_game_monitor(tx_notify_ui, ctx).await;
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = send_goodbye(peer_addr, peer_id).await {
|
||||
log::warn!("Failed to send Goodbye to {peer_addr}: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user