ChatGPT Codex 5.5 xhigh refactored even more
This commit is contained in:
@@ -0,0 +1,397 @@
|
||||
//! Request dispatch for a single bidirectional QUIC stream.
|
||||
|
||||
use std::{net::SocketAddr, path::PathBuf};
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use lanspread_db::db::{Game, GameFileDescription};
|
||||
use lanspread_proto::{LibraryDelta, LibrarySnapshot, LibrarySummary, Message, Request, Response};
|
||||
use s2n_quic::stream::{BidirectionalStream, SendStream};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
|
||||
use crate::{
|
||||
PeerEvent,
|
||||
context::PeerCtx,
|
||||
error::PeerError,
|
||||
events,
|
||||
local_games::get_game_file_descriptions,
|
||||
peer::{send_game_file_chunk, send_game_file_data},
|
||||
remote_peer::{ensure_peer_id_for_addr, update_peer_from_game_list},
|
||||
services::handshake::{
|
||||
accept_inbound_hello,
|
||||
perform_handshake_with_peer,
|
||||
spawn_library_resync,
|
||||
},
|
||||
};
|
||||
|
||||
type ResponseWriter = FramedWrite<SendStream, LengthDelimitedCodec>;
|
||||
|
||||
/// Handles a bidirectional stream from a peer.
|
||||
pub(super) async fn handle_peer_stream(
|
||||
stream: BidirectionalStream,
|
||||
ctx: PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> eyre::Result<()> {
|
||||
let (rx, tx) = stream.split();
|
||||
let mut framed_rx = FramedRead::new(rx, LengthDelimitedCodec::new());
|
||||
let mut framed_tx = FramedWrite::new(tx, LengthDelimitedCodec::new());
|
||||
|
||||
log::trace!("{remote_addr:?} peer stream opened");
|
||||
|
||||
loop {
|
||||
match framed_rx.next().await {
|
||||
Some(Ok(data)) => {
|
||||
log::trace!(
|
||||
"{:?} msg: (raw): {}",
|
||||
remote_addr,
|
||||
String::from_utf8_lossy(&data)
|
||||
);
|
||||
|
||||
let request = Request::decode(data.freeze());
|
||||
log::debug!("{remote_addr:?} msg: {request:?}");
|
||||
note_peer_activity(&ctx, remote_addr).await;
|
||||
framed_tx = dispatch_request(&ctx, remote_addr, request, framed_tx).await;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
log::error!("{remote_addr:?} peer stream error: {err}");
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
log::trace!("{remote_addr:?} peer stream closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn dispatch_request(
|
||||
ctx: &PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
request: Request,
|
||||
framed_tx: ResponseWriter,
|
||||
) -> ResponseWriter {
|
||||
match request {
|
||||
Request::Ping => send_response(framed_tx, Response::Pong, "pong").await,
|
||||
Request::Hello(hello) => {
|
||||
let ack = accept_inbound_hello(ctx, remote_addr, hello).await;
|
||||
send_response(framed_tx, Response::HelloAck(ack), "HelloAck").await
|
||||
}
|
||||
Request::ListGames => handle_list_games(ctx, framed_tx).await,
|
||||
Request::LibrarySummary(summary) => {
|
||||
handle_library_summary(ctx, remote_addr, summary).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::LibrarySnapshot(snapshot) => {
|
||||
handle_library_snapshot(ctx, remote_addr, snapshot).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::LibraryDelta(delta) => {
|
||||
handle_library_delta(ctx, remote_addr, delta).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::GetGame { id } => handle_get_game(ctx, id, framed_tx).await,
|
||||
Request::GetGameFileData(desc) => handle_file_data_request(ctx, desc, framed_tx).await,
|
||||
Request::GetGameFileChunk {
|
||||
game_id,
|
||||
relative_path,
|
||||
offset,
|
||||
length,
|
||||
} => {
|
||||
handle_file_chunk_request(ctx, game_id, relative_path, offset, length, framed_tx).await
|
||||
}
|
||||
Request::Goodbye { peer_id } => {
|
||||
handle_goodbye(ctx, remote_addr, peer_id).await;
|
||||
framed_tx
|
||||
}
|
||||
Request::Invalid(_, _) => {
|
||||
log::error!("Received invalid request from peer");
|
||||
framed_tx
|
||||
}
|
||||
Request::AnnounceGames(games) => {
|
||||
handle_announce_games(ctx, remote_addr, games).await;
|
||||
framed_tx
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn note_peer_activity(ctx: &PeerCtx, remote_addr: Option<SocketAddr>) {
|
||||
if let Some(addr) = remote_addr {
|
||||
ctx.peer_game_db
|
||||
.write()
|
||||
.await
|
||||
.update_last_seen_by_addr(&addr);
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response(
|
||||
mut framed_tx: ResponseWriter,
|
||||
response: Response,
|
||||
label: &str,
|
||||
) -> ResponseWriter {
|
||||
if let Err(err) = framed_tx.send(response.encode()).await {
|
||||
log::error!("Failed to send {label} response: {err}");
|
||||
}
|
||||
framed_tx
|
||||
}
|
||||
|
||||
async fn handle_list_games(ctx: &PeerCtx, framed_tx: ResponseWriter) -> ResponseWriter {
|
||||
log::info!("Received ListGames request from peer");
|
||||
let snapshot = {
|
||||
let db_guard = ctx.local_game_db.read().await;
|
||||
if let Some(db) = db_guard.as_ref() {
|
||||
db.all_games().into_iter().cloned().collect::<Vec<Game>>()
|
||||
} else {
|
||||
log::info!("Local game database not yet loaded, responding with empty game list");
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
let games = if snapshot.is_empty() {
|
||||
snapshot
|
||||
} else {
|
||||
let downloading = ctx.downloading_games.read().await;
|
||||
snapshot
|
||||
.into_iter()
|
||||
.filter(|game| !downloading.contains(&game.id))
|
||||
.collect()
|
||||
};
|
||||
|
||||
send_response(framed_tx, Response::ListGames(games), "ListGames").await
|
||||
}
|
||||
|
||||
async fn handle_library_summary(
|
||||
ctx: &PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
summary: LibrarySummary,
|
||||
) {
|
||||
let Some(addr) = remote_addr else {
|
||||
return;
|
||||
};
|
||||
|
||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
||||
let (previous_digest, previous_count, features) = {
|
||||
let db = ctx.peer_game_db.read().await;
|
||||
let (_, digest) = db.peer_library_state(&peer_id).unwrap_or((0, 0));
|
||||
(
|
||||
digest,
|
||||
db.peer_game_count(&peer_id),
|
||||
db.peer_features(&peer_id),
|
||||
)
|
||||
};
|
||||
|
||||
{
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
db.update_peer_library(
|
||||
&peer_id,
|
||||
summary.library_rev,
|
||||
summary.library_digest,
|
||||
features,
|
||||
);
|
||||
}
|
||||
|
||||
if summary.library_digest != previous_digest || previous_count == 0 {
|
||||
tokio::spawn({
|
||||
let peer_id_arc = ctx.peer_id.clone();
|
||||
let local_library = ctx.local_library.clone();
|
||||
let peer_game_db = ctx.peer_game_db.clone();
|
||||
let tx_notify_ui = ctx.tx_notify_ui.clone();
|
||||
async move {
|
||||
if let Err(err) = perform_handshake_with_peer(
|
||||
peer_id_arc,
|
||||
local_library,
|
||||
peer_game_db,
|
||||
tx_notify_ui,
|
||||
addr,
|
||||
Some(peer_id),
|
||||
)
|
||||
.await
|
||||
{
|
||||
log::warn!("Failed to refresh library from {addr}: {err}");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_library_snapshot(
|
||||
ctx: &PeerCtx,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
snapshot: LibrarySnapshot,
|
||||
) {
|
||||
if let Some(addr) = remote_addr {
|
||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
||||
{
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
db.apply_library_snapshot(&peer_id, snapshot);
|
||||
}
|
||||
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_library_delta(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, delta: LibraryDelta) {
|
||||
let Some(addr) = remote_addr else {
|
||||
return;
|
||||
};
|
||||
|
||||
let peer_id = ensure_peer_id_for_addr(&ctx.peer_game_db, addr).await;
|
||||
let applied = {
|
||||
let mut db = ctx.peer_game_db.write().await;
|
||||
db.apply_library_delta(&peer_id, delta)
|
||||
};
|
||||
|
||||
if applied {
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
} else {
|
||||
spawn_library_resync(
|
||||
ctx.peer_id.clone(),
|
||||
ctx.local_library.clone(),
|
||||
ctx.peer_game_db.clone(),
|
||||
ctx.tx_notify_ui.clone(),
|
||||
addr,
|
||||
peer_id,
|
||||
"resync",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_get_game(ctx: &PeerCtx, id: String, framed_tx: ResponseWriter) -> ResponseWriter {
|
||||
log::info!("Received GetGame request for {id} from peer");
|
||||
let response = get_game_response(ctx, id).await;
|
||||
send_response(framed_tx, response, "GetGame").await
|
||||
}
|
||||
|
||||
async fn get_game_response(ctx: &PeerCtx, id: String) -> Response {
|
||||
let downloading = ctx.downloading_games.read().await.contains(&id);
|
||||
if downloading {
|
||||
log::info!("Declining to serve GetGame for {id} because download is in progress");
|
||||
return Response::GameNotFound(id);
|
||||
}
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return Response::GameNotFound(id);
|
||||
};
|
||||
|
||||
let has_game = {
|
||||
let db_guard = ctx.local_game_db.read().await;
|
||||
db_guard
|
||||
.as_ref()
|
||||
.is_some_and(|db| db.get_game_by_id(&id).is_some())
|
||||
};
|
||||
|
||||
if !has_game {
|
||||
return Response::GameNotFound(id);
|
||||
}
|
||||
|
||||
match get_game_file_descriptions(&id, &game_dir).await {
|
||||
Ok(file_descriptions) => Response::GetGame {
|
||||
id,
|
||||
file_descriptions,
|
||||
},
|
||||
Err(PeerError::FileSizeDetermination { path, source }) => {
|
||||
let error_msg = format!("Failed to determine file size for {path}: {source}");
|
||||
log::error!("File size determination error for game {id}: {error_msg}");
|
||||
Response::InternalPeerError(error_msg)
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Failed to get game file descriptions for {id}: {err}");
|
||||
Response::GameNotFound(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_file_data_request(
|
||||
ctx: &PeerCtx,
|
||||
desc: GameFileDescription,
|
||||
framed_tx: ResponseWriter,
|
||||
) -> ResponseWriter {
|
||||
log::info!(
|
||||
"Received GetGameFileData request for {} from peer",
|
||||
desc.relative_path
|
||||
);
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return send_invalid_request(
|
||||
framed_tx,
|
||||
desc.relative_path.as_bytes().to_vec(),
|
||||
"Game directory not set",
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_data(&desc, &mut tx, &base_dir).await;
|
||||
FramedWrite::new(tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
async fn handle_file_chunk_request(
|
||||
ctx: &PeerCtx,
|
||||
game_id: String,
|
||||
relative_path: String,
|
||||
offset: u64,
|
||||
length: u64,
|
||||
framed_tx: ResponseWriter,
|
||||
) -> ResponseWriter {
|
||||
log::info!(
|
||||
"Received GetGameFileChunk request for {relative_path} (offset {offset}, length {length})"
|
||||
);
|
||||
|
||||
let Some(game_dir) = ctx.game_dir.read().await.clone() else {
|
||||
return send_invalid_request(
|
||||
framed_tx,
|
||||
relative_path.as_bytes().to_vec(),
|
||||
"Game directory not set",
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
let base_dir = PathBuf::from(game_dir);
|
||||
let mut tx = framed_tx.into_inner();
|
||||
send_game_file_chunk(&game_id, &relative_path, offset, length, &mut tx, &base_dir).await;
|
||||
FramedWrite::new(tx, LengthDelimitedCodec::new())
|
||||
}
|
||||
|
||||
async fn send_invalid_request(
|
||||
framed_tx: ResponseWriter,
|
||||
raw_request: Vec<u8>,
|
||||
message: &str,
|
||||
) -> ResponseWriter {
|
||||
send_response(
|
||||
framed_tx,
|
||||
Response::InvalidRequest(raw_request.into(), message.to_string()),
|
||||
"InvalidRequest",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_goodbye(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, peer_id: String) {
|
||||
log::info!("Received Goodbye from peer {peer_id}");
|
||||
let removed = { ctx.peer_game_db.write().await.remove_peer(&peer_id) };
|
||||
if removed.is_none() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(addr) = remote_addr {
|
||||
events::emit_peer_lost(&ctx.peer_game_db, &ctx.tx_notify_ui, addr).await;
|
||||
}
|
||||
|
||||
events::emit_peer_game_list(&ctx.peer_game_db, &ctx.tx_notify_ui).await;
|
||||
}
|
||||
|
||||
async fn handle_announce_games(ctx: &PeerCtx, remote_addr: Option<SocketAddr>, games: Vec<Game>) {
|
||||
log::info!(
|
||||
"Received {} announced games from peer {remote_addr:?}",
|
||||
games.len()
|
||||
);
|
||||
|
||||
if let Some(addr) = remote_addr {
|
||||
let aggregated_games = update_peer_from_game_list(&ctx.peer_game_db, addr, &games).await;
|
||||
events::send(
|
||||
&ctx.tx_notify_ui,
|
||||
PeerEvent::ListGames(aggregated_games),
|
||||
"ListGames",
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user