373def6d44
Add a streamed-install prototype that can receive archive-derived install bytes straight into local/ without first storing the peer-owned root archive payload. This is intended for low-disk clients that want to install a game but opt out of becoming a downloadable peer source for that game. The protocol gains a current-version-only StreamInstall request and framed StreamInstallFrame responses. The peer core owns the generic transport, transaction, path validation, size checks, CRC32 verification, and lifecycle state. The archive-specific work is hidden behind StreamInstallProvider so the prototype can use unrar while the final implementation can swap in a better provider without rewriting the peer command path. The receiver writes into .local.installing and only promotes to local/ after the full stream verifies. It deliberately does not write the root version.ini or archive files, so the settled local state is installed=true, downloaded=false, and availability=LocalOnly. That preserves the existing rule that local/ is not served to peers and makes streamed receivers non-sources by construction. The CLI is the only caller for now. It exposes stream-install and provides the prototype unrar implementation with unrar lt for entry metadata and unrar p for file bytes. This is simple and good enough to prove non-solid archive streaming, but it is not the production provider shape for solid archives because per-file unrar p would repeatedly decompress prefixes. The Tauri app explicitly passes stream_install_provider: None, so the GUI behavior stays unchanged until a real product path is designed. Document the production-readiness work in NEXT_STEPS.md. The main follow-up is to make the provider abstraction final-ish and replace the per-file CLI unrar provider with a one-pass archive provider, then wire a deliberate GUI low-disk mode, retry semantics, and broader failure scenarios. 
Test Plan: - just fmt - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \ S39 S40 --build-image - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy - git diff --check - git diff --cached --check Follow-up: NEXT_STEPS.md
473 lines
14 KiB
Rust
473 lines
14 KiB
Rust
//! Peer runtime task startup and shutdown orchestration.
|
|
|
|
use std::{
|
|
any::Any,
|
|
future::Future,
|
|
net::SocketAddr,
|
|
panic::AssertUnwindSafe,
|
|
path::PathBuf,
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use futures::FutureExt as _;
|
|
use lanspread_db::db::GameCatalog;
|
|
use tokio::sync::{
|
|
RwLock,
|
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
|
watch,
|
|
};
|
|
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
|
|
|
use crate::{
|
|
PeerCommand,
|
|
PeerEvent,
|
|
PeerRuntimeComponent,
|
|
StreamInstallProvider,
|
|
Unpacker,
|
|
context::Ctx,
|
|
events,
|
|
network::send_goodbye,
|
|
peer_db::PeerGameDB,
|
|
run_peer,
|
|
services::{
|
|
run_local_game_monitor,
|
|
run_peer_discovery,
|
|
run_ping_service,
|
|
run_server_component,
|
|
},
|
|
};
|
|
|
|
/// Handle to a running peer runtime.
|
|
///
|
|
/// Holds the command sender plus the runtime's shutdown token and a `stopped`
|
|
/// signal so callers can request a clean shutdown and wait for goodbye
|
|
/// notifications to flush.
|
|
pub struct PeerRuntimeHandle {
|
|
tx: UnboundedSender<PeerCommand>,
|
|
shutdown: CancellationToken,
|
|
stopped: watch::Receiver<bool>,
|
|
}
|
|
|
|
impl PeerRuntimeHandle {
|
|
/// Returns a clone of the command channel sender.
|
|
#[must_use]
|
|
pub fn sender(&self) -> UnboundedSender<PeerCommand> {
|
|
self.tx.clone()
|
|
}
|
|
|
|
/// Signals the runtime to shut down. Idempotent.
|
|
pub fn shutdown(&self) {
|
|
self.shutdown.cancel();
|
|
}
|
|
|
|
/// Resolves once the runtime task has fully stopped (services drained,
|
|
/// goodbye notifications sent). Returns even if the runtime stopped
|
|
/// without an explicit shutdown request.
|
|
pub async fn wait_stopped(&mut self) {
|
|
let _ = self.stopped.wait_for(|stopped| *stopped).await;
|
|
}
|
|
}
|
|
|
|
/// How `spawn_supervised_service` reacts when a supervised component's
/// future finishes while the runtime is not shutting down.
#[derive(Clone, Copy, Debug)]
pub(crate) enum SupervisionPolicy {
    /// Any exit (error, panic, or clean return) is reported as a runtime
    /// failure and cancels the shutdown token.
    Required,
    /// The outcome is logged and the component is started again after
    /// `backoff`, unless shutdown is requested during the wait.
    Restart { backoff: Duration },
    /// The outcome is logged and the component is not restarted.
    BestEffort,
}
|
|
|
|
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
|
|
pub(crate) fn spawn_peer_runtime(
|
|
tx_control: UnboundedSender<PeerCommand>,
|
|
rx_control: UnboundedReceiver<PeerCommand>,
|
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
peer_id: String,
|
|
game_dir: PathBuf,
|
|
state_dir: PathBuf,
|
|
unpacker: Arc<dyn Unpacker>,
|
|
catalog: Arc<RwLock<GameCatalog>>,
|
|
active_outbound_transfers: crate::context::OutboundTransfers,
|
|
stream_install_provider: Arc<dyn StreamInstallProvider>,
|
|
) -> PeerRuntimeHandle {
|
|
let shutdown = CancellationToken::new();
|
|
let task_tracker = TaskTracker::new();
|
|
let (tx_stopped, stopped) = watch::channel(false);
|
|
|
|
let runtime_shutdown = shutdown.clone();
|
|
let runtime_tracker = task_tracker.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(err) = run_peer(
|
|
rx_control,
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
peer_id,
|
|
game_dir,
|
|
state_dir,
|
|
unpacker,
|
|
runtime_shutdown.clone(),
|
|
runtime_tracker.clone(),
|
|
catalog,
|
|
active_outbound_transfers,
|
|
stream_install_provider,
|
|
)
|
|
.await
|
|
{
|
|
log::error!("Peer system failed: {err}");
|
|
}
|
|
|
|
runtime_shutdown.cancel();
|
|
runtime_tracker.close();
|
|
runtime_tracker.wait().await;
|
|
if tx_stopped.send(true).is_err() {
|
|
log::debug!("Peer runtime stopped after handle was dropped");
|
|
}
|
|
});
|
|
|
|
PeerRuntimeHandle {
|
|
tx: tx_control,
|
|
shutdown,
|
|
stopped,
|
|
}
|
|
}
|
|
|
|
pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
spawn_quic_server(ctx, tx_notify_ui);
|
|
spawn_peer_discovery_service(ctx, tx_notify_ui);
|
|
spawn_peer_liveness_service(ctx, tx_notify_ui);
|
|
spawn_local_library_monitor(ctx, tx_notify_ui);
|
|
}
|
|
|
|
pub(crate) async fn send_goodbye_notifications(ctx: &Ctx) {
|
|
let peer_id = ctx.peer_id.as_ref().clone();
|
|
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
|
|
|
futures::future::join_all(
|
|
peer_addresses
|
|
.into_iter()
|
|
.map(|peer_addr| send_goodbye_notification(peer_addr, peer_id.clone())),
|
|
)
|
|
.await;
|
|
}
|
|
|
|
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let server_addr = SocketAddr::from(([0, 0, 0, 0], 0));
|
|
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&ctx.task_tracker,
|
|
&ctx.shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::QuicServer,
|
|
SupervisionPolicy::Required,
|
|
move || {
|
|
let peer_ctx = peer_ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_server_component(server_addr, peer_ctx, tx_notify_ui).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&task_tracker,
|
|
&shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::Discovery,
|
|
SupervisionPolicy::Restart {
|
|
backoff: Duration::from_secs(5),
|
|
},
|
|
move || {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_peer_discovery(tx_notify_ui, ctx).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let peer_game_db = ctx.peer_game_db.clone();
|
|
let catalog = ctx.catalog.clone();
|
|
let active_operations = ctx.active_operations.clone();
|
|
let active_downloads = ctx.active_downloads.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&ctx.task_tracker,
|
|
&ctx.shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::Liveness,
|
|
SupervisionPolicy::Restart {
|
|
backoff: Duration::from_secs(5),
|
|
},
|
|
move || {
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let peer_game_db = peer_game_db.clone();
|
|
let catalog = catalog.clone();
|
|
let active_operations = active_operations.clone();
|
|
let active_downloads = active_downloads.clone();
|
|
let shutdown = shutdown.clone();
|
|
let task_tracker = task_tracker.clone();
|
|
async move {
|
|
run_ping_service(
|
|
tx_notify_ui,
|
|
peer_game_db,
|
|
catalog,
|
|
active_operations,
|
|
active_downloads,
|
|
shutdown,
|
|
task_tracker,
|
|
)
|
|
.await
|
|
}
|
|
},
|
|
);
|
|
}
|
|
|
|
fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
let task_tracker = ctx.task_tracker.clone();
|
|
let shutdown = ctx.shutdown.clone();
|
|
let supervisor_tx = tx_notify_ui.clone();
|
|
|
|
spawn_supervised_service(
|
|
&task_tracker,
|
|
&shutdown,
|
|
&supervisor_tx,
|
|
PeerRuntimeComponent::LocalMonitor,
|
|
SupervisionPolicy::BestEffort,
|
|
move || {
|
|
let ctx = ctx.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
async move { run_local_game_monitor(tx_notify_ui, ctx).await }
|
|
},
|
|
);
|
|
}
|
|
|
|
async fn send_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
|
|
match tokio::time::timeout(Duration::from_secs(1), send_goodbye(peer_addr, peer_id)).await {
|
|
Ok(Ok(())) => {}
|
|
Ok(Err(err)) => log::warn!("Failed to send Goodbye to {peer_addr}: {err}"),
|
|
Err(_) => log::warn!("Timed out sending Goodbye to {peer_addr}"),
|
|
}
|
|
}
|
|
|
|
fn spawn_supervised_service<F, Fut>(
|
|
task_tracker: &TaskTracker,
|
|
shutdown: &CancellationToken,
|
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
component: PeerRuntimeComponent,
|
|
policy: SupervisionPolicy,
|
|
mut make_service: F,
|
|
) where
|
|
F: FnMut() -> Fut + Send + 'static,
|
|
Fut: Future<Output = eyre::Result<()>> + Send + 'static,
|
|
{
|
|
let task_tracker = task_tracker.clone();
|
|
let shutdown = shutdown.clone();
|
|
let tx_notify_ui = tx_notify_ui.clone();
|
|
|
|
task_tracker.spawn(async move {
|
|
loop {
|
|
if shutdown.is_cancelled() {
|
|
break;
|
|
}
|
|
|
|
let result = match AssertUnwindSafe(make_service()).catch_unwind().await {
|
|
Ok(result) => result,
|
|
Err(payload) => Err(eyre::eyre!(
|
|
"component panicked: {}",
|
|
panic_payload_to_string(&payload)
|
|
)),
|
|
};
|
|
if shutdown.is_cancelled() {
|
|
break;
|
|
}
|
|
|
|
match policy {
|
|
SupervisionPolicy::Required => {
|
|
let error = match result {
|
|
Ok(()) => "component exited unexpectedly".to_string(),
|
|
Err(err) => err.to_string(),
|
|
};
|
|
report_required_service_failure(&tx_notify_ui, component, error, &shutdown);
|
|
break;
|
|
}
|
|
SupervisionPolicy::Restart { backoff } => {
|
|
match result {
|
|
Ok(()) => log::warn!("{component:?} exited; restarting in {backoff:?}"),
|
|
Err(err) => {
|
|
log::error!("{component:?} failed: {err}; restarting in {backoff:?}");
|
|
}
|
|
}
|
|
|
|
tokio::select! {
|
|
() = shutdown.cancelled() => break,
|
|
() = tokio::time::sleep(backoff) => {}
|
|
}
|
|
}
|
|
SupervisionPolicy::BestEffort => {
|
|
match result {
|
|
Ok(()) => log::warn!("{component:?} exited"),
|
|
Err(err) => log::error!("{component:?} failed: {err}"),
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
fn report_required_service_failure(
|
|
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
component: PeerRuntimeComponent,
|
|
error: String,
|
|
shutdown: &CancellationToken,
|
|
) {
|
|
log::error!("{component:?} failed: {error}");
|
|
events::send(tx_notify_ui, PeerEvent::RuntimeFailed { component, error });
|
|
shutdown.cancel();
|
|
}
|
|
|
|
/// Extracts a human-readable message from a panic payload.
///
/// Returns the message when the payload is a `&'static str` or a `String`,
/// and `"unknown panic payload"` for any other payload type.
///
/// A caller holding the `Box<dyn Any + Send>` from `catch_unwind` can easily
/// pass `&payload`, which coerces the *box* into the trait object rather than
/// its contents. Such a payload is unwrapped here so the real message is
/// still found.
fn panic_payload_to_string(payload: &(dyn Any + Send)) -> String {
    if let Some(boxed) = payload.downcast_ref::<Box<dyn Any + Send>>() {
        return panic_payload_to_string(&**boxed);
    }

    if let Some(message) = payload.downcast_ref::<&'static str>() {
        return (*message).to_string();
    }

    if let Some(message) = payload.downcast_ref::<String>() {
        return message.clone();
    }

    "unknown panic payload".to_string()
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::Duration,
    };

    use tokio_util::{sync::CancellationToken, task::TaskTracker};

    use super::{SupervisionPolicy, spawn_supervised_service};
    use crate::{PeerRuntimeComponent, startup::PeerRuntimeHandle};

    /// Upper bound for every wait in these tests.
    const DEADLINE: Duration = Duration::from_secs(1);

    /// A failing `Required` component must emit `RuntimeFailed` and cancel the token.
    #[tokio::test]
    async fn required_service_failure_cancels_runtime_and_emits_event() {
        let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel();
        let shutdown = CancellationToken::new();
        let tracker = TaskTracker::new();

        spawn_supervised_service(
            &tracker,
            &shutdown,
            &events_tx,
            PeerRuntimeComponent::QuicServer,
            SupervisionPolicy::Required,
            || async { Err(eyre::eyre!("bind failed")) },
        );

        let received = tokio::time::timeout(DEADLINE, events_rx.recv()).await;
        let event = received
            .expect("runtime failure event should arrive")
            .expect("event channel should stay open");

        assert!(shutdown.is_cancelled());
        assert!(matches!(
            event,
            crate::PeerEvent::RuntimeFailed {
                component: PeerRuntimeComponent::QuicServer,
                ..
            }
        ));

        tracker.close();
        tokio::time::timeout(DEADLINE, tracker.wait())
            .await
            .expect("supervisor task should stop");
    }

    /// A failing `Restart` component is started again until shutdown is requested.
    #[tokio::test]
    async fn restart_service_restarts_until_shutdown() {
        let (events_tx, _events_rx) = tokio::sync::mpsc::unbounded_channel();
        let shutdown = CancellationToken::new();
        let tracker = TaskTracker::new();
        let attempts = Arc::new(AtomicUsize::new(0));

        let counter = Arc::clone(&attempts);
        spawn_supervised_service(
            &tracker,
            &shutdown,
            &events_tx,
            PeerRuntimeComponent::Discovery,
            SupervisionPolicy::Restart {
                backoff: Duration::from_millis(10),
            },
            move || {
                let counter = Arc::clone(&counter);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Err(eyre::eyre!("discovery worker stopped"))
                }
            },
        );

        // Yield until the service has been started at least twice.
        let saw_restart = async {
            while attempts.load(Ordering::SeqCst) < 2 {
                tokio::task::yield_now().await;
            }
        };
        tokio::time::timeout(DEADLINE, saw_restart)
            .await
            .expect("restartable service should run more than once");

        shutdown.cancel();
        tracker.close();
        tokio::time::timeout(DEADLINE, tracker.wait())
            .await
            .expect("restart supervisor should stop after shutdown");
    }

    /// `shutdown()` cancels the token and `wait_stopped()` sees the stopped flag.
    #[tokio::test]
    async fn runtime_handle_can_shutdown_and_await_stopped() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let (tx_stopped, stopped) = tokio::sync::watch::channel(false);
        let shutdown = CancellationToken::new();

        let mut handle = PeerRuntimeHandle {
            tx,
            shutdown: shutdown.clone(),
            stopped,
        };

        // Stand-in for the runtime task: report "stopped" once cancellation arrives.
        tokio::spawn(async move {
            shutdown.cancelled().await;
            let _ = tx_stopped.send(true);
        });

        handle.shutdown();
        tokio::time::timeout(DEADLINE, handle.wait_stopped())
            .await
            .expect("runtime handle should observe stopped");
    }
}
|