From 5ca52b5780b9dbb9f6029ff7085a86e177cf98c5 Mon Sep 17 00:00:00 2001 From: ddidderr Date: Sat, 30 May 2026 17:02:59 +0200 Subject: [PATCH] feat: assemble completed uploads Implement POST /api/uploads/{id}/complete. The storage layer now reloads upload metadata, verifies that every expected chunk exists with the exact expected length, concatenates chunks in order into a temporary final file, flushes it, and renames it into data/complete only after assembly succeeds. The endpoint preserves staging data after completion, rejects incomplete uploads with a conflict response, and refuses to overwrite an existing completed file. This keeps failed or duplicate completion attempts explicit rather than silently clobbering local files. Extend the model, router, documentation, and test checklist for completion responses and add integration coverage for successful assembly, incomplete uploads, staging preservation, and duplicate completion conflicts. Test Plan: - just check Refs: PLAN.md milestone 8 --- README.md | 2 +- TESTS.md | 2 + src/api.rs | 17 ++++- src/app.rs | 4 ++ src/model.rs | 16 +++++ src/storage.rs | 77 ++++++++++++++++++-- tests/completion.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 280 insertions(+), 6 deletions(-) create mode 100644 tests/completion.rs diff --git a/README.md b/README.md index 1a63d01..70d242f 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ upl src/app.rs Axum router, shared state, static file service src/api.rs HTTP handlers and API error responses src/model.rs JSON request, response, and metadata shapes - src/storage.rs local filesystem layout, metadata, and chunk writes + src/storage.rs local filesystem layout, chunks, and assembly src/lib.rs library surface used by integration tests Browser UI static/index.html upload tool markup diff --git a/TESTS.md b/TESTS.md index 97ac13b..693921b 100644 --- a/TESTS.md +++ b/TESTS.md @@ -16,6 +16,8 @@ Keep this file as the reusable verification checklist while implementing - `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks. - `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks. - `GET /api/uploads/:id` reports completed chunks from disk. + - `POST /api/uploads/:id/complete` assembles verified chunks. + - `POST /api/uploads/:id/complete` rejects incomplete uploads. ## Manual diff --git a/src/api.rs b/src/api.rs index 4668a62..b4276b0 100644 --- a/src/api.rs +++ b/src/api.rs @@ -10,7 +10,7 @@ use serde::Serialize; use crate::{ app::AppState, - model::{CreateUploadRequest, CreateUploadResponse}, + model::{CompleteUploadResponse, CreateUploadRequest, CreateUploadResponse}, storage::StorageError, }; @@ -56,6 +56,19 @@ pub async fn put_chunk( Ok(StatusCode::NO_CONTENT) } +/// Assembles uploaded chunks into the final completed file. +/// +/// # Errors +/// +/// Returns an API error when the upload is unknown, incomplete, invalid, or +/// cannot be assembled on disk. +pub async fn complete_upload( + State(state): State, + Path(upload_id): Path, +) -> Result, ApiError> { + Ok(Json(state.storage.complete_upload(&upload_id).await?)) +} + #[derive(Debug)] pub struct ApiError { status: StatusCode, @@ -78,6 +91,8 @@ impl From for ApiError { fn from(error: StorageError) -> Self { let status = if error.is_not_found() { StatusCode::NOT_FOUND + } else if error.is_conflict() { + StatusCode::CONFLICT } else if error.is_invalid_input() { StatusCode::BAD_REQUEST } else { diff --git a/src/app.rs b/src/app.rs index 3627524..36711cd 100644 --- a/src/app.rs +++ b/src/app.rs @@ -75,6 +75,10 @@ pub fn build_router(config: &AppConfig) -> Router { .route("/healthz", get(healthz)) .route("/api/uploads", post(api::create_upload)) .route("/api/uploads/{upload_id}", get(api::get_upload)) + .route( + "/api/uploads/{upload_id}/complete", + post(api::complete_upload), + ) .route( "/api/uploads/{upload_id}/chunks/{index}", axum::routing::put(api::put_chunk), diff --git a/src/model.rs b/src/model.rs index be94724..0f4fe6c 100644 --- a/src/model.rs +++ b/src/model.rs @@ -27,6 +27,13 @@ pub struct UploadProgressResponse { pub completed_chunks: Vec, } +#[derive(Debug, Deserialize, Serialize)] +pub struct CompleteUploadResponse { + pub upload_id: String, + pub name: String, + pub file_path: String, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct UploadMeta { pub id: String, @@ -61,4 +68,13 @@ impl UploadMeta { completed_chunks, } } + + #[must_use] + pub fn complete_response(&self, file_path: String) -> CompleteUploadResponse { + CompleteUploadResponse { + upload_id: self.id.clone(), + name: self.safe_name.clone(), + file_path, + } + } } diff --git a/src/storage.rs b/src/storage.rs index b62ae1a..7f3880c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -5,10 +5,12 @@ use std::{ }; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; -use tokio::fs; +use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; -use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse}; +use crate::model::{ + CHUNK_SIZE, CompleteUploadResponse, CreateUploadRequest, UploadMeta, UploadProgressResponse, +}; #[derive(Clone, Debug)] pub struct Storage { @@ -123,6 +125,52 @@ impl Storage { Ok(()) } + /// Assembles a complete upload from verified chunk files. + /// + /// # Errors + /// + /// Returns an error when the upload is unknown, any expected chunk is + /// missing or has the wrong length, the final file already exists, or the + /// assembled file cannot be written and renamed. + pub async fn complete_upload( + &self, + upload_id: &str, + ) -> Result { + let meta = self.load_meta(upload_id).await?; + + self.verify_all_chunks(&meta).await?; + + let final_path = self.complete_dir().join(&meta.safe_name); + if fs::try_exists(&final_path).await? { + return Err(StorageError::Conflict("complete file already exists")); + } + + let tmp_path = self + .complete_dir() + .join(format!(".{}.{}.tmp", meta.safe_name, meta.id)); + if fs::try_exists(&tmp_path).await? { + fs::remove_file(&tmp_path).await?; + } + + let mut output = fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .await?; + + for index in 0..meta.total_chunks { + let bytes = fs::read(self.chunk_path(upload_id, index)).await?; + output.write_all(&bytes).await?; + } + + output.flush().await?; + drop(output); + + fs::rename(&tmp_path, &final_path).await?; + + Ok(meta.complete_response(final_path.display().to_string())) + } + #[must_use] pub fn data_dir(&self) -> &Path { &self.data_dir @@ -190,10 +238,26 @@ impl Storage { Ok(completed) } + + async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> { + for index in 0..meta.total_chunks { + let expected_len = expected_chunk_len(meta, index)?; + let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?; + + if actual_len != Some(expected_len) { + return Err(StorageError::Conflict( + "upload is missing one or more complete chunks", + )); + } + } + + Ok(()) + } } #[derive(Debug)] pub enum StorageError { + Conflict(&'static str), Format(time::error::Format), IdCollision, InvalidInput(&'static str), @@ -212,6 +276,11 @@ impl StorageError { pub fn is_not_found(&self) -> bool { matches!(self, Self::NotFound) } + + #[must_use] + pub fn is_conflict(&self) -> bool { + matches!(self, Self::Conflict(_)) + } } impl Display for StorageError { @@ -219,7 +288,7 @@ impl Display for StorageError { match self { Self::Format(error) => write!(formatter, "failed to format timestamp: {error}"), Self::IdCollision => formatter.write_str("could not allocate a unique upload id"), - Self::InvalidInput(message) => formatter.write_str(message), + Self::Conflict(message) | Self::InvalidInput(message) => formatter.write_str(message), Self::Io(error) => write!(formatter, "storage I/O error: {error}"), Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), Self::NotFound => formatter.write_str("upload not found"), @@ -233,7 +302,7 @@ impl Error for StorageError { Self::Format(error) => Some(error), Self::Io(error) => Some(error), Self::Json(error) => Some(error), - Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, + Self::Conflict(_) | Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, } } } diff --git a/tests/completion.rs b/tests/completion.rs new file mode 100644 index 0000000..0380bdb --- /dev/null +++ b/tests/completion.rs @@ -0,0 +1,168 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; + +use axum::{ + body::Body, + http::{Method, Request, StatusCode, header}, +}; +use http_body_util::BodyExt; +use serde_json::json; +use tempfile::TempDir; +use tower::ServiceExt; +use upl::{ + app::{AppConfig, build_router}, + model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse}, +}; + +#[tokio::test] +async fn assembles_completed_upload() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, "hello.txt", 11).await?; + + let response = app + .clone() + .oneshot(chunk_request( + &upload.upload_id, + 0, + b"hello world".to_vec(), + )?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let response = app + .clone() + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + assert_eq!(response.status(), StatusCode::OK); + + let complete: CompleteUploadResponse = decode_body(response).await?; + assert_eq!(complete.name, "hello.txt"); + assert_eq!( + tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?, + b"hello world" + ); + assert!( + temp_dir + .path() + .join("staging") + .join(&upload.upload_id) + .is_dir() + ); + + let duplicate = app + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + assert_eq!(duplicate.status(), StatusCode::CONFLICT); + + Ok(()) +} + +#[tokio::test] +async fn rejects_incomplete_upload() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?; + + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let response = app + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::CONFLICT); + assert!( + !temp_dir + .path() + .join("complete") + .join("partial.bin") + .exists() + ); + + Ok(()) +} + +async fn create_upload( + app: &axum::Router, + name: &str, + size: u64, +) -> Result> { + let response = app + .clone() + .oneshot(json_request( + Method::POST, + "/api/uploads", + &json!({ + "name": name, + "size": size, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + + decode_body(response).await +} + +fn test_app(data_dir: &Path) -> axum::Router { + build_router(&AppConfig::new( + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), + concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + data_dir, + )) +} + +fn json_request( + method: Method, + uri: &str, + body: &serde_json::Value, +) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(body)?))?) +} + +fn chunk_request( + upload_id: &str, + index: u64, + body: Vec, +) -> Result, Box> { + Ok(Request::builder() + .method(Method::PUT) + .uri(format!("/api/uploads/{upload_id}/chunks/{index}")) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(Body::from(body))?) +} + +fn empty_request(method: Method, uri: &str) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .body(Body::empty())?) +} + +async fn decode_body(response: axum::response::Response) -> Result> +where + T: serde::de::DeserializeOwned, +{ + let body = response.into_body().collect().await?.to_bytes(); + Ok(serde_json::from_slice(&body)?) +}