Compare commits

..

5 Commits

Author SHA1 Message Date
ddidderr 06398fe298 fix(peer): reject transfer paths outside requested game
Inbound file-transfer requests carry both a game ID and a relative path. The
serve gate validated whether the requested game was currently servable, but it
did not require the path itself to be rooted under that same game. A
non-conforming peer could therefore register a guard for one game while asking
to read files from another game root.

Require normalized transfer paths to start with the requested game ID before the
file can be dispatched. This keeps the outbound transfer guard, serve policy,
and filesystem path aligned. Absolute, traversal, local-data, missing-sentinel,
active-operation, and wrong-version paths remain rejected by the existing gates.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #4
2026-05-30 16:36:59 +02:00
ddidderr 9b700c7e3f fix(peer): bound outbound transfer drain waits
Update and remove-download operations must wait for existing outbound readers to
release game files before mutating or deleting the game root. That wait was
unbounded, so a stuck transfer guard could leave the game permanently marked as
Updating or RemovingDownload and prevent the requested operation from ever
starting.

Return a structured begin-operation result and put a five-second timeout around
the drain wait. If the transfer count does not reach zero, the operation start
fails, the active-operation snapshot is cleared, and the caller emits the
normal failure event for the attempted operation. The destructive mutation is
not allowed to proceed after a timeout.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #3
2026-05-30 16:36:59 +02:00
ddidderr 7e40cf4bfb fix(ui): coalesce outbound transfer list refreshes
Every outbound transfer start and finish can arrive on a hot path while a peer
is serving many file chunks. The Tauri event handler used to rebuild and emit
the full games list for each edge, cloning all games and probing per-game server
script files repeatedly during an active serve.

Batch outbound-transfer count changes behind a short scheduled refresh. The
peer still records exact counts in shared state, and the delayed refresh reads
that state once per burst. A generation counter keeps changes that arrive while
an emit is already scheduled from being lost; they trigger one follow-up emit
with the latest counts.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #2
2026-05-30 16:36:59 +02:00
ddidderr f89ff9ceea fix(peer): reset cancelled outbound file streams
Cancelled outbound transfers previously returned from the streaming loop without
terminating the QUIC send half. A whole-file receiver relies on the stream
ending to distinguish EOF from an in-progress body, so cancellation could leave
it waiting on a truncated transfer until its own timeout fired.

Reset the send stream on every cancellation branch, including cancellation
while waiting for the final close acknowledgement. A reset is deliberately used
instead of a graceful close so truncated whole-file transfers cannot be
misinterpreted as a valid EOF.

Test Plan:
- just test
- just clippy
- git diff --check

Refs: Claude review finding #1
2026-05-30 16:36:58 +02:00
ddidderr 738095235f feat(peer): coordinate outbound transfers with local game mutations
Updating or removing a local game rewrites its on-disk files. Peers that
were mid-download of that game would keep streaming bytes from files that
are being deleted or replaced, handing them a corrupt or stale copy.
There was also no authoritative notion of which game version a peer
should serve or accept, so a peer could serve whatever happened to be on
disk and downloaders could aggregate files from peers running mismatched
versions.

This introduces a reader-writer coordination scheme between outbound file
transfers (readers) and local mutation operations (writers), and gates
both serving and downloading on an authoritative game catalog version.

Reader-writer coordination:
- Track active outbound transfers per game in a shared `OutboundTransfers`
  map of (id, CancellationToken), threaded through `Ctx`/`PeerCtx` and
  registered by a `TransferGuard` in the stream service. The guard is
  registered *before* the serve-eligibility check to close a TOCTOU window
  where a writer could miss an in-flight reader.
- `stream_file_bytes` now honors a cancellation token at every await point
  (file read, network send, stream close) via `tokio::select!`, so a
  transfer aborts promptly instead of hanging on a stalled receiver.
- `begin_operation` marks a game active first, then cancels its outbound
  transfers and waits for the count to reach zero before any
  Updating/RemovingDownload work touches the filesystem.
- Active games are now hidden from library snapshots entirely while an
  operation is in flight, instead of freezing their last announced state,
  so peers stop discovering a game that is being mutated.

Authoritative version catalog:
- Replace the `HashSet<String>` catalog with `GameCatalog`, mapping each
  game id to its expected version (from the bundled game.db / ETI data).
- Serving requires the local `version.ini` to match the catalog version
  (`local_download_matches_catalog`); peer selection, file aggregation,
  and majority size validation all filter on the expected version
  (`peers_with_expected_version`, `aggregated_game_files`, and friends).

User-visible changes:
- The GUI shows confirmation dialogs before Update and Remove, and
  surfaces a sharing-status indicator on game cards and the detail modal.
- A new `OutboundTransferCountChanged` event lets the UI reflect live
  outbound transfer activity.

