fix(peer): exchange full library snapshots during handshake
Peer A failed to learn Peer B's games. The handshake only carried
library_rev/library_digest metadata, and the post-handshake sync path
compared those revisions against per-peer revision numbers that were
never advanced via this code path, so the games map for the remote
peer stayed empty and the UI never showed them.
The fix is to put the authoritative library data into the handshake
itself. Hello and HelloAck now carry a LibrarySnapshot directly, and
both perform_handshake_with_peer (outbound) and accept_inbound_hello
(inbound) apply that snapshot to the peer DB before emitting the UI
events. The initial peer-game-list event is now driven by the
handshake rather than by a follow-up LibrarySummary/LibrarySnapshot
roundtrip.
Bumps PROTOCOL_VERSION to 4 because the wire layout of Hello/HelloAck
changed. Per CLAUDE.md's protocol policy there is no compatibility
shim; older peers will fail the version check and be ignored.
Cleanups that fall out of the new design:
- The Hello / HelloAck library_rev and library_digest fields were
duplicated by the embedded LibrarySnapshot (which carries its own
library_rev, and whose digest is recomputed on apply). Collapsed
both messages to just `library: LibrarySnapshot` to remove the
foot-gun where the two could diverge.
- Request::LibrarySummary and Request::LibrarySnapshot are now dead
on the sender side and were removed along with their stream.rs
handlers and the LibrarySummary struct. LibraryDelta stays — it
is still sent from handlers.rs when the local library changes.
- record_remote_library previously called update_peer_library and
then apply_library_snapshot, which immediately overwrote the
rev/digest just written. Added update_peer_features and rewired
the call site so each peer-DB field is written exactly once.
update_peer_library is retained because discovery.rs still uses
it for the mDNS TXT-record path, where no snapshot is available.
- Removed the now-unused LibraryUpdate enum, select_library_update,
send_local_library_summary, send_local_library_update_if_needed,
LocalLibraryState::delta_since, build_library_summary,
send_library_summary, and send_library_snapshot.
Behavior change visible to users: when two peers come up on the LAN
they now see each other's full game lists immediately after the
handshake instead of waiting for a follow-up sync that, in the broken
case, never made the games visible at all.
Test Plan
- just clippy (clean for the touched crates)
- just test (workspace: all suites pass, including the two new
handshake tests: outbound_hello_carries_local_library_snapshot
and inbound_hello_applies_remote_library_snapshot, the latter
asserting PeerDiscovered + PeerCountUpdated + ListGames events
fire with the remote game visible)
- Manual: start `just peer-cli-alpha` and `just peer-cli-bravo` in
separate terminals; confirm each peer's game list shows the
other's library entries after discovery completes, without
requiring any additional command.
Refs
- FINDINGS.md: triage note that Claude's review surfaced only
in-scope cleanups (dead variants, duplicated header fields,
redundant DB writes, stale test fixture), all addressed here.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,7 +3,7 @@ use std::{
|
||||
hash::{Hash, Hasher},
|
||||
};
|
||||
|
||||
use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot, LibrarySummary};
|
||||
use lanspread_proto::{GameSummary, LibraryDelta, LibrarySnapshot};
|
||||
|
||||
const MAX_DELTA_HISTORY: usize = 8;
|
||||
|
||||
@@ -50,13 +50,6 @@ impl LocalLibraryState {
|
||||
}
|
||||
Some(delta)
|
||||
}
|
||||
|
||||
pub fn delta_since(&self, from_rev: u64) -> Option<LibraryDelta> {
|
||||
self.recent_deltas
|
||||
.iter()
|
||||
.find(|delta| delta.from_rev == from_rev)
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compute_library_digest(games: &HashMap<String, GameSummary>) -> u64 {
|
||||
@@ -77,14 +70,6 @@ pub fn compute_library_digest(games: &HashMap<String, GameSummary>) -> u64 {
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
pub fn build_library_summary(state: &LocalLibraryState) -> LibrarySummary {
|
||||
LibrarySummary {
|
||||
library_rev: state.revision,
|
||||
library_digest: state.digest,
|
||||
game_count: state.games.len(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_library_snapshot(state: &LocalLibraryState) -> LibrarySnapshot {
|
||||
let mut games: Vec<GameSummary> = state.games.values().cloned().collect();
|
||||
games.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
|
||||
@@ -9,16 +9,7 @@ use bytes::BytesMut;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use if_addrs::{IfAddr, Interface, get_if_addrs};
|
||||
use lanspread_db::db::GameFileDescription;
|
||||
use lanspread_proto::{
|
||||
Hello,
|
||||
HelloAck,
|
||||
LibraryDelta,
|
||||
LibrarySnapshot,
|
||||
LibrarySummary,
|
||||
Message,
|
||||
Request,
|
||||
Response,
|
||||
};
|
||||
use lanspread_proto::{Hello, HelloAck, LibraryDelta, Message, Request, Response};
|
||||
use s2n_quic::{Client as QuicClient, Connection, client::Connect, provider::limits::Limits};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
|
||||
@@ -123,36 +114,6 @@ pub async fn exchange_hello(peer_addr: SocketAddr, hello: Hello) -> eyre::Result
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_library_summary(
|
||||
peer_addr: SocketAddr,
|
||||
peer_id: &str,
|
||||
summary: LibrarySummary,
|
||||
) -> eyre::Result<()> {
|
||||
send_oneway_request(
|
||||
peer_addr,
|
||||
Request::LibrarySummary {
|
||||
peer_id: peer_id.to_string(),
|
||||
summary,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_library_snapshot(
|
||||
peer_addr: SocketAddr,
|
||||
peer_id: &str,
|
||||
snapshot: LibrarySnapshot,
|
||||
) -> eyre::Result<()> {
|
||||
send_oneway_request(
|
||||
peer_addr,
|
||||
Request::LibrarySnapshot {
|
||||
peer_id: peer_id.to_string(),
|
||||
snapshot,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_library_delta(
|
||||
peer_addr: SocketAddr,
|
||||
peer_id: &str,
|
||||
|
||||
@@ -248,6 +248,14 @@ impl PeerGameDB {
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the advertised feature list for a peer.
|
||||
pub fn update_peer_features(&mut self, peer_id: &PeerId, features: Vec<String>) {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
peer.features = features;
|
||||
peer.last_seen = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies a full library snapshot for a peer.
|
||||
pub fn apply_library_snapshot(&mut self, peer_id: &PeerId, snapshot: LibrarySnapshot) {
|
||||
if let Some(peer) = self.peers.get_mut(peer_id) {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
|
||||
use lanspread_proto::{Hello, HelloAck, LibraryDelta, LibrarySnapshot, PROTOCOL_VERSION};
|
||||
use lanspread_proto::{Hello, HelloAck, PROTOCOL_VERSION};
|
||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||
|
||||
use crate::{
|
||||
@@ -10,16 +10,11 @@ use crate::{
|
||||
context::{Ctx, PeerCtx},
|
||||
events,
|
||||
identity::default_features,
|
||||
library::{LocalLibraryState, build_library_snapshot, build_library_summary},
|
||||
network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary},
|
||||
library::{LocalLibraryState, build_library_snapshot},
|
||||
network::exchange_hello,
|
||||
peer_db::{PeerGameDB, PeerId, PeerUpsert},
|
||||
};
|
||||
|
||||
enum LibraryUpdate {
|
||||
Delta(LibraryDelta),
|
||||
Snapshot(LibrarySnapshot),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct HandshakeCtx {
|
||||
peer_id: Arc<String>,
|
||||
@@ -61,12 +56,12 @@ async fn required_listen_addr(
|
||||
pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result<HelloAck> {
|
||||
let library_guard = ctx.local_library.read().await;
|
||||
let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?;
|
||||
let library = build_library_snapshot(&library_guard);
|
||||
Ok(HelloAck {
|
||||
peer_id: ctx.peer_id.as_ref().clone(),
|
||||
proto_ver: PROTOCOL_VERSION,
|
||||
listen_addr,
|
||||
library_rev: library_guard.revision,
|
||||
library_digest: library_guard.digest,
|
||||
library,
|
||||
features: default_features(),
|
||||
})
|
||||
}
|
||||
@@ -74,12 +69,12 @@ pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> eyre::Result<HelloAck> {
|
||||
async fn build_hello_from_state(ctx: &HandshakeCtx) -> eyre::Result<Hello> {
|
||||
let library_guard = ctx.local_library.read().await;
|
||||
let listen_addr = required_listen_addr(&ctx.local_peer_addr).await?;
|
||||
let library = build_library_snapshot(&library_guard);
|
||||
Ok(Hello {
|
||||
peer_id: ctx.peer_id.as_ref().clone(),
|
||||
proto_ver: PROTOCOL_VERSION,
|
||||
listen_addr,
|
||||
library_rev: library_guard.revision,
|
||||
library_digest: library_guard.digest,
|
||||
library,
|
||||
features: default_features(),
|
||||
})
|
||||
}
|
||||
@@ -120,20 +115,13 @@ pub(crate) async fn perform_handshake_with_peer(
|
||||
&ctx.peer_game_db,
|
||||
ack.peer_id.clone(),
|
||||
record_addr,
|
||||
ack.library_rev,
|
||||
ack.library_digest,
|
||||
ack.features.clone(),
|
||||
ack.library,
|
||||
)
|
||||
.await;
|
||||
|
||||
after_peer_library_recorded(
|
||||
&ctx,
|
||||
upsert,
|
||||
record_addr,
|
||||
ack.library_rev,
|
||||
ack.library_digest,
|
||||
)
|
||||
.await;
|
||||
after_peer_library_recorded(&ctx, upsert, record_addr).await;
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -162,20 +150,13 @@ pub(super) async fn accept_inbound_hello(
|
||||
&ctx.peer_game_db,
|
||||
hello.peer_id.clone(),
|
||||
addr,
|
||||
hello.library_rev,
|
||||
hello.library_digest,
|
||||
hello.features.clone(),
|
||||
hello.library,
|
||||
)
|
||||
.await;
|
||||
|
||||
after_peer_library_recorded(
|
||||
&handshake_ctx,
|
||||
upsert,
|
||||
addr,
|
||||
hello.library_rev,
|
||||
hello.library_digest,
|
||||
)
|
||||
.await;
|
||||
after_peer_library_recorded(&handshake_ctx, upsert, addr).await;
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
|
||||
build_hello_ack(ctx).await
|
||||
}
|
||||
@@ -197,13 +178,13 @@ async fn record_remote_library(
|
||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||
peer_id: PeerId,
|
||||
peer_addr: SocketAddr,
|
||||
library_rev: u64,
|
||||
library_digest: u64,
|
||||
features: Vec<String>,
|
||||
snapshot: lanspread_proto::LibrarySnapshot,
|
||||
) -> PeerUpsert {
|
||||
let mut db = peer_game_db.write().await;
|
||||
let upsert = db.upsert_peer(peer_id.clone(), peer_addr);
|
||||
db.update_peer_library(&peer_id, library_rev, library_digest, features);
|
||||
db.apply_library_snapshot(&peer_id, snapshot);
|
||||
db.update_peer_features(&peer_id, features);
|
||||
upsert
|
||||
}
|
||||
|
||||
@@ -211,85 +192,42 @@ async fn after_peer_library_recorded(
|
||||
ctx: &HandshakeCtx,
|
||||
upsert: PeerUpsert,
|
||||
peer_addr: SocketAddr,
|
||||
remote_library_rev: u64,
|
||||
remote_library_digest: u64,
|
||||
) {
|
||||
if upsert.is_new {
|
||||
events::emit_peer_discovered(&ctx.peer_game_db, &ctx.tx_notify_ui, peer_addr).await;
|
||||
send_local_library_summary(peer_addr, ctx).await;
|
||||
}
|
||||
|
||||
send_local_library_update_if_needed(peer_addr, ctx, remote_library_rev, remote_library_digest)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_local_library_summary(peer_addr: SocketAddr, ctx: &HandshakeCtx) {
|
||||
let summary = {
|
||||
let library_guard = ctx.local_library.read().await;
|
||||
build_library_summary(&library_guard)
|
||||
};
|
||||
let local_peer_id = ctx.peer_id.as_ref().clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = send_library_summary(peer_addr, &local_peer_id, summary).await {
|
||||
log::warn!("Failed to send library summary to {peer_addr}: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_local_library_update_if_needed(
|
||||
peer_addr: SocketAddr,
|
||||
ctx: &HandshakeCtx,
|
||||
remote_rev: u64,
|
||||
remote_digest: u64,
|
||||
) {
|
||||
if let Some(update) = select_library_update(&ctx.local_library, remote_rev, remote_digest).await
|
||||
{
|
||||
let local_peer_id = ctx.peer_id.as_ref().clone();
|
||||
tokio::spawn(async move {
|
||||
let result = match update {
|
||||
LibraryUpdate::Delta(delta) => {
|
||||
send_library_delta(peer_addr, &local_peer_id, delta).await
|
||||
}
|
||||
LibraryUpdate::Snapshot(snapshot) => {
|
||||
send_library_snapshot(peer_addr, &local_peer_id, snapshot).await
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
log::warn!("Failed to send library update to {peer_addr}: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn select_library_update(
|
||||
local_library: &Arc<RwLock<LocalLibraryState>>,
|
||||
remote_rev: u64,
|
||||
remote_digest: u64,
|
||||
) -> Option<LibraryUpdate> {
|
||||
let library_guard = local_library.read().await;
|
||||
if library_guard.digest == remote_digest || remote_rev > library_guard.revision {
|
||||
return None;
|
||||
}
|
||||
|
||||
if let Some(delta) = library_guard.delta_since(remote_rev) {
|
||||
return Some(LibraryUpdate::Delta(delta));
|
||||
}
|
||||
|
||||
Some(LibraryUpdate::Snapshot(build_library_snapshot(
|
||||
&library_guard,
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use lanspread_proto::{Availability, GameSummary, Hello, LibrarySnapshot, PROTOCOL_VERSION};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tokio_util::{sync::CancellationToken, task::TaskTracker};
|
||||
|
||||
use super::{HandshakeCtx, build_hello_from_state};
|
||||
use crate::{library::LocalLibraryState, peer_db::PeerGameDB};
|
||||
use super::{HandshakeCtx, accept_inbound_hello, build_hello_from_state};
|
||||
use crate::{
|
||||
PeerEvent,
|
||||
UnpackFuture,
|
||||
Unpacker,
|
||||
context::Ctx,
|
||||
library::LocalLibraryState,
|
||||
peer_db::PeerGameDB,
|
||||
};
|
||||
|
||||
struct NoopUnpacker;
|
||||
|
||||
impl Unpacker for NoopUnpacker {
|
||||
fn unpack<'a>(&'a self, _archive: &'a Path, _dest: &'a Path) -> UnpackFuture<'a> {
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
}
|
||||
|
||||
fn addr(ip: [u8; 4], port: u16) -> SocketAddr {
|
||||
SocketAddr::from((ip, port))
|
||||
@@ -307,6 +245,19 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn summary(id: &str) -> GameSummary {
|
||||
GameSummary {
|
||||
id: id.to_string(),
|
||||
name: id.to_string(),
|
||||
size: 42,
|
||||
downloaded: true,
|
||||
installed: true,
|
||||
eti_version: Some("20250101".to_string()),
|
||||
manifest_hash: 7,
|
||||
availability: Availability::Ready,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn outbound_hello_requires_local_listener_addr() {
|
||||
let ctx = test_handshake_ctx(None);
|
||||
@@ -329,4 +280,86 @@ mod tests {
|
||||
|
||||
assert_eq!(hello.listen_addr, advertised);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn outbound_hello_carries_local_library_snapshot() {
|
||||
let ctx = test_handshake_ctx(Some(addr([10, 66, 0, 2], 40000)));
|
||||
ctx.local_library
|
||||
.write()
|
||||
.await
|
||||
.update_from_scan(HashMap::from([("game".to_string(), summary("game"))]), 7);
|
||||
|
||||
let hello = build_hello_from_state(&ctx)
|
||||
.await
|
||||
.expect("listener address is present");
|
||||
|
||||
assert_eq!(hello.library.library_rev, 7);
|
||||
assert_eq!(hello.library.games.len(), 1);
|
||||
assert_eq!(hello.library.games[0].id, "game");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn inbound_hello_applies_remote_library_snapshot() {
|
||||
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
||||
let ctx = Ctx::new(
|
||||
peer_game_db.clone(),
|
||||
"local-peer".to_string(),
|
||||
PathBuf::new(),
|
||||
Arc::new(NoopUnpacker),
|
||||
CancellationToken::new(),
|
||||
TaskTracker::new(),
|
||||
Arc::new(RwLock::new(HashSet::new())),
|
||||
);
|
||||
*ctx.local_peer_addr.write().await = Some(addr([127, 0, 0, 1], 4000));
|
||||
|
||||
let (tx_notify_ui, mut rx_notify_ui) = mpsc::unbounded_channel();
|
||||
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui);
|
||||
let remote_addr = addr([127, 0, 0, 1], 5000);
|
||||
let hello = Hello {
|
||||
peer_id: "remote-peer".to_string(),
|
||||
proto_ver: PROTOCOL_VERSION,
|
||||
listen_addr: remote_addr,
|
||||
library: LibrarySnapshot {
|
||||
library_rev: 3,
|
||||
games: vec![summary("remote-game")],
|
||||
},
|
||||
features: Vec::new(),
|
||||
};
|
||||
|
||||
let ack = accept_inbound_hello(&peer_ctx, None, hello)
|
||||
.await
|
||||
.expect("current protocol hello should be accepted");
|
||||
|
||||
assert_eq!(ack.peer_id, "local-peer");
|
||||
let snapshots = peer_game_db.read().await.peer_snapshots();
|
||||
assert_eq!(snapshots.len(), 1);
|
||||
assert_eq!(snapshots[0].addr, remote_addr);
|
||||
assert_eq!(snapshots[0].game_count, 1);
|
||||
assert_eq!(snapshots[0].games[0].id, "remote-game");
|
||||
|
||||
assert!(matches!(
|
||||
rx_notify_ui
|
||||
.recv()
|
||||
.await
|
||||
.expect("peer discovery event should be emitted"),
|
||||
PeerEvent::PeerDiscovered(addr) if addr == remote_addr
|
||||
));
|
||||
assert!(matches!(
|
||||
rx_notify_ui
|
||||
.recv()
|
||||
.await
|
||||
.expect("peer count event should be emitted"),
|
||||
PeerEvent::PeerCountUpdated(1)
|
||||
));
|
||||
let PeerEvent::ListGames(games) = rx_notify_ui
|
||||
.recv()
|
||||
.await
|
||||
.expect("peer game list should be emitted")
|
||||
else {
|
||||
panic!("expected ListGames");
|
||||
};
|
||||
assert_eq!(games.len(), 1);
|
||||
assert_eq!(games[0].id, "remote-game");
|
||||
assert_eq!(games[0].peer_count, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::net::SocketAddr;
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
use lanspread_proto::{LibraryDelta, LibrarySnapshot, LibrarySummary, Message, Request, Response};
|
||||
use lanspread_proto::{LibraryDelta, Message, Request, Response};
|
||||
use s2n_quic::stream::{BidirectionalStream, SendStream};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
|
||||
@@ -14,12 +14,7 @@ use crate::{
|
||||
events,
|
||||
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
|
||||
peer::{send_game_file_chunk, send_game_file_data},
|
||||
services::handshake::{
|
||||
HandshakeCtx,
|
||||
accept_inbound_hello,
|
||||
perform_handshake_with_peer,
|
||||
spawn_library_resync,
|
||||
},
|
||||
services::handshake::{HandshakeCtx, accept_inbound_hello, spawn_library_resync},
|
||||
};
|
||||
|
||||
type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;
|
||||
@@ -90,14 +85,6 @@ async fn dispatch_request(
|
||||
}
|
||||
},
|
||||
Request::ListGames => handle_list_games(ctx, framed_tx).await,
|
||||
Request::LibrarySummary { peer_id, summary } => {
|
||||
handle_library_summary(ctx, peer_id, summary).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::LibrarySnapshot { peer_id, snapshot } => {
|
||||
handle_library_snapshot(ctx, peer_id, snapshot).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::LibraryDelta { peer_id, delta } => {
|
||||
handle_library_delta(ctx, peer_id, delta).await;
|
||||
framed_tx
|
||||
@@ -168,58 +155,6 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response
|
||||
send_response(framed_tx, Response::ListGames(games), "ListGames").await
|
||||
}
|
||||
|
||||
async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: LibrarySummary) {
|
||||
let (addr, previous_digest, features) = {
|
||||
let db = ctx.peer_game_db.read().await;
|
||||
let Some(addr) = db.peer_addr(&peer_id) else {
|
||||
log::debug!("Ignoring library summary from unknown peer {peer_id}");
|
||||
return;
|
||||
};
|
||||
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
|
||||
(addr, digest, db.peer_features(&peer_id))
|
||||
};
|
||||
|
||||
{
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
db.update_peer_library(
|
||||
&peer_id,
|
||||
summary.library_rev,
|
||||
summary.library_digest,
|
||||
features,
|
||||
);
|
||||
}
|
||||
|
||||
if summary.library_digest != previous_digest {
|
||||
ctx.task_tracker.spawn({
|
||||
let handshake_ctx = HandshakeCtx::from_peer_ctx(ctx);
|
||||
async move {
|
||||
if let Err(err) =
|
||||
perform_handshake_with_peer(handshake_ctx, addr, Some(peer_id)).await
|
||||
{
|
||||
log::warn!("Failed to refresh library from {addr}: {err}");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_library_snapshot(ctx: &PeerCtx, peer_id: String, snapshot: LibrarySnapshot) {
|
||||
let applied = {
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
if db.peer_addr(&peer_id).is_some() {
|
||||
db.apply_library_snapshot(&peer_id, snapshot);
|
||||
true
|
||||
} else {
|
||||
log::debug!("Ignoring library snapshot from unknown peer {peer_id}");
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if applied {
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) {
|
||||
let applied = {
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
|
||||
@@ -4,7 +4,7 @@ use bytes::Bytes;
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub const PROTOCOL_VERSION: u32 = 3;
|
||||
pub const PROTOCOL_VERSION: u32 = 4;
|
||||
|
||||
pub use lanspread_db::db::Availability;
|
||||
|
||||
@@ -25,8 +25,7 @@ pub struct Hello {
|
||||
pub peer_id: String,
|
||||
pub proto_ver: u32,
|
||||
pub listen_addr: SocketAddr,
|
||||
pub library_rev: u64,
|
||||
pub library_digest: u64,
|
||||
pub library: LibrarySnapshot,
|
||||
pub features: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -35,18 +34,10 @@ pub struct HelloAck {
|
||||
pub peer_id: String,
|
||||
pub proto_ver: u32,
|
||||
pub listen_addr: SocketAddr,
|
||||
pub library_rev: u64,
|
||||
pub library_digest: u64,
|
||||
pub library: LibrarySnapshot,
|
||||
pub features: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LibrarySummary {
|
||||
pub library_rev: u64,
|
||||
pub library_digest: u64,
|
||||
pub game_count: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct LibrarySnapshot {
|
||||
pub library_rev: u64,
|
||||
@@ -77,14 +68,6 @@ pub enum Request {
|
||||
length: u64,
|
||||
},
|
||||
Hello(Hello),
|
||||
LibrarySummary {
|
||||
peer_id: String,
|
||||
summary: LibrarySummary,
|
||||
},
|
||||
LibrarySnapshot {
|
||||
peer_id: String,
|
||||
snapshot: LibrarySnapshot,
|
||||
},
|
||||
LibraryDelta {
|
||||
peer_id: String,
|
||||
delta: LibraryDelta,
|
||||
|
||||
Reference in New Issue
Block a user