Files
lanspread/crates/lanspread-peer/src/services/liveness.rs
T
ddidderr 41e9a0efc1 refactor(peer): split local library and operation UI events
Replace the `a9f9845` local-update dedup cache with explicit peer event
semantics. Local scans now emit `LocalLibraryChanged` when the library changes,
while operation mutations emit `ActiveOperationsChanged` from the mutation
path. Tauri keeps joining those facts into the existing `games-list-updated`
payload, so the frontend contract stays stable.

This removes the cache/invalidation coupling between scan emission and
operation state. The remaining forced local snapshot is explicit: accepted game
directory changes can refresh the UI for an equivalent new path without sending
a peer library delta.

Operation guard cleanup and liveness cancellation now publish the same active
operation snapshot as normal command-handler transitions. The peer CLI JSONL
events follow the same split with `local-library-changed` and
`active-operations-changed`.

Test Plan:
- `just fmt`
- `CARGO_BUILD_RUSTC_WRAPPER= just test`
- `CARGO_BUILD_RUSTC_WRAPPER= just clippy`
- `git diff --check`

Refs: CLEAN_CODE_PLAN_1.md
2026-05-18 21:25:20 +02:00

358 lines
11 KiB
Rust

//! Peer liveness checks and stale-peer cleanup.
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{
PeerEvent,
config::{PEER_PING_IDLE_SECS, PEER_PING_INTERVAL_SECS, peer_stale_timeout},
context::OperationKind,
events,
network::ping_peer,
peer_db::{PeerGameDB, PeerId},
};
/// Runs the ping service to check peer liveness.
pub async fn run_ping_service(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
shutdown: CancellationToken,
task_tracker: TaskTracker,
) -> eyre::Result<()> {
log::info!(
"Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \
{}s idle threshold, {}s timeout)",
PEER_PING_IDLE_SECS,
peer_stale_timeout().as_secs()
);
let mut interval = tokio::time::interval(Duration::from_secs(PEER_PING_INTERVAL_SECS));
loop {
tokio::select! {
() = shutdown.cancelled() => return Ok(()),
_ = interval.tick() => {}
}
ping_idle_peers(
&peer_game_db,
&active_operations,
&active_downloads,
&tx_notify_ui,
&shutdown,
&task_tracker,
)
.await;
prune_stale_peers(
&peer_game_db,
&active_operations,
&active_downloads,
&tx_notify_ui,
)
.await;
}
}
async fn ping_idle_peers(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
shutdown: &CancellationToken,
task_tracker: &TaskTracker,
) {
let peer_snapshots = { peer_game_db.read().await.peer_liveness_snapshot() };
for (peer_id, peer_addr, last_seen) in peer_snapshots {
if last_seen.elapsed() < Duration::from_secs(PEER_PING_IDLE_SECS) {
continue;
}
let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = peer_game_db.clone();
let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone();
let shutdown = shutdown.clone();
task_tracker.spawn(async move {
let ping_result = tokio::select! {
() = shutdown.cancelled() => return,
result = ping_peer(peer_addr) => result,
};
match ping_result {
Ok(true) => {
peer_game_db.write().await.update_last_seen(&peer_id);
}
Ok(false) => {
log::warn!("Peer {peer_addr} failed ping check");
remove_peer_and_refresh(
&peer_game_db,
&active_operations,
&active_downloads,
&tx_notify_ui,
peer_id,
"Removed stale peer",
)
.await;
}
Err(err) => {
log::error!("Failed to ping peer {peer_addr}: {err}");
remove_peer_and_refresh(
&peer_game_db,
&active_operations,
&active_downloads,
&tx_notify_ui,
peer_id,
"Removed peer due to ping error",
)
.await;
}
}
});
}
}
/// Removes every peer that the database reports as stale for
/// `peer_stale_timeout()`.
///
/// Peers are removed one by one via [`remove_peer`]. If at least one removal
/// actually happened, the peer game list is emitted once and
/// [`handle_active_downloads_without_peers`] is run afterwards. When nothing
/// was removed, no further work is done.
async fn prune_stale_peers(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
    active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
    // Collect the ids under a short-lived read lock; removal takes a write lock.
    let expired_ids = {
        let db = peer_game_db.read().await;
        db.get_stale_peer_ids(peer_stale_timeout())
    };

    let mut removed_count = 0_usize;
    for expired_id in expired_ids {
        if remove_peer(peer_game_db, tx_notify_ui, expired_id, "Removed stale peer").await {
            removed_count += 1;
        }
    }

    if removed_count == 0 {
        return;
    }

    events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
    handle_active_downloads_without_peers(
        peer_game_db,
        active_operations,
        active_downloads,
        tx_notify_ui,
    )
    .await;
}
/// Removes a single peer and, if it was present, refreshes dependent state.
///
/// The refresh consists of emitting the peer game list and then running
/// [`handle_active_downloads_without_peers`]. Nothing beyond the removal
/// attempt happens when [`remove_peer`] reports that the peer was not in the
/// database. `log_label` is forwarded to [`remove_peer`] as its log prefix.
async fn remove_peer_and_refresh(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
    active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
    peer_id: PeerId,
    log_label: &str,
) {
    let was_removed = remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await;
    if !was_removed {
        return;
    }

    events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
    handle_active_downloads_without_peers(
        peer_game_db,
        active_operations,
        active_downloads,
        tx_notify_ui,
    )
    .await;
}
/// Deletes `peer_id` from the peer database and announces the loss.
///
/// Returns `true` when the peer existed and was removed; in that case the
/// removal is logged with `log_label` as prefix and a peer-lost event is
/// emitted for the peer's address. Returns `false`, with no log or event,
/// when the database had no entry for `peer_id`.
async fn remove_peer(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
    peer_id: PeerId,
    log_label: &str,
) -> bool {
    // Keep the write guard inside this block: the event emission below is
    // handed the same database and must not run while the guard is alive.
    let removal = {
        let mut db = peer_game_db.write().await;
        db.remove_peer(&peer_id)
    };

    match removal {
        None => false,
        Some(lost_peer) => {
            log::info!("{log_label}: {}", lost_peer.addr);
            events::emit_peer_lost(peer_game_db, tx_notify_ui, lost_peer.addr).await;
            true
        }
    }
}
/// Cancels active downloads for which no peer has the game any more.
///
/// Steps:
/// 1. Snapshot the ids of all operations currently marked
///    `OperationKind::Downloading`.
/// 2. For each id that [`peers_still_have_game`] rejects, remove the entry
///    from `active_operations`, then remove its token from
///    `active_downloads`. If a token was present it is cancelled and a
///    `DownloadGameFilesAllPeersGone` event is sent.
/// 3. If any operation entry was removed, emit one active-operations snapshot
///    after all per-download events.
///
/// NOTE(review): the operation entry is removed by id without re-checking its
/// kind under the write lock, so an entry whose kind changed after step 1
/// would still be removed. Confirm that this is intended.
async fn handle_active_downloads_without_peers(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
    active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
    let downloading_ids: Vec<String> = {
        let operations = active_operations.read().await;
        operations
            .iter()
            .filter(|(_, kind)| **kind == OperationKind::Downloading)
            .map(|(id, _)| id.clone())
            .collect()
    };
    if downloading_ids.is_empty() {
        return;
    }

    let mut snapshot_dirty = false;
    for game_id in downloading_ids {
        if peers_still_have_game(peer_game_db, &game_id).await {
            continue;
        }

        // The two maps are locked one after the other, never at the same time.
        let was_tracked = active_operations.write().await.remove(&game_id).is_some();
        snapshot_dirty = snapshot_dirty || was_tracked;

        let detached_token = active_downloads.write().await.remove(&game_id);
        if let Some(token) = detached_token {
            token.cancel();
            events::send(
                tx_notify_ui,
                PeerEvent::DownloadGameFilesAllPeersGone { id: game_id },
            );
        }
    }

    if snapshot_dirty {
        events::emit_active_operations(active_operations, tx_notify_ui).await;
    }
}
/// Returns `true` when the peer database lists at least one peer for
/// `game_id`, and `false` when `peers_with_game` yields an empty result.
async fn peers_still_have_game(peer_game_db: &Arc<RwLock<PeerGameDB>>, game_id: &str) -> bool {
    let db = peer_game_db.read().await;
    let holders = db.peers_with_game(game_id);
    !holders.is_empty()
}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use tokio::sync::RwLock;
    use tokio_util::sync::CancellationToken;

    use super::handle_active_downloads_without_peers;
    use crate::{
        ActiveOperation,
        ActiveOperationKind,
        PeerEvent,
        context::OperationKind,
        peer_db::PeerGameDB,
    };

    /// One tracked download and a freshly constructed peer database: the
    /// download must be cancelled and untracked, and exactly two events must
    /// arrive, peers-gone first and the active-operation snapshot second.
    #[tokio::test]
    async fn all_peers_gone_cancels_download_and_emits_peers_gone_then_active_snapshot() {
        let game_id = "game".to_string();
        let token = CancellationToken::new();

        // The test relies on a new `PeerGameDB` reporting no peers for any game.
        let db = Arc::new(RwLock::new(PeerGameDB::new()));
        let operations = Arc::new(RwLock::new(HashMap::from([(
            game_id.clone(),
            OperationKind::Downloading,
        )])));
        let downloads = Arc::new(RwLock::new(HashMap::from([(game_id, token.clone())])));
        let (ui_tx, mut ui_rx) = tokio::sync::mpsc::unbounded_channel();

        handle_active_downloads_without_peers(&db, &operations, &downloads, &ui_tx).await;

        assert!(token.is_cancelled());
        assert!(!operations.read().await.contains_key("game"));
        assert!(!downloads.read().await.contains_key("game"));

        let first_event = ui_rx
            .recv()
            .await
            .expect("peers-gone event should be emitted");
        assert!(matches!(
            first_event,
            PeerEvent::DownloadGameFilesAllPeersGone { id } if id == "game"
        ));

        let second_event = ui_rx
            .recv()
            .await
            .expect("active operation snapshot should be emitted");
        assert!(matches!(
            second_event,
            PeerEvent::ActiveOperationsChanged { active_operations } if active_operations.is_empty()
        ));

        assert!(
            ui_rx.try_recv().is_err(),
            "peers-gone cancellation must not emit extra events"
        );
    }

    /// Two tracked downloads plus one unrelated install: both downloads must
    /// be cancelled and untracked, the install must survive, and the final
    /// snapshot must list only the install.
    #[tokio::test]
    async fn all_peers_gone_cancels_multiple_downloads_without_stuck_entries() {
        let token_a = CancellationToken::new();
        let token_b = CancellationToken::new();

        let db = Arc::new(RwLock::new(PeerGameDB::new()));
        let operations = Arc::new(RwLock::new(HashMap::from([
            ("first".to_string(), OperationKind::Downloading),
            ("second".to_string(), OperationKind::Downloading),
            ("installing".to_string(), OperationKind::Installing),
        ])));
        let downloads = Arc::new(RwLock::new(HashMap::from([
            ("first".to_string(), token_a.clone()),
            ("second".to_string(), token_b.clone()),
        ])));
        let (ui_tx, mut ui_rx) = tokio::sync::mpsc::unbounded_channel();

        handle_active_downloads_without_peers(&db, &operations, &downloads, &ui_tx).await;

        assert!(token_a.is_cancelled());
        assert!(token_b.is_cancelled());

        {
            // Scope the read guard so it is released before the next lock.
            let remaining = operations.read().await;
            assert!(!remaining.contains_key("first"));
            assert!(!remaining.contains_key("second"));
            assert_eq!(
                remaining.get("installing"),
                Some(&OperationKind::Installing)
            );
        }
        assert!(downloads.read().await.is_empty());

        // The order of the two peers-gone events is not fixed, so collect the
        // ids and sort them before comparing.
        let mut gone_ids = Vec::with_capacity(2);
        while gone_ids.len() < 2 {
            let event = ui_rx
                .recv()
                .await
                .expect("peers-gone event should be emitted");
            match event {
                PeerEvent::DownloadGameFilesAllPeersGone { id } => gone_ids.push(id),
                _ => panic!("expected peers-gone event"),
            }
        }
        gone_ids.sort();
        assert_eq!(gone_ids, vec!["first", "second"]);

        let snapshot_event = ui_rx
            .recv()
            .await
            .expect("active operation snapshot should be emitted");
        assert!(matches!(
            snapshot_event,
            PeerEvent::ActiveOperationsChanged { active_operations }
                if active_operations == vec![ActiveOperation {
                    id: "installing".to_string(),
                    operation: ActiveOperationKind::Installing,
                }]
        ));

        assert!(
            ui_rx.try_recv().is_err(),
            "multiple peers-gone cancellations must not emit extra events"
        );
    }
}