Files
lanspread/crates/lanspread-peer/src/services.rs
T

1226 lines
48 KiB
Rust

//! Background services for the peer system.
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::Duration,
};
use futures::{SinkExt, StreamExt};
use lanspread_db::db::Game;
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser, MdnsService};
use lanspread_proto::{
Hello,
HelloAck,
LibraryDelta,
LibrarySnapshot,
Message,
PROTOCOL_VERSION,
Request,
Response,
};
use s2n_quic::{Connection, Server, provider::limits::Limits, stream::BidirectionalStream};
use tokio::{
sync::{RwLock, mpsc::UnboundedSender},
task::JoinHandle,
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::{
PeerEvent,
config::{
CERT_PEM,
KEY_PEM,
LOCAL_GAME_MONITOR_INTERVAL_SECS,
PEER_PING_INTERVAL_SECS,
peer_stale_timeout,
},
context::{Ctx, PeerCtx},
error::PeerError,
handlers::{emit_peer_game_list, update_and_announce_games},
identity::default_features,
library::{
LocalLibraryState,
build_library_snapshot,
build_library_summary,
compute_library_digest,
},
local_games::{get_game_file_descriptions, scan_local_library},
network::{
exchange_hello,
fetch_games_from_peer,
ping_peer,
select_advertise_ip,
send_library_delta,
send_library_snapshot,
send_library_summary,
},
peer::{send_game_file_chunk, send_game_file_data},
peer_db::{PeerGameDB, PeerId, PeerUpsert},
};
// =============================================================================
// Server component
// =============================================================================
/// Runs the QUIC server and mDNS advertiser.
///
/// Starts a QUIC server on `addr`, stores the advertised address in
/// `ctx.local_peer_addr`, registers an mDNS service whose properties carry the
/// peer id, protocol version and local library revision/digest, and then
/// accepts incoming connections, spawning one task per connection.
///
/// # Errors
///
/// Returns an error if the QUIC limits or server cannot be set up, if no
/// advertise IP can be selected, or if the mDNS advertiser cannot be created.
pub async fn run_server_component(
    addr: SocketAddr,
    ctx: PeerCtx,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
    let limits = Limits::default()
        .with_max_handshake_duration(Duration::from_secs(3))?
        .with_max_idle_timeout(Duration::from_secs(3))?;
    let mut server = Server::builder()
        .with_tls((CERT_PEM, KEY_PEM))?
        .with_io(addr)?
        .with_limits(limits)?
        .start()?;
    let server_addr = server.local_addr()?;
    log::info!("Peer server listening on {server_addr}");

    let advertise_ip = select_advertise_ip()?;
    let advertise_addr = SocketAddr::new(advertise_ip, server_addr.port());
    log::info!("Advertising peer via mDNS from {advertise_addr}");
    {
        let mut guard = ctx.local_peer_addr.write().await;
        *guard = Some(advertise_addr);
    }

    // Start mDNS advertising for peer discovery
    let peer_id = ctx.peer_id.as_ref().clone();
    let hostname = gethostname::gethostname();
    let hostname_str = hostname.to_str().unwrap_or("");
    // Calculate maximum hostname length that fits with UUID in 63 char limit
    let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1);
    // Cut on a UTF-8 character boundary. Slicing at an arbitrary byte offset
    // can fail for non-ASCII hostnames, and falling back to the full hostname
    // in that case would exceed the length limit this code is enforcing.
    let truncated_hostname = {
        let mut end = hostname_str.len().min(max_hostname_len);
        while !hostname_str.is_char_boundary(end) {
            end -= 1;
        }
        &hostname_str[..end]
    };
    let combined_str = if truncated_hostname.is_empty() {
        peer_id.clone()
    } else {
        format!("{truncated_hostname}-{peer_id}")
    };

    let (library_rev, library_digest) = {
        let library_guard = ctx.local_library.read().await;
        (library_guard.revision, library_guard.digest)
    };
    let mut properties = HashMap::new();
    properties.insert("peer_id".to_string(), peer_id.clone());
    properties.insert("proto_ver".to_string(), PROTOCOL_VERSION.to_string());
    properties.insert("library_rev".to_string(), library_rev.to_string());
    properties.insert("library_digest".to_string(), library_digest.to_string());
    // The property keeps the full (untruncated) hostname; only the instance
    // name built above is length-limited.
    if !hostname_str.is_empty() {
        properties.insert("hostname".to_string(), hostname_str.to_string());
    }

    // Advertiser construction runs on the blocking pool.
    let mdns = tokio::task::spawn_blocking(move || {
        MdnsAdvertiser::new(
            LANSPREAD_SERVICE_TYPE,
            &combined_str,
            advertise_addr,
            Some(properties),
        )
    })
    .await??;

    // Monitor mDNS events. Moving `mdns` into the task keeps the advertiser
    // alive for as long as the monitor loop runs.
    let hostname = truncated_hostname.to_string();
    tokio::spawn(async move {
        log::info!("Registering mDNS service with hostname: {hostname}");
        // NOTE(review): `recv()` is not awaited, so it looks like a synchronous
        // receive that would block a runtime worker thread while waiting —
        // confirm against the mdns crate and consider `spawn_blocking`.
        while let Ok(event) = mdns.monitor.recv() {
            match event {
                lanspread_mdns::DaemonEvent::Error(e) => {
                    log::error!("mDNS error: {e}");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
                _ => {
                    log::trace!("mDNS event: {event:?}");
                }
            }
        }
    });

    // Accept connections until the server stops yielding them.
    while let Some(connection) = server.accept().await {
        let ctx = ctx.clone();
        let tx_notify_ui = tx_notify_ui.clone();
        tokio::spawn(async move {
            if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await {
                log::error!("Peer connection error: {e}");
            }
        });
    }
    Ok(())
}
/// Handles an incoming peer connection.
async fn handle_peer_connection(
mut connection: Connection,
ctx: PeerCtx,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
let remote_addr = connection.remote_addr()?;
log::info!("{remote_addr} peer connected");
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerConnected(remote_addr)) {
log::error!("Failed to send PeerConnected event: {e}");
}
// handle streams
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
let ctx = ctx.clone();
let remote_addr = Some(remote_addr);
tokio::spawn(async move {
if let Err(e) = handle_peer_stream(stream, ctx, remote_addr).await {
log::error!("{remote_addr:?} peer stream error: {e}");
}
});
}
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDisconnected(remote_addr)) {
log::error!("Failed to send PeerDisconnected event: {e}");
}
Ok(())
}
/// Library payload chosen by `select_library_update` to send to a peer.
enum LibraryUpdate {
/// Incremental change set obtained from `LocalLibraryState::delta_since`.
Delta(LibraryDelta),
/// Full snapshot obtained from `build_library_snapshot`.
Snapshot(LibrarySnapshot),
}
/// Builds the `Hello` message for `peer_id` from the current local library
/// revision and digest.
async fn build_hello_from_state(
    peer_id: &str,
    local_library: &Arc<RwLock<LocalLibraryState>>,
) -> Hello {
    // Copy the two scalar fields out so the read lock is released right away.
    let (library_rev, library_digest) = {
        let state = local_library.read().await;
        (state.revision, state.digest)
    };
    Hello {
        peer_id: peer_id.to_owned(),
        proto_ver: PROTOCOL_VERSION,
        library_rev,
        library_digest,
        features: default_features(),
    }
}
/// Builds the `HelloAck` reply describing this peer and its current library
/// revision and digest.
async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck {
    // Copy the two scalar fields out so the read lock is released right away.
    let (library_rev, library_digest) = {
        let state = ctx.local_library.read().await;
        (state.revision, state.digest)
    };
    HelloAck {
        peer_id: (*ctx.peer_id).clone(),
        proto_ver: PROTOCOL_VERSION,
        library_rev,
        library_digest,
        features: default_features(),
    }
}
/// Decides which library payload, if any, should be sent to a peer that
/// reported `remote_rev` / `remote_digest`.
///
/// Returns `None` when the digests match or when `remote_rev` is greater than
/// the local revision. Otherwise returns a delta if `delta_since(remote_rev)`
/// yields one, and a full snapshot if it does not.
async fn select_library_update(
    local_library: &Arc<RwLock<LocalLibraryState>>,
    remote_rev: u64,
    remote_digest: u64,
) -> Option<LibraryUpdate> {
    let state = local_library.read().await;

    let digests_match = state.digest == remote_digest;
    let remote_rev_exceeds_local = remote_rev > state.revision;
    if digests_match || remote_rev_exceeds_local {
        return None;
    }

    let update = match state.delta_since(remote_rev) {
        Some(delta) => LibraryUpdate::Delta(delta),
        None => LibraryUpdate::Snapshot(build_library_snapshot(&state)),
    };
    Some(update)
}
/// Returns the peer id registered for `peer_addr`, registering a
/// `legacy-<addr>` id first when the address is not yet known.
///
/// Lookup and insertion happen under a single write lock.
async fn ensure_peer_id_for_addr(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    peer_addr: SocketAddr,
) -> PeerId {
    let mut db = peer_game_db.write().await;
    let known_id = db.peer_id_for_addr(&peer_addr).cloned();
    match known_id {
        Some(existing) => existing,
        None => {
            let legacy_id = format!("legacy-{peer_addr}");
            db.upsert_peer(legacy_id.clone(), peer_addr);
            legacy_id
        }
    }
}
fn summary_from_game(game: &Game) -> lanspread_proto::GameSummary {
lanspread_proto::GameSummary {
id: game.id.clone(),
name: game.name.clone(),
size: game.size,
downloaded: game.downloaded,
installed: game.installed,
eti_version: game.eti_game_version.clone(),
manifest_hash: 0,
availability: lanspread_proto::Availability::Ready,
}
}
/// Exchanges `Hello`/`HelloAck` with the peer at `peer_addr` and records the
/// result in the peer database.
///
/// The handshake is abandoned (returning `Ok(())`) when the ack carries a
/// different protocol version or this peer's own id. If `peer_id_hint` is set
/// and differs from the id in the ack, the hinted entry is removed. For a
/// newly inserted peer the UI is notified and a library summary is sent in the
/// background; whenever `select_library_update` yields a payload, that is sent
/// in the background as well.
///
/// # Errors
///
/// Returns an error if the hello exchange itself fails.
async fn perform_handshake_with_peer(
    peer_id: Arc<String>,
    local_library: Arc<RwLock<LocalLibraryState>>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    peer_addr: SocketAddr,
    peer_id_hint: Option<PeerId>,
) -> eyre::Result<()> {
    let hello = build_hello_from_state(peer_id.as_str(), &local_library).await;
    let ack = exchange_hello(peer_addr, hello).await?;

    if ack.proto_ver != PROTOCOL_VERSION {
        log::warn!(
            "Peer {peer_addr} uses incompatible protocol {} (expected {PROTOCOL_VERSION})",
            ack.proto_ver
        );
        return Ok(());
    }
    if ack.peer_id == *peer_id {
        log::trace!("Ignoring handshake with self for {peer_addr}");
        return Ok(());
    }

    // The id in the ack wins over the hinted one; drop the hinted entry.
    match peer_id_hint.as_ref() {
        Some(expected) if expected != &ack.peer_id => {
            log::warn!(
                "Peer {peer_addr} id mismatch: mDNS advertised {expected}, hello ack returned {}",
                ack.peer_id
            );
            let _ = peer_game_db.write().await.remove_peer(expected);
        }
        _ => {}
    }

    // Insert/refresh the peer and its library state under one write lock.
    let upsert = {
        let mut db = peer_game_db.write().await;
        let outcome = db.upsert_peer(ack.peer_id.clone(), peer_addr);
        db.update_peer_library(
            &ack.peer_id,
            ack.library_rev,
            ack.library_digest,
            ack.features.clone(),
        );
        outcome
    };

    if upsert.is_new {
        if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
            log::error!("Failed to send PeerDiscovered event: {e}");
        }
        let current_peer_count = peer_game_db.read().await.get_peer_addresses().len();
        if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
            log::error!("Failed to send PeerCountUpdated event: {e}");
        }

        let summary = build_library_summary(&*local_library.read().await);
        tokio::spawn(async move {
            if let Err(e) = send_library_summary(peer_addr, summary).await {
                log::warn!("Failed to send library summary to {peer_addr}: {e}");
            }
        });
    }

    let pending_update =
        select_library_update(&local_library, ack.library_rev, ack.library_digest).await;
    if let Some(update) = pending_update {
        tokio::spawn(async move {
            let result = match update {
                LibraryUpdate::Delta(delta) => send_library_delta(peer_addr, delta).await,
                LibraryUpdate::Snapshot(snapshot) => {
                    send_library_snapshot(peer_addr, snapshot).await
                }
            };
            if let Err(e) = result {
                log::warn!("Failed to send library update to {peer_addr}: {e}");
            }
        });
    }

    Ok(())
}
/// Peer details extracted from an mDNS service record by `parse_mdns_peer`.
struct MdnsPeerInfo {
/// Address taken from the service record.
addr: SocketAddr,
/// Value of the `peer_id` property, if present.
peer_id: Option<PeerId>,
/// Parsed `proto_ver` property; `None` when missing or not a valid `u32`.
proto_ver: Option<u32>,
/// Parsed `library_rev` property; 0 when missing or unparsable.
library_rev: u64,
/// Parsed `library_digest` property; 0 when missing or unparsable.
library_digest: u64,
}
/// Extracts peer details from the properties of an mDNS service record.
///
/// `peer_id` and `proto_ver` stay `None` when absent (or, for `proto_ver`,
/// unparsable); the two numeric library properties default to 0.
fn parse_mdns_peer(service: &MdnsService) -> MdnsPeerInfo {
    let props = &service.properties;

    // Reads a `u64` property, treating a missing or malformed value as 0.
    let numeric_or_zero = |key: &str| -> u64 {
        props
            .get(key)
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(0)
    };

    MdnsPeerInfo {
        addr: service.addr,
        peer_id: props.get("peer_id").cloned(),
        proto_ver: props
            .get("proto_ver")
            .and_then(|raw| raw.parse::<u32>().ok()),
        library_rev: numeric_or_zero("library_rev"),
        library_digest: numeric_or_zero("library_digest"),
    }
}
/// Reports whether an mDNS record describes this peer itself, i.e. its address
/// equals the stored local advertise address or its peer id equals ours.
async fn is_self_advertisement(info: &MdnsPeerInfo, ctx: &Ctx) -> bool {
    let guard = ctx.local_peer_addr.read().await;
    if matches!(guard.as_ref(), Some(addr) if *addr == info.addr) {
        return true;
    }
    match info.peer_id.as_ref() {
        Some(peer_id) => peer_id == ctx.peer_id.as_ref(),
        None => false,
    }
}
async fn handle_discovered_peer(
info: MdnsPeerInfo,
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
let peer_id = info
.peer_id
.unwrap_or_else(|| format!("legacy-{}", info.addr));
let upsert = {
let mut db = ctx.peer_game_db.write().await;
let upsert = db.upsert_peer(peer_id.clone(), info.addr);
let features = db.peer_features(&peer_id);
if info.library_rev > 0 || info.library_digest > 0 {
db.update_peer_library(&peer_id, info.library_rev, info.library_digest, features);
}
upsert
};
if upsert.is_new {
log::info!("Discovered peer at: {}", info.addr);
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(info.addr)) {
log::error!("Failed to send PeerDiscovered event: {e}");
}
let current_peer_count = ctx.peer_game_db.read().await.get_peer_addresses().len();
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
log::error!("Failed to send PeerCountUpdated event: {e}");
}
}
if upsert.is_new || upsert.addr_changed {
let peer_id_arc = ctx.peer_id.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_id_hint = Some(peer_id.clone());
tokio::spawn(async move {
let handshake_result =
if info.proto_ver.is_none() || info.proto_ver == Some(PROTOCOL_VERSION) {
perform_handshake_with_peer(
peer_id_arc,
local_library,
peer_game_db.clone(),
tx_notify_ui_clone.clone(),
info.addr,
peer_id_hint.clone(),
)
.await
} else {
Err(eyre::eyre!("Skipping hello for legacy peer"))
};
if handshake_result.is_err()
&& let Err(e) =
request_games_from_peer(info.addr, tx_notify_ui_clone, peer_game_db, 0).await
{
log::error!("Failed to request games from peer {}: {e}", info.addr);
}
});
}
}
/// Handles a bidirectional stream from a peer.
///
/// Reads length-delimited frames from the stream, decodes each one as a
/// `Request` and dispatches on its variant until the stream ends or a framing
/// error occurs. When `remote_addr` is known, every decoded request also
/// triggers `update_last_seen_by_addr` for that address in the peer database.
///
/// Always returns `Ok(())`: failures to send a response are logged rather
/// than propagated.
#[allow(clippy::too_many_lines)]
async fn handle_peer_stream(
stream: BidirectionalStream,
ctx: PeerCtx,
remote_addr: Option<SocketAddr>,
) -> eyre::Result<()> {
let (rx, tx) = stream.split();
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
log::trace!("{remote_addr:?} peer stream opened");
// Process requests one frame at a time until the stream closes or errors.
loop {
match framed_rx.next().await {
Some(Ok(data)) => {
log::trace!(
"{:?} msg: (raw): {}",
remote_addr,
String::from_utf8_lossy(&data)
);
let request = Request::decode(data.freeze());
log::debug!("{remote_addr:?} msg: {request:?}");
// Any decoded request counts as activity from this address.
if let Some(addr) = remote_addr {
ctx.peer_game_db
.write()
.await
.update_last_seen_by_addr(&addr);
}
match request {
Request::Ping => {
if let Err(e) = framed_tx.send(Response::Pong.encode()).await {
log::error!("Failed to send pong: {e}");
}
}
Request::Hello(hello) => {
// A hello carrying our own id is still acknowledged, but the
// sender is not recorded as a peer.
if hello.peer_id == *ctx.peer_id {
log::trace!("Ignoring hello from self");
let ack = build_hello_ack(&ctx).await;
if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await {
log::error!("Failed to send HelloAck: {e}");
}
continue;
}
// Same for a protocol mismatch: reply with our ack (which carries
// our protocol version) and skip registration.
if hello.proto_ver != PROTOCOL_VERSION {
log::warn!(
"Incompatible protocol from {remote_addr:?}: {}",
hello.proto_ver
);
let ack = build_hello_ack(&ctx).await;
if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await {
log::error!("Failed to send HelloAck: {e}");
}
continue;
}
// Record the peer and its advertised library state; without an
// address there is nothing to register.
let upsert = if let Some(addr) = remote_addr {
let mut db = ctx.peer_game_db.write().await;
let upsert = db.upsert_peer(hello.peer_id.clone(), addr);
db.update_peer_library(
&hello.peer_id,
hello.library_rev,
hello.library_digest,
hello.features.clone(),
);
upsert
} else {
PeerUpsert {
is_new: false,
addr_changed: false,
}
};
if upsert.is_new
&& let Some(addr) = remote_addr
{
if let Err(e) = ctx.tx_notify_ui.send(PeerEvent::PeerDiscovered(addr)) {
log::error!("Failed to send PeerDiscovered event: {e}");
}
let current_peer_count =
{ ctx.peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = ctx
.tx_notify_ui
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
}
let ack = build_hello_ack(&ctx).await;
if let Err(e) = framed_tx.send(Response::HelloAck(ack).encode()).await {
log::error!("Failed to send HelloAck: {e}");
}
// After acknowledging, push our library state to the peer in
// background tasks: a summary for new peers, plus a delta or
// snapshot when `select_library_update` yields one.
if let Some(addr) = remote_addr {
if upsert.is_new {
let summary = {
let library_guard = ctx.local_library.read().await;
build_library_summary(&library_guard)
};
tokio::spawn(async move {
if let Err(e) = send_library_summary(addr, summary).await {
log::warn!("Failed to send library summary to {addr}: {e}");
}
});
}
if let Some(update) = select_library_update(
&ctx.local_library,
hello.library_rev,
hello.library_digest,
)
.await
{
tokio::spawn(async move {
let result = match update {
LibraryUpdate::Delta(delta) => {
send_library_delta(addr, delta).await
}
LibraryUpdate::Snapshot(snapshot) => {
send_library_snapshot(addr, snapshot).await
}
};
if let Err(e) = result {
log::warn!("Failed to send library update to {addr}: {e}");
}
});
}
}
}
Request::ListGames => {
log::info!("Received ListGames request from peer");
let snapshot = {
let db_guard = ctx.local_game_db.read().await;
if let Some(ref db) = *db_guard {
db.all_games().into_iter().cloned().collect::<Vec<Game>>()
} else {
log::info!(
"Local game database not yet loaded, responding with empty game list"
);
Vec::new()
}
};
// Games that are currently being downloaded are left out of the
// response.
let games = if snapshot.is_empty() {
snapshot
} else {
let downloading = ctx.downloading_games.read().await;
snapshot
.into_iter()
.filter(|game| !downloading.contains(&game.id))
.collect()
};
if let Err(e) = framed_tx.send(Response::ListGames(games).encode()).await {
log::error!("Failed to send ListGames response: {e}");
}
}
Request::LibrarySummary(summary) => {
if let Some(addr) = remote_addr {
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
// Capture the previous digest and game count before overwriting
// the stored library state.
let (previous_digest, previous_count, features) = {
let db = ctx.peer_game_db.read().await;
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
(
digest,
db.peer_game_count(&peer_id),
db.peer_features(&peer_id),
)
};
{
let mut db = ctx.peer_game_db.write().await;
db.update_peer_library(
&peer_id,
summary.library_rev,
summary.library_digest,
features,
);
}
// Re-handshake when the digest changed or no games are recorded
// for this peer yet.
if summary.library_digest != previous_digest || previous_count == 0 {
let peer_id_arc = ctx.peer_id.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = ctx.tx_notify_ui.clone();
tokio::spawn(async move {
if let Err(e) = perform_handshake_with_peer(
peer_id_arc,
local_library,
peer_game_db,
tx_notify_ui,
addr,
Some(peer_id),
)
.await
{
log::warn!("Failed to refresh library from {addr}: {e}");
}
});
}
}
}
Request::LibrarySnapshot(snapshot) => {
if let Some(addr) = remote_addr {
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
{
let mut db = ctx.peer_game_db.write().await;
db.apply_library_snapshot(&peer_id, snapshot);
}
emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
}
}
Request::LibraryDelta(delta) => {
if let Some(addr) = remote_addr {
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
let applied = {
let mut db = ctx.peer_game_db.write().await;
db.apply_library_delta(&peer_id, delta)
};
if applied {
emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
} else {
// The delta was not applied; fall back to a fresh handshake with
// the sender.
let peer_id_arc = ctx.peer_id.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = ctx.tx_notify_ui.clone();
tokio::spawn(async move {
if let Err(e) = perform_handshake_with_peer(
peer_id_arc,
local_library,
peer_game_db,
tx_notify_ui,
addr,
Some(peer_id),
)
.await
{
log::warn!("Failed to resync library from {addr}: {e}");
}
});
}
}
}
Request::GetGame { id } => {
log::info!("Received GetGame request for {id} from peer");
// A game that is still being downloaded locally is reported as not
// found. The same response is used when the game directory or
// database is unset, or the id is unknown.
let downloading = ctx.downloading_games.read().await.contains(&id);
let response = if downloading {
log::info!(
"Declining to serve GetGame for {id} because download is in progress"
);
Response::GameNotFound(id)
} else if let Some(ref game_dir) = *ctx.game_dir.read().await {
if let Some(ref db) = *ctx.local_game_db.read().await {
if db.get_game_by_id(&id).is_some() {
match get_game_file_descriptions(&id, game_dir).await {
Ok(file_descriptions) => Response::GetGame {
id,
file_descriptions,
},
Err(PeerError::FileSizeDetermination { path, source }) => {
let error_msg = format!(
"Failed to determine file size for {path}: {source}"
);
log::error!(
"File size determination error for game {id}: {error_msg}"
);
Response::InternalPeerError(error_msg)
}
Err(e) => {
log::error!(
"Failed to get game file descriptions for {id}: {e}"
);
Response::GameNotFound(id)
}
}
} else {
Response::GameNotFound(id)
}
} else {
Response::GameNotFound(id)
}
} else {
Response::GameNotFound(id)
};
if let Err(e) = framed_tx.send(response.encode()).await {
log::error!("Failed to send GetGame response: {e}");
}
}
Request::GetGameFileData(desc) => {
log::info!(
"Received GetGameFileData request for {} from peer",
desc.relative_path
);
let maybe_game_dir = ctx.game_dir.read().await.clone();
if let Some(game_dir) = maybe_game_dir {
let base_dir = PathBuf::from(game_dir);
// The file helper takes the underlying send stream, so the framed
// writer is unwrapped for the call and rebuilt afterwards.
let mut tx = framed_tx.into_inner();
send_game_file_data(&desc, &mut tx, &base_dir).await;
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
} else if let Err(e) = framed_tx
.send(
Response::InvalidRequest(
desc.relative_path.as_bytes().to_vec().into(),
"Game directory not set".to_string(),
)
.encode(),
)
.await
{
log::error!("Failed to send GetGameFileData error: {e}");
}
}
Request::GetGameFileChunk {
game_id,
relative_path,
offset,
length,
} => {
log::info!(
"{remote_addr:?} received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
);
let maybe_game_dir = ctx.game_dir.read().await.clone();
if let Some(game_dir) = maybe_game_dir {
let base_dir = PathBuf::from(game_dir);
// Same unwrap/rebuild of the framed writer as for GetGameFileData.
let mut tx = framed_tx.into_inner();
send_game_file_chunk(
&game_id,
&relative_path,
offset,
length,
&mut tx,
&base_dir,
)
.await;
framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
} else if let Err(e) = framed_tx
.send(
Response::InvalidRequest(
relative_path.as_bytes().to_vec().into(),
"Game directory not set".to_string(),
)
.encode(),
)
.await
{
log::error!("Failed to send GetGameFileChunk error: {e}");
}
}
Request::Goodbye { peer_id } => {
log::info!("Received Goodbye from peer {peer_id}");
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
// Notify the UI only if the peer was actually present; the
// PeerLost/PeerCountUpdated events additionally need an address.
if removed.is_some() {
if let Some(addr) = remote_addr {
if let Err(e) = ctx.tx_notify_ui.send(PeerEvent::PeerLost(addr)) {
log::error!("Failed to send PeerLost event: {e}");
}
let current_peer_count =
{ ctx.peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = ctx
.tx_notify_ui
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
}
emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
}
}
Request::Invalid(_, _) => {
log::error!("Received invalid request from peer");
}
Request::AnnounceGames(games) => {
log::info!(
"Received {} announced games from peer {remote_addr:?}",
games.len()
);
if let Some(addr) = remote_addr {
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
let summaries: Vec<_> = games.iter().map(summary_from_game).collect();
let mut map = HashMap::with_capacity(summaries.len());
for summary in &summaries {
map.insert(summary.id.clone(), summary.clone());
}
// The digest is computed locally from the announced games and the
// revision is stored as 0.
let digest = compute_library_digest(&map);
let aggregated_games = {
let mut db = ctx.peer_game_db.write().await;
db.update_peer_games(&peer_id, summaries);
let features = db.peer_features(&peer_id);
db.update_peer_library(&peer_id, 0, digest, features);
db.get_all_games()
};
if let Err(e) = ctx
.tx_notify_ui
.send(PeerEvent::ListGames(aggregated_games))
{
log::error!("Failed to send ListGames event: {e}");
}
}
}
}
}
Some(Err(e)) => {
log::error!("{remote_addr:?} peer stream error: {e}");
break;
}
None => {
log::trace!("{remote_addr:?} peer stream closed");
break;
}
}
}
Ok(())
}
// =============================================================================
// Peer discovery
// =============================================================================
/// Runs the peer discovery service using mDNS.
pub async fn run_peer_discovery(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: Ctx) {
log::info!("Starting peer discovery task");
let service_type = LANSPREAD_SERVICE_TYPE.to_string();
loop {
let (service_tx, mut service_rx) = tokio::sync::mpsc::unbounded_channel();
let service_type_clone = service_type.clone();
let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> {
let browser = MdnsBrowser::new(&service_type_clone)?;
loop {
if let Some(service) = browser.next_service(None)? {
if service_tx.send(service).is_err() {
log::debug!("Peer discovery consumer dropped; stopping worker");
break;
}
} else {
log::warn!("mDNS browser closed; stopping peer discovery worker");
break;
}
}
Ok(())
});
while let Some(service) = service_rx.recv().await {
let info = parse_mdns_peer(&service);
if is_self_advertisement(&info, &ctx).await {
log::trace!("Ignoring self advertisement at {}", info.addr);
continue;
}
handle_discovered_peer(info, &ctx, &tx_notify_ui).await;
}
match worker_handle.await {
Ok(Ok(())) => {
log::warn!("Peer discovery worker exited; restarting shortly");
}
Ok(Err(e)) => {
log::error!("Peer discovery worker failed: {e}");
}
Err(e) => {
log::error!("Peer discovery worker join error: {e}");
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
/// Requests games from a peer with retry logic.
async fn request_games_from_peer(
peer_addr: SocketAddr,
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
mut retry_count: u32,
) -> eyre::Result<()> {
loop {
match fetch_games_from_peer(peer_addr).await {
Ok(games) => {
log::info!("Received {} games from peer {peer_addr}", games.len());
if games.is_empty() && retry_count < 1 {
log::info!("Received 0 games from peer {peer_addr}, scheduling retry in 5s");
tokio::time::sleep(Duration::from_secs(5)).await;
retry_count += 1;
continue;
}
let mut map = HashMap::with_capacity(games.len());
let mut summaries = Vec::with_capacity(games.len());
for game in &games {
let summary = summary_from_game(game);
map.insert(summary.id.clone(), summary.clone());
summaries.push(summary);
}
let digest = compute_library_digest(&map);
let peer_id = ensure_peer_id_for_addr(&peer_game_db, peer_addr).await;
let aggregated_games = {
let mut db = peer_game_db.write().await;
db.update_peer_games(&peer_id, summaries);
let features = db.peer_features(&peer_id);
db.update_peer_library(&peer_id, 0, digest, features);
db.get_all_games()
};
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) {
log::error!("Failed to send ListGames event: {e}");
}
return Ok(());
}
Err(e) => return Err(e),
}
}
}
// =============================================================================
// Ping service
// =============================================================================
/// Runs the ping service to check peer liveness.
#[allow(clippy::too_many_lines)]
pub async fn run_ping_service(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
downloading_games: Arc<RwLock<HashSet<String>>>,
active_downloads: Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
) {
log::info!(
"Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \
{}s idle threshold, {}s timeout)",
crate::config::PEER_PING_IDLE_SECS,
peer_stale_timeout().as_secs()
);
let mut interval = tokio::time::interval(Duration::from_secs(PEER_PING_INTERVAL_SECS));
loop {
interval.tick().await;
let peer_snapshots = { peer_game_db.read().await.peer_liveness_snapshot() };
for (peer_id, peer_addr, last_seen) in peer_snapshots {
if last_seen.elapsed() < Duration::from_secs(crate::config::PEER_PING_IDLE_SECS) {
continue;
}
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone();
let downloading_games_clone = downloading_games.clone();
let active_downloads_clone = active_downloads.clone();
tokio::spawn(async move {
match ping_peer(peer_addr).await {
Ok(is_alive) => {
if is_alive {
peer_game_db_clone.write().await.update_last_seen(&peer_id);
} else {
log::warn!("Peer {peer_addr} failed ping check");
let removed_peer =
peer_game_db_clone.write().await.remove_peer(&peer_id);
if let Some(peer) = removed_peer {
log::info!("Removed stale peer: {}", peer.addr);
if let Err(e) =
tx_notify_ui_clone.send(PeerEvent::PeerLost(peer.addr))
{
log::error!("Failed to send PeerLost event: {e}");
}
let current_peer_count =
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui_clone
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
handle_active_downloads_without_peers(
&peer_game_db_clone,
&downloading_games_clone,
&active_downloads_clone,
&tx_notify_ui_clone,
)
.await;
}
}
}
Err(e) => {
log::error!("Failed to ping peer {peer_addr}: {e}");
let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_id);
if let Some(peer) = removed_peer {
log::info!("Removed peer due to ping error: {}", peer.addr);
if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer.addr))
{
log::error!("Failed to send PeerLost event: {e}");
}
let current_peer_count =
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui_clone
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
handle_active_downloads_without_peers(
&peer_game_db_clone,
&downloading_games_clone,
&active_downloads_clone,
&tx_notify_ui_clone,
)
.await;
}
}
}
});
}
// Also clean up stale peers
let stale_peers = {
peer_game_db
.read()
.await
.get_stale_peer_ids(peer_stale_timeout())
};
let mut removed_any = false;
for stale_peer_id in stale_peers {
let removed_peer = peer_game_db.write().await.remove_peer(&stale_peer_id);
if let Some(peer) = removed_peer {
log::info!("Removed stale peer: {}", peer.addr);
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(peer.addr)) {
log::error!("Failed to send PeerLost event: {e}");
}
// Send updated peer count
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
log::error!("Failed to send PeerCountUpdated event: {e}");
}
removed_any = true;
}
}
if removed_any {
emit_peer_game_list(&peer_game_db, &tx_notify_ui).await;
handle_active_downloads_without_peers(
&peer_game_db,
&downloading_games,
&active_downloads,
&tx_notify_ui,
)
.await;
}
}
}
/// Handles downloads that no longer have peers available.
///
/// For every game id currently tracked in `downloading_games` for which
/// `peers_with_game` returns nothing, the id is removed from tracking, its
/// download task (if any) is aborted, and `DownloadGameFilesAllPeersGone` is
/// sent to the UI.
async fn handle_active_downloads_without_peers(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    downloading_games: &Arc<RwLock<HashSet<String>>>,
    active_downloads: &Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
    // Work on a copy of the ids so no lock is held across the loop body.
    let tracked_ids: Vec<String> = downloading_games.read().await.iter().cloned().collect();

    for id in tracked_ids {
        let still_served = !peer_game_db.read().await.peers_with_game(&id).is_empty();
        if still_served {
            continue;
        }

        // `remove` reports false when the id is no longer tracked; in that
        // case there is nothing left to cancel or report.
        let was_tracked = downloading_games.write().await.remove(&id);
        if !was_tracked {
            continue;
        }

        let download_task = active_downloads.write().await.remove(&id);
        if let Some(handle) = download_task {
            handle.abort();
        }

        let event = PeerEvent::DownloadGameFilesAllPeersGone { id: id.clone() };
        if let Err(e) = tx_notify_ui.send(event) {
            log::error!("Failed to send DownloadGameFilesAllPeersGone event: {e}");
        }
    }
}
// =============================================================================
// Local game monitor
// =============================================================================
/// Monitors the local game directory for changes.
///
/// On every `LOCAL_GAME_MONITOR_INTERVAL_SECS` tick, scans the configured game
/// directory (if one is set) and passes the scan result to
/// `update_and_announce_games`. Scan failures are logged and the loop carries
/// on. This function never returns.
pub async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: Ctx) {
    log::info!(
        "Starting local game directory monitor ({LOCAL_GAME_MONITOR_INTERVAL_SECS}s interval)"
    );
    let mut interval = tokio::time::interval(Duration::from_secs(LOCAL_GAME_MONITOR_INTERVAL_SECS));
    loop {
        interval.tick().await;

        // Clone the setting so the read lock is not held during the scan.
        let configured_dir = ctx.game_dir.read().await.clone();
        let Some(game_dir) = configured_dir.as_ref() else {
            continue;
        };

        let scan = match scan_local_library(game_dir).await {
            Ok(scan) => scan,
            Err(e) => {
                log::error!("Failed to scan local games directory: {e}");
                continue;
            }
        };
        update_and_announce_games(&ctx, &tx_notify_ui, scan).await;
    }
}