diff --git a/Cargo.lock b/Cargo.lock index d4d68ae..9e6ddd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,6 +1041,16 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "filetime" +version = "0.2.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1128,6 +1138,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -1907,6 +1926,35 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "intrusive-collections" version = "0.10.1" @@ -2069,6 +2117,26 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07293a4e297ac234359b510362495713f75ea345d5307140414f20c69ffeb087" +dependencies = [ + "bitflags 2.11.1", + "libc", +] + [[package]] name = "lanspread-compat" version = "0.1.0" @@ -2113,6 +2181,7 @@ dependencies = [ "lanspread-proto", "lanspread-utils", "log", + "notify", "s2n-quic", "serde", "serde_json", @@ -2415,6 +2484,34 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "notify" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c533b4c39709f9ba5005d8002048266593c1cfaf3c5f0739d5b8ab0c6c504009" +dependencies = [ + "bitflags 2.11.1", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.52.0", +] + +[[package]] +name = "notify-types" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585d3cb5e12e01aed9e8a1f70d5c6b5e86fe2a6e48fc8cd0b3e0b8df6f6eb174" +dependencies = [ + "instant", +] + [[package]] name = "num-conv" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index a366b78..7544488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ if-addrs = "0.15" log = "0.4" mdns-sd = "0.19" mimalloc = { version = "0.1", features = ["secure"] } +notify = "7" s2n-quic = { version = "1", features = ["provider-event-tracing"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/crates/lanspread-db/src/db.rs b/crates/lanspread-db/src/db.rs index e96a1e0..b3eb663 100644 --- a/crates/lanspread-db/src/db.rs +++ b/crates/lanspread-db/src/db.rs @@ -179,7 +179,8 @@ pub struct GameFileDescription { impl GameFileDescription { #[must_use] pub fn is_version_ini(&self) -> bool { - self.relative_path.ends_with("/version.ini") + let expected = format!("{}/version.ini", self.game_id); + self.relative_path.replace('\\', "/") == expected } #[must_use] @@ -206,7 +207,7 @@ impl fmt::Debug for GameFileDescription { mod tests { use serde_json::json; - use super::Game; + use super::{Game, GameFileDescription}; #[test] fn installed_defaults_to_false_when_missing() { @@ -234,4 +235,31 @@ mod tests { "missing installed flag should default to false" ); } + + #[test] + fn version_ini_predicate_matches_only_game_root_sentinel() { + let root = GameFileDescription { + game_id: "aoe2".to_string(), + relative_path: "aoe2/version.ini".to_string(), + is_dir: false, + size: 8, + }; + assert!(root.is_version_ini()); + + let nested = GameFileDescription { + game_id: "aoe2".to_string(), + relative_path: "aoe2/local/version.ini".to_string(), + is_dir: false, + size: 8, + }; + assert!(!nested.is_version_ini()); + + let other_game = GameFileDescription { + game_id: "aoe2".to_string(), + relative_path: "other/version.ini".to_string(), + is_dir: false, + size: 8, + }; + assert!(!other_game.is_version_ini()); + } } diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index a4060b4..060d36e 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -21,6 +21,7 @@ futures = { workspace = true } gethostname = { workspace = true } if-addrs = { workspace = true } log = { workspace = true } +notify = { workspace = true } s2n-quic = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/lanspread-peer/src/config.rs b/crates/lanspread-peer/src/config.rs index 473fd55..254eb26 100644 --- a/crates/lanspread-peer/src/config.rs +++ b/crates/lanspread-peer/src/config.rs @@ -17,8 +17,8 @@ pub const CHUNK_SIZE: u64 = 32 * 1024 * 1024; /// Maximum number of retry attempts for failed chunk downloads. pub const MAX_RETRY_COUNT: usize = 3; -/// Interval for local game directory monitoring (seconds). -pub const LOCAL_GAME_MONITOR_INTERVAL_SECS: u64 = 15; +/// Fallback interval for reconciling missed filesystem watcher events (seconds). +pub const LOCAL_GAME_FALLBACK_SCAN_SECS: u64 = 300; /// TLS certificate for QUIC connections. pub static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); diff --git a/crates/lanspread-peer/src/context.rs b/crates/lanspread-peer/src/context.rs index 0eca907..02d0dba 100644 --- a/crates/lanspread-peer/src/context.rs +++ b/crates/lanspread-peer/src/context.rs @@ -11,7 +11,20 @@ use lanspread_db::db::GameDB; use tokio::sync::RwLock; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use crate::{PeerEvent, library::LocalLibraryState, peer_db::PeerGameDB}; +use crate::{PeerEvent, Unpacker, library::LocalLibraryState, peer_db::PeerGameDB}; + +/// Mutating filesystem operation currently in flight for a game root. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum OperationKind { + /// Downloading or replacing archive files. + Downloading, + /// Extracting into a previously uninstalled game root. + Installing, + /// Replacing an existing `local/` install. + Updating, + /// Removing an existing `local/` install. + Uninstalling, +} /// Main context for the peer system. #[derive(Clone)] @@ -21,8 +34,10 @@ pub struct Ctx { pub local_library: Arc>, pub peer_game_db: Arc>, pub local_peer_addr: Arc>>, - pub downloading_games: Arc>>, + pub active_operations: Arc>>, pub active_downloads: Arc>>, + pub unpacker: Arc, + pub catalog: Arc>>, pub peer_id: Arc, pub shutdown: CancellationToken, pub task_tracker: TaskTracker, @@ -35,8 +50,9 @@ pub struct PeerCtx { pub local_game_db: Arc>>, pub local_library: Arc>, pub local_peer_addr: Arc>>, - pub downloading_games: Arc>>, + pub active_operations: Arc>>, pub peer_game_db: Arc>, + pub catalog: Arc>>, pub peer_id: Arc, pub tx_notify_ui: tokio::sync::mpsc::UnboundedSender, pub shutdown: CancellationToken, @@ -49,7 +65,7 @@ impl std::fmt::Debug for PeerCtx { .field("game_dir", &"...") .field("local_game_db", &"...") .field("local_peer_addr", &"...") - .field("downloading_games", &"...") + .field("active_operations", &"...") .finish() } } @@ -60,8 +76,10 @@ impl Ctx { peer_game_db: Arc>, peer_id: String, game_dir: PathBuf, + unpacker: Arc, shutdown: CancellationToken, task_tracker: TaskTracker, + catalog: Arc>>, ) -> Self { Self { game_dir: Arc::new(RwLock::new(game_dir)), @@ -69,8 +87,10 @@ impl Ctx { local_library: Arc::new(RwLock::new(LocalLibraryState::empty())), peer_game_db, local_peer_addr: Arc::new(RwLock::new(None)), - downloading_games: Arc::new(RwLock::new(HashSet::new())), + active_operations: Arc::new(RwLock::new(HashMap::new())), active_downloads: Arc::new(RwLock::new(HashMap::new())), + unpacker, + catalog, peer_id: Arc::new(peer_id), shutdown, task_tracker, @@ -87,8 +107,9 @@ impl Ctx { local_game_db: self.local_game_db.clone(), local_library: self.local_library.clone(), local_peer_addr: self.local_peer_addr.clone(), - downloading_games: self.downloading_games.clone(), + active_operations: self.active_operations.clone(), peer_game_db: self.peer_game_db.clone(), + catalog: self.catalog.clone(), peer_id: self.peer_id.clone(), tx_notify_ui, shutdown: self.shutdown.clone(), @@ -97,42 +118,60 @@ impl Ctx { } } -/// Removes download tracking no matter how a download task exits. -pub(crate) struct DownloadStateGuard { +/// Removes operation tracking no matter how a task exits. +pub(crate) struct OperationGuard { id: String, - downloading_games: Arc>>, + active_operations: Arc>>, active_downloads: Arc>>, + clears_download: bool, } -impl DownloadStateGuard { +impl OperationGuard { pub(crate) fn new( id: String, - downloading_games: Arc>>, + active_operations: Arc>>, + ) -> Self { + Self { + id, + active_operations, + active_downloads: Arc::new(RwLock::new(HashMap::new())), + clears_download: false, + } + } + + pub(crate) fn download( + id: String, + active_operations: Arc>>, active_downloads: Arc>>, ) -> Self { Self { id, - downloading_games, + active_operations, active_downloads, + clears_download: true, } } } -impl Drop for DownloadStateGuard { +impl Drop for OperationGuard { fn drop(&mut self) { let id = self.id.clone(); - if let Ok(mut guard) = self.downloading_games.try_write() { + if let Ok(mut guard) = self.active_operations.try_write() { guard.remove(&id); } else if let Ok(handle) = tokio::runtime::Handle::try_current() { - let downloading_games = self.downloading_games.clone(); + let active_operations = self.active_operations.clone(); handle.spawn({ let id = id.clone(); async move { - downloading_games.write().await.remove(&id); + active_operations.write().await.remove(&id); } }); } else { - log::error!("Failed to clean downloading state for {id}: no Tokio runtime"); + log::error!("Failed to clean operation state for {id}: no Tokio runtime"); + } + + if !self.clears_download { + return; } if let Ok(mut guard) = self.active_downloads.try_write() { @@ -153,33 +192,29 @@ impl Drop for DownloadStateGuard { #[cfg(test)] mod tests { - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, - }; + use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; - use super::DownloadStateGuard; + use super::{OperationGuard, OperationKind}; - type DownloadTracking = ( - Arc>>, + type OperationTracking = ( + Arc>>, Arc>>, CancellationToken, ); async fn wait_for_tracking_clear( id: &str, - downloading_games: &Arc>>, + active_operations: &Arc>>, active_downloads: &Arc>>, ) { tokio::time::timeout(Duration::from_secs(1), async { loop { - let downloading_contains = downloading_games.read().await.contains(id); + let operation_contains = active_operations.read().await.contains_key(id); let active_contains = active_downloads.read().await.contains_key(id); - if !downloading_contains && !active_contains { + if !operation_contains && !active_contains { break; } tokio::task::yield_now().await; @@ -189,57 +224,60 @@ mod tests { .expect("download tracking should be cleared"); } - fn tracked_download_state(id: &str) -> DownloadTracking { - let downloading_games = Arc::new(RwLock::new(HashSet::from([id.to_string()]))); + fn tracked_download_state(id: &str) -> OperationTracking { + let active_operations = Arc::new(RwLock::new(HashMap::from([( + id.to_string(), + OperationKind::Downloading, + )]))); let cancel = CancellationToken::new(); let active_downloads = Arc::new(RwLock::new(HashMap::from([( id.to_string(), cancel.clone(), )]))); - (downloading_games, active_downloads, cancel) + (active_operations, active_downloads, cancel) } #[tokio::test] - async fn download_state_guard_clears_tracking_on_completion() { + async fn operation_guard_clears_tracking_on_completion() { let id = "game-complete"; - let (downloading_games, active_downloads, _) = tracked_download_state(id); + let (active_operations, active_downloads, _) = tracked_download_state(id); - drop(DownloadStateGuard::new( + drop(OperationGuard::download( id.to_string(), - downloading_games.clone(), + active_operations.clone(), active_downloads.clone(), )); - wait_for_tracking_clear(id, &downloading_games, &active_downloads).await; + wait_for_tracking_clear(id, &active_operations, &active_downloads).await; } #[tokio::test] - async fn download_state_guard_clears_tracking_after_cancellation() { + async fn operation_guard_clears_tracking_after_cancellation() { let id = "game-cancelled"; - let (downloading_games, active_downloads, cancel) = tracked_download_state(id); + let (active_operations, active_downloads, cancel) = tracked_download_state(id); cancel.cancel(); - drop(DownloadStateGuard::new( + drop(OperationGuard::download( id.to_string(), - downloading_games.clone(), + active_operations.clone(), active_downloads.clone(), )); - wait_for_tracking_clear(id, &downloading_games, &active_downloads).await; + wait_for_tracking_clear(id, &active_operations, &active_downloads).await; } #[tokio::test] - async fn download_state_guard_clears_tracking_when_task_is_dropped() { + async fn operation_guard_clears_tracking_when_task_is_dropped() { let id = "game-aborted"; - let (downloading_games, active_downloads, _) = tracked_download_state(id); + let (active_operations, active_downloads, _) = tracked_download_state(id); let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); let handle = tokio::spawn({ - let downloading_games = downloading_games.clone(); + let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); async move { let _guard = - DownloadStateGuard::new(id.to_string(), downloading_games, active_downloads); + OperationGuard::download(id.to_string(), active_operations, active_downloads); let _ = ready_tx.send(()); std::future::pending::<()>().await; } @@ -249,6 +287,6 @@ mod tests { handle.abort(); let _ = handle.await; - wait_for_tracking_clear(id, &downloading_games, &active_downloads).await; + wait_for_tracking_clear(id, &active_operations, &active_downloads).await; } } diff --git a/crates/lanspread-peer/src/download.rs b/crates/lanspread-peer/src/download.rs index 215a4cf..370749f 100644 --- a/crates/lanspread-peer/src/download.rs +++ b/crates/lanspread-peer/src/download.rs @@ -4,13 +4,14 @@ use std::{ collections::{HashMap, VecDeque}, net::SocketAddr, path::{Path, PathBuf}, + sync::Arc, }; use lanspread_db::db::GameFileDescription; use tokio::{ fs::OpenOptions, io::{AsyncSeekExt, AsyncWriteExt}, - sync::mpsc::UnboundedSender, + sync::{Mutex, mpsc::UnboundedSender}, }; use tokio_util::{ codec::{FramedWrite, LengthDelimitedCodec}, @@ -53,6 +54,49 @@ pub struct ChunkDownloadResult { pub peer_addr: SocketAddr, } +#[derive(Debug)] +pub struct VersionIniBuffer { + relative_path: String, + bytes: Mutex>, +} + +impl VersionIniBuffer { + fn new(desc: &GameFileDescription) -> eyre::Result { + if desc.is_dir { + eyre::bail!("version.ini sentinel cannot be a directory"); + } + let size = usize::try_from(desc.size)?; + Ok(Self { + relative_path: desc.relative_path.clone(), + bytes: Mutex::new(vec![0; size]), + }) + } + + fn matches(&self, relative_path: &str) -> bool { + self.relative_path == relative_path + } + + async fn write_at(&self, offset: u64, bytes: &[u8]) -> eyre::Result<()> { + let offset = usize::try_from(offset)?; + let mut buffer = self.bytes.lock().await; + let end = offset + .checked_add(bytes.len()) + .ok_or_else(|| eyre::eyre!("version.ini chunk offset overflow"))?; + if end > buffer.len() { + eyre::bail!( + "version.ini chunk exceeds buffer: end {end}, buffer {}", + buffer.len() + ); + } + buffer[offset..end].copy_from_slice(bytes); + Ok(()) + } + + async fn snapshot(&self) -> Vec { + self.bytes.lock().await.clone() + } +} + fn ensure_download_not_cancelled( cancel_token: &CancellationToken, game_id: &str, @@ -63,6 +107,35 @@ fn ensure_download_not_cancelled( Ok(()) } +fn partition_download_descriptions( + game_id: &str, + game_file_descs: Vec, + tx_notify_ui: &UnboundedSender, +) -> eyre::Result<(GameFileDescription, Vec)> { + let mut version_descs = Vec::new(); + let mut transfer_descs = Vec::new(); + + for desc in game_file_descs { + if desc.is_version_ini() { + version_descs.push(desc.clone()); + } + transfer_descs.push(desc); + } + + if version_descs.len() != 1 { + let _ = tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + }); + eyre::bail!( + "expected exactly one root-level version.ini sentinel for {game_id}, found {}", + version_descs.len() + ); + } + + let version_desc = version_descs.remove(0); + Ok((version_desc, transfer_descs)) +} + // ============================================================================= // Storage preparation // ============================================================================= @@ -73,6 +146,10 @@ pub async fn prepare_game_storage( file_descs: &[GameFileDescription], ) -> eyre::Result<()> { for desc in file_descs { + if desc.is_version_ini() { + continue; + } + // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(games_folder, &desc.relative_path)?; @@ -113,6 +190,76 @@ pub async fn prepare_game_storage( Ok(()) } +async fn begin_version_ini_transaction(game_root: &Path) -> eyre::Result<()> { + tokio::fs::create_dir_all(game_root).await?; + remove_file_if_exists(&game_root.join(".version.ini.tmp")).await?; + remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; + + let version_path = game_root.join("version.ini"); + if tokio::fs::metadata(&version_path) + .await + .is_ok_and(|metadata| metadata.is_file()) + { + tokio::fs::rename(version_path, game_root.join(".version.ini.discarded")).await?; + } + Ok(()) +} + +async fn rollback_version_ini_transaction(game_root: &Path) { + if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.tmp")).await { + log::warn!( + "Failed to sweep partial version.ini tmp in {}: {err}", + game_root.display() + ); + } + if let Err(err) = remove_file_if_exists(&game_root.join(".version.ini.discarded")).await { + log::warn!( + "Failed to sweep discarded version.ini in {}: {err}", + game_root.display() + ); + } +} + +async fn commit_version_ini_buffer( + game_root: &Path, + buffer: &VersionIniBuffer, +) -> eyre::Result<()> { + let tmp_path = game_root.join(".version.ini.tmp"); + let version_path = game_root.join("version.ini"); + let bytes = buffer.snapshot().await; + + let mut file = tokio::fs::File::create(&tmp_path).await?; + file.write_all(&bytes).await?; + file.sync_all().await?; + drop(file); + + tokio::fs::rename(&tmp_path, &version_path).await?; + sync_parent_dir(&version_path)?; + remove_file_if_exists(&game_root.join(".version.ini.discarded")).await?; + Ok(()) +} + +#[cfg(unix)] +fn sync_parent_dir(path: &Path) -> std::io::Result<()> { + if let Some(parent) = path.parent() { + std::fs::File::open(parent)?.sync_all()?; + } + Ok(()) +} + +#[cfg(not(unix))] +fn sync_parent_dir(_path: &Path) -> std::io::Result<()> { + Ok(()) +} + +async fn remove_file_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_file(path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } +} + // ============================================================================= // Peer plan building // ============================================================================= @@ -194,6 +341,7 @@ pub async fn download_chunk( base_dir: &Path, game_id: &str, chunk: &DownloadChunk, + version_buffer: Option>, ) -> eyre::Result<()> { use futures::SinkExt; use lanspread_proto::{Message, Request}; @@ -212,6 +360,12 @@ pub async fn download_chunk( framed_tx.close().await?; + if let Some(buffer) = version_buffer + && buffer.matches(&chunk.relative_path) + { + return download_version_ini_chunk(&mut rx, chunk, &buffer).await; + } + // Validate the path to prevent directory traversal let validated_path = validate_game_file_path(base_dir, &chunk.relative_path)?; let mut file = OpenOptions::new() @@ -261,6 +415,28 @@ pub async fn download_chunk( Ok(()) } +async fn download_version_ini_chunk( + rx: &mut s2n_quic::stream::ReceiveStream, + chunk: &DownloadChunk, + buffer: &VersionIniBuffer, +) -> eyre::Result<()> { + let mut received = Vec::new(); + while let Some(bytes) = rx.receive().await? { + received.extend_from_slice(&bytes); + } + + if chunk.length > 0 && u64::try_from(received.len())? != chunk.length { + eyre::bail!( + "Incomplete version.ini chunk download: expected {} bytes, received {} bytes at offset {}", + chunk.length, + received.len(), + chunk.offset + ); + } + + buffer.write_at(chunk.offset, &received).await +} + /// Verifies that a chunk was written correctly. async fn verify_chunk_integrity( file_path: &Path, @@ -329,6 +505,7 @@ pub async fn download_from_peer( plan: PeerDownloadPlan, games_folder: PathBuf, cancel_token: &CancellationToken, + version_buffer: Option>, ) -> eyre::Result> { if plan.chunks.is_empty() && plan.whole_files.is_empty() { return Ok(Vec::new()); @@ -354,7 +531,8 @@ pub async fn download_from_peer( chunk.length, peer_addr ); - let result = download_chunk(&mut conn, &base_dir, game_id, chunk).await; + let result = + download_chunk(&mut conn, &base_dir, game_id, chunk, version_buffer.clone()).await; results.push(ChunkDownloadResult { chunk: chunk.clone(), result, @@ -425,6 +603,7 @@ pub async fn retry_failed_chunks( game_id: &str, file_peer_map: &HashMap>, cancel_token: &CancellationToken, + version_buffer: Option>, ) -> eyre::Result> { let mut exhausted = Vec::new(); let mut queue: VecDeque = failed_chunks.into_iter().collect(); @@ -476,6 +655,7 @@ pub async fn retry_failed_chunks( plan, base_dir.to_path_buf(), cancel_token, + version_buffer.clone(), ) .await { @@ -556,27 +736,62 @@ pub async fn download_game_files( } if cancel_token.is_cancelled() { - return Ok(()); + eyre::bail!("download cancelled for game {game_id}"); } - prepare_game_storage(&games_folder, &game_file_descs).await?; + let (version_desc, transfer_descs) = + partition_download_descriptions(game_id, game_file_descs, &tx_notify_ui)?; + let version_buffer = match VersionIniBuffer::new(&version_desc) { + Ok(buffer) => Arc::new(buffer), + Err(err) => { + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + }; + let game_root = games_folder.join(game_id); + + if let Err(err) = begin_version_ini_transaction(&game_root).await { + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } + if let Err(err) = prepare_game_storage(&games_folder, &transfer_descs).await { + rollback_version_ini_transaction(&game_root).await; + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin { id: game_id.to_string(), })?; - let plans = build_peer_plans(&peers, &game_file_descs, &file_peer_map); + let plans = build_peer_plans(&peers, &transfer_descs, &file_peer_map); let mut tasks = Vec::new(); for (peer_addr, plan) in plans { let base_dir = games_folder.clone(); let game_id = game_id.to_string(); let cancel_token = cancel_token.clone(); + let version_buffer = version_buffer.clone(); tasks.push(tokio::spawn(async move { - download_from_peer(peer_addr, &game_id, plan, base_dir, &cancel_token).await + download_from_peer( + peer_addr, + &game_id, + plan, + base_dir, + &cancel_token, + Some(version_buffer), + ) + .await })); } @@ -585,13 +800,15 @@ pub async fn download_game_files( for handle in tasks { if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } match handle.await { Ok(Ok(results)) => { if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } for chunk_result in results { @@ -614,7 +831,10 @@ pub async fn download_game_files( } } } - Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => return Ok(()), + Ok(Err(_)) | Err(_) if cancel_token.is_cancelled() => { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } Ok(Err(e)) => last_err = Some(e), Err(e) => last_err = Some(eyre::eyre!("task join error: {e}")), } @@ -623,7 +843,8 @@ pub async fn download_game_files( // Retry failed chunks if any if !failed_chunks.is_empty() && !peers.is_empty() { if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } log::info!("Retrying {} failed chunks", failed_chunks.len()); @@ -635,11 +856,15 @@ pub async fn download_game_files( game_id, &file_peer_map, &cancel_token, + Some(version_buffer.clone()), ) .await { Ok(results) => results, - Err(_) if cancel_token.is_cancelled() => return Ok(()), + Err(_) if cancel_token.is_cancelled() => { + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); + } Err(err) => { last_err = Some(err); Vec::new() @@ -648,7 +873,8 @@ pub async fn download_game_files( for chunk_result in retry_results { if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } if let Err(e) = chunk_result.result { @@ -659,16 +885,25 @@ pub async fn download_game_files( } if cancel_token.is_cancelled() { - return Ok(()); + rollback_version_ini_transaction(&game_root).await; + eyre::bail!("download cancelled for game {game_id}"); } if let Some(err) = last_err { + rollback_version_ini_transaction(&game_root).await; tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: game_id.to_string(), })?; return Err(err); } + if let Err(err) = commit_version_ini_buffer(&game_root, &version_buffer).await { + rollback_version_ini_transaction(&game_root).await; + tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { + id: game_id.to_string(), + })?; + return Err(err); + } log::info!("all files downloaded for game: {game_id}"); tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished { id: game_id.to_string(), @@ -688,6 +923,34 @@ mod tests { SocketAddr::from(([127, 0, 0, 1], port)) } + struct TempDir(PathBuf); + + impl TempDir { + fn new() -> Self { + let mut path = std::env::temp_dir(); + path.push(format!( + "lanspread-download-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + std::fs::create_dir_all(&path).expect("temp dir should be created"); + Self(path) + } + + fn path(&self) -> &Path { + &self.0 + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + #[test] fn build_peer_plans_handles_partial_final_chunk() { let peers = vec![loopback_addr(12000), loopback_addr(12001)]; @@ -773,4 +1036,126 @@ mod tests { } } } + + #[tokio::test] + async fn prepare_game_storage_skips_version_ini_sentinel() { + let temp = TempDir::new(); + let descs = vec![GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }]; + + prepare_game_storage(temp.path(), &descs) + .await + .expect("storage preparation should succeed"); + + assert!(!temp.path().join("game").join("version.ini").exists()); + } + + #[tokio::test] + async fn version_ini_buffer_accepts_out_of_order_chunks() { + let desc = GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }; + let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); + + buffer + .write_at(4, b"0101") + .await + .expect("second chunk should write"); + buffer + .write_at(0, b"2025") + .await + .expect("first chunk should write"); + + assert_eq!(buffer.snapshot().await, b"20250101"); + } + + #[tokio::test] + async fn commit_version_ini_writes_sentinel_last_and_sweeps_discarded() { + let temp = TempDir::new(); + let game_root = temp.path().join("game"); + tokio::fs::create_dir_all(&game_root) + .await + .expect("game root should be created"); + tokio::fs::write(game_root.join(".version.ini.discarded"), b"old") + .await + .expect("discarded sentinel should be written"); + + let desc = GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }; + let buffer = VersionIniBuffer::new(&desc).expect("buffer should be created"); + buffer + .write_at(0, b"20250101") + .await + .expect("version should be buffered"); + + commit_version_ini_buffer(&game_root, &buffer) + .await + .expect("version sentinel should commit"); + + assert_eq!( + std::fs::read(game_root.join("version.ini")).expect("version.ini should exist"), + b"20250101" + ); + assert!(!game_root.join(".version.ini.tmp").exists()); + assert!(!game_root.join(".version.ini.discarded").exists()); + } + + #[test] + fn partition_requires_exactly_one_root_version_ini() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); + let duplicate = vec![ + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/local/version.ini".to_string(), + is_dir: false, + size: 8, + }, + ]; + + let (version, transfer) = partition_download_descriptions("game", duplicate, &tx) + .expect("only one root sentinel"); + assert_eq!(version.relative_path, "game/version.ini"); + assert_eq!(transfer.len(), 2); + + let missing = vec![GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/archive.eti".to_string(), + is_dir: false, + size: 1, + }]; + assert!(partition_download_descriptions("game", missing, &tx).is_err()); + + let multiple = vec![ + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + GameFileDescription { + game_id: "game".to_string(), + relative_path: "game/version.ini".to_string(), + is_dir: false, + size: 8, + }, + ]; + assert!(partition_download_descriptions("game", multiple, &tx).is_err()); + } } diff --git a/crates/lanspread-peer/src/handlers.rs b/crates/lanspread-peer/src/handlers.rs index 97c1b36..c5ecce3 100644 --- a/crates/lanspread-peer/src/handlers.rs +++ b/crates/lanspread-peer/src/handlers.rs @@ -1,21 +1,26 @@ //! Command handlers for peer commands. -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{collections::hash_map::Entry, net::SocketAddr, path::PathBuf, sync::Arc}; -use lanspread_db::db::GameFileDescription; +use lanspread_db::db::{GameDB, GameFileDescription}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ + InstallOperation, PeerEvent, - context::{Ctx, DownloadStateGuard}, + context::{Ctx, OperationGuard, OperationKind}, download::download_game_files, events, identity::FEATURE_LIBRARY_DELTA, + install, local_games::{ LocalLibraryScan, + game_from_summary, get_game_file_descriptions, + local_dir_is_directory, local_download_available, scan_local_library, + version_ini_is_regular_file, }, network::{announce_games_to_peer, request_game_details_from_peer, send_library_delta}, peer_db::PeerGameDB, @@ -40,11 +45,13 @@ async fn try_serve_local_game( ) -> bool { let game_dir = { ctx.game_dir.read().await.clone() }; - let downloading = ctx.downloading_games.read().await; - if !local_download_available(&game_dir, id, &downloading).await { + let active_operations = ctx.active_operations.read().await; + let catalog = ctx.catalog.read().await; + if !local_download_available(&game_dir, id, &active_operations, &catalog).await { return false; } - drop(downloading); + drop(active_operations); + drop(catalog); match get_game_file_descriptions(id, &game_dir).await { Ok(file_descriptions) => { @@ -187,8 +194,9 @@ pub async fn handle_download_game_files_command( } let local_dl_available = { - let downloading = ctx.downloading_games.read().await; - local_download_available(&games_folder, &id, &downloading).await + let active_operations = ctx.active_operations.read().await; + let catalog = ctx.catalog.read().await; + local_download_available(&games_folder, &id, &active_operations, &catalog).await }; if peer_whitelist.is_empty() { @@ -203,6 +211,12 @@ pub async fn handle_download_game_files_command( { log::error!("Failed to send DownloadGameFilesFinished event: {e}"); } + spawn_install_operation( + ctx, + tx_notify_ui, + id.clone(), + RequestedInstallOperation::Auto, + ); } else { log::error!("No trusted peers available after majority validation for game {id}"); } @@ -210,18 +224,24 @@ pub async fn handle_download_game_files_command( } { - let mut in_progress = ctx.downloading_games.write().await; - if !in_progress.insert(id.clone()) { - log::warn!("Download for {id} already in progress; ignoring new request"); - return; + let mut in_progress = ctx.active_operations.write().await; + match in_progress.entry(id.clone()) { + Entry::Vacant(entry) => { + entry.insert(OperationKind::Downloading); + } + Entry::Occupied(_) => { + log::warn!("Operation for {id} already in progress; ignoring new download request"); + return; + } } } - let downloading_games = ctx.downloading_games.clone(); + let active_operations = ctx.active_operations.clone(); let active_downloads = ctx.active_downloads.clone(); let tx_notify_ui_clone = tx_notify_ui.clone(); let download_id = id.clone(); let cancel_token = ctx.shutdown.child_token(); + let ctx_clone = ctx.clone(); ctx.active_downloads .write() @@ -229,26 +249,218 @@ pub async fn handle_download_game_files_command( .insert(id, cancel_token.clone()); ctx.task_tracker.spawn(async move { - let _download_state_guard = - DownloadStateGuard::new(download_id.clone(), downloading_games, active_downloads); + let result = { + let _download_state_guard = + OperationGuard::download(download_id.clone(), active_operations, active_downloads); - let result = download_game_files( - &download_id, - resolved_descriptions, - games_folder, - peer_whitelist, - file_peer_map, - tx_notify_ui_clone.clone(), - cancel_token, - ) - .await; + download_game_files( + &download_id, + resolved_descriptions, + games_folder, + peer_whitelist, + file_peer_map, + tx_notify_ui_clone.clone(), + cancel_token, + ) + .await + }; - if let Err(e) = result { - log::error!("Download failed for {download_id}: {e}"); + match result { + Ok(()) => { + run_install_operation( + &ctx_clone, + &tx_notify_ui_clone, + download_id, + RequestedInstallOperation::Auto, + ) + .await; + } + Err(e) => { + log::error!("Download failed for {download_id}: {e}"); + } } }); } +/// Handles the `InstallGame` command. +pub async fn handle_install_game_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, +) { + spawn_install_operation(ctx, tx_notify_ui, id, RequestedInstallOperation::Install); +} + +/// Handles the `UpdateGame` command. +pub async fn handle_update_game_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, +) { + spawn_install_operation(ctx, tx_notify_ui, id, RequestedInstallOperation::Update); +} + +/// Handles the `UninstallGame` command. +pub async fn handle_uninstall_game_command( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, +) { + let ctx = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + ctx.task_tracker.clone().spawn(async move { + run_uninstall_operation(&ctx, &tx_notify_ui, id).await; + }); +} + +#[derive(Clone, Copy)] +enum RequestedInstallOperation { + Auto, + Install, + Update, +} + +fn spawn_install_operation( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + requested: RequestedInstallOperation, +) { + let ctx = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + ctx.task_tracker.clone().spawn(async move { + run_install_operation(&ctx, &tx_notify_ui, id, requested).await; + }); +} + +async fn run_install_operation( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + id: String, + requested: RequestedInstallOperation, +) { + if !catalog_contains(ctx, &id).await { + log::warn!("Ignoring install command for non-catalog game {id}"); + return; + } + + let game_root = { ctx.game_dir.read().await.join(&id) }; + if !version_ini_is_regular_file(&game_root).await { + log::warn!("Ignoring install command for {id}: version.ini sentinel is absent"); + events::send(tx_notify_ui, PeerEvent::InstallGameFailed { id }); + return; + } + + let local_present = local_dir_is_directory(&game_root).await; + let operation = match requested { + RequestedInstallOperation::Auto | RequestedInstallOperation::Install if local_present => { + InstallOperation::Updating + } + RequestedInstallOperation::Auto | RequestedInstallOperation::Install => { + InstallOperation::Installing + } + RequestedInstallOperation::Update => InstallOperation::Updating, + }; + let operation_kind = match operation { + InstallOperation::Installing => OperationKind::Installing, + InstallOperation::Updating => OperationKind::Updating, + }; + + if !begin_operation(ctx, &id, operation_kind).await { + log::warn!("Operation for {id} already in progress; ignoring install command"); + return; + } + + let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); + events::send( + tx_notify_ui, + PeerEvent::InstallGameBegin { + id: id.clone(), + operation, + }, + ); + + let result = match operation { + InstallOperation::Installing => { + install::install(&game_root, &id, ctx.unpacker.clone()).await + } + InstallOperation::Updating => install::update(&game_root, &id, ctx.unpacker.clone()).await, + }; + + match result { + Ok(()) => { + events::send( + tx_notify_ui, + PeerEvent::InstallGameFinished { id: id.clone() }, + ); + if let Err(err) = load_local_library(ctx, tx_notify_ui).await { + log::error!("Failed to refresh local library after install: {err}"); + } + } + Err(err) => { + log::error!("Install operation failed for {id}: {err}"); + events::send(tx_notify_ui, PeerEvent::InstallGameFailed { id }); + if let Err(refresh_err) = load_local_library(ctx, tx_notify_ui).await { + log::error!("Failed to refresh local library after install failure: {refresh_err}"); + } + } + } +} + +async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender, id: String) { + if !catalog_contains(ctx, &id).await { + log::warn!("Ignoring uninstall command for non-catalog game {id}"); + return; + } + + if !begin_operation(ctx, &id, OperationKind::Uninstalling).await { + log::warn!("Operation for {id} already in progress; ignoring uninstall command"); + return; + } + + let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone()); + let game_root = { ctx.game_dir.read().await.join(&id) }; + events::send( + tx_notify_ui, + PeerEvent::UninstallGameBegin { id: id.clone() }, + ); + + match install::uninstall(&game_root, &id).await { + Ok(()) => { + events::send( + tx_notify_ui, + PeerEvent::UninstallGameFinished { id: id.clone() }, + ); + } + Err(err) => { + log::error!("Uninstall operation failed for {id}: {err}"); + events::send( + tx_notify_ui, + PeerEvent::UninstallGameFailed { id: id.clone() }, + ); + } + } + + if let Err(err) = load_local_library(ctx, tx_notify_ui).await { + log::error!("Failed to refresh local library after uninstall: {err}"); + } +} + +async fn begin_operation(ctx: &Ctx, id: &str, operation: OperationKind) -> bool { + let mut active_operations = ctx.active_operations.write().await; + match active_operations.entry(id.to_string()) { + Entry::Vacant(entry) => { + entry.insert(operation); + true + } + Entry::Occupied(_) => false, + } +} + +async fn catalog_contains(ctx: &Ctx, id: &str) -> bool { + ctx.catalog.read().await.contains(id) +} + /// Handles the `SetGameDir` command. pub async fn handle_set_game_dir_command( ctx: &Ctx, @@ -277,7 +489,9 @@ pub async fn load_local_library( tx_notify_ui: &UnboundedSender, ) -> eyre::Result<()> { let game_dir = { ctx.game_dir.read().await.clone() }; - let scan = scan_local_library(&game_dir).await?; + install::recover_on_startup(&game_dir).await?; + let catalog = ctx.catalog.read().await.clone(); + let scan = scan_local_library(&game_dir, &catalog).await?; update_and_announce_games(ctx, tx_notify_ui, scan).await; Ok(()) } @@ -299,11 +513,30 @@ pub async fn update_and_announce_games( scan: LocalLibraryScan, ) { let LocalLibraryScan { - game_db, - summaries, + mut game_db, + mut summaries, revision, } = scan; + let active_ids = ctx + .active_operations + .read() + .await + .keys() + .cloned() + .collect::>(); + if !active_ids.is_empty() { + let previous = ctx.local_library.read().await.games.clone(); + for id in active_ids { + if let Some(summary) = previous.get(&id) { + summaries.insert(id, summary.clone()); + } else { + summaries.remove(&id); + } + } + game_db = GameDB::from(summaries.values().map(game_from_summary).collect()); + } + let delta = { let mut library_guard = ctx.local_library.write().await; library_guard.update_from_scan(summaries, revision) diff --git a/crates/lanspread-peer/src/install/intent.rs b/crates/lanspread-peer/src/install/intent.rs new file mode 100644 index 0000000..67fd1bd --- /dev/null +++ b/crates/lanspread-peer/src/install/intent.rs @@ -0,0 +1,196 @@ +use std::{ + path::{Path, PathBuf}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWriteExt; + +const INTENT_SCHEMA_VERSION: u32 = 1; +const INTENT_FILE: &str = ".lanspread.json"; +const INTENT_TMP_FILE: &str = ".lanspread.json.tmp"; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub enum InstallIntentState { + None, + Installing, + Updating, + Uninstalling, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct InstallIntent { + pub schema_version: u32, + pub id: String, + pub recorded_at: u64, + pub state: InstallIntentState, + pub eti_version: Option, + pub manifest_hash: Option, +} + +impl InstallIntent { + pub fn new(id: &str, state: InstallIntentState, eti_version: Option) -> Self { + Self { + schema_version: INTENT_SCHEMA_VERSION, + id: id.to_string(), + recorded_at: now_unix_secs(), + state, + eti_version, + manifest_hash: None, + } + } + + pub fn none(id: &str, eti_version: Option) -> Self { + Self::new(id, InstallIntentState::None, eti_version) + } +} + +pub fn intent_path(game_root: &Path) -> PathBuf { + game_root.join(INTENT_FILE) +} + +pub fn intent_tmp_path(game_root: &Path) -> PathBuf { + game_root.join(INTENT_TMP_FILE) +} + +pub async fn read_intent(game_root: &Path, id: &str) -> InstallIntent { + let path = intent_path(game_root); + let data = match tokio::fs::read_to_string(&path).await { + Ok(data) => data, + Err(err) => { + if err.kind() != std::io::ErrorKind::NotFound { + log::warn!("Failed to read install intent {}: {err}", path.display()); + } + return InstallIntent::none(id, None); + } + }; + + match serde_json::from_str::(&data) { + Ok(intent) if intent.schema_version == INTENT_SCHEMA_VERSION && intent.id == id => intent, + Ok(intent) => { + log::warn!( + "Ignoring install intent {} with schema {} for id {}", + path.display(), + intent.schema_version, + intent.id + ); + InstallIntent::none(id, None) + } + Err(err) => { + log::warn!("Ignoring corrupt install intent {}: {err}", path.display()); + InstallIntent::none(id, None) + } + } +} + +pub async fn write_intent(game_root: &Path, intent: &InstallIntent) -> eyre::Result<()> { + tokio::fs::create_dir_all(game_root).await?; + let path = intent_path(game_root); + let tmp_path = intent_tmp_path(game_root); + let data = serde_json::to_vec_pretty(intent)?; + + let mut file = tokio::fs::File::create(&tmp_path).await?; + file.write_all(&data).await?; + file.sync_all().await?; + drop(file); + + tokio::fs::rename(&tmp_path, &path).await?; + sync_parent_dir(&path)?; + Ok(()) +} + +fn now_unix_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +#[cfg(unix)] +fn sync_parent_dir(path: &Path) -> std::io::Result<()> { + if let Some(parent) = path.parent() { + std::fs::File::open(parent)?.sync_all()?; + } + Ok(()) +} + +#[cfg(not(unix))] +fn sync_parent_dir(_path: &Path) -> std::io::Result<()> { + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::path::{Path, PathBuf}; + + use super::*; + + struct TempDir(PathBuf); + + impl TempDir { + fn new() -> Self { + let mut path = std::env::temp_dir(); + path.push(format!( + "lanspread-intent-{}-{}", + std::process::id(), + now_unix_secs() + )); + path.push(format!("{:?}", std::thread::current().id()).replace(['(', ')'], "")); + std::fs::create_dir_all(&path).expect("temp dir should be created"); + Self(path) + } + + fn path(&self) -> &Path { + &self.0 + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + #[tokio::test] + async fn tmp_write_without_rename_leaves_previous_intent_intact() { + let temp = TempDir::new(); + let previous = InstallIntent::new( + "game", + InstallIntentState::Updating, + Some("20240101".to_string()), + ); + write_intent(temp.path(), &previous) + .await + .expect("previous intent should be written"); + + tokio::fs::write( + intent_tmp_path(temp.path()), + serde_json::to_vec(&InstallIntent::new( + "game", + InstallIntentState::Installing, + Some("20250101".to_string()), + )) + .expect("intent should serialize"), + ) + .await + .expect("tmp intent should be written"); + + let recovered = read_intent(temp.path(), "game").await; + assert_eq!(recovered.state, InstallIntentState::Updating); + assert_eq!(recovered.eti_version.as_deref(), Some("20240101")); + } + + #[tokio::test] + async fn schema_mismatch_is_treated_as_missing() { + let temp = TempDir::new(); + tokio::fs::write( + intent_path(temp.path()), + r#"{"schema_version":2,"id":"game","recorded_at":0,"state":"Updating"}"#, + ) + .await + .expect("intent should be written"); + + let recovered = read_intent(temp.path(), "game").await; + assert_eq!(recovered.state, InstallIntentState::None); + } +} diff --git a/crates/lanspread-peer/src/install/mod.rs b/crates/lanspread-peer/src/install/mod.rs new file mode 100644 index 0000000..ec4d465 --- /dev/null +++ b/crates/lanspread-peer/src/install/mod.rs @@ -0,0 +1,6 @@ +mod intent; +mod transaction; +pub mod unpack; + +pub use transaction::{install, recover_on_startup, uninstall, update}; +pub use unpack::{UnpackFuture, Unpacker}; diff --git a/crates/lanspread-peer/src/install/transaction.rs b/crates/lanspread-peer/src/install/transaction.rs new file mode 100644 index 0000000..3cf2eea --- /dev/null +++ b/crates/lanspread-peer/src/install/transaction.rs @@ -0,0 +1,651 @@ +use std::{ + io::ErrorKind, + path::{Path, PathBuf}, + sync::Arc, +}; + +use eyre::WrapErr; + +use super::{ + intent::{InstallIntent, InstallIntentState, read_intent, write_intent}, + unpack::Unpacker, +}; +use crate::local_games::version_ini_is_regular_file; + +const LOCAL_DIR: &str = "local"; +const INSTALLING_DIR: &str = ".local.installing"; +const BACKUP_DIR: &str = ".local.backup"; +const OWNED_MARKER: &str = ".lanspread_owned"; +const VERSION_TMP_FILE: &str = ".version.ini.tmp"; +const VERSION_DISCARDED_FILE: &str = ".version.ini.discarded"; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum FsEntryState { + Present, + Missing, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct InstallFsState { + local: FsEntryState, + installing: FsEntryState, + backup: FsEntryState, +} + +pub async fn install(game_root: &Path, id: &str, unpacker: Arc) -> eyre::Result<()> { + let eti_version = read_downloaded_version(game_root).await; + write_intent( + game_root, + &InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()), + ) + .await?; + + let result = install_inner(game_root, id, unpacker).await; + match result { + Ok(()) => { + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + Ok(()) + } + Err(err) => { + if let Err(cleanup_err) = remove_dir_all_if_exists(&installing_dir(game_root)).await { + log::warn!( + "Failed to clean install staging {}: {cleanup_err}", + installing_dir(game_root).display() + ); + } + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + Err(err) + } + } +} + +pub async fn update(game_root: &Path, id: &str, unpacker: Arc) -> eyre::Result<()> { + let eti_version = read_downloaded_version(game_root).await; + write_intent( + game_root, + &InstallIntent::new(id, InstallIntentState::Updating, eti_version.clone()), + ) + .await?; + + let result = update_inner(game_root, id, unpacker).await; + match result { + Ok(()) => { + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + if let Err(err) = remove_dir_all_if_exists(&backup_dir(game_root)).await { + log::warn!( + "Failed to clean install backup {}: {err}", + backup_dir(game_root).display() + ); + } + Ok(()) + } + Err(err) => { + let rollback = rollback_update(game_root).await; + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + if let Err(rollback_err) = rollback { + return Err(err.wrap_err(format!("rollback also failed: {rollback_err}"))); + } + Err(err) + } + } +} + +pub async fn uninstall(game_root: &Path, id: &str) -> eyre::Result<()> { + let eti_version = read_downloaded_version(game_root).await; + write_intent( + game_root, + &InstallIntent::new(id, InstallIntentState::Uninstalling, eti_version.clone()), + ) + .await?; + + let result = uninstall_inner(game_root).await; + match result { + Ok(()) => { + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + Ok(()) + } + Err(err) => { + let rollback = restore_backup(game_root).await; + if let Err(rollback_err) = rollback { + return Err(err.wrap_err(format!("rollback also failed: {rollback_err}"))); + } + write_intent(game_root, &InstallIntent::none(id, eti_version)).await?; + Err(err) + } + } +} + +pub async fn recover_on_startup(game_dir: &Path) -> eyre::Result<()> { + recover_download_transients(game_dir).await?; + + let mut entries = match tokio::fs::read_dir(game_dir).await { + Ok(entries) => entries, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(err.into()), + }; + + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_dir() { + continue; + } + + let Some(id) = entry.file_name().to_str().map(ToOwned::to_owned) else { + continue; + }; + if id == ".lanspread" { + continue; + } + + recover_game_root(&entry.path(), &id).await?; + } + + Ok(()) +} + +pub async fn recover_game_root(game_root: &Path, id: &str) -> eyre::Result<()> { + recover_download_transients(game_root).await?; + + let intent = read_intent(game_root, id).await; + let fs = inspect_install_fs(game_root).await; + match intent.state { + InstallIntentState::None => recover_none_intent(game_root).await?, + InstallIntentState::Installing => recover_installing(game_root, id, intent, fs).await?, + InstallIntentState::Updating => recover_updating(game_root, id, intent, fs).await?, + InstallIntentState::Uninstalling => recover_uninstalling(game_root, id, intent, fs).await?, + } + Ok(()) +} + +async fn install_inner( + game_root: &Path, + id: &str, + unpacker: Arc, +) -> eyre::Result<()> { + let local = local_dir(game_root); + if path_is_dir(&local).await { + eyre::bail!("game {id} is already installed"); + } + + let staging = installing_dir(game_root); + prepare_owned_empty_dir(&staging).await?; + unpack_archives(game_root, &staging, unpacker).await?; + tokio::fs::rename(&staging, &local) + .await + .wrap_err_with(|| format!("failed to promote install for {id}"))?; + Ok(()) +} + +async fn update_inner(game_root: &Path, id: &str, unpacker: Arc) -> eyre::Result<()> { + let local = local_dir(game_root); + let backup = backup_dir(game_root); + let staging = installing_dir(game_root); + + if !path_is_dir(&local).await { + eyre::bail!("game {id} is not installed"); + } + prepare_backup_slot(&backup).await?; + tokio::fs::rename(&local, &backup) + .await + .wrap_err_with(|| format!("failed to move existing install for {id} to backup"))?; + drop_owned_marker(&backup).await?; + + prepare_owned_empty_dir(&staging).await?; + unpack_archives(game_root, &staging, unpacker).await?; + tokio::fs::rename(&staging, &local) + .await + .wrap_err_with(|| format!("failed to promote update for {id}"))?; + Ok(()) +} + +async fn uninstall_inner(game_root: &Path) -> eyre::Result<()> { + let local = local_dir(game_root); + let backup = backup_dir(game_root); + + if !path_is_dir(&local).await { + return Ok(()); + } + + prepare_backup_slot(&backup).await?; + tokio::fs::rename(&local, &backup).await?; + drop_owned_marker(&backup).await?; + tokio::fs::remove_dir_all(&backup).await?; + Ok(()) +} + +async fn unpack_archives( + game_root: &Path, + staging: &Path, + unpacker: Arc, +) -> eyre::Result<()> { + let archives = root_eti_archives(game_root).await?; + if archives.is_empty() { + eyre::bail!("no .eti archives found in {}", game_root.display()); + } + + for archive in archives { + unpacker.unpack(&archive, staging).await?; + } + Ok(()) +} + +async fn root_eti_archives(game_root: &Path) -> eyre::Result> { + let mut entries = tokio::fs::read_dir(game_root).await?; + let mut archives = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_file() { + continue; + } + let path = entry.path(); + if path.extension().is_some_and(|extension| extension == "eti") { + archives.push(path); + } + } + archives.sort(); + Ok(archives) +} + +async fn recover_none_intent(game_root: &Path) -> eyre::Result<()> { + sweep_owned_orphan(&installing_dir(game_root)).await?; + sweep_owned_orphan(&backup_dir(game_root)).await?; + Ok(()) +} + +async fn recover_installing( + game_root: &Path, + id: &str, + intent: InstallIntent, + fs: InstallFsState, +) -> eyre::Result<()> { + if let InstallFsState { + installing: FsEntryState::Present, + .. + } = fs + { + remove_dir_all_if_exists(&installing_dir(game_root)).await?; + } + write_intent(game_root, &InstallIntent::none(id, intent.eti_version)).await +} + +async fn recover_updating( + game_root: &Path, + id: &str, + intent: InstallIntent, + fs: InstallFsState, +) -> eyre::Result<()> { + match fs { + InstallFsState { + local: FsEntryState::Missing, + installing: FsEntryState::Present, + backup: FsEntryState::Present, + } => { + remove_dir_all_if_exists(&installing_dir(game_root)).await?; + restore_backup(game_root).await?; + } + InstallFsState { + local: FsEntryState::Present, + installing: FsEntryState::Present, + backup: FsEntryState::Present, + } => { + remove_dir_all_if_exists(&installing_dir(game_root)).await?; + remove_dir_all_if_exists(&backup_dir(game_root)).await?; + } + InstallFsState { + local: FsEntryState::Present, + installing: FsEntryState::Missing, + backup: FsEntryState::Present, + } => remove_dir_all_if_exists(&backup_dir(game_root)).await?, + _ => {} + } + write_intent(game_root, &InstallIntent::none(id, intent.eti_version)).await +} + +async fn recover_uninstalling( + game_root: &Path, + id: &str, + intent: InstallIntent, + fs: InstallFsState, +) -> eyre::Result<()> { + match fs { + InstallFsState { + local: FsEntryState::Missing, + installing: FsEntryState::Missing, + backup: FsEntryState::Present, + } => remove_dir_all_if_exists(&backup_dir(game_root)).await?, + InstallFsState { + local: FsEntryState::Present, + installing: FsEntryState::Missing, + backup: FsEntryState::Missing, + } => uninstall_inner(game_root).await?, + _ => {} + } + write_intent(game_root, &InstallIntent::none(id, intent.eti_version)).await +} + +async fn recover_download_transients(root: &Path) -> eyre::Result<()> { + remove_file_if_exists(&root.join(VERSION_TMP_FILE)).await?; + remove_file_if_exists(&root.join(VERSION_DISCARDED_FILE)).await?; + Ok(()) +} + +async fn inspect_install_fs(game_root: &Path) -> InstallFsState { + InstallFsState { + local: path_is_dir(&local_dir(game_root)).await.into(), + installing: path_is_dir(&installing_dir(game_root)).await.into(), + backup: path_is_dir(&backup_dir(game_root)).await.into(), + } +} + +async fn read_downloaded_version(game_root: &Path) -> Option { + if !version_ini_is_regular_file(game_root).await { + return None; + } + match lanspread_db::db::read_version_from_ini(game_root) { + Ok(version) => version, + Err(err) => { + log::warn!( + "Failed to read version.ini in {}: {err}", + game_root.display() + ); + None + } + } +} + +async fn prepare_owned_empty_dir(path: &Path) -> eyre::Result<()> { + if path.exists() { + if owned_marker(path).is_file() { + tokio::fs::remove_dir_all(path).await?; + } else { + eyre::bail!("refusing to reuse markerless directory {}", path.display()); + } + } + tokio::fs::create_dir_all(path).await?; + drop_owned_marker(path).await +} + +async fn prepare_backup_slot(path: &Path) -> eyre::Result<()> { + if !path.exists() { + return Ok(()); + } + if owned_marker(path).is_file() { + tokio::fs::remove_dir_all(path).await?; + return Ok(()); + } + eyre::bail!("refusing to replace markerless backup {}", path.display()); +} + +async fn drop_owned_marker(path: &Path) -> eyre::Result<()> { + tokio::fs::write(owned_marker(path), []).await?; + Ok(()) +} + +async fn sweep_owned_orphan(path: &Path) -> eyre::Result<()> { + if !path.exists() { + return Ok(()); + } + if owned_marker(path).is_file() { + remove_dir_all_if_exists(path).await?; + } else { + log::warn!( + "Leaving markerless reserved directory untouched: {}", + path.display() + ); + } + Ok(()) +} + +async fn rollback_update(game_root: &Path) -> eyre::Result<()> { + remove_dir_all_if_exists(&installing_dir(game_root)).await?; + restore_backup(game_root).await +} + +async fn restore_backup(game_root: &Path) -> eyre::Result<()> { + let local = local_dir(game_root); + let backup = backup_dir(game_root); + if !path_is_dir(&backup).await { + return Ok(()); + } + remove_dir_all_if_exists(&local).await?; + tokio::fs::rename(&backup, &local).await?; + Ok(()) +} + +async fn remove_file_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_file(path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } +} + +async fn remove_dir_all_if_exists(path: &Path) -> eyre::Result<()> { + match tokio::fs::remove_dir_all(path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } +} + +async fn path_is_dir(path: &Path) -> bool { + tokio::fs::metadata(path) + .await + .is_ok_and(|metadata| metadata.is_dir()) +} + +fn local_dir(game_root: &Path) -> PathBuf { + game_root.join(LOCAL_DIR) +} + +fn installing_dir(game_root: &Path) -> PathBuf { + game_root.join(INSTALLING_DIR) +} + +fn backup_dir(game_root: &Path) -> PathBuf { + game_root.join(BACKUP_DIR) +} + +fn owned_marker(path: &Path) -> PathBuf { + path.join(OWNED_MARKER) +} + +impl From for FsEntryState { + fn from(value: bool) -> Self { + if value { Self::Present } else { Self::Missing } + } +} + +#[cfg(test)] +mod tests { + use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + }; + + use super::*; + use crate::install::unpack::UnpackFuture; + + #[derive(Default)] + struct FakeUnpacker { + fail: bool, + archives: Mutex>, + } + + impl FakeUnpacker { + fn failing() -> Self { + Self { + fail: true, + archives: Mutex::new(Vec::new()), + } + } + } + + impl Unpacker for FakeUnpacker { + fn unpack<'a>(&'a self, archive: &'a Path, dest: &'a Path) -> UnpackFuture<'a> { + Box::pin(async move { + self.archives + .lock() + .expect("archive list should not be poisoned") + .push(archive.to_path_buf()); + if self.fail { + eyre::bail!("forced unpack failure"); + } + tokio::fs::write(dest.join("payload.txt"), b"installed").await?; + Ok(()) + }) + } + } + + struct TempDir(PathBuf); + + impl TempDir { + fn new() -> Self { + let mut path = std::env::temp_dir(); + path.push(format!( + "lanspread-install-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + std::fs::create_dir_all(&path).expect("temp dir should be created"); + Self(path) + } + + fn game_root(&self) -> PathBuf { + self.0.join("game") + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn write_file(path: &Path, bytes: &[u8]) { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).expect("parent dir should be created"); + } + std::fs::write(path, bytes).expect("file should be written"); + } + + fn successful_unpacker() -> Arc { + Arc::new(FakeUnpacker::default()) + } + + #[tokio::test] + async fn install_success_promotes_staging_and_clears_intent() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join("game.eti"), b"archive"); + write_file(&root.join("version.ini"), b"20250101"); + + install(&root, "game", successful_unpacker()) + .await + .expect("install should succeed"); + + assert!(root.join("local").join("payload.txt").is_file()); + assert!(!root.join(".local.installing").exists()); + let intent = read_intent(&root, "game").await; + assert_eq!(intent.state, InstallIntentState::None); + } + + #[tokio::test] + async fn update_failure_restores_previous_local() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join("game.eti"), b"archive"); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("local").join("old.txt"), b"old"); + + let err = update(&root, "game", Arc::new(FakeUnpacker::failing())) + .await + .expect_err("update should fail"); + + assert!(err.to_string().contains("forced unpack failure")); + assert!(root.join("local").join("old.txt").is_file()); + assert!(!root.join(".local.installing").exists()); + assert!(!root.join(".local.backup").exists()); + let intent = read_intent(&root, "game").await; + assert_eq!(intent.state, InstallIntentState::None); + } + + #[tokio::test] + async fn uninstall_removes_only_local_install() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join("game.eti"), b"archive"); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join("local").join("payload.txt"), b"installed"); + + uninstall(&root, "game") + .await + .expect("uninstall should succeed"); + + assert!(!root.join("local").exists()); + assert!(root.join("game.eti").is_file()); + assert!(root.join("version.ini").is_file()); + } + + #[tokio::test] + async fn recovery_restores_backup_for_interrupted_update() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join("version.ini"), b"20250101"); + write_file(&root.join(".local.backup").join("old.txt"), b"old"); + write_file(&root.join(".local.installing").join("new.txt"), b"new"); + write_file(&root.join(".local.backup").join(OWNED_MARKER), b""); + write_file(&root.join(".local.installing").join(OWNED_MARKER), b""); + write_intent( + &root, + &InstallIntent::new( + "game", + InstallIntentState::Updating, + Some("20250101".into()), + ), + ) + .await + .expect("intent should be written"); + + recover_game_root(&root, "game") + .await + .expect("recovery should succeed"); + + assert!(root.join("local").join("old.txt").is_file()); + assert!(!root.join(".local.installing").exists()); + assert!(!root.join(".local.backup").exists()); + assert_eq!( + read_intent(&root, "game").await.state, + InstallIntentState::None + ); + } + + #[tokio::test] + async fn none_recovery_leaves_markerless_reserved_dirs_untouched() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join(".local.backup").join("user.txt"), b"user"); + + recover_game_root(&root, "game") + .await + .expect("recovery should succeed"); + + assert!(root.join(".local.backup").join("user.txt").is_file()); + } + + #[tokio::test] + async fn download_recovery_sweeps_reserved_version_files() { + let temp = TempDir::new(); + let root = temp.game_root(); + write_file(&root.join(VERSION_TMP_FILE), b"tmp"); + write_file(&root.join(VERSION_DISCARDED_FILE), b"old"); + + recover_game_root(&root, "game") + .await + .expect("recovery should succeed"); + + assert!(!root.join(VERSION_TMP_FILE).exists()); + assert!(!root.join(VERSION_DISCARDED_FILE).exists()); + } +} diff --git a/crates/lanspread-peer/src/install/unpack.rs b/crates/lanspread-peer/src/install/unpack.rs new file mode 100644 index 0000000..44ee67e --- /dev/null +++ b/crates/lanspread-peer/src/install/unpack.rs @@ -0,0 +1,12 @@ +use std::{future::Future, path::Path, pin::Pin}; + +/// Boxed future returned by an injected game archive unpacker. +pub type UnpackFuture<'a> = Pin> + Send + 'a>>; + +/// Extracts one archive into a staging directory. +/// +/// The peer crate owns the install transaction while the shell-specific unrar +/// integration stays in the Tauri crate through this injected trait. +pub trait Unpacker: Send + Sync { + fn unpack<'a>(&'a self, archive: &'a Path, dest: &'a Path) -> UnpackFuture<'a>; +} diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 2ac864f..f19e7c2 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -19,6 +19,7 @@ mod error; mod events; mod handlers; mod identity; +mod install; mod library; mod local_games; mod network; @@ -33,10 +34,11 @@ mod startup; // Public re-exports // ============================================================================= -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{collections::HashSet, net::SocketAddr, path::PathBuf, sync::Arc}; pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT}; pub use error::PeerError; +pub use install::{UnpackFuture, Unpacker}; use lanspread_db::db::{Game, GameFileDescription}; pub use peer_db::{MajorityValidationResult, PeerGameDB, PeerId, PeerInfo, PeerUpsert}; use tokio::sync::{ @@ -52,8 +54,11 @@ use crate::{ handle_download_game_files_command, handle_get_game_command, handle_get_peer_count_command, + handle_install_game_command, handle_list_games_command, handle_set_game_dir_command, + handle_uninstall_game_command, + handle_update_game_command, load_local_library, }, }; @@ -80,6 +85,21 @@ pub enum PeerEvent { DownloadGameFilesFailed { id: String }, /// All peers with the game have disconnected during download. DownloadGameFilesAllPeersGone { id: String }, + /// Install or update transaction has started for a game. + InstallGameBegin { + id: String, + operation: InstallOperation, + }, + /// Install or update transaction has completed successfully. + InstallGameFinished { id: String }, + /// Install or update transaction has failed after rollback. + InstallGameFailed { id: String }, + /// Uninstall transaction has started for a game. + UninstallGameBegin { id: String }, + /// Uninstall transaction has completed successfully. + UninstallGameFinished { id: String }, + /// Uninstall transaction has failed after rollback. + UninstallGameFailed { id: String }, /// No peers have the requested game. NoPeersHaveGame { id: String }, /// A peer has connected. @@ -116,6 +136,15 @@ pub enum PeerRuntimeComponent { LocalMonitor, } +/// Install-side operation represented in lifecycle events. +#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::IntoStaticStr)] +pub enum InstallOperation { + /// Fresh install into a missing `local/` directory. + Installing, + /// Update that replaces an existing `local/` directory. + Updating, +} + /// Commands sent to the peer system from the UI. #[derive(Clone, Debug)] pub enum PeerCommand { @@ -128,6 +157,12 @@ pub enum PeerCommand { id: String, file_descriptions: Vec, }, + /// Install already-downloaded archives into `local/`. + InstallGame { id: String }, + /// Update an installed game from already-downloaded archives. + UpdateGame { id: String }, + /// Remove only the `local/` install for a game. + UninstallGame { id: String }, /// Set the local game directory. SetGameDir(PathBuf), /// Request the current peer count. @@ -150,10 +185,13 @@ pub enum PeerCommand { /// * `game_dir` - Path to the local game directory /// * `tx_notify_ui` - Channel for sending events to the UI /// * `peer_game_db` - Shared peer game database +#[allow(clippy::implicit_hasher)] pub fn start_peer( game_dir: impl Into, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, + unpacker: Arc, + catalog: Arc>>, ) -> eyre::Result { let game_dir = game_dir.into(); log::info!( @@ -171,20 +209,33 @@ pub fn start_peer( peer_game_db, peer_id, game_dir, + unpacker, + catalog, )) } /// Main peer execution loop that handles peer commands and manages the peer system. +#[allow(clippy::too_many_arguments, clippy::implicit_hasher)] async fn run_peer( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, peer_game_db: Arc>, peer_id: String, game_dir: PathBuf, + unpacker: Arc, shutdown: CancellationToken, task_tracker: TaskTracker, + catalog: Arc>>, ) -> eyre::Result<()> { - let ctx = Ctx::new(peer_game_db, peer_id, game_dir, shutdown, task_tracker); + let ctx = Ctx::new( + peer_game_db, + peer_id, + game_dir, + unpacker, + shutdown, + task_tracker, + catalog, + ); if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await { log::error!("Failed to load initial local game database: {err}"); } @@ -237,6 +288,15 @@ async fn handle_peer_commands( } => { handle_download_game_files_command(ctx, tx_notify_ui, id, file_descriptions).await; } + PeerCommand::InstallGame { id } => { + handle_install_game_command(ctx, tx_notify_ui, id).await; + } + PeerCommand::UpdateGame { id } => { + handle_update_game_command(ctx, tx_notify_ui, id).await; + } + PeerCommand::UninstallGame { id } => { + handle_uninstall_game_command(ctx, tx_notify_ui, id).await; + } PeerCommand::SetGameDir(game_dir) => { handle_set_game_dir_command(ctx, tx_notify_ui, game_dir).await; } diff --git a/crates/lanspread-peer/src/local_games.rs b/crates/lanspread-peer/src/local_games.rs index fe9d1a8..d779444 100644 --- a/crates/lanspread-peer/src/local_games.rs +++ b/crates/lanspread-peer/src/local_games.rs @@ -12,7 +12,7 @@ use lanspread_db::db::{Game, GameDB, GameFileDescription}; use lanspread_proto::{Availability, GameSummary}; use serde::{Deserialize, Serialize}; -use crate::error::PeerError; +use crate::{context::OperationKind, error::PeerError}; // ============================================================================= // Local directory helpers @@ -28,51 +28,41 @@ pub fn is_local_dir_name(name: &str) -> bool { name == "local" } -/// Checks if a local directory has any content. -pub async fn local_dir_has_content(path: &Path) -> bool { +/// Checks if `local/` is a committed install directory. +pub async fn local_dir_is_directory(path: &Path) -> bool { let local_dir = path.join("local"); - if tokio::fs::metadata(&local_dir).await.is_err() { - return false; - } + tokio::fs::metadata(&local_dir) + .await + .is_ok_and(|metadata| metadata.is_dir()) +} - let mut entries = match tokio::fs::read_dir(&local_dir).await { - Ok(entries) => entries, - Err(e) => { - log::warn!("Failed to read local dir {}: {e}", local_dir.display()); - return false; - } - }; - - match entries.next_entry().await { - Ok(Some(_)) => true, - Ok(None) => false, - Err(e) => { - log::warn!("Failed to iterate local dir {}: {e}", local_dir.display()); - false - } - } +/// Checks if the root-level `version.ini` sentinel exists as a regular file. +pub async fn version_ini_is_regular_file(game_path: &Path) -> bool { + let version_path = game_path.join("version.ini"); + tokio::fs::metadata(&version_path) + .await + .is_ok_and(|metadata| metadata.is_file()) } /// Checks if a game is available for download locally. pub async fn local_download_available( game_dir: &Path, game_id: &str, - downloading_games: &HashSet, + active_operations: &HashMap, + catalog: &HashSet, ) -> bool { - if downloading_games.contains(game_id) { - log::debug!("Not serving game {game_id} locally because it is still downloading"); + if !catalog.contains(game_id) { + log::debug!("Not serving game {game_id} locally because it is not in the catalog"); + return false; + } + + if active_operations.contains_key(game_id) { + log::debug!("Not serving game {game_id} locally because an operation is active"); return false; } let game_path = game_dir.join(game_id); - let eti_path = game_path.join(format!("{game_id}.eti")); - - if tokio::fs::metadata(&eti_path).await.is_err() { - return false; - } - - // Only treat as pending install if the local installation directory is empty/missing - !local_dir_has_content(game_path.as_path()).await + version_ini_is_regular_file(game_path.as_path()).await } // ============================================================================= @@ -81,6 +71,9 @@ pub async fn local_download_available( const LIBRARY_INDEX_DIR: &str = ".lanspread"; const LIBRARY_INDEX_FILE: &str = "library_index.json"; +const INTENT_LOG_FILE: &str = ".lanspread.json"; +const VERSION_TMP_FILE: &str = ".version.ini.tmp"; +const VERSION_DISCARDED_FILE: &str = ".version.ini.discarded"; #[derive(Debug, Clone, Serialize, Deserialize)] struct LibraryIndex { @@ -96,12 +89,18 @@ struct GameIndexEntry { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] struct GameFingerprint { - eti_size: Option, - eti_mtime: Option, + eti_files: Vec, version_mtime: Option, local_dir_present: bool, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct EtiFingerprint { + name: String, + size: u64, + mtime: Option, +} + #[derive(Debug, Clone)] pub struct LocalLibraryScan { pub game_db: GameDB, @@ -154,32 +153,75 @@ fn system_time_to_secs(time: SystemTime) -> u64 { .as_secs() } -async fn fingerprint_game_dir(game_path: &Path, game_id: &str) -> eyre::Result { - let eti_path = game_path.join(format!("{game_id}.eti")); - let (eti_size, eti_mtime) = match tokio::fs::metadata(&eti_path).await { - Ok(metadata) => ( - Some(metadata.len()), - metadata.modified().ok().map(system_time_to_secs), - ), - Err(_) => (None, None), +fn is_root_eti_name(name: &str) -> bool { + Path::new(name) + .extension() + .is_some_and(|extension| extension == "eti") +} + +async fn root_eti_fingerprints(game_path: &Path) -> eyre::Result> { + let mut entries = match tokio::fs::read_dir(game_path).await { + Ok(entries) => entries, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(Vec::new()), + Err(err) => return Err(err.into()), }; + let mut eti_files = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + let file_type = entry.file_type().await?; + if !file_type.is_file() { + continue; + } + + let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { + continue; + }; + if !is_root_eti_name(&name) { + continue; + } + + let metadata = entry.metadata().await?; + eti_files.push(EtiFingerprint { + name, + size: metadata.len(), + mtime: metadata.modified().ok().map(system_time_to_secs), + }); + } + + eti_files.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(eti_files) +} + +async fn fingerprint_game_dir(game_path: &Path) -> eyre::Result { + let eti_files = root_eti_fingerprints(game_path).await?; + let version_path = game_path.join("version.ini"); let version_mtime = match tokio::fs::metadata(&version_path).await { - Ok(metadata) => metadata.modified().ok().map(system_time_to_secs), - Err(_) => None, + Ok(metadata) if metadata.is_file() => metadata.modified().ok().map(system_time_to_secs), + Err(_) | Ok(_) => None, }; - let local_dir_present = local_dir_has_content(game_path).await; + let local_dir_present = local_dir_is_directory(game_path).await; Ok(GameFingerprint { - eti_size, - eti_mtime, + eti_files, version_mtime, local_dir_present, }) } +pub fn is_ignored_game_root_name(name: &str) -> bool { + name == LIBRARY_INDEX_DIR +} + +fn is_reserved_transient_name(name: &str) -> bool { + name.starts_with(".local.") + || name == VERSION_TMP_FILE + || name == VERSION_DISCARDED_FILE + || name == INTENT_LOG_FILE + || name == LIBRARY_INDEX_DIR +} + fn should_skip_root_entry(entry: &walkdir::DirEntry) -> bool { if entry.depth() != 1 { return false; @@ -190,6 +232,9 @@ fn should_skip_root_entry(entry: &walkdir::DirEntry) -> bool { } if let Some(name) = entry.file_name().to_str() { + if is_reserved_transient_name(name) { + return true; + } if entry.file_type().is_dir() && name == ".sync" { return true; } @@ -282,20 +327,14 @@ fn manifest_hash(file_descriptions: &[GameFileDescription]) -> u64 { async fn build_game_summary(game_dir: &Path, game_id: &str) -> Result { let game_path = game_dir.join(game_id); - let eti_path = game_path.join(format!("{game_id}.eti")); - let downloaded = tokio::fs::metadata(&eti_path).await.is_ok(); - if !downloaded { - return Err(PeerError::Other(eyre::eyre!( - "Game is not downloaded: {game_id}" - ))); - } + let downloaded = version_ini_is_regular_file(&game_path).await; + let installed = local_dir_is_directory(&game_path).await; - let installed = local_dir_has_content(&game_path).await; - let eti_version = if installed { + let eti_version = if downloaded { match lanspread_db::db::read_version_from_ini(&game_path) { Ok(version) => version, Err(e) => { - log::warn!("Failed to read version.ini for installed game {game_id}: {e}"); + log::warn!("Failed to read version.ini for downloaded game {game_id}: {e}"); None } } @@ -310,6 +349,11 @@ async fn build_game_summary(game_dir: &Path, game_id: &str) -> Result Result Game { +pub(crate) fn game_from_summary(summary: &GameSummary) -> Game { Game { id: summary.id.clone(), name: summary.name.clone(), @@ -350,12 +394,23 @@ struct IndexUpdate { async fn update_index_for_game( game_root: &Path, game_id: &str, + catalog: &HashSet, index: &mut LibraryIndex, ) -> eyre::Result { - let game_path = game_root.join(game_id); - let fingerprint = fingerprint_game_dir(&game_path, game_id).await?; + if !catalog.contains(game_id) { + return Ok(IndexUpdate { + summary: None, + changed: index.games.remove(game_id).is_some(), + }); + } - if fingerprint.eti_size.is_none() { + let game_path = game_root.join(game_id); + let fingerprint = fingerprint_game_dir(&game_path).await?; + + if fingerprint.version_mtime.is_none() + && !fingerprint.local_dir_present + && fingerprint.eti_files.is_empty() + { return Ok(IndexUpdate { summary: None, changed: index.games.remove(game_id).is_some(), @@ -401,12 +456,34 @@ fn empty_scan() -> LocalLibraryScan { } } +fn scan_from_index(index: &LibraryIndex) -> LocalLibraryScan { + let summaries = index + .games + .iter() + .map(|(id, entry)| (id.clone(), entry.summary.clone())) + .collect::>(); + let games = index + .games + .values() + .map(|entry| game_from_summary(&entry.summary)) + .collect::>(); + + LocalLibraryScan { + game_db: GameDB::from(games), + summaries, + revision: index.revision, + } +} + // ============================================================================= // Game database loading // ============================================================================= /// Scans the local game directory and returns summaries plus a game database. -pub async fn scan_local_library(game_dir: impl AsRef) -> eyre::Result { +pub async fn scan_local_library( + game_dir: impl AsRef, + catalog: &HashSet, +) -> eyre::Result { let game_path = game_dir.as_ref(); let metadata = match tokio::fs::metadata(game_path).await { @@ -448,8 +525,11 @@ pub async fn scan_local_library(game_dir: impl AsRef) -> eyre::Result) -> eyre::Result, + catalog: &HashSet, + game_id: &str, +) -> eyre::Result { + let game_path = game_dir.as_ref(); + let index_path = library_index_path(game_path); + let mut index = load_library_index(&index_path).await; + + let update = update_index_for_game(game_path, game_id, catalog, &mut index).await?; + if update.changed { + index.revision = index.revision.saturating_add(1); + if let Err(err) = save_library_index(&index_path, &index).await { + log::warn!( + "Failed to persist library index {}: {err}", + index_path.display() + ); + } + } + + Ok(scan_from_index(&index)) +} + // ============================================================================= // Game file descriptions // ============================================================================= @@ -495,3 +599,123 @@ pub async fn get_game_file_descriptions( ) -> Result, PeerError> { scan_game_descriptions(game_id, game_dir.as_ref()).await } + +#[cfg(test)] +mod tests { + use std::{ + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, + }; + + use lanspread_proto::Availability; + + use super::*; + use crate::context::OperationKind; + + struct TempDir(PathBuf); + + impl TempDir { + fn new() -> Self { + let mut path = std::env::temp_dir(); + path.push(format!( + "lanspread-local-games-{}-{}", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + std::fs::create_dir_all(&path).expect("temp dir should be created"); + Self(path) + } + + fn path(&self) -> &Path { + &self.0 + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn write_file(path: &Path, bytes: &[u8]) { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).expect("parent dir should be created"); + } + std::fs::write(path, bytes).expect("file should be written"); + } + + #[tokio::test] + async fn scan_uses_version_ini_and_local_dir_as_independent_state() { + let temp = TempDir::new(); + let catalog = HashSet::from([ + "ready".to_string(), + "local-only".to_string(), + "eti-only".to_string(), + ]); + + write_file(&temp.path().join("ready").join("version.ini"), b"20250101"); + std::fs::create_dir_all(temp.path().join("local-only").join("local")) + .expect("local dir should be created"); + write_file( + &temp.path().join("eti-only").join("eti-only.eti"), + b"archive", + ); + write_file( + &temp.path().join("non-catalog").join("version.ini"), + b"20250101", + ); + + let scan = scan_local_library(temp.path(), &catalog) + .await + .expect("scan should succeed"); + + let ready = scan + .summaries + .get("ready") + .expect("catalog game with sentinel should be indexed"); + assert!(ready.downloaded); + assert!(!ready.installed); + assert_eq!(ready.eti_version.as_deref(), Some("20250101")); + assert_eq!(ready.availability, Availability::Ready); + + let local_only = scan + .summaries + .get("local-only") + .expect("local-only install should be indexed"); + assert!(!local_only.downloaded); + assert!(local_only.installed); + assert_eq!(local_only.availability, Availability::LocalOnly); + + let eti_only = scan + .summaries + .get("eti-only") + .expect("eti-only root should be retained as local state"); + assert!(!eti_only.downloaded); + assert!(!eti_only.installed); + assert_eq!(eti_only.availability, Availability::LocalOnly); + + assert!(!scan.summaries.contains_key("non-catalog")); + } + + #[tokio::test] + async fn local_download_available_gates_on_catalog_operation_and_sentinel() { + let temp = TempDir::new(); + let game_root = temp.path().join("game"); + write_file(&game_root.join("version.ini"), b"20250101"); + + let catalog = HashSet::from(["game".to_string()]); + let no_operations = HashMap::new(); + assert!(local_download_available(temp.path(), "game", &no_operations, &catalog).await); + + let active_operations = HashMap::from([("game".to_string(), OperationKind::Downloading)]); + assert!(!local_download_available(temp.path(), "game", &active_operations, &catalog).await); + + assert!( + !local_download_available(temp.path(), "game", &no_operations, &HashSet::new()).await + ); + assert!(!local_download_available(temp.path(), "missing", &no_operations, &catalog).await); + } +} diff --git a/crates/lanspread-peer/src/peer_db.rs b/crates/lanspread-peer/src/peer_db.rs index 850666a..455f95b 100644 --- a/crates/lanspread-peer/src/peer_db.rs +++ b/crates/lanspread-peer/src/peer_db.rs @@ -8,7 +8,7 @@ use std::{ }; use lanspread_db::db::{Game, GameFileDescription}; -use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot}; +use lanspread_proto::{Availability, GameSummary, LibraryDelta, LibrarySnapshot}; use crate::library::compute_library_digest; pub type PeerId = String; @@ -265,8 +265,8 @@ impl PeerGameDB { // Count peers per game for peer in self.peers.values() { - for game_id in peer.games.keys() { - *peer_counts.entry(game_id.clone()).or_insert(0) += 1; + for game in peer.games.values().filter(|game| game_is_ready(game)) { + *peer_counts.entry(game.id.clone()).or_insert(0) += 1; } } @@ -276,20 +276,22 @@ impl PeerGameDB { aggregated .entry(game.id.clone()) .and_modify(|existing| { - if let (Some(new_version), Some(current)) = - (&game.eti_version, &existing.eti_game_version) - { - if new_version > current { - existing.eti_game_version = Some(new_version.clone()); + if game_is_ready(game) { + if let (Some(new_version), Some(current)) = + (&game.eti_version, &existing.eti_game_version) + { + if new_version > current { + existing.eti_game_version = Some(new_version.clone()); + } + } else if existing.eti_game_version.is_none() { + existing.eti_game_version.clone_from(&game.eti_version); } - } else if existing.eti_game_version.is_none() { - existing.eti_game_version.clone_from(&game.eti_version); } - existing.peer_count = peer_counts[&game.id]; + existing.peer_count = *peer_counts.get(&game.id).unwrap_or(&0); if game.size > existing.size { existing.size = game.size; } - if game.downloaded { + if game_is_ready(game) { existing.downloaded = true; } if game.installed { @@ -298,7 +300,8 @@ impl PeerGameDB { }) .or_insert_with(|| { let mut game_clone = summary_to_game(game); - game_clone.peer_count = peer_counts[&game.id]; + game_clone.peer_count = *peer_counts.get(&game.id).unwrap_or(&0); + game_clone.downloaded = game_is_ready(game); game_clone }); } @@ -316,6 +319,7 @@ impl PeerGameDB { for peer in self.peers.values() { if let Some(game) = peer.games.get(game_id) + && game_is_ready(game) && let Some(ref version) = game.eti_version { match &latest_version { @@ -373,7 +377,7 @@ impl PeerGameDB { pub fn peers_with_game(&self, game_id: &str) -> Vec { self.peers .iter() - .filter(|(_, peer)| peer.games.contains_key(game_id)) + .filter(|(_, peer)| peer.games.get(game_id).is_some_and(game_is_ready)) .map(|(_, peer)| peer.addr) .collect() } @@ -388,7 +392,9 @@ impl PeerGameDB { .iter() .filter(|(_, peer)| { if let Some(game) = peer.games.get(game_id) { - if let Some(ref version) = game.eti_version { + if game_is_ready(game) + && let Some(ref version) = game.eti_version + { version == latest } else { false @@ -411,6 +417,9 @@ impl PeerGameDB { self.peers .values() .filter_map(|peer| { + if !peer.games.get(game_id).is_some_and(game_is_ready) { + return None; + } peer.files .get(game_id) .cloned() @@ -438,6 +447,9 @@ impl PeerGameDB { for peer in self.peers.values() { if let Some(game) = peer.games.get(game_id) { + if !game_is_ready(game) { + continue; + } if game.size == 0 { continue; } @@ -711,7 +723,15 @@ fn create_peer_whitelist(peer_scores: HashMap) -> Vec bool { + summary.availability == Availability::Ready +} + fn summary_to_game(summary: &GameSummary) -> Game { + let eti_game_version = game_is_ready(summary) + .then(|| summary.eti_version.clone()) + .flatten(); + Game { id: summary.id.clone(), name: summary.name.clone(), @@ -724,8 +744,83 @@ fn summary_to_game(summary: &GameSummary) -> Game { size: summary.size, downloaded: summary.downloaded, installed: summary.installed, - eti_game_version: summary.eti_version.clone(), + eti_game_version, local_version: None, peer_count: 0, } } + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + use super::*; + + fn addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + fn summary(id: &str, version: &str, availability: Availability) -> GameSummary { + GameSummary { + id: id.to_string(), + name: id.to_string(), + size: 42, + downloaded: availability == Availability::Ready, + installed: true, + eti_version: Some(version.to_string()), + manifest_hash: 7, + availability, + } + } + + #[test] + fn aggregation_counts_only_ready_peers_as_download_sources() { + let ready_addr = addr(12000); + let local_only_addr = addr(12001); + let mut db = PeerGameDB::new(); + db.upsert_peer("ready".to_string(), ready_addr); + db.upsert_peer("local".to_string(), local_only_addr); + db.update_peer_games( + &"ready".to_string(), + vec![summary("game", "20240101", Availability::Ready)], + ); + db.update_peer_games( + &"local".to_string(), + vec![summary("game", "20990101", Availability::LocalOnly)], + ); + + let games = db.get_all_games(); + assert_eq!(games.len(), 1); + assert_eq!(games[0].peer_count, 1); + assert!(games[0].downloaded); + assert_eq!(games[0].eti_game_version.as_deref(), Some("20240101")); + + assert_eq!(db.peers_with_game("game"), vec![ready_addr]); + assert_eq!( + db.get_latest_version_for_game("game").as_deref(), + Some("20240101") + ); + assert_eq!(db.peers_with_latest_version("game"), vec![ready_addr]); + } + + #[test] + fn local_only_peer_does_not_make_game_downloadable() { + let local_only_addr = addr(12002); + let mut db = PeerGameDB::new(); + db.upsert_peer("local".to_string(), local_only_addr); + db.update_peer_games( + &"local".to_string(), + vec![summary("game", "20240101", Availability::LocalOnly)], + ); + + let games = db.get_all_games(); + assert_eq!(games.len(), 1); + assert_eq!(games[0].peer_count, 0); + assert!(!games[0].downloaded); + assert_eq!(games[0].eti_game_version, None); + + assert!(db.peers_with_game("game").is_empty()); + assert_eq!(db.get_latest_version_for_game("game"), None); + assert!(db.peers_with_latest_version("game").is_empty()); + } +} diff --git a/crates/lanspread-peer/src/remote_peer.rs b/crates/lanspread-peer/src/remote_peer.rs index d49a040..2e4def0 100644 --- a/crates/lanspread-peer/src/remote_peer.rs +++ b/crates/lanspread-peer/src/remote_peer.rs @@ -26,6 +26,12 @@ pub async fn ensure_peer_id_for_addr( } pub fn summary_from_game(game: &Game) -> GameSummary { + let availability = if game.downloaded { + Availability::Ready + } else { + Availability::LocalOnly + }; + GameSummary { id: game.id.clone(), name: game.name.clone(), @@ -34,7 +40,7 @@ pub fn summary_from_game(game: &Game) -> GameSummary { installed: game.installed, eti_version: game.eti_game_version.clone(), manifest_hash: 0, - availability: Availability::Ready, + availability, } } diff --git a/crates/lanspread-peer/src/services/liveness.rs b/crates/lanspread-peer/src/services/liveness.rs index 0b45b10..a52977c 100644 --- a/crates/lanspread-peer/src/services/liveness.rs +++ b/crates/lanspread-peer/src/services/liveness.rs @@ -1,10 +1,6 @@ //! Peer liveness checks and stale-peer cleanup. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -12,6 +8,7 @@ use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::{ PeerEvent, config::{PEER_PING_IDLE_SECS, PEER_PING_INTERVAL_SECS, peer_stale_timeout}, + context::OperationKind, events, network::ping_peer, peer_db::{PeerGameDB, PeerId}, @@ -21,7 +18,7 @@ use crate::{ pub async fn run_ping_service( tx_notify_ui: UnboundedSender, peer_game_db: Arc>, - downloading_games: Arc>>, + active_operations: Arc>>, active_downloads: Arc>>, shutdown: CancellationToken, task_tracker: TaskTracker, @@ -43,7 +40,7 @@ pub async fn run_ping_service( ping_idle_peers( &peer_game_db, - &downloading_games, + &active_operations, &active_downloads, &tx_notify_ui, &shutdown, @@ -53,7 +50,7 @@ pub async fn run_ping_service( prune_stale_peers( &peer_game_db, - &downloading_games, + &active_operations, &active_downloads, &tx_notify_ui, ) @@ -63,7 +60,7 @@ pub async fn run_ping_service( async fn ping_idle_peers( peer_game_db: &Arc>, - downloading_games: &Arc>>, + active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, shutdown: &CancellationToken, @@ -78,7 +75,7 @@ async fn ping_idle_peers( let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = peer_game_db.clone(); - let downloading_games = downloading_games.clone(); + let active_operations = active_operations.clone(); let active_downloads = active_downloads.clone(); let shutdown = shutdown.clone(); @@ -96,7 +93,7 @@ async fn ping_idle_peers( log::warn!("Peer {peer_addr} failed ping check"); remove_peer_and_refresh( &peer_game_db, - &downloading_games, + &active_operations, &active_downloads, &tx_notify_ui, peer_id, @@ -108,7 +105,7 @@ async fn ping_idle_peers( log::error!("Failed to ping peer {peer_addr}: {err}"); remove_peer_and_refresh( &peer_game_db, - &downloading_games, + &active_operations, &active_downloads, &tx_notify_ui, peer_id, @@ -123,7 +120,7 @@ async fn ping_idle_peers( async fn prune_stale_peers( peer_game_db: &Arc>, - downloading_games: &Arc>>, + active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, ) { @@ -143,7 +140,7 @@ async fn prune_stale_peers( events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, - downloading_games, + active_operations, active_downloads, tx_notify_ui, ) @@ -153,7 +150,7 @@ async fn prune_stale_peers( async fn remove_peer_and_refresh( peer_game_db: &Arc>, - downloading_games: &Arc>>, + active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, peer_id: PeerId, @@ -163,7 +160,7 @@ async fn remove_peer_and_refresh( events::emit_peer_game_list(peer_game_db, tx_notify_ui).await; handle_active_downloads_without_peers( peer_game_db, - downloading_games, + active_operations, active_downloads, tx_notify_ui, ) @@ -189,16 +186,16 @@ async fn remove_peer( async fn handle_active_downloads_without_peers( peer_game_db: &Arc>, - downloading_games: &Arc>>, + active_operations: &Arc>>, active_downloads: &Arc>>, tx_notify_ui: &UnboundedSender, ) { let active_ids = { - downloading_games + active_operations .read() .await .iter() - .cloned() + .filter_map(|(id, kind)| (*kind == OperationKind::Downloading).then_some(id.clone())) .collect::>() }; if active_ids.is_empty() { @@ -210,6 +207,7 @@ async fn handle_active_downloads_without_peers( continue; } + active_operations.write().await.remove(&id); let Some(cancel_token) = active_downloads.write().await.remove(&id) else { continue; }; @@ -229,21 +227,21 @@ async fn peers_still_have_game(peer_game_db: &Arc>, game_id: #[cfg(test)] mod tests { - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - }; + use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use super::handle_active_downloads_without_peers; - use crate::{PeerEvent, peer_db::PeerGameDB}; + use crate::{PeerEvent, context::OperationKind, peer_db::PeerGameDB}; #[tokio::test] async fn all_peers_gone_cancels_download_and_emits_only_peers_gone() { let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); - let downloading_games = Arc::new(RwLock::new(HashSet::from(["game".to_string()]))); + let active_operations = Arc::new(RwLock::new(HashMap::from([( + "game".to_string(), + OperationKind::Downloading, + )]))); let cancel = CancellationToken::new(); let active_downloads = Arc::new(RwLock::new(HashMap::from([( "game".to_string(), @@ -253,13 +251,14 @@ mod tests { handle_active_downloads_without_peers( &peer_game_db, - &downloading_games, + &active_operations, &active_downloads, &tx, ) .await; assert!(cancel.is_cancelled()); + assert!(!active_operations.read().await.contains_key("game")); assert!(!active_downloads.read().await.contains_key("game")); let event = rx.recv().await.expect("peers-gone event should be emitted"); diff --git a/crates/lanspread-peer/src/services/local_monitor.rs b/crates/lanspread-peer/src/services/local_monitor.rs index fbfe802..24b5b60 100644 --- a/crates/lanspread-peer/src/services/local_monitor.rs +++ b/crates/lanspread-peer/src/services/local_monitor.rs @@ -1,42 +1,372 @@ //! Local game directory monitor. -use std::time::Duration; +use std::{ + collections::HashSet, + path::{Component, Path, PathBuf}, + sync::Arc, + time::Duration, +}; -use tokio::sync::mpsc::UnboundedSender; +use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher}; +use tokio::sync::{RwLock, mpsc::UnboundedSender}; use crate::{ PeerEvent, - config::LOCAL_GAME_MONITOR_INTERVAL_SECS, + config::LOCAL_GAME_FALLBACK_SCAN_SECS, context::Ctx, handlers::update_and_announce_games, - local_games::scan_local_library, + local_games::{ + is_ignored_game_root_name, + is_local_dir_name, + rescan_local_game, + scan_local_library, + }, }; +struct WatchState { + watcher: RecommendedWatcher, + game_dir: PathBuf, + watched: HashSet, +} + +#[derive(Clone, Default)] +struct RescanGate { + running: Arc>>, + pending: Arc>>, +} + /// Monitors the local game directory for changes. pub async fn run_local_game_monitor( tx_notify_ui: UnboundedSender, ctx: Ctx, ) -> eyre::Result<()> { - log::info!( - "Starting local game directory monitor ({LOCAL_GAME_MONITOR_INTERVAL_SECS}s interval)" - ); + log::info!("Starting notify-based local game directory monitor"); - let mut interval = tokio::time::interval(Duration::from_secs(LOCAL_GAME_MONITOR_INTERVAL_SECS)); + let (watch_tx, mut watch_rx) = tokio::sync::mpsc::unbounded_channel::>(); + let mut watch_state = build_watch_state(&ctx, watch_tx.clone()).await; + let gate = RescanGate::default(); + let mut fallback_interval = + tokio::time::interval(Duration::from_secs(LOCAL_GAME_FALLBACK_SCAN_SECS)); loop { tokio::select! { () = ctx.shutdown.cancelled() => return Ok(()), - _ = interval.tick() => {} - } - - let game_dir = { ctx.game_dir.read().await.clone() }; - match scan_local_library(&game_dir).await { - Ok(scan) => { - update_and_announce_games(&ctx, &tx_notify_ui, scan).await; + _ = fallback_interval.tick() => { + run_fallback_scan(&ctx, &tx_notify_ui).await; + reconcile_watch_state(&ctx, &mut watch_state, watch_tx.clone()).await; } - Err(err) => { - log::error!("Failed to scan local games directory: {err}"); + Some(event) = watch_rx.recv() => { + handle_watch_event( + &ctx, + &tx_notify_ui, + &gate, + event, + ).await; + reconcile_watch_state(&ctx, &mut watch_state, watch_tx.clone()).await; } } } } + +async fn build_watch_state( + ctx: &Ctx, + watch_tx: tokio::sync::mpsc::UnboundedSender>, +) -> Option { + let game_dir = ctx.game_dir.read().await.clone(); + let mut fs_watcher = match RecommendedWatcher::new( + move |result| { + let _ = watch_tx.send(result); + }, + Config::default(), + ) { + Ok(watcher) => watcher, + Err(err) => { + log::warn!("Filesystem watcher unavailable; falling back to periodic scans: {err}"); + return None; + } + }; + + let watched_paths = match watch_game_roots(&mut fs_watcher, &game_dir).await { + Ok(paths) => paths, + Err(err) => { + log::warn!( + "Failed to initialize filesystem watcher for {}: {err}; falling back to periodic scans", + game_dir.display() + ); + return None; + } + }; + + Some(WatchState { + watcher: fs_watcher, + game_dir, + watched: watched_paths, + }) +} + +async fn reconcile_watch_state( + ctx: &Ctx, + watch_state: &mut Option, + watch_tx: tokio::sync::mpsc::UnboundedSender>, +) { + let current_game_dir = ctx.game_dir.read().await.clone(); + if watch_state + .as_ref() + .is_none_or(|state| state.game_dir != current_game_dir) + { + *watch_state = build_watch_state(ctx, watch_tx).await; + return; + } + + if let Some(state) = watch_state + && let Err(err) = reconcile_game_root_watches(state).await + { + log::warn!( + "Failed to reconcile filesystem watches for {}: {err}", + state.game_dir.display() + ); + } +} + +async fn watch_game_roots( + watcher: &mut RecommendedWatcher, + game_dir: &Path, +) -> eyre::Result> { + let mut watched_paths = HashSet::new(); + watch_path(watcher, game_dir, &mut watched_paths)?; + + for root in list_game_roots(game_dir).await? { + watch_path(watcher, &root, &mut watched_paths)?; + } + + Ok(watched_paths) +} + +async fn reconcile_game_root_watches(state: &mut WatchState) -> eyre::Result<()> { + let desired = { + let mut desired = HashSet::from([state.game_dir.clone()]); + desired.extend(list_game_roots(&state.game_dir).await?); + desired + }; + + let stale_paths = state + .watched + .difference(&desired) + .cloned() + .collect::>(); + for path in stale_paths { + if let Err(err) = state.watcher.unwatch(&path) { + log::debug!("Failed to unwatch {}: {err}", path.display()); + } + state.watched.remove(&path); + } + + let new_paths = desired + .difference(&state.watched) + .cloned() + .collect::>(); + for path in new_paths { + watch_path(&mut state.watcher, &path, &mut state.watched)?; + } + + Ok(()) +} + +fn watch_path( + watcher: &mut RecommendedWatcher, + path: &Path, + watched_paths: &mut HashSet, +) -> notify::Result<()> { + watcher.watch(path, RecursiveMode::NonRecursive)?; + watched_paths.insert(path.to_path_buf()); + Ok(()) +} + +async fn list_game_roots(game_dir: &Path) -> eyre::Result> { + let mut roots = Vec::new(); + let mut entries = match tokio::fs::read_dir(game_dir).await { + Ok(entries) => entries, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(roots), + Err(err) => return Err(err.into()), + }; + + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_dir() { + continue; + } + let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { + continue; + }; + if is_ignored_game_root_name(&name) { + continue; + } + roots.push(entry.path()); + } + Ok(roots) +} + +async fn handle_watch_event( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + gate: &RescanGate, + event: notify::Result, +) { + let event = match event { + Ok(event) => event, + Err(err) => { + log::warn!("Filesystem watcher event error: {err}"); + return; + } + }; + + let game_dir = ctx.game_dir.read().await.clone(); + let ids = event + .paths + .iter() + .filter_map(|path| game_id_from_event_path(&game_dir, path)) + .collect::>(); + + for id in ids { + if ctx.active_operations.read().await.contains_key(&id) { + log::debug!("Dropping filesystem event for {id}: operation active"); + continue; + } + queue_rescan(ctx, tx_notify_ui, gate, id).await; + } +} + +async fn queue_rescan( + ctx: &Ctx, + tx_notify_ui: &UnboundedSender, + gate: &RescanGate, + id: String, +) { + { + let mut running = gate.running.write().await; + if running.contains(&id) { + gate.pending.write().await.insert(id); + return; + } + running.insert(id.clone()); + } + + let ctx = ctx.clone(); + let tx_notify_ui = tx_notify_ui.clone(); + let gate = gate.clone(); + ctx.task_tracker.clone().spawn(async move { + run_gated_rescan(ctx, tx_notify_ui, gate, id).await; + }); +} + +async fn run_gated_rescan( + ctx: Ctx, + tx_notify_ui: UnboundedSender, + gate: RescanGate, + id: String, +) { + loop { + gate.pending.write().await.remove(&id); + + if ctx.active_operations.read().await.contains_key(&id) { + break; + } + + let game_dir = ctx.game_dir.read().await.clone(); + let catalog = ctx.catalog.read().await.clone(); + match rescan_local_game(&game_dir, &catalog, &id).await { + Ok(scan) => update_and_announce_games(&ctx, &tx_notify_ui, scan).await, + Err(err) => log::error!("Failed to rescan local game {id}: {err}"), + } + + if !gate.pending.write().await.remove(&id) { + break; + } + } + + gate.running.write().await.remove(&id); +} + +async fn run_fallback_scan(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { + let game_dir = ctx.game_dir.read().await.clone(); + let catalog = ctx.catalog.read().await.clone(); + match scan_local_library(&game_dir, &catalog).await { + Ok(scan) => update_and_announce_games(ctx, tx_notify_ui, scan).await, + Err(err) => log::error!("Failed to scan local games directory: {err}"), + } +} + +fn game_id_from_event_path(game_dir: &Path, path: &Path) -> Option { + let relative = path.strip_prefix(game_dir).ok()?; + let mut components = relative.components(); + let game_id = component_name(components.next()?)?; + if is_ignored_game_root_name(game_id) { + return None; + } + + if let Some(second) = components.next().and_then(component_name) + && should_ignore_game_child(second) + { + return None; + } + + Some(game_id.to_string()) +} + +fn component_name(component: Component<'_>) -> Option<&str> { + match component { + Component::Normal(name) => name.to_str(), + _ => None, + } +} + +fn should_ignore_game_child(name: &str) -> bool { + is_local_dir_name(name) + || name.starts_with(".local.") + || name.starts_with(".version.ini.") + || name == ".lanspread" + || name == ".lanspread.json" + || name == ".sync" + || name == ".softlan_game_installed" +} + +#[cfg(test)] +mod tests { + use super::{game_id_from_event_path, should_ignore_game_child}; + + #[test] + fn event_paths_map_to_top_level_game_id() { + let root = std::path::Path::new("/games"); + assert_eq!( + game_id_from_event_path(root, std::path::Path::new("/games/aoe2/version.ini")) + .as_deref(), + Some("aoe2") + ); + assert_eq!( + game_id_from_event_path(root, std::path::Path::new("/games/aoe2/local/save.dat")), + None + ); + assert_eq!( + game_id_from_event_path(root, std::path::Path::new("/games/.lanspread/index.json")), + None + ); + } + + #[test] + fn event_ignore_list_covers_reserved_names() { + for name in [ + "local", + ".local.installing", + ".local.backup", + ".version.ini.tmp", + ".version.ini.discarded", + ".lanspread", + ".lanspread.json", + ".sync", + ".softlan_game_installed", + ] { + assert!(should_ignore_game_child(name)); + } + assert!(!should_ignore_game_child("version.ini")); + assert!(!should_ignore_game_child("game.eti")); + } +} diff --git a/crates/lanspread-peer/src/services/stream.rs b/crates/lanspread-peer/src/services/stream.rs index 7808442..106082d 100644 --- a/crates/lanspread-peer/src/services/stream.rs +++ b/crates/lanspread-peer/src/services/stream.rs @@ -13,7 +13,7 @@ use crate::{ context::PeerCtx, error::PeerError, events, - local_games::get_game_file_descriptions, + local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available}, peer::{send_game_file_chunk, send_game_file_data}, remote_peer::{ensure_peer_id_for_addr, update_peer_from_game_list}, services::handshake::{ @@ -155,10 +155,10 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response let games = if snapshot.is_empty() { snapshot } else { - let downloading = ctx.downloading_games.read().await; + let active_operations = ctx.active_operations.read().await; snapshot .into_iter() - .filter(|game| !downloading.contains(&game.id)) + .filter(|game| !active_operations.contains_key(&game.id)) .collect() }; @@ -268,22 +268,8 @@ async fn handle_get_game(ctx: &PeerCtx, id: String, framed_tx: ResponseWriter) - } async fn get_game_response(ctx: &PeerCtx, id: String) -> Response { - let downloading = ctx.downloading_games.read().await.contains(&id); - if downloading { - log::info!("Declining to serve GetGame for {id} because download is in progress"); - return Response::GameNotFound(id); - } - let game_dir = ctx.game_dir.read().await.clone(); - - let has_game = { - let db_guard = ctx.local_game_db.read().await; - db_guard - .as_ref() - .is_some_and(|db| db.get_game_by_id(&id).is_some()) - }; - - if !has_game { + if !can_serve_game(ctx, &game_dir, &id).await { return Response::GameNotFound(id); } @@ -304,6 +290,22 @@ async fn get_game_response(ctx: &PeerCtx, id: String) -> Response { } } +async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool { + let active_operations = ctx.active_operations.read().await; + let catalog = ctx.catalog.read().await; + local_download_available(game_dir, game_id, &active_operations, &catalog).await +} + +fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool { + let normalised = relative_path.replace('\\', "/"); + let mut parts = normalised.split('/').filter(|part| !part.is_empty()); + match (parts.next(), parts.next()) { + (Some(first), _) if is_local_dir_name(first) => true, + (Some(first), Some(second)) if first == game_id && is_local_dir_name(second) => true, + _ => false, + } +} + async fn handle_file_data_request( ctx: &PeerCtx, desc: GameFileDescription, @@ -314,9 +316,19 @@ async fn handle_file_data_request( desc.relative_path ); - let game_dir = ctx.game_dir.read().await.clone(); - let mut tx = framed_tx.into_inner(); + let game_dir = ctx.game_dir.read().await.clone(); + if path_points_inside_local(&desc.game_id, &desc.relative_path) + || !can_serve_game(ctx, &game_dir, &desc.game_id).await + { + log::info!( + "Declining GetGameFileData for {} because the game is not currently transferable", + desc.relative_path + ); + let _ = tx.close().await; + return FramedWrite::new(tx, LengthDelimitedCodec::new()); + } + send_game_file_data(&desc, &mut tx, &game_dir).await; FramedWrite::new(tx, LengthDelimitedCodec::new()) } @@ -333,9 +345,18 @@ async fn handle_file_chunk_request( "Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})" ); - let game_dir = ctx.game_dir.read().await.clone(); - let mut tx = framed_tx.into_inner(); + let game_dir = ctx.game_dir.read().await.clone(); + if path_points_inside_local(&game_id, &relative_path) + || !can_serve_game(ctx, &game_dir, &game_id).await + { + log::info!( + "Declining GetGameFileChunk for {relative_path} because the game is not currently transferable" + ); + let _ = tx.close().await; + return FramedWrite::new(tx, LengthDelimitedCodec::new()); + } + send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await; FramedWrite::new(tx, LengthDelimitedCodec::new()) } @@ -365,3 +386,17 @@ async fn handle_announce_games(ctx: &PeerCtx, remote_addr: Option, g events::send(&ctx.tx_notify_ui, PeerEvent::ListGames(aggregated_games)); } } + +#[cfg(test)] +mod tests { + use super::path_points_inside_local; + + #[test] + fn local_relative_paths_are_never_transferable() { + assert!(path_points_inside_local("game", "game/local/save.dat")); + assert!(path_points_inside_local("game", "local/save.dat")); + assert!(path_points_inside_local("game", "game\\local\\save.dat")); + assert!(!path_points_inside_local("game", "game/version.ini")); + assert!(!path_points_inside_local("game", "game/archive.eti")); + } +} diff --git a/crates/lanspread-peer/src/startup.rs b/crates/lanspread-peer/src/startup.rs index 7c92490..a6c95f9 100644 --- a/crates/lanspread-peer/src/startup.rs +++ b/crates/lanspread-peer/src/startup.rs @@ -22,6 +22,7 @@ use crate::{ PeerCommand, PeerEvent, PeerRuntimeComponent, + Unpacker, context::Ctx, events, network::send_goodbye, @@ -73,6 +74,7 @@ pub(crate) enum SupervisionPolicy { BestEffort, } +#[allow(clippy::too_many_arguments, clippy::implicit_hasher)] pub(crate) fn spawn_peer_runtime( tx_control: UnboundedSender, rx_control: UnboundedReceiver, @@ -80,6 +82,8 @@ pub(crate) fn spawn_peer_runtime( peer_game_db: Arc>, peer_id: String, game_dir: PathBuf, + unpacker: Arc, + catalog: Arc>>, ) -> PeerRuntimeHandle { let shutdown = CancellationToken::new(); let task_tracker = TaskTracker::new(); @@ -94,8 +98,10 @@ pub(crate) fn spawn_peer_runtime( peer_game_db, peer_id, game_dir, + unpacker, runtime_shutdown.clone(), runtime_tracker.clone(), + catalog, ) .await { @@ -182,7 +188,7 @@ fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender) { let tx_notify_ui = tx_notify_ui.clone(); let peer_game_db = ctx.peer_game_db.clone(); - let downloading_games = ctx.downloading_games.clone(); + let active_operations = ctx.active_operations.clone(); let active_downloads = ctx.active_downloads.clone(); let shutdown = ctx.shutdown.clone(); let task_tracker = ctx.task_tracker.clone(); @@ -199,7 +205,7 @@ fn spawn_peer_liveness_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender