Compare commits

...

7 Commits

Author SHA1 Message Date
ddidderr 60663a461c fix: reject duplicate completed upload names
A user could select another local file with the same name as one that already
exists in completed storage. The upload would be allowed to start and only hit
an existing-file conflict late in the flow, which made the UI look like the
file was uploadable.

Reject duplicate sanitized names during upload creation so no staging record or
chunk transfer starts for a file that cannot be completed. Keep the completion
path non-replacing as a second guard by promoting through a no-overwrite file
creation path, with a hard-link fast path and copy fallback for custom temp
locations.

The browser now treats the server's duplicate-name conflict as a terminal row:
it disables the action, marks the item visually, and tells the user to rename
the file if they want to upload that copy.

Test Plan:
- just check

Refs: none
2026-05-30 18:42:55 +02:00
ddidderr 1923ff2a6f feat: support parallel multi-file uploads
The browser upload flow was built around one selected file and one global
upload state. That made the existing chunk pool useful for a single file, but
users could not start several selected files at the same time.

Refactor the browser state into per-file upload items. Each selected file now
has its own upload record, completed-chunk set, abort controller, retry state,
progress row, and saved IndexedDB resume record. The picker accepts multiple
files, `Start all` and `Resume all` use a bounded file-level pool, and each file
keeps the existing bounded chunk pool. This keeps parallel uploads useful
without letting one large selection create unbounded request fan-out.

Keep the server API unchanged. Each file still receives a separate server upload
id, and server-side progress remains authoritative before any missing chunks are
scheduled. Terminal conflicts still stop the affected file without overwriting
completed data.

Update the user-facing markup, styles, project docs, and test checklist for the
multi-file scheduler. Add a server regression test that interleaves two uploads
and verifies the completed files contain exactly their own bytes.

Test Plan:
- just check
- git diff --check
2026-05-30 18:32:29 +02:00
ddidderr a7b3abd54a fix: render fresh upload progress as empty
A newly selected file has no server upload record yet, so the UI calls the
progress renderer with zero completed chunks and zero total chunks. Treating
that zero-total state as complete made the progress bar jump to 100% before
any upload had started.

Render zero-total progress as empty instead. Existing resumable uploads still
show their server-authoritative completed chunk percentage, and completed
non-empty uploads still render as full because their completed count equals a
non-zero total.

Test Plan:
- just static-check
- just test
- git diff --check
2026-05-30 18:21:54 +02:00
ddidderr c072b93726 feat: write chunks directly to temp upload files
Completed uploads used to copy every staged chunk into a second file before
renaming the result into data/complete. That doubled write volume and required
peak disk space for both the chunk set and the final file.

Write each chunk directly into one private temp upload file at its final offset
instead. After a chunk write succeeds, record a tiny durable completion marker
for progress and resume scans. Completion now verifies the temp file length and
all markers, then renames the temp file into the completed upload directory.

Add UPL_TEMP_DIR and --temp-dir so operators can choose where upload metadata,
markers, and temp files live. The default remains data/staging, and docs call
out that the temp directory must be on the same filesystem as data/complete for
atomic promotion. The nginx example now aliases only the completed upload
directory, and the smoke test verifies that final-file alias.

This keeps the existing length-based validation model; it does not add per-chunk
hashing.

Test Plan:
- just check
- just nginx-smoke
- cargo clippy && cargo clippy --benches && cargo clippy --tests
- cargo +nightly fmt --all
- cargo clippy && cargo clippy --benches && cargo clippy --tests

Refs: none
2026-05-30 18:10:05 +02:00
ddidderr 428af52e2f fix: remove staging chunks after completed uploads
Successful completion moved the assembled file into data/complete but left the
upload staging directory behind, including all chunk files. Remove the upload's
staging directory only after the final file has been renamed into place so
incomplete and failed uploads remain resumable.

A repeat complete request for that old upload id now returns 404 because the
temporary upload record has been retired with its chunks.

Test Plan:
- just check

Refs: none
2026-05-30 17:59:21 +02:00
ddidderr 996ad5c4c8 feat: add CLI configuration flags
Add clap-powered --bind, --static-dir, and --data-dir flags for human-run
server configuration. The merge order is now explicit: command-line arguments
win over UPL_* environment variables, which still fall back to the existing
repository-local defaults.

Document the new flags and allow just run to forward arguments to cargo so the
help text can be checked through the normal task runner.

Test Plan:
- just check
- cargo run -- --help

Refs: none
2026-05-30 17:52:00 +02:00
ddidderr 8d81b436e5 fix: clarify saved upload completion UI
The previous page showed a static "Server online" pill even though it did not
track backend liveness. It also left the selected file in an uploadable state
after completion, which made it too easy to start the same file again and then
land in a saved record that could only fail with "complete file already
exists".

Remove the misleading server-status UI and make saved uploads describe their
next action. Records with every chunk uploaded now show a Finish action, stale
server records are cleared, and a terminal "complete file already exists"
response clears the saved browser progress instead of inviting another resume.
A successful completion also clears the active file selection so the primary
actions settle back to idle.

Test Plan:
- just check

