[feat][code] proto crate, one stream per request

This commit is contained in:
2024-11-08 22:22:50 +01:00
parent 04a39790b8
commit 9d8f579a0f
18 changed files with 862 additions and 306 deletions

View File

@ -3,9 +3,20 @@ name = "lanspread-server"
version = "0.1.0"
edition = "2021"
[lints.rust]
unsafe_code = "forbid"
[lints.clippy]
pedantic = { level = "warn", priority = -1 }
todo = "warn"
unwrap_used = "warn"
[dependencies]
# local
lanspread-db = { path = "../lanspread-db" }
lanspread-proto = { path = "../lanspread-proto" }
lanspread-utils = { path = "../lanspread-utils" }
# external
bytes = { workspace = true }
eyre = { workspace = true }
@ -14,3 +25,5 @@ s2n-quic = { workspace = true }
serde_json = { workspace = true }
semver = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

View File

@ -1,10 +1,18 @@
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
};
use bytes::Bytes;
use itertools::Itertools as _;
use lanspread_db::GameDB;
use lanspread_db::db::GameDB;
use lanspread_proto::{Message as _, Request, Response};
use lanspread_utils::maybe_addr;
use s2n_quic::Server as QuicServer;
use tokio::sync::Mutex;
use testing::generate_test_db;
use tokio::{io::AsyncWriteExt, sync::Mutex};
use tracing_subscriber::EnvFilter;
mod testing;
static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem"));
static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem"));
@ -13,44 +21,96 @@ const SERVER_ADDR: &str = "0.0.0.0";
const SERVER_PORT: u16 = 13337;
pub(crate) struct Server {
pub(crate) db: Arc<Mutex<GameDB>>,
db_path: PathBuf,
}
#[derive(Clone, Debug)]
struct ServerCtx {
handler: RequestHandler,
}
#[derive(Clone, Debug)]
struct ConnectionCtx {
server_ctx: Arc<ServerCtx>,
remote_addr: String,
}
impl Server {
pub(crate) fn new<S: Into<PathBuf>>(db_path: S) -> Self {
let db_path = db_path.into();
let db = Arc::new(Mutex::new(GameDB::load_from_file(&db_path).unwrap()));
Server { db, db_path }
fn new<S: Into<PathBuf>>(db_path: S) -> Self {
Server {
db_path: db_path.into(),
}
}
pub(crate) async fn run(&mut self, addr: SocketAddr) -> eyre::Result<()> {
async fn run(&mut self, addr: SocketAddr) -> eyre::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let mut server = QuicServer::builder()
.with_tls((CERT_PEM, KEY_PEM))?
.with_io(addr)?
.start()?;
let db = self.db.clone();
let server_ctx = Arc::new(ServerCtx {
handler: RequestHandler::new(&self.db_path)?,
});
while let Some(mut connection) = server.accept().await {
let conn_ctx = Arc::new(ConnectionCtx {
server_ctx: server_ctx.clone(),
remote_addr: maybe_addr!(connection.remote_addr()),
});
// spawn a new task for the connection
let db = db.clone();
tokio::spawn(async move {
eprintln!("Connection accepted from {:?}", connection.remote_addr());
tracing::info!("{} connected", conn_ctx.remote_addr);
while let Ok(Some(mut stream)) = connection.accept_bidirectional_stream().await {
while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await {
tracing::debug!("{} stream opened: {:?}", conn_ctx.remote_addr, stream);
let (mut rx, mut tx) = stream.split();
let conn_ctx = conn_ctx.clone();
// spawn a new task for the stream
let db = db.clone();
tokio::spawn(async move {
eprintln!("Stream opened from {:?}", stream.connection().remote_addr());
tracing::debug!("{} stream opened", conn_ctx.remote_addr);
// echo any data back to the stream
while let Ok(Some(data)) = stream.receive().await {
eprintln!("got data from client: {data:?}");
if data.as_ref() == b"get_games" {
let games_vec: Vec<_> =
db.lock().await.games.values().cloned().collect();
let json = serde_json::to_string(&games_vec).unwrap();
stream.send(Bytes::from(json)).await.unwrap();
// handle streams
while let Ok(Some(data)) = rx.receive().await {
tracing::trace!(
"{} client request (raw): {}",
conn_ctx.remote_addr,
String::from_utf8_lossy(&data)
);
let request = Request::decode(&data);
tracing::debug!(
"{} client request (decoded): {:?}",
conn_ctx.remote_addr,
request
);
let response =
conn_ctx.server_ctx.handler.handle_request(request).await;
tracing::trace!(
"{} server response: {:?}",
conn_ctx.remote_addr,
response
);
let raw_response = response.encode();
tracing::trace!(
"{} server response (raw): {}",
conn_ctx.remote_addr,
String::from_utf8_lossy(&raw_response)
);
// write response back to client
if let Err(e) = tx.write_all(&raw_response).await {
tracing::error!(?e);
}
// close the stream
if let Err(e) = tx.close().await {
tracing::error!(?e);
}
}
});
@ -62,55 +122,54 @@ impl Server {
}
}
fn generate_test_db<P: Into<PathBuf>>(db_path: P) {
let db_path = db_path.into();
#[derive(Clone, Debug)]
struct RequestHandler {
db: Arc<Mutex<GameDB>>,
}
let mut db = GameDB::new();
db.add_game(
"Call of Duty 3",
"A shooter game in war.",
"call_of_duty.tar.zst",
64,
semver::Version::new(1, 0, 0),
);
db.add_game(
"Counter-Strike Source",
"Valve's iconic shooter.",
"cstrike.tar.zst",
32,
semver::Version::new(1, 0, 0),
);
db.add_game(
"Factorio",
"Best game of all time, seriously.",
"factorio.tar.zst",
128,
semver::Version::new(1, 0, 0),
);
impl RequestHandler {
fn new(db_path: &Path) -> eyre::Result<Self> {
let db = GameDB::load_from_file(db_path)?;
Ok(RequestHandler {
db: Arc::new(Mutex::new(db)),
})
}
db.update_game(1, Some("Call of Duty 4"), None, None);
db.save_to_file(&db_path).unwrap();
async fn handle_request(&self, request: Request) -> Response {
match request {
Request::ListGames => {
let db = self.db.lock().await;
Response::Games(db.all_games().into_iter().cloned().collect())
}
Request::GetGame { id } => {
let db = self.db.lock().await;
match db.get_game_by_id(id) {
Some(game) => Response::Game(game.clone()),
None => Response::GameNotFound(id),
}
}
Request::Invalid(data, err_msg) => {
tracing::error!(
"got invalid request from client (error: {}): {}",
err_msg,
String::from_utf8_lossy(&data)
);
Response::InvalidRequest(data, err_msg)
}
}
}
}
const GAME_DB_PATH: &str = "/home/pfs/shm/game.db";
#[tokio::main]
async fn main() {
async fn main() -> eyre::Result<()> {
generate_test_db(GAME_DB_PATH);
let mut server = Server::new(GAME_DB_PATH);
server
.db
.lock()
.await
.list_games()
.iter()
.sorted()
.for_each(|game| println!("{game:?}"));
#[allow(clippy::unwrap_used)]
server
.run(format!("{SERVER_ADDR}:{SERVER_PORT}").parse().unwrap())
.await
.unwrap();
}

View File

@ -0,0 +1,35 @@
#![allow(clippy::unwrap_used)]
use std::path::PathBuf;
use lanspread_db::db::GameDB;
pub(crate) fn generate_test_db<P: Into<PathBuf>>(db_path: P) {
let db_path = db_path.into();
let mut db = GameDB::new();
db.add_game(
"Call of Duty 3",
"A shooter game in war.",
"call_of_duty.tar.zst",
64,
semver::Version::new(1, 0, 0),
);
db.add_game(
"Counter-Strike Source",
"Valve's iconic shooter.",
"cstrike.tar.zst",
32,
semver::Version::new(1, 0, 0),
);
db.add_game(
"Factorio",
"Best game of all time, seriously.",
"factorio.tar.zst",
128,
semver::Version::new(1, 0, 0),
);
db.update_game(1, Some("Call of Duty 4"), None, None);
db.save_to_file(&db_path).unwrap();
}