refactor(peer): make startup directory-driven
Peer startup used to bootstrap itself by spawning the runtime and immediately sending a SetGameDir command back through its own control channel. The Tauri integration then polled shared state until a directory appeared and waited two seconds before asking peers for games. That made startup ordering implicit and left a race-prone sleep in the UI bridge. Install the initial game directory directly into the peer context instead. The runtime now attempts the initial local-library scan before starting discovery, then launches the server, discovery, liveness, and local monitor services from that initialized context. Later directory changes still use SetGameDir, so the existing UI command surface stays intact. Use PathBuf and Path references across peer filesystem boundaries so directory state is represented as a path rather than an optional string. The Tauri layer now validates a selected game directory before storing it, loads the bundled catalog on first use, and starts or updates the peer runtime from one helper. Peer event fan-out is split into named handlers so the Tauri setup closure only wires state and starts the event loop. Shutdown goodbye notifications are still best-effort, but they are now awaited with a short timeout instead of being spawned and forgotten. The tradeoff is a small bounded wait during peer runtime shutdown in exchange for clearer task ownership. Test Plan: - cargo test -p lanspread-peer - cargo clippy - cargo clippy --benches - cargo clippy --tests - cargo +nightly fmt - git diff --check Refs: none
This commit is contained in:
@@ -9,9 +9,10 @@ It is designed to run headless – other crates (most notably
|
||||
|
||||
- `start_peer(game_dir, tx_events, peer_game_db)` boots the asynchronous runtime in the
|
||||
background and returns an `UnboundedSender<PeerCommand>` that the caller uses
|
||||
for control. The function immediately forwards the supplied game directory via
|
||||
`PeerCommand::SetGameDir` and keeps using the provided `PeerGameDB` so the UI
|
||||
layer can observe live peer metadata.
|
||||
for control. The initial game directory is installed directly into the peer
|
||||
context, the local library scan is attempted before discovery starts, and the
|
||||
provided `PeerGameDB` remains shared so the UI layer can observe live peer
|
||||
metadata.
|
||||
- `PeerCommand` represents the small control surface exposed to the UI layer:
|
||||
`ListGames`, `GetGame`, `DownloadGameFiles`, and `SetGameDir`.
|
||||
- `PeerEvent` enumerates everything the peer runtime reports back to the UI:
|
||||
@@ -20,7 +21,7 @@ It is designed to run headless – other crates (most notably
|
||||
`Game` definitions, tracks the latest ETI version per title, and keeps the
|
||||
last seen list of `GameFileDescription` entries for each peer.
|
||||
|
||||
Internally the peer runtime owns three long-lived tasks that run for the
|
||||
Internally the peer runtime owns four long-lived tasks that run for the
|
||||
lifetime of the process:
|
||||
|
||||
1. **Server component** (`run_server_component`) – listens for QUIC connections,
|
||||
@@ -33,6 +34,8 @@ lifetime of the process:
|
||||
remains responsive.
|
||||
3. **Ping service** (`run_ping_service`) – periodically issues QUIC ping requests
|
||||
to keep peer liveness up to date and prunes stale entries from `PeerGameDB`.
|
||||
4. **Local game monitor** (`run_local_game_monitor`) – periodically rescans the
|
||||
configured game directory and announces local library deltas to known peers.
|
||||
|
||||
`scan_local_library` maintains a lightweight on-disk index and produces both a
|
||||
`GameDB` and protocol summaries. The resulting database is used to respond to
|
||||
@@ -80,9 +83,10 @@ The Tauri application embeds this crate in
|
||||
`GameDB`, per-game download state, and the user-selected game directory.
|
||||
- The Tauri commands (`request_games`, `install_game`, `update_game`, and
|
||||
`update_game_directory`) translate UI actions into `PeerCommand`s. In
|
||||
particular, `update_game_directory` records the filesystem path, kicks off the
|
||||
peer runtime on first use, and mirrors the installed/uninstalled state into
|
||||
the UI-facing database.
|
||||
particular, `update_game_directory` validates the filesystem path before
|
||||
storing it, loads the bundled catalog on first use, kicks off the peer runtime
|
||||
on demand, and mirrors the installed/uninstalled state into the UI-facing
|
||||
database.
|
||||
- A background task consumes `PeerEvent`s and fans them out to the front-end via
|
||||
Tauri publish/subscribe events (`games-list-updated`, `game-download-*`,
|
||||
`peer-*`). Successful downloads trigger an `unrar` sidecar to unpack ETI
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::SocketAddr,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
@@ -14,7 +15,7 @@ use crate::{PeerEvent, library::LocalLibraryState, peer_db::PeerGameDB};
|
||||
/// Main context for the peer system.
|
||||
#[derive(Clone)]
|
||||
pub struct Ctx {
|
||||
pub game_dir: Arc<RwLock<Option<String>>>,
|
||||
pub game_dir: Arc<RwLock<PathBuf>>,
|
||||
pub local_game_db: Arc<RwLock<Option<GameDB>>>,
|
||||
pub local_library: Arc<RwLock<LocalLibraryState>>,
|
||||
pub peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
@@ -27,7 +28,7 @@ pub struct Ctx {
|
||||
/// Context for peer connection handling.
|
||||
#[derive(Clone)]
|
||||
pub struct PeerCtx {
|
||||
pub game_dir: Arc<RwLock<Option<String>>>,
|
||||
pub game_dir: Arc<RwLock<PathBuf>>,
|
||||
pub local_game_db: Arc<RwLock<Option<GameDB>>>,
|
||||
pub local_library: Arc<RwLock<LocalLibraryState>>,
|
||||
pub local_peer_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||
@@ -50,9 +51,9 @@ impl std::fmt::Debug for PeerCtx {
|
||||
|
||||
impl Ctx {
|
||||
/// Creates a new context with the given peer game database.
|
||||
pub fn new(peer_game_db: Arc<RwLock<PeerGameDB>>, peer_id: String) -> Self {
|
||||
pub fn new(peer_game_db: Arc<RwLock<PeerGameDB>>, peer_id: String, game_dir: PathBuf) -> Self {
|
||||
Self {
|
||||
game_dir: Arc::new(RwLock::new(None)),
|
||||
game_dir: Arc::new(RwLock::new(game_dir)),
|
||||
local_game_db: Arc::new(RwLock::new(None)),
|
||||
local_library: Arc::new(RwLock::new(LocalLibraryState::empty())),
|
||||
peer_game_db,
|
||||
|
||||
@@ -503,7 +503,7 @@ pub async fn retry_failed_chunks(
|
||||
pub async fn download_game_files(
|
||||
game_id: &str,
|
||||
game_file_descs: Vec<GameFileDescription>,
|
||||
games_folder: String,
|
||||
games_folder: PathBuf,
|
||||
peers: Vec<SocketAddr>,
|
||||
file_peer_map: HashMap<String, Vec<SocketAddr>>,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
@@ -512,8 +512,7 @@ pub async fn download_game_files(
|
||||
eyre::bail!("no peers available for game {game_id}");
|
||||
}
|
||||
|
||||
let base_dir = PathBuf::from(&games_folder);
|
||||
prepare_game_storage(&base_dir, &game_file_descs).await?;
|
||||
prepare_game_storage(&games_folder, &game_file_descs).await?;
|
||||
|
||||
tx_notify_ui.send(PeerEvent::DownloadGameFilesBegin {
|
||||
id: game_id.to_string(),
|
||||
@@ -523,7 +522,7 @@ pub async fn download_game_files(
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (peer_addr, plan) in plans {
|
||||
let base_dir = base_dir.clone();
|
||||
let base_dir = games_folder.clone();
|
||||
let game_id = game_id.to_string();
|
||||
tasks.push(tokio::spawn(async move {
|
||||
download_from_peer(peer_addr, &game_id, plan, base_dir).await
|
||||
@@ -565,8 +564,14 @@ pub async fn download_game_files(
|
||||
if !failed_chunks.is_empty() && !peers.is_empty() {
|
||||
log::info!("Retrying {} failed chunks", failed_chunks.len());
|
||||
|
||||
let retry_results =
|
||||
retry_failed_chunks(failed_chunks, &peers, &base_dir, game_id, &file_peer_map).await;
|
||||
let retry_results = retry_failed_chunks(
|
||||
failed_chunks,
|
||||
&peers,
|
||||
&games_folder,
|
||||
game_id,
|
||||
&file_peer_map,
|
||||
)
|
||||
.await;
|
||||
|
||||
for chunk_result in retry_results {
|
||||
if let Err(e) = chunk_result.result {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Command handlers for peer commands.
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
|
||||
use lanspread_db::db::GameFileDescription;
|
||||
use tokio::sync::{RwLock, mpsc::UnboundedSender};
|
||||
@@ -39,9 +39,6 @@ async fn try_serve_local_game(
|
||||
id: &str,
|
||||
) -> bool {
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
let Some(game_dir) = game_dir else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let downloading = ctx.downloading_games.read().await;
|
||||
if !local_download_available(&game_dir, id, &downloading).await {
|
||||
@@ -145,10 +142,6 @@ pub async fn handle_download_game_files_command(
|
||||
) {
|
||||
log::info!("Got PeerCommand::DownloadGameFiles");
|
||||
let games_folder = { ctx.game_dir.read().await.clone() };
|
||||
let Some(games_folder) = games_folder else {
|
||||
log::error!("Cannot handle game file descriptions: games_folder is not set");
|
||||
return;
|
||||
};
|
||||
|
||||
// Use majority validation to get trusted file descriptions and peer whitelist
|
||||
let (validated_descriptions, peer_whitelist, file_peer_map) = {
|
||||
@@ -264,22 +257,17 @@ pub async fn handle_download_game_files_command(
|
||||
pub async fn handle_set_game_dir_command(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
game_dir: String,
|
||||
game_dir: PathBuf,
|
||||
) {
|
||||
*ctx.game_dir.write().await = Some(game_dir.clone());
|
||||
log::info!("Game directory set to: {game_dir}");
|
||||
*ctx.game_dir.write().await = game_dir.clone();
|
||||
log::info!("Game directory set to: {}", game_dir.display());
|
||||
|
||||
// Load local game database when game directory is set
|
||||
let game_dir = game_dir.clone();
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match scan_local_library(&game_dir).await {
|
||||
Ok(scan) => {
|
||||
update_and_announce_games(&ctx_clone, &tx_notify_ui, scan).await;
|
||||
log::info!("Local game database loaded successfully");
|
||||
}
|
||||
match load_local_library(&ctx_clone, &tx_notify_ui).await {
|
||||
Ok(()) => log::info!("Local game database loaded successfully"),
|
||||
Err(e) => {
|
||||
log::error!("Failed to load local game database: {e}");
|
||||
}
|
||||
@@ -287,6 +275,17 @@ pub async fn handle_set_game_dir_command(
|
||||
});
|
||||
}
|
||||
|
||||
/// Loads the configured local library and announces the result.
|
||||
pub async fn load_local_library(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
let scan = scan_local_library(&game_dir).await?;
|
||||
update_and_announce_games(ctx, tx_notify_ui, scan).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles the `GetPeerCount` command.
|
||||
pub async fn handle_get_peer_count_command(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
log::info!("GetPeerCount command received");
|
||||
|
||||
@@ -33,7 +33,7 @@ mod startup;
|
||||
// Public re-exports
|
||||
// =============================================================================
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
|
||||
pub use config::{CHUNK_SIZE, MAX_RETRY_COUNT};
|
||||
pub use error::PeerError;
|
||||
@@ -52,6 +52,7 @@ use crate::{
|
||||
handle_get_peer_count_command,
|
||||
handle_list_games_command,
|
||||
handle_set_game_dir_command,
|
||||
load_local_library,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -106,7 +107,7 @@ pub enum PeerCommand {
|
||||
file_descriptions: Vec<GameFileDescription>,
|
||||
},
|
||||
/// Set the local game directory.
|
||||
SetGameDir(String),
|
||||
SetGameDir(PathBuf),
|
||||
/// Request the current peer count.
|
||||
GetPeerCount,
|
||||
}
|
||||
@@ -131,22 +132,22 @@ pub enum PeerCommand {
|
||||
///
|
||||
/// A channel sender for sending commands to the peer system.
|
||||
pub fn start_peer(
|
||||
game_dir: String,
|
||||
game_dir: impl Into<PathBuf>,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
) -> eyre::Result<UnboundedSender<PeerCommand>> {
|
||||
log::info!("Starting peer system with game directory: {game_dir}");
|
||||
let game_dir = game_dir.into();
|
||||
log::info!(
|
||||
"Starting peer system with game directory: {}",
|
||||
game_dir.display()
|
||||
);
|
||||
let peer_id = identity::load_or_create_peer_id()?;
|
||||
|
||||
let (tx_control, rx_control) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let tx_control_clone = tx_control.clone();
|
||||
startup::spawn_peer_runtime(rx_control, tx_notify_ui, peer_game_db, peer_id);
|
||||
startup::spawn_peer_runtime(rx_control, tx_notify_ui, peer_game_db, peer_id, game_dir);
|
||||
|
||||
// Set the game directory
|
||||
tx_control.send(PeerCommand::SetGameDir(game_dir))?;
|
||||
|
||||
Ok(tx_control_clone)
|
||||
Ok(tx_control)
|
||||
}
|
||||
|
||||
/// Main peer execution loop that handles peer commands and manages the peer system.
|
||||
@@ -155,11 +156,15 @@ async fn run_peer(
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
peer_id: String,
|
||||
game_dir: PathBuf,
|
||||
) -> eyre::Result<()> {
|
||||
let ctx = Ctx::new(peer_game_db.clone(), peer_id);
|
||||
startup::spawn_startup_services(&ctx, &tx_notify_ui)?;
|
||||
let ctx = Ctx::new(peer_game_db, peer_id, game_dir);
|
||||
if let Err(err) = load_local_library(&ctx, &tx_notify_ui).await {
|
||||
log::error!("Failed to load initial local game database: {err}");
|
||||
}
|
||||
startup::spawn_startup_services(&ctx, &tx_notify_ui);
|
||||
handle_peer_commands(&ctx, &tx_notify_ui, &mut rx_control).await;
|
||||
startup::spawn_goodbye_notifications(&ctx).await;
|
||||
startup::send_goodbye_notifications(&ctx).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -169,11 +174,7 @@ async fn handle_peer_commands(
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
rx_control: &mut UnboundedReceiver<PeerCommand>,
|
||||
) {
|
||||
loop {
|
||||
let Some(cmd) = rx_control.recv().await else {
|
||||
break;
|
||||
};
|
||||
|
||||
while let Some(cmd) = rx_control.recv().await {
|
||||
match cmd {
|
||||
PeerCommand::ListGames => {
|
||||
handle_list_games_command(ctx, tx_notify_ui).await;
|
||||
|
||||
@@ -55,7 +55,7 @@ pub async fn local_dir_has_content(path: &Path) -> bool {
|
||||
|
||||
/// Checks if a game is available for download locally.
|
||||
pub async fn local_download_available(
|
||||
game_dir: &str,
|
||||
game_dir: &Path,
|
||||
game_id: &str,
|
||||
downloading_games: &HashSet<String>,
|
||||
) -> bool {
|
||||
@@ -64,7 +64,7 @@ pub async fn local_download_available(
|
||||
return false;
|
||||
}
|
||||
|
||||
let game_path = PathBuf::from(game_dir).join(game_id);
|
||||
let game_path = game_dir.join(game_id);
|
||||
let eti_path = game_path.join(format!("{game_id}.eti"));
|
||||
|
||||
if tokio::fs::metadata(&eti_path).await.is_err() {
|
||||
@@ -109,10 +109,8 @@ pub struct LocalLibraryScan {
|
||||
pub revision: u64,
|
||||
}
|
||||
|
||||
fn library_index_path(game_dir: &str) -> PathBuf {
|
||||
PathBuf::from(game_dir)
|
||||
.join(LIBRARY_INDEX_DIR)
|
||||
.join(LIBRARY_INDEX_FILE)
|
||||
fn library_index_path(game_dir: &Path) -> PathBuf {
|
||||
game_dir.join(LIBRARY_INDEX_DIR).join(LIBRARY_INDEX_FILE)
|
||||
}
|
||||
|
||||
async fn load_library_index(path: &Path) -> LibraryIndex {
|
||||
@@ -408,10 +406,10 @@ fn empty_scan() -> LocalLibraryScan {
|
||||
// =============================================================================
|
||||
|
||||
/// Scans the local game directory and returns summaries plus a game database.
|
||||
pub async fn scan_local_library(game_dir: &str) -> eyre::Result<LocalLibraryScan> {
|
||||
let game_path = PathBuf::from(game_dir);
|
||||
pub async fn scan_local_library(game_dir: impl AsRef<Path>) -> eyre::Result<LocalLibraryScan> {
|
||||
let game_path = game_dir.as_ref();
|
||||
|
||||
let metadata = match tokio::fs::metadata(&game_path).await {
|
||||
let metadata = match tokio::fs::metadata(game_path).await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => {
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
@@ -433,14 +431,14 @@ pub async fn scan_local_library(game_dir: &str) -> eyre::Result<LocalLibraryScan
|
||||
return Ok(empty_scan());
|
||||
}
|
||||
|
||||
let index_path = library_index_path(game_dir);
|
||||
let index_path = library_index_path(game_path);
|
||||
let mut index = load_library_index(&index_path).await;
|
||||
let mut seen_ids = HashSet::new();
|
||||
let mut summaries = HashMap::new();
|
||||
let mut games = Vec::new();
|
||||
let mut changed = false;
|
||||
|
||||
let mut entries = tokio::fs::read_dir(&game_path).await?;
|
||||
let mut entries = tokio::fs::read_dir(game_path).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
if !path.is_dir() {
|
||||
@@ -451,7 +449,7 @@ pub async fn scan_local_library(game_dir: &str) -> eyre::Result<LocalLibraryScan
|
||||
continue;
|
||||
};
|
||||
|
||||
let update = update_index_for_game(&game_path, game_id, &mut index).await?;
|
||||
let update = update_index_for_game(game_path, game_id, &mut index).await?;
|
||||
changed |= update.changed;
|
||||
|
||||
let Some(summary) = update.summary else {
|
||||
@@ -493,7 +491,7 @@ pub async fn scan_local_library(game_dir: &str) -> eyre::Result<LocalLibraryScan
|
||||
/// Gets file descriptions for a game from the local filesystem.
|
||||
pub async fn get_game_file_descriptions(
|
||||
game_id: &str,
|
||||
game_dir: &str,
|
||||
game_dir: impl AsRef<Path>,
|
||||
) -> Result<Vec<GameFileDescription>, PeerError> {
|
||||
scan_game_descriptions(game_id, &PathBuf::from(game_dir)).await
|
||||
scan_game_descriptions(game_id, game_dir.as_ref()).await
|
||||
}
|
||||
|
||||
@@ -24,14 +24,12 @@ pub async fn run_local_game_monitor(tx_notify_ui: UnboundedSender<PeerEvent>, ct
|
||||
interval.tick().await;
|
||||
|
||||
let game_dir = { ctx.game_dir.read().await.clone() };
|
||||
if let Some(game_dir) = game_dir {
|
||||
match scan_local_library(&game_dir).await {
|
||||
Ok(scan) => {
|
||||
update_and_announce_games(&ctx, &tx_notify_ui, scan).await;
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Failed to scan local games directory: {err}");
|
||||
}
|
||||
match scan_local_library(&game_dir).await {
|
||||
Ok(scan) => {
|
||||
update_and_announce_games(&ctx, &tx_notify_ui, scan).await;
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Failed to scan local games directory: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Request dispatch for a single bidirectional QUIC stream.
|
||||
|
||||
use std::{net::SocketAddr, path::PathBuf};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
@@ -269,9 +269,7 @@ async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
|
||||
return Response::GameNotFound(id);
|
||||
}
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return Response::GameNotFound(id);
|
||||
};
|
||||
let game_dir = ctx.game_dir.read().await.clone();
|
||||
|
||||
let has_game = {
|
||||
let db_guard = ctx.local_game_db.read().await;
|
||||
@@ -311,18 +309,10 @@ async fn handle_file_data_request(
|
||||
desc.relative_path
|
||||
);
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return send_invalid_request(
|
||||
framed_tx,
|
||||
desc.relative_path.as_bytes().to_vec(),
|
||||
"Game directory not set",
|
||||
)
|
||||
.await;
|
||||
};
|
||||
let game_dir = ctx.game_dir.read().await.clone();
|
||||
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_data(&desc, &mut tx, &base_dir).await;
|
||||
send_game_file_data(&desc, &mut tx, &game_dir).await;
|
||||
FramedWrite::new(tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
@@ -338,34 +328,13 @@ async fn handle_file_chunk_request(
|
||||
"Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
|
||||
);
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return send_invalid_request(
|
||||
framed_tx,
|
||||
relative_path.as_bytes().to_vec(),
|
||||
"Game directory not set",
|
||||
)
|
||||
.await;
|
||||
};
|
||||
let game_dir = ctx.game_dir.read().await.clone();
|
||||
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &base_dir).await;
|
||||
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await;
|
||||
FramedWrite::new(tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
async fn send_invalid_request(
|
||||
framed_tx: ResponseWriter,
|
||||
raw_request: Vec<u8>,
|
||||
message: &str,
|
||||
) -> ResponseWriter {
|
||||
send_response(
|
||||
framed_tx,
|
||||
Response::InvalidRequest(raw_request.into(), message.to_string()),
|
||||
"InvalidRequest",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, peer_id: String) {
|
||||
log::info!("Received Goodbye from peer {peer_id}");
|
||||
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Peer runtime task startup and shutdown orchestration.
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
|
||||
|
||||
use tokio::sync::{
|
||||
RwLock,
|
||||
@@ -22,44 +22,42 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
const EPHEMERAL_SERVER_ADDR: &str = "0.0.0.0:0";
|
||||
|
||||
pub(crate) fn spawn_peer_runtime(
|
||||
rx_control: UnboundedReceiver<PeerCommand>,
|
||||
tx_notify_ui: UnboundedSender<PeerEvent>,
|
||||
peer_game_db: Arc<RwLock<PeerGameDB>>,
|
||||
peer_id: String,
|
||||
game_dir: PathBuf,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id).await {
|
||||
if let Err(err) = run_peer(rx_control, tx_notify_ui, peer_game_db, peer_id, game_dir).await
|
||||
{
|
||||
log::error!("Peer system failed: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_startup_services(
|
||||
ctx: &Ctx,
|
||||
tx_notify_ui: &UnboundedSender<PeerEvent>,
|
||||
) -> eyre::Result<()> {
|
||||
spawn_quic_server(ctx, tx_notify_ui)?;
|
||||
pub(crate) fn spawn_startup_services(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
spawn_quic_server(ctx, tx_notify_ui);
|
||||
spawn_peer_discovery_service(ctx, tx_notify_ui);
|
||||
spawn_peer_liveness_service(ctx, tx_notify_ui);
|
||||
spawn_local_library_monitor(ctx, tx_notify_ui);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_goodbye_notifications(ctx: &Ctx) {
|
||||
pub(crate) async fn send_goodbye_notifications(ctx: &Ctx) {
|
||||
let peer_id = ctx.peer_id.as_ref().clone();
|
||||
let peer_addresses = { ctx.peer_game_db.read().await.get_peer_addresses() };
|
||||
|
||||
for peer_addr in peer_addresses {
|
||||
spawn_goodbye_notification(peer_addr, peer_id.clone());
|
||||
}
|
||||
futures::future::join_all(
|
||||
peer_addresses
|
||||
.into_iter()
|
||||
.map(|peer_addr| send_goodbye_notification(peer_addr, peer_id.clone())),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) -> eyre::Result<()> {
|
||||
let server_addr = EPHEMERAL_SERVER_ADDR.parse::<SocketAddr>()?;
|
||||
fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
let server_addr = SocketAddr::from(([0, 0, 0, 0], 0));
|
||||
let peer_ctx = ctx.to_peer_ctx(tx_notify_ui.clone());
|
||||
let tx_notify_ui = tx_notify_ui.clone();
|
||||
|
||||
@@ -68,8 +66,6 @@ fn spawn_quic_server(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) -> ey
|
||||
log::error!("Server component error: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn_peer_discovery_service(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEvent>) {
|
||||
@@ -107,10 +103,10 @@ fn spawn_local_library_monitor(ctx: &Ctx, tx_notify_ui: &UnboundedSender<PeerEve
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = send_goodbye(peer_addr, peer_id).await {
|
||||
log::warn!("Failed to send Goodbye to {peer_addr}: {err}");
|
||||
}
|
||||
});
|
||||
async fn send_goodbye_notification(peer_addr: SocketAddr, peer_id: String) {
|
||||
match tokio::time::timeout(Duration::from_secs(1), send_goodbye(peer_addr, peer_id)).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(err)) => log::warn!("Failed to send Goodbye to {peer_addr}: {err}"),
|
||||
Err(_) => log::warn!("Timed out sending Goodbye to {peer_addr}"),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user