Compare commits

...

5 Commits

Author SHA1 Message Date
ddidderr 06398fe298 fix(peer): reject transfer paths outside requested game
Inbound file-transfer requests carry both a game ID and a relative path. The
serve gate validated whether the requested game was currently servable, but it
did not require the path itself to be rooted under that same game. A
non-conforming peer could therefore register a guard for one game while asking
to read files from another game root.

Require normalized transfer paths to start with the requested game ID before the
file can be dispatched. This keeps the outbound transfer guard, serve policy,
and filesystem path aligned. Absolute, traversal, local-data, missing-sentinel,
active-operation, and wrong-version paths remain rejected by the existing gates.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #4
2026-05-30 16:36:59 +02:00
ddidderr 9b700c7e3f fix(peer): bound outbound transfer drain waits
Update and remove-download operations must wait for existing outbound readers to
release game files before mutating or deleting the game root. That wait was
unbounded, so a stuck transfer guard could leave the game permanently marked as
Updating or RemovingDownload and prevent the requested operation from ever
starting.

Return a structured begin-operation result and put a five-second timeout around
the drain wait. If the transfer count does not reach zero, the operation start
fails, the active-operation snapshot is cleared, and the caller emits the
normal failure event for the attempted operation. The destructive mutation is
not allowed to proceed after a timeout.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #3
2026-05-30 16:36:59 +02:00
ddidderr 7e40cf4bfb fix(ui): coalesce outbound transfer list refreshes
Every outbound transfer start and finish can arrive on a hot path while a peer
is serving many file chunks. The Tauri event handler used to rebuild and emit
the full games list for each edge, cloning all games and probing per-game server
script files repeatedly during an active serve.

Batch outbound-transfer count changes behind a short scheduled refresh. The
peer still records exact counts in shared state, and the delayed refresh reads
that state once per burst. A generation counter keeps changes that arrive while
an emit is already scheduled from being lost; they trigger one follow-up emit
with the latest counts.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #2
2026-05-30 16:36:59 +02:00
ddidderr f89ff9ceea fix(peer): reset cancelled outbound file streams
Cancelled outbound transfers previously returned from the streaming loop without
terminating the QUIC send half. A whole-file receiver relies on the stream
ending to distinguish EOF from an in-progress body, so cancellation could leave
it waiting on a truncated transfer until its own timeout fired.

Reset the send stream on every cancellation branch, including cancellation
while waiting for the final close acknowledgement. A reset is deliberately used
instead of a graceful close so truncated whole-file transfers cannot be
misinterpreted as a valid EOF.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #1
2026-05-30 16:36:58 +02:00
ddidderr 738095235f feat(peer): coordinate outbound transfers with local game mutations
Updating or removing a local game rewrites its on-disk files. Peers that
were mid-download of that game would keep streaming bytes from files that
are being deleted or replaced, handing them a corrupt or stale copy.
There was also no authoritative notion of which game version a peer
should serve or accept, so a peer could serve whatever happened to be on
disk and downloaders could aggregate files from peers running mismatched
versions.

This introduces a reader-writer coordination scheme between outbound file
transfers (readers) and local mutation operations (writers), and gates
both serving and downloading on an authoritative game catalog version.

Reader-writer coordination:
- Track active outbound transfers per game in a shared `OutboundTransfers`
  map of (id, CancellationToken), threaded through `Ctx`/`PeerCtx` and
  registered by a `TransferGuard` in the stream service. The guard is
  registered *before* the serve-eligibility check to close a TOCTOU window
  where a writer could miss an in-flight reader.
- `stream_file_bytes` now honors a cancellation token at every await point
  (file read, network send, stream close) via `tokio::select!`, so a
  transfer aborts promptly instead of hanging on a stalled receiver.
- `begin_operation` marks a game active first, then cancels its outbound
  transfers and waits for the count to reach zero before any
  Updating/RemovingDownload work touches the filesystem.
- Active games are now hidden from library snapshots entirely while an
  operation is in flight, instead of freezing their last announced state,
  so peers stop discovering a game that is being mutated.

Authoritative version catalog:
- Replace the `HashSet<String>` catalog with `GameCatalog`, mapping each
  game id to its expected version (from the bundled game.db / ETI data).
- Serving requires the local `version.ini` to match the catalog version
  (`local_download_matches_catalog`); peer selection, file aggregation,
  and majority size validation all filter on the expected version
  (`peers_with_expected_version`, `aggregated_game_files`, and friends).

User-visible changes:
- The GUI shows confirmation dialogs before Update and Remove, and
  surfaces a sharing-status indicator on game cards and the detail modal.
- A new `OutboundTransferCountChanged` event lets the UI reflect live
  outbound transfer activity.

