fix(peer): record listener addresses during handshakes
Peers discovered over mDNS could still attribute later library sync traffic to temporary QUIC source ports. In a real GUI LAN run this made Host B try to push its library to Host A's outbound port instead of Host A's advertised listener, so Host A discovered the peer but never saw its games. Carry the stable listener address in Hello and HelloAck, and key library sync messages by peer_id instead of inferring identity from the transport source address. The handshake path now explicitly refreshes an empty peer library from the known listener address, matching the reliability of the direct-connect CLI path without overwriting richer snapshot state when it already arrived. This changes the current wire protocol, so PROTOCOL_VERSION is bumped to 3 and all peers must be rebuilt together. The architecture note now documents that listener addresses come from mDNS or Hello/HelloAck, never from ephemeral QUIC source ports. Test Plan: - just fmt - just test - just clippy - just build - git diff --check Refs: Local Linux/Win11 GUI LAN test logs from 2026-05-18.
This commit is contained in:
@@ -27,7 +27,11 @@ chunked file transfers.
|
|||||||
When a peer is discovered:
|
When a peer is discovered:
|
||||||
|
|
||||||
1. Connect and send `Hello { peer_id, proto_ver, library_rev, library_digest, features }`.
|
1. Connect and send `Hello { peer_id, proto_ver, library_rev, library_digest, features }`.
|
||||||
2. Receive `HelloAck { peer_id, proto_ver, library_rev, library_digest, features }`.
|
`Hello` also carries the sender's advertised `listen_addr`; the QUIC source
|
||||||
|
port is only a temporary transport port and must not be recorded as the
|
||||||
|
peer's listener.
|
||||||
|
2. Receive `HelloAck { peer_id, proto_ver, listen_addr, library_rev,
|
||||||
|
library_digest, features }`.
|
||||||
3. If the remote `peer_id` is already known but the address changed, update it.
|
3. If the remote `peer_id` is already known but the address changed, update it.
|
||||||
4. If protocol versions are incompatible, drop the peer (and keep mDNS watching).
|
4. If protocol versions are incompatible, drop the peer (and keep mDNS watching).
|
||||||
5. If library digests match, do nothing else.
|
5. If library digests match, do nothing else.
|
||||||
@@ -51,12 +55,12 @@ When a peer is discovered:
|
|||||||
|
|
||||||
### Summary and snapshot
|
### Summary and snapshot
|
||||||
|
|
||||||
- `LibrarySummary { library_rev, library_digest, game_count }`
|
- `LibrarySummary { peer_id, summary: { library_rev, library_digest, game_count } }`
|
||||||
- `LibrarySnapshot { library_rev, games: Vec<GameSummary> }`
|
- `LibrarySnapshot { peer_id, snapshot: { library_rev, games: Vec<GameSummary> } }`
|
||||||
|
|
||||||
### Delta updates
|
### Delta updates
|
||||||
|
|
||||||
- `LibraryDelta { from_rev, to_rev, added, updated, removed }`
|
- `LibraryDelta { peer_id, delta: { from_rev, to_rev, added, updated, removed } }`
|
||||||
- `removed` is a list of game IDs.
|
- `removed` is a list of game IDs.
|
||||||
- Deltas are idempotent; ignore if `to_rev` <= known rev.
|
- Deltas are idempotent; ignore if `to_rev` <= known rev.
|
||||||
|
|
||||||
@@ -142,6 +146,8 @@ Most scans become O(number of game dirs), with full recursion only when needed.
|
|||||||
## Fault tolerance rules
|
## Fault tolerance rules
|
||||||
|
|
||||||
- Every peer is keyed by `peer_id`, not by IP address.
|
- Every peer is keyed by `peer_id`, not by IP address.
|
||||||
|
- Peer addresses are listener addresses from mDNS or `Hello`/`HelloAck`, never
|
||||||
|
ephemeral QUIC source ports.
|
||||||
- `library_rev` is monotonic and guards against out-of-order updates.
|
- `library_rev` is monotonic and guards against out-of-order updates.
|
||||||
- Any mismatch or missing delta falls back to `LibrarySnapshot`.
|
- Any mismatch or missing delta falls back to `LibrarySnapshot`.
|
||||||
- Loss of goodbye is harmless; stale timeout is authoritative.
|
- Loss of goodbye is harmless; stale timeout is authoritative.
|
||||||
@@ -153,8 +159,10 @@ Most scans become O(number of game dirs), with full recursion only when needed.
|
|||||||
`LibraryDelta`, and optional `Goodbye` messages.
|
`LibraryDelta`, and optional `Goodbye` messages.
|
||||||
- Thread `peer_id`, `library_rev`, and `manifest_hash` through all
|
- Thread `peer_id`, `library_rev`, and `manifest_hash` through all
|
||||||
library and manifest-bearing types.
|
library and manifest-bearing types.
|
||||||
- Make `HelloAck` carry the remote `library_rev` and `manifest_hash`
|
- Make `Hello` and `HelloAck` carry the sender's `listen_addr`,
|
||||||
so the client can immediately select `LibraryDelta` vs `LibrarySnapshot`.
|
`library_rev`, and `library_digest` so both sides can record stable
|
||||||
|
listener addresses and immediately select `LibraryDelta` vs
|
||||||
|
`LibrarySnapshot`.
|
||||||
2. Peer identity:
|
2. Peer identity:
|
||||||
- Persist a stable `peer_id` (UUID) in the peer config and inject it into
|
- Persist a stable `peer_id` (UUID) in the peer config and inject it into
|
||||||
`PeerInfo` and `PeerGameDB` at startup.
|
`PeerInfo` and `PeerGameDB` at startup.
|
||||||
|
|||||||
@@ -8,8 +8,7 @@ use std::{
|
|||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use lanspread_db::db::{Game, GameDB, GameFileDescription};
|
use lanspread_db::db::{GameDB, GameFileDescription};
|
||||||
use lanspread_proto::GameSummary;
|
|
||||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -31,7 +30,7 @@ use crate::{
|
|||||||
scan_local_library,
|
scan_local_library,
|
||||||
version_ini_is_regular_file,
|
version_ini_is_regular_file,
|
||||||
},
|
},
|
||||||
network::{request_game_details_from_peer, request_game_list_from_peer, send_library_delta},
|
network::{request_game_details_from_peer, send_library_delta},
|
||||||
peer_db::PeerGameDB,
|
peer_db::PeerGameDB,
|
||||||
remote_peer::ensure_peer_id_for_addr,
|
remote_peer::ensure_peer_id_for_addr,
|
||||||
services::perform_handshake_with_peer,
|
services::perform_handshake_with_peer,
|
||||||
@@ -706,6 +705,7 @@ pub async fn handle_connect_peer_command(
|
|||||||
) {
|
) {
|
||||||
log::info!("Direct connect command received for {addr}");
|
log::info!("Direct connect command received for {addr}");
|
||||||
let peer_id = ctx.peer_id.clone();
|
let peer_id = ctx.peer_id.clone();
|
||||||
|
let local_peer_addr = ctx.local_peer_addr.clone();
|
||||||
let local_library = ctx.local_library.clone();
|
let local_library = ctx.local_library.clone();
|
||||||
let peer_game_db = ctx.peer_game_db.clone();
|
let peer_game_db = ctx.peer_game_db.clone();
|
||||||
let tx_notify_ui = tx_notify_ui.clone();
|
let tx_notify_ui = tx_notify_ui.clone();
|
||||||
@@ -713,54 +713,20 @@ pub async fn handle_connect_peer_command(
|
|||||||
ctx.task_tracker.spawn(async move {
|
ctx.task_tracker.spawn(async move {
|
||||||
if let Err(err) = perform_handshake_with_peer(
|
if let Err(err) = perform_handshake_with_peer(
|
||||||
peer_id,
|
peer_id,
|
||||||
|
local_peer_addr,
|
||||||
local_library,
|
local_library,
|
||||||
peer_game_db.clone(),
|
peer_game_db,
|
||||||
tx_notify_ui.clone(),
|
tx_notify_ui,
|
||||||
addr,
|
addr,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
log::warn!("Failed direct connect to {addr}: {err}");
|
log::warn!("Failed direct connect to {addr}: {err}");
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(err) = refresh_direct_peer_games(&peer_game_db, &tx_notify_ui, addr).await {
|
|
||||||
log::warn!("Failed to refresh direct peer games from {addr}: {err}");
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn refresh_direct_peer_games(
|
|
||||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
|
||||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
|
||||||
addr: SocketAddr,
|
|
||||||
) -> eyre::Result<()> {
|
|
||||||
let games = request_game_list_from_peer(addr).await?;
|
|
||||||
let summaries = games.into_iter().map(game_to_summary).collect::<Vec<_>>();
|
|
||||||
let peer_id = ensure_peer_id_for_addr(peer_game_db, addr).await;
|
|
||||||
{
|
|
||||||
let mut db = peer_game_db.write().await;
|
|
||||||
db.update_peer_games(&peer_id, summaries);
|
|
||||||
}
|
|
||||||
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn game_to_summary(game: Game) -> GameSummary {
|
|
||||||
let availability = game.normalized_availability();
|
|
||||||
GameSummary {
|
|
||||||
id: game.id,
|
|
||||||
name: game.name,
|
|
||||||
size: game.size,
|
|
||||||
downloaded: game.downloaded,
|
|
||||||
installed: game.installed,
|
|
||||||
eti_version: game.eti_game_version,
|
|
||||||
manifest_hash: 0,
|
|
||||||
availability,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// Game announcement helpers
|
// Game announcement helpers
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
@@ -823,8 +789,9 @@ pub async fn update_and_announce_games(
|
|||||||
|
|
||||||
for peer_addr in peer_targets {
|
for peer_addr in peer_targets {
|
||||||
let delta = delta.clone();
|
let delta = delta.clone();
|
||||||
|
let peer_id = ctx.peer_id.as_ref().clone();
|
||||||
ctx.task_tracker.spawn(async move {
|
ctx.task_tracker.spawn(async move {
|
||||||
if let Err(e) = send_library_delta(peer_addr, delta).await {
|
if let Err(e) = send_library_delta(peer_addr, &peer_id, delta).await {
|
||||||
log::warn!("Failed to send library delta to {peer_addr}: {e}");
|
log::warn!("Failed to send library delta to {peer_addr}: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -125,20 +125,47 @@ pub async fn exchange_hello(peer_addr: SocketAddr, hello: Hello) -> eyre::Result
|
|||||||
|
|
||||||
pub async fn send_library_summary(
|
pub async fn send_library_summary(
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
|
peer_id: &str,
|
||||||
summary: LibrarySummary,
|
summary: LibrarySummary,
|
||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
send_oneway_request(peer_addr, Request::LibrarySummary(summary)).await
|
send_oneway_request(
|
||||||
|
peer_addr,
|
||||||
|
Request::LibrarySummary {
|
||||||
|
peer_id: peer_id.to_string(),
|
||||||
|
summary,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_library_snapshot(
|
pub async fn send_library_snapshot(
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
|
peer_id: &str,
|
||||||
snapshot: LibrarySnapshot,
|
snapshot: LibrarySnapshot,
|
||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
send_oneway_request(peer_addr, Request::LibrarySnapshot(snapshot)).await
|
send_oneway_request(
|
||||||
|
peer_addr,
|
||||||
|
Request::LibrarySnapshot {
|
||||||
|
peer_id: peer_id.to_string(),
|
||||||
|
snapshot,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_library_delta(peer_addr: SocketAddr, delta: LibraryDelta) -> eyre::Result<()> {
|
pub async fn send_library_delta(
|
||||||
send_oneway_request(peer_addr, Request::LibraryDelta(delta)).await
|
peer_addr: SocketAddr,
|
||||||
|
peer_id: &str,
|
||||||
|
delta: LibraryDelta,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
send_oneway_request(
|
||||||
|
peer_addr,
|
||||||
|
Request::LibraryDelta {
|
||||||
|
peer_id: peer_id.to_string(),
|
||||||
|
delta,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Result<()> {
|
pub async fn send_goodbye(peer_addr: SocketAddr, peer_id: String) -> eyre::Result<()> {
|
||||||
|
|||||||
@@ -2,9 +2,15 @@
|
|||||||
|
|
||||||
use std::{net::SocketAddr, sync::Arc};
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
|
|
||||||
|
use lanspread_db::db::Game;
|
||||||
|
use lanspread_proto::GameSummary;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::peer_db::{PeerGameDB, PeerId};
|
use crate::{
|
||||||
|
events,
|
||||||
|
network::request_game_list_from_peer,
|
||||||
|
peer_db::{PeerGameDB, PeerId},
|
||||||
|
};
|
||||||
|
|
||||||
pub async fn ensure_peer_id_for_addr(
|
pub async fn ensure_peer_id_for_addr(
|
||||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||||
@@ -19,3 +25,37 @@ pub async fn ensure_peer_id_for_addr(
|
|||||||
db.upsert_peer(addr_id.clone(), peer_addr);
|
db.upsert_peer(addr_id.clone(), peer_addr);
|
||||||
addr_id
|
addr_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn refresh_peer_games(
|
||||||
|
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||||
|
tx_notify_ui: &tokio::sync::mpsc::UnboundedSender<crate::PeerEvent>,
|
||||||
|
peer_addr: SocketAddr,
|
||||||
|
peer_id: &PeerId,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
if peer_game_db.read().await.peer_game_count(peer_id) > 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let games = request_game_list_from_peer(peer_addr).await?;
|
||||||
|
let summaries = games.into_iter().map(game_to_summary).collect::<Vec<_>>();
|
||||||
|
{
|
||||||
|
let mut db = peer_game_db.write().await;
|
||||||
|
db.update_peer_games(peer_id, summaries);
|
||||||
|
}
|
||||||
|
events::emit_peer_game_list(peer_game_db, tx_notify_ui).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn game_to_summary(game: Game) -> GameSummary {
|
||||||
|
let availability = game.normalized_availability();
|
||||||
|
GameSummary {
|
||||||
|
id: game.id,
|
||||||
|
name: game.name,
|
||||||
|
size: game.size,
|
||||||
|
downloaded: game.downloaded,
|
||||||
|
installed: game.installed,
|
||||||
|
eti_version: game.eti_game_version,
|
||||||
|
manifest_hash: 0,
|
||||||
|
availability,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -173,12 +173,14 @@ fn spawn_protocol_negotiation(
|
|||||||
) {
|
) {
|
||||||
let peer_addr = info.addr;
|
let peer_addr = info.addr;
|
||||||
let peer_id_arc = ctx.peer_id.clone();
|
let peer_id_arc = ctx.peer_id.clone();
|
||||||
|
let local_peer_addr = ctx.local_peer_addr.clone();
|
||||||
let local_library = ctx.local_library.clone();
|
let local_library = ctx.local_library.clone();
|
||||||
let peer_game_db = ctx.peer_game_db.clone();
|
let peer_game_db = ctx.peer_game_db.clone();
|
||||||
|
|
||||||
ctx.task_tracker.spawn(async move {
|
ctx.task_tracker.spawn(async move {
|
||||||
if let Err(err) = perform_handshake_with_peer(
|
if let Err(err) = perform_handshake_with_peer(
|
||||||
peer_id_arc,
|
peer_id_arc,
|
||||||
|
local_peer_addr,
|
||||||
local_library,
|
local_library,
|
||||||
peer_game_db,
|
peer_game_db,
|
||||||
tx_notify_ui,
|
tx_notify_ui,
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ use crate::{
|
|||||||
library::{LocalLibraryState, build_library_snapshot, build_library_summary},
|
library::{LocalLibraryState, build_library_snapshot, build_library_summary},
|
||||||
network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary},
|
network::{exchange_hello, send_library_delta, send_library_snapshot, send_library_summary},
|
||||||
peer_db::{PeerGameDB, PeerId, PeerUpsert},
|
peer_db::{PeerGameDB, PeerId, PeerUpsert},
|
||||||
|
remote_peer::refresh_peer_games,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum LibraryUpdate {
|
enum LibraryUpdate {
|
||||||
@@ -22,9 +23,11 @@ enum LibraryUpdate {
|
|||||||
|
|
||||||
pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck {
|
pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck {
|
||||||
let library_guard = ctx.local_library.read().await;
|
let library_guard = ctx.local_library.read().await;
|
||||||
|
let listen_addr = *ctx.local_peer_addr.read().await;
|
||||||
HelloAck {
|
HelloAck {
|
||||||
peer_id: ctx.peer_id.as_ref().clone(),
|
peer_id: ctx.peer_id.as_ref().clone(),
|
||||||
proto_ver: PROTOCOL_VERSION,
|
proto_ver: PROTOCOL_VERSION,
|
||||||
|
listen_addr,
|
||||||
library_rev: library_guard.revision,
|
library_rev: library_guard.revision,
|
||||||
library_digest: library_guard.digest,
|
library_digest: library_guard.digest,
|
||||||
features: default_features(),
|
features: default_features(),
|
||||||
@@ -33,12 +36,15 @@ pub(super) async fn build_hello_ack(ctx: &PeerCtx) -> HelloAck {
|
|||||||
|
|
||||||
async fn build_hello_from_state(
|
async fn build_hello_from_state(
|
||||||
peer_id: &str,
|
peer_id: &str,
|
||||||
|
local_peer_addr: &Arc<RwLock<Option<SocketAddr>>>,
|
||||||
local_library: &Arc<RwLock<LocalLibraryState>>,
|
local_library: &Arc<RwLock<LocalLibraryState>>,
|
||||||
) -> Hello {
|
) -> Hello {
|
||||||
let library_guard = local_library.read().await;
|
let library_guard = local_library.read().await;
|
||||||
|
let listen_addr = *local_peer_addr.read().await;
|
||||||
Hello {
|
Hello {
|
||||||
peer_id: peer_id.to_string(),
|
peer_id: peer_id.to_string(),
|
||||||
proto_ver: PROTOCOL_VERSION,
|
proto_ver: PROTOCOL_VERSION,
|
||||||
|
listen_addr,
|
||||||
library_rev: library_guard.revision,
|
library_rev: library_guard.revision,
|
||||||
library_digest: library_guard.digest,
|
library_digest: library_guard.digest,
|
||||||
features: default_features(),
|
features: default_features(),
|
||||||
@@ -47,13 +53,14 @@ async fn build_hello_from_state(
|
|||||||
|
|
||||||
pub(crate) async fn perform_handshake_with_peer(
|
pub(crate) async fn perform_handshake_with_peer(
|
||||||
peer_id: Arc<String>,
|
peer_id: Arc<String>,
|
||||||
|
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||||
local_library: Arc<RwLock<LocalLibraryState>>,
|
local_library: Arc<RwLock<LocalLibraryState>>,
|
||||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
peer_id_hint: Option<PeerId>,
|
peer_id_hint: Option<PeerId>,
|
||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let hello = build_hello_from_state(peer_id.as_ref(), &local_library).await;
|
let hello = build_hello_from_state(peer_id.as_ref(), &local_peer_addr, &local_library).await;
|
||||||
let ack = exchange_hello(peer_addr, hello).await?;
|
let ack = exchange_hello(peer_addr, hello).await?;
|
||||||
|
|
||||||
if ack.proto_ver != PROTOCOL_VERSION {
|
if ack.proto_ver != PROTOCOL_VERSION {
|
||||||
@@ -79,10 +86,11 @@ pub(crate) async fn perform_handshake_with_peer(
|
|||||||
let _ = peer_game_db.write().await.remove_peer(expected);
|
let _ = peer_game_db.write().await.remove_peer(expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let record_addr = ack.listen_addr.unwrap_or(peer_addr);
|
||||||
let upsert = record_remote_library(
|
let upsert = record_remote_library(
|
||||||
&peer_game_db,
|
&peer_game_db,
|
||||||
ack.peer_id.clone(),
|
ack.peer_id.clone(),
|
||||||
peer_addr,
|
record_addr,
|
||||||
ack.library_rev,
|
ack.library_rev,
|
||||||
ack.library_digest,
|
ack.library_digest,
|
||||||
ack.features.clone(),
|
ack.features.clone(),
|
||||||
@@ -91,7 +99,8 @@ pub(crate) async fn perform_handshake_with_peer(
|
|||||||
|
|
||||||
after_peer_library_recorded(
|
after_peer_library_recorded(
|
||||||
upsert,
|
upsert,
|
||||||
peer_addr,
|
record_addr,
|
||||||
|
peer_id.as_ref(),
|
||||||
ack.library_rev,
|
ack.library_rev,
|
||||||
ack.library_digest,
|
ack.library_digest,
|
||||||
&local_library,
|
&local_library,
|
||||||
@@ -100,6 +109,12 @@ pub(crate) async fn perform_handshake_with_peer(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
if let Err(err) =
|
||||||
|
refresh_peer_games(&peer_game_db, &tx_notify_ui, record_addr, &ack.peer_id).await
|
||||||
|
{
|
||||||
|
log::warn!("Failed to refresh peer games from {record_addr}: {err}");
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,7 +136,8 @@ pub(super) async fn accept_inbound_hello(
|
|||||||
return build_hello_ack(ctx).await;
|
return build_hello_ack(ctx).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, remote_addr).await {
|
if let Some(addr) = peer_record_addr(&ctx.peer_game_db, &hello.peer_id, hello.listen_addr).await
|
||||||
|
{
|
||||||
let upsert = record_remote_library(
|
let upsert = record_remote_library(
|
||||||
&ctx.peer_game_db,
|
&ctx.peer_game_db,
|
||||||
hello.peer_id.clone(),
|
hello.peer_id.clone(),
|
||||||
@@ -135,6 +151,7 @@ pub(super) async fn accept_inbound_hello(
|
|||||||
after_peer_library_recorded(
|
after_peer_library_recorded(
|
||||||
upsert,
|
upsert,
|
||||||
addr,
|
addr,
|
||||||
|
ctx.peer_id.as_ref(),
|
||||||
hello.library_rev,
|
hello.library_rev,
|
||||||
hello.library_digest,
|
hello.library_digest,
|
||||||
&ctx.local_library,
|
&ctx.local_library,
|
||||||
@@ -142,6 +159,13 @@ pub(super) async fn accept_inbound_hello(
|
|||||||
&ctx.tx_notify_ui,
|
&ctx.tx_notify_ui,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
spawn_peer_game_refresh(ctx, addr, hello.peer_id.clone());
|
||||||
|
} else {
|
||||||
|
log::debug!(
|
||||||
|
"Ignoring inbound hello from {} without a known listener address",
|
||||||
|
hello.peer_id
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
build_hello_ack(ctx).await
|
build_hello_ack(ctx).await
|
||||||
@@ -150,15 +174,28 @@ pub(super) async fn accept_inbound_hello(
|
|||||||
async fn peer_record_addr(
|
async fn peer_record_addr(
|
||||||
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
peer_game_db: &Arc<RwLock<PeerGameDB>>,
|
||||||
peer_id: &PeerId,
|
peer_id: &PeerId,
|
||||||
remote_addr: Option<SocketAddr>,
|
listen_addr: Option<SocketAddr>,
|
||||||
) -> Option<SocketAddr> {
|
) -> Option<SocketAddr> {
|
||||||
let remote_addr = remote_addr?;
|
|
||||||
let db = peer_game_db.read().await;
|
let db = peer_game_db.read().await;
|
||||||
Some(db.peer_addr(peer_id).unwrap_or(remote_addr))
|
listen_addr.or_else(|| db.peer_addr(peer_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn spawn_peer_game_refresh(ctx: &PeerCtx, peer_addr: SocketAddr, peer_id: PeerId) {
|
||||||
|
let peer_game_db = ctx.peer_game_db.clone();
|
||||||
|
let tx_notify_ui = ctx.tx_notify_ui.clone();
|
||||||
|
ctx.task_tracker.spawn(async move {
|
||||||
|
if let Err(err) =
|
||||||
|
refresh_peer_games(&peer_game_db, &tx_notify_ui, peer_addr, &peer_id).await
|
||||||
|
{
|
||||||
|
log::warn!("Failed to refresh peer games from {peer_addr}: {err}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(super) fn spawn_library_resync(
|
pub(super) fn spawn_library_resync(
|
||||||
peer_id: Arc<String>,
|
peer_id: Arc<String>,
|
||||||
|
local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||||
local_library: Arc<RwLock<LocalLibraryState>>,
|
local_library: Arc<RwLock<LocalLibraryState>>,
|
||||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
@@ -169,6 +206,7 @@ pub(super) fn spawn_library_resync(
|
|||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(err) = perform_handshake_with_peer(
|
if let Err(err) = perform_handshake_with_peer(
|
||||||
peer_id,
|
peer_id,
|
||||||
|
local_peer_addr,
|
||||||
local_library,
|
local_library,
|
||||||
peer_game_db,
|
peer_game_db,
|
||||||
tx_notify_ui,
|
tx_notify_ui,
|
||||||
@@ -196,9 +234,11 @@ async fn record_remote_library(
|
|||||||
upsert
|
upsert
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn after_peer_library_recorded(
|
async fn after_peer_library_recorded(
|
||||||
upsert: PeerUpsert,
|
upsert: PeerUpsert,
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
|
local_peer_id: &str,
|
||||||
remote_library_rev: u64,
|
remote_library_rev: u64,
|
||||||
remote_library_digest: u64,
|
remote_library_digest: u64,
|
||||||
local_library: &Arc<RwLock<LocalLibraryState>>,
|
local_library: &Arc<RwLock<LocalLibraryState>>,
|
||||||
@@ -207,11 +247,12 @@ async fn after_peer_library_recorded(
|
|||||||
) {
|
) {
|
||||||
if upsert.is_new {
|
if upsert.is_new {
|
||||||
events::emit_peer_discovered(peer_game_db, tx_notify_ui, peer_addr).await;
|
events::emit_peer_discovered(peer_game_db, tx_notify_ui, peer_addr).await;
|
||||||
send_local_library_summary(peer_addr, local_library).await;
|
send_local_library_summary(peer_addr, local_peer_id, local_library).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
send_local_library_update_if_needed(
|
send_local_library_update_if_needed(
|
||||||
peer_addr,
|
peer_addr,
|
||||||
|
local_peer_id,
|
||||||
local_library,
|
local_library,
|
||||||
remote_library_rev,
|
remote_library_rev,
|
||||||
remote_library_digest,
|
remote_library_digest,
|
||||||
@@ -221,15 +262,17 @@ async fn after_peer_library_recorded(
|
|||||||
|
|
||||||
async fn send_local_library_summary(
|
async fn send_local_library_summary(
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
|
local_peer_id: &str,
|
||||||
local_library: &Arc<RwLock<LocalLibraryState>>,
|
local_library: &Arc<RwLock<LocalLibraryState>>,
|
||||||
) {
|
) {
|
||||||
let summary = {
|
let summary = {
|
||||||
let library_guard = local_library.read().await;
|
let library_guard = local_library.read().await;
|
||||||
build_library_summary(&library_guard)
|
build_library_summary(&library_guard)
|
||||||
};
|
};
|
||||||
|
let local_peer_id = local_peer_id.to_string();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(err) = send_library_summary(peer_addr, summary).await {
|
if let Err(err) = send_library_summary(peer_addr, &local_peer_id, summary).await {
|
||||||
log::warn!("Failed to send library summary to {peer_addr}: {err}");
|
log::warn!("Failed to send library summary to {peer_addr}: {err}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -237,16 +280,20 @@ async fn send_local_library_summary(
|
|||||||
|
|
||||||
async fn send_local_library_update_if_needed(
|
async fn send_local_library_update_if_needed(
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
|
local_peer_id: &str,
|
||||||
local_library: &Arc<RwLock<LocalLibraryState>>,
|
local_library: &Arc<RwLock<LocalLibraryState>>,
|
||||||
remote_rev: u64,
|
remote_rev: u64,
|
||||||
remote_digest: u64,
|
remote_digest: u64,
|
||||||
) {
|
) {
|
||||||
if let Some(update) = select_library_update(local_library, remote_rev, remote_digest).await {
|
if let Some(update) = select_library_update(local_library, remote_rev, remote_digest).await {
|
||||||
|
let local_peer_id = local_peer_id.to_string();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let result = match update {
|
let result = match update {
|
||||||
LibraryUpdate::Delta(delta) => send_library_delta(peer_addr, delta).await,
|
LibraryUpdate::Delta(delta) => {
|
||||||
|
send_library_delta(peer_addr, &local_peer_id, delta).await
|
||||||
|
}
|
||||||
LibraryUpdate::Snapshot(snapshot) => {
|
LibraryUpdate::Snapshot(snapshot) => {
|
||||||
send_library_snapshot(peer_addr, snapshot).await
|
send_library_snapshot(peer_addr, &local_peer_id, snapshot).await
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -293,15 +340,33 @@ mod tests {
|
|||||||
async fn inbound_hello_keeps_existing_listening_addr() {
|
async fn inbound_hello_keeps_existing_listening_addr() {
|
||||||
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
||||||
let advertised = addr([10, 66, 0, 2], 40000);
|
let advertised = addr([10, 66, 0, 2], 40000);
|
||||||
let transport_source = addr([10, 66, 0, 2], 52000);
|
|
||||||
peer_game_db
|
peer_game_db
|
||||||
.write()
|
.write()
|
||||||
.await
|
.await
|
||||||
.upsert_peer("peer".to_string(), advertised);
|
.upsert_peer("peer".to_string(), advertised);
|
||||||
|
|
||||||
let record_addr =
|
let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await;
|
||||||
peer_record_addr(&peer_game_db, &"peer".to_string(), Some(transport_source)).await;
|
|
||||||
|
|
||||||
assert_eq!(record_addr, Some(advertised));
|
assert_eq!(record_addr, Some(advertised));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn inbound_hello_prefers_reported_listening_addr() {
|
||||||
|
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
||||||
|
let advertised = addr([10, 66, 0, 2], 40000);
|
||||||
|
|
||||||
|
let record_addr =
|
||||||
|
peer_record_addr(&peer_game_db, &"peer".to_string(), Some(advertised)).await;
|
||||||
|
|
||||||
|
assert_eq!(record_addr, Some(advertised));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn inbound_hello_without_known_listener_is_not_recorded() {
|
||||||
|
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
||||||
|
|
||||||
|
let record_addr = peer_record_addr(&peer_game_db, &"peer".to_string(), None).await;
|
||||||
|
|
||||||
|
assert_eq!(record_addr, None);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ use crate::{
|
|||||||
events,
|
events,
|
||||||
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
|
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
|
||||||
peer::{send_game_file_chunk, send_game_file_data},
|
peer::{send_game_file_chunk, send_game_file_data},
|
||||||
remote_peer::ensure_peer_id_for_addr,
|
|
||||||
services::handshake::{
|
services::handshake::{
|
||||||
accept_inbound_hello,
|
accept_inbound_hello,
|
||||||
perform_handshake_with_peer,
|
perform_handshake_with_peer,
|
||||||
@@ -82,16 +81,16 @@ async fn dispatch_request(
|
|||||||
send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
|
send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
|
||||||
}
|
}
|
||||||
Request::ListGames => handle_list_games(ctx, framed_tx).await,
|
Request::ListGames => handle_list_games(ctx, framed_tx).await,
|
||||||
Request::LibrarySummary(summary) => {
|
Request::LibrarySummary { peer_id, summary } => {
|
||||||
handle_library_summary(ctx, remote_addr, summary).await;
|
handle_library_summary(ctx, peer_id, summary).await;
|
||||||
framed_tx
|
framed_tx
|
||||||
}
|
}
|
||||||
Request::LibrarySnapshot(snapshot) => {
|
Request::LibrarySnapshot { peer_id, snapshot } => {
|
||||||
handle_library_snapshot(ctx, remote_addr, snapshot).await;
|
handle_library_snapshot(ctx, peer_id, snapshot).await;
|
||||||
framed_tx
|
framed_tx
|
||||||
}
|
}
|
||||||
Request::LibraryDelta(delta) => {
|
Request::LibraryDelta { peer_id, delta } => {
|
||||||
handle_library_delta(ctx, remote_addr, delta).await;
|
handle_library_delta(ctx, peer_id, delta).await;
|
||||||
framed_tx
|
framed_tx
|
||||||
}
|
}
|
||||||
Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await,
|
Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await,
|
||||||
@@ -160,20 +159,16 @@ async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> Response
|
|||||||
send_response(framed_tx, Response::ListGames(games), "ListGames").await
|
send_response(framed_tx, Response::ListGames(games), "ListGames").await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_library_summary(
|
async fn handle_library_summary(ctx: &PeerCtx, peer_id: String, summary: LibrarySummary) {
|
||||||
ctx: &PeerCtx,
|
let (addr, previous_digest, previous_count, features) = {
|
||||||
remote_addr: Option<SocketAddr>,
|
|
||||||
summary: LibrarySummary,
|
|
||||||
) {
|
|
||||||
let Some(addr) = remote_addr else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
|
||||||
let (previous_digest, previous_count, features) = {
|
|
||||||
let db = ctx.peer_game_db.read().await;
|
let db = ctx.peer_game_db.read().await;
|
||||||
|
let Some(addr) = db.peer_addr(&peer_id) else {
|
||||||
|
log::debug!("Ignoring library summary from unknown peer {peer_id}");
|
||||||
|
return;
|
||||||
|
};
|
||||||
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
|
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
|
||||||
(
|
(
|
||||||
|
addr,
|
||||||
digest,
|
digest,
|
||||||
db.peer_game_count(&peer_id),
|
db.peer_game_count(&peer_id),
|
||||||
db.peer_features(&peer_id),
|
db.peer_features(&peer_id),
|
||||||
@@ -193,12 +188,14 @@ async fn handle_library_summary(
|
|||||||
if summary.library_digest != previous_digest || previous_count == 0 {
|
if summary.library_digest != previous_digest || previous_count == 0 {
|
||||||
ctx.task_tracker.spawn({
|
ctx.task_tracker.spawn({
|
||||||
let peer_id_arc = ctx.peer_id.clone();
|
let peer_id_arc = ctx.peer_id.clone();
|
||||||
|
let local_peer_addr = ctx.local_peer_addr.clone();
|
||||||
let local_library = ctx.local_library.clone();
|
let local_library = ctx.local_library.clone();
|
||||||
let peer_game_db = ctx.peer_game_db.clone();
|
let peer_game_db = ctx.peer_game_db.clone();
|
||||||
let tx_notify_ui = ctx.tx_notify_ui.clone();
|
let tx_notify_ui = ctx.tx_notify_ui.clone();
|
||||||
async move {
|
async move {
|
||||||
if let Err(err) = perform_handshake_with_peer(
|
if let Err(err) = perform_handshake_with_peer(
|
||||||
peer_id_arc,
|
peer_id_arc,
|
||||||
|
local_peer_addr,
|
||||||
local_library,
|
local_library,
|
||||||
peer_game_db,
|
peer_game_db,
|
||||||
tx_notify_ui,
|
tx_notify_ui,
|
||||||
@@ -214,28 +211,32 @@ async fn handle_library_summary(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_library_snapshot(
|
async fn handle_library_snapshot(ctx: &PeerCtx, peer_id: String, snapshot: LibrarySnapshot) {
|
||||||
ctx: &PeerCtx,
|
let applied = {
|
||||||
remote_addr: Option<SocketAddr>,
|
let mut db = ctx.peer_game_db.write().await;
|
||||||
snapshot: LibrarySnapshot,
|
if db.peer_addr(&peer_id).is_some() {
|
||||||
) {
|
|
||||||
if let Some(addr) = remote_addr {
|
|
||||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
|
||||||
{
|
|
||||||
let mut db = ctx.peer_game_db.write().await;
|
|
||||||
db.apply_library_snapshot(&peer_id, snapshot);
|
db.apply_library_snapshot(&peer_id, snapshot);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
log::debug!("Ignoring library snapshot from unknown peer {peer_id}");
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if applied {
|
||||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_library_delta(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, delta: LibraryDelta) {
|
async fn handle_library_delta(ctx: &PeerCtx, peer_id: String, delta: LibraryDelta) {
|
||||||
let Some(addr) = remote_addr else {
|
let Some(addr) = ({
|
||||||
|
let db = ctx.peer_game_db.read().await;
|
||||||
|
db.peer_addr(&peer_id)
|
||||||
|
}) else {
|
||||||
|
log::debug!("Ignoring library delta from unknown peer {peer_id}");
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
|
||||||
let applied = {
|
let applied = {
|
||||||
let mut db = ctx.peer_game_db.write().await;
|
let mut db = ctx.peer_game_db.write().await;
|
||||||
db.apply_library_delta(&peer_id, delta)
|
db.apply_library_delta(&peer_id, delta)
|
||||||
@@ -246,6 +247,7 @@ async fn handle_library_delta(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, de
|
|||||||
} else {
|
} else {
|
||||||
spawn_library_resync(
|
spawn_library_resync(
|
||||||
ctx.peer_id.clone(),
|
ctx.peer_id.clone(),
|
||||||
|
ctx.local_peer_addr.clone(),
|
||||||
ctx.local_library.clone(),
|
ctx.local_library.clone(),
|
||||||
ctx.peer_game_db.clone(),
|
ctx.peer_game_db.clone(),
|
||||||
ctx.tx_notify_ui.clone(),
|
ctx.tx_notify_ui.clone(),
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use lanspread_db::db::{Game, GameFileDescription};
|
use lanspread_db::db::{Game, GameFileDescription};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
pub const PROTOCOL_VERSION: u32 = 2;
|
pub const PROTOCOL_VERSION: u32 = 3;
|
||||||
|
|
||||||
pub use lanspread_db::db::Availability;
|
pub use lanspread_db::db::Availability;
|
||||||
|
|
||||||
@@ -22,6 +24,7 @@ pub struct GameSummary {
|
|||||||
pub struct Hello {
|
pub struct Hello {
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
pub proto_ver: u32,
|
pub proto_ver: u32,
|
||||||
|
pub listen_addr: Option<SocketAddr>,
|
||||||
pub library_rev: u64,
|
pub library_rev: u64,
|
||||||
pub library_digest: u64,
|
pub library_digest: u64,
|
||||||
pub features: Vec<String>,
|
pub features: Vec<String>,
|
||||||
@@ -31,6 +34,7 @@ pub struct Hello {
|
|||||||
pub struct HelloAck {
|
pub struct HelloAck {
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
pub proto_ver: u32,
|
pub proto_ver: u32,
|
||||||
|
pub listen_addr: Option<SocketAddr>,
|
||||||
pub library_rev: u64,
|
pub library_rev: u64,
|
||||||
pub library_digest: u64,
|
pub library_digest: u64,
|
||||||
pub features: Vec<String>,
|
pub features: Vec<String>,
|
||||||
@@ -73,9 +77,18 @@ pub enum Request {
|
|||||||
length: u64,
|
length: u64,
|
||||||
},
|
},
|
||||||
Hello(Hello),
|
Hello(Hello),
|
||||||
LibrarySummary(LibrarySummary),
|
LibrarySummary {
|
||||||
LibrarySnapshot(LibrarySnapshot),
|
peer_id: String,
|
||||||
LibraryDelta(LibraryDelta),
|
summary: LibrarySummary,
|
||||||
|
},
|
||||||
|
LibrarySnapshot {
|
||||||
|
peer_id: String,
|
||||||
|
snapshot: LibrarySnapshot,
|
||||||
|
},
|
||||||
|
LibraryDelta {
|
||||||
|
peer_id: String,
|
||||||
|
delta: LibraryDelta,
|
||||||
|
},
|
||||||
Goodbye {
|
Goodbye {
|
||||||
peer_id: String,
|
peer_id: String,
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user