//! Peer runtime task startup and shutdown orchestration. use std::{ any::Any, future::Future, net::SocketAddr, panic::AssertUnwindSafe, path::PathBuf, sync::Arc, time::Duration, }; use futures::FutureExt as _; use tokio::sync::{ RwLock, mpsc::{UnboundedReceiver, UnboundedSender}, watch, }; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::{ PeerCommand, PeerEvent, PeerRuntimeComponent, Unpacker, context::Ctx, events, network::send_goodbye, peer_db::PeerGameDB, run_peer, services::{ run_local_game_monitor, run_peer_discovery, run_ping_service, run_server_component, }, }; /// Handle to a running peer runtime. /// /// Holds the command sender plus the runtime's shutdown token and a `stopped` /// signal so callers can request a clean shutdown and wait for goodbye /// notifications to flush. pub struct PeerRuntimeHandle { tx: UnboundedSender, shutdown: CancellationToken, stopped: watch::Receiver, } impl PeerRuntimeHandle { /// Returns a clone of the command channel sender. #[must_use] pub fn sender(&self) -> UnboundedSender { self.tx.clone() } /// Signals the runtime to shut down. Idempotent. pub fn shutdown(&self) { self.shutdown.cancel(); } /// Resolves once the runtime task has fully stopped (services drained, /// goodbye notifications sent). Returns even if the runtime stopped /// without an explicit shutdown request. pub async fn wait_stopped(&mut self) { let _ = self.stopped.wait_for(|stopped| *stopped).await; } } #[derive(Clone, Copy, Debug)] pub(crate) enum SupervisionPolicy { Required, Restart { backoff: Duration }, BestEffort, } #[allow(clippy::too_many_arguments, clippy::implicit_hasher)] pub(crate) fn spawn_peer_runtime( tx_control: UnboundedSender, rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, peer_id: String, game_dir: PathBuf, unpacker: Arc, catalog: Arc>>, enable_mdns: bool, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); let (tx_stopped, stopped) = watch::channel(false); let runtime_shutdown = shutdown.clone(); let runtime_tracker = task_tracker.clone(); tokio::spawn(async move { if let Err(err) = run_peer( rx_control, tx_notify_ui, peer_game_db, peer_id, game_dir, unpacker, runtime_shutdown.clone(), runtime_tracker.clone(), catalog, enable_mdns, ) .await { log::error!("Peer system failed: {err}"); } runtime_shutdown.cancel(); runtime_tracker.close(); runtime_tracker.wait().await; if tx_stopped.send(true).is_err() { log::debug!("Peer runtime stopped after handle was dropped"); } }); PeerRuntimeHandle { tx: tx_control, shutdown, stopped, } } pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { spawn_quic_server(ctx, tx_notify_ui); if ctx.enable_mdns { spawn_peer_discovery_service(ctx, tx_notify_ui); } spawn_peer_liveness_service(ctx, tx_notify_ui); spawn_local_library_monitor(ctx, tx_notify_ui); } pub(crate) async fn send_goodbye_notifications(ctx: &Ctx) { let peer_id = ctx.peer_id.as_ref().clone(); let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; futures::future::join_all( peer_addresses .into_iter() .map(|peer_addr| send_goodbye_notification(peer_addr, peer_id.clone())), ) .await; } fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let server_addr = SocketAddr::from(([0, 0, 0, 0], 0)); let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone()); let tx_notify_ui = tx_notify_ui.clone(); let supervisor_tx = tx_notify_ui.clone(); spawn_supervised_service( &ctx.task_tracker, &ctx.shutdown, &supervisor_tx, PeerRuntimeComponent::QuicServer, SupervisionPolicy::Required, move || { let peer_ctx = peer_ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); async move { run_server_component(server_addr, peer_ctx, tx_notify_ui).await } }, ); } fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); let task_tracker = ctx.task_tracker.clone(); let shutdown = ctx.shutdown.clone(); let supervisor_tx = tx_notify_ui.clone(); spawn_supervised_service( &task_tracker, &shutdown, &supervisor_tx, PeerRuntimeComponent::Discovery, SupervisionPolicy::Restart { backoff: Duration::from_secs(5), }, move || { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); async move { run_peer_discovery(tx_notify_ui, ctx).await } }, ); } fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = ctx.peer_game_db.clone(); let active_operations = ctx.active_operations.clone(); let active_downloads = ctx.active_downloads.clone(); let shutdown = ctx.shutdown.clone(); let task_tracker = ctx.task_tracker.clone(); let supervisor_tx = tx_notify_ui.clone(); spawn_supervised_service( &ctx.task_tracker, &ctx.shutdown, &supervisor_tx, PeerRuntimeComponent::Liveness, SupervisionPolicy::Restart { backoff: Duration::from_secs(5), }, move || { let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = peer_game_db.clone(); let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); let shutdown = shutdown.clone(); let task_tracker = task_tracker.clone(); async move { run_ping_service( tx_notify_ui, peer_game_db, active_operations, active_downloads, shutdown, task_tracker, ) .await } }, ); } fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); let task_tracker = ctx.task_tracker.clone(); let shutdown = ctx.shutdown.clone(); let supervisor_tx = tx_notify_ui.clone(); spawn_supervised_service( &task_tracker, &shutdown, &supervisor_tx, PeerRuntimeComponent::LocalMonitor, SupervisionPolicy::BestEffort, move || { let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); async move { run_local_game_monitor(tx_notify_ui, ctx).await } }, ); } async fn send_goodbye_notification(peer_addr: SocketAddr, peer_id: String) { match tokio::time::timeout(Duration::from_secs(1), send_goodbye(peer_addr, peer_id)).await { Ok(Ok(())) => {} Ok(Err(err)) => log::warn!("Failed to send Goodbye to {peer_addr}: {err}"), Err(_) => log::warn!("Timed out sending Goodbye to {peer_addr}"), } } fn spawn_supervised_service( task_tracker: &TaskTracker, shutdown: &CancellationToken, tx_notify_ui: &UnboundedSender, component: PeerRuntimeComponent, policy: SupervisionPolicy, mut make_service: F, ) where F: FnMut() -> Fut + Send + 'static, Fut: Future> + Send + 'static, { let task_tracker = task_tracker.clone(); let shutdown = shutdown.clone(); let tx_notify_ui = tx_notify_ui.clone(); task_tracker.spawn(async move { loop { if shutdown.is_cancelled() { break; } let result = match AssertUnwindSafe(make_service()).catch_unwind().await { Ok(result) => result, Err(payload) => Err(eyre::eyre!( "component panicked: {}", panic_payload_to_string(&payload) )), }; if shutdown.is_cancelled() { break; } match policy { SupervisionPolicy::Required => { let error = match result { Ok(()) => "component exited unexpectedly".to_string(), Err(err) => err.to_string(), }; report_required_service_failure(&tx_notify_ui, component, error, &shutdown); break; } SupervisionPolicy::Restart { backoff } => { match result { Ok(()) => log::warn!("{component:?} exited; restarting in {backoff:?}"), Err(err) => { log::error!("{component:?} failed: {err}; restarting in {backoff:?}"); } } tokio::select! { () = shutdown.cancelled() => break, () = tokio::time::sleep(backoff) => {} } } SupervisionPolicy::BestEffort => { match result { Ok(()) => log::warn!("{component:?} exited"), Err(err) => log::error!("{component:?} failed: {err}"), } break; } } } }); } fn report_required_service_failure( tx_notify_ui: &UnboundedSender, component: PeerRuntimeComponent, error: String, shutdown: &CancellationToken, ) { log::error!("{component:?} failed: {error}"); events::send(tx_notify_ui, PeerEvent::RuntimeFailed { component, error }); shutdown.cancel(); } fn panic_payload_to_string(payload: &(dyn Any + Send)) -> String { if let Some(message) = payload.downcast_ref::<&'static str>() { return (*message).to_string(); } if let Some(message) = payload.downcast_ref::() { return message.clone(); } "unknown panic payload".to_string() } #[cfg(test)] mod tests { use std::{ sync::{ Arc, atomic::{AtomicUsize, Ordering}, }, time::Duration, }; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use super::{SupervisionPolicy, spawn_supervised_service}; use crate::{PeerRuntimeComponent, startup::PeerRuntimeHandle}; #[tokio::test] async fn required_service_failure_cancels_runtime_and_emits_event() { let tracker = TaskTracker::new(); let shutdown = CancellationToken::new(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); spawn_supervised_service( &tracker, &shutdown, &tx, PeerRuntimeComponent::QuicServer, SupervisionPolicy::Required, || async { Err(eyre::eyre!("bind failed")) }, ); let event = tokio::time::timeout(Duration::from_secs(1), rx.recv()) .await .expect("runtime failure event should arrive") .expect("event channel should stay open"); assert!(shutdown.is_cancelled()); assert!(matches!( event, crate::PeerEvent::RuntimeFailed { component: PeerRuntimeComponent::QuicServer, .. } )); tracker.close(); tokio::time::timeout(Duration::from_secs(1), tracker.wait()) .await .expect("supervisor task should stop"); } #[tokio::test] async fn restart_service_restarts_until_shutdown() { let tracker = TaskTracker::new(); let shutdown = CancellationToken::new(); let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let attempts = Arc::new(AtomicUsize::new(0)); spawn_supervised_service( &tracker, &shutdown, &tx, PeerRuntimeComponent::Discovery, SupervisionPolicy::Restart { backoff: Duration::from_millis(10), }, { let attempts = attempts.clone(); move || { let attempts = attempts.clone(); async move { attempts.fetch_add(1, Ordering::SeqCst); Err(eyre::eyre!("discovery worker stopped")) } } }, ); tokio::time::timeout(Duration::from_secs(1), async { loop { if attempts.load(Ordering::SeqCst) >= 2 { break; } tokio::task::yield_now().await; } }) .await .expect("restartable service should run more than once"); shutdown.cancel(); tracker.close(); tokio::time::timeout(Duration::from_secs(1), tracker.wait()) .await .expect("restart supervisor should stop after shutdown"); } #[tokio::test] async fn runtime_handle_can_shutdown_and_await_stopped() { let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let shutdown = CancellationToken::new(); let (tx_stopped, stopped) = tokio::sync::watch::channel(false); let mut handle = PeerRuntimeHandle { tx, shutdown: shutdown.clone(), stopped, }; tokio::spawn(async move { shutdown.cancelled().await; let _ = tx_stopped.send(true); }); handle.shutdown(); tokio::time::timeout(Duration::from_secs(1), handle.wait_stopped()) .await .expect("runtime handle should observe stopped"); } }