This commit is contained in:
2025-11-11 21:20:03 +01:00
parent 936111e3c6
commit d831179783
4 changed files with 385 additions and 46 deletions
+370 -39
View File
@@ -10,14 +10,16 @@ use std::{
time::{Duration, Instant},
};
use crate::peer::{send_game_file_chunk, send_game_file_data};
use bytes::BytesMut;
use gethostname::gethostname;
use lanspread_db::db::{Game, GameDB, GameFileDescription};
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service};
use lanspread_proto::{Message, Request, Response};
use s2n_quic::{
Client as QuicClient, Connection, Server, client::Connect, provider::limits::Limits,
Client as QuicClient,
Connection,
Server,
client::Connect,
provider::limits::Limits,
stream::BidirectionalStream,
};
use tokio::{
@@ -30,6 +32,8 @@ use tokio::{
};
use uuid::Uuid;
use crate::peer::{send_game_file_chunk, send_game_file_data};
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
@@ -68,7 +72,14 @@ pub struct PeerGameDB {
peers: HashMap<SocketAddr, PeerInfo>,
}
impl Default for PeerGameDB {
fn default() -> Self {
Self::new()
}
}
impl PeerGameDB {
#[must_use]
pub fn new() -> Self {
Self {
peers: HashMap::new(),
@@ -120,6 +131,7 @@ impl PeerGameDB {
}
}
#[must_use]
pub fn get_all_games(&self) -> Vec<Game> {
let mut aggregated: HashMap<String, Game> = HashMap::new();
for peer in self.peers.values() {
@@ -127,14 +139,14 @@ impl PeerGameDB {
aggregated
.entry(game.id.clone())
.and_modify(|existing| {
if let (Some(ref new_version), Some(ref current)) =
if let (Some(new_version), Some(current)) =
(&game.eti_game_version, &existing.eti_game_version)
{
if new_version > current {
existing.eti_game_version = Some(new_version.clone());
}
} else if existing.eti_game_version.is_none() {
existing.eti_game_version = game.eti_game_version.clone();
existing.eti_game_version.clone_from(&game.eti_game_version);
}
})
.or_insert_with(|| game.clone());
@@ -146,18 +158,19 @@ impl PeerGameDB {
games
}
#[must_use]
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
let mut latest_version: Option<String> = None;
for peer in self.peers.values() {
if let Some(game) = peer.games.get(game_id) {
if let Some(ref version) = game.eti_game_version {
match &latest_version {
None => latest_version = Some(version.clone()),
Some(current_latest) => {
if version > current_latest {
latest_version = Some(version.clone());
}
if let Some(game) = peer.games.get(game_id)
&& let Some(ref version) = game.eti_game_version
{
match &latest_version {
None => latest_version = Some(version.clone()),
Some(current_latest) => {
if version > current_latest {
latest_version = Some(version.clone());
}
}
}
@@ -167,10 +180,12 @@ impl PeerGameDB {
latest_version
}
#[must_use]
pub fn get_peer_addresses(&self) -> Vec<SocketAddr> {
self.peers.keys().copied().collect()
}
#[must_use]
pub fn peers_with_game(&self, game_id: &str) -> Vec<SocketAddr> {
self.peers
.iter()
@@ -179,6 +194,33 @@ impl PeerGameDB {
.collect()
}
#[must_use]
pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
let latest_version = self.get_latest_version_for_game(game_id);
if let Some(ref latest) = latest_version {
self.peers
.iter()
.filter(|(_, peer)| {
if let Some(game) = peer.games.get(game_id) {
if let Some(ref version) = game.eti_game_version {
version == latest
} else {
false
}
} else {
false
}
})
.map(|(addr, _)| *addr)
.collect()
} else {
// If no version info is available, fall back to all peers with the game
self.peers_with_game(game_id)
}
}
#[must_use]
pub fn game_files_for(&self, game_id: &str) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
self.peers
.iter()
@@ -186,6 +228,7 @@ impl PeerGameDB {
.collect()
}
#[must_use]
pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> {
let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
for (_, files) in self.game_files_for(game_id) {
@@ -196,6 +239,7 @@ impl PeerGameDB {
seen.into_values().collect()
}
#[must_use]
pub fn get_stale_peers(&self, timeout: Duration) -> Vec<SocketAddr> {
self.peers
.iter()
@@ -253,12 +297,14 @@ async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
}
const CHUNK_SIZE: u64 = 512 * 1024;
const MAX_RETRY_COUNT: usize = 3;
#[derive(Debug, Clone)]
struct DownloadChunk {
relative_path: String,
offset: u64,
length: u64,
retry_count: usize,
}
#[derive(Debug, Default)]
@@ -267,6 +313,13 @@ struct PeerDownloadPlan {
whole_files: Vec<GameFileDescription>,
}
#[derive(Debug)]
struct ChunkDownloadResult {
chunk: DownloadChunk,
result: eyre::Result<()>,
peer_addr: SocketAddr,
}
async fn prepare_game_storage(
games_folder: &Path,
file_descs: &[GameFileDescription],
@@ -280,8 +333,9 @@ async fn prepare_game_storage(
tokio::fs::create_dir_all(parent).await?;
}
let file_size = desc.file_size().unwrap_or(0);
let mut file = OpenOptions::new()
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&path)
.await?;
@@ -311,6 +365,7 @@ fn build_peer_plans(
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0,
retry_count: 0,
});
continue;
}
@@ -324,6 +379,7 @@ fn build_peer_plans(
relative_path: desc.relative_path.clone(),
offset,
length,
retry_count: 0,
});
offset += length;
}
@@ -380,8 +436,12 @@ async fn download_chunk(
file.seek(std::io::SeekFrom::Start(chunk.offset)).await?;
let mut remaining = chunk.length;
let mut received_bytes = 0u64;
while let Some(bytes) = rx.receive().await? {
file.write_all(&bytes).await?;
received_bytes += bytes.len() as u64;
if remaining == 0 {
continue;
}
@@ -391,7 +451,46 @@ async fn download_chunk(
}
}
// Verify we received the expected amount of data
if chunk.length > 0 && received_bytes != chunk.length {
eyre::bail!(
"Incomplete chunk download: expected {} bytes, received {} bytes for file {} at offset {}",
chunk.length,
received_bytes,
chunk.relative_path,
chunk.offset
);
}
file.flush().await?;
// Verify file integrity by checking the file size
verify_chunk_integrity(&path, chunk.offset, chunk.length).await?;
Ok(())
}
async fn verify_chunk_integrity(
file_path: &Path,
offset: u64,
expected_length: u64,
) -> eyre::Result<()> {
if expected_length == 0 {
return Ok(()); // Skip verification for whole files or zero-length chunks
}
let metadata = tokio::fs::metadata(file_path).await?;
let file_size = metadata.len();
if file_size < offset + expected_length {
eyre::bail!(
"File integrity check failed: file size {} is less than expected {} (offset: {})",
file_size,
offset + expected_length,
offset
);
}
Ok(())
}
@@ -428,9 +527,9 @@ async fn download_from_peer(
game_id: &str,
plan: PeerDownloadPlan,
games_folder: PathBuf,
) -> eyre::Result<()> {
) -> eyre::Result<Vec<ChunkDownloadResult>> {
if plan.chunks.is_empty() && plan.whole_files.is_empty() {
return Ok(());
return Ok(Vec::new());
}
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
@@ -446,16 +545,36 @@ async fn download_from_peer(
conn.keep_alive(true)?;
let base_dir = games_folder;
let mut results = Vec::new();
// Download chunks with error handling
for chunk in &plan.chunks {
download_chunk(&mut conn, &base_dir, game_id, chunk).await?;
let result = download_chunk(&mut conn, &base_dir, game_id, chunk).await;
results.push(ChunkDownloadResult {
chunk: chunk.clone(),
result,
peer_addr,
});
}
// Download whole files
for desc in &plan.whole_files {
download_whole_file(&mut conn, &base_dir, desc).await?;
let chunk = DownloadChunk {
relative_path: desc.relative_path.clone(),
offset: 0,
length: 0, // Indicates whole file
retry_count: 0,
};
let result = download_whole_file(&mut conn, &base_dir, desc).await;
results.push(ChunkDownloadResult {
chunk,
result,
peer_addr,
});
}
Ok(())
Ok(results)
}
async fn download_game_files(
@@ -487,16 +606,54 @@ async fn download_game_files(
}));
}
let mut failed_chunks: Vec<DownloadChunk> = Vec::new();
let mut last_err: Option<eyre::Report> = None;
for handle in tasks {
match handle.await {
Ok(Ok(())) => {}
Ok(Ok(results)) => {
for chunk_result in results {
if let Err(e) = chunk_result.result {
log::warn!(
"Failed to download chunk from {}: {e}",
chunk_result.peer_addr
);
if chunk_result.chunk.retry_count < MAX_RETRY_COUNT {
let mut retry_chunk = chunk_result.chunk;
retry_chunk.retry_count += 1;
failed_chunks.push(retry_chunk);
} else {
last_err = Some(eyre::eyre!(
"Max retries exceeded for chunk: {}",
chunk_result.chunk.relative_path
));
}
}
}
}
Ok(Err(e)) => last_err = Some(e),
Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")),
}
}
// Retry failed chunks if any
if !failed_chunks.is_empty() && !peers.is_empty() {
log::info!("Retrying {} failed chunks", failed_chunks.len());
let retry_results = retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id).await;
for chunk_result in retry_results {
if let Err(e) = chunk_result.result {
log::error!("Retry failed for chunk: {e}");
last_err = Some(e);
}
}
}
if let Some(err) = last_err {
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed {
id: game_id.to_string(),
})?;
return Err(err);
}
@@ -507,6 +664,101 @@ async fn download_game_files(
Ok(())
}
async fn retry_failed_chunks(
failed_chunks: Vec<DownloadChunk>,
peers: &[SocketAddr],
base_dir: &Path,
game_id: &str,
) -> Vec<ChunkDownloadResult> {
let mut results = Vec::new();
// Redistribute failed chunks among available peers
let _retry_plans = build_peer_plans(peers, &[]);
for (i, chunk) in failed_chunks.into_iter().enumerate() {
let peer_addr = peers[i % peers.len()];
let plan = PeerDownloadPlan {
chunks: vec![chunk],
whole_files: Vec::new(),
};
match download_from_peer(peer_addr, game_id, plan, base_dir.to_path_buf()).await {
Ok(chunk_results) => results.extend(chunk_results),
Err(e) => {
log::error!("Failed to retry chunk: {e}");
// Add empty failure result
results.push(ChunkDownloadResult {
chunk: DownloadChunk {
relative_path: "unknown".to_string(),
offset: 0,
length: 0,
retry_count: MAX_RETRY_COUNT,
},
result: Err(e),
peer_addr,
});
}
}
}
results
}
async fn load_local_game_db(game_dir: &str) -> eyre::Result<GameDB> {
let game_path = PathBuf::from(game_dir);
// Scan game directory for game folders
let mut games = Vec::new();
let mut entries = tokio::fs::read_dir(&game_path).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir()
&& let Some(game_id) = path.file_name().and_then(|n| n.to_str())
{
// Check if this game has a version.ini file
if let Ok(version) = lanspread_db::db::read_version_from_ini(&path) {
let size = calculate_directory_size(&path).await?;
let game = Game {
id: game_id.to_string(),
name: game_id.to_string(), // Use folder name as game name for now
description: String::new(),
release_year: String::new(),
publisher: String::new(),
max_players: 1,
version: "1.0".to_string(),
genre: String::new(),
size,
thumbnail: None,
installed: true,
eti_game_version: version.clone(),
local_version: version,
};
games.push(game);
}
}
}
Ok(GameDB::from(games))
}
async fn calculate_directory_size(dir: &Path) -> eyre::Result<u64> {
let mut total_size = 0u64;
let mut entries = tokio::fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
let metadata = tokio::fs::metadata(&path).await?;
if metadata.is_dir() {
total_size += Box::pin(calculate_directory_size(&path)).await?;
} else {
total_size += metadata.len();
}
}
Ok(total_size)
}
struct Ctx {
game_dir: Arc<RwLock<Option<String>>>,
local_game_db: Arc<RwLock<Option<GameDB>>>,
@@ -632,9 +884,9 @@ pub async fn run_peer(
}
let games_folder = games_folder.expect("checked above");
let peers = { ctx.peer_game_db.read().await.peers_with_game(&id) };
let peers = { ctx.peer_game_db.read().await.peers_with_latest_version(&id) };
if peers.is_empty() {
log::error!("No peers available to download game {id}");
log::error!("No peers with latest version available to download game {id}");
continue;
}
@@ -679,7 +931,21 @@ pub async fn run_peer(
PeerCommand::SetGameDir(game_dir) => {
*ctx.game_dir.write().await = Some(game_dir.clone());
log::info!("Game directory set to: {game_dir}");
// TODO: Load local game database when game directory is set
// Load local game database when game directory is set
let game_dir = game_dir.clone();
let local_game_db = ctx.local_game_db.clone();
tokio::spawn(async move {
match load_local_game_db(&game_dir).await {
Ok(db) => {
*local_game_db.write().await = Some(db);
log::info!("Local game database loaded successfully");
}
Err(e) => {
log::error!("Failed to load local game database: {e}");
}
}
});
}
PeerCommand::ConnectToPeer(peer_addr) => {
log::info!("Connecting to peer: {peer_addr}");
@@ -740,7 +1006,7 @@ async fn run_server_component(
tokio::time::sleep(Duration::from_secs(1)).await;
}
_ => {
log::trace!("mDNS event: {:?}", event);
log::trace!("mDNS event: {event:?}");
}
}
}
@@ -752,7 +1018,7 @@ async fn run_server_component(
tokio::spawn(async move {
if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await {
log::error!("Peer connection error: {}", e);
log::error!("Peer connection error: {e}");
}
});
}
@@ -798,7 +1064,7 @@ async fn handle_peer_stream(
) -> eyre::Result<()> {
let (mut rx, mut tx) = stream.split();
log::trace!("{:?} peer stream opened", remote_addr);
log::trace!("{remote_addr:?} peer stream opened");
// handle streams
loop {
@@ -811,7 +1077,7 @@ async fn handle_peer_stream(
);
let request = Request::decode(data);
log::debug!("{:?} msg: {request:?}", remote_addr);
log::debug!("{remote_addr:?} msg: {request:?}");
match request {
Request::Ping => {
@@ -835,13 +1101,23 @@ async fn handle_peer_stream(
}
Request::GetGame { id } => {
log::info!("Received GetGame request for {id} from peer");
// TODO: Handle game request using local game DB
let response = if let Some(ref db) = *ctx.local_game_db.read().await {
if db.get_game_by_id(&id).is_some() {
// TODO: Return actual game file descriptions
Response::GetGame {
id,
file_descriptions: Vec::new(),
let response = if let Some(ref game_dir) = *ctx.game_dir.read().await {
if let Some(ref db) = *ctx.local_game_db.read().await {
if db.get_game_by_id(&id).is_some() {
match get_game_file_descriptions(&id, game_dir).await {
Ok(file_descriptions) => Response::GetGame {
id,
file_descriptions,
},
Err(e) => {
log::error!(
"Failed to get game file descriptions for {id}: {e}"
);
Response::GameNotFound(id)
}
}
} else {
Response::GameNotFound(id)
}
} else {
Response::GameNotFound(id)
@@ -918,11 +1194,11 @@ async fn handle_peer_stream(
}
}
Ok(None) => {
log::trace!("{:?} peer stream closed", remote_addr);
log::trace!("{remote_addr:?} peer stream closed");
break;
}
Err(e) => {
log::error!("{:?} peer stream error: {e}", remote_addr);
log::error!("{remote_addr:?} peer stream error: {e}");
break;
}
}
@@ -946,11 +1222,11 @@ async fn run_peer_discovery(
let is_new_peer = {
let mut db = peer_game_db.write().await;
let peer_addresses = db.get_peer_addresses();
if !peer_addresses.contains(&peer_addr) {
if peer_addresses.contains(&peer_addr) {
false
} else {
db.add_peer(peer_addr);
true
} else {
false
}
};
@@ -1189,3 +1465,58 @@ async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result<bool> {
let is_alive = initial_peer_alive_check(&mut conn).await;
Ok(is_alive)
}
async fn get_game_file_descriptions(
game_id: &str,
game_dir: &str,
) -> eyre::Result<Vec<GameFileDescription>> {
let base_dir = PathBuf::from(game_dir);
let game_path = base_dir.join(game_id);
if !game_path.exists() {
eyre::bail!("Game directory does not exist: {}", game_path.display());
}
let mut file_descriptions = Vec::new();
for entry in walkdir::WalkDir::new(&game_path)
.into_iter()
.filter_map(std::result::Result::ok)
{
let relative_path = match entry.path().strip_prefix(&base_dir) {
Ok(path) => path.to_string_lossy().to_string(),
Err(e) => {
log::error!(
"Failed to get relative path for {}: {}",
entry.path().display(),
e
);
continue;
}
};
let is_dir = entry.file_type().is_dir();
let size = if is_dir {
None
} else {
match tokio::fs::metadata(entry.path()).await {
Ok(metadata) => Some(metadata.len()),
Err(e) => {
log::error!("Failed to read metadata for {relative_path}: {e}");
None
}
}
};
let file_desc = GameFileDescription {
game_id: game_id.to_string(),
relative_path,
is_dir,
size,
};
file_descriptions.push(file_desc);
}
Ok(file_descriptions)
}
+10 -4
View File
@@ -14,8 +14,7 @@ use gethostname::gethostname;
use lanspread_compat::eti;
use lanspread_db::db::{Game, GameDB};
use lanspread_mdns::{DaemonEvent, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser};
use lanspread_peer::{PeerEvent, run_peer};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use lanspread_peer::{PeerCommand, PeerEvent, run_peer};
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
@@ -110,7 +109,7 @@ async fn main() -> eyre::Result<()> {
// spawn mDNS listener task
spawn_mdns_task(server_addr)?;
let game_db = prepare_game_db(&cli).await?;
let _game_db = prepare_game_db(&cli).await?;
tracing::info!("Peer listening on {server_addr}");
@@ -162,7 +161,14 @@ async fn main() -> eyre::Result<()> {
}
});
// TODO: Add CLI interaction or other peer discovery logic here
// Set the game directory from CLI args
if let Err(e) = tx_control.send(PeerCommand::SetGameDir(
cli.game_dir.to_string_lossy().to_string(),
)) {
tracing::error!("Failed to send SetGameDir command: {e}");
}
// TODO: Add additional CLI interaction or other peer discovery logic here
// Wait for tasks
let (peer_result, _) = tokio::join!(peer_task, event_handler);
+4 -2
View File
@@ -1,4 +1,5 @@
use std::{
convert::TryInto,
path::{Path, PathBuf},
sync::Arc,
};
@@ -150,7 +151,7 @@ async fn stream_file_bytes(
let remote_addr = maybe_addr!(tx.connection().remote_addr());
let game_file = base_dir.join(relative_path);
tracing::debug!(
"{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {:?}",
"{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {length:?}",
game_file
);
@@ -166,7 +167,8 @@ async fn stream_file_bytes(
let mut buf = vec![0u8; 64 * 1024];
while remaining > 0 {
let read_len = std::cmp::min(remaining, buf.len() as u64) as usize;
let read_len = std::cmp::min(remaining, buf.len() as u64);
let read_len: usize = read_len.try_into().unwrap_or(usize::MAX);
if read_len == 0 {
break;
}