Files
lanspread/crates/lanspread-peer/src/services/stream.rs
T
ddidderr 6c8a2bb9f0 feat(peer): add transactional local game operations
Implement the peer-owned state model from PLAN.md. A root-level version.ini
is now the download completion sentinel, local/ as a directory is the install
predicate, and exact root-level version.ini detection prevents nested files
from becoming sentinels by accident.

Add the peer operation table that gates downloads, installs, updates, and
uninstalls by game ID. Serving paths now reject non-catalog games, active
operations, missing sentinels, and any request that points under local/.
Remote aggregation treats LocalOnly peers as non-downloadable so they do not
contribute peer counts, candidate source selection, or latest-version checks.

Move install-side filesystem mutation into lanspread-peer::install. The new
module writes atomic .lanspread.json intents, uses .local.installing and
.local.backup with .lanspread_owned markers, and performs startup recovery
from recorded intent plus filesystem state. Downloads now buffer version.ini
chunks in memory and commit the sentinel last through .version.ini.tmp.

Replace the fixed 15-second monitor with notify-backed non-recursive watches,
per-ID rescan gating, and a 300-second fallback scan. The optimized rescan
path updates one cached library-index entry and active operation IDs preserve
their previous summary during scans.

Test Plan:
- just fmt
- just clippy
- just test
- just build

Refs: PLAN.md
2026-05-15 18:18:55 +02:00

403 lines
13 KiB
Rust

