Files
lanspread/crates/lanspread-peer/src/lib.rs
T
2025-11-14 01:44:39 +01:00

2567 lines
84 KiB
Rust

#![allow(clippy::missing_errors_doc)]
mod path_validation;
mod peer;
use std::{
cmp::Reverse,
collections::{HashMap, HashSet, VecDeque},
io::ErrorKind,
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use bytes::BytesMut;
use if_addrs::{IfAddr, Interface, get_if_addrs};
use lanspread_db::db::{Game, GameDB, GameFileDescription};
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, MdnsBrowser};
use lanspread_proto::{Message, Request, Response};
use s2n_quic::{
Client as QuicClient,
Connection,
Server,
client::Connect,
provider::limits::Limits,
stream::BidirectionalStream,
};
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender},
},
};
use uuid::Uuid;
use crate::{
path_validation::validate_game_file_path,
peer::{send_game_file_chunk, send_game_file_data},
};
/// Interval between peer pings, in seconds.
/// NOTE(review): the consumer is outside this view (presumably the ping service) — confirm.
const PEER_PING_INTERVAL_SECS: u64 = 5;
/// Age, in seconds, after which a peer is considered stale.
/// NOTE(review): presumably turned into the `timeout` given to `PeerGameDB::get_stale_peers`;
/// the caller is outside this view — confirm.
const PEER_STALE_TIMEOUT_SECS: u64 = 12;
/// Custom error types for peer operations.
#[derive(Debug)]
pub enum PeerError {
/// The size of the file at `path` could not be determined; `source` is the
/// underlying I/O error.
FileSizeDetermination {
path: String,
source: std::io::Error,
},
/// No game directory has been configured.
GameDirNotSet,
/// Any other failure, carried as an `eyre::Report` (this is what the
/// `From<eyre::Report>` conversion produces).
Other(eyre::Report),
}
impl std::fmt::Display for PeerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeerError::FileSizeDetermination { path, source } => {
write!(f, "Failed to determine file size for {path}: {source}")
}
PeerError::GameDirNotSet => write!(f, "Game directory not set"),
PeerError::Other(err) => write!(f, "General error: {err}"),
}
}
}
impl std::error::Error for PeerError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PeerError::FileSizeDetermination { source, .. } => Some(source),
PeerError::Other(err) => Some(err.root_cause()),
PeerError::GameDirNotSet => None,
}
}
}
impl From<eyre::Report> for PeerError {
    /// Wraps an arbitrary report in the catch-all `Other` variant.
    fn from(report: eyre::Report) -> Self {
        Self::Other(report)
    }
}
/// Boots the peer system on a background task and returns its command channel.
///
/// This is the entry point for running the peer from another crate: `run_peer`
/// is spawned with `tokio::spawn`, a `PeerCommand::SetGameDir` carrying
/// `game_dir` is queued as the first command, and the sender half of the
/// control channel is handed back to the caller. Failures of the spawned
/// `run_peer` task are only logged.
///
/// # Errors
///
/// Returns an error if the initial `SetGameDir` command cannot be queued.
pub fn start_peer(
    game_dir: String,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<UnboundedSender<PeerCommand>> {
    log::info!("Starting peer system with game directory: {game_dir}");
    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();

    // Run the peer loop in the background; it owns the receiving half.
    tokio::spawn(async move {
        let outcome = run_peer(command_rx, tx_notify_ui).await;
        if let Err(e) = outcome {
            log::error!("Peer system failed: {e}");
        }
    });

    // Queue the game directory before handing the channel to the caller.
    command_tx.send(PeerCommand::SetGameDir(game_dir))?;
    Ok(command_tx)
}
/// PEM certificate embedded at compile time from `cert.pem`, located two
/// directories above this crate's manifest directory. `download_from_peer`
/// passes it to the QUIC client's TLS configuration.
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
/// PEM private key embedded at compile time from `key.pem`, located two
/// directories above this crate's manifest directory.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// used by the server component; confirm. Because the key is compiled into the
/// binary, it looks like every build from the same tree shares it — confirm
/// this is intended.
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
/// Events the peer system sends to the UI over the `tx_notify_ui` channel.
#[derive(Debug)]
pub enum PeerEvent {
/// A list of games.
/// NOTE(review): presumably the answer to `PeerCommand::ListGames`; the sender is outside this view.
ListGames(Vec<Game>),
/// File descriptions for the game `id`.
/// NOTE(review): presumably the answer to `PeerCommand::GetGame`; the sender is outside this view.
GotGameFiles {
id: String,
file_descriptions: Vec<GameFileDescription>,
},
/// Sent by `download_game_files` once local storage has been prepared and
/// the transfer for game `id` is about to start.
DownloadGameFilesBegin {
id: String,
},
/// Sent by `download_game_files` when every file of game `id` was downloaded.
DownloadGameFilesFinished {
id: String,
},
/// Sent by `download_game_files` when the download of game `id` ended with an error.
DownloadGameFilesFailed {
id: String,
},
/// NOTE(review): presumably no known peer offers game `id`; emitted outside this view.
NoPeersHaveGame {
id: String,
},
// NOTE(review): the peer lifecycle events below are emitted outside this view;
// their exact trigger conditions could not be confirmed from here.
PeerConnected(SocketAddr),
PeerDisconnected(SocketAddr),
PeerDiscovered(SocketAddr),
PeerLost(SocketAddr),
/// NOTE(review): presumably the current number of known peers — confirm.
PeerCountUpdated(usize),
/// NOTE(review): presumably the refreshed list of locally available games — confirm.
LocalGamesUpdated(Vec<Game>),
}
/// Everything recorded about a single remote peer.
#[derive(Clone, Debug)]
pub struct PeerInfo {
/// Address the peer is keyed by in `PeerGameDB`.
pub addr: SocketAddr,
/// Time of the last update for this peer; refreshed by the `PeerGameDB`
/// update methods and compared against a timeout in `get_stale_peers`.
pub last_seen: Instant,
/// Games offered by the peer, keyed by game id.
pub games: HashMap<String, Game>,
/// File listings reported by the peer, keyed by game id.
pub files: HashMap<String, Vec<GameFileDescription>>,
}
/// In-memory registry of known peers and the games/files each one reports.
#[derive(Debug)]
pub struct PeerGameDB {
// Peer records keyed by the peer's socket address.
peers: HashMap<SocketAddr, PeerInfo>,
}
impl Default for PeerGameDB {
fn default() -> Self {
Self::new()
}
}
impl PeerGameDB {
#[must_use]
pub fn new() -> Self {
Self {
peers: HashMap::new(),
}
}
pub fn add_peer(&mut self, addr: SocketAddr) {
let peer_info = PeerInfo {
addr,
last_seen: Instant::now(),
games: HashMap::new(),
files: HashMap::new(),
};
self.peers.insert(addr, peer_info);
log::info!("Added peer: {addr}");
}
pub fn remove_peer(&mut self, addr: &SocketAddr) -> Option<PeerInfo> {
self.peers.remove(addr)
}
pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec<Game>) {
if let Some(peer) = self.peers.get_mut(&addr) {
let mut map = HashMap::with_capacity(games.len());
for game in games {
map.insert(game.id.clone(), game);
}
peer.games = map;
peer.last_seen = Instant::now();
log::info!("Updated games for peer: {addr}");
}
}
pub fn update_peer_game_files(
&mut self,
addr: SocketAddr,
game_id: &str,
files: Vec<GameFileDescription>,
) {
if let Some(peer) = self.peers.get_mut(&addr) {
peer.files.insert(game_id.to_string(), files);
peer.last_seen = Instant::now();
}
}
pub fn update_last_seen(&mut self, addr: &SocketAddr) {
if let Some(peer) = self.peers.get_mut(addr) {
peer.last_seen = Instant::now();
}
}
#[must_use]
pub fn get_all_games(&self) -> Vec<Game> {
let mut aggregated: HashMap<String, Game> = HashMap::new();
let mut peer_counts: HashMap<String, u32> = HashMap::new();
// Count peers per game
for peer in self.peers.values() {
for game_id in peer.games.keys() {
*peer_counts.entry(game_id.clone()).or_insert(0) += 1;
}
}
// Aggregate games with peer counts
for peer in self.peers.values() {
for game in peer.games.values() {
aggregated
.entry(game.id.clone())
.and_modify(|existing| {
if let (Some(new_version), Some(current)) =
(&game.eti_game_version, &existing.eti_game_version)
{
if new_version > current {
existing.eti_game_version = Some(new_version.clone());
}
} else if existing.eti_game_version.is_none() {
existing.eti_game_version.clone_from(&game.eti_game_version);
}
// Update peer count
existing.peer_count = peer_counts[&game.id];
})
.or_insert_with(|| {
let mut game_clone = game.clone();
game_clone.peer_count = peer_counts[&game.id];
game_clone
});
}
}
let mut games: Vec<Game> = aggregated.into_values().collect();
games.sort_by(|a, b| a.name.cmp(&b.name));
games
}
#[must_use]
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
let mut latest_version: Option<String> = None;
for peer in self.peers.values() {
if let Some(game) = peer.games.get(game_id)
&& let Some(ref version) = game.eti_game_version
{
match &latest_version {
None => latest_version = Some(version.clone()),
Some(current_latest) => {
if version > current_latest {
latest_version = Some(version.clone());
}
}
}
}
}
latest_version
}
#[must_use]
pub fn get_peer_addresses(&self) -> Vec<SocketAddr> {
self.peers.keys().copied().collect()
}
#[must_use]
pub fn contains_peer(&self, addr: &SocketAddr) -> bool {
self.peers.contains_key(addr)
}
#[must_use]
pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> {
self.peers
.iter()
.filter(|(_, peer)| peer.games.contains_key(game_id))
.map(|(addr, _)| *addr)
.collect()
}
#[must_use]
pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
let latest_version = self.get_latest_version_for_game(game_id);
if let Some(ref latest) = latest_version {
self.peers
.iter()
.filter(|(_, peer)| {
if let Some(game) = peer.games.get(game_id) {
if let Some(ref version) = game.eti_game_version {
version == latest
} else {
false
}
} else {
false
}
})
.map(|(addr, _)| *addr)
.collect()
} else {
// If no version info is available, fall back to all peers with the game
self.peers_with_game(game_id)
}
}
#[must_use]
pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
self.peers
.iter()
.filter_map(|(addr, peer)| peer.files.get(game_id).cloned().map(|files| (*addr, files)))
.collect()
}
#[must_use]
pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> {
let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
for (_, files) in self.game_files_for(game_id) {
for file in files {
seen.entry(file.relative_path.clone()).or_insert(file);
}
}
seen.into_values().collect()
}
/// Validates file sizes across all peers and returns only the files with majority consensus
/// Returns a tuple of (`validated_files`, `peer_whitelist`, `file_peer_map`) where
/// `peer_whitelist` contains peers that have at least one majority-approved file and
/// `file_peer_map` lists which peers were validated for each file
pub fn validate_file_sizes_majority(
&self,
game_id: &str,
) -> eyre::Result<MajorityValidationResult> {
let game_files = self.game_files_for(game_id);
if game_files.is_empty() {
return Ok((Vec::new(), Vec::new(), HashMap::new()));
}
let (file_size_map, _peer_files) = collect_file_sizes(&game_files);
let (validated_files, peer_scores, file_peer_map) =
self.validate_each_file_consensus(game_id, file_size_map)?;
let peer_whitelist = create_peer_whitelist(peer_scores);
Ok((validated_files, peer_whitelist, file_peer_map))
}
/// Validates consensus for each file and returns validated files with peer scores
fn validate_each_file_consensus(
&self,
game_id: &str,
file_size_map: FileSizeMap,
) -> eyre::Result<FileConsensusAggregation> {
let mut validated_files = Vec::new();
let mut peer_whitelist_scores: HashMap<SocketAddr, usize> = HashMap::new();
let mut file_peer_map: HashMap<String, Vec<SocketAddr>> = HashMap::new();
for (relative_path, size_map) in file_size_map {
let total_peers: usize = size_map.values().map(Vec::len).sum();
if total_peers == 0 {
continue; // Skip files with no size information
}
let (consensus_size, consensus_peers) =
self.determine_size_consensus(&size_map, total_peers, &relative_path)?;
update_peer_scores(&consensus_peers, &mut peer_whitelist_scores);
if let Some((size, peers)) = consensus_size
&& let Some(file_desc) =
self.create_validated_file_description(game_id, &relative_path, size, &peers)
{
file_peer_map.insert(relative_path.clone(), peers.clone());
validated_files.push(file_desc);
}
}
Ok((validated_files, peer_whitelist_scores, file_peer_map))
}
/// Determines the consensus size for a file based on peer reports
///
/// # Panics
///
/// Panics if `size_map.iter().next()` returns None when `total_peers` == 1
#[allow(clippy::unused_self)]
fn determine_size_consensus(
&self,
size_map: &HashMap<u64, Vec<SocketAddr>>,
total_peers: usize,
relative_path: &str,
) -> eyre::Result<(ConsensusResult, Vec<SocketAddr>)> {
if total_peers == 1 {
// Only one peer has this file - trust it
let (&size, peers) = size_map
.iter()
.next()
.expect("size_map should have at least one entry when total_peers == 1");
return Ok((Some((size, peers.clone())), peers.clone()));
}
let (majority_size, _majority_count) = find_majority_size(size_map);
if let Some(size) = majority_size {
let majority_peers = &size_map[&size];
let is_majority = majority_peers.len() > total_peers / 2;
if is_majority {
// We have a clear majority
Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
} else if total_peers == 2 {
// Two peers with different sizes - ambiguous, fail
eyre::bail!(
"File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
relative_path
);
}
// If no majority and more than 2 peers, we fall back to plurality (largest group)
else {
Ok((Some((size, majority_peers.clone())), majority_peers.clone()))
}
} else {
// No clear majority and it's a tie between different sizes
if total_peers == 2 {
eyre::bail!(
"File size ambiguity for '{}': two peers report different sizes, cannot determine majority",
relative_path
);
}
// For more than 2 peers, we could fall back to plurality, but for now let's be strict
eyre::bail!(
"File size ambiguity for '{}': no clear majority among {} peers",
relative_path,
total_peers
);
}
}
/// Creates a validated file description from consensus data
fn create_validated_file_description(
&self,
game_id: &str,
relative_path: &str,
size: u64,
peers: &[SocketAddr],
) -> Option<GameFileDescription> {
if let Some(first_peer) = peers.first()
&& let Some(files) = self
.peers
.get(first_peer)
.and_then(|p| p.files.get(game_id))
&& let Some(file_desc) = files
.iter()
.find(|f| f.relative_path == relative_path && f.size == size)
{
return Some(file_desc.clone());
}
None
}
#[must_use]
pub fn get_stale_peers(&self, timeout: Duration) -> Vec<SocketAddr> {
self.peers
.iter()
.filter(|(_, peer)| peer.last_seen.elapsed() > timeout)
.map(|(addr, _)| *addr)
.collect()
}
}
/// Type alias for file size mapping: relative path -> reported size -> peers
/// that reported that size (built by `collect_file_sizes`).
type FileSizeMap = HashMap<String, HashMap<u64, Vec<SocketAddr>>>;
/// Type alias for peer file mapping: peer -> relative path -> size that peer
/// reported (built by `collect_file_sizes`).
type PeerFileMap = HashMap<SocketAddr, HashMap<String, u64>>;
/// Type alias for consensus result: `Some((size, peers agreeing on it))`, or
/// `None` when there is no consensus.
type ConsensusResult = Option<(u64, Vec<SocketAddr>)>;
/// Type alias for the aggregated majority validation result:
/// (validated files, peer whitelist, relative path -> validated peers).
type MajorityValidationResult = (
Vec<GameFileDescription>,
Vec<SocketAddr>,
HashMap<String, Vec<SocketAddr>>,
);
/// Type alias for per-file consensus aggregation results:
/// (validated files, peer -> score, relative path -> validated peers).
type FileConsensusAggregation = (
Vec<GameFileDescription>,
HashMap<SocketAddr, usize>,
HashMap<String, Vec<SocketAddr>>,
);
/// Indexes every peer's non-directory files in two ways.
///
/// Returns `(by_path, by_peer)`: the first maps relative path -> size -> peers
/// reporting that size, the second maps peer -> relative path -> size.
/// Directory entries are ignored; a peer with only directories still gets an
/// (empty) entry in the second map.
fn collect_file_sizes(
    game_files: &[(SocketAddr, Vec<GameFileDescription>)],
) -> (FileSizeMap, PeerFileMap) {
    let mut by_path: FileSizeMap = HashMap::new();
    let mut by_peer: PeerFileMap = HashMap::new();

    for (peer_addr, files) in game_files {
        let mut sizes_for_peer = HashMap::new();
        for file in files.iter().filter(|file| !file.is_dir) {
            let reporters = by_path
                .entry(file.relative_path.clone())
                .or_default()
                .entry(file.size)
                .or_default();
            reporters.push(*peer_addr);
            sizes_for_peer.insert(file.relative_path.clone(), file.size);
        }
        by_peer.insert(*peer_addr, sizes_for_peer);
    }

    (by_path, by_peer)
}
/// Finds the size reported by the largest group of peers.
///
/// Returns `(size, count)` where `count` is the size of the largest group and
/// `size` is `Some` only when exactly one size is reported by that many peers.
/// A tie for the largest group — or a map without any peers — yields `None`.
///
/// The result does not depend on the map's iteration order: the largest group
/// is determined first and only then checked for uniqueness, so a tie between
/// two small groups cannot hide a larger group.
fn find_majority_size(size_map: &HashMap<u64, Vec<SocketAddr>>) -> (Option<u64>, usize) {
    let majority_count = size_map.values().map(Vec::len).max().unwrap_or(0);
    if majority_count == 0 {
        return (None, 0);
    }

    // Sizes whose group is as large as the largest one; more than one means a tie.
    let mut leaders = size_map
        .iter()
        .filter(|(_, peers)| peers.len() == majority_count)
        .map(|(&size, _)| size);
    let majority_size = match (leaders.next(), leaders.next()) {
        (Some(size), None) => Some(size),
        _ => None,
    };

    (majority_size, majority_count)
}
/// Adds one point to the score of every peer in `peers`, creating the score
/// entry (starting from zero) for peers seen for the first time.
fn update_peer_scores(
    peers: &[SocketAddr],
    peer_whitelist_scores: &mut HashMap<SocketAddr, usize>,
) {
    peers.iter().for_each(|peer| {
        let score = peer_whitelist_scores.entry(*peer).or_default();
        *score += 1;
    });
}
/// Turns peer scores into a whitelist ordered best-first.
///
/// Peers with a score of zero are dropped. The remaining peers are ordered by
/// descending score, with ties broken by ascending address.
fn create_peer_whitelist(peer_scores: HashMap<SocketAddr, usize>) -> Vec<SocketAddr> {
    let mut ranked: Vec<(SocketAddr, usize)> = peer_scores
        .into_iter()
        .filter(|&(_, score)| score > 0)
        .collect();
    ranked.sort_by(|(peer_a, score_a), (peer_b, score_b)| {
        score_b.cmp(score_a).then_with(|| peer_a.cmp(peer_b))
    });
    ranked.into_iter().map(|(peer, _)| peer).collect()
}
/// Commands accepted by the peer system over the control channel returned by
/// `start_peer` and consumed by `run_peer`.
#[derive(Debug)]
pub enum PeerCommand {
/// Dispatched by `run_peer` to `handle_list_games_command`.
ListGames,
/// Dispatched by `run_peer` to `handle_get_game_command` with the carried game id.
GetGame(String),
/// Request to download the given files of game `id`.
/// NOTE(review): the handler is outside this view.
DownloadGameFiles {
id: String,
file_descriptions: Vec<GameFileDescription>,
},
/// Sets the game directory; `start_peer` queues this as the first command.
SetGameDir(String),
/// NOTE(review): presumably answered with `PeerEvent::PeerCountUpdated`; the handler is outside this view.
GetPeerCount,
}
/// Pings the peer over a fresh bidirectional stream and reports whether it
/// answered with `Response::Pong`.
///
/// Returns `false` when the stream cannot be opened, the ping cannot be sent,
/// nothing is received, or the reply is anything other than a pong.
async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
    let remote_addr = conn.remote_addr().ok();

    let (mut rx, mut tx) = match conn.open_bidirectional_stream().await {
        Ok(stream) => stream.split(),
        Err(e) => {
            log::error!("{remote_addr:?} failed to open stream: {e}");
            return false;
        }
    };

    // send ping
    if let Err(e) = tx.send(Request::Ping.encode()).await {
        log::error!("{remote_addr:?} failed to send ping to peer: {e}");
        return false;
    }
    // A failed close is deliberately ignored; the reply decides the outcome.
    let _ = tx.close().await;

    // receive pong
    let Ok(Some(payload)) = rx.receive().await else {
        return false;
    };
    let response = Response::decode(payload);
    if matches!(response, Response::Pong) {
        log::trace!("{remote_addr:?} peer is alive");
        true
    } else {
        log::error!("{remote_addr:?} peer sent invalid response to ping: {response:?}");
        false
    }
}
/// Maximum length in bytes (32 MiB) of a single download chunk; `build_peer_plans`
/// splits every file into pieces of at most this size.
const CHUNK_SIZE: u64 = 32 * 1024 * 1024;
/// Retry budget per chunk: a chunk whose `retry_count` has reached this value
/// is reported as failed instead of being retried again.
const MAX_RETRY_COUNT: usize = 3;
/// One unit of download work: a byte range of a single file.
#[derive(Debug, Clone)]
struct DownloadChunk {
// Path of the file relative to the games folder.
relative_path: String,
// Byte offset in the file at which this chunk starts.
offset: u64,
// Number of bytes to fetch; 0 is used for empty files and for whole-file
// downloads, where no exact byte count is enforced.
length: u64,
// How many times this chunk has already been retried.
retry_count: usize,
// Peer the chunk was most recently assigned to, if any.
last_peer: Option<SocketAddr>,
}
/// The work assigned to a single peer for one download run.
#[derive(Debug, Default)]
struct PeerDownloadPlan {
// Byte-range chunks to fetch from the peer.
chunks: Vec<DownloadChunk>,
// Files to fetch from the peer in one piece. Nothing in the visible code
// fills this list; `download_from_peer` processes it after the chunks.
whole_files: Vec<GameFileDescription>,
}
/// Outcome of attempting one chunk (or whole file) against one peer.
#[derive(Debug)]
struct ChunkDownloadResult {
// The chunk that was attempted.
chunk: DownloadChunk,
// `Ok(())` on success, otherwise the reason the attempt failed.
result: eyre::Result<()>,
// Peer the attempt was made against.
peer_addr: SocketAddr,
}
/// Creates the on-disk layout for a download inside `games_folder`.
///
/// Directories are created recursively. For files, the parent directories are
/// created, the file is created (or truncated if it already exists) and then
/// sized to `desc.size`. A failed pre-allocation is only logged as a warning.
///
/// # Errors
///
/// Fails if a relative path does not pass `validate_game_file_path` or if a
/// directory or file cannot be created.
async fn prepare_game_storage(
    games_folder: &Path,
    file_descs: &[GameFileDescription],
) -> eyre::Result<()> {
    for desc in file_descs {
        // Validate the path to prevent directory traversal
        let target = validate_game_file_path(games_folder, &desc.relative_path)?;

        if desc.is_dir {
            tokio::fs::create_dir_all(&target).await?;
            continue;
        }

        if let Some(parent) = target.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        let file = OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .open(&target)
            .await?;

        // Pre-allocate the file with the expected size
        let size = desc.size;
        match file.set_len(size).await {
            Ok(()) => {
                log::debug!(
                    "Pre-allocated file {} with {} bytes",
                    desc.relative_path,
                    size
                );
            }
            Err(e) => {
                // Continue without pre-allocation - the file will grow as chunks are written
                log::warn!(
                    "Failed to pre-allocate file {} (size: {}): {}",
                    desc.relative_path,
                    size,
                    e
                );
            }
        }
    }
    Ok(())
}
/// Picks the peers eligible to serve `relative_path`.
///
/// Uses the file's own entry in `file_peer_map` when it exists and is
/// non-empty; otherwise returns `fallback`.
fn resolve_file_peers<'a>(
    relative_path: &str,
    file_peer_map: &'a HashMap<String, Vec<SocketAddr>>,
    fallback: &'a [SocketAddr],
) -> &'a [SocketAddr] {
    match file_peer_map.get(relative_path) {
        Some(peers) if !peers.is_empty() => peers.as_slice(),
        _ => fallback,
    }
}
/// Splits the non-directory files into chunks and distributes them over peers.
///
/// Every file is cut into pieces of at most `CHUNK_SIZE` bytes; an empty file
/// becomes a single chunk with offset 0 and length 0. Chunks are handed out
/// round-robin, using one running counter across all files, over the peers
/// eligible for that file (see `resolve_file_peers`). Returns an empty map
/// when `peers` is empty.
fn build_peer_plans(
    peers: &[SocketAddr],
    file_descs: &[GameFileDescription],
    file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> HashMap<SocketAddr, PeerDownloadPlan> {
    let mut plans: HashMap<SocketAddr, PeerDownloadPlan> = HashMap::new();
    if peers.is_empty() {
        return plans;
    }

    let mut next_slot = 0usize;
    for desc in file_descs {
        if desc.is_dir {
            continue;
        }
        let size = desc.file_size();
        let candidates = resolve_file_peers(&desc.relative_path, file_peer_map, peers);
        if candidates.is_empty() {
            continue;
        }

        // Hands the given byte range to the next peer in rotation.
        let mut assign = |offset: u64, length: u64| {
            let peer = candidates[next_slot % candidates.len()];
            next_slot += 1;
            plans.entry(peer).or_default().chunks.push(DownloadChunk {
                relative_path: desc.relative_path.clone(),
                offset,
                length,
                retry_count: 0,
                last_peer: Some(peer),
            });
        };

        if size == 0 {
            assign(0, 0);
            continue;
        }

        let mut offset = 0u64;
        while offset < size {
            let length = std::cmp::min(CHUNK_SIZE, size - offset);
            assign(offset, length);
            offset += length;
        }
    }
    plans
}
/// Downloads one chunk from the peer behind `conn` and writes it into the
/// target file under `base_dir`.
///
/// A `GetGameFileChunk` request is sent on a new bidirectional stream and the
/// reply is written at `chunk.offset`. A chunk with `length == 0` means "no
/// exact byte count": everything the peer sends is written, and if the offset
/// is also 0 the file is truncated first.
///
/// For chunks with a non-zero length, bytes beyond `chunk.length` are never
/// written to disk, so a misbehaving peer cannot overwrite data belonging to a
/// neighbouring chunk; such an over-long reply still fails the download.
///
/// # Errors
///
/// Fails on stream or file I/O errors, when the path is rejected by
/// `validate_game_file_path`, when the number of received bytes differs from a
/// non-zero `chunk.length`, or when the final size check fails.
async fn download_chunk(
    conn: &mut Connection,
    base_dir: &Path,
    game_id: &str,
    chunk: &DownloadChunk,
) -> eyre::Result<()> {
    let stream = conn.open_bidirectional_stream().await?;
    let (mut rx, mut tx) = stream.split();
    let request = Request::GetGameFileChunk {
        game_id: game_id.to_string(),
        relative_path: chunk.relative_path.clone(),
        offset: chunk.offset,
        length: chunk.length,
    };
    tx.write_all(&request.encode()).await?;
    tx.close().await?;

    // Validate the path to prevent directory traversal
    let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?;
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false)
        .open(&validated_path)
        .await?;
    if chunk.length == 0 && chunk.offset == 0 {
        // fallback-to-whole-file path replaces any existing partial data
        file.set_len(0).await?;
    }
    file.seek(std::io::SeekFrom::Start(chunk.offset)).await?;

    let unbounded = chunk.length == 0;
    let mut remaining = chunk.length;
    let mut received_bytes = 0u64;
    while let Some(bytes) = rx.receive().await? {
        let incoming = bytes.len() as u64;
        received_bytes += incoming;

        if unbounded {
            file.write_all(&bytes).await?;
            continue;
        }

        // Never write past the end of this chunk's byte range.
        let writable = usize::try_from(remaining).map_or(bytes.len(), |left| left.min(bytes.len()));
        file.write_all(&bytes[..writable]).await?;

        remaining = remaining.saturating_sub(incoming);
        if remaining == 0 {
            break;
        }
    }

    // Verify we received the expected amount of data
    if chunk.length > 0 && received_bytes != chunk.length {
        eyre::bail!(
            "Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}",
            chunk.length,
            received_bytes,
            chunk.relative_path,
            chunk.offset
        );
    }
    file.flush().await?;

    // Verify file integrity by checking the file size
    verify_chunk_integrity(&validated_path, chunk.offset, chunk.length).await?;
    Ok(())
}
async fn verify_chunk_integrity(
file_path: &Path,
offset: u64,
expected_length: u64,
) -> eyre::Result<()> {
if expected_length == 0 {
return Ok(()); // Skip verification for whole files or zero-length chunks
}
let metadata = tokio::fs::metadata(file_path).await?;
let file_size = metadata.len();
if file_size < offset + expected_length {
eyre::bail!(
"File integrity check failed: file size {} is less than expected {} (offset: {})",
file_size,
offset + expected_length,
offset
);
}
Ok(())
}
/// Fetches a complete file from the peer behind `conn` and stores it under
/// `base_dir`, replacing any existing content.
///
/// A `GetGameFileData` request is sent on a new bidirectional stream and the
/// reply is written to the (truncated) file until the stream ends.
///
/// # Errors
///
/// Fails on stream or file I/O errors, or when the path is rejected by
/// `validate_game_file_path`.
async fn download_whole_file(
    conn: &mut Connection,
    base_dir: &Path,
    desc: &GameFileDescription,
) -> eyre::Result<()> {
    let (mut rx, mut tx) = conn.open_bidirectional_stream().await?.split();

    let request = Request::GetGameFileData(desc.clone());
    tx.write_all(&request.encode()).await?;
    tx.close().await?;

    // Validate the path to prevent directory traversal
    let target = validate_game_file_path(base_dir, &desc.relative_path)?;
    let mut file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&target)
        .await?;
    file.seek(std::io::SeekFrom::Start(0)).await?;

    loop {
        match rx.receive().await? {
            Some(bytes) => file.write_all(&bytes).await?,
            None => break,
        }
    }

    file.flush().await?;
    Ok(())
}
async fn download_from_peer(
peer_addr: SocketAddr,
game_id: &str,
plan: PeerDownloadPlan,
games_folder: PathBuf,
) -> eyre::Result<Vec<ChunkDownloadResult>> {
if plan.chunks.is_empty() && plan.whole_files.is_empty() {
return Ok(Vec::new());
}
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
let client = QuicClient::builder()
.with_tls(CERT_PEM)?
.with_io("0.0.0.0:0")?
.with_limits(limits)?
.start()?;
let conn = Connect::new(peer_addr).with_server_name("localhost");
let mut conn = client.connect(conn).await?;
conn.keep_alive(true)?;
let base_dir = games_folder;
let mut results = Vec::new();
// Download chunks with error handling
for chunk in &plan.chunks {
log::info!(
"Downloading chunk {} (offset {}, length {}) from {}",
chunk.relative_path,
chunk.offset,
chunk.length,
peer_addr
);
let result = download_chunk(&mut conn, &base_dir, game_id, chunk).await;
results.push(ChunkDownloadResult {
chunk: chunk.clone(),
result,
peer_addr,
});
}
// Download whole files
for desc in &plan.whole_files {
let chunk = DownloadChunk {
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0, // Indicates whole file
retry_count: 0,
last_peer: Some(peer_addr),
};
let result = download_whole_file(&mut conn, &base_dir, desc).await;
results.push(ChunkDownloadResult {
chunk,
result,
peer_addr,
});
}
Ok(results)
}
/// Downloads all files of `game_id` into `games_folder`, spreading the work
/// over `peers`.
///
/// Steps: prepare the local files and directories, notify the UI with
/// `DownloadGameFilesBegin`, build one plan per peer, run all plans
/// concurrently, retry failed chunks via `retry_failed_chunks`, and finally
/// notify the UI with `DownloadGameFilesFinished` or `DownloadGameFilesFailed`.
///
/// `file_peer_map` restricts which peers may serve which file (see
/// `resolve_file_peers`).
///
/// # Errors
///
/// Fails when `peers` is empty, when storage preparation fails, when a UI
/// notification cannot be sent, or with the last recorded download error.
async fn download_game_files(
game_id: &str,
game_file_descs: Vec<GameFileDescription>,
games_folder: String,
peers: Vec<SocketAddr>,
file_peer_map: HashMap<String, Vec<SocketAddr>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
if peers.is_empty() {
eyre::bail!("no peers available for game {game_id}");
}
let base_dir = PathBuf::from(&games_folder);
// NOTE(review): an error here returns before any UI event is sent, so no
// `DownloadGameFilesFailed` is emitted for it — confirm callers handle that.
prepare_game_storage(&base_dir, &game_file_descs).await?;
tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin {
id: game_id.to_string(),
})?;
let plans = build_peer_plans(&peers, &game_file_descs, &file_peer_map);
// One task per peer; the tasks run concurrently and are awaited in order below.
let mut tasks = Vec::new();
for (peer_addr, plan) in plans {
let base_dir = base_dir.clone();
let game_id = game_id.to_string();
tasks.push(tokio::spawn(async move {
download_from_peer(peer_addr, &game_id, plan, base_dir).await
}));
}
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
// Only the most recent error is kept; earlier ones are overwritten.
let mut last_err: Option<eyre::Report> = None;
for handle in tasks {
match handle.await {
Ok(Ok(results)) => {
for chunk_result in results {
if let Err(e) = chunk_result.result {
log::warn!(
"Failed to download chunk from {}: {e}",
chunk_result.peer_addr
);
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
// Queue the chunk for the retry pass with its retry counter bumped.
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
retry_chunk.last_peer = Some(chunk_result.peer_addr);
failed_chunks.push(retry_chunk);
} else {
last_err = Some(eyre::eyre!(
"Max retries exceeded for chunk: {}",
chunk_result.chunk.relative_path
));
}
}
}
}
// NOTE(review): when a whole peer task fails (e.g. the connection could not be
// established), its plan was consumed by the task, so none of its chunks are
// queued for retry and the download fails as a whole — confirm this is intended.
Ok(Err(e)) => last_err = Some(e),
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
}
}
// Retry failed chunks if any
if !failed_chunks.is_empty() && !peers.is_empty() {
log::info!("Retrying {} failed chunks", failed_chunks.len());
// Only chunks that still failed after retrying are returned here.
let retry_results =
retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id, &file_peer_map).await;
for chunk_result in retry_results {
if let Err(e) = chunk_result.result {
log::error!("Retry failed for chunk: {e}");
last_err = Some(e);
}
}
}
if let Some(err) = last_err {
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
log::info!("all files downloaded for game: {game_id}");
tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished {
id: game_id.to_string(),
})?;
Ok(())
}
/// Chooses the peer for the next retry attempt of a chunk.
///
/// With more than one peer and a `last_peer` that is part of `peers`, the
/// choice rotates to the peer `1 + attempt_offset` positions after it
/// (wrapping around). In every other case the peer at index
/// `attempt_offset % peers.len()` is used. Returns `None` only when `peers`
/// is empty.
fn select_retry_peer(
    peers: &[SocketAddr],
    last_peer: Option<SocketAddr>,
    attempt_offset: usize,
) -> Option<SocketAddr> {
    if peers.is_empty() {
        return None;
    }

    let previous_index = if peers.len() > 1 {
        last_peer.and_then(|last| peers.iter().position(|addr| *addr == last))
    } else {
        None
    };

    let index = match previous_index {
        Some(pos) => (pos + 1 + attempt_offset) % peers.len(),
        None => attempt_offset % peers.len(),
    };
    Some(peers[index])
}
/// Picks an address to attribute a failure to: the last peer if known,
/// otherwise the first entry of `peers`, otherwise the unspecified address
/// `0.0.0.0:0`.
fn fallback_peer_addr(peers: &[SocketAddr], last_peer: Option<SocketAddr>) -> SocketAddr {
    match (last_peer, peers.first()) {
        (Some(last), _) => last,
        (None, Some(first)) => *first,
        (None, None) => SocketAddr::from(([0, 0, 0, 0], 0)),
    }
}
/// Retries failed chunks one at a time until each either succeeds or runs out
/// of retry budget (`MAX_RETRY_COUNT`).
///
/// Chunks are processed from a FIFO queue; a chunk that fails again is put back
/// at the end with its `retry_count` increased. The peer for each attempt is
/// chosen by `select_retry_peer` from the peers eligible for the chunk's file.
///
/// Returns only the failures: one `ChunkDownloadResult` with an `Err` result
/// for every chunk that could not be downloaded. Successful chunks produce no
/// entry.
async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
peers: &[SocketAddr],
base_dir: &Path,
game_id: &str,
file_peer_map: &HashMap<String, Vec<SocketAddr>>,
) -> Vec<ChunkDownloadResult> {
// Chunks that have definitively failed.
let mut exhausted = Vec::new();
let mut queue: VecDeque<DownloadChunk> = failed_chunks.into_iter().collect();
while let Some(mut chunk) = queue.pop_front() {
let eligible_peers = resolve_file_peers(&chunk.relative_path, file_peer_map, peers);
// Budget already used up before this attempt: report and move on.
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"Retry budget exhausted for chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
continue;
}
// The first retry (retry_count == 1) uses rotation offset 0; later retries
// step further through the peer list.
let retry_offset = chunk.retry_count.saturating_sub(1);
let Some(peer_addr) = select_retry_peer(eligible_peers, chunk.last_peer, retry_offset)
else {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(eyre::eyre!(
"No peers available to retry chunk: {}",
chunk.relative_path
)),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
continue;
};
let mut attempt_chunk = chunk.clone();
attempt_chunk.last_peer = Some(peer_addr);
// Each retry uses a single-chunk plan, which opens its own connection to the chosen peer.
let plan = PeerDownloadPlan {
chunks: vec![attempt_chunk.clone()],
whole_files: Vec::new(),
};
match download_from_peer(peer_addr, game_id, plan, base_dir.to_path_buf()).await {
Ok(results) => {
for result in results {
match result.result {
Ok(()) => {}
Err(e) => {
// The transfer itself failed: bump the counter and either give up or requeue.
let mut retry_chunk = result.chunk.clone();
retry_chunk.retry_count = chunk.retry_count + 1;
retry_chunk.last_peer = Some(result.peer_addr);
if retry_chunk.retry_count >= MAX_RETRY_COUNT {
let context = format!(
"Retry budget exhausted for chunk: {}",
result.chunk.relative_path
);
exhausted.push(ChunkDownloadResult {
chunk: retry_chunk,
result: Err(e.wrap_err(context)),
peer_addr: result.peer_addr,
});
} else {
queue.push_back(retry_chunk);
}
}
}
}
}
// Client setup or connection failure: counts against the retry budget as well.
Err(e) => {
chunk.retry_count += 1;
chunk.last_peer = Some(peer_addr);
if chunk.retry_count >= MAX_RETRY_COUNT {
exhausted.push(ChunkDownloadResult {
chunk: chunk.clone(),
result: Err(e.wrap_err(format!(
"Retry budget exhausted for chunk after connection failure: {}",
chunk.relative_path
))),
peer_addr: fallback_peer_addr(eligible_peers, chunk.last_peer),
});
} else {
queue.push_back(chunk);
}
}
}
}
exhausted
}
/// Builds the local game database by scanning `game_dir`.
///
/// Every sub-directory `<id>` that contains a file `<id>.eti` becomes a game
/// with id and name `<id>`. A game counts as installed when its `local`
/// sub-directory has content; only then is the version read via
/// `read_version_from_ini` (read failures are logged and treated as "no
/// version"). `size` is the total size of the game's directory.
///
/// A missing `game_dir`, or one that is not a directory, yields an empty
/// database rather than an error.
///
/// # Errors
///
/// Fails when `game_dir` cannot be inspected for a reason other than "not
/// found", when the directory cannot be listed, or when a game's size cannot
/// be computed.
async fn load_local_game_db(game_dir: &str) -> eyre::Result<GameDB> {
    let game_path = PathBuf::from(game_dir);
    let metadata = match tokio::fs::metadata(&game_path).await {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == ErrorKind::NotFound => {
            log::warn!(
                "Local game directory {} missing; reporting empty game database",
                game_path.display()
            );
            return Ok(GameDB::empty());
        }
        Err(err) => return Err(err.into()),
    };
    if !metadata.is_dir() {
        log::warn!(
            "Configured game directory {} is not a directory; reporting empty game database",
            game_path.display()
        );
        return Ok(GameDB::empty());
    }

    let mut games = Vec::new();
    // Scan game directory and create entries for installed games
    let mut entries = tokio::fs::read_dir(&game_path).await?;
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();

        // Use the async metadata call instead of the blocking `Path::is_dir`;
        // like `is_dir`, it follows symlinks and treats errors as "not a directory".
        let is_dir = tokio::fs::metadata(&path)
            .await
            .is_ok_and(|meta| meta.is_dir());
        if !is_dir {
            continue;
        }
        let Some(game_id) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };

        let eti_path = path.join(format!("{game_id}.eti"));
        let downloaded = tokio::fs::metadata(&eti_path).await.is_ok();
        if !downloaded {
            continue;
        }

        let installed = local_dir_has_content(&path).await;
        let local_version = if installed {
            match lanspread_db::db::read_version_from_ini(&path) {
                Ok(version) => version,
                Err(e) => {
                    log::warn!("Failed to read version.ini for installed game {game_id}: {e}");
                    None
                }
            }
        } else {
            None
        };
        let size = calculate_directory_size(&path).await?;

        let game = Game {
            id: game_id.to_string(),
            name: game_id.to_string(),
            description: String::new(),
            release_year: String::new(),
            publisher: String::new(),
            max_players: 1,
            version: "1.0".to_string(),
            genre: String::new(),
            size,
            thumbnail: None,
            downloaded,
            installed,
            eti_game_version: local_version.clone(),
            local_version,
            peer_count: 0, // Local games start with 0 peers
        };
        games.push(game);
    }
    Ok(GameDB::from(games))
}
/// Reports whether `<path>/local` exists and contains at least one entry.
///
/// Any failure to inspect or list the directory counts as "no content";
/// listing failures are logged as warnings.
async fn local_dir_has_content(path: &Path) -> bool {
    let local_dir = path.join("local");
    if tokio::fs::metadata(&local_dir).await.is_err() {
        return false;
    }

    let first_entry = match tokio::fs::read_dir(&local_dir).await {
        Ok(mut entries) => entries.next_entry().await,
        Err(e) => {
            log::warn!("Failed to read local dir {}: {e}", local_dir.display());
            return false;
        }
    };

    match first_entry {
        Ok(entry) => entry.is_some(),
        Err(e) => {
            log::warn!("Failed to iterate local dir {}: {e}", local_dir.display());
            false
        }
    }
}
/// Sums the sizes of all files below `dir`, descending into sub-directories.
///
/// The recursive call is boxed (`Box::pin`) because an async fn cannot call
/// itself directly.
///
/// # Errors
///
/// Fails as soon as a directory cannot be listed or an entry's metadata
/// cannot be read.
async fn calculate_directory_size(dir: &Path) -> eyre::Result<u64> {
    let mut listing = tokio::fs::read_dir(dir).await?;
    let mut total = 0u64;
    loop {
        let Some(entry) = listing.next_entry().await? else {
            break;
        };
        let child = entry.path();
        let meta = tokio::fs::metadata(&child).await?;
        total += if meta.is_dir() {
            Box::pin(calculate_directory_size(&child)).await?
        } else {
            meta.len()
        };
    }
    Ok(total)
}
async fn local_download_available(game_dir: &str, game_id: &str) -> bool {
let game_path = PathBuf::from(game_dir).join(game_id);
let eti_path = game_path.join(format!("{game_id}.eti"));
if tokio::fs::metadata(&eti_path).await.is_err() {
return false;
}
// Only treat as pending install if the local installation directory is empty/missing
!local_dir_has_content(game_path.as_path()).await
}
/// Shared state of the peer system; cloning shares the same underlying data
/// because every field is an `Arc`.
#[derive(Clone)]
struct Ctx {
// Configured game directory; `None` until one has been set.
game_dir: Arc<RwLock<Option<String>>>,
// Database of local games; `None` until one has been loaded.
local_game_db: Arc<RwLock<Option<GameDB>>>,
// Registry of remote peers and what they offer.
peer_game_db: Arc<RwLock<PeerGameDB>>,
// NOTE(review): presumably this peer's own listening address once known — confirm.
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
}
/// The subset of `Ctx` handed to the server component in `run_peer`; it shares
/// the same `Arc`s as `Ctx` but has no access to the peer database.
#[derive(Clone)]
struct PeerCtx {
// Configured game directory; `None` until one has been set.
game_dir: Arc<RwLock<Option<String>>>,
// Database of local games; `None` until one has been loaded.
local_game_db: Arc<RwLock<Option<GameDB>>>,
// NOTE(review): presumably this peer's own listening address once known — confirm.
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
}
impl std::fmt::Debug for PeerCtx {
    /// Prints the struct with every field value replaced by the placeholder
    /// `"..."`, so the lock-guarded contents are never read for formatting.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PeerCtx");
        for field_name in ["game_dir", "local_game_db", "local_peer_addr"] {
            out.field(field_name, &"...");
        }
        out.finish()
    }
}
/// Entry point of the peer subsystem.
///
/// Creates the shared state, spawns the background tasks (QUIC server, mDNS peer discovery,
/// ping service and local game directory monitor) and then dispatches every [`PeerCommand`]
/// received on `rx_control` to its handler. Returns `Ok(())` once the command channel closes.
///
/// # Errors
///
/// Returns an error if the wildcard server address cannot be parsed.
///
/// # Panics
///
/// This function makes no panicking calls of its own; the download handler it dispatches to
/// checks that a games folder is set before using it.
pub async fn run_peer(
    mut rx_control: UnboundedReceiver<PeerCommand>,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
    // State shared between the command loop and all background tasks.
    let ctx = Ctx {
        game_dir: Arc::new(RwLock::new(None)),
        local_game_db: Arc::new(RwLock::new(None)),
        peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())),
        local_peer_addr: Arc::new(RwLock::new(None)),
    };

    // Server component: binds to an OS-chosen port on all interfaces.
    let bind_addr: SocketAddr = "0.0.0.0:0".parse()?;
    {
        let server_ctx = PeerCtx {
            game_dir: Arc::clone(&ctx.game_dir),
            local_game_db: Arc::clone(&ctx.local_game_db),
            local_peer_addr: Arc::clone(&ctx.local_peer_addr),
        };
        let ui = tx_notify_ui.clone();
        tokio::spawn(async move {
            if let Err(e) = run_server_component(bind_addr, server_ctx, ui).await {
                log::error!("Server component error: {e}");
            }
        });
    }

    // Peer discovery.
    tokio::spawn(run_peer_discovery(
        tx_notify_ui.clone(),
        Arc::clone(&ctx.peer_game_db),
        Arc::clone(&ctx.local_peer_addr),
    ));

    // Ping service.
    tokio::spawn(run_ping_service(
        tx_notify_ui.clone(),
        Arc::clone(&ctx.peer_game_db),
    ));

    // Local game directory monitor.
    tokio::spawn(run_local_game_monitor(tx_notify_ui.clone(), ctx.clone()));

    // Command loop: runs until every sender of `rx_control` has been dropped.
    while let Some(command) = rx_control.recv().await {
        match command {
            PeerCommand::ListGames => handle_list_games_command(&ctx, &tx_notify_ui).await,
            PeerCommand::GetGame(id) => handle_get_game_command(&ctx, &tx_notify_ui, id).await,
            PeerCommand::DownloadGameFiles {
                id,
                file_descriptions,
            } => {
                handle_download_game_files_command(&ctx, &tx_notify_ui, id, file_descriptions)
                    .await;
            }
            PeerCommand::SetGameDir(game_dir) => handle_set_game_dir_command(&ctx, game_dir).await,
            PeerCommand::GetPeerCount => handle_get_peer_count_command(&ctx, &tx_notify_ui).await,
        }
    }
    Ok(())
}
/// Runs the server side of the peer.
///
/// Starts a QUIC server on `addr`, records the advertised address in `ctx.local_peer_addr`,
/// registers the peer via mDNS and then accepts connections until the server stops, spawning
/// `handle_peer_connection` for each one.
///
/// # Errors
///
/// Returns an error if the QUIC server cannot be configured or started, if no advertisable
/// network interface is found, or if the mDNS advertiser cannot be created.
async fn run_server_component(
    addr: SocketAddr,
    ctx: PeerCtx,
    tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
    let limits = Limits::default()
        .with_max_handshake_duration(Duration::from_secs(3))?
        .with_max_idle_timeout(Duration::from_secs(3))?;
    let mut server = Server::builder()
        .with_tls((CERT_PEM, KEY_PEM))?
        .with_io(addr)?
        .with_limits(limits)?
        .start()?;
    let server_addr = server.local_addr()?;
    log::info!("Peer server listening on {server_addr}");

    // The server binds to a wildcard address, so pick a concrete IP for other peers to dial.
    let advertise_ip = select_advertise_ip()?;
    let advertise_addr = SocketAddr::new(advertise_ip, server_addr.port());
    log::info!("Advertising peer via mDNS from {advertise_addr}");
    {
        let mut guard = ctx.local_peer_addr.write().await;
        *guard = Some(advertise_addr);
    }

    // Start mDNS advertising for peer discovery.
    let peer_id = Uuid::now_v7().simple().to_string();
    let hostname = gethostname::gethostname();
    let hostname_str = hostname.to_str().unwrap_or("");
    // Keep "<hostname>-<peer id>" within 63 bytes. The cut is moved back to a character
    // boundary so that a multi-byte hostname is really shortened instead of being used in full.
    let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1);
    let truncated_hostname = truncate_at_char_boundary(hostname_str, max_hostname_len);
    let hostname_for_log = truncated_hostname.to_string();
    let instance_name = if truncated_hostname.is_empty() {
        peer_id
    } else {
        format!("{truncated_hostname}-{peer_id}")
    };
    let mdns = tokio::task::spawn_blocking(move || {
        MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &instance_name, advertise_addr)
    })
    .await??;

    // Monitor mDNS events. `monitor.recv()` blocks the calling thread, so the loop runs on the
    // blocking pool rather than on an async worker thread.
    tokio::task::spawn_blocking(move || {
        log::info!("Registering mDNS service with hostname: {hostname_for_log}");
        while let Ok(event) = mdns.monitor.recv() {
            match event {
                lanspread_mdns::DaemonEvent::Error(e) => {
                    log::error!("mDNS error: {e}");
                    std::thread::sleep(Duration::from_secs(1));
                }
                _ => {
                    log::trace!("mDNS event: {event:?}");
                }
            }
        }
    });

    while let Some(connection) = server.accept().await {
        let ctx = ctx.clone();
        let tx_notify_ui = tx_notify_ui.clone();
        tokio::spawn(async move {
            if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await {
                log::error!("Peer connection error: {e}");
            }
        });
    }
    Ok(())
}

