//! JSONL command-line harness for running a peer without the Tauri GUI. use std::{ collections::{HashMap, HashSet}, ffi::OsString, io::Write as _, net::SocketAddr, path::{Path, PathBuf}, sync::{Arc, Mutex}, time::Duration, }; use eyre::Context; use lanspread_compat::eti::get_games; use lanspread_db::db::{Game, GameFileDescription}; use lanspread_peer::{ ActiveOperation, ActiveOperationKind, InstallOperation, PeerCommand, PeerEvent, PeerGameDB, PeerRuntimeComponent, PeerRuntimeHandle, PeerSnapshot, PeerStartOptions, start_peer_with_options, }; use lanspread_peer_cli::{ CliCommand, CommandEnvelope, ExternalUnrarUnpacker, FixtureSeed, FixtureUnpacker, error_line, event_line, parse_command_line, result_line, seed_fixture_game, }; use serde_json::{Value, json}; use tokio::{ io::{AsyncBufReadExt, BufReader}, sync::{Notify, RwLock, mpsc}, }; #[derive(Debug)] struct Args { name: String, games_dir: PathBuf, state_dir: PathBuf, catalog_db: Option, fixtures: Vec, unrar: Option, } #[derive(Clone)] struct JsonlWriter { lock: Arc>, } impl JsonlWriter { fn new() -> Self { Self { lock: Arc::new(Mutex::new(())), } } fn emit(&self, line: eyre::Result) { let line = match line { Ok(line) => line, Err(err) => { eprintln!("failed to prepare JSONL output: {err}"); return; } }; let Ok(_guard) = self.lock.lock() else { eprintln!("failed to lock stdout"); return; }; let mut stdout = std::io::stdout().lock(); if let Err(err) = writeln!(stdout, "{line}") { eprintln!("failed to write JSONL output: {err}"); } } } #[derive(Default)] struct CliState { local_peer: Option, local_games: Vec, remote_games: Vec, active_operations: Vec, game_files: HashMap>, unavailable_games: HashSet, } #[derive(Clone, serde::Serialize)] struct LocalPeer { peer_id: String, addr: String, } struct SharedState { state: RwLock, peer_game_db: Arc>, catalog: Arc>>, notify: Notify, } #[tokio::main] async fn main() -> eyre::Result<()> { let args = parse_args()?; tokio::fs::create_dir_all(&args.games_dir).await?; tokio::fs::create_dir_all(&args.state_dir).await?; let fixture_seeds = seed_fixtures(&args.games_dir, &args.fixtures)?; let catalog = load_catalog(args.catalog_db.as_deref(), &fixture_seeds).await; let (tx_events, rx_events) = mpsc::unbounded_channel(); let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new())); let catalog = Arc::new(RwLock::new(catalog)); let unpacker: Arc = match args.unrar { Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)), None => Arc::new(FixtureUnpacker), }; let mut handle = start_peer_with_options( args.games_dir.clone(), tx_events, peer_game_db.clone(), unpacker, catalog.clone(), PeerStartOptions { state_dir: Some(args.state_dir.clone()), }, )?; let sender = handle.sender(); let shared = Arc::new(SharedState { state: RwLock::new(CliState::default()), peer_game_db, catalog: catalog.clone(), notify: Notify::new(), }); let writer = JsonlWriter::new(); tokio::spawn(event_loop(rx_events, shared.clone(), writer.clone())); writer.emit(event_line( "cli-started", json!({ "name": args.name, "games_dir": args.games_dir, "state_dir": args.state_dir, "fixtures": fixture_seeds, }), )); command_loop(&sender, &mut handle, shared, writer).await } async fn command_loop( sender: &mpsc::UnboundedSender, handle: &mut PeerRuntimeHandle, shared: Arc, writer: JsonlWriter, ) -> eyre::Result<()> { let stdin = BufReader::new(tokio::io::stdin()); let mut lines = stdin.lines(); while let Some(line) = lines.next_line().await? { if line.trim().is_empty() { continue; } let envelope = match parse_command_line(&line) { Ok(envelope) => envelope, Err(err) => { writer.emit(error_line(&None, None, &err.to_string())); continue; } }; let command_name = envelope.command.name(); match handle_command(&envelope, sender, handle, &shared).await { Ok(data) => { writer.emit(result_line(&envelope.request_id, command_name, data)); if matches!(envelope.command, CliCommand::Shutdown) { break; } } Err(err) => { writer.emit(error_line( &envelope.request_id, Some(command_name), &err.to_string(), )); } } } Ok(()) } async fn handle_command( envelope: &CommandEnvelope, sender: &mpsc::UnboundedSender, handle: &mut PeerRuntimeHandle, shared: &Arc, ) -> eyre::Result { match &envelope.command { CliCommand::Status => status(shared).await, CliCommand::ListPeers => list_peers(shared).await, CliCommand::ListGames => list_games(shared).await, CliCommand::SetGameDir { path } => { sender.send(PeerCommand::SetGameDir(path.clone()))?; Ok(json!({"queued": true, "path": path})) } CliCommand::Download { game_id, install_after_download, } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; let files = game_files_for_download(sender, shared, game_id).await?; sender.send(PeerCommand::DownloadGameFilesWithOptions { id: game_id.clone(), file_descriptions: files, install_after_download: *install_after_download, })?; Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download})) } CliCommand::Install { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; sender.send(PeerCommand::InstallGame { id: game_id.clone(), })?; Ok(json!({"queued": true, "game_id": game_id})) } CliCommand::Uninstall { game_id } => { ensure_catalog_game(shared, game_id).await?; ensure_no_active_operation(shared, game_id).await?; sender.send(PeerCommand::UninstallGame { id: game_id.clone(), })?; Ok(json!({"queued": true, "game_id": game_id})) } CliCommand::WaitPeers { count, timeout } => wait_peers(shared, *count, *timeout).await, CliCommand::Connect { addr } => { ensure_not_self_connect(shared, *addr).await?; sender.send(PeerCommand::ConnectPeer(*addr))?; Ok(json!({"queued": true, "addr": addr.to_string()})) } CliCommand::Shutdown => { handle.shutdown(); tokio::time::timeout(Duration::from_secs(5), handle.wait_stopped()) .await .wrap_err("timed out waiting for peer runtime shutdown")?; Ok(json!({"stopped": true})) } } } async fn status(shared: &SharedState) -> eyre::Result { let state = shared.state.read().await; let peer_count = shared.peer_game_db.read().await.peer_snapshots().len(); Ok(json!({ "local_peer": state.local_peer.clone(), "peer_count": peer_count, "local_games": state.local_games.len(), "remote_games": state.remote_games.len(), "active_operations": active_operations_json(&state.active_operations), })) } async fn list_peers(shared: &SharedState) -> eyre::Result { let peers = shared.peer_game_db.read().await.peer_snapshots(); Ok(json!({ "peers": peer_snapshots_json(&peers) })) } async fn list_games(shared: &SharedState) -> eyre::Result { let state = shared.state.read().await; let catalog = shared.catalog.read().await.clone(); let remote = shared .peer_game_db .read() .await .get_all_games() .into_iter() .filter(|game| catalog.contains(&game.id)) .collect::>(); Ok(json!({ "local": state.local_games.clone(), "remote": remote, "active_operations": active_operations_json(&state.active_operations), })) } async fn ensure_catalog_game(shared: &SharedState, game_id: &str) -> eyre::Result<()> { if shared.catalog.read().await.contains(game_id) { return Ok(()); } eyre::bail!("game {game_id} is not in the local catalog"); } async fn ensure_no_active_operation(shared: &SharedState, game_id: &str) -> eyre::Result<()> { let state = shared.state.read().await; if state .active_operations .iter() .any(|operation| operation.id == game_id) { eyre::bail!("operation already in progress for game {game_id}"); } Ok(()) } async fn ensure_not_self_connect(shared: &SharedState, addr: SocketAddr) -> eyre::Result<()> { let state = shared.state.read().await; if state .local_peer .as_ref() .is_some_and(|peer| peer.addr == addr.to_string()) { eyre::bail!("cannot connect peer to itself at {addr}"); } Ok(()) } async fn wait_peers(shared: &SharedState, count: usize, timeout: Duration) -> eyre::Result { let wait = async { loop { let peer_count = shared.peer_game_db.read().await.peer_snapshots().len(); if peer_count >= count { return peer_count; } shared.notify.notified().await; } }; let peer_count = tokio::time::timeout(timeout, wait) .await .wrap_err("timed out waiting for peers")?; Ok(json!({"peer_count": peer_count})) } async fn game_files_for_download( sender: &mpsc::UnboundedSender, shared: &SharedState, game_id: &str, ) -> eyre::Result> { { let mut state = shared.state.write().await; if let Some(files) = state.game_files.get(game_id).cloned() { return Ok(files); } state.unavailable_games.remove(game_id); } sender.send(PeerCommand::GetGame(game_id.to_string()))?; let wait = async { loop { let state = shared.state.read().await; if let Some(files) = state.game_files.get(game_id).cloned() { return Ok(files); } if state.unavailable_games.contains(game_id) { eyre::bail!("no peers have game {game_id}"); } drop(state); shared.notify.notified().await; } }; tokio::time::timeout(Duration::from_secs(10), wait) .await .wrap_err("timed out waiting for game file details")? } async fn event_loop( mut rx_events: mpsc::UnboundedReceiver, shared: Arc, writer: JsonlWriter, ) { while let Some(event) = rx_events.recv().await { let (event_name, data) = update_state_from_event(&shared, event).await; writer.emit(event_line(event_name, data)); shared.notify.notify_waiters(); } } async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) { match event { PeerEvent::LocalPeerReady { peer_id, addr } => { let local_peer = LocalPeer { peer_id, addr: addr.to_string(), }; shared.state.write().await.local_peer = Some(local_peer.clone()); ("local-peer-ready", json!(local_peer)) } PeerEvent::ListGames(games) => { let catalog = shared.catalog.read().await.clone(); let games = games .into_iter() .filter(|game| catalog.contains(&game.id)) .collect::>(); shared.state.write().await.remote_games = games.clone(); ("list-games", json!({ "games": games })) } PeerEvent::LocalLibraryChanged { games } => { let mut state = shared.state.write().await; state.local_games.clone_from(&games); ("local-library-changed", json!({ "games": games })) } PeerEvent::ActiveOperationsChanged { active_operations } => { let mut state = shared.state.write().await; state.active_operations.clone_from(&active_operations); ( "active-operations-changed", json!({ "active_operations": active_operations_json(&active_operations) }), ) } PeerEvent::GotGameFiles { id, file_descriptions, } => { shared .state .write() .await .game_files .insert(id.clone(), file_descriptions.clone()); ( "got-game-files", json!({"game_id": id, "file_descriptions": file_descriptions}), ) } PeerEvent::DownloadGameFilesBegin { id } => ("download-begin", json!({"game_id": id})), PeerEvent::DownloadGameFileChunkFinished { id, peer_addr, relative_path, offset, length, } => ( "download-chunk-finished", json!({ "game_id": id, "peer_addr": peer_addr.to_string(), "relative_path": relative_path, "offset": offset, "length": length, }), ), PeerEvent::DownloadGameFilesFinished { id } => game_id_event("download-finished", id), PeerEvent::DownloadGameFilesFailed { id } => game_id_event("download-failed", id), PeerEvent::DownloadGameFilesAllPeersGone { id } => game_id_event("download-peers-gone", id), PeerEvent::InstallGameBegin { id, operation } => ( "install-begin", json!({"game_id": id, "operation": install_operation_name(operation)}), ), PeerEvent::InstallGameFinished { id } => game_id_event("install-finished", id), PeerEvent::InstallGameFailed { id } => game_id_event("install-failed", id), PeerEvent::UninstallGameBegin { id } => game_id_event("uninstall-begin", id), PeerEvent::UninstallGameFinished { id } => game_id_event("uninstall-finished", id), PeerEvent::UninstallGameFailed { id } => game_id_event("uninstall-failed", id), PeerEvent::RemoveDownloadedGameBegin { id } => game_id_event("remove-download-begin", id), PeerEvent::RemoveDownloadedGameFinished { id } => { game_id_event("remove-download-finished", id) } PeerEvent::RemoveDownloadedGameFailed { id } => game_id_event("remove-download-failed", id), PeerEvent::NoPeersHaveGame { id } => no_peers_event(shared, id).await, PeerEvent::PeerConnected(addr) => ("peer-connected", peer_addr_json(addr)), PeerEvent::PeerDisconnected(addr) => ("peer-disconnected", peer_addr_json(addr)), PeerEvent::PeerDiscovered(addr) => ("peer-discovered", peer_addr_json(addr)), PeerEvent::PeerLost(addr) => ("peer-lost", peer_addr_json(addr)), PeerEvent::PeerCountUpdated(count) => ("peer-count-updated", json!({"count": count})), PeerEvent::RuntimeFailed { component, error } => ( "runtime-failed", json!({"component": runtime_component_name(component), "error": error}), ), } } fn game_id_event(kind: &'static str, id: String) -> (&'static str, Value) { (kind, json!({"game_id": id})) } async fn no_peers_event(shared: &SharedState, id: String) -> (&'static str, Value) { shared .state .write() .await .unavailable_games .insert(id.clone()); game_id_event("no-peers-have-game", id) } fn peer_addr_json(addr: SocketAddr) -> Value { json!({"addr": addr.to_string()}) } fn active_operations_json(active_operations: &[ActiveOperation]) -> Vec { active_operations .iter() .map(|operation| { json!({ "game_id": operation.id.clone(), "operation": active_operation_name(operation.operation), }) }) .collect() } fn peer_snapshots_json(peers: &[PeerSnapshot]) -> Vec { peers .iter() .map(|peer| { json!({ "peer_id": peer.peer_id.clone(), "addr": peer.addr.to_string(), "library_rev": peer.library_rev, "library_digest": peer.library_digest, "features": peer.features.clone(), "game_count": peer.game_count, "games": peer.games.clone(), }) }) .collect() } fn active_operation_name(operation: ActiveOperationKind) -> &'static str { (&operation).into() } fn install_operation_name(operation: InstallOperation) -> &'static str { (&operation).into() } fn runtime_component_name(component: PeerRuntimeComponent) -> &'static str { (&component).into() } fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result> { fixtures .iter() .map(|fixture| seed_fixture_game(game_dir, fixture)) .collect() } async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> HashSet { let mut catalog = HashSet::new(); if let Some(path) = catalog_db && path.exists() { match get_games(path).await { Ok(games) => catalog.extend(games.into_iter().map(|game| game.game_id)), Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()), } } catalog.extend(fixtures.iter().map(|seed| seed.game_id.clone())); catalog } fn parse_args() -> eyre::Result { let mut args = std::env::args_os().skip(1); let mut parsed = Args { name: "peer".to_string(), games_dir: PathBuf::from("games"), state_dir: PathBuf::from("state"), catalog_db: default_catalog_db(), fixtures: Vec::new(), unrar: None, }; while let Some(arg) = args.next() { match arg.to_str() { Some("--help" | "-h") => { print_help(); std::process::exit(0); } Some("--name") => parsed.name = next_string(&mut args, "--name")?, Some("--games-dir") => parsed.games_dir = next_path(&mut args, "--games-dir")?, Some("--state-dir") => parsed.state_dir = next_path(&mut args, "--state-dir")?, Some("--catalog-db") => parsed.catalog_db = Some(next_path(&mut args, "--catalog-db")?), Some("--fixture") => parsed.fixtures.push(next_string(&mut args, "--fixture")?), Some("--unrar") => parsed.unrar = Some(next_path(&mut args, "--unrar")?), Some(other) => eyre::bail!("unknown argument: {other}"), None => eyre::bail!("argument is not valid UTF-8: {arg:?}"), } } Ok(parsed) } fn default_catalog_db() -> Option { [ PathBuf::from("/app/game.db"), PathBuf::from("crates/lanspread-tauri-deno-ts/src-tauri/game.db"), ] .into_iter() .find(|path| path.exists()) } fn next_string(args: &mut impl Iterator, flag: &str) -> eyre::Result { args.next() .ok_or_else(|| eyre::eyre!("{flag} requires a value"))? .into_string() .map_err(|value| eyre::eyre!("{flag} value is not valid UTF-8: {value:?}")) } fn next_path(args: &mut impl Iterator, flag: &str) -> eyre::Result { Ok(PathBuf::from(next_string(args, flag)?)) } fn print_help() { eprintln!( "usage: lanspread-peer-cli [--name NAME] [--games-dir PATH] [--state-dir PATH] \\ [--catalog-db PATH] [--fixture GAME_ID] [--unrar PATH]\n\ Reads JSONL commands on stdin and writes result/event/error JSONL on stdout." ); }