From 1594c65d891bfb9bdc1f86481dffbb23aa3aeebb Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 30 May 2026 17:00:42 +0200 Subject: [PATCH] feat: store raw upload chunks Add the chunk upload and progress APIs from PLAN.md. PUT /api/uploads/{id}/chunks/{index} now accepts raw octet-stream bodies, rejects unknown upload ids, out-of-range chunk indexes, and bodies whose length differs from the expected chunk length, then writes through a temporary .part.tmp path before renaming the completed chunk into place. Re-uploading an already-complete chunk is idempotent when the existing file length matches the expected length. GET /api/uploads/{id} now reports server-authoritative progress by checking each expected chunk file on disk and only counting chunk files whose lengths match metadata. The router also raises Axum's request body limit to 64 MiB so the planned 16 MiB chunks can reach the handler, matching the nginx deployment guidance. Document the chunk storage responsibility and extend the reusable test checklist with the new progress and validation coverage. Test Plan: - just check Refs: PLAN.md milestones 3 and 4 --- README.md | 4 +- TESTS.md | 4 + src/api.rs | 34 +++++++- src/app.rs | 8 ++ src/model.rs | 22 +++++ src/storage.rs | 168 ++++++++++++++++++++++++++++++++++++- tests/chunk_upload.rs | 187 ++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 422 insertions(+), 5 deletions(-) create mode 100644 tests/chunk_upload.rs diff --git a/README.md b/README.md index dc6eae8..1a63d01 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ upl src/app.rs Axum router, shared state, static file service src/api.rs HTTP handlers and API error responses src/model.rs JSON request, response, and metadata shapes - src/storage.rs local filesystem layout and durable metadata writes + src/storage.rs local filesystem layout, metadata, and chunk writes src/lib.rs library surface used by integration tests Browser UI static/index.html upload tool markup @@ -37,6 +37,8 @@ upl inside this repository. 
- `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside this repository. +- The server accepts request bodies up to 64 MiB, which leaves room for the + planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`. ## Common Commands diff --git a/TESTS.md b/TESTS.md index 9338299..97ac13b 100644 --- a/TESTS.md +++ b/TESTS.md @@ -12,6 +12,10 @@ Keep this file as the reusable verification checklist while implementing - `GET /healthz` reports `ok`. - `POST /api/uploads` creates `meta.json` and chunk directories. - `POST /api/uploads` rejects an empty file name. + - `PUT /api/uploads/:id/chunks/:index` stores validated chunk files. + - `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks. + - `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks. + - `GET /api/uploads/:id` reports completed chunks from disk. ## Manual diff --git a/src/api.rs b/src/api.rs index 8613d11..4668a62 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,7 @@ use axum::{ Json, + body::Bytes, + extract::Path, extract::State, http::StatusCode, response::{IntoResponse, Response}, @@ -26,6 +28,34 @@ pub async fn create_upload( Ok(Json(meta.create_response())) } +/// Returns server-authoritative upload progress. +/// +/// # Errors +/// +/// Returns an API error when the upload id is invalid, unknown, or cannot be +/// read from storage. +pub async fn get_upload( + State(state): State, + Path(upload_id): Path, +) -> Result, ApiError> { + Ok(Json(state.storage.progress(&upload_id).await?)) +} + +/// Stores one raw binary chunk body. +/// +/// # Errors +/// +/// Returns an API error when the upload id is invalid or unknown, the chunk +/// index is out of range, the body length is wrong, or the write fails. 
+pub async fn put_chunk( + State(state): State, + Path((upload_id, index)): Path<(String, u64)>, + body: Bytes, +) -> Result { + state.storage.store_chunk(&upload_id, index, &body).await?; + Ok(StatusCode::NO_CONTENT) +} + #[derive(Debug)] pub struct ApiError { status: StatusCode, @@ -46,7 +76,9 @@ impl IntoResponse for ApiError { impl From for ApiError { fn from(error: StorageError) -> Self { - let status = if error.is_invalid_input() { + let status = if error.is_not_found() { + StatusCode::NOT_FOUND + } else if error.is_invalid_input() { StatusCode::BAD_REQUEST } else { StatusCode::INTERNAL_SERVER_ERROR diff --git a/src/app.rs b/src/app.rs index cf9facb..3627524 100644 --- a/src/app.rs +++ b/src/app.rs @@ -7,6 +7,7 @@ use std::{ use axum::{ Router, + extract::DefaultBodyLimit, routing::{get, post}, }; use tower_http::services::{ServeDir, ServeFile}; @@ -17,6 +18,7 @@ const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000"; const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR"; const DATA_DIR_ENV: &str = "UPL_DATA_DIR"; const BIND_ENV: &str = "UPL_BIND"; +const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024; #[derive(Clone, Debug)] pub struct AppConfig { @@ -72,6 +74,12 @@ pub fn build_router(config: &AppConfig) -> Router { Router::new() .route("/healthz", get(healthz)) .route("/api/uploads", post(api::create_upload)) + .route("/api/uploads/{upload_id}", get(api::get_upload)) + .route( + "/api/uploads/{upload_id}/chunks/{index}", + axum::routing::put(api::put_chunk), + ) + .layer(DefaultBodyLimit::max(MAX_REQUEST_BODY_BYTES)) .fallback_service(static_service(&config.static_dir)) .with_state(state) } diff --git a/src/model.rs b/src/model.rs index 0daf1ec..be94724 100644 --- a/src/model.rs +++ b/src/model.rs @@ -17,6 +17,16 @@ pub struct CreateUploadResponse { pub completed_chunks: Vec, } +#[derive(Debug, Deserialize, Serialize)] +pub struct UploadProgressResponse { + pub upload_id: String, + pub name: String, + pub size: u64, + pub chunk_size: u64, + pub total_chunks: u64, + 
pub completed_chunks: Vec, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct UploadMeta { pub id: String, @@ -39,4 +49,16 @@ impl UploadMeta { completed_chunks: Vec::new(), } } + + #[must_use] + pub fn progress_response(&self, completed_chunks: Vec) -> UploadProgressResponse { + UploadProgressResponse { + upload_id: self.id.clone(), + name: self.original_name.clone(), + size: self.size, + chunk_size: self.chunk_size, + total_chunks: self.total_chunks, + completed_chunks, + } + } } diff --git a/src/storage.rs b/src/storage.rs index 3e28a36..b62ae1a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -8,7 +8,7 @@ use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use tokio::fs; use uuid::Uuid; -use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta}; +use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse}; #[derive(Clone, Debug)] pub struct Storage { @@ -70,6 +70,59 @@ impl Storage { Err(StorageError::IdCollision) } + /// Loads upload progress by scanning durable chunk files. + /// + /// # Errors + /// + /// Returns an error when the upload id is invalid, metadata is missing, or + /// the staging directory cannot be scanned. + pub async fn progress(&self, upload_id: &str) -> Result { + let meta = self.load_meta(upload_id).await?; + let completed_chunks = self.completed_chunks(&meta).await?; + + Ok(meta.progress_response(completed_chunks)) + } + + /// Validates and stores one raw chunk body. + /// + /// # Errors + /// + /// Returns an error when the upload is unknown, the index is out of range, + /// the body length is not the expected chunk length, or the chunk cannot be + /// written and renamed into place. 
+ pub async fn store_chunk( + &self, + upload_id: &str, + index: u64, + body: &[u8], + ) -> Result<(), StorageError> { + let meta = self.load_meta(upload_id).await?; + let expected_len = expected_chunk_len(&meta, index)?; + let body_len = u64::try_from(body.len()) + .map_err(|_| StorageError::InvalidInput("chunk body is too large"))?; + + if body_len != expected_len { + return Err(StorageError::InvalidInput("chunk has the wrong length")); + } + + let part_path = self.chunk_path(upload_id, index); + if let Some(existing_len) = file_len(&part_path).await? { + if existing_len == expected_len { + return Ok(()); + } + + return Err(StorageError::InvalidInput( + "existing chunk has the wrong length", + )); + } + + let tmp_path = part_path.with_extension("part.tmp"); + fs::write(&tmp_path, body).await?; + fs::rename(&tmp_path, &part_path).await?; + + Ok(()) + } + #[must_use] pub fn data_dir(&self) -> &Path { &self.data_dir @@ -87,6 +140,12 @@ impl Storage { self.staging_dir().join(upload_id) } + fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf { + self.upload_dir(upload_id) + .join("chunks") + .join(format!("{index:06}.part")) + } + async fn ensure_layout(&self) -> Result<(), StorageError> { fs::create_dir_all(self.staging_dir()).await?; fs::create_dir_all(self.complete_dir()).await?; @@ -103,6 +162,34 @@ impl Storage { Ok(()) } + + async fn load_meta(&self, upload_id: &str) -> Result { + validate_upload_id(upload_id)?; + + let meta_path = self.upload_dir(upload_id).join("meta.json"); + let bytes = match fs::read(meta_path).await { + Ok(bytes) => bytes, + Err(error) if error.kind() == std::io::ErrorKind::NotFound => { + return Err(StorageError::NotFound); + } + Err(error) => return Err(error.into()), + }; + + Ok(serde_json::from_slice(&bytes)?) 
+ } + + async fn completed_chunks(&self, meta: &UploadMeta) -> Result, StorageError> { + let mut completed = Vec::new(); + + for index in 0..meta.total_chunks { + let expected_len = expected_chunk_len(meta, index)?; + if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) { + completed.push(index); + } + } + + Ok(completed) + } } #[derive(Debug)] @@ -112,6 +199,7 @@ pub enum StorageError { InvalidInput(&'static str), Io(std::io::Error), Json(serde_json::Error), + NotFound, } impl StorageError { @@ -119,6 +207,11 @@ impl StorageError { pub fn is_invalid_input(&self) -> bool { matches!(self, Self::InvalidInput(_)) } + + #[must_use] + pub fn is_not_found(&self) -> bool { + matches!(self, Self::NotFound) + } } impl Display for StorageError { @@ -129,6 +222,7 @@ impl Display for StorageError { Self::InvalidInput(message) => formatter.write_str(message), Self::Io(error) => write!(formatter, "storage I/O error: {error}"), Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), + Self::NotFound => formatter.write_str("upload not found"), } } } @@ -139,7 +233,7 @@ impl Error for StorageError { Self::Format(error) => Some(error), Self::Io(error) => Some(error), Self::Json(error) => Some(error), - Self::IdCollision | Self::InvalidInput(_) => None, + Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, } } } @@ -188,9 +282,45 @@ fn total_chunks(size: u64, chunk_size: u64) -> u64 { } } +fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result { + if index >= meta.total_chunks { + return Err(StorageError::InvalidInput("chunk index is out of range")); + } + + if index + 1 == meta.total_chunks { + let bytes_before_final = meta.chunk_size * index; + Ok(meta.size - bytes_before_final) + } else { + Ok(meta.chunk_size) + } +} + +async fn file_len(path: &Path) -> Result, StorageError> { + match fs::metadata(path).await { + Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())), + Ok(_) => Ok(None), + Err(error) if 
error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +fn validate_upload_id(upload_id: &str) -> Result<(), StorageError> { + let is_valid = !upload_id.is_empty() + && upload_id + .bytes() + .all(|byte| byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'_'); + + if is_valid { + Ok(()) + } else { + Err(StorageError::InvalidInput("upload id is invalid")) + } +} + #[cfg(test)] mod tests { - use super::{safe_file_name, total_chunks}; + use super::{expected_chunk_len, safe_file_name, total_chunks, validate_upload_id}; + use crate::model::UploadMeta; #[test] fn computes_total_chunks() { @@ -205,4 +335,36 @@ mod tests { assert_eq!(safe_file_name("../movie:mkv"), "movie_mkv"); assert_eq!(safe_file_name(" "), "upload"); } + + #[test] + fn computes_expected_chunk_lengths() -> Result<(), Box> { + let meta = test_meta(33, 16, 3); + + assert_eq!(expected_chunk_len(&meta, 0)?, 16); + assert_eq!(expected_chunk_len(&meta, 1)?, 16); + assert_eq!(expected_chunk_len(&meta, 2)?, 1); + assert!(expected_chunk_len(&meta, 3).is_err()); + + Ok(()) + } + + #[test] + fn validates_upload_ids() { + assert!(validate_upload_id("abc123").is_ok()); + assert!(validate_upload_id("../abc").is_err()); + assert!(validate_upload_id("").is_err()); + } + + fn test_meta(size: u64, chunk_size: u64, total_chunks: u64) -> UploadMeta { + UploadMeta { + id: "abc123".to_owned(), + original_name: "file.bin".to_owned(), + safe_name: "file.bin".to_owned(), + size, + last_modified: 0, + chunk_size, + total_chunks, + created_at: "2026-05-30T16:00:00Z".to_owned(), + } + } } diff --git a/tests/chunk_upload.rs b/tests/chunk_upload.rs new file mode 100644 index 0000000..c641386 --- /dev/null +++ b/tests/chunk_upload.rs @@ -0,0 +1,187 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; + +use axum::{ + body::Body, + http::{Method, Request, StatusCode, header}, +}; +use http_body_util::BodyExt; +use serde_json::json; +use tempfile::TempDir; +use 
tower::ServiceExt; +use upl::{ + app::{AppConfig, build_router}, + model::{CHUNK_SIZE, CreateUploadResponse, UploadProgressResponse}, +}; + +#[tokio::test] +async fn stores_chunks_and_reports_progress() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 3).await?; + + let final_chunk = vec![b'z'; 3]; + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 1, final_chunk)?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let progress = get_progress(&app, &upload.upload_id).await?; + assert_eq!(progress.completed_chunks, vec![1]); + + let first_chunk = vec![b'a'; usize::try_from(CHUNK_SIZE)?]; + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 0, first_chunk)?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let progress = get_progress(&app, &upload.upload_id).await?; + assert_eq!(progress.completed_chunks, vec![0, 1]); + + let chunk_path = temp_dir + .path() + .join("staging") + .join(&upload.upload_id) + .join("chunks") + .join("000000.part"); + assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE); + + Ok(()) +} + +#[tokio::test] +async fn rejects_wrong_size_non_final_chunk() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 1).await?; + + let response = app + .oneshot(chunk_request(&upload.upload_id, 0, b"too short".to_vec())?) + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + Ok(()) +} + +#[tokio::test] +async fn accepts_duplicate_chunk_when_existing_length_matches() +-> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, temp_dir.path(), 4).await?; + + let first = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?) 
+ .await?; + let second = app + .oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?) + .await?; + + assert_eq!(first.status(), StatusCode::NO_CONTENT); + assert_eq!(second.status(), StatusCode::NO_CONTENT); + + Ok(()) +} + +#[tokio::test] +async fn rejects_unknown_upload_id() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + + let response = app + .oneshot(chunk_request("missing", 0, b"data".to_vec())?) + .await?; + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + Ok(()) +} + +async fn create_upload( + app: &axum::Router, + data_dir: &Path, + size: u64, +) -> Result> { + let response = app + .clone() + .oneshot(json_request( + Method::POST, + "/api/uploads", + &json!({ + "name": "chunked.bin", + "size": size, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + assert!(data_dir.join("staging").is_dir()); + + decode_body(response).await +} + +async fn get_progress( + app: &axum::Router, + upload_id: &str, +) -> Result> { + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(format!("/api/uploads/{upload_id}")) + .body(Body::empty())?, + ) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + + decode_body(response).await +} + +fn test_app(data_dir: &Path) -> axum::Router { + build_router(&AppConfig::new( + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), + concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + data_dir, + )) +} + +fn json_request( + method: Method, + uri: &str, + body: &serde_json::Value, +) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(body)?))?) 
+} + +fn chunk_request( + upload_id: &str, + index: u64, + body: Vec, +) -> Result, Box> { + Ok(Request::builder() + .method(Method::PUT) + .uri(format!("/api/uploads/{upload_id}/chunks/{index}")) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(Body::from(body))?) +} + +async fn decode_body(response: axum::response::Response) -> Result> +where + T: serde::de::DeserializeOwned, +{ + let body = response.into_body().collect().await?.to_bytes(); + Ok(serde_json::from_slice(&body)?) +}