Compare commits

...

4 Commits

Author SHA1 Message Date
ddidderr 44e0629926 refactor(peer-cli): split download measurement event handlers
Extract the download-begin and chunk-finished measurement bookkeeping out of the
main peer-cli event reducer. This keeps the S37 throughput reporting behavior
unchanged while bringing the reducer back under the pedantic clippy line-count
threshold.

Test Plan:
- just fmt
- just clippy
- just test

Refs: S37 download throughput measurement harness.
2026-05-20 20:27:00 +02:00
ddidderr d7f7dc737e perf(peer): request larger QUIC UDP socket buffers
Configure the s2n-quic Tokio IO provider on both client and server instead of
using the address-only default provider. The configured provider asks the OS for
4 MiB send and receive buffers on each QUIC UDP socket, which avoids starting
bulk LAN transfers on the tiny default UDP buffer sizes.

I tested a wider version that also raised s2n-quic internal IO queues to 8 MiB,
but that regressed S37 to 710.19 and 736.20 MiB/s in repeat runs. This commit
keeps the narrower socket-buffer request, which measured faster than the prior
flow-control-only tuning while leaving the internal queue defaults intact.

The host used for measurement reports:
- net.core.rmem_max = 16777216
- net.core.wmem_max = 16777216
- net.core.rmem_default = 212992
- net.core.wmem_default = 212992

S37 single-source throughput:
- Step 1: 824.94 MiB/s, 6920.09 Mbit/s, 2.483s
- Step 2 sample A: 848.15 MiB/s, 7114.81 Mbit/s, 2.415s
- Step 2 sample B: 874.06 MiB/s, 7332.12 Mbit/s, 2.343s

Test Plan:
- just fmt
- sysctl net.core.rmem_max net.core.wmem_max net.core.rmem_default \
  net.core.wmem_default
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37 --build-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37

Refs: local LAN download performance investigation on 2026-05-20.
Depends-on: cd8bcbfeedfa (QUIC flow-control and BBR tuning).
2026-05-20 20:26:59 +02:00
ddidderr 5b689ec5f4 perf(peer): tune QUIC flow control for LAN downloads
Raise the s2n-quic connection and stream data windows on both the client and
server, increase the max send buffer, and use BBR with a larger initial
congestion window. The download path was already able to pipeline multiple chunk
streams, but those streams still shared small default connection-level budgets
that limited sustained LAN throughput.

The tuning keeps one current wire protocol and does not add fallback behavior.
It is deliberately centralized in the peer networking module so later transport
changes can use the same limits on both sides of the connection.

S37 single-source throughput:
- Before: 733.22 MiB/s, 6150.72 Mbit/s, 2.793s
- After: 824.94 MiB/s, 6920.09 Mbit/s, 2.483s
- Delta: +91.72 MiB/s, +769.37 Mbit/s, about +12.5%

Test Plan:
- just fmt
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37 --build-image

Refs: local LAN download performance investigation on 2026-05-20.
Depends-on: 14e772c5c71a (peer-cli S37 throughput measurement).
2026-05-20 20:26:59 +02:00
ddidderr 8a9f420a06 test(peer-cli): measure single-source download throughput
Add peer-cli accounting for download sessions so terminal download events
report bytes, chunks, elapsed time, MiB/s, and Mbit/s. The extended scenario
runner now has S37, a focused single-source download benchmark that creates a
2 GiB sparse bf1942 archive, downloads it from one peer with install disabled,
and checks the destination archive size and reported byte count.

This gives the QUIC performance work a repeatable measurement below the 5 GiB
limit from the original request. The source file is sparse, so S37 is aimed at
the app, QUIC, and destination-write path rather than raw source-disk reads;
the existing correctness scenarios still cover normal game downloads.

Baseline S37 before QUIC tuning:
- 733.22 MiB/s
- 6150.72 Mbit/s
- 2.793s for 2.00 GiB plus version.ini
- 65 reported chunks

Test Plan:
- just fmt
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37 --build-image

