#![allow(clippy::missing_errors_doc)] use std::{fs::File, io::Write as _, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use bytes::{Bytes, BytesMut}; use lanspread_db::db::{Game, GameFileDescription}; use lanspread_proto::{Message as _, Request, Response}; use lanspread_utils::maybe_addr; use s2n_quic::{client::Connect, provider::limits::Limits, Client as QuicClient, Connection}; use tokio::{ io::AsyncWriteExt, stream, sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, Mutex, }, }; static CERT_PEM: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../cert.pem")); #[derive(Debug)] pub enum ClientEvent { ListGames(Vec), GotGameFiles(Vec), DownloadGameFilesBegin { id: String }, DownloadGameFilesFinished { id: String }, } #[derive(Debug)] pub enum ClientCommand { ListGames, GetGame(String), DownloadGameFiles(Vec), ServerAddr(SocketAddr), SetGameDir(String), } async fn initial_server_alive_check(conn: &mut Connection) -> bool { let stream = match conn.open_bidirectional_stream().await { Ok(stream) => stream, Err(e) => { log::error!("failed to open stream: {e}"); return false; } }; let (mut rx, mut tx) = stream.split(); // send ping if let Err(e) = tx.send(Request::Ping.encode()).await { log::error!("failed to send ping to server: {e}"); return false; } let _ = tx.close().await; // receive pong if let Ok(Some(response)) = rx.receive().await { let response = Response::decode(response); if let Response::Pong = response { log::info!("server is alive"); return true; } log::error!("server sent invalid response to ping: {response:?}"); } false } async fn download_game_files( game_file_descs: Vec, games_dir: String, server_addr: SocketAddr, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { let limits = Limits::default() .with_max_handshake_duration(Duration::from_secs(3))? .with_max_idle_timeout(Duration::from_secs(1))?; let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? .with_limits(limits)? .start()?; let conn = Connect::new(server_addr).with_server_name("localhost"); let mut conn = client.connect(conn).await?; conn.keep_alive(true)?; let game_file_descs = game_file_descs .into_iter() .filter(|desc| !desc.is_dir) .collect::>(); if game_file_descs.is_empty() { log::error!("game_file_descs empty: no game files to download"); return Ok(()); } let game_id = game_file_descs .first() .expect("game_file_descs empty: 2nd case CANNOT HAPPEN") .game_id .clone(); tx_notify_ui.send(ClientEvent::DownloadGameFilesBegin { id: game_id.clone(), })?; for file_desc in game_file_descs { log::info!("downloading file: {}", file_desc.relative_path); let stream = conn.open_bidirectional_stream().await?; let (mut rx, mut tx) = stream.split(); let request = Request::GetGameFileData(file_desc.clone()); if let Ok(()) = tx.write_all(&request.encode()).await { let path = PathBuf::from(&games_dir).join(&file_desc.relative_path); let mut file = match File::create(&path) { Ok(file) => file, Err(e) => { log::error!("failed to create file: {e}"); continue; } }; // if let Err(e) = tokio::io::copy(&mut rx, &mut file).await { // log::error!("failed to download file: {e}"); // continue; // } while let Ok(Some(data)) = rx.receive().await { if let Err(e) = file.write_all(&data) { log::error!("failed to write to file: {e}"); break; } } log::error!("file download complete: {}", path.display()); } if let Err(e) = tx.close().await { log::error!("failed to close stream: {e}"); } } log::info!("all files downloaded for game: {game_id}"); tx_notify_ui.send(ClientEvent::DownloadGameFilesFinished { id: game_id })?; Ok(()) } struct Ctx { game_dir: Arc>>, } #[allow(clippy::too_many_lines)] pub async fn run( mut rx_control: UnboundedReceiver, tx_notify_ui: UnboundedSender, ) -> eyre::Result<()> { // blocking wait for remote address log::debug!("waiting for server address"); let server_addr = loop { if let Some(ClientCommand::ServerAddr(addr)) = rx_control.recv().await { log::info!("got server address: {addr}"); break addr; } }; // client context let ctx = Ctx { game_dir: Arc::new(Mutex::new(None)), }; loop { let limits = Limits::default() .with_max_handshake_duration(Duration::from_secs(3))? .with_max_idle_timeout(Duration::from_secs(1))?; let client = QuicClient::builder() .with_tls(CERT_PEM)? .with_io("0.0.0.0:0")? .with_limits(limits)? .start()?; let conn = Connect::new(server_addr).with_server_name("localhost"); let mut conn = match client.connect(conn).await { Ok(conn) => conn, Err(e) => { log::error!("failed to connect to server: {e}"); tokio::time::sleep(Duration::from_secs(3)).await; continue; } }; conn.keep_alive(true)?; if !initial_server_alive_check(&mut conn).await { continue; } log::info!( "connected: (server: {}) (client: {})", maybe_addr!(conn.remote_addr()), maybe_addr!(conn.local_addr()) ); // tx while let Some(cmd) = rx_control.recv().await { let request = match cmd { ClientCommand::ListGames => Request::ListGames, ClientCommand::GetGame(id) => { log::debug!("requesting game from server: {id}"); Request::GetGame { id } } ClientCommand::ServerAddr(_) => { log::warn!("unexpected ServerAddr command from UI client"); continue; } ClientCommand::SetGameDir(game_dir) => { *ctx.game_dir.lock().await = Some(game_dir.clone()); continue; } ClientCommand::DownloadGameFiles(game_file_descs) => { log::info!("got ClientCommand::DownloadGameFiles"); let games_dir = { ctx.game_dir.lock().await.clone() }; if let Some(games_dir) = games_dir { let tx_notify_ui = tx_notify_ui.clone(); tokio::task::spawn(async move { if let Err(e) = download_game_files( game_file_descs, games_dir, server_addr, tx_notify_ui, ) .await { log::error!("failed to download game files: {e}"); } }); } else { log::error!("Cannot handle game file descriptions: game_dir is not set"); } continue; } }; let data = request.encode(); log::debug!("encoded data: {}", String::from_utf8_lossy(&data)); let stream = match conn.open_bidirectional_stream().await { Ok(stream) => stream, Err(e) => { log::error!("failed to open stream: {e}"); break; } }; let (mut rx, mut tx) = stream.split(); if let Err(e) = tx.send(data).await { log::error!("failed to send request to server {:?}", e); } let mut data = BytesMut::new(); while let Ok(Some(bytes)) = rx.receive().await { data.extend_from_slice(&bytes); } log::debug!("{} bytes received from server", data.len()); log::trace!("msg: (raw): {}", String::from_utf8_lossy(&data)); let response = Response::decode(data.freeze()); log::trace!("msg: {response:?}"); match response { Response::Games(games) => { for game in &games { log::trace!("{game}"); } if let Err(e) = tx_notify_ui.send(ClientEvent::ListGames(games)) { log::debug!("failed to send ClientEvent::ListGames to client {e:?}"); } else { log::info!("sent ClientEvent::ListGames to Tauri client"); } } Response::GetGame(game_file_descs) => { log::info!( "got {} game file descriptions from server", game_file_descs.len() ); let games_dir = { ctx.game_dir.lock().await.clone() }; match games_dir { Some(games_dir) => { game_file_descs.iter().filter(|f| f.is_dir).for_each(|dir| { let path = PathBuf::from(&games_dir).join(&dir.relative_path); if let Err(e) = std::fs::create_dir_all(path) { log::error!("failed to create directory: {e}"); } }); if let Err(e) = tx_notify_ui.send(ClientEvent::GotGameFiles(game_file_descs)) { log::error!( "failed to send ClientEvent::GotGameFiles to client: {e}" ); } } None => { log::error!( "Cannot handle game file descriptions: game_dir is not set" ); } } } Response::GameNotFound(id) => log::debug!("game not found {id}"), Response::InvalidRequest(request_bytes, err) => log::error!( "server says our request was invalid (error: {}): {}", err, String::from_utf8_lossy(&request_bytes) ), Response::EncodingError(err) => { log::error!("server encoding error: {err}"); } Response::DecodingError(data, err) => { log::error!( "response decoding error: {} (data: {})", err, String::from_utf8_lossy(&data) ); } Response::Pong => (), // ignore (should never happen) } if let Err(err) = tx.close().await { log::error!("failed to close stream: {err}"); } } } }