9835e77e8d
Move launcher-owned metadata from game roots into the configured peer state area. Peer identity, the local library index, install intent logs, and setup markers now live under app/CLI state instead of being written beside games. The Tauri shell passes its app data directory into the peer, and the peer CLI runs the same path through its explicit --state-dir. Add a dedicated pre-start migration phase for legacy files. It migrates the old global library index, per-game install intents, and the old first-start marker into app state, then deletes legacy files only after the replacement write succeeds. Normal scan, install, recovery, and transfer paths no longer read legacy state files. Rename the old first-start meaning to setup_done and only set it after launching game_setup.cmd. Start/setup scripts keep the shared argument shape, while server_start.cmd now uses cmd /k and a visible window so server logs stay open for inspection. While validating the Docker scenario matrix, make download terminal events come from the handler after local state refresh and operation cleanup. This makes download-finished/download-failed safe points for immediate follow-up CLI commands. Also update the multi-peer chunking scenario to use a sparse archive large enough to actually span multiple production chunks. Test Plan: - just fmt - just test - just frontend-test - just build - just clippy - git diff --check - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py Refs: local app-state migration discussion
464 lines
14 KiB
Rust
464 lines
14 KiB
Rust
//! Peer runtime task startup and shutdown orchestration.
|
|
|
|
use std::{
|
|
any::Any,
|
|
future::Future,
|
|
net::SocketAddr,
|
|
panic::AssertUnwindSafe,
|
|
path::PathBuf,
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use futures::FutureExt as _;
|
|
use tokio::sync::{
|
|
RwLock,
|
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
|
watch,
|
|
};
|
|
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
|
|
|
use crate::{
|
|
PeerCommand,
|
|
PeerEvent,
|
|
PeerRuntimeComponent,
|
|
Unpacker,
|
|
context::Ctx,
|
|
events,
|
|
network::send_goodbye,
|
|
peer_db::PeerGameDB,
|
|
run_peer,
|
|
services::{
|
|
run_local_game_monitor,
|
|
run_peer_discovery,
|
|
run_ping_service,
|
|
run_server_component,
|
|
},
|
|
};
|
|
|
|
/// Handle to a running peer runtime.
|
|
///
|
|
/// Holds the command sender plus the runtime's shutdown token and a `stopped`
|
|
/// signal so callers can request a clean shutdown and wait for goodbye
|
|
/// notifications to flush.
|
|
pub struct PeerRuntimeHandle {
|
|
tx: UnboundedSender<PeerCommand>,
|
|
shutdown: CancellationToken,
|
|
stopped: watch::Receiver<bool>,
|
|
}
|
|
|
|
impl PeerRuntimeHandle {
|
|
/// Returns a clone of the command channel sender.
|
|
#[must_use]
|
|
pub fn sender(&self) -> UnboundedSender<PeerCommand> {
|
|
self.tx.clone()
|
|
}
|
|
|
|
/// Signals the runtime to shut down. Idempotent.
|
|
pub fn shutdown(&self) {
|
|
self.shutdown.cancel();
|
|
}
|
|
|
|
/// Resolves once the runtime task has fully stopped (services drained,
|
|
/// goodbye notifications sent). Returns even if the runtime stopped
|
|
/// without an explicit shutdown request.
|
|
pub async fn wait_stopped(&mut self) {
|
|
let _ = self.stopped.wait_for(|stopped| *stopped).await;
|
|
}
|
|
}
|
|
|
|
/// How `spawn_supervised_service` reacts when a supervised component stops.
#[derive(Clone, Copy, Debug)]
pub(crate) enum SupervisionPolicy {
    /// The runtime cannot operate without this component. Any exit outside of
    /// shutdown is reported and cancels the whole runtime.
    Required,
    /// Log the exit and start the component again after a delay, until
    /// shutdown is requested.
    Restart {
        /// How long to wait between an exit and the next start attempt.
        backoff: Duration,
    },
    /// Log the exit and leave the component stopped.
    BestEffort,
}
|
|
|
|
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
|
|
pub(crate) fn spawn_peer_runtime(
|
|
tx_control: UnboundedSender<PeerCommand>,
|
|
rx_control: UnboundedReceiver<PeerCommand>,
|
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
peer_id: String,
|
|
game_dir: PathBuf,
|
|
state_dir: PathBuf,
|
|
unpacker: Arc<dyn Unpacker>,
|
|
catalog: Arc<RwLock<std::collections::HashSet<String>>>,
|
|
) -> PeerRuntimeHandle {
|
|
let shutdown = CancellationToken::new();
|
|
let task_tracker = TaskTracker::new();
|
|
let (tx_stopped, stopped) = watch::channel(false);
|
|
|
|
let runtime_shutdown = shutdown.clone();
|
|
let runtime_tracker = task_tracker.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(err) = run_peer(
|
|
rx_control,
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
peer_id,
|
|
game_dir,
|
|
state_dir,
|
|
unpacker,
|
|
runtime_shutdown.clone(),
|
|
runtime_tracker.clone(),
|
|
catalog,
|
|
)
|
|
.await
|
|
{
|
|
log::error!("Peer system failed: {err}");
|
|
}
|
|
|
|
runtime_shutdown.cancel();
|
|
runtime_tracker.close();
|
|
runtime_tracker.wait().await;
|
|
if tx_stopped.send(true).is_err() {
|
|
log::debug!("Peer runtime stopped after handle was dropped");
|
|
}
|
|
});
|
|
|
|
PeerRuntimeHandle {
|
|
tx: tx_control,
|
|
shutdown,
|
|
stopped,
|
|
}
|
|
}
|
|
|
|
pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
spawn_quic_server(ctx, tx_notify_ui);
|
|
spawn_peer_discovery_service(ctx, tx_notify_ui);
|
|
spawn_peer_liveness_service(ctx, tx_notify_ui);
|
|
spawn_local_library_monitor(ctx, tx_notify_ui);
|
|
}
|
|
|
|
pub(crate) async fn send_goodbye_notifications(ctx: &Ctx) {
|
|
let peer_id = ctx.peer_id.as_ref().clone();
|
|
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
|
|
|
futures::future::join_all(
|
|
peer_addresses
|
|
.into_iter()
|
|
.map(|peer_addr| send_goodbye_notification(peer_addr, peer_id.clone())),
|
|
)
|
|
.await;
|
|
}
|
|
|
|
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let server_addr = SocketAddr::from(([0, 0, 0, 0], 0));
|
|
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&ctx.task_tracker,
|
|
&ctx.shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::QuicServer,
|
|
SupervisionPolicy::Required,
|
|
move || {
|
|
let peer_ctx = peer_ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_server_component(server_addr, peer_ctx, tx_notify_ui).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&task_tracker,
|
|
&shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::Discovery,
|
|
SupervisionPolicy::Restart {
|
|
backoff: Duration::from_secs(5),
|
|
},
|
|
move || {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_peer_discovery(tx_notify_ui, ctx).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let peer_game_db = ctx.peer_game_db.clone();
|
|
let active_operations = ctx.active_operations.clone();
|
|
let active_downloads = ctx.active_downloads.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&ctx.task_tracker,
|
|
&ctx.shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::Liveness,
|
|
SupervisionPolicy::Restart {
|
|
backoff: Duration::from_secs(5),
|
|
},
|
|
move || {
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let peer_game_db = peer_game_db.clone();
|
|
let active_operations = active_operations.clone();
|
|
let active_downloads = active_downloads.clone();
|
|
let shutdown = shutdown.clone();
|
|
let task_tracker = task_tracker.clone();
|
|
async move {
|
|
run_ping_service(
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
active_operations,
|
|
active_downloads,
|
|
shutdown,
|
|
task_tracker,
|
|
)
|
|
.await
|
|
}
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&task_tracker,
|
|
&shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::LocalMonitor,
|
|
SupervisionPolicy::BestEffort,
|
|
move || {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_local_game_monitor(tx_notify_ui, ctx).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
async fn send_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
|
|
match tokio::time::timeout(Duration::from_secs(1), send_goodbye(peer_addr, peer_id)).await {
|
|
Ok(Ok(())) => {}
|
|
Ok(Err(err)) => log::warn!("Failed to send Goodbye to {peer_addr}: {err}"),
|
|
Err(_) => log::warn!("Timed out sending Goodbye to {peer_addr}"),
|
|
}
|
|
}
|
|
|
|
fn spawn_supervised_service<F, Fut>(
|
|
task_tracker: &TaskTracker,
|
|
shutdown: &CancellationToken,
|
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
component: PeerRuntimeComponent,
|
|
policy: SupervisionPolicy,
|
|
mut make_service: F,
|
|
) where
|
|
F: FnMut() -> Fut + Send + 'static,
|
|
Fut: Future<Output = eyre::Result<()>> + Send + 'static,
|
|
{
|
|
let task_tracker = task_tracker.clone();
|
|
let shutdown = shutdown.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
|
|
task_tracker.spawn(async move {
|
|
loop {
|
|
if shutdown.is_cancelled() {
|
|
break;
|
|
}
|
|
|
|
let result = match AssertUnwindSafe(make_service()).catch_unwind().await {
|
|
Ok(result) => result,
|
|
Err(payload) => Err(eyre::eyre!(
|
|
"component panicked: {}",
|
|
panic_payload_to_string(&payload)
|
|
)),
|
|
};
|
|
if shutdown.is_cancelled() {
|
|
break;
|
|
}
|
|
|
|
match policy {
|
|
SupervisionPolicy::Required => {
|
|
let error = match result {
|
|
Ok(()) => "component exited unexpectedly".to_string(),
|
|
Err(err) => err.to_string(),
|
|
};
|
|
report_required_service_failure(&tx_notify_ui, component, error, &shutdown);
|
|
break;
|
|
}
|
|
SupervisionPolicy::Restart { backoff } => {
|
|
match result {
|
|
Ok(()) => log::warn!("{component:?} exited; restarting in {backoff:?}"),
|
|
Err(err) => {
|
|
log::error!("{component:?} failed: {err}; restarting in {backoff:?}");
|
|
}
|
|
}
|
|
|
|
tokio::select! {
|
|
() = shutdown.cancelled() => break,
|
|
() = tokio::time::sleep(backoff) => {}
|
|
}
|
|
}
|
|
SupervisionPolicy::BestEffort => {
|
|
match result {
|
|
Ok(()) => log::warn!("{component:?} exited"),
|
|
Err(err) => log::error!("{component:?} failed: {err}"),
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
fn report_required_service_failure(
|
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
component: PeerRuntimeComponent,
|
|
error: String,
|
|
shutdown: &CancellationToken,
|
|
) {
|
|
log::error!("{component:?} failed: {error}");
|
|
events::send(tx_notify_ui, PeerEvent::RuntimeFailed { component, error });
|
|
shutdown.cancel();
|
|
}
|
|
|
|
/// Extracts a human-readable message from a panic payload.
///
/// The common payload types, `&'static str` and `String`, are returned as
/// text. Any other payload type yields `"unknown panic payload"`.
///
/// A caller holding the `Box<dyn Any + Send>` produced by `catch_unwind` can
/// easily pass `&payload` instead of `&*payload`. That coerces the *box*
/// itself to `&dyn Any` and hides the real payload from every downcast. Such
/// a boxed payload is unwrapped here, so the message is still recovered.
fn panic_payload_to_string(payload: &(dyn Any + Send)) -> String {
    if let Some(boxed) = payload.downcast_ref::<Box<dyn Any + Send>>() {
        return panic_payload_to_string(&**boxed);
    }

    if let Some(message) = payload.downcast_ref::<&'static str>() {
        return (*message).to_string();
    }

    if let Some(message) = payload.downcast_ref::<String>() {
        return message.clone();
    }

    "unknown panic payload".to_string()
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::Duration,
    };

    use tokio_util::{sync::CancellationToken, task::TaskTracker};

    use super::{SupervisionPolicy, spawn_supervised_service};
    use crate::{PeerRuntimeComponent, startup::PeerRuntimeHandle};

    /// Upper bound for every wait in these tests, so that a regression fails
    /// fast instead of hanging the test run.
    const WAIT: Duration = Duration::from_secs(1);

    #[tokio::test]
    async fn required_service_failure_cancels_runtime_and_emits_event() {
        let tracker = TaskTracker::new();
        let shutdown = CancellationToken::new();
        let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel();

        spawn_supervised_service(
            &tracker,
            &shutdown,
            &events_tx,
            PeerRuntimeComponent::QuicServer,
            SupervisionPolicy::Required,
            || async { Err(eyre::eyre!("bind failed")) },
        );

        let received = tokio::time::timeout(WAIT, events_rx.recv())
            .await
            .expect("runtime failure event should arrive");
        let event = received.expect("event channel should stay open");

        assert!(shutdown.is_cancelled());
        assert!(matches!(
            event,
            crate::PeerEvent::RuntimeFailed {
                component: PeerRuntimeComponent::QuicServer,
                ..
            }
        ));

        tracker.close();
        tokio::time::timeout(WAIT, tracker.wait())
            .await
            .expect("supervisor task should stop");
    }

    #[tokio::test]
    async fn restart_service_restarts_until_shutdown() {
        let tracker = TaskTracker::new();
        let shutdown = CancellationToken::new();
        let (events_tx, _events_rx) = tokio::sync::mpsc::unbounded_channel();
        let attempts = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&attempts);

        spawn_supervised_service(
            &tracker,
            &shutdown,
            &events_tx,
            PeerRuntimeComponent::Discovery,
            SupervisionPolicy::Restart {
                backoff: Duration::from_millis(10),
            },
            move || {
                let counter = Arc::clone(&counter);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err(eyre::eyre!("discovery worker stopped"))
                }
            },
        );

        tokio::time::timeout(WAIT, async {
            while attempts.load(Ordering::SeqCst) < 2 {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("restartable service should run more than once");

        shutdown.cancel();
        tracker.close();
        tokio::time::timeout(WAIT, tracker.wait())
            .await
            .expect("restart supervisor should stop after shutdown");
    }

    #[tokio::test]
    async fn runtime_handle_can_shutdown_and_await_stopped() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let shutdown = CancellationToken::new();
        let (tx_stopped, stopped) = tokio::sync::watch::channel(false);
        let mut handle = PeerRuntimeHandle {
            tx,
            shutdown: shutdown.clone(),
            stopped,
        };

        // Stand-in for the runtime task: report "stopped" once shutdown is
        // requested.
        tokio::spawn(async move {
            shutdown.cancelled().await;
            let _ = tx_stopped.send(true);
        });

        handle.shutdown();
        tokio::time::timeout(WAIT, handle.wait_stopped())
            .await
            .expect("runtime handle should observe stopped");
    }
}
|