Refs: local LAN download performance investigation on 2026-05-20.
2026-05-20 20:26:58 +02:00
6 changed files with 207 additions and 21 deletions
+1
View File
@@ -44,6 +44,7 @@ for deterministic local runs; mDNS/macvlan remains an environment smoke path.
| S34 | Many-small-files game without `.eti` | A catalog game root contains `version.ini` plus many small regular files and no archive. | Download with `install=false` transfers every file, chunk events are coherent for small files, and source/receiver manifests match exactly. |
| S35 | Unknown game ID from remote peer | A remote peer advertises a game ID that is not in the receiver's catalog. | The receiver does not list the unknown game as downloadable, download attempts fail deterministically, and no local files are created. |
| S36 | Latest singleton beats stale majority | Five peers advertise one game; one peer has `20260501`, four peers have `20250101`. | `list-games` reports `eti_game_version=20260501`; all descriptors and chunks come from the singleton latest peer; stale peers contribute zero bytes. |
| S37 | Single-source download throughput | A source peer advertises a temporary catalog game with one sparse `2 GiB` `.eti`; an empty client downloads it with `install=false`. | The client emits `download-finished` with throughput measurements (`bytes`, `duration_ms`, `mib_per_s`, `mbit_per_s`), and the downloaded archive size matches the source. |
## Version-Skew Contract
@@ -26,6 +26,8 @@ CONTAINER_PREFIX = "lanspread-peer-cli-ext"
CATALOG_DB = "/app/game.db"
FIXTURES = REPO / "crates" / "lanspread-peer-cli" / "fixtures"
CHUNK_SIZE = 32 * 1024 * 1024
PERF_GAME_ID = "bf1942"
PERF_GAME_SIZE = 2 * 1024 * 1024 * 1024
IGNORED_DIFF_NAMES = {".lanspread", ".lanspread.json", "local"}
@@ -322,6 +324,7 @@ class Runner:
("S34", self.s34_many_small_files),
("S35", self.s35_unknown_game_filtered),
("S36", self.s36_latest_singleton),
("S37", self.s37_single_source_download_throughput),
]
for scenario_id, scenario in scenarios:
@@ -1015,6 +1018,45 @@ class Runner:
raise ScenarioError("got-game-files had no descriptors")
return "client reported latest 20260501 with peer_count=5; only singleton latest peer sent chunks; diff matched"
def s37_single_source_download_throughput(self) -> str:
source_dir = self.fixture_root / "s37-source"
create_large_sparse_game(source_dir / PERF_GAME_ID, size=PERF_GAME_SIZE)
source = self.peer("s37-source", games_dir=source_dir)
client = self.peer("s37-client")
connect_many(client, [source])
wait_remote_game(client, PERF_GAME_ID, peer_count=1, version="20260520")
waiter = LineWaiter(len(client.output))
client.send({"cmd": "download", "game_id": PERF_GAME_ID, "install": False})
finished = client.wait_for(
event_is("download-finished", PERF_GAME_ID),
timeout=300,
description=f"{PERF_GAME_ID} throughput download",
waiter=waiter,
)
destination_archive = client.host_games_dir / PERF_GAME_ID / f"{PERF_GAME_ID}.eti"
if destination_archive.stat().st_size != PERF_GAME_SIZE:
raise ScenarioError(
f"downloaded archive size mismatch: {destination_archive.stat().st_size} != {PERF_GAME_SIZE}"
)
throughput = finished.get("data", {}).get("throughput")
if not throughput:
raise ScenarioError(f"download-finished did not include throughput: {finished}")
expected_bytes = PERF_GAME_SIZE + len("20260520")
if int(throughput["bytes"]) != expected_bytes:
raise ScenarioError(
f"throughput byte count mismatch: {throughput['bytes']} != {expected_bytes}"
)
return (
f"{PERF_GAME_ID} {format_bytes(PERF_GAME_SIZE)} single-source download: "
f"{throughput['mib_per_s']:.2f} MiB/s, "
f"{throughput['mbit_per_s']:.2f} Mbit/s, "
f"{throughput['duration_ms'] / 1000.0:.3f}s, "
f"{throughput['chunks']} chunks"
)
def run(command: list[str], description: str) -> subprocess.CompletedProcess[str]:
result = subprocess.run(
@@ -1122,6 +1164,20 @@ def create_many_small_game(root: Path) -> None:
(root / "version.ini").write_text("20250101", encoding="utf-8")
def create_large_sparse_game(root: Path, *, size: int) -> None:
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True)
(root / "version.ini").write_text("20260520", encoding="utf-8")
archive = root / f"{root.name}.eti"
with archive.open("wb") as handle:
handle.truncate(size)
def format_bytes(size: int) -> str:
return f"{size / 1024 / 1024 / 1024:.2f} GiB"
def connect_many(client: Peer, peers: list[Peer]) -> None:
for peer in peers:
client.connect_to(peer)
+86 -14
View File
@@ -7,7 +7,7 @@ use std::{
net::SocketAddr,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::Duration,
time::{Duration, Instant},
};
use eyre::Context;
@@ -95,6 +95,7 @@ struct CliState {
active_operations: Vec<ActiveOperation>,
game_files: HashMap<String, Vec<GameFileDescription>>,
unavailable_games: HashSet<String>,
downloads: HashMap<String, DownloadMeasurement>,
}
#[derive(Clone, serde::Serialize)]
@@ -103,6 +104,12 @@ struct LocalPeer {
addr: String,
}
struct DownloadMeasurement {
started_at: Instant,
bytes: u64,
chunks: u64,
}
struct SharedState {
state: RwLock<CliState>,
peer_game_db: Arc<RwLock<PeerGameDB>>,
@@ -443,25 +450,23 @@ async fn update_state_from_event(shared: &SharedState, event: PeerEvent) -> (&'s
json!({"game_id": id, "file_descriptions": file_descriptions}),
)
}
PeerEvent::DownloadGameFilesBegin { id } => ("download-begin", json!({"game_id": id})),
PeerEvent::DownloadGameFilesBegin { id } => download_begin_event(shared, id).await,
PeerEvent::DownloadGameFileChunkFinished {
id,
peer_addr,
relative_path,
offset,
length,
} => (
"download-chunk-finished",
json!({
"game_id": id,
"peer_addr": peer_addr.to_string(),
"relative_path": relative_path,
"offset": offset,
"length": length,
}),
),
PeerEvent::DownloadGameFilesFinished { id } => game_id_event("download-finished", id),
PeerEvent::DownloadGameFilesFailed { id } => game_id_event("download-failed", id),
} => {
download_chunk_finished_event(shared, id, peer_addr, relative_path, offset, length)
.await
}
PeerEvent::DownloadGameFilesFinished { id } => {
download_terminal_event(shared, "download-finished", id).await
}
PeerEvent::DownloadGameFilesFailed { id } => {
download_terminal_event(shared, "download-failed", id).await
}
PeerEvent::DownloadGameFilesAllPeersGone { id } => game_id_event("download-peers-gone", id),
PeerEvent::InstallGameBegin { id, operation } => (
"install-begin",
@@ -494,6 +499,73 @@ fn game_id_event(kind: &'static str, id: String) -> (&'static str, Value) {
(kind, json!({"game_id": id}))
}
async fn download_begin_event(shared: &SharedState, id: String) -> (&'static str, Value) {
shared.state.write().await.downloads.insert(
id.clone(),
DownloadMeasurement {
started_at: Instant::now(),
bytes: 0,
chunks: 0,
},
);
game_id_event("download-begin", id)
}
async fn download_chunk_finished_event(
shared: &SharedState,
id: String,
peer_addr: SocketAddr,
relative_path: String,
offset: u64,
length: u64,
) -> (&'static str, Value) {
if let Some(measurement) = shared.state.write().await.downloads.get_mut(&id) {
measurement.bytes = measurement.bytes.saturating_add(length);
measurement.chunks = measurement.chunks.saturating_add(1);
}
(
"download-chunk-finished",
json!({
"game_id": id,
"peer_addr": peer_addr.to_string(),
"relative_path": relative_path,
"offset": offset,
"length": length,
}),
)
}
async fn download_terminal_event(
shared: &SharedState,
kind: &'static str,
id: String,
) -> (&'static str, Value) {
let measurement = shared.state.write().await.downloads.remove(&id);
let Some(measurement) = measurement else {
return game_id_event(kind, id);
};
let duration = measurement.started_at.elapsed();
let seconds = duration.as_secs_f64().max(f64::EPSILON);
#[allow(clippy::cast_precision_loss)]
let bytes = measurement.bytes as f64;
(
kind,
json!({
"game_id": id,
"throughput": {
"bytes": measurement.bytes,
"chunks": measurement.chunks,
"duration_ms": duration.as_secs_f64() * 1000.0,
"mib_per_s": bytes / seconds / 1_048_576.0,
"mbit_per_s": bytes * 8.0 / seconds / 1_000_000.0,
},
}),
)
}
async fn no_peers_event(shared: &SharedState, id: String) -> (&'static str, Value) {
shared
.state
+15
View File
@@ -23,6 +23,21 @@ pub const PEER_DOWNLOAD_STREAM_WINDOW: usize = 4;
/// Maximum number of retry attempts for failed chunk downloads.
pub const MAX_RETRY_COUNT: usize = 3;
/// QUIC connection-level receive window for bulk LAN transfers (64 MiB).
pub const QUIC_CONNECTION_DATA_WINDOW: u64 = 64 * 1024 * 1024;
/// QUIC per-stream receive window for bulk LAN transfers (32 MiB).
pub const QUIC_STREAM_DATA_WINDOW: u64 = 32 * 1024 * 1024;
/// Maximum queued send data per QUIC stream (32 MiB).
pub const QUIC_MAX_SEND_BUFFER_SIZE: u32 = 32 * 1024 * 1024;
/// Initial congestion window for LAN-oriented BBR transfers (1 MiB).
pub const QUIC_INITIAL_CONGESTION_WINDOW: u32 = 1024 * 1024;
/// Requested OS UDP send and receive buffer size for QUIC sockets (4 MiB).
pub const QUIC_SOCKET_BUFFER_SIZE: usize = 4 * 1024 * 1024;
/// Fallback interval for reconciling missed filesystem watcher events (seconds).
pub const LOCAL_GAME_FALLBACK_SCAN_SECS: u64 = 300;
+44 -4
View File
@@ -10,19 +10,59 @@ use futures::{SinkExt, StreamExt};
use if_addrs::{IfAddr, Interface, get_if_addrs};
use lanspread_db::db::GameFileDescription;
use lanspread_proto::{Hello, HelloAck, LibraryDelta, Message, Request, Response};
use s2n_quic::{Client as QuicClient, Connection, client::Connect, provider::limits::Limits};
use s2n_quic::{
Client as QuicClient,
Connection,
client::Connect,
provider::{
congestion_controller,
io::tokio::{Builder as QuicIoBuilder, Provider as QuicIoProvider},
limits::Limits,
},
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::config::CERT_PEM;
use crate::config::{
CERT_PEM,
QUIC_CONNECTION_DATA_WINDOW,
QUIC_INITIAL_CONGESTION_WINDOW,
QUIC_MAX_SEND_BUFFER_SIZE,
QUIC_SOCKET_BUFFER_SIZE,
QUIC_STREAM_DATA_WINDOW,
};
pub(crate) fn quic_limits() -> eyre::Result<Limits> {
Ok(Limits::default()
.with_data_window(QUIC_CONNECTION_DATA_WINDOW)?
.with_bidirectional_local_data_window(QUIC_STREAM_DATA_WINDOW)?
.with_bidirectional_remote_data_window(QUIC_STREAM_DATA_WINDOW)?
.with_unidirectional_data_window(QUIC_STREAM_DATA_WINDOW)?
.with_max_send_buffer_size(QUIC_MAX_SEND_BUFFER_SIZE)?)
}
pub(crate) fn quic_congestion_controller() -> congestion_controller::Bbr {
congestion_controller::bbr::Builder::default()
.with_initial_congestion_window(QUIC_INITIAL_CONGESTION_WINDOW)
.build()
}
pub(crate) fn quic_io(addr: SocketAddr) -> eyre::Result<QuicIoProvider> {
Ok(QuicIoBuilder::default()
.with_receive_address(addr)?
.with_send_buffer_size(QUIC_SOCKET_BUFFER_SIZE)?
.with_recv_buffer_size(QUIC_SOCKET_BUFFER_SIZE)?
.build()?)
}
/// Establishes a QUIC connection to a peer.
pub async fn connect_to_peer(addr: SocketAddr) -> eyre::Result<Connection> {
let limits = Limits::default().with_max_handshake_duration(Duration::from_secs(3))?;
let limits = quic_limits()?.with_max_handshake_duration(Duration::from_secs(3))?;
let client = QuicClient::builder()
.with_tls(CERT_PEM)?
.with_io("0.0.0.0:0")?
.with_io(quic_io(SocketAddr::from(([0, 0, 0, 0], 0)))?)?
.with_limits(limits)?
.with_congestion_controller(quic_congestion_controller())?
.start()?;
let conn = Connect::new(addr).with_server_name("localhost");
+5 -3
View File
@@ -2,7 +2,7 @@
use std::{net::SocketAddr, time::Duration};
use s2n_quic::{Connection, Server, provider::limits::Limits};
use s2n_quic::{Connection, Server};
use tokio::sync::mpsc::UnboundedSender;
use crate::{
@@ -10,6 +10,7 @@ use crate::{
config::{CERT_PEM, KEY_PEM},
context::PeerCtx,
events,
network::{quic_congestion_controller, quic_io, quic_limits},
services::{
advertise::{monitor_mdns_events, start_mdns_advertiser},
stream::handle_peer_stream,
@@ -22,14 +23,15 @@ pub async fn run_server_component(
ctx: PeerCtx,
tx_notify_ui: UnboundedSender<PeerEvent>,
) -> eyre::Result<()> {
let limits = Limits::default()
let limits = quic_limits()?
.with_max_handshake_duration(Duration::from_secs(3))?
.with_max_idle_timeout(Duration::from_secs(3))?;
let mut server = Server::builder()
.with_tls((CERT_PEM, KEY_PEM))?
.with_io(addr)?
.with_io(quic_io(addr)?)?
.with_limits(limits)?
.with_congestion_controller(quic_congestion_controller())?
.start()?;
let server_addr = server.local_addr()?;