Files
lanspread/crates/lanspread-peer/src/peer.rs
T
ddidderr 0f10108438 perf(peer): widen LAN bulk-transfer windows and buffers
Centralize the bulk-transfer sizing in config.rs and bump the values used
on both ends of a QUIC connection:

- CHUNK_SIZE: 32 MiB -> 128 MiB
- QUIC_CONNECTION_DATA_WINDOW: 64 MiB -> 256 MiB
- QUIC_STREAM_DATA_WINDOW: 32 MiB -> 128 MiB
- QUIC_MAX_SEND_BUFFER_SIZE: 32 MiB -> 128 MiB
- QUIC_INITIAL_CONGESTION_WINDOW: 1 MiB -> 4 MiB
- FILE_TRANSFER_BUFFER_SIZE: 64 KiB -> 1 MiB (new constant)

The previous 32 MiB stream window was already comfortably above the
bandwidth-delay product of a sub-millisecond LAN at 2.5 GbE. The further
bump is deliberately generous: the goal is to push flow control and
per-syscall overhead far enough out of the way that they cannot be the
suspect when isolating the remaining LAN download bottleneck (disk, NIC,
or s2n-quic platform offload on the sending host). Memory pressure from
the larger windows is not observable on a desktop client moving GB-sized
games.

stream_file_bytes previously read the local file in 64 KiB chunks. At
multi-Gbit/s send rates that produced many thousands of disk reads per
second; bumping to 1 MiB keeps the per-file syscall load modest with no
measurable latency cost on streamed bulk transfers. The buffer size lives
in config.rs as FILE_TRANSFER_BUFFER_SIZE so it stays adjustable from one
place.

Also add a started/MiB-per-second log line at info level when a file
finishes streaming. This matches the S37 measurement methodology already
used in the peer-cli harness and makes per-file send throughput visible in
normal operation.

The peer-cli extended-scenarios harness uses CHUNK_SIZE as the tolerance
bound for chunk-boundary variance in its assertions, so its constant is
bumped to match. The multi-chunk planning unit test is rewritten to
reference CHUNK_SIZE symbolically (CHUNK_SIZE * 3 + CHUNK_SIZE / 2)
instead of a hardcoded 120 MiB; the previous literal would silently
degrade into a single-chunk test at the new chunk size and stop
exercising the spread-across-peers code path.

Test Plan:
- just fmt
- just clippy
- just test
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37 \
  --build-image
- python3 crates/lanspread-peer-cli/scripts/run_extended_scenarios.py S37

Refs: local LAN download performance investigation on 2026-05-20.
Depends-on: d7f7dc737e (QUIC UDP socket buffer sizing).
2026-05-20 21:20:25 +02:00

149 lines
4.5 KiB
Rust

use std::{convert::TryInto, path::Path};
use bytes::Bytes;
use lanspread_db::db::GameFileDescription;
use lanspread_utils::maybe_addr;
use s2n_quic::{
connection,
stream::{Error as StreamError, SendStream},
};
use tokio::{
io::{AsyncReadExt, AsyncSeekExt},
time::Instant,
};
use crate::{config::FILE_TRANSFER_BUFFER_SIZE, path_validation::validate_game_file_path};
async fn stream_file_bytes(
tx: &mut SendStream,
base_dir: &Path,
relative_path: &str,
offset: u64,
length: Option<u64>,
) -> eyre::Result<()> {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
// Validate the path to prevent directory traversal
let validated_path = validate_game_file_path(base_dir, relative_path)?;
log::debug!(
"{remote_addr} streaming file bytes for peer: {}, offset: {offset}, length: {length:?}",
validated_path.display()
);
let mut file = tokio::fs::File::open(&validated_path).await?;
if offset > 0 {
file.seek(std::io::SeekFrom::Start(offset)).await?;
}
let mut remaining = length.unwrap_or(u64::MAX);
let expect_exact = length.is_some();
let mut transfer_complete = matches!(length, Some(0));
let mut total_bytes = 0u64;
let mut last_total_bytes = 0u64;
let started = Instant::now();
let mut timestamp = Instant::now();
let mut buf = vec![0u8; FILE_TRANSFER_BUFFER_SIZE];
while remaining > 0 {
let read_len = std::cmp::min(remaining, buf.len() as u64);
let read_len: usize = read_len.try_into().unwrap_or(usize::MAX);
if read_len == 0 {
break;
}
let bytes_read = file.read(&mut buf[..read_len]).await?;
if bytes_read == 0 {
if !expect_exact {
transfer_complete = true;
}
break;
}
tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?;
remaining = remaining.saturating_sub(bytes_read as u64);
total_bytes += bytes_read as u64;
if expect_exact && remaining == 0 {
transfer_complete = true;
break;
}
if last_total_bytes + 10_000_000 < total_bytes {
let elapsed = timestamp.elapsed();
let diff_bytes = total_bytes - last_total_bytes;
if elapsed.as_secs_f64() >= 1.0 {
#[allow(clippy::cast_precision_loss)]
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0);
log::debug!(
"{remote_addr} sending file data: {}, MB/s: {mb_per_s:.2}",
validated_path.display()
);
last_total_bytes = total_bytes;
timestamp = Instant::now();
}
}
}
let elapsed = started.elapsed();
#[allow(clippy::cast_precision_loss)]
let mib_per_s = if elapsed.as_secs_f64() > 0.0 {
total_bytes as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0)
} else {
0.0
};
log::info!(
"{remote_addr} finished streaming file bytes: {}, total_bytes: {total_bytes}, MiB/s: {mib_per_s:.2}",
validated_path.display()
);
match tx.close().await {
Ok(()) => {}
Err(err) if transfer_complete && is_clean_remote_close(&err) => {
log::debug!("{remote_addr} closed stream after transfer completion: {err}");
}
Err(err) => return Err(err.into()),
}
Ok(())
}
fn is_clean_remote_close(err: &StreamError) -> bool {
matches!(
err,
StreamError::ConnectionError {
error: connection::Error::Closed { .. },
..
}
)
}
pub async fn send_game_file_data(
game_file_desc: &GameFileDescription,
tx: &mut SendStream,
game_dir: &Path,
) {
if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!(
"{remote_addr} failed to stream file {}: {e}",
game_file_desc.relative_path
);
}
}
pub async fn send_game_file_chunk(
game_id: &str,
relative_path: &str,
offset: u64,
length: u64,
tx: &mut SendStream,
game_dir: &Path,
) {
if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await {
let remote_addr = maybe_addr!(tx.connection().remote_addr());
log::error!(
"{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}"
);
}
}