Test Plan:
- just test
- just frontend-test
- just clippy
2026-05-30 16:36:58 +02:00
24 changed files with 1182 additions and 228 deletions
Generated
+1
View File
@@ -2217,6 +2217,7 @@ dependencies = [
"tauri-plugin-shell",
"tauri-plugin-store",
"tokio",
"tokio-util",
"walkdir",
"windows 0.62.2",
]
+2 -2
View File
@@ -57,14 +57,14 @@ impl From<EtiGame> for Game {
release_year: eti_game.game_release,
publisher: eti_game.game_publisher,
max_players: eti_game.game_maxplayers,
version: eti_game.game_version,
version: eti_game.game_version.clone(),
genre: eti_game.genre_de,
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
size: (eti_game.game_size * 1024.0 * 1024.0 * 1024.0) as u64,
downloaded: false,
installed: false,
availability: Availability::LocalOnly,
eti_game_version: None,
eti_game_version: Some(eti_game.game_version),
local_version: None,
peer_count: 0, // ETI games start with 0 peers until peer system discovers them
}
+55 -1
View File
@@ -78,7 +78,7 @@ pub struct Game {
/// Backend-reported availability state for this game's local or peer summary.
#[serde(default)]
pub availability: Availability,
/// ETI game version from version.ini (YYYYMMDD format) (server)
/// Authoritative ETI game version from the bundled game.db (YYYYMMDD format).
pub eti_game_version: Option<String>,
/// Local game version from version.ini (YYYYMMDD format)
pub local_version: Option<String>,
@@ -198,6 +198,60 @@ impl Default for GameDB {
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GameCatalog {
expected_versions: HashMap<String, Option<String>>,
}
impl GameCatalog {
#[must_use]
pub fn empty() -> Self {
Self {
expected_versions: HashMap::new(),
}
}
#[must_use]
pub fn from_game_db(game_db: &GameDB) -> Self {
Self {
expected_versions: game_db
.games
.values()
.map(|game| (game.id.clone(), game.eti_game_version.clone()))
.collect(),
}
}
#[must_use]
pub fn from_ids(ids: impl IntoIterator<Item = String>) -> Self {
Self {
expected_versions: ids.into_iter().map(|id| (id, None)).collect(),
}
}
pub fn insert(&mut self, id: String, expected_version: Option<String>) {
self.expected_versions.insert(id, expected_version);
}
#[must_use]
pub fn contains<S>(&self, id: S) -> bool
where
S: AsRef<str>,
{
self.expected_versions.contains_key(id.as_ref())
}
#[must_use]
pub fn expected_version<S>(&self, id: S) -> Option<&str>
where
S: AsRef<str>,
{
self.expected_versions
.get(id.as_ref())
.and_then(Option::as_deref)
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct GameFileDescription {
pub game_id: String,
+21 -15
View File
@@ -12,7 +12,7 @@ use std::{
use eyre::Context;
use lanspread_compat::eti::get_games;
use lanspread_db::db::{Game, GameFileDescription};
use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
use lanspread_peer::{
ActiveOperation,
ActiveOperationKind,
@@ -30,6 +30,7 @@ use lanspread_peer::{
use lanspread_peer_cli::{
CliCommand,
CommandEnvelope,
DEFAULT_FIXTURE_VERSION,
ExternalUnrarUnpacker,
FixtureSeed,
FixtureUnpacker,
@@ -114,7 +115,7 @@ struct DownloadMeasurement {
struct SharedState {
state: RwLock<CliState>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
notify: Notify,
games_dir: PathBuf,
state_dir: PathBuf,
@@ -146,6 +147,7 @@ async fn main() -> eyre::Result<()> {
catalog.clone(),
PeerStartOptions {
state_dir: Some(args.state_dir.clone()),
active_outbound_transfers: None,
},
)?;
let sender = handle.sender();
@@ -303,15 +305,8 @@ async fn list_peers(shared: &SharedState) -> eyre::Result<Value> {
async fn list_games(shared: &SharedState) -> eyre::Result<Value> {
let state = shared.state.read().await;
let catalog = shared.catalog.read().await.clone();
let remote = shared
.peer_game_db
.read()
.await
.get_all_games()
.into_iter()
.filter(|game| catalog.contains(&game.id))
.collect::<Vec<_>>();
let catalog = shared.catalog.read().await;
let remote = shared.peer_game_db.read().await.get_catalog_games(&catalog);
Ok(json!({
"local": state.local_games.clone(),
"remote": remote,
@@ -434,6 +429,7 @@ async fn event_loop(
}
}
#[allow(clippy::too_many_lines)]
async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) {
match event {
PeerEvent::LocalPeerReady { peer_id, addr } => {
@@ -458,6 +454,7 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s
state.local_games.clone_from(&games);
("local-library-changed", json!({ "games": games }))
}
PeerEvent::OutboundTransferCountChanged => ("outbound-transfer-count-changed", json!({})),
PeerEvent::ActiveOperationsChanged { active_operations } => {
let mut state = shared.state.write().await;
state.active_operations.clone_from(&active_operations);
@@ -668,18 +665,27 @@ fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result<Vec<Fixtu
.collect()
}
async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> HashSet<String> {
let mut catalog = HashSet::new();
async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> GameCatalog {
let mut catalog = GameCatalog::empty();
if let Some(path) = catalog_db
&& path.exists()
{
match get_games(path).await {
Ok(games) => catalog.extend(games.into_iter().map(|game| game.game_id)),
Ok(games) => {
for game in games {
catalog.insert(game.game_id, Some(game.game_version));
}
}
Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()),
}
}
catalog.extend(fixtures.iter().map(|seed| seed.game_id.clone()));
for seed in fixtures {
catalog.insert(
seed.game_id.clone(),
Some(DEFAULT_FIXTURE_VERSION.to_string()),
);
}
catalog
}
+14 -10
View File
@@ -1,18 +1,17 @@
//! Shared context types for the peer system.
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
path::PathBuf,
sync::Arc,
};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
use lanspread_db::db::GameDB;
use lanspread_db::db::{GameCatalog, GameDB};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use crate::{PeerEvent, Unpacker, events, library::LocalLibraryState, peer_db::PeerGameDB};
/// Thread-safe map of active outbound file transfers grouped by game ID.
pub type OutboundTransfers = Arc<RwLock<HashMap<String, Vec<(u64, CancellationToken)>>>>;
/// Mutating filesystem operation currently in flight for a game root.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OperationKind {
@@ -40,10 +39,11 @@ pub struct Ctx {
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
pub active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
pub unpacker: Arc<dyn Unpacker>,
pub catalog: Arc<RwLock<HashSet<String>>>,
pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>,
pub shutdown: CancellationToken,
pub task_tracker: TaskTracker,
pub active_outbound_transfers: OutboundTransfers,
}
/// Context for peer connection handling.
@@ -55,11 +55,12 @@ pub struct PeerCtx {
pub local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
pub active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
pub peer_game_db: Arc<RwLock<PeerGameDB>>,
pub catalog: Arc<RwLock<HashSet<String>>>,
pub catalog: Arc<RwLock<GameCatalog>>,
pub peer_id: Arc<String>,
pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender<PeerEvent>,
pub shutdown: CancellationToken,
pub task_tracker: TaskTracker,
pub active_outbound_transfers: OutboundTransfers,
}
impl std::fmt::Debug for PeerCtx {
@@ -84,7 +85,8 @@ impl Ctx {
unpacker: Arc<dyn Unpacker>,
shutdown: CancellationToken,
task_tracker: TaskTracker,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: OutboundTransfers,
) -> Self {
Self {
game_dir: Arc::new(RwLock::new(game_dir)),
@@ -100,6 +102,7 @@ impl Ctx {
peer_id: Arc::new(peer_id),
shutdown,
task_tracker,
active_outbound_transfers,
}
}
@@ -120,6 +123,7 @@ impl Ctx {
tx_notify_ui,
shutdown: self.shutdown.clone(),
task_tracker: self.task_tracker.clone(),
active_outbound_transfers: self.active_outbound_transfers.clone(),
}
}
}
+6 -1
View File
@@ -2,6 +2,7 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use crate::{
@@ -65,9 +66,13 @@ fn active_operation_kind(operation: OperationKind) -> ActiveOperationKind {
pub async fn emit_peer_game_list(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
) {
let games = { peer_game_db.read().await.get_all_games() };
let games = {
let catalog = catalog.read().await;
peer_game_db.read().await.get_catalog_games(&catalog)
};
send(tx_notify_ui, PeerEvent::ListGames(games));
}
+282 -67
View File
@@ -6,6 +6,7 @@ use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use lanspread_db::db::{GameDB, GameFileDescription};
@@ -23,7 +24,7 @@ use crate::{
game_from_summary,
get_game_file_descriptions,
local_dir_is_directory,
local_download_available,
local_download_matches_catalog,
rescan_local_game,
scan_local_library,
version_ini_is_regular_file,
@@ -38,10 +39,13 @@ use crate::{
// Command handlers
// =============================================================================
const OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(10);
const OUTBOUND_TRANSFER_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
/// Handles the `ListGames` command.
pub async fn handle_list_games_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
log::info!("ListGames command received");
events::emit_peer_game_list(&ctx.peer_game_db, tx_notify_ui).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, tx_notify_ui).await;
}
/// Tries to serve a game from local files.
@@ -54,7 +58,7 @@ async fn try_serve_local_game(
let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await;
if !local_download_available(&game_dir, id, &active_operations, &catalog).await {
if !local_download_matches_catalog(&game_dir, id, &active_operations, &catalog).await {
return false;
}
drop(active_operations);
@@ -90,9 +94,10 @@ pub(crate) async fn handle_get_game_command(
}
log::info!("Requesting game from peers: {id}");
let expected_version = catalog_expected_version(ctx, &id).await;
let peers = {
let peer_game_db = ctx.peer_game_db.read().await;
source.select_peers(&peer_game_db, &id)
source.select_peers(&peer_game_db, &id, expected_version.as_deref())
};
if peers.is_empty() {
log::warn!("No peers have game {id}");
@@ -107,6 +112,7 @@ pub(crate) async fn handle_get_game_command(
ctx.task_tracker.spawn(fetch_game_details_from_peers(
peers,
id,
expected_version,
peer_game_db,
tx_notify_ui,
|peer_addr, game_id, peer_game_db| async move {
@@ -126,10 +132,16 @@ impl GameDetailSource {
matches!(self, Self::LocalOrPeers)
}
fn select_peers(self, peer_game_db: &PeerGameDB, id: &str) -> Vec<SocketAddr> {
fn select_peers(
self,
peer_game_db: &PeerGameDB,
id: &str,
expected_version: Option<&str>,
) -> Vec<SocketAddr> {
match self {
Self::LocalOrPeers => peer_game_db.peers_with_game(id),
Self::LatestPeersOnly => peer_game_db.peers_with_latest_version(id),
Self::LocalOrPeers | Self::LatestPeersOnly => {
peer_game_db.peers_with_expected_version(id, expected_version)
}
}
}
}
@@ -154,6 +166,7 @@ async fn request_game_details_and_update(
async fn fetch_game_details_from_peers<F, Fut>(
peers: Vec<SocketAddr>,
id: String,
expected_version: Option<String>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
mut fetch_details: F,
@@ -175,7 +188,12 @@ async fn fetch_game_details_from_peers<F, Fut>(
}
if fetched_any {
let aggregated_files = { peer_game_db.read().await.aggregated_game_files(&id) };
let aggregated_files = {
peer_game_db
.read()
.await
.aggregated_game_files(&id, expected_version.as_deref())
};
if let Err(e) = tx_notify_ui.send(PeerEvent::GotGameFiles {
id: id.clone(),
@@ -210,6 +228,7 @@ pub async fn handle_download_game_files_command(
}
let games_folder = { ctx.game_dir.read().await.clone() };
let expected_version = catalog_expected_version(ctx, &id).await;
// Use majority validation to get trusted file descriptions and peer whitelist
let (validated_descriptions, peer_whitelist, file_peer_map) = {
@@ -217,7 +236,7 @@ pub async fn handle_download_game_files_command(
.peer_game_db
.read()
.await
.validate_file_sizes_majority(&id)
.validate_file_sizes_majority(&id, expected_version.as_deref())
{
Ok((files, peers, file_peer_map)) => {
log::info!(
@@ -260,7 +279,7 @@ pub async fn handle_download_game_files_command(
let local_dl_available = {
let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await;
local_download_available(&games_folder, &id, &active_operations, &catalog).await
local_download_matches_catalog(&games_folder, &id, &active_operations, &catalog).await
};
if peer_whitelist.is_empty() {
@@ -289,9 +308,17 @@ pub async fn handle_download_game_files_command(
return;
}
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
log::warn!("Operation for {id} already in progress; ignoring new download request");
return;
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Downloading).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring new download request");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before downloading {id}");
send_download_failed(tx_notify_ui, &id);
return;
}
}
let active_operations = ctx.active_operations.clone();
@@ -476,9 +503,20 @@ async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
return;
};
if !begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await {
log::warn!("Operation for {id} already in progress; ignoring install command");
return;
match begin_operation(ctx, tx_notify_ui, &id, prepared.operation_kind).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring install command");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before install/update of {id}");
events::send(
tx_notify_ui,
PeerEvent::InstallGameFailed { id: id.clone() },
);
return;
}
}
run_started_install_operation(ctx, tx_notify_ui, id, prepared).await;
@@ -601,9 +639,20 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
return;
}
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await {
log::warn!("Operation for {id} already in progress; ignoring uninstall command");
return;
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::Uninstalling).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring uninstall command");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before uninstall of {id}");
events::send(
tx_notify_ui,
PeerEvent::UninstallGameFailed { id: id.clone() },
);
return;
}
}
let game_root = { ctx.game_dir.read().await.join(&id) };
@@ -663,9 +712,20 @@ async fn run_remove_downloaded_operation(
return;
}
if !begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await {
log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal");
return;
match begin_operation(ctx, tx_notify_ui, &id, OperationKind::RemovingDownload).await {
BeginOperationResult::Started => {}
BeginOperationResult::AlreadyActive => {
log::warn!("Operation for {id} already in progress; ignoring downloaded-file removal");
return;
}
BeginOperationResult::DrainTimedOut => {
log::error!("Timed out waiting for outbound transfers before removal of {id}");
events::send(
tx_notify_ui,
PeerEvent::RemoveDownloadedGameFailed { id: id.clone() },
);
return;
}
}
let game_dir = { ctx.game_dir.read().await.clone() };
@@ -715,12 +775,36 @@ async fn run_remove_downloaded_operation(
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BeginOperationResult {
Started,
AlreadyActive,
DrainTimedOut,
}
async fn begin_operation(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
) -> bool {
) -> BeginOperationResult {
begin_operation_with_drain_timeout(
ctx,
tx_notify_ui,
id,
operation,
OUTBOUND_TRANSFER_DRAIN_TIMEOUT,
)
.await
}
async fn begin_operation_with_drain_timeout(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
id: &str,
operation: OperationKind,
drain_timeout: Duration,
) -> BeginOperationResult {
let started = {
let mut active_operations = ctx.active_operations.write().await;
match active_operations.entry(id.to_string()) {
@@ -732,11 +816,70 @@ async fn begin_operation(
}
};
if started {
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
if !started {
return BeginOperationResult::AlreadyActive;
}
started
events::emit_active_operations(&ctx.active_operations, tx_notify_ui).await;
if operation_requires_outbound_drain(operation)
&& !cancel_and_wait_for_outbound_transfers(ctx, id, drain_timeout).await
{
end_operation(ctx, tx_notify_ui, id).await;
return BeginOperationResult::DrainTimedOut;
}
BeginOperationResult::Started
}
fn operation_requires_outbound_drain(operation: OperationKind) -> bool {
operation == OperationKind::Updating || operation == OperationKind::RemovingDownload
}
async fn cancel_and_wait_for_outbound_transfers(
ctx: &Ctx,
id: &str,
drain_timeout: Duration,
) -> bool {
let mut tokens_to_cancel = Vec::new();
{
let active = ctx.active_outbound_transfers.read().await;
if let Some(transfers) = active.get(id) {
for (_, token) in transfers {
tokens_to_cancel.push(token.clone());
}
}
}
for token in tokens_to_cancel {
token.cancel();
}
let drained = tokio::time::timeout(drain_timeout, async {
loop {
let count = {
let active = ctx.active_outbound_transfers.read().await;
active.get(id).map_or(0, Vec::len)
};
if count == 0 {
break;
}
tokio::time::sleep(OUTBOUND_TRANSFER_DRAIN_POLL_INTERVAL).await;
}
})
.await
.is_ok();
if !drained {
let count = {
let active = ctx.active_outbound_transfers.read().await;
active.get(id).map_or(0, Vec::len)
};
log::error!(
"Timed out after {drain_timeout:?} waiting for {count} outbound transfer(s) to drain for {id}"
);
}
drained
}
async fn transition_download_to_install(
@@ -818,6 +961,14 @@ async fn catalog_contains(ctx: &Ctx, id: &str) -> bool {
ctx.catalog.read().await.contains(id)
}
async fn catalog_expected_version(ctx: &Ctx, id: &str) -> Option<String> {
ctx.catalog
.read()
.await
.expected_version(id)
.map(ToOwned::to_owned)
}
/// Handles the `SetGameDir` command.
pub async fn handle_set_game_dir_command(
ctx: &Ctx,
@@ -1008,13 +1159,8 @@ async fn update_and_announce_games_with_policy(
active_operation_ids.remove(id);
}
if !active_operation_ids.is_empty() {
let previous = ctx.local_library.read().await.games.clone();
for id in &active_operation_ids {
if let Some(summary) = previous.get(id.as_str()) {
summaries.insert(id.clone(), summary.clone());
} else {
summaries.remove(id);
}
summaries.remove(id);
}
game_db = GameDB::from(summaries.values().map(game_from_summary).collect());
}
@@ -1068,13 +1214,14 @@ async fn update_and_announce_games_with_policy(
#[cfg(test)]
mod tests {
use std::{
collections::HashSet,
collections::HashMap,
net::SocketAddr,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::Duration,
};
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Availability, GameSummary};
use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -1115,7 +1262,8 @@ mod tests {
Arc::new(FakeUnpacker),
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(HashSet::from(["game".to_string()]))),
Arc::new(RwLock::new(GameCatalog::from_ids(["game".to_string()]))),
Arc::new(RwLock::new(HashMap::new())),
)
}
@@ -1220,7 +1368,7 @@ mod tests {
}
#[test]
fn update_source_selects_latest_ready_peer_manifest() {
fn update_source_selects_expected_ready_peer_manifest() {
let old_addr = addr(12_000);
let new_addr = addr(12_001);
let local_only_addr = addr(12_002);
@@ -1242,13 +1390,13 @@ mod tests {
);
assert_eq!(
GameDetailSource::LatestPeersOnly.select_peers(&db, "game"),
GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101")),
vec![new_addr]
);
}
#[tokio::test]
async fn update_fetch_emits_fresh_manifest_from_latest_peer() {
async fn update_fetch_emits_fresh_manifest_from_expected_peer() {
let old_addr = addr(12_010);
let new_addr = addr(12_011);
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
@@ -1267,33 +1415,40 @@ mod tests {
}
let peers = {
let db = peer_game_db.read().await;
GameDetailSource::LatestPeersOnly.select_peers(&db, "game")
GameDetailSource::LatestPeersOnly.select_peers(&db, "game", Some("20250101"))
};
let (tx, mut rx) = mpsc::unbounded_channel();
let fetched_peers = Arc::new(Mutex::new(Vec::new()));
fetch_game_details_from_peers(peers, "game".to_string(), peer_game_db.clone(), tx, {
let fetched_peers = fetched_peers.clone();
move |peer_addr, game_id, peer_game_db| {
fetch_game_details_from_peers(
peers,
"game".to_string(),
Some("20250101".to_string()),
peer_game_db.clone(),
tx,
{
let fetched_peers = fetched_peers.clone();
async move {
fetched_peers
.lock()
.expect("fetched peer list should not be poisoned")
.push(peer_addr);
let files = vec![
file_desc(&game_id, "game/version.ini", 8),
file_desc(&game_id, "game/new.eti", 11),
];
peer_game_db.write().await.update_peer_game_files(
&"new".to_string(),
&game_id,
files.clone(),
);
Ok(files)
move |peer_addr, game_id, peer_game_db| {
let fetched_peers = fetched_peers.clone();
async move {
fetched_peers
.lock()
.expect("fetched peer list should not be poisoned")
.push(peer_addr);
let files = vec![
file_desc(&game_id, "game/version.ini", 8),
file_desc(&game_id, "game/new.eti", 11),
];
peer_game_db.write().await.update_peer_game_files(
&"new".to_string(),
&game_id,
files.clone(),
);
Ok(files)
}
}
}
})
},
)
.await;
assert_eq!(
@@ -1314,7 +1469,7 @@ mod tests {
file_descriptions
.iter()
.any(|desc| desc.relative_path == "game/new.eti" && desc.size == 11),
"latest peer manifest should be emitted to the download path"
"expected-version peer manifest should be emitted to the download path"
);
}
@@ -1329,6 +1484,7 @@ mod tests {
fetch_game_details_from_peers(
vec![first_addr, second_addr],
"game".to_string(),
Some("20250101".to_string()),
peer_game_db,
tx.clone(),
{
@@ -1362,7 +1518,7 @@ mod tests {
#[tokio::test]
async fn update_request_skips_local_manifest_even_when_download_exists() {
let temp = TempDir::new("lanspread-handler-latest-peer");
let temp = TempDir::new("lanspread-handler-expected-peer");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20240101");
write_file(&root.join("game.eti"), b"old archive");
@@ -1385,23 +1541,37 @@ mod tests {
}
#[tokio::test]
async fn local_library_scan_freezes_active_game_state() {
let temp = TempDir::new("lanspread-handler-active-freeze");
async fn local_library_scan_hides_active_game_state() {
let temp = TempDir::new("lanspread-handler-active-hide");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
// 1. Initial scan: the game is ready and announced
let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog)
.await
.expect("scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else {
panic!("expected LocalLibraryChanged");
};
assert_eq!(games.len(), 1);
assert_eq!(games[0].id, "game");
// 2. Set the game as active/in-progress and scan again
ctx.active_operations
.write()
.await
.insert("game".to_string(), OperationKind::Installing);
let (tx, mut rx) = mpsc::unbounded_channel();
let catalog = ctx.catalog.read().await.clone();
let scan = scan_local_library(temp.path(), ctx.state_dir.as_ref(), &catalog)
.await
.expect("scan should succeed");
update_and_announce_games(&ctx, &tx, scan).await;
let PeerEvent::LocalLibraryChanged { games } = recv_event(&mut rx).await else {
@@ -1409,7 +1579,7 @@ mod tests {
};
assert!(
games.is_empty(),
"active game should keep its previous announced state"
"active game should be hidden/unannounced during operations"
);
}
@@ -1423,7 +1593,10 @@ mod tests {
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
assert!(begin_operation(&ctx, &tx, "game", OperationKind::Updating).await);
assert_eq!(
begin_operation(&ctx, &tx, "game", OperationKind::Updating).await,
BeginOperationResult::Started
);
assert_active_update(
recv_event(&mut rx).await,
vec![ActiveOperation {
@@ -1433,6 +1606,48 @@ mod tests {
);
}
#[tokio::test]
async fn begin_operation_timeout_clears_active_operation_snapshot() {
let temp = TempDir::new("lanspread-handler-active-drain-timeout");
let root = temp.game_root();
write_file(&root.join("version.ini"), b"20250101");
write_file(&root.join("game.eti"), b"archive");
let ctx = test_ctx(temp.path().to_path_buf());
let (tx, mut rx) = mpsc::unbounded_channel();
let token = CancellationToken::new();
ctx.active_outbound_transfers
.write()
.await
.insert("game".to_string(), vec![(1, token.clone())]);
assert_eq!(
begin_operation_with_drain_timeout(
&ctx,
&tx,
"game",
OperationKind::Updating,
Duration::from_millis(1),
)
.await,
BeginOperationResult::DrainTimedOut
);
assert!(token.is_cancelled());
assert_active_update(
recv_event(&mut rx).await,
vec![ActiveOperation {
id: "game".to_string(),
operation: ActiveOperationKind::Updating,
}],
);
assert_active_update(recv_event(&mut rx).await, Vec::new());
assert!(
!ctx.active_operations.read().await.contains_key("game"),
"timed-out drain should not leave the operation stuck active"
);
}
#[tokio::test]
async fn unchanged_settled_scan_is_not_reemitted() {
let temp = TempDir::new("lanspread-handler-settled-unchanged");
+17 -6
View File
@@ -39,12 +39,12 @@ mod test_support;
// Public re-exports
// =============================================================================
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT};
pub use error::PeerError;
pub use install::{UnpackFuture, Unpacker};
use lanspread_db::db::{Game, GameFileDescription};
use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
pub use migration::{MigrationReport, migrate_legacy_state};
pub use peer_db::{
MajorityValidationResult,
@@ -153,6 +153,8 @@ pub enum PeerEvent {
PeerCountUpdated(usize),
/// The local library contents changed after a scan.
LocalLibraryChanged { games: Vec<Game> },
/// The number of active outbound transfers changed.
OutboundTransferCountChanged,
/// The set of in-progress local operations changed.
ActiveOperationsChanged {
active_operations: Vec<ActiveOperation>,
@@ -262,6 +264,7 @@ pub enum PeerCommand {
pub struct PeerStartOptions {
/// Directory used for peer identity and other state.
pub state_dir: Option<PathBuf>,
pub active_outbound_transfers: Option<crate::context::OutboundTransfers>,
}
// =============================================================================
@@ -286,7 +289,7 @@ pub fn start_peer(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
) -> eyre::Result<PeerRuntimeHandle> {
start_peer_with_options(
game_dir,
@@ -305,12 +308,17 @@ pub fn start_peer_with_options(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
options: PeerStartOptions,
) -> eyre::Result<PeerRuntimeHandle> {
let PeerStartOptions { state_dir } = options;
let PeerStartOptions {
state_dir,
active_outbound_transfers,
} = options;
let state_dir = resolve_state_dir(state_dir.as_deref());
let game_dir = game_dir.into();
let active_outbound_transfers = active_outbound_transfers
.unwrap_or_else(|| Arc::new(RwLock::new(std::collections::HashMap::new())));
log::info!(
"Starting peer system with game directory: {}",
game_dir.display()
@@ -329,6 +337,7 @@ pub fn start_peer_with_options(
state_dir,
unpacker,
catalog,
active_outbound_transfers,
))
}
@@ -344,7 +353,8 @@ async fn run_peer(
unpacker: Arc<dyn Unpacker>,
shutdown: CancellationToken,
task_tracker: TaskTracker,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
) -> eyre::Result<()> {
let ctx = Ctx::new(
peer_game_db,
@@ -355,6 +365,7 @@ async fn run_peer(
shutdown,
task_tracker,
catalog,
active_outbound_transfers,
);
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
log::error!("Failed to load initial local game database: {err}");
+66 -14
View File
@@ -9,7 +9,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use lanspread_db::db::{Game, GameDB, GameFileDescription};
use lanspread_db::db::{Game, GameCatalog, GameDB, GameFileDescription};
use lanspread_proto::{Availability, GameSummary};
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, sync::Mutex};
@@ -51,7 +51,7 @@ pub async fn local_download_available(
game_dir: &Path,
game_id: &str,
active_operations: &HashMap<String, OperationKind>,
catalog: &HashSet<String>,
catalog: &GameCatalog,
) -> bool {
if !catalog.contains(game_id) {
log::debug!("Not serving game {game_id} locally because it is not in the catalog");
@@ -67,6 +67,40 @@ pub async fn local_download_available(
version_ini_is_regular_file(game_path.as_path()).await
}
/// Checks if a local game may be served to peers under the authoritative catalog version.
pub async fn local_download_matches_catalog(
game_dir: &Path,
game_id: &str,
active_operations: &HashMap<String, OperationKind>,
catalog: &GameCatalog,
) -> bool {
if !local_download_available(game_dir, game_id, active_operations, catalog).await {
return false;
}
let Some(expected_version) = catalog.expected_version(game_id) else {
return true;
};
let game_path = game_dir.join(game_id);
match lanspread_db::db::read_version_from_ini(&game_path) {
Ok(Some(local_version)) if local_version == expected_version => true,
Ok(Some(local_version)) => {
log::debug!(
"Not serving game {game_id}: local version.ini {local_version} does not match catalog {expected_version}"
);
false
}
Ok(None) => false,
Err(err) => {
log::warn!(
"Not serving game {game_id}: failed to read local version.ini for catalog comparison: {err}"
);
false
}
}
}
// =============================================================================
// Local library index and scanning
// =============================================================================
@@ -468,7 +502,7 @@ struct IndexUpdate {
async fn update_index_for_game(
game_root: &Path,
game_id: &str,
catalog: &HashSet<String>,
catalog: &GameCatalog,
index: &mut LibraryIndex,
) -> eyre::Result<IndexUpdate> {
if !catalog.contains(game_id) {
@@ -557,7 +591,7 @@ fn scan_from_index(index: &LibraryIndex) -> LocalLibraryScan {
pub async fn scan_local_library(
game_dir: impl AsRef<Path>,
state_dir: impl AsRef<Path>,
catalog: &HashSet<String>,
catalog: &GameCatalog,
) -> eyre::Result<LocalLibraryScan> {
let game_path = game_dir.as_ref();
let state_path = state_dir.as_ref();
@@ -645,7 +679,7 @@ pub async fn scan_local_library(
pub async fn rescan_local_game(
game_dir: impl AsRef<Path>,
state_dir: impl AsRef<Path>,
catalog: &HashSet<String>,
catalog: &GameCatalog,
game_id: &str,
) -> eyre::Result<LocalLibraryScan> {
let game_path = game_dir.as_ref();
@@ -682,10 +716,7 @@ pub async fn get_game_file_descriptions(
#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
path::Path,
};
use std::{collections::HashMap, path::Path};
use lanspread_proto::Availability;
@@ -776,7 +807,7 @@ mod tests {
async fn scan_uses_version_ini_and_local_dir_as_independent_state() {
let temp = TempDir::new("lanspread-local-games");
let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from([
let catalog = GameCatalog::from_ids([
"ready".to_string(),
"local-only".to_string(),
"eti-only".to_string(),
@@ -830,7 +861,7 @@ mod tests {
async fn rescan_promotes_installed_only_game_to_ready_when_sentinel_appears() {
let temp = TempDir::new("lanspread-local-games");
let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from(["game".to_string()]);
let catalog = GameCatalog::from_ids(["game".to_string()]);
std::fs::create_dir_all(temp.path().join("game").join("local"))
.expect("local install dir should be created");
@@ -864,7 +895,7 @@ mod tests {
async fn concurrent_rescans_preserve_both_index_updates() {
let temp = TempDir::new("lanspread-local-games-concurrent");
let state = TempDir::new("lanspread-local-games-state");
let catalog = HashSet::from(["game-a".to_string(), "game-b".to_string()]);
let catalog = GameCatalog::from_ids(["game-a".to_string(), "game-b".to_string()]);
write_file(&temp.path().join("game-a").join("version.ini"), b"20250101");
write_file(&temp.path().join("game-b").join("version.ini"), b"20250101");
@@ -909,7 +940,7 @@ mod tests {
let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20250101");
let catalog = HashSet::from(["game".to_string()]);
let catalog = GameCatalog::from_ids(["game".to_string()]);
let no_operations = HashMap::new();
assert!(local_download_available(temp.path(), "game", &no_operations, &catalog).await);
@@ -917,8 +948,29 @@ mod tests {
assert!(!local_download_available(temp.path(), "game", &active_operations, &catalog).await);
assert!(
!local_download_available(temp.path(), "game", &no_operations, &HashSet::new()).await
!local_download_available(temp.path(), "game", &no_operations, &GameCatalog::empty())
.await
);
assert!(!local_download_available(temp.path(), "missing", &no_operations, &catalog).await);
}
#[tokio::test]
async fn local_download_matches_catalog_requires_expected_version() {
let temp = TempDir::new("lanspread-local-games");
let game_root = temp.path().join("game");
write_file(&game_root.join("version.ini"), b"20260101");
let mut catalog = GameCatalog::empty();
catalog.insert("game".to_string(), Some("20250101".to_string()));
let no_operations = HashMap::new();
assert!(
!local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await
);
catalog.insert("game".to_string(), Some("20260101".to_string()));
assert!(
local_download_matches_catalog(temp.path(), "game", &no_operations, &catalog).await
);
}
}
+84 -9
View File
@@ -4,6 +4,7 @@ use bytes::Bytes;
use lanspread_db::db::GameFileDescription;
use lanspread_utils::maybe_addr;
use s2n_quic::{
application,
connection,
stream::{Error as StreamError, SendStream},
};
@@ -14,12 +15,24 @@ use tokio::{
use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path};
fn cancel_send_stream(tx: &mut SendStream, remote_addr: impl std::fmt::Display, path: &Path) {
// Reset instead of finishing so truncated whole-file transfers cannot look like EOF.
if let Err(err) = tx.reset(application::Error::UNKNOWN) {
log::debug!(
"{remote_addr} failed to reset cancelled transfer for {}: {err}",
path.display()
);
}
}
#[allow(clippy::too_many_lines)]
async fn stream_file_bytes(
tx: &mut SendStream,
base_dir: &Path,
relative_path: &str,
offset: u64,
length: Option<u64>,
cancel_token: tokio_util::sync::CancellationToken,
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
@@ -45,13 +58,34 @@ async fn stream_file_bytes(
let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE];
while remaining > 0 {
if cancel_token.is_cancelled() {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
let read_len = std::cmp::min(remaining, buf.len() as u64);
let read_len: usize = read_len.try_into().unwrap_or(usize::MAX);
if read_len == 0 {
break;
}
let bytes_read = file.read(&mut buf[..read_len]).await?;
let bytes_read = tokio::select! {
() = cancel_token.cancelled() => {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = file.read(&mut buf[..read_len]) => {
res?
}
};
if bytes_read == 0 {
if !expect_exact {
transfer_complete = true;
@@ -59,7 +93,19 @@ async fn stream_file_bytes(
break;
}
tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?;
tokio::select! {
() = cancel_token.cancelled() => {
log::info!(
"{remote_addr} transfer cancelled for {}",
validated_path.display()
);
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = tx.send(Bytes::copy_from_slice(&buf[..bytes_read])) => {
res?;
}
}
remaining = remaining.saturating_sub(bytes_read as u64);
total_bytes += bytes_read as u64;
@@ -97,12 +143,21 @@ async fn stream_file_bytes(
validated_path.display()
);
match tx.close().await {
Ok(()) => {}
Err(err) if transfer_complete && is_clean_remote_close(&err) => {
log::debug!("{remote_addr} closed stream after transfer completion: {err}");
tokio::select! {
() = cancel_token.cancelled() => {
log::info!("{remote_addr} transfer cancelled while closing stream");
cancel_send_stream(tx, remote_addr, &validated_path);
return Err(eyre::eyre!("File transfer cancelled by user"));
}
res = tx.close() => {
match res {
Ok(()) => {}
Err(err) if transfer_complete && is_clean_remote_close(&err) => {
log::debug!("{remote_addr} closed stream after transfer completion: {err}");
}
Err(err) => return Err(err.into()),
}
}
Err(err) => return Err(err.into()),
}
Ok(())
}
@@ -121,8 +176,18 @@ pub async fn send_game_file_data(
game_file_desc: &GameFileDescription,
tx: &mut SendStream,
game_dir: &Path,
cancel_token: tokio_util::sync::CancellationToken,
) {
if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await {
if let Err(e) = stream_file_bytes(
tx,
game_dir,
&game_file_desc.relative_path,
0,
None,
cancel_token,
)
.await
{
let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!(
"{remote_addr} failed to stream file {}: {e}",
@@ -138,8 +203,18 @@ pub async fn send_game_file_chunk(
length: u64,
tx: &mut SendStream,
game_dir: &Path,
cancel_token: tokio_util::sync::CancellationToken,
) {
if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await {
if let Err(e) = stream_file_bytes(
tx,
game_dir,
relative_path,
offset,
Some(length),
cancel_token,
)
.await
{
let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!(
"{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}"
+141 -9
View File
@@ -7,7 +7,7 @@ use std::{
time::{Duration, Instant},
};
use lanspread_db::db::{Availability, Game, GameFileDescription};
use lanspread_db::db::{Availability, Game, GameCatalog, GameFileDescription};
use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot};
use crate::library::compute_library_digest;
@@ -357,6 +357,54 @@ impl PeerGameDB {
games
}
/// Returns catalog games aggregated from peers that advertise the expected catalog version.
#[must_use]
pub fn get_catalog_games(&self, catalog: &GameCatalog) -> Vec<Game> {
let mut aggregated: HashMap<String, Game> = HashMap::new();
let mut peer_counts: HashMap<String, u32> = HashMap::new();
for peer in self.peers.values() {
for game in peer.games.values().filter(|game| {
catalog.contains(&game.id)
&& game_matches_expected_version(game, catalog.expected_version(&game.id))
}) {
*peer_counts.entry(game.id.clone()).or_insert(0) += 1;
}
}
for peer in self.peers.values() {
for game in peer.games.values().filter(|game| {
catalog.contains(&game.id)
&& game_matches_expected_version(game, catalog.expected_version(&game.id))
}) {
aggregated
.entry(game.id.clone())
.and_modify(|existing| {
existing.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
if game.size > existing.size {
existing.size = game.size;
}
existing.set_downloaded(true);
if game.installed {
existing.installed = true;
}
})
.or_insert_with(|| {
let mut game_clone = summary_to_game(game);
if let Some(expected_version) = catalog.expected_version(&game.id) {
game_clone.eti_game_version = Some(expected_version.to_string());
}
game_clone.peer_count = *peer_counts.get(&game.id).unwrap_or(&0);
game_clone
});
}
}
let mut games: Vec<Game> = aggregated.into_values().collect();
games.sort_by(|a, b| a.name.cmp(&b.name));
games
}
/// Returns the latest version of a game across all peers.
#[must_use]
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
@@ -451,6 +499,24 @@ impl PeerGameDB {
.collect()
}
/// Returns addresses of peers that have the expected catalog version of a game.
#[must_use]
pub fn peers_with_expected_version(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<SocketAddr> {
self.peers
.iter()
.filter(|(_, peer)| {
peer.games
.get(game_id)
.is_some_and(|game| game_matches_expected_version(game, expected_version))
})
.map(|(_, peer)| peer.addr)
.collect()
}
/// Returns addresses of peers that have the latest version of a game.
#[must_use]
pub fn peers_with_latest_version(&self, game_id: &str) -> Vec<SocketAddr> {
@@ -514,11 +580,33 @@ impl PeerGameDB {
.collect()
}
/// Returns file descriptions from peers that advertise the expected catalog version.
#[must_use]
pub fn expected_version_game_files_for(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<(SocketAddr, Vec<GameFileDescription>)> {
let expected_peers = self.peers_with_expected_version(game_id, expected_version);
if expected_peers.is_empty() {
return Vec::new();
}
self.game_files_for(game_id)
.into_iter()
.filter(|(addr, _)| expected_peers.contains(addr))
.collect()
}
/// Returns aggregated file descriptions for a game across all peers.
#[must_use]
pub fn aggregated_game_files(&self, game_id: &str) -> Vec<GameFileDescription> {
pub fn aggregated_game_files(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> Vec<GameFileDescription> {
let mut seen: HashMap<String, GameFileDescription> = HashMap::new();
for (_, files) in self.latest_game_files_for(game_id) {
for (_, files) in self.expected_version_game_files_for(game_id, expected_version) {
for file in files {
seen.entry(file.relative_path.clone()).or_insert(file);
}
@@ -559,8 +647,9 @@ impl PeerGameDB {
pub fn validate_file_sizes_majority(
&self,
game_id: &str,
expected_version: Option<&str>,
) -> eyre::Result<MajorityValidationResult> {
let game_files = self.latest_game_files_for(game_id);
let game_files = self.expected_version_game_files_for(game_id, expected_version);
if game_files.is_empty() {
return Ok((Vec::new(), Vec::new(), HashMap::new()));
}
@@ -813,6 +902,14 @@ fn game_is_ready(summary: &GameSummary) -> bool {
summary.availability == Availability::Ready
}
fn game_matches_expected_version(summary: &GameSummary, expected_version: Option<&str>) -> bool {
if !game_is_ready(summary) {
return false;
}
expected_version.is_none_or(|expected| summary.eti_version.as_deref() == Some(expected))
}
fn summary_to_game(summary: &GameSummary) -> Game {
let eti_game_version = game_is_ready(summary)
.then(|| summary.eti_version.clone())
@@ -925,6 +1022,41 @@ mod tests {
assert!(db.peers_with_latest_version("game").is_empty());
}
#[test]
fn catalog_aggregation_counts_only_expected_version_peers() {
let old_addr = addr(12003);
let expected_addr = addr(12004);
let newer_addr = addr(12005);
let mut db = PeerGameDB::new();
db.upsert_peer("old".to_string(), old_addr);
db.upsert_peer("expected".to_string(), expected_addr);
db.upsert_peer("newer".to_string(), newer_addr);
db.update_peer_games(
&"old".to_string(),
vec![summary("game", "20240101", Availability::Ready)],
);
db.update_peer_games(
&"expected".to_string(),
vec![summary("game", "20250101", Availability::Ready)],
);
db.update_peer_games(
&"newer".to_string(),
vec![summary("game", "20260101", Availability::Ready)],
);
let mut catalog = GameCatalog::empty();
catalog.insert("game".to_string(), Some("20250101".to_string()));
let games = db.get_catalog_games(&catalog);
assert_eq!(games.len(), 1);
assert_eq!(games[0].peer_count, 1);
assert_eq!(games[0].eti_game_version.as_deref(), Some("20250101"));
assert_eq!(
db.peers_with_expected_version("game", Some("20250101")),
vec![expected_addr]
);
}
#[test]
fn transport_addr_matches_known_peer_on_ephemeral_port() {
let advertised = ip_addr([10, 66, 0, 2], 40000);
@@ -979,7 +1111,7 @@ mod tests {
}
#[test]
fn validation_uses_latest_version_file_metadata() {
fn validation_uses_expected_version_file_metadata() {
let old_addr = addr(12003);
let new_addr = addr(12004);
let mut db = PeerGameDB::new();
@@ -1010,21 +1142,21 @@ mod tests {
],
);
let aggregated = db.aggregated_game_files("game");
let aggregated = db.aggregated_game_files("game", Some("20250101"));
let archive = aggregated
.iter()
.find(|desc| desc.relative_path == "game/archive.eti")
.expect("latest archive should be present");
.expect("expected-version archive should be present");
assert_eq!(archive.size, 20);
let (validated, peers, file_peer_map) = db
.validate_file_sizes_majority("game")
.validate_file_sizes_majority("game", Some("20250101"))
.expect("old-version file metadata should not create ambiguity");
assert_eq!(peers, vec![new_addr]);
let archive = validated
.iter()
.find(|desc| desc.relative_path == "game/archive.eti")
.expect("latest archive should validate");
.expect("expected-version archive should validate");
assert_eq!(archive.size, 20);
assert_eq!(file_peer_map.get("game/archive.eti"), Some(&vec![new_addr]));
}
@@ -2,6 +2,7 @@
use std::{net::SocketAddr, sync::Arc};
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
@@ -22,6 +23,7 @@ pub(crate) struct HandshakeCtx {
local_library: Arc<RwLock<LocalLibraryState>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
tx_notify_ui: UnboundedSender<PeerEvent>,
catalog: Arc<RwLock<GameCatalog>>,
}
impl HandshakeCtx {
@@ -32,6 +34,7 @@ impl HandshakeCtx {
local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: tx_notify_ui.clone(),
catalog: ctx.catalog.clone(),
}
}
@@ -42,6 +45,7 @@ impl HandshakeCtx {
local_library: ctx.local_library.clone(),
peer_game_db: ctx.peer_game_db.clone(),
tx_notify_ui: ctx.tx_notify_ui.clone(),
catalog: ctx.catalog.clone(),
}
}
}
@@ -121,7 +125,7 @@ pub(crate) async fn perform_handshake_with_peer(
.await;
after_peer_library_recorded(&ctx, upsert, record_addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
Ok(())
}
@@ -156,7 +160,7 @@ pub(super) async fn accept_inbound_hello(
.await;
after_peer_library_recorded(&handshake_ctx, upsert, addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
build_hello_ack(ctx).await
}
@@ -201,12 +205,13 @@ async fn after_peer_library_recorded(
#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
collections::HashMap,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
};
use lanspread_db::db::GameCatalog;
use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION};
use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -242,6 +247,7 @@ mod tests {
local_library: Arc::new(RwLock::new(LocalLibraryState::empty())),
peer_game_db,
tx_notify_ui,
catalog: Arc::new(RwLock::new(GameCatalog::empty())),
}
}
@@ -301,6 +307,8 @@ mod tests {
#[tokio::test]
async fn inbound_hello_applies_remote_library_snapshot() {
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
let mut catalog = GameCatalog::empty();
catalog.insert("remote-game".to_string(), Some("20250101".to_string()));
let ctx = Ctx::new(
peer_game_db.clone(),
"local-peer".to_string(),
@@ -309,7 +317,8 @@ mod tests {
Arc::new(NoopUnpacker),
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(HashSet::new())),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(HashMap::new())),
);
*ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000));
+12 -2
View File
@@ -2,6 +2,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -18,6 +19,7 @@ use crate::{
pub async fn run_ping_service(
tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<GameCatalog>>,
active_operations: Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: Arc<RwLock<HashMap<String, CancellationToken>>>,
shutdown: CancellationToken,
@@ -40,6 +42,7 @@ pub async fn run_ping_service(
ping_idle_peers(
&peer_game_db,
&catalog,
&active_operations,
&active_downloads,
&tx_notify_ui,
@@ -50,6 +53,7 @@ pub async fn run_ping_service(
prune_stale_peers(
&peer_game_db,
&catalog,
&active_operations,
&active_downloads,
&tx_notify_ui,
@@ -60,6 +64,7 @@ pub async fn run_ping_service(
async fn ping_idle_peers(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -75,6 +80,7 @@ async fn ping_idle_peers(
let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = peer_game_db.clone();
let catalog = catalog.clone();
let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone();
let shutdown = shutdown.clone();
@@ -93,6 +99,7 @@ async fn ping_idle_peers(
log::warn!("Peer {peer_addr} failed ping check");
remove_peer_and_refresh(
&peer_game_db,
&catalog,
&active_operations,
&active_downloads,
&tx_notify_ui,
@@ -105,6 +112,7 @@ async fn ping_idle_peers(
log::error!("Failed to ping peer {peer_addr}: {err}");
remove_peer_and_refresh(
&peer_game_db,
&catalog,
&active_operations,
&active_downloads,
&tx_notify_ui,
@@ -120,6 +128,7 @@ async fn ping_idle_peers(
async fn prune_stale_peers(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -137,7 +146,7 @@ async fn prune_stale_peers(
}
if removed_any {
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await;
handle_active_downloads_without_peers(
peer_game_db,
active_operations,
@@ -150,6 +159,7 @@ async fn prune_stale_peers(
async fn remove_peer_and_refresh(
peer_game_db: &Arc<RwLock<PeerGameDB>>,
catalog: &Arc<RwLock<GameCatalog>>,
active_operations: &Arc<RwLock<HashMap<String, OperationKind>>>,
active_downloads: &Arc<RwLock<HashMap<String, CancellationToken>>>,
tx_notify_ui: &UnboundedSender<PeerEvent>,
@@ -157,7 +167,7 @@ async fn remove_peer_and_refresh(
log_label: &str,
) {
if remove_peer(peer_game_db, tx_notify_ui, peer_id, log_label).await {
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
events::emit_peer_game_list(peer_game_db, catalog, tx_notify_ui).await;
handle_active_downloads_without_peers(
peer_game_db,
active_operations,
@@ -336,12 +336,12 @@ fn should_ignore_game_child(name: &str) -> bool {
#[cfg(test)]
mod tests {
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use lanspread_db::db::GameCatalog;
use notify::{
EventKind,
event::{AccessKind, AccessMode},
@@ -373,7 +373,7 @@ mod tests {
std::fs::write(path, bytes).expect("file should be written");
}
fn test_ctx(game_dir: PathBuf, catalog: HashSet<String>) -> Ctx {
fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> Ctx {
Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())),
"peer".to_string(),
@@ -383,6 +383,7 @@ mod tests {
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
)
}
@@ -445,7 +446,7 @@ mod tests {
let temp = TempDir::new("lanspread-local-monitor");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
GameCatalog::from_ids(["game".to_string()]),
);
ctx.active_operations
.write()
@@ -480,7 +481,7 @@ mod tests {
write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
GameCatalog::from_ids(["game".to_string()]),
);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -515,7 +516,7 @@ mod tests {
write_file(&game_root.join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
GameCatalog::from_ids(["game".to_string()]),
);
let gate = RescanGate::default();
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -551,7 +552,7 @@ mod tests {
write_file(&temp.path().join("game").join("version.ini"), b"20250101");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
GameCatalog::from_ids(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
@@ -575,7 +576,7 @@ mod tests {
);
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from(["game".to_string()]),
GameCatalog::from_ids(["game".to_string()]),
);
let (tx, mut rx) = mpsc::unbounded_channel();
+162 -25
View File
@@ -12,7 +12,7 @@ use crate::{
context::PeerCtx,
error::PeerError,
events,
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_matches_catalog},
peer::{send_game_file_chunk, send_game_file_data},
services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync},
};
@@ -162,7 +162,7 @@ async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelt
};
if applied {
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
} else {
let addr = {
let db = ctx.peer_game_db.read().await;
@@ -209,7 +209,7 @@ async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool {
let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await;
local_download_available(game_dir, game_id, &active_operations, &catalog).await
local_download_matches_catalog(game_dir, game_id, &active_operations, &catalog).await
}
async fn can_dispatch_file_transfer(
@@ -218,10 +218,23 @@ async fn can_dispatch_file_transfer(
game_id: &str,
relative_path: &str,
) -> bool {
!path_points_inside_local(game_id, relative_path)
relative_path_belongs_to_game(game_id, relative_path)
&& !path_points_inside_local(game_id, relative_path)
&& can_serve_game(ctx, game_dir, game_id).await
}
fn relative_path_belongs_to_game(game_id: &str, relative_path: &str) -> bool {
let normalised = relative_path.replace('\\', "/");
if normalised.starts_with('/') {
return false;
}
normalised
.split('/')
.find(|part| !part.is_empty())
.is_some_and(|first| first == game_id)
}
fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool {
let normalised = relative_path.replace('\\', "/");
let mut parts = normalised.split('/').filter(|part| !part.is_empty());
@@ -232,6 +245,67 @@ fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool {
}
}
use std::sync::atomic::{AtomicU64, Ordering};
static NEXT_TRANSFER_ID: AtomicU64 = AtomicU64::new(1);
struct TransferGuard {
game_id: String,
id: u64,
active_outbound_transfers: crate::context::OutboundTransfers,
tx_notify_ui: tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
}
impl TransferGuard {
async fn new(
game_id: String,
active_outbound_transfers: crate::context::OutboundTransfers,
tx_notify_ui: tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
shutdown: &tokio_util::sync::CancellationToken,
) -> (Self, tokio_util::sync::CancellationToken) {
let id = NEXT_TRANSFER_ID.fetch_add(1, Ordering::SeqCst);
let token = shutdown.child_token();
{
let mut active = active_outbound_transfers.write().await;
active
.entry(game_id.clone())
.or_default()
.push((id, token.clone()));
}
let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged);
(
Self {
game_id,
id,
active_outbound_transfers,
tx_notify_ui,
},
token,
)
}
}
impl Drop for TransferGuard {
fn drop(&mut self) {
let game_id = self.game_id.clone();
let id = self.id;
let active_outbound_transfers = self.active_outbound_transfers.clone();
let tx_notify_ui = self.tx_notify_ui.clone();
tokio::spawn(async move {
{
let mut active = active_outbound_transfers.write().await;
if let Some(tokens) = active.get_mut(&game_id) {
tokens.retain(|(tid, _)| *tid != id);
if tokens.is_empty() {
active.remove(&game_id);
}
}
}
let _ = tx_notify_ui.send(crate::PeerEvent::OutboundTransferCountChanged);
});
}
}
async fn handle_file_data_request(
ctx: &PeerCtx,
desc: GameFileDescription,
@@ -242,6 +316,14 @@ async fn handle_file_data_request(
desc.relative_path
);
let (guard, cancel_token) = TransferGuard::new(
desc.game_id.clone(),
ctx.active_outbound_transfers.clone(),
ctx.tx_notify_ui.clone(),
&ctx.shutdown,
)
.await;
let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone();
if !can_dispatch_file_transfer(ctx, &game_dir, &desc.game_id, &desc.relative_path).await {
@@ -249,11 +331,13 @@ async fn handle_file_data_request(
"Declining GetGameFileData for {} because the game is not currently transferable",
desc.relative_path
);
drop(guard);
let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new());
}
send_game_file_data(&desc, &mut tx, &game_dir).await;
send_game_file_data(&desc, &mut tx, &game_dir, cancel_token).await;
drop(guard);
FramedWrite::new(tx, LengthDelimitedCodec::new())
}
@@ -269,17 +353,36 @@ async fn handle_file_chunk_request(
"Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
);
let (guard, cancel_token) = TransferGuard::new(
game_id.clone(),
ctx.active_outbound_transfers.clone(),
ctx.tx_notify_ui.clone(),
&ctx.shutdown,
)
.await;
let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone();
if !can_dispatch_file_transfer(ctx, &game_dir, &game_id, &relative_path).await {
log::info!(
"Declining GetGameFileChunk for {relative_path} because the game is not currently transferable"
);
drop(guard);
let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new());
}
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await;
send_game_file_chunk(
&game_id,
&relative_path,
offset,
length,
&mut tx,
&game_dir,
cancel_token,
)
.await;
drop(guard);
FramedWrite::new(tx, LengthDelimitedCodec::new())
}
@@ -289,17 +392,17 @@ async fn handle_goodbye(ctx: &PeerCtx, _remote_addr: Option<SocketAddr>, peer_id
let Some(peer) = removed else { return };
events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, peer.addr).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.catalog, &ctx.tx_notify_ui).await;
}
#[cfg(test)]
mod tests {
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
};
use lanspread_db::db::GameCatalog;
use tokio::sync::{RwLock, mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -327,7 +430,7 @@ mod tests {
std::fs::write(path, bytes).expect("file should be written");
}
fn test_ctx(game_dir: PathBuf, catalog: HashSet<String>) -> PeerCtx {
fn test_ctx(game_dir: PathBuf, catalog: GameCatalog) -> PeerCtx {
let (tx_notify_ui, _rx) = mpsc::unbounded_channel();
Ctx::new(
Arc::new(RwLock::new(PeerGameDB::new())),
@@ -338,6 +441,7 @@ mod tests {
CancellationToken::new(),
TaskTracker::new(),
Arc::new(RwLock::new(catalog)),
Arc::new(RwLock::new(std::collections::HashMap::new())),
)
.to_peer_ctx(tx_notify_ui)
}
@@ -351,6 +455,19 @@ mod tests {
assert!(!path_points_inside_local("game", "game/archive.eti"));
}
#[test]
fn transferable_paths_must_belong_to_requested_game() {
assert!(relative_path_belongs_to_game("game", "game/version.ini"));
assert!(relative_path_belongs_to_game("game", "game\\archive.eti"));
assert!(!relative_path_belongs_to_game("game", "other/archive.eti"));
assert!(!relative_path_belongs_to_game("game", "archive.eti"));
assert!(!relative_path_belongs_to_game("game", "/game/archive.eti"));
assert!(!relative_path_belongs_to_game(
"game",
"../game/archive.eti"
));
}
#[tokio::test]
async fn get_game_response_respects_serve_gates() {
let temp = TempDir::new("lanspread-stream");
@@ -360,17 +477,19 @@ mod tests {
b"20250101",
);
write_file(&temp.path().join("active").join("version.ini"), b"20250101");
write_file(
&temp.path().join("wrong-version").join("version.ini"),
b"20260101",
);
std::fs::create_dir_all(temp.path().join("missing-sentinel"))
.expect("missing sentinel root should be created");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from([
"ready".to_string(),
"active".to_string(),
"missing-sentinel".to_string(),
]),
);
let mut catalog = GameCatalog::empty();
catalog.insert("ready".to_string(), Some("20250101".to_string()));
catalog.insert("active".to_string(), Some("20250101".to_string()));
catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string()));
catalog.insert("wrong-version".to_string(), Some("20250101".to_string()));
let ctx = test_ctx(temp.path().to_path_buf(), catalog);
ctx.active_operations
.write()
.await
@@ -388,6 +507,10 @@ mod tests {
get_game_response(&ctx, "active".to_string()).await,
Response::GameNotFound(id) if id == "active"
));
assert!(matches!(
get_game_response(&ctx, "wrong-version".to_string()).await,
Response::GameNotFound(id) if id == "wrong-version"
));
assert!(matches!(
get_game_response(&ctx, "missing-sentinel".to_string()).await,
Response::GameNotFound(id) if id == "missing-sentinel"
@@ -403,23 +526,28 @@ mod tests {
b"20250101",
);
write_file(&temp.path().join("active").join("version.ini"), b"20250101");
write_file(
&temp.path().join("wrong-version").join("version.ini"),
b"20260101",
);
std::fs::create_dir_all(temp.path().join("missing-sentinel"))
.expect("missing sentinel root should be created");
let ctx = test_ctx(
temp.path().to_path_buf(),
HashSet::from([
"ready".to_string(),
"active".to_string(),
"missing-sentinel".to_string(),
]),
);
let mut catalog = GameCatalog::empty();
catalog.insert("ready".to_string(), Some("20250101".to_string()));
catalog.insert("active".to_string(), Some("20250101".to_string()));
catalog.insert("missing-sentinel".to_string(), Some("20250101".to_string()));
catalog.insert("wrong-version".to_string(), Some("20250101".to_string()));
let ctx = test_ctx(temp.path().to_path_buf(), catalog);
ctx.active_operations
.write()
.await
.insert("active".to_string(), OperationKind::Downloading);
assert!(can_dispatch_file_transfer(&ctx, temp.path(), "ready", "ready/version.ini").await);
assert!(
!can_dispatch_file_transfer(&ctx, temp.path(), "ready", "active/version.ini").await
);
assert!(
!can_dispatch_file_transfer(
&ctx,
@@ -432,6 +560,15 @@ mod tests {
assert!(
!can_dispatch_file_transfer(&ctx, temp.path(), "active", "active/version.ini").await
);
assert!(
!can_dispatch_file_transfer(
&ctx,
temp.path(),
"wrong-version",
"wrong-version/version.ini",
)
.await
);
assert!(
!can_dispatch_file_transfer(
&ctx,
+7 -1
View File
@@ -11,6 +11,7 @@ use std::{
};
use futures::FutureExt as _;
use lanspread_db::db::GameCatalog;
use tokio::sync::{
RwLock,
mpsc::{UnboundedReceiver, UnboundedSender},
@@ -84,7 +85,8 @@ pub(crate) fn spawn_peer_runtime(
game_dir: PathBuf,
state_dir: PathBuf,
unpacker: Arc<dyn Unpacker>,
catalog: Arc<RwLock<std::collections::HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
active_outbound_transfers: crate::context::OutboundTransfers,
) -> PeerRuntimeHandle {
let shutdown = CancellationToken::new();
let task_tracker = TaskTracker::new();
@@ -104,6 +106,7 @@ pub(crate) fn spawn_peer_runtime(
runtime_shutdown.clone(),
runtime_tracker.clone(),
catalog,
active_outbound_transfers,
)
.await
{
@@ -190,6 +193,7 @@ fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEv
fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = ctx.peer_game_db.clone();
let catalog = ctx.catalog.clone();
let active_operations = ctx.active_operations.clone();
let active_downloads = ctx.active_downloads.clone();
let shutdown = ctx.shutdown.clone();
@@ -207,6 +211,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
move || {
let tx_notify_ui = tx_notify_ui.clone();
let peer_game_db = peer_game_db.clone();
let catalog = catalog.clone();
let active_operations = active_operations.clone();
let active_downloads = active_downloads.clone();
let shutdown = shutdown.clone();
@@ -215,6 +220,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
run_ping_service(
tx_notify_ui,
peer_game_db,
catalog,
active_operations,
active_downloads,
shutdown,
@@ -44,6 +44,7 @@ tauri-plugin-shell = { workspace = true }
tauri-plugin-dialog = { workspace = true }
tauri-plugin-store = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
walkdir = { workspace = true }
[target.'cfg(windows)'.dependencies]
@@ -3,12 +3,12 @@ use std::{
net::SocketAddr,
path::{Component, Path, PathBuf},
sync::{Arc, OnceLock},
time::{SystemTime, UNIX_EPOCH},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use eyre::bail;
use lanspread_compat::eti::get_games;
use lanspread_db::db::{Availability, Game, GameDB, GameFileDescription};
use lanspread_db::db::{Availability, Game, GameCatalog, GameDB, GameFileDescription};
use lanspread_peer::{
ActiveOperation,
ActiveOperationKind,
@@ -31,6 +31,42 @@ use tokio::sync::{
// Learn more about Tauri commands at https://tauri.app/develop/calling-rust/
type OutboundTransfers =
Arc<RwLock<std::collections::HashMap<String, Vec<(u64, tokio_util::sync::CancellationToken)>>>>;
const OUTBOUND_TRANSFER_EMIT_DEBOUNCE: Duration = Duration::from_millis(100);
#[derive(Default)]
struct OutboundTransferEmitState {
scheduled: bool,
generation: u64,
}
impl OutboundTransferEmitState {
fn record_change(&mut self) -> bool {
self.generation = self.generation.saturating_add(1);
if self.scheduled {
return false;
}
self.scheduled = true;
true
}
fn observed_generation(&self) -> u64 {
self.generation
}
fn finish_emit(&mut self, observed_generation: u64) -> bool {
if self.generation != observed_generation {
return true;
}
self.scheduled = false;
false
}
}
/// Tauri-managed runtime state shared by commands and setup tasks.
#[derive(Default)]
struct LanSpreadState {
@@ -40,9 +76,11 @@ struct LanSpreadState {
active_operations: Arc<RwLock<HashMap<String, UiOperationKind>>>,
games_folder: Arc<RwLock<String>>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
catalog: Arc<RwLock<HashSet<String>>>,
catalog: Arc<RwLock<GameCatalog>>,
unpack_logs: Arc<RwLock<Vec<UnpackLogEntry>>>,
state_dir: OnceLock<PathBuf>,
active_outbound_transfers: OutboundTransfers,
outbound_transfer_emit: Arc<RwLock<OutboundTransferEmitState>>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -79,6 +117,7 @@ struct LauncherGame {
#[serde(flatten)]
game: Game,
can_host_server: bool,
active_outbound_transfers: usize,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
@@ -829,6 +868,24 @@ fn apply_peer_local_games(game_db: &mut GameDB, local_games: &[Game]) {
}
}
fn apply_peer_remote_games(game_db: &mut GameDB, peer_games: Vec<Game>) {
// Peer events update availability, but catalog metadata stays anchored to game.db.
for game in game_db.games.values_mut() {
game.peer_count = 0;
}
for peer_game in peer_games {
if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) {
existing.peer_count = peer_game.peer_count;
} else {
log::debug!(
"Peer advertised unknown game {id}; ignoring because game.db is ground truth",
id = peer_game.id
);
}
}
}
fn clear_all_local_game_states(game_db: &mut GameDB) {
for game in game_db.games.values_mut() {
clear_local_game_state(game);
@@ -847,17 +904,24 @@ async fn emit_games_list(app_handle: &AppHandle) {
return;
}
let active_transfers = state.active_outbound_transfers.read().await;
let games_to_emit = game_db
.all_games()
.into_iter()
.cloned()
.map(|game| LauncherGame {
can_host_server: game_can_host_server(&games_folder, &game),
game,
.map(|game| {
let active_outbound_transfers = active_transfers.get(&game.id).map_or(0, Vec::len);
LauncherGame {
can_host_server: game_can_host_server(&games_folder, &game),
active_outbound_transfers,
game,
}
})
.collect::<Vec<LauncherGame>>();
drop(game_db);
drop(active_transfers);
let active_operations = {
let active_operations = state.active_operations.read().await;
@@ -996,36 +1060,7 @@ async fn update_game_db(games: Vec<Game>, app: AppHandle) {
{
let mut game_db = state.games.write().await;
// Reset peer counts up front. Presence/metadata stay anchored to the baked game.db.
for game in game_db.games.values_mut() {
game.peer_count = 0;
}
for peer_game in games {
if let Some(existing) = game_db.get_mut_game_by_id(&peer_game.id) {
existing.peer_count = peer_game.peer_count;
if let Some(peer_version) = &peer_game.eti_game_version {
match &existing.eti_game_version {
Some(current_version) if current_version >= peer_version => {}
_ => {
existing.eti_game_version = Some(peer_version.clone());
log::debug!(
"Updated eti_game_version for {} to {} based on peer data",
peer_game.id,
peer_version
);
}
}
}
} else {
log::debug!(
"Peer advertised unknown game {id}; ignoring because game.db is ground truth",
id = peer_game.id
);
}
}
apply_peer_remote_games(&mut game_db, games);
}
emit_games_list(&app).await;
@@ -1399,7 +1434,7 @@ async fn ensure_bundled_game_db_loaded(app_handle: &AppHandle) {
if needs_load {
let game_db = load_bundled_game_db(app_handle).await;
let catalog = game_db.games.keys().cloned().collect::<HashSet<_>>();
let catalog = GameCatalog::from_game_db(&game_db);
*state.games.write().await = game_db;
*state.catalog.write().await = catalog;
}
@@ -1432,6 +1467,7 @@ async fn ensure_peer_started(app_handle: &AppHandle, games_folder: &Path) {
state.catalog.clone(),
PeerStartOptions {
state_dir: Some(state_dir),
active_outbound_transfers: Some(state.active_outbound_transfers.clone()),
},
) {
Ok(handle) => {
@@ -1469,6 +1505,44 @@ fn spawn_peer_event_loop(app_handle: AppHandle, mut rx_peer_event: UnboundedRece
});
}
async fn schedule_outbound_transfer_emit(app_handle: &AppHandle) {
let state = app_handle.state::<LanSpreadState>();
let should_spawn = {
let mut emit_state = state.outbound_transfer_emit.write().await;
emit_state.record_change()
};
if !should_spawn {
return;
}
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
loop {
tokio::time::sleep(OUTBOUND_TRANSFER_EMIT_DEBOUNCE).await;
let observed_generation = {
let state = app_handle.state::<LanSpreadState>();
state
.outbound_transfer_emit
.read()
.await
.observed_generation()
};
emit_games_list(&app_handle).await;
let needs_follow_up_emit = {
let state = app_handle.state::<LanSpreadState>();
let mut emit_state = state.outbound_transfer_emit.write().await;
emit_state.finish_emit(observed_generation)
};
if !needs_follow_up_emit {
break;
}
}
});
}
#[allow(clippy::too_many_lines)]
async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
match event {
@@ -1495,6 +1569,10 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
}
emit_games_list(app_handle).await;
}
PeerEvent::OutboundTransferCountChanged => {
log::info!("PeerEvent::OutboundTransferCountChanged received");
schedule_outbound_transfer_emit(app_handle).await;
}
PeerEvent::GotGameFiles {
id,
file_descriptions,
@@ -1747,6 +1825,33 @@ mod tests {
}
}
fn eti_game_fixture(game_id: &str, game_version: &str) -> lanspread_compat::eti::EtiGame {
lanspread_compat::eti::EtiGame {
game_id: game_id.to_string(),
game_title: "Catalog Game".to_string(),
game_key: "catalog-game".to_string(),
game_release: "2000".to_string(),
game_publisher: "publisher".to_string(),
game_size: 1.0,
game_readme_de: "description".to_string(),
game_readme_en: "description".to_string(),
game_readme_fr: "description".to_string(),
game_maxplayers: 4,
game_master_req: 0,
genre_de: "genre".to_string(),
game_version: game_version.to_string(),
}
}
#[test]
fn eti_game_conversion_uses_catalog_version_as_authoritative_eti_version() {
let game = Game::from(eti_game_fixture("alpha", "20200721"));
assert_eq!(game.version, "20200721");
assert_eq!(game.eti_game_version.as_deref(), Some("20200721"));
assert_eq!(game.local_version, None);
}
#[test]
fn terminal_log_cleanup_preserves_crlf_and_collapses_redrawn_lines() {
let input = "Extracting foo 10%\rExtracting foo 80%\rExtracting foo OK\r\nAll done\r\n";
@@ -1901,6 +2006,32 @@ mod tests {
);
}
#[test]
fn outbound_transfer_emit_state_coalesces_bursts_without_losing_updates() {
let mut state = OutboundTransferEmitState::default();
assert!(
state.record_change(),
"first change should schedule an emit"
);
assert_eq!(state.observed_generation(), 1);
assert!(
!state.record_change(),
"second change should reuse the scheduled emit"
);
assert_eq!(state.observed_generation(), 2);
assert!(
state.finish_emit(1),
"a generation observed before the latest change needs a follow-up emit"
);
assert!(
!state.finish_emit(2),
"the latest observed generation clears the scheduled emit"
);
assert!(state.record_change(), "a later burst should schedule again");
}
#[test]
fn game_file_viewer_ids_must_be_single_path_components() {
assert!(is_single_component_game_id("game"));
@@ -2048,6 +2179,42 @@ mod tests {
assert!(game_db.get_game_by_id("unknown").is_none());
}
#[test]
fn peer_remote_snapshot_updates_counts_without_overwriting_catalog_version() {
let mut alpha = game_fixture("alpha", "Catalog Alpha");
alpha.size = 999;
alpha.eti_game_version = Some("20200721".to_string());
let mut beta = game_fixture("beta", "Catalog Beta");
beta.peer_count = 2;
beta.eti_game_version = Some("20200101".to_string());
let mut game_db = GameDB::from(vec![alpha, beta]);
let mut peer_alpha = game_fixture("alpha", "Peer Alpha");
peer_alpha.size = 42;
peer_alpha.peer_count = 3;
peer_alpha.eti_game_version = Some("20990101".to_string());
let mut unknown = game_fixture("unknown", "Unknown");
unknown.peer_count = 1;
unknown.eti_game_version = Some("20990101".to_string());
apply_peer_remote_games(&mut game_db, vec![peer_alpha, unknown]);
let alpha = game_db.get_game_by_id("alpha").expect("alpha remains");
assert_eq!(alpha.name, "Catalog Alpha");
assert_eq!(alpha.size, 999);
assert_eq!(alpha.peer_count, 3);
assert_eq!(alpha.eti_game_version.as_deref(), Some("20200721"));
let beta = game_db.get_game_by_id("beta").expect("beta remains");
assert_eq!(beta.peer_count, 0);
assert_eq!(beta.eti_game_version.as_deref(), Some("20200101"));
assert!(game_db.get_game_by_id("unknown").is_none());
}
}
#[allow(clippy::missing_panics_doc)]
@@ -3,6 +3,7 @@ import { JSX, KeyboardEvent } from 'react';
import { Game } from '../../lib/types';
import { CoverAspect } from '../../hooks/useSettings';
import { formatBytes } from '../../lib/format';
import { hasNewerLocalVersion } from '../../lib/gameState';
import { GameCover } from './GameCover';
import { StateChip } from '../StateChip';
@@ -42,6 +43,14 @@ export const GameCard = ({
onOpen(game);
}
};
const newerThanExpected = hasNewerLocalVersion(game);
const hasOutbound = game.active_outbound_transfers !== undefined && game.active_outbound_transfers > 0;
const statusMessage = hasOutbound
? `Sharing to ${game.active_outbound_transfers} peer${game.active_outbound_transfers === 1 ? '' : 's'}`
: (game.status_message ?? (newerThanExpected ? 'Newer than expected' : ''));
const statusLevel = hasOutbound
? 'info'
: (game.status_level ?? (newerThanExpected ? 'warning' : undefined));
return (
<button
@@ -66,8 +75,8 @@ export const GameCard = ({
<div className="card-meta">
{metaSeparator(formatBytes(game.size), game.genre || null)}
</div>
<div className={`card-status${game.status_level === 'error' ? ' is-error' : ''}`}>
{game.status_message ?? ''}
<div className={`card-status${statusLevel ? ` is-${statusLevel}` : ''}`}>
{statusMessage}
</div>
<ActionButton
game={game}
@@ -5,7 +5,7 @@ import { StateChip } from '../StateChip';
import { ActionButton } from '../ActionButton';
import { Game, InstallStatus } from '../../lib/types';
import { deriveState, isInProgress } from '../../lib/gameState';
import { deriveState, hasNewerLocalVersion, isInProgress } from '../../lib/gameState';
import { formatBytes, formatEtiVersion, formatPlayers } from '../../lib/format';
interface Props {
@@ -59,6 +59,18 @@ export const GameDetailModal = ({
|| game.installed
|| game.install_status === InstallStatus.Downloading
|| game.install_status === InstallStatus.Installing;
const newerThanExpected = hasNewerLocalVersion(game);
const newerStatus = newerThanExpected
? `Local version ${formatEtiVersion(game.local_version)} is newer than expected ${formatEtiVersion(game.eti_game_version)}.`
: undefined;
const hasOutbound = game.active_outbound_transfers !== undefined && game.active_outbound_transfers > 0;
const outboundStatus = hasOutbound
? `Sharing to ${game.active_outbound_transfers} peer${game.active_outbound_transfers === 1 ? '' : 's'}.`
: undefined;
const statusMessage = outboundStatus ?? game.status_message ?? newerStatus;
const statusLevel = hasOutbound
? 'info'
: (game.status_level ?? (newerStatus ? 'warning' : undefined));
return (
<Modal onClose={onClose}>
<button className="modal-close" type="button" onClick={onClose} aria-label="Close">
@@ -95,7 +107,7 @@ export const GameDetailModal = ({
<div className="meta-cell">
<div className="meta-label">Version</div>
<div className="meta-value meta-mono">
{formatEtiVersion(game.local_version ?? game.eti_game_version)}
{formatEtiVersion(game.eti_game_version ?? game.local_version)}
</div>
</div>
<div className="meta-cell">
@@ -108,9 +120,9 @@ export const GameDetailModal = ({
<p className="modal-desc">{description}</p>
)}
{game.status_message && (
<p className={`modal-status${game.status_level === 'error' ? ' is-error' : ''}`}>
{game.status_message}
{statusMessage && (
<p className={`modal-status${statusLevel ? ` is-${statusLevel}` : ''}`}>
{statusMessage}
</p>
)}
@@ -1,5 +1,6 @@
import { useCallback } from 'react';
import { invoke } from '@tauri-apps/api/core';
import { ask } from '@tauri-apps/plugin-dialog';
import { type UseGamesResult } from './useGames';
import { type UISettings } from './useSettings';
@@ -69,6 +70,14 @@ export const useGameActions = (
const update = useCallback(async (id: string) => {
try {
const game = games.games.find(item => item.id === id);
if (game && game.active_outbound_transfers && game.active_outbound_transfers > 0) {
const confirmed = await ask(
`Peers are currently downloading this game from you. Updating will abort their downloads. Do you want to proceed?`,
{ title: 'Active Transfers in Progress', kind: 'warning' }
);
if (!confirmed) return;
}
const success = await invoke<boolean>('update_game', {
id,
language: settings.language,
@@ -90,11 +99,19 @@ export const useGameActions = (
const removeDownload = useCallback(async (id: string) => {
try {
const game = games.games.find(item => item.id === id);
if (game && game.active_outbound_transfers && game.active_outbound_transfers > 0) {
const confirmed = await ask(
`Peers are currently downloading this game from you. Removing game files will abort their downloads. Do you want to proceed?`,
{ title: 'Active Transfers in Progress', kind: 'warning' }
);
if (!confirmed) return;
}
await invoke('remove_downloaded_game', { id });
} catch (err) {
console.error('remove_downloaded_game failed:', err);
}
}, []);
}, [games]);
const cancelDownload = useCallback(async (id: string) => {
try {
@@ -88,17 +88,30 @@ export const isUnavailable = (game: Game): boolean =>
&& game.peer_count === 0
&& game.install_status === InstallStatus.NotInstalled;
const parseVersionStamp = (version: string | undefined): number | null => {
if (!version || !/^\d{8}$/.test(version)) return null;
const parsed = parseInt(version, 10);
return Number.isNaN(parsed) ? null : parsed;
};
export const compareVersionStamps = (
left: string | undefined,
right: string | undefined,
): number | null => {
const parsedLeft = parseVersionStamp(left);
const parsedRight = parseVersionStamp(right);
if (parsedLeft === null || parsedRight === null) return null;
return parsedLeft - parsedRight;
};
export const hasNewerLocalVersion = (game: Game): boolean =>
(compareVersionStamps(game.local_version, game.eti_game_version) ?? 0) > 0;
export const needsUpdate = (game: Game): boolean => {
if (!game.installed) return false;
const peer = game.eti_game_version;
const local = game.local_version;
if (!local && peer) return true;
if (local && peer) {
const l = parseInt(local, 10);
const p = parseInt(peer, 10);
if (!Number.isNaN(l) && !Number.isNaN(p)) return p > l;
}
return false;
if (game.peer_count <= 0) return false;
if (!game.local_version && game.eti_game_version) return true;
return (compareVersionStamps(game.eti_game_version, game.local_version) ?? 0) > 0;
};
/** What pressing the card's main action button should do, given the state. */
@@ -21,7 +21,7 @@ export enum ActiveOperationKind {
RemovingDownload = 'RemovingDownload',
}
export type StatusLevel = 'info' | 'error';
export type StatusLevel = 'info' | 'warning' | 'error';
export interface DownloadProgress {
downloaded_bytes: number;
@@ -59,6 +59,7 @@ export interface Game {
download_progress?: DownloadProgress;
peer_count: number;
can_host_server?: boolean;
active_outbound_transfers?: number;
}
export interface ActiveOperation {
@@ -739,6 +739,12 @@
.card-status.is-error {
color: #f87171;
}
.card-status.is-warning {
color: #fbbf24;
}
.card-status.is-info {
color: #60a5fa;
}
.density-compact .card-body {
padding: 9px 10px 10px;
@@ -1383,6 +1389,16 @@
border-color: rgba(239, 68, 68, 0.4);
background: rgba(239, 68, 68, 0.08);
}
.modal-status.is-warning {
color: #fbbf24;
border-color: rgba(245, 158, 11, 0.4);
background: rgba(245, 158, 11, 0.08);
}
.modal-status.is-info {
color: #60a5fa;
border-color: rgba(96, 165, 250, 0.4);
background: rgba(96, 165, 250, 0.08);
}
.modal-actions {
display: flex;
align-items: center;