//! Peer liveness checks and stale-peer cleanup. use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::{ PeerEvent, config::{PEER_PING_IDLE_SECS, PEER_PING_INTERVAL_SECS, peer_stale_timeout}, context::OperationKind, events, network::ping_peer, peer_db::{PeerGameDB, PeerId}, }; /// Runs the ping service to check peer liveness. pub async fn run_ping_service( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, active_operations: Arc>>, active_downloads: Arc>>, shutdown: CancellationToken, task_tracker: TaskTracker, ) -> eyre::Result<()> { log::info!( "Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \ {}s idle threshold, {}s timeout)", PEER_PING_IDLE_SECS, peer_stale_timeout().as_secs() ); let mut interval = tokio::time::interval(Duration::from_secs(PEER_PING_INTERVAL_SECS)); loop { tokio::select! { () = shutdown.cancelled() => return Ok(()), _ = interval.tick() => {} } ping_idle_peers( &peer_game_db, &active_operations, &active_downloads, &tx_notify_ui, &shutdown, &task_tracker, ) .await; prune_stale_peers( &peer_game_db, &active_operations, &active_downloads, &tx_notify_ui, ) .await; } } async fn ping_idle_peers( peer_game_db: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, shutdown: &CancellationToken, task_tracker: &TaskTracker, ) { let peer_snapshots = { peer_game_db.read().await.peer_liveness_snapshot() }; for (peer_id, peer_addr, last_seen) in peer_snapshots { if last_seen.elapsed() < Duration::from_secs(PEER_PING_IDLE_SECS) { continue; } let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = peer_game_db.clone(); let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); let shutdown = shutdown.clone(); task_tracker.spawn(async move { let ping_result = tokio::select! { () = shutdown.cancelled() => return, result = ping_peer(peer_addr) => result, }; match ping_result { Ok(true) => { peer_game_db.write().await.update_last_seen(&peer_id); } Ok(false) => { log::warn!("Peer {peer_addr} failed ping check"); remove_peer_and_refresh( &peer_game_db, &active_operations, &active_downloads, &tx_notify_ui, peer_id, "Removed stale peer", ) .await; } Err(err) => { log::error!("Failed to ping peer {peer_addr}: {err}"); remove_peer_and_refresh( &peer_game_db, &active_operations, &active_downloads, &tx_notify_ui, peer_id, "Removed peer due to ping error", ) .await; } } }); } } async fn prune_stale_peers( peer_game_db: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, ) { let stale_peers = { peer_game_db .read() .await .get_stale_peer_ids(peer_stale_timeout()) }; let mut removed_any = false; for peer_id in stale_peers { removed_any |= remove_peer(peer_game_db, tx_notify_ui, peer_id, "Removed stale peer").await; } if removed_any { events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, active_operations, active_downloads, tx_notify_ui, ) .await; } } async fn remove_peer_and_refresh( peer_game_db: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, peer_id: PeerId, log_label: &str, ) { if remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await { events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, active_operations, active_downloads, tx_notify_ui, ) .await; } } async fn remove_peer( peer_game_db: &Arc>, tx_notify_ui: &UnboundedSender, peer_id: PeerId, log_label: &str, ) -> bool { let removed_peer = { peer_game_db.write().await.remove_peer(&peer_id) }; let Some(peer) = removed_peer else { return false; }; log::info!("{log_label}: {}", peer.addr); events::emit_peer_lost(peer_game_db, tx_notify_ui, peer.addr).await; true } async fn handle_active_downloads_without_peers( peer_game_db: &Arc>, active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, ) { let active_ids = { active_operations .read() .await .iter() .filter_map(|(id, kind)| (*kind == OperationKind::Downloading).then_some(id.clone())) .collect::>() }; if active_ids.is_empty() { return; } for id in active_ids { if peers_still_have_game(peer_game_db, &id).await { continue; } active_operations.write().await.remove(&id); let Some(cancel_token) = active_downloads.write().await.remove(&id) else { continue; }; cancel_token.cancel(); events::send( tx_notify_ui, PeerEvent::DownloadGameFilesAllPeersGone { id }, ); } } async fn peers_still_have_game(peer_game_db: &Arc>, game_id: &str) -> bool { let guard = peer_game_db.read().await; !guard.peers_with_game(game_id).is_empty() } #[cfg(test)] mod tests { use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use super::handle_active_downloads_without_peers; use crate::{PeerEvent, context::OperationKind, peer_db::PeerGameDB}; #[tokio::test] async fn all_peers_gone_cancels_download_and_emits_only_peers_gone() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let active_operations = Arc::new(RwLock::new(HashMap::from([( "game".to_string(), OperationKind::Downloading, )]))); let cancel = CancellationToken::new(); let active_downloads = Arc::new(RwLock::new(HashMap::from([( "game".to_string(), cancel.clone(), )]))); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); handle_active_downloads_without_peers( &peer_game_db, &active_operations, &active_downloads, &tx, ) .await; assert!(cancel.is_cancelled()); assert!(!active_operations.read().await.contains_key("game")); assert!(!active_downloads.read().await.contains_key("game")); let event = rx.recv().await.expect("peers-gone event should be emitted"); assert!(matches!( event, PeerEvent::DownloadGameFilesAllPeersGone { id } if id == "game" )); assert!( rx.try_recv().is_err(), "peers-gone cancellation must not emit a duplicate failure event" ); } #[tokio::test] async fn all_peers_gone_cancels_multiple_downloads_without_stuck_entries() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let first_cancel = CancellationToken::new(); let second_cancel = CancellationToken::new(); let active_operations = Arc::new(RwLock::new(HashMap::from([ ("first".to_string(), OperationKind::Downloading), ("second".to_string(), OperationKind::Downloading), ("installing".to_string(), OperationKind::Installing), ]))); let active_downloads = Arc::new(RwLock::new(HashMap::from([ ("first".to_string(), first_cancel.clone()), ("second".to_string(), second_cancel.clone()), ]))); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); handle_active_downloads_without_peers( &peer_game_db, &active_operations, &active_downloads, &tx, ) .await; assert!(first_cancel.is_cancelled()); assert!(second_cancel.is_cancelled()); let operations = active_operations.read().await; assert!(!operations.contains_key("first")); assert!(!operations.contains_key("second")); assert_eq!( operations.get("installing"), Some(&OperationKind::Installing) ); drop(operations); assert!(active_downloads.read().await.is_empty()); let mut cancelled_ids = Vec::new(); for _ in 0..2 { let event = rx.recv().await.expect("peers-gone event should be emitted"); let PeerEvent::DownloadGameFilesAllPeersGone { id } = event else { panic!("expected peers-gone event"); }; cancelled_ids.push(id); } cancelled_ids.sort(); assert_eq!(cancelled_ids, vec!["first", "second"]); assert!( rx.try_recv().is_err(), "multiple peers-gone cancellations must not emit duplicate failure events" ); } }