From 9c1b94fa6a0d43b59f64be1cb8bedff176b050c8 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Tue, 11 Nov 2025 21:30:26 +0100 Subject: [PATCH] wip --- Cargo.lock | 10 +- crates/lanspread-peer/Cargo.toml | 8 - crates/lanspread-peer/src/cli.rs | 23 -- crates/lanspread-peer/src/lib.rs | 26 +- crates/lanspread-peer/src/main.rs | 178 ------------- crates/lanspread-peer/src/peer.rs | 156 +----------- .../src-tauri/Cargo.toml | 2 +- .../src-tauri/src/lib.rs | 234 ++++++++++++------ crates/lanspread-utils/src/macros.rs | 4 +- 9 files changed, 191 insertions(+), 450 deletions(-) delete mode 100644 crates/lanspread-peer/src/cli.rs delete mode 100644 crates/lanspread-peer/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 015513e..30a045a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2334,24 +2334,16 @@ name = "lanspread-peer" version = "0.1.0" dependencies = [ "bytes", - "chrono", - "clap", "eyre", "gethostname", - "itertools 0.14.0", - "lanspread-compat", "lanspread-db", "lanspread-mdns", "lanspread-proto", "lanspread-utils", "log", - "mimalloc", "s2n-quic", - "semver", - "serde_json", "tokio", "tracing", - "tracing-subscriber", "uuid", "walkdir", ] @@ -2398,9 +2390,9 @@ name = "lanspread-tauri-deno-ts" version = "0.1.0" dependencies = [ "eyre", - "lanspread-client", "lanspread-db", "lanspread-mdns", + "lanspread-peer", "log", "mimalloc", "serde", diff --git a/crates/lanspread-peer/Cargo.toml b/crates/lanspread-peer/Cargo.toml index 322249b..04e4eaf 100644 --- a/crates/lanspread-peer/Cargo.toml +++ b/crates/lanspread-peer/Cargo.toml @@ -13,7 +13,6 @@ unwrap_used = "warn" [dependencies] # local -lanspread-compat = { path = "../lanspread-compat" } lanspread-db = { path = "../lanspread-db" } lanspread-mdns = { path = "../lanspread-mdns" } lanspread-proto = { path = "../lanspread-proto" } @@ -21,18 +20,11 @@ lanspread-utils = { path = "../lanspread-utils" } # external bytes = { workspace = true } -chrono = { workspace = true } -clap = { workspace = true } eyre = { workspace = true } gethostname = { workspace = true } -itertools = { workspace = true } log = { workspace = true } -mimalloc = { workspace = true } s2n-quic = { workspace = true } -serde_json = { workspace = true } -semver = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } diff --git a/crates/lanspread-peer/src/cli.rs b/crates/lanspread-peer/src/cli.rs deleted file mode 100644 index e1ac0ec..0000000 --- a/crates/lanspread-peer/src/cli.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::{net::IpAddr, path::PathBuf}; - -use clap::Parser; - -#[allow(clippy::doc_markdown)] -#[derive(Debug, Parser)] -pub struct Cli { - /// IP address to bind to. - #[clap(long)] - pub ip: IpAddr, - /// Listen port. - #[clap(long)] - pub port: u16, - /// Game database path (SQLite). - #[clap(long)] - pub db: PathBuf, - /// Games folder. - #[clap(long)] - pub game_dir: PathBuf, - /// Thumbnails folder. - #[clap(long)] - pub thumbs_dir: PathBuf, -} diff --git a/crates/lanspread-peer/src/lib.rs b/crates/lanspread-peer/src/lib.rs index 87dec9e..a0b8262 100644 --- a/crates/lanspread-peer/src/lib.rs +++ b/crates/lanspread-peer/src/lib.rs @@ -34,6 +34,30 @@ use uuid::Uuid; use crate::peer::{send_game_file_chunk, send_game_file_data}; +/// Initialize and start the peer system +/// This function replaces the main.rs entry point and allows the peer to be started from other crates +pub fn start_peer( + game_dir: String, + tx_notify_ui: UnboundedSender, +) -> eyre::Result> { + log::info!("Starting peer system with game directory: {game_dir}"); + + let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); + + // Start the peer in a background task + let tx_control_clone = tx_control.clone(); + tokio::spawn(async move { + if let Err(e) = run_peer(rx_control, tx_notify_ui).await { + log::error!("Peer system failed: {e}"); + } + }); + + // Set the game directory + tx_control.send(PeerCommand::SetGameDir(game_dir))?; + + Ok(tx_control_clone) +} + static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); static KEY_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../key.pem")); @@ -769,7 +793,6 @@ struct Ctx { struct PeerCtx { game_dir: Arc>>, local_game_db: Arc>>, - peer_game_db: Arc>, } pub async fn run_peer( @@ -786,7 +809,6 @@ pub async fn run_peer( let peer_ctx = PeerCtx { game_dir: ctx.game_dir.clone(), local_game_db: ctx.local_game_db.clone(), - peer_game_db: ctx.peer_game_db.clone(), }; // Start server component diff --git a/crates/lanspread-peer/src/main.rs b/crates/lanspread-peer/src/main.rs deleted file mode 100644 index 12c540b..0000000 --- a/crates/lanspread-peer/src/main.rs +++ /dev/null @@ -1,178 +0,0 @@ -use mimalloc::MiMalloc; - -#[global_allocator] -static GLOBAL: MiMalloc = MiMalloc; - -mod cli; -mod peer; - -use std::{net::SocketAddr, time::Duration}; - -use clap::Parser as _; -use cli::Cli; -use gethostname::gethostname; -use lanspread_compat::eti; -use lanspread_db::db::{Game, GameDB}; -use lanspread_mdns::{DaemonEvent, LANSPREAD_SERVICE_TYPE, MdnsAdvertiser}; -use lanspread_peer::{PeerCommand, PeerEvent, run_peer}; -use tracing_subscriber::EnvFilter; -use uuid::Uuid; - -fn spawn_mdns_task(server_addr: SocketAddr) -> eyre::Result<()> { - let peer_id = Uuid::now_v7().simple().to_string(); - - let hostname = gethostname(); - let hostname_str = hostname.to_str().unwrap_or(""); - - // Calculate maximum hostname length that fits with UUID in 63 char limit - let max_hostname_len = 63usize.saturating_sub(peer_id.len() + 1); // +1 for the dash - let truncated_hostname = if hostname_str.len() > max_hostname_len { - hostname_str.get(..max_hostname_len).unwrap_or(hostname_str) - } else { - hostname_str - }; - - let combined_str = if truncated_hostname.is_empty() { - // If no hostname is available, use just the UUID - peer_id - } else { - format!("{truncated_hostname}-{peer_id}") - }; - - let mdns = MdnsAdvertiser::new(LANSPREAD_SERVICE_TYPE, &combined_str, server_addr)?; - - tokio::spawn(async move { - while let Ok(event) = mdns.monitor.recv() { - tracing::trace!("mDNS: {:?}", &event); - if let DaemonEvent::Error(e) = event { - tracing::error!("mDNS: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }); - - Ok(()) -} - -async fn prepare_game_db(cli: &Cli) -> eyre::Result { - // build games from ETI database - let mut games: Vec = eti::get_games(&cli.db) - .await? - .into_iter() - .map(Into::into) - .collect(); - - // filter out games that the peer does not have in game_dir - games.retain(|game| cli.game_dir.join(&game.id).is_dir()); - - // read version.ini files and update eti_game_version - for game in &mut games { - let game_dir = cli.game_dir.join(&game.id); - if let Ok(version) = lanspread_db::db::read_version_from_ini(&game_dir) { - game.eti_game_version = version; - if let Some(ref version) = game.eti_game_version { - tracing::debug!("Read version for game {}: {}", game.id, version); - } - } else { - tracing::warn!("Failed to read version.ini for game: {}", game.id); - } - } - - let mut game_db = GameDB::from(games); - - game_db.add_thumbnails(&cli.thumbs_dir); - - game_db.all_games().iter().for_each(|game| { - tracing::debug!("Found game: {game}"); - }); - tracing::info!("Prepared game database with {} games", game_db.games.len()); - - Ok(game_db) -} - -#[tokio::main] -async fn main() -> eyre::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .init(); - - let cli = Cli::parse(); - - assert!( - cli.game_dir.exists(), - "Games folder does not exist: {}", - cli.game_dir.to_str().expect("Invalid path") - ); - - let server_addr = SocketAddr::from((cli.ip, cli.port)); - - // spawn mDNS listener task - spawn_mdns_task(server_addr)?; - - let _game_db = prepare_game_db(&cli).await?; - - tracing::info!("Peer listening on {server_addr}"); - - let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel(); - let (tx_notify_ui, mut rx_notify_ui) = tokio::sync::mpsc::unbounded_channel(); - - // Start peer task - let peer_task = tokio::spawn(async move { run_peer(rx_control, tx_notify_ui).await }); - - // Handle events from peer - let event_handler = tokio::spawn(async move { - while let Some(event) = rx_notify_ui.recv().await { - match event { - PeerEvent::ListGames(games) => { - tracing::info!("Received list of {} games", games.len()); - } - PeerEvent::GotGameFiles { - id, - file_descriptions, - } => { - tracing::info!( - "Got game files for {}: {} files", - id, - file_descriptions.len() - ); - } - PeerEvent::DownloadGameFilesBegin { id } => { - tracing::info!("Download started for game: {}", id); - } - PeerEvent::DownloadGameFilesFinished { id } => { - tracing::info!("Download finished for game: {}", id); - } - PeerEvent::DownloadGameFilesFailed { id } => { - tracing::error!("Download failed for game: {}", id); - } - PeerEvent::PeerConnected(addr) => { - tracing::info!("Peer connected: {}", addr); - } - PeerEvent::PeerDisconnected(addr) => { - tracing::info!("Peer disconnected: {}", addr); - } - PeerEvent::PeerDiscovered(addr) => { - tracing::info!("Peer discovered: {}", addr); - } - PeerEvent::PeerLost(addr) => { - tracing::info!("Peer lost: {}", addr); - } - } - } - }); - - // Set the game directory from CLI args - if let Err(e) = tx_control.send(PeerCommand::SetGameDir( - cli.game_dir.to_string_lossy().to_string(), - )) { - tracing::error!("Failed to send SetGameDir command: {e}"); - } - - // TODO: Add additional CLI interaction or other peer discovery logic here - - // Wait for tasks - let (peer_result, _) = tokio::join!(peer_task, event_handler); - peer_result??; - - Ok(()) -} diff --git a/crates/lanspread-peer/src/peer.rs b/crates/lanspread-peer/src/peer.rs index c46833b..c223f7e 100644 --- a/crates/lanspread-peer/src/peer.rs +++ b/crates/lanspread-peer/src/peer.rs @@ -1,145 +1,13 @@ -use std::{ - convert::TryInto, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{convert::TryInto, path::Path}; use bytes::Bytes; -use lanspread_db::db::{GameDB, GameFileDescription}; -use lanspread_proto::{Message as _, Request, Response}; +use lanspread_db::db::GameFileDescription; use lanspread_utils::maybe_addr; use s2n_quic::stream::SendStream; use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, - sync::RwLock, time::Instant, }; -use walkdir::WalkDir; - -#[derive(Clone, Debug)] -pub struct PeerRequestHandler { - db: Arc>, -} - -impl PeerRequestHandler { - pub fn new(games: GameDB) -> PeerRequestHandler { - PeerRequestHandler { - db: Arc::new(RwLock::new(games)), - } - } - - pub async fn handle_request( - &self, - request: Request, - games_folder: &Path, - tx: &mut SendStream, - ) -> eyre::Result<()> { - let remote_addr = maybe_addr!(tx.connection().remote_addr()); - - // process request and generate response - let response = self.process_request(request, games_folder).await; - tracing::trace!("{remote_addr} peer response: {response:?}"); - - // write response back to client - tx.send(response.encode()).await?; - - // close the stream - tx.close().await?; - - Ok(()) - } - - fn handle_ping() -> Response { - Response::Pong - } - - async fn handle_list_games(&self) -> Response { - let db = self.db.read().await; - Response::ListGames(db.all_games().into_iter().cloned().collect()) - } - - async fn handle_get_game(&self, id: String, games_folder: &Path) -> Response { - if self.db.read().await.get_game_by_id(&id).is_none() { - tracing::error!("Game not found in DB: {id}"); - return Response::GameNotFound(id); - } - - let game_dir = games_folder.join(&id); - if !game_dir.exists() { - tracing::error!("Game folder does not exist: {}", game_dir.display()); - return Response::GameNotFound(id); - } - - let mut game_files_descs: Vec = vec![]; - - for entry in WalkDir::new(&game_dir) - .into_iter() - .filter_map(std::result::Result::ok) - { - match get_relative_path(games_folder, entry.path()) { - Ok(relative_path) => match relative_path.to_str() { - Some(relative_path) => { - let is_dir = entry.file_type().is_dir(); - let size = if is_dir { - None - } else { - match entry.metadata() { - Ok(metadata) => Some(metadata.len()), - Err(e) => { - tracing::error!( - "Failed to read metadata for {}: {e}", - relative_path - ); - None - } - } - }; - let game_file_description = GameFileDescription { - game_id: id.clone(), - relative_path: relative_path.to_string(), - is_dir, - size, - }; - - tracing::debug!("Found game file: {:?}", game_file_description); - - game_files_descs.push(game_file_description); - } - None => { - tracing::error!("Failed to get relative path: {relative_path:?}",); - } - }, - Err(e) => { - tracing::error!("Failed to get relative path: {e}"); - } - } - } - - Response::GetGame { - id, - file_descriptions: game_files_descs, - } - } - - fn handle_get_game_file_data() -> Response { - Response::InvalidRequest(Bytes::new(), "Not implemented".to_string()) - } - - fn handle_invalid(data: Bytes, err_msg: String) -> Response { - Response::InvalidRequest(data, err_msg) - } - - pub async fn process_request(&self, request: Request, games_folder: &Path) -> Response { - match request { - Request::Ping => PeerRequestHandler::handle_ping(), - Request::ListGames => self.handle_list_games().await, - Request::GetGame { id } => self.handle_get_game(id, games_folder).await, - Request::GetGameFileData(_) => PeerRequestHandler::handle_get_game_file_data(), - Request::GetGameFileChunk { .. } => PeerRequestHandler::handle_get_game_file_data(), - Request::Invalid(data, err_msg) => PeerRequestHandler::handle_invalid(data, err_msg), - } - } -} async fn stream_file_bytes( tx: &mut SendStream, @@ -150,7 +18,7 @@ async fn stream_file_bytes( ) -> eyre::Result<()> { let remote_addr = maybe_addr!(tx.connection().remote_addr()); let game_file = base_dir.join(relative_path); - tracing::debug!( + log::debug!( "{remote_addr} streaming file bytes for peer: {:?}, offset: {offset}, length: {length:?}", game_file ); @@ -189,7 +57,7 @@ async fn stream_file_bytes( if elapsed.as_secs_f64() >= 1.0 { #[allow(clippy::cast_precision_loss)] let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0); - tracing::debug!( + log::debug!( "{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}", game_file ); @@ -199,7 +67,7 @@ async fn stream_file_bytes( } } - tracing::debug!( + log::debug!( "{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}", game_file ); @@ -215,7 +83,7 @@ pub async fn send_game_file_data( ) { if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); - tracing::error!( + log::error!( "{remote_addr} failed to stream file {}: {e}", game_file_desc.relative_path ); @@ -232,18 +100,8 @@ pub async fn send_game_file_chunk( ) { if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await { let remote_addr = maybe_addr!(tx.connection().remote_addr()); - tracing::error!( + log::error!( "{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}" ); } } - -fn get_relative_path(base: &Path, deep_path: &Path) -> std::io::Result { - let base_canonical = base.canonicalize()?; - let full_canonical = deep_path.canonicalize()?; - - full_canonical - .strip_prefix(&base_canonical) - .map(std::path::Path::to_path_buf) - .map_err(|_| std::io::Error::other("Path is not within base directory")) -} diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/Cargo.toml b/crates/lanspread-tauri-deno-ts/src-tauri/Cargo.toml index c3046ad..a4df579 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/Cargo.toml +++ b/crates/lanspread-tauri-deno-ts/src-tauri/Cargo.toml @@ -28,7 +28,7 @@ tauri-build = { version = "2", features = [] } [dependencies] # local -lanspread-client = { path = "../../lanspread-client" } +lanspread-peer = { path = "../../lanspread-peer" } lanspread-db = { path = "../../lanspread-db" } lanspread-mdns = { path = "../../lanspread-mdns" } diff --git a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs index d6b2066..7038ef9 100644 --- a/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs +++ b/crates/lanspread-tauri-deno-ts/src-tauri/src/lib.rs @@ -1,14 +1,13 @@ use std::{ collections::HashSet, - net::SocketAddr, path::{Path, PathBuf}, sync::Arc, }; use eyre::bail; -use lanspread_client::{ClientCommand, ClientEvent}; use lanspread_db::db::{Game, GameDB}; use lanspread_mdns::{LANSPREAD_SERVICE_TYPE, discover_service}; +use lanspread_peer::{PeerCommand, PeerEvent, start_peer}; use tauri::{AppHandle, Emitter as _, Manager}; use tauri_plugin_shell::{ShellExt, process::Command}; use tokio::sync::{RwLock, mpsc::UnboundedSender}; @@ -16,8 +15,7 @@ use tokio::sync::{RwLock, mpsc::UnboundedSender}; // Learn more about Tauri commands at https://tauri.app/develop/calling-rust/ struct LanSpreadState { - server_addr: RwLock>, - client_ctrl: UnboundedSender, + peer_ctrl: Arc>>>, games: Arc>, games_in_download: Arc>>, games_folder: Arc>, @@ -27,8 +25,15 @@ struct LanSpreadState { fn request_games(state: tauri::State) { log::debug!("request_games"); - if let Err(e) = state.inner().client_ctrl.send(ClientCommand::ListGames) { - log::error!("Failed to send message to client: {e:?}"); + let peer_ctrl = + tauri::async_runtime::block_on(async { state.inner().peer_ctrl.read().await.clone() }); + + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::ListGames) { + log::error!("Failed to send message to peer: {e:?}"); + } + } else { + log::warn!("Peer system not initialized yet"); } } @@ -46,11 +51,18 @@ fn install_game(id: String, state: tauri::State) -> bool { return false; } - if let Err(e) = state.inner().client_ctrl.send(ClientCommand::GetGame(id)) { - log::error!("Failed to send message to client: {e:?}"); - } + let peer_ctrl = + tauri::async_runtime::block_on(async { state.inner().peer_ctrl.read().await.clone() }); - true + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id)) { + log::error!("Failed to send message to peer: {e:?}"); + } + true + } else { + log::warn!("Peer system not initialized yet"); + false + } } /// Backup the current game folder by renaming it to `___TO_BE_DELETE___GameNameHere` @@ -157,22 +169,25 @@ fn update_game(id: String, state: tauri::State) -> bool { log::info!("Starting update for game: {id}"); // Start the download process - if let Err(e) = state - .inner() - .client_ctrl - .send(ClientCommand::GetGame(id.clone())) - { - log::error!("Failed to send message to client: {e:?}"); + let peer_ctrl = + tauri::async_runtime::block_on(async { state.inner().peer_ctrl.read().await.clone() }); - // Try to restore backup if download fails to start - if let Err(restore_err) = restore_game_folder(&game_path, &backup_path) { - log::error!("Failed to restore backup after download failure: {restore_err}"); + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::GetGame(id.clone())) { + log::error!("Failed to send message to peer: {e:?}"); + + // Try to restore backup if download fails to start + if let Err(restore_err) = restore_game_folder(&game_path, &backup_path) { + log::error!("Failed to restore backup after download failure: {restore_err}"); + } + + return false; } - - return false; + true + } else { + log::warn!("Peer system not initialized yet"); + false } - - true } #[cfg(target_os = "windows")] @@ -304,11 +319,21 @@ fn set_game_install_state_from_path(game_db: &mut GameDB, path: &Path, installed fn update_game_directory(app_handle: tauri::AppHandle, path: String) { log::info!("update_game_directory: {path}"); - app_handle - .state::() - .client_ctrl - .send(ClientCommand::SetGameDir(path.clone())) - .expect("Failed to send ClientCommand: SetGameDir"); + let peer_ctrl = tauri::async_runtime::block_on(async { + app_handle + .state::() + .inner() + .peer_ctrl + .read() + .await + .clone() + }); + + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::SetGameDir(path.clone())) { + log::error!("Failed to send PeerCommand::SetGameDir: {e}"); + } + } { tauri::async_runtime::block_on(async { @@ -366,24 +391,30 @@ fn update_game_directory(app_handle: tauri::AppHandle, path: String) { }); } -async fn find_server(app: AppHandle) { - log::info!("Looking for server..."); +async fn find_peers(app: AppHandle) { + log::info!("Looking for peers..."); loop { match discover_service(LANSPREAD_SERVICE_TYPE) { - Ok(server_addr) => { - log::info!("Found server at {server_addr}"); + Ok(peer_addr) => { + log::info!("Found peer at {peer_addr}"); let state: tauri::State = app.state(); - *state.server_addr.write().await = Some(server_addr); - state - .client_ctrl - .send(ClientCommand::ServerAddr(server_addr)) - .expect("Failed to send ClientCommand: ServerAddr"); - request_games(state); + let peer_ctrl = state.peer_ctrl.read().await.clone(); + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::ConnectToPeer(peer_addr)) { + log::error!("Failed to send PeerCommand::ConnectToPeer: {e}"); + } + request_games(state); + } else { + log::warn!( + "Peer system not initialized yet, cannot connect to discovered peer" + ); + } break; } Err(e) => { - log::warn!("Failed to find server: {e} - retrying..."); + log::warn!("Failed to find peers: {e} - retrying..."); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } } } @@ -391,7 +422,7 @@ async fn find_server(app: AppHandle) { async fn update_game_db(games: Vec, app: AppHandle) { for game in &games { - log::trace!("client event ListGames iter: {game:?}"); + log::trace!("peer event ListGames iter: {game:?}"); } let state = app.state::(); @@ -481,17 +512,11 @@ pub fn run() { .level(log::LevelFilter::Info) .level_for("mdns_sd::service_daemon", log::LevelFilter::Off); - // channel to pass commands to the client - let (tx_client_control, rx_client_control) = - tokio::sync::mpsc::unbounded_channel::(); - - // channel to receive events from the client - let (tx_client_event, mut rx_client_event) = - tokio::sync::mpsc::unbounded_channel::(); + // channel to receive events from the peer + let (tx_peer_event, mut rx_peer_event) = tokio::sync::mpsc::unbounded_channel::(); let lanspread_state = LanSpreadState { - server_addr: RwLock::new(None), - client_ctrl: tx_client_control, + peer_ctrl: Arc::new(RwLock::new(None)), games: Arc::new(RwLock::new(GameDB::empty())), games_in_download: Arc::new(RwLock::new(HashSet::new())), games_folder: Arc::new(RwLock::new(String::new())), @@ -510,45 +535,73 @@ pub fn run() { update_game ]) .manage(lanspread_state) - .setup(|app| { + .setup({ + let tx_peer_event_clone = tx_peer_event.clone(); + move |app| { let app_handle = app.handle().clone(); - // discover server - tauri::async_runtime::spawn(async move { find_server(app_handle).await }); + // discover peers + tauri::async_runtime::spawn(async move { find_peers(app_handle).await }); + + // Initialize peer system when games directory is set + let app_handle_clone = app.handle().clone(); tauri::async_runtime::spawn(async move { - lanspread_client::run(rx_client_control, tx_client_event).await + // Wait for games directory to be set + loop { + let games_folder = { + let state = app_handle_clone.state::(); + state.games_folder.read().await.clone() + }; + + if !games_folder.is_empty() { + match start_peer(games_folder, tx_peer_event_clone) { + Ok(peer_ctrl) => { + let state = app_handle_clone.state::(); + *state.peer_ctrl.write().await = Some(peer_ctrl); + log::info!("Peer system initialized successfully"); + } + Err(e) => { + log::error!("Failed to initialize peer system: {e}"); + } + } + break; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } }); let app_handle = app.handle().clone(); tauri::async_runtime::spawn(async move { - while let Some(event) = rx_client_event.recv().await { + while let Some(event) = rx_peer_event.recv().await { match event { - ClientEvent::ListGames(games) => { - log::info!("ClientEvent::ListGames received"); + PeerEvent::ListGames(games) => { + log::info!("PeerEvent::ListGames received"); update_game_db(games, app_handle.clone()).await; } - ClientEvent::GotGameFiles { id, file_descriptions } => { - log::info!("ClientEvent::GotGameFiles received"); + PeerEvent::GotGameFiles { id, file_descriptions } => { + log::info!("PeerEvent::GotGameFiles received"); if let Err(e) = app_handle.emit( "game-download-pre", Some(id.clone()), ) { - log::error!("ClientEvent::GotGameFiles: Failed to emit game-download-pre event: {e}"); + log::error!("PeerEvent::GotGameFiles: Failed to emit game-download-pre event: {e}"); } - app_handle - .state::() - .inner() - .client_ctrl - .send(ClientCommand::DownloadGameFiles{ + let state = app_handle.state::(); + let peer_ctrl = state.peer_ctrl.read().await.clone(); + if let Some(peer_ctrl) = peer_ctrl { + if let Err(e) = peer_ctrl.send(PeerCommand::DownloadGameFiles{ id, file_descriptions, - }) - .expect("Failed to send ClientCommand: DownloadGameFiles"); + }) { + log::error!("Failed to send PeerCommand::DownloadGameFiles: {e}"); + } + } } - ClientEvent::DownloadGameFilesBegin { id } => { - log::info!("ClientEvent::DownloadGameFilesBegin received"); + PeerEvent::DownloadGameFilesBegin { id } => { + log::info!("PeerEvent::DownloadGameFilesBegin received"); app_handle .state::() @@ -559,13 +612,13 @@ pub fn run() { .insert(id.clone()); if let Err(e) = app_handle.emit("game-download-begin", Some(id)) { - log::error!("ClientEvent::DownloadGameFilesBegin: Failed to emit game-download-begin event: {e}"); + log::error!("PeerEvent::DownloadGameFilesBegin: Failed to emit game-download-begin event: {e}"); } } - ClientEvent::DownloadGameFilesFinished { id } => { - log::info!("ClientEvent::DownloadGameFilesFinished received"); + PeerEvent::DownloadGameFilesFinished { id } => { + log::info!("PeerEvent::DownloadGameFilesFinished received"); if let Err(e) = app_handle.emit("game-download-finished", Some(id.clone())) { - log::error!("ClientEvent::DownloadGameFilesFinished: Failed to emit game-download-finished event: {e}"); + log::error!("PeerEvent::DownloadGameFilesFinished: Failed to emit game-download-finished event: {e}"); } app_handle @@ -594,22 +647,22 @@ pub fn run() { if !games_folder.is_empty() { let backup_name = format!("___TO_BE_DELETE___{id}"); - let backup_path = PathBuf::from(games_folder).join(backup_name); + let backup_path = PathBuf::from(&games_folder).join(backup_name); if let Err(e) = cleanup_backup_folder(&backup_path) { log::error!("Failed to cleanup backup folder after successful update: {e}"); } } - log::info!("ClientEvent::UnpackGameFinished received"); + log::info!("PeerEvent::UnpackGameFinished received"); if let Err(e) = app_handle.emit("game-unpack-finished", Some(id.clone())) { - log::error!("ClientEvent::UnpackGameFinished: Failed to emit game-unpack-finished event: {e}"); + log::error!("PeerEvent::UnpackGameFinished: Failed to emit game-unpack-finished event: {e}"); } }); } } - ClientEvent::DownloadGameFilesFailed { id } => { - log::warn!("ClientEvent::DownloadGameFilesFailed received"); + PeerEvent::DownloadGameFilesFailed { id } => { + log::warn!("PeerEvent::DownloadGameFilesFailed received"); if let Err(e) = app_handle.emit("game-download-failed", Some(id.clone())) { log::error!("Failed to emit game-download-failed event: {e}"); @@ -652,12 +705,37 @@ pub fn run() { log::error!("Failed to restore backup after download failure: {e}"); } } - }, + } + PeerEvent::PeerConnected(addr) => { + log::info!("Peer connected: {}", addr); + if let Err(e) = app_handle.emit("peer-connected", Some(addr.to_string())) { + log::error!("Failed to emit peer-connected event: {e}"); + } + } + PeerEvent::PeerDisconnected(addr) => { + log::info!("Peer disconnected: {}", addr); + if let Err(e) = app_handle.emit("peer-disconnected", Some(addr.to_string())) { + log::error!("Failed to emit peer-disconnected event: {e}"); + } + } + PeerEvent::PeerDiscovered(addr) => { + log::info!("Peer discovered: {}", addr); + if let Err(e) = app_handle.emit("peer-discovered", Some(addr.to_string())) { + log::error!("Failed to emit peer-discovered event: {e}"); + } + } + PeerEvent::PeerLost(addr) => { + log::info!("Peer lost: {}", addr); + if let Err(e) = app_handle.emit("peer-lost", Some(addr.to_string())) { + log::error!("Failed to emit peer-lost event: {e}"); + } + } } } }); Ok(()) + } }) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/crates/lanspread-utils/src/macros.rs b/crates/lanspread-utils/src/macros.rs index e651165..a957085 100644 --- a/crates/lanspread-utils/src/macros.rs +++ b/crates/lanspread-utils/src/macros.rs @@ -1,8 +1,8 @@ #[macro_export] macro_rules! maybe_addr { ($addr:expr) => { - $addr.map_or(Arc::new("".to_string()), |addr| { - Arc::new(addr.to_string()) + $addr.map_or(std::sync::Arc::new("".to_string()), |addr| { + std::sync::Arc::new(addr.to_string()) }) }; }