refactor (Opus 4.5): modularize and split
This commit is contained in:
@@ -0,0 +1,731 @@
|
||||
//! Background services for the peer system.
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use lanspread_db::db::Game;
|
||||
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser};
|
||||
use lanspread_proto::{Message, Request, Response};
|
||||
use s2n_quic::{Connection, Server, provider::limits::Limits, stream::BidirectionalStream};
|
||||
use tokio::{
|
||||
sync::{RwLock, mpsc::UnboundedSender},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
PeerEvent,
|
||||
config::{
|
||||
CERT_PEM,
|
||||
KEY_PEM,
|
||||
LOCAL_GAME_MONITOR_INTERVAL_SECS,
|
||||
PEER_PING_INTERVAL_SECS,
|
||||
peer_stale_timeout,
|
||||
},
|
||||
context::{Ctx, PeerCtx},
|
||||
error::PeerError,
|
||||
handlers::{emit_peer_game_list, update_and_announce_games},
|
||||
local_games::{get_game_file_descriptions, scan_local_games},
|
||||
network::{fetch_games_from_peer, ping_peer, select_advertise_ip},
|
||||
peer::{send_game_file_chunk, send_game_file_data},
|
||||
peer_db::PeerGameDB,
|
||||
};
|
||||
|
||||
// =============================================================================
|
||||
// Server component
|
||||
// =============================================================================
|
||||
|
||||
/// Runs the QUIC server and mDNS advertiser.
|
||||
pub async fn run_server_component(
|
||||
addr: SocketAddr,
|
||||
ctx: PeerCtx,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let limits = Limits::default()
|
||||
.with_max_handshake_duration(Duration::from_secs(3))?
|
||||
.with_max_idle_timeout(Duration::from_secs(3))?;
|
||||
|
||||
let mut server = Server::builder()
|
||||
.with_tls((CERT_PEM, KEY_PEM))?
|
||||
.with_io(addr)?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let server_addr = server.local_addr()?;
|
||||
log::info!("Peer server listening on {server_addr}");
|
||||
|
||||
let advertise_ip = select_advertise_ip()?;
|
||||
let advertise_addr = SocketAddr::new(advertise_ip, server_addr.port());
|
||||
log::info!("Advertising peer via mDNS from {advertise_addr}");
|
||||
{
|
||||
let mut guard = ctx.local_peer_addr.write().await;
|
||||
*guard = Some(advertise_addr);
|
||||
}
|
||||
|
||||
// Start mDNS advertising for peer discovery
|
||||
let peer_id = Uuid::now_v7().simple().to_string();
|
||||
let hostname = gethostname::gethostname();
|
||||
let hostname_str = hostname.to_str().unwrap_or("");
|
||||
|
||||
// Calculate maximum hostname length that fits with UUID in 63 char limit
|
||||
let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1);
|
||||
let truncated_hostname = if hostname_str.len() > max_hostname_len {
|
||||
hostname_str.get(..max_hostname_len).unwrap_or(hostname_str)
|
||||
} else {
|
||||
hostname_str
|
||||
};
|
||||
|
||||
let combined_str = if truncated_hostname.is_empty() {
|
||||
peer_id
|
||||
} else {
|
||||
format!("{truncated_hostname}-{peer_id}")
|
||||
};
|
||||
|
||||
let mdns = tokio::task::spawn_blocking(move || {
|
||||
MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, advertise_addr)
|
||||
})
|
||||
.await??;
|
||||
|
||||
// Monitor mDNS events
|
||||
let _tx_notify_ui_mdns = tx_notify_ui.clone();
|
||||
let hostname = truncated_hostname.to_string();
|
||||
tokio::spawn(async move {
|
||||
log::info!("Registering mDNS service with hostname: {hostname}");
|
||||
while let Ok(event) = mdns.monitor.recv() {
|
||||
match event {
|
||||
lanspread_mdns::DaemonEvent::Error(e) => {
|
||||
log::error!("mDNS error: {e}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
_ => {
|
||||
log::trace!("mDNS event: {event:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(connection) = server.accept().await {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await {
|
||||
log::error!("Peer connection error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles an incoming peer connection.
|
||||
async fn handle_peer_connection(
|
||||
mut connection: Connection,
|
||||
ctx: PeerCtx,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let remote_addr = connection.remote_addr()?;
|
||||
log::info!("{remote_addr} peer connected");
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerConnected(remote_addr)) {
|
||||
log::error!("Failed to send PeerConnected event: {e}");
|
||||
}
|
||||
|
||||
// handle streams
|
||||
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
|
||||
let ctx = ctx.clone();
|
||||
let remote_addr = Some(remote_addr);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_peer_stream(stream, ctx, remote_addr).await {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDisconnected(remote_addr)) {
|
||||
log::error!("Failed to send PeerDisconnected event: {e}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles a bidirectional stream from a peer.
|
||||
#[allow(clippy::too_many_lines)]
|
||||
async fn handle_peer_stream(
|
||||
stream: BidirectionalStream,
|
||||
ctx: PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> eyre::Result<()> {
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
log::trace!("{remote_addr:?} peer stream opened");
|
||||
|
||||
// handle streams
|
||||
loop {
|
||||
match framed_rx.next().await {
|
||||
Some(Ok(data)) => {
|
||||
log::trace!(
|
||||
"{:?} msg: (raw): {}",
|
||||
remote_addr,
|
||||
String::from_utf8_lossy(&data)
|
||||
);
|
||||
|
||||
let request = Request::decode(data.freeze());
|
||||
log::debug!("{remote_addr:?} msg: {request:?}");
|
||||
|
||||
match request {
|
||||
Request::Ping => {
|
||||
// Respond with pong
|
||||
if let Err(e) = framed_tx.send(Response::Pong.encode()).await {
|
||||
log::error!("Failed to send pong: {e}");
|
||||
}
|
||||
}
|
||||
Request::ListGames => {
|
||||
// Return list of games from this peer
|
||||
log::info!("Received ListGames request from peer");
|
||||
let snapshot = {
|
||||
let db_guard = ctx.local_game_db.read().await;
|
||||
if let Some(ref db) = *db_guard {
|
||||
db.all_games().into_iter().cloned().collect::<Vec<Game>>()
|
||||
} else {
|
||||
// Local database not loaded yet, return empty result
|
||||
log::info!(
|
||||
"Local game database not yet loaded, responding with empty game list"
|
||||
);
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
let games = if snapshot.is_empty() {
|
||||
snapshot
|
||||
} else {
|
||||
let downloading = ctx.downloading_games.read().await;
|
||||
snapshot
|
||||
.into_iter()
|
||||
.filter(|game| !downloading.contains(&game.id))
|
||||
.collect()
|
||||
};
|
||||
|
||||
if let Err(e) = framed_tx.send(Response::ListGames(games).encode()).await {
|
||||
log::error!("Failed to send ListGames response: {e}");
|
||||
}
|
||||
}
|
||||
Request::GetGame { id } => {
|
||||
log::info!("Received GetGame request for {id} from peer");
|
||||
let downloading = ctx.downloading_games.read().await.contains(&id);
|
||||
let response = if downloading {
|
||||
log::info!(
|
||||
"Declining to serve GetGame for {id} because download is in progress"
|
||||
);
|
||||
Response::GameNotFound(id)
|
||||
} else if let Some(ref game_dir) = *ctx.game_dir.read().await {
|
||||
if let Some(ref db) = *ctx.local_game_db.read().await {
|
||||
if db.get_game_by_id(&id).is_some() {
|
||||
match get_game_file_descriptions(&id, game_dir).await {
|
||||
Ok(file_descriptions) => Response::GetGame {
|
||||
id,
|
||||
file_descriptions,
|
||||
},
|
||||
Err(PeerError::FileSizeDetermination { path, source }) => {
|
||||
let error_msg = format!(
|
||||
"Failed to determine file size for {path}: {source}"
|
||||
);
|
||||
log::error!(
|
||||
"File size determination error for game {id}: {error_msg}"
|
||||
);
|
||||
Response::InternalPeerError(error_msg)
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Failed to get game file descriptions for {id}: {e}"
|
||||
);
|
||||
Response::GameNotFound(id)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Response::GameNotFound(id)
|
||||
}
|
||||
} else {
|
||||
Response::GameNotFound(id)
|
||||
}
|
||||
} else {
|
||||
Response::GameNotFound(id)
|
||||
};
|
||||
|
||||
if let Err(e) = framed_tx.send(response.encode()).await {
|
||||
log::error!("Failed to send GetGame response: {e}");
|
||||
}
|
||||
}
|
||||
Request::GetGameFileData(desc) => {
|
||||
log::info!(
|
||||
"Received GetGameFileData request for {} from peer",
|
||||
desc.relative_path
|
||||
);
|
||||
|
||||
let maybe_game_dir = ctx.game_dir.read().await.clone();
|
||||
if let Some(game_dir) = maybe_game_dir {
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
// For file data, we need the raw stream, so we unwrap the FramedWrite
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_data(&desc, &mut tx, &base_dir).await;
|
||||
// Re-wrap for next iteration (though usually stream closes after file transfer)
|
||||
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
} else if let Err(e) = framed_tx
|
||||
.send(
|
||||
Response::InvalidRequest(
|
||||
desc.relative_path.as_bytes().to_vec().into(),
|
||||
"Game directory not set".to_string(),
|
||||
)
|
||||
.encode(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::error!("Failed to send GetGameFileData error: {e}");
|
||||
}
|
||||
}
|
||||
Request::GetGameFileChunk {
|
||||
game_id,
|
||||
relative_path,
|
||||
offset,
|
||||
length,
|
||||
} => {
|
||||
log::info!(
|
||||
"{remote_addr:?} received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
|
||||
);
|
||||
|
||||
let maybe_game_dir = ctx.game_dir.read().await.clone();
|
||||
if let Some(game_dir) = maybe_game_dir {
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
// For file data, we need the raw stream, so we unwrap the FramedWrite
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_chunk(
|
||||
&game_id,
|
||||
&relative_path,
|
||||
offset,
|
||||
length,
|
||||
&mut tx,
|
||||
&base_dir,
|
||||
)
|
||||
.await;
|
||||
// Re-wrap for next iteration
|
||||
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
} else if let Err(e) = framed_tx
|
||||
.send(
|
||||
Response::InvalidRequest(
|
||||
relative_path.as_bytes().to_vec().into(),
|
||||
"Game directory not set".to_string(),
|
||||
)
|
||||
.encode(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::error!("Failed to send GetGameFileChunk error: {e}");
|
||||
}
|
||||
}
|
||||
Request::Invalid(_, _) => {
|
||||
log::error!("Received invalid request from peer");
|
||||
}
|
||||
Request::AnnounceGames(games) => {
|
||||
log::info!(
|
||||
"Received {} announced games from peer {remote_addr:?}",
|
||||
games.len()
|
||||
);
|
||||
if let Some(addr) = remote_addr {
|
||||
let aggregated_games = {
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
db.update_peer_games(addr, games);
|
||||
db.get_all_games()
|
||||
};
|
||||
|
||||
if let Err(e) = ctx
|
||||
.tx_notify_ui
|
||||
.send(PeerEvent::ListGames(aggregated_games))
|
||||
{
|
||||
log::error!("Failed to send ListGames event: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(Err(e)) => {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
log::trace!("{remote_addr:?} peer stream closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Peer discovery
|
||||
// =============================================================================
|
||||
|
||||
/// Runs the peer discovery service using mDNS.
|
||||
pub async fn run_peer_discovery(
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||
) {
|
||||
log::info!("Starting peer discovery task");
|
||||
|
||||
let service_type = LANSPREAD_SERVICE_TYPE.to_string();
|
||||
|
||||
loop {
|
||||
let (addr_tx, mut addr_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let service_type_clone = service_type.clone();
|
||||
|
||||
let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> {
|
||||
let browser = MdnsBrowser::new(&service_type_clone)?;
|
||||
loop {
|
||||
if let Some(addr) = browser.next_address(None)? {
|
||||
if addr_tx.send(addr).is_err() {
|
||||
log::debug!("Peer discovery consumer dropped; stopping worker");
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
log::warn!("mDNS browser closed; stopping peer discovery worker");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
while let Some(peer_addr) = addr_rx.recv().await {
|
||||
let is_self = {
|
||||
let guard = local_peer_addr.read().await;
|
||||
guard.as_ref().is_some_and(|addr| *addr == peer_addr)
|
||||
};
|
||||
|
||||
if is_self {
|
||||
log::trace!("Ignoring self advertisement at {peer_addr}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let is_new_peer = {
|
||||
let mut db = peer_game_db.write().await;
|
||||
if db.contains_peer(&peer_addr) {
|
||||
db.update_last_seen(&peer_addr);
|
||||
false
|
||||
} else {
|
||||
db.add_peer(peer_addr);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if is_new_peer {
|
||||
log::info!("Discovered peer at: {peer_addr}");
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
|
||||
log::error!("Failed to send PeerDiscovered event: {e}");
|
||||
}
|
||||
|
||||
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
|
||||
log::error!("Failed to send PeerCountUpdated event: {e}");
|
||||
}
|
||||
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let peer_game_db_clone = peer_game_db.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = request_games_from_peer(
|
||||
peer_addr,
|
||||
tx_notify_ui_clone,
|
||||
peer_game_db_clone,
|
||||
0,
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::error!("Failed to request games from peer {peer_addr}: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
match worker_handle.await {
|
||||
Ok(Ok(())) => {
|
||||
log::warn!("Peer discovery worker exited; restarting shortly");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
log::error!("Peer discovery worker failed: {e}");
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Peer discovery worker join error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests games from a peer with retry logic.
|
||||
async fn request_games_from_peer(
|
||||
peer_addr: SocketAddr,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
mut retry_count: u32,
|
||||
) -> eyre::Result<()> {
|
||||
loop {
|
||||
match fetch_games_from_peer(peer_addr).await {
|
||||
Ok(games) => {
|
||||
log::info!("Received {} games from peer {peer_addr}", games.len());
|
||||
|
||||
if games.is_empty() && retry_count < 1 {
|
||||
log::info!("Received 0 games from peer {peer_addr}, scheduling retry in 5s");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
retry_count += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let aggregated_games = {
|
||||
let mut db = peer_game_db.write().await;
|
||||
db.update_peer_games(peer_addr, games);
|
||||
db.get_all_games()
|
||||
};
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) {
|
||||
log::error!("Failed to send ListGames event: {e}");
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Ping service
|
||||
// =============================================================================
|
||||
|
||||
/// Runs the ping service to check peer liveness.
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub async fn run_ping_service(
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
downloading_games: Arc<RwLock<HashSet<String>>>,
|
||||
active_downloads: Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
|
||||
) {
|
||||
log::info!(
|
||||
"Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \
|
||||
{}s timeout)",
|
||||
peer_stale_timeout().as_secs()
|
||||
);
|
||||
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(PEER_PING_INTERVAL_SECS));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let peer_addresses = { peer_game_db.read().await.get_peer_addresses() };
|
||||
|
||||
for peer_addr in peer_addresses {
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let peer_game_db_clone = peer_game_db.clone();
|
||||
let downloading_games_clone = downloading_games.clone();
|
||||
let active_downloads_clone = active_downloads.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match ping_peer(peer_addr).await {
|
||||
Ok(is_alive) => {
|
||||
if is_alive {
|
||||
// Update last seen time
|
||||
peer_game_db_clone
|
||||
.write()
|
||||
.await
|
||||
.update_last_seen(&peer_addr);
|
||||
} else {
|
||||
log::warn!("Peer {peer_addr} failed ping check");
|
||||
|
||||
// Remove stale peer
|
||||
let removed_peer =
|
||||
peer_game_db_clone.write().await.remove_peer(&peer_addr);
|
||||
if removed_peer.is_some() {
|
||||
log::info!("Removed stale peer: {peer_addr}");
|
||||
if let Err(e) =
|
||||
tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
|
||||
{
|
||||
log::error!("Failed to send PeerLost event: {e}");
|
||||
}
|
||||
|
||||
// Send updated peer count
|
||||
let current_peer_count =
|
||||
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
|
||||
if let Err(e) = tx_notify_ui_clone
|
||||
.send(PeerEvent::PeerCountUpdated(current_peer_count))
|
||||
{
|
||||
log::error!("Failed to send PeerCountUpdated event: {e}");
|
||||
}
|
||||
|
||||
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db_clone,
|
||||
&downloading_games_clone,
|
||||
&active_downloads_clone,
|
||||
&tx_notify_ui_clone,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to ping peer {peer_addr}: {e}");
|
||||
|
||||
// Remove peer on error
|
||||
let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr);
|
||||
if removed_peer.is_some() {
|
||||
log::info!("Removed peer due to ping error: {peer_addr}");
|
||||
if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
|
||||
{
|
||||
log::error!("Failed to send PeerLost event: {e}");
|
||||
}
|
||||
|
||||
// Send updated peer count
|
||||
let current_peer_count =
|
||||
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
|
||||
if let Err(e) = tx_notify_ui_clone
|
||||
.send(PeerEvent::PeerCountUpdated(current_peer_count))
|
||||
{
|
||||
log::error!("Failed to send PeerCountUpdated event: {e}");
|
||||
}
|
||||
|
||||
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db_clone,
|
||||
&downloading_games_clone,
|
||||
&active_downloads_clone,
|
||||
&tx_notify_ui_clone,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Also clean up stale peers
|
||||
let stale_peers = {
|
||||
peer_game_db
|
||||
.read()
|
||||
.await
|
||||
.get_stale_peers(peer_stale_timeout())
|
||||
};
|
||||
let mut removed_any = false;
|
||||
for stale_addr in stale_peers {
|
||||
let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr);
|
||||
if removed_peer.is_some() {
|
||||
log::info!("Removed stale peer: {stale_addr}");
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) {
|
||||
log::error!("Failed to send PeerLost event: {e}");
|
||||
}
|
||||
|
||||
// Send updated peer count
|
||||
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
|
||||
log::error!("Failed to send PeerCountUpdated event: {e}");
|
||||
}
|
||||
removed_any = true;
|
||||
}
|
||||
}
|
||||
|
||||
if removed_any {
|
||||
emit_peer_game_list(&peer_game_db, &tx_notify_ui).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db,
|
||||
&downloading_games,
|
||||
&active_downloads,
|
||||
&tx_notify_ui,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles downloads that no longer have peers available.
|
||||
async fn handle_active_downloads_without_peers(
|
||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||
downloading_games: &Arc<RwLock<HashSet<String>>>,
|
||||
active_downloads: &Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) {
|
||||
let active_ids: Vec<String> = { downloading_games.read().await.iter().cloned().collect() };
|
||||
if active_ids.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for id in active_ids {
|
||||
let has_peers = {
|
||||
let guard = peer_game_db.read().await;
|
||||
!guard.peers_with_game(&id).is_empty()
|
||||
};
|
||||
|
||||
if has_peers {
|
||||
continue;
|
||||
}
|
||||
|
||||
let removed_from_tracking = {
|
||||
let mut guard = downloading_games.write().await;
|
||||
guard.remove(&id)
|
||||
};
|
||||
|
||||
if !removed_from_tracking {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(handle) = { active_downloads.write().await.remove(&id) } {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesAllPeersGone { id: id.clone() })
|
||||
{
|
||||
log::error!("Failed to send DownloadGameFilesAllPeersGone event: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Local game monitor
|
||||
// =============================================================================
|
||||
|
||||
/// Monitors the local game directory for changes.
|
||||
pub async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: Ctx) {
|
||||
log::info!(
|
||||
"Starting local game directory monitor ({LOCAL_GAME_MONITOR_INTERVAL_SECS}s interval)"
|
||||
);
|
||||
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(LOCAL_GAME_MONITOR_INTERVAL_SECS));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let game_dir = {
|
||||
let guard = ctx.game_dir.read().await;
|
||||
guard.clone()
|
||||
};
|
||||
|
||||
if let Some(ref game_dir) = game_dir {
|
||||
match scan_local_games(game_dir).await {
|
||||
Ok(current_games) => {
|
||||
update_and_announce_games(&ctx, &tx_notify_ui, current_games).await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Failed to scan local games directory: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user