373def6d44
Add a streamed-install prototype that can receive archive-derived install bytes straight into local/ without first storing the peer-owned root archive payload. This is intended for low-disk clients that want to install a game but opt out of becoming a downloadable peer source for that game. The protocol gains a current-version-only StreamInstall request and framed StreamInstallFrame responses. The peer core owns the generic transport, transaction, path validation, size checks, CRC32 verification, and lifecycle state. The archive-specific work is hidden behind StreamInstallProvider so the prototype can use unrar while the final implementation can swap in a better provider without rewriting the peer command path. The receiver writes into .local.installing and only promotes to local/ after the full stream verifies. It deliberately does not write the root version.ini or archive files, so the settled local state is installed=true, downloaded=false, and availability=LocalOnly. That preserves the existing rule that local/ is not served to peers and makes streamed receivers non-sources by construction. The CLI is the only caller for now. It exposes stream-install and provides the prototype unrar implementation with unrar lt for entry metadata and unrar p for file bytes. This is simple and good enough to prove non-solid archive streaming, but it is not the production provider shape for solid archives because per-file unrar p would repeatedly decompress prefixes. The Tauri app explicitly passes stream_install_provider: None, so the GUI behavior stays unchanged until a real product path is designed. Document the production-readiness work in NEXT_STEPS.md. The main follow-up is to make the provider abstraction final-ish and replace the per-file CLI unrar provider with a one-pass archive provider, then wire a deliberate GUI low-disk mode, retry semantics, and broader failure scenarios. Test Plan: - just fmt - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just test - python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py \ S39 S40 --build-image - RUSTC_WRAPPER= CARGO_BUILD_RUSTC_WRAPPER= just clippy - git diff --check - git diff --cached --check Follow-up: NEXT_STEPS.md
777 lines
25 KiB
Rust
777 lines
25 KiB
Rust
//! JSONL command-line harness for running a peer without the Tauri GUI.
|
|
|
|
use std::{
|
|
collections::{HashMap, HashSet},
|
|
ffi::OsString,
|
|
io::Write as _,
|
|
net::SocketAddr,
|
|
path::{Path, PathBuf},
|
|
sync::{Arc, Mutex},
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use eyre::Context;
|
|
use lanspread_compat::eti::get_games;
|
|
use lanspread_db::db::{Game, GameCatalog, GameFileDescription};
|
|
use lanspread_peer::{
|
|
ActiveOperation,
|
|
ActiveOperationKind,
|
|
InstallOperation,
|
|
NoopStreamInstallProvider,
|
|
PeerCommand,
|
|
PeerEvent,
|
|
PeerGameDB,
|
|
PeerRuntimeComponent,
|
|
PeerRuntimeHandle,
|
|
PeerSnapshot,
|
|
PeerStartOptions,
|
|
StreamInstallProvider,
|
|
migrate_legacy_state,
|
|
start_peer_with_options,
|
|
};
|
|
use lanspread_peer_cli::{
|
|
CliCommand,
|
|
CommandEnvelope,
|
|
DEFAULT_FIXTURE_VERSION,
|
|
ExternalUnrarStreamProvider,
|
|
ExternalUnrarUnpacker,
|
|
FixtureSeed,
|
|
FixtureUnpacker,
|
|
error_line,
|
|
event_line,
|
|
parse_command_line,
|
|
result_line,
|
|
seed_fixture_game,
|
|
};
|
|
use serde_json::{Value, json};
|
|
use tokio::{
|
|
io::{AsyncBufReadExt, BufReader},
|
|
sync::{Notify, RwLock, mpsc},
|
|
};
|
|
|
|
#[derive(Debug)]
|
|
struct Args {
|
|
name: String,
|
|
games_dir: PathBuf,
|
|
state_dir: PathBuf,
|
|
catalog_db: Option<PathBuf>,
|
|
fixtures: Vec<String>,
|
|
unrar: Option<PathBuf>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct JsonlWriter {
|
|
lock: Arc<Mutex<()>>,
|
|
}
|
|
|
|
impl JsonlWriter {
|
|
fn new() -> Self {
|
|
Self {
|
|
lock: Arc::new(Mutex::new(())),
|
|
}
|
|
}
|
|
|
|
fn emit(&self, line: eyre::Result<String>) {
|
|
let line = match line {
|
|
Ok(line) => line,
|
|
Err(err) => {
|
|
eprintln!("failed to prepare JSONL output: {err}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let Ok(_guard) = self.lock.lock() else {
|
|
eprintln!("failed to lock stdout");
|
|
return;
|
|
};
|
|
|
|
let mut stdout = std::io::stdout().lock();
|
|
if let Err(err) = writeln!(stdout, "{line}") {
|
|
eprintln!("failed to write JSONL output: {err}");
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct CliState {
|
|
local_peer: Option<LocalPeer>,
|
|
local_games: Vec<Game>,
|
|
remote_games: Vec<Game>,
|
|
active_operations: Vec<ActiveOperation>,
|
|
game_files: HashMap<String, Vec<GameFileDescription>>,
|
|
unavailable_games: HashSet<String>,
|
|
downloads: HashMap<String, DownloadMeasurement>,
|
|
}
|
|
|
|
#[derive(Clone, serde::Serialize)]
|
|
struct LocalPeer {
|
|
peer_id: String,
|
|
addr: String,
|
|
}
|
|
|
|
struct DownloadMeasurement {
|
|
started_at: Instant,
|
|
bytes: u64,
|
|
chunks: u64,
|
|
}
|
|
|
|
struct SharedState {
|
|
state: RwLock<CliState>,
|
|
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
|
catalog: Arc<RwLock<GameCatalog>>,
|
|
notify: Notify,
|
|
games_dir: PathBuf,
|
|
state_dir: PathBuf,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> eyre::Result<()> {
|
|
let args = parse_args()?;
|
|
tokio::fs::create_dir_all(&args.games_dir).await?;
|
|
tokio::fs::create_dir_all(&args.state_dir).await?;
|
|
|
|
let fixture_seeds = seed_fixtures(&args.games_dir, &args.fixtures)?;
|
|
let catalog = load_catalog(args.catalog_db.as_deref(), &fixture_seeds).await;
|
|
let migration = migrate_legacy_state(&args.games_dir, &args.state_dir).await;
|
|
|
|
let (tx_events, rx_events) = mpsc::unbounded_channel();
|
|
let peer_game_db = Arc::new(RwLock::new(PeerGameDB::new()));
|
|
let catalog = Arc::new(RwLock::new(catalog));
|
|
let unrar_for_streaming = args.unrar.clone().or_else(default_unrar_program);
|
|
let unpacker: Arc<dyn lanspread_peer::Unpacker> = match args.unrar.clone() {
|
|
Some(path) => Arc::new(ExternalUnrarUnpacker::new(path)),
|
|
None => Arc::new(FixtureUnpacker),
|
|
};
|
|
let stream_install_provider: Arc<dyn StreamInstallProvider> = match unrar_for_streaming {
|
|
Some(path) => Arc::new(ExternalUnrarStreamProvider::new(path)),
|
|
None => Arc::new(NoopStreamInstallProvider),
|
|
};
|
|
|
|
let mut handle = start_peer_with_options(
|
|
args.games_dir.clone(),
|
|
tx_events,
|
|
peer_game_db.clone(),
|
|
unpacker,
|
|
catalog.clone(),
|
|
PeerStartOptions {
|
|
state_dir: Some(args.state_dir.clone()),
|
|
active_outbound_transfers: None,
|
|
stream_install_provider: Some(stream_install_provider),
|
|
},
|
|
)?;
|
|
let sender = handle.sender();
|
|
|
|
let shared = Arc::new(SharedState {
|
|
state: RwLock::new(CliState::default()),
|
|
peer_game_db,
|
|
catalog: catalog.clone(),
|
|
notify: Notify::new(),
|
|
games_dir: args.games_dir.clone(),
|
|
state_dir: args.state_dir.clone(),
|
|
});
|
|
let writer = JsonlWriter::new();
|
|
|
|
tokio::spawn(event_loop(rx_events, shared.clone(), writer.clone()));
|
|
|
|
writer.emit(event_line(
|
|
"cli-started",
|
|
json!({
|
|
"name": args.name,
|
|
"games_dir": args.games_dir,
|
|
"state_dir": args.state_dir,
|
|
"migration": migration,
|
|
"fixtures": fixture_seeds,
|
|
}),
|
|
));
|
|
|
|
command_loop(&sender, &mut handle, shared, writer).await
|
|
}
|
|
|
|
async fn command_loop(
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
handle: &mut PeerRuntimeHandle,
|
|
shared: Arc<SharedState>,
|
|
writer: JsonlWriter,
|
|
) -> eyre::Result<()> {
|
|
let stdin = BufReader::new(tokio::io::stdin());
|
|
let mut lines = stdin.lines();
|
|
|
|
while let Some(line) = lines.next_line().await? {
|
|
if line.trim().is_empty() {
|
|
continue;
|
|
}
|
|
|
|
let envelope = match parse_command_line(&line) {
|
|
Ok(envelope) => envelope,
|
|
Err(err) => {
|
|
writer.emit(error_line(&None, None, &err.to_string()));
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let command_name = envelope.command.name();
|
|
match handle_command(&envelope, sender, handle, &shared).await {
|
|
Ok(data) => {
|
|
writer.emit(result_line(&envelope.request_id, command_name, data));
|
|
if matches!(envelope.command, CliCommand::Shutdown) {
|
|
break;
|
|
}
|
|
}
|
|
Err(err) => {
|
|
writer.emit(error_line(
|
|
&envelope.request_id,
|
|
Some(command_name),
|
|
&err.to_string(),
|
|
));
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_command(
|
|
envelope: &CommandEnvelope,
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
handle: &mut PeerRuntimeHandle,
|
|
shared: &Arc<SharedState>,
|
|
) -> eyre::Result<Value> {
|
|
match &envelope.command {
|
|
CliCommand::Status => status(shared).await,
|
|
CliCommand::ListPeers => list_peers(shared).await,
|
|
CliCommand::ListGames => list_games(shared).await,
|
|
CliCommand::SetGameDir { path } => {
|
|
sender.send(PeerCommand::SetGameDir(path.clone()))?;
|
|
Ok(json!({"queued": true, "path": path}))
|
|
}
|
|
CliCommand::Download {
|
|
game_id,
|
|
install_after_download,
|
|
} => {
|
|
ensure_catalog_game(shared, game_id).await?;
|
|
ensure_no_active_operation(shared, game_id).await?;
|
|
let files = game_files_for_download(sender, shared, game_id).await?;
|
|
sender.send(PeerCommand::DownloadGameFilesWithOptions {
|
|
id: game_id.clone(),
|
|
file_descriptions: files,
|
|
install_after_download: *install_after_download,
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id, "install": install_after_download}))
|
|
}
|
|
CliCommand::StreamInstall { game_id } => {
|
|
ensure_catalog_game(shared, game_id).await?;
|
|
ensure_no_active_operation(shared, game_id).await?;
|
|
let _ = game_files_for_download(sender, shared, game_id).await?;
|
|
sender.send(PeerCommand::StreamInstallGame {
|
|
id: game_id.clone(),
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id}))
|
|
}
|
|
CliCommand::Install { game_id } => {
|
|
ensure_catalog_game(shared, game_id).await?;
|
|
ensure_no_active_operation(shared, game_id).await?;
|
|
sender.send(PeerCommand::InstallGame {
|
|
id: game_id.clone(),
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id}))
|
|
}
|
|
CliCommand::Play {
|
|
game_id,
|
|
username,
|
|
language,
|
|
} => play(shared, game_id, username, language.as_deref()).await,
|
|
CliCommand::Uninstall { game_id } => {
|
|
ensure_catalog_game(shared, game_id).await?;
|
|
ensure_no_active_operation(shared, game_id).await?;
|
|
sender.send(PeerCommand::UninstallGame {
|
|
id: game_id.clone(),
|
|
})?;
|
|
Ok(json!({"queued": true, "game_id": game_id}))
|
|
}
|
|
CliCommand::WaitPeers { count, timeout } => wait_peers(shared, *count, *timeout).await,
|
|
CliCommand::Connect { addr } => {
|
|
ensure_not_self_connect(shared, *addr).await?;
|
|
sender.send(PeerCommand::ConnectPeer(*addr))?;
|
|
Ok(json!({"queued": true, "addr": addr.to_string()}))
|
|
}
|
|
CliCommand::Shutdown => {
|
|
handle.shutdown();
|
|
tokio::time::timeout(Duration::from_secs(5), handle.wait_stopped())
|
|
.await
|
|
.wrap_err("timed out waiting for peer runtime shutdown")?;
|
|
Ok(json!({"stopped": true}))
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn status(shared: &SharedState) -> eyre::Result<Value> {
|
|
let state = shared.state.read().await;
|
|
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
|
|
Ok(json!({
|
|
"local_peer": state.local_peer.clone(),
|
|
"peer_count": peer_count,
|
|
"local_games": state.local_games.len(),
|
|
"remote_games": state.remote_games.len(),
|
|
"active_operations": active_operations_json(&state.active_operations),
|
|
}))
|
|
}
|
|
|
|
async fn list_peers(shared: &SharedState) -> eyre::Result<Value> {
|
|
let peers = shared.peer_game_db.read().await.peer_snapshots();
|
|
Ok(json!({ "peers": peer_snapshots_json(&peers) }))
|
|
}
|
|
|
|
async fn list_games(shared: &SharedState) -> eyre::Result<Value> {
|
|
let state = shared.state.read().await;
|
|
let catalog = shared.catalog.read().await;
|
|
let remote = shared.peer_game_db.read().await.get_catalog_games(&catalog);
|
|
Ok(json!({
|
|
"local": state.local_games.clone(),
|
|
"remote": remote,
|
|
"active_operations": active_operations_json(&state.active_operations),
|
|
}))
|
|
}
|
|
|
|
async fn play(
|
|
shared: &SharedState,
|
|
game_id: &str,
|
|
username: &str,
|
|
language: Option<&str>,
|
|
) -> eyre::Result<Value> {
|
|
ensure_catalog_game(shared, game_id).await?;
|
|
let game_root = shared.games_dir.join(game_id);
|
|
let outcome = lanspread_peer::apply_launch_settings_once(
|
|
&shared.state_dir,
|
|
&game_root,
|
|
game_id,
|
|
Some(username),
|
|
language,
|
|
)
|
|
.await?;
|
|
Ok(json!({ "game_id": game_id, "outcome": outcome }))
|
|
}
|
|
|
|
async fn ensure_catalog_game(shared: &SharedState, game_id: &str) -> eyre::Result<()> {
|
|
if shared.catalog.read().await.contains(game_id) {
|
|
return Ok(());
|
|
}
|
|
|
|
eyre::bail!("game {game_id} is not in the local catalog");
|
|
}
|
|
|
|
async fn ensure_no_active_operation(shared: &SharedState, game_id: &str) -> eyre::Result<()> {
|
|
let state = shared.state.read().await;
|
|
if state
|
|
.active_operations
|
|
.iter()
|
|
.any(|operation| operation.id == game_id)
|
|
{
|
|
eyre::bail!("operation already in progress for game {game_id}");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn ensure_not_self_connect(shared: &SharedState, addr: SocketAddr) -> eyre::Result<()> {
|
|
let state = shared.state.read().await;
|
|
if state
|
|
.local_peer
|
|
.as_ref()
|
|
.is_some_and(|peer| peer.addr == addr.to_string())
|
|
{
|
|
eyre::bail!("cannot connect peer to itself at {addr}");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn wait_peers(shared: &SharedState, count: usize, timeout: Duration) -> eyre::Result<Value> {
|
|
let wait = async {
|
|
loop {
|
|
let peer_count = shared.peer_game_db.read().await.peer_snapshots().len();
|
|
if peer_count >= count {
|
|
return peer_count;
|
|
}
|
|
shared.notify.notified().await;
|
|
}
|
|
};
|
|
|
|
let peer_count = tokio::time::timeout(timeout, wait)
|
|
.await
|
|
.wrap_err("timed out waiting for peers")?;
|
|
Ok(json!({"peer_count": peer_count}))
|
|
}
|
|
|
|
async fn game_files_for_download(
|
|
sender: &mpsc::UnboundedSender<PeerCommand>,
|
|
shared: &SharedState,
|
|
game_id: &str,
|
|
) -> eyre::Result<Vec<GameFileDescription>> {
|
|
{
|
|
let mut state = shared.state.write().await;
|
|
if let Some(files) = state.game_files.get(game_id).cloned() {
|
|
return Ok(files);
|
|
}
|
|
state.unavailable_games.remove(game_id);
|
|
}
|
|
|
|
sender.send(PeerCommand::GetGame(game_id.to_string()))?;
|
|
let wait = async {
|
|
loop {
|
|
let state = shared.state.read().await;
|
|
if let Some(files) = state.game_files.get(game_id).cloned() {
|
|
return Ok(files);
|
|
}
|
|
if state.unavailable_games.contains(game_id) {
|
|
eyre::bail!("no peers have game {game_id}");
|
|
}
|
|
drop(state);
|
|
shared.notify.notified().await;
|
|
}
|
|
};
|
|
|
|
tokio::time::timeout(Duration::from_secs(10), wait)
|
|
.await
|
|
.wrap_err("timed out waiting for game file details")?
|
|
}
|
|
|
|
async fn event_loop(
|
|
mut rx_events: mpsc::UnboundedReceiver<PeerEvent>,
|
|
shared: Arc<SharedState>,
|
|
writer: JsonlWriter,
|
|
) {
|
|
while let Some(event) = rx_events.recv().await {
|
|
let (event_name, data) = update_state_from_event(&shared, event).await;
|
|
writer.emit(event_line(event_name, data));
|
|
shared.notify.notify_waiters();
|
|
}
|
|
}
|
|
|
|
#[allow(clippy::too_many_lines)]
|
|
async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'static str, Value) {
|
|
match event {
|
|
PeerEvent::LocalPeerReady { peer_id, addr } => {
|
|
let local_peer = LocalPeer {
|
|
peer_id,
|
|
addr: addr.to_string(),
|
|
};
|
|
shared.state.write().await.local_peer = Some(local_peer.clone());
|
|
("local-peer-ready", json!(local_peer))
|
|
}
|
|
PeerEvent::ListGames(games) => {
|
|
let catalog = shared.catalog.read().await.clone();
|
|
let games = games
|
|
.into_iter()
|
|
.filter(|game| catalog.contains(&game.id))
|
|
.collect::<Vec<_>>();
|
|
shared.state.write().await.remote_games = games.clone();
|
|
("list-games", json!({ "games": games }))
|
|
}
|
|
PeerEvent::LocalLibraryChanged { games } => {
|
|
let mut state = shared.state.write().await;
|
|
state.local_games.clone_from(&games);
|
|
("local-library-changed", json!({ "games": games }))
|
|
}
|
|
PeerEvent::OutboundTransferCountChanged => ("outbound-transfer-count-changed", json!({})),
|
|
PeerEvent::ActiveOperationsChanged { active_operations } => {
|
|
let mut state = shared.state.write().await;
|
|
state.active_operations.clone_from(&active_operations);
|
|
(
|
|
"active-operations-changed",
|
|
json!({ "active_operations": active_operations_json(&active_operations) }),
|
|
)
|
|
}
|
|
PeerEvent::GotGameFiles {
|
|
id,
|
|
file_descriptions,
|
|
} => {
|
|
shared
|
|
.state
|
|
.write()
|
|
.await
|
|
.game_files
|
|
.insert(id.clone(), file_descriptions.clone());
|
|
(
|
|
"got-game-files",
|
|
json!({"game_id": id, "file_descriptions": file_descriptions}),
|
|
)
|
|
}
|
|
PeerEvent::DownloadGameFilesBegin { id } => download_begin_event(shared, id).await,
|
|
PeerEvent::DownloadGameFileChunkFinished {
|
|
id,
|
|
peer_addr,
|
|
relative_path,
|
|
offset,
|
|
length,
|
|
} => {
|
|
download_chunk_finished_event(shared, id, peer_addr, relative_path, offset, length)
|
|
.await
|
|
}
|
|
PeerEvent::DownloadGameFilesProgress(progress) => (
|
|
"download-progress",
|
|
json!({
|
|
"game_id": progress.id,
|
|
"downloaded_bytes": progress.downloaded_bytes,
|
|
"total_bytes": progress.total_bytes,
|
|
"bytes_per_second": progress.bytes_per_second,
|
|
}),
|
|
),
|
|
PeerEvent::DownloadGameFilesFinished { id } => {
|
|
download_terminal_event(shared, "download-finished", id).await
|
|
}
|
|
PeerEvent::DownloadGameFilesFailed { id } => {
|
|
download_terminal_event(shared, "download-failed", id).await
|
|
}
|
|
PeerEvent::DownloadGameFilesAllPeersGone { id } => game_id_event("download-peers-gone", id),
|
|
PeerEvent::InstallGameBegin { id, operation } => (
|
|
"install-begin",
|
|
json!({"game_id": id, "operation": install_operation_name(operation)}),
|
|
),
|
|
PeerEvent::InstallGameFinished { id } => game_id_event("install-finished", id),
|
|
PeerEvent::InstallGameFailed { id } => game_id_event("install-failed", id),
|
|
PeerEvent::UninstallGameBegin { id } => game_id_event("uninstall-begin", id),
|
|
PeerEvent::UninstallGameFinished { id } => game_id_event("uninstall-finished", id),
|
|
PeerEvent::UninstallGameFailed { id } => game_id_event("uninstall-failed", id),
|
|
PeerEvent::RemoveDownloadedGameBegin { id } => game_id_event("remove-download-begin", id),
|
|
PeerEvent::RemoveDownloadedGameFinished { id } => {
|
|
game_id_event("remove-download-finished", id)
|
|
}
|
|
PeerEvent::RemoveDownloadedGameFailed { id } => game_id_event("remove-download-failed", id),
|
|
PeerEvent::NoPeersHaveGame { id } => no_peers_event(shared, id).await,
|
|
PeerEvent::PeerConnected(addr) => ("peer-connected", peer_addr_json(addr)),
|
|
PeerEvent::PeerDisconnected(addr) => ("peer-disconnected", peer_addr_json(addr)),
|
|
PeerEvent::PeerDiscovered(addr) => ("peer-discovered", peer_addr_json(addr)),
|
|
PeerEvent::PeerLost(addr) => ("peer-lost", peer_addr_json(addr)),
|
|
PeerEvent::PeerCountUpdated(count) => ("peer-count-updated", json!({"count": count})),
|
|
PeerEvent::RuntimeFailed { component, error } => (
|
|
"runtime-failed",
|
|
json!({"component": runtime_component_name(component), "error": error}),
|
|
),
|
|
}
|
|
}
|
|
|
|
fn game_id_event(kind: &'static str, id: String) -> (&'static str, Value) {
|
|
(kind, json!({"game_id": id}))
|
|
}
|
|
|
|
async fn download_begin_event(shared: &SharedState, id: String) -> (&'static str, Value) {
|
|
shared.state.write().await.downloads.insert(
|
|
id.clone(),
|
|
DownloadMeasurement {
|
|
started_at: Instant::now(),
|
|
bytes: 0,
|
|
chunks: 0,
|
|
},
|
|
);
|
|
game_id_event("download-begin", id)
|
|
}
|
|
|
|
async fn download_chunk_finished_event(
|
|
shared: &SharedState,
|
|
id: String,
|
|
peer_addr: SocketAddr,
|
|
relative_path: String,
|
|
offset: u64,
|
|
length: u64,
|
|
) -> (&'static str, Value) {
|
|
if let Some(measurement) = shared.state.write().await.downloads.get_mut(&id) {
|
|
measurement.bytes = measurement.bytes.saturating_add(length);
|
|
measurement.chunks = measurement.chunks.saturating_add(1);
|
|
}
|
|
|
|
(
|
|
"download-chunk-finished",
|
|
json!({
|
|
"game_id": id,
|
|
"peer_addr": peer_addr.to_string(),
|
|
"relative_path": relative_path,
|
|
"offset": offset,
|
|
"length": length,
|
|
}),
|
|
)
|
|
}
|
|
|
|
async fn download_terminal_event(
|
|
shared: &SharedState,
|
|
kind: &'static str,
|
|
id: String,
|
|
) -> (&'static str, Value) {
|
|
let measurement = shared.state.write().await.downloads.remove(&id);
|
|
let Some(measurement) = measurement else {
|
|
return game_id_event(kind, id);
|
|
};
|
|
|
|
let duration = measurement.started_at.elapsed();
|
|
let seconds = duration.as_secs_f64().max(f64::EPSILON);
|
|
#[allow(clippy::cast_precision_loss)]
|
|
let bytes = measurement.bytes as f64;
|
|
|
|
(
|
|
kind,
|
|
json!({
|
|
"game_id": id,
|
|
"throughput": {
|
|
"bytes": measurement.bytes,
|
|
"chunks": measurement.chunks,
|
|
"duration_ms": duration.as_secs_f64() * 1000.0,
|
|
"mib_per_s": bytes / seconds / 1_048_576.0,
|
|
"mbit_per_s": bytes * 8.0 / seconds / 1_000_000.0,
|
|
},
|
|
}),
|
|
)
|
|
}
|
|
|
|
async fn no_peers_event(shared: &SharedState, id: String) -> (&'static str, Value) {
|
|
shared
|
|
.state
|
|
.write()
|
|
.await
|
|
.unavailable_games
|
|
.insert(id.clone());
|
|
game_id_event("no-peers-have-game", id)
|
|
}
|
|
|
|
fn peer_addr_json(addr: SocketAddr) -> Value {
|
|
json!({"addr": addr.to_string()})
|
|
}
|
|
|
|
fn active_operations_json(active_operations: &[ActiveOperation]) -> Vec<Value> {
|
|
active_operations
|
|
.iter()
|
|
.map(|operation| {
|
|
json!({
|
|
"game_id": operation.id.clone(),
|
|
"operation": active_operation_name(operation.operation),
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn peer_snapshots_json(peers: &[PeerSnapshot]) -> Vec<Value> {
|
|
peers
|
|
.iter()
|
|
.map(|peer| {
|
|
json!({
|
|
"peer_id": peer.peer_id.clone(),
|
|
"addr": peer.addr.to_string(),
|
|
"library_rev": peer.library_rev,
|
|
"library_digest": peer.library_digest,
|
|
"features": peer.features.clone(),
|
|
"game_count": peer.game_count,
|
|
"games": peer.games.clone(),
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
fn active_operation_name(operation: ActiveOperationKind) -> &'static str {
|
|
(&operation).into()
|
|
}
|
|
|
|
fn install_operation_name(operation: InstallOperation) -> &'static str {
|
|
(&operation).into()
|
|
}
|
|
|
|
fn runtime_component_name(component: PeerRuntimeComponent) -> &'static str {
|
|
(&component).into()
|
|
}
|
|
|
|
fn seed_fixtures(game_dir: &Path, fixtures: &[String]) -> eyre::Result<Vec<FixtureSeed>> {
|
|
fixtures
|
|
.iter()
|
|
.map(|fixture| seed_fixture_game(game_dir, fixture))
|
|
.collect()
|
|
}
|
|
|
|
async fn load_catalog(catalog_db: Option<&Path>, fixtures: &[FixtureSeed]) -> GameCatalog {
|
|
let mut catalog = GameCatalog::empty();
|
|
if let Some(path) = catalog_db
|
|
&& path.exists()
|
|
{
|
|
match get_games(path).await {
|
|
Ok(games) => {
|
|
for game in games {
|
|
catalog.insert(game.game_id, Some(game.game_version));
|
|
}
|
|
}
|
|
Err(err) => eprintln!("failed to load catalog db {}: {err}", path.display()),
|
|
}
|
|
}
|
|
|
|
for seed in fixtures {
|
|
catalog.insert(
|
|
seed.game_id.clone(),
|
|
Some(DEFAULT_FIXTURE_VERSION.to_string()),
|
|
);
|
|
}
|
|
catalog
|
|
}
|
|
|
|
fn parse_args() -> eyre::Result<Args> {
|
|
let mut args = std::env::args_os().skip(1);
|
|
let mut parsed = Args {
|
|
name: "peer".to_string(),
|
|
games_dir: PathBuf::from("games"),
|
|
state_dir: PathBuf::from("state"),
|
|
catalog_db: default_catalog_db(),
|
|
fixtures: Vec::new(),
|
|
unrar: None,
|
|
};
|
|
|
|
while let Some(arg) = args.next() {
|
|
match arg.to_str() {
|
|
Some("--help" | "-h") => {
|
|
print_help();
|
|
std::process::exit(0);
|
|
}
|
|
Some("--name") => parsed.name = next_string(&mut args, "--name")?,
|
|
Some("--games-dir") => parsed.games_dir = next_path(&mut args, "--games-dir")?,
|
|
Some("--state-dir") => parsed.state_dir = next_path(&mut args, "--state-dir")?,
|
|
Some("--catalog-db") => parsed.catalog_db = Some(next_path(&mut args, "--catalog-db")?),
|
|
Some("--fixture") => parsed.fixtures.push(next_string(&mut args, "--fixture")?),
|
|
Some("--unrar") => parsed.unrar = Some(next_path(&mut args, "--unrar")?),
|
|
Some(other) => eyre::bail!("unknown argument: {other}"),
|
|
None => eyre::bail!("argument is not valid UTF-8: {arg:?}"),
|
|
}
|
|
}
|
|
|
|
Ok(parsed)
|
|
}
|
|
|
|
fn default_catalog_db() -> Option<PathBuf> {
|
|
[
|
|
PathBuf::from("/app/game.db"),
|
|
PathBuf::from("crates/lanspread-tauri-deno-ts/src-tauri/game.db"),
|
|
]
|
|
.into_iter()
|
|
.find(|path| path.exists())
|
|
}
|
|
|
|
fn default_unrar_program() -> Option<PathBuf> {
|
|
[
|
|
PathBuf::from("/usr/local/bin/unrar"),
|
|
PathBuf::from("/usr/bin/unrar"),
|
|
]
|
|
.into_iter()
|
|
.find(|path| path.exists())
|
|
}
|
|
|
|
fn next_string(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<String> {
|
|
args.next()
|
|
.ok_or_else(|| eyre::eyre!("{flag} requires a value"))?
|
|
.into_string()
|
|
.map_err(|value| eyre::eyre!("{flag} value is not valid UTF-8: {value:?}"))
|
|
}
|
|
|
|
fn next_path(args: &mut impl Iterator<Item = OsString>, flag: &str) -> eyre::Result<PathBuf> {
|
|
Ok(PathBuf::from(next_string(args, flag)?))
|
|
}
|
|
|
|
fn print_help() {
|
|
eprintln!(
|
|
"usage: lanspread-peer-cli [--name NAME] [--games-dir PATH] [--state-dir PATH] \\
|
|
[--catalog-db PATH] [--fixture GAME_ID] [--unrar PATH]\n\
|
|
Reads JSONL commands on stdin and writes result/event/error JSONL on stdout."
|
|
);
|
|
}
|