[code] improve structure (focus: server)

This commit is contained in:
2025-03-02 13:06:18 +01:00
parent bcf9ad68ad
commit adf6f9d757
11 changed files with 393 additions and 368 deletions

View File

@@ -1,304 +1,54 @@
#![allow(clippy::doc_markdown)]
mod cli;
mod quic;
mod req;
use std::{
fs::File,
io::Read as _,
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
sync::Arc,
};
use std::{convert::Into, net::SocketAddr};
use assets::Thumbnails;
use bytes::Bytes;
use clap::Parser;
use lanspread_compat::eti::{self, EtiGame};
use lanspread_db::db::{Game, GameDB, GameFileDescription};
use clap::Parser as _;
use cli::Cli;
use lanspread_compat::eti;
use lanspread_db::db::{Game, GameDB};
use lanspread_mdns::{
DaemonEvent,
MdnsAdvertiser,
LANSPREAD_INSTANCE_NAME,
LANSPREAD_SERVICE_TYPE,
DaemonEvent, LANSPREAD_INSTANCE_NAME, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser,
};
use lanspread_proto::{Message as _, Request, Response};
use lanspread_utils::maybe_addr;
use s2n_quic::Server as QuicServer;
use tokio::{io::AsyncWriteExt, sync::Mutex};
use quic::run_server;
use tracing_subscriber::EnvFilter;
use walkdir::WalkDir;
mod assets;
fn spawn_mdns_task(server_addr: SocketAddr) -> eyre::Result<()> {
let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, LANSPREAD_INSTANCE_NAME, server_addr)?;
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
#[derive(Clone, Debug)]
struct ServerCtx {
handler: RequestHandler,
games_folder: PathBuf,
}
#[derive(Clone, Debug)]
struct ConnectionCtx {
server_ctx: Arc<ServerCtx>,
remote_addr: String,
}
async fn run(addr: SocketAddr, db: GameDB, games_folder: PathBuf) -> eyre::Result<()> {
let mut server = QuicServer::builder()
.with_tls((CERT_PEM, KEY_PEM))?
.with_io(addr)?
.start()?;
let server_ctx = Arc::new(ServerCtx {
handler: RequestHandler::new(db),
games_folder,
});
while let Some(mut connection) = server.accept().await {
let conn_ctx = Arc::new(ConnectionCtx {
server_ctx: server_ctx.clone(),
remote_addr: maybe_addr!(connection.remote_addr()),
});
// spawn a new task for the connection
tokio::spawn(async move {
tracing::info!("{} connected", conn_ctx.remote_addr);
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
let (mut rx, mut tx) = stream.split();
let conn_ctx = conn_ctx.clone();
// spawn a new task for the stream
tokio::spawn(async move {
tracing::trace!("{} stream opened", conn_ctx.remote_addr);
// handle streams
while let Ok(Some(data)) = rx.receive().await {
tracing::trace!(
"{} msg: (raw): {}",
conn_ctx.remote_addr,
String::from_utf8_lossy(&data)
);
let request = Request::decode(data);
tracing::debug!("{} msg: {:?}", conn_ctx.remote_addr, request);
if let Request::GetGameFileData(game_file_desc) = &request {
tracing::debug!(
"{} client requested game file data: {:?}",
conn_ctx.remote_addr,
game_file_desc
);
// deliver file data to client
let path = conn_ctx
.server_ctx
.games_folder
.join(&game_file_desc.relative_path);
if let Ok(mut file) = File::open(&path) {
let mut buf = vec![0; 64 * 1024];
while let Ok(n) = file.read(&mut buf) {
if n == 0 {
break;
}
if let Err(e) = tx.write_all(&buf[..n]).await {
tracing::error!(
"{} failed to send file data: {}",
conn_ctx.remote_addr,
e
);
}
}
// if let Err(e) = tokio::io::copy(&mut file, &mut tx).await {
// tracing::error!(
// "{} failed to send file data: {}",
// conn_ctx.remote_addr,
// e
// );
// }
} else {
tracing::error!(
"{} failed to open file: {}",
conn_ctx.remote_addr,
path.display()
);
}
if let Err(e) = tx.close().await {
tracing::error!(
"{} failed to close stream: {}",
conn_ctx.remote_addr,
e
);
}
continue;
}
let response = conn_ctx
.server_ctx
.handler
.handle_request(request, &conn_ctx)
.await;
tracing::trace!("{} server response: {:?}", conn_ctx.remote_addr, response);
let raw_response = response.encode();
tracing::trace!(
"{} server response (raw): {}",
conn_ctx.remote_addr,
String::from_utf8_lossy(&raw_response)
);
// write response back to client
if let Err(e) = tx.write_all(&raw_response).await {
tracing::error!(?e);
}
// close the stream
if let Err(e) = tx.close().await {
tracing::error!(?e);
}
}
});
tokio::spawn(async move {
while let Ok(event) = mdns.monitor.recv() {
tracing::info!("mDNS: {:?}", &event);
if let DaemonEvent::Error(e) = event {
tracing::error!("mDNS: {e}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
});
}
}
});
Ok(())
}
fn get_relative_path(base: &Path, deep_path: &Path) -> std::io::Result<PathBuf> {
let base_canonical = base.canonicalize()?;
let full_canonical = deep_path.canonicalize()?;
async fn prepare_game_db(cli: &Cli) -> eyre::Result<GameDB> {
// build games from ETI database
let mut games: Vec<Game> = eti::get_games(&cli.db)
.await?
.into_iter()
.map(Into::into)
.collect();
full_canonical
.strip_prefix(&base_canonical)
.map(std::path::Path::to_path_buf)
.map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::Other,
"Path is not within base directory",
)
})
}
// filter out games that the server does not have in game_dir
games.retain(|game| cli.game_dir.join(&game.id).is_dir());
#[derive(Clone, Debug)]
struct RequestHandler {
db: Arc<Mutex<GameDB>>,
}
let mut game_db = GameDB::from(games);
impl RequestHandler {
fn new(games: GameDB) -> RequestHandler {
RequestHandler {
db: Arc::new(Mutex::new(games)),
}
}
game_db.add_thumbnails(&cli.thumbs_dir);
async fn handle_request(&self, request: Request, conn_ctx: &ConnectionCtx) -> Response {
match request {
Request::Ping => Response::Pong,
Request::ListGames => {
let db = self.db.lock().await;
Response::Games(db.all_games().into_iter().cloned().collect())
}
Request::GetGame { id } => {
if self.db.lock().await.get_game_by_id(&id).is_none() {
tracing::error!("Game not found in DB: {id}");
return Response::GameNotFound(id);
}
tracing::info!("Prepared game database with {} games", game_db.games.len());
let games_folder = &conn_ctx.server_ctx.games_folder;
let game_dir = games_folder.join(&id);
if !game_dir.exists() {
tracing::error!("Game folder does not exist: {}", game_dir.display());
return Response::GameNotFound(id);
}
let mut game_files_descs: Vec<GameFileDescription> = vec![];
for entry in WalkDir::new(&game_dir)
.into_iter()
.filter_map(std::result::Result::ok)
{
match get_relative_path(games_folder, entry.path()) {
Ok(relative_path) => match relative_path.to_str() {
Some(relative_path) => {
let game_file_description = GameFileDescription {
game_id: id.clone(),
relative_path: relative_path.to_string(),
is_dir: entry.file_type().is_dir(),
};
tracing::debug!("Found game file: {:?}", game_file_description);
game_files_descs.push(game_file_description);
}
None => {
tracing::error!("Failed to get relative path: {relative_path:?}",);
}
},
Err(e) => {
tracing::error!("Failed to get relative path: {e}");
}
}
}
Response::GetGame(game_files_descs)
}
Request::GetGameFileData(_) => {
Response::InvalidRequest(Bytes::new(), "Not implemented".to_string())
}
Request::Invalid(data, err_msg) => {
tracing::error!(
"got invalid request from client (error: {}): {}",
err_msg,
String::from_utf8_lossy(&data)
);
Response::InvalidRequest(data, err_msg)
}
}
}
}
#[derive(Debug, Parser)]
struct Cli {
/// IP address to bind to.
#[clap(long)]
ip: IpAddr,
/// Listen port.
#[clap(long)]
port: u16,
/// Game database path (SQLite).
#[clap(long)]
db: PathBuf,
/// Games folder.
#[clap(long)]
folder: PathBuf,
/// Thumbnails folder.
#[clap(long)]
thumbnails: PathBuf,
}
fn eti_game_to_game(eti_game: EtiGame) -> Game {
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_sign_loss)]
Game {
id: eti_game.game_id,
name: eti_game.game_title,
description: eti_game.game_readme_de,
release_year: eti_game.game_release,
publisher: eti_game.game_publisher,
max_players: eti_game.game_maxplayers,
version: eti_game.game_version,
genre: eti_game.genre_de,
size: (eti_game.game_size * 1024.0 * 1024.0 * 1024.0) as u64,
thumbnail: None,
installed: false,
}
Ok(game_db)
}
#[tokio::main]
@@ -309,57 +59,19 @@ async fn main() -> eyre::Result<()> {
let cli = Cli::parse();
#[allow(clippy::manual_assert)]
if !cli.folder.exists() {
panic!(
"Games folder does not exist: {}",
cli.folder.to_str().expect("Invalid path")
);
}
assert!(
cli.game_dir.exists(),
"Games folder does not exist: {}",
cli.game_dir.to_str().expect("Invalid path")
);
let eti_games = eti::get_games(&cli.db).await?;
let mut games: Vec<Game> = eti_games.into_iter().map(eti_game_to_game).collect();
let server_addr = SocketAddr::from((cli.ip, cli.port));
// Filter games based on existing folders
games.retain(|game| {
let game_folder = cli.folder.join(&game.id);
let exists = game_folder.exists() && game_folder.is_dir();
if !exists {
tracing::debug!("Skipping game {}: folder not found", game.id);
}
exists
});
// spawn mDNS listener task
spawn_mdns_task(server_addr)?;
let thumbnails = Thumbnails::new(cli.thumbnails);
let game_db = prepare_game_db(&cli).await?;
// add thumbnails to games
for game in &mut games {
if let Some(thumbnail) = thumbnails.get(&game.id) {
game.thumbnail = Some(thumbnail);
} else {
tracing::warn!("No thumbnail found: {}", game.id);
}
}
let game_db = GameDB::from(games);
let mdns = MdnsAdvertiser::new(
LANSPREAD_SERVICE_TYPE,
LANSPREAD_INSTANCE_NAME,
(cli.ip, cli.port).into(),
)?;
tokio::spawn(async move {
while let Ok(event) = mdns.monitor.recv() {
tracing::info!("mDNS: {:?}", &event);
if let DaemonEvent::Error(e) = event {
tracing::error!("mDNS: {e}");
break;
}
}
});
tracing::info!("Server listening on {}:{}", cli.ip, cli.port);
run(SocketAddr::from((cli.ip, cli.port)), game_db, cli.folder).await
tracing::info!("Server listening on {server_addr}");
run_server(server_addr, game_db, cli.game_dir).await
}