wip
This commit is contained in:
Generated
+27
@@ -2329,6 +2329,33 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lanspread-peer"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"eyre",
|
||||
"gethostname",
|
||||
"itertools 0.14.0",
|
||||
"lanspread-compat",
|
||||
"lanspread-db",
|
||||
"lanspread-mdns",
|
||||
"lanspread-proto",
|
||||
"lanspread-utils",
|
||||
"log",
|
||||
"mimalloc",
|
||||
"s2n-quic",
|
||||
"semver",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lanspread-proto"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -7,6 +7,7 @@ members = [
|
||||
"crates/lanspread-proto",
|
||||
"crates/lanspread-server",
|
||||
"crates/lanspread-client",
|
||||
"crates/lanspread-peer",
|
||||
"crates/lanspread-tauri-deno-ts/src-tauri",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
[package]
|
||||
name = "lanspread-peer"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[lints.rust]
|
||||
unsafe_code = "forbid"
|
||||
|
||||
[lints.clippy]
|
||||
pedantic = { level = "warn", priority = -1 }
|
||||
todo = "warn"
|
||||
unwrap_used = "warn"
|
||||
|
||||
[dependencies]
|
||||
# local
|
||||
lanspread-compat = { path = "../lanspread-compat" }
|
||||
lanspread-db = { path = "../lanspread-db" }
|
||||
lanspread-mdns = { path = "../lanspread-mdns" }
|
||||
lanspread-proto = { path = "../lanspread-proto" }
|
||||
lanspread-utils = { path = "../lanspread-utils" }
|
||||
|
||||
# external
|
||||
bytes = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
eyre = { workspace = true }
|
||||
gethostname = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
log = { workspace = true }
|
||||
mimalloc = { workspace = true }
|
||||
s2n-quic = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
semver = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
walkdir = { workspace = true }
|
||||
@@ -0,0 +1,23 @@
|
||||
use std::{net::IpAddr, path::PathBuf};
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
#[allow(clippy::doc_markdown)]
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Cli {
|
||||
/// IP address to bind to.
|
||||
#[clap(long)]
|
||||
pub ip: IpAddr,
|
||||
/// Listen port.
|
||||
#[clap(long)]
|
||||
pub port: u16,
|
||||
/// Game database path (SQLite).
|
||||
#[clap(long)]
|
||||
pub db: PathBuf,
|
||||
/// Games folder.
|
||||
#[clap(long)]
|
||||
pub game_dir: PathBuf,
|
||||
/// Thumbnails folder.
|
||||
#[clap(long)]
|
||||
pub thumbs_dir: PathBuf,
|
||||
}
|
||||
@@ -0,0 +1,376 @@
|
||||
#![allow(clippy::missing_errors_doc)]
|
||||
|
||||
use std::{fs::File, io::Write, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
|
||||
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
use lanspread_proto::{Message, Request, Response};
|
||||
use s2n_quic::{
|
||||
Client as QuicClient,
|
||||
Connection,
|
||||
Server,
|
||||
client::Connect,
|
||||
provider::limits::Limits,
|
||||
stream::BidirectionalStream,
|
||||
};
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
sync::{
|
||||
RwLock,
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
},
|
||||
};
|
||||
|
||||
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
|
||||
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PeerEvent {
|
||||
ListGames(Vec<Game>),
|
||||
GotGameFiles {
|
||||
id: String,
|
||||
file_descriptions: Vec<GameFileDescription>,
|
||||
},
|
||||
DownloadGameFilesBegin {
|
||||
id: String,
|
||||
},
|
||||
DownloadGameFilesFinished {
|
||||
id: String,
|
||||
},
|
||||
DownloadGameFilesFailed {
|
||||
id: String,
|
||||
},
|
||||
PeerConnected(SocketAddr),
|
||||
PeerDisconnected(SocketAddr),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PeerCommand {
|
||||
ListGames,
|
||||
GetGame(String),
|
||||
DownloadGameFiles {
|
||||
id: String,
|
||||
file_descriptions: Vec<GameFileDescription>,
|
||||
},
|
||||
SetGameDir(String),
|
||||
ConnectToPeer(SocketAddr),
|
||||
}
|
||||
|
||||
async fn initial_peer_alive_check(conn: &mut Connection) -> bool {
|
||||
let stream = match conn.open_bidirectional_stream().await {
|
||||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
log::error!("failed to open stream: {e}");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
|
||||
// send ping
|
||||
if let Err(e) = tx.send(Request::Ping.encode()).await {
|
||||
log::error!("failed to send ping to peer: {e}");
|
||||
return false;
|
||||
}
|
||||
let _ = tx.close().await;
|
||||
|
||||
// receive pong
|
||||
if let Ok(Some(response)) = rx.receive().await {
|
||||
let response = Response::decode(response);
|
||||
match response {
|
||||
Response::Pong => {
|
||||
log::info!("peer is alive");
|
||||
return true;
|
||||
}
|
||||
_ => {
|
||||
log::error!("peer sent invalid response to ping: {response:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
async fn receive_game_file(
|
||||
conn: &mut Connection,
|
||||
desc: &GameFileDescription,
|
||||
games_folder: &str,
|
||||
) -> eyre::Result<()> {
|
||||
log::info!("downloading: {desc:?}");
|
||||
|
||||
let stream = conn.open_bidirectional_stream().await?;
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
|
||||
let request = Request::GetGameFileData(desc.clone());
|
||||
|
||||
// request file
|
||||
tx.write_all(&request.encode()).await?;
|
||||
|
||||
// create file
|
||||
let path = PathBuf::from(&games_folder).join(&desc.relative_path);
|
||||
let mut file = File::create(&path)?;
|
||||
|
||||
// receive file contents
|
||||
while let Some(data) = rx.receive().await? {
|
||||
file.write_all(&data)?;
|
||||
}
|
||||
log::debug!("file download complete: {}", path.display());
|
||||
|
||||
tx.close().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_game_files(
|
||||
game_id: &str,
|
||||
game_file_descs: Vec<GameFileDescription>,
|
||||
games_folder: String,
|
||||
peer_addr: SocketAddr,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
|
||||
|
||||
let client = QuicClient::builder()
|
||||
.with_tls(CERT_PEM)?
|
||||
.with_io("0.0.0.0:0")?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let conn = Connect::new(peer_addr).with_server_name("localhost");
|
||||
let mut conn = client.connect(conn).await?;
|
||||
conn.keep_alive(true)?;
|
||||
|
||||
let game_files = game_file_descs
|
||||
.iter()
|
||||
.filter(|desc| !desc.is_dir)
|
||||
.filter(|desc| !desc.is_version_ini())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if game_files.is_empty() {
|
||||
eyre::bail!("game_file_descs empty: no game files to download");
|
||||
}
|
||||
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin {
|
||||
id: game_id.to_string(),
|
||||
})?;
|
||||
|
||||
// receive all game files
|
||||
for file_desc in game_files {
|
||||
receive_game_file(&mut conn, file_desc, &games_folder).await?;
|
||||
}
|
||||
|
||||
let version_file_desc = game_file_descs
|
||||
.iter()
|
||||
.find(|desc| desc.is_version_ini())
|
||||
.ok_or_else(|| eyre::eyre!("version.ini not found"))?;
|
||||
|
||||
// receive version.ini
|
||||
receive_game_file(&mut conn, version_file_desc, &games_folder).await?;
|
||||
|
||||
log::info!("all files downloaded for game: {game_id}");
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesFinished {
|
||||
id: game_id.to_string(),
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Ctx {
|
||||
game_dir: Arc<RwLock<Option<String>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct PeerCtx {
|
||||
game_dir: Arc<RwLock<Option<String>>>,
|
||||
}
|
||||
|
||||
pub async fn run_peer(
|
||||
mut rx_control: UnboundedReceiver<PeerCommand>,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
// peer context
|
||||
let ctx = Ctx {
|
||||
game_dir: Arc::new(RwLock::new(None)),
|
||||
};
|
||||
|
||||
let peer_ctx = PeerCtx {
|
||||
game_dir: ctx.game_dir.clone(),
|
||||
};
|
||||
|
||||
// Start server component
|
||||
let server_addr = "0.0.0.0:0".parse::<SocketAddr>()?;
|
||||
let tx_notify_ui_clone = tx_notify_ui.clone();
|
||||
let peer_ctx_clone = peer_ctx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_server_component(server_addr, peer_ctx_clone, tx_notify_ui_clone).await
|
||||
{
|
||||
log::error!("Server component error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
// Handle client commands
|
||||
loop {
|
||||
let Some(cmd) = rx_control.recv().await else {
|
||||
break;
|
||||
};
|
||||
|
||||
match cmd {
|
||||
PeerCommand::ListGames => {
|
||||
// TODO: Implement peer discovery and game listing
|
||||
log::info!("ListGames command received");
|
||||
}
|
||||
PeerCommand::GetGame(id) => {
|
||||
log::info!("Requesting game from peer: {id}");
|
||||
// TODO: Implement game fetching from peers
|
||||
}
|
||||
PeerCommand::DownloadGameFiles {
|
||||
id,
|
||||
file_descriptions: _,
|
||||
} => {
|
||||
log::info!("Got PeerCommand::DownloadGameFiles");
|
||||
let games_folder = { ctx.game_dir.read().await.clone() };
|
||||
if let Some(_games_folder) = games_folder {
|
||||
// TODO: Implement peer file downloading
|
||||
log::info!("Would download game files for {id}");
|
||||
} else {
|
||||
log::error!("Cannot handle game file descriptions: games_folder is not set");
|
||||
}
|
||||
}
|
||||
PeerCommand::SetGameDir(game_dir) => {
|
||||
*ctx.game_dir.write().await = Some(game_dir.clone());
|
||||
log::info!("Game directory set to: {game_dir}");
|
||||
}
|
||||
PeerCommand::ConnectToPeer(peer_addr) => {
|
||||
log::info!("Connecting to peer: {peer_addr}");
|
||||
// TODO: Implement peer connection
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_server_component(
|
||||
addr: SocketAddr,
|
||||
ctx: PeerCtx,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let limits = Limits::default()
|
||||
.with_max_handshake_duration(Duration::from_secs(3))?
|
||||
.with_max_idle_timeout(Duration::from_secs(3))?;
|
||||
|
||||
let mut server = Server::builder()
|
||||
.with_tls((CERT_PEM, KEY_PEM))?
|
||||
.with_io(addr)?
|
||||
.with_limits(limits)?
|
||||
.start()?;
|
||||
|
||||
let server_addr = server.local_addr()?;
|
||||
log::info!("Peer server listening on {server_addr}");
|
||||
|
||||
// TODO: Implement mDNS advertising for peer discovery
|
||||
|
||||
while let Some(connection) = server.accept().await {
|
||||
let ctx = ctx.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_peer_connection(connection, ctx, tx_notify_ui).await {
|
||||
log::error!("Peer connection error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_peer_connection(
|
||||
mut connection: Connection,
|
||||
ctx: PeerCtx,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let remote_addr = connection.remote_addr()?;
|
||||
log::info!("{remote_addr} peer connected");
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerConnected(remote_addr)) {
|
||||
log::error!("Failed to send PeerConnected event: {e}");
|
||||
}
|
||||
|
||||
// handle streams
|
||||
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
|
||||
let ctx = ctx.clone();
|
||||
let remote_addr = Some(remote_addr);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_peer_stream(stream, ctx, remote_addr).await {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if let Err(e) = tx_notify_ui.send(PeerEvent::PeerDisconnected(remote_addr)) {
|
||||
log::error!("Failed to send PeerDisconnected event: {e}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_peer_stream(
|
||||
stream: BidirectionalStream,
|
||||
ctx: PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> eyre::Result<()> {
|
||||
let (mut rx, mut tx) = stream.split();
|
||||
|
||||
log::trace!("{remote_addr:?} peer stream opened");
|
||||
|
||||
// handle streams
|
||||
loop {
|
||||
match rx.receive().await {
|
||||
Ok(Some(data)) => {
|
||||
log::trace!(
|
||||
"{remote_addr:?} msg: (raw): {}",
|
||||
String::from_utf8_lossy(&data)
|
||||
);
|
||||
|
||||
let request = Request::decode(data);
|
||||
log::debug!("{remote_addr:?} msg: {request:?}");
|
||||
|
||||
match request {
|
||||
Request::Ping => {
|
||||
// Respond with pong
|
||||
if let Err(e) = tx.send(Response::Pong.encode()).await {
|
||||
log::error!("Failed to send pong: {e}");
|
||||
}
|
||||
}
|
||||
Request::ListGames => {
|
||||
// TODO: Return list of games from this peer
|
||||
log::info!("Received ListGames request from peer");
|
||||
}
|
||||
Request::GetGame { id } => {
|
||||
log::info!("Received GetGame request for {id} from peer");
|
||||
// TODO: Handle game request
|
||||
}
|
||||
Request::GetGameFileData(desc) => {
|
||||
log::info!(
|
||||
"Received GetGameFileData request for {} from peer",
|
||||
desc.relative_path
|
||||
);
|
||||
// TODO: Handle file data request
|
||||
}
|
||||
Request::Invalid(_, _) => {
|
||||
log::error!("Received invalid request from peer");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
log::trace!("{remote_addr:?} peer stream closed");
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("{remote_addr:?} peer stream error: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,166 @@
|
||||
use mimalloc::MiMalloc;
|
||||
|
||||
#[global_allocator]
|
||||
static GLOBAL: MiMalloc = MiMalloc;
|
||||
|
||||
mod cli;
|
||||
mod peer;
|
||||
|
||||
use std::{net::SocketAddr, time::Duration};
|
||||
|
||||
use clap::Parser as _;
|
||||
use cli::Cli;
|
||||
use gethostname::gethostname;
|
||||
use lanspread_compat::eti;
|
||||
use lanspread_db::db::{Game, GameDB};
|
||||
use lanspread_mdns::{DaemonEvent, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser};
|
||||
use lanspread_peer::{PeerEvent, run_peer};
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn spawn_mdns_task(server_addr: SocketAddr) -> eyre::Result<()> {
|
||||
let peer_id = Uuid::now_v7().simple().to_string();
|
||||
|
||||
let hostname = gethostname();
|
||||
let hostname_str = hostname.to_str().unwrap_or("");
|
||||
|
||||
// Calculate maximum hostname length that fits with UUID in 63 char limit
|
||||
let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1); // +1 for the dash
|
||||
let truncated_hostname = if hostname_str.len() > max_hostname_len {
|
||||
hostname_str.get(..max_hostname_len).unwrap_or(hostname_str)
|
||||
} else {
|
||||
hostname_str
|
||||
};
|
||||
|
||||
let combined_str = if truncated_hostname.is_empty() {
|
||||
// If no hostname is available, use just the UUID
|
||||
peer_id
|
||||
} else {
|
||||
format!("{truncated_hostname}-{peer_id}")
|
||||
};
|
||||
|
||||
let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, server_addr)?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = mdns.monitor.recv() {
|
||||
tracing::trace!("mDNS: {:?}", &event);
|
||||
if let DaemonEvent::Error(e) = event {
|
||||
tracing::error!("mDNS: {e}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn prepare_game_db(cli: &Cli) -> eyre::Result<GameDB> {
|
||||
// build games from ETI database
|
||||
let mut games: Vec<Game> = eti::get_games(&cli.db)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect();
|
||||
|
||||
// filter out games that the peer does not have in game_dir
|
||||
games.retain(|game| cli.game_dir.join(&game.id).is_dir());
|
||||
|
||||
// read version.ini files and update eti_game_version
|
||||
for game in &mut games {
|
||||
let game_dir = cli.game_dir.join(&game.id);
|
||||
if let Ok(version) = lanspread_db::db::read_version_from_ini(&game_dir) {
|
||||
game.eti_game_version = version;
|
||||
if let Some(ref version) = game.eti_game_version {
|
||||
tracing::debug!("Read version for game {}: {}", game.id, version);
|
||||
}
|
||||
} else {
|
||||
tracing::warn!("Failed to read version.ini for game: {}", game.id);
|
||||
}
|
||||
}
|
||||
|
||||
let mut game_db = GameDB::from(games);
|
||||
|
||||
game_db.add_thumbnails(&cli.thumbs_dir);
|
||||
|
||||
game_db.all_games().iter().for_each(|game| {
|
||||
tracing::debug!("Found game: {game}");
|
||||
});
|
||||
tracing::info!("Prepared game database with {} games", game_db.games.len());
|
||||
|
||||
Ok(game_db)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> eyre::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
assert!(
|
||||
cli.game_dir.exists(),
|
||||
"Games folder does not exist: {}",
|
||||
cli.game_dir.to_str().expect("Invalid path")
|
||||
);
|
||||
|
||||
let server_addr = SocketAddr::from((cli.ip, cli.port));
|
||||
|
||||
// spawn mDNS listener task
|
||||
spawn_mdns_task(server_addr)?;
|
||||
|
||||
let game_db = prepare_game_db(&cli).await?;
|
||||
|
||||
tracing::info!("Peer listening on {server_addr}");
|
||||
|
||||
let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx_notify_ui, mut rx_notify_ui) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
// Start peer task
|
||||
let peer_task = tokio::spawn(async move { run_peer(rx_control, tx_notify_ui).await });
|
||||
|
||||
// Handle events from peer
|
||||
let event_handler = tokio::spawn(async move {
|
||||
while let Some(event) = rx_notify_ui.recv().await {
|
||||
match event {
|
||||
PeerEvent::ListGames(games) => {
|
||||
tracing::info!("Received list of {} games", games.len());
|
||||
}
|
||||
PeerEvent::GotGameFiles {
|
||||
id,
|
||||
file_descriptions,
|
||||
} => {
|
||||
tracing::info!(
|
||||
"Got game files for {}: {} files",
|
||||
id,
|
||||
file_descriptions.len()
|
||||
);
|
||||
}
|
||||
PeerEvent::DownloadGameFilesBegin { id } => {
|
||||
tracing::info!("Download started for game: {}", id);
|
||||
}
|
||||
PeerEvent::DownloadGameFilesFinished { id } => {
|
||||
tracing::info!("Download finished for game: {}", id);
|
||||
}
|
||||
PeerEvent::DownloadGameFilesFailed { id } => {
|
||||
tracing::error!("Download failed for game: {}", id);
|
||||
}
|
||||
PeerEvent::PeerConnected(addr) => {
|
||||
tracing::info!("Peer connected: {}", addr);
|
||||
}
|
||||
PeerEvent::PeerDisconnected(addr) => {
|
||||
tracing::info!("Peer disconnected: {}", addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// TODO: Add CLI interaction or other peer discovery logic here
|
||||
|
||||
// Wait for tasks
|
||||
let (peer_result, _) = tokio::join!(peer_task, event_handler);
|
||||
peer_result??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,189 @@
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use lanspread_db::db::{GameDB, GameFileDescription};
|
||||
use lanspread_proto::{Message as _, Request, Response};
|
||||
use lanspread_utils::maybe_addr;
|
||||
use s2n_quic::stream::SendStream;
|
||||
use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PeerRequestHandler {
|
||||
db: Arc<RwLock<GameDB>>,
|
||||
}
|
||||
|
||||
impl PeerRequestHandler {
|
||||
pub fn new(games: GameDB) -> PeerRequestHandler {
|
||||
PeerRequestHandler {
|
||||
db: Arc::new(RwLock::new(games)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle_request(
|
||||
&self,
|
||||
request: Request,
|
||||
games_folder: &Path,
|
||||
tx: &mut SendStream,
|
||||
) -> eyre::Result<()> {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
|
||||
// process request and generate response
|
||||
let response = self.process_request(request, games_folder).await;
|
||||
tracing::trace!("{remote_addr} peer response: {response:?}");
|
||||
|
||||
// write response back to client
|
||||
tx.send(response.encode()).await?;
|
||||
|
||||
// close the stream
|
||||
tx.close().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_ping() -> Response {
|
||||
Response::Pong
|
||||
}
|
||||
|
||||
async fn handle_list_games(&self) -> Response {
|
||||
let db = self.db.read().await;
|
||||
Response::ListGames(db.all_games().into_iter().cloned().collect())
|
||||
}
|
||||
|
||||
async fn handle_get_game(&self, id: String, games_folder: &Path) -> Response {
|
||||
if self.db.read().await.get_game_by_id(&id).is_none() {
|
||||
tracing::error!("Game not found in DB: {id}");
|
||||
return Response::GameNotFound(id);
|
||||
}
|
||||
|
||||
let game_dir = games_folder.join(&id);
|
||||
if !game_dir.exists() {
|
||||
tracing::error!("Game folder does not exist: {}", game_dir.display());
|
||||
return Response::GameNotFound(id);
|
||||
}
|
||||
|
||||
let mut game_files_descs: Vec<GameFileDescription> = vec![];
|
||||
|
||||
for entry in WalkDir::new(&game_dir)
|
||||
.into_iter()
|
||||
.filter_map(std::result::Result::ok)
|
||||
{
|
||||
match get_relative_path(games_folder, entry.path()) {
|
||||
Ok(relative_path) => match relative_path.to_str() {
|
||||
Some(relative_path) => {
|
||||
let game_file_description = GameFileDescription {
|
||||
game_id: id.clone(),
|
||||
relative_path: relative_path.to_string(),
|
||||
is_dir: entry.file_type().is_dir(),
|
||||
};
|
||||
|
||||
tracing::debug!("Found game file: {:?}", game_file_description);
|
||||
|
||||
game_files_descs.push(game_file_description);
|
||||
}
|
||||
None => {
|
||||
tracing::error!("Failed to get relative path: {relative_path:?}",);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get relative path: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Response::GetGame {
|
||||
id,
|
||||
file_descriptions: game_files_descs,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_get_game_file_data() -> Response {
|
||||
Response::InvalidRequest(Bytes::new(), "Not implemented".to_string())
|
||||
}
|
||||
|
||||
fn handle_invalid(data: Bytes, err_msg: String) -> Response {
|
||||
Response::InvalidRequest(data, err_msg)
|
||||
}
|
||||
|
||||
pub async fn process_request(&self, request: Request, games_folder: &Path) -> Response {
|
||||
match request {
|
||||
Request::Ping => PeerRequestHandler::handle_ping(),
|
||||
Request::ListGames => self.handle_list_games().await,
|
||||
Request::GetGame { id } => self.handle_get_game(id, games_folder).await,
|
||||
Request::GetGameFileData(_) => PeerRequestHandler::handle_get_game_file_data(),
|
||||
Request::Invalid(data, err_msg) => PeerRequestHandler::handle_invalid(data, err_msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_game_file_data(
|
||||
game_file_desc: &GameFileDescription,
|
||||
tx: &mut SendStream,
|
||||
game_dir: &Path,
|
||||
) {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
|
||||
tracing::debug!("{remote_addr} peer requested game file data: {game_file_desc:?}",);
|
||||
|
||||
// deliver file data to client
|
||||
let game_file = game_dir.join(&game_file_desc.relative_path);
|
||||
|
||||
let mut total_bytes = 0;
|
||||
let mut last_total_bytes = 0;
|
||||
let mut timestamp = Instant::now();
|
||||
|
||||
if let Ok(mut f) = tokio::fs::File::open(&game_file).await {
|
||||
let mut buf = BytesMut::with_capacity(64 * 1024);
|
||||
while let Ok(bytes_read) = f.read_buf(&mut buf).await {
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
total_bytes += bytes_read;
|
||||
|
||||
if last_total_bytes + 10_000_000 < total_bytes {
|
||||
let elapsed = timestamp.elapsed();
|
||||
let diff_bytes = total_bytes - last_total_bytes;
|
||||
|
||||
if elapsed.as_secs_f64() >= 1.0 {
|
||||
#[allow(clippy::cast_precision_loss)]
|
||||
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0);
|
||||
|
||||
tracing::debug!(
|
||||
"{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}",
|
||||
);
|
||||
last_total_bytes = total_bytes;
|
||||
timestamp = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await {
|
||||
tracing::error!("{remote_addr} failed to send file data: {e}",);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}",
|
||||
);
|
||||
} else {
|
||||
tracing::error!("{remote_addr} failed to open file: {}", game_file.display());
|
||||
}
|
||||
|
||||
if let Err(e) = tx.close().await {
|
||||
tracing::error!("{remote_addr} failed to close stream: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
fn get_relative_path(base: &Path, deep_path: &Path) -> std::io::Result<PathBuf> {
|
||||
let base_canonical = base.canonicalize()?;
|
||||
let full_canonical = deep_path.canonicalize()?;
|
||||
|
||||
full_canonical
|
||||
.strip_prefix(&base_canonical)
|
||||
.map(std::path::Path::to_path_buf)
|
||||
.map_err(|_| std::io::Error::other("Path is not within base directory"))
|
||||
}
|
||||
Reference in New Issue
Block a user