diff --git a/README.md b/README.md index 1a63d01..70d242f 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ upl src/app.rs Axum router, shared state, static file service src/api.rs HTTP handlers and API error responses src/model.rs JSON request, response, and metadata shapes - src/storage.rs local filesystem layout, metadata, and chunk writes + src/storage.rs local filesystem layout, chunks, and assembly src/lib.rs library surface used by integration tests Browser UI static/index.html upload tool markup diff --git a/TESTS.md b/TESTS.md index 97ac13b..693921b 100644 --- a/TESTS.md +++ b/TESTS.md @@ -16,6 +16,8 @@ Keep this file as the reusable verification checklist while implementing - `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks. - `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks. - `GET /api/uploads/:id` reports completed chunks from disk. + - `POST /api/uploads/:id/complete` assembles verified chunks. + - `POST /api/uploads/:id/complete` rejects incomplete uploads. ## Manual diff --git a/src/api.rs b/src/api.rs index 4668a62..b4276b0 100644 --- a/src/api.rs +++ b/src/api.rs @@ -10,7 +10,7 @@ use serde::Serialize; use crate::{ app::AppState, - model::{CreateUploadRequest, CreateUploadResponse}, + model::{CompleteUploadResponse, CreateUploadRequest, CreateUploadResponse}, storage::StorageError, }; @@ -56,6 +56,19 @@ pub async fn put_chunk( Ok(StatusCode::NO_CONTENT) } +/// Assembles uploaded chunks into the final completed file. +/// +/// # Errors +/// +/// Returns an API error when the upload is unknown, incomplete, invalid, or +/// cannot be assembled on disk. +pub async fn complete_upload( + State(state): State, + Path(upload_id): Path, +) -> Result, ApiError> { + Ok(Json(state.storage.complete_upload(&upload_id).await?)) +} + #[derive(Debug)] pub struct ApiError { status: StatusCode, @@ -78,6 +91,8 @@ impl From for ApiError { fn from(error: StorageError) -> Self { let status = if error.is_not_found() { StatusCode::NOT_FOUND + } else if error.is_conflict() { + StatusCode::CONFLICT } else if error.is_invalid_input() { StatusCode::BAD_REQUEST } else { diff --git a/src/app.rs b/src/app.rs index 3627524..36711cd 100644 --- a/src/app.rs +++ b/src/app.rs @@ -75,6 +75,10 @@ pub fn build_router(config: &AppConfig) -> Router { .route("/healthz", get(healthz)) .route("/api/uploads", post(api::create_upload)) .route("/api/uploads/{upload_id}", get(api::get_upload)) + .route( + "/api/uploads/{upload_id}/complete", + post(api::complete_upload), + ) .route( "/api/uploads/{upload_id}/chunks/{index}", axum::routing::put(api::put_chunk), diff --git a/src/model.rs b/src/model.rs index be94724..0f4fe6c 100644 --- a/src/model.rs +++ b/src/model.rs @@ -27,6 +27,13 @@ pub struct UploadProgressResponse { pub completed_chunks: Vec, } +#[derive(Debug, Deserialize, Serialize)] +pub struct CompleteUploadResponse { + pub upload_id: String, + pub name: String, + pub file_path: String, +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct UploadMeta { pub id: String, @@ -61,4 +68,13 @@ impl UploadMeta { completed_chunks, } } + + #[must_use] + pub fn complete_response(&self, file_path: String) -> CompleteUploadResponse { + CompleteUploadResponse { + upload_id: self.id.clone(), + name: self.safe_name.clone(), + file_path, + } + } } diff --git a/src/storage.rs b/src/storage.rs index b62ae1a..7f3880c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -5,10 +5,12 @@ use std::{ }; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; -use tokio::fs; +use tokio::{fs, io::AsyncWriteExt}; use uuid::Uuid; -use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse}; +use crate::model::{ + CHUNK_SIZE, CompleteUploadResponse, CreateUploadRequest, UploadMeta, UploadProgressResponse, +}; #[derive(Clone, Debug)] pub struct Storage { @@ -123,6 +125,52 @@ impl Storage { Ok(()) } + /// Assembles a complete upload from verified chunk files. + /// + /// # Errors + /// + /// Returns an error when the upload is unknown, any expected chunk is + /// missing or has the wrong length, the final file already exists, or the + /// assembled file cannot be written and renamed. + pub async fn complete_upload( + &self, + upload_id: &str, + ) -> Result { + let meta = self.load_meta(upload_id).await?; + + self.verify_all_chunks(&meta).await?; + + let final_path = self.complete_dir().join(&meta.safe_name); + if fs::try_exists(&final_path).await? { + return Err(StorageError::Conflict("complete file already exists")); + } + + let tmp_path = self + .complete_dir() + .join(format!(".{}.{}.tmp", meta.safe_name, meta.id)); + if fs::try_exists(&tmp_path).await? { + fs::remove_file(&tmp_path).await?; + } + + let mut output = fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .await?; + + for index in 0..meta.total_chunks { + let bytes = fs::read(self.chunk_path(upload_id, index)).await?; + output.write_all(&bytes).await?; + } + + output.flush().await?; + drop(output); + + fs::rename(&tmp_path, &final_path).await?; + + Ok(meta.complete_response(final_path.display().to_string())) + } + #[must_use] pub fn data_dir(&self) -> &Path { &self.data_dir @@ -190,10 +238,26 @@ impl Storage { Ok(completed) } + + async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> { + for index in 0..meta.total_chunks { + let expected_len = expected_chunk_len(meta, index)?; + let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?; + + if actual_len != Some(expected_len) { + return Err(StorageError::Conflict( + "upload is missing one or more complete chunks", + )); + } + } + + Ok(()) + } } #[derive(Debug)] pub enum StorageError { + Conflict(&'static str), Format(time::error::Format), IdCollision, InvalidInput(&'static str), @@ -212,6 +276,11 @@ impl StorageError { pub fn is_not_found(&self) -> bool { matches!(self, Self::NotFound) } + + #[must_use] + pub fn is_conflict(&self) -> bool { + matches!(self, Self::Conflict(_)) + } } impl Display for StorageError { @@ -219,7 +288,7 @@ impl Display for StorageError { match self { Self::Format(error) => write!(formatter, "failed to format timestamp: {error}"), Self::IdCollision => formatter.write_str("could not allocate a unique upload id"), - Self::InvalidInput(message) => formatter.write_str(message), + Self::Conflict(message) | Self::InvalidInput(message) => formatter.write_str(message), Self::Io(error) => write!(formatter, "storage I/O error: {error}"), Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), Self::NotFound => formatter.write_str("upload not found"), @@ -233,7 +302,7 @@ impl Error for StorageError { Self::Format(error) => Some(error), Self::Io(error) => Some(error), Self::Json(error) => Some(error), - Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, + Self::Conflict(_) | Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, } } } diff --git a/tests/completion.rs b/tests/completion.rs new file mode 100644 index 0000000..0380bdb --- /dev/null +++ b/tests/completion.rs @@ -0,0 +1,168 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + path::Path, +}; + +use axum::{ + body::Body, + http::{Method, Request, StatusCode, header}, +}; +use http_body_util::BodyExt; +use serde_json::json; +use tempfile::TempDir; +use tower::ServiceExt; +use upl::{ + app::{AppConfig, build_router}, + model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse}, +}; + +#[tokio::test] +async fn assembles_completed_upload() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, "hello.txt", 11).await?; + + let response = app + .clone() + .oneshot(chunk_request( + &upload.upload_id, + 0, + b"hello world".to_vec(), + )?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let response = app + .clone() + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + assert_eq!(response.status(), StatusCode::OK); + + let complete: CompleteUploadResponse = decode_body(response).await?; + assert_eq!(complete.name, "hello.txt"); + assert_eq!( + tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?, + b"hello world" + ); + assert!( + temp_dir + .path() + .join("staging") + .join(&upload.upload_id) + .is_dir() + ); + + let duplicate = app + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + assert_eq!(duplicate.status(), StatusCode::CONFLICT); + + Ok(()) +} + +#[tokio::test] +async fn rejects_incomplete_upload() -> Result<(), Box> { + let temp_dir = TempDir::new()?; + let app = test_app(temp_dir.path()); + let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?; + + let response = app + .clone() + .oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?) + .await?; + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + let response = app + .oneshot(empty_request( + Method::POST, + &format!("/api/uploads/{}/complete", upload.upload_id), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::CONFLICT); + assert!( + !temp_dir + .path() + .join("complete") + .join("partial.bin") + .exists() + ); + + Ok(()) +} + +async fn create_upload( + app: &axum::Router, + name: &str, + size: u64, +) -> Result> { + let response = app + .clone() + .oneshot(json_request( + Method::POST, + "/api/uploads", + &json!({ + "name": name, + "size": size, + "last_modified": 1_760_000_000_000_i64 + }), + )?) + .await?; + + assert_eq!(response.status(), StatusCode::OK); + + decode_body(response).await +} + +fn test_app(data_dir: &Path) -> axum::Router { + build_router(&AppConfig::new( + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), + concat!(env!("CARGO_MANIFEST_DIR"), "/static"), + data_dir, + )) +} + +fn json_request( + method: Method, + uri: &str, + body: &serde_json::Value, +) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(body)?))?) +} + +fn chunk_request( + upload_id: &str, + index: u64, + body: Vec, +) -> Result, Box> { + Ok(Request::builder() + .method(Method::PUT) + .uri(format!("/api/uploads/{upload_id}/chunks/{index}")) + .header(header::CONTENT_TYPE, "application/octet-stream") + .body(Body::from(body))?) +} + +fn empty_request(method: Method, uri: &str) -> Result, Box> { + Ok(Request::builder() + .method(method) + .uri(uri) + .body(Body::empty())?) +} + +async fn decode_body(response: axum::response::Response) -> Result> +where + T: serde::de::DeserializeOwned, +{ + let body = response.into_body().collect().await?.to_bytes(); + Ok(serde_json::from_slice(&body)?) +}