fix(ui): coalesce outbound transfer list refreshes
Every outbound transfer start and finish can arrive on a hot path while a peer is serving many file chunks. The Tauri event handler used to rebuild and emit the full games list for each edge, cloning all games and probing per-game server script files repeatedly during an active serve. Batch outbound-transfer count changes behind a short scheduled refresh. The peer still records exact counts in shared state, and the delayed refresh reads that state once per burst. A generation counter keeps changes that arrive while an emit is already scheduled from being lost; they trigger one follow-up emit with the latest counts. Test Plan: - just test - just clippy - git diff --check Refs: Claude review finding #2
This commit is contained in:
@@ -3,7 +3,7 @@ use std::{
|
|||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
path::{Component, Path, PathBuf},
|
path::{Component, Path, PathBuf},
|
||||||
sync::{Arc, OnceLock},
|
sync::{Arc, OnceLock},
|
||||||
time::{SystemTime, UNIX_EPOCH},
|
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use eyre::bail;
|
use eyre::bail;
|
||||||
@@ -31,9 +31,41 @@ use tokio::sync::{
|
|||||||
|
|
||||||
// Learn more about Tauri commands at https://tauri.app/develop/calling-rust/
|
// Learn more about Tauri commands at https://tauri.app/develop/calling-rust/
|
||||||
|
|
||||||
type OutboundTransfers = Arc<
|
type OutboundTransfers =
|
||||||
RwLock<std::collections::HashMap<String, Vec<(u64, tokio_util::sync::CancellationToken)>>>,
|
Arc<RwLock<std::collections::HashMap<String, Vec<(u64, tokio_util::sync::CancellationToken)>>>>;
|
||||||
>;
|
|
||||||
|
const OUTBOUND_TRANSFER_EMIT_DEBOUNCE: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct OutboundTransferEmitState {
|
||||||
|
scheduled: bool,
|
||||||
|
generation: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OutboundTransferEmitState {
|
||||||
|
fn record_change(&mut self) -> bool {
|
||||||
|
self.generation = self.generation.saturating_add(1);
|
||||||
|
if self.scheduled {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.scheduled = true;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn observed_generation(&self) -> u64 {
|
||||||
|
self.generation
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finish_emit(&mut self, observed_generation: u64) -> bool {
|
||||||
|
if self.generation != observed_generation {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.scheduled = false;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Tauri-managed runtime state shared by commands and setup tasks.
|
/// Tauri-managed runtime state shared by commands and setup tasks.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@@ -48,6 +80,7 @@ struct LanSpreadState {
|
|||||||
unpack_logs: Arc<RwLock<Vec<UnpackLogEntry>>>,
|
unpack_logs: Arc<RwLock<Vec<UnpackLogEntry>>>,
|
||||||
state_dir: OnceLock<PathBuf>,
|
state_dir: OnceLock<PathBuf>,
|
||||||
active_outbound_transfers: OutboundTransfers,
|
active_outbound_transfers: OutboundTransfers,
|
||||||
|
outbound_transfer_emit: Arc<RwLock<OutboundTransferEmitState>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
@@ -1472,6 +1505,44 @@ fn spawn_peer_event_loop(app_handle: AppHandle, mut rx_peer_event: UnboundedRece
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn schedule_outbound_transfer_emit(app_handle: &AppHandle) {
|
||||||
|
let state = app_handle.state::<LanSpreadState>();
|
||||||
|
let should_spawn = {
|
||||||
|
let mut emit_state = state.outbound_transfer_emit.write().await;
|
||||||
|
emit_state.record_change()
|
||||||
|
};
|
||||||
|
|
||||||
|
if !should_spawn {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let app_handle = app_handle.clone();
|
||||||
|
tauri::async_runtime::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(OUTBOUND_TRANSFER_EMIT_DEBOUNCE).await;
|
||||||
|
|
||||||
|
let observed_generation = {
|
||||||
|
let state = app_handle.state::<LanSpreadState>();
|
||||||
|
state
|
||||||
|
.outbound_transfer_emit
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.observed_generation()
|
||||||
|
};
|
||||||
|
emit_games_list(&app_handle).await;
|
||||||
|
|
||||||
|
let needs_follow_up_emit = {
|
||||||
|
let state = app_handle.state::<LanSpreadState>();
|
||||||
|
let mut emit_state = state.outbound_transfer_emit.write().await;
|
||||||
|
emit_state.finish_emit(observed_generation)
|
||||||
|
};
|
||||||
|
if !needs_follow_up_emit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
|
async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
|
||||||
match event {
|
match event {
|
||||||
@@ -1500,7 +1571,7 @@ async fn handle_peer_event(app_handle: &AppHandle, event: PeerEvent) {
|
|||||||
}
|
}
|
||||||
PeerEvent::OutboundTransferCountChanged => {
|
PeerEvent::OutboundTransferCountChanged => {
|
||||||
log::info!("PeerEvent::OutboundTransferCountChanged received");
|
log::info!("PeerEvent::OutboundTransferCountChanged received");
|
||||||
emit_games_list(app_handle).await;
|
schedule_outbound_transfer_emit(app_handle).await;
|
||||||
}
|
}
|
||||||
PeerEvent::GotGameFiles {
|
PeerEvent::GotGameFiles {
|
||||||
id,
|
id,
|
||||||
@@ -1935,6 +2006,32 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn outbound_transfer_emit_state_coalesces_bursts_without_losing_updates() {
|
||||||
|
let mut state = OutboundTransferEmitState::default();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
state.record_change(),
|
||||||
|
"first change should schedule an emit"
|
||||||
|
);
|
||||||
|
assert_eq!(state.observed_generation(), 1);
|
||||||
|
assert!(
|
||||||
|
!state.record_change(),
|
||||||
|
"second change should reuse the scheduled emit"
|
||||||
|
);
|
||||||
|
assert_eq!(state.observed_generation(), 2);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
state.finish_emit(1),
|
||||||
|
"a generation observed before the latest change needs a follow-up emit"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!state.finish_emit(2),
|
||||||
|
"the latest observed generation clears the scheduled emit"
|
||||||
|
);
|
||||||
|
assert!(state.record_change(), "a later burst should schedule again");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn game_file_viewer_ids_must_be_single_path_components() {
|
fn game_file_viewer_ids_must_be_single_path_components() {
|
||||||
assert!(is_single_component_game_id("game"));
|
assert!(is_single_component_game_id("game"));
|
||||||
|
|||||||
Reference in New Issue
Block a user