//! QUIC server accept loop. use std::{net::SocketAddr, time::Duration}; use s2n_quic::{Connection, Server, provider::limits::Limits}; use tokio::sync::mpsc::UnboundedSender; use crate::{ PeerEvent, config::{CERT_PEM, KEY_PEM}, context::PeerCtx, events, services::{ advertise::{monitor_mdns_events, start_mdns_advertiser}, stream::handle_peer_stream, }, }; /// Runs the QUIC server and mDNS advertiser. pub async fn run_server_component( addr: SocketAddr, ctx: PeerCtx, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { let limits = Limits::default() .with_max_handshake_duration(Duration::from_secs(3))? .with_max_idle_timeout(Duration::from_secs(3))?; let mut server = Server::builder() .with_tls((CERT_PEM, KEY_PEM))? .with_io(addr)? .with_limits(limits)? .start()?; let server_addr = server.local_addr()?; log::info!("Peer server listening on {server_addr}"); let mdns_advertiser = start_mdns_advertiser(&ctx, server_addr).await?; let mdns_monitor = mdns_advertiser.monitor.clone(); let mdns_shutdown = ctx.shutdown.clone(); ctx.task_tracker.spawn(async move { monitor_mdns_events(mdns_monitor, mdns_shutdown).await; }); let ready_addr = (*ctx.local_peer_addr.read().await).unwrap_or_else(|| direct_connect_addr(server_addr)); let _mdns_advertiser = mdns_advertiser; events::send( &tx_notify_ui, PeerEvent::LocalPeerReady { peer_id: ctx.peer_id.as_ref().clone(), addr: ready_addr, }, ); loop { let connection = tokio::select! { () = ctx.shutdown.cancelled() => return Ok(()), connection = server.accept() => connection, }; let Some(connection) = connection else { eyre::bail!("QUIC server accept loop ended unexpectedly"); }; let ctx = ctx.clone(); let tx_notify_ui = tx_notify_ui.clone(); let task_tracker = ctx.task_tracker.clone(); task_tracker.spawn(async move { if let Err(err) = handle_peer_connection(connection, ctx, tx_notify_ui).await { log::error!("Peer connection error: {err}"); } }); } } fn direct_connect_addr(server_addr: SocketAddr) -> SocketAddr { if server_addr.ip().is_unspecified() { return SocketAddr::from(([127, 0, 0, 1], server_addr.port())); } server_addr } async fn handle_peer_connection( mut connection: Connection, ctx: PeerCtx, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { let remote_addr = connection.remote_addr()?; log::info!("{remote_addr} peer connected"); events::send(&tx_notify_ui, PeerEvent::PeerConnected(remote_addr)); loop { let stream = tokio::select! { () = ctx.shutdown.cancelled() => break, stream = connection.accept_bidirectional_stream() => stream, }; let Some(stream) = stream? else { break; }; let ctx = ctx.clone(); let task_tracker = ctx.task_tracker.clone(); task_tracker.spawn(async move { if let Err(err) = handle_peer_stream(stream, ctx, Some(remote_addr)).await { log::error!("{remote_addr:?} peer stream error: {err}"); } }); } events::send(&tx_notify_ui, PeerEvent::PeerDisconnected(remote_addr)); Ok(()) }