peers gone...
This commit is contained in:
@@ -33,6 +33,7 @@ use tokio::{
|
||||
RwLock,
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -126,6 +127,9 @@ pub enum PeerEvent {
|
||||
DownloadGameFilesFailed {
|
||||
id: String,
|
||||
},
|
||||
DownloadGameFilesAllPeersGone {
|
||||
id: String,
|
||||
},
|
||||
NoPeersHaveGame {
|
||||
id: String,
|
||||
},
|
||||
@@ -1309,6 +1313,7 @@ struct Ctx {
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||
downloading_games: Arc<RwLock<HashSet<String>>>,
|
||||
active_downloads: Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -1347,6 +1352,7 @@ pub async fn run_peer(
|
||||
peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())),
|
||||
local_peer_addr: Arc::new(RwLock::new(None)),
|
||||
downloading_games: Arc::new(RwLock::new(HashSet::new())),
|
||||
active_downloads: Arc::new(RwLock::new(HashMap::new())),
|
||||
};
|
||||
|
||||
let peer_ctx = PeerCtx {
|
||||
@@ -1384,8 +1390,16 @@ pub async fn run_peer(
|
||||
// Start ping service task
|
||||
let tx_notify_ui_ping = tx_notify_ui.clone();
|
||||
let peer_game_db_ping = ctx.peer_game_db.clone();
|
||||
let downloading_games_ping = ctx.downloading_games.clone();
|
||||
let active_downloads_ping = ctx.active_downloads.clone();
|
||||
tokio::spawn(async move {
|
||||
run_ping_service(tx_notify_ui_ping, peer_game_db_ping).await;
|
||||
run_ping_service(
|
||||
tx_notify_ui_ping,
|
||||
peer_game_db_ping,
|
||||
downloading_games_ping,
|
||||
active_downloads_ping,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
// Start local game directory monitoring task
|
||||
@@ -1686,32 +1700,39 @@ async fn handle_download_game_files_command(
|
||||
}
|
||||
|
||||
let downloading_games = ctx.downloading_games.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
tokio::spawn(async move {
|
||||
let active_downloads = ctx.active_downloads.clone();
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let download_id = id.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let result = download_game_files(
|
||||
&id,
|
||||
&download_id,
|
||||
resolved_descriptions,
|
||||
games_folder,
|
||||
peer_whitelist,
|
||||
file_peer_map,
|
||||
tx_notify_ui.clone(),
|
||||
tx_notify_ui_clone.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
{
|
||||
let mut guard = downloading_games.write().await;
|
||||
guard.remove(&id);
|
||||
guard.remove(&download_id);
|
||||
}
|
||||
|
||||
if let Err(e) = result {
|
||||
log::error!("Download failed for {id}: {e}");
|
||||
if let Err(send_err) =
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesFailed { id: id.clone() })
|
||||
{
|
||||
log::error!("Download failed for {download_id}: {e}");
|
||||
if let Err(send_err) = tx_notify_ui_clone.send(PeerEvent::DownloadGameFilesFailed {
|
||||
id: download_id.clone(),
|
||||
}) {
|
||||
log::error!("Failed to send DownloadGameFilesFailed event: {send_err}");
|
||||
}
|
||||
}
|
||||
|
||||
let _ = active_downloads.write().await.remove(&download_id);
|
||||
});
|
||||
|
||||
ctx.active_downloads.write().await.insert(id, handle);
|
||||
}
|
||||
|
||||
async fn handle_set_game_dir_command(ctx: &Ctx, game_dir: String) {
|
||||
@@ -2155,9 +2176,54 @@ async fn request_game_details_from_peer(
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_active_downloads_without_peers(
|
||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||
downloading_games: &Arc<RwLock<HashSet<String>>>,
|
||||
active_downloads: &Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) {
|
||||
let active_ids: Vec<String> = { downloading_games.read().await.iter().cloned().collect() };
|
||||
if active_ids.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for id in active_ids {
|
||||
let has_peers = {
|
||||
let guard = peer_game_db.read().await;
|
||||
!guard.peers_with_game(&id).is_empty()
|
||||
};
|
||||
|
||||
if has_peers {
|
||||
continue;
|
||||
}
|
||||
|
||||
let removed_from_tracking = {
|
||||
let mut guard = downloading_games.write().await;
|
||||
guard.remove(&id)
|
||||
};
|
||||
|
||||
if !removed_from_tracking {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(handle) = { active_downloads.write().await.remove(&id) } {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
if let Err(e) =
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesAllPeersGone { id: id.clone() })
|
||||
{
|
||||
log::error!("Failed to send DownloadGameFilesAllPeersGone event: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_lines)]
|
||||
async fn run_ping_service(
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
downloading_games: Arc<RwLock<HashSet<String>>>,
|
||||
active_downloads: Arc<RwLock<HashMap<String, JoinHandle<()>>>>,
|
||||
) {
|
||||
log::info!(
|
||||
"Starting ping service ({PEER_PING_INTERVAL_SECS}s interval, \
|
||||
@@ -2174,6 +2240,8 @@ async fn run_ping_service(
|
||||
for peer_addr in peer_addresses {
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let peer_game_db_clone = peer_game_db.clone();
|
||||
let downloading_games_clone = downloading_games.clone();
|
||||
let active_downloads_clone = active_downloads.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match ping_peer(peer_addr).await {
|
||||
@@ -2208,6 +2276,13 @@ async fn run_ping_service(
|
||||
}
|
||||
|
||||
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db_clone,
|
||||
&downloading_games_clone,
|
||||
&active_downloads_clone,
|
||||
&tx_notify_ui_clone,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2233,6 +2308,13 @@ async fn run_ping_service(
|
||||
}
|
||||
|
||||
emit_peer_game_list(&peer_game_db_clone, &tx_notify_ui_clone).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db_clone,
|
||||
&downloading_games_clone,
|
||||
&active_downloads_clone,
|
||||
&tx_notify_ui_clone,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2266,6 +2348,13 @@ async fn run_ping_service(
|
||||
|
||||
if removed_any {
|
||||
emit_peer_game_list(&peer_game_db, &tx_notify_ui).await;
|
||||
handle_active_downloads_without_peers(
|
||||
&peer_game_db,
|
||||
&downloading_games,
|
||||
&active_downloads,
|
||||
&tx_notify_ui,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user