feat: store raw upload chunks

Add the chunk upload and progress APIs from PLAN.md. PUT
/api/uploads/{id}/chunks/{index} now accepts raw octet-stream bodies, validates
unknown upload ids, out-of-range chunk indexes, and exact chunk lengths, then
writes through a temporary .part.tmp path before renaming the completed chunk
into place. Re-uploading an already-complete chunk is idempotent when the
existing file length matches the expected length.

GET /api/uploads/{id} now reports server-authoritative progress by scanning the
chunk directory and only counting chunk files whose lengths match metadata. The
router also raises Axum's request body limit to 64 MiB so the planned 16 MiB
chunks can reach the handler, matching the nginx deployment guidance.

Document the chunk storage responsibility and extend the reusable test checklist
with the new progress and validation coverage.

Test Plan:
- just check

Refs: PLAN.md milestones 3 and 4
This commit is contained in:
2026-05-30 17:00:42 +02:00
parent 24ecdbd251
commit 1594c65d89
7 changed files with 422 additions and 5 deletions
+3 -1
View File
@@ -19,7 +19,7 @@ upl
src/app.rs Axum router, shared state, static file service
src/api.rs HTTP handlers and API error responses
src/model.rs JSON request, response, and metadata shapes
src/storage.rs local filesystem layout and durable metadata writes
src/storage.rs local filesystem layout, metadata, and chunk writes
src/lib.rs library surface used by integration tests
Browser UI
static/index.html upload tool markup
@@ -37,6 +37,8 @@ upl
inside this repository.
- `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside
this repository.
- The server accepts request bodies up to 64 MiB, which leaves room for the
planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`.
## Common Commands
+4
View File
@@ -12,6 +12,10 @@ Keep this file as the reusable verification checklist while implementing
- `GET /healthz` reports `ok`.
- `POST /api/uploads` creates `meta.json` and chunk directories.
- `POST /api/uploads` rejects an empty file name.
- `PUT /api/uploads/:id/chunks/:index` stores validated chunk files.
- `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks.
- `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks.
- `GET /api/uploads/:id` reports completed chunks from disk.
## Manual
+33 -1
View File
@@ -1,5 +1,7 @@
use axum::{
Json,
body::Bytes,
extract::Path,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
@@ -26,6 +28,34 @@ pub async fn create_upload(
Ok(Json(meta.create_response()))
}
/// Returns server-authoritative upload progress.
///
/// # Errors
///
/// Returns an API error when the upload id is invalid, unknown, or cannot be
/// read from storage.
pub async fn get_upload(
State(state): State<AppState>,
Path(upload_id): Path<String>,
) -> Result<Json<crate::model::UploadProgressResponse>, ApiError> {
Ok(Json(state.storage.progress(&upload_id).await?))
}
/// Stores one raw binary chunk body.
///
/// # Errors
///
/// Returns an API error when the upload id is invalid or unknown, the chunk
/// index is out of range, the body length is wrong, or the write fails.
pub async fn put_chunk(
State(state): State<AppState>,
Path((upload_id, index)): Path<(String, u64)>,
body: Bytes,
) -> Result<StatusCode, ApiError> {
state.storage.store_chunk(&upload_id, index, &body).await?;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug)]
pub struct ApiError {
status: StatusCode,
@@ -46,7 +76,9 @@ impl IntoResponse for ApiError {
impl From<StorageError> for ApiError {
fn from(error: StorageError) -> Self {
let status = if error.is_invalid_input() {
let status = if error.is_not_found() {
StatusCode::NOT_FOUND
} else if error.is_invalid_input() {
StatusCode::BAD_REQUEST
} else {
StatusCode::INTERNAL_SERVER_ERROR
+8
View File
@@ -7,6 +7,7 @@ use std::{
use axum::{
Router,
extract::DefaultBodyLimit,
routing::{get, post},
};
use tower_http::services::{ServeDir, ServeFile};
@@ -17,6 +18,7 @@ const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000";
const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR";
const DATA_DIR_ENV: &str = "UPL_DATA_DIR";
const BIND_ENV: &str = "UPL_BIND";
const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024;
#[derive(Clone, Debug)]
pub struct AppConfig {
@@ -72,6 +74,12 @@ pub fn build_router(config: &AppConfig) -> Router {
Router::new()
.route("/healthz", get(healthz))
.route("/api/uploads", post(api::create_upload))
.route("/api/uploads/{upload_id}", get(api::get_upload))
.route(
"/api/uploads/{upload_id}/chunks/{index}",
axum::routing::put(api::put_chunk),
)
.layer(DefaultBodyLimit::max(MAX_REQUEST_BODY_BYTES))
.fallback_service(static_service(&config.static_dir))
.with_state(state)
}
+22
View File
@@ -17,6 +17,16 @@ pub struct CreateUploadResponse {
pub completed_chunks: Vec<u64>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct UploadProgressResponse {
pub upload_id: String,
pub name: String,
pub size: u64,
pub chunk_size: u64,
pub total_chunks: u64,
pub completed_chunks: Vec<u64>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct UploadMeta {
pub id: String,
@@ -39,4 +49,16 @@ impl UploadMeta {
completed_chunks: Vec::new(),
}
}
#[must_use]
pub fn progress_response(&self, completed_chunks: Vec<u64>) -> UploadProgressResponse {
UploadProgressResponse {
upload_id: self.id.clone(),
name: self.original_name.clone(),
size: self.size,
chunk_size: self.chunk_size,
total_chunks: self.total_chunks,
completed_chunks,
}
}
}
+165 -3
View File
@@ -8,7 +8,7 @@ use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::fs;
use uuid::Uuid;
use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta};
use crate::model::{CHUNK_SIZE, CreateUploadRequest, UploadMeta, UploadProgressResponse};
#[derive(Clone, Debug)]
pub struct Storage {
@@ -70,6 +70,59 @@ impl Storage {
Err(StorageError::IdCollision)
}
/// Loads upload progress by scanning durable chunk files.
///
/// # Errors
///
/// Returns an error when the upload id is invalid, metadata is missing, or
/// the staging directory cannot be scanned.
pub async fn progress(&self, upload_id: &str) -> Result<UploadProgressResponse, StorageError> {
let meta = self.load_meta(upload_id).await?;
let completed_chunks = self.completed_chunks(&meta).await?;
Ok(meta.progress_response(completed_chunks))
}
/// Validates and stores one raw chunk body.
///
/// # Errors
///
/// Returns an error when the upload is unknown, the index is out of range,
/// the body length is not the expected chunk length, or the chunk cannot be
/// written and renamed into place.
pub async fn store_chunk(
&self,
upload_id: &str,
index: u64,
body: &[u8],
) -> Result<(), StorageError> {
let meta = self.load_meta(upload_id).await?;
let expected_len = expected_chunk_len(&meta, index)?;
let body_len = u64::try_from(body.len())
.map_err(|_| StorageError::InvalidInput("chunk body is too large"))?;
if body_len != expected_len {
return Err(StorageError::InvalidInput("chunk has the wrong length"));
}
let part_path = self.chunk_path(upload_id, index);
if let Some(existing_len) = file_len(&part_path).await? {
if existing_len == expected_len {
return Ok(());
}
return Err(StorageError::InvalidInput(
"existing chunk has the wrong length",
));
}
let tmp_path = part_path.with_extension("part.tmp");
fs::write(&tmp_path, body).await?;
fs::rename(&tmp_path, &part_path).await?;
Ok(())
}
#[must_use]
pub fn data_dir(&self) -> &Path {
&self.data_dir
@@ -87,6 +140,12 @@ impl Storage {
self.staging_dir().join(upload_id)
}
fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf {
self.upload_dir(upload_id)
.join("chunks")
.join(format!("{index:06}.part"))
}
async fn ensure_layout(&self) -> Result<(), StorageError> {
fs::create_dir_all(self.staging_dir()).await?;
fs::create_dir_all(self.complete_dir()).await?;
@@ -103,6 +162,34 @@ impl Storage {
Ok(())
}
async fn load_meta(&self, upload_id: &str) -> Result<UploadMeta, StorageError> {
validate_upload_id(upload_id)?;
let meta_path = self.upload_dir(upload_id).join("meta.json");
let bytes = match fs::read(meta_path).await {
Ok(bytes) => bytes,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
return Err(StorageError::NotFound);
}
Err(error) => return Err(error.into()),
};
Ok(serde_json::from_slice(&bytes)?)
}
async fn completed_chunks(&self, meta: &UploadMeta) -> Result<Vec<u64>, StorageError> {
let mut completed = Vec::new();
for index in 0..meta.total_chunks {
let expected_len = expected_chunk_len(meta, index)?;
if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) {
completed.push(index);
}
}
Ok(completed)
}
}
#[derive(Debug)]
@@ -112,6 +199,7 @@ pub enum StorageError {
InvalidInput(&'static str),
Io(std::io::Error),
Json(serde_json::Error),
NotFound,
}
impl StorageError {
@@ -119,6 +207,11 @@ impl StorageError {
pub fn is_invalid_input(&self) -> bool {
matches!(self, Self::InvalidInput(_))
}
#[must_use]
pub fn is_not_found(&self) -> bool {
matches!(self, Self::NotFound)
}
}
impl Display for StorageError {
@@ -129,6 +222,7 @@ impl Display for StorageError {
Self::InvalidInput(message) => formatter.write_str(message),
Self::Io(error) => write!(formatter, "storage I/O error: {error}"),
Self::Json(error) => write!(formatter, "metadata JSON error: {error}"),
Self::NotFound => formatter.write_str("upload not found"),
}
}
}
@@ -139,7 +233,7 @@ impl Error for StorageError {
Self::Format(error) => Some(error),
Self::Io(error) => Some(error),
Self::Json(error) => Some(error),
Self::IdCollision | Self::InvalidInput(_) => None,
Self::IdCollision | Self::InvalidInput(_) | Self::NotFound => None,
}
}
}
@@ -188,9 +282,45 @@ fn total_chunks(size: u64, chunk_size: u64) -> u64 {
}
}
fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result<u64, StorageError> {
if index >= meta.total_chunks {
return Err(StorageError::InvalidInput("chunk index is out of range"));
}
if index + 1 == meta.total_chunks {
let bytes_before_final = meta.chunk_size * index;
Ok(meta.size - bytes_before_final)
} else {
Ok(meta.chunk_size)
}
}
async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
match fs::metadata(path).await {
Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())),
Ok(_) => Ok(None),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(error) => Err(error.into()),
}
}
fn validate_upload_id(upload_id: &str) -> Result<(), StorageError> {
let is_valid = !upload_id.is_empty()
&& upload_id
.bytes()
.all(|byte| byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'_');
if is_valid {
Ok(())
} else {
Err(StorageError::InvalidInput("upload id is invalid"))
}
}
#[cfg(test)]
mod tests {
use super::{safe_file_name, total_chunks};
use super::{expected_chunk_len, safe_file_name, total_chunks, validate_upload_id};
use crate::model::UploadMeta;
#[test]
fn computes_total_chunks() {
@@ -205,4 +335,36 @@ mod tests {
assert_eq!(safe_file_name("../movie:mkv"), "movie_mkv");
assert_eq!(safe_file_name(" "), "upload");
}
#[test]
fn computes_expected_chunk_lengths() -> Result<(), Box<dyn std::error::Error>> {
let meta = test_meta(33, 16, 3);
assert_eq!(expected_chunk_len(&meta, 0)?, 16);
assert_eq!(expected_chunk_len(&meta, 1)?, 16);
assert_eq!(expected_chunk_len(&meta, 2)?, 1);
assert!(expected_chunk_len(&meta, 3).is_err());
Ok(())
}
#[test]
fn validates_upload_ids() {
assert!(validate_upload_id("abc123").is_ok());
assert!(validate_upload_id("../abc").is_err());
assert!(validate_upload_id("").is_err());
}
fn test_meta(size: u64, chunk_size: u64, total_chunks: u64) -> UploadMeta {
UploadMeta {
id: "abc123".to_owned(),
original_name: "file.bin".to_owned(),
safe_name: "file.bin".to_owned(),
size,
last_modified: 0,
chunk_size,
total_chunks,
created_at: "2026-05-30T16:00:00Z".to_owned(),
}
}
}
+187
View File
@@ -0,0 +1,187 @@
use std::{
net::{Ipv4Addr, SocketAddr},
path::Path,
};
use axum::{
body::Body,
http::{Method, Request, StatusCode, header},
};
use http_body_util::BodyExt;
use serde_json::json;
use tempfile::TempDir;
use tower::ServiceExt;
use upl::{
app::{AppConfig, build_router},
model::{CHUNK_SIZE, CreateUploadResponse, UploadProgressResponse},
};
#[tokio::test]
async fn stores_chunks_and_reports_progress() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 3).await?;
let final_chunk = vec![b'z'; 3];
let response = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 1, final_chunk)?)
.await?;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let progress = get_progress(&app, &upload.upload_id).await?;
assert_eq!(progress.completed_chunks, vec![1]);
let first_chunk = vec![b'a'; usize::try_from(CHUNK_SIZE)?];
let response = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 0, first_chunk)?)
.await?;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let progress = get_progress(&app, &upload.upload_id).await?;
assert_eq!(progress.completed_chunks, vec![0, 1]);
let chunk_path = temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.join("chunks")
.join("000000.part");
assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE);
Ok(())
}
#[tokio::test]
async fn rejects_wrong_size_non_final_chunk() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 1).await?;
let response = app
.oneshot(chunk_request(&upload.upload_id, 0, b"too short".to_vec())?)
.await?;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
Ok(())
}
#[tokio::test]
async fn accepts_duplicate_chunk_when_existing_length_matches()
-> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, temp_dir.path(), 4).await?;
let first = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?)
.await?;
let second = app
.oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?)
.await?;
assert_eq!(first.status(), StatusCode::NO_CONTENT);
assert_eq!(second.status(), StatusCode::NO_CONTENT);
Ok(())
}
#[tokio::test]
async fn rejects_unknown_upload_id() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let response = app
.oneshot(chunk_request("missing", 0, b"data".to_vec())?)
.await?;
assert_eq!(response.status(), StatusCode::NOT_FOUND);
Ok(())
}
async fn create_upload(
app: &axum::Router,
data_dir: &Path,
size: u64,
) -> Result<CreateUploadResponse, Box<dyn std::error::Error>> {
let response = app
.clone()
.oneshot(json_request(
Method::POST,
"/api/uploads",
&json!({
"name": "chunked.bin",
"size": size,
"last_modified": 1_760_000_000_000_i64
}),
)?)
.await?;
assert_eq!(response.status(), StatusCode::OK);
assert!(data_dir.join("staging").is_dir());
decode_body(response).await
}
async fn get_progress(
app: &axum::Router,
upload_id: &str,
) -> Result<UploadProgressResponse, Box<dyn std::error::Error>> {
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri(format!("/api/uploads/{upload_id}"))
.body(Body::empty())?,
)
.await?;
assert_eq!(response.status(), StatusCode::OK);
decode_body(response).await
}
fn test_app(data_dir: &Path) -> axum::Router {
build_router(&AppConfig::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
concat!(env!("CARGO_MANIFEST_DIR"), "/static"),
data_dir,
))
}
fn json_request(
method: Method,
uri: &str,
body: &serde_json::Value,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
Ok(Request::builder()
.method(method)
.uri(uri)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_vec(body)?))?)
}
fn chunk_request(
upload_id: &str,
index: u64,
body: Vec<u8>,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
Ok(Request::builder()
.method(Method::PUT)
.uri(format!("/api/uploads/{upload_id}/chunks/{index}"))
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(Body::from(body))?)
}
async fn decode_body<T>(response: axum::response::Response) -> Result<T, Box<dyn std::error::Error>>
where
T: serde::de::DeserializeOwned,
{
let body = response.into_body().collect().await?.to_bytes();
Ok(serde_json::from_slice(&body)?)
}