lanspread/crates/lanspread-peer/src/services/stream.rs
ddidderr 2bbd2ac869 refactor(peer): adopt structured concurrency with supervised shutdown
Replace the detached tokio::spawn pattern in the peer runtime with a
supervised model built on tokio_util's CancellationToken and TaskTracker.
Long-lived services and child tasks now have an explicit parent, a
cancellation path, and a join point. Tauri can request a clean shutdown
on app exit instead of leaking work into process termination.

Background
~~~~~~~~~~

start_peer() previously returned only a command sender. The four startup
services (QUIC server, mDNS discovery, peer liveness, local library
monitor) and their child tasks (ping workers, handshake jobs, download
workers, announcement fan-outs, connection/stream handlers) were spawned
with raw tokio::spawn and detached. Closing the command channel sent
Goodbye notifications but did not stop those services. The mDNS blocking
worker had no cancellation path at all. Active downloads were stored as
JoinHandle<()> and force-aborted, which could interrupt file writes
mid-chunk.

Supervisor
~~~~~~~~~~

The runtime now owns a CancellationToken and a TaskTracker, threaded
through Ctx and PeerCtx. Each long-lived service is spawned through a
small supervisor (spawn_supervised_service) that wraps the service in
catch_unwind and enforces an explicit SupervisionPolicy:

  QuicServer:    Required     (fatal; cancels the runtime if it dies)
  Discovery:     Restart(5s)  (matches the prior self-restart loop)
  Liveness:      Restart(5s)
  LocalMonitor:  BestEffort   (logs and exits, no restart)

A Required failure emits a new RuntimeFailed { component, error } event
to the UI and cancels the runtime; the command loop and goodbye
notifications still run to completion. The Tauri layer forwards the
event as "peer-runtime-failed" so a future UI can surface it.
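
The policy table above can be sketched with the standard library alone.
This is an illustration, not the tokio-based spawn_supervised_service
from this commit: run_supervised, Outcome, and the AtomicBool shutdown
flag are hypothetical stand-ins for the real supervisor, the
RuntimeFailed event, and the CancellationToken. Treating a clean Ok(())
return as "done" is also an assumption.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

/// Hypothetical mirror of the policies listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SupervisionPolicy {
    /// Failure is fatal: the shared shutdown flag is set.
    Required,
    /// Restart after the given backoff until shutdown is requested.
    Restart(Duration),
    /// Log the failure and give up without restarting.
    BestEffort,
}

/// What the supervisor did before returning (illustrative only).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Completed,
    RuntimeCancelled,
    GaveUp,
    ShutdownRequested,
}

/// Runs `service` under `policy`, treating both `Err` and panics as failures.
pub fn run_supervised<F>(
    name: &str,
    policy: SupervisionPolicy,
    shutdown: &AtomicBool,
    mut service: F,
) -> Outcome
where
    F: FnMut() -> Result<(), String>,
{
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return Outcome::ShutdownRequested;
        }
        // catch_unwind turns a panic inside the service into an Err value.
        let result = catch_unwind(AssertUnwindSafe(|| service()))
            .unwrap_or_else(|_| Err("service panicked".to_string()));
        let error = match result {
            Ok(()) => return Outcome::Completed,
            Err(error) => error,
        };
        eprintln!("{name} failed: {error}");
        match policy {
            SupervisionPolicy::Required => {
                // Stand-in for emitting RuntimeFailed and cancelling the runtime.
                shutdown.store(true, Ordering::SeqCst);
                return Outcome::RuntimeCancelled;
            }
            SupervisionPolicy::Restart(backoff) => std::thread::sleep(backoff),
            SupervisionPolicy::BestEffort => return Outcome::GaveUp,
        }
    }
}
```

In this sketch a Required failure flips the shared flag, so any sibling
running under Restart sees it at the top of its next loop iteration and
stops restarting.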

mDNS cancellation
~~~~~~~~~~~~~~~~~

MdnsBrowser previously blocked on receiver.recv() forever. It now
exposes next_service_timeout(Duration) returning an MdnsServicePoll
enum (Service/Timeout/Closed) via recv_timeout(). The discovery worker
polls at 250ms and checks the shutdown flag between ticks, so
cancellation reaches the blocking thread within one poll interval
instead of waiting for the next mDNS event.
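
A minimal sketch of the bounded poll described above, assuming a
std::sync::mpsc receiver in place of the real mDNS event channel. The
generic parameter, the discovery_worker function, and the AtomicBool
shutdown flag are assumptions made for illustration; only the enum's
variant names and the recv_timeout() approach come from this commit.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Result of one bounded poll, mirroring the three variants named above.
#[derive(Debug, PartialEq)]
pub enum MdnsServicePoll<T> {
    Service(T),
    Timeout,
    Closed,
}

/// Waits at most `timeout` for the next event instead of blocking forever.
pub fn next_service_timeout<T>(receiver: &Receiver<T>, timeout: Duration) -> MdnsServicePoll<T> {
    match receiver.recv_timeout(timeout) {
        Ok(service) => MdnsServicePoll::Service(service),
        Err(RecvTimeoutError::Timeout) => MdnsServicePoll::Timeout,
        Err(RecvTimeoutError::Disconnected) => MdnsServicePoll::Closed,
    }
}

/// Collects services until the channel closes or `shutdown` is set.
/// The flag is re-checked after every poll, so the worker notices a
/// shutdown within roughly one `poll` interval.
pub fn discovery_worker<T>(receiver: &Receiver<T>, shutdown: &AtomicBool, poll: Duration) -> Vec<T> {
    let mut seen = Vec::new();
    while !shutdown.load(Ordering::SeqCst) {
        match next_service_timeout(receiver, poll) {
            MdnsServicePoll::Service(service) => seen.push(service),
            MdnsServicePoll::Timeout => continue,
            MdnsServicePoll::Closed => break,
        }
    }
    seen
}
```

With poll set to 250ms this reproduces the latency bound stated above:
a worker parked in recv_timeout() wakes on the next tick and observes
the flag.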

Downloads
~~~~~~~~~

active_downloads is now HashMap<String, CancellationToken>. Each
download gets a child token of the runtime shutdown, checked at chunk
and peer-attempt boundaries (never inside file writes). When all peers
with a game disappear, liveness cancels the token and emits
DownloadGameFilesAllPeersGone; the download exits Ok(()) without
emitting a duplicate Failed event.

DownloadStateGuard (context.rs) is held inside the download task and
clears downloading_games + active_downloads on Drop, covering the happy
path, error returns, cancellation, and task abort. Drop falls back to
spawning the cleanup if write-lock contention prevents try_write.
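
The guard-plus-token pattern above can be illustrated with a simplified,
synchronous sketch. It assumes std::sync::Mutex instead of the async
lock used in context.rs (so there is no try_write fallback), and an
Arc<AtomicBool> in place of CancellationToken. Tracking, register, and
download are hypothetical names.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared download tracking state.
#[derive(Default)]
pub struct Tracking {
    pub downloading_games: HashSet<String>,
    /// The real map stores CancellationToken; an AtomicBool plays that role here.
    pub active_downloads: HashMap<String, Arc<AtomicBool>>,
}

/// Registers a download on creation and unregisters it on Drop.
pub struct DownloadStateGuard {
    game_id: String,
    tracking: Arc<Mutex<Tracking>>,
}

impl DownloadStateGuard {
    pub fn register(game_id: &str, tracking: Arc<Mutex<Tracking>>) -> (Self, Arc<AtomicBool>) {
        let cancel = Arc::new(AtomicBool::new(false));
        {
            let mut state = tracking.lock().unwrap();
            state.downloading_games.insert(game_id.to_string());
            state.active_downloads.insert(game_id.to_string(), Arc::clone(&cancel));
        }
        (Self { game_id: game_id.to_string(), tracking }, cancel)
    }
}

impl Drop for DownloadStateGuard {
    fn drop(&mut self) {
        // Runs on normal return, early error return, and unwinding alike.
        if let Ok(mut state) = self.tracking.lock() {
            state.downloading_games.remove(&self.game_id);
            state.active_downloads.remove(&self.game_id);
        }
    }
}

/// Writes up to `chunks` chunks, checking for cancellation only between
/// whole chunks. Returns how many chunks were written.
pub fn download<F: FnMut(usize)>(
    game_id: &str,
    chunks: usize,
    tracking: &Arc<Mutex<Tracking>>,
    mut write_chunk: F,
) -> usize {
    let (_guard, cancel) = DownloadStateGuard::register(game_id, Arc::clone(tracking));
    let mut written = 0;
    for index in 0..chunks {
        if cancel.load(Ordering::SeqCst) {
            break; // cancellation observed at a chunk boundary
        }
        write_chunk(index); // a chunk write is never interrupted part-way
        written += 1;
    }
    written
}
```

Because the cleanup lives in Drop, every exit path of download() clears
both collections without an explicit call at each return site.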

Public API and Tauri integration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

start_peer() now returns PeerRuntimeHandle exposing:

  fn sender(&self) -> UnboundedSender<PeerCommand>
  fn shutdown(&self)
  async fn wait_stopped(&mut self)

The Tauri layer stores the handle in managed state and switches its
main loop from .run(ctx) to .build(ctx).run(|h, e| ...). On
RunEvent::Exit it calls handle.shutdown() and blocks up to 2s on
wait_stopped(), giving services time to cancel and Goodbye packets time
to flush over a healthy LAN while staying short enough not to delay
process exit noticeably on a dead network.
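
The shutdown-then-bounded-wait sequence can be sketched with threads
instead of Tauri and tokio. RuntimeHandle, start, and the tick/flush
parameters are hypothetical; the real PeerRuntimeHandle is async and
its wait_stopped() takes no timeout argument (the 2s limit is applied
by the caller).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical blocking analogue of the runtime handle described above.
pub struct RuntimeHandle {
    shutdown: Arc<AtomicBool>,
    stopped: Receiver<()>,
}

impl RuntimeHandle {
    /// Starts a worker that polls the shutdown flag every `tick`, then
    /// spends `flush` on final work before reporting that it stopped.
    pub fn start(tick: Duration, flush: Duration) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let (stopped_tx, stopped) = mpsc::channel();
        let flag = Arc::clone(&shutdown);
        thread::spawn(move || {
            while !flag.load(Ordering::SeqCst) {
                thread::sleep(tick);
            }
            thread::sleep(flush); // stands in for flushing Goodbye packets
            let _ = stopped_tx.send(());
        });
        Self { shutdown, stopped }
    }

    /// Requests shutdown without waiting for it.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    /// Returns true if the runtime reported that it stopped within `limit`.
    pub fn wait_stopped(&self, limit: Duration) -> bool {
        self.stopped.recv_timeout(limit).is_ok()
    }
}
```

An exit hook would call shutdown() and then wait_stopped() with a 2s
limit; a false result means the process exits anyway rather than
hanging on a dead network.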

The command loop distinguishes graceful shutdown from unexpected
channel closure: if recv() returns None and shutdown.is_cancelled()
returns true, the loop returns Ok(()) silently. Only an unexpected
close (no cancellation observed) still emits RuntimeFailed. This avoids
a spurious failure event on every normal app close.
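
The distinction can be shown with a blocking channel. This is a sketch
under assumptions: std::sync::mpsc replaces the tokio channel, an
AtomicBool replaces the CancellationToken, and LoopExit plus the
"command-loop" component name are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// How the loop ended (illustrative stand-in for Ok(()) vs. RuntimeFailed).
#[derive(Debug, PartialEq)]
pub enum LoopExit {
    Graceful,
    RuntimeFailed { component: &'static str, error: String },
}

/// Handles commands until every sender is dropped, then classifies the
/// closure by whether shutdown had already been requested.
pub fn command_loop<C>(
    commands: &Receiver<C>,
    shutdown: &AtomicBool,
    mut handle: impl FnMut(C),
) -> LoopExit {
    loop {
        match commands.recv() {
            Ok(command) => handle(command),
            // Channel closed after a shutdown request: expected, stay silent.
            Err(_) if shutdown.load(Ordering::SeqCst) => return LoopExit::Graceful,
            // Channel closed with no shutdown request: report a failure.
            Err(_) => {
                return LoopExit::RuntimeFailed {
                    component: "command-loop",
                    error: "command channel closed unexpectedly".to_string(),
                }
            }
        }
    }
}
```

Commands that were queued before the channel closed are still handled
in both cases; only the final classification differs.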

User-visible behavior changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

- Closing the app no longer leaks services into process termination;
  Goodbye notifications are reliably attempted before exit.
- Downloads cancel cleanly (between chunks) instead of force-aborting
  mid-write.
- A new "peer-runtime-failed" Tauri event fires when a Required service
  cannot recover. No frontend handler exists yet; that is a follow-up.

Tradeoffs
~~~~~~~~~

- Workspace tokio-util now requires the "rt" feature for TaskTracker.
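- The mDNS worker still runs in spawn_blocking and may stay parked for
  up to one 250ms poll interval after cancellation, which is acceptable
  for a desktop app.
- The 2s shutdown timeout on app exit is a deliberate compromise: long
  enough for Goodbye packets to flush on a healthy LAN, short enough
  not to delay process exit noticeably on a dead network.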

Tests
~~~~~

New unit tests:
  - DownloadStateGuard clears tracking on completion, cancellation, and
    parent-task abort (context.rs).
  - Required failure cancels the runtime and emits RuntimeFailed
    (startup.rs).
  - Restart policy restarts until shutdown is requested (startup.rs).
  - PeerRuntimeHandle::shutdown() is observable via wait_stopped()
    (startup.rs).
  - Peers-gone cancellation emits only DownloadGameFilesAllPeersGone,
    with no duplicate Failed event (services/liveness.rs).

Test plan
~~~~~~~~~

  cargo test --workspace
  cargo clippy --workspace --all-targets

Manual smoke test on two peers on the same LAN:
  1. Start a download, verify chunks transfer.
  2. Close the receiving app mid-download — verify the sending peer
     logs a Goodbye, not a connection-reset error.
  3. Stop the sending peer mid-download — verify the receiver emits
     DownloadGameFilesAllPeersGone, not Failed.

Follow-ups
~~~~~~~~~~

- Frontend handler for "peer-runtime-failed".
- Consider exposing the runtime handle's stopped watch to the frontend
  for a reconnecting indicator on Required failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 07:53:51 +02:00


//! Request dispatch for a single bidirectional QUIC stream.

use std::net::SocketAddr;

use futures::{SinkExt, StreamExt};
use lanspread_db::db::{Game, GameFileDescription};
use lanspread_proto::{LibraryDelta, LibrarySnapshot, LibrarySummary, Message, Request, Response};
use s2n_quic::stream::{BidirectionalStream, SendStream};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use crate::{
    PeerEvent,
    context::PeerCtx,
    error::PeerError,
    events,
    local_games::get_game_file_descriptions,
    peer::{send_game_file_chunk, send_game_file_data},
    remote_peer::{ensure_peer_id_for_addr, update_peer_from_game_list},
    services::handshake::{
        accept_inbound_hello,
        perform_handshake_with_peer,
        spawn_library_resync,
    },
};

type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;

/// Handles a bidirectional stream from a peer.
///
/// Reads length-delimited requests until the stream closes, a read
/// error occurs, or the runtime shutdown token is cancelled.
pub(super) async fn handle_peer_stream(
    stream: BidirectionalStream,
    ctx: PeerCtx,
    remote_addr: Option<SocketAddr>,
) -> eyre::Result<()> {
    let (rx, tx) = stream.split();
    let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
    let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
    log::trace!("{remote_addr:?} peer stream opened");
    loop {
        // Stop waiting for the next request as soon as shutdown is requested.
        let next_message = tokio::select! {
            () = ctx.shutdown.cancelled() => break,
            next_message = framed_rx.next() => next_message,
        };
        match next_message {
            Some(Ok(data)) => {
                log::trace!(
                    "{:?} msg: (raw): {}",
                    remote_addr,
                    String::from_utf8_lossy(&data)
                );
                let request = Request::decode(data.freeze());
                log::debug!("{remote_addr:?} msg: {request:?}");
                note_peer_activity(&ctx, remote_addr).await;
                framed_tx = dispatch_request(&ctx, remote_addr, request, framed_tx).await;
            }
            Some(Err(err)) => {
                log::error!("{remote_addr:?} peer stream error: {err}");
                break;
            }
            None => {
                log::trace!("{remote_addr:?} peer stream closed");
                break;
            }
        }
    }
    Ok(())
}

/// Routes one decoded request to its handler and hands the response
/// writer back so the caller can reuse it for the next request.
async fn dispatch_request(
    ctx: &PeerCtx,
    remote_addr: Option<SocketAddr>,
    request: Request,
    framed_tx: ResponseWriter,
) -> ResponseWriter {
    match request {
        Request::Ping => send_response(framed_tx, Response::Pong, "pong").await,
        Request::Hello(hello) => {
            let ack = accept_inbound_hello(ctx, remote_addr, hello).await;
            send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
        }
        Request::ListGames => handle_list_games(ctx, framed_tx).await,
        Request::LibrarySummary(summary) => {
            handle_library_summary(ctx, remote_addr, summary).await;
            framed_tx
        }
        Request::LibrarySnapshot(snapshot) => {
            handle_library_snapshot(ctx, remote_addr, snapshot).await;
            framed_tx
        }
        Request::LibraryDelta(delta) => {
            handle_library_delta(ctx, remote_addr, delta).await;
            framed_tx
        }
        Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await,
        Request::GetGameFileData(desc) => handle_file_data_request(ctx, desc, framed_tx).await,
        Request::GetGameFileChunk {
            game_id,
            relative_path,
            offset,
            length,
        } => {
            handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await
        }
        Request::Goodbye { peer_id } => {
            handle_goodbye(ctx, remote_addr, peer_id).await;
            framed_tx
        }
        Request::Invalid(_, _) => {
            log::error!("Received invalid request from peer");
            framed_tx
        }
        Request::AnnounceGames(games) => {
            handle_announce_games(ctx, remote_addr, games).await;
            framed_tx
        }
    }
}

/// Refreshes the peer's last-seen timestamp when its address is known.
async fn note_peer_activity(ctx: &PeerCtx, remote_addr: Option<SocketAddr>) {
    if let Some(addr) = remote_addr {
        ctx.peer_game_db
            .write()
            .await
            .update_last_seen_by_addr(&addr);
    }
}

/// Sends one framed response; a send failure is logged, not propagated.
async fn send_response(
    mut framed_tx: ResponseWriter,
    response: Response,
    label: &str,
) -> ResponseWriter {
    if let Err(err) = framed_tx.send(response.encode()).await {
        log::error!("Failed to send {label} response: {err}");
    }
    framed_tx
}

/// Replies with the local games, omitting any that are still downloading.
async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> ResponseWriter {
    log::info!("Received ListGames request from peer");
    let snapshot = {
        let db_guard = ctx.local_game_db.read().await;
        if let Some(db) = db_guard.as_ref() {
            db.all_games().into_iter().cloned().collect::<Vec<Game>>()
        } else {
            log::info!("Local game database not yet loaded, responding with empty game list");
            Vec::new()
        }
    };
    let games = if snapshot.is_empty() {
        snapshot
    } else {
        let downloading = ctx.downloading_games.read().await;
        snapshot
            .into_iter()
            .filter(|game| !downloading.contains(&game.id))
            .collect()
    };
    send_response(framed_tx, Response::ListGames(games), "ListGames").await
}

/// Records the peer's advertised library revision and digest, then
/// starts a tracked handshake task if the digest changed or no games
/// are known for that peer yet.
async fn handle_library_summary(
    ctx: &PeerCtx,
    remote_addr: Option<SocketAddr>,
    summary: LibrarySummary,
) {
    let Some(addr) = remote_addr else {
        return;
    };
    let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
    let (previous_digest, previous_count, features) = {
        let db = ctx.peer_game_db.read().await;
        let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
        (
            digest,
            db.peer_game_count(&peer_id),
            db.peer_features(&peer_id),
        )
    };
    {
        let mut db = ctx.peer_game_db.write().await;
        db.update_peer_library(
            &peer_id,
            summary.library_rev,
            summary.library_digest,
            features,
        );
    }
    if summary.library_digest != previous_digest || previous_count == 0 {
        ctx.task_tracker.spawn({
            let peer_id_arc = ctx.peer_id.clone();
            let local_library = ctx.local_library.clone();
            let peer_game_db = ctx.peer_game_db.clone();
            let tx_notify_ui = ctx.tx_notify_ui.clone();
            async move {
                if let Err(err) = perform_handshake_with_peer(
                    peer_id_arc,
                    local_library,
                    peer_game_db,
                    tx_notify_ui,
                    addr,
                    Some(peer_id),
                )
                .await
                {
                    log::warn!("Failed to refresh library from {addr}: {err}");
                }
            }
        });
    }
}

/// Applies a full library snapshot from the peer and notifies the UI.
async fn handle_library_snapshot(
    ctx: &PeerCtx,
    remote_addr: Option<SocketAddr>,
    snapshot: LibrarySnapshot,
) {
    if let Some(addr) = remote_addr {
        let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
        {
            let mut db = ctx.peer_game_db.write().await;
            db.apply_library_snapshot(&peer_id, snapshot);
        }
        events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
    }
}

/// Applies a library delta; if it cannot be applied, starts a resync.
async fn handle_library_delta(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, delta: LibraryDelta) {
    let Some(addr) = remote_addr else {
        return;
    };
    let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
    let applied = {
        let mut db = ctx.peer_game_db.write().await;
        db.apply_library_delta(&peer_id, delta)
    };
    if applied {
        events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
    } else {
        spawn_library_resync(
            ctx.peer_id.clone(),
            ctx.local_library.clone(),
            ctx.peer_game_db.clone(),
            ctx.tx_notify_ui.clone(),
            addr,
            peer_id,
            "resync",
        );
    }
}

/// Replies to a GetGame request with the game's file descriptions.
async fn handle_get_game(ctx: &PeerCtx, id: String, framed_tx: ResponseWriter) -> ResponseWriter {
    log::info!("Received GetGame request for {id} from peer");
    let response = get_game_response(ctx, id).await;
    send_response(framed_tx, response, "GetGame").await
}

/// Builds the GetGame response. A game that is still downloading
/// locally, or that is missing from the local database, is reported as
/// not found.
async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
    let downloading = ctx.downloading_games.read().await.contains(&id);
    if downloading {
        log::info!("Declining to serve GetGame for {id} because download is in progress");
        return Response::GameNotFound(id);
    }
    let game_dir = ctx.game_dir.read().await.clone();
    let has_game = {
        let db_guard = ctx.local_game_db.read().await;
        db_guard
            .as_ref()
            .is_some_and(|db| db.get_game_by_id(&id).is_some())
    };
    if !has_game {
        return Response::GameNotFound(id);
    }
    match get_game_file_descriptions(&id, &game_dir).await {
        Ok(file_descriptions) => Response::GetGame {
            id,
            file_descriptions,
        },
        Err(PeerError::FileSizeDetermination { path, source }) => {
            let error_msg = format!("Failed to determine file size for {path}: {source}");
            log::error!("File size determination error for game {id}: {error_msg}");
            Response::InternalPeerError(error_msg)
        }
        Err(err) => {
            log::error!("Failed to get game file descriptions for {id}: {err}");
            Response::GameNotFound(id)
        }
    }
}

/// Serves a whole file. The raw send stream is handed to the file
/// sender and then re-wrapped for later framed responses.
async fn handle_file_data_request(
    ctx: &PeerCtx,
    desc: GameFileDescription,
    framed_tx: ResponseWriter,
) -> ResponseWriter {
    log::info!(
        "Received GetGameFileData request for {} from peer",
        desc.relative_path
    );
    let game_dir = ctx.game_dir.read().await.clone();
    let mut tx = framed_tx.into_inner();
    send_game_file_data(&desc, &mut tx, &game_dir).await;
    FramedWrite::new(tx, LengthDelimitedCodec::new())
}

/// Serves one byte range of a file, using the same unwrap and re-wrap
/// of the send stream as `handle_file_data_request`.
async fn handle_file_chunk_request(
    ctx: &PeerCtx,
    game_id: String,
    relative_path: String,
    offset: u64,
    length: u64,
    framed_tx: ResponseWriter,
) -> ResponseWriter {
    log::info!(
        "Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
    );
    let game_dir = ctx.game_dir.read().await.clone();
    let mut tx = framed_tx.into_inner();
    send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await;
    FramedWrite::new(tx, LengthDelimitedCodec::new())
}

/// Removes the departing peer and notifies the UI. A Goodbye from a
/// peer that is not in the database is ignored.
async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, peer_id: String) {
    log::info!("Received Goodbye from peer {peer_id}");
    let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
    if removed.is_none() {
        return;
    }
    if let Some(addr) = remote_addr {
        events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, addr).await;
    }
    events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
}

/// Merges a peer's announced games and sends the aggregated list to the UI.
async fn handle_announce_games(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, games: Vec<Game>) {
    log::info!(
        "Received {} announced games from peer {remote_addr:?}",
        games.len()
    );
    if let Some(addr) = remote_addr {
        let aggregated_games = update_peer_from_game_list(&ctx.peer_game_db, addr, &games).await;
        events::send(&ctx.tx_notify_ui, PeerEvent::ListGames(aggregated_games));
    }
}