1923ff2a6f
The browser upload flow was built around one selected file and one global upload state. That made the existing chunk pool useful for a single file, but users could not start several selected files at the same time. Refactor the browser state into per-file upload items. Each selected file now has its own upload record, completed-chunk set, abort controller, retry state, progress row, and saved IndexedDB resume record. The picker accepts multiple files, `Start all` and `Resume all` use a bounded file-level pool, and each file keeps the existing bounded chunk pool. This keeps parallel uploads useful without letting one large selection create unbounded request fan-out. Keep the server API unchanged. Each file still receives a separate server upload id, and server-side progress remains authoritative before any missing chunks are scheduled. Terminal conflicts still stop the affected file without overwriting completed data. Update the user-facing markup, styles, project docs, and test checklist for the multi-file scheduler. Add a server regression test that interleaves two uploads and verifies the completed files contain exactly their own bytes. Test Plan: - just check - git diff --check
272 lines
7.6 KiB
Rust
272 lines
7.6 KiB
Rust
use std::{
|
|
net::{Ipv4Addr, SocketAddr},
|
|
path::Path,
|
|
};
|
|
|
|
use axum::{
|
|
body::Body,
|
|
http::{Method, Request, StatusCode, header},
|
|
};
|
|
use http_body_util::BodyExt;
|
|
use serde_json::json;
|
|
use tempfile::TempDir;
|
|
use tower::ServiceExt;
|
|
use upl::{
|
|
app::{AppConfig, build_router},
|
|
model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse},
|
|
};
|
|
|
|
#[tokio::test]
|
|
async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "hello.txt", 11).await?;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(chunk_request(
|
|
&upload.upload_id,
|
|
0,
|
|
b"hello world".to_vec(),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
let complete: CompleteUploadResponse = decode_body(response).await?;
|
|
assert_eq!(complete.name, "hello.txt");
|
|
assert_eq!(
|
|
tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?,
|
|
b"hello world"
|
|
);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("staging")
|
|
.join(&upload.upload_id)
|
|
.exists()
|
|
);
|
|
|
|
let duplicate = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
assert_eq!(duplicate.status(), StatusCode::NOT_FOUND);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn parallel_uploads_keep_bytes_separate() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let chunk_size = usize::try_from(CHUNK_SIZE)?;
|
|
let left_upload = create_upload(&app, "left.bin", CHUNK_SIZE + 4).await?;
|
|
let right_upload = create_upload(&app, "right.bin", CHUNK_SIZE + 5).await?;
|
|
|
|
let mut expected_left = vec![b'l'; chunk_size];
|
|
expected_left.extend_from_slice(b"eft!");
|
|
let mut expected_right = vec![b'r'; chunk_size];
|
|
expected_right.extend_from_slice(b"ight!");
|
|
|
|
let left_first = chunk_request(
|
|
&left_upload.upload_id,
|
|
0,
|
|
expected_left[..chunk_size].to_vec(),
|
|
)?;
|
|
let left_final = chunk_request(
|
|
&left_upload.upload_id,
|
|
1,
|
|
expected_left[chunk_size..].to_vec(),
|
|
)?;
|
|
let right_first = chunk_request(
|
|
&right_upload.upload_id,
|
|
0,
|
|
expected_right[..chunk_size].to_vec(),
|
|
)?;
|
|
let right_final = chunk_request(
|
|
&right_upload.upload_id,
|
|
1,
|
|
expected_right[chunk_size..].to_vec(),
|
|
)?;
|
|
|
|
let (left_first, right_first, left_final, right_final) = tokio::join!(
|
|
app.clone().oneshot(left_first),
|
|
app.clone().oneshot(right_first),
|
|
app.clone().oneshot(left_final),
|
|
app.clone().oneshot(right_final),
|
|
);
|
|
|
|
assert_eq!(left_first?.status(), StatusCode::NO_CONTENT);
|
|
assert_eq!(right_first?.status(), StatusCode::NO_CONTENT);
|
|
assert_eq!(left_final?.status(), StatusCode::NO_CONTENT);
|
|
assert_eq!(right_final?.status(), StatusCode::NO_CONTENT);
|
|
|
|
let left_complete = empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", left_upload.upload_id),
|
|
)?;
|
|
let right_complete = empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", right_upload.upload_id),
|
|
)?;
|
|
|
|
let (left_complete, right_complete) = tokio::join!(
|
|
app.clone().oneshot(left_complete),
|
|
app.clone().oneshot(right_complete),
|
|
);
|
|
|
|
assert_eq!(left_complete?.status(), StatusCode::OK);
|
|
assert_eq!(right_complete?.status(), StatusCode::OK);
|
|
assert_eq!(
|
|
tokio::fs::read(temp_dir.path().join("complete").join("left.bin")).await?,
|
|
expected_left
|
|
);
|
|
assert_eq!(
|
|
tokio::fs::read(temp_dir.path().join("complete").join("right.bin")).await?,
|
|
expected_right
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?;
|
|
|
|
let response = app
|
|
.clone()
|
|
.oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?)
|
|
.await?;
|
|
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
|
|
|
let response = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::CONFLICT);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("complete")
|
|
.join("partial.bin")
|
|
.exists()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn rejects_tampered_temp_upload_file() -> Result<(), Box<dyn std::error::Error>> {
|
|
let temp_dir = TempDir::new()?;
|
|
let app = test_app(temp_dir.path());
|
|
let upload = create_upload(&app, "corrupt.bin", 4).await?;
|
|
|
|
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
|
|
tokio::fs::write(upload_dir.join(".upload.tmp"), b"bad").await?;
|
|
tokio::fs::write(upload_dir.join("completed").join("000000.done"), b"").await?;
|
|
|
|
let response = app
|
|
.oneshot(empty_request(
|
|
Method::POST,
|
|
&format!("/api/uploads/{}/complete", upload.upload_id),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::CONFLICT);
|
|
assert!(
|
|
!temp_dir
|
|
.path()
|
|
.join("complete")
|
|
.join("corrupt.bin")
|
|
.exists()
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_upload(
|
|
app: &axum::Router,
|
|
name: &str,
|
|
size: u64,
|
|
) -> Result<CreateUploadResponse, Box<dyn std::error::Error>> {
|
|
let response = app
|
|
.clone()
|
|
.oneshot(json_request(
|
|
Method::POST,
|
|
"/api/uploads",
|
|
&json!({
|
|
"name": name,
|
|
"size": size,
|
|
"last_modified": 1_760_000_000_000_i64
|
|
}),
|
|
)?)
|
|
.await?;
|
|
|
|
assert_eq!(response.status(), StatusCode::OK);
|
|
|
|
decode_body(response).await
|
|
}
|
|
|
|
fn test_app(data_dir: &Path) -> axum::Router {
|
|
build_router(&AppConfig::new(
|
|
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
|
|
concat!(env!("CARGO_MANIFEST_DIR"), "/static"),
|
|
data_dir,
|
|
))
|
|
}
|
|
|
|
fn json_request(
|
|
method: Method,
|
|
uri: &str,
|
|
body: &serde_json::Value,
|
|
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(method)
|
|
.uri(uri)
|
|
.header(header::CONTENT_TYPE, "application/json")
|
|
.body(Body::from(serde_json::to_vec(body)?))?)
|
|
}
|
|
|
|
fn chunk_request(
|
|
upload_id: &str,
|
|
index: u64,
|
|
body: Vec<u8>,
|
|
) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(Method::PUT)
|
|
.uri(format!("/api/uploads/{upload_id}/chunks/{index}"))
|
|
.header(header::CONTENT_TYPE, "application/octet-stream")
|
|
.body(Body::from(body))?)
|
|
}
|
|
|
|
fn empty_request(method: Method, uri: &str) -> Result<Request<Body>, Box<dyn std::error::Error>> {
|
|
Ok(Request::builder()
|
|
.method(method)
|
|
.uri(uri)
|
|
.body(Body::empty())?)
|
|
}
|
|
|
|
async fn decode_body<T>(response: axum::response::Response) -> Result<T, Box<dyn std::error::Error>>
|
|
where
|
|
T: serde::de::DeserializeOwned,
|
|
{
|
|
let body = response.into_body().collect().await?.to_bytes();
|
|
Ok(serde_json::from_slice(&body)?)
|
|
}
|