diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index fca693a..eb34dab 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -1343,6 +1343,8 @@ struct PeerCtx { local_game_db: Arc>>, local_peer_addr: Arc>>, downloading_games: Arc>>, + peer_game_db: Arc>, + tx_notify_ui: UnboundedSender, } impl std::fmt::Debug for PeerCtx { @@ -1382,6 +1384,8 @@ pub async fn run_peer( local_game_db: ctx.local_game_db.clone(), local_peer_addr: ctx.local_peer_addr.clone(), downloading_games: ctx.downloading_games.clone(), + peer_game_db: ctx.peer_game_db.clone(), + tx_notify_ui: tx_notify_ui.clone(), }; // Start server component @@ -1982,6 +1986,26 @@ async fn handle_peer_stream( Request::Invalid(_, _) => { log::error!("Received invalid request from peer"); } + Request::AnnounceGames(games) => { + log::info!( + "Received {} announced games from peer {remote_addr:?}", + games.len() + ); + if let Some(addr) = remote_addr { + let aggregated_games = { + let mut db = ctx.peer_game_db.write().await; + db.update_peer_games(addr, games); + db.get_all_games() + }; + + if let Err(e) = ctx + .tx_notify_ui + .send(PeerEvent::ListGames(aggregated_games)) + { + log::error!("Failed to send ListGames event: {e}"); + } + } + } } } Ok(None) => { @@ -2142,6 +2166,28 @@ async fn request_games_from_peer( Ok(()) } +async fn announce_games_to_peer(peer_addr: SocketAddr, games: Vec) -> eyre::Result<()> { + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; + + let client = QuicClient::builder() + .with_tls(CERT_PEM)? + .with_io("0.0.0.0:0")? + .with_limits(limits)? + .start()?; + + let conn = Connect::new(peer_addr).with_server_name("localhost"); + let mut conn = client.connect(conn).await?; + + let stream = conn.open_bidirectional_stream().await?; + let (_, mut tx) = stream.split(); + + // Send AnnounceGames request + tx.send(Request::AnnounceGames(games).encode()).await?; + let _ = tx.close().await; + + Ok(()) +} + async fn request_game_details_from_peer( peer_addr: SocketAddr, game_id: &str, @@ -2435,10 +2481,24 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender, ctx: C .unwrap_or_default(); if let Err(e) = - tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games)) + tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) { log::error!("Failed to send LocalGamesUpdated event: {e}"); } + + // Broadcast update to all peers + let peer_addresses = + { ctx.peer_game_db.read().await.get_peer_addresses() }; + for peer_addr in peer_addresses { + let games_clone = all_games.clone(); + tokio::spawn(async move { + if let Err(e) = + announce_games_to_peer(peer_addr, games_clone).await + { + log::warn!("Failed to announce games to {peer_addr}: {e}"); + } + }); + } } } else { log::info!("Detected removed games: {removed_games:?}"); @@ -2450,9 +2510,23 @@ async fn run_local_game_monitor(tx_notify_ui: UnboundedSender, ctx: C .map(|db| db.all_games().into_iter().cloned().collect::>()) .unwrap_or_default(); - if let Err(e) = tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games)) { + if let Err(e) = + tx_notify_ui.send(PeerEvent::LocalGamesUpdated(all_games.clone())) + { log::error!("Failed to send LocalGamesUpdated event: {e}"); } + + // Broadcast update to all peers + let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() }; + for peer_addr in peer_addresses { + let games_clone = all_games.clone(); + tokio::spawn(async move { + if let Err(e) = announce_games_to_peer(peer_addr, games_clone).await + { + log::warn!("Failed to announce games to {peer_addr}: {e}"); + } + }); + } } } Err(e) => { diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs index 3e45192..6f7732c 100644 --- a/crates/lanspread-proto/src/lib.rs +++ b/crates/lanspread-proto/src/lib.rs @@ -16,6 +16,7 @@ pub enum Request { offset: u64, length: u64, }, + AnnounceGames(Vec), Invalid(Bytes, String), }