/// Returns the longest prefix of `s` that is at most `max_len` bytes long and ends on a UTF-8
/// character boundary.
fn truncate_at_char_boundary(s: &str, max_len: usize) -> &str {
    if s.len() <= max_len {
        return s;
    }
    let mut end = max_len;
    // Index 0 is always a boundary, so this terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
/// Handles `PeerCommand::ListGames` by sending the aggregated peer game list to the UI.
async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
log::info!("ListGames command received");
// Delegates to the shared emitter, which logs a failed send instead of returning it.
emit_peer_game_list(&ctx.peer_game_db, tx_notify_ui).await;
}
/// Sends the games currently known from all peers to the UI as `PeerEvent::ListGames`.
///
/// A failed send is logged and otherwise ignored.
async fn emit_peer_game_list(
    peer_game_db: &Arc<RwLock<PeerGameDB>>,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
    // Take the snapshot in its own scope so the read lock is released before sending.
    let snapshot = {
        let db = peer_game_db.read().await;
        db.get_all_games()
    };
    let event = PeerEvent::ListGames(snapshot);
    if let Err(e) = tx_notify_ui.send(event) {
        log::error!("Failed to send ListGames event: {e}");
    }
}
/// Tries to answer a game request from files that are already on disk.
///
/// Returns `true` when the local file list was enumerated and a `PeerEvent::GotGameFiles` send
/// was attempted. Returns `false` when no game directory is set, the game is not available as
/// a local download, or enumerating its files failed.
async fn try_serve_local_game(
    ctx: &Ctx,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
    id: &str,
) -> bool {
    let configured_dir = { ctx.game_dir.read().await.clone() };
    let Some(game_dir) = configured_dir else {
        return false;
    };
    if !local_download_available(&game_dir, id).await {
        return false;
    }

    let file_descriptions = match get_game_file_descriptions(id, &game_dir).await {
        Ok(list) => list,
        Err(e) => {
            log::error!("Failed to enumerate local file descriptions for {id}: {e}");
            return false;
        }
    };

    log::info!("Serving game {id} from local files");
    let event = PeerEvent::GotGameFiles {
        id: id.to_string(),
        file_descriptions,
    };
    if let Err(e) = tx_notify_ui.send(event) {
        log::error!("Failed to send GotGameFiles event: {e}");
    }
    true
}
async fn handle_get_game_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: String) {
if try_serve_local_game(ctx, tx_notify_ui, &id).await {
return;
}
log::info!("Requesting game from peers: {id}");
let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) };
if peers.is_empty() {
log::warn!("No peers have game {id}");
if let Err(e) = tx_notify_ui.send(PeerEvent::NoPeersHaveGame { id: id.clone() }) {
log::error!("Failed to send NoPeersHaveGame event: {e}");
}
return;
}
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = tx_notify_ui.clone();
tokio::spawn(async move {
let mut fetched_any = false;
for peer_addr in peers {
match request_game_details_from_peer(peer_addr, &id, peer_game_db.clone()).await {
Ok(_) => {
log::info!("Fetched game file list for {id} from peer {peer_addr}");
fetched_any = true;
}
Err(e) => {
log::error!("Failed to fetch game files for {id} from {peer_addr}: {e}");
}
}
}
if fetched_any {
let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) };
if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles {
id: id.clone(),
file_descriptions: aggregated_files,
}) {
log::error!("Failed to send GotGameFiles event: {e}");
}
} else {
log::warn!("Failed to retrieve game files for {id} from any peer");
}
});
}
/// Handles `PeerCommand::DownloadGameFiles` for game `id`.
///
/// The peer database is asked for the majority-validated file list, the trusted peers and the
/// per-file peer map. If `file_descriptions` is empty the validated list is downloaded,
/// otherwise the caller's list is used. The transfer itself runs in a spawned task.
///
/// Every path on which the download cannot be started or fails sends
/// `PeerEvent::DownloadGameFilesFailed`, so the UI is never left waiting for an outcome. If no
/// trusted peer exists but the game is already downloaded locally, `DownloadGameFilesBegin`
/// and `DownloadGameFilesFinished` are sent instead of transferring anything.
async fn handle_download_game_files_command(
    ctx: &Ctx,
    tx_notify_ui: &UnboundedSender<PeerEvent>,
    id: String,
    file_descriptions: Vec<GameFileDescription>,
) {
    log::info!("Got PeerCommand::DownloadGameFiles");

    // Reports to the UI that this download will not complete.
    let notify_failed = || {
        if let Err(send_err) =
            tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() })
        {
            log::error!("Failed to send DownloadGameFilesFailed event: {send_err}");
        }
    };

    let games_folder = { ctx.game_dir.read().await.clone() };
    let Some(games_folder) = games_folder else {
        log::error!("Cannot handle game file descriptions: games_folder is not set");
        notify_failed();
        return;
    };

    // Use majority validation to get trusted file descriptions and peer whitelist.
    let validation = {
        ctx.peer_game_db
            .read()
            .await
            .validate_file_sizes_majority(&id)
    };
    let (validated_descriptions, peer_whitelist, file_peer_map) = match validation {
        Ok((files, peers, file_peer_map)) => {
            log::info!(
                "Majority validation: {} validated files, {} trusted peers for game {id}",
                files.len(),
                peers.len()
            );
            (files, peers, file_peer_map)
        }
        Err(e) => {
            log::error!("File size majority validation failed for {id}: {e}");
            notify_failed();
            return;
        }
    };

    // A caller-supplied selection takes precedence and is used as-is.
    // NOTE(review): it is not cross-checked against the majority-validated list here.
    let resolved_descriptions = if file_descriptions.is_empty() {
        validated_descriptions
    } else {
        file_descriptions
    };
    if resolved_descriptions.is_empty() {
        log::error!(
            "No validated file descriptions available to download game {id}; request metadata first"
        );
        notify_failed();
        return;
    }

    if peer_whitelist.is_empty() {
        if local_download_available(&games_folder, &id).await {
            log::info!("Using locally downloaded files for game {id}; skipping peer transfer");
            if let Err(e) = tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: id.clone() })
            {
                log::error!("Failed to send DownloadGameFilesBegin event: {e}");
            }
            if let Err(e) =
                tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { id: id.clone() })
            {
                log::error!("Failed to send DownloadGameFilesFinished event: {e}");
            }
        } else {
            log::error!("No trusted peers available after majority validation for game {id}");
            notify_failed();
        }
        return;
    }

    let tx_notify_ui = tx_notify_ui.clone();
    tokio::spawn(async move {
        let outcome = download_game_files(
            &id,
            resolved_descriptions,
            games_folder,
            peer_whitelist,
            file_peer_map,
            tx_notify_ui.clone(),
        )
        .await;
        if let Err(e) = outcome {
            log::error!("Download failed for {id}: {e}");
            if let Err(send_err) =
                tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() })
            {
                log::error!("Failed to send DownloadGameFilesFailed event: {send_err}");
            }
        }
    });
}
async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) {
*ctx.game_dir.write().await = Some(game_dir.clone());
log::info!("Game directory set to: {game_dir}");
// Load local game database when game directory is set
let game_dir = game_dir.clone();
let local_game_db = ctx.local_game_db.clone();
tokio::spawn(async move {
match load_local_game_db(&game_dir).await {
Ok(db) => {
*local_game_db.write().await = Some(db);
log::info!("Local game database loaded successfully");
}
Err(e) => {
log::error!("Failed to load local game database: {e}");
}
}
});
}
/// Handles `PeerCommand::GetPeerCount` by sending the number of known peer addresses to the UI
/// as `PeerEvent::PeerCountUpdated`.
async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
    log::info!("GetPeerCount command received");
    let known_peers = {
        let db = ctx.peer_game_db.read().await;
        db.get_peer_addresses().len()
    };
    let event = PeerEvent::PeerCountUpdated(known_peers);
    if let Err(e) = tx_notify_ui.send(event) {
        log::error!("Failed to send PeerCountUpdated event: {e}");
    }
}
async fn handle_peer_connection(
mut connection: Connection,
ctx: PeerCtx,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
let remote_addr = connection.remote_addr()?;
log::info!("{remote_addr} peer connected");
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerConnected(remote_addr)) {
log::error!("Failed to send PeerConnected event: {e}");
}
// handle streams
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
let ctx = ctx.clone();
let remote_addr = Some(remote_addr);
tokio::spawn(async move {
if let Err(e) = handle_peer_stream(stream, ctx, remote_addr).await {
log::error!("{remote_addr:?} peer stream error: {e}");
}
});
}
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDisconnected(remote_addr)) {
log::error!("Failed to send PeerDisconnected event: {e}");
}
Ok(())
}
#[allow(clippy::too_many_lines)]
async fn handle_peer_stream(
stream: BidirectionalStream,
ctx: PeerCtx,
remote_addr: Option<SocketAddr>,
) -> eyre::Result<()> {
let (mut rx, mut tx) = stream.split();
log::trace!("{remote_addr:?} peer stream opened");
// handle streams
loop {
match rx.receive().await {
Ok(Some(data)) => {
log::trace!(
"{:?} msg: (raw): {}",
remote_addr,
String::from_utf8_lossy(&data)
);
let request = Request::decode(data);
log::debug!("{remote_addr:?} msg: {request:?}");
match request {
Request::Ping => {
// Respond with pong
if let Err(e) = tx.send(Response::Pong.encode()).await {
log::error!("Failed to send pong: {e}");
}
}
Request::ListGames => {
// Return list of games from this peer
log::info!("Received ListGames request from peer");
let games = if let Some(ref db) = *ctx.local_game_db.read().await {
db.all_games().into_iter().cloned().collect()
} else {
Vec::new()
};
if let Err(e) = tx.send(Response::ListGames(games).encode()).await {
log::error!("Failed to send ListGames response: {e}");
}
}
Request::GetGame { id } => {
log::info!("Received GetGame request for {id} from peer");
let response = if let Some(ref game_dir) = *ctx.game_dir.read().await {
if let Some(ref db) = *ctx.local_game_db.read().await {
if db.get_game_by_id(&id).is_some() {
match get_game_file_descriptions(&id, game_dir).await {
Ok(file_descriptions) => Response::GetGame {
id,
file_descriptions,
},
Err(PeerError::FileSizeDetermination { path, source }) => {
let error_msg = format!(
"Failed to determine file size for {path}: {source}"
);
log::error!(
"File size determination error for game {id}: {error_msg}"
);
Response::InternalPeerError(error_msg)
}
Err(e) => {
log::error!(
"Failed to get game file descriptions for {id}: {e}"
);
Response::GameNotFound(id)
}
}
} else {
Response::GameNotFound(id)
}
} else {
Response::GameNotFound(id)
}
} else {
Response::GameNotFound(id)
};
if let Err(e) = tx.send(response.encode()).await {
log::error!("Failed to send GetGame response: {e}");
}
}
Request::GetGameFileData(desc) => {
log::info!(
"Received GetGameFileData request for {} from peer",
desc.relative_path
);
let maybe_game_dir = ctx.game_dir.read().await.clone();
if let Some(game_dir) = maybe_game_dir {
let base_dir = PathBuf::from(game_dir);
send_game_file_data(&desc, &mut tx, &base_dir).await;
} else if let Err(e) = tx
.send(
Response::InvalidRequest(
desc.relative_path.as_bytes().to_vec().into(),
"Game directory not set".to_string(),
)
.encode(),
)
.await
{
log::error!("Failed to send GetGameFileData error: {e}");
}
}
Request::GetGameFileChunk {
game_id,
relative_path,
offset,
length,
} => {
log::info!(
"{remote_addr:?} received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
);
let maybe_game_dir = ctx.game_dir.read().await.clone();
if let Some(game_dir) = maybe_game_dir {
let base_dir = PathBuf::from(game_dir);
send_game_file_chunk(
&game_id,
&relative_path,
offset,
length,
&mut tx,
&base_dir,
)
.await;
} else if let Err(e) = tx
.send(
Response::InvalidRequest(
relative_path.as_bytes().to_vec().into(),
"Game directory not set".to_string(),
)
.encode(),
)
.await
{
log::error!("Failed to send GetGameFileChunk error: {e}");
}
}
Request::Invalid(_, _) => {
log::error!("Received invalid request from peer");
}
}
}
Ok(None) => {
log::trace!("{remote_addr:?} peer stream closed");
break;
}
Err(e) => {
log::error!("{remote_addr:?} peer stream error: {e}");
break;
}
}
}
Ok(())
}
async fn run_peer_discovery(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
) {
log::info!("Starting peer discovery task");
let service_type = LANSPREAD_SERVICE_TYPE.to_string();
loop {
let (addr_tx, mut addr_rx) = tokio::sync::mpsc::unbounded_channel();
let service_type_clone = service_type.clone();
let worker_handle = tokio::task::spawn_blocking(move || -> eyre::Result<()> {
let browser = MdnsBrowser::new(&service_type_clone)?;
loop {
if let Some(addr) = browser.next_address(None)? {
if addr_tx.send(addr).is_err() {
log::debug!("Peer discovery consumer dropped; stopping worker");
break;
}
} else {
log::warn!("mDNS browser closed; stopping peer discovery worker");
break;
}
}
Ok(())
});
while let Some(peer_addr) = addr_rx.recv().await {
let is_self = {
let guard = local_peer_addr.read().await;
guard.as_ref().is_some_and(|addr| *addr == peer_addr)
};
if is_self {
log::trace!("Ignoring self advertisement at {peer_addr}");
continue;
}
let is_new_peer = {
let mut db = peer_game_db.write().await;
if db.contains_peer(&peer_addr) {
db.update_last_seen(&peer_addr);
false
} else {
db.add_peer(peer_addr);
true
}
};
if is_new_peer {
log::info!("Discovered peer at: {peer_addr}");
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
log::error!("Failed to send PeerDiscovered event: {e}");
}
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
log::error!("Failed to send PeerCountUpdated event: {e}");
}
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone();
tokio::spawn(async move {
if let Err(e) =
request_games_from_peer(peer_addr, tx_notify_ui_clone, peer_game_db_clone)
.await
{
log::error!("Failed to request games from peer {peer_addr}: {e}");
}
});
}
}
match worker_handle.await {
Ok(Ok(())) => {
log::warn!("Peer discovery worker exited; restarting shortly");
}
Ok(Err(e)) => {
log::error!("Peer discovery worker failed: {e}");
}
Err(e) => {
log::error!("Peer discovery worker join error: {e}");
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
/// Asks the peer at `peer_addr` for its game list.
///
/// On a `ListGames` reply the peer database is updated and the aggregated list of all peers is
/// sent to the UI. Any other reply is logged as unexpected and still yields `Ok(())`.
///
/// # Errors
///
/// Returns an error if the QUIC client cannot be set up, the connection or stream cannot be
/// opened, or the request cannot be sent.
async fn request_games_from_peer(
    peer_addr: SocketAddr,
    tx_notify_ui: UnboundedSender<PeerEvent>,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
) -> eyre::Result<()> {
    let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
    let client = QuicClient::builder()
        .with_tls(CERT_PEM)?
        .with_io("0.0.0.0:0")?
        .with_limits(limits)?
        .start()?;
    let target = Connect::new(peer_addr).with_server_name("localhost");
    let mut connection = client.connect(target).await?;
    let (mut rx, mut tx) = connection.open_bidirectional_stream().await?.split();

    // Send ListGames request; a failure to close the sending side is deliberately ignored.
    tx.send(Request::ListGames.encode()).await?;
    let _ = tx.close().await;

    // Receive response: read until the stream ends or receiving fails.
    let mut payload = BytesMut::new();
    loop {
        match rx.receive().await {
            Ok(Some(chunk)) => payload.extend_from_slice(&chunk),
            Ok(None) | Err(_) => break,
        }
    }

    let response = Response::decode(payload.freeze());
    if let Response::ListGames(games) = response {
        log::info!("Received {} games from peer {peer_addr}", games.len());
        let aggregated_games = {
            let mut db = peer_game_db.write().await;
            db.update_peer_games(peer_addr, games);
            db.get_all_games()
        };
        if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) {
            log::error!("Failed to send ListGames event: {e}");
        }
    } else {
        log::warn!("Unexpected response from peer {peer_addr}: {response:?}");
    }
    Ok(())
}
/// Asks the peer at `peer_addr` for the file list of `game_id`.
///
/// On success the list is stored in the peer database for that peer and also returned.
///
/// # Errors
///
/// Returns an error if the QUIC exchange fails, if the peer answers for a different game id,
/// reports that it does not have the game, reports an internal error, or sends any other
/// response.
async fn request_game_details_from_peer(
    peer_addr: SocketAddr,
    game_id: &str,
    peer_game_db: Arc<RwLock<PeerGameDB>>,
) -> eyre::Result<Vec<GameFileDescription>> {
    let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
    let client = QuicClient::builder()
        .with_tls(CERT_PEM)?
        .with_io("0.0.0.0:0")?
        .with_limits(limits)?
        .start()?;
    let target = Connect::new(peer_addr).with_server_name("localhost");
    let mut connection = client.connect(target).await?;
    let (mut rx, mut tx) = connection.open_bidirectional_stream().await?.split();

    let request = Request::GetGame {
        id: game_id.to_string(),
    };
    tx.send(request.encode()).await?;
    tx.close().await?;

    // Read until the stream ends or receiving fails.
    let mut payload = BytesMut::new();
    loop {
        match rx.receive().await {
            Ok(Some(chunk)) => payload.extend_from_slice(&chunk),
            Ok(None) | Err(_) => break,
        }
    }

    let response = Response::decode(payload.freeze());
    match response {
        Response::GetGame {
            id,
            file_descriptions,
        } => {
            eyre::ensure!(
                id == game_id,
                "peer {peer_addr} responded with mismatched game id {id}"
            );
            peer_game_db.write().await.update_peer_game_files(
                peer_addr,
                game_id,
                file_descriptions.clone(),
            );
            Ok(file_descriptions)
        }
        Response::GameNotFound(_) => {
            eyre::bail!("peer {peer_addr} does not have game {game_id}")
        }
        Response::InternalPeerError(error_msg) => {
            eyre::bail!("peer {peer_addr} reported internal error: {error_msg}")
        }
        _ => eyre::bail!("unexpected response from {peer_addr}: {response:?}"),
    }
}
async fn run_ping_service(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
) {
log::info!(
"Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \
{PEER_STALE_TIMEOUT_SECS}s timeout)"
);
let mut interval = tokio::time::interval(Duration::from_secs(PEER_PING_INTERVAL_SECS));
loop {
interval.tick().await;
let peer_addresses = { peer_game_db.read().await.get_peer_addresses() };
for peer_addr in peer_addresses {
let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone();
tokio::spawn(async move {
match ping_peer(peer_addr).await {
Ok(is_alive) => {
if is_alive {
// Update last seen time
peer_game_db_clone
.write()
.await
.update_last_seen(&peer_addr);
} else {
log::warn!("Peer {peer_addr} failed ping check");
// Remove stale peer
let removed_peer =
peer_game_db_clone.write().await.remove_peer(&peer_addr);
if removed_peer.is_some() {
log::info!("Removed stale peer: {peer_addr}");
if let Err(e) =
tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
{
log::error!("Failed to send PeerLost event: {e}");
}
// Send updated peer count
let current_peer_count =
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui_clone
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
}
}
}
Err(e) => {
log::error!("Failed to ping peer {peer_addr}: {e}");
// Remove peer on error
let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr);
if removed_peer.is_some() {
log::info!("Removed peer due to ping error: {peer_addr}");
if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
{
log::error!("Failed to send PeerLost event: {e}");
}
// Send updated peer count
let current_peer_count =
{ peer_game_db_clone.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui_clone
.send(PeerEvent::PeerCountUpdated(current_peer_count))
{
log::error!("Failed to send PeerCountUpdated event: {e}");
}
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
}
}
}
});
}
// Also clean up stale peers
let stale_peers = {
peer_game_db
.read()
.await
.get_stale_peers(Duration::from_secs(PEER_STALE_TIMEOUT_SECS))
};
let mut removed_any = false;
for stale_addr in stale_peers {
let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr);
if removed_peer.is_some() {
log::info!("Removed stale peer: {stale_addr}");
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) {
log::error!("Failed to send PeerLost event: {e}");
}
// Send updated peer count
let current_peer_count = { peer_game_db.read().await.get_peer_addresses().len() };
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerCountUpdated(current_peer_count)) {
log::error!("Failed to send PeerCountUpdated event: {e}");
}
removed_any = true;
}
}
if removed_any {
emit_peer_game_list(&peer_game_db, &tx_notify_ui).await;
}
}
}
/// Rescans the local game directory every five seconds and keeps the local game database in
/// sync with it.
///
/// Nothing happens while no game directory is set. A scan result replaces the stored database
/// only when its set of game ids differs from the stored one; the UI is then sent
/// `PeerEvent::LocalGamesUpdated` with the new game list.
async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: Ctx) {
    log::info!("Starting local game directory monitor (5s interval)");
    let mut ticker = tokio::time::interval(Duration::from_secs(5));

    loop {
        ticker.tick().await;

        let configured_dir = { ctx.game_dir.read().await.clone() };
        let Some(game_dir) = configured_dir else {
            continue;
        };
        let scanned = match scan_local_games(&game_dir).await {
            Ok(db) => db,
            Err(e) => {
                log::error!("Failed to scan local games directory: {e}");
                continue;
            }
        };

        let mut db_slot = ctx.local_game_db.write().await;
        let known_ids = db_slot
            .as_ref()
            .map(|db| db.games.keys().cloned().collect::<HashSet<_>>())
            .unwrap_or_default();
        let scanned_ids = scanned.games.keys().cloned().collect::<HashSet<_>>();
        if known_ids == scanned_ids {
            continue;
        }

        // The id sets differ: either something disappeared or something was only added.
        let removed_games: Vec<String> = known_ids.difference(&scanned_ids).cloned().collect();
        if removed_games.is_empty() {
            log::debug!("Local games directory structure changed, updating database");
        } else {
            log::info!("Detected removed games: {removed_games:?}");
        }

        *db_slot = Some(scanned);
        // Notify UI about the change
        let all_games = db_slot
            .as_ref()
            .map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
            .unwrap_or_default();
        if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games)) {
            log::error!("Failed to send LocalGamesUpdated event: {e}");
        }
    }
}
/// Scan the local games directory and return a `GameDB` with current games.
///
/// Thin wrapper: the work is done by `load_local_game_db`, whose result (including any error)
/// is returned unchanged.
async fn scan_local_games(game_dir: &str) -> eyre::Result<GameDB> {
load_local_game_db(game_dir).await
}
/// Connects to the peer at `peer_addr` and reports the result of `initial_peer_alive_check`
/// on that connection.
///
/// # Errors
///
/// Returns an error if the QUIC client cannot be set up or the connection cannot be
/// established.
async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result<bool> {
    let handshake_budget = Duration::from_secs(3);
    let limits = Limits::default().with_max_handshake_duration(handshake_budget)?;
    let client = QuicClient::builder()
        .with_tls(CERT_PEM)?
        .with_io("0.0.0.0:0")?
        .with_limits(limits)?
        .start()?;
    let target = Connect::new(peer_addr).with_server_name("localhost");
    let mut connection = client.connect(target).await?;
    Ok(initial_peer_alive_check(&mut connection).await)
}
/// Chooses the IP address to advertise via mDNS.
///
/// Among all non-loopback interfaces accepted by `classify_interface`, the one with the lowest
/// rank wins; on equal rank the first one listed is kept. If none qualifies, the first loopback
/// address is used and a warning is logged.
///
/// # Errors
///
/// Returns an error if the interfaces cannot be listed or no address at all is available.
fn select_advertise_ip() -> eyre::Result<IpAddr> {
    let mut loopback = None;
    let mut best: Option<(u8, IpAddr)> = None;

    for iface in get_if_addrs()? {
        if iface.is_loopback() {
            if loopback.is_none() {
                loopback = Some(iface.ip());
            }
            continue;
        }
        let Some((rank, ip)) = classify_interface(&iface) else {
            continue;
        };
        let improves = match best {
            None => true,
            Some((best_rank, _)) => rank < best_rank,
        };
        if improves {
            best = Some((rank, ip));
        }
    }

    match (best, loopback) {
        (Some((_, ip)), _) => Ok(ip),
        (None, Some(ip)) => {
            log::warn!(
                "No non-loopback interface suitable for mDNS advertisement; falling back to {ip}"
            );
            Ok(ip)
        }
        (None, None) => eyre::bail!("No usable network interface found for mDNS advertisement"),
    }
}
/// Ranks an interface as a candidate for mDNS advertisement; lower ranks are preferred.
///
/// Returns `None` for IPv6 interfaces and for unspecified or link-local IPv4 addresses.
/// Otherwise the rank is `0` for a private address or `2` for any other address, plus `2` if
/// the interface name looks virtual according to `is_virtual_interface`.
fn classify_interface(interface: &Interface) -> Option<(u8, IpAddr)> {
    let IfAddr::V4(ref v4) = interface.addr else {
        return None;
    };
    let ip = v4.ip;
    if ip.is_unspecified() || ip.is_link_local() {
        return None;
    }

    let address_rank: u8 = if ip.is_private() { 0 } else { 2 };
    let virtual_penalty: u8 = if is_virtual_interface(&interface.name) {
        2
    } else {
        0
    };
    Some((address_rank + virtual_penalty, IpAddr::V4(ip)))
}
/// Reports whether an interface name contains one of the known virtual-interface hints.
///
/// The comparison is a case-insensitive (ASCII) substring match.
fn is_virtual_interface(name: &str) -> bool {
    const VIRTUAL_HINTS: &[&str] = &[
        "awdl",
        "br-",
        "bridge",
        "docker",
        "ham",
        "llw",
        "tap",
        "tailscale",
        "tun",
        "utun",
        "vbox",
        "veth",
        "virbr",
        "vmnet",
        "wg",
        "zt",
    ];

    let normalized = name.to_ascii_lowercase();
    for hint in VIRTUAL_HINTS {
        if normalized.contains(hint) {
            return true;
        }
    }
    false
}
async fn get_game_file_descriptions(
game_id: &str,
game_dir: &str,
) -> Result<Vec<GameFileDescription>, PeerError> {
let base_dir = PathBuf::from(game_dir);
let game_path = base_dir.join(game_id);
if !game_path.exists() {
return Err(PeerError::Other(eyre::eyre!(
"Game directory does not exist: {}",
game_path.display()
)));
}
let mut file_descriptions = Vec::new();
let local_dir = game_path.join("local");
for entry in walkdir::WalkDir::new(&game_path)
.into_iter()
// Skip the local install folder; it's not meant to sync.
.filter_entry(|entry| !entry.path().starts_with(&local_dir))
.filter_map(std::result::Result::ok)
{
let relative_path = match entry.path().strip_prefix(&base_dir) {
Ok(path) => path.to_string_lossy().to_string(),
Err(e) => {
log::error!(
"Failed to get relative path for {}: {}",
entry.path().display(),
e
);
continue;
}
};
let is_dir = entry.file_type().is_dir();
let size = if is_dir {
0
} else {
match tokio::fs::metadata(entry.path()).await {
Ok(metadata) => metadata.len(),
Err(e) => {
log::error!("Failed to read metadata for {relative_path}: {e}");
return Err(PeerError::FileSizeDetermination {
path: relative_path.clone(),
source: e,
});
}
}
};
let file_desc = GameFileDescription {
game_id: game_id.to_string(),
relative_path,
is_dir,
size,
};
file_descriptions.push(file_desc);
}
Ok(file_descriptions)
}
// Unit tests for `build_peer_plans`: chunk boundaries and the per-file peer restriction.
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use super::*;
// Builds a distinct 127.0.0.1 address for the given port, used as a stand-in peer.
fn loopback_addr(port: u16) -> SocketAddr {
SocketAddr::from(([127, 0, 0, 1], port))
}
// A file of 2.25 chunks must be split into three chunks, the last one covering exactly the
// remaining quarter chunk.
#[test]
fn build_peer_plans_handles_partial_final_chunk() {
let peers = vec![loopback_addr(12000), loopback_addr(12001)];
let file_size = CHUNK_SIZE * 2 + CHUNK_SIZE / 4;
let mut file_peer_map = HashMap::new();
file_peer_map.insert("game/file.dat".to_string(), peers.clone());
let file_descs = vec![GameFileDescription {
game_id: "test".to_string(),
relative_path: "game/file.dat".to_string(),
is_dir: false,
size: file_size,
}];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
// Chunks may be spread over several peers; gather them all before checking.
let mut chunks: Vec<_> = plans.values().flat_map(|plan| plan.chunks.iter()).collect();
assert_eq!(chunks.len(), 3, "expected three chunks for 2.25 blocks");
// Sort by offset so that `last()` is the chunk at the end of the file.
chunks.sort_by_key(|chunk| chunk.offset);
let last_chunk = chunks.last().expect("last chunk exists");
assert_eq!(last_chunk.offset, CHUNK_SIZE * 2);
assert_eq!(last_chunk.length, file_size - last_chunk.offset);
assert_eq!(last_chunk.length, CHUNK_SIZE / 4);
assert_eq!(
last_chunk.offset + last_chunk.length,
file_size,
"last chunk should finish the file"
);
}
// Chunks of a file may only be assigned to peers listed for that file in `file_peer_map`.
#[test]
fn build_peer_plans_respects_file_peer_map() {
let shared_a = loopback_addr(12010);
let shared_b = loopback_addr(12011);
let exclusive = loopback_addr(12012);
let peers = vec![shared_a, shared_b, exclusive];
let mut file_peer_map = HashMap::new();
file_peer_map.insert("shared.bin".to_string(), vec![shared_a, shared_b]);
file_peer_map.insert("exclusive.bin".to_string(), vec![exclusive]);
let file_descs = vec![
GameFileDescription {
game_id: "test".to_string(),
relative_path: "shared.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE * 2,
},
GameFileDescription {
game_id: "test".to_string(),
relative_path: "exclusive.bin".to_string(),
is_dir: false,
size: CHUNK_SIZE,
},
];
let plans = build_peer_plans(&peers, &file_descs, &file_peer_map);
// The exclusive peer must exist in the plans and carry nothing but its own file.
let exclusive_plan = plans
.get(&exclusive)
.expect("exclusive peer should have a plan");
assert!(
exclusive_plan
.chunks
.iter()
.all(|chunk| chunk.relative_path == "exclusive.bin"),
"exclusive peer should only receive exclusive.bin chunks"
);
// Check the reverse direction for every peer: each chunk sits with an allowed peer.
for (peer, plan) in plans {
for chunk in plan.chunks {
match chunk.relative_path.as_str() {
"exclusive.bin" => assert_eq!(
peer, exclusive,
"exclusive.bin chunks should only be assigned to the exclusive peer"
),
"shared.bin" => assert!(
peer == shared_a || peer == shared_b,
"shared.bin chunks must stay within shared peers"
),
other => panic!("unexpected file in plan: {other}"),
}
}
}
}
}