Refs: none
2026-05-30 17:39:44 +02:00
19 changed files with 1375 additions and 306 deletions
Generated
+121
View File
@@ -2,6 +2,56 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "anstream"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
[[package]]
name = "anstyle-parse"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys",
]
[[package]]
name = "anyhow"
version = "1.0.102"
@@ -90,6 +140,52 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "clap"
version = "4.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]]
name = "colorchoice"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
[[package]]
name = "deranged"
version = "0.5.8"
@@ -314,6 +410,12 @@ dependencies = [
"serde_core",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itoa"
version = "1.0.18"
@@ -416,6 +518,12 @@ version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]]
name = "once_cell_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "parking_lot"
version = "0.12.5"
@@ -635,6 +743,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "2.0.117"
@@ -833,6 +947,7 @@ name = "upl"
version = "0.1.0"
dependencies = [
"axum",
"clap",
"http-body-util",
"serde",
"serde_json",
@@ -844,6 +959,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.23.2"
+1
View File
@@ -5,6 +5,7 @@ edition = "2024"
[dependencies]
axum = "0.8.9"
clap = { version = "4.5.53", features = ["derive"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.150"
time = { version = "0.3.47", features = ["formatting", "serde"] }
+10 -8
View File
@@ -26,9 +26,9 @@ The program should stay simple:
The browser owns file selection and chunk scheduling.
- Let the user pick one file.
- Let the user pick one or more files.
- Slice it into fixed-size chunks with `Blob.slice()`.
- Upload a few chunks concurrently.
- Upload a few files concurrently, with a separate chunk pool per file.
- Retry failed chunks with exponential backoff.
- Persist pending upload state in IndexedDB.
- Use the File System Access API when available so the same local file can be
@@ -194,11 +194,12 @@ The server should not delete staging data until assembly succeeds.
### First Upload
1. User selects a file.
2. Browser calls `POST /api/uploads`.
3. Browser stores the returned `upload_id` and file handle in IndexedDB.
4. Browser uploads missing chunks with a small concurrency pool.
5. Browser calls `/complete` when all chunks are uploaded.
1. User selects one or more files.
2. Browser creates one selected upload row per file.
3. Browser calls `POST /api/uploads` once for each file being started.
4. Browser stores each returned `upload_id` and file handle in IndexedDB.
5. Browser uploads missing chunks with bounded file and chunk concurrency pools.
6. Browser calls `/complete` for each file when all of its chunks are uploaded.
### After Interruption
@@ -239,7 +240,8 @@ Start with these defaults:
```text
chunk size: 16 MiB
concurrency: 3
file concurrency: 3
chunk concurrency per file: 3
max retries per chunk: 5
```
+37 -12
View File
@@ -6,9 +6,9 @@
browser -> nginx -> upl Rust server -> local filesystem
```
The first implementation milestone provides the Rust server shell and static
browser UI. Upload metadata, chunk persistence, resume state, and completion
assembly are tracked in `PLAN.md` and will be added in later coherent slices.
The server writes upload chunks directly into an inaccessible temp file at
their final offsets. Once every chunk is present, completion promotes that temp
file into the completed upload directory without replacing an existing file.
## Project Structure
@@ -19,12 +19,12 @@ upl
src/app.rs Axum router, shared state, static file service
src/api.rs HTTP handlers and API error responses
src/model.rs JSON request, response, and metadata shapes
src/storage.rs local filesystem layout, chunks, and assembly
src/storage.rs local filesystem layout, offset writes, and final promotion
src/lib.rs library surface used by integration tests
Browser UI
static/index.html upload tool markup
static/styles.css responsive tool styling
static/app.js upload scheduler, retries, and browser resume state
static/app.js multi-file scheduler, retries, and browser resume state
Deployment
deploy/nginx/ nginx reverse proxy example
scripts/ reusable local smoke tests
@@ -35,13 +35,22 @@ upl
## Configuration
- `UPL_BIND` sets the listen address. It defaults to `127.0.0.1:3000`.
- `UPL_STATIC_DIR` sets the static asset directory. It defaults to `static/`
inside this repository.
- `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside
this repository.
- `--bind` sets the listen address. It overrides `UPL_BIND` and defaults to
`127.0.0.1:3000`.
- `--static-dir` sets the static asset directory. It overrides `UPL_STATIC_DIR`
and defaults to `static/` inside this repository.
- `--data-dir` sets the completed upload data root. Completed files land under
its `complete/` subdirectory. It overrides `UPL_DATA_DIR` and defaults to
`data/` inside this repository.
- `--temp-dir` sets the directory for upload metadata, completion markers, and
inaccessible temp upload files. It overrides `UPL_TEMP_DIR` and defaults to
`<data-dir>/staging`.
- `upl --help` prints the full argument help text.
- The server accepts request bodies up to 64 MiB, which leaves room for the
planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`.
- Keep `UPL_TEMP_DIR` on the same filesystem as `<data-dir>/complete` for the
cheapest final promotion. Cross-filesystem temp directories still work, but
completion falls back to copying into a newly created final file.
## Common Commands
@@ -54,17 +63,33 @@ just run
`just check` also syntax-checks the static browser JavaScript with `node`.
## Browser Uploads
The browser UI accepts multiple selected files. `Start all` runs up to three
file uploads at the same time, and each file still uploads up to three chunks
concurrently. Every selected file keeps its own upload id, progress markers,
abort controller, retry state, and saved IndexedDB resume record.
If a completed file with the same sanitized name already exists, the server
rejects the upload before staging begins. The selected row is marked
unavailable and tells the user to rename the file if they want to upload that
copy.
## nginx
Run `upl` on localhost and put nginx in front of it for TLS and access control:
```sh
UPL_BIND=127.0.0.1:3000 UPL_DATA_DIR=/srv/upl/data upl
UPL_BIND=127.0.0.1:3000 \
UPL_DATA_DIR=/srv/upl/data \
UPL_TEMP_DIR=/srv/upl/data/staging \
upl
```
Use `deploy/nginx/upl.conf.example` as the starting point for the nginx site.
Before exposing the service, replace the certificate paths and add a protection
layer such as HTTP basic auth, an IP allowlist, or VPN-only access.
layer such as HTTP basic auth, an IP allowlist, or VPN-only access. The nginx
example aliases only `/srv/upl/data/complete`; do not expose `UPL_TEMP_DIR`.
For a local Docker-based reverse-proxy smoke test:
+17 -5
View File
@@ -10,22 +10,32 @@ Keep this file as the reusable verification checklist while implementing
- Current coverage:
- `GET /` serves the static browser page.
- `GET /healthz` reports `ok`.
- `POST /api/uploads` creates `meta.json` and chunk directories.
- `POST /api/uploads` creates `meta.json`, a temp upload file, and a
completion-marker directory.
- `POST /api/uploads` rejects an empty file name.
- `PUT /api/uploads/:id/chunks/:index` stores validated chunk files.
- `POST /api/uploads` rejects a name that already exists in completed
storage before staging begins.
- `PUT /api/uploads/:id/chunks/:index` writes validated chunks into the
temp upload file and records completion markers.
- `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks.
- `PUT /api/uploads/:id/chunks/:index` rejects out-of-range indexes.
- `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks.
- `GET /api/uploads/:id` reports completed chunks from disk.
- `POST /api/uploads/:id/complete` assembles verified chunks.
- `GET /api/uploads/:id` reports completed chunks from disk markers.
- `POST /api/uploads/:id/complete` promotes the verified temp upload file
and removes staging data.
- Parallel upload requests for separate files complete without crossing
bytes between temp upload files.
- `POST /api/uploads/:id/complete` rejects incomplete uploads.
- `POST /api/uploads/:id/complete` rejects corrupt chunk files.
- `POST /api/uploads/:id/complete` refuses to replace a completed file that
appears after the upload was created.
- `POST /api/uploads/:id/complete` rejects tampered temp upload files.
- `static/app.js` passes `node --check`.
- `just nginx-smoke`
- Runs upl behind nginx in Docker.
- Uploads a 17 MiB file through nginx.
- Restarts the Rust backend mid-upload, resumes through nginx, completes, and
compares SHA-256 hashes.
- Serves the completed file through nginx's final-upload alias.
## Manual
@@ -34,6 +44,8 @@ deployment retests.
- Upload a small file in one pass.
- Upload a file larger than one chunk.
- Select multiple files and confirm several upload rows advance at the same
time.
- Kill the browser tab mid-upload and resume.
- Restart the Rust server mid-upload and resume.
- Interrupt the network and resume.
+8
View File
@@ -22,6 +22,14 @@ server {
# auth_basic "upl";
# auth_basic_user_file /etc/nginx/upl.htpasswd;
# Expose only completed uploads. Keep UPL_TEMP_DIR outside every nginx
# alias/root so in-progress temp files and progress markers are private.
location /files/ {
alias /srv/upl/data/complete/;
autoindex on;
try_files $uri =404;
}
location / {
proxy_pass http://upl_backend;
proxy_http_version 1.1;
+2 -2
View File
@@ -20,5 +20,5 @@ check:
just static-check
just clippy
run:
cargo run
run *args:
cargo run -- {{args}}
+19 -2
View File
@@ -9,10 +9,13 @@ workspace_dir="$(pwd)"
mkdir -p "$workspace_dir/target/nginx-smoke"
tmp_dir="$(mktemp -d "$workspace_dir/target/nginx-smoke/run.XXXXXXXX")"
data_dir="$tmp_dir/data"
complete_dir="$data_dir/complete"
temp_dir="$tmp_dir/upload-temp"
nginx_conf_dir="$tmp_dir/nginx-conf.d"
nginx_conf="$nginx_conf_dir/default.conf"
backend_log="$tmp_dir/backend.log"
source_file="$tmp_dir/source.bin"
served_file="$tmp_dir/served.bin"
chunk0="$tmp_dir/chunk0.part"
chunk1="$tmp_dir/chunk1.part"
backend_pid=""
@@ -29,7 +32,7 @@ cleanup() {
trap cleanup EXIT
start_backend() {
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" \
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" UPL_TEMP_DIR="$temp_dir" \
cargo run --quiet >"$backend_log" 2>&1 &
backend_pid="$!"
wait_for "http://127.0.0.1:$backend_port/healthz"
@@ -66,13 +69,19 @@ process.stdin.on("end", () => {
' "$field"
}
mkdir -p "$data_dir" "$nginx_conf_dir"
mkdir -p "$complete_dir" "$temp_dir" "$nginx_conf_dir"
cat >"$nginx_conf" <<EOF
server {
listen $proxy_port;
client_max_body_size 64m;
location /files/ {
alias /upl-complete/;
autoindex off;
try_files \$uri =404;
}
location / {
proxy_pass http://host.docker.internal:$backend_port;
proxy_http_version 1.1;
@@ -95,6 +104,7 @@ docker run -d --rm \
--add-host host.docker.internal:host-gateway \
-p "127.0.0.1:$proxy_port:$proxy_port" \
-v "$nginx_conf_dir:/etc/nginx/conf.d:ro" \
-v "$complete_dir:/upl-complete:ro" \
"$nginx_image" >/dev/null
wait_for "http://127.0.0.1:$proxy_port/healthz"
@@ -143,10 +153,17 @@ complete_path="$(printf '%s' "$complete_response" | json_field file_path)"
source_hash="$(sha256sum "$source_file" | awk '{print $1}')"
complete_hash="$(sha256sum "$complete_path" | awk '{print $1}')"
curl -fsS "http://127.0.0.1:$proxy_port/files/source.bin" -o "$served_file"
served_hash="$(sha256sum "$served_file" | awk '{print $1}')"
if [[ "$source_hash" != "$complete_hash" ]]; then
echo "Checksum mismatch after nginx-proxied resume" >&2
exit 1
fi
if [[ "$source_hash" != "$served_hash" ]]; then
echo "Checksum mismatch through nginx completed-file alias" >&2
exit 1
fi
echo "nginx smoke ok: $upload_id"
+2 -2
View File
@@ -55,12 +55,12 @@ pub async fn put_chunk(
Ok(StatusCode::NO_CONTENT)
}
/// Assembles uploaded chunks into the final completed file.
/// Promotes a fully uploaded temp file into the final completed file.
///
/// # Errors
///
/// Returns an API error when the upload is unknown, incomplete, invalid, or
/// cannot be assembled on disk.
/// cannot be promoted on disk.
pub async fn complete_upload(
State(state): State<AppState>,
Path(upload_id): Path<String>,
+196 -7
View File
@@ -1,6 +1,7 @@
use std::{
env,
error::Error,
ffi::OsString,
net::SocketAddr,
path::{Path, PathBuf},
};
@@ -10,6 +11,7 @@ use axum::{
extract::DefaultBodyLimit,
routing::{get, post},
};
use clap::Parser;
use tower_http::services::{ServeDir, ServeFile};
use crate::{api, storage::Storage};
@@ -17,6 +19,7 @@ use crate::{api, storage::Storage};
const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000";
const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR";
const DATA_DIR_ENV: &str = "UPL_DATA_DIR";
const TEMP_DIR_ENV: &str = "UPL_TEMP_DIR";
const BIND_ENV: &str = "UPL_BIND";
const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024;
@@ -25,6 +28,7 @@ pub struct AppConfig {
pub bind_addr: SocketAddr,
pub static_dir: PathBuf,
pub data_dir: PathBuf,
pub temp_dir: PathBuf,
}
#[derive(Clone, Debug)]
@@ -32,23 +36,95 @@ pub struct AppState {
pub storage: Storage,
}
#[derive(Clone, Debug, Default, Parser)]
#[command(
name = "upl",
version,
about = "Run the upl resumable upload server.",
long_about = "Run the upl resumable upload server.\n\nCommand-line arguments override environment variables. When neither is set, upl uses local development defaults inside the repository.",
after_help = "Environment variables:\n UPL_BIND Default listen address\n UPL_STATIC_DIR Default static asset directory\n UPL_DATA_DIR Default completed upload data directory\n UPL_TEMP_DIR Default temporary upload directory"
)]
pub struct CliArgs {
/// Socket address to listen on. Overrides `UPL_BIND`. Defaults to 127.0.0.1:3000.
#[arg(long, value_name = "ADDR")]
pub bind: Option<SocketAddr>,
/// Directory containing index.html and other browser assets. Overrides `UPL_STATIC_DIR`.
#[arg(long, value_name = "PATH")]
pub static_dir: Option<PathBuf>,
/// Directory where completed upload files are written. Overrides `UPL_DATA_DIR`.
#[arg(long, value_name = "PATH")]
pub data_dir: Option<PathBuf>,
/// Directory where upload metadata, progress markers, and temp files are written. Overrides `UPL_TEMP_DIR`.
#[arg(long, value_name = "PATH")]
pub temp_dir: Option<PathBuf>,
}
impl AppConfig {
/// Loads bind and static directory settings from environment variables.
/// Loads settings from command-line arguments and environment variables.
///
/// Command-line arguments take precedence over environment variables.
///
/// # Errors
///
/// Returns an error when an argument or `UPL_BIND` is not a valid socket
/// address.
pub fn from_args() -> Result<Self, Box<dyn Error>> {
Self::from_cli_and_env(CliArgs::parse())
}
/// Loads settings from environment variables.
///
/// # Errors
///
/// Returns an error when `UPL_BIND` is set but is not a valid socket address.
pub fn from_env() -> Result<Self, Box<dyn Error>> {
let bind_addr = env::var(BIND_ENV)
.unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_owned())
.parse()?;
let static_dir = env::var_os(STATIC_DIR_ENV).map_or_else(default_static_dir, PathBuf::from);
let data_dir = env::var_os(DATA_DIR_ENV).map_or_else(default_data_dir, PathBuf::from);
Self::from_cli_and_env(CliArgs::default())
}
fn from_cli_and_env(cli: CliArgs) -> Result<Self, Box<dyn Error>> {
Self::from_sources(
cli,
env::var(BIND_ENV).ok(),
env::var_os(STATIC_DIR_ENV),
env::var_os(DATA_DIR_ENV),
env::var_os(TEMP_DIR_ENV),
)
}
fn from_sources(
cli: CliArgs,
bind_env: Option<String>,
static_dir_env: Option<OsString>,
data_dir_env: Option<OsString>,
temp_dir_env: Option<OsString>,
) -> Result<Self, Box<dyn Error>> {
let bind_addr = match (cli.bind, bind_env) {
(Some(bind_addr), _) => bind_addr,
(None, Some(bind_addr)) => bind_addr.parse()?,
(None, None) => DEFAULT_BIND_ADDR.parse()?,
};
let static_dir = cli
.static_dir
.or_else(|| static_dir_env.map(PathBuf::from))
.unwrap_or_else(default_static_dir);
let data_dir = cli
.data_dir
.or_else(|| data_dir_env.map(PathBuf::from))
.unwrap_or_else(default_data_dir);
let temp_dir = cli
.temp_dir
.or_else(|| temp_dir_env.map(PathBuf::from))
.unwrap_or_else(|| default_temp_dir(&data_dir));
Ok(Self {
bind_addr,
static_dir,
data_dir,
temp_dir,
})
}
@@ -57,18 +133,36 @@ impl AppConfig {
bind_addr: SocketAddr,
static_dir: impl Into<PathBuf>,
data_dir: impl Into<PathBuf>,
) -> Self {
let data_dir = data_dir.into();
let temp_dir = default_temp_dir(&data_dir);
Self {
bind_addr,
static_dir: static_dir.into(),
data_dir,
temp_dir,
}
}
#[must_use]
pub fn new_with_temp_dir(
bind_addr: SocketAddr,
static_dir: impl Into<PathBuf>,
data_dir: impl Into<PathBuf>,
temp_dir: impl Into<PathBuf>,
) -> Self {
Self {
bind_addr,
static_dir: static_dir.into(),
data_dir: data_dir.into(),
temp_dir: temp_dir.into(),
}
}
}
pub fn build_router(config: &AppConfig) -> Router {
let state = AppState {
storage: Storage::new(&config.data_dir),
storage: Storage::new(&config.data_dir, &config.temp_dir),
};
Router::new()
@@ -103,3 +197,98 @@ fn default_static_dir() -> PathBuf {
fn default_data_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data")
}
fn default_temp_dir(data_dir: &Path) -> PathBuf {
data_dir.join("staging")
}
#[cfg(test)]
mod tests {
use std::{ffi::OsString, net::SocketAddr, path::PathBuf};
use clap::Parser;
use super::{AppConfig, CliArgs};
#[test]
fn parses_config_arguments() -> Result<(), Box<dyn std::error::Error>> {
let args = CliArgs::try_parse_from([
"upl",
"--bind",
"127.0.0.1:4000",
"--static-dir",
"public",
"--data-dir",
"uploads",
"--temp-dir",
"upload-temp",
])?;
assert_eq!(args.bind, Some("127.0.0.1:4000".parse()?));
assert_eq!(args.static_dir, Some(PathBuf::from("public")));
assert_eq!(args.data_dir, Some(PathBuf::from("uploads")));
assert_eq!(args.temp_dir, Some(PathBuf::from("upload-temp")));
Ok(())
}
#[test]
fn cli_arguments_override_environment_values() -> Result<(), Box<dyn std::error::Error>> {
let config = AppConfig::from_sources(
CliArgs {
bind: Some("127.0.0.1:4000".parse()?),
static_dir: Some(PathBuf::from("cli-static")),
data_dir: Some(PathBuf::from("cli-data")),
temp_dir: Some(PathBuf::from("cli-temp")),
},
Some("127.0.0.1:3001".to_owned()),
Some(OsString::from("env-static")),
Some(OsString::from("env-data")),
Some(OsString::from("env-temp")),
)?;
assert_eq!(config.bind_addr, "127.0.0.1:4000".parse::<SocketAddr>()?);
assert_eq!(config.static_dir, PathBuf::from("cli-static"));
assert_eq!(config.data_dir, PathBuf::from("cli-data"));
assert_eq!(config.temp_dir, PathBuf::from("cli-temp"));
Ok(())
}
#[test]
fn environment_values_are_used_when_arguments_are_absent()
-> Result<(), Box<dyn std::error::Error>> {
let config = AppConfig::from_sources(
CliArgs::default(),
Some("127.0.0.1:3001".to_owned()),
Some(OsString::from("env-static")),
Some(OsString::from("env-data")),
Some(OsString::from("env-temp")),
)?;
assert_eq!(config.bind_addr, "127.0.0.1:3001".parse::<SocketAddr>()?);
assert_eq!(config.static_dir, PathBuf::from("env-static"));
assert_eq!(config.data_dir, PathBuf::from("env-data"));
assert_eq!(config.temp_dir, PathBuf::from("env-temp"));
Ok(())
}
#[test]
fn temp_dir_defaults_under_data_dir() -> Result<(), Box<dyn std::error::Error>> {
let config = AppConfig::from_sources(
CliArgs {
data_dir: Some(PathBuf::from("uploads")),
..CliArgs::default()
},
None,
None,
None,
None,
)?;
assert_eq!(config.temp_dir, PathBuf::from("uploads").join("staging"));
Ok(())
}
}
+1 -1
View File
@@ -4,7 +4,7 @@ use upl::app::{AppConfig, build_router};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let config = AppConfig::from_env()?;
let config = AppConfig::from_args()?;
let listener = tokio::net::TcpListener::bind(config.bind_addr).await?;
println!("upl listening on http://{}", listener.local_addr()?);
+154 -65
View File
@@ -1,11 +1,15 @@
use std::{
error::Error,
fmt::{self, Display},
io::SeekFrom,
path::{Path, PathBuf},
};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use tokio::{fs, io::AsyncWriteExt};
use tokio::{
fs,
io::{AsyncSeekExt, AsyncWriteExt},
};
use uuid::Uuid;
use crate::model::{
@@ -16,25 +20,30 @@ use crate::model::{
UploadProgressResponse,
};
const FILE_EXISTS_MESSAGE: &str = "file already exists";
#[derive(Clone, Debug)]
pub struct Storage {
data_dir: PathBuf,
temp_dir: PathBuf,
}
impl Storage {
#[must_use]
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
pub fn new(data_dir: impl Into<PathBuf>, temp_dir: impl Into<PathBuf>) -> Self {
Self {
data_dir: data_dir.into(),
temp_dir: temp_dir.into(),
}
}
/// Creates a durable upload metadata record under `data/staging`.
/// Creates a durable upload metadata record and temp upload file.
///
/// # Errors
///
/// Returns an error when directories cannot be created, metadata cannot be
/// serialized, or the metadata file cannot be written atomically.
/// Returns an error when directories cannot be created, the temp file
/// cannot be created, metadata cannot be serialized, or the metadata file
/// cannot be written atomically.
pub async fn create_upload(
&self,
request: CreateUploadRequest,
@@ -47,6 +56,10 @@ impl Storage {
self.ensure_layout().await?;
let safe_name = safe_file_name(original_name);
if fs::try_exists(self.final_path(&safe_name)).await? {
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
}
let created_at = OffsetDateTime::now_utc().format(&Rfc3339)?;
for _ in 0..8 {
@@ -56,7 +69,12 @@ impl Storage {
continue;
}
fs::create_dir_all(upload_dir.join("chunks")).await?;
fs::create_dir_all(self.completed_dir(&id)).await?;
fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(self.upload_file_path(&id))
.await?;
let meta = UploadMeta {
id,
@@ -76,12 +94,12 @@ impl Storage {
Err(StorageError::IdCollision)
}
/// Loads upload progress by scanning durable chunk files.
/// Loads upload progress by scanning durable completion markers.
///
/// # Errors
///
/// Returns an error when the upload id is invalid, metadata is missing, or
/// the staging directory cannot be scanned.
/// the temp directory cannot be scanned.
pub async fn progress(&self, upload_id: &str) -> Result<UploadProgressResponse, StorageError> {
let meta = self.load_meta(upload_id).await?;
let completed_chunks = self.completed_chunks(&meta).await?;
@@ -89,13 +107,13 @@ impl Storage {
Ok(meta.progress_response(completed_chunks))
}
/// Validates and stores one raw chunk body.
/// Validates and stores one raw chunk body in the temp upload file.
///
/// # Errors
///
/// Returns an error when the upload is unknown, the index is out of range,
/// the body length is not the expected chunk length, or the chunk cannot be
/// written and renamed into place.
/// written to its final offset in the temp upload file.
pub async fn store_chunk(
&self,
upload_id: &str,
@@ -111,31 +129,33 @@ impl Storage {
return Err(StorageError::InvalidInput("chunk has the wrong length"));
}
let part_path = self.chunk_path(upload_id, index);
if let Some(existing_len) = file_len(&part_path).await? {
if existing_len == expected_len {
return Ok(());
}
return Err(StorageError::InvalidInput(
"existing chunk has the wrong length",
));
if self.chunk_is_complete(&meta, index).await? {
return Ok(());
}
let tmp_path = part_path.with_extension("part.tmp");
fs::write(&tmp_path, body).await?;
fs::rename(&tmp_path, &part_path).await?;
let mut output = fs::OpenOptions::new()
.write(true)
.open(self.upload_file_path(upload_id))
.await?;
output
.seek(SeekFrom::Start(chunk_offset(&meta, index)))
.await?;
output.write_all(body).await?;
output.flush().await?;
drop(output);
self.mark_chunk_complete(&meta, index).await?;
Ok(())
}
/// Assembles a complete upload from verified chunk files.
/// Atomically promotes a complete temp upload file into completed storage.
///
/// # Errors
///
/// Returns an error when the upload is unknown, any expected chunk is
/// missing or has the wrong length, the final file already exists, or the
/// assembled file cannot be written and renamed.
/// missing, the final file already exists, or the temp upload file cannot
/// be renamed into place.
pub async fn complete_upload(
&self,
upload_id: &str,
@@ -144,33 +164,10 @@ impl Storage {
self.verify_all_chunks(&meta).await?;
let final_path = self.complete_dir().join(&meta.safe_name);
if fs::try_exists(&final_path).await? {
return Err(StorageError::Conflict("complete file already exists"));
}
let final_path = self.final_path(&meta.safe_name);
let tmp_path = self
.complete_dir()
.join(format!(".{}.{}.tmp", meta.safe_name, meta.id));
if fs::try_exists(&tmp_path).await? {
fs::remove_file(&tmp_path).await?;
}
let mut output = fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.await?;
for index in 0..meta.total_chunks {
let bytes = fs::read(self.chunk_path(upload_id, index)).await?;
output.write_all(&bytes).await?;
}
output.flush().await?;
drop(output);
fs::rename(&tmp_path, &final_path).await?;
promote_file_without_overwrite(&self.upload_file_path(upload_id), &final_path).await?;
self.remove_upload_dir(upload_id).await?;
Ok(meta.complete_response(final_path.display().to_string()))
}
@@ -180,27 +177,43 @@ impl Storage {
&self.data_dir
}
fn staging_dir(&self) -> PathBuf {
self.data_dir.join("staging")
#[must_use]
pub fn temp_dir(&self) -> &Path {
&self.temp_dir
}
fn complete_dir(&self) -> PathBuf {
fn staging_dir(&self) -> PathBuf {
self.temp_dir.clone()
}
fn final_dir(&self) -> PathBuf {
self.data_dir.join("complete")
}
fn final_path(&self, safe_name: &str) -> PathBuf {
self.final_dir().join(safe_name)
}
fn upload_dir(&self, upload_id: &str) -> PathBuf {
self.staging_dir().join(upload_id)
}
fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf {
self.upload_dir(upload_id)
.join("chunks")
.join(format!("{index:06}.part"))
fn upload_file_path(&self, upload_id: &str) -> PathBuf {
self.upload_dir(upload_id).join(".upload.tmp")
}
fn completed_dir(&self, upload_id: &str) -> PathBuf {
self.upload_dir(upload_id).join("completed")
}
fn completed_marker_path(&self, upload_id: &str, index: u64) -> PathBuf {
self.completed_dir(upload_id)
.join(format!("{index:06}.done"))
}
async fn ensure_layout(&self) -> Result<(), StorageError> {
fs::create_dir_all(self.staging_dir()).await?;
fs::create_dir_all(self.complete_dir()).await?;
fs::create_dir_all(self.final_dir()).await?;
Ok(())
}
@@ -234,8 +247,7 @@ impl Storage {
let mut completed = Vec::new();
for index in 0..meta.total_chunks {
let expected_len = expected_chunk_len(meta, index)?;
if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) {
if self.chunk_is_complete(meta, index).await? {
completed.push(index);
}
}
@@ -244,11 +256,12 @@ impl Storage {
}
async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> {
for index in 0..meta.total_chunks {
let expected_len = expected_chunk_len(meta, index)?;
let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?;
if file_len(&self.upload_file_path(&meta.id)).await? != Some(meta.size) {
return Err(StorageError::Conflict("upload data file is incomplete"));
}
if actual_len != Some(expected_len) {
for index in 0..meta.total_chunks {
if !self.chunk_is_complete(meta, index).await? {
return Err(StorageError::Conflict(
"upload is missing one or more complete chunks",
));
@@ -257,6 +270,38 @@ impl Storage {
Ok(())
}
async fn chunk_is_complete(&self, meta: &UploadMeta, index: u64) -> Result<bool, StorageError> {
expected_chunk_len(meta, index)?;
Ok(file_len(&self.completed_marker_path(&meta.id, index))
.await?
.is_some())
}
async fn mark_chunk_complete(&self, meta: &UploadMeta, index: u64) -> Result<(), StorageError> {
let marker_path = self.completed_marker_path(&meta.id, index);
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(marker_path)
.await
{
Ok(mut marker) => {
marker.flush().await?;
Ok(())
}
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
Err(error) => Err(error.into()),
}
}
async fn remove_upload_dir(&self, upload_id: &str) -> Result<(), StorageError> {
match fs::remove_dir_all(self.upload_dir(upload_id)).await {
Ok(()) => Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(error) => Err(error.into()),
}
}
}
#[derive(Debug)]
@@ -368,6 +413,10 @@ fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result<u64, StorageError
}
}
fn chunk_offset(meta: &UploadMeta, index: u64) -> u64 {
meta.chunk_size * index
}
async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
match fs::metadata(path).await {
Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())),
@@ -377,6 +426,46 @@ async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
}
}
async fn promote_file_without_overwrite(
source: &Path,
destination: &Path,
) -> Result<(), StorageError> {
match fs::hard_link(source, destination).await {
Ok(()) => return Ok(()),
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
}
Err(_) => {}
}
let mut output = match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(destination)
.await
{
Ok(output) => output,
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
}
Err(error) => return Err(error.into()),
};
let copy_result = async {
let mut input = fs::File::open(source).await?;
tokio::io::copy(&mut input, &mut output).await?;
output.flush().await
}
.await;
if let Err(error) = copy_result {
let _ = fs::remove_file(destination).await;
return Err(error.into());
}
Ok(())
}
fn validate_upload_id(upload_id: &str) -> Result<(), StorageError> {
let is_valid = !upload_id.is_empty()
&& upload_id
+561 -140
View File
@@ -1,36 +1,33 @@
const DB_NAME = "upl";
const DB_VERSION = 1;
const STORE_NAME = "uploads";
const CONCURRENCY = 3;
const CHUNK_CONCURRENCY = 3;
const FILE_CONCURRENCY = 3;
const MAX_RETRIES = 5;
const BASE_RETRY_DELAY_MS = 500;
const FILE_EXISTS_MESSAGE = "file already exists";
const fileInput = document.querySelector("#file-input");
const pickButton = document.querySelector("#pick-button");
const fileSummary = document.querySelector("#file-summary");
const fileName = document.querySelector("#file-name");
const fileSize = document.querySelector("#file-size");
const uploadSection = document.querySelector("#upload-section");
const uploadList = document.querySelector("#upload-list");
const startButton = document.querySelector("#start-button");
const pauseButton = document.querySelector("#pause-button");
const resumeButton = document.querySelector("#resume-button");
const eventLog = document.querySelector("#event-log");
const progressBar = document.querySelector("#progress-bar");
const progressMeta = document.querySelector("#progress-meta");
const pendingSection = document.querySelector("#pending-section");
const pendingList = document.querySelector("#pending-list");
const state = {
abortController: null,
completedChunks: new Set(),
file: null,
fileHandle: null,
pendingRecords: [],
record: null,
resumeAfterReselect: null,
running: false,
schedulerAbortController: null,
schedulerRunning: false,
uploadItems: [],
};
const dbReady = "indexedDB" in window ? openDatabase() : Promise.resolve(null);
let nextUploadItemId = 1;
let saveChain = Promise.resolve();
function formatBytes(bytes) {
@@ -100,7 +97,9 @@ async function withStore(mode, callback) {
async function loadRecords() {
const records = (await withStore("readonly", (store) => store.getAll())) ?? [];
records.sort((left, right) => right.updated_at.localeCompare(left.updated_at));
records.sort((left, right) =>
(right.updated_at ?? "").localeCompare(left.updated_at ?? ""),
);
state.pendingRecords = records;
renderPendingRecords();
}
@@ -108,7 +107,7 @@ async function loadRecords() {
async function saveRecord(record) {
const nextSave = saveChain.catch(() => null).then(() => writeRecord(record));
saveChain = nextSave;
await nextSave;
return nextSave;
}
async function writeRecord(record) {
@@ -124,23 +123,31 @@ async function writeRecord(record) {
await withStore("readwrite", (store) => store.put(storedRecord));
log("Saved resume state without a reusable file handle.");
}
state.record = storedRecord;
await loadRecords();
return storedRecord;
}
async function deleteRecord(uploadId) {
await withStore("readwrite", (store) => store.delete(uploadId));
if (state.record?.upload_id === uploadId) {
state.record = null;
}
await loadRecords();
}
function renderPendingRecords() {
pendingList.replaceChildren();
pendingSection.hidden = state.pendingRecords.length === 0;
for (const record of state.pendingRecords) {
const activeUploadIds = new Set(
state.uploadItems
.map((item) => item.record?.upload_id)
.filter((uploadId) => Boolean(uploadId)),
);
const visibleRecords = state.pendingRecords.filter(
(record) => !activeUploadIds.has(record.upload_id),
);
pendingSection.hidden = visibleRecords.length === 0;
for (const record of visibleRecords) {
const item = document.createElement("li");
item.className = "pending-item";
@@ -151,12 +158,13 @@ function renderPendingRecords() {
title.textContent = record.name;
const detail = document.createElement("span");
detail.textContent = `${formatBytes(record.size)} - ${record.completed_chunks ?? 0} of ${record.total_chunks} chunks`;
detail.textContent = savedUploadDetail(record);
const resume = document.createElement("button");
resume.type = "button";
resume.className = "secondary";
resume.textContent = "Resume";
resume.textContent = isReadyToFinish(record) ? "Finish" : "Resume";
resume.disabled = state.schedulerRunning || !hasAvailableFileSlot();
resume.addEventListener("click", () => {
void resumePendingRecord(record);
});
@@ -164,7 +172,8 @@ function renderPendingRecords() {
const remove = document.createElement("button");
remove.type = "button";
remove.className = "danger";
remove.textContent = "Remove";
remove.textContent = "Clear";
remove.disabled = state.schedulerRunning;
remove.addEventListener("click", () => {
void deleteRecord(record.upload_id);
});
@@ -175,29 +184,115 @@ function renderPendingRecords() {
}
}
function renderFile(file) {
if (!file) {
fileSummary.hidden = true;
fileName.textContent = "";
fileSize.textContent = "";
return;
}
function renderUploadItems() {
uploadList.replaceChildren();
uploadSection.hidden = state.uploadItems.length === 0;
fileName.textContent = file.name;
fileSize.textContent = formatBytes(file.size);
fileSummary.hidden = false;
for (const item of state.uploadItems) {
const row = document.createElement("li");
row.className = "upload-item";
if (item.terminal) {
row.classList.add("upload-item-blocked");
row.setAttribute("aria-invalid", "true");
}
const header = document.createElement("div");
header.className = "upload-item-header";
const meta = document.createElement("div");
meta.className = "upload-meta";
const title = document.createElement("strong");
title.textContent = item.file.name;
const detail = document.createElement("span");
detail.textContent = uploadItemDetail(item);
meta.append(title, detail);
const actions = document.createElement("div");
actions.className = "upload-item-actions";
const start = document.createElement("button");
start.type = "button";
start.textContent = uploadActionLabel(item);
start.disabled =
!canRunItem(item) || state.schedulerRunning || !hasAvailableFileSlot();
start.addEventListener("click", () => {
void runUploadItem(item);
});
const pause = document.createElement("button");
pause.type = "button";
pause.className = "secondary";
pause.textContent = "Pause";
pause.disabled = !item.running;
pause.addEventListener("click", () => {
item.abortController?.abort();
});
const remove = document.createElement("button");
remove.type = "button";
remove.className = "secondary";
remove.textContent = "Remove";
remove.disabled = item.running || item.queued;
remove.addEventListener("click", () => {
removeUploadItem(item);
});
actions.append(start, pause, remove);
header.append(meta, actions);
const progress = document.createElement("div");
progress.className = "upload-progress";
const progressWrap = document.createElement("div");
progressWrap.className = "progress-wrap";
progressWrap.setAttribute("aria-label", `${item.file.name} upload progress`);
const progressBar = document.createElement("div");
progressBar.className = "progress-bar";
progressBar.style.width = `${progressPercentage(
item.completedCount,
item.totalChunks,
)}%`;
const progressMeta = document.createElement("div");
progressMeta.className = "progress-meta";
progressMeta.textContent = `${item.completedCount} of ${item.totalChunks} chunks`;
progressWrap.append(progressBar);
progress.append(progressWrap, progressMeta);
row.append(header, progress);
uploadList.append(row);
}
}
function renderButtons() {
startButton.disabled = !state.file || state.running || Boolean(state.record);
pauseButton.disabled = !state.running;
resumeButton.disabled = !state.file || state.running || !state.record;
const hasRunnable = state.uploadItems.some((item) => canRunItem(item));
const hasRunnableResume = state.uploadItems.some(
(item) => item.record && canRunItem(item),
);
const hasRunningOrQueued = state.uploadItems.some((item) => item.running || item.queued);
const hasFileSlot = hasAvailableFileSlot();
startButton.disabled = !hasRunnable || state.schedulerRunning || !hasFileSlot;
pauseButton.disabled = !hasRunningOrQueued;
resumeButton.disabled = !hasRunnableResume || state.schedulerRunning || !hasFileSlot;
}
function updateProgress(completedCount, totalChunks) {
const percentage = totalChunks === 0 ? 100 : (completedCount / totalChunks) * 100;
progressBar.style.width = `${percentage}%`;
progressMeta.textContent = `${completedCount} of ${totalChunks} chunks`;
function renderAll() {
renderUploadItems();
renderPendingRecords();
renderButtons();
}
function progressPercentage(completedCount, totalChunks) {
if (totalChunks <= 0) {
return 0;
}
return Math.min(100, Math.max(0, (completedCount / totalChunks) * 100));
}
function sameFile(record, file) {
@@ -208,34 +303,212 @@ function sameFile(record, file) {
);
}
function sameUploadItemFile(item, file) {
return (
item.file.name === file.name &&
item.file.size === file.size &&
item.file.lastModified === file.lastModified
);
}
function findPendingRecord(file) {
return state.pendingRecords.find((record) => sameFile(record, file)) ?? null;
}
async function selectFile(file, fileHandle = null, record = null) {
if (record && !sameFile(record, file)) {
log("Selected file does not match the pending upload.");
function findUploadItem(file, record = null) {
return (
state.uploadItems.find((item) => {
if (item.finished || item.terminal) {
return false;
}
if (record?.upload_id && item.record?.upload_id === record.upload_id) {
return true;
}
return sameUploadItemFile(item, file);
}) ?? null
);
}
function completedChunkCount(record) {
return Math.min(record.completed_chunks ?? 0, record.total_chunks ?? 0);
}
function isReadyToFinish(record) {
const totalChunks = record.total_chunks ?? 0;
return totalChunks === 0 || completedChunkCount(record) >= totalChunks;
}
function isUploadItemReadyToFinish(item) {
return (
Boolean(item.record) &&
(item.totalChunks === 0 || item.completedCount >= item.totalChunks)
);
}
function savedUploadDetail(record) {
const totalChunks = record.total_chunks ?? 0;
const completedChunks = completedChunkCount(record);
if (isReadyToFinish(record)) {
return `${formatBytes(record.size)} - ready to finish`;
}
if (completedChunks === 0) {
return `${formatBytes(record.size)} - not uploaded yet`;
}
return `${formatBytes(record.size)} - ${completedChunks} of ${totalChunks} chunks uploaded`;
}
function uploadItemDetail(item) {
return `${formatBytes(item.file.size)} - ${item.statusText}`;
}
function uploadActionLabel(item) {
if (item.terminal) {
return "Unavailable";
}
if (item.finished) {
return "Done";
}
if (!item.record) {
return "Start";
}
return isUploadItemReadyToFinish(item) ? "Finish" : "Resume";
}
function initialUploadStatus(record) {
if (!record) {
return "Ready to create an upload record.";
}
if (isReadyToFinish(record)) {
return "Ready to finish saved upload.";
}
return "Ready to resume upload.";
}
function canRunItem(item) {
return (
Boolean(item.file) &&
!item.running &&
!item.queued &&
!item.finished &&
!item.terminal
);
}
function runningFileCount() {
return state.uploadItems.filter((item) => item.running).length;
}
function hasAvailableFileSlot() {
return runningFileCount() < FILE_CONCURRENCY;
}
function setItemProgress(item, completedCount, totalChunks) {
item.totalChunks = Math.max(0, totalChunks);
item.completedCount =
item.totalChunks === 0
? 0
: Math.min(Math.max(0, completedCount), item.totalChunks);
}
async function selectFiles(files, fileHandles = []) {
const selectedFiles = Array.from(files);
if (selectedFiles.length === 0) {
log("Choose files to begin.");
renderButtons();
return;
}
state.file = file;
state.fileHandle = fileHandle;
state.record = record ?? findPendingRecord(file);
state.completedChunks = new Set();
const resumeRecord = state.resumeAfterReselect;
let matchedResumeRecord = false;
let addedCount = 0;
renderFile(file);
updateProgress(state.record?.completed_chunks ?? 0, state.record?.total_chunks ?? 0);
renderButtons();
for (const [index, file] of selectedFiles.entries()) {
let record = findPendingRecord(file);
if (resumeRecord && sameFile(resumeRecord, file)) {
record = resumeRecord;
matchedResumeRecord = true;
}
log(state.record ? "Ready to resume upload." : "Ready to create an upload record.");
const previousCount = state.uploadItems.length;
addUploadItem(file, fileHandles[index] ?? null, record);
if (state.uploadItems.length > previousCount) {
addedCount += 1;
}
}
if (resumeRecord && !matchedResumeRecord) {
log("Selected files did not include the pending upload file.");
}
state.resumeAfterReselect = null;
fileInput.value = "";
renderAll();
if (addedCount === 1) {
log("Ready to upload 1 file.");
} else if (addedCount > 1) {
log(`Ready to upload ${addedCount} files.`);
}
}
function addUploadItem(file, fileHandle = null, record = null) {
if (record && !sameFile(record, file)) {
log("Selected file does not match the pending upload.");
return null;
}
const existingItem = findUploadItem(file, record);
if (existingItem) {
log(`${file.name} is already selected.`);
return existingItem;
}
const item = {
abortController: null,
completedChunks: new Set(),
completedCount: record ? completedChunkCount(record) : 0,
file,
fileHandle,
finished: false,
id: nextUploadItemId,
queued: false,
record,
running: false,
statusText: initialUploadStatus(record),
terminal: false,
totalChunks: record?.total_chunks ?? 0,
};
nextUploadItemId += 1;
state.uploadItems.push(item);
return item;
}
function removeUploadItem(item) {
if (item.running || item.queued) {
return;
}
state.uploadItems = state.uploadItems.filter((candidate) => candidate.id !== item.id);
renderAll();
}
async function pickFile() {
if ("showOpenFilePicker" in window) {
try {
const [handle] = await window.showOpenFilePicker({ multiple: false });
const file = await handle.getFile();
await selectFile(file, handle);
const handles = await window.showOpenFilePicker({ multiple: true });
const files = await Promise.all(handles.map((handle) => handle.getFile()));
await selectFiles(files, handles);
return;
} catch (error) {
if (isAbortError(error)) {
@@ -249,18 +522,7 @@ async function pickFile() {
}
fileInput.addEventListener("change", () => {
const [file] = fileInput.files;
if (!file) {
renderFile(null);
renderButtons();
log("Choose a file to begin.");
return;
}
const record = state.resumeAfterReselect ?? findPendingRecord(file);
state.resumeAfterReselect = null;
void selectFile(file, null, record);
void selectFiles(fileInput.files);
});
pickButton.addEventListener("click", () => {
@@ -268,21 +530,19 @@ pickButton.addEventListener("click", () => {
});
startButton.addEventListener("click", () => {
void runUpload();
void runUploadItems(state.uploadItems);
});
pauseButton.addEventListener("click", () => {
if (state.abortController) {
state.abortController.abort();
}
pauseUploads();
});
resumeButton.addEventListener("click", () => {
void runUpload();
void runUploadItems(state.uploadItems.filter((item) => item.record));
});
async function resumePendingRecord(record) {
if (state.running) {
if (state.schedulerRunning) {
return;
}
@@ -290,14 +550,21 @@ async function resumePendingRecord(record) {
const granted = await requestFileHandlePermission(record.file_handle);
if (granted) {
const file = await record.file_handle.getFile();
await selectFile(file, record.file_handle, record);
await runUpload();
const item = addUploadItem(file, record.file_handle, record);
renderAll();
if (item) {
await runUploadItem(item);
}
return;
}
}
state.resumeAfterReselect = record;
log("Select the same file to resume.");
log(
isReadyToFinish(record)
? "Select the same file to finish."
: "Select the same file to resume.",
);
fileInput.click();
}
@@ -314,101 +581,238 @@ async function requestFileHandlePermission(handle) {
return (await handle.requestPermission(options)) === "granted";
}
async function runUpload() {
if (!state.file || state.running) {
async function runUploadItems(items) {
if (state.schedulerRunning) {
return;
}
const runnableItems = items.filter((item) => canRunItem(item));
const availableSlots = FILE_CONCURRENCY - runningFileCount();
if (runnableItems.length === 0 || availableSlots <= 0) {
return;
}
const controller = new AbortController();
state.abortController = controller;
state.running = true;
renderButtons();
state.schedulerAbortController = controller;
state.schedulerRunning = true;
for (const item of runnableItems) {
item.queued = true;
item.statusText = "Queued.";
}
renderAll();
try {
if (!state.record) {
await createUploadRecord();
await runPool(
runnableItems,
async (item) => {
throwIfAborted(controller.signal);
if (!item.queued) {
return;
}
await runUploadItem(item);
},
availableSlots,
controller.signal,
);
} catch (error) {
if (!isAbortError(error)) {
log(`Upload scheduler failed: ${error.message}`);
}
} finally {
for (const item of runnableItems) {
if (item.queued) {
item.queued = false;
item.statusText = "Paused.";
}
}
const progress = await fetchJson(`/api/uploads/${state.record.upload_id}`, {
if (state.schedulerAbortController === controller) {
state.schedulerAbortController = null;
}
state.schedulerRunning = false;
renderAll();
}
}
function pauseUploads() {
state.schedulerAbortController?.abort();
for (const item of state.uploadItems) {
if (item.running) {
item.abortController?.abort();
} else if (item.queued) {
item.queued = false;
item.statusText = "Paused.";
}
}
renderAll();
}
async function runUploadItem(item) {
if (
item.running ||
item.finished ||
item.terminal ||
(!item.queued && !hasAvailableFileSlot()) ||
(!item.queued && !canRunItem(item))
) {
return;
}
const controller = new AbortController();
item.abortController = controller;
item.queued = false;
item.running = true;
item.statusText = item.record
? "Checking saved progress."
: "Creating upload record.";
renderAll();
try {
if (!item.record) {
await createUploadRecord(item, controller.signal);
}
const progress = await fetchJson(`/api/uploads/${item.record.upload_id}`, {
signal: controller.signal,
});
state.completedChunks = new Set(progress.completed_chunks);
await saveRecord({
...state.record,
completed_chunks: state.completedChunks.size,
item.completedChunks = new Set(progress.completed_chunks);
setItemProgress(item, item.completedChunks.size, progress.total_chunks);
item.record = await saveRecord({
...item.record,
completed_chunks: item.completedCount,
chunk_size: progress.chunk_size,
total_chunks: progress.total_chunks,
});
updateProgress(state.completedChunks.size, progress.total_chunks);
renderAll();
const missingChunks = buildMissingChunkList(progress.total_chunks, state.completedChunks);
log(
const missingChunks = buildMissingChunkList(progress.total_chunks, item.completedChunks);
item.statusText =
missingChunks.length === 0
? "All chunks already uploaded."
: `Uploading ${missingChunks.length} missing chunks.`,
);
: `Uploading ${missingChunks.length} missing chunks.`;
renderAll();
await runPool(missingChunks, (index) =>
uploadChunkWithRetry(index, progress.chunk_size, progress.total_chunks, controller.signal),
await runPool(
missingChunks,
(index) =>
uploadChunkWithRetry(
item,
index,
progress.chunk_size,
progress.total_chunks,
controller.signal,
),
CHUNK_CONCURRENCY,
controller.signal,
);
if (controller.signal.aborted) {
return;
}
log("Completing upload.");
const complete = await fetchJson(`/api/uploads/${state.record.upload_id}/complete`, {
item.statusText = "Completing upload.";
renderAll();
const uploadId = item.record.upload_id;
const complete = await fetchJson(`/api/uploads/${uploadId}/complete`, {
method: "POST",
signal: controller.signal,
});
updateProgress(progress.total_chunks, progress.total_chunks);
log(`Complete: ${complete.file_path}`);
await deleteRecord(state.record.upload_id);
state.completedChunks = new Set();
state.record = null;
setItemProgress(item, progress.total_chunks, progress.total_chunks);
item.finished = true;
item.statusText = `Complete: ${complete.file_path}`;
await deleteRecord(uploadId);
} catch (error) {
if (controller.signal.aborted || isAbortError(error)) {
log("Upload paused.");
item.statusText = "Paused.";
} else if (await handleTerminalUploadError(item, error)) {
controller.abort();
} else {
controller.abort();
log(`Upload failed: ${error.message}`);
item.statusText = `Upload failed: ${error.message}`;
}
} finally {
if (state.abortController === controller) {
state.abortController = null;
if (item.abortController === controller) {
item.abortController = null;
}
state.running = false;
renderButtons();
item.running = false;
renderAll();
}
}
async function createUploadRecord() {
async function handleTerminalUploadError(item, error) {
if (typeof error.status !== "number") {
return false;
}
if (isFileExistsConflict(error)) {
if (item.record) {
await deleteRecord(item.record.upload_id);
}
item.record = null;
item.completedChunks = new Set();
setItemProgress(item, 0, 0);
item.terminal = true;
item.statusText =
"File already exists on the server. Rename the file to upload this copy.";
log(`${item.file.name}: file already exists. Rename it to upload this copy.`);
return true;
}
if (!item.record) {
return false;
}
const uploadId = item.record.upload_id;
if (error.status === 404) {
await deleteRecord(uploadId);
item.record = null;
item.completedChunks = new Set();
setItemProgress(item, 0, 0);
item.statusText = "Saved upload progress no longer exists. Start again.";
log(`${item.file.name}: saved upload progress no longer exists on the server.`);
return true;
}
return false;
}
function isFileExistsConflict(error) {
return error.status === 409 && error.message === FILE_EXISTS_MESSAGE;
}
async function createUploadRecord(item, signal) {
const response = await fetchJson("/api/uploads", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
name: state.file.name,
size: state.file.size,
last_modified: state.file.lastModified,
name: item.file.name,
size: item.file.size,
last_modified: item.file.lastModified,
}),
signal,
});
const record = {
upload_id: response.upload_id,
name: state.file.name,
size: state.file.size,
last_modified: state.file.lastModified,
name: item.file.name,
size: item.file.size,
last_modified: item.file.lastModified,
chunk_size: response.chunk_size,
total_chunks: response.total_chunks,
completed_chunks: 0,
file_handle: state.fileHandle,
file_handle: item.fileHandle,
updated_at: new Date().toISOString(),
};
await saveRecord(record);
updateProgress(0, response.total_chunks);
log("Upload record created.");
item.record = await saveRecord(record);
setItemProgress(item, 0, response.total_chunks);
item.statusText = "Upload record created.";
renderAll();
}
function buildMissingChunkList(totalChunks, completedChunks) {
@@ -421,18 +825,19 @@ function buildMissingChunkList(totalChunks, completedChunks) {
return missing;
}
async function uploadChunkWithRetry(index, chunkSize, totalChunks, signal) {
async function uploadChunkWithRetry(item, index, chunkSize, totalChunks, signal) {
for (let attempt = 0; attempt <= MAX_RETRIES; attempt += 1) {
throwIfAborted(signal);
try {
await uploadChunk(index, chunkSize, signal);
state.completedChunks.add(index);
updateProgress(state.completedChunks.size, totalChunks);
await saveRecord({
...state.record,
completed_chunks: state.completedChunks.size,
await uploadChunk(item, index, chunkSize, signal);
item.completedChunks.add(index);
setItemProgress(item, item.completedChunks.size, totalChunks);
item.record = await saveRecord({
...item.record,
completed_chunks: item.completedCount,
});
renderAll();
return;
} catch (error) {
if (isAbortError(error) || attempt === MAX_RETRIES) {
@@ -440,34 +845,42 @@ async function uploadChunkWithRetry(index, chunkSize, totalChunks, signal) {
}
const delayMs = BASE_RETRY_DELAY_MS * 2 ** attempt;
log(`Retrying chunk ${index} after ${delayMs} ms.`);
item.statusText = `Retrying chunk ${index} after ${delayMs} ms.`;
renderAll();
await delay(delayMs, signal);
}
}
}
async function uploadChunk(index, chunkSize, signal) {
async function uploadChunk(item, index, chunkSize, signal) {
const start = index * chunkSize;
const end = Math.min(state.file.size, start + chunkSize);
const body = state.file.slice(start, end);
const response = await fetch(`/api/uploads/${state.record.upload_id}/chunks/${index}`, {
method: "PUT",
headers: { "Content-Type": "application/octet-stream" },
body,
signal,
});
const end = Math.min(item.file.size, start + chunkSize);
const body = item.file.slice(start, end);
const response = await fetch(
`/api/uploads/${item.record.upload_id}/chunks/${index}`,
{
method: "PUT",
headers: { "Content-Type": "application/octet-stream" },
body,
signal,
},
);
if (!response.ok) {
throw new Error(await responseError(response));
throw new ApiRequestError(await responseError(response), response.status);
}
}
async function runPool(items, worker) {
async function runPool(items, worker, concurrency, signal = null) {
let nextIndex = 0;
const workers = Array.from(
{ length: Math.min(CONCURRENCY, items.length) },
{ length: Math.min(concurrency, items.length) },
async () => {
while (nextIndex < items.length) {
if (signal) {
throwIfAborted(signal);
}
const item = items[nextIndex];
nextIndex += 1;
await worker(item);
@@ -478,10 +891,18 @@ async function runPool(items, worker) {
await Promise.all(workers);
}
class ApiRequestError extends Error {
constructor(message, status) {
super(message);
this.name = "ApiRequestError";
this.status = status;
}
}
async function fetchJson(url, options = {}) {
const response = await fetch(url, options);
if (!response.ok) {
throw new Error(await responseError(response));
throw new ApiRequestError(await responseError(response), response.status);
}
return response.json();
}
@@ -527,7 +948,7 @@ function isAbortError(error) {
async function initialize() {
await loadRecords();
renderButtons();
renderAll();
}
void initialize();
+12 -18
View File
@@ -15,37 +15,31 @@
<h1 id="app-title">upl</h1>
<p class="subtle">Resumable uploads to this machine.</p>
</div>
<span class="status-pill" id="connection-status">Server online</span>
</div>
<div class="file-picker">
<button id="pick-button" type="button">Choose file</button>
<input id="file-input" type="file">
<button id="pick-button" type="button">Choose files</button>
<input id="file-input" type="file" multiple>
</div>
<div class="file-summary" id="file-summary" hidden>
<strong id="file-name"></strong>
<span id="file-size"></span>
</div>
<div class="progress-wrap" aria-label="Upload progress">
<div class="progress-bar" id="progress-bar"></div>
</div>
<div class="progress-meta" id="progress-meta">0 of 0 chunks</div>
<div class="actions">
<button id="start-button" type="button" disabled>Start</button>
<button id="pause-button" type="button" disabled>Pause</button>
<button id="resume-button" type="button" disabled>Resume</button>
<button id="start-button" type="button" disabled>Start all</button>
<button id="pause-button" type="button" disabled>Pause all</button>
<button id="resume-button" type="button" disabled>Resume all</button>
</div>
<section class="upload-section" id="upload-section" hidden>
<h2>Selected uploads</h2>
<ul class="upload-list" id="upload-list"></ul>
</section>
<section class="pending-section" id="pending-section" hidden>
<h2>Pending uploads</h2>
<h2>Saved upload progress</h2>
<ul class="pending-list" id="pending-list"></ul>
</section>
<ol class="event-log" id="event-log" aria-live="polite">
<li>Choose a file to begin.</li>
<li>Choose files to begin.</li>
</ol>
</section>
</main>
+61 -21
View File
@@ -71,17 +71,6 @@ h1 {
color: var(--muted);
}
.status-pill {
flex: 0 0 auto;
padding: 6px 10px;
border: 1px solid color-mix(in srgb, var(--accent) 34%, transparent);
border-radius: 999px;
color: var(--accent-strong);
background: color-mix(in srgb, var(--accent) 10%, transparent);
font-size: 0.875rem;
font-weight: 700;
}
.file-picker {
display: flex;
align-items: center;
@@ -99,16 +88,57 @@ h1 {
clip: rect(0, 0, 0, 0);
}
.file-summary {
.upload-section {
display: grid;
gap: 4px;
padding: 14px;
gap: 12px;
}
.upload-list {
display: grid;
gap: 12px;
margin: 0;
padding: 0;
list-style: none;
}
.upload-item {
display: grid;
gap: 12px;
padding: 12px;
border: 1px solid var(--line);
border-radius: 8px;
}
.file-summary span {
.upload-item-blocked {
border-color: #f04438;
background: rgb(240 68 56 / 8%);
}
.upload-item-blocked .upload-meta span {
color: #b42318;
}
.upload-item-header {
display: grid;
grid-template-columns: minmax(0, 1fr) auto;
align-items: start;
gap: 12px;
}
.upload-meta {
display: grid;
gap: 4px;
min-width: 0;
}
.upload-meta span {
color: var(--muted);
font-size: 0.875rem;
}
.upload-progress {
display: grid;
gap: 6px;
}
.progress-wrap {
@@ -126,7 +156,6 @@ h1 {
}
.progress-meta {
margin-top: -12px;
color: var(--muted);
font-size: 0.875rem;
}
@@ -137,6 +166,13 @@ h1 {
gap: 10px;
}
.upload-item-actions {
display: flex;
flex-wrap: wrap;
justify-content: flex-end;
gap: 8px;
}
button {
min-width: 96px;
min-height: 40px;
@@ -195,7 +231,7 @@ h2 {
}
.pending-item strong,
.file-summary strong {
.upload-meta strong {
overflow-wrap: anywhere;
}
@@ -236,10 +272,6 @@ h2 {
display: grid;
}
.status-pill {
width: max-content;
}
.upload-panel {
padding: 18px;
}
@@ -247,4 +279,12 @@ h2 {
.pending-item {
grid-template-columns: 1fr;
}
.upload-item-header {
grid-template-columns: 1fr;
}
.upload-item-actions {
justify-content: flex-start;
}
}
+11 -9
View File
@@ -42,13 +42,16 @@ async fn stores_chunks_and_reports_progress() -> Result<(), Box<dyn std::error::
let progress = get_progress(&app, &upload.upload_id).await?;
assert_eq!(progress.completed_chunks, vec![0, 1]);
let chunk_path = temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.join("chunks")
.join("000000.part");
assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE);
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
assert_eq!(
tokio::fs::metadata(upload_dir.join(".upload.tmp"))
.await?
.len(),
CHUNK_SIZE + 3
);
assert!(upload_dir.join("completed").join("000000.done").is_file());
assert!(upload_dir.join("completed").join("000001.done").is_file());
assert!(!upload_dir.join("chunks").exists());
Ok(())
}
@@ -84,8 +87,7 @@ async fn rejects_out_of_range_chunk_index() -> Result<(), Box<dyn std::error::Er
}
#[tokio::test]
async fn accepts_duplicate_chunk_when_existing_length_matches()
-> Result<(), Box<dyn std::error::Error>> {
async fn accepts_duplicate_completed_chunk() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, temp_dir.path(), 4).await?;
+121 -11
View File
@@ -48,11 +48,11 @@ async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>>
b"hello world"
);
assert!(
temp_dir
!temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.is_dir()
.exists()
);
let duplicate = app
@@ -61,7 +61,81 @@ async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>>
&format!("/api/uploads/{}/complete", upload.upload_id),
)?)
.await?;
assert_eq!(duplicate.status(), StatusCode::CONFLICT);
assert_eq!(duplicate.status(), StatusCode::NOT_FOUND);
Ok(())
}
#[tokio::test]
async fn parallel_uploads_keep_bytes_separate() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let chunk_size = usize::try_from(CHUNK_SIZE)?;
let left_upload = create_upload(&app, "left.bin", CHUNK_SIZE + 4).await?;
let right_upload = create_upload(&app, "right.bin", CHUNK_SIZE + 5).await?;
let mut expected_left = vec![b'l'; chunk_size];
expected_left.extend_from_slice(b"eft!");
let mut expected_right = vec![b'r'; chunk_size];
expected_right.extend_from_slice(b"ight!");
let left_first = chunk_request(
&left_upload.upload_id,
0,
expected_left[..chunk_size].to_vec(),
)?;
let left_final = chunk_request(
&left_upload.upload_id,
1,
expected_left[chunk_size..].to_vec(),
)?;
let right_first = chunk_request(
&right_upload.upload_id,
0,
expected_right[..chunk_size].to_vec(),
)?;
let right_final = chunk_request(
&right_upload.upload_id,
1,
expected_right[chunk_size..].to_vec(),
)?;
let (left_first, right_first, left_final, right_final) = tokio::join!(
app.clone().oneshot(left_first),
app.clone().oneshot(right_first),
app.clone().oneshot(left_final),
app.clone().oneshot(right_final),
);
assert_eq!(left_first?.status(), StatusCode::NO_CONTENT);
assert_eq!(right_first?.status(), StatusCode::NO_CONTENT);
assert_eq!(left_final?.status(), StatusCode::NO_CONTENT);
assert_eq!(right_final?.status(), StatusCode::NO_CONTENT);
let left_complete = empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", left_upload.upload_id),
)?;
let right_complete = empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", right_upload.upload_id),
)?;
let (left_complete, right_complete) = tokio::join!(
app.clone().oneshot(left_complete),
app.clone().oneshot(right_complete),
);
assert_eq!(left_complete?.status(), StatusCode::OK);
assert_eq!(right_complete?.status(), StatusCode::OK);
assert_eq!(
tokio::fs::read(temp_dir.path().join("complete").join("left.bin")).await?,
expected_left
);
assert_eq!(
tokio::fs::read(temp_dir.path().join("complete").join("right.bin")).await?,
expected_right
);
Ok(())
}
@@ -98,18 +172,54 @@ async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
}
#[tokio::test]
async fn rejects_corrupt_chunk_file() -> Result<(), Box<dyn std::error::Error>> {
async fn rejects_completion_that_would_replace_file() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, "clash.bin", 8).await?;
tokio::fs::write(
temp_dir.path().join("complete").join("clash.bin"),
b"original",
)
.await?;
let response = app
.clone()
.oneshot(chunk_request(&upload.upload_id, 0, b"incoming".to_vec())?)
.await?;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let response = app
.oneshot(empty_request(
Method::POST,
&format!("/api/uploads/{}/complete", upload.upload_id),
)?)
.await?;
assert_eq!(response.status(), StatusCode::CONFLICT);
assert_eq!(
tokio::fs::read(temp_dir.path().join("complete").join("clash.bin")).await?,
b"original"
);
assert!(
temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.exists()
);
Ok(())
}
#[tokio::test]
async fn rejects_tampered_temp_upload_file() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let upload = create_upload(&app, "corrupt.bin", 4).await?;
let chunk_path = temp_dir
.path()
.join("staging")
.join(&upload.upload_id)
.join("chunks")
.join("000000.part");
tokio::fs::write(chunk_path, b"bad").await?;
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
tokio::fs::write(upload_dir.join(".upload.tmp"), b"bad").await?;
tokio::fs::write(upload_dir.join("completed").join("000000.done"), b"").await?;
let response = app
.oneshot(empty_request(
+4 -2
View File
@@ -22,8 +22,10 @@ async fn serves_index_page() -> Result<(), Box<dyn std::error::Error>> {
let body = String::from_utf8(body.to_vec())?;
assert!(body.contains("<title>upl</title>"));
assert!(body.contains("Choose file"));
assert!(body.contains("Pending uploads"));
assert!(body.contains("Choose files"));
assert!(body.contains("Selected uploads"));
assert!(body.contains("Saved upload progress"));
assert!(!body.contains("Server online"));
Ok(())
}
+37 -1
View File
@@ -43,7 +43,8 @@ async fn creates_upload_metadata_on_disk() -> Result<(), Box<dyn std::error::Err
let upload_dir = temp_dir.path().join("staging").join(&response.upload_id);
let meta_path = upload_dir.join("meta.json");
assert!(upload_dir.join("chunks").is_dir());
assert!(upload_dir.join(".upload.tmp").is_file());
assert!(upload_dir.join("completed").is_dir());
assert!(temp_dir.path().join("complete").is_dir());
let meta: UploadMeta = serde_json::from_slice(&tokio::fs::read(meta_path).await?)?;
@@ -76,6 +77,41 @@ async fn rejects_empty_upload_name() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
#[tokio::test]
async fn rejects_upload_name_that_already_exists() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = TempDir::new()?;
let app = test_app(temp_dir.path());
let complete_dir = temp_dir.path().join("complete");
tokio::fs::create_dir_all(&complete_dir).await?;
tokio::fs::write(complete_dir.join("xyz.foo"), b"original").await?;
let response = app
.oneshot(json_request(
"/api/uploads",
&json!({
"name": "xyz.foo",
"size": 10,
"last_modified": 1_760_000_000_000_i64
}),
)?)
.await?;
assert_eq!(response.status(), StatusCode::CONFLICT);
let body = response.into_body().collect().await?.to_bytes();
let body: serde_json::Value = serde_json::from_slice(&body)?;
assert_eq!(body["error"], "file already exists");
assert_eq!(
tokio::fs::read(complete_dir.join("xyz.foo")).await?,
b"original"
);
let mut staging_entries = tokio::fs::read_dir(temp_dir.path().join("staging")).await?;
assert!(staging_entries.next_entry().await?.is_none());
Ok(())
}
fn test_app(data_dir: &Path) -> axum::Router {
build_router(&AppConfig::new(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),