use std::{ net::{Ipv4Addr, SocketAddr}, path::Path, }; use axum::{ body::Body, http::{Method, Request, StatusCode, header}, }; use http_body_util::BodyExt; use serde_json::json; use tempfile::TempDir; use tower::ServiceExt; use upl::{ app::{AppConfig, build_router}, model::{CHUNK_SIZE, CompleteUploadResponse, CreateUploadResponse}, }; #[tokio::test] async fn assembles_completed_upload() -> Result<(), Box> { let temp_dir = TempDir::new()?; let app = test_app(temp_dir.path()); let upload = create_upload(&app, "hello.txt", 11).await?; let response = app .clone() .oneshot(chunk_request( &upload.upload_id, 0, b"hello world".to_vec(), )?) .await?; assert_eq!(response.status(), StatusCode::NO_CONTENT); let response = app .clone() .oneshot(empty_request( Method::POST, &format!("/api/uploads/{}/complete", upload.upload_id), )?) .await?; assert_eq!(response.status(), StatusCode::OK); let complete: CompleteUploadResponse = decode_body(response).await?; assert_eq!(complete.name, "hello.txt"); assert_eq!( tokio::fs::read(temp_dir.path().join("complete").join("hello.txt")).await?, b"hello world" ); assert!( !temp_dir .path() .join("staging") .join(&upload.upload_id) .exists() ); let duplicate = app .oneshot(empty_request( Method::POST, &format!("/api/uploads/{}/complete", upload.upload_id), )?) .await?; assert_eq!(duplicate.status(), StatusCode::NOT_FOUND); Ok(()) } #[tokio::test] async fn parallel_uploads_keep_bytes_separate() -> Result<(), Box> { let temp_dir = TempDir::new()?; let app = test_app(temp_dir.path()); let chunk_size = usize::try_from(CHUNK_SIZE)?; let left_upload = create_upload(&app, "left.bin", CHUNK_SIZE + 4).await?; let right_upload = create_upload(&app, "right.bin", CHUNK_SIZE + 5).await?; let mut expected_left = vec![b'l'; chunk_size]; expected_left.extend_from_slice(b"eft!"); let mut expected_right = vec![b'r'; chunk_size]; expected_right.extend_from_slice(b"ight!"); let left_first = chunk_request( &left_upload.upload_id, 0, expected_left[..chunk_size].to_vec(), )?; let left_final = chunk_request( &left_upload.upload_id, 1, expected_left[chunk_size..].to_vec(), )?; let right_first = chunk_request( &right_upload.upload_id, 0, expected_right[..chunk_size].to_vec(), )?; let right_final = chunk_request( &right_upload.upload_id, 1, expected_right[chunk_size..].to_vec(), )?; let (left_first, right_first, left_final, right_final) = tokio::join!( app.clone().oneshot(left_first), app.clone().oneshot(right_first), app.clone().oneshot(left_final), app.clone().oneshot(right_final), ); assert_eq!(left_first?.status(), StatusCode::NO_CONTENT); assert_eq!(right_first?.status(), StatusCode::NO_CONTENT); assert_eq!(left_final?.status(), StatusCode::NO_CONTENT); assert_eq!(right_final?.status(), StatusCode::NO_CONTENT); let left_complete = empty_request( Method::POST, &format!("/api/uploads/{}/complete", left_upload.upload_id), )?; let right_complete = empty_request( Method::POST, &format!("/api/uploads/{}/complete", right_upload.upload_id), )?; let (left_complete, right_complete) = tokio::join!( app.clone().oneshot(left_complete), app.clone().oneshot(right_complete), ); assert_eq!(left_complete?.status(), StatusCode::OK); assert_eq!(right_complete?.status(), StatusCode::OK); assert_eq!( tokio::fs::read(temp_dir.path().join("complete").join("left.bin")).await?, expected_left ); assert_eq!( tokio::fs::read(temp_dir.path().join("complete").join("right.bin")).await?, expected_right ); Ok(()) } #[tokio::test] async fn rejects_incomplete_upload() -> Result<(), Box> { let temp_dir = TempDir::new()?; let app = test_app(temp_dir.path()); let upload = create_upload(&app, "partial.bin", CHUNK_SIZE + 1).await?; let response = app .clone() .oneshot(chunk_request(&upload.upload_id, 1, b"x".to_vec())?) .await?; assert_eq!(response.status(), StatusCode::NO_CONTENT); let response = app .oneshot(empty_request( Method::POST, &format!("/api/uploads/{}/complete", upload.upload_id), )?) .await?; assert_eq!(response.status(), StatusCode::CONFLICT); assert!( !temp_dir .path() .join("complete") .join("partial.bin") .exists() ); Ok(()) } #[tokio::test] async fn rejects_completion_that_would_replace_file() -> Result<(), Box> { let temp_dir = TempDir::new()?; let app = test_app(temp_dir.path()); let upload = create_upload(&app, "clash.bin", 8).await?; tokio::fs::write( temp_dir.path().join("complete").join("clash.bin"), b"original", ) .await?; let response = app .clone() .oneshot(chunk_request(&upload.upload_id, 0, b"incoming".to_vec())?) .await?; assert_eq!(response.status(), StatusCode::NO_CONTENT); let response = app .oneshot(empty_request( Method::POST, &format!("/api/uploads/{}/complete", upload.upload_id), )?) .await?; assert_eq!(response.status(), StatusCode::CONFLICT); assert_eq!( tokio::fs::read(temp_dir.path().join("complete").join("clash.bin")).await?, b"original" ); assert!( temp_dir .path() .join("staging") .join(&upload.upload_id) .exists() ); Ok(()) } #[tokio::test] async fn rejects_tampered_temp_upload_file() -> Result<(), Box> { let temp_dir = TempDir::new()?; let app = test_app(temp_dir.path()); let upload = create_upload(&app, "corrupt.bin", 4).await?; let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id); tokio::fs::write(upload_dir.join(".upload.tmp"), b"bad").await?; tokio::fs::write(upload_dir.join("completed").join("000000.done"), b"").await?; let response = app .oneshot(empty_request( Method::POST, &format!("/api/uploads/{}/complete", upload.upload_id), )?) .await?; assert_eq!(response.status(), StatusCode::CONFLICT); assert!( !temp_dir .path() .join("complete") .join("corrupt.bin") .exists() ); Ok(()) } async fn create_upload( app: &axum::Router, name: &str, size: u64, ) -> Result> { let response = app .clone() .oneshot(json_request( Method::POST, "/api/uploads", &json!({ "name": name, "size": size, "last_modified": 1_760_000_000_000_i64 }), )?) .await?; assert_eq!(response.status(), StatusCode::OK); decode_body(response).await } fn test_app(data_dir: &Path) -> axum::Router { build_router(&AppConfig::new( SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), concat!(env!("CARGO_MANIFEST_DIR"), "/static"), data_dir, )) } fn json_request( method: Method, uri: &str, body: &serde_json::Value, ) -> Result, Box> { Ok(Request::builder() .method(method) .uri(uri) .header(header::CONTENT_TYPE, "application/json") .body(Body::from(serde_json::to_vec(body)?))?) } fn chunk_request( upload_id: &str, index: u64, body: Vec, ) -> Result, Box> { Ok(Request::builder() .method(Method::PUT) .uri(format!("/api/uploads/{upload_id}/chunks/{index}")) .header(header::CONTENT_TYPE, "application/octet-stream") .body(Body::from(body))?) } fn empty_request(method: Method, uri: &str) -> Result, Box> { Ok(Request::builder() .method(method) .uri(uri) .body(Body::empty())?) } async fn decode_body(response: axum::response::Response) -> Result> where T: serde::de::DeserializeOwned, { let body = response.into_body().collect().await?.to_bytes(); Ok(serde_json::from_slice(&body)?) }