428af52e2f
Successful completion moved the assembled file into data/complete but left the upload staging directory behind, including all chunk files. Remove the upload's staging directory only after the final file has been renamed into place so incomplete and failed uploads remain resumable. A repeat complete request for that old upload id now returns 404 because the temporary upload record has been retired with its chunks. Test Plan: - just check Refs: none
202 lines
5.2 KiB
Rust
202 lines
5.2 KiB
Rust
use std::{
|
|
net::{Ipv4Addr, SocketAddr},
|
|
path::Path,
|
|
};
|
|
|
|
use axum::{
|
|
body::Body,
|
|
http::{Method, Request, StatusCode, header},
|
|
};
|
|
use http_body_util::BodyExt;
|
|
use serde_json::json;
|
|
use tempfile::TempDir;
|
|
use tower::ServiceExt;
|
|
use upl::{
|
|
app::{AppConfig, build_router},
|
|
model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse},
|
|
};
|
|
|
|
#[tokio::test]
|
|
async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "hello.txt", 11).await?;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(chunk_request(
|
|
&upload.upload_id,
|
|
0,
|
|
b"hello world".to_vec(),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let complete: CompleteUploadResponse = decode_body(response).await?;
|
|
assert_eq!(complete.name, "hello.txt");
|
|
assert_eq!(
|
|
tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?,
|
|
b"hello world"
|
|
);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("staging")
|
|
.join(&upload.upload_id)
|
|
.exists()
|
|
);
|
|
|
|
let duplicate = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(duplicate.status(), StatusCode::NOT_FOUND);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
let response = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::CONFLICT);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("complete")
|
|
.join("partial.bin")
|
|
.exists()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_corrupt_chunk_file() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "corrupt.bin", 4).await?;
|
|
|
|
let chunk_path = temp_dir
|
|
.path()
|
|
.join("staging")
|
|
.join(&upload.upload_id)
|
|
.join("chunks")
|
|
.join("000000.part");
|
|
tokio::fs::write(chunk_path, b"bad").await?;
|
|
|
|
let response = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::CONFLICT);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("complete")
|
|
.join("corrupt.bin")
|
|
.exists()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_upload(
|
|
app: &axum::Router,
|
|
name: &str,
|
|
size: u64,
|
|
) -> Result<CreateUploadResponse, Box<dyn std::error::Error>> {
|
|
let response = app
|
|
.clone()
|
|
.oneshot(json_request(
|
|
Method::POST,
|
|
"/api/uploads",
|
|
&json!({
|
|
"name": name,
|
|
"size": size,
|
|
"last_modified": 1_760_000_000_000_i64
|
|
}),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
decode_body(response).await
|
|
}
|
|
|
|
fn test_app(data_dir: &Path) -> axum::Router {
|
|
build_router(&AppConfig::new(
|
|
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/static"),
|
|
data_dir,
|
|
))
|
|
}
|
|
|
|
fn json_request(
|
|
method: Method,
|
|
uri: &str,
|
|
body: &serde_json::Value,
|
|
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(method)
|
|
.uri(uri)
|
|
.header(header::CONTENT_TYPE, "application/json")
|
|
.body(Body::from(serde_json::to_vec(body)?))?)
|
|
}
|
|
|
|
fn chunk_request(
|
|
upload_id: &str,
|
|
index: u64,
|
|
body: Vec<u8>,
|
|
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(Method::PUT)
|
|
.uri(format!("/api/uploads/{upload_id}/chunks/{index}"))
|
|
.header(header::CONTENT_TYPE, "application/octet-stream")
|
|
.body(Body::from(body))?)
|
|
}
|
|
|
|
fn empty_request(method: Method, uri: &str) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(method)
|
|
.uri(uri)
|
|
.body(Body::empty())?)
|
|
}
|
|
|
|
async fn decode_body<T>(response: axum::response::Response) -> Result<T, Box<dyn std::error::Error>>
|
|
where
|
|
T: serde::de::DeserializeOwned,
|
|
{
|
|
let body = response.into_body().collect().await?.to_bytes();
|
|
Ok(serde_json::from_slice(&body)?)
|
|
}
|