feat: write chunks directly to temp upload files
Completed uploads used to copy every staged chunk into a second file before renaming the result into data/complete. That doubled write volume and required peak disk space for both the chunk set and the final file. Write each chunk directly into one private temp upload file at its final offset instead. After a chunk write succeeds, record a tiny durable completion marker for progress and resume scans. Completion now verifies the temp file length and all markers, then renames the temp file into the completed upload directory. Add UPL_TEMP_DIR and --temp-dir so operators can choose where upload metadata, markers, and temp files live. The default remains data/staging, and docs call out that the temp directory must be on the same filesystem as data/complete for atomic promotion. The nginx example now aliases only the completed upload directory, and the smoke test verifies that final-file alias. This keeps the existing length-based validation model; it does not add per-chunk hashing. Test Plan: - just check - just nginx-smoke - cargo clippy && cargo clippy --benches && cargo clippy --tests - cargo +nightly fmt --all - cargo clippy && cargo clippy --benches && cargo clippy --tests Refs: none
This commit is contained in:
@@ -6,9 +6,9 @@
|
||||
browser -> nginx -> upl Rust server -> local filesystem
|
||||
```
|
||||
|
||||
The first implementation milestone provides the Rust server shell and static
|
||||
browser UI. Upload metadata, chunk persistence, resume state, and completion
|
||||
assembly are tracked in `PLAN.md` and will be added in later coherent slices.
|
||||
The server writes upload chunks directly into an inaccessible temp file at
|
||||
their final offsets. Once every chunk is present, completion atomically renames
|
||||
that temp file into the completed upload directory.
|
||||
|
||||
## Project Structure
|
||||
|
||||
@@ -19,7 +19,7 @@ upl
|
||||
src/app.rs Axum router, shared state, static file service
|
||||
src/api.rs HTTP handlers and API error responses
|
||||
src/model.rs JSON request, response, and metadata shapes
|
||||
src/storage.rs local filesystem layout, chunks, and assembly
|
||||
src/storage.rs local filesystem layout, offset writes, and final rename
|
||||
src/lib.rs library surface used by integration tests
|
||||
Browser UI
|
||||
static/index.html upload tool markup
|
||||
@@ -39,11 +39,17 @@ upl
|
||||
`127.0.0.1:3000`.
|
||||
- `--static-dir` sets the static asset directory. It overrides `UPL_STATIC_DIR`
|
||||
and defaults to `static/` inside this repository.
|
||||
- `--data-dir` sets the upload data directory. It overrides `UPL_DATA_DIR` and
|
||||
defaults to `data/` inside this repository.
|
||||
- `--data-dir` sets the completed upload data root. Completed files land under
|
||||
its `complete/` subdirectory. It overrides `UPL_DATA_DIR` and defaults to
|
||||
`data/` inside this repository.
|
||||
- `--temp-dir` sets the directory for upload metadata, completion markers, and
|
||||
inaccessible temp upload files. It overrides `UPL_TEMP_DIR` and defaults to
|
||||
`<data-dir>/staging`.
|
||||
- `upl --help` prints the full argument help text.
|
||||
- The server accepts request bodies up to 64 MiB, which leaves room for the
|
||||
planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`.
|
||||
- Keep `UPL_TEMP_DIR` on the same filesystem as `<data-dir>/complete` so
|
||||
completion can promote files with an atomic rename.
|
||||
|
||||
## Common Commands
|
||||
|
||||
@@ -61,12 +67,16 @@ just run
|
||||
Run `upl` on localhost and put nginx in front of it for TLS and access control:
|
||||
|
||||
```sh
|
||||
UPL_BIND=127.0.0.1:3000 UPL_DATA_DIR=/srv/upl/data upl
|
||||
UPL_BIND=127.0.0.1:3000 \
|
||||
UPL_DATA_DIR=/srv/upl/data \
|
||||
UPL_TEMP_DIR=/srv/upl/data/staging \
|
||||
upl
|
||||
```
|
||||
|
||||
Use `deploy/nginx/upl.conf.example` as the starting point for the nginx site.
|
||||
Before exposing the service, replace the certificate paths and add a protection
|
||||
layer such as HTTP basic auth, an IP allowlist, or VPN-only access.
|
||||
layer such as HTTP basic auth, an IP allowlist, or VPN-only access. The nginx
|
||||
example aliases only `/srv/upl/data/complete`; do not expose `UPL_TEMP_DIR`.
|
||||
|
||||
For a local Docker-based reverse-proxy smoke test:
|
||||
|
||||
|
||||
@@ -10,23 +10,26 @@ Keep this file as the reusable verification checklist while implementing
|
||||
- Current coverage:
|
||||
- `GET /` serves the static browser page.
|
||||
- `GET /healthz` reports `ok`.
|
||||
- `POST /api/uploads` creates `meta.json` and chunk directories.
|
||||
- `POST /api/uploads` creates `meta.json`, a temp upload file, and a
|
||||
completion-marker directory.
|
||||
- `POST /api/uploads` rejects an empty file name.
|
||||
- `PUT /api/uploads/:id/chunks/:index` stores validated chunk files.
|
||||
- `PUT /api/uploads/:id/chunks/:index` writes validated chunks into the
|
||||
temp upload file and records completion markers.
|
||||
- `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks.
|
||||
- `PUT /api/uploads/:id/chunks/:index` rejects out-of-range indexes.
|
||||
- `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks.
|
||||
- `GET /api/uploads/:id` reports completed chunks from disk.
|
||||
- `POST /api/uploads/:id/complete` assembles verified chunks and removes
|
||||
staging data.
|
||||
- `GET /api/uploads/:id` reports completed chunks from disk markers.
|
||||
- `POST /api/uploads/:id/complete` renames the verified temp upload file
|
||||
and removes staging data.
|
||||
- `POST /api/uploads/:id/complete` rejects incomplete uploads.
|
||||
- `POST /api/uploads/:id/complete` rejects corrupt chunk files.
|
||||
- `POST /api/uploads/:id/complete` rejects tampered temp upload files.
|
||||
- `static/app.js` passes `node --check`.
|
||||
- `just nginx-smoke`
|
||||
- Runs upl behind nginx in Docker.
|
||||
- Uploads a 17 MiB file through nginx.
|
||||
- Restarts the Rust backend mid-upload, resumes through nginx, completes, and
|
||||
compares SHA-256 hashes.
|
||||
- Serves the completed file through nginx's final-upload alias.
|
||||
|
||||
## Manual
|
||||
|
||||
|
||||
@@ -22,6 +22,14 @@ server {
|
||||
# auth_basic "upl";
|
||||
# auth_basic_user_file /etc/nginx/upl.htpasswd;
|
||||
|
||||
# Expose only completed uploads. Keep UPL_TEMP_DIR outside every nginx
|
||||
# alias/root so in-progress temp files and progress markers are private.
|
||||
location /files/ {
|
||||
alias /srv/upl/data/complete/;
|
||||
autoindex on;
|
||||
try_files $uri =404;
|
||||
}
|
||||
|
||||
location / {
|
||||
proxy_pass http://upl_backend;
|
||||
proxy_http_version 1.1;
|
||||
|
||||
+19
-2
@@ -9,10 +9,13 @@ workspace_dir="$(pwd)"
|
||||
mkdir -p "$workspace_dir/target/nginx-smoke"
|
||||
tmp_dir="$(mktemp -d "$workspace_dir/target/nginx-smoke/run.XXXXXXXX")"
|
||||
data_dir="$tmp_dir/data"
|
||||
complete_dir="$data_dir/complete"
|
||||
temp_dir="$tmp_dir/upload-temp"
|
||||
nginx_conf_dir="$tmp_dir/nginx-conf.d"
|
||||
nginx_conf="$nginx_conf_dir/default.conf"
|
||||
backend_log="$tmp_dir/backend.log"
|
||||
source_file="$tmp_dir/source.bin"
|
||||
served_file="$tmp_dir/served.bin"
|
||||
chunk0="$tmp_dir/chunk0.part"
|
||||
chunk1="$tmp_dir/chunk1.part"
|
||||
backend_pid=""
|
||||
@@ -29,7 +32,7 @@ cleanup() {
|
||||
trap cleanup EXIT
|
||||
|
||||
start_backend() {
|
||||
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" \
|
||||
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" UPL_TEMP_DIR="$temp_dir" \
|
||||
cargo run --quiet >"$backend_log" 2>&1 &
|
||||
backend_pid="$!"
|
||||
wait_for "http://127.0.0.1:$backend_port/healthz"
|
||||
@@ -66,13 +69,19 @@ process.stdin.on("end", () => {
|
||||
' "$field"
|
||||
}
|
||||
|
||||
mkdir -p "$data_dir" "$nginx_conf_dir"
|
||||
mkdir -p "$complete_dir" "$temp_dir" "$nginx_conf_dir"
|
||||
|
||||
cat >"$nginx_conf" <<EOF
|
||||
server {
|
||||
listen $proxy_port;
|
||||
client_max_body_size 64m;
|
||||
|
||||
location /files/ {
|
||||
alias /upl-complete/;
|
||||
autoindex off;
|
||||
try_files \$uri =404;
|
||||
}
|
||||
|
||||
location / {
|
||||
proxy_pass http://host.docker.internal:$backend_port;
|
||||
proxy_http_version 1.1;
|
||||
@@ -95,6 +104,7 @@ docker run -d --rm \
|
||||
--add-host host.docker.internal:host-gateway \
|
||||
-p "127.0.0.1:$proxy_port:$proxy_port" \
|
||||
-v "$nginx_conf_dir:/etc/nginx/conf.d:ro" \
|
||||
-v "$complete_dir:/upl-complete:ro" \
|
||||
"$nginx_image" >/dev/null
|
||||
wait_for "http://127.0.0.1:$proxy_port/healthz"
|
||||
|
||||
@@ -143,10 +153,17 @@ complete_path="$(printf '%s' "$complete_response" | json_field file_path)"
|
||||
|
||||
source_hash="$(sha256sum "$source_file" | awk '{print $1}')"
|
||||
complete_hash="$(sha256sum "$complete_path" | awk '{print $1}')"
|
||||
curl -fsS "http://127.0.0.1:$proxy_port/files/source.bin" -o "$served_file"
|
||||
served_hash="$(sha256sum "$served_file" | awk '{print $1}')"
|
||||
|
||||
if [[ "$source_hash" != "$complete_hash" ]]; then
|
||||
echo "Checksum mismatch after nginx-proxied resume" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [[ "$source_hash" != "$served_hash" ]]; then
|
||||
echo "Checksum mismatch through nginx completed-file alias" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "nginx smoke ok: $upload_id"
|
||||
|
||||
+2
-2
@@ -55,12 +55,12 @@ pub async fn put_chunk(
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
/// Assembles uploaded chunks into the final completed file.
|
||||
/// Promotes a fully uploaded temp file into the final completed file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an API error when the upload is unknown, incomplete, invalid, or
|
||||
/// cannot be assembled on disk.
|
||||
/// cannot be promoted on disk.
|
||||
pub async fn complete_upload(
|
||||
State(state): State<AppState>,
|
||||
Path(upload_id): Path<String>,
|
||||
|
||||
+64
-3
@@ -19,6 +19,7 @@ use crate::{api, storage::Storage};
|
||||
const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000";
|
||||
const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR";
|
||||
const DATA_DIR_ENV: &str = "UPL_DATA_DIR";
|
||||
const TEMP_DIR_ENV: &str = "UPL_TEMP_DIR";
|
||||
const BIND_ENV: &str = "UPL_BIND";
|
||||
const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024;
|
||||
|
||||
@@ -27,6 +28,7 @@ pub struct AppConfig {
|
||||
pub bind_addr: SocketAddr,
|
||||
pub static_dir: PathBuf,
|
||||
pub data_dir: PathBuf,
|
||||
pub temp_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -40,7 +42,7 @@ pub struct AppState {
|
||||
version,
|
||||
about = "Run the upl resumable upload server.",
|
||||
long_about = "Run the upl resumable upload server.\n\nCommand-line arguments override environment variables. When neither is set, upl uses local development defaults inside the repository.",
|
||||
after_help = "Environment variables:\n UPL_BIND Default listen address\n UPL_STATIC_DIR Default static asset directory\n UPL_DATA_DIR Default upload data directory"
|
||||
after_help = "Environment variables:\n UPL_BIND Default listen address\n UPL_STATIC_DIR Default static asset directory\n UPL_DATA_DIR Default completed upload data directory\n UPL_TEMP_DIR Default temporary upload directory"
|
||||
)]
|
||||
pub struct CliArgs {
|
||||
/// Socket address to listen on. Overrides `UPL_BIND`. Defaults to 127.0.0.1:3000.
|
||||
@@ -51,9 +53,13 @@ pub struct CliArgs {
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub static_dir: Option<PathBuf>,
|
||||
|
||||
/// Directory where upload staging chunks and completed files are written. Overrides `UPL_DATA_DIR`.
|
||||
/// Directory where completed upload files are written. Overrides `UPL_DATA_DIR`.
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub data_dir: Option<PathBuf>,
|
||||
|
||||
/// Directory where upload metadata, progress markers, and temp files are written. Overrides `UPL_TEMP_DIR`.
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub temp_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl AppConfig {
|
||||
@@ -84,6 +90,7 @@ impl AppConfig {
|
||||
env::var(BIND_ENV).ok(),
|
||||
env::var_os(STATIC_DIR_ENV),
|
||||
env::var_os(DATA_DIR_ENV),
|
||||
env::var_os(TEMP_DIR_ENV),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -92,6 +99,7 @@ impl AppConfig {
|
||||
bind_env: Option<String>,
|
||||
static_dir_env: Option<OsString>,
|
||||
data_dir_env: Option<OsString>,
|
||||
temp_dir_env: Option<OsString>,
|
||||
) -> Result<Self, Box<dyn Error>> {
|
||||
let bind_addr = match (cli.bind, bind_env) {
|
||||
(Some(bind_addr), _) => bind_addr,
|
||||
@@ -107,11 +115,16 @@ impl AppConfig {
|
||||
.data_dir
|
||||
.or_else(|| data_dir_env.map(PathBuf::from))
|
||||
.unwrap_or_else(default_data_dir);
|
||||
let temp_dir = cli
|
||||
.temp_dir
|
||||
.or_else(|| temp_dir_env.map(PathBuf::from))
|
||||
.unwrap_or_else(|| default_temp_dir(&data_dir));
|
||||
|
||||
Ok(Self {
|
||||
bind_addr,
|
||||
static_dir,
|
||||
data_dir,
|
||||
temp_dir,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -120,18 +133,36 @@ impl AppConfig {
|
||||
bind_addr: SocketAddr,
|
||||
static_dir: impl Into<PathBuf>,
|
||||
data_dir: impl Into<PathBuf>,
|
||||
) -> Self {
|
||||
let data_dir = data_dir.into();
|
||||
let temp_dir = default_temp_dir(&data_dir);
|
||||
Self {
|
||||
bind_addr,
|
||||
static_dir: static_dir.into(),
|
||||
data_dir,
|
||||
temp_dir,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn new_with_temp_dir(
|
||||
bind_addr: SocketAddr,
|
||||
static_dir: impl Into<PathBuf>,
|
||||
data_dir: impl Into<PathBuf>,
|
||||
temp_dir: impl Into<PathBuf>,
|
||||
) -> Self {
|
||||
Self {
|
||||
bind_addr,
|
||||
static_dir: static_dir.into(),
|
||||
data_dir: data_dir.into(),
|
||||
temp_dir: temp_dir.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_router(config: &AppConfig) -> Router {
|
||||
let state = AppState {
|
||||
storage: Storage::new(&config.data_dir),
|
||||
storage: Storage::new(&config.data_dir, &config.temp_dir),
|
||||
};
|
||||
|
||||
Router::new()
|
||||
@@ -167,6 +198,10 @@ fn default_data_dir() -> PathBuf {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data")
|
||||
}
|
||||
|
||||
fn default_temp_dir(data_dir: &Path) -> PathBuf {
|
||||
data_dir.join("staging")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{ffi::OsString, net::SocketAddr, path::PathBuf};
|
||||
@@ -185,11 +220,14 @@ mod tests {
|
||||
"public",
|
||||
"--data-dir",
|
||||
"uploads",
|
||||
"--temp-dir",
|
||||
"upload-temp",
|
||||
])?;
|
||||
|
||||
assert_eq!(args.bind, Some("127.0.0.1:4000".parse()?));
|
||||
assert_eq!(args.static_dir, Some(PathBuf::from("public")));
|
||||
assert_eq!(args.data_dir, Some(PathBuf::from("uploads")));
|
||||
assert_eq!(args.temp_dir, Some(PathBuf::from("upload-temp")));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -201,15 +239,18 @@ mod tests {
|
||||
bind: Some("127.0.0.1:4000".parse()?),
|
||||
static_dir: Some(PathBuf::from("cli-static")),
|
||||
data_dir: Some(PathBuf::from("cli-data")),
|
||||
temp_dir: Some(PathBuf::from("cli-temp")),
|
||||
},
|
||||
Some("127.0.0.1:3001".to_owned()),
|
||||
Some(OsString::from("env-static")),
|
||||
Some(OsString::from("env-data")),
|
||||
Some(OsString::from("env-temp")),
|
||||
)?;
|
||||
|
||||
assert_eq!(config.bind_addr, "127.0.0.1:4000".parse::<SocketAddr>()?);
|
||||
assert_eq!(config.static_dir, PathBuf::from("cli-static"));
|
||||
assert_eq!(config.data_dir, PathBuf::from("cli-data"));
|
||||
assert_eq!(config.temp_dir, PathBuf::from("cli-temp"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -222,11 +263,31 @@ mod tests {
|
||||
Some("127.0.0.1:3001".to_owned()),
|
||||
Some(OsString::from("env-static")),
|
||||
Some(OsString::from("env-data")),
|
||||
Some(OsString::from("env-temp")),
|
||||
)?;
|
||||
|
||||
assert_eq!(config.bind_addr, "127.0.0.1:3001".parse::<SocketAddr>()?);
|
||||
assert_eq!(config.static_dir, PathBuf::from("env-static"));
|
||||
assert_eq!(config.data_dir, PathBuf::from("env-data"));
|
||||
assert_eq!(config.temp_dir, PathBuf::from("env-temp"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_dir_defaults_under_data_dir() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = AppConfig::from_sources(
|
||||
CliArgs {
|
||||
data_dir: Some(PathBuf::from("uploads")),
|
||||
..CliArgs::default()
|
||||
},
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
|
||||
assert_eq!(config.temp_dir, PathBuf::from("uploads").join("staging"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+93
-60
@@ -1,11 +1,15 @@
|
||||
use std::{
|
||||
error::Error,
|
||||
fmt::{self, Display},
|
||||
io::SeekFrom,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
|
||||
use tokio::{fs, io::AsyncWriteExt};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{AsyncSeekExt, AsyncWriteExt},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::model::{
|
||||
@@ -19,22 +23,25 @@ use crate::model::{
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Storage {
|
||||
data_dir: PathBuf,
|
||||
temp_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
#[must_use]
|
||||
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
|
||||
pub fn new(data_dir: impl Into<PathBuf>, temp_dir: impl Into<PathBuf>) -> Self {
|
||||
Self {
|
||||
data_dir: data_dir.into(),
|
||||
temp_dir: temp_dir.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a durable upload metadata record under `data/staging`.
|
||||
/// Creates a durable upload metadata record and temp upload file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when directories cannot be created, metadata cannot be
|
||||
/// serialized, or the metadata file cannot be written atomically.
|
||||
/// Returns an error when directories cannot be created, the temp file
|
||||
/// cannot be created, metadata cannot be serialized, or the metadata file
|
||||
/// cannot be written atomically.
|
||||
pub async fn create_upload(
|
||||
&self,
|
||||
request: CreateUploadRequest,
|
||||
@@ -56,7 +63,12 @@ impl Storage {
|
||||
continue;
|
||||
}
|
||||
|
||||
fs::create_dir_all(upload_dir.join("chunks")).await?;
|
||||
fs::create_dir_all(self.completed_dir(&id)).await?;
|
||||
fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(self.upload_file_path(&id))
|
||||
.await?;
|
||||
|
||||
let meta = UploadMeta {
|
||||
id,
|
||||
@@ -76,12 +88,12 @@ impl Storage {
|
||||
Err(StorageError::IdCollision)
|
||||
}
|
||||
|
||||
/// Loads upload progress by scanning durable chunk files.
|
||||
/// Loads upload progress by scanning durable completion markers.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload id is invalid, metadata is missing, or
|
||||
/// the staging directory cannot be scanned.
|
||||
/// the temp directory cannot be scanned.
|
||||
pub async fn progress(&self, upload_id: &str) -> Result<UploadProgressResponse, StorageError> {
|
||||
let meta = self.load_meta(upload_id).await?;
|
||||
let completed_chunks = self.completed_chunks(&meta).await?;
|
||||
@@ -89,13 +101,13 @@ impl Storage {
|
||||
Ok(meta.progress_response(completed_chunks))
|
||||
}
|
||||
|
||||
/// Validates and stores one raw chunk body.
|
||||
/// Validates and stores one raw chunk body in the temp upload file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload is unknown, the index is out of range,
|
||||
/// the body length is not the expected chunk length, or the chunk cannot be
|
||||
/// written and renamed into place.
|
||||
/// written to its final offset in the temp upload file.
|
||||
pub async fn store_chunk(
|
||||
&self,
|
||||
upload_id: &str,
|
||||
@@ -111,31 +123,33 @@ impl Storage {
|
||||
return Err(StorageError::InvalidInput("chunk has the wrong length"));
|
||||
}
|
||||
|
||||
let part_path = self.chunk_path(upload_id, index);
|
||||
if let Some(existing_len) = file_len(&part_path).await? {
|
||||
if existing_len == expected_len {
|
||||
if self.chunk_is_complete(&meta, index).await? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
return Err(StorageError::InvalidInput(
|
||||
"existing chunk has the wrong length",
|
||||
));
|
||||
}
|
||||
let mut output = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.open(self.upload_file_path(upload_id))
|
||||
.await?;
|
||||
output
|
||||
.seek(SeekFrom::Start(chunk_offset(&meta, index)))
|
||||
.await?;
|
||||
output.write_all(body).await?;
|
||||
output.flush().await?;
|
||||
drop(output);
|
||||
|
||||
let tmp_path = part_path.with_extension("part.tmp");
|
||||
fs::write(&tmp_path, body).await?;
|
||||
fs::rename(&tmp_path, &part_path).await?;
|
||||
self.mark_chunk_complete(&meta, index).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Assembles a complete upload from verified chunk files.
|
||||
/// Atomically promotes a complete temp upload file into completed storage.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload is unknown, any expected chunk is
|
||||
/// missing or has the wrong length, the final file already exists, or the
|
||||
/// assembled file cannot be written and renamed.
|
||||
/// missing, the final file already exists, or the temp upload file cannot
|
||||
/// be renamed into place.
|
||||
pub async fn complete_upload(
|
||||
&self,
|
||||
upload_id: &str,
|
||||
@@ -144,33 +158,12 @@ impl Storage {
|
||||
|
||||
self.verify_all_chunks(&meta).await?;
|
||||
|
||||
let final_path = self.complete_dir().join(&meta.safe_name);
|
||||
let final_path = self.final_dir().join(&meta.safe_name);
|
||||
if fs::try_exists(&final_path).await? {
|
||||
return Err(StorageError::Conflict("complete file already exists"));
|
||||
}
|
||||
|
||||
let tmp_path = self
|
||||
.complete_dir()
|
||||
.join(format!(".{}.{}.tmp", meta.safe_name, meta.id));
|
||||
if fs::try_exists(&tmp_path).await? {
|
||||
fs::remove_file(&tmp_path).await?;
|
||||
}
|
||||
|
||||
let mut output = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&tmp_path)
|
||||
.await?;
|
||||
|
||||
for index in 0..meta.total_chunks {
|
||||
let bytes = fs::read(self.chunk_path(upload_id, index)).await?;
|
||||
output.write_all(&bytes).await?;
|
||||
}
|
||||
|
||||
output.flush().await?;
|
||||
drop(output);
|
||||
|
||||
fs::rename(&tmp_path, &final_path).await?;
|
||||
fs::rename(self.upload_file_path(upload_id), &final_path).await?;
|
||||
self.remove_upload_dir(upload_id).await?;
|
||||
|
||||
Ok(meta.complete_response(final_path.display().to_string()))
|
||||
@@ -181,11 +174,16 @@ impl Storage {
|
||||
&self.data_dir
|
||||
}
|
||||
|
||||
fn staging_dir(&self) -> PathBuf {
|
||||
self.data_dir.join("staging")
|
||||
#[must_use]
|
||||
pub fn temp_dir(&self) -> &Path {
|
||||
&self.temp_dir
|
||||
}
|
||||
|
||||
fn complete_dir(&self) -> PathBuf {
|
||||
fn staging_dir(&self) -> PathBuf {
|
||||
self.temp_dir.clone()
|
||||
}
|
||||
|
||||
fn final_dir(&self) -> PathBuf {
|
||||
self.data_dir.join("complete")
|
||||
}
|
||||
|
||||
@@ -193,15 +191,22 @@ impl Storage {
|
||||
self.staging_dir().join(upload_id)
|
||||
}
|
||||
|
||||
fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf {
|
||||
self.upload_dir(upload_id)
|
||||
.join("chunks")
|
||||
.join(format!("{index:06}.part"))
|
||||
fn upload_file_path(&self, upload_id: &str) -> PathBuf {
|
||||
self.upload_dir(upload_id).join(".upload.tmp")
|
||||
}
|
||||
|
||||
fn completed_dir(&self, upload_id: &str) -> PathBuf {
|
||||
self.upload_dir(upload_id).join("completed")
|
||||
}
|
||||
|
||||
fn completed_marker_path(&self, upload_id: &str, index: u64) -> PathBuf {
|
||||
self.completed_dir(upload_id)
|
||||
.join(format!("{index:06}.done"))
|
||||
}
|
||||
|
||||
async fn ensure_layout(&self) -> Result<(), StorageError> {
|
||||
fs::create_dir_all(self.staging_dir()).await?;
|
||||
fs::create_dir_all(self.complete_dir()).await?;
|
||||
fs::create_dir_all(self.final_dir()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -235,8 +240,7 @@ impl Storage {
|
||||
let mut completed = Vec::new();
|
||||
|
||||
for index in 0..meta.total_chunks {
|
||||
let expected_len = expected_chunk_len(meta, index)?;
|
||||
if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) {
|
||||
if self.chunk_is_complete(meta, index).await? {
|
||||
completed.push(index);
|
||||
}
|
||||
}
|
||||
@@ -245,11 +249,12 @@ impl Storage {
|
||||
}
|
||||
|
||||
async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> {
|
||||
for index in 0..meta.total_chunks {
|
||||
let expected_len = expected_chunk_len(meta, index)?;
|
||||
let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?;
|
||||
if file_len(&self.upload_file_path(&meta.id)).await? != Some(meta.size) {
|
||||
return Err(StorageError::Conflict("upload data file is incomplete"));
|
||||
}
|
||||
|
||||
if actual_len != Some(expected_len) {
|
||||
for index in 0..meta.total_chunks {
|
||||
if !self.chunk_is_complete(meta, index).await? {
|
||||
return Err(StorageError::Conflict(
|
||||
"upload is missing one or more complete chunks",
|
||||
));
|
||||
@@ -259,6 +264,30 @@ impl Storage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn chunk_is_complete(&self, meta: &UploadMeta, index: u64) -> Result<bool, StorageError> {
|
||||
expected_chunk_len(meta, index)?;
|
||||
Ok(file_len(&self.completed_marker_path(&meta.id, index))
|
||||
.await?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn mark_chunk_complete(&self, meta: &UploadMeta, index: u64) -> Result<(), StorageError> {
|
||||
let marker_path = self.completed_marker_path(&meta.id, index);
|
||||
match fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(marker_path)
|
||||
.await
|
||||
{
|
||||
Ok(mut marker) => {
|
||||
marker.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
|
||||
Err(error) => Err(error.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_upload_dir(&self, upload_id: &str) -> Result<(), StorageError> {
|
||||
match fs::remove_dir_all(self.upload_dir(upload_id)).await {
|
||||
Ok(()) => Ok(()),
|
||||
@@ -377,6 +406,10 @@ fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result<u64, StorageError
|
||||
}
|
||||
}
|
||||
|
||||
fn chunk_offset(meta: &UploadMeta, index: u64) -> u64 {
|
||||
meta.chunk_size * index
|
||||
}
|
||||
|
||||
async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
|
||||
match fs::metadata(path).await {
|
||||
Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())),
|
||||
|
||||
+11
-9
@@ -42,13 +42,16 @@ async fn stores_chunks_and_reports_progress() -> Result<(), Box<dyn std::error::
|
||||
let progress = get_progress(&app, &upload.upload_id).await?;
|
||||
assert_eq!(progress.completed_chunks, vec![0, 1]);
|
||||
|
||||
let chunk_path = temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.join("chunks")
|
||||
.join("000000.part");
|
||||
assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE);
|
||||
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
|
||||
assert_eq!(
|
||||
tokio::fs::metadata(upload_dir.join(".upload.tmp"))
|
||||
.await?
|
||||
.len(),
|
||||
CHUNK_SIZE + 3
|
||||
);
|
||||
assert!(upload_dir.join("completed").join("000000.done").is_file());
|
||||
assert!(upload_dir.join("completed").join("000001.done").is_file());
|
||||
assert!(!upload_dir.join("chunks").exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -84,8 +87,7 @@ async fn rejects_out_of_range_chunk_index() -> Result<(), Box<dyn std::error::Er
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn accepts_duplicate_chunk_when_existing_length_matches()
|
||||
-> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn accepts_duplicate_completed_chunk() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let upload = create_upload(&app, temp_dir.path(), 4).await?;
|
||||
|
||||
+4
-8
@@ -98,18 +98,14 @@ async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_corrupt_chunk_file() -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn rejects_tampered_temp_upload_file() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let upload = create_upload(&app, "corrupt.bin", 4).await?;
|
||||
|
||||
let chunk_path = temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.join("chunks")
|
||||
.join("000000.part");
|
||||
tokio::fs::write(chunk_path, b"bad").await?;
|
||||
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
|
||||
tokio::fs::write(upload_dir.join(".upload.tmp"), b"bad").await?;
|
||||
tokio::fs::write(upload_dir.join("completed").join("000000.done"), b"").await?;
|
||||
|
||||
let response = app
|
||||
.oneshot(empty_request(
|
||||
|
||||
@@ -43,7 +43,8 @@ async fn creates_upload_metadata_on_disk() -> Result<(), Box<dyn std::error::Err
|
||||
|
||||
let upload_dir = temp_dir.path().join("staging").join(&response.upload_id);
|
||||
let meta_path = upload_dir.join("meta.json");
|
||||
assert!(upload_dir.join("chunks").is_dir());
|
||||
assert!(upload_dir.join(".upload.tmp").is_file());
|
||||
assert!(upload_dir.join("completed").is_dir());
|
||||
assert!(temp_dir.path().join("complete").is_dir());
|
||||
|
||||
let meta: UploadMeta = serde_json::from_slice(&tokio::fs::read(meta_path).await?)?;
|
||||
|
||||
Reference in New Issue
Block a user