feat: assemble completed uploads

Implement POST /api/uploads/{id}/complete. The storage layer now reloads upload
metadata, verifies that every expected chunk exists with the exact expected
length, concatenates chunks in order into a temporary final file, flushes it,
and renames it into data/complete only after assembly succeeds.

The endpoint preserves staging data after completion, rejects incomplete uploads
with a conflict response, and refuses to overwrite an existing completed file.
This keeps failed or duplicate completion attempts explicit rather than silently
clobbering local files.

Extend the model, router, documentation, and test checklist for completion
responses and add integration coverage for successful assembly, incomplete
uploads, staging preservation, and duplicate completion conflicts.

Test Plan:
- just check

Refs: PLAN.md milestone 8
This commit is contained in:
2026-05-30 17:02:59 +02:00
parent 1594c65d89
commit 5ca52b5780
7 changed files with 280 additions and 6 deletions
+1 -1
View File
@@ -19,7 +19,7 @@ upl
src/app.rs Axum router, shared state, static file service src/app.rs Axum router, shared state, static file service
src/api.rs HTTP handlers and API error responses src/api.rs HTTP handlers and API error responses
src/model.rs JSON request, response, and metadata shapes src/model.rs JSON request, response, and metadata shapes
src/storage.rs local filesystem layout, metadata, and chunk writes src/storage.rs local filesystem layout, chunks, and assembly
src/lib.rs library surface used by integration tests src/lib.rs library surface used by integration tests
Browser UI Browser UI
static/index.html upload tool markup static/index.html upload tool markup
+2
View File
@@ -16,6 +16,8 @@ Keep this file as the reusable verification checklist while implementing
- `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks. - `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks.
- `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks. - `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks.
- `GET /api/uploads/:id` reports completed chunks from disk. - `GET /api/uploads/:id` reports completed chunks from disk.
- `POST /api/uploads/:id/complete` assembles verified chunks.
- `POST /api/uploads/:id/complete` rejects incomplete uploads.
## Manual ## Manual
+16 -1
View File
@@ -10,7 +10,7 @@ use serde::Serialize;
use crate::{ use crate::{
app::AppState, app::AppState,
model::{CreateUploadRequest, CreateUploadResponse}, model::{CompleteUploadResponse, CreateUploadRequest, CreateUploadResponse},
storage::StorageError, storage::StorageError,
}; };
@@ -56,6 +56,19 @@ pub async fn put_chunk(
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
/// Assembles uploaded chunks into the final completed file.
///
/// # Errors
///
/// Returns an API error when the upload is unknown, incomplete, invalid, or
/// cannot be assembled on disk.
pub async fn complete_upload(
State(state): State<AppState>,
Path(upload_id): Path<String>,
) -> Result<Json<CompleteUploadResponse>, ApiError> {
Ok(Json(state.storage.complete_upload(&upload_id).await?))
}
#[derive(Debug)] #[derive(Debug)]
pub struct ApiError { pub struct ApiError {
status: StatusCode, status: StatusCode,
@@ -78,6 +91,8 @@ impl From<StorageError> for ApiError {
fn from(error: StorageError) -> Self { fn from(error: StorageError) -> Self {
let status = if error.is_not_found() { let status = if error.is_not_found() {
StatusCode::NOT_FOUND StatusCode::NOT_FOUND
} else if error.is_conflict() {
StatusCode::CONFLICT
} else if error.is_invalid_input() { } else if error.is_invalid_input() {
StatusCode::BAD_REQUEST StatusCode::BAD_REQUEST
} else { } else {
+4
View File
@@ -75,6 +75,10 @@ pub fn build_router(config: &AppConfig) -> Router {
.route("/healthz", get(healthz)) .route("/healthz", get(healthz))
.route("/api/uploads", post(api::create_upload)) .route("/api/uploads", post(api::create_upload))
.route("/api/uploads/{upload_id}", get(api::get_upload)) .route("/api/uploads/{upload_id}", get(api::get_upload))
.route(
"/api/uploads/{upload_id}/complete",
post(api::complete_upload),
)
.route( .route(
"/api/uploads/{upload_id}/chunks/{index}", "/api/uploads/{upload_id}/chunks/{index}",
axum::routing::put(api::put_chunk), axum::routing::put(api::put_chunk),
+16
View File
@@ -27,6 +27,13 @@ pub struct UploadProgressResponse {
pub completed_chunks: Vec<u64>, pub completed_chunks: Vec<u64>,
} }
#[derive(Debug, Deserialize, Serialize)]
pub struct CompleteUploadResponse {
pub upload_id: String,
pub name: String,
pub file_path: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct UploadMeta { pub struct UploadMeta {
pub id: String, pub id: String,
@@ -61,4 +68,13 @@ impl UploadMeta {
completed_chunks, completed_chunks,
} }
} }
#[must_use]
pub fn complete_response(&self, file_path: String) -> CompleteUploadResponse {
CompleteUploadResponse {
upload_id: self.id.clone(),
name: self.safe_name.clone(),
file_path,
}
}
} }
+73 -4
View File
@@ -5,10 +5,12 @@ use std::{
}; };
use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::fs; use tokio::{fs, io::AsyncWriteExt};
use uuid::Uuid; use uuid::Uuid;
use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse}; use crate::model::{
CHUNK_SIZE, CompleteUploadResponse, CreateUploadRequest, UploadMeta, UploadProgressResponse,
};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Storage { pub struct Storage {
@@ -123,6 +125,52 @@ impl Storage {
Ok(()) Ok(())
} }
/// Assembles a complete upload from verified chunk files.
///
/// # Errors
///
/// Returns an error when the upload is unknown, any expected chunk is
/// missing or has the wrong length, the final file already exists, or the
/// assembled file cannot be written and renamed.
pub async fn complete_upload(
&self,
upload_id: &str,
) -> Result<CompleteUploadResponse, StorageError> {
let meta = self.load_meta(upload_id).await?;
self.verify_all_chunks(&meta).await?;
let final_path = self.complete_dir().join(&meta.safe_name);
if fs::try_exists(&final_path).await? {
return Err(StorageError::Conflict("complete file already exists"));
}
let tmp_path = self
.complete_dir()
.join(format!(".{}.{}.tmp", meta.safe_name, meta.id));
if fs::try_exists(&tmp_path).await? {
fs::remove_file(&tmp_path).await?;
}
let mut output = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.await?;
for index in 0..meta.total_chunks {
let bytes = fs::read(self.chunk_path(upload_id, index)).await?;
output.write_all(&bytes).await?;
}
output.flush().await?;
drop(output);
fs::rename(&tmp_path, &final_path).await?;
Ok(meta.complete_response(final_path.display().to_string()))
}
#[must_use] #[must_use]
pub fn data_dir(&self) -> &Path { pub fn data_dir(&self) -> &Path {
&self.data_dir &self.data_dir
@@ -190,10 +238,26 @@ impl Storage {
Ok(completed) Ok(completed)
} }
async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> {
for index in 0..meta.total_chunks {
let expected_len = expected_chunk_len(meta, index)?;
let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?;
if actual_len != Some(expected_len) {
return Err(StorageError::Conflict(
"upload is missing one or more complete chunks",
));
}
}
Ok(())
}
} }
#[derive(Debug)] #[derive(Debug)]
pub enum StorageError { pub enum StorageError {
Conflict(&'static str),
Format(time::error::Format), Format(time::error::Format),
IdCollision, IdCollision,
InvalidInput(&'static str), InvalidInput(&'static str),
@@ -212,6 +276,11 @@ impl StorageError {
pub fn is_not_found(&self) -> bool { pub fn is_not_found(&self) -> bool {
matches!(self, Self::NotFound) matches!(self, Self::NotFound)
} }
#[must_use]
pub fn is_conflict(&self) -> bool {
matches!(self, Self::Conflict(_))
}
} }
impl Display for StorageError { impl Display for StorageError {
@@ -219,7 +288,7 @@ impl Display for StorageError {
match self { match self {
Self::Format(error) => write!(formatter, "failed to format timestamp: {error}"), Self::Format(error) => write!(formatter, "failed to format timestamp: {error}"),
Self::IdCollision => formatter.write_str("could not allocate a unique upload id"), Self::IdCollision => formatter.write_str("could not allocate a unique upload id"),
Self::InvalidInput(message) => formatter.write_str(message), Self::Conflict(message) | Self::InvalidInput(message) => formatter.write_str(message),
Self::Io(error) => write!(formatter, "storage I/O error: {error}"), Self::Io(error) => write!(formatter, "storage I/O error: {error}"),
Self::Json(error) => write!(formatter, "metadata JSON error: {error}"), Self::Json(error) => write!(formatter, "metadata JSON error: {error}"),
Self::NotFound => formatter.write_str("upload not found"), Self::NotFound => formatter.write_str("upload not found"),
@@ -233,7 +302,7 @@ impl Error for StorageError {
Self::Format(error) => Some(error), Self::Format(error) => Some(error),
Self::Io(error) => Some(error), Self::Io(error) => Some(error),
Self::Json(error) => Some(error), Self::Json(error) => Some(error),
Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None, Self::Conflict(_) | Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None,
} }
} }
} }
+168
View File
@@ -0,0 +1,168 @@
use std::{
net::{Ipv4Addr, SocketAddr},
path::Path,
};
use axum::{
body::Body,
http::{Method, Request, StatusCode, header},
};
use http_body_util::BodyExt;
use serde_json::json;
use tempfile::TempDir;
use tower::ServiceExt;
use upl::{
app::{AppConfig, build_router},
model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse},
};
#[tokio::test]
async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, "hello.txt", 11).await?;
let response = app
.clone()
.oneshot(chunk_request(
&upload.upload_id,
0,
b"hello world".to_vec(),
)?)
.await?;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let response = app
.clone()
.oneshot(empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", upload.upload_id),
)?)
.await?;
assert_eq!(response.status(), StatusCode::OK);
let complete: CompleteUploadResponse = decode_body(response).await?;
assert_eq!(complete.name, "hello.txt");
assert_eq!(
tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?,
b"hello world"
);
assert!(
temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.is_dir()
);
let duplicate = app
.oneshot(empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", upload.upload_id),
)?)
.await?;
assert_eq!(duplicate.status(), StatusCode::CONFLICT);
Ok(())
}
#[tokio::test]
async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?;
let response = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?)
.await?;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let response = app
.oneshot(empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", upload.upload_id),
)?)
.await?;
assert_eq!(response.status(), StatusCode::CONFLICT);
assert!(
!temp_dir
.path()
.join("complete")
.join("partial.bin")
.exists()
);
Ok(())
}
async fn create_upload(
app: &axum::Router,
name: &str,
size: u64,
) -> Result<CreateUploadResponse, Box<dyn std::error::Error>> {
let response = app
.clone()
.oneshot(json_request(
Method::POST,
"/api/uploads",
&json!({
"name": name,
"size": size,
"last_modified": 1_760_000_000_000_i64
}),
)?)
.await?;
assert_eq!(response.status(), StatusCode::OK);
decode_body(response).await
}
fn test_app(data_dir: &Path) -> axum::Router {
build_router(&AppConfig::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
concat!(env!("CARGO_MANIFEST_DIR"), "/static"),
data_dir,
))
}
fn json_request(
method: Method,
uri: &str,
body: &serde_json::Value,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
Ok(Request::builder()
.method(method)
.uri(uri)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_vec(body)?))?)
}
fn chunk_request(
upload_id: &str,
index: u64,
body: Vec<u8>,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
Ok(Request::builder()
.method(Method::PUT)
.uri(format!("/api/uploads/{upload_id}/chunks/{index}"))
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(Body::from(body))?)
}
fn empty_request(method: Method, uri: &str) -> Result<Request<Body>, Box<dyn std::error::Error>> {
Ok(Request::builder()
.method(method)
.uri(uri)
.body(Body::empty())?)
}
async fn decode_body<T>(response: axum::response::Response) -> Result<T, Box<dyn std::error::Error>>
where
T: serde::de::DeserializeOwned,
{
let body = response.into_body().collect().await?.to_bytes();
Ok(serde_json::from_slice(&body)?)
}