fix(peer): harden streamed install lifecycle
Claude Fable 5's branch review found that receiver cancellation or a QUIC send failure could leave the sender-side archive producer blocked on the bounded frame channel. That kept the outbound transfer guard alive and could block later installs or updates of the same game. Route archive frames through a cancellable StreamInstallFrameSink instead of exposing the raw channel sender to providers. The QUIC forwarder now cancels and closes the receive side before awaiting the producer, so a blocked send wakes and the transfer guard can drop normally. Make PeerCommand::StreamInstallGame own its peer metadata preflight inside the peer core. The Tauri layer now sends the command directly, and the peer runtime fetches file details from catalog-version peers before running the existing majority validation and retry logic. This removes the UI-only pending streamed install set and gives PeerEvent::GotGameFiles one meaning again: continue a normal archive download. Tighten the receiver transaction edge cases too. Rollback removes a newly created empty game root, but preserves pre-existing roots. Once streamed staging has been promoted to local/, intent or launch-settings cleanup failures are logged for startup recovery instead of reporting a failed install for bytes that are already committed. Accept missing RAR CRC32 metadata for zero-byte files as CRC32 00000000 while still requiring CRC32 metadata for non-empty files. Update the peer README, scenario docs, and next-steps handoff so the documented ownership and remaining trust limitation match the implementation. Test Plan: - just fmt - just test - just frontend-test - just clippy - git diff --check - python3 -m py_compile \ crates/lanspread-peer-cli/scripts/run_extended_scenarios.py - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \ S39 S40 S41 S42 S43 S44 S45 S46 S47 --build-image Refs: streamed-install review handoff from Claude Fable 5
This commit is contained in:
@@ -39,6 +39,7 @@ pub struct StreamedInstallTransaction {
|
||||
id: String,
|
||||
staging: PathBuf,
|
||||
eti_version: Option<String>,
|
||||
created_game_root: bool,
|
||||
}
|
||||
|
||||
impl StreamedInstallTransaction {
|
||||
@@ -49,40 +50,61 @@ impl StreamedInstallTransaction {
|
||||
|
||||
pub async fn commit(self) -> eyre::Result<()> {
|
||||
let local = local_dir(&self.game_root);
|
||||
let result = async {
|
||||
tokio::fs::rename(&self.staging, &local)
|
||||
.await
|
||||
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))?;
|
||||
reset_launch_settings_marker(&self.state_dir, &self.id).await?;
|
||||
write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
if let Err(err) = tokio::fs::rename(&self.staging, &local)
|
||||
.await
|
||||
}
|
||||
.await;
|
||||
|
||||
if result.is_err() {
|
||||
.wrap_err_with(|| format!("failed to promote streamed install for {}", self.id))
|
||||
{
|
||||
if let Err(cleanup_err) = remove_dir_all_if_exists(&self.staging).await {
|
||||
log::warn!(
|
||||
"Failed to clean streamed install staging {}: {cleanup_err}",
|
||||
self.staging.display()
|
||||
);
|
||||
}
|
||||
if let Err(cleanup_err) =
|
||||
remove_created_empty_game_root(&self.game_root, self.created_game_root).await
|
||||
{
|
||||
log::warn!(
|
||||
"Failed to clean streamed install game root {}: {cleanup_err}",
|
||||
self.game_root.display()
|
||||
);
|
||||
}
|
||||
let _ = write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
result
|
||||
if let Err(err) = reset_launch_settings_marker(&self.state_dir, &self.id).await {
|
||||
log::error!(
|
||||
"Streamed install for {} was promoted but launch-settings marker reset failed: {err}",
|
||||
self.id
|
||||
);
|
||||
}
|
||||
if let Err(err) = write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
&InstallIntent::none(&self.id, self.eti_version.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::error!(
|
||||
"Streamed install for {} was promoted but intent cleanup failed: {err}",
|
||||
self.id
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn rollback(self) -> eyre::Result<()> {
|
||||
let staging_result = remove_dir_all_if_exists(&self.staging).await;
|
||||
let cleanup_result = async {
|
||||
remove_dir_all_if_exists(&self.staging).await?;
|
||||
remove_created_empty_game_root(&self.game_root, self.created_game_root).await
|
||||
}
|
||||
.await;
|
||||
let intent_result = write_intent(
|
||||
&self.state_dir,
|
||||
&self.id,
|
||||
@@ -90,7 +112,7 @@ impl StreamedInstallTransaction {
|
||||
)
|
||||
.await;
|
||||
|
||||
staging_result?;
|
||||
cleanup_result?;
|
||||
intent_result
|
||||
}
|
||||
}
|
||||
@@ -104,18 +126,36 @@ pub async fn begin_streamed_install(
|
||||
eyre::bail!("game {id} is already installed");
|
||||
}
|
||||
|
||||
let created_game_root = !path_exists(game_root).await;
|
||||
tokio::fs::create_dir_all(game_root).await?;
|
||||
let eti_version = read_downloaded_version(game_root).await;
|
||||
write_intent(
|
||||
if let Err(err) = write_intent(
|
||||
state_dir,
|
||||
id,
|
||||
&InstallIntent::new(id, InstallIntentState::Installing, eti_version.clone()),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await
|
||||
{
|
||||
log::warn!(
|
||||
"Failed to clean streamed install game root {}: {cleanup_err}",
|
||||
game_root.display()
|
||||
);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let staging = installing_dir(game_root);
|
||||
if let Err(err) = prepare_owned_empty_dir(&staging).await {
|
||||
let _ = write_intent(state_dir, id, &InstallIntent::none(id, eti_version)).await;
|
||||
if let Err(cleanup_err) = remove_created_empty_game_root(game_root, created_game_root).await
|
||||
{
|
||||
log::warn!(
|
||||
"Failed to clean streamed install game root {}: {cleanup_err}",
|
||||
game_root.display()
|
||||
);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
@@ -127,6 +167,7 @@ pub async fn begin_streamed_install(
|
||||
id: id.to_string(),
|
||||
staging,
|
||||
eti_version,
|
||||
created_game_root,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -586,6 +627,28 @@ async fn remove_dir_all_if_exists(path: &Path) -> eyre::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_created_empty_game_root(game_root: &Path, created: bool) -> eyre::Result<()> {
|
||||
if !created {
|
||||
return Ok(());
|
||||
}
|
||||
remove_empty_dir_if_exists(game_root).await
|
||||
}
|
||||
|
||||
async fn remove_empty_dir_if_exists(path: &Path) -> eyre::Result<()> {
|
||||
match tokio::fs::remove_dir(path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(err)
|
||||
if matches!(
|
||||
err.kind(),
|
||||
ErrorKind::NotFound | ErrorKind::DirectoryNotEmpty
|
||||
) =>
|
||||
{
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn path_is_dir(path: &Path) -> bool {
|
||||
tokio::fs::metadata(path)
|
||||
.await
|
||||
@@ -727,6 +790,74 @@ mod tests {
|
||||
assert!(!launch_settings_applied_path(state.path(), "game").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn streamed_install_rollback_removes_new_empty_game_root() {
|
||||
let temp = TempDir::new("lanspread-install");
|
||||
let state = test_state();
|
||||
let root = temp.path().join("streamed-game");
|
||||
|
||||
let transaction = begin_streamed_install(&root, state.path(), "streamed-game")
|
||||
.await
|
||||
.expect("streamed transaction should begin");
|
||||
assert!(transaction.staging_dir().is_dir());
|
||||
|
||||
transaction
|
||||
.rollback()
|
||||
.await
|
||||
.expect("streamed rollback should succeed");
|
||||
|
||||
assert!(!root.exists());
|
||||
let intent = read_intent(state.path(), "streamed-game").await;
|
||||
assert_eq!(intent.state, InstallIntentState::None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn streamed_install_rollback_keeps_existing_game_root() {
|
||||
let temp = TempDir::new("lanspread-install");
|
||||
let state = test_state();
|
||||
let root = temp.game_root();
|
||||
write_file(&root.join("version.ini"), b"20250101");
|
||||
|
||||
let transaction = begin_streamed_install(&root, state.path(), "game")
|
||||
.await
|
||||
.expect("streamed transaction should begin");
|
||||
|
||||
transaction
|
||||
.rollback()
|
||||
.await
|
||||
.expect("streamed rollback should succeed");
|
||||
|
||||
assert!(root.is_dir());
|
||||
assert!(root.join("version.ini").is_file());
|
||||
assert!(!root.join(INSTALLING_DIR).exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn streamed_install_commit_succeeds_when_post_promote_intent_cleanup_fails() {
|
||||
let temp = TempDir::new("lanspread-install");
|
||||
let state = test_state();
|
||||
let root = temp.game_root();
|
||||
let transaction = begin_streamed_install(&root, state.path(), "game")
|
||||
.await
|
||||
.expect("streamed transaction should begin");
|
||||
write_file(&transaction.staging_dir().join("payload.txt"), b"installed");
|
||||
|
||||
let game_state_dir = crate::state_paths::game_state_dir(state.path(), "game");
|
||||
std::fs::remove_dir_all(&game_state_dir).expect("game state dir should be removed");
|
||||
write_file(&game_state_dir, b"not a directory");
|
||||
|
||||
transaction
|
||||
.commit()
|
||||
.await
|
||||
.expect("promoted streamed install should be reported as success");
|
||||
|
||||
assert_eq!(
|
||||
std::fs::read(root.join(LOCAL_DIR).join("payload.txt"))
|
||||
.expect("promoted payload should be present"),
|
||||
b"installed"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn install_unpacks_multiple_root_eti_archives_in_sorted_order() {
|
||||
let temp = TempDir::new("lanspread-install");
|
||||
|
||||
Reference in New Issue
Block a user