Test Plan:
- just test
- just frontend-test
- just clippy
2026-05-30 16:36:58 +02:00
24 changed files with 1182 additions and 228 deletions
Generated
+1
View File
@@ -2217,6 +2217,7 @@ dependencies = [
"tauri-plugin-shell", "tauri-plugin-shell",
"tauri-plugin-store", "tauri-plugin-store",
"tokio", "tokio",
"tokio-util",
"walkdir", "walkdir",
"windows 0.62.2", "windows 0.62.2",
] ]
+2 -2
View File
@@ -57,14 +57,14 @@ impl From<EtiGame> for Game {
release_year: eti_game.game_release, release_year: eti_game.game_release,
publisher: eti_game.game_publisher, publisher: eti_game.game_publisher,
max_players: eti_game.game_maxplayers, max_players: eti_game.game_maxplayers,
version: eti_game.game_version, version: eti_game.game_version.clone(),
genre: eti_game.genre_de, genre: eti_game.genre_de,
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
size: (eti_game.game_size * 1024.0 * 1024.0 * 1024.0) as u64, size: (eti_game.game_size * 1024.0 * 1024.0 * 1024.0) as u64,
downloaded: false, downloaded: false,
installed: false, installed: false,
availability: Availability::LocalOnly, availability: Availability::LocalOnly,
eti_game_version: None, eti_game_version: Some(eti_game.game_version),
local_version: None, local_version: None,
peer_count: 0, // ETI games start with 0 peers until peer system discovers them peer_count: 0, // ETI games start with 0 peers until peer system discovers them
} }
+55 -1
View File
@@ -78,7 +78,7 @@ pub struct Game {
/// Backend-reported availability state for this game's local or peer summary. /// Backend-reported availability state for this game's local or peer summary.
#[serde(default)] #[serde(default)]
pub availability: Availability, pub availability: Availability,
/// ETI game version from version.ini (YYYYMMDD format) (server) /// Authoritative ETI game version from the bundled game.db (YYYYMMDD format).
pub eti_game_version: Option<String>, pub eti_game_version: Option<String>,
/// Local game version from version.ini (YYYYMMDD format) /// Local game version from version.ini (YYYYMMDD format)
pub local_version: Option<String>, pub local_version: Option<String>,
@@ -198,6 +198,60 @@ impl Default for GameDB {
} }
} }
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GameCatalog {
expected_versions: HashMap<String, Option<String>>,
}
impl GameCatalog {
#[must_use]
pub fn empty() -> Self {
Self {
expected_versions: HashMap::new(),
}
}
#[must_use]
pub fn from_game_db(game_db: &GameDB) -> Self {
Self {
expected_versions: game_db
.games
.values()
.map(|game| (game.id.clone(), game.eti_game_version.clone()))
.collect(),
}
}
#[must_use]
pub fn from_ids(ids: impl IntoIterator<Item = String>) -> Self {
Self {
expected_versions: ids.into_iter().map(|id| (id, None)).collect(),
}
}
pub fn insert(&mut self, id: String, expected_version: Option<String>) {
self.expected_versions.insert(id, expected_version);
}
#[must_use]
pub fn contains<S>(&self, id: S) -> bool
where
S: AsRef<str>,
{
self.expected_versions.contains_key(id.as_ref())
}
#[must_use]
pub fn expected_version<S>(&self, id: S) -> Option<&str>
where
S: AsRef<str>,
{
self.expected_versions
.get(id.as_ref())
.and_then(Option::as_deref)
}
}
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct GameFileDescription { pub struct GameFileDescription {
pub game_id: String, pub game_id: String,
+21 -15
View File
@@ -12,7 +12,7 @@ use std::{
use eyre::Context; use eyre::Context;
use lanspread_compat::eti::get_games; use lanspread_compat::eti::get_games;
use lanspread_db::db::{Game, GameFileDescription}; use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
use lanspread_peer::{ use lanspread_peer::{
ActiveOperation, ActiveOperation,
ActiveOperationKind, ActiveOperationKind,
@@ -30,6 +30,7 @@ use lanspread_peer::{
use lanspread_peer_cli::{ use lanspread_peer_cli::{
CliCommand, CliCommand,
CommandEnvelope, CommandEnvelope,
DEFAULT_FIXTURE_VERSION,
ExternalUnrarUnpacker, ExternalUnrarUnpacker,
FixtureSeed, FixtureSeed,
FixtureUnpacker, FixtureUnpacker,
@@ -114,7 +115,7 @@ struct DownloadMeasurement {
struct SharedState { struct SharedState {
state: RwLock<CliState>, state: RwLock<CliState>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
notify: Notify, notify: Notify,
games_dir: PathBuf, games_dir: PathBuf,
state_dir: PathBuf, state_dir: PathBuf,
@@ -146,6 +147,7 @@ async fn main() -> eyre::Result<()> {
catalog.clone(), catalog.clone(),
PeerStartOptions { PeerStartOptions {
state_dir: Some(args.state_dir.clone()), state_dir: Some(args.state_dir.clone()),
active_outbound_transfers: None,
}, },
)?; )?;
let sender = handle.sender(); let sender = handle.sender();
@@ -303,15 +305,8 @@ async fn list_peers(shared: &SharedState) -> eyre::Result<Value> {
async fn list_games(shared: &SharedState) -> eyre::Result<Value> { async fn list_games(shared: &SharedState) -> eyre::Result<Value> {
let state = shared.state.read().await; let state = shared.state.read().await;
let catalog = shared.catalog.read().await.clone(); let catalog = shared.catalog.read().await;
let remote = shared let remote = shared.peer_game_db.read().await.get_catalog_games(&catalog);
.peer_game_db
.read()
.await
.get_all_games()
.into_iter()
.filter(|game| catalog.contains(&game.id))
.collect::<Vec<_>>();
Ok(json!({ Ok(json!({
"local": state.local_games.clone(), "local": state.local_games.clone(),
"remote": remote, "remote": remote,
@@ -434,6 +429,7 @@ async fn event_loop(
} }
} }
#[allow(clippy::too_many_lines)]
async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) { async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) {
match event { match event {
PeerEvent::LocalPeerReady { peer_id, addr } => { PeerEvent::LocalPeerReady { peer_id, addr } => {
@@ -458,6 +454,7 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s
state.local_games.clone_from(&games); state.local_games.clone_from(&games);
("local-library-changed", json!({ "games": games })) ("local-library-changed", json!({ "games": games }))
} }
PeerEvent::OutboundTransferCountChanged => ("outbound-transfer-count-changed", json!({})),
PeerEvent::ActiveOperationsChanged { active_operations } => { PeerEvent::ActiveOperationsChanged { active_operations } => {
let mut state = shared.state.write().await; let mut state = shared.state.write().await;
state.active_operations.clone_from(&active_operations); state.active_operations.clone_from(&active_operations);
@@ -668,18 +665,27 @@ fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result<Vec<Fixtu
.collect() .collect()
} }
async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> HashSet<String> { async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> GameCatalog {
let mut catalog = HashSet::new(); let mut catalog = GameCatalog::empty();
if let Some(path) = catalog_db if let Some(path) = catalog_db
&& path.exists() && path.exists()
{ {
match get_games(path).await { match get_games(path).await {
Ok(games) => catalog.extend(games.into_iter().map(|game| game.game_id)), Ok(games) => {
for game in games {
catalog.insert(game.game_id, Some(game.game_version));
}
}
Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()), Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()),
} }
} }
catalog.extend(fixtures.iter().map(|seed| seed.game_id.clone())); for seed in fixtures {
catalog.insert(
seed.game_id.clone(),
Some(DEFAULT_FIXTURE_VERSION.to_string()),
);
}
catalog catalog
} }
+14 -10
View File
@@ -1,18 +1,17 @@
//! Shared context types for the peer system. //! Shared context types for the peer system.
use std::{ use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
collections::{HashMap, HashSet},
net::SocketAddr,
path::PathBuf,
sync::Arc,
};
use lanspread_db::db::GameDB; use lanspread_db::db::{GameCatalog, GameDB};
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB}; use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB};
/// Thread-safe map of active outbound file transfers grouped by game ID.
pub type OutboundTransfers = Arc<RwLock<HashMap<String, Vec<(u64, CancellationToken)>>>>;
/// Mutating filesystem operation currently in flight for a game root. /// Mutating filesystem operation currently in flight for a game root.
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OperationKind { pub enum OperationKind {
@@ -40,10 +39,11 @@ pub struct Ctx {
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
pub active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>, pub active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
pub unpacker: Arc<dyn Unpacker>, pub unpacker: Arc<dyn Unpacker>,
pub catalog: Arc<RwLock<HashSet<String>>>, pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>, pub peer_id: Arc<String>,
pub shutdown: CancellationToken, pub shutdown: CancellationToken,
pub task_tracker: TaskTracker, pub task_tracker: TaskTracker,
pub active_outbound_transfers: OutboundTransfers,
} }
/// Context for peer connection handling. /// Context for peer connection handling.
@@ -55,11 +55,12 @@ pub struct PeerCtx {
pub local_peer_addr: Arc<RwLock<Option<SocketAddr>>>, pub local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
pub peer_game_db: Arc<RwLock<PeerGameDB>>, pub peer_game_db: Arc<RwLock<PeerGameDB>>,
pub catalog: Arc<RwLock<HashSet<String>>>, pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>, pub peer_id: Arc<String>,
pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender<PeerEvent>, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender<PeerEvent>,
pub shutdown: CancellationToken, pub shutdown: CancellationToken,
pub task_tracker: TaskTracker, pub task_tracker: TaskTracker,
pub active_outbound_transfers: OutboundTransfers,
} }
impl std::fmt::Debug for PeerCtx { impl std::fmt::Debug for PeerCtx {
@@ -84,7 +85,8 @@ impl Ctx {
unpacker: Arc<dyn Unpacker>, unpacker: Arc<dyn Unpacker>,
shutdown: CancellationToken, shutdown: CancellationToken,
task_tracker: TaskTracker, task_tracker: TaskTracker,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: OutboundTransfers,
) -> Self { ) -> Self {
Self { Self {
game_dir: Arc::new(RwLock::new(game_dir)), game_dir: Arc::new(RwLock::new(game_dir)),
@@ -100,6 +102,7 @@ impl Ctx {
peer_id: Arc::new(peer_id), peer_id: Arc::new(peer_id),
shutdown, shutdown,
task_tracker, task_tracker,
active_outbound_transfers,
} }
} }
@@ -120,6 +123,7 @@ impl Ctx {
tx_notify_ui, tx_notify_ui,
shutdown: self.shutdown.clone(), shutdown: self.shutdown.clone(),
task_tracker: self.task_tracker.clone(), task_tracker: self.task_tracker.clone(),
active_outbound_transfers: self.active_outbound_transfers.clone(),
} }
} }
} }
+6 -1
View File
@@ -2,6 +2,7 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{ use crate::{
@@ -65,9 +66,13 @@ fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind {
pub async fn emit_peer_game_list( pub async fn emit_peer_game_list(
peer_game_db: &Arc<RwLock<PeerGameDB>>, peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
) { ) {
let games = { peer_game_db.read().await.get_all_games() }; let games = {
let catalog = catalog.read().await;
peer_game_db.read().await.get_catalog_games(&catalog)
};
send(tx_notify_ui, PeerEvent::ListGames(games)); send(tx_notify_ui, PeerEvent::ListGames(games));
} }
+255 -40
View File
@@ -6,6 +6,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::Duration,
}; };
use lanspread_db::db::{GameDB, GameFileDescription}; use lanspread_db::db::{GameDB, GameFileDescription};
@@ -23,7 +24,7 @@ use crate::{
game_from_summary, game_from_summary,
get_game_file_descriptions, get_game_file_descriptions,
local_dir_is_directory, local_dir_is_directory,
local_download_available, local_download_matches_catalog,
rescan_local_game, rescan_local_game,
scan_local_library, scan_local_library,
version_ini_is_regular_file, version_ini_is_regular_file,
@@ -38,10 +39,13 @@ use crate::{
// Command handlers // Command handlers
// ============================================================================= // =============================================================================
const OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(10);
const OUTBOUND_TRANSFER_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
/// Handles the `ListGames` command. /// Handles the `ListGames` command.
pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) { pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
log::info!("ListGames command received"); log::info!("ListGames command received");
events::emit_peer_game_list(&ctx.peer_game_db, tx_notify_ui).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, tx_notify_ui).await;
} }
/// Tries to serve a game from local files. /// Tries to serve a game from local files.
@@ -54,7 +58,7 @@ async fn try_serve_local_game(
let active_operations = ctx.active_operations.read().await; let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await; let catalog = ctx.catalog.read().await;
if !local_download_available(&game_dir, id, &active_operations, &catalog).await { if !local_download_matches_catalog(&game_dir, id, &active_operations, &catalog).await {
return false; return false;
} }
drop(active_operations); drop(active_operations);
@@ -90,9 +94,10 @@ pub(crate) async fn handle_get_game_command(
} }
log::info!("Requesting game from peers: {id}"); log::info!("Requesting game from peers: {id}");
let expected_version = catalog_expected_version(ctx, &id).await;
let peers = { let peers = {
let peer_game_db = ctx.peer_game_db.read().await; let peer_game_db = ctx.peer_game_db.read().await;
source.select_peers(&peer_game_db, &id) source.select_peers(&peer_game_db, &id, expected_version.as_deref())
}; };
if peers.is_empty() { if peers.is_empty() {
log::warn!("No peers have game {id}"); log::warn!("No peers have game {id}");
@@ -107,6 +112,7 @@ pub(crate) async fn handle_get_game_command(
ctx.task_tracker.spawn(fetch_game_details_from_peers( ctx.task_tracker.spawn(fetch_game_details_from_peers(
peers, peers,
id, id,
expected_version,
peer_game_db, peer_game_db,
tx_notify_ui, tx_notify_ui,
|peer_addr, game_id, peer_game_db| async move { |peer_addr, game_id, peer_game_db| async move {
@@ -126,10 +132,16 @@ impl GameDetailSource {
matches!(self, Self::LocalOrPeers) matches!(self, Self::LocalOrPeers)
} }
fn select_peers(self, peer_game_db: &PeerGameDB, id: &str) -> Vec<SocketAddr> { fn select_peers(
self,
peer_game_db: &PeerGameDB,
id: &str,
expected_version: Option<&str>,
) -> Vec<SocketAddr> {
match self { match self {
Self::LocalOrPeers => peer_game_db.peers_with_game(id), Self::LocalOrPeers | Self::LatestPeersOnly => {
Self::LatestPeersOnly => peer_game_db.peers_with_latest_version(id), peer_game_db.peers_with_expected_version(id, expected_version)
}
} }
} }
} }
@@ -154,6 +166,7 @@ async fn request_game_details_and_update(
async fn fetch_game_details_from_peers<F, Fut>( async fn fetch_game_details_from_peers<F, Fut>(
peers: Vec<SocketAddr>, peers: Vec<SocketAddr>,
id: String, id: String,
expected_version: Option<String>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
mut fetch_details: F, mut fetch_details: F,
@@ -175,7 +188,12 @@ async fn fetch_game_details_from_peers<F, Fut>(
} }
if fetched_any { if fetched_any {
let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) }; let aggregated_files = {
peer_game_db
.read()
.await
.aggregated_game_files(&id, expected_version.as_deref())
};
if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles { if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles {
id: id.clone(), id: id.clone(),
@@ -210,6 +228,7 @@ pub async fn handle_download_game_files_command(
} }
let games_folder = { ctx.game_dir.read().await.clone() }; let games_folder = { ctx.game_dir.read().await.clone() };
let expected_version = catalog_expected_version(ctx, &id).await;
// Use majority validation to get trusted file descriptions and peer whitelist // Use majority validation to get trusted file descriptions and peer whitelist
let (validated_descriptions, peer_whitelist, file_peer_map) = { let (validated_descriptions, peer_whitelist, file_peer_map) = {
@@ -217,7 +236,7 @@ pub async fn handle_download_game_files_command(
.peer_game_db .peer_game_db
.read() .read()
.await .await
.validate_file_sizes_majority(&id) .validate_file_sizes_majority(&id, expected_version.as_deref())
{ {
Ok((files, peers, file_peer_map)) => { Ok((files, peers, file_peer_map)) => {
log::info!( log::info!(
@@ -260,7 +279,7 @@ pub async fn handle_download_game_files_command(
let local_dl_available = { let local_dl_available = {
let active_operations = ctx.active_operations.read().await; let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await; let catalog = ctx.catalog.read().await;
local_download_available(&games_folder, &id, &active_operations, &catalog).await local_download_matches_catalog(&games_folder, &id, &active_operations, &catalog).await
}; };
if peer_whitelist.is_empty() { if peer_whitelist.is_empty() {
@@ -289,10 +308,18 @@ pub async fn handle_download_game_files_command(
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring new download request"); log::warn!("Operation for {id} already in progress; ignoring new download request");
return; return;
} }
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before downloading {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
}
let active_operations = ctx.active_operations.clone(); let active_operations = ctx.active_operations.clone();
let active_downloads = ctx.active_downloads.clone(); let active_downloads = ctx.active_downloads.clone();
@@ -476,10 +503,21 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
return; return;
}; };
if !begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await { match begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring install command"); log::warn!("Operation for {id} already in progress; ignoring install command");
return; return;
} }
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before install/update of {id}");
events::send(
tx_notify_ui,
PeerEvent::InstallGameFailed { id: id.clone() },
);
return;
}
}
run_started_install_operation(ctx, tx_notify_ui, id, prepared).await; run_started_install_operation(ctx, tx_notify_ui, id, prepared).await;
} }
@@ -601,10 +639,21 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring uninstall command"); log::warn!("Operation for {id} already in progress; ignoring uninstall command");
return; return;
} }
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before uninstall of {id}");
events::send(
tx_notify_ui,
PeerEvent::UninstallGameFailed { id: id.clone() },
);
return;
}
}
let game_root = { ctx.game_dir.read().await.join(&id) }; let game_root = { ctx.game_dir.read().await.join(&id) };
let operation_guard = OperationGuard::new( let operation_guard = OperationGuard::new(
@@ -663,10 +712,21 @@ async fn run_remove_downloaded_operation(
return; return;
} }
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await { match begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal"); log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal");
return; return;
} }
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before removal of {id}");
events::send(
tx_notify_ui,
PeerEvent::RemoveDownloadedGameFailed { id: id.clone() },
);
return;
}
}
let game_dir = { ctx.game_dir.read().await.clone() }; let game_dir = { ctx.game_dir.read().await.clone() };
let operation_guard = OperationGuard::new( let operation_guard = OperationGuard::new(
@@ -715,12 +775,36 @@ async fn run_remove_downloaded_operation(
} }
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BeginOperationResult {
Started,
AlreadyActive,
DrainTimedOut,
}
async fn begin_operation( async fn begin_operation(
ctx: &Ctx, ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str, id: &str,
operation: OperationKind, operation: OperationKind,
) -> bool { ) -> BeginOperationResult {
begin_operation_with_drain_timeout(
ctx,
tx_notify_ui,
id,
operation,
OUTBOUND_TRANSFER_DRAIN_TIMEOUT,
)
.await
}
async fn begin_operation_with_drain_timeout(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
drain_timeout: Duration,
) -> BeginOperationResult {
let started = { let started = {
let mut active_operations = ctx.active_operations.write().await; let mut active_operations = ctx.active_operations.write().await;
match active_operations.entry(id.to_string()) { match active_operations.entry(id.to_string()) {
@@ -732,11 +816,70 @@ async fn begin_operation(
} }
}; };
if started { if !started {
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await; return BeginOperationResult::AlreadyActive;
} }
started events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
if operation_requires_outbound_drain(operation)
&& !cancel_and_wait_for_outbound_transfers(ctx, id, drain_timeout).await
{
end_operation(ctx, tx_notify_ui, id).await;
return BeginOperationResult::DrainTimedOut;
}
BeginOperationResult::Started
}
fn operation_requires_outbound_drain(operation: OperationKind) -> bool {
operation == OperationKind::Updating || operation == OperationKind::RemovingDownload
}
async fn cancel_and_wait_for_outbound_transfers(
ctx: &Ctx,
id: &str,
drain_timeout: Duration,
) -> bool {
let mut tokens_to_cancel = Vec::new();
{
let active = ctx.active_outbound_transfers.read().await;
if let Some(transfers) = active.get(id) {
for (_, token) in transfers {
tokens_to_cancel.push(token.clone());
}
}
}
for token in tokens_to_cancel {
token.cancel();
}
let drained = tokio::time::timeout(drain_timeout, async {
loop {
let count = {
let active = ctx.active_outbound_transfers.read().await;
active.get(id).map_or(0, Vec::len)
};
if count == 0 {
break;
}
tokio::time::sleep(OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL).await;
}
})
.await
.is_ok();
if !drained {
let count = {
let active = ctx.active_outbound_transfers.read().await;
active.get(id).map_or(0, Vec::len)
};
log::error!(
"Timed out after {drain_timeout:?} waiting for {count} outbound transfer(s) to drain for {id}"
);
}
drained
} }
async fn transition_download_to_install( async fn transition_download_to_install(
@@ -818,6 +961,14 @@ async fn catalog_contains(ctx: &Ctx, id: &str) -> bool {
ctx.catalog.read().await.contains(id) ctx.catalog.read().await.contains(id)
} }
async fn catalog_expected_version(ctx: &Ctx, id: &str) -> Option<String> {
ctx.catalog
.read()
.await
.expected_version(id)
.map(ToOwned::to_owned)
}
/// Handles the `SetGameDir` command. /// Handles the `SetGameDir` command.
pub async fn handle_set_game_dir_command( pub async fn handle_set_game_dir_command(
ctx: &Ctx, ctx: &Ctx,
@@ -1008,14 +1159,9 @@ async fn update_and_announce_games_with_policy(
active_operation_ids.remove(id); active_operation_ids.remove(id);
} }
if !active_operation_ids.is_empty() { if !active_operation_ids.is_empty() {
let previous = ctx.local_library.read().await.games.clone();
for id in &active_operation_ids { for id in &active_operation_ids {
if let Some(summary) = previous.get(id.as_str()) {
summaries.insert(id.clone(), summary.clone());
} else {
summaries.remove(id); summaries.remove(id);
} }
}
game_db = GameDB::from(summaries.values().map(game_from_summary).collect()); game_db = GameDB::from(summaries.values().map(game_from_summary).collect());
} }
@@ -1068,13 +1214,14 @@ async fn update_and_announce_games_with_policy(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
collections::HashSet, collections::HashMap,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration, time::Duration,
}; };
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Availability, GameSummary}; use lanspread_proto::{Availability, GameSummary};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -1115,7 +1262,8 @@ mod tests {
Arc::new(FakeUnpacker), Arc::new(FakeUnpacker),
CancellationToken::new(), CancellationToken::new(),
TaskTracker::new(), TaskTracker::new(),
Arc::new(RwLock::new(HashSet::from(["game".to_string()]))), Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))),
Arc::new(RwLock::new(HashMap::new())),
) )
} }
@@ -1220,7 +1368,7 @@ mod tests {
} }
#[test] #[test]
fn update_source_selects_latest_ready_peer_manifest() { fn update_source_selects_expected_ready_peer_manifest() {
let old_addr = addr(12_000); let old_addr = addr(12_000);
let new_addr = addr(12_001); let new_addr = addr(12_001);
let local_only_addr = addr(12_002); let local_only_addr = addr(12_002);
@@ -1242,13 +1390,13 @@ mod tests {
); );
assert_eq!( assert_eq!(
GameDetailSource::LatestPeersOnly.select_peers(&db, "game"), GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101")),
vec![new_addr] vec![new_addr]
); );
} }
#[tokio::test] #[tokio::test]
async fn update_fetch_emits_fresh_manifest_from_latest_peer() { async fn update_fetch_emits_fresh_manifest_from_expected_peer() {
let old_addr = addr(12_010); let old_addr = addr(12_010);
let new_addr = addr(12_011); let new_addr = addr(12_011);
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
@@ -1267,12 +1415,18 @@ mod tests {
} }
let peers = { let peers = {
let db = peer_game_db.read().await; let db = peer_game_db.read().await;
GameDetailSource::LatestPeersOnly.select_peers(&db, "game") GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101"))
}; };
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
let fetched_peers = Arc::new(Mutex::new(Vec::new())); let fetched_peers = Arc::new(Mutex::new(Vec::new()));
fetch_game_details_from_peers(peers, "game".to_string(), peer_game_db.clone(), tx, { fetch_game_details_from_peers(
peers,
"game".to_string(),
Some("20250101".to_string()),
peer_game_db.clone(),
tx,
{
let fetched_peers = fetched_peers.clone(); let fetched_peers = fetched_peers.clone();
move |peer_addr, game_id, peer_game_db| { move |peer_addr, game_id, peer_game_db| {
let fetched_peers = fetched_peers.clone(); let fetched_peers = fetched_peers.clone();
@@ -1293,7 +1447,8 @@ mod tests {
Ok(files) Ok(files)
} }
} }
}) },
)
.await; .await;
assert_eq!( assert_eq!(
@@ -1314,7 +1469,7 @@ mod tests {
file_descriptions file_descriptions
.iter() .iter()
.any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11), .any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11),
"latest peer manifest should be emitted to the download path" "expected-version peer manifest should be emitted to the download path"
); );
} }
@@ -1329,6 +1484,7 @@ mod tests {
fetch_game_details_from_peers( fetch_game_details_from_peers(
vec![first_addr, second_addr], vec![first_addr, second_addr],
"game".to_string(), "game".to_string(),
Some("20250101".to_string()),
peer_game_db, peer_game_db,
tx.clone(), tx.clone(),
{ {
@@ -1362,7 +1518,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn update_request_skips_local_manifest_even_when_download_exists() { async fn update_request_skips_local_manifest_even_when_download_exists() {
let temp = TempDir::new("lanspread-handler-latest-peer"); let temp = TempDir::new("lanspread-handler-expected-peer");
let root = temp.game_root(); let root = temp.game_root();
write_file(&root.join("version.ini"), b"20240101"); write_file(&root.join("version.ini"), b"20240101");
write_file(&root.join("game.eti"), b"old archive"); write_file(&root.join("game.eti"), b"old archive");
@@ -1385,23 +1541,37 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn local_library_scan_freezes_active_game_state() { async fn local_library_scan_hides_active_game_state() {
let temp = TempDir::new("lanspread-handler-active-freeze"); let temp = TempDir::new("lanspread-handler-active-hide");
let root = temp.game_root(); let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101"); write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive"); write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf()); let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
// 1. Initial scan: the game is ready and announced
let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog)
.await
.expect("scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else {
panic!("expected LocalLibraryChanged");
};
assert_eq!(games.len(), 1);
assert_eq!(games[0].id, "game");
// 2. Set the game as active/in-progress and scan again
ctx.active_operations ctx.active_operations
.write() .write()
.await .await
.insert("game".to_string(), OperationKind::Installing); .insert("game".to_string(), OperationKind::Installing);
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog) let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog)
.await .await
.expect("scan should succeed"); .expect("scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await; update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else { let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else {
@@ -1409,7 +1579,7 @@ mod tests {
}; };
assert!( assert!(
games.is_empty(), games.is_empty(),
"active game should keep its previous announced state" "active game should be hidden/unannounced during operations"
); );
} }
@@ -1423,7 +1593,10 @@ mod tests {
let ctx = test_ctx(temp.path().to_path_buf()); let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await); assert_eq!(
begin_operation(&ctx, &tx, "game", OperationKind::Updating).await,
BeginOperationResult::Started
);
assert_active_update( assert_active_update(
recv_event(&mut rx).await, recv_event(&mut rx).await,
vec![ActiveOperation { vec![ActiveOperation {
@@ -1433,6 +1606,48 @@ mod tests {
); );
} }
#[tokio::test]
async fn begin_operation_timeout_clears_active_operation_snapshot() {
let temp = TempDir::new("lanspread-handler-active-drain-timeout");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let token = CancellationToken::new();
ctx.active_outbound_transfers
.write()
.await
.insert("game".to_string(), vec![(1, token.clone())]);
assert_eq!(
begin_operation_with_drain_timeout(
&ctx,
&tx,
"game",
OperationKind::Updating,
Duration::from_millis(1),
)
.await,
BeginOperationResult::DrainTimedOut
);
assert!(token.is_cancelled());
assert_active_update(
recv_event(&mut rx).await,
vec![ActiveOperation {
id: "game".to_string(),
operation: ActiveOperationKind::Updating,
}],
);
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(
!ctx.active_operations.read().await.contains_key("game"),
"timed-out drain should not leave the operation stuck active"
);
}
#[tokio::test] #[tokio::test]
async fn unchanged_settled_scan_is_not_reemitted() { async fn unchanged_settled_scan_is_not_reemitted() {
let temp = TempDir::new("lanspread-handler-settled-unchanged"); let temp = TempDir::new("lanspread-handler-settled-unchanged");
+17 -6
View File
@@ -39,12 +39,12 @@ mod test_support;
// Public re-exports // Public re-exports
// ============================================================================= // =============================================================================
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc}; use std::{net::SocketAddr, path::PathBuf, sync::Arc};
pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT};
pub use error::PeerError; pub use error::PeerError;
pub use install::{UnpackFuture, Unpacker}; pub use install::{UnpackFuture, Unpacker};
use lanspread_db::db::{Game, GameFileDescription}; use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
pub use migration::{MigrationReport, migrate_legacy_state}; pub use migration::{MigrationReport, migrate_legacy_state};
pub use peer_db::{ pub use peer_db::{
MajorityValidationResult, MajorityValidationResult,
@@ -153,6 +153,8 @@ pub enum PeerEvent {
PeerCountUpdated(usize), PeerCountUpdated(usize),
/// The local library contents changed after a scan. /// The local library contents changed after a scan.
LocalLibraryChanged { games: Vec<Game> }, LocalLibraryChanged { games: Vec<Game> },
/// The number of active outbound transfers changed.
OutboundTransferCountChanged,
/// The set of in-progress local operations changed. /// The set of in-progress local operations changed.
ActiveOperationsChanged { ActiveOperationsChanged {
active_operations: Vec<ActiveOperation>, active_operations: Vec<ActiveOperation>,
@@ -262,6 +264,7 @@ pub enum PeerCommand {
pub struct PeerStartOptions { pub struct PeerStartOptions {
/// Directory used for peer identity and other state. /// Directory used for peer identity and other state.
pub state_dir: Option<PathBuf>, pub state_dir: Option<PathBuf>,
pub active_outbound_transfers: Option<crate::context::OutboundTransfers>,
} }
// ============================================================================= // =============================================================================
@@ -286,7 +289,7 @@ pub fn start_peer(
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
unpacker: Arc<dyn Unpacker>, unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
) -> eyre::Result<PeerRuntimeHandle> { ) -> eyre::Result<PeerRuntimeHandle> {
start_peer_with_options( start_peer_with_options(
game_dir, game_dir,
@@ -305,12 +308,17 @@ pub fn start_peer_with_options(
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
unpacker: Arc<dyn Unpacker>, unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
options: PeerStartOptions, options: PeerStartOptions,
) -> eyre::Result<PeerRuntimeHandle> { ) -> eyre::Result<PeerRuntimeHandle> {
let PeerStartOptions { state_dir } = options; let PeerStartOptions {
state_dir,
active_outbound_transfers,
} = options;
let state_dir = resolve_state_dir(state_dir.as_deref()); let state_dir = resolve_state_dir(state_dir.as_deref());
let game_dir = game_dir.into(); let game_dir = game_dir.into();
let active_outbound_transfers = active_outbound_transfers
.unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new())));
log::info!( log::info!(
"Starting peer system with game directory: {}", "Starting peer system with game directory: {}",
game_dir.display() game_dir.display()
@@ -329,6 +337,7 @@ pub fn start_peer_with_options(
state_dir, state_dir,
unpacker, unpacker,
catalog, catalog,
active_outbound_transfers,
)) ))
} }
@@ -344,7 +353,8 @@ async fn run_peer(
unpacker: Arc<dyn Unpacker>, unpacker: Arc<dyn Unpacker>,
shutdown: CancellationToken, shutdown: CancellationToken,
task_tracker: TaskTracker, task_tracker: TaskTracker,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let ctx = Ctx::new( let ctx = Ctx::new(
peer_game_db, peer_game_db,
@@ -355,6 +365,7 @@ async fn run_peer(
shutdown, shutdown,
task_tracker, task_tracker,
catalog, catalog,
active_outbound_transfers,
); );
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
log::error!("Failed to load initial local game database: {err}"); log::error!("Failed to load initial local game database: {err}");
+66 -14
View File
@@ -9,7 +9,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_db::db::{Game, GameCatalog, GameDB, GameFileDescription};
use lanspread_proto::{Availability, GameSummary}; use lanspread_proto::{Availability, GameSummary};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, sync::Mutex}; use tokio::{io::AsyncWriteExt, sync::Mutex};
@@ -51,7 +51,7 @@ pub async fn local_download_available(
game_dir: &Path, game_dir: &Path,
game_id: &str, game_id: &str,
active_operations: &HashMap<String, OperationKind>, active_operations: &HashMap<String, OperationKind>,
catalog: &HashSet<String>, catalog: &GameCatalog,
) -> bool { ) -> bool {
if !catalog.contains(game_id) { if !catalog.contains(game_id) {
log::debug!("Not serving game {game_id} locally because it is not in the catalog"); log::debug!("Not serving game {game_id} locally because it is not in the catalog");
@@ -67,6 +67,40 @@ pub async fn local_download_available(
version_ini_is_regular_file(game_path.as_path()).await version_ini_is_regular_file(game_path.as_path()).await
} }
/// Checks if a local game may be served to peers under the authoritative catalog version.
pub async fn local_download_matches_catalog(
game_dir: &Path,
game_id: &str,
active_operations: &HashMap<String, OperationKind>,
catalog: &GameCatalog,
) -> bool {
if !local_download_available(game_dir, game_id, active_operations, catalog).await {
return false;
}
let Some(expected_version) = catalog.expected_version(game_id) else {
return true;
};
let game_path = game_dir.join(game_id);
match lanspread_db::db::read_version_from_ini(&game_path) {
Ok(Some(local_version)) if local_version == expected_version => true,
Ok(Some(local_version)) => {
log::debug!(
"Not serving game {game_id}: local version.ini {local_version} does not match catalog {expected_version}"
);
false
}
Ok(None) => false,
Err(err) => {
log::warn!(
"Not serving game {game_id}: failed to read local version.ini for catalog comparison: {err}"
);
false
}
}
}
// ============================================================================= // =============================================================================
// Local library index and scanning // Local library index and scanning
// ============================================================================= // =============================================================================
@@ -468,7 +502,7 @@ struct IndexUpdate {
async fn update_index_for_game( async fn update_index_for_game(
game_root: &Path, game_root: &Path,
game_id: &str, game_id: &str,
catalog: &HashSet<String>, catalog: &GameCatalog,
index: &mut LibraryIndex, index: &mut LibraryIndex,
) -> eyre::Result<IndexUpdate> { ) -> eyre::Result<IndexUpdate> {
if !catalog.contains(game_id) { if !catalog.contains(game_id) {
@@ -557,7 +591,7 @@ fn scan_from_index(index: &LibraryIndex) -> LocalLibraryScan {
pub async fn scan_local_library( pub async fn scan_local_library(
game_dir: impl AsRef<Path>, game_dir: impl AsRef<Path>,
state_dir: impl AsRef<Path>, state_dir: impl AsRef<Path>,
catalog: &HashSet<String>, catalog: &GameCatalog,
) -> eyre::Result<LocalLibraryScan> { ) -> eyre::Result<LocalLibraryScan> {
let game_path = game_dir.as_ref(); let game_path = game_dir.as_ref();
let state_path = state_dir.as_ref(); let state_path = state_dir.as_ref();
@@ -645,7 +679,7 @@ pub async fn scan_local_library(
pub async fn rescan_local_game( pub async fn rescan_local_game(
game_dir: impl AsRef<Path>, game_dir: impl AsRef<Path>,
state_dir: impl AsRef<Path>, state_dir: impl AsRef<Path>,
catalog: &HashSet<String>, catalog: &GameCatalog,
game_id: &str, game_id: &str,
) -> eyre::Result<LocalLibraryScan> { ) -> eyre::Result<LocalLibraryScan> {
let game_path = game_dir.as_ref(); let game_path = game_dir.as_ref();
@@ -682,10 +716,7 @@ pub async fn get_game_file_descriptions(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{collections::HashMap, path::Path};
collections::{HashMap, HashSet},
path::Path,
};
use lanspread_proto::Availability; use lanspread_proto::Availability;
@@ -776,7 +807,7 @@ mod tests {
async fn scan_uses_version_ini_and_local_dir_as_independent_state() { async fn scan_uses_version_ini_and_local_dir_as_independent_state() {
let temp = TempDir::new("lanspread-local-games"); let temp = TempDir::new("lanspread-local-games");
let state = TempDir::new("lanspread-local-games-state"); let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from([ let catalog = GameCatalog::from_ids([
"ready".to_string(), "ready".to_string(),
"local-only".to_string(), "local-only".to_string(),
"eti-only".to_string(), "eti-only".to_string(),
@@ -830,7 +861,7 @@ mod tests {
async fn rescan_promotes_installed_only_game_to_ready_when_sentinel_appears() { async fn rescan_promotes_installed_only_game_to_ready_when_sentinel_appears() {
let temp = TempDir::new("lanspread-local-games"); let temp = TempDir::new("lanspread-local-games");
let state = TempDir::new("lanspread-local-games-state"); let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from(["game".to_string()]); let catalog = GameCatalog::from_ids(["game".to_string()]);
std::fs::create_dir_all(temp.path().join("game").join("local")) std::fs::create_dir_all(temp.path().join("game").join("local"))
.expect("local install dir should be created"); .expect("local install dir should be created");
@@ -864,7 +895,7 @@ mod tests {
async fn concurrent_rescans_preserve_both_index_updates() { async fn concurrent_rescans_preserve_both_index_updates() {
let temp = TempDir::new("lanspread-local-games-concurrent"); let temp = TempDir::new("lanspread-local-games-concurrent");
let state = TempDir::new("lanspread-local-games-state"); let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from(["game-a".to_string(), "game-b".to_string()]); let catalog = GameCatalog::from_ids(["game-a".to_string(), "game-b".to_string()]);
write_file(&temp.path().join("game-a").join("version.ini"), b"20250101"); write_file(&temp.path().join("game-a").join("version.ini"), b"20250101");
write_file(&temp.path().join("game-b").join("version.ini"), b"20250101"); write_file(&temp.path().join("game-b").join("version.ini"), b"20250101");
@@ -909,7 +940,7 @@ mod tests {
let game_root = temp.path().join("game"); let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20250101"); write_file(&game_root.join("version.ini"), b"20250101");
let catalog = HashSet::from(["game".to_string()]); let catalog = GameCatalog::from_ids(["game".to_string()]);
let no_operations = HashMap::new(); let no_operations = HashMap::new();
assert!(local_download_available(temp.path(), "game", &no_operations, &catalog).await); assert!(local_download_available(temp.path(), "game", &no_operations, &catalog).await);
@@ -917,8 +948,29 @@ mod tests {
assert!(!local_download_available(temp.path(), "game", &active_operations, &catalog).await); assert!(!local_download_available(temp.path(), "game", &active_operations, &catalog).await);
assert!( assert!(
!local_download_available(temp.path(), "game", &no_operations, &HashSet::new()).await !local_download_available(temp.path(), "game", &no_operations, &GameCatalog::empty())
.await
); );
assert!(!local_download_available(temp.path(), "missing", &no_operations, &catalog).await); assert!(!local_download_available(temp.path(), "missing", &no_operations, &catalog).await);
} }
#[tokio::test]
async fn local_download_matches_catalog_requires_expected_version() {
let temp = TempDir::new("lanspread-local-games");
let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20260101");
let mut catalog = GameCatalog::empty();
catalog.insert("game".to_string(), Some("20250101".to_string()));
let no_operations = HashMap::new();
assert!(
!local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await
);
catalog.insert("game".to_string(), Some("20260101".to_string()));
assert!(
local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await
);
}
} }
+80 -5
View File
@@ -4,6 +4,7 @@ use bytes::Bytes;
use lanspread_db::db::GameFileDescription; use lanspread_db::db::GameFileDescription;
use lanspread_utils::maybe_addr; use lanspread_utils::maybe_addr;
use s2n_quic::{ use s2n_quic::{
application,
connection, connection,
stream::{Error as StreamError, SendStream}, stream::{Error as StreamError, SendStream},
}; };
@@ -14,12 +15,24 @@ use tokio::{
use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path}; use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path};
fn cancel_send_stream(tx: &mut SendStream, remote_addr: impl std::fmt::Display, path: &Path) {
// Reset instead of finishing so truncated whole-file transfers cannot look like EOF.
if let Err(err) = tx.reset(application::Error::UNKNOWN) {
log::debug!(
"{remote_addr} failed to reset cancelled transfer for {}: {err}",
path.display()
);
}
}
#[allow(clippy::too_many_lines)]
async fn stream_file_bytes( async fn stream_file_bytes(
tx: &mut SendStream, tx: &mut SendStream,
base_dir: &Path, base_dir: &Path,
relative_path: &str, relative_path: &str,
offset: u64, offset: u64,
length: Option<u64>, length: Option<u64>,
cancel_token: tokio_util::sync::CancellationToken,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr()); let remote_addr = maybe_addr!(tx.connection().remote_addr());
@@ -45,13 +58,34 @@ async fn stream_file_bytes(
let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE]; let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE];
while remaining > 0 { while remaining > 0 {
if cancel_token.is_cancelled() {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
let read_len = std::cmp::min(remaining, buf.len() as u64); let read_len = std::cmp::min(remaining, buf.len() as u64);
let read_len: usize = read_len.try_into().unwrap_or(usize::MAX); let read_len: usize = read_len.try_into().unwrap_or(usize::MAX);
if read_len == 0 { if read_len == 0 {
break; break;
} }
let bytes_read = file.read(&mut buf[..read_len]).await?; let bytes_read = tokio::select! {
() = cancel_token.cancelled() => {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = file.read(&mut buf[..read_len]) => {
res?
}
};
if bytes_read == 0 { if bytes_read == 0 {
if !expect_exact { if !expect_exact {
transfer_complete = true; transfer_complete = true;
@@ -59,7 +93,19 @@ async fn stream_file_bytes(
break; break;
} }
tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?; tokio::select! {
() = cancel_token.cancelled() => {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = tx.send(Bytes::copy_from_slice(&buf[..bytes_read])) => {
res?;
}
}
remaining = remaining.saturating_sub(bytes_read as u64); remaining = remaining.saturating_sub(bytes_read as u64);
total_bytes += bytes_read as u64; total_bytes += bytes_read as u64;
@@ -97,13 +143,22 @@ async fn stream_file_bytes(
validated_path.display() validated_path.display()
); );
match tx.close().await { tokio::select! {
() = cancel_token.cancelled() => {
log::info!("{remote_addr} transfer cancelled while closing stream");
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = tx.close() => {
match res {
Ok(()) => {} Ok(()) => {}
Err(err) if transfer_complete && is_clean_remote_close(&err) => { Err(err) if transfer_complete && is_clean_remote_close(&err) => {
log::debug!("{remote_addr} closed stream after transfer completion: {err}"); log::debug!("{remote_addr} closed stream after transfer completion: {err}");
} }
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),
} }
}
}
Ok(()) Ok(())
} }
@@ -121,8 +176,18 @@ pub async fn send_game_file_data(
game_file_desc: &GameFileDescription, game_file_desc: &GameFileDescription,
tx: &mut SendStream, tx: &mut SendStream,
game_dir: &Path, game_dir: &Path,
cancel_token: tokio_util::sync::CancellationToken,
) { ) {
if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { if let Err(e) = stream_file_bytes(
tx,
game_dir,
&game_file_desc.relative_path,
0,
None,
cancel_token,
)
.await
{
let remote_addr = maybe_addr!(tx.connection().remote_addr()); let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!( log::error!(
"{remote_addr} failed to stream file {}: {e}", "{remote_addr} failed to stream file {}: {e}",
@@ -138,8 +203,18 @@ pub async fn send_game_file_chunk(
length: u64, length: u64,
tx: &mut SendStream, tx: &mut SendStream,
game_dir: &Path, game_dir: &Path,
cancel_token: tokio_util::sync::CancellationToken,
) { ) {
if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { if let Err(e) = stream_file_bytes(
tx,
game_dir,
relative_path,
offset,
Some(length),
cancel_token,
)
.await
{
let remote_addr = maybe_addr!(tx.connection().remote_addr()); let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!( log::error!(
"{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}"
+141 -9
View File
@@ -7,7 +7,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use lanspread_db::db::{Availability, Game, GameFileDescription}; use lanspread_db::db::{Availability, Game, GameCatalog, GameFileDescription};
use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot};
use crate::library::compute_library_digest; use crate::library::compute_library_digest;
@@ -357,6 +357,54 @@ impl PeerGameDB {
games games
} }
/// Returns catalog games aggregated from peers that advertise the expected catalog version.
#[must_use]
pub fn get_catalog_games(&self, catalog: &GameCatalog) -> Vec<Game> {
let mut aggregated: HashMap<String, Game> = HashMap::new();
let mut peer_counts: HashMap<String, u32> = HashMap::new();
for peer in self.peers.values() {
for game in peer.games.values().filter(|game| {
catalog.contains(&game.id)
&& game_matches_expected_version(game, catalog.expected_version(&game.id))
}) {
*peer_counts.entry(game.id.clone()).or_insert(0) += 1;
}
}
for peer in self.peers.values() {
for game in peer.games.values().filter(|game| {
catalog.contains(&game.id)
&& game_matches_expected_version(game, catalog.expected_version(&game.id))
}) {
aggregated
.entry(game.id.clone())
.and_modify(|existing| {
existing.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
if game.size > existing.size {
existing.size = game.size;
}
existing.set_downloaded(true);
if game.installed {
existing.installed = true;
}
})
.or_insert_with(|| {
let mut game_clone = summary_to_game(game);
if let Some(expected_version) = catalog.expected_version(&game.id) {
game_clone.eti_game_version = Some(expected_version.to_string());
}
game_clone.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
game_clone
});
}
}
let mut games: Vec<Game> = aggregated.into_values().collect();
games.sort_by(|a, b| a.name.cmp(&b.name));
games
}
/// Returns the latest version of a game across all peers. /// Returns the latest version of a game across all peers.
#[must_use] #[must_use]
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> { pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
@@ -451,6 +499,24 @@ impl PeerGameDB {
.collect() .collect()
} }
/// Returns addresses of peers that have the expected catalog version of a game.
#[must_use]
pub fn peers_with_expected_version(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<SocketAddr> {
self.peers
.iter()
.filter(|(_, peer)| {
peer.games
.get(game_id)
.is_some_and(|game| game_matches_expected_version(game, expected_version))
})
.map(|(_, peer)| peer.addr)
.collect()
}
/// Returns addresses of peers that have the latest version of a game. /// Returns addresses of peers that have the latest version of a game.
#[must_use] #[must_use]
pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> { pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
@@ -514,11 +580,33 @@ impl PeerGameDB {
.collect() .collect()
} }
/// Returns file descriptions from peers that advertise the expected catalog version.
#[must_use]
pub fn expected_version_game_files_for(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
let expected_peers = self.peers_with_expected_version(game_id, expected_version);
if expected_peers.is_empty() {
return Vec::new();
}
self.game_files_for(game_id)
.into_iter()
.filter(|(addr, _)| expected_peers.contains(addr))
.collect()
}
/// Returns aggregated file descriptions for a game across all peers. /// Returns aggregated file descriptions for a game across all peers.
#[must_use] #[must_use]
pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> { pub fn aggregated_game_files(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<GameFileDescription> {
let mut seen: HashMap<String, GameFileDescription> = HashMap::new(); let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
for (_, files) in self.latest_game_files_for(game_id) { for (_, files) in self.expected_version_game_files_for(game_id, expected_version) {
for file in files { for file in files {
seen.entry(file.relative_path.clone()).or_insert(file); seen.entry(file.relative_path.clone()).or_insert(file);
} }
@@ -559,8 +647,9 @@ impl PeerGameDB {
pub fn validate_file_sizes_majority( pub fn validate_file_sizes_majority(
&self, &self,
game_id: &str, game_id: &str,
expected_version: Option<&str>,
) -> eyre::Result<MajorityValidationResult> { ) -> eyre::Result<MajorityValidationResult> {
let game_files = self.latest_game_files_for(game_id); let game_files = self.expected_version_game_files_for(game_id, expected_version);
if game_files.is_empty() { if game_files.is_empty() {
return Ok((Vec::new(), Vec::new(), HashMap::new())); return Ok((Vec::new(), Vec::new(), HashMap::new()));
} }
@@ -813,6 +902,14 @@ fn game_is_ready(summary: &GameSummary) -> bool {
summary.availability == Availability::Ready summary.availability == Availability::Ready
} }
fn game_matches_expected_version(summary: &GameSummary, expected_version: Option<&str>) -> bool {
if !game_is_ready(summary) {
return false;
}
expected_version.is_none_or(|expected| summary.eti_version.as_deref() == Some(expected))
}
fn summary_to_game(summary: &GameSummary) -> Game { fn summary_to_game(summary: &GameSummary) -> Game {
let eti_game_version = game_is_ready(summary) let eti_game_version = game_is_ready(summary)
.then(|| summary.eti_version.clone()) .then(|| summary.eti_version.clone())
@@ -925,6 +1022,41 @@ mod tests {
assert!(db.peers_with_latest_version("game").is_empty()); assert!(db.peers_with_latest_version("game").is_empty());
} }
#[test]
fn catalog_aggregation_counts_only_expected_version_peers() {
let old_addr = addr(12003);
let expected_addr = addr(12004);
let newer_addr = addr(12005);
let mut db = PeerGameDB::new();
db.upsert_peer("old".to_string(), old_addr);
db.upsert_peer("expected".to_string(), expected_addr);
db.upsert_peer("newer".to_string(), newer_addr);
db.update_peer_games(
&"old".to_string(),
vec![summary("game", "20240101", Availability::Ready)],
);
db.update_peer_games(
&"expected".to_string(),
vec![summary("game", "20250101", Availability::Ready)],
);
db.update_peer_games(
&"newer".to_string(),
vec![summary("game", "20260101", Availability::Ready)],
);
let mut catalog = GameCatalog::empty();
catalog.insert("game".to_string(), Some("20250101".to_string()));
let games = db.get_catalog_games(&catalog);
assert_eq!(games.len(), 1);
assert_eq!(games[0].peer_count, 1);
assert_eq!(games[0].eti_game_version.as_deref(), Some("20250101"));
assert_eq!(
db.peers_with_expected_version("game", Some("20250101")),
vec![expected_addr]
);
}
#[test] #[test]
fn transport_addr_matches_known_peer_on_ephemeral_port() { fn transport_addr_matches_known_peer_on_ephemeral_port() {
let advertised = ip_addr([10, 66, 0, 2], 40000); let advertised = ip_addr([10, 66, 0, 2], 40000);
@@ -979,7 +1111,7 @@ mod tests {
} }
#[test] #[test]
fn validation_uses_latest_version_file_metadata() { fn validation_uses_expected_version_file_metadata() {
let old_addr = addr(12003); let old_addr = addr(12003);
let new_addr = addr(12004); let new_addr = addr(12004);
let mut db = PeerGameDB::new(); let mut db = PeerGameDB::new();
@@ -1010,21 +1142,21 @@ mod tests {
], ],
); );
let aggregated = db.aggregated_game_files("game"); let aggregated = db.aggregated_game_files("game", Some("20250101"));
let archive = aggregated let archive = aggregated
.iter() .iter()
.find(|desc| desc.relative_path == "game/archive.eti") .find(|desc| desc.relative_path == "game/archive.eti")
.expect("latest archive should be present"); .expect("expected-version archive should be present");
assert_eq!(archive.size, 20); assert_eq!(archive.size, 20);
let (validated, peers, file_peer_map) = db let (validated, peers, file_peer_map) = db
.validate_file_sizes_majority("game") .validate_file_sizes_majority("game", Some("20250101"))
.expect("old-version file metadata should not create ambiguity"); .expect("old-version file metadata should not create ambiguity");
assert_eq!(peers, vec![new_addr]); assert_eq!(peers, vec![new_addr]);
let archive = validated let archive = validated
.iter() .iter()
.find(|desc| desc.relative_path == "game/archive.eti") .find(|desc| desc.relative_path == "game/archive.eti")
.expect("latest archive should validate"); .expect("expected-version archive should validate");
assert_eq!(archive.size, 20); assert_eq!(archive.size, 20);
assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr])); assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr]));
} }
@@ -2,6 +2,7 @@
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION}; use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION};
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
@@ -22,6 +23,7 @@ pub(crate) struct HandshakeCtx {
local_library: Arc<RwLock<LocalLibraryState>>, local_library: Arc<RwLock<LocalLibraryState>>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
catalog: Arc<RwLock<GameCatalog>>,
} }
impl HandshakeCtx { impl HandshakeCtx {
@@ -32,6 +34,7 @@ impl HandshakeCtx {
local_library: ctx.local_library.clone(), local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(), peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: tx_notify_ui.clone(), tx_notify_ui: tx_notify_ui.clone(),
catalog: ctx.catalog.clone(),
} }
} }
@@ -42,6 +45,7 @@ impl HandshakeCtx {
local_library: ctx.local_library.clone(), local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(), peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: ctx.tx_notify_ui.clone(), tx_notify_ui: ctx.tx_notify_ui.clone(),
catalog: ctx.catalog.clone(),
} }
} }
} }
@@ -121,7 +125,7 @@ pub(crate) async fn perform_handshake_with_peer(
.await; .await;
after_peer_library_recorded(&ctx, upsert, record_addr).await; after_peer_library_recorded(&ctx, upsert, record_addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
Ok(()) Ok(())
} }
@@ -156,7 +160,7 @@ pub(super) async fn accept_inbound_hello(
.await; .await;
after_peer_library_recorded(&handshake_ctx, upsert, addr).await; after_peer_library_recorded(&handshake_ctx, upsert, addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
build_hello_ack(ctx).await build_hello_ack(ctx).await
} }
@@ -201,12 +205,13 @@ async fn after_peer_library_recorded(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
collections::{HashMap, HashSet}, collections::HashMap,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION}; use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION};
use tokio::sync::{RwLock, mpsc}; use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -242,6 +247,7 @@ mod tests {
local_library: Arc::new(RwLock::new(LocalLibraryState::empty())), local_library: Arc::new(RwLock::new(LocalLibraryState::empty())),
peer_game_db, peer_game_db,
tx_notify_ui, tx_notify_ui,
catalog: Arc::new(RwLock::new(GameCatalog::empty())),
} }
} }
@@ -301,6 +307,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn inbound_hello_applies_remote_library_snapshot() { async fn inbound_hello_applies_remote_library_snapshot() {
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let mut catalog = GameCatalog::empty();
catalog.insert("remote-game".to_string(), Some("20250101".to_string()));
let ctx = Ctx::new( let ctx = Ctx::new(
peer_game_db.clone(), peer_game_db.clone(),
"local-peer".to_string(), "local-peer".to_string(),
@@ -309,7 +317,8 @@ mod tests {
Arc::new(NoopUnpacker), Arc::new(NoopUnpacker),
CancellationToken::new(), CancellationToken::new(),
TaskTracker::new(), TaskTracker::new(),
Arc::new(RwLock::new(HashSet::new())), Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(HashMap::new())),
); );
*ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000)); *ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000));
+12 -2
View File
@@ -2,6 +2,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -18,6 +19,7 @@ use crate::{
pub async fn run_ping_service( pub async fn run_ping_service(
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
shutdown: CancellationToken, shutdown: CancellationToken,
@@ -40,6 +42,7 @@ pub async fn run_ping_service(
ping_idle_peers( ping_idle_peers(
&peer_game_db, &peer_game_db,
&catalog,
&active_operations, &active_operations,
&active_downloads, &active_downloads,
&tx_notify_ui, &tx_notify_ui,
@@ -50,6 +53,7 @@ pub async fn run_ping_service(
prune_stale_peers( prune_stale_peers(
&peer_game_db, &peer_game_db,
&catalog,
&active_operations, &active_operations,
&active_downloads, &active_downloads,
&tx_notify_ui, &tx_notify_ui,
@@ -60,6 +64,7 @@ pub async fn run_ping_service(
async fn ping_idle_peers( async fn ping_idle_peers(
peer_game_db: &Arc<RwLock<PeerGameDB>>, peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -75,6 +80,7 @@ async fn ping_idle_peers(
let tx_notify_ui = tx_notify_ui.clone(); let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = peer_game_db.clone(); let peer_game_db = peer_game_db.clone();
let catalog = catalog.clone();
let active_operations = active_operations.clone(); let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone(); let active_downloads = active_downloads.clone();
let shutdown = shutdown.clone(); let shutdown = shutdown.clone();
@@ -93,6 +99,7 @@ async fn ping_idle_peers(
log::warn!("Peer {peer_addr} failed ping check"); log::warn!("Peer {peer_addr} failed ping check");
remove_peer_and_refresh( remove_peer_and_refresh(
&peer_game_db, &peer_game_db,
&catalog,
&active_operations, &active_operations,
&active_downloads, &active_downloads,
&tx_notify_ui, &tx_notify_ui,
@@ -105,6 +112,7 @@ async fn ping_idle_peers(
log::error!("Failed to ping peer {peer_addr}: {err}"); log::error!("Failed to ping peer {peer_addr}: {err}");
remove_peer_and_refresh( remove_peer_and_refresh(
&peer_game_db, &peer_game_db,
&catalog,
&active_operations, &active_operations,
&active_downloads, &active_downloads,
&tx_notify_ui, &tx_notify_ui,
@@ -120,6 +128,7 @@ async fn ping_idle_peers(
async fn prune_stale_peers( async fn prune_stale_peers(
peer_game_db: &Arc<RwLock<PeerGameDB>>, peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -137,7 +146,7 @@ async fn prune_stale_peers(
} }
if removed_any { if removed_any {
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await;
handle_active_downloads_without_peers( handle_active_downloads_without_peers(
peer_game_db, peer_game_db,
active_operations, active_operations,
@@ -150,6 +159,7 @@ async fn prune_stale_peers(
async fn remove_peer_and_refresh( async fn remove_peer_and_refresh(
peer_game_db: &Arc<RwLock<PeerGameDB>>, peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>, active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>, active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>, tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -157,7 +167,7 @@ async fn remove_peer_and_refresh(
log_label: &str, log_label: &str,
) { ) {
if remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await { if remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await {
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await;
handle_active_downloads_without_peers( handle_active_downloads_without_peers(
peer_game_db, peer_game_db,
active_operations, active_operations,
@@ -336,12 +336,12 @@ fn should_ignore_game_child(name: &str) -> bool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
collections::HashSet,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use lanspread_db::db::GameCatalog;
use notify::{ use notify::{
EventKind, EventKind,
event::{AccessKind, AccessMode}, event::{AccessKind, AccessMode},
@@ -373,7 +373,7 @@ mod tests {
std::fs::write(path, bytes).expect("file should be written"); std::fs::write(path, bytes).expect("file should be written");
} }
fn test_ctx(game_dir: PathBuf, catalog: HashSet<String>) -> Ctx { fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> Ctx {
Ctx::new( Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())), Arc::new(RwLock::new(PeerGameDB::new())),
"peer".to_string(), "peer".to_string(),
@@ -383,6 +383,7 @@ mod tests {
CancellationToken::new(), CancellationToken::new(),
TaskTracker::new(), TaskTracker::new(),
Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
) )
} }
@@ -445,7 +446,7 @@ mod tests {
let temp = TempDir::new("lanspread-local-monitor"); let temp = TempDir::new("lanspread-local-monitor");
let ctx = test_ctx( let ctx = test_ctx(
temp.path().to_path_buf(), temp.path().to_path_buf(),
HashSet::from(["game".to_string()]), GameCatalog::from_ids(["game".to_string()]),
); );
ctx.active_operations ctx.active_operations
.write() .write()
@@ -480,7 +481,7 @@ mod tests {
write_file(&temp.path().join("game").join("version.ini"), b"20250101"); write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx( let ctx = test_ctx(
temp.path().to_path_buf(), temp.path().to_path_buf(),
HashSet::from(["game".to_string()]), GameCatalog::from_ids(["game".to_string()]),
); );
let gate = RescanGate::default(); let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
@@ -515,7 +516,7 @@ mod tests {
write_file(&game_root.join("version.ini"), b"20250101"); write_file(&game_root.join("version.ini"), b"20250101");
let ctx = test_ctx( let ctx = test_ctx(
temp.path().to_path_buf(), temp.path().to_path_buf(),
HashSet::from(["game".to_string()]), GameCatalog::from_ids(["game".to_string()]),
); );
let gate = RescanGate::default(); let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
@@ -551,7 +552,7 @@ mod tests {
write_file(&temp.path().join("game").join("version.ini"), b"20250101"); write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx( let ctx = test_ctx(
temp.path().to_path_buf(), temp.path().to_path_buf(),
HashSet::from(["game".to_string()]), GameCatalog::from_ids(["game".to_string()]),
); );
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
@@ -575,7 +576,7 @@ mod tests {
); );
let ctx = test_ctx( let ctx = test_ctx(
temp.path().to_path_buf(), temp.path().to_path_buf(),
HashSet::from(["game".to_string()]), GameCatalog::from_ids(["game".to_string()]),
); );
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
+162 -25
View File
@@ -12,7 +12,7 @@ use crate::{
context::PeerCtx, context::PeerCtx,
error::PeerError, error::PeerError,
events, events,
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog},
peer::{send_game_file_chunk, send_game_file_data}, peer::{send_game_file_chunk, send_game_file_data},
services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync}, services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync},
}; };
@@ -162,7 +162,7 @@ async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelt
}; };
if applied { if applied {
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
} else { } else {
let addr = { let addr = {
let db = ctx.peer_game_db.read().await; let db = ctx.peer_game_db.read().await;
@@ -209,7 +209,7 @@ async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool { async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool {
let active_operations = ctx.active_operations.read().await; let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await; let catalog = ctx.catalog.read().await;
local_download_available(game_dir, game_id, &active_operations, &catalog).await local_download_matches_catalog(game_dir, game_id, &active_operations, &catalog).await
} }
async fn can_dispatch_file_transfer( async fn can_dispatch_file_transfer(
@@ -218,10 +218,23 @@ async fn can_dispatch_file_transfer(
game_id: &str, game_id: &str,
relative_path: &str, relative_path: &str,
) -> bool { ) -> bool {
!path_points_inside_local(game_id, relative_path) relative_path_belongs_to_game(game_id, relative_path)
&& !path_points_inside_local(game_id, relative_path)
&& can_serve_game(ctx, game_dir, game_id).await && can_serve_game(ctx, game_dir, game_id).await
} }
fn relative_path_belongs_to_game(game_id: &str, relative_path: &str) -> bool {
let normalised = relative_path.replace('\\', "/");
if normalised.starts_with('/') {
return false;
}
normalised
.split('/')
.find(|part| !part.is_empty())
.is_some_and(|first| first == game_id)
}
fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool { fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool {
let normalised = relative_path.replace('\\', "/"); let normalised = relative_path.replace('\\', "/");
let mut parts = normalised.split('/').filter(|part| !part.is_empty()); let mut parts = normalised.split('/').filter(|part| !part.is_empty());
@@ -232,6 +245,67 @@ fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool {
} }
} }
use std::sync::atomic::{AtomicU64, Ordering};
static NEXT_TRANSFER_ID: AtomicU64 = AtomicU64::new(1);
struct TransferGuard {
game_id: String,
id: u64,
active_outbound_transfers: crate::context::OutboundTransfers,
tx_notify_ui: tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
}
impl TransferGuard {
async fn new(
game_id: String,
active_outbound_transfers: crate::context::OutboundTransfers,
tx_notify_ui: tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
shutdown: &tokio_util::sync::CancellationToken,
) -> (Self, tokio_util::sync::CancellationToken) {
let id = NEXT_TRANSFER_ID.fetch_add(1, Ordering::SeqCst);
let token = shutdown.child_token();
{
let mut active = active_outbound_transfers.write().await;
active
.entry(game_id.clone())
.or_default()
.push((id, token.clone()));
}
let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged);
(
Self {
game_id,
id,
active_outbound_transfers,
tx_notify_ui,
},
token,
)
}
}
impl Drop for TransferGuard {
fn drop(&mut self) {
let game_id = self.game_id.clone();
let id = self.id;
let active_outbound_transfers = self.active_outbound_transfers.clone();
let tx_notify_ui = self.tx_notify_ui.clone();
tokio::spawn(async move {
{
let mut active = active_outbound_transfers.write().await;
if let Some(tokens) = active.get_mut(&game_id) {
tokens.retain(|(tid, _)| *tid != id);
if tokens.is_empty() {
active.remove(&game_id);
}
}
}
let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged);
});
}
}
async fn handle_file_data_request( async fn handle_file_data_request(
ctx: &PeerCtx, ctx: &PeerCtx,
desc: GameFileDescription, desc: GameFileDescription,
@@ -242,6 +316,14 @@ async fn handle_file_data_request(
desc.relative_path desc.relative_path
); );
let (guard, cancel_token) = TransferGuard::new(
desc.game_id.clone(),
ctx.active_outbound_transfers.clone(),
ctx.tx_notify_ui.clone(),
&ctx.shutdown,
)
.await;
let mut tx = framed_tx.into_inner(); let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone(); let game_dir = ctx.game_dir.read().await.clone();
if !can_dispatch_file_transfer(ctx, &game_dir, &desc.game_id, &desc.relative_path).await { if !can_dispatch_file_transfer(ctx, &game_dir, &desc.game_id, &desc.relative_path).await {
@@ -249,11 +331,13 @@ async fn handle_file_data_request(
"Declining GetGameFileData for {} because the game is not currently transferable", "Declining GetGameFileData for {} because the game is not currently transferable",
desc.relative_path desc.relative_path
); );
drop(guard);
let _ = tx.close().await; let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new()); return FramedWrite::new(tx, LengthDelimitedCodec::new());
} }
send_game_file_data(&desc, &mut tx, &game_dir).await; send_game_file_data(&desc, &mut tx, &game_dir, cancel_token).await;
drop(guard);
FramedWrite::new(tx, LengthDelimitedCodec::new()) FramedWrite::new(tx, LengthDelimitedCodec::new())
} }
@@ -269,17 +353,36 @@ async fn handle_file_chunk_request(
"Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})" "Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
); );
let (guard, cancel_token) = TransferGuard::new(
game_id.clone(),
ctx.active_outbound_transfers.clone(),
ctx.tx_notify_ui.clone(),
&ctx.shutdown,
)
.await;
let mut tx = framed_tx.into_inner(); let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone(); let game_dir = ctx.game_dir.read().await.clone();
if !can_dispatch_file_transfer(ctx, &game_dir, &game_id, &relative_path).await { if !can_dispatch_file_transfer(ctx, &game_dir, &game_id, &relative_path).await {
log::info!( log::info!(
"Declining GetGameFileChunk for {relative_path} because the game is not currently transferable" "Declining GetGameFileChunk for {relative_path} because the game is not currently transferable"
); );
drop(guard);
let _ = tx.close().await; let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new()); return FramedWrite::new(tx, LengthDelimitedCodec::new());
} }
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await; send_game_file_chunk(
&game_id,
&relative_path,
offset,
length,
&mut tx,
&game_dir,
cancel_token,
)
.await;
drop(guard);
FramedWrite::new(tx, LengthDelimitedCodec::new()) FramedWrite::new(tx, LengthDelimitedCodec::new())
} }
@@ -289,17 +392,17 @@ async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option<SocketAddr>, peer_id
let Some(peer) = removed else { return }; let Some(peer) = removed else { return };
events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await; events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await; events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{
collections::HashSet,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc}; use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -327,7 +430,7 @@ mod tests {
std::fs::write(path, bytes).expect("file should be written"); std::fs::write(path, bytes).expect("file should be written");
} }
fn test_ctx(game_dir: PathBuf, catalog: HashSet<String>) -> PeerCtx { fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> PeerCtx {
let (tx_notify_ui, _rx) = mpsc::unbounded_channel(); let (tx_notify_ui, _rx) = mpsc::unbounded_channel();
Ctx::new( Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())), Arc::new(RwLock::new(PeerGameDB::new())),
@@ -338,6 +441,7 @@ mod tests {
CancellationToken::new(), CancellationToken::new(),
TaskTracker::new(), TaskTracker::new(),
Arc::new(RwLock::new(catalog)), Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
) )
.to_peer_ctx(tx_notify_ui) .to_peer_ctx(tx_notify_ui)
} }
@@ -351,6 +455,19 @@ mod tests {
assert!(!path_points_inside_local("game", "game/archive.eti")); assert!(!path_points_inside_local("game", "game/archive.eti"));
} }
#[test]
fn transferable_paths_must_belong_to_requested_game() {
assert!(relative_path_belongs_to_game("game", "game/version.ini"));
assert!(relative_path_belongs_to_game("game", "game\\archive.eti"));
assert!(!relative_path_belongs_to_game("game", "other/archive.eti"));
assert!(!relative_path_belongs_to_game("game", "archive.eti"));
assert!(!relative_path_belongs_to_game("game", "/game/archive.eti"));
assert!(!relative_path_belongs_to_game(
"game",
"../game/archive.eti"
));
}
#[tokio::test] #[tokio::test]
async fn get_game_response_respects_serve_gates() { async fn get_game_response_respects_serve_gates() {
let temp = TempDir::new("lanspread-stream"); let temp = TempDir::new("lanspread-stream");
@@ -360,17 +477,19 @@ mod tests {
b"20250101", b"20250101",
); );
write_file(&temp.path().join("active").join("version.ini"), b"20250101"); write_file(&temp.path().join("active").join("version.ini"), b"20250101");
write_file(
&temp.path().join("wrong-version").join("version.ini"),
b"20260101",
);
std::fs::create_dir_all(temp.path().join("missing-sentinel")) std::fs::create_dir_all(temp.path().join("missing-sentinel"))
.expect("missing sentinel root should be created"); .expect("missing sentinel root should be created");
let ctx = test_ctx( let mut catalog = GameCatalog::empty();
temp.path().to_path_buf(), catalog.insert("ready".to_string(), Some("20250101".to_string()));
HashSet::from([ catalog.insert("active".to_string(), Some("20250101".to_string()));
"ready".to_string(), catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string()));
"active".to_string(), catalog.insert("wrong-version".to_string(), Some("20250101".to_string()));
"missing-sentinel".to_string(), let ctx = test_ctx(temp.path().to_path_buf(), catalog);
]),
);
ctx.active_operations ctx.active_operations
.write() .write()
.await .await
@@ -388,6 +507,10 @@ mod tests {
get_game_response(&ctx, "active".to_string()).await, get_game_response(&ctx, "active".to_string()).await,
Response::GameNotFound(id) if id == "active" Response::GameNotFound(id) if id == "active"
)); ));
assert!(matches!(
get_game_response(&ctx, "wrong-version".to_string()).await,
Response::GameNotFound(id) if id == "wrong-version"
));
assert!(matches!( assert!(matches!(
get_game_response(&ctx, "missing-sentinel".to_string()).await, get_game_response(&ctx, "missing-sentinel".to_string()).await,
Response::GameNotFound(id) if id == "missing-sentinel" Response::GameNotFound(id) if id == "missing-sentinel"
@@ -403,23 +526,28 @@ mod tests {
b"20250101", b"20250101",
); );
write_file(&temp.path().join("active").join("version.ini"), b"20250101"); write_file(&temp.path().join("active").join("version.ini"), b"20250101");
write_file(
&temp.path().join("wrong-version").join("version.ini"),
b"20260101",
);
std::fs::create_dir_all(temp.path().join("missing-sentinel")) std::fs::create_dir_all(temp.path().join("missing-sentinel"))
.expect("missing sentinel root should be created"); .expect("missing sentinel root should be created");
let ctx = test_ctx( let mut catalog = GameCatalog::empty();
temp.path().to_path_buf(), catalog.insert("ready".to_string(), Some("20250101".to_string()));
HashSet::from([ catalog.insert("active".to_string(), Some("20250101".to_string()));
"ready".to_string(), catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string()));
"active".to_string(), catalog.insert("wrong-version".to_string(), Some("20250101".to_string()));
"missing-sentinel".to_string(), let ctx = test_ctx(temp.path().to_path_buf(), catalog);
]),
);
ctx.active_operations ctx.active_operations
.write() .write()
.await .await
.insert("active".to_string(), OperationKind::Downloading); .insert("active".to_string(), OperationKind::Downloading);
assert!(can_dispatch_file_transfer(&ctx, temp.path(), "ready", "ready/version.ini").await); assert!(can_dispatch_file_transfer(&ctx, temp.path(), "ready", "ready/version.ini").await);
assert!(
!can_dispatch_file_transfer(&ctx, temp.path(), "ready", "active/version.ini").await
);
assert!( assert!(
!can_dispatch_file_transfer( !can_dispatch_file_transfer(
&ctx, &ctx,
@@ -432,6 +560,15 @@ mod tests {
assert!( assert!(
!can_dispatch_file_transfer(&ctx, temp.path(), "active", "active/version.ini").await !can_dispatch_file_transfer(&ctx, temp.path(), "active", "active/version.ini").await
); );
assert!(
!can_dispatch_file_transfer(
&ctx,
temp.path(),
"wrong-version",
"wrong-version/version.ini",
)
.await
);
assert!( assert!(
!can_dispatch_file_transfer( !can_dispatch_file_transfer(
&ctx, &ctx,
+7 -1
View File
@@ -11,6 +11,7 @@ use std::{
}; };
use futures::FutureExt as _; use futures::FutureExt as _;
use lanspread_db::db::GameCatalog;
use tokio::sync::{ use tokio::sync::{
RwLock, RwLock,
mpsc::{UnboundedReceiver, UnboundedSender}, mpsc::{UnboundedReceiver, UnboundedSender},
@@ -84,7 +85,8 @@ pub(crate) fn spawn_peer_runtime(
game_dir: PathBuf, game_dir: PathBuf,
state_dir: PathBuf, state_dir: PathBuf,
unpacker: Arc<dyn Unpacker>, unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<std::collections::HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
) -> PeerRuntimeHandle { ) -> PeerRuntimeHandle {
let shutdown = CancellationToken::new(); let shutdown = CancellationToken::new();
let task_tracker = TaskTracker::new(); let task_tracker = TaskTracker::new();
@@ -104,6 +106,7 @@ pub(crate) fn spawn_peer_runtime(
runtime_shutdown.clone(), runtime_shutdown.clone(),
runtime_tracker.clone(), runtime_tracker.clone(),
catalog, catalog,
active_outbound_transfers,
) )
.await .await
{ {
@@ -190,6 +193,7 @@ fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEv
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) { fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
let tx_notify_ui = tx_notify_ui.clone(); let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = ctx.peer_game_db.clone(); let peer_game_db = ctx.peer_game_db.clone();
let catalog = ctx.catalog.clone();
let active_operations = ctx.active_operations.clone(); let active_operations = ctx.active_operations.clone();
let active_downloads = ctx.active_downloads.clone(); let active_downloads = ctx.active_downloads.clone();
let shutdown = ctx.shutdown.clone(); let shutdown = ctx.shutdown.clone();
@@ -207,6 +211,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
move || { move || {
let tx_notify_ui = tx_notify_ui.clone(); let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = peer_game_db.clone(); let peer_game_db = peer_game_db.clone();
let catalog = catalog.clone();
let active_operations = active_operations.clone(); let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone(); let active_downloads = active_downloads.clone();
let shutdown = shutdown.clone(); let shutdown = shutdown.clone();
@@ -215,6 +220,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
run_ping_service( run_ping_service(
tx_notify_ui, tx_notify_ui,
peer_game_db, peer_game_db,
catalog,
active_operations, active_operations,
active_downloads, active_downloads,
shutdown, shutdown,
@@ -44,6 +44,7 @@ tauri-plugin-shell = { workspace = true }
tauri-plugin-dialog = { workspace = true } tauri-plugin-dialog = { workspace = true }
tauri-plugin-store = { workspace = true } tauri-plugin-store = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
tokio-util = { workspace = true }
walkdir = { workspace = true } walkdir = { workspace = true }
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
@@ -3,12 +3,12 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::{Component, Path, PathBuf}, path::{Component, Path, PathBuf},
sync::{Arc, OnceLock}, sync::{Arc, OnceLock},
time::{SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use eyre::bail; use eyre::bail;
use lanspread_compat::eti::get_games; use lanspread_compat::eti::get_games;
use lanspread_db::db::{Availability, Game, GameDB, GameFileDescription}; use lanspread_db::db::{Availability, Game, GameCatalog, GameDB, GameFileDescription};
use lanspread_peer::{ use lanspread_peer::{
ActiveOperation, ActiveOperation,
ActiveOperationKind, ActiveOperationKind,
@@ -31,6 +31,42 @@ use tokio::sync::{
// Learn more about Tauri commands at https://tauri.app/develop/calling-rust/ // Learn more about Tauri commands at https://tauri.app/develop/calling-rust/
type OutboundTransfers =
Arc<RwLock<std::collections::HashMap<String, Vec<(u64, tokio_util::sync::CancellationToken)>>>>;
const OUTBOUND_TRANSFER_EMIT_DEBOUNCE: Duration = Duration::from_millis(100);
#[derive(Default)]
struct OutboundTransferEmitState {
scheduled: bool,
generation: u64,
}
impl OutboundTransferEmitState {
fn record_change(&mut self) -> bool {
self.generation = self.generation.saturating_add(1);
if self.scheduled {
return false;
}
self.scheduled = true;
true
}
fn observed_generation(&self) -> u64 {
self.generation
}
fn finish_emit(&mut self, observed_generation: u64) -> bool {
if self.generation != observed_generation {
return true;
}
self.scheduled = false;
false
}
}
/// Tauri-managed runtime state shared by commands and setup tasks. /// Tauri-managed runtime state shared by commands and setup tasks.
#[derive(Default)] #[derive(Default)]
struct LanSpreadState { struct LanSpreadState {
@@ -40,9 +76,11 @@ struct LanSpreadState {
active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>, active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>,
games_folder: Arc<RwLock<String>>, games_folder: Arc<RwLock<String>>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<HashSet<String>>>, catalog: Arc<RwLock<GameCatalog>>,
unpack_logs: Arc<RwLock<Vec<UnpackLogEntry>>>, unpack_logs: Arc<RwLock<Vec<UnpackLogEntry>>>,
state_dir: OnceLock<PathBuf>, state_dir: OnceLock<PathBuf>,
active_outbound_transfers: OutboundTransfers,
outbound_transfer_emit: Arc<RwLock<OutboundTransferEmitState>>,
} }
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
@@ -79,6 +117,7 @@ struct LauncherGame {
#[serde(flatten)] #[serde(flatten)]
game: Game, game: Game,
can_host_server: bool, can_host_server: bool,
active_outbound_transfers: usize,
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -829,6 +868,24 @@ fn apply_peer_local_games(game_db: &mut GameDB, local_games: &[Game]) {
} }
} }
fn apply_peer_remote_games(game_db: &mut GameDB, peer_games: Vec<Game>) {
// Peer events update availability, but catalog metadata stays anchored to game.db.
for game in game_db.games.values_mut() {
game.peer_count = 0;
}
for peer_game in peer_games {
if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) {
existing.peer_count = peer_game.peer_count;
} else {
log::debug!(
"Peer advertised unknown game {id}; ignoring because game.db is ground truth",
id = peer_game.id
);
}
}
}
fn clear_all_local_game_states(game_db: &mut GameDB) { fn clear_all_local_game_states(game_db: &mut GameDB) {
for game in game_db.games.values_mut() { for game in game_db.games.values_mut() {
clear_local_game_state(game); clear_local_game_state(game);
@@ -847,17 +904,24 @@ async fn emit_games_list(app_handle: &AppHandle) {
return; return;
} }
let active_transfers = state.active_outbound_transfers.read().await;
let games_to_emit = game_db let games_to_emit = game_db
.all_games() .all_games()
.into_iter() .into_iter()
.cloned() .cloned()
.map(|game| LauncherGame { .map(|game| {
let active_outbound_transfers = active_transfers.get(&game.id).map_or(0, Vec::len);
LauncherGame {
can_host_server: game_can_host_server(&games_folder, &game), can_host_server: game_can_host_server(&games_folder, &game),
active_outbound_transfers,
game, game,
}
}) })
.collect::<Vec<LauncherGame>>(); .collect::<Vec<LauncherGame>>();
drop(game_db); drop(game_db);
drop(active_transfers);
let active_operations = { let active_operations = {
let active_operations = state.active_operations.read().await; let active_operations = state.active_operations.read().await;
@@ -996,36 +1060,7 @@ async fn update_game_db(games: Vec<Game>, app: AppHandle) {
{ {
let mut game_db = state.games.write().await; let mut game_db = state.games.write().await;
apply_peer_remote_games(&mut game_db, games);
// Reset peer counts up front. Presence/metadata stay anchored to the baked game.db.
for game in game_db.games.values_mut() {
game.peer_count = 0;
}
for peer_game in games {
if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) {
existing.peer_count = peer_game.peer_count;
if let Some(peer_version) = &peer_game.eti_game_version {
match &existing.eti_game_version {
Some(current_version) if current_version >= peer_version => {}
_ => {
existing.eti_game_version = Some(peer_version.clone());
log::debug!(
"Updated eti_game_version for {} to {} based on peer data",
peer_game.id,
peer_version
);
}
}
}
} else {
log::debug!(
"Peer advertised unknown game {id}; ignoring because game.db is ground truth",
id = peer_game.id
);
}
}
} }
emit_games_list(&app).await; emit_games_list(&app).await;
@@ -1399,7 +1434,7 @@ async fn ensure_bundled_game_db_loaded(app_handle: &AppHandle) {
if needs_load { if needs_load {
let game_db = load_bundled_game_db(app_handle).await; let game_db = load_bundled_game_db(app_handle).await;
let catalog = game_db.games.keys().cloned().collect::<HashSet<_>>(); let catalog = GameCatalog::from_game_db(&game_db);
*state.games.write().await = game_db; *state.games.write().await = game_db;
*state.catalog.write().await = catalog; *state.catalog.write().await = catalog;
} }
@@ -1432,6 +1467,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
state.catalog.clone(), state.catalog.clone(),
PeerStartOptions { PeerStartOptions {
state_dir: Some(state_dir), state_dir: Some(state_dir),
active_outbound_transfers: Some(state.active_outbound_transfers.clone()),
}, },
) { ) {
Ok(handle) => { Ok(handle) => {
@@ -1469,6 +1505,44 @@ fn spawn_peer_event_loop(app_handle: AppHandle, mut rx_peer_event: UnboundedRece
}); });
} }
async fn schedule_outbound_transfer_emit(app_handle: &AppHandle) {
let state = app_handle.state::<LanSpreadState>();
let should_spawn = {
let mut emit_state = state.outbound_transfer_emit.write().await;
emit_state.record_change()
};
if !should_spawn {
return;
}
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
loop {
tokio::time::sleep(OUTBOUND_TRANSFER_EMIT_DEBOUNCE).await;
let observed_generation = {
let state = app_handle.state::<LanSpreadState>();
state
.outbound_transfer_emit
.read()
.await
.observed_generation()
};
emit_games_list(&app_handle).await;
let needs_follow_up_emit = {
let state = app_handle.state::<LanSpreadState>();
let mut emit_state = state.outbound_transfer_emit.write().await;
emit_state.finish_emit(observed_generation)
};
if !needs_follow_up_emit {
break;
}
}
});
}
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) { async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
match event { match event {
@@ -1495,6 +1569,10 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
} }
emit_games_list(app_handle).await; emit_games_list(app_handle).await;
} }
PeerEvent::OutboundTransferCountChanged => {
log::info!("PeerEvent::OutboundTransferCountChanged received");
schedule_outbound_transfer_emit(app_handle).await;
}
PeerEvent::GotGameFiles { PeerEvent::GotGameFiles {
id, id,
file_descriptions, file_descriptions,
@@ -1747,6 +1825,33 @@ mod tests {
} }
} }
fn eti_game_fixture(game_id: &str, game_version: &str) -> lanspread_compat::eti::EtiGame {
lanspread_compat::eti::EtiGame {
game_id: game_id.to_string(),
game_title: "Catalog Game".to_string(),
game_key: "catalog-game".to_string(),
game_release: "2000".to_string(),
game_publisher: "publisher".to_string(),
game_size: 1.0,
game_readme_de: "description".to_string(),
game_readme_en: "description".to_string(),
game_readme_fr: "description".to_string(),
game_maxplayers: 4,
game_master_req: 0,
genre_de: "genre".to_string(),
game_version: game_version.to_string(),
}
}
#[test]
fn eti_game_conversion_uses_catalog_version_as_authoritative_eti_version() {
let game = Game::from(eti_game_fixture("alpha", "20200721"));
assert_eq!(game.version, "20200721");
assert_eq!(game.eti_game_version.as_deref(), Some("20200721"));
assert_eq!(game.local_version, None);
}
#[test] #[test]
fn terminal_log_cleanup_preserves_crlf_and_collapses_redrawn_lines() { fn terminal_log_cleanup_preserves_crlf_and_collapses_redrawn_lines() {
let input = "Extracting foo 10%\rExtracting foo 80%\rExtracting foo OK\r\nAll done\r\n"; let input = "Extracting foo 10%\rExtracting foo 80%\rExtracting foo OK\r\nAll done\r\n";
@@ -1901,6 +2006,32 @@ mod tests {
); );
} }
#[test]
fn outbound_transfer_emit_state_coalesces_bursts_without_losing_updates() {
let mut state = OutboundTransferEmitState::default();
assert!(
state.record_change(),
"first change should schedule an emit"
);
assert_eq!(state.observed_generation(), 1);
assert!(
!state.record_change(),
"second change should reuse the scheduled emit"
);
assert_eq!(state.observed_generation(), 2);
assert!(
state.finish_emit(1),
"a generation observed before the latest change needs a follow-up emit"
);
assert!(
!state.finish_emit(2),
"the latest observed generation clears the scheduled emit"
);
assert!(state.record_change(), "a later burst should schedule again");
}
#[test] #[test]
fn game_file_viewer_ids_must_be_single_path_components() { fn game_file_viewer_ids_must_be_single_path_components() {
assert!(is_single_component_game_id("game")); assert!(is_single_component_game_id("game"));
@@ -2048,6 +2179,42 @@ mod tests {
assert!(game_db.get_game_by_id("unknown").is_none()); assert!(game_db.get_game_by_id("unknown").is_none());
} }
#[test]
fn peer_remote_snapshot_updates_counts_without_overwriting_catalog_version() {
let mut alpha = game_fixture("alpha", "Catalog Alpha");
alpha.size = 999;
alpha.eti_game_version = Some("20200721".to_string());
let mut beta = game_fixture("beta", "Catalog Beta");
beta.peer_count = 2;
beta.eti_game_version = Some("20200101".to_string());
let mut game_db = GameDB::from(vec![alpha, beta]);
let mut peer_alpha = game_fixture("alpha", "Peer Alpha");
peer_alpha.size = 42;
peer_alpha.peer_count = 3;
peer_alpha.eti_game_version = Some("20990101".to_string());
let mut unknown = game_fixture("unknown", "Unknown");
unknown.peer_count = 1;
unknown.eti_game_version = Some("20990101".to_string());
apply_peer_remote_games(&mut game_db, vec![peer_alpha, unknown]);
let alpha = game_db.get_game_by_id("alpha").expect("alpha remains");
assert_eq!(alpha.name, "Catalog Alpha");
assert_eq!(alpha.size, 999);
assert_eq!(alpha.peer_count, 3);
assert_eq!(alpha.eti_game_version.as_deref(), Some("20200721"));
let beta = game_db.get_game_by_id("beta").expect("beta remains");
assert_eq!(beta.peer_count, 0);
assert_eq!(beta.eti_game_version.as_deref(), Some("20200101"));
assert!(game_db.get_game_by_id("unknown").is_none());
}
} }
#[allow(clippy::missing_panics_doc)] #[allow(clippy::missing_panics_doc)]
@@ -3,6 +3,7 @@ import { JSX, KeyboardEvent } from 'react';
import { Game } from '../../lib/types'; import { Game } from '../../lib/types';
import { CoverAspect } from '../../hooks/useSettings'; import { CoverAspect } from '../../hooks/useSettings';
import { formatBytes } from '../../lib/format'; import { formatBytes } from '../../lib/format';
import { hasNewerLocalVersion } from '../../lib/gameState';
import { GameCover } from './GameCover'; import { GameCover } from './GameCover';
import { StateChip } from '../StateChip'; import { StateChip } from '../StateChip';
@@ -42,6 +43,14 @@ export const GameCard = ({
onOpen(game); onOpen(game);
} }
}; };
const newerThanExpected = hasNewerLocalVersion(game);
const hasOutbound = game.active_outbound_transfers !== undefined && game.active_outbound_transfers > 0;
const statusMessage = hasOutbound
? `Sharing to ${game.active_outbound_transfers} peer${game.active_outbound_transfers === 1 ? '' : 's'}`
: (game.status_message ?? (newerThanExpected ? 'Newer than expected' : ''));
const statusLevel = hasOutbound
? 'info'
: (game.status_level ?? (newerThanExpected ? 'warning' : undefined));
return ( return (
<button <button
@@ -66,8 +75,8 @@ export const GameCard = ({
<div className="card-meta"> <div className="card-meta">
{metaSeparator(formatBytes(game.size), game.genre || null)} {metaSeparator(formatBytes(game.size), game.genre || null)}
</div> </div>
<div className={`card-status${game.status_level === 'error' ? ' is-error' : ''}`}> <div className={`card-status${statusLevel ? ` is-${statusLevel}` : ''}`}>
{game.status_message ?? ''} {statusMessage}
</div> </div>
<ActionButton <ActionButton
game={game} game={game}
@@ -5,7 +5,7 @@ import { StateChip } from '../StateChip';
import { ActionButton } from '../ActionButton'; import { ActionButton } from '../ActionButton';
import { Game, InstallStatus } from '../../lib/types'; import { Game, InstallStatus } from '../../lib/types';
import { deriveState, isInProgress } from '../../lib/gameState'; import { deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState';
import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format'; import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format';
interface Props { interface Props {
@@ -59,6 +59,18 @@ export const GameDetailModal = ({
|| game.installed || game.installed
|| game.install_status === InstallStatus.Downloading || game.install_status === InstallStatus.Downloading
|| game.install_status === InstallStatus.Installing; || game.install_status === InstallStatus.Installing;
const newerThanExpected = hasNewerLocalVersion(game);
const newerStatus = newerThanExpected
? `Local version ${formatEtiVersion(game.local_version)} is newer than expected ${formatEtiVersion(game.eti_game_version)}.`
: undefined;
const hasOutbound = game.active_outbound_transfers !== undefined && game.active_outbound_transfers > 0;
const outboundStatus = hasOutbound
? `Sharing to ${game.active_outbound_transfers} peer${game.active_outbound_transfers === 1 ? '' : 's'}.`
: undefined;
const statusMessage = outboundStatus ?? game.status_message ?? newerStatus;
const statusLevel = hasOutbound
? 'info'
: (game.status_level ?? (newerStatus ? 'warning' : undefined));
return ( return (
<Modal onClose={onClose}> <Modal onClose={onClose}>
<button className="modal-close" type="button" onClick={onClose} aria-label="Close"> <button className="modal-close" type="button" onClick={onClose} aria-label="Close">
@@ -95,7 +107,7 @@ export const GameDetailModal = ({
<div className="meta-cell"> <div className="meta-cell">
<div className="meta-label">Version</div> <div className="meta-label">Version</div>
<div className="meta-value meta-mono"> <div className="meta-value meta-mono">
{formatEtiVersion(game.local_version ?? game.eti_game_version)} {formatEtiVersion(game.eti_game_version ?? game.local_version)}
</div> </div>
</div> </div>
<div className="meta-cell"> <div className="meta-cell">
@@ -108,9 +120,9 @@ export const GameDetailModal = ({
<p className="modal-desc">{description}</p> <p className="modal-desc">{description}</p>
)} )}
{game.status_message && ( {statusMessage && (
<p className={`modal-status${game.status_level === 'error' ? ' is-error' : ''}`}> <p className={`modal-status${statusLevel ? ` is-${statusLevel}` : ''}`}>
{game.status_message} {statusMessage}
</p> </p>
)} )}
@@ -1,5 +1,6 @@
import { useCallback } from 'react'; import { useCallback } from 'react';
import { invoke } from '@tauri-apps/api/core'; import { invoke } from '@tauri-apps/api/core';
import { ask } from '@tauri-apps/plugin-dialog';
import { type UseGamesResult } from './useGames'; import { type UseGamesResult } from './useGames';
import { type UISettings } from './useSettings'; import { type UISettings } from './useSettings';
@@ -69,6 +70,14 @@ export const useGameActions = (
const update = useCallback(async (id: string) => { const update = useCallback(async (id: string) => {
try { try {
const game = games.games.find(item => item.id === id);
if (game && game.active_outbound_transfers && game.active_outbound_transfers > 0) {
const confirmed = await ask(
`Peers are currently downloading this game from you. Updating will abort their downloads. Do you want to proceed?`,
{ title: 'Active Transfers in Progress', kind: 'warning' }
);
if (!confirmed) return;
}
const success = await invoke<boolean>('update_game', { const success = await invoke<boolean>('update_game', {
id, id,
language: settings.language, language: settings.language,
@@ -90,11 +99,19 @@ export const useGameActions = (
const removeDownload = useCallback(async (id: string) => { const removeDownload = useCallback(async (id: string) => {
try { try {
const game = games.games.find(item => item.id === id);
if (game && game.active_outbound_transfers && game.active_outbound_transfers > 0) {
const confirmed = await ask(
`Peers are currently downloading this game from you. Removing game files will abort their downloads. Do you want to proceed?`,
{ title: 'Active Transfers in Progress', kind: 'warning' }
);
if (!confirmed) return;
}
await invoke('remove_downloaded_game', { id }); await invoke('remove_downloaded_game', { id });
} catch (err) { } catch (err) {
console.error('remove_downloaded_game failed:', err); console.error('remove_downloaded_game failed:', err);
} }
}, []); }, [games]);
const cancelDownload = useCallback(async (id: string) => { const cancelDownload = useCallback(async (id: string) => {
try { try {
@@ -88,17 +88,30 @@ export const isUnavailable = (game: Game): boolean =>
&& game.peer_count === 0 && game.peer_count === 0
&& game.install_status === InstallStatus.NotInstalled; && game.install_status === InstallStatus.NotInstalled;
const parseVersionStamp = (version: string | undefined): number | null => {
if (!version || !/^\d{8}$/.test(version)) return null;
const parsed = parseInt(version, 10);
return Number.isNaN(parsed) ? null : parsed;
};
export const compareVersionStamps = (
left: string | undefined,
right: string | undefined,
): number | null => {
const parsedLeft = parseVersionStamp(left);
const parsedRight = parseVersionStamp(right);
if (parsedLeft === null || parsedRight === null) return null;
return parsedLeft - parsedRight;
};
export const hasNewerLocalVersion = (game: Game): boolean =>
(compareVersionStamps(game.local_version, game.eti_game_version) ?? 0) > 0;
export const needsUpdate = (game: Game): boolean => { export const needsUpdate = (game: Game): boolean => {
if (!game.installed) return false; if (!game.installed) return false;
const peer = game.eti_game_version; if (game.peer_count <= 0) return false;
const local = game.local_version; if (!game.local_version && game.eti_game_version) return true;
if (!local && peer) return true; return (compareVersionStamps(game.eti_game_version, game.local_version) ?? 0) > 0;
if (local && peer) {
const l = parseInt(local, 10);
const p = parseInt(peer, 10);
if (!Number.isNaN(l) && !Number.isNaN(p)) return p > l;
}
return false;
}; };
/** What pressing the card's main action button should do, given the state. */ /** What pressing the card's main action button should do, given the state. */
@@ -21,7 +21,7 @@ export enum ActiveOperationKind {
RemovingDownload = 'RemovingDownload', RemovingDownload = 'RemovingDownload',
} }
export type StatusLevel = 'info' | 'error'; export type StatusLevel = 'info' | 'warning' | 'error';
export interface DownloadProgress { export interface DownloadProgress {
downloaded_bytes: number; downloaded_bytes: number;
@@ -59,6 +59,7 @@ export interface Game {
download_progress?: DownloadProgress; download_progress?: DownloadProgress;
peer_count: number; peer_count: number;
can_host_server?: boolean; can_host_server?: boolean;
active_outbound_transfers?: number;
} }
export interface ActiveOperation { export interface ActiveOperation {
@@ -739,6 +739,12 @@
.card-status.is-error { .card-status.is-error {
color: #f87171; color: #f87171;
} }
.card-status.is-warning {
color: #fbbf24;
}
.card-status.is-info {
color: #60a5fa;
}
.density-compact .card-body { .density-compact .card-body {
padding: 9px 10px 10px; padding: 9px 10px 10px;
@@ -1383,6 +1389,16 @@
border-color: rgba(239, 68, 68, 0.4); border-color: rgba(239, 68, 68, 0.4);
background: rgba(239, 68, 68, 0.08); background: rgba(239, 68, 68, 0.08);
} }
.modal-status.is-warning {
color: #fbbf24;
border-color: rgba(245, 158, 11, 0.4);
background: rgba(245, 158, 11, 0.08);
}
.modal-status.is-info {
color: #60a5fa;
border-color: rgba(96, 165, 250, 0.4);
background: rgba(96, 165, 250, 0.08);
}
.modal-actions { .modal-actions {
display: flex; display: flex;
align-items: center; align-items: center;