feat: Enable peers to announce and synchronize local game libraries.
This commit is contained in:
@@ -1343,6 +1343,8 @@ struct PeerCtx {
|
|||||||
local_game_db: Arc<RwLock<Option<GameDB>>>,
|
local_game_db: Arc<RwLock<Option<GameDB>>>,
|
||||||
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||||
downloading_games: Arc<RwLock<HashSet<String>>>,
|
downloading_games: Arc<RwLock<HashSet<String>>>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for PeerCtx {
|
impl std::fmt::Debug for PeerCtx {
|
||||||
@@ -1382,6 +1384,8 @@ pub async fn run_peer(
|
|||||||
local_game_db: ctx.local_game_db.clone(),
|
local_game_db: ctx.local_game_db.clone(),
|
||||||
local_peer_addr: ctx.local_peer_addr.clone(),
|
local_peer_addr: ctx.local_peer_addr.clone(),
|
||||||
downloading_games: ctx.downloading_games.clone(),
|
downloading_games: ctx.downloading_games.clone(),
|
||||||
|
peer_game_db: ctx.peer_game_db.clone(),
|
||||||
|
tx_notify_ui: tx_notify_ui.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start server component
|
// Start server component
|
||||||
@@ -1982,6 +1986,26 @@ async fn handle_peer_stream(
|
|||||||
Request::Invalid(_, _) => {
|
Request::Invalid(_, _) => {
|
||||||
log::error!("Received invalid request from peer");
|
log::error!("Received invalid request from peer");
|
||||||
}
|
}
|
||||||
|
Request::AnnounceGames(games) => {
|
||||||
|
log::info!(
|
||||||
|
"Received {} announced games from peer {remote_addr:?}",
|
||||||
|
games.len()
|
||||||
|
);
|
||||||
|
if let Some(addr) = remote_addr {
|
||||||
|
let aggregated_games = {
|
||||||
|
let mut db = ctx.peer_game_db.write().await;
|
||||||
|
db.update_peer_games(addr, games);
|
||||||
|
db.get_all_games()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = ctx
|
||||||
|
.tx_notify_ui
|
||||||
|
.send(PeerEvent::ListGames(aggregated_games))
|
||||||
|
{
|
||||||
|
log::error!("Failed to send ListGames event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
@@ -2142,6 +2166,28 @@ async fn request_games_from_peer(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec<Game>) -> eyre::Result<()> {
|
||||||
|
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||||
|
|
||||||
|
let client = QuicClient::builder()
|
||||||
|
.with_tls(CERT_PEM)?
|
||||||
|
.with_io("0.0.0.0:0")?
|
||||||
|
.with_limits(limits)?
|
||||||
|
.start()?;
|
||||||
|
|
||||||
|
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||||
|
let mut conn = client.connect(conn).await?;
|
||||||
|
|
||||||
|
let stream = conn.open_bidirectional_stream().await?;
|
||||||
|
let (_, mut tx) = stream.split();
|
||||||
|
|
||||||
|
// Send AnnounceGames request
|
||||||
|
tx.send(Request::AnnounceGames(games).encode()).await?;
|
||||||
|
let _ = tx.close().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn request_game_details_from_peer(
|
async fn request_game_details_from_peer(
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
game_id: &str,
|
game_id: &str,
|
||||||
@@ -2435,10 +2481,24 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: C
|
|||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games))
|
tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone()))
|
||||||
{
|
{
|
||||||
log::error!("Failed to send LocalGamesUpdated event: {e}");
|
log::error!("Failed to send LocalGamesUpdated event: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast update to all peers
|
||||||
|
let peer_addresses =
|
||||||
|
{ ctx.peer_game_db.read().await.get_peer_addresses() };
|
||||||
|
for peer_addr in peer_addresses {
|
||||||
|
let games_clone = all_games.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) =
|
||||||
|
announce_games_to_peer(peer_addr, games_clone).await
|
||||||
|
{
|
||||||
|
log::warn!("Failed to announce games to {peer_addr}: {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log::info!("Detected removed games: {removed_games:?}");
|
log::info!("Detected removed games: {removed_games:?}");
|
||||||
@@ -2450,9 +2510,23 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ctx: C
|
|||||||
.map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
|
.map(|db| db.all_games().into_iter().cloned().collect::<Vec<Game>>())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games)) {
|
if let Err(e) =
|
||||||
|
tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone()))
|
||||||
|
{
|
||||||
log::error!("Failed to send LocalGamesUpdated event: {e}");
|
log::error!("Failed to send LocalGamesUpdated event: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast update to all peers
|
||||||
|
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
||||||
|
for peer_addr in peer_addresses {
|
||||||
|
let games_clone = all_games.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await
|
||||||
|
{
|
||||||
|
log::warn!("Failed to announce games to {peer_addr}: {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ pub enum Request {
|
|||||||
offset: u64,
|
offset: u64,
|
||||||
length: u64,
|
length: u64,
|
||||||
},
|
},
|
||||||
|
AnnounceGames(Vec<Game>),
|
||||||
Invalid(Bytes, String),
|
Invalid(Bytes, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user