5d58791192
The peer core already emits NoPeersHaveGame when a requested game cannot be
served by any known peer. The JSONL harness still waited for the generic file
detail timeout before returning the download command error, which made the
manual scenario slower and less precise.
Correlate the existing no-peers event with the pending CLI download command so
the harness returns a deterministic error immediately. This is harness
bookkeeping only; game availability and peer behavior remain owned by
lanspread-peer.
Test Plan:
- just fmt
- just test
- just clippy
- just peer-cli-build
- just peer-cli-image
- just peer-cli-alpha, just peer-cli-bravo, just peer-cli-charlie
- In charlie, send {"cmd":"download","game_id":"not-a-game"}; observe
no-peers-have-game followed by error "no peers have game not-a-game"
Refs: PEER_CLI_SCENARIOS.md
555 lines
18 KiB
Rust
555 lines
18 KiB
Rust
//! JSONL command-line harness for running a peer without the Tauri GUI.
|
|
|
|
use std::{
|
|
collections::{HashMap, HashSet},
|
|
ffi::OsString,
|
|
io::Write as _,
|
|
net::SocketAddr,
|
|
path::{Path, PathBuf},
|
|
sync::{Arc, Mutex},
|
|
time::Duration,
|
|
};
|
|
|
|
use eyre::Context;
|
|
use lanspread_compat::eti::get_games;
|
|
use lanspread_db::db::{Game, GameFileDescription};
|
|
use lanspread_peer::{
|
|
ActiveOperation,
|
|
ActiveOperationKind,
|
|
InstallOperation,
|
|
PeerCommand,
|
|
PeerEvent,
|
|
PeerGameDB,
|
|
PeerRuntimeComponent,
|
|
PeerRuntimeHandle,
|
|
PeerSnapshot,
|
|
PeerStartOptions,
|
|
start_peer_with_options,
|
|
};
|
|
use lanspread_peer_cli::{
|
|
CliCommand,
|
|
CommandEnvelope,
|
|
ExternalUnrarUnpacker,
|
|
FixtureSeed,
|
|
FixtureUnpacker,
|
|
error_line,
|
|
event_line,
|
|
parse_command_line,
|
|
result_line,
|
|
seed_fixture_game,
|
|
};
|
|
use serde_json::{Value, json};
|
|
use tokio::{
|
|
io::{AsyncBufReadExt, BufReader},
|
|
sync::{Notify, RwLock, mpsc},
|
|
};
|
|
|
|
/// Command-line options for the harness, produced by `parse_args`.
#[derive(Debug)]
|
|
struct Args {
|
|
/// Value of `--name` (defaults to "peer"); reported in the `cli-started` event.
name: String,
|
|
/// Value of `--games-dir`; created at startup and handed to the peer runtime.
games_dir: PathBuf,
|
|
/// Value of `--state-dir`; created at startup and passed via `PeerStartOptions`.
state_dir: PathBuf,
|
|
/// Catalog database path from `--catalog-db`, else the first existing default
/// location found by `default_catalog_db`, else `None`.
catalog_db: Option<PathBuf>,
|
|
/// One entry per `--fixture` flag; each is passed to `seed_fixture_game`.
fixtures: Vec<String>,
|
|
/// Value of `--unrar`; when set an `ExternalUnrarUnpacker` is built from it,
/// otherwise the `FixtureUnpacker` is used.
unrar: Option<PathBuf>,
|
|
}
|
|
|
|
/// Cloneable handle used to write JSONL lines to stdout.
///
/// Clones share the same lock because the mutex lives behind an `Arc`.
#[derive(Clone)]
|
|
struct JsonlWriter {
|
|
/// Taken for the duration of each `emit` call; it guards no data of its own.
lock: Arc<Mutex<()>>,
|
|
}
|
|
|
|
impl JsonlWriter {
|
|
fn new() -> Self {
|
|
Self {
|
|
lock: Arc::new(Mutex::new(())),
|
|
}
|
|
}
|
|
|
|
fn emit(&self, line: eyre::Result<String>) {
|
|
let line = match line {
|
|
Ok(line) => line,
|
|
Err(err) => {
|
|
eprintln!("failed to prepare JSONL output: {err}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let Ok(_guard) = self.lock.lock() else {
|
|
eprintln!("failed to lock stdout");
|
|
return;
|
|
};
|
|
|
|
let mut stdout = std::io::stdout().lock();
|
|
if let Err(err) = writeln!(stdout, "{line}") {
|
|
eprintln!("failed to write JSONL output: {err}");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Harness-side bookkeeping derived from peer events (see
/// `update_state_from_event`) and read back by the command handlers.
#[derive(Default)]
|
|
struct CliState {
|
|
/// Set when a `LocalPeerReady` event arrives; `None` until then.
local_peer: Option<LocalPeer>,
|
|
/// Replaced by each `LocalGamesUpdated` event.
local_games: Vec<Game>,
|
|
/// Replaced by each `ListGames` event.
remote_games: Vec<Game>,
|
|
/// Replaced by each `LocalGamesUpdated` event.
active_operations: Vec<ActiveOperation>,
|
|
/// File descriptions received via `GotGameFiles`, keyed by game id.
game_files: HashMap<String, Vec<GameFileDescription>>,
|
|
/// Game ids reported by `NoPeersHaveGame`; an id is removed again by
/// `game_files_for_download` before it issues a fresh request.
unavailable_games: HashSet<String>,
|
|
}
|
|
|
|
/// Identity of the local peer as reported by `PeerEvent::LocalPeerReady`.
///
/// Serialized as-is into the `local-peer-ready` event and the `status` result.
#[derive(Clone, serde::Serialize)]
|
|
struct LocalPeer {
|
|
/// Peer id carried by the event.
peer_id: String,
|
|
/// The event's address, stored in its `to_string()` form.
addr: String,
|
|
}
|
|
|
|
/// State shared between the stdin command loop and the peer event loop.
struct SharedState {
|
|
/// Harness bookkeeping updated from peer events.
state: RwLock<CliState>,
|
|
/// The same database handle that is passed to `start_peer_with_options`;
/// the harness only reads from it (peer snapshots and the game list).
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
/// Signalled with `notify_waiters` by `event_loop` after every processed
/// event, so waiting commands can re-check their condition.
notify: Notify,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> eyre::Result<()> {
|
|
let args = parse_args()?;
|
|
tokio::fs::create_dir_all(&args.games_dir).await?;
|
|
tokio::fs::create_dir_all(&args.state_dir).await?;
|
|
|
|
let fixture_seeds = seed_fixtures(&args.games_dir, &args.fixtures)?;
|
|
let catalog = load_catalog(args.catalog_db.as_deref(), &fixture_seeds).await;
|
|
|
|
let (tx_events, rx_events) = mpsc::unbounded_channel();
|
|
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
|
let catalog = Arc::new(RwLock::new(catalog));
|
|
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar {
|
|
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
|
|
None => Arc::new(FixtureUnpacker),
|
|
};
|
|
|
|
let mut handle = start_peer_with_options(
|
|
args.games_dir.clone(),
|
|
tx_events,
|
|
peer_game_db.clone(),
|
|
unpacker,
|
|
catalog,
|
|
PeerStartOptions {
|
|
state_dir: Some(args.state_dir.clone()),
|
|
},
|
|
)?;
|
|
let sender = handle.sender();
|
|
|
|
let shared = Arc::new(SharedState {
|
|
state: RwLock::new(CliState::default()),
|
|
peer_game_db,
|
|
notify: Notify::new(),
|
|
});
|
|
let writer = JsonlWriter::new();
|
|
|
|
tokio::spawn(event_loop(rx_events, shared.clone(), writer.clone()));
|
|
|
|
writer.emit(event_line(
|
|
"cli-started",
|
|
json!({
|
|
"name": args.name,
|
|
"games_dir": args.games_dir,
|
|
"state_dir": args.state_dir,
|
|
"fixtures": fixture_seeds,
|
|
}),
|
|
));
|
|
|
|
command_loop(&sender, &mut handle, shared, writer).await
|
|
}
|
|
|
|
async fn command_loop(
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
handle: &mut PeerRuntimeHandle,
|
|
shared: Arc<SharedState>,
|
|
writer: JsonlWriter,
|
|
) -> eyre::Result<()> {
|
|
let stdin = BufReader::new(tokio::io::stdin());
|
|
let mut lines = stdin.lines();
|
|
|
|
while let Some(line) = lines.next_line().await? {
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let envelope = match parse_command_line(&line) {
|
|
Ok(envelope) => envelope,
|
|
Err(err) => {
|
|
writer.emit(error_line(&None, None, &err.to_string()));
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let command_name = envelope.command.name();
|
|
match handle_command(&envelope, sender, handle, &shared).await {
|
|
Ok(data) => {
|
|
writer.emit(result_line(&envelope.request_id, command_name, data));
|
|
if matches!(envelope.command, CliCommand::Shutdown) {
|
|
break;
|
|
}
|
|
}
|
|
Err(err) => {
|
|
writer.emit(error_line(
|
|
&envelope.request_id,
|
|
Some(command_name),
|
|
&err.to_string(),
|
|
));
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_command(
|
|
envelope: &CommandEnvelope,
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
handle: &mut PeerRuntimeHandle,
|
|
shared: &Arc<SharedState>,
|
|
) -> eyre::Result<Value> {
|
|
match &envelope.command {
|
|
CliCommand::Status => status(shared).await,
|
|
CliCommand::ListPeers => list_peers(shared).await,
|
|
CliCommand::ListGames => list_games(shared).await,
|
|
CliCommand::SetGameDir { path } => {
|
|
sender.send(PeerCommand::SetGameDir(path.clone()))?;
|
|
Ok(json!({"queued": true, "path": path}))
|
|
}
|
|
CliCommand::Download {
|
|
game_id,
|
|
install_after_download,
|
|
} => {
|
|
let files = game_files_for_download(sender, shared, game_id).await?;
|
|
sender.send(PeerCommand::DownloadGameFilesWithOptions {
|
|
id: game_id.clone(),
|
|
file_descriptions: files,
|
|
install_after_download: *install_after_download,
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download}))
|
|
}
|
|
CliCommand::Install { game_id } => {
|
|
sender.send(PeerCommand::InstallGame {
|
|
id: game_id.clone(),
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id}))
|
|
}
|
|
CliCommand::Uninstall { game_id } => {
|
|
sender.send(PeerCommand::UninstallGame {
|
|
id: game_id.clone(),
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id}))
|
|
}
|
|
CliCommand::WaitPeers { count, timeout } => wait_peers(shared, *count, *timeout).await,
|
|
CliCommand::Connect { addr } => {
|
|
sender.send(PeerCommand::ConnectPeer(*addr))?;
|
|
Ok(json!({"queued": true, "addr": addr.to_string()}))
|
|
}
|
|
CliCommand::Shutdown => {
|
|
handle.shutdown();
|
|
tokio::time::timeout(Duration::from_secs(5), handle.wait_stopped())
|
|
.await
|
|
.wrap_err("timed out waiting for peer runtime shutdown")?;
|
|
Ok(json!({"stopped": true}))
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn status(shared: &SharedState) -> eyre::Result<Value> {
|
|
let state = shared.state.read().await;
|
|
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
|
|
Ok(json!({
|
|
"local_peer": state.local_peer.clone(),
|
|
"peer_count": peer_count,
|
|
"local_games": state.local_games.len(),
|
|
"remote_games": state.remote_games.len(),
|
|
"active_operations": active_operations_json(&state.active_operations),
|
|
}))
|
|
}
|
|
|
|
async fn list_peers(shared: &SharedState) -> eyre::Result<Value> {
|
|
let peers = shared.peer_game_db.read().await.peer_snapshots();
|
|
Ok(json!({ "peers": peer_snapshots_json(&peers) }))
|
|
}
|
|
|
|
async fn list_games(shared: &SharedState) -> eyre::Result<Value> {
|
|
let state = shared.state.read().await;
|
|
let remote = shared.peer_game_db.read().await.get_all_games();
|
|
Ok(json!({
|
|
"local": state.local_games.clone(),
|
|
"remote": remote,
|
|
"active_operations": active_operations_json(&state.active_operations),
|
|
}))
|
|
}
|
|
|
|
async fn wait_peers(shared: &SharedState, count: usize, timeout: Duration) -> eyre::Result<Value> {
|
|
let wait = async {
|
|
loop {
|
|
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
|
|
if peer_count >= count {
|
|
return peer_count;
|
|
}
|
|
shared.notify.notified().await;
|
|
}
|
|
};
|
|
|
|
let peer_count = tokio::time::timeout(timeout, wait)
|
|
.await
|
|
.wrap_err("timed out waiting for peers")?;
|
|
Ok(json!({"peer_count": peer_count}))
|
|
}
|
|
|
|
async fn game_files_for_download(
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
shared: &SharedState,
|
|
game_id: &str,
|
|
) -> eyre::Result<Vec<GameFileDescription>> {
|
|
{
|
|
let mut state = shared.state.write().await;
|
|
if let Some(files) = state.game_files.get(game_id).cloned() {
|
|
return Ok(files);
|
|
}
|
|
state.unavailable_games.remove(game_id);
|
|
}
|
|
|
|
sender.send(PeerCommand::GetGame(game_id.to_string()))?;
|
|
let wait = async {
|
|
loop {
|
|
let state = shared.state.read().await;
|
|
if let Some(files) = state.game_files.get(game_id).cloned() {
|
|
return Ok(files);
|
|
}
|
|
if state.unavailable_games.contains(game_id) {
|
|
eyre::bail!("no peers have game {game_id}");
|
|
}
|
|
drop(state);
|
|
shared.notify.notified().await;
|
|
}
|
|
};
|
|
|
|
tokio::time::timeout(Duration::from_secs(10), wait)
|
|
.await
|
|
.wrap_err("timed out waiting for game file details")?
|
|
}
|
|
|
|
async fn event_loop(
|
|
mut rx_events: mpsc::UnboundedReceiver<PeerEvent>,
|
|
shared: Arc<SharedState>,
|
|
writer: JsonlWriter,
|
|
) {
|
|
while let Some(event) = rx_events.recv().await {
|
|
let (event_name, data) = update_state_from_event(&shared, event).await;
|
|
writer.emit(event_line(event_name, data));
|
|
shared.notify.notify_waiters();
|
|
}
|
|
}
|
|
|
|
async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) {
|
|
match event {
|
|
PeerEvent::LocalPeerReady { peer_id, addr } => {
|
|
let local_peer = LocalPeer {
|
|
peer_id,
|
|
addr: addr.to_string(),
|
|
};
|
|
shared.state.write().await.local_peer = Some(local_peer.clone());
|
|
("local-peer-ready", json!(local_peer))
|
|
}
|
|
PeerEvent::ListGames(games) => {
|
|
shared.state.write().await.remote_games = games.clone();
|
|
("list-games", json!({ "games": games }))
|
|
}
|
|
PeerEvent::LocalGamesUpdated {
|
|
games,
|
|
active_operations,
|
|
} => {
|
|
let mut state = shared.state.write().await;
|
|
state.local_games.clone_from(&games);
|
|
state.active_operations.clone_from(&active_operations);
|
|
(
|
|
"local-games-updated",
|
|
json!({
|
|
"games": games,
|
|
"active_operations": active_operations_json(&active_operations),
|
|
}),
|
|
)
|
|
}
|
|
PeerEvent::GotGameFiles {
|
|
id,
|
|
file_descriptions,
|
|
} => {
|
|
shared
|
|
.state
|
|
.write()
|
|
.await
|
|
.game_files
|
|
.insert(id.clone(), file_descriptions.clone());
|
|
(
|
|
"got-game-files",
|
|
json!({"game_id": id, "file_descriptions": file_descriptions}),
|
|
)
|
|
}
|
|
PeerEvent::DownloadGameFilesBegin { id } => ("download-begin", json!({"game_id": id})),
|
|
PeerEvent::DownloadGameFilesFinished { id } => {
|
|
("download-finished", json!({"game_id": id}))
|
|
}
|
|
PeerEvent::DownloadGameFilesFailed { id } => ("download-failed", json!({"game_id": id})),
|
|
PeerEvent::DownloadGameFilesAllPeersGone { id } => {
|
|
("download-peers-gone", json!({"game_id": id}))
|
|
}
|
|
PeerEvent::InstallGameBegin { id, operation } => (
|
|
"install-begin",
|
|
json!({"game_id": id, "operation": install_operation_name(operation)}),
|
|
),
|
|
PeerEvent::InstallGameFinished { id } => ("install-finished", json!({"game_id": id})),
|
|
PeerEvent::InstallGameFailed { id } => ("install-failed", json!({"game_id": id})),
|
|
PeerEvent::UninstallGameBegin { id } => ("uninstall-begin", json!({"game_id": id})),
|
|
PeerEvent::UninstallGameFinished { id } => ("uninstall-finished", json!({"game_id": id})),
|
|
PeerEvent::UninstallGameFailed { id } => ("uninstall-failed", json!({"game_id": id})),
|
|
PeerEvent::NoPeersHaveGame { id } => {
|
|
shared
|
|
.state
|
|
.write()
|
|
.await
|
|
.unavailable_games
|
|
.insert(id.clone());
|
|
("no-peers-have-game", json!({"game_id": id}))
|
|
}
|
|
PeerEvent::PeerConnected(addr) => ("peer-connected", peer_addr_json(addr)),
|
|
PeerEvent::PeerDisconnected(addr) => ("peer-disconnected", peer_addr_json(addr)),
|
|
PeerEvent::PeerDiscovered(addr) => ("peer-discovered", peer_addr_json(addr)),
|
|
PeerEvent::PeerLost(addr) => ("peer-lost", peer_addr_json(addr)),
|
|
PeerEvent::PeerCountUpdated(count) => ("peer-count-updated", json!({"count": count})),
|
|
PeerEvent::RuntimeFailed { component, error } => (
|
|
"runtime-failed",
|
|
json!({"component": runtime_component_name(component), "error": error}),
|
|
),
|
|
}
|
|
}
|
|
|
|
fn peer_addr_json(addr: SocketAddr) -> Value {
|
|
json!({"addr": addr.to_string()})
|
|
}
|
|
|
|
fn active_operations_json(active_operations: &[ActiveOperation]) -> Vec<Value> {
|
|
active_operations
|
|
.iter()
|
|
.map(|operation| {
|
|
json!({
|
|
"game_id": operation.id.clone(),
|
|
"operation": active_operation_name(operation.operation),
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn peer_snapshots_json(peers: &[PeerSnapshot]) -> Vec<Value> {
|
|
peers
|
|
.iter()
|
|
.map(|peer| {
|
|
json!({
|
|
"peer_id": peer.peer_id.clone(),
|
|
"addr": peer.addr.to_string(),
|
|
"library_rev": peer.library_rev,
|
|
"library_digest": peer.library_digest,
|
|
"features": peer.features.clone(),
|
|
"game_count": peer.game_count,
|
|
"games": peer.games.clone(),
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn active_operation_name(operation: ActiveOperationKind) -> &'static str {
|
|
(&operation).into()
|
|
}
|
|
|
|
fn install_operation_name(operation: InstallOperation) -> &'static str {
|
|
(&operation).into()
|
|
}
|
|
|
|
fn runtime_component_name(component: PeerRuntimeComponent) -> &'static str {
|
|
(&component).into()
|
|
}
|
|
|
|
fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result<Vec<FixtureSeed>> {
|
|
fixtures
|
|
.iter()
|
|
.map(|fixture| seed_fixture_game(game_dir, fixture))
|
|
.collect()
|
|
}
|
|
|
|
async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> HashSet<String> {
|
|
let mut catalog = HashSet::new();
|
|
if let Some(path) = catalog_db
|
|
&& path.exists()
|
|
{
|
|
match get_games(path).await {
|
|
Ok(games) => catalog.extend(games.into_iter().map(|game| game.game_id)),
|
|
Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()),
|
|
}
|
|
}
|
|
|
|
catalog.extend(fixtures.iter().map(|seed| seed.game_id.clone()));
|
|
catalog
|
|
}
|
|
|
|
fn parse_args() -> eyre::Result<Args> {
|
|
let mut args = std::env::args_os().skip(1);
|
|
let mut parsed = Args {
|
|
name: "peer".to_string(),
|
|
games_dir: PathBuf::from("games"),
|
|
state_dir: PathBuf::from("state"),
|
|
catalog_db: default_catalog_db(),
|
|
fixtures: Vec::new(),
|
|
unrar: None,
|
|
};
|
|
|
|
while let Some(arg) = args.next() {
|
|
match arg.to_str() {
|
|
Some("--help" | "-h") => {
|
|
print_help();
|
|
std::process::exit(0);
|
|
}
|
|
Some("--name") => parsed.name = next_string(&mut args, "--name")?,
|
|
Some("--games-dir") => parsed.games_dir = next_path(&mut args, "--games-dir")?,
|
|
Some("--state-dir") => parsed.state_dir = next_path(&mut args, "--state-dir")?,
|
|
Some("--catalog-db") => parsed.catalog_db = Some(next_path(&mut args, "--catalog-db")?),
|
|
Some("--fixture") => parsed.fixtures.push(next_string(&mut args, "--fixture")?),
|
|
Some("--unrar") => parsed.unrar = Some(next_path(&mut args, "--unrar")?),
|
|
Some(other) => eyre::bail!("unknown argument: {other}"),
|
|
None => eyre::bail!("argument is not valid UTF-8: {arg:?}"),
|
|
}
|
|
}
|
|
|
|
Ok(parsed)
|
|
}
|
|
|
|
/// Returns the first default catalog database location that exists on disk,
/// or `None` when neither candidate is present.
fn default_catalog_db() -> Option<PathBuf> {
    // Checked in order; the relative entry is resolved against the current
    // working directory.
    const CANDIDATES: [&str; 2] = [
        "/app/game.db",
        "crates/lanspread-tauri-deno-ts/src-tauri/game.db",
    ];

    CANDIDATES
        .into_iter()
        .map(PathBuf::from)
        .find(|candidate| candidate.exists())
}
|
|
|
|
/// Takes the value following `flag` from `args` as a UTF-8 string.
///
/// # Errors
///
/// Fails when no further argument is available or when the value is not
/// valid UTF-8.
fn next_string(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<String> {
    let Some(raw) = args.next() else {
        return Err(eyre::eyre!("{flag} requires a value"));
    };

    match raw.into_string() {
        Ok(text) => Ok(text),
        Err(value) => Err(eyre::eyre!("{flag} value is not valid UTF-8: {value:?}")),
    }
}
|
|
|
|
fn next_path(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<PathBuf> {
|
|
Ok(PathBuf::from(next_string(args, flag)?))
|
|
}
|
|
|
|
/// Prints the usage summary to stderr (it uses `eprintln!`, not `println!`).
///
/// Note on the literal below: the first line ends in `\\`, an escaped
/// backslash, so the printed text contains a literal backslash followed by
/// the line break that is part of the literal. The second line ends in
/// `\n\`, a newline escape followed by a line continuation.
// NOTE(review): the exact whitespace after the literal line break depends on
// the file's real indentation, which is not visible here — confirm before
// reformatting this string.
fn print_help() {
|
|
eprintln!(
|
|
"usage: lanspread-peer-cli [--name NAME] [--games-dir PATH] [--state-dir PATH] \\
|
|
[--catalog-db PATH] [--fixture GAME_ID] [--unrar PATH]\n\
|
|
Reads JSONL commands on stdin and writes result/event/error JSONL on stdout."
|
|
);
|
|
}
|