diff --git a/README.md b/README.md index 9a76eac..dc6eae8 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,10 @@ assembly are tracked in `PLAN.md` and will be added in later coherent slices. upl Rust server src/main.rs binary entrypoint and listener setup - src/app.rs Axum router, health endpoint, static file service + src/app.rs Axum router, shared state, static file service + src/api.rs HTTP handlers and API error responses + src/model.rs JSON request, response, and metadata shapes + src/storage.rs local filesystem layout and durable metadata writes src/lib.rs library surface used by integration tests Browser UI static/index.html upload tool markup @@ -32,6 +35,8 @@ upl - `UPL_BIND` sets the listen address. It defaults to `127.0.0.1:3000`. - `UPL_STATIC_DIR` sets the static asset directory. It defaults to `static/` inside this repository. +- `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside + this repository. ## Common Commands diff --git a/TESTS.md b/TESTS.md index ffcc4b5..9338299 100644 --- a/TESTS.md +++ b/TESTS.md @@ -10,6 +10,8 @@ Keep this file as the reusable verification checklist while implementing - Current coverage: - `GET /` serves the static browser page. - `GET /healthz` reports `ok`. + - `POST /api/uploads` creates `meta.json` and chunk directories. + - `POST /api/uploads` rejects an empty file name. ## Manual diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..8613d11 --- /dev/null +++ b/src/api.rs @@ -0,0 +1,65 @@ +use axum::{ + Json, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use serde::Serialize; + +use crate::{ + app::AppState, + model::{CreateUploadRequest, CreateUploadResponse}, + storage::StorageError, +}; + +/// Creates an upload record and persists its metadata before returning. +/// +/// # Errors +/// +/// Returns an API error when request validation fails or metadata cannot be +/// written to storage. +pub async fn create_upload( + State(state): State, + Json(request): Json, +) -> Result, ApiError> { + let meta = state.storage.create_upload(request).await?; + Ok(Json(meta.create_response())) +} + +#[derive(Debug)] +pub struct ApiError { + status: StatusCode, + message: String, +} + +impl IntoResponse for ApiError { + fn into_response(self) -> Response { + ( + self.status, + Json(ErrorResponse { + error: self.message, + }), + ) + .into_response() + } +} + +impl From for ApiError { + fn from(error: StorageError) -> Self { + let status = if error.is_invalid_input() { + StatusCode::BAD_REQUEST + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + + Self { + status, + message: error.to_string(), + } + } +} + +#[derive(Serialize)] +struct ErrorResponse { + error: String, +} diff --git a/src/app.rs b/src/app.rs index 651c4cf..cf9facb 100644 --- a/src/app.rs +++ b/src/app.rs @@ -5,17 +5,29 @@ use std::{ path::{Path, PathBuf}, }; -use axum::{Router, routing::get}; +use axum::{ + Router, + routing::{get, post}, +}; use tower_http::services::{ServeDir, ServeFile}; +use crate::{api, storage::Storage}; + const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000"; const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR"; +const DATA_DIR_ENV: &str = "UPL_DATA_DIR"; const BIND_ENV: &str = "UPL_BIND"; #[derive(Clone, Debug)] pub struct AppConfig { pub bind_addr: SocketAddr, pub static_dir: PathBuf, + pub data_dir: PathBuf, +} + +#[derive(Clone, Debug)] +pub struct AppState { + pub storage: Storage, } impl AppConfig { @@ -29,26 +41,39 @@ impl AppConfig { .unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_owned()) .parse()?; let static_dir = env::var_os(STATIC_DIR_ENV).map_or_else(default_static_dir, PathBuf::from); + let data_dir = env::var_os(DATA_DIR_ENV).map_or_else(default_data_dir, PathBuf::from); Ok(Self { bind_addr, static_dir, + data_dir, }) } #[must_use] - pub fn new(bind_addr: SocketAddr, static_dir: impl Into) -> Self { + pub fn new( + bind_addr: SocketAddr, + static_dir: impl Into, + data_dir: impl Into, + ) -> Self { Self { bind_addr, static_dir: static_dir.into(), + data_dir: data_dir.into(), } } } pub fn build_router(config: &AppConfig) -> Router { + let state = AppState { + storage: Storage::new(&config.data_dir), + }; + Router::new() .route("/healthz", get(healthz)) + .route("/api/uploads", post(api::create_upload)) .fallback_service(static_service(&config.static_dir)) + .with_state(state) } async fn healthz() -> &'static str { @@ -62,3 +87,7 @@ fn static_service(static_dir: &Path) -> ServeDir { fn default_static_dir() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("static") } + +fn default_data_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data") +} diff --git a/src/lib.rs b/src/lib.rs index 309be62..38db1b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,4 @@ +pub mod api; pub mod app; +pub mod model; +pub mod storage; diff --git a/src/model.rs b/src/model.rs new file mode 100644 index 0000000..0daf1ec --- /dev/null +++ b/src/model.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; + +pub const CHUNK_SIZE: u64 = 16 * 1024 * 1024; + +#[derive(Debug, Deserialize)] +pub struct CreateUploadRequest { + pub name: String, + pub size: u64, + pub last_modified: i64, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct CreateUploadResponse { + pub upload_id: String, + pub chunk_size: u64, + pub total_chunks: u64, + pub completed_chunks: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UploadMeta { + pub id: String, + pub original_name: String, + pub safe_name: String, + pub size: u64, + pub last_modified: i64, + pub chunk_size: u64, + pub total_chunks: u64, + pub created_at: String, +} + +impl UploadMeta { + #[must_use] + pub fn create_response(&self) -> CreateUploadResponse { + CreateUploadResponse { + upload_id: self.id.clone(), + chunk_size: self.chunk_size, + total_chunks: self.total_chunks, + completed_chunks: Vec::new(), + } + } +} diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..3e28a36 --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,208 @@ +use std::{ + error::Error, + fmt::{self, Display}, + path::{Path, PathBuf}, +}; + +use time::{OffsetDateTime, format_description::well_known::Rfc3339}; +use tokio::fs; +use uuid::Uuid; + +use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta}; + +#[derive(Clone, Debug)] +pub struct Storage { + data_dir: PathBuf, +} + +impl Storage { + #[must_use] + pub fn new(data_dir: impl Into) -> Self { + Self { + data_dir: data_dir.into(), + } + } + + /// Creates a durable upload metadata record under `data/staging`. + /// + /// # Errors + /// + /// Returns an error when directories cannot be created, metadata cannot be + /// serialized, or the metadata file cannot be written atomically. + pub async fn create_upload( + &self, + request: CreateUploadRequest, + ) -> Result { + let original_name = request.name.trim(); + if original_name.is_empty() { + return Err(StorageError::InvalidInput("file name cannot be empty")); + } + + self.ensure_layout().await?; + + let safe_name = safe_file_name(original_name); + let created_at = OffsetDateTime::now_utc().format(&Rfc3339)?; + + for _ in 0..8 { + let id = Uuid::new_v4().simple().to_string(); + let upload_dir = self.upload_dir(&id); + if fs::try_exists(&upload_dir).await? { + continue; + } + + fs::create_dir_all(upload_dir.join("chunks")).await?; + + let meta = UploadMeta { + id, + original_name: original_name.to_owned(), + safe_name, + size: request.size, + last_modified: request.last_modified, + chunk_size: CHUNK_SIZE, + total_chunks: total_chunks(request.size, CHUNK_SIZE), + created_at, + }; + + self.write_meta(&meta).await?; + return Ok(meta); + } + + Err(StorageError::IdCollision) + } + + #[must_use] + pub fn data_dir(&self) -> &Path { + &self.data_dir + } + + fn staging_dir(&self) -> PathBuf { + self.data_dir.join("staging") + } + + fn complete_dir(&self) -> PathBuf { + self.data_dir.join("complete") + } + + fn upload_dir(&self, upload_id: &str) -> PathBuf { + self.staging_dir().join(upload_id) + } + + async fn ensure_layout(&self) -> Result<(), StorageError> { + fs::create_dir_all(self.staging_dir()).await?; + fs::create_dir_all(self.complete_dir()).await?; + Ok(()) + } + + async fn write_meta(&self, meta: &UploadMeta) -> Result<(), StorageError> { + let meta_path = self.upload_dir(&meta.id).join("meta.json"); + let tmp_path = meta_path.with_extension("json.tmp"); + let json = serde_json::to_vec_pretty(meta)?; + + fs::write(&tmp_path, json).await?; + fs::rename(&tmp_path, &meta_path).await?; + + Ok(()) + } +} + +#[derive(Debug)] +pub enum StorageError { + Format(time::error::Format), + IdCollision, + InvalidInput(&'static str), + Io(std::io::Error), + Json(serde_json::Error), +} + +impl StorageError { + #[must_use] + pub fn is_invalid_input(&self) -> bool { + matches!(self, Self::InvalidInput(_)) + } +} + +impl Display for StorageError { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Format(error) => write!(formatter, "failed to format timestamp: {error}"), + Self::IdCollision => formatter.write_str("could not allocate a unique upload id"), + Self::InvalidInput(message) => formatter.write_str(message), + Self::Io(error) => write!(formatter, "storage I/O error: {error}"), + Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), + } + } +} + +impl Error for StorageError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Format(error) => Some(error), + Self::Io(error) => Some(error), + Self::Json(error) => Some(error), + Self::IdCollision | Self::InvalidInput(_) => None, + } + } +} + +impl From for StorageError { + fn from(error: std::io::Error) -> Self { + Self::Io(error) + } +} + +impl From for StorageError { + fn from(error: serde_json::Error) -> Self { + Self::Json(error) + } +} + +impl From for StorageError { + fn from(error: time::error::Format) -> Self { + Self::Format(error) + } +} + +fn safe_file_name(name: &str) -> String { + let safe: String = name + .chars() + .map(|character| match character { + '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_', + character if character.is_control() => '_', + character => character, + }) + .collect(); + + let trimmed = safe.trim_matches(['.', ' ', '_']); + if trimmed.is_empty() { + "upload".to_owned() + } else { + trimmed.to_owned() + } +} + +fn total_chunks(size: u64, chunk_size: u64) -> u64 { + if size == 0 { + 0 + } else { + size.div_ceil(chunk_size) + } +} + +#[cfg(test)] +mod tests { + use super::{safe_file_name, total_chunks}; + + #[test] + fn computes_total_chunks() { + assert_eq!(total_chunks(0, 16), 0); + assert_eq!(total_chunks(1, 16), 1); + assert_eq!(total_chunks(16, 16), 1); + assert_eq!(total_chunks(17, 16), 2); + } + + #[test] + fn sanitizes_file_names() { + assert_eq!(safe_file_name("../movie:mkv"), "movie_mkv"); + assert_eq!(safe_file_name(" "), "upload"); + } +} diff --git a/tests/static_server.rs b/tests/static_server.rs index 8e1c89e..4d612c8 100644 --- a/tests/static_server.rs +++ b/tests/static_server.rs @@ -1,4 +1,7 @@ -use std::net::{Ipv4Addr, SocketAddr}; +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; use axum::{body::Body, http::Request}; use http_body_util::BodyExt; @@ -44,5 +47,6 @@ fn test_app() -> axum::Router { build_router(&AppConfig::new( SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + Path::new(env!("CARGO_MANIFEST_DIR")).join("target/test-data/static-server"), )) } diff --git a/tests/upload_creation.rs b/tests/upload_creation.rs new file mode 100644 index 0000000..f12477a --- /dev/null +++ b/tests/upload_creation.rs @@ -0,0 +1,96 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; + +use axum::{ + body::Body, + http::{Request, StatusCode, header}, +}; +use http_body_util::BodyExt; +use serde_json::json; +use tempfile::TempDir; +use tower::ServiceExt; +use upl::{ + app::{AppConfig, build_router}, + model::{CHUNK_SIZE, CreateUploadResponse, UploadMeta}, +}; + +#[tokio::test] +async fn creates_upload_metadata_on_disk() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + + let response = app + .oneshot(json_request( + "/api/uploads", + &json!({ + "name": "movie:mkv", + "size": CHUNK_SIZE + 1, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + + let response_body = response.into_body().collect().await?.to_bytes(); + let response: CreateUploadResponse = serde_json::from_slice(&response_body)?; + + assert_eq!(response.chunk_size, CHUNK_SIZE); + assert_eq!(response.total_chunks, 2); + assert!(response.completed_chunks.is_empty()); + + let upload_dir = temp_dir.path().join("staging").join(&response.upload_id); + let meta_path = upload_dir.join("meta.json"); + assert!(upload_dir.join("chunks").is_dir()); + assert!(temp_dir.path().join("complete").is_dir()); + + let meta: UploadMeta = serde_json::from_slice(&tokio::fs::read(meta_path).await?)?; + assert_eq!(meta.id, response.upload_id); + assert_eq!(meta.original_name, "movie:mkv"); + assert_eq!(meta.safe_name, "movie_mkv"); + assert_eq!(meta.total_chunks, 2); + + Ok(()) +} + +#[tokio::test] +async fn rejects_empty_upload_name() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + + let response = app + .oneshot(json_request( + "/api/uploads", + &json!({ + "name": " ", + "size": 10, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + Ok(()) +} + +fn test_app(data_dir: &Path) -> axum::Router { + build_router(&AppConfig::new( + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), + concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + data_dir, + )) +} + +fn json_request( + uri: &str, + body: &serde_json::Value, +) -> Result, Box> { + Ok(Request::builder() + .method("POST") + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(&body)?))?) +}