Files
upl/tests/chunk_upload.rs
ddidderr c072b93726 feat: write chunks directly to temp upload files
Completed uploads used to copy every staged chunk into a second file before
renaming the result into data/complete. That doubled write volume and required
peak disk space for both the chunk set and the final file.

Write each chunk directly into one private temp upload file at its final offset
instead. After a chunk write succeeds, record a tiny durable completion marker
for progress and resume scans. Completion now verifies the temp file length and
all markers, then renames the temp file into the completed upload directory.

Add UPL_TEMP_DIR and --temp-dir so operators can choose where upload metadata,
markers, and temp files live. The default remains data/staging, and docs call
out that the temp directory must be on the same filesystem as data/complete for
atomic promotion. The nginx example now aliases only the completed upload
directory, and the smoke test verifies that final-file alias.

This keeps the existing length-based validation model; it does not add per-chunk
hashing.

Test Plan:
- just check
- just nginx-smoke
- cargo clippy && cargo clippy --benches && cargo clippy --tests
- cargo +nightly fmt --all
- cargo clippy && cargo clippy --benches && cargo clippy --tests

Refs: none
2026-05-30 18:10:05 +02:00

205 lines
5.7 KiB
Rust

use std::{
net::{Ipv4Addr, SocketAddr},
path::Path,
};
use axum::{
body::Body,
http::{Method, Request, StatusCode, header},
};
use http_body_util::BodyExt;
use serde_json::json;
use tempfile::TempDir;
use tower::ServiceExt;
use upl::{
app::{AppConfig, build_router},
model::{CHUNK_SIZE, CreateUploadResponse, UploadProgressResponse},
};
/// Uploads both chunks of a `CHUNK_SIZE + 3` byte file out of order and checks
/// that progress reporting, the temp upload file, and the per-chunk completion
/// markers reflect each write.
///
/// Chunk 1 (the 3-byte tail) is sent before chunk 0. The temp file's bytes
/// are then compared, not just its length, so a chunk written at the wrong
/// offset is caught.
#[tokio::test]
async fn stores_chunks_and_reports_progress() -> Result<(), Box<dyn std::error::Error>> {
    let temp_dir = TempDir::new()?;
    let app = test_app(temp_dir.path());
    let upload = create_upload(&app, temp_dir.path(), CHUNK_SIZE + 3).await?;
    let chunk_len = usize::try_from(CHUNK_SIZE)?;

    // Send the final (short) chunk first to exercise out-of-order arrival.
    let final_chunk = vec![b'z'; 3];
    let response = app
        .clone()
        .oneshot(chunk_request(&upload.upload_id, 1, final_chunk)?)
        .await?;
    assert_eq!(response.status(), StatusCode::NO_CONTENT);
    let progress = get_progress(&app, &upload.upload_id).await?;
    assert_eq!(progress.completed_chunks, vec![1]);

    let first_chunk = vec![b'a'; chunk_len];
    let response = app
        .clone()
        .oneshot(chunk_request(&upload.upload_id, 0, first_chunk)?)
        .await?;
    assert_eq!(response.status(), StatusCode::NO_CONTENT);
    let progress = get_progress(&app, &upload.upload_id).await?;
    assert_eq!(progress.completed_chunks, vec![0, 1]);

    // With the default temp dir, per-upload state lives under data/staging/<id>.
    let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
    let temp_file = upload_dir.join(".upload.tmp");
    assert_eq!(tokio::fs::metadata(&temp_file).await?.len(), CHUNK_SIZE + 3);

    // A matching length alone cannot tell whether each chunk landed at its own
    // offset (e.g. a preallocated file, or chunk 1 written at offset 0), so
    // verify the bytes as well. The length check above makes split_at safe.
    let contents = tokio::fs::read(&temp_file).await?;
    let (head, tail) = contents.split_at(chunk_len);
    assert!(
        head.iter().all(|&byte| byte == b'a'),
        "chunk 0 bytes are not at offset 0"
    );
    assert_eq!(
        tail,
        b"zzz".as_slice(),
        "chunk 1 bytes are not at offset CHUNK_SIZE"
    );

    assert!(upload_dir.join("completed").join("000000.done").is_file());
    assert!(upload_dir.join("completed").join("000001.done").is_file());
    // Chunks are written straight into the temp file; no per-chunk staging dir.
    assert!(!upload_dir.join("chunks").exists());
    Ok(())
}
/// Sending an undersized body for chunk 0 of a two-chunk upload (so chunk 0
/// is not the final chunk) must be answered with `400 Bad Request`.
#[tokio::test]
async fn rejects_wrong_size_non_final_chunk() -> Result<(), Box<dyn std::error::Error>> {
    let scratch = TempDir::new()?;
    let router = test_app(scratch.path());
    // One byte past CHUNK_SIZE makes chunk 0 a full-size, non-final chunk.
    let created = create_upload(&router, scratch.path(), CHUNK_SIZE + 1).await?;

    let undersized = chunk_request(&created.upload_id, 0, b"too short".to_vec())?;
    let status = router.oneshot(undersized).await?.status();

    assert_eq!(status, StatusCode::BAD_REQUEST);
    Ok(())
}
/// A chunk index past the last chunk of the upload is rejected with
/// `400 Bad Request`. The 4-byte upload here is expected to have only chunk 0
/// (assumes `CHUNK_SIZE >= 4`).
#[tokio::test]
async fn rejects_out_of_range_chunk_index() -> Result<(), Box<dyn std::error::Error>> {
    let scratch = TempDir::new()?;
    let router = test_app(scratch.path());
    let created = create_upload(&router, scratch.path(), 4).await?;

    let past_end = chunk_request(&created.upload_id, 1, b"data".to_vec())?;
    let status = router.oneshot(past_end).await?.status();

    assert_eq!(status, StatusCode::BAD_REQUEST);
    Ok(())
}
#[tokio::test]
async fn accepts_duplicate_completed_chunk() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, temp_dir.path(), 4).await?;
let first = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?)
.await?;
let second = app
.oneshot(chunk_request(&upload.upload_id, 0, b"data".to_vec())?)
.await?;
assert_eq!(first.status(), StatusCode::NO_CONTENT);
assert_eq!(second.status(), StatusCode::NO_CONTENT);
Ok(())
}
/// Uploading a chunk for an upload id that was never created yields
/// `404 Not Found`.
#[tokio::test]
async fn rejects_unknown_upload_id() -> Result<(), Box<dyn std::error::Error>> {
    let scratch = TempDir::new()?;
    let router = test_app(scratch.path());

    let orphan = chunk_request("missing", 0, b"data".to_vec())?;
    let status = router.oneshot(orphan).await?.status();

    assert_eq!(status, StatusCode::NOT_FOUND);
    Ok(())
}
/// Creates an upload named `chunked.bin` of `size` bytes via
/// `POST /api/uploads` and returns the decoded response.
///
/// Asserts that the request returned `200 OK` and that the default
/// `staging` directory exists under `data_dir` afterwards.
async fn create_upload(
    app: &axum::Router,
    data_dir: &Path,
    size: u64,
) -> Result<CreateUploadResponse, Box<dyn std::error::Error>> {
    let payload = json!({
        "name": "chunked.bin",
        "size": size,
        "last_modified": 1_760_000_000_000_i64
    });
    let request = json_request(Method::POST, "/api/uploads", &payload)?;
    let response = app.clone().oneshot(request).await?;

    assert_eq!(response.status(), StatusCode::OK);
    let staging = data_dir.join("staging");
    assert!(staging.is_dir());

    decode_body(response).await
}
/// Fetches `GET /api/uploads/{upload_id}`, asserts `200 OK`, and decodes the
/// progress report.
async fn get_progress(
    app: &axum::Router,
    upload_id: &str,
) -> Result<UploadProgressResponse, Box<dyn std::error::Error>> {
    let path = format!("/api/uploads/{upload_id}");
    let request = Request::builder()
        .method(Method::GET)
        .uri(path)
        .body(Body::empty())?;
    let response = app.clone().oneshot(request).await?;

    assert_eq!(response.status(), StatusCode::OK);
    decode_body(response).await
}
/// Builds the application router for tests. It uses an ephemeral localhost
/// port, the crate's `static` directory, and `data_dir` as the data root.
fn test_app(data_dir: &Path) -> axum::Router {
    let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let static_dir = concat!(env!("CARGO_MANIFEST_DIR"), "/static");
    let config = AppConfig::new(bind_addr, static_dir, data_dir);
    build_router(&config)
}
/// Builds a request with the given method and URI whose body is `body`
/// serialized as JSON. The `Content-Type` header is set to
/// `application/json`.
fn json_request(
    method: Method,
    uri: &str,
    body: &serde_json::Value,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
    let encoded = serde_json::to_vec(body)?;
    let request = Request::builder()
        .method(method)
        .uri(uri)
        .header(header::CONTENT_TYPE, "application/json")
        .body(Body::from(encoded))?;
    Ok(request)
}
/// Builds a `PUT /api/uploads/{upload_id}/chunks/{index}` request that
/// carries `body` as a raw `application/octet-stream` payload.
fn chunk_request(
    upload_id: &str,
    index: u64,
    body: Vec<u8>,
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
    let path = format!("/api/uploads/{upload_id}/chunks/{index}");
    let request = Request::builder()
        .method(Method::PUT)
        .uri(path)
        .header(header::CONTENT_TYPE, "application/octet-stream")
        .body(Body::from(body))?;
    Ok(request)
}
/// Reads the whole response body and deserializes it from JSON into `T`.
async fn decode_body<T>(response: axum::response::Response) -> Result<T, Box<dyn std::error::Error>>
where
    T: serde::de::DeserializeOwned,
{
    let collected = response.into_body().collect().await?;
    let bytes = collected.to_bytes();
    let decoded = serde_json::from_slice::<T>(&bytes)?;
    Ok(decoded)
}