refactor: Centralize local game database updates and announcements, and add retry logic for requesting games from peers.

This commit is contained in:
2025-11-18 21:42:47 +01:00
parent f9923bd61e
commit df01131f8d
+127 -106
View File
@@ -1481,7 +1481,7 @@ pub async fn run_peer(
.await; .await;
} }
PeerCommand::SetGameDir(game_dir) => { PeerCommand::SetGameDir(game_dir) => {
handle_set_game_dir_command(&ctx, game_dir).await; handle_set_game_dir_command(&ctx, &tx_notify_ui, game_dir).await;
} }
PeerCommand::GetPeerCount => { PeerCommand::GetPeerCount => {
handle_get_peer_count_command(&ctx, &tx_notify_ui).await; handle_get_peer_count_command(&ctx, &tx_notify_ui).await;
@@ -1786,17 +1786,23 @@ async fn handle_download_game_files_command(
ctx.active_downloads.write().await.insert(id, handle); ctx.active_downloads.write().await.insert(id, handle);
} }
async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) { async fn handle_set_game_dir_command(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
game_dir: String,
) {
*ctx.game_dir.write().await = Some(game_dir.clone()); *ctx.game_dir.write().await = Some(game_dir.clone());
log::info!("Game directory set to: {game_dir}"); log::info!("Game directory set to: {game_dir}");
// Load local game database when game directory is set // Load local game database when game directory is set
let game_dir = game_dir.clone(); let game_dir = game_dir.clone();
let local_game_db = ctx.local_game_db.clone(); let tx_notify_ui = tx_notify_ui.clone();
let ctx_clone = ctx.clone();
tokio::spawn(async move { tokio::spawn(async move {
match load_local_game_db(&game_dir).await { match load_local_game_db(&game_dir).await {
Ok(db) => { Ok(db) => {
*local_game_db.write().await = Some(db); update_and_announce_games(&ctx_clone, &tx_notify_ui, db).await;
log::info!("Local game database loaded successfully"); log::info!("Local game database loaded successfully");
} }
Err(e) => { Err(e) => {
@@ -1806,6 +1812,80 @@ async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) {
}); });
} }
async fn update_and_announce_games(
ctx: &Ctx,
tx_notify_ui: &UnboundedSender<PeerEvent>,
new_db: GameDB,
) {
let local_game_db = ctx.local_game_db.clone();
let mut db_guard = local_game_db.write().await;
let previous_games = db_guard
.as_ref()
.map(|db| db.games.keys().cloned().collect::<HashSet<_>>())
.unwrap_or_default();
let current_game_ids = new_db.games.keys().cloned().collect::<HashSet<_>>();
// Check if any games were removed
let removed_games: Vec<String> = previous_games
.difference(&current_game_ids)
.cloned()
.collect();
if removed_games.is_empty() {
// Check if any games were added or updated
if previous_games != current_game_ids {
log::debug!("Local games directory structure changed, updating database");
*db_guard = Some(new_db);
let all_games = db_guard
.as_ref()
.map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
.unwrap_or_default();
if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) {
log::error!("Failed to send LocalGamesUpdated event: {e}");
}
// Broadcast update to all peers
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
for peer_addr in peer_addresses {
let games_clone = all_games.clone();
tokio::spawn(async move {
if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await {
log::warn!("Failed to announce games to {peer_addr}: {e}");
}
});
}
}
} else {
log::info!("Detected removed games: {removed_games:?}");
*db_guard = Some(new_db);
// Notify UI about the change
let all_games = db_guard
.as_ref()
.map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
.unwrap_or_default();
if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) {
log::error!("Failed to send LocalGamesUpdated event: {e}");
}
// Broadcast update to all peers
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
for peer_addr in peer_addresses {
let games_clone = all_games.clone();
tokio::spawn(async move {
if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await {
log::warn!("Failed to announce games to {peer_addr}: {e}");
}
});
}
}
}
async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) { async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
log::info!("GetPeerCount command received"); log::info!("GetPeerCount command received");
let peer_count = { ctx.peer_game_db.read().await.get_peer_addresses().len() }; let peer_count = { ctx.peer_game_db.read().await.get_peer_addresses().len() };
@@ -2124,9 +2204,13 @@ async fn run_peer_discovery(
let tx_notify_ui_clone = tx_notify_ui.clone(); let tx_notify_ui_clone = tx_notify_ui.clone();
let peer_game_db_clone = peer_game_db.clone(); let peer_game_db_clone = peer_game_db.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = if let Err(e) = request_games_from_peer(
request_games_from_peer(peer_addr, tx_notify_ui_clone, peer_game_db_clone) peer_addr,
.await tx_notify_ui_clone,
peer_game_db_clone,
0,
)
.await
{ {
log::error!("Failed to request games from peer {peer_addr}: {e}"); log::error!("Failed to request games from peer {peer_addr}: {e}");
} }
@@ -2154,7 +2238,37 @@ async fn request_games_from_peer(
peer_addr: SocketAddr, peer_addr: SocketAddr,
tx_notify_ui: UnboundedSender<PeerEvent>, tx_notify_ui: UnboundedSender<PeerEvent>,
peer_game_db: Arc<RwLock<PeerGameDB>>, peer_game_db: Arc<RwLock<PeerGameDB>>,
mut retry_count: u32,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
loop {
match fetch_games_from_peer(peer_addr).await {
Ok(games) => {
log::info!("Received {} games from peer {peer_addr}", games.len());
if games.is_empty() && retry_count < 1 {
log::info!("Received 0 games from peer {peer_addr}, scheduling retry in 5s");
tokio::time::sleep(Duration::from_secs(5)).await;
retry_count += 1;
continue;
}
let aggregated_games = {
let mut db = peer_game_db.write().await;
db.update_peer_games(peer_addr, games);
db.get_all_games()
};
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) {
log::error!("Failed to send ListGames event: {e}");
}
return Ok(());
}
Err(e) => return Err(e),
}
}
}
async fn fetch_games_from_peer(peer_addr: SocketAddr) -> eyre::Result<Vec<Game>> {
let mut conn = connect_to_peer(peer_addr).await?; let mut conn = connect_to_peer(peer_addr).await?;
let stream = conn.open_bidirectional_stream().await?; let stream = conn.open_bidirectional_stream().await?;
@@ -2173,26 +2287,12 @@ async fn request_games_from_peer(
} }
let response = Response::decode(data.freeze()); let response = Response::decode(data.freeze());
match response { if let Response::ListGames(games) = response {
Response::ListGames(games) => { Ok(games)
log::info!("Received {} games from peer {peer_addr}", games.len()); } else {
log::warn!("Unexpected response from peer {peer_addr}: {response:?}");
let aggregated_games = { Ok(Vec::new()) // Treat unexpected response as empty list or error?
let mut db = peer_game_db.write().await;
db.update_peer_games(peer_addr, games);
db.get_all_games()
};
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(aggregated_games)) {
log::error!("Failed to send ListGames event: {e}");
}
}
_ => {
log::warn!("Unexpected response from peer {peer_addr}: {response:?}");
}
} }
Ok(())
} }
async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec<Game>) -> eyre::Result<()> { async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec<Game>) -> eyre::Result<()> {
@@ -2465,86 +2565,7 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: C
if let Some(ref game_dir) = game_dir { if let Some(ref game_dir) = game_dir {
match scan_local_games(game_dir).await { match scan_local_games(game_dir).await {
Ok(current_games) => { Ok(current_games) => {
let local_game_db = ctx.local_game_db.clone(); update_and_announce_games(&ctx, &tx_notify_ui, current_games).await;
let mut db_guard = local_game_db.write().await;
let previous_games = db_guard
.as_ref()
.map(|db| db.games.keys().cloned().collect::<HashSet<_>>())
.unwrap_or_default();
let current_game_ids =
current_games.games.keys().cloned().collect::<HashSet<_>>();
// Check if any games were removed
let removed_games: Vec<String> = previous_games
.difference(&current_game_ids)
.cloned()
.collect();
if removed_games.is_empty() {
// Check if any games were added or updated
if previous_games != current_game_ids {
log::debug!(
"Local games directory structure changed, updating database"
);
*db_guard = Some(current_games);
let all_games = db_guard
.as_ref()
.map(|db| {
db.all_games().into_iter().cloned().collect::<Vec<Game>>()
})
.unwrap_or_default();
if let Err(e) =
tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone()))
{
log::error!("Failed to send LocalGamesUpdated event: {e}");
}
// Broadcast update to all peers
let peer_addresses =
{ ctx.peer_game_db.read().await.get_peer_addresses() };
for peer_addr in peer_addresses {
let games_clone = all_games.clone();
tokio::spawn(async move {
if let Err(e) =
announce_games_to_peer(peer_addr, games_clone).await
{
log::warn!("Failed to announce games to {peer_addr}: {e}");
}
});
}
}
} else {
log::info!("Detected removed games: {removed_games:?}");
*db_guard = Some(current_games);
// Notify UI about the change
let all_games = db_guard
.as_ref()
.map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
.unwrap_or_default();
if let Err(e) =
tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone()))
{
log::error!("Failed to send LocalGamesUpdated event: {e}");
}
// Broadcast update to all peers
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
for peer_addr in peer_addresses {
let games_clone = all_games.clone();
tokio::spawn(async move {
if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await
{
log::warn!("Failed to announce games to {peer_addr}: {e}");
}
});
}
}
} }
Err(e) => { Err(e) => {
log::error!("Failed to scan local games directory: {e}"); log::error!("Failed to scan local games directory: {e}");