wip
This commit is contained in:
@@ -1,8 +1,19 @@
|
|||||||
#![allow(clippy::missing_errors_doc)]
|
#![allow(clippy::missing_errors_doc)]
|
||||||
|
|
||||||
use std::{fs::File, io::Write, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
fs::File,
|
||||||
|
io::Write,
|
||||||
|
net::SocketAddr,
|
||||||
|
path::PathBuf,
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
use lanspread_db::db::{Game, GameFileDescription};
|
use bytes::BytesMut;
|
||||||
|
use gethostname::gethostname;
|
||||||
|
use lanspread_db::db::{Game, GameDB, GameFileDescription};
|
||||||
|
use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, MdnsAdvertiser, discover_service};
|
||||||
use lanspread_proto::{Message, Request, Response};
|
use lanspread_proto::{Message, Request, Response};
|
||||||
use s2n_quic::{
|
use s2n_quic::{
|
||||||
Client as QuicClient,
|
Client as QuicClient,
|
||||||
@@ -19,6 +30,7 @@ use tokio::{
|
|||||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
|
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
|
||||||
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
|
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
|
||||||
@@ -41,6 +53,98 @@ pub enum PeerEvent {
|
|||||||
},
|
},
|
||||||
PeerConnected(SocketAddr),
|
PeerConnected(SocketAddr),
|
||||||
PeerDisconnected(SocketAddr),
|
PeerDisconnected(SocketAddr),
|
||||||
|
PeerDiscovered(SocketAddr),
|
||||||
|
PeerLost(SocketAddr),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct PeerInfo {
|
||||||
|
pub addr: SocketAddr,
|
||||||
|
pub last_seen: Instant,
|
||||||
|
pub games: Vec<Game>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct PeerGameDB {
|
||||||
|
peers: HashMap<SocketAddr, PeerInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerGameDB {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
peers: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_peer(&mut self, addr: SocketAddr) {
|
||||||
|
let peer_info = PeerInfo {
|
||||||
|
addr,
|
||||||
|
last_seen: Instant::now(),
|
||||||
|
games: Vec::new(),
|
||||||
|
};
|
||||||
|
self.peers.insert(addr, peer_info);
|
||||||
|
log::info!("Added peer: {addr}");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_peer(&mut self, addr: &SocketAddr) -> Option<PeerInfo> {
|
||||||
|
self.peers.remove(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_peer_games(&mut self, addr: SocketAddr, games: Vec<Game>) {
|
||||||
|
if let Some(peer) = self.peers.get_mut(&addr) {
|
||||||
|
peer.games = games;
|
||||||
|
peer.last_seen = Instant::now();
|
||||||
|
log::info!("Updated games for peer: {addr}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_last_seen(&mut self, addr: &SocketAddr) {
|
||||||
|
if let Some(peer) = self.peers.get_mut(addr) {
|
||||||
|
peer.last_seen = Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_all_games(&self) -> Vec<Game> {
|
||||||
|
let mut all_games = Vec::new();
|
||||||
|
for peer in self.peers.values() {
|
||||||
|
all_games.extend(peer.games.clone());
|
||||||
|
}
|
||||||
|
all_games
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_latest_version_for_game(&self, game_id: &str) -> Option<String> {
|
||||||
|
let mut latest_version: Option<String> = None;
|
||||||
|
|
||||||
|
for peer in self.peers.values() {
|
||||||
|
if let Some(game) = peer.games.iter().find(|g| g.id == game_id) {
|
||||||
|
if let Some(ref version) = game.eti_game_version {
|
||||||
|
match &latest_version {
|
||||||
|
None => latest_version = Some(version.clone()),
|
||||||
|
Some(current_latest) => {
|
||||||
|
// Simple string comparison for now - could use semver
|
||||||
|
if version > current_latest {
|
||||||
|
latest_version = Some(version.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
latest_version
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_peer_addresses(&self) -> Vec<SocketAddr> {
|
||||||
|
self.peers.keys().copied().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_stale_peers(&self, timeout: Duration) -> Vec<SocketAddr> {
|
||||||
|
self.peers
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, peer)| peer.last_seen.elapsed() > timeout)
|
||||||
|
.map(|(addr, _)| *addr)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -174,11 +278,15 @@ async fn download_game_files(
|
|||||||
|
|
||||||
struct Ctx {
|
struct Ctx {
|
||||||
game_dir: Arc<RwLock<Option<String>>>,
|
game_dir: Arc<RwLock<Option<String>>>,
|
||||||
|
local_game_db: Arc<RwLock<Option<GameDB>>>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
struct PeerCtx {
|
struct PeerCtx {
|
||||||
game_dir: Arc<RwLock<Option<String>>>,
|
game_dir: Arc<RwLock<Option<String>>>,
|
||||||
|
local_game_db: Arc<RwLock<Option<GameDB>>>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_peer(
|
pub async fn run_peer(
|
||||||
@@ -188,10 +296,14 @@ pub async fn run_peer(
|
|||||||
// peer context
|
// peer context
|
||||||
let ctx = Ctx {
|
let ctx = Ctx {
|
||||||
game_dir: Arc::new(RwLock::new(None)),
|
game_dir: Arc::new(RwLock::new(None)),
|
||||||
|
local_game_db: Arc::new(RwLock::new(None)),
|
||||||
|
peer_game_db: Arc::new(RwLock::new(PeerGameDB::new())),
|
||||||
};
|
};
|
||||||
|
|
||||||
let peer_ctx = PeerCtx {
|
let peer_ctx = PeerCtx {
|
||||||
game_dir: ctx.game_dir.clone(),
|
game_dir: ctx.game_dir.clone(),
|
||||||
|
local_game_db: ctx.local_game_db.clone(),
|
||||||
|
peer_game_db: ctx.peer_game_db.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start server component
|
// Start server component
|
||||||
@@ -206,6 +318,20 @@ pub async fn run_peer(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Start peer discovery task
|
||||||
|
let tx_notify_ui_discovery = tx_notify_ui.clone();
|
||||||
|
let peer_game_db_discovery = ctx.peer_game_db.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
run_peer_discovery(tx_notify_ui_discovery, peer_game_db_discovery).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Start ping service task
|
||||||
|
let tx_notify_ui_ping = tx_notify_ui.clone();
|
||||||
|
let peer_game_db_ping = ctx.peer_game_db.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
run_ping_service(tx_notify_ui_ping, peer_game_db_ping).await;
|
||||||
|
});
|
||||||
|
|
||||||
// Handle client commands
|
// Handle client commands
|
||||||
loop {
|
loop {
|
||||||
let Some(cmd) = rx_control.recv().await else {
|
let Some(cmd) = rx_control.recv().await else {
|
||||||
@@ -214,11 +340,14 @@ pub async fn run_peer(
|
|||||||
|
|
||||||
match cmd {
|
match cmd {
|
||||||
PeerCommand::ListGames => {
|
PeerCommand::ListGames => {
|
||||||
// TODO: Implement peer discovery and game listing
|
|
||||||
log::info!("ListGames command received");
|
log::info!("ListGames command received");
|
||||||
|
let all_games = { ctx.peer_game_db.read().await.get_all_games() };
|
||||||
|
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(all_games)) {
|
||||||
|
log::error!("Failed to send ListGames event: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PeerCommand::GetGame(id) => {
|
PeerCommand::GetGame(id) => {
|
||||||
log::info!("Requesting game from peer: {id}");
|
log::info!("Requesting game from peers: {id}");
|
||||||
// TODO: Implement game fetching from peers
|
// TODO: Implement game fetching from peers
|
||||||
}
|
}
|
||||||
PeerCommand::DownloadGameFiles {
|
PeerCommand::DownloadGameFiles {
|
||||||
@@ -237,10 +366,11 @@ pub async fn run_peer(
|
|||||||
PeerCommand::SetGameDir(game_dir) => {
|
PeerCommand::SetGameDir(game_dir) => {
|
||||||
*ctx.game_dir.write().await = Some(game_dir.clone());
|
*ctx.game_dir.write().await = Some(game_dir.clone());
|
||||||
log::info!("Game directory set to: {game_dir}");
|
log::info!("Game directory set to: {game_dir}");
|
||||||
|
// TODO: Load local game database when game directory is set
|
||||||
}
|
}
|
||||||
PeerCommand::ConnectToPeer(peer_addr) => {
|
PeerCommand::ConnectToPeer(peer_addr) => {
|
||||||
log::info!("Connecting to peer: {peer_addr}");
|
log::info!("Connecting to peer: {peer_addr}");
|
||||||
// TODO: Implement peer connection
|
// TODO: Implement direct peer connection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -266,7 +396,42 @@ async fn run_server_component(
|
|||||||
let server_addr = server.local_addr()?;
|
let server_addr = server.local_addr()?;
|
||||||
log::info!("Peer server listening on {server_addr}");
|
log::info!("Peer server listening on {server_addr}");
|
||||||
|
|
||||||
// TODO: Implement mDNS advertising for peer discovery
|
// Start mDNS advertising for peer discovery
|
||||||
|
let peer_id = Uuid::now_v7().simple().to_string();
|
||||||
|
let hostname = gethostname::gethostname();
|
||||||
|
let hostname_str = hostname.to_str().unwrap_or("");
|
||||||
|
|
||||||
|
// Calculate maximum hostname length that fits with UUID in 63 char limit
|
||||||
|
let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1);
|
||||||
|
let truncated_hostname = if hostname_str.len() > max_hostname_len {
|
||||||
|
hostname_str.get(..max_hostname_len).unwrap_or(hostname_str)
|
||||||
|
} else {
|
||||||
|
hostname_str
|
||||||
|
};
|
||||||
|
|
||||||
|
let combined_str = if truncated_hostname.is_empty() {
|
||||||
|
peer_id
|
||||||
|
} else {
|
||||||
|
format!("{truncated_hostname}-{peer_id}")
|
||||||
|
};
|
||||||
|
|
||||||
|
let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, server_addr)?;
|
||||||
|
|
||||||
|
// Monitor mDNS events
|
||||||
|
let _tx_notify_ui_mdns = tx_notify_ui.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Ok(event) = mdns.monitor.recv() {
|
||||||
|
match event {
|
||||||
|
lanspread_mdns::DaemonEvent::Error(e) => {
|
||||||
|
log::error!("mDNS error: {e}");
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
log::trace!("mDNS event: {:?}", event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
while let Some(connection) = server.accept().await {
|
while let Some(connection) = server.accept().await {
|
||||||
let ctx = ctx.clone();
|
let ctx = ctx.clone();
|
||||||
@@ -320,19 +485,20 @@ async fn handle_peer_stream(
|
|||||||
) -> eyre::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let (mut rx, mut tx) = stream.split();
|
let (mut rx, mut tx) = stream.split();
|
||||||
|
|
||||||
log::trace!("{remote_addr:?} peer stream opened");
|
log::trace!("{:?} peer stream opened", remote_addr);
|
||||||
|
|
||||||
// handle streams
|
// handle streams
|
||||||
loop {
|
loop {
|
||||||
match rx.receive().await {
|
match rx.receive().await {
|
||||||
Ok(Some(data)) => {
|
Ok(Some(data)) => {
|
||||||
log::trace!(
|
log::trace!(
|
||||||
"{remote_addr:?} msg: (raw): {}",
|
"{:?} msg: (raw): {}",
|
||||||
|
remote_addr,
|
||||||
String::from_utf8_lossy(&data)
|
String::from_utf8_lossy(&data)
|
||||||
);
|
);
|
||||||
|
|
||||||
let request = Request::decode(data);
|
let request = Request::decode(data);
|
||||||
log::debug!("{remote_addr:?} msg: {request:?}");
|
log::debug!("{:?} msg: {request:?}", remote_addr);
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
Request::Ping => {
|
Request::Ping => {
|
||||||
@@ -342,12 +508,38 @@ async fn handle_peer_stream(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Request::ListGames => {
|
Request::ListGames => {
|
||||||
// TODO: Return list of games from this peer
|
// Return list of games from this peer
|
||||||
log::info!("Received ListGames request from peer");
|
log::info!("Received ListGames request from peer");
|
||||||
|
let games = if let Some(ref db) = *ctx.local_game_db.read().await {
|
||||||
|
db.all_games().into_iter().cloned().collect()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = tx.send(Response::ListGames(games).encode()).await {
|
||||||
|
log::error!("Failed to send ListGames response: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Request::GetGame { id } => {
|
Request::GetGame { id } => {
|
||||||
log::info!("Received GetGame request for {id} from peer");
|
log::info!("Received GetGame request for {id} from peer");
|
||||||
// TODO: Handle game request
|
// TODO: Handle game request using local game DB
|
||||||
|
let response = if let Some(ref db) = *ctx.local_game_db.read().await {
|
||||||
|
if db.get_game_by_id(&id).is_some() {
|
||||||
|
// TODO: Return actual game file descriptions
|
||||||
|
Response::GetGame {
|
||||||
|
id,
|
||||||
|
file_descriptions: Vec::new(),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Response::GameNotFound(id)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Response::GameNotFound(id)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = tx.send(response.encode()).await {
|
||||||
|
log::error!("Failed to send GetGame response: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Request::GetGameFileData(desc) => {
|
Request::GetGameFileData(desc) => {
|
||||||
log::info!(
|
log::info!(
|
||||||
@@ -355,6 +547,18 @@ async fn handle_peer_stream(
|
|||||||
desc.relative_path
|
desc.relative_path
|
||||||
);
|
);
|
||||||
// TODO: Handle file data request
|
// TODO: Handle file data request
|
||||||
|
if let Err(e) = tx
|
||||||
|
.send(
|
||||||
|
Response::InvalidRequest(
|
||||||
|
desc.relative_path.as_bytes().to_vec().into(),
|
||||||
|
"File transfer not implemented yet".to_string(),
|
||||||
|
)
|
||||||
|
.encode(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("Failed to send GetGameFileData response: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Request::Invalid(_, _) => {
|
Request::Invalid(_, _) => {
|
||||||
log::error!("Received invalid request from peer");
|
log::error!("Received invalid request from peer");
|
||||||
@@ -362,11 +566,11 @@ async fn handle_peer_stream(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
log::trace!("{remote_addr:?} peer stream closed");
|
log::trace!("{:?} peer stream closed", remote_addr);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
log::error!("{:?} peer stream error: {e}", remote_addr);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -374,3 +578,206 @@ async fn handle_peer_stream(
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn run_peer_discovery(
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
|
) {
|
||||||
|
log::info!("Starting peer discovery task");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match discover_service(LANSPREAD_SERVICE_TYPE) {
|
||||||
|
Ok(peer_addr) => {
|
||||||
|
log::info!("Discovered peer at: {peer_addr}");
|
||||||
|
|
||||||
|
// Add peer to database
|
||||||
|
let is_new_peer = {
|
||||||
|
let mut db = peer_game_db.write().await;
|
||||||
|
let peer_addresses = db.get_peer_addresses();
|
||||||
|
if !peer_addresses.contains(&peer_addr) {
|
||||||
|
db.add_peer(peer_addr);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_new_peer {
|
||||||
|
// Notify UI about new peer
|
||||||
|
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDiscovered(peer_addr)) {
|
||||||
|
log::error!("Failed to send PeerDiscovered event: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request games from this peer
|
||||||
|
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||||
|
let peer_game_db_clone = peer_game_db.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = request_games_from_peer(
|
||||||
|
peer_addr,
|
||||||
|
tx_notify_ui_clone,
|
||||||
|
peer_game_db_clone,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("Failed to request games from peer {peer_addr}: {e}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::debug!("Peer discovery error: {e}");
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait before next discovery cycle
|
||||||
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn request_games_from_peer(
|
||||||
|
peer_addr: SocketAddr,
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
|
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||||
|
|
||||||
|
let client = QuicClient::builder()
|
||||||
|
.with_tls(CERT_PEM)?
|
||||||
|
.with_io("0.0.0.0:0")?
|
||||||
|
.with_limits(limits)?
|
||||||
|
.start()?;
|
||||||
|
|
||||||
|
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||||
|
let mut conn = client.connect(conn).await?;
|
||||||
|
|
||||||
|
let stream = conn.open_bidirectional_stream().await?;
|
||||||
|
let (mut rx, mut tx) = stream.split();
|
||||||
|
|
||||||
|
// Send ListGames request
|
||||||
|
tx.send(Request::ListGames.encode()).await?;
|
||||||
|
let _ = tx.close().await;
|
||||||
|
|
||||||
|
// Receive response
|
||||||
|
let mut data = BytesMut::new();
|
||||||
|
while let Ok(Some(bytes)) = rx.receive().await {
|
||||||
|
data.extend_from_slice(&bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = Response::decode(data.freeze());
|
||||||
|
match response {
|
||||||
|
Response::ListGames(games) => {
|
||||||
|
log::info!("Received {} games from peer {peer_addr}", games.len());
|
||||||
|
|
||||||
|
// Update peer games in database
|
||||||
|
{
|
||||||
|
let mut db = peer_game_db.write().await;
|
||||||
|
db.update_peer_games(peer_addr, games.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify UI about updated games
|
||||||
|
if let Err(e) = tx_notify_ui.send(PeerEvent::ListGames(games)) {
|
||||||
|
log::error!("Failed to send ListGames event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
log::warn!("Unexpected response from peer {peer_addr}: {response:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_ping_service(
|
||||||
|
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||||
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||||
|
) {
|
||||||
|
log::info!("Starting ping service (10s interval)");
|
||||||
|
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(10));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
let peer_addresses = { peer_game_db.read().await.get_peer_addresses() };
|
||||||
|
|
||||||
|
for peer_addr in peer_addresses {
|
||||||
|
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||||
|
let peer_game_db_clone = peer_game_db.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
match ping_peer(peer_addr).await {
|
||||||
|
Ok(is_alive) => {
|
||||||
|
if is_alive {
|
||||||
|
// Update last seen time
|
||||||
|
peer_game_db_clone
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.update_last_seen(&peer_addr);
|
||||||
|
} else {
|
||||||
|
log::warn!("Peer {peer_addr} failed ping check");
|
||||||
|
|
||||||
|
// Remove stale peer
|
||||||
|
let removed_peer =
|
||||||
|
peer_game_db_clone.write().await.remove_peer(&peer_addr);
|
||||||
|
if removed_peer.is_some() {
|
||||||
|
log::info!("Removed stale peer: {peer_addr}");
|
||||||
|
if let Err(e) =
|
||||||
|
tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
|
||||||
|
{
|
||||||
|
log::error!("Failed to send PeerLost event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to ping peer {peer_addr}: {e}");
|
||||||
|
|
||||||
|
// Remove peer on error
|
||||||
|
let removed_peer = peer_game_db_clone.write().await.remove_peer(&peer_addr);
|
||||||
|
if removed_peer.is_some() {
|
||||||
|
log::info!("Removed peer due to ping error: {peer_addr}");
|
||||||
|
if let Err(e) = tx_notify_ui_clone.send(PeerEvent::PeerLost(peer_addr))
|
||||||
|
{
|
||||||
|
log::error!("Failed to send PeerLost event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also clean up stale peers
|
||||||
|
let stale_peers = {
|
||||||
|
peer_game_db
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
|
.get_stale_peers(Duration::from_secs(30))
|
||||||
|
};
|
||||||
|
for stale_addr in stale_peers {
|
||||||
|
let removed_peer = peer_game_db.write().await.remove_peer(&stale_addr);
|
||||||
|
if removed_peer.is_some() {
|
||||||
|
log::info!("Removed stale peer: {stale_addr}");
|
||||||
|
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerLost(stale_addr)) {
|
||||||
|
log::error!("Failed to send PeerLost event: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ping_peer(peer_addr: SocketAddr) -> eyre::Result<bool> {
|
||||||
|
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||||
|
|
||||||
|
let client = QuicClient::builder()
|
||||||
|
.with_tls(CERT_PEM)?
|
||||||
|
.with_io("0.0.0.0:0")?
|
||||||
|
.with_limits(limits)?
|
||||||
|
.start()?;
|
||||||
|
|
||||||
|
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||||
|
let mut conn = client.connect(conn).await?;
|
||||||
|
|
||||||
|
initial_peer_alive_check(&mut conn).await;
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|||||||
@@ -152,6 +152,12 @@ async fn main() -> eyre::Result<()> {
|
|||||||
PeerEvent::PeerDisconnected(addr) => {
|
PeerEvent::PeerDisconnected(addr) => {
|
||||||
tracing::info!("Peer disconnected: {}", addr);
|
tracing::info!("Peer disconnected: {}", addr);
|
||||||
}
|
}
|
||||||
|
PeerEvent::PeerDiscovered(addr) => {
|
||||||
|
tracing::info!("Peer discovered: {}", addr);
|
||||||
|
}
|
||||||
|
PeerEvent::PeerLost(addr) => {
|
||||||
|
tracing::info!("Peer lost: {}", addr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user