Files
lanspread/crates/lanspread-peer/src/startup.rs
T
ddidderr 754afd5621 refactor(peer): drop --no-mdns toggle, mDNS is always on
The peer runtime previously accepted an `enable_mdns: bool` flag, plumbed
through `PeerStartOptions`, `spawn_peer_runtime`, `run_peer`, `Ctx`, and
`PeerCtx`. The lanspread-peer-cli harness exposed the toggle as
`--no-mdns` so test scenarios could fall back to explicit `connect`
commands when mDNS could not be relied on, in particular when multiple
peers ran inside `--network host` containers and could not advertise
independently.

That host-networking workaround no longer exists: the previous commit
moves harness containers onto a macvlan network, where each peer is a
real LAN device and mDNS just works between them. There is no scenario
left in the codebase where disabling mDNS is desirable. Per the project's
protocol policy in CLAUDE.md ("there is only one wire version, no
compatibility shims, no fallback paths"), an opt-out path with no current
caller is exactly the kind of dead code we should not carry.

Remove the flag and every plumbing point that exists only to support it:

- `PeerStartOptions::enable_mdns` and the custom `Default` impl that set
  it to `true`; the struct now derives `Default` and just carries
  `state_dir`.
- The `enable_mdns` parameter on `start_peer_with_options`,
  `spawn_peer_runtime`, `run_peer`, and `Ctx::new`.
- The `enable_mdns` fields on `Ctx` and `PeerCtx` and the propagation
  through `to_peer_ctx`.
- The `if ctx.enable_mdns` guard in `spawn_startup_services`;
  `spawn_peer_discovery_service` is now always spawned.
- The `if ctx.enable_mdns { ... } else { ... }` branch in
  `run_server_component`: the mDNS advertiser and event monitor are now
  unconditionally started, and the no-mDNS-fallback log line that read
  "mDNS disabled; direct peer address is ..." is gone. The
  `direct_connect_addr` helper is kept because the advertiser path, now
  the only path, still uses it as a fallback when `local_peer_addr` has
  not yet been populated.
- The internal test helpers in `handlers.rs`, `services/local_monitor.rs`,
  and `services/stream.rs` that passed `true` as the trailing
  `enable_mdns` arg to `Ctx::new`.
- In `lanspread-peer-cli`: the `--no-mdns` arg parsing, the
  `Args::enable_mdns` field, the `mdns` key on the `cli-started` event
  payload, and the `--no-mdns` mention in the help text and the crate
  README.

The `Args::name` field is wired to the harness identity but is otherwise
untouched. The macvlan network created by `just peer-cli-net` is the
runtime prerequisite for this change to be observable across containers;
on a single workstation, two harness binaries on `127.0.0.1` discover
each other through mDNS on the loopback interface as before.

Test Plan:
- `just fmt`
- `just clippy`
- `just test`
- `just peer-cli-build`
- Two peers on macvlan: `just peer-cli-run alpha` and
  `just peer-cli-run beta`; check that each emits `peer-discovered` and
  `peer-connected` events without an explicit `connect` JSONL command.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:51:54 +02:00

462 lines
14 KiB
Rust

//! Peer runtime task startup and shutdown orchestration.
use std::{
any::Any,
future::Future,
net::SocketAddr,
panic::AssertUnwindSafe,
path::PathBuf,
sync::Arc,
time::Duration,
};
use futures::FutureExt as _;
use tokio::sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender},
watch,
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{
PeerCommand,
PeerEvent,
PeerRuntimeComponent,
Unpacker,
context::Ctx,
events,
network::send_goodbye,
peer_db::PeerGameDB,
run_peer,
services::{
run_local_game_monitor,
run_peer_discovery,
run_ping_service,
run_server_component,
},
};
/// Handle to a running peer runtime.
///
/// Holds the command sender plus the runtime's shutdown token and a `stopped`
/// signal so callers can request a clean shutdown and wait for goodbye
/// notifications to flush.
#[derive(Debug)]
pub struct PeerRuntimeHandle {
    /// Command channel into the runtime; callers get clones via `sender()`.
    tx: UnboundedSender<PeerCommand>,
    /// Cancelled by `shutdown()` to ask the runtime to stop.
    shutdown: CancellationToken,
    /// Set to `true` by the runtime task once its teardown has finished.
    stopped: watch::Receiver<bool>,
}
impl PeerRuntimeHandle {
/// Returns a clone of the command channel sender.
#[must_use]
pub fn sender(&self) -> UnboundedSender<PeerCommand> {
self.tx.clone()
}
/// Signals the runtime to shut down. Idempotent.
pub fn shutdown(&self) {
self.shutdown.cancel();
}
/// Resolves once the runtime task has fully stopped (services drained,
/// goodbye notifications sent). Returns even if the runtime stopped
/// without an explicit shutdown request.
pub async fn wait_stopped(&mut self) {
let _ = self.stopped.wait_for(|stopped| *stopped).await;
}
}
/// What the service supervisor does when a supervised component stops (by
/// returning `Ok`, returning `Err`, or panicking) while the runtime is still live.
#[derive(Debug, Clone, Copy)]
pub(crate) enum SupervisionPolicy {
    /// The runtime cannot run without this component: any exit is reported as a
    /// runtime failure and the runtime is shut down.
    Required,
    /// Log the exit, wait `backoff` (cut short by shutdown), then start it again.
    Restart { backoff: Duration },
    /// Log the exit and stop supervising; the component is not restarted.
    BestEffort,
}
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
pub(crate) fn spawn_peer_runtime(
tx_control: UnboundedSender<PeerCommand>,
rx_control: UnboundedReceiver<PeerCommand>,
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
peer_id: String,
game_dir: PathBuf,
unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<std::collections::HashSet<String>>>,
) -> PeerRuntimeHandle {
let shutdown = CancellationToken::new();
let task_tracker = TaskTracker::new();
let (tx_stopped, stopped) = watch::channel(false);
let runtime_shutdown = shutdown.clone();
let runtime_tracker = task_tracker.clone();
tokio::spawn(async move {
if let Err(err) = run_peer(
rx_control,
tx_notify_ui,
peer_game_db,
peer_id,
game_dir,
unpacker,
runtime_shutdown.clone(),
runtime_tracker.clone(),
catalog,
)
.await
{
log::error!("Peer system failed: {err}");
}
runtime_shutdown.cancel();
runtime_tracker.close();
runtime_tracker.wait().await;
if tx_stopped.send(true).is_err() {
log::debug!("Peer runtime stopped after handle was dropped");
}
});
PeerRuntimeHandle {
tx: tx_control,
shutdown,
stopped,
}
}
pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
spawn_quic_server(ctx, tx_notify_ui);
spawn_peer_discovery_service(ctx, tx_notify_ui);
spawn_peer_liveness_service(ctx, tx_notify_ui);
spawn_local_library_monitor(ctx, tx_notify_ui);
}
/// Tells every peer currently in the peer database that this node is leaving.
///
/// The address list is copied out under a read lock that is released before any
/// network I/O starts. All goodbyes are then sent concurrently, and this waits
/// for every one of them to finish.
pub(crate) async fn send_goodbye_notifications(ctx: &Ctx) {
    let recipients = {
        let db = ctx.peer_game_db.read().await;
        db.get_peer_addresses()
    };
    let own_id = ctx.peer_id.as_ref().clone();
    let goodbyes = recipients
        .into_iter()
        .map(|addr| send_goodbye_notification(addr, own_id.clone()));
    futures::future::join_all(goodbyes).await;
}
/// Starts the QUIC server as a `Required` component: if it ever stops while the
/// runtime is live, the whole runtime is shut down.
///
/// The listen address is `0.0.0.0:0`, i.e. every IPv4 interface on a port the OS
/// picks.
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
    let listen_addr = SocketAddr::from(([0, 0, 0, 0], 0));
    let server_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
    let events_tx = tx_notify_ui.clone();
    let factory = move || {
        let server_ctx = server_ctx.clone();
        let events_tx = events_tx.clone();
        async move { run_server_component(listen_addr, server_ctx, events_tx).await }
    };
    spawn_supervised_service(
        &ctx.task_tracker,
        &ctx.shutdown,
        tx_notify_ui,
        PeerRuntimeComponent::QuicServer,
        SupervisionPolicy::Required,
        factory,
    );
}
/// Starts peer discovery under supervision. It is restarted after a 5-second
/// backoff whenever it exits while the runtime is live.
fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
    let discovery_ctx = ctx.clone();
    let events_tx = tx_notify_ui.clone();
    let factory = move || {
        let discovery_ctx = discovery_ctx.clone();
        let events_tx = events_tx.clone();
        async move { run_peer_discovery(events_tx, discovery_ctx).await }
    };
    spawn_supervised_service(
        &ctx.task_tracker,
        &ctx.shutdown,
        tx_notify_ui,
        PeerRuntimeComponent::Discovery,
        SupervisionPolicy::Restart {
            backoff: Duration::from_secs(5),
        },
        factory,
    );
}
/// Starts the peer ping (liveness) service under supervision. It is restarted
/// after a 5-second backoff whenever it exits while the runtime is live.
///
/// Only the pieces of `Ctx` the ping service needs are captured; each attempt
/// gets its own clones of them.
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
    let events_tx = tx_notify_ui.clone();
    let game_db = ctx.peer_game_db.clone();
    let operations = ctx.active_operations.clone();
    let downloads = ctx.active_downloads.clone();
    let stop_token = ctx.shutdown.clone();
    let tracker = ctx.task_tracker.clone();
    let factory = move || {
        let (events_tx, game_db, operations, downloads, stop_token, tracker) = (
            events_tx.clone(),
            game_db.clone(),
            operations.clone(),
            downloads.clone(),
            stop_token.clone(),
            tracker.clone(),
        );
        async move {
            run_ping_service(events_tx, game_db, operations, downloads, stop_token, tracker)
                .await
        }
    };
    spawn_supervised_service(
        &ctx.task_tracker,
        &ctx.shutdown,
        tx_notify_ui,
        PeerRuntimeComponent::Liveness,
        SupervisionPolicy::Restart {
            backoff: Duration::from_secs(5),
        },
        factory,
    );
}
/// Starts the local game library monitor as a best-effort component: if it stops,
/// the exit is logged and it is not restarted.
fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
    let monitor_ctx = ctx.clone();
    let events_tx = tx_notify_ui.clone();
    let factory = move || {
        let monitor_ctx = monitor_ctx.clone();
        let events_tx = events_tx.clone();
        async move { run_local_game_monitor(events_tx, monitor_ctx).await }
    };
    spawn_supervised_service(
        &ctx.task_tracker,
        &ctx.shutdown,
        tx_notify_ui,
        PeerRuntimeComponent::LocalMonitor,
        SupervisionPolicy::BestEffort,
        factory,
    );
}
/// Sends one `Goodbye` to `peer_addr`, giving up after one second.
///
/// Errors and timeouts are only logged as warnings, so one unreachable peer cannot
/// hold up the caller for long.
async fn send_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
    const GOODBYE_TIMEOUT: Duration = Duration::from_secs(1);
    let Ok(delivery) =
        tokio::time::timeout(GOODBYE_TIMEOUT, send_goodbye(peer_addr, peer_id)).await
    else {
        log::warn!("Timed out sending Goodbye to {peer_addr}");
        return;
    };
    if let Err(err) = delivery {
        log::warn!("Failed to send Goodbye to {peer_addr}: {err}");
    }
}
fn spawn_supervised_service<F, Fut>(
task_tracker: &TaskTracker,
shutdown: &CancellationToken,
tx_notify_ui: &UnboundedSender<PeerEvent>,
component: PeerRuntimeComponent,
policy: SupervisionPolicy,
mut make_service: F,
) where
F: FnMut() -> Fut + Send + 'static,
Fut: Future<Output = eyre::Result<()>> + Send + 'static,
{
let task_tracker = task_tracker.clone();
let shutdown = shutdown.clone();
let tx_notify_ui = tx_notify_ui.clone();
task_tracker.spawn(async move {
loop {
if shutdown.is_cancelled() {
break;
}
let result = match AssertUnwindSafe(make_service()).catch_unwind().await {
Ok(result) => result,
Err(payload) => Err(eyre::eyre!(
"component panicked: {}",
panic_payload_to_string(&payload)
)),
};
if shutdown.is_cancelled() {
break;
}
match policy {
SupervisionPolicy::Required => {
let error = match result {
Ok(()) => "component exited unexpectedly".to_string(),
Err(err) => err.to_string(),
};
report_required_service_failure(&tx_notify_ui, component, error, &shutdown);
break;
}
SupervisionPolicy::Restart { backoff } => {
match result {
Ok(()) => log::warn!("{component:?} exited; restarting in {backoff:?}"),
Err(err) => {
log::error!("{component:?} failed: {err}; restarting in {backoff:?}");
}
}
tokio::select! {
() = shutdown.cancelled() => break,
() = tokio::time::sleep(backoff) => {}
}
}
SupervisionPolicy::BestEffort => {
match result {
Ok(()) => log::warn!("{component:?} exited"),
Err(err) => log::error!("{component:?} failed: {err}"),
}
break;
}
}
}
});
}
fn report_required_service_failure(
tx_notify_ui: &UnboundedSender<PeerEvent>,
component: PeerRuntimeComponent,
error: String,
shutdown: &CancellationToken,
) {
log::error!("{component:?} failed: {error}");
events::send(tx_notify_ui, PeerEvent::RuntimeFailed { component, error });
shutdown.cancel();
}
/// Extracts a human-readable message from a panic payload.
///
/// Recognises the payload types `panic!` typically carries, `&'static str` and
/// `String`.
///
/// A payload whose concrete type is itself `Box<dyn Any + Send>` is unwrapped
/// first. That shape appears when a caller writes `&payload` for a boxed payload:
/// the reference coerces by unsizing the box rather than dereferencing it.
///
/// Any other type yields `"unknown panic payload"`.
fn panic_payload_to_string(payload: &(dyn Any + Send)) -> String {
    // Each level of recursion strips one box, so this terminates.
    if let Some(inner) = payload.downcast_ref::<Box<dyn Any + Send>>() {
        return panic_payload_to_string(inner.as_ref());
    }
    if let Some(message) = payload.downcast_ref::<&'static str>() {
        return (*message).to_string();
    }
    if let Some(message) = payload.downcast_ref::<String>() {
        return message.clone();
    }
    "unknown panic payload".to_string()
}
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::Duration,
    };

    use tokio_util::{sync::CancellationToken, task::TaskTracker};

    use super::{SupervisionPolicy, spawn_supervised_service};
    use crate::{PeerRuntimeComponent, startup::PeerRuntimeHandle};

    /// An error from a `Required` component cancels the runtime and emits
    /// `RuntimeFailed`.
    #[tokio::test]
    async fn required_service_failure_cancels_runtime_and_emits_event() {
        let tracker = TaskTracker::new();
        let shutdown = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        spawn_supervised_service(
            &tracker,
            &shutdown,
            &tx,
            PeerRuntimeComponent::QuicServer,
            SupervisionPolicy::Required,
            || async { Err(eyre::eyre!("bind failed")) },
        );
        let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("runtime failure event should arrive")
            .expect("event channel should stay open");
        assert!(shutdown.is_cancelled());
        assert!(matches!(
            event,
            crate::PeerEvent::RuntimeFailed {
                component: PeerRuntimeComponent::QuicServer,
                ..
            }
        ));
        tracker.close();
        tokio::time::timeout(Duration::from_secs(1), tracker.wait())
            .await
            .expect("supervisor task should stop");
    }

    /// A panic inside a `Required` component is handled like an error return:
    /// the runtime is cancelled and a `RuntimeFailed` event names the panic.
    #[tokio::test]
    async fn required_service_panic_cancels_runtime_and_emits_event() {
        let tracker = TaskTracker::new();
        let shutdown = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        spawn_supervised_service(
            &tracker,
            &shutdown,
            &tx,
            PeerRuntimeComponent::QuicServer,
            SupervisionPolicy::Required,
            || async { panic!("listener exploded") },
        );
        let event = tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .expect("runtime failure event should arrive")
            .expect("event channel should stay open");
        assert!(shutdown.is_cancelled());
        let crate::PeerEvent::RuntimeFailed { component, error } = event else {
            panic!("expected a RuntimeFailed event");
        };
        assert!(matches!(component, PeerRuntimeComponent::QuicServer));
        assert!(
            error.starts_with("component panicked"),
            "unexpected error text: {error}"
        );
        tracker.close();
        tokio::time::timeout(Duration::from_secs(1), tracker.wait())
            .await
            .expect("supervisor task should stop");
    }

    /// A `Restart` component keeps being re-run until shutdown is requested.
    #[tokio::test]
    async fn restart_service_restarts_until_shutdown() {
        let tracker = TaskTracker::new();
        let shutdown = CancellationToken::new();
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let attempts = Arc::new(AtomicUsize::new(0));
        spawn_supervised_service(
            &tracker,
            &shutdown,
            &tx,
            PeerRuntimeComponent::Discovery,
            SupervisionPolicy::Restart {
                backoff: Duration::from_millis(10),
            },
            {
                let attempts = attempts.clone();
                move || {
                    let attempts = attempts.clone();
                    async move {
                        attempts.fetch_add(1, Ordering::SeqCst);
                        Err(eyre::eyre!("discovery worker stopped"))
                    }
                }
            },
        );
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if attempts.load(Ordering::SeqCst) >= 2 {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("restartable service should run more than once");
        shutdown.cancel();
        tracker.close();
        tokio::time::timeout(Duration::from_secs(1), tracker.wait())
            .await
            .expect("restart supervisor should stop after shutdown");
    }

    /// `shutdown()` cancels the token and `wait_stopped()` observes the stop signal.
    #[tokio::test]
    async fn runtime_handle_can_shutdown_and_await_stopped() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let shutdown = CancellationToken::new();
        let (tx_stopped, stopped) = tokio::sync::watch::channel(false);
        let mut handle = PeerRuntimeHandle {
            tx,
            shutdown: shutdown.clone(),
            stopped,
        };
        tokio::spawn(async move {
            shutdown.cancelled().await;
            let _ = tx_stopped.send(true);
        });
        handle.shutdown();
        tokio::time::timeout(Duration::from_secs(1), handle.wait_stopped())
            .await
            .expect("runtime handle should observe stopped");
    }
}