feat(client): send stats snapshots to relay
Client counters were only local diagnostics even though the control protocol and relay now understand Stats messages. Add a client-core sender that opens a peer-to-relay unidirectional stream with the current TunnelStats snapshot. The Windows client reports stats whenever it prints its diagnostics snapshot. Failures are logged instead of stopping the frame pump because stats reporting is diagnostic and should not be the reason a live tunnel goes down. The client-core integration test now has the server decode the stats stream before shutdown so the send path is covered without a timing race. Test Plan: - cargo fmt --check - cargo test -p lanparty-client-core \ connects_to_relay_control_stream_as_client -- --nocapture - cargo test -p lanparty-client-core - cargo test -p lanparty-client-win - cargo clippy -p lanparty-client-core --all-targets -- -D warnings - cargo clippy -p lanparty-client-win --all-targets -- -D warnings - cargo test --workspace - cargo clippy --workspace --all-targets -- -D warnings - git diff --check Refs: PLAN.md
This commit is contained in:
@@ -278,6 +278,10 @@ impl ClientSession {
|
||||
recv_control_event(&self.connection).await
|
||||
}
|
||||
|
||||
pub async fn send_stats_snapshot(&self) -> Result<()> {
|
||||
self.relay_io().send_stats_snapshot().await
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn stats_snapshot(&self) -> TunnelStats {
|
||||
self.stats.snapshot()
|
||||
@@ -370,6 +374,11 @@ impl ClientRelayIo {
|
||||
pub fn stats_snapshot(&self) -> TunnelStats {
|
||||
self.stats.snapshot()
|
||||
}
|
||||
|
||||
pub async fn send_stats_snapshot(&self) -> Result<()> {
|
||||
let stats = self.stats.snapshot();
|
||||
send_control_event(&self.connection, ControlMessage::Stats(stats)).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -513,6 +522,21 @@ async fn recv_control_event(connection: &quinn::Connection) -> Result<ControlMes
|
||||
decode_control_frame(&frame).context("failed to decode relay control event")
|
||||
}
|
||||
|
||||
async fn send_control_event(connection: &quinn::Connection, message: ControlMessage) -> Result<()> {
|
||||
let mut send = connection
|
||||
.open_uni()
|
||||
.await
|
||||
.context("failed to open client control event stream")?;
|
||||
let frame =
|
||||
encode_control_message(&message).context("failed to encode client control event")?;
|
||||
send.write_all(&frame)
|
||||
.await
|
||||
.context("failed to write client control event")?;
|
||||
send.finish()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
@@ -632,6 +656,7 @@ mod tests {
|
||||
let (server_config, certificate) = test_server_config();
|
||||
let endpoint = Endpoint::server(server_config, "127.0.0.1:0".parse().unwrap()).unwrap();
|
||||
let server_addr = endpoint.local_addr().unwrap();
|
||||
let (stats_received_tx, stats_received_rx) = tokio::sync::oneshot::channel();
|
||||
let server_task = tokio::spawn(async move {
|
||||
let incoming = endpoint.accept().await.unwrap();
|
||||
let connection = incoming.await.unwrap();
|
||||
@@ -677,6 +702,15 @@ mod tests {
|
||||
event_send.write_all(&event).await.unwrap();
|
||||
event_send.finish().unwrap();
|
||||
|
||||
let mut stats_recv = connection.accept_uni().await.unwrap();
|
||||
let stats_frame = stats_recv.read_to_end(MAX_CONTROL_FRAME_LEN).await.unwrap();
|
||||
let stats_message = decode_control_frame(&stats_frame).unwrap();
|
||||
let ControlMessage::Stats(stats) = stats_message else {
|
||||
panic!("expected client stats event");
|
||||
};
|
||||
assert_eq!(stats, TunnelStats::new(1, 1, 1, 1, 1, 1));
|
||||
stats_received_tx.send(()).unwrap();
|
||||
|
||||
connection.closed().await;
|
||||
endpoint.close(0_u32.into(), b"test complete");
|
||||
endpoint.wait_idle().await;
|
||||
@@ -739,6 +773,11 @@ mod tests {
|
||||
assert_eq!(stats.malformed_frames(), 1);
|
||||
assert_eq!(client.stats_snapshot(), stats);
|
||||
|
||||
client.send_stats_snapshot().await.unwrap();
|
||||
tokio::time::timeout(Duration::from_secs(5), stats_received_rx)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
client.shutdown("test complete").await;
|
||||
tokio::time::timeout(Duration::from_secs(5), server_task)
|
||||
.await
|
||||
|
||||
@@ -145,11 +145,11 @@ async fn run_client(session: &ClientSession, relay_route_pin: &PinnedRelayRoute)
|
||||
verify_relay_route_is_pinned(session.config().relay_addr().ip(), relay_route_pin)
|
||||
.context("relay route changed after TAP activation")?;
|
||||
print_verified_relay_route(&relay_route);
|
||||
print_client_diagnostics(&client_diagnostics_snapshot(
|
||||
print_and_report_client_diagnostics(
|
||||
session,
|
||||
true,
|
||||
tap_diagnostics.clone(),
|
||||
));
|
||||
&client_diagnostics_snapshot(session, true, tap_diagnostics.clone()),
|
||||
)
|
||||
.await;
|
||||
println!(
|
||||
"bridging TAP frames; relay route is pinned and TAP route policy is scoped; press Ctrl-C to stop"
|
||||
);
|
||||
@@ -184,11 +184,10 @@ async fn run_client(session: &ClientSession, relay_route_pin: &PinnedRelayRoute)
|
||||
.context("relay route changed while bridging")?;
|
||||
}
|
||||
_ = diagnostics_check.tick() => {
|
||||
print_client_diagnostics(&client_diagnostics_snapshot(
|
||||
print_and_report_client_diagnostics(
|
||||
session,
|
||||
true,
|
||||
tap_diagnostics.clone(),
|
||||
));
|
||||
&client_diagnostics_snapshot(session, true, tap_diagnostics.clone()),
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -197,11 +196,11 @@ async fn run_client(session: &ClientSession, relay_route_pin: &PinnedRelayRoute)
|
||||
#[cfg(not(windows))]
|
||||
async fn run_client(session: &ClientSession) -> Result<()> {
|
||||
open_tap_adapter(session);
|
||||
print_client_diagnostics(&client_diagnostics_snapshot(
|
||||
print_and_report_client_diagnostics(
|
||||
session,
|
||||
false,
|
||||
TapDiagnostics::new(false, None, None, None),
|
||||
));
|
||||
&client_diagnostics_snapshot(session, false, TapDiagnostics::new(false, None, None, None)),
|
||||
)
|
||||
.await;
|
||||
println!("TAP frame pump and route pinning are not wired yet; press Ctrl-C to stop");
|
||||
|
||||
let control_events = run_control_event_log(session);
|
||||
@@ -386,6 +385,16 @@ fn print_client_diagnostics(diagnostics: &ClientDiagnostics) {
|
||||
println!("{}", format_client_diagnostics(diagnostics));
|
||||
}
|
||||
|
||||
async fn print_and_report_client_diagnostics(
|
||||
session: &ClientSession,
|
||||
diagnostics: &ClientDiagnostics,
|
||||
) {
|
||||
print_client_diagnostics(diagnostics);
|
||||
if let Err(error) = session.send_stats_snapshot().await {
|
||||
eprintln!("failed to send client stats to relay: {error:#}");
|
||||
}
|
||||
}
|
||||
|
||||
fn format_client_diagnostics(diagnostics: &ClientDiagnostics) -> String {
|
||||
let stats = diagnostics.stats();
|
||||
format!(
|
||||
|
||||
Reference in New Issue
Block a user