//! Request dispatch for a single bidirectional QUIC stream.
use std::net::SocketAddr;
use futures::{SinkExt, StreamExt};
use lanspread_db::db::{Game, GameFileDescription};
use lanspread_proto::{LibraryDelta, LibrarySnapshot, LibrarySummary, Message, Request, Response};
use s2n_quic::stream::{BidirectionalStream, SendStream};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::{
PeerEvent,
context::PeerCtx,
error::PeerError,
events,
local_games::{get_game_file_descriptions, is_local_dir_name, local_download_available},
peer::{send_game_file_chunk, send_game_file_data},
remote_peer::{ensure_peer_id_for_addr, update_peer_from_game_list},
services::handshake::{
accept_inbound_hello,
perform_handshake_with_peer,
spawn_library_resync,
},
};
type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;
/// Handles a bidirectional stream from a peer.
pub(super) async fn handle_peer_stream(
stream: BidirectionalStream,
ctx: PeerCtx,
remote_addr: Option<SocketAddr>,
) -> eyre::Result<()> {
let (rx, tx) = stream.split();
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
log::trace!("{remote_addr:?} peer stream opened");
loop {
let next_message = tokio::select! {
() = ctx.shutdown.cancelled() => break,
next_message = framed_rx.next() => next_message,
};
match next_message {
Some(Ok(data)) => {
log::trace!(
"{:?} msg: (raw): {}",
remote_addr,
String::from_utf8_lossy(&data)
);
let request = Request::decode(data.freeze());
log::debug!("{remote_addr:?} msg: {request:?}");
note_peer_activity(&ctx, remote_addr).await;
framed_tx = dispatch_request(&ctx, remote_addr, request, framed_tx).await;
}
Some(Err(err)) => {
log::error!("{remote_addr:?} peer stream error: {err}");
break;
}
None => {
log::trace!("{remote_addr:?} peer stream closed");
break;
}
}
}
Ok(())
}
async fn dispatch_request(
ctx: &PeerCtx,
remote_addr: Option<SocketAddr>,
request: Request,
framed_tx: ResponseWriter,
) -> ResponseWriter {
match request {
Request::Ping => send_response(framed_tx, Response::Pong, "pong").await,
Request::Hello(hello) => {
let ack = accept_inbound_hello(ctx, remote_addr, hello).await;
send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
}
Request::ListGames => handle_list_games(ctx, framed_tx).await,
Request::LibrarySummary(summary) => {
handle_library_summary(ctx, remote_addr, summary).await;
framed_tx
}
Request::LibrarySnapshot(snapshot) => {
handle_library_snapshot(ctx, remote_addr, snapshot).await;
framed_tx
}
Request::LibraryDelta(delta) => {
handle_library_delta(ctx, remote_addr, delta).await;
framed_tx
}
Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await,
Request::GetGameFileData(desc) => handle_file_data_request(ctx, desc, framed_tx).await,
Request::GetGameFileChunk {
game_id,
relative_path,
offset,
length,
} => {
handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await
}
Request::Goodbye { peer_id } => {
handle_goodbye(ctx, remote_addr, peer_id).await;
framed_tx
}
Request::Invalid(_, _) => {
log::error!("Received invalid request from peer");
framed_tx
}
Request::AnnounceGames(games) => {
handle_announce_games(ctx, remote_addr, games).await;
framed_tx
}
}
}
async fn note_peer_activity(ctx: &PeerCtx, remote_addr: Option<SocketAddr>) {
if let Some(addr) = remote_addr {
ctx.peer_game_db
.write()
.await
.update_last_seen_by_addr(&addr);
}
}
async fn send_response(
mut framed_tx: ResponseWriter,
response: Response,
label: &str,
) -> ResponseWriter {
if let Err(err) = framed_tx.send(response.encode()).await {
log::error!("Failed to send {label} response: {err}");
}
framed_tx
}
async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> ResponseWriter {
log::info!("Received ListGames request from peer");
let snapshot = {
let db_guard = ctx.local_game_db.read().await;
if let Some(db) = db_guard.as_ref() {
db.all_games().into_iter().cloned().collect::<Vec<Game>>()
} else {
log::info!("Local game database not yet loaded, responding with empty game list");
Vec::new()
}
};
let games = if snapshot.is_empty() {
snapshot
} else {
let active_operations = ctx.active_operations.read().await;
snapshot
.into_iter()
.filter(|game| !active_operations.contains_key(&game.id))
.collect()
};
send_response(framed_tx, Response::ListGames(games), "ListGames").await
}
async fn handle_library_summary(
ctx: &PeerCtx,
remote_addr: Option<SocketAddr>,
summary: LibrarySummary,
) {
let Some(addr) = remote_addr else {
return;
};
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
let (previous_digest, previous_count, features) = {
let db = ctx.peer_game_db.read().await;
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
(
digest,
db.peer_game_count(&peer_id),
db.peer_features(&peer_id),
)
};
{
let mut db = ctx.peer_game_db.write().await;
db.update_peer_library(
&peer_id,
summary.library_rev,
summary.library_digest,
features,
);
}
if summary.library_digest != previous_digest || previous_count == 0 {
ctx.task_tracker.spawn({
let peer_id_arc = ctx.peer_id.clone();
let local_library = ctx.local_library.clone();
let peer_game_db = ctx.peer_game_db.clone();
let tx_notify_ui = ctx.tx_notify_ui.clone();
async move {
if let Err(err) = perform_handshake_with_peer(
peer_id_arc,
local_library,
peer_game_db,
tx_notify_ui,
addr,
Some(peer_id),
)
.await
{
log::warn!("Failed to refresh library from {addr}: {err}");
}
}
});
}
}
async fn handle_library_snapshot(
ctx: &PeerCtx,
remote_addr: Option<SocketAddr>,
snapshot: LibrarySnapshot,
) {
if let Some(addr) = remote_addr {
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
{
let mut db = ctx.peer_game_db.write().await;
db.apply_library_snapshot(&peer_id, snapshot);
}
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
}
}
async fn handle_library_delta(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, delta: LibraryDelta) {
let Some(addr) = remote_addr else {
return;
};
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
let applied = {
let mut db = ctx.peer_game_db.write().await;
db.apply_library_delta(&peer_id, delta)
};
if applied {
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
} else {
spawn_library_resync(
ctx.peer_id.clone(),
ctx.local_library.clone(),
ctx.peer_game_db.clone(),
ctx.tx_notify_ui.clone(),
addr,
peer_id,
"resync",
);
}
}
async fn handle_get_game(ctx: &PeerCtx, id: String, framed_tx: ResponseWriter) -> ResponseWriter {
log::info!("Received GetGame request for {id} from peer");
let response = get_game_response(ctx, id).await;
send_response(framed_tx, response, "GetGame").await
}
async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
let game_dir = ctx.game_dir.read().await.clone();
if !can_serve_game(ctx, &game_dir, &id).await {
return Response::GameNotFound(id);
}
match get_game_file_descriptions(&id, &game_dir).await {
Ok(file_descriptions) => Response::GetGame {
id,
file_descriptions,
},
Err(PeerError::FileSizeDetermination { path, source }) => {
let error_msg = format!("Failed to determine file size for {path}: {source}");
log::error!("File size determination error for game {id}: {error_msg}");
Response::InternalPeerError(error_msg)
}
Err(err) => {
log::error!("Failed to get game file descriptions for {id}: {err}");
Response::GameNotFound(id)
}
}
}
async fn can_serve_game(ctx: &PeerCtx, game_dir: &std::path::Path, game_id: &str) -> bool {
let active_operations = ctx.active_operations.read().await;
let catalog = ctx.catalog.read().await;
local_download_available(game_dir, game_id, &active_operations, &catalog).await
}
fn path_points_inside_local(game_id: &str, relative_path: &str) -> bool {
let normalised = relative_path.replace('\\', "/");
let mut parts = normalised.split('/').filter(|part| !part.is_empty());
match (parts.next(), parts.next()) {
(Some(first), _) if is_local_dir_name(first) => true,
(Some(first), Some(second)) if first == game_id && is_local_dir_name(second) => true,
_ => false,
}
}
async fn handle_file_data_request(
ctx: &PeerCtx,
desc: GameFileDescription,
framed_tx: ResponseWriter,
) -> ResponseWriter {
log::info!(
"Received GetGameFileData request for {} from peer",
desc.relative_path
);
let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone();
if path_points_inside_local(&desc.game_id, &desc.relative_path)
|| !can_serve_game(ctx, &game_dir, &desc.game_id).await
{
log::info!(
"Declining GetGameFileData for {} because the game is not currently transferable",
desc.relative_path
);
let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new());
}
send_game_file_data(&desc, &mut tx, &game_dir).await;
FramedWrite::new(tx, LengthDelimitedCodec::new())
}
async fn handle_file_chunk_request(
ctx: &PeerCtx,
game_id: String,
relative_path: String,
offset: u64,
length: u64,
framed_tx: ResponseWriter,
) -> ResponseWriter {
log::info!(
"Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
);
let mut tx = framed_tx.into_inner();
let game_dir = ctx.game_dir.read().await.clone();
if path_points_inside_local(&game_id, &relative_path)
|| !can_serve_game(ctx, &game_dir, &game_id).await
{
log::info!(
"Declining GetGameFileChunk for {relative_path} because the game is not currently transferable"
);
let _ = tx.close().await;
return FramedWrite::new(tx, LengthDelimitedCodec::new());
}
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &game_dir).await;
FramedWrite::new(tx, LengthDelimitedCodec::new())
}
async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, peer_id: String) {
log::info!("Received Goodbye from peer {peer_id}");
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
if removed.is_none() {
return;
}
if let Some(addr) = remote_addr {
events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, addr).await;
}
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
}
async fn handle_announce_games(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, games: Vec<Game>) {
log::info!(
"Received {} announced games from peer {remote_addr:?}",
games.len()
);
if let Some(addr) = remote_addr {
let aggregated_games = update_peer_from_game_list(&ctx.peer_game_db, addr, &games).await;
events::send(&ctx.tx_notify_ui, PeerEvent::ListGames(aggregated_games));
}
}
#[cfg(test)]
mod tests {
use super::path_points_inside_local;
#[test]
fn local_relative_paths_are_never_transferable() {
assert!(path_points_inside_local("game", "game/local/save.dat"));
assert!(path_points_inside_local("game", "local/save.dat"));
assert!(path_points_inside_local("game", "game\\local\\save.dat"));
assert!(!path_points_inside_local("game", "game/version.ini"));
assert!(!path_points_inside_local("game", "game/archive.eti"));
}
}