diff --git a/README.md b/README.md index dc6eae8..1a63d01 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ upl src/app.rs Axum router, shared state, static file service src/api.rs HTTP handlers and API error responses src/model.rs JSON request, response, and metadata shapes - src/storage.rs local filesystem layout and durable metadata writes + src/storage.rs local filesystem layout, metadata, and chunk writes src/lib.rs library surface used by integration tests Browser UI static/index.html upload tool markup @@ -37,6 +37,8 @@ upl inside this repository. - `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside this repository. +- The server accepts request bodies up to 64 MiB, which leaves room for the + planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`. ## Common Commands diff --git a/TESTS.md b/TESTS.md index 9338299..97ac13b 100644 --- a/TESTS.md +++ b/TESTS.md @@ -12,6 +12,10 @@ Keep this file as the reusable verification checklist while implementing - `GET /healthz` reports `ok`. - `POST /api/uploads` creates `meta.json` and chunk directories. - `POST /api/uploads` rejects an empty file name. + - `PUT /api/uploads/:id/chunks/:index` stores validated chunk files. + - `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks. + - `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks. + - `GET /api/uploads/:id` reports completed chunks from disk. ## Manual diff --git a/src/api.rs b/src/api.rs index 8613d11..4668a62 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,7 @@ use axum::{ Json, + body::Bytes, + extract::Path, extract::State, http::StatusCode, response::{IntoResponse, Response}, @@ -26,6 +28,34 @@ pub async fn create_upload( Ok(Json(meta.create_response())) } +/// Returns server-authoritative upload progress. +/// +/// # Errors +/// +/// Returns an API error when the upload id is invalid, unknown, or cannot be +/// read from storage. +pub async fn get_upload( + State(state): State, + Path(upload_id): Path, +) -> Result, ApiError> { + Ok(Json(state.storage.progress(&upload_id).await?)) +} + +/// Stores one raw binary chunk body. +/// +/// # Errors +/// +/// Returns an API error when the upload id is invalid or unknown, the chunk +/// index is out of range, the body length is wrong, or the write fails. +pub async fn put_chunk( + State(state): State, + Path((upload_id, index)): Path<(String, u64)>, + body: Bytes, +) -> Result { + state.storage.store_chunk(&upload_id, index, &body).await?; + Ok(StatusCode::NO_CONTENT) +} + #[derive(Debug)] pub struct ApiError { status: StatusCode, @@ -46,7 +76,9 @@ impl IntoResponse for ApiError { impl From for ApiError { fn from(error: StorageError) -> Self { - let status = if error.is_invalid_input() { + let status = if error.is_not_found() { + StatusCode::NOT_FOUND + } else if error.is_invalid_input() { StatusCode::BAD_REQUEST } else { StatusCode::INTERNAL_SERVER_ERROR diff --git a/src/app.rs b/src/app.rs index cf9facb..3627524 100644 --- a/src/app.rs +++ b/src/app.rs @@ -7,6 +7,7 @@ use std::{ use axum::{ Router, + extract::DefaultBodyLimit, routing::{get, post}, }; use tower_http::services::{ServeDir, ServeFile}; @@ -17,6 +18,7 @@ const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000"; const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR"; const DATA_DIR_ENV: &str = "UPL_DATA_DIR"; const BIND_ENV: &str = "UPL_BIND"; +const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024; #[derive(Clone, Debug)] pub struct AppConfig { @@ -72,6 +74,12 @@ pub fn build_router(config: &AppConfig) -> Router { Router::new() .route("/healthz", get(healthz)) .route("/api/uploads", post(api::create_upload)) + .route("/api/uploads/{upload_id}", get(api::get_upload)) + .route( + "/api/uploads/{upload_id}/chunks/{index}", + axum::routing::put(api::put_chunk), + ) + .layer(DefaultBodyLimit::max(MAX_REQUEST_BODY_BYTES)) .fallback_service(static_service(&config.static_dir)) .with_state(state) } diff --git a/src/model.rs b/src/model.rs index 0daf1ec..be94724 100644 --- a/src/model.rs +++ b/src/model.rs @@ -17,6 +17,16 @@ pub struct CreateUploadResponse { pub completed_chunks: Vec, } +#[derive(Debug, Deserialize, Serialize)] +pub struct UploadProgressResponse { + pub upload_id: String, + pub name: String, + pub size: u64, + pub chunk_size: u64, + pub total_chunks: u64, + pub completed_chunks: Vec, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct UploadMeta { pub id: String, @@ -39,4 +49,16 @@ impl UploadMeta { completed_chunks: Vec::new(), } } + + #[must_use] + pub fn progress_response(&self, completed_chunks: Vec) -> UploadProgressResponse { + UploadProgressResponse { + upload_id: self.id.clone(), + name: self.original_name.clone(), + size: self.size, + chunk_size: self.chunk_size, + total_chunks: self.total_chunks, + completed_chunks, + } + } } diff --git a/src/storage.rs b/src/storage.rs index 3e28a36..b62ae1a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -8,7 +8,7 @@ use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use tokio::fs; use uuid::Uuid; -use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta}; +use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse}; #[derive(Clone, Debug)] pub struct Storage { @@ -70,6 +70,59 @@ impl Storage { Err(StorageError::IdCollision) } + /// Loads upload progress by scanning durable chunk files. + /// + /// # Errors + /// + /// Returns an error when the upload id is invalid, metadata is missing, or + /// the staging directory cannot be scanned. + pub async fn progress(&self, upload_id: &str) -> Result { + let meta = self.load_meta(upload_id).await?; + let completed_chunks = self.completed_chunks(&meta).await?; + + Ok(meta.progress_response(completed_chunks)) + } + + /// Validates and stores one raw chunk body. + /// + /// # Errors + /// + /// Returns an error when the upload is unknown, the index is out of range, + /// the body length is not the expected chunk length, or the chunk cannot be + /// written and renamed into place. + pub async fn store_chunk( + &self, + upload_id: &str, + index: u64, + body: &[u8], + ) -> Result<(), StorageError> { + let meta = self.load_meta(upload_id).await?; + let expected_len = expected_chunk_len(&meta, index)?; + let body_len = u64::try_from(body.len()) + .map_err(|_| StorageError::InvalidInput("chunk body is too large"))?; + + if body_len != expected_len { + return Err(StorageError::InvalidInput("chunk has the wrong length")); + } + + let part_path = self.chunk_path(upload_id, index); + if let Some(existing_len) = file_len(&part_path).await? { + if existing_len == expected_len { + return Ok(()); + } + + return Err(StorageError::InvalidInput( + "existing chunk has the wrong length", + )); + } + + let tmp_path = part_path.with_extension("part.tmp"); + fs::write(&tmp_path, body).await?; + fs::rename(&tmp_path, &part_path).await?; + + Ok(()) + } + #[must_use] pub fn data_dir(&self) -> &Path { &self.data_dir @@ -87,6 +140,12 @@ impl Storage { self.staging_dir().join(upload_id) } + fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf { + self.upload_dir(upload_id) + .join("chunks") + .join(format!("{index:06}.part")) + } + async fn ensure_layout(&self) -> Result<(), StorageError> { fs::create_dir_all(self.staging_dir()).await?; fs::create_dir_all(self.complete_dir()).await?; @@ -103,6 +162,34 @@ impl Storage { Ok(()) } + + async fn load_meta(&self, upload_id: &str) -> Result { + validate_upload_id(upload_id)?; + + let meta_path = self.upload_dir(upload_id).join("meta.json"); + let bytes = match fs::read(meta_path).await { + Ok(bytes) => bytes, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => { + return Err(StorageError::NotFound); + } + Err(error) => return Err(error.into()), + }; + + Ok(serde_json::from_slice(&bytes)?) + } + + async fn completed_chunks(&self, meta: &UploadMeta) -> Result, StorageError> { + let mut completed = Vec::new(); + + for index in 0..meta.total_chunks { + let expected_len = expected_chunk_len(meta, index)?; + if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) { + completed.push(index); + } + } + + Ok(completed) + } } #[derive(Debug)] @@ -112,6 +199,7 @@ pub enum StorageError { InvalidInput(&'static str), Io(std::io::Error), Json(serde_json::Error), + NotFound, } impl StorageError { @@ -119,6 +207,11 @@ impl StorageError { pub fn is_invalid_input(&self) -> bool { matches!(self, Self::InvalidInput(_)) } + + #[must_use] + pub fn is_not_found(&self) -> bool { + matches!(self, Self::NotFound) + } } impl Display for StorageError { @@ -129,6 +222,7 @@ impl Display for StorageError { Self::InvalidInput(message) => formatter.write_str(message), Self::Io(error) => write!(formatter, "storage I/O error: {error}"), Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), + Self::NotFound => formatter.write_str("upload not found"), } } } @@ -139,7 +233,7 @@ impl Error for StorageError { Self::Format(error) => Some(error), Self::Io(error) => Some(error), Self::Json(error) => Some(error), - Self::IdCollision | Self::InvalidInput(_) => None, + Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, } } } @@ -188,9 +282,45 @@ fn total_chunks(size: u64, chunk_size: u64) -> u64 { } } +fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result { + if index >= meta.total_chunks { + return Err(StorageError::InvalidInput("chunk index is out of range")); + } + + if index + 1 == meta.total_chunks { + let bytes_before_final = meta.chunk_size * index; + Ok(meta.size - bytes_before_final) + } else { + Ok(meta.chunk_size) + } +} + +async fn file_len(path: &Path) -> Result, StorageError> { + match fs::metadata(path).await { + Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())), + Ok(_) => Ok(None), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +fn validate_upload_id(upload_id: &str) -> Result<(), StorageError> { + let is_valid = !upload_id.is_empty() + && upload_id + .bytes() + .all(|byte| byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'_'); + + if is_valid { + Ok(()) + } else { + Err(StorageError::InvalidInput("upload id is invalid")) + } +} + #[cfg(test)] mod tests { - use super::{safe_file_name, total_chunks}; + use super::{expected_chunk_len, safe_file_name, total_chunks, validate_upload_id}; + use crate::model::UploadMeta; #[test] fn computes_total_chunks() { @@ -205,4 +335,36 @@ mod tests { assert_eq!(safe_file_name("../movie:mkv"), "movie_mkv"); assert_eq!(safe_file_name(" "), "upload"); } + + #[test] + fn computes_expected_chunk_lengths() -> Result<(), Box> { + let meta = test_meta(33, 16, 3); + + assert_eq!(expected_chunk_len(&meta, 0)?, 16); + assert_eq!(expected_chunk_len(&meta, 1)?, 16); + assert_eq!(expected_chunk_len(&meta, 2)?, 1); + assert!(expected_chunk_len(&meta, 3).is_err()); + + Ok(()) + } + + #[test] + fn validates_upload_ids() { + assert!(validate_upload_id("abc123").is_ok()); + assert!(validate_upload_id("../abc").is_err()); + assert!(validate_upload_id("").is_err()); + } + + fn test_meta(size: u64, chunk_size: u64, total_chunks: u64) -> UploadMeta { + UploadMeta { + id: "abc123".to_owned(), + original_name: "file.bin".to_owned(), + safe_name: "file.bin".to_owned(), + size, + last_modified: 0, + chunk_size, + total_chunks, + created_at: "2026-05-30T16:00:00Z".to_owned(), + } + } } diff --git a/tests/chunk_upload.rs b/tests/chunk_upload.rs new file mode 100644 index 0000000..c641386 --- /dev/null +++ b/tests/chunk_upload.rs @@ -0,0 +1,187 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; + +use axum::{ + body::Body, + http::{Method, Request, StatusCode, header}, +}; +use http_body_util::BodyExt; +use serde_json::json; +use tempfile::TempDir; +use tower::ServiceExt; +use upl::{ + app::{AppConfig, build_router}, + model::{CHUNK_SIZE, CreateUploadResponse, UploadProgressResponse}, +}; + +#[tokio::test] +async fn stores_chunks_and_reports_progress() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 3).await?; + + let final_chunk = vec![b'z'; 3]; + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 1, final_chunk)?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let progress = get_progress(&app, &upload.upload_id).await?; + assert_eq!(progress.completed_chunks, vec![1]); + + let first_chunk = vec![b'a'; usize::try_from(CHUNK_SIZE)?]; + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 0, first_chunk)?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let progress = get_progress(&app, &upload.upload_id).await?; + assert_eq!(progress.completed_chunks, vec![0, 1]); + + let chunk_path = temp_dir + .path() + .join("staging") + .join(&upload.upload_id) + .join("chunks") + .join("000000.part"); + assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE); + + Ok(()) +} + +#[tokio::test] +async fn rejects_wrong_size_non_final_chunk() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 1).await?; + + let response = app + .oneshot(chunk_request(&upload.upload_id, 0, b"too short".to_vec())?) + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + Ok(()) +} + +#[tokio::test] +async fn accepts_duplicate_chunk_when_existing_length_matches() +-> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), 4).await?; + + let first = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?) + .await?; + let second = app + .oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?) + .await?; + + assert_eq!(first.status(), StatusCode::NO_CONTENT); + assert_eq!(second.status(), StatusCode::NO_CONTENT); + + Ok(()) +} + +#[tokio::test] +async fn rejects_unknown_upload_id() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + + let response = app + .oneshot(chunk_request("missing", 0, b"data".to_vec())?) + .await?; + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + Ok(()) +} + +async fn create_upload( + app: &axum::Router, + data_dir: &Path, + size: u64, +) -> Result> { + let response = app + .clone() + .oneshot(json_request( + Method::POST, + "/api/uploads", + &json!({ + "name": "chunked.bin", + "size": size, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + assert!(data_dir.join("staging").is_dir()); + + decode_body(response).await +} + +async fn get_progress( + app: &axum::Router, + upload_id: &str, +) -> Result> { + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(format!("/api/uploads/{upload_id}")) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + + decode_body(response).await +} + +fn test_app(data_dir: &Path) -> axum::Router { + build_router(&AppConfig::new( + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), + concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + data_dir, + )) +} + +fn json_request( + method: Method, + uri: &str, + body: &serde_json::Value, +) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(body)?))?) +} + +fn chunk_request( + upload_id: &str, + index: u64, + body: Vec, +) -> Result, Box> { + Ok(Request::builder() + .method(Method::PUT) + .uri(format!("/api/uploads/{upload_id}/chunks/{index}")) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(Body::from(body))?) +} + +async fn decode_body(response: axum::response::Response) -> Result> +where + T: serde::de::DeserializeOwned, +{ + let body = response.into_body().collect().await?.to_bytes(); + Ok(serde_json::from_slice(&body)?) +}