fix(peer): refresh settled install state after operations
The follow-up review found a few stale lifecycle edges around local game transactions. Recovery could sweep active roots, post-operation refreshes still re-ran full startup recovery, and the UI kept inferring local-only state from downloaded and installed flags instead of the backend availability. This updates the peer lifecycle so startup recovery skips active operations, install/update/uninstall refresh only the affected game after the operation guard is dropped, and path-changing game-directory updates are rejected while operations are active. It also removes the dead UpdateGame command, drops the unused manifest_hash write field while preserving old JSON reads, renames the internal install-finished event, and carries availability through the DB, peer summaries, Tauri refreshes, and the React model. The included follow-up documents record the review source, implementation decisions, and the remaining FOLLOW_UP_2.md work so later commits can stay small instead of reopening the completed plan items. Test Plan: - git diff --check - just fmt - just clippy - just test Follow-up-Plan: FOLLOW_UP_PLAN.md
This commit is contained in:
@@ -1,6 +1,11 @@
|
||||
//! Command handlers for peer commands.
|
||||
|
||||
use std::{collections::hash_map::Entry, net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
use std::{
|
||||
collections::{HashSet, hash_map::Entry},
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use lanspread_db::db::{GameDB, GameFileDescription};
|
||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||
@@ -19,6 +24,7 @@ use crate::{
|
||||
get_game_file_descriptions,
|
||||
local_dir_is_directory,
|
||||
local_download_available,
|
||||
rescan_local_game,
|
||||
scan_local_library,
|
||||
version_ini_is_regular_file,
|
||||
},
|
||||
@@ -211,12 +217,7 @@ pub async fn handle_download_game_files_command(
|
||||
{
|
||||
log::error!("Failed to send DownloadGameFilesFinished event: {e}");
|
||||
}
|
||||
spawn_install_operation(
|
||||
ctx,
|
||||
tx_notify_ui,
|
||||
id.clone(),
|
||||
RequestedInstallOperation::Auto,
|
||||
);
|
||||
spawn_install_operation(ctx, tx_notify_ui, id.clone());
|
||||
} else {
|
||||
log::error!("No trusted peers available after majority validation for game {id}");
|
||||
}
|
||||
@@ -267,13 +268,7 @@ pub async fn handle_download_game_files_command(
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
run_install_operation(
|
||||
&ctx_clone,
|
||||
&tx_notify_ui_clone,
|
||||
download_id,
|
||||
RequestedInstallOperation::Auto,
|
||||
)
|
||||
.await;
|
||||
run_install_operation(&ctx_clone, &tx_notify_ui_clone, download_id).await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("Download failed for {download_id}: {e}");
|
||||
@@ -288,16 +283,7 @@ pub async fn handle_install_game_command(
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
) {
|
||||
spawn_install_operation(ctx, tx_notify_ui, id, RequestedInstallOperation::Install);
|
||||
}
|
||||
|
||||
/// Handles the `UpdateGame` command.
|
||||
pub async fn handle_update_game_command(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
) {
|
||||
spawn_install_operation(ctx, tx_notify_ui, id, RequestedInstallOperation::Update);
|
||||
spawn_install_operation(ctx, tx_notify_ui, id);
|
||||
}
|
||||
|
||||
/// Handles the `UninstallGame` command.
|
||||
@@ -313,32 +299,15 @@ pub async fn handle_uninstall_game_command(
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum RequestedInstallOperation {
|
||||
Auto,
|
||||
Install,
|
||||
Update,
|
||||
}
|
||||
|
||||
fn spawn_install_operation(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
requested: RequestedInstallOperation,
|
||||
) {
|
||||
fn spawn_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: String) {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
ctx.task_tracker.clone().spawn(async move {
|
||||
run_install_operation(&ctx, &tx_notify_ui, id, requested).await;
|
||||
run_install_operation(&ctx, &tx_notify_ui, id).await;
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_install_operation(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: String,
|
||||
requested: RequestedInstallOperation,
|
||||
) {
|
||||
async fn run_install_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>, id: String) {
|
||||
if !catalog_contains(ctx, &id).await {
|
||||
log::warn!("Ignoring install command for non-catalog game {id}");
|
||||
return;
|
||||
@@ -352,14 +321,10 @@ async fn run_install_operation(
|
||||
}
|
||||
|
||||
let local_present = local_dir_is_directory(&game_root).await;
|
||||
let operation = match requested {
|
||||
RequestedInstallOperation::Auto | RequestedInstallOperation::Install if local_present => {
|
||||
InstallOperation::Updating
|
||||
}
|
||||
RequestedInstallOperation::Auto | RequestedInstallOperation::Install => {
|
||||
InstallOperation::Installing
|
||||
}
|
||||
RequestedInstallOperation::Update => InstallOperation::Updating,
|
||||
let operation = if local_present {
|
||||
InstallOperation::Updating
|
||||
} else {
|
||||
InstallOperation::Installing
|
||||
};
|
||||
let operation_kind = match operation {
|
||||
InstallOperation::Installing => OperationKind::Installing,
|
||||
@@ -371,20 +336,24 @@ async fn run_install_operation(
|
||||
return;
|
||||
}
|
||||
|
||||
let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone());
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameBegin {
|
||||
id: id.clone(),
|
||||
operation,
|
||||
},
|
||||
);
|
||||
let result = {
|
||||
let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone());
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameBegin {
|
||||
id: id.clone(),
|
||||
operation,
|
||||
},
|
||||
);
|
||||
|
||||
let result = match operation {
|
||||
InstallOperation::Installing => {
|
||||
install::install(&game_root, &id, ctx.unpacker.clone()).await
|
||||
match operation {
|
||||
InstallOperation::Installing => {
|
||||
install::install(&game_root, &id, ctx.unpacker.clone()).await
|
||||
}
|
||||
InstallOperation::Updating => {
|
||||
install::update(&game_root, &id, ctx.unpacker.clone()).await
|
||||
}
|
||||
}
|
||||
InstallOperation::Updating => install::update(&game_root, &id, ctx.unpacker.clone()).await,
|
||||
};
|
||||
|
||||
match result {
|
||||
@@ -393,14 +362,17 @@ async fn run_install_operation(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameFinished { id: id.clone() },
|
||||
);
|
||||
if let Err(err) = load_local_library(ctx, tx_notify_ui).await {
|
||||
if let Err(err) = refresh_local_game(ctx, tx_notify_ui, &id).await {
|
||||
log::error!("Failed to refresh local library after install: {err}");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Install operation failed for {id}: {err}");
|
||||
events::send(tx_notify_ui, PeerEvent::InstallGameFailed { id });
|
||||
if let Err(refresh_err) = load_local_library(ctx, tx_notify_ui).await {
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::InstallGameFailed { id: id.clone() },
|
||||
);
|
||||
if let Err(refresh_err) = refresh_local_game(ctx, tx_notify_ui, &id).await {
|
||||
log::error!("Failed to refresh local library after install failure: {refresh_err}");
|
||||
}
|
||||
}
|
||||
@@ -418,14 +390,18 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
|
||||
return;
|
||||
}
|
||||
|
||||
let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone());
|
||||
let game_root = { ctx.game_dir.read().await.join(&id) };
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::UninstallGameBegin { id: id.clone() },
|
||||
);
|
||||
let result = {
|
||||
let _operation_guard = OperationGuard::new(id.clone(), ctx.active_operations.clone());
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
PeerEvent::UninstallGameBegin { id: id.clone() },
|
||||
);
|
||||
|
||||
match install::uninstall(&game_root, &id).await {
|
||||
install::uninstall(&game_root, &id).await
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
events::send(
|
||||
tx_notify_ui,
|
||||
@@ -441,7 +417,7 @@ async fn run_uninstall_operation(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerE
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = load_local_library(ctx, tx_notify_ui).await {
|
||||
if let Err(err) = refresh_local_game(ctx, tx_notify_ui, &id).await {
|
||||
log::error!("Failed to refresh local library after uninstall: {err}");
|
||||
}
|
||||
}
|
||||
@@ -467,6 +443,32 @@ pub async fn handle_set_game_dir_command(
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
game_dir: PathBuf,
|
||||
) {
|
||||
let current_game_dir = ctx.game_dir.read().await.clone();
|
||||
if current_game_dir == game_dir {
|
||||
log::info!(
|
||||
"Game directory {} unchanged; refreshing without recovery",
|
||||
game_dir.display()
|
||||
);
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
ctx.task_tracker.spawn(async move {
|
||||
if let Err(err) = refresh_local_library(&ctx_clone, &tx_notify_ui).await {
|
||||
log::error!("Failed to refresh local game database: {err}");
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let active_ids = active_operation_ids(ctx).await;
|
||||
if !active_ids.is_empty() {
|
||||
log::warn!(
|
||||
"Rejecting game directory change to {} while operations are active for: {}",
|
||||
game_dir.display(),
|
||||
active_ids.into_iter().collect::<Vec<_>>().join(", ")
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
*ctx.game_dir.write().await = game_dir.clone();
|
||||
log::info!("Game directory set to: {}", game_dir.display());
|
||||
|
||||
@@ -489,13 +491,46 @@ pub async fn load_local_library(
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
install::recover_on_startup(&game_dir).await?;
|
||||
let active_ids = active_operation_ids(ctx).await;
|
||||
install::recover_on_startup(&game_dir, &active_ids).await?;
|
||||
scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await
|
||||
}
|
||||
|
||||
async fn refresh_local_library(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
scan_and_announce_local_library(ctx, tx_notify_ui, &game_dir).await
|
||||
}
|
||||
|
||||
async fn scan_and_announce_local_library(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
game_dir: &Path,
|
||||
) -> eyre::Result<()> {
|
||||
let catalog = ctx.catalog.read().await.clone();
|
||||
let scan = scan_local_library(&game_dir, &catalog).await?;
|
||||
let scan = scan_local_library(game_dir, &catalog).await?;
|
||||
update_and_announce_games(ctx, tx_notify_ui, scan).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn refresh_local_game(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
id: &str,
|
||||
) -> eyre::Result<()> {
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
let catalog = ctx.catalog.read().await.clone();
|
||||
let scan = rescan_local_game(&game_dir, &catalog, id).await?;
|
||||
update_and_announce_games(ctx, tx_notify_ui, scan).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn active_operation_ids(ctx: &Ctx) -> HashSet<String> {
|
||||
ctx.active_operations.read().await.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Handles the `GetPeerCount` command.
|
||||
pub async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
log::info!("GetPeerCount command received");
|
||||
@@ -589,3 +624,225 @@ pub async fn update_and_announce_games(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
||||
|
||||
use super::*;
|
||||
use crate::{UnpackFuture, Unpacker};
|
||||
|
||||
struct TempDir(PathBuf);
|
||||
|
||||
impl TempDir {
|
||||
fn new(prefix: &str) -> Self {
|
||||
let mut path = std::env::temp_dir();
|
||||
path.push(format!(
|
||||
"{prefix}-{}-{}",
|
||||
std::process::id(),
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos()
|
||||
));
|
||||
std::fs::create_dir_all(&path).expect("temp dir should be created");
|
||||
Self(path)
|
||||
}
|
||||
|
||||
fn path(&self) -> &Path {
|
||||
&self.0
|
||||
}
|
||||
|
||||
fn game_root(&self) -> PathBuf {
|
||||
self.0.join("game")
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempDir {
|
||||
fn drop(&mut self) {
|
||||
let _ = std::fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
struct FakeUnpacker;
|
||||
|
||||
impl Unpacker for FakeUnpacker {
|
||||
fn unpack<'a>(&'a self, _archive: &'a Path, dest: &'a Path) -> UnpackFuture<'a> {
|
||||
Box::pin(async move {
|
||||
tokio::fs::write(dest.join("payload.txt"), b"installed").await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn write_file(path: &Path, bytes: &[u8]) {
|
||||
if let Some(parent) = path.parent() {
|
||||
std::fs::create_dir_all(parent).expect("parent dir should be created");
|
||||
}
|
||||
std::fs::write(path, bytes).expect("file should be written");
|
||||
}
|
||||
|
||||
fn test_ctx(game_dir: PathBuf) -> Ctx {
|
||||
Ctx::new(
|
||||
Arc::new(RwLock::new(PeerGameDB::new())),
|
||||
"peer".to_string(),
|
||||
game_dir,
|
||||
Arc::new(FakeUnpacker),
|
||||
CancellationToken::new(),
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(HashSet::from(["game".to_string()]))),
|
||||
)
|
||||
}
|
||||
|
||||
async fn recv_event(rx: &mut mpsc::UnboundedReceiver<PeerEvent>) -> PeerEvent {
|
||||
tokio::time::timeout(Duration::from_secs(1), rx.recv())
|
||||
.await
|
||||
.expect("event should arrive")
|
||||
.expect("event channel should remain open")
|
||||
}
|
||||
|
||||
fn assert_local_update(event: PeerEvent, installed: bool, downloaded: bool) {
|
||||
let PeerEvent::LocalGamesUpdated(games) = event else {
|
||||
panic!("expected LocalGamesUpdated");
|
||||
};
|
||||
let game = games
|
||||
.iter()
|
||||
.find(|game| game.id == "game")
|
||||
.expect("game should be announced");
|
||||
assert_eq!(game.installed, installed);
|
||||
assert_eq!(game.downloaded, downloaded);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_refreshes_settled_state_after_guard_release() {
|
||||
let temp = TempDir::new("lanspread-handler-install");
|
||||
let root = temp.game_root();
|
||||
write_file(&root.join("version.ini"), b"20250101");
|
||||
write_file(&root.join("game.eti"), b"archive");
|
||||
|
||||
let ctx = test_ctx(temp.path().to_path_buf());
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
run_install_operation(&ctx, &tx, "game".to_string()).await;
|
||||
|
||||
match recv_event(&mut rx).await {
|
||||
PeerEvent::InstallGameBegin { id, operation } => {
|
||||
assert_eq!(id, "game");
|
||||
assert_eq!(operation, InstallOperation::Installing);
|
||||
}
|
||||
_ => panic!("expected InstallGameBegin"),
|
||||
}
|
||||
assert!(matches!(
|
||||
recv_event(&mut rx).await,
|
||||
PeerEvent::InstallGameFinished { id } if id == "game"
|
||||
));
|
||||
assert!(ctx.active_operations.read().await.is_empty());
|
||||
assert_local_update(recv_event(&mut rx).await, true, true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_refreshes_settled_state_after_guard_release() {
|
||||
let temp = TempDir::new("lanspread-handler-update");
|
||||
let root = temp.game_root();
|
||||
write_file(&root.join("version.ini"), b"20250101");
|
||||
write_file(&root.join("game.eti"), b"archive");
|
||||
write_file(&root.join("local").join("old.txt"), b"old");
|
||||
|
||||
let ctx = test_ctx(temp.path().to_path_buf());
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
run_install_operation(&ctx, &tx, "game".to_string()).await;
|
||||
|
||||
match recv_event(&mut rx).await {
|
||||
PeerEvent::InstallGameBegin { id, operation } => {
|
||||
assert_eq!(id, "game");
|
||||
assert_eq!(operation, InstallOperation::Updating);
|
||||
}
|
||||
_ => panic!("expected InstallGameBegin"),
|
||||
}
|
||||
assert!(matches!(
|
||||
recv_event(&mut rx).await,
|
||||
PeerEvent::InstallGameFinished { id } if id == "game"
|
||||
));
|
||||
assert!(ctx.active_operations.read().await.is_empty());
|
||||
assert_local_update(recv_event(&mut rx).await, true, true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn uninstall_refreshes_settled_state_after_guard_release() {
|
||||
let temp = TempDir::new("lanspread-handler-uninstall");
|
||||
let root = temp.game_root();
|
||||
write_file(&root.join("version.ini"), b"20250101");
|
||||
write_file(&root.join("game.eti"), b"archive");
|
||||
write_file(&root.join("local").join("old.txt"), b"old");
|
||||
|
||||
let ctx = test_ctx(temp.path().to_path_buf());
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
run_uninstall_operation(&ctx, &tx, "game".to_string()).await;
|
||||
|
||||
assert!(matches!(
|
||||
recv_event(&mut rx).await,
|
||||
PeerEvent::UninstallGameBegin { id } if id == "game"
|
||||
));
|
||||
assert!(matches!(
|
||||
recv_event(&mut rx).await,
|
||||
PeerEvent::UninstallGameFinished { id } if id == "game"
|
||||
));
|
||||
assert!(ctx.active_operations.read().await.is_empty());
|
||||
assert_local_update(recv_event(&mut rx).await, false, true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_changing_set_game_dir_is_rejected_while_operations_are_active() {
|
||||
let current = TempDir::new("lanspread-handler-current-dir");
|
||||
let next = TempDir::new("lanspread-handler-next-dir");
|
||||
let ctx = test_ctx(current.path().to_path_buf());
|
||||
ctx.active_operations
|
||||
.write()
|
||||
.await
|
||||
.insert("game".to_string(), OperationKind::Downloading);
|
||||
let (tx, _rx) = mpsc::unbounded_channel();
|
||||
|
||||
handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await;
|
||||
|
||||
assert_eq!(*ctx.game_dir.read().await, current.path());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_path_set_game_dir_refreshes_without_recovery() {
|
||||
let temp = TempDir::new("lanspread-handler-same-dir");
|
||||
write_file(&temp.game_root().join(".version.ini.tmp"), b"tmp");
|
||||
let ctx = test_ctx(temp.path().to_path_buf());
|
||||
let (tx, _rx) = mpsc::unbounded_channel();
|
||||
|
||||
handle_set_game_dir_command(&ctx, &tx, temp.path().to_path_buf()).await;
|
||||
ctx.task_tracker.close();
|
||||
ctx.task_tracker.wait().await;
|
||||
|
||||
assert!(temp.game_root().join(".version.ini.tmp").is_file());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_changing_set_game_dir_runs_recovery() {
|
||||
let current = TempDir::new("lanspread-handler-old-dir");
|
||||
let next = TempDir::new("lanspread-handler-new-dir");
|
||||
write_file(&next.game_root().join(".version.ini.tmp"), b"tmp");
|
||||
let ctx = test_ctx(current.path().to_path_buf());
|
||||
let (tx, _rx) = mpsc::unbounded_channel();
|
||||
|
||||
handle_set_game_dir_command(&ctx, &tx, next.path().to_path_buf()).await;
|
||||
ctx.task_tracker.close();
|
||||
ctx.task_tracker.wait().await;
|
||||
|
||||
assert!(!next.game_root().join(".version.ini.tmp").exists());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user