Refine peer networking and chunked file transfers
This commit is contained in:
@@ -3,12 +3,16 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use lanspread_db::db::{GameDB, GameFileDescription};
|
||||
use lanspread_proto::{Message as _, Request, Response};
|
||||
use lanspread_utils::maybe_addr;
|
||||
use s2n_quic::stream::SendStream;
|
||||
use tokio::{io::AsyncReadExt, sync::RwLock, time::Instant};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncSeekExt},
|
||||
sync::RwLock,
|
||||
time::Instant,
|
||||
};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -74,10 +78,26 @@ impl RequestHandler {
|
||||
match get_relative_path(games_folder, entry.path()) {
|
||||
Ok(relative_path) => match relative_path.to_str() {
|
||||
Some(relative_path) => {
|
||||
let is_dir = entry.file_type().is_dir();
|
||||
let size = if is_dir {
|
||||
None
|
||||
} else {
|
||||
match entry.metadata() {
|
||||
Ok(metadata) => Some(metadata.len()),
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to read metadata for {}: {e}",
|
||||
relative_path
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
let game_file_description = GameFileDescription {
|
||||
game_id: id.clone(),
|
||||
relative_path: relative_path.to_string(),
|
||||
is_dir: entry.file_type().is_dir(),
|
||||
is_dir,
|
||||
size,
|
||||
};
|
||||
|
||||
tracing::debug!("Found game file: {:?}", game_file_description);
|
||||
@@ -114,67 +134,105 @@ impl RequestHandler {
|
||||
Request::ListGames => self.handle_list_games().await,
|
||||
Request::GetGame { id } => self.handle_get_game(id, games_folder).await,
|
||||
Request::GetGameFileData(_) => RequestHandler::handle_get_game_file_data(),
|
||||
Request::GetGameFileChunk { .. } => RequestHandler::handle_get_game_file_data(),
|
||||
Request::Invalid(data, err_msg) => RequestHandler::handle_invalid(data, err_msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_file_bytes(
|
||||
tx: &mut SendStream,
|
||||
base_dir: &Path,
|
||||
relative_path: &str,
|
||||
offset: u64,
|
||||
length: Option<u64>,
|
||||
) -> eyre::Result<()> {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
let game_file = base_dir.join(relative_path);
|
||||
tracing::debug!(
|
||||
"{remote_addr} streaming file bytes: {:?}, offset: {offset}, length: {:?}",
|
||||
game_file
|
||||
);
|
||||
|
||||
let mut file = tokio::fs::File::open(&game_file).await?;
|
||||
if offset > 0 {
|
||||
file.seek(std::io::SeekFrom::Start(offset)).await?;
|
||||
}
|
||||
|
||||
let mut remaining = length.unwrap_or(u64::MAX);
|
||||
let mut total_bytes = 0u64;
|
||||
let mut last_total_bytes = 0u64;
|
||||
let mut timestamp = Instant::now();
|
||||
let mut buf = vec![0u8; 64 * 1024];
|
||||
|
||||
while remaining > 0 {
|
||||
let read_len = std::cmp::min(remaining, buf.len() as u64) as usize;
|
||||
if read_len == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let bytes_read = file.read(&mut buf[..read_len]).await?;
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
tx.send(Bytes::copy_from_slice(&buf[..bytes_read])).await?;
|
||||
remaining = remaining.saturating_sub(bytes_read as u64);
|
||||
total_bytes += bytes_read as u64;
|
||||
|
||||
if last_total_bytes + 10_000_000 < total_bytes {
|
||||
let elapsed = timestamp.elapsed();
|
||||
let diff_bytes = total_bytes - last_total_bytes;
|
||||
|
||||
if elapsed.as_secs_f64() >= 1.0 {
|
||||
#[allow(clippy::cast_precision_loss)]
|
||||
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1_000_000.0);
|
||||
tracing::debug!(
|
||||
"{remote_addr} sending file data: {:?}, MB/s: {mb_per_s:.2}",
|
||||
game_file
|
||||
);
|
||||
last_total_bytes = total_bytes;
|
||||
timestamp = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"{remote_addr} finished streaming file bytes: {:?}, total_bytes: {total_bytes}",
|
||||
game_file
|
||||
);
|
||||
|
||||
tx.close().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn send_game_file_data(
|
||||
game_file_desc: &GameFileDescription,
|
||||
tx: &mut SendStream,
|
||||
game_dir: &Path,
|
||||
) {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
|
||||
tracing::debug!("{remote_addr} client requested game file data: {game_file_desc:?}",);
|
||||
|
||||
// deliver file data to client
|
||||
let game_file = game_dir.join(&game_file_desc.relative_path);
|
||||
|
||||
let mut total_bytes = 0;
|
||||
let mut last_total_bytes = 0;
|
||||
let mut timestamp = Instant::now();
|
||||
|
||||
if let Ok(mut f) = tokio::fs::File::open(&game_file).await {
|
||||
let mut buf = BytesMut::with_capacity(64 * 1024);
|
||||
while let Ok(bytes_read) = f.read_buf(&mut buf).await {
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
total_bytes += bytes_read;
|
||||
|
||||
if last_total_bytes + 10_000_000 < total_bytes {
|
||||
let elapsed = timestamp.elapsed();
|
||||
let diff_bytes = total_bytes - last_total_bytes;
|
||||
|
||||
if elapsed.as_secs_f64() >= 1.0 {
|
||||
#[allow(clippy::cast_precision_loss)]
|
||||
let mb_per_s = (diff_bytes as f64) / (elapsed.as_secs_f64() * 1000.0 * 1000.0);
|
||||
|
||||
tracing::debug!(
|
||||
"{remote_addr} sending file data: {game_file:?}, MB/s: {mb_per_s:.2}",
|
||||
);
|
||||
last_total_bytes = total_bytes;
|
||||
timestamp = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = tx.send(buf.split_to(bytes_read).freeze()).await {
|
||||
tracing::error!("{remote_addr} failed to send file data: {e}",);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"{remote_addr} finished sending file data: {game_file:?}, total_bytes: {total_bytes}",
|
||||
if let Err(e) = stream_file_bytes(tx, game_dir, &game_file_desc.relative_path, 0, None).await {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
tracing::error!(
|
||||
"{remote_addr} failed to stream file {}: {e}",
|
||||
game_file_desc.relative_path
|
||||
);
|
||||
} else {
|
||||
tracing::error!("{remote_addr} failed to open file: {}", game_file.display());
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = tx.close().await {
|
||||
tracing::error!("{remote_addr} failed to close stream: {e}");
|
||||
pub(crate) async fn send_game_file_chunk(
|
||||
game_id: &str,
|
||||
relative_path: &str,
|
||||
offset: u64,
|
||||
length: u64,
|
||||
tx: &mut SendStream,
|
||||
game_dir: &Path,
|
||||
) {
|
||||
if let Err(e) = stream_file_bytes(tx, game_dir, relative_path, offset, Some(length)).await {
|
||||
let remote_addr = maybe_addr!(tx.connection().remote_addr());
|
||||
tracing::error!(
|
||||
"{remote_addr} failed to stream chunk {game_id}/{relative_path} offset {offset} length {length}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user