From 9d8f579a0f7a5afdbf4e45cb0ad78bcad0bf35a4 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Fri, 8 Nov 2024 22:22:50 +0100 Subject: [PATCH] [feat][code] proto crate, one stream per request --- Cargo.lock | 178 +++++++++++++++++++- Cargo.toml | 25 ++- client.sh | 6 +- crates/lanspread-client/Cargo.toml | 12 ++ crates/lanspread-client/src/main.rs | 155 +++++++++++++---- crates/lanspread-db/Cargo.toml | 8 + crates/lanspread-db/src/db.rs | 193 +++++++++++++++++++++ crates/lanspread-db/src/lib.rs | 206 +---------------------- crates/lanspread-db/src/serialization.rs | 19 +++ crates/lanspread-proto/Cargo.toml | 22 +++ crates/lanspread-proto/src/lib.rs | 87 ++++++++++ crates/lanspread-server/Cargo.toml | 13 ++ crates/lanspread-server/src/main.rs | 183 +++++++++++++------- crates/lanspread-server/src/testing.rs | 35 ++++ crates/lanspread-utils/Cargo.toml | 14 ++ crates/lanspread-utils/src/lib.rs | 1 + crates/lanspread-utils/src/macros.rs | 6 + server.sh | 5 +- 18 files changed, 862 insertions(+), 306 deletions(-) create mode 100644 crates/lanspread-db/src/db.rs create mode 100644 crates/lanspread-db/src/serialization.rs create mode 100644 crates/lanspread-proto/Cargo.toml create mode 100644 crates/lanspread-proto/src/lib.rs create mode 100644 crates/lanspread-server/src/testing.rs create mode 100644 crates/lanspread-utils/Cargo.toml create mode 100644 crates/lanspread-utils/src/lib.rs create mode 100644 crates/lanspread-utils/src/macros.rs diff --git a/Cargo.lock b/Cargo.lock index 9012081..6f88dd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,9 +456,13 @@ version = "0.1.0" dependencies = [ "eyre", "lanspread-db", + "lanspread-proto", + "lanspread-utils", "s2n-quic", "serde_json", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -471,6 +475,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "lanspread-proto" +version = "0.1.0" +dependencies = [ + "bytes", + "lanspread-db", + "serde", + "serde_json", + "tracing", +] + [[package]] name = "lanspread-server" version = "0.1.0" @@ -479,12 +494,20 @@ dependencies = [ "eyre", "itertools 0.13.0", "lanspread-db", + "lanspread-proto", + "lanspread-utils", "s2n-quic", "semver", "serde_json", "tokio", + "tracing", + "tracing-subscriber", ] +[[package]] +name = "lanspread-utils" +version = "0.1.0" + [[package]] name = "lazy_static" version = "1.5.0" @@ -541,6 +564,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.7.4" @@ -599,6 +631,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -643,6 +685,12 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.3" @@ -809,8 +857,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -821,9 +878,15 @@ checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -971,6 +1034,7 @@ dependencies = [ "pin-project-lite", "s2n-codec", "subtle", + "tracing", "zerocopy", ] @@ -1128,6 +1192,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1197,6 +1270,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tokio" version = "1.41.1" @@ -1226,6 +1309,67 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.13" @@ -1244,6 +1388,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1268,6 +1418,28 @@ dependencies = [ "rustix", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index e58c805..a4f9a64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,9 @@ members = [ "crates/lanspread-db", "crates/lanspread-server", - "crates/lanspread-client" + "crates/lanspread-client", + "crates/lanspread-utils", + "crates/lanspread-proto", ] resolver = "2" @@ -11,8 +13,27 @@ bytes = "1.8" clap = "4.5" eyre = "0.6" itertools = "0.13" -s2n-quic = "1.49" +s2n-quic = { version = "1.49", features = ["provider-event-tracing"] } semver = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.41", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[profile.release] +debug = true +debug-assertions = true +overflow-checks = true +strip = false +lto = false +panic = "unwind" +codegen-units = 1 + +[profile.release-lto] +inherits = "release" +lto = true +debug = false +debug-assertions = false +overflow-checks = false +strip = true diff --git a/client.sh b/client.sh index 523ea17..c7d95e6 100755 --- a/client.sh +++ b/client.sh @@ -1,3 +1,7 @@ #!/usr/bin/env bash -cargo run --release -p lanspread-client +export RUST_LOG=info,lanspread_client=debug,lanspread_proto=debug +#export RUST_LOG=error + +exec cargo run -p lanspread-client < <(while sleep 0.1; do echo "list"; sleep 0.1; echo "get 1"; sleep 0.1; echo "get 25"; done) +#RUST_LOG=info exec cargo run --profile release-lto -p lanspread-client < <(while sleep 0.1; do echo "list"; sleep 0.1; echo "get 1"; sleep 0.1; echo "get 25"; done) diff --git a/crates/lanspread-client/Cargo.toml b/crates/lanspread-client/Cargo.toml index c28a582..2388eec 100644 --- a/crates/lanspread-client/Cargo.toml +++ b/crates/lanspread-client/Cargo.toml @@ -3,11 +3,23 @@ name = "lanspread-client" version = "0.1.0" edition = "2021" +[lints.rust] +unsafe_code = "forbid" + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +todo = "warn" +unwrap_used = "warn" + [dependencies] # local lanspread-db = { path = "../lanspread-db" } +lanspread-proto = { path = "../lanspread-proto" } +lanspread-utils = { path = "../lanspread-utils" } # external eyre = { workspace = true } s2n-quic = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/crates/lanspread-client/src/main.rs b/crates/lanspread-client/src/main.rs index 240a131..8362b45 100644 --- a/crates/lanspread-client/src/main.rs +++ b/crates/lanspread-client/src/main.rs @@ -1,65 +1,154 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, time::Duration}; -use lanspread_db::{Game, GameDB}; -use s2n_quic::{client::Connect, Client as QuicClient}; -use tokio::{io::AsyncWriteExt as _, sync::Mutex}; +use lanspread_proto::{Message as _, Request, Response}; +use lanspread_utils::maybe_addr; +use s2n_quic::{client::Connect, provider::limits::Limits, Client as QuicClient}; +use tokio::{ + io::{AsyncBufReadExt as _, AsyncWriteExt as _}, + sync::mpsc::UnboundedReceiver, +}; +use tracing_subscriber::EnvFilter; static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); const SERVER_ADDR: &str = "127.0.0.1"; const SERVER_PORT: u16 = 13337; -struct Client { - db: Arc>, +#[derive(Debug)] +enum ControlMessage { + ListGames, + GetGame(u64), } -impl Client { - pub(crate) fn new() -> Self { - Client { - db: Arc::new(Mutex::new(GameDB::new())), - } - } +struct Client; + +impl Client { + pub(crate) async fn run( + addr: SocketAddr, + mut rx_control: UnboundedReceiver, + ) -> eyre::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?; - pub(crate) async fn run(&mut self, addr: SocketAddr) -> eyre::Result<()> { let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? + .with_limits(limits)? .start()?; - let connect1 = Connect::new(addr).with_server_name("localhost"); - let mut connection1 = client.connect(connect1).await?; - connection1.keep_alive(true)?; + let conn = Connect::new(addr).with_server_name("localhost"); + let mut conn = client.connect(conn).await?; + conn.keep_alive(true)?; - let stream = connection1.open_bidirectional_stream().await?; - let (mut rx, mut tx) = stream.split(); + tracing::info!( + "connected: (server: {}) (client: {})", + maybe_addr!(conn.remote_addr()), + maybe_addr!(conn.local_addr()) + ); - let buf = b"get_games"; - tx.write_all(&buf[..]).await?; + // tx + while let Some(cmd) = rx_control.recv().await { + let request = match cmd { + ControlMessage::ListGames => Request::ListGames, + ControlMessage::GetGame(id) => Request::GetGame { id }, + }; - while let Ok(Some(data)) = rx.receive().await { - let games: Vec = serde_json::from_slice(&data)?; - self.db = Arc::new(Mutex::new(GameDB::from(games))); - tx.close().await.unwrap(); - let db = self.db.lock().await; + let data = request.encode(); + tracing::trace!("encoded data: {}", String::from_utf8_lossy(&data)); - eprintln!("received GameDB:"); - for game in db.games.values() { - eprintln!("{:#?}", game); + let stream = conn.open_bidirectional_stream().await?; + let (mut rx, mut tx) = stream.split(); + + if let Err(e) = tx.write_all(&data).await { + tracing::error!(?e, "failed to send request to server"); + } + + if let Ok(Some(data)) = rx.receive().await { + tracing::trace!("server response (raw): {}", String::from_utf8_lossy(&data)); + + let response = Response::decode(&data); + tracing::trace!( + "server response (decoded): {}", + String::from_utf8_lossy(&data) + ); + match response { + Response::Games(games) => { + for game in games { + tracing::debug!(?game); + } + } + Response::Game(game) => tracing::debug!(?game, "game received"), + Response::GameNotFound(id) => tracing::debug!(?id, "game not found"), + Response::InvalidRequest(request_bytes, err) => tracing::error!( + "server says our request was invalid (error: {}): {}", + err, + String::from_utf8_lossy(&request_bytes) + ), + Response::EncodingError(err) => { + tracing::error!("server encoding error: {err}"); + } + Response::DecodingError(data, err) => { + tracing::error!( + "response decoding error: {} (data: {})", + err, + String::from_utf8_lossy(&data) + ); + } + } + + if let Err(err) = tx.close().await { + tracing::error!("failed to close stream: {err}"); + } } } - eprintln!("server closed"); + tracing::info!("server closed connection"); Ok(()) } } #[tokio::main] async fn main() -> eyre::Result<()> { - let mut client = Client::new(); + let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel::(); - client - .run(format!("{SERVER_ADDR}:{SERVER_PORT}").parse().unwrap()) - .await?; + // Spawn client in a separate task + let client_handle = tokio::spawn(async move { + #[allow(clippy::unwrap_used)] + let addr = format!("{SERVER_ADDR}:{SERVER_PORT}").parse().unwrap(); + Client::run(addr, rx_control).await + }); + // Handle stdin commands in the main task + let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()); + let mut line = String::new(); + + loop { + line.clear(); + if stdin.read_line(&mut line).await? == 0 { + break; // EOF reached + } + + // Trim whitespace and handle commands + match line.trim() { + "list" => { + tx_control.send(ControlMessage::ListGames)?; + } + cmd if cmd.starts_with("get ") => { + if let Ok(id) = cmd[4..].trim().parse::() { + tx_control.send(ControlMessage::GetGame(id))?; + } else { + println!("Invalid game ID"); + } + } + "quit" | "exit" => break, + "" => continue, + _ => println!("Unknown command. Available commands: list, get , quit"), + } + } + + client_handle.await??; Ok(()) } diff --git a/crates/lanspread-db/Cargo.toml b/crates/lanspread-db/Cargo.toml index a86fb90..1aff67f 100644 --- a/crates/lanspread-db/Cargo.toml +++ b/crates/lanspread-db/Cargo.toml @@ -3,6 +3,14 @@ name = "lanspread-db" version = "0.1.0" edition = "2021" +[lints.rust] +unsafe_code = "forbid" + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +todo = "warn" +unwrap_used = "warn" + [dependencies] # external eyre = { workspace = true} diff --git a/crates/lanspread-db/src/db.rs b/crates/lanspread-db/src/db.rs new file mode 100644 index 0000000..078abc7 --- /dev/null +++ b/crates/lanspread-db/src/db.rs @@ -0,0 +1,193 @@ +#![allow(clippy::missing_errors_doc)] + +use std::{ + collections::HashMap, + fmt, + fs::{File, OpenOptions}, + path::Path, +}; + +use serde::{Deserialize, Serialize}; + +use crate::serialization::version_serde; + +/// A game +#[derive(Clone, Serialize, Deserialize)] +pub struct Game { + /// example: 1 + pub id: u64, + /// example: Call of Duty 3 + pub name: String, + /// example: A shooter game in war. + pub description: String, + /// example: `call_of_duty.tar.zst` + pub install_archive: String, + /// example: 8 + pub max_players: u32, + /// example: 1.0.0 + #[serde(with = "version_serde")] + pub version: semver::Version, + + /// size (bytes) (not serialized) + #[serde(skip)] + pub size: u64, +} + +impl fmt::Debug for Game { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}: {} {} ({} players) ({}: {} MB) {}", + self.id, + self.name, + self.version, + self.max_players, + self.install_archive, + self.size, + self.description, + ) + } +} + +impl fmt::Display for Game { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name) + } +} + +impl PartialEq for Game { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} + +impl Eq for Game {} + +impl PartialOrd for Game { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.name.cmp(&other.name)) + } +} + +impl Ord for Game { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.name.cmp(&other.name) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GameDB { + pub games: HashMap, + next_id: u64, +} + +impl GameDB { + #[must_use] + pub fn new() -> Self { + GameDB { + games: HashMap::new(), + next_id: 1, + } + } + + #[must_use] + pub fn from(games: Vec) -> Self { + let mut db = GameDB::new(); + for game in games { + let id = game.id; + db.games.insert(game.id, game); + db.next_id = db.next_id.max(id + 1); + } + db + } + + pub fn add_game>( + &mut self, + name: S, + description: S, + install_archive: S, + max_players: u32, + version: semver::Version, + ) -> u64 { + let id = self.next_id; + self.next_id += 1; + let game = Game { + id, + name: name.into(), + description: description.into(), + install_archive: install_archive.into(), + max_players, + version, + size: 0, + }; + self.games.insert(id, game); + id + } + + #[must_use] + pub fn get_game_by_id(&self, id: u64) -> Option<&Game> { + self.games.get(&id) + } + + #[must_use] + pub fn get_game_by_name(&self, name: &str) -> Option<&Game> { + self.games.values().find(|game| game.name == name) + } + + pub fn update_game>( + &mut self, + id: u64, + name: Option, + description: Option, + install_archive: Option, + ) -> bool { + if let Some(game) = self.games.get_mut(&id) { + if let Some(new_name) = name { + game.name = new_name.into(); + } + if let Some(new_description) = description { + game.description = new_description.into(); + } + if let Some(archive) = install_archive { + game.install_archive = archive.into(); + } + true + } else { + false + } + } + + pub fn delete_game(&mut self, id: u64) -> bool { + self.games.remove(&id).is_some() + } + + #[must_use] + pub fn all_games(&self) -> Vec<&Game> { + self.games.values().collect() + } + + pub fn save_to_file(&self, path: &Path) -> eyre::Result<()> { + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path)?; + + let games: Vec<&Game> = self.games.values().collect(); + serde_json::to_writer(file, &games)?; + Ok(()) + } + + pub fn load_from_file(path: &Path) -> eyre::Result { + let file = File::open(path)?; + let games: Vec = serde_json::from_reader(file)?; + let db = GameDB::from(games); + Ok(db) + } +} + +impl Default for GameDB { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/lanspread-db/src/lib.rs b/crates/lanspread-db/src/lib.rs index 0e30845..dd88d14 100644 --- a/crates/lanspread-db/src/lib.rs +++ b/crates/lanspread-db/src/lib.rs @@ -1,204 +1,2 @@ -use std::{ - collections::HashMap, - fmt, - fs::{File, OpenOptions}, - path::Path, -}; - -use serde::{Deserialize, Serialize}; - -mod version_serde { - use semver::Version; - use serde::{self, Deserialize, Deserializer, Serializer}; - - pub fn serialize(version: &Version, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&version.to_string()) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - Version::parse(&s).map_err(serde::de::Error::custom) - } -} - -/// A game -#[derive(Clone, Serialize, Deserialize)] -pub struct Game { - /// example: 1 - pub id: u64, - /// example: Call of Duty 3 - pub name: String, - /// example: A shooter game in war. - pub description: String, - /// example: call_of_duty.tar.zst - pub install_archive: String, - /// example: 8 - pub max_players: u32, - /// example: 1.0.0 - #[serde(with = "version_serde")] - pub version: semver::Version, - - /// size (bytes) (not serialized) - #[serde(skip)] - pub size: u64, -} - -impl fmt::Debug for Game { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}: {} {} ({} players) ({}: {} MB)\n {}", - self.id, - self.name, - self.version, - self.max_players, - self.install_archive, - self.size, - self.description, - ) - } -} - -impl fmt::Display for Game { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.name) - } -} - -impl PartialEq for Game { - fn eq(&self, other: &Self) -> bool { - self.name == other.name - } -} - -impl Eq for Game {} - -impl PartialOrd for Game { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.name.cmp(&other.name)) - } -} - -impl Ord for Game { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.name.cmp(&other.name) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GameDB { - pub games: HashMap, - next_id: u64, -} - -impl GameDB { - pub fn new() -> Self { - GameDB { - games: HashMap::new(), - next_id: 1, - } - } - - pub fn from(games: Vec) -> Self { - let mut db = GameDB::new(); - for game in games { - let id = game.id; - db.games.insert(game.id, game); - db.next_id = db.next_id.max(id + 1); - } - db - } - - pub fn add_game>( - &mut self, - name: S, - description: S, - install_archive: S, - max_players: u32, - version: semver::Version, - ) -> u64 { - let id = self.next_id; - self.next_id += 1; - let game = Game { - id, - name: name.into(), - description: description.into(), - install_archive: install_archive.into(), - max_players, - version, - size: 0, - }; - self.games.insert(id, game); - id - } - - pub fn get_game(&self, id: u64) -> Option<&Game> { - self.games.get(&id) - } - - pub fn update_game>( - &mut self, - id: u64, - name: Option, - description: Option, - install_archive: Option, - ) -> bool { - if let Some(game) = self.games.get_mut(&id) { - if let Some(new_name) = name { - game.name = new_name.into(); - } - if let Some(new_description) = description { - game.description = new_description.into(); - } - if let Some(archive) = install_archive { - game.install_archive = archive.into(); - } - true - } else { - false - } - } - - pub fn delete_game(&mut self, id: u64) -> bool { - self.games.remove(&id).is_some() - } - - pub fn list_games(&self) -> Vec<&Game> { - self.games.values().collect() - } - - pub fn find_game(&self, name: &str) -> Option<&Game> { - self.games.values().find(|game| game.name == name) - } - - pub fn save_to_file(&self, path: &Path) -> eyre::Result<()> { - let file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path)?; - - let games: Vec<&Game> = self.games.values().collect(); - serde_json::to_writer(file, &games)?; - Ok(()) - } - - pub fn load_from_file(path: &Path) -> eyre::Result { - let rdr = File::open(path)?; - let games: Vec = serde_json::from_reader(rdr)?; - let db = GameDB::from(games); - Ok(db) - } -} - -impl Default for GameDB { - fn default() -> Self { - Self::new() - } -} +pub mod db; +mod serialization; diff --git a/crates/lanspread-db/src/serialization.rs b/crates/lanspread-db/src/serialization.rs new file mode 100644 index 0000000..ab98885 --- /dev/null +++ b/crates/lanspread-db/src/serialization.rs @@ -0,0 +1,19 @@ +pub(crate) mod version_serde { + use semver::Version; + use serde::{self, Deserialize, Deserializer, Serializer}; + + pub fn serialize(version: &Version, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&version.to_string()) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Version::parse(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/lanspread-proto/Cargo.toml b/crates/lanspread-proto/Cargo.toml new file mode 100644 index 0000000..54add86 --- /dev/null +++ b/crates/lanspread-proto/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "lanspread-proto" +version = "0.1.0" +edition = "2021" + +[lints.rust] +unsafe_code = "forbid" + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +todo = "warn" +unwrap_used = "warn" + +[dependencies] +# local +lanspread-db = { path = "../lanspread-db" } + +# external +bytes = "1.8" +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } diff --git a/crates/lanspread-proto/src/lib.rs b/crates/lanspread-proto/src/lib.rs new file mode 100644 index 0000000..7ec3917 --- /dev/null +++ b/crates/lanspread-proto/src/lib.rs @@ -0,0 +1,87 @@ +use bytes::Bytes; +use lanspread_db::db::Game; +use serde::{Deserialize, Serialize}; +use tracing::error; + +#[derive(Debug, Serialize, Deserialize)] +pub enum Request { + ListGames, + GetGame { id: u64 }, + Invalid(Vec, String), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Response { + Games(Vec), + Game(Game), + GameNotFound(u64), + InvalidRequest(Vec, String), + EncodingError(String), + DecodingError(Vec, String), +} + +// Add Message trait +pub trait Message { + fn decode(bytes: &[u8]) -> Self; + fn encode(&self) -> Bytes; +} + +// Implement for Request +impl Message for Request { + fn decode(bytes: &[u8]) -> Self { + match serde_json::from_slice(bytes) { + Ok(t) => t, + Err(e) => { + tracing::error!( + "got invalid request from client (error: {}): {}", + e, + String::from_utf8_lossy(bytes) + ); + Request::Invalid(bytes.into(), e.to_string()) + } + } + } + + fn encode(&self) -> Bytes { + match serde_json::to_vec(self) { + Ok(s) => Bytes::from(s), + Err(e) => { + error!(?e, "Request encoding error"); + Bytes::from(format!(r#"{{"error": "encoding error: {e}"}}"#)) + } + } + } +} + +// Implement for Response +impl Message for Response { + fn decode(bytes: &[u8]) -> Self { + match serde_json::from_slice(bytes) { + Ok(t) => t, + Err(e) => Response::DecodingError(bytes.into(), e.to_string()), + } + } + + fn encode(&self) -> Bytes { + match serde_json::to_vec(self) { + Ok(s) => Bytes::from(s), + Err(e) => { + error!(?e, "Response encoding error"); + Bytes::from(format!(r#"{{"error": "encoding error: {e}"}}"#)) + } + } + } +} + +// Helper methods for Response +impl Response { + #[must_use] + pub fn games(games: Vec) -> Self { + Response::Games(games) + } + + #[must_use] + pub fn game(game: Game) -> Self { + Response::Game(game) + } +} diff --git a/crates/lanspread-server/Cargo.toml b/crates/lanspread-server/Cargo.toml index 6bc0836..56d801d 100644 --- a/crates/lanspread-server/Cargo.toml +++ b/crates/lanspread-server/Cargo.toml @@ -3,9 +3,20 @@ name = "lanspread-server" version = "0.1.0" edition = "2021" +[lints.rust] +unsafe_code = "forbid" + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +todo = "warn" +unwrap_used = "warn" + [dependencies] # local lanspread-db = { path = "../lanspread-db" } +lanspread-proto = { path = "../lanspread-proto" } +lanspread-utils = { path = "../lanspread-utils" } + # external bytes = { workspace = true } eyre = { workspace = true } @@ -14,3 +25,5 @@ s2n-quic = { workspace = true } serde_json = { workspace = true } semver = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/crates/lanspread-server/src/main.rs b/crates/lanspread-server/src/main.rs index 57fa73e..4361eca 100644 --- a/crates/lanspread-server/src/main.rs +++ b/crates/lanspread-server/src/main.rs @@ -1,10 +1,18 @@ -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{ + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, +}; -use bytes::Bytes; -use itertools::Itertools as _; -use lanspread_db::GameDB; +use lanspread_db::db::GameDB; +use lanspread_proto::{Message as _, Request, Response}; +use lanspread_utils::maybe_addr; use s2n_quic::Server as QuicServer; -use tokio::sync::Mutex; +use testing::generate_test_db; +use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tracing_subscriber::EnvFilter; + +mod testing; static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem")); static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); @@ -13,44 +21,96 @@ const SERVER_ADDR: &str = "0.0.0.0"; const SERVER_PORT: u16 = 13337; pub(crate) struct Server { - pub(crate) db: Arc>, db_path: PathBuf, } +#[derive(Clone, Debug)] +struct ServerCtx { + handler: RequestHandler, +} + +#[derive(Clone, Debug)] +struct ConnectionCtx { + server_ctx: Arc, + remote_addr: String, +} + impl Server { - pub(crate) fn new>(db_path: S) -> Self { - let db_path = db_path.into(); - let db = Arc::new(Mutex::new(GameDB::load_from_file(&db_path).unwrap())); - Server { db, db_path } + fn new>(db_path: S) -> Self { + Server { + db_path: db_path.into(), + } } - pub(crate) async fn run(&mut self, addr: SocketAddr) -> eyre::Result<()> { + async fn run(&mut self, addr: SocketAddr) -> eyre::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + let mut server = QuicServer::builder() .with_tls((CERT_PEM, KEY_PEM))? .with_io(addr)? .start()?; - let db = self.db.clone(); + let server_ctx = Arc::new(ServerCtx { + handler: RequestHandler::new(&self.db_path)?, + }); + while let Some(mut connection) = server.accept().await { + let conn_ctx = Arc::new(ConnectionCtx { + server_ctx: server_ctx.clone(), + remote_addr: maybe_addr!(connection.remote_addr()), + }); // spawn a new task for the connection - let db = db.clone(); tokio::spawn(async move { - eprintln!("Connection accepted from {:?}", connection.remote_addr()); + tracing::info!("{} connected", conn_ctx.remote_addr); - while let Ok(Some(mut stream)) = connection.accept_bidirectional_stream().await { + while let Ok(Some(stream)) = connection.accept_bidirectional_stream().await { + tracing::debug!("{} stream opened: {:?}", conn_ctx.remote_addr, stream); + + let (mut rx, mut tx) = stream.split(); + + let conn_ctx = conn_ctx.clone(); // spawn a new task for the stream - let db = db.clone(); tokio::spawn(async move { - eprintln!("Stream opened from {:?}", stream.connection().remote_addr()); + tracing::debug!("{} stream opened", conn_ctx.remote_addr); - // echo any data back to the stream - while let Ok(Some(data)) = stream.receive().await { - eprintln!("got data from client: {data:?}"); - if data.as_ref() == b"get_games" { - let games_vec: Vec<_> = - db.lock().await.games.values().cloned().collect(); - let json = serde_json::to_string(&games_vec).unwrap(); - stream.send(Bytes::from(json)).await.unwrap(); + // handle streams + while let Ok(Some(data)) = rx.receive().await { + tracing::trace!( + "{} client request (raw): {}", + conn_ctx.remote_addr, + String::from_utf8_lossy(&data) + ); + + let request = Request::decode(&data); + tracing::debug!( + "{} client request (decoded): {:?}", + conn_ctx.remote_addr, + request + ); + let response = + conn_ctx.server_ctx.handler.handle_request(request).await; + tracing::trace!( + "{} server response: {:?}", + conn_ctx.remote_addr, + response + ); + let raw_response = response.encode(); + tracing::trace!( + "{} server response (raw): {}", + conn_ctx.remote_addr, + String::from_utf8_lossy(&raw_response) + ); + + // write response back to client + if let Err(e) = tx.write_all(&raw_response).await { + tracing::error!(?e); + } + + // close the stream + if let Err(e) = tx.close().await { + tracing::error!(?e); } } }); @@ -62,55 +122,54 @@ impl Server { } } -fn generate_test_db>(db_path: P) { - let db_path = db_path.into(); +#[derive(Clone, Debug)] +struct RequestHandler { + db: Arc>, +} - let mut db = GameDB::new(); - db.add_game( - "Call of Duty 3", - "A shooter game in war.", - "call_of_duty.tar.zst", - 64, - semver::Version::new(1, 0, 0), - ); - db.add_game( - "Counter-Strike Source", - "Valve's iconic shooter.", - "cstrike.tar.zst", - 32, - semver::Version::new(1, 0, 0), - ); - db.add_game( - "Factorio", - "Best game of all time, seriously.", - "factorio.tar.zst", - 128, - semver::Version::new(1, 0, 0), - ); +impl RequestHandler { + fn new(db_path: &Path) -> eyre::Result { + let db = GameDB::load_from_file(db_path)?; + Ok(RequestHandler { + db: Arc::new(Mutex::new(db)), + }) + } - db.update_game(1, Some("Call of Duty 4"), None, None); - db.save_to_file(&db_path).unwrap(); + async fn handle_request(&self, request: Request) -> Response { + match request { + Request::ListGames => { + let db = self.db.lock().await; + Response::Games(db.all_games().into_iter().cloned().collect()) + } + Request::GetGame { id } => { + let db = self.db.lock().await; + match db.get_game_by_id(id) { + Some(game) => Response::Game(game.clone()), + None => Response::GameNotFound(id), + } + } + Request::Invalid(data, err_msg) => { + tracing::error!( + "got invalid request from client (error: {}): {}", + err_msg, + String::from_utf8_lossy(&data) + ); + Response::InvalidRequest(data, err_msg) + } + } + } } const GAME_DB_PATH: &str = "/home/pfs/shm/game.db"; #[tokio::main] -async fn main() { +async fn main() -> eyre::Result<()> { generate_test_db(GAME_DB_PATH); let mut server = Server::new(GAME_DB_PATH); - server - .db - .lock() - .await - .list_games() - .iter() - .sorted() - .for_each(|game| println!("{game:?}")); - + #[allow(clippy::unwrap_used)] server .run(format!("{SERVER_ADDR}:{SERVER_PORT}").parse().unwrap()) .await - .unwrap(); } diff --git a/crates/lanspread-server/src/testing.rs b/crates/lanspread-server/src/testing.rs new file mode 100644 index 0000000..1fcbbb1 --- /dev/null +++ b/crates/lanspread-server/src/testing.rs @@ -0,0 +1,35 @@ +#![allow(clippy::unwrap_used)] + +use std::path::PathBuf; + +use lanspread_db::db::GameDB; + +pub(crate) fn generate_test_db>(db_path: P) { + let db_path = db_path.into(); + + let mut db = GameDB::new(); + db.add_game( + "Call of Duty 3", + "A shooter game in war.", + "call_of_duty.tar.zst", + 64, + semver::Version::new(1, 0, 0), + ); + db.add_game( + "Counter-Strike Source", + "Valve's iconic shooter.", + "cstrike.tar.zst", + 32, + semver::Version::new(1, 0, 0), + ); + db.add_game( + "Factorio", + "Best game of all time, seriously.", + "factorio.tar.zst", + 128, + semver::Version::new(1, 0, 0), + ); + + db.update_game(1, Some("Call of Duty 4"), None, None); + db.save_to_file(&db_path).unwrap(); +} diff --git a/crates/lanspread-utils/Cargo.toml b/crates/lanspread-utils/Cargo.toml new file mode 100644 index 0000000..e5519c3 --- /dev/null +++ b/crates/lanspread-utils/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "lanspread-utils" +version = "0.1.0" +edition = "2021" + +[lints.rust] +unsafe_code = "forbid" + +[lints.clippy] +pedantic = { level = "warn", priority = -1 } +todo = "warn" +unwrap_used = "warn" + +[dependencies] diff --git a/crates/lanspread-utils/src/lib.rs b/crates/lanspread-utils/src/lib.rs new file mode 100644 index 0000000..eda363d --- /dev/null +++ b/crates/lanspread-utils/src/lib.rs @@ -0,0 +1 @@ +pub mod macros; diff --git a/crates/lanspread-utils/src/macros.rs b/crates/lanspread-utils/src/macros.rs new file mode 100644 index 0000000..691ecaf --- /dev/null +++ b/crates/lanspread-utils/src/macros.rs @@ -0,0 +1,6 @@ +#[macro_export] +macro_rules! maybe_addr { + ($addr:expr) => { + $addr.map_or("".to_string(), |addr| addr.to_string()) + }; +} diff --git a/server.sh b/server.sh index a3a04e2..db3a12f 100755 --- a/server.sh +++ b/server.sh @@ -1,3 +1,6 @@ #!/usr/bin/env bash -cargo run --release -p lanspread-server +export RUST_LOG=info,lanspread_server=debug,lanspread_proto=debug +exec cargo run -p lanspread-server + +#RUST_LOG=info exec cargo run --profile release-lto -p lanspread-server