Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
60663a461c
|
|||
|
1923ff2a6f
|
|||
|
a7b3abd54a
|
|||
|
c072b93726
|
|||
|
428af52e2f
|
|||
|
996ad5c4c8
|
|||
|
8d81b436e5
|
Generated
+121
@@ -2,6 +2,56 @@
|
||||
# It is not intended for manual editing.
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"anstyle-parse",
|
||||
"anstyle-query",
|
||||
"anstyle-wincon",
|
||||
"colorchoice",
|
||||
"is_terminal_polyfill",
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-parse"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e"
|
||||
dependencies = [
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-query"
|
||||
version = "1.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
|
||||
dependencies = [
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-wincon"
|
||||
version = "3.0.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"once_cell_polyfill",
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.102"
|
||||
@@ -90,6 +140,52 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"clap_lex",
|
||||
"strsim",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "4.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570"
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.5.8"
|
||||
@@ -314,6 +410,12 @@ dependencies = [
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "is_terminal_polyfill"
|
||||
version = "1.70.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.18"
|
||||
@@ -416,6 +518,12 @@ version = "1.21.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
|
||||
|
||||
[[package]]
|
||||
name = "once_cell_polyfill"
|
||||
version = "1.70.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.5"
|
||||
@@ -635,6 +743,12 @@ dependencies = [
|
||||
"windows-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.117"
|
||||
@@ -833,6 +947,7 @@ name = "upl"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"clap",
|
||||
"http-body-util",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -844,6 +959,12 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "utf8parse"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.23.2"
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8.9"
|
||||
clap = { version = "4.5.53", features = ["derive"] }
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.150"
|
||||
time = { version = "0.3.47", features = ["formatting", "serde"] }
|
||||
|
||||
@@ -26,9 +26,9 @@ The program should stay simple:
|
||||
|
||||
The browser owns file selection and chunk scheduling.
|
||||
|
||||
- Let the user pick one file.
|
||||
- Let the user pick one or more files.
|
||||
- Slice it into fixed-size chunks with `Blob.slice()`.
|
||||
- Upload a few chunks concurrently.
|
||||
- Upload a few files concurrently, with a separate chunk pool per file.
|
||||
- Retry failed chunks with exponential backoff.
|
||||
- Persist pending upload state in IndexedDB.
|
||||
- Use the File System Access API when available so the same local file can be
|
||||
@@ -194,11 +194,12 @@ The server should not delete staging data until assembly succeeds.
|
||||
|
||||
### First Upload
|
||||
|
||||
1. User selects a file.
|
||||
2. Browser calls `POST /api/uploads`.
|
||||
3. Browser stores the returned `upload_id` and file handle in IndexedDB.
|
||||
4. Browser uploads missing chunks with a small concurrency pool.
|
||||
5. Browser calls `/complete` when all chunks are uploaded.
|
||||
1. User selects one or more files.
|
||||
2. Browser creates one selected upload row per file.
|
||||
3. Browser calls `POST /api/uploads` once for each file being started.
|
||||
4. Browser stores each returned `upload_id` and file handle in IndexedDB.
|
||||
5. Browser uploads missing chunks with bounded file and chunk concurrency pools.
|
||||
6. Browser calls `/complete` for each file when all of its chunks are uploaded.
|
||||
|
||||
### After Interruption
|
||||
|
||||
@@ -239,7 +240,8 @@ Start with these defaults:
|
||||
|
||||
```text
|
||||
chunk size: 16 MiB
|
||||
concurrency: 3
|
||||
file concurrency: 3
|
||||
chunk concurrency per file: 3
|
||||
max retries per chunk: 5
|
||||
```
|
||||
|
||||
|
||||
@@ -6,9 +6,9 @@
|
||||
browser -> nginx -> upl Rust server -> local filesystem
|
||||
```
|
||||
|
||||
The first implementation milestone provides the Rust server shell and static
|
||||
browser UI. Upload metadata, chunk persistence, resume state, and completion
|
||||
assembly are tracked in `PLAN.md` and will be added in later coherent slices.
|
||||
The server writes upload chunks directly into an inaccessible temp file at
|
||||
their final offsets. Once every chunk is present, completion promotes that temp
|
||||
file into the completed upload directory without replacing an existing file.
|
||||
|
||||
## Project Structure
|
||||
|
||||
@@ -19,12 +19,12 @@ upl
|
||||
src/app.rs Axum router, shared state, static file service
|
||||
src/api.rs HTTP handlers and API error responses
|
||||
src/model.rs JSON request, response, and metadata shapes
|
||||
src/storage.rs local filesystem layout, chunks, and assembly
|
||||
src/storage.rs local filesystem layout, offset writes, and final promotion
|
||||
src/lib.rs library surface used by integration tests
|
||||
Browser UI
|
||||
static/index.html upload tool markup
|
||||
static/styles.css responsive tool styling
|
||||
static/app.js upload scheduler, retries, and browser resume state
|
||||
static/app.js multi-file scheduler, retries, and browser resume state
|
||||
Deployment
|
||||
deploy/nginx/ nginx reverse proxy example
|
||||
scripts/ reusable local smoke tests
|
||||
@@ -35,13 +35,22 @@ upl
|
||||
|
||||
## Configuration
|
||||
|
||||
- `UPL_BIND` sets the listen address. It defaults to `127.0.0.1:3000`.
|
||||
- `UPL_STATIC_DIR` sets the static asset directory. It defaults to `static/`
|
||||
inside this repository.
|
||||
- `UPL_DATA_DIR` sets the upload data directory. It defaults to `data/` inside
|
||||
this repository.
|
||||
- `--bind` sets the listen address. It overrides `UPL_BIND` and defaults to
|
||||
`127.0.0.1:3000`.
|
||||
- `--static-dir` sets the static asset directory. It overrides `UPL_STATIC_DIR`
|
||||
and defaults to `static/` inside this repository.
|
||||
- `--data-dir` sets the completed upload data root. Completed files land under
|
||||
its `complete/` subdirectory. It overrides `UPL_DATA_DIR` and defaults to
|
||||
`data/` inside this repository.
|
||||
- `--temp-dir` sets the directory for upload metadata, completion markers, and
|
||||
inaccessible temp upload files. It overrides `UPL_TEMP_DIR` and defaults to
|
||||
`<data-dir>/staging`.
|
||||
- `upl --help` prints the full argument help text.
|
||||
- The server accepts request bodies up to 64 MiB, which leaves room for the
|
||||
planned 16 MiB upload chunks and matches the nginx example in `PLAN.md`.
|
||||
- Keep `UPL_TEMP_DIR` on the same filesystem as `<data-dir>/complete` for the
|
||||
cheapest final promotion. Cross-filesystem temp directories still work, but
|
||||
completion falls back to copying into a newly created final file.
|
||||
|
||||
## Common Commands
|
||||
|
||||
@@ -54,17 +63,33 @@ just run
|
||||
|
||||
`just check` also syntax-checks the static browser JavaScript with `node`.
|
||||
|
||||
## Browser Uploads
|
||||
|
||||
The browser UI accepts multiple selected files. `Start all` runs up to three
|
||||
file uploads at the same time, and each file still uploads up to three chunks
|
||||
concurrently. Every selected file keeps its own upload id, progress markers,
|
||||
abort controller, retry state, and saved IndexedDB resume record.
|
||||
|
||||
If a completed file with the same sanitized name already exists, the server
|
||||
rejects the upload before staging begins. The selected row is marked
|
||||
unavailable and tells the user to rename the file if they want to upload that
|
||||
copy.
|
||||
|
||||
## nginx
|
||||
|
||||
Run `upl` on localhost and put nginx in front of it for TLS and access control:
|
||||
|
||||
```sh
|
||||
UPL_BIND=127.0.0.1:3000 UPL_DATA_DIR=/srv/upl/data upl
|
||||
UPL_BIND=127.0.0.1:3000 \
|
||||
UPL_DATA_DIR=/srv/upl/data \
|
||||
UPL_TEMP_DIR=/srv/upl/data/staging \
|
||||
upl
|
||||
```
|
||||
|
||||
Use `deploy/nginx/upl.conf.example` as the starting point for the nginx site.
|
||||
Before exposing the service, replace the certificate paths and add a protection
|
||||
layer such as HTTP basic auth, an IP allowlist, or VPN-only access.
|
||||
layer such as HTTP basic auth, an IP allowlist, or VPN-only access. The nginx
|
||||
example aliases only `/srv/upl/data/complete`; do not expose `UPL_TEMP_DIR`.
|
||||
|
||||
For a local Docker-based reverse-proxy smoke test:
|
||||
|
||||
|
||||
@@ -10,22 +10,32 @@ Keep this file as the reusable verification checklist while implementing
|
||||
- Current coverage:
|
||||
- `GET /` serves the static browser page.
|
||||
- `GET /healthz` reports `ok`.
|
||||
- `POST /api/uploads` creates `meta.json` and chunk directories.
|
||||
- `POST /api/uploads` creates `meta.json`, a temp upload file, and a
|
||||
completion-marker directory.
|
||||
- `POST /api/uploads` rejects an empty file name.
|
||||
- `PUT /api/uploads/:id/chunks/:index` stores validated chunk files.
|
||||
- `POST /api/uploads` rejects a name that already exists in completed
|
||||
storage before staging begins.
|
||||
- `PUT /api/uploads/:id/chunks/:index` writes validated chunks into the
|
||||
temp upload file and records completion markers.
|
||||
- `PUT /api/uploads/:id/chunks/:index` rejects wrong-size chunks.
|
||||
- `PUT /api/uploads/:id/chunks/:index` rejects out-of-range indexes.
|
||||
- `PUT /api/uploads/:id/chunks/:index` accepts duplicate chunks.
|
||||
- `GET /api/uploads/:id` reports completed chunks from disk.
|
||||
- `POST /api/uploads/:id/complete` assembles verified chunks.
|
||||
- `GET /api/uploads/:id` reports completed chunks from disk markers.
|
||||
- `POST /api/uploads/:id/complete` promotes the verified temp upload file
|
||||
and removes staging data.
|
||||
- Parallel upload requests for separate files complete without crossing
|
||||
bytes between temp upload files.
|
||||
- `POST /api/uploads/:id/complete` rejects incomplete uploads.
|
||||
- `POST /api/uploads/:id/complete` rejects corrupt chunk files.
|
||||
- `POST /api/uploads/:id/complete` refuses to replace a completed file that
|
||||
appears after the upload was created.
|
||||
- `POST /api/uploads/:id/complete` rejects tampered temp upload files.
|
||||
- `static/app.js` passes `node --check`.
|
||||
- `just nginx-smoke`
|
||||
- Runs upl behind nginx in Docker.
|
||||
- Uploads a 17 MiB file through nginx.
|
||||
- Restarts the Rust backend mid-upload, resumes through nginx, completes, and
|
||||
compares SHA-256 hashes.
|
||||
- Serves the completed file through nginx's final-upload alias.
|
||||
|
||||
## Manual
|
||||
|
||||
@@ -34,6 +44,8 @@ deployment retests.
|
||||
|
||||
- Upload a small file in one pass.
|
||||
- Upload a file larger than one chunk.
|
||||
- Select multiple files and confirm several upload rows advance at the same
|
||||
time.
|
||||
- Kill the browser tab mid-upload and resume.
|
||||
- Restart the Rust server mid-upload and resume.
|
||||
- Interrupt the network and resume.
|
||||
|
||||
@@ -22,6 +22,14 @@ server {
|
||||
# auth_basic "upl";
|
||||
# auth_basic_user_file /etc/nginx/upl.htpasswd;
|
||||
|
||||
# Expose only completed uploads. Keep UPL_TEMP_DIR outside every nginx
|
||||
# alias/root so in-progress temp files and progress markers are private.
|
||||
location /files/ {
|
||||
alias /srv/upl/data/complete/;
|
||||
autoindex on;
|
||||
try_files $uri =404;
|
||||
}
|
||||
|
||||
location / {
|
||||
proxy_pass http://upl_backend;
|
||||
proxy_http_version 1.1;
|
||||
|
||||
@@ -20,5 +20,5 @@ check:
|
||||
just static-check
|
||||
just clippy
|
||||
|
||||
run:
|
||||
cargo run
|
||||
run *args:
|
||||
cargo run -- {{args}}
|
||||
|
||||
+19
-2
@@ -9,10 +9,13 @@ workspace_dir="$(pwd)"
|
||||
mkdir -p "$workspace_dir/target/nginx-smoke"
|
||||
tmp_dir="$(mktemp -d "$workspace_dir/target/nginx-smoke/run.XXXXXXXX")"
|
||||
data_dir="$tmp_dir/data"
|
||||
complete_dir="$data_dir/complete"
|
||||
temp_dir="$tmp_dir/upload-temp"
|
||||
nginx_conf_dir="$tmp_dir/nginx-conf.d"
|
||||
nginx_conf="$nginx_conf_dir/default.conf"
|
||||
backend_log="$tmp_dir/backend.log"
|
||||
source_file="$tmp_dir/source.bin"
|
||||
served_file="$tmp_dir/served.bin"
|
||||
chunk0="$tmp_dir/chunk0.part"
|
||||
chunk1="$tmp_dir/chunk1.part"
|
||||
backend_pid=""
|
||||
@@ -29,7 +32,7 @@ cleanup() {
|
||||
trap cleanup EXIT
|
||||
|
||||
start_backend() {
|
||||
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" \
|
||||
UPL_BIND="0.0.0.0:$backend_port" UPL_DATA_DIR="$data_dir" UPL_TEMP_DIR="$temp_dir" \
|
||||
cargo run --quiet >"$backend_log" 2>&1 &
|
||||
backend_pid="$!"
|
||||
wait_for "http://127.0.0.1:$backend_port/healthz"
|
||||
@@ -66,13 +69,19 @@ process.stdin.on("end", () => {
|
||||
' "$field"
|
||||
}
|
||||
|
||||
mkdir -p "$data_dir" "$nginx_conf_dir"
|
||||
mkdir -p "$complete_dir" "$temp_dir" "$nginx_conf_dir"
|
||||
|
||||
cat >"$nginx_conf" <<EOF
|
||||
server {
|
||||
listen $proxy_port;
|
||||
client_max_body_size 64m;
|
||||
|
||||
location /files/ {
|
||||
alias /upl-complete/;
|
||||
autoindex off;
|
||||
try_files \$uri =404;
|
||||
}
|
||||
|
||||
location / {
|
||||
proxy_pass http://host.docker.internal:$backend_port;
|
||||
proxy_http_version 1.1;
|
||||
@@ -95,6 +104,7 @@ docker run -d --rm \
|
||||
--add-host host.docker.internal:host-gateway \
|
||||
-p "127.0.0.1:$proxy_port:$proxy_port" \
|
||||
-v "$nginx_conf_dir:/etc/nginx/conf.d:ro" \
|
||||
-v "$complete_dir:/upl-complete:ro" \
|
||||
"$nginx_image" >/dev/null
|
||||
wait_for "http://127.0.0.1:$proxy_port/healthz"
|
||||
|
||||
@@ -143,10 +153,17 @@ complete_path="$(printf '%s' "$complete_response" | json_field file_path)"
|
||||
|
||||
source_hash="$(sha256sum "$source_file" | awk '{print $1}')"
|
||||
complete_hash="$(sha256sum "$complete_path" | awk '{print $1}')"
|
||||
curl -fsS "http://127.0.0.1:$proxy_port/files/source.bin" -o "$served_file"
|
||||
served_hash="$(sha256sum "$served_file" | awk '{print $1}')"
|
||||
|
||||
if [[ "$source_hash" != "$complete_hash" ]]; then
|
||||
echo "Checksum mismatch after nginx-proxied resume" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [[ "$source_hash" != "$served_hash" ]]; then
|
||||
echo "Checksum mismatch through nginx completed-file alias" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "nginx smoke ok: $upload_id"
|
||||
|
||||
+2
-2
@@ -55,12 +55,12 @@ pub async fn put_chunk(
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
/// Assembles uploaded chunks into the final completed file.
|
||||
/// Promotes a fully uploaded temp file into the final completed file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an API error when the upload is unknown, incomplete, invalid, or
|
||||
/// cannot be assembled on disk.
|
||||
/// cannot be promoted on disk.
|
||||
pub async fn complete_upload(
|
||||
State(state): State<AppState>,
|
||||
Path(upload_id): Path<String>,
|
||||
|
||||
+196
-7
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
env,
|
||||
error::Error,
|
||||
ffi::OsString,
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
@@ -10,6 +11,7 @@ use axum::{
|
||||
extract::DefaultBodyLimit,
|
||||
routing::{get, post},
|
||||
};
|
||||
use clap::Parser;
|
||||
use tower_http::services::{ServeDir, ServeFile};
|
||||
|
||||
use crate::{api, storage::Storage};
|
||||
@@ -17,6 +19,7 @@ use crate::{api, storage::Storage};
|
||||
const DEFAULT_BIND_ADDR: &str = "127.0.0.1:3000";
|
||||
const STATIC_DIR_ENV: &str = "UPL_STATIC_DIR";
|
||||
const DATA_DIR_ENV: &str = "UPL_DATA_DIR";
|
||||
const TEMP_DIR_ENV: &str = "UPL_TEMP_DIR";
|
||||
const BIND_ENV: &str = "UPL_BIND";
|
||||
const MAX_REQUEST_BODY_BYTES: usize = 64 * 1024 * 1024;
|
||||
|
||||
@@ -25,6 +28,7 @@ pub struct AppConfig {
|
||||
pub bind_addr: SocketAddr,
|
||||
pub static_dir: PathBuf,
|
||||
pub data_dir: PathBuf,
|
||||
pub temp_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -32,23 +36,95 @@ pub struct AppState {
|
||||
pub storage: Storage,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Parser)]
|
||||
#[command(
|
||||
name = "upl",
|
||||
version,
|
||||
about = "Run the upl resumable upload server.",
|
||||
long_about = "Run the upl resumable upload server.\n\nCommand-line arguments override environment variables. When neither is set, upl uses local development defaults inside the repository.",
|
||||
after_help = "Environment variables:\n UPL_BIND Default listen address\n UPL_STATIC_DIR Default static asset directory\n UPL_DATA_DIR Default completed upload data directory\n UPL_TEMP_DIR Default temporary upload directory"
|
||||
)]
|
||||
pub struct CliArgs {
|
||||
/// Socket address to listen on. Overrides `UPL_BIND`. Defaults to 127.0.0.1:3000.
|
||||
#[arg(long, value_name = "ADDR")]
|
||||
pub bind: Option<SocketAddr>,
|
||||
|
||||
/// Directory containing index.html and other browser assets. Overrides `UPL_STATIC_DIR`.
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub static_dir: Option<PathBuf>,
|
||||
|
||||
/// Directory where completed upload files are written. Overrides `UPL_DATA_DIR`.
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub data_dir: Option<PathBuf>,
|
||||
|
||||
/// Directory where upload metadata, progress markers, and temp files are written. Overrides `UPL_TEMP_DIR`.
|
||||
#[arg(long, value_name = "PATH")]
|
||||
pub temp_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl AppConfig {
|
||||
/// Loads bind and static directory settings from environment variables.
|
||||
/// Loads settings from command-line arguments and environment variables.
|
||||
///
|
||||
/// Command-line arguments take precedence over environment variables.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when an argument or `UPL_BIND` is not a valid socket
|
||||
/// address.
|
||||
pub fn from_args() -> Result<Self, Box<dyn Error>> {
|
||||
Self::from_cli_and_env(CliArgs::parse())
|
||||
}
|
||||
|
||||
/// Loads settings from environment variables.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when `UPL_BIND` is set but is not a valid socket address.
|
||||
pub fn from_env() -> Result<Self, Box<dyn Error>> {
|
||||
let bind_addr = env::var(BIND_ENV)
|
||||
.unwrap_or_else(|_| DEFAULT_BIND_ADDR.to_owned())
|
||||
.parse()?;
|
||||
let static_dir = env::var_os(STATIC_DIR_ENV).map_or_else(default_static_dir, PathBuf::from);
|
||||
let data_dir = env::var_os(DATA_DIR_ENV).map_or_else(default_data_dir, PathBuf::from);
|
||||
Self::from_cli_and_env(CliArgs::default())
|
||||
}
|
||||
|
||||
fn from_cli_and_env(cli: CliArgs) -> Result<Self, Box<dyn Error>> {
|
||||
Self::from_sources(
|
||||
cli,
|
||||
env::var(BIND_ENV).ok(),
|
||||
env::var_os(STATIC_DIR_ENV),
|
||||
env::var_os(DATA_DIR_ENV),
|
||||
env::var_os(TEMP_DIR_ENV),
|
||||
)
|
||||
}
|
||||
|
||||
fn from_sources(
|
||||
cli: CliArgs,
|
||||
bind_env: Option<String>,
|
||||
static_dir_env: Option<OsString>,
|
||||
data_dir_env: Option<OsString>,
|
||||
temp_dir_env: Option<OsString>,
|
||||
) -> Result<Self, Box<dyn Error>> {
|
||||
let bind_addr = match (cli.bind, bind_env) {
|
||||
(Some(bind_addr), _) => bind_addr,
|
||||
(None, Some(bind_addr)) => bind_addr.parse()?,
|
||||
(None, None) => DEFAULT_BIND_ADDR.parse()?,
|
||||
};
|
||||
|
||||
let static_dir = cli
|
||||
.static_dir
|
||||
.or_else(|| static_dir_env.map(PathBuf::from))
|
||||
.unwrap_or_else(default_static_dir);
|
||||
let data_dir = cli
|
||||
.data_dir
|
||||
.or_else(|| data_dir_env.map(PathBuf::from))
|
||||
.unwrap_or_else(default_data_dir);
|
||||
let temp_dir = cli
|
||||
.temp_dir
|
||||
.or_else(|| temp_dir_env.map(PathBuf::from))
|
||||
.unwrap_or_else(|| default_temp_dir(&data_dir));
|
||||
|
||||
Ok(Self {
|
||||
bind_addr,
|
||||
static_dir,
|
||||
data_dir,
|
||||
temp_dir,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -57,18 +133,36 @@ impl AppConfig {
|
||||
bind_addr: SocketAddr,
|
||||
static_dir: impl Into<PathBuf>,
|
||||
data_dir: impl Into<PathBuf>,
|
||||
) -> Self {
|
||||
let data_dir = data_dir.into();
|
||||
let temp_dir = default_temp_dir(&data_dir);
|
||||
Self {
|
||||
bind_addr,
|
||||
static_dir: static_dir.into(),
|
||||
data_dir,
|
||||
temp_dir,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn new_with_temp_dir(
|
||||
bind_addr: SocketAddr,
|
||||
static_dir: impl Into<PathBuf>,
|
||||
data_dir: impl Into<PathBuf>,
|
||||
temp_dir: impl Into<PathBuf>,
|
||||
) -> Self {
|
||||
Self {
|
||||
bind_addr,
|
||||
static_dir: static_dir.into(),
|
||||
data_dir: data_dir.into(),
|
||||
temp_dir: temp_dir.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_router(config: &AppConfig) -> Router {
|
||||
let state = AppState {
|
||||
storage: Storage::new(&config.data_dir),
|
||||
storage: Storage::new(&config.data_dir, &config.temp_dir),
|
||||
};
|
||||
|
||||
Router::new()
|
||||
@@ -103,3 +197,98 @@ fn default_static_dir() -> PathBuf {
|
||||
fn default_data_dir() -> PathBuf {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data")
|
||||
}
|
||||
|
||||
fn default_temp_dir(data_dir: &Path) -> PathBuf {
|
||||
data_dir.join("staging")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{ffi::OsString, net::SocketAddr, path::PathBuf};
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
use super::{AppConfig, CliArgs};
|
||||
|
||||
#[test]
|
||||
fn parses_config_arguments() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = CliArgs::try_parse_from([
|
||||
"upl",
|
||||
"--bind",
|
||||
"127.0.0.1:4000",
|
||||
"--static-dir",
|
||||
"public",
|
||||
"--data-dir",
|
||||
"uploads",
|
||||
"--temp-dir",
|
||||
"upload-temp",
|
||||
])?;
|
||||
|
||||
assert_eq!(args.bind, Some("127.0.0.1:4000".parse()?));
|
||||
assert_eq!(args.static_dir, Some(PathBuf::from("public")));
|
||||
assert_eq!(args.data_dir, Some(PathBuf::from("uploads")));
|
||||
assert_eq!(args.temp_dir, Some(PathBuf::from("upload-temp")));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_arguments_override_environment_values() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = AppConfig::from_sources(
|
||||
CliArgs {
|
||||
bind: Some("127.0.0.1:4000".parse()?),
|
||||
static_dir: Some(PathBuf::from("cli-static")),
|
||||
data_dir: Some(PathBuf::from("cli-data")),
|
||||
temp_dir: Some(PathBuf::from("cli-temp")),
|
||||
},
|
||||
Some("127.0.0.1:3001".to_owned()),
|
||||
Some(OsString::from("env-static")),
|
||||
Some(OsString::from("env-data")),
|
||||
Some(OsString::from("env-temp")),
|
||||
)?;
|
||||
|
||||
assert_eq!(config.bind_addr, "127.0.0.1:4000".parse::<SocketAddr>()?);
|
||||
assert_eq!(config.static_dir, PathBuf::from("cli-static"));
|
||||
assert_eq!(config.data_dir, PathBuf::from("cli-data"));
|
||||
assert_eq!(config.temp_dir, PathBuf::from("cli-temp"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn environment_values_are_used_when_arguments_are_absent()
|
||||
-> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = AppConfig::from_sources(
|
||||
CliArgs::default(),
|
||||
Some("127.0.0.1:3001".to_owned()),
|
||||
Some(OsString::from("env-static")),
|
||||
Some(OsString::from("env-data")),
|
||||
Some(OsString::from("env-temp")),
|
||||
)?;
|
||||
|
||||
assert_eq!(config.bind_addr, "127.0.0.1:3001".parse::<SocketAddr>()?);
|
||||
assert_eq!(config.static_dir, PathBuf::from("env-static"));
|
||||
assert_eq!(config.data_dir, PathBuf::from("env-data"));
|
||||
assert_eq!(config.temp_dir, PathBuf::from("env-temp"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn temp_dir_defaults_under_data_dir() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let config = AppConfig::from_sources(
|
||||
CliArgs {
|
||||
data_dir: Some(PathBuf::from("uploads")),
|
||||
..CliArgs::default()
|
||||
},
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)?;
|
||||
|
||||
assert_eq!(config.temp_dir, PathBuf::from("uploads").join("staging"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ use upl::app::{AppConfig, build_router};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let config = AppConfig::from_env()?;
|
||||
let config = AppConfig::from_args()?;
|
||||
let listener = tokio::net::TcpListener::bind(config.bind_addr).await?;
|
||||
|
||||
println!("upl listening on http://{}", listener.local_addr()?);
|
||||
|
||||
+152
-63
@@ -1,11 +1,15 @@
|
||||
use std::{
|
||||
error::Error,
|
||||
fmt::{self, Display},
|
||||
io::SeekFrom,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
|
||||
use tokio::{fs, io::AsyncWriteExt};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{AsyncSeekExt, AsyncWriteExt},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::model::{
|
||||
@@ -16,25 +20,30 @@ use crate::model::{
|
||||
UploadProgressResponse,
|
||||
};
|
||||
|
||||
const FILE_EXISTS_MESSAGE: &str = "file already exists";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Storage {
|
||||
data_dir: PathBuf,
|
||||
temp_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
#[must_use]
|
||||
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
|
||||
pub fn new(data_dir: impl Into<PathBuf>, temp_dir: impl Into<PathBuf>) -> Self {
|
||||
Self {
|
||||
data_dir: data_dir.into(),
|
||||
temp_dir: temp_dir.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a durable upload metadata record under `data/staging`.
|
||||
/// Creates a durable upload metadata record and temp upload file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when directories cannot be created, metadata cannot be
|
||||
/// serialized, or the metadata file cannot be written atomically.
|
||||
/// Returns an error when directories cannot be created, the temp file
|
||||
/// cannot be created, metadata cannot be serialized, or the metadata file
|
||||
/// cannot be written atomically.
|
||||
pub async fn create_upload(
|
||||
&self,
|
||||
request: CreateUploadRequest,
|
||||
@@ -47,6 +56,10 @@ impl Storage {
|
||||
self.ensure_layout().await?;
|
||||
|
||||
let safe_name = safe_file_name(original_name);
|
||||
if fs::try_exists(self.final_path(&safe_name)).await? {
|
||||
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
|
||||
}
|
||||
|
||||
let created_at = OffsetDateTime::now_utc().format(&Rfc3339)?;
|
||||
|
||||
for _ in 0..8 {
|
||||
@@ -56,7 +69,12 @@ impl Storage {
|
||||
continue;
|
||||
}
|
||||
|
||||
fs::create_dir_all(upload_dir.join("chunks")).await?;
|
||||
fs::create_dir_all(self.completed_dir(&id)).await?;
|
||||
fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(self.upload_file_path(&id))
|
||||
.await?;
|
||||
|
||||
let meta = UploadMeta {
|
||||
id,
|
||||
@@ -76,12 +94,12 @@ impl Storage {
|
||||
Err(StorageError::IdCollision)
|
||||
}
|
||||
|
||||
/// Loads upload progress by scanning durable chunk files.
|
||||
/// Loads upload progress by scanning durable completion markers.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload id is invalid, metadata is missing, or
|
||||
/// the staging directory cannot be scanned.
|
||||
/// the temp directory cannot be scanned.
|
||||
pub async fn progress(&self, upload_id: &str) -> Result<UploadProgressResponse, StorageError> {
|
||||
let meta = self.load_meta(upload_id).await?;
|
||||
let completed_chunks = self.completed_chunks(&meta).await?;
|
||||
@@ -89,13 +107,13 @@ impl Storage {
|
||||
Ok(meta.progress_response(completed_chunks))
|
||||
}
|
||||
|
||||
/// Validates and stores one raw chunk body.
|
||||
/// Validates and stores one raw chunk body in the temp upload file.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload is unknown, the index is out of range,
|
||||
/// the body length is not the expected chunk length, or the chunk cannot be
|
||||
/// written and renamed into place.
|
||||
/// written to its final offset in the temp upload file.
|
||||
pub async fn store_chunk(
|
||||
&self,
|
||||
upload_id: &str,
|
||||
@@ -111,31 +129,33 @@ impl Storage {
|
||||
return Err(StorageError::InvalidInput("chunk has the wrong length"));
|
||||
}
|
||||
|
||||
let part_path = self.chunk_path(upload_id, index);
|
||||
if let Some(existing_len) = file_len(&part_path).await? {
|
||||
if existing_len == expected_len {
|
||||
if self.chunk_is_complete(&meta, index).await? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
return Err(StorageError::InvalidInput(
|
||||
"existing chunk has the wrong length",
|
||||
));
|
||||
}
|
||||
let mut output = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.open(self.upload_file_path(upload_id))
|
||||
.await?;
|
||||
output
|
||||
.seek(SeekFrom::Start(chunk_offset(&meta, index)))
|
||||
.await?;
|
||||
output.write_all(body).await?;
|
||||
output.flush().await?;
|
||||
drop(output);
|
||||
|
||||
let tmp_path = part_path.with_extension("part.tmp");
|
||||
fs::write(&tmp_path, body).await?;
|
||||
fs::rename(&tmp_path, &part_path).await?;
|
||||
self.mark_chunk_complete(&meta, index).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Assembles a complete upload from verified chunk files.
|
||||
/// Atomically promotes a complete temp upload file into completed storage.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error when the upload is unknown, any expected chunk is
|
||||
/// missing or has the wrong length, the final file already exists, or the
|
||||
/// assembled file cannot be written and renamed.
|
||||
/// missing, the final file already exists, or the temp upload file cannot
|
||||
/// be renamed into place.
|
||||
pub async fn complete_upload(
|
||||
&self,
|
||||
upload_id: &str,
|
||||
@@ -144,33 +164,10 @@ impl Storage {
|
||||
|
||||
self.verify_all_chunks(&meta).await?;
|
||||
|
||||
let final_path = self.complete_dir().join(&meta.safe_name);
|
||||
if fs::try_exists(&final_path).await? {
|
||||
return Err(StorageError::Conflict("complete file already exists"));
|
||||
}
|
||||
let final_path = self.final_path(&meta.safe_name);
|
||||
|
||||
let tmp_path = self
|
||||
.complete_dir()
|
||||
.join(format!(".{}.{}.tmp", meta.safe_name, meta.id));
|
||||
if fs::try_exists(&tmp_path).await? {
|
||||
fs::remove_file(&tmp_path).await?;
|
||||
}
|
||||
|
||||
let mut output = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(&tmp_path)
|
||||
.await?;
|
||||
|
||||
for index in 0..meta.total_chunks {
|
||||
let bytes = fs::read(self.chunk_path(upload_id, index)).await?;
|
||||
output.write_all(&bytes).await?;
|
||||
}
|
||||
|
||||
output.flush().await?;
|
||||
drop(output);
|
||||
|
||||
fs::rename(&tmp_path, &final_path).await?;
|
||||
promote_file_without_overwrite(&self.upload_file_path(upload_id), &final_path).await?;
|
||||
self.remove_upload_dir(upload_id).await?;
|
||||
|
||||
Ok(meta.complete_response(final_path.display().to_string()))
|
||||
}
|
||||
@@ -180,27 +177,43 @@ impl Storage {
|
||||
&self.data_dir
|
||||
}
|
||||
|
||||
fn staging_dir(&self) -> PathBuf {
|
||||
self.data_dir.join("staging")
|
||||
#[must_use]
|
||||
pub fn temp_dir(&self) -> &Path {
|
||||
&self.temp_dir
|
||||
}
|
||||
|
||||
fn complete_dir(&self) -> PathBuf {
|
||||
fn staging_dir(&self) -> PathBuf {
|
||||
self.temp_dir.clone()
|
||||
}
|
||||
|
||||
fn final_dir(&self) -> PathBuf {
|
||||
self.data_dir.join("complete")
|
||||
}
|
||||
|
||||
fn final_path(&self, safe_name: &str) -> PathBuf {
|
||||
self.final_dir().join(safe_name)
|
||||
}
|
||||
|
||||
fn upload_dir(&self, upload_id: &str) -> PathBuf {
|
||||
self.staging_dir().join(upload_id)
|
||||
}
|
||||
|
||||
fn chunk_path(&self, upload_id: &str, index: u64) -> PathBuf {
|
||||
self.upload_dir(upload_id)
|
||||
.join("chunks")
|
||||
.join(format!("{index:06}.part"))
|
||||
fn upload_file_path(&self, upload_id: &str) -> PathBuf {
|
||||
self.upload_dir(upload_id).join(".upload.tmp")
|
||||
}
|
||||
|
||||
fn completed_dir(&self, upload_id: &str) -> PathBuf {
|
||||
self.upload_dir(upload_id).join("completed")
|
||||
}
|
||||
|
||||
fn completed_marker_path(&self, upload_id: &str, index: u64) -> PathBuf {
|
||||
self.completed_dir(upload_id)
|
||||
.join(format!("{index:06}.done"))
|
||||
}
|
||||
|
||||
async fn ensure_layout(&self) -> Result<(), StorageError> {
|
||||
fs::create_dir_all(self.staging_dir()).await?;
|
||||
fs::create_dir_all(self.complete_dir()).await?;
|
||||
fs::create_dir_all(self.final_dir()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -234,8 +247,7 @@ impl Storage {
|
||||
let mut completed = Vec::new();
|
||||
|
||||
for index in 0..meta.total_chunks {
|
||||
let expected_len = expected_chunk_len(meta, index)?;
|
||||
if file_len(&self.chunk_path(&meta.id, index)).await? == Some(expected_len) {
|
||||
if self.chunk_is_complete(meta, index).await? {
|
||||
completed.push(index);
|
||||
}
|
||||
}
|
||||
@@ -244,11 +256,12 @@ impl Storage {
|
||||
}
|
||||
|
||||
async fn verify_all_chunks(&self, meta: &UploadMeta) -> Result<(), StorageError> {
|
||||
for index in 0..meta.total_chunks {
|
||||
let expected_len = expected_chunk_len(meta, index)?;
|
||||
let actual_len = file_len(&self.chunk_path(&meta.id, index)).await?;
|
||||
if file_len(&self.upload_file_path(&meta.id)).await? != Some(meta.size) {
|
||||
return Err(StorageError::Conflict("upload data file is incomplete"));
|
||||
}
|
||||
|
||||
if actual_len != Some(expected_len) {
|
||||
for index in 0..meta.total_chunks {
|
||||
if !self.chunk_is_complete(meta, index).await? {
|
||||
return Err(StorageError::Conflict(
|
||||
"upload is missing one or more complete chunks",
|
||||
));
|
||||
@@ -257,6 +270,38 @@ impl Storage {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn chunk_is_complete(&self, meta: &UploadMeta, index: u64) -> Result<bool, StorageError> {
|
||||
expected_chunk_len(meta, index)?;
|
||||
Ok(file_len(&self.completed_marker_path(&meta.id, index))
|
||||
.await?
|
||||
.is_some())
|
||||
}
|
||||
|
||||
async fn mark_chunk_complete(&self, meta: &UploadMeta, index: u64) -> Result<(), StorageError> {
|
||||
let marker_path = self.completed_marker_path(&meta.id, index);
|
||||
match fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(marker_path)
|
||||
.await
|
||||
{
|
||||
Ok(mut marker) => {
|
||||
marker.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => Ok(()),
|
||||
Err(error) => Err(error.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_upload_dir(&self, upload_id: &str) -> Result<(), StorageError> {
|
||||
match fs::remove_dir_all(self.upload_dir(upload_id)).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
|
||||
Err(error) => Err(error.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -368,6 +413,10 @@ fn expected_chunk_len(meta: &UploadMeta, index: u64) -> Result<u64, StorageError
|
||||
}
|
||||
}
|
||||
|
||||
fn chunk_offset(meta: &UploadMeta, index: u64) -> u64 {
|
||||
meta.chunk_size * index
|
||||
}
|
||||
|
||||
async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
|
||||
match fs::metadata(path).await {
|
||||
Ok(metadata) if metadata.is_file() => Ok(Some(metadata.len())),
|
||||
@@ -377,6 +426,46 @@ async fn file_len(path: &Path) -> Result<Option<u64>, StorageError> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn promote_file_without_overwrite(
|
||||
source: &Path,
|
||||
destination: &Path,
|
||||
) -> Result<(), StorageError> {
|
||||
match fs::hard_link(source, destination).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
let mut output = match fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
.open(destination)
|
||||
.await
|
||||
{
|
||||
Ok(output) => output,
|
||||
Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
return Err(StorageError::Conflict(FILE_EXISTS_MESSAGE));
|
||||
}
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
|
||||
let copy_result = async {
|
||||
let mut input = fs::File::open(source).await?;
|
||||
tokio::io::copy(&mut input, &mut output).await?;
|
||||
output.flush().await
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(error) = copy_result {
|
||||
let _ = fs::remove_file(destination).await;
|
||||
return Err(error.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn validate_upload_id(upload_id: &str) -> Result<(), StorageError> {
|
||||
let is_valid = !upload_id.is_empty()
|
||||
&& upload_id
|
||||
|
||||
+556
-135
@@ -1,36 +1,33 @@
|
||||
const DB_NAME = "upl";
|
||||
const DB_VERSION = 1;
|
||||
const STORE_NAME = "uploads";
|
||||
const CONCURRENCY = 3;
|
||||
const CHUNK_CONCURRENCY = 3;
|
||||
const FILE_CONCURRENCY = 3;
|
||||
const MAX_RETRIES = 5;
|
||||
const BASE_RETRY_DELAY_MS = 500;
|
||||
const FILE_EXISTS_MESSAGE = "file already exists";
|
||||
|
||||
const fileInput = document.querySelector("#file-input");
|
||||
const pickButton = document.querySelector("#pick-button");
|
||||
const fileSummary = document.querySelector("#file-summary");
|
||||
const fileName = document.querySelector("#file-name");
|
||||
const fileSize = document.querySelector("#file-size");
|
||||
const uploadSection = document.querySelector("#upload-section");
|
||||
const uploadList = document.querySelector("#upload-list");
|
||||
const startButton = document.querySelector("#start-button");
|
||||
const pauseButton = document.querySelector("#pause-button");
|
||||
const resumeButton = document.querySelector("#resume-button");
|
||||
const eventLog = document.querySelector("#event-log");
|
||||
const progressBar = document.querySelector("#progress-bar");
|
||||
const progressMeta = document.querySelector("#progress-meta");
|
||||
const pendingSection = document.querySelector("#pending-section");
|
||||
const pendingList = document.querySelector("#pending-list");
|
||||
|
||||
const state = {
|
||||
abortController: null,
|
||||
completedChunks: new Set(),
|
||||
file: null,
|
||||
fileHandle: null,
|
||||
pendingRecords: [],
|
||||
record: null,
|
||||
resumeAfterReselect: null,
|
||||
running: false,
|
||||
schedulerAbortController: null,
|
||||
schedulerRunning: false,
|
||||
uploadItems: [],
|
||||
};
|
||||
|
||||
const dbReady = "indexedDB" in window ? openDatabase() : Promise.resolve(null);
|
||||
let nextUploadItemId = 1;
|
||||
let saveChain = Promise.resolve();
|
||||
|
||||
function formatBytes(bytes) {
|
||||
@@ -100,7 +97,9 @@ async function withStore(mode, callback) {
|
||||
|
||||
async function loadRecords() {
|
||||
const records = (await withStore("readonly", (store) => store.getAll())) ?? [];
|
||||
records.sort((left, right) => right.updated_at.localeCompare(left.updated_at));
|
||||
records.sort((left, right) =>
|
||||
(right.updated_at ?? "").localeCompare(left.updated_at ?? ""),
|
||||
);
|
||||
state.pendingRecords = records;
|
||||
renderPendingRecords();
|
||||
}
|
||||
@@ -108,7 +107,7 @@ async function loadRecords() {
|
||||
async function saveRecord(record) {
|
||||
const nextSave = saveChain.catch(() => null).then(() => writeRecord(record));
|
||||
saveChain = nextSave;
|
||||
await nextSave;
|
||||
return nextSave;
|
||||
}
|
||||
|
||||
async function writeRecord(record) {
|
||||
@@ -124,23 +123,31 @@ async function writeRecord(record) {
|
||||
await withStore("readwrite", (store) => store.put(storedRecord));
|
||||
log("Saved resume state without a reusable file handle.");
|
||||
}
|
||||
state.record = storedRecord;
|
||||
|
||||
await loadRecords();
|
||||
return storedRecord;
|
||||
}
|
||||
|
||||
async function deleteRecord(uploadId) {
|
||||
await withStore("readwrite", (store) => store.delete(uploadId));
|
||||
if (state.record?.upload_id === uploadId) {
|
||||
state.record = null;
|
||||
}
|
||||
await loadRecords();
|
||||
}
|
||||
|
||||
function renderPendingRecords() {
|
||||
pendingList.replaceChildren();
|
||||
pendingSection.hidden = state.pendingRecords.length === 0;
|
||||
|
||||
for (const record of state.pendingRecords) {
|
||||
const activeUploadIds = new Set(
|
||||
state.uploadItems
|
||||
.map((item) => item.record?.upload_id)
|
||||
.filter((uploadId) => Boolean(uploadId)),
|
||||
);
|
||||
const visibleRecords = state.pendingRecords.filter(
|
||||
(record) => !activeUploadIds.has(record.upload_id),
|
||||
);
|
||||
|
||||
pendingSection.hidden = visibleRecords.length === 0;
|
||||
|
||||
for (const record of visibleRecords) {
|
||||
const item = document.createElement("li");
|
||||
item.className = "pending-item";
|
||||
|
||||
@@ -151,12 +158,13 @@ function renderPendingRecords() {
|
||||
title.textContent = record.name;
|
||||
|
||||
const detail = document.createElement("span");
|
||||
detail.textContent = `${formatBytes(record.size)} - ${record.completed_chunks ?? 0} of ${record.total_chunks} chunks`;
|
||||
detail.textContent = savedUploadDetail(record);
|
||||
|
||||
const resume = document.createElement("button");
|
||||
resume.type = "button";
|
||||
resume.className = "secondary";
|
||||
resume.textContent = "Resume";
|
||||
resume.textContent = isReadyToFinish(record) ? "Finish" : "Resume";
|
||||
resume.disabled = state.schedulerRunning || !hasAvailableFileSlot();
|
||||
resume.addEventListener("click", () => {
|
||||
void resumePendingRecord(record);
|
||||
});
|
||||
@@ -164,7 +172,8 @@ function renderPendingRecords() {
|
||||
const remove = document.createElement("button");
|
||||
remove.type = "button";
|
||||
remove.className = "danger";
|
||||
remove.textContent = "Remove";
|
||||
remove.textContent = "Clear";
|
||||
remove.disabled = state.schedulerRunning;
|
||||
remove.addEventListener("click", () => {
|
||||
void deleteRecord(record.upload_id);
|
||||
});
|
||||
@@ -175,29 +184,115 @@ function renderPendingRecords() {
|
||||
}
|
||||
}
|
||||
|
||||
function renderFile(file) {
|
||||
if (!file) {
|
||||
fileSummary.hidden = true;
|
||||
fileName.textContent = "";
|
||||
fileSize.textContent = "";
|
||||
return;
|
||||
function renderUploadItems() {
|
||||
uploadList.replaceChildren();
|
||||
uploadSection.hidden = state.uploadItems.length === 0;
|
||||
|
||||
for (const item of state.uploadItems) {
|
||||
const row = document.createElement("li");
|
||||
row.className = "upload-item";
|
||||
if (item.terminal) {
|
||||
row.classList.add("upload-item-blocked");
|
||||
row.setAttribute("aria-invalid", "true");
|
||||
}
|
||||
|
||||
fileName.textContent = file.name;
|
||||
fileSize.textContent = formatBytes(file.size);
|
||||
fileSummary.hidden = false;
|
||||
const header = document.createElement("div");
|
||||
header.className = "upload-item-header";
|
||||
|
||||
const meta = document.createElement("div");
|
||||
meta.className = "upload-meta";
|
||||
|
||||
const title = document.createElement("strong");
|
||||
title.textContent = item.file.name;
|
||||
|
||||
const detail = document.createElement("span");
|
||||
detail.textContent = uploadItemDetail(item);
|
||||
|
||||
meta.append(title, detail);
|
||||
|
||||
const actions = document.createElement("div");
|
||||
actions.className = "upload-item-actions";
|
||||
|
||||
const start = document.createElement("button");
|
||||
start.type = "button";
|
||||
start.textContent = uploadActionLabel(item);
|
||||
start.disabled =
|
||||
!canRunItem(item) || state.schedulerRunning || !hasAvailableFileSlot();
|
||||
start.addEventListener("click", () => {
|
||||
void runUploadItem(item);
|
||||
});
|
||||
|
||||
const pause = document.createElement("button");
|
||||
pause.type = "button";
|
||||
pause.className = "secondary";
|
||||
pause.textContent = "Pause";
|
||||
pause.disabled = !item.running;
|
||||
pause.addEventListener("click", () => {
|
||||
item.abortController?.abort();
|
||||
});
|
||||
|
||||
const remove = document.createElement("button");
|
||||
remove.type = "button";
|
||||
remove.className = "secondary";
|
||||
remove.textContent = "Remove";
|
||||
remove.disabled = item.running || item.queued;
|
||||
remove.addEventListener("click", () => {
|
||||
removeUploadItem(item);
|
||||
});
|
||||
|
||||
actions.append(start, pause, remove);
|
||||
header.append(meta, actions);
|
||||
|
||||
const progress = document.createElement("div");
|
||||
progress.className = "upload-progress";
|
||||
|
||||
const progressWrap = document.createElement("div");
|
||||
progressWrap.className = "progress-wrap";
|
||||
progressWrap.setAttribute("aria-label", `${item.file.name} upload progress`);
|
||||
|
||||
const progressBar = document.createElement("div");
|
||||
progressBar.className = "progress-bar";
|
||||
progressBar.style.width = `${progressPercentage(
|
||||
item.completedCount,
|
||||
item.totalChunks,
|
||||
)}%`;
|
||||
|
||||
const progressMeta = document.createElement("div");
|
||||
progressMeta.className = "progress-meta";
|
||||
progressMeta.textContent = `${item.completedCount} of ${item.totalChunks} chunks`;
|
||||
|
||||
progressWrap.append(progressBar);
|
||||
progress.append(progressWrap, progressMeta);
|
||||
row.append(header, progress);
|
||||
uploadList.append(row);
|
||||
}
|
||||
}
|
||||
|
||||
function renderButtons() {
|
||||
startButton.disabled = !state.file || state.running || Boolean(state.record);
|
||||
pauseButton.disabled = !state.running;
|
||||
resumeButton.disabled = !state.file || state.running || !state.record;
|
||||
const hasRunnable = state.uploadItems.some((item) => canRunItem(item));
|
||||
const hasRunnableResume = state.uploadItems.some(
|
||||
(item) => item.record && canRunItem(item),
|
||||
);
|
||||
const hasRunningOrQueued = state.uploadItems.some((item) => item.running || item.queued);
|
||||
const hasFileSlot = hasAvailableFileSlot();
|
||||
|
||||
startButton.disabled = !hasRunnable || state.schedulerRunning || !hasFileSlot;
|
||||
pauseButton.disabled = !hasRunningOrQueued;
|
||||
resumeButton.disabled = !hasRunnableResume || state.schedulerRunning || !hasFileSlot;
|
||||
}
|
||||
|
||||
function updateProgress(completedCount, totalChunks) {
|
||||
const percentage = totalChunks === 0 ? 100 : (completedCount / totalChunks) * 100;
|
||||
progressBar.style.width = `${percentage}%`;
|
||||
progressMeta.textContent = `${completedCount} of ${totalChunks} chunks`;
|
||||
function renderAll() {
|
||||
renderUploadItems();
|
||||
renderPendingRecords();
|
||||
renderButtons();
|
||||
}
|
||||
|
||||
function progressPercentage(completedCount, totalChunks) {
|
||||
if (totalChunks <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return Math.min(100, Math.max(0, (completedCount / totalChunks) * 100));
|
||||
}
|
||||
|
||||
function sameFile(record, file) {
|
||||
@@ -208,34 +303,212 @@ function sameFile(record, file) {
|
||||
);
|
||||
}
|
||||
|
||||
function sameUploadItemFile(item, file) {
|
||||
return (
|
||||
item.file.name === file.name &&
|
||||
item.file.size === file.size &&
|
||||
item.file.lastModified === file.lastModified
|
||||
);
|
||||
}
|
||||
|
||||
function findPendingRecord(file) {
|
||||
return state.pendingRecords.find((record) => sameFile(record, file)) ?? null;
|
||||
}
|
||||
|
||||
async function selectFile(file, fileHandle = null, record = null) {
|
||||
if (record && !sameFile(record, file)) {
|
||||
log("Selected file does not match the pending upload.");
|
||||
function findUploadItem(file, record = null) {
|
||||
return (
|
||||
state.uploadItems.find((item) => {
|
||||
if (item.finished || item.terminal) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (record?.upload_id && item.record?.upload_id === record.upload_id) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return sameUploadItemFile(item, file);
|
||||
}) ?? null
|
||||
);
|
||||
}
|
||||
|
||||
function completedChunkCount(record) {
|
||||
return Math.min(record.completed_chunks ?? 0, record.total_chunks ?? 0);
|
||||
}
|
||||
|
||||
function isReadyToFinish(record) {
|
||||
const totalChunks = record.total_chunks ?? 0;
|
||||
return totalChunks === 0 || completedChunkCount(record) >= totalChunks;
|
||||
}
|
||||
|
||||
function isUploadItemReadyToFinish(item) {
|
||||
return (
|
||||
Boolean(item.record) &&
|
||||
(item.totalChunks === 0 || item.completedCount >= item.totalChunks)
|
||||
);
|
||||
}
|
||||
|
||||
function savedUploadDetail(record) {
|
||||
const totalChunks = record.total_chunks ?? 0;
|
||||
const completedChunks = completedChunkCount(record);
|
||||
|
||||
if (isReadyToFinish(record)) {
|
||||
return `${formatBytes(record.size)} - ready to finish`;
|
||||
}
|
||||
|
||||
if (completedChunks === 0) {
|
||||
return `${formatBytes(record.size)} - not uploaded yet`;
|
||||
}
|
||||
|
||||
return `${formatBytes(record.size)} - ${completedChunks} of ${totalChunks} chunks uploaded`;
|
||||
}
|
||||
|
||||
function uploadItemDetail(item) {
|
||||
return `${formatBytes(item.file.size)} - ${item.statusText}`;
|
||||
}
|
||||
|
||||
function uploadActionLabel(item) {
|
||||
if (item.terminal) {
|
||||
return "Unavailable";
|
||||
}
|
||||
|
||||
if (item.finished) {
|
||||
return "Done";
|
||||
}
|
||||
|
||||
if (!item.record) {
|
||||
return "Start";
|
||||
}
|
||||
|
||||
return isUploadItemReadyToFinish(item) ? "Finish" : "Resume";
|
||||
}
|
||||
|
||||
function initialUploadStatus(record) {
|
||||
if (!record) {
|
||||
return "Ready to create an upload record.";
|
||||
}
|
||||
|
||||
if (isReadyToFinish(record)) {
|
||||
return "Ready to finish saved upload.";
|
||||
}
|
||||
|
||||
return "Ready to resume upload.";
|
||||
}
|
||||
|
||||
function canRunItem(item) {
|
||||
return (
|
||||
Boolean(item.file) &&
|
||||
!item.running &&
|
||||
!item.queued &&
|
||||
!item.finished &&
|
||||
!item.terminal
|
||||
);
|
||||
}
|
||||
|
||||
function runningFileCount() {
|
||||
return state.uploadItems.filter((item) => item.running).length;
|
||||
}
|
||||
|
||||
function hasAvailableFileSlot() {
|
||||
return runningFileCount() < FILE_CONCURRENCY;
|
||||
}
|
||||
|
||||
function setItemProgress(item, completedCount, totalChunks) {
|
||||
item.totalChunks = Math.max(0, totalChunks);
|
||||
item.completedCount =
|
||||
item.totalChunks === 0
|
||||
? 0
|
||||
: Math.min(Math.max(0, completedCount), item.totalChunks);
|
||||
}
|
||||
|
||||
async function selectFiles(files, fileHandles = []) {
|
||||
const selectedFiles = Array.from(files);
|
||||
if (selectedFiles.length === 0) {
|
||||
log("Choose files to begin.");
|
||||
renderButtons();
|
||||
return;
|
||||
}
|
||||
|
||||
state.file = file;
|
||||
state.fileHandle = fileHandle;
|
||||
state.record = record ?? findPendingRecord(file);
|
||||
state.completedChunks = new Set();
|
||||
const resumeRecord = state.resumeAfterReselect;
|
||||
let matchedResumeRecord = false;
|
||||
let addedCount = 0;
|
||||
|
||||
renderFile(file);
|
||||
updateProgress(state.record?.completed_chunks ?? 0, state.record?.total_chunks ?? 0);
|
||||
renderButtons();
|
||||
for (const [index, file] of selectedFiles.entries()) {
|
||||
let record = findPendingRecord(file);
|
||||
if (resumeRecord && sameFile(resumeRecord, file)) {
|
||||
record = resumeRecord;
|
||||
matchedResumeRecord = true;
|
||||
}
|
||||
|
||||
log(state.record ? "Ready to resume upload." : "Ready to create an upload record.");
|
||||
const previousCount = state.uploadItems.length;
|
||||
addUploadItem(file, fileHandles[index] ?? null, record);
|
||||
if (state.uploadItems.length > previousCount) {
|
||||
addedCount += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (resumeRecord && !matchedResumeRecord) {
|
||||
log("Selected files did not include the pending upload file.");
|
||||
}
|
||||
|
||||
state.resumeAfterReselect = null;
|
||||
fileInput.value = "";
|
||||
renderAll();
|
||||
|
||||
if (addedCount === 1) {
|
||||
log("Ready to upload 1 file.");
|
||||
} else if (addedCount > 1) {
|
||||
log(`Ready to upload ${addedCount} files.`);
|
||||
}
|
||||
}
|
||||
|
||||
function addUploadItem(file, fileHandle = null, record = null) {
|
||||
if (record && !sameFile(record, file)) {
|
||||
log("Selected file does not match the pending upload.");
|
||||
return null;
|
||||
}
|
||||
|
||||
const existingItem = findUploadItem(file, record);
|
||||
if (existingItem) {
|
||||
log(`${file.name} is already selected.`);
|
||||
return existingItem;
|
||||
}
|
||||
|
||||
const item = {
|
||||
abortController: null,
|
||||
completedChunks: new Set(),
|
||||
completedCount: record ? completedChunkCount(record) : 0,
|
||||
file,
|
||||
fileHandle,
|
||||
finished: false,
|
||||
id: nextUploadItemId,
|
||||
queued: false,
|
||||
record,
|
||||
running: false,
|
||||
statusText: initialUploadStatus(record),
|
||||
terminal: false,
|
||||
totalChunks: record?.total_chunks ?? 0,
|
||||
};
|
||||
|
||||
nextUploadItemId += 1;
|
||||
state.uploadItems.push(item);
|
||||
return item;
|
||||
}
|
||||
|
||||
function removeUploadItem(item) {
|
||||
if (item.running || item.queued) {
|
||||
return;
|
||||
}
|
||||
|
||||
state.uploadItems = state.uploadItems.filter((candidate) => candidate.id !== item.id);
|
||||
renderAll();
|
||||
}
|
||||
|
||||
async function pickFile() {
|
||||
if ("showOpenFilePicker" in window) {
|
||||
try {
|
||||
const [handle] = await window.showOpenFilePicker({ multiple: false });
|
||||
const file = await handle.getFile();
|
||||
await selectFile(file, handle);
|
||||
const handles = await window.showOpenFilePicker({ multiple: true });
|
||||
const files = await Promise.all(handles.map((handle) => handle.getFile()));
|
||||
await selectFiles(files, handles);
|
||||
return;
|
||||
} catch (error) {
|
||||
if (isAbortError(error)) {
|
||||
@@ -249,18 +522,7 @@ async function pickFile() {
|
||||
}
|
||||
|
||||
fileInput.addEventListener("change", () => {
|
||||
const [file] = fileInput.files;
|
||||
|
||||
if (!file) {
|
||||
renderFile(null);
|
||||
renderButtons();
|
||||
log("Choose a file to begin.");
|
||||
return;
|
||||
}
|
||||
|
||||
const record = state.resumeAfterReselect ?? findPendingRecord(file);
|
||||
state.resumeAfterReselect = null;
|
||||
void selectFile(file, null, record);
|
||||
void selectFiles(fileInput.files);
|
||||
});
|
||||
|
||||
pickButton.addEventListener("click", () => {
|
||||
@@ -268,21 +530,19 @@ pickButton.addEventListener("click", () => {
|
||||
});
|
||||
|
||||
startButton.addEventListener("click", () => {
|
||||
void runUpload();
|
||||
void runUploadItems(state.uploadItems);
|
||||
});
|
||||
|
||||
pauseButton.addEventListener("click", () => {
|
||||
if (state.abortController) {
|
||||
state.abortController.abort();
|
||||
}
|
||||
pauseUploads();
|
||||
});
|
||||
|
||||
resumeButton.addEventListener("click", () => {
|
||||
void runUpload();
|
||||
void runUploadItems(state.uploadItems.filter((item) => item.record));
|
||||
});
|
||||
|
||||
async function resumePendingRecord(record) {
|
||||
if (state.running) {
|
||||
if (state.schedulerRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -290,14 +550,21 @@ async function resumePendingRecord(record) {
|
||||
const granted = await requestFileHandlePermission(record.file_handle);
|
||||
if (granted) {
|
||||
const file = await record.file_handle.getFile();
|
||||
await selectFile(file, record.file_handle, record);
|
||||
await runUpload();
|
||||
const item = addUploadItem(file, record.file_handle, record);
|
||||
renderAll();
|
||||
if (item) {
|
||||
await runUploadItem(item);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
state.resumeAfterReselect = record;
|
||||
log("Select the same file to resume.");
|
||||
log(
|
||||
isReadyToFinish(record)
|
||||
? "Select the same file to finish."
|
||||
: "Select the same file to resume.",
|
||||
);
|
||||
fileInput.click();
|
||||
}
|
||||
|
||||
@@ -314,101 +581,238 @@ async function requestFileHandlePermission(handle) {
|
||||
return (await handle.requestPermission(options)) === "granted";
|
||||
}
|
||||
|
||||
async function runUpload() {
|
||||
if (!state.file || state.running) {
|
||||
async function runUploadItems(items) {
|
||||
if (state.schedulerRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
const runnableItems = items.filter((item) => canRunItem(item));
|
||||
const availableSlots = FILE_CONCURRENCY - runningFileCount();
|
||||
if (runnableItems.length === 0 || availableSlots <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
state.abortController = controller;
|
||||
state.running = true;
|
||||
renderButtons();
|
||||
state.schedulerAbortController = controller;
|
||||
state.schedulerRunning = true;
|
||||
|
||||
for (const item of runnableItems) {
|
||||
item.queued = true;
|
||||
item.statusText = "Queued.";
|
||||
}
|
||||
renderAll();
|
||||
|
||||
try {
|
||||
if (!state.record) {
|
||||
await createUploadRecord();
|
||||
await runPool(
|
||||
runnableItems,
|
||||
async (item) => {
|
||||
throwIfAborted(controller.signal);
|
||||
if (!item.queued) {
|
||||
return;
|
||||
}
|
||||
await runUploadItem(item);
|
||||
},
|
||||
availableSlots,
|
||||
controller.signal,
|
||||
);
|
||||
} catch (error) {
|
||||
if (!isAbortError(error)) {
|
||||
log(`Upload scheduler failed: ${error.message}`);
|
||||
}
|
||||
} finally {
|
||||
for (const item of runnableItems) {
|
||||
if (item.queued) {
|
||||
item.queued = false;
|
||||
item.statusText = "Paused.";
|
||||
}
|
||||
}
|
||||
|
||||
const progress = await fetchJson(`/api/uploads/${state.record.upload_id}`, {
|
||||
if (state.schedulerAbortController === controller) {
|
||||
state.schedulerAbortController = null;
|
||||
}
|
||||
state.schedulerRunning = false;
|
||||
renderAll();
|
||||
}
|
||||
}
|
||||
|
||||
function pauseUploads() {
|
||||
state.schedulerAbortController?.abort();
|
||||
|
||||
for (const item of state.uploadItems) {
|
||||
if (item.running) {
|
||||
item.abortController?.abort();
|
||||
} else if (item.queued) {
|
||||
item.queued = false;
|
||||
item.statusText = "Paused.";
|
||||
}
|
||||
}
|
||||
|
||||
renderAll();
|
||||
}
|
||||
|
||||
async function runUploadItem(item) {
|
||||
if (
|
||||
item.running ||
|
||||
item.finished ||
|
||||
item.terminal ||
|
||||
(!item.queued && !hasAvailableFileSlot()) ||
|
||||
(!item.queued && !canRunItem(item))
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
item.abortController = controller;
|
||||
item.queued = false;
|
||||
item.running = true;
|
||||
item.statusText = item.record
|
||||
? "Checking saved progress."
|
||||
: "Creating upload record.";
|
||||
renderAll();
|
||||
|
||||
try {
|
||||
if (!item.record) {
|
||||
await createUploadRecord(item, controller.signal);
|
||||
}
|
||||
|
||||
const progress = await fetchJson(`/api/uploads/${item.record.upload_id}`, {
|
||||
signal: controller.signal,
|
||||
});
|
||||
state.completedChunks = new Set(progress.completed_chunks);
|
||||
await saveRecord({
|
||||
...state.record,
|
||||
completed_chunks: state.completedChunks.size,
|
||||
item.completedChunks = new Set(progress.completed_chunks);
|
||||
setItemProgress(item, item.completedChunks.size, progress.total_chunks);
|
||||
item.record = await saveRecord({
|
||||
...item.record,
|
||||
completed_chunks: item.completedCount,
|
||||
chunk_size: progress.chunk_size,
|
||||
total_chunks: progress.total_chunks,
|
||||
});
|
||||
updateProgress(state.completedChunks.size, progress.total_chunks);
|
||||
renderAll();
|
||||
|
||||
const missingChunks = buildMissingChunkList(progress.total_chunks, state.completedChunks);
|
||||
log(
|
||||
const missingChunks = buildMissingChunkList(progress.total_chunks, item.completedChunks);
|
||||
item.statusText =
|
||||
missingChunks.length === 0
|
||||
? "All chunks already uploaded."
|
||||
: `Uploading ${missingChunks.length} missing chunks.`,
|
||||
);
|
||||
: `Uploading ${missingChunks.length} missing chunks.`;
|
||||
renderAll();
|
||||
|
||||
await runPool(missingChunks, (index) =>
|
||||
uploadChunkWithRetry(index, progress.chunk_size, progress.total_chunks, controller.signal),
|
||||
await runPool(
|
||||
missingChunks,
|
||||
(index) =>
|
||||
uploadChunkWithRetry(
|
||||
item,
|
||||
index,
|
||||
progress.chunk_size,
|
||||
progress.total_chunks,
|
||||
controller.signal,
|
||||
),
|
||||
CHUNK_CONCURRENCY,
|
||||
controller.signal,
|
||||
);
|
||||
|
||||
if (controller.signal.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
log("Completing upload.");
|
||||
const complete = await fetchJson(`/api/uploads/${state.record.upload_id}/complete`, {
|
||||
item.statusText = "Completing upload.";
|
||||
renderAll();
|
||||
const uploadId = item.record.upload_id;
|
||||
const complete = await fetchJson(`/api/uploads/${uploadId}/complete`, {
|
||||
method: "POST",
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
updateProgress(progress.total_chunks, progress.total_chunks);
|
||||
log(`Complete: ${complete.file_path}`);
|
||||
await deleteRecord(state.record.upload_id);
|
||||
state.completedChunks = new Set();
|
||||
state.record = null;
|
||||
setItemProgress(item, progress.total_chunks, progress.total_chunks);
|
||||
item.finished = true;
|
||||
item.statusText = `Complete: ${complete.file_path}`;
|
||||
await deleteRecord(uploadId);
|
||||
} catch (error) {
|
||||
if (controller.signal.aborted || isAbortError(error)) {
|
||||
log("Upload paused.");
|
||||
item.statusText = "Paused.";
|
||||
} else if (await handleTerminalUploadError(item, error)) {
|
||||
controller.abort();
|
||||
} else {
|
||||
controller.abort();
|
||||
log(`Upload failed: ${error.message}`);
|
||||
item.statusText = `Upload failed: ${error.message}`;
|
||||
}
|
||||
} finally {
|
||||
if (state.abortController === controller) {
|
||||
state.abortController = null;
|
||||
if (item.abortController === controller) {
|
||||
item.abortController = null;
|
||||
}
|
||||
state.running = false;
|
||||
renderButtons();
|
||||
item.running = false;
|
||||
renderAll();
|
||||
}
|
||||
}
|
||||
|
||||
async function createUploadRecord() {
|
||||
async function handleTerminalUploadError(item, error) {
|
||||
if (typeof error.status !== "number") {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isFileExistsConflict(error)) {
|
||||
if (item.record) {
|
||||
await deleteRecord(item.record.upload_id);
|
||||
}
|
||||
item.record = null;
|
||||
item.completedChunks = new Set();
|
||||
setItemProgress(item, 0, 0);
|
||||
item.terminal = true;
|
||||
item.statusText =
|
||||
"File already exists on the server. Rename the file to upload this copy.";
|
||||
log(`${item.file.name}: file already exists. Rename it to upload this copy.`);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!item.record) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const uploadId = item.record.upload_id;
|
||||
|
||||
if (error.status === 404) {
|
||||
await deleteRecord(uploadId);
|
||||
item.record = null;
|
||||
item.completedChunks = new Set();
|
||||
setItemProgress(item, 0, 0);
|
||||
item.statusText = "Saved upload progress no longer exists. Start again.";
|
||||
log(`${item.file.name}: saved upload progress no longer exists on the server.`);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function isFileExistsConflict(error) {
|
||||
return error.status === 409 && error.message === FILE_EXISTS_MESSAGE;
|
||||
}
|
||||
|
||||
async function createUploadRecord(item, signal) {
|
||||
const response = await fetchJson("/api/uploads", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({
|
||||
name: state.file.name,
|
||||
size: state.file.size,
|
||||
last_modified: state.file.lastModified,
|
||||
name: item.file.name,
|
||||
size: item.file.size,
|
||||
last_modified: item.file.lastModified,
|
||||
}),
|
||||
signal,
|
||||
});
|
||||
|
||||
const record = {
|
||||
upload_id: response.upload_id,
|
||||
name: state.file.name,
|
||||
size: state.file.size,
|
||||
last_modified: state.file.lastModified,
|
||||
name: item.file.name,
|
||||
size: item.file.size,
|
||||
last_modified: item.file.lastModified,
|
||||
chunk_size: response.chunk_size,
|
||||
total_chunks: response.total_chunks,
|
||||
completed_chunks: 0,
|
||||
file_handle: state.fileHandle,
|
||||
file_handle: item.fileHandle,
|
||||
updated_at: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await saveRecord(record);
|
||||
updateProgress(0, response.total_chunks);
|
||||
log("Upload record created.");
|
||||
item.record = await saveRecord(record);
|
||||
setItemProgress(item, 0, response.total_chunks);
|
||||
item.statusText = "Upload record created.";
|
||||
renderAll();
|
||||
}
|
||||
|
||||
function buildMissingChunkList(totalChunks, completedChunks) {
|
||||
@@ -421,18 +825,19 @@ function buildMissingChunkList(totalChunks, completedChunks) {
|
||||
return missing;
|
||||
}
|
||||
|
||||
async function uploadChunkWithRetry(index, chunkSize, totalChunks, signal) {
|
||||
async function uploadChunkWithRetry(item, index, chunkSize, totalChunks, signal) {
|
||||
for (let attempt = 0; attempt <= MAX_RETRIES; attempt += 1) {
|
||||
throwIfAborted(signal);
|
||||
|
||||
try {
|
||||
await uploadChunk(index, chunkSize, signal);
|
||||
state.completedChunks.add(index);
|
||||
updateProgress(state.completedChunks.size, totalChunks);
|
||||
await saveRecord({
|
||||
...state.record,
|
||||
completed_chunks: state.completedChunks.size,
|
||||
await uploadChunk(item, index, chunkSize, signal);
|
||||
item.completedChunks.add(index);
|
||||
setItemProgress(item, item.completedChunks.size, totalChunks);
|
||||
item.record = await saveRecord({
|
||||
...item.record,
|
||||
completed_chunks: item.completedCount,
|
||||
});
|
||||
renderAll();
|
||||
return;
|
||||
} catch (error) {
|
||||
if (isAbortError(error) || attempt === MAX_RETRIES) {
|
||||
@@ -440,34 +845,42 @@ async function uploadChunkWithRetry(index, chunkSize, totalChunks, signal) {
|
||||
}
|
||||
|
||||
const delayMs = BASE_RETRY_DELAY_MS * 2 ** attempt;
|
||||
log(`Retrying chunk ${index} after ${delayMs} ms.`);
|
||||
item.statusText = `Retrying chunk ${index} after ${delayMs} ms.`;
|
||||
renderAll();
|
||||
await delay(delayMs, signal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function uploadChunk(index, chunkSize, signal) {
|
||||
async function uploadChunk(item, index, chunkSize, signal) {
|
||||
const start = index * chunkSize;
|
||||
const end = Math.min(state.file.size, start + chunkSize);
|
||||
const body = state.file.slice(start, end);
|
||||
const response = await fetch(`/api/uploads/${state.record.upload_id}/chunks/${index}`, {
|
||||
const end = Math.min(item.file.size, start + chunkSize);
|
||||
const body = item.file.slice(start, end);
|
||||
const response = await fetch(
|
||||
`/api/uploads/${item.record.upload_id}/chunks/${index}`,
|
||||
{
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/octet-stream" },
|
||||
body,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(await responseError(response));
|
||||
throw new ApiRequestError(await responseError(response), response.status);
|
||||
}
|
||||
}
|
||||
|
||||
async function runPool(items, worker) {
|
||||
async function runPool(items, worker, concurrency, signal = null) {
|
||||
let nextIndex = 0;
|
||||
const workers = Array.from(
|
||||
{ length: Math.min(CONCURRENCY, items.length) },
|
||||
{ length: Math.min(concurrency, items.length) },
|
||||
async () => {
|
||||
while (nextIndex < items.length) {
|
||||
if (signal) {
|
||||
throwIfAborted(signal);
|
||||
}
|
||||
|
||||
const item = items[nextIndex];
|
||||
nextIndex += 1;
|
||||
await worker(item);
|
||||
@@ -478,10 +891,18 @@ async function runPool(items, worker) {
|
||||
await Promise.all(workers);
|
||||
}
|
||||
|
||||
class ApiRequestError extends Error {
|
||||
constructor(message, status) {
|
||||
super(message);
|
||||
this.name = "ApiRequestError";
|
||||
this.status = status;
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchJson(url, options = {}) {
|
||||
const response = await fetch(url, options);
|
||||
if (!response.ok) {
|
||||
throw new Error(await responseError(response));
|
||||
throw new ApiRequestError(await responseError(response), response.status);
|
||||
}
|
||||
return response.json();
|
||||
}
|
||||
@@ -527,7 +948,7 @@ function isAbortError(error) {
|
||||
|
||||
async function initialize() {
|
||||
await loadRecords();
|
||||
renderButtons();
|
||||
renderAll();
|
||||
}
|
||||
|
||||
void initialize();
|
||||
|
||||
+12
-18
@@ -15,37 +15,31 @@
|
||||
<h1 id="app-title">upl</h1>
|
||||
<p class="subtle">Resumable uploads to this machine.</p>
|
||||
</div>
|
||||
<span class="status-pill" id="connection-status">Server online</span>
|
||||
</div>
|
||||
|
||||
<div class="file-picker">
|
||||
<button id="pick-button" type="button">Choose file</button>
|
||||
<input id="file-input" type="file">
|
||||
<button id="pick-button" type="button">Choose files</button>
|
||||
<input id="file-input" type="file" multiple>
|
||||
</div>
|
||||
|
||||
<div class="file-summary" id="file-summary" hidden>
|
||||
<strong id="file-name"></strong>
|
||||
<span id="file-size"></span>
|
||||
</div>
|
||||
|
||||
<div class="progress-wrap" aria-label="Upload progress">
|
||||
<div class="progress-bar" id="progress-bar"></div>
|
||||
</div>
|
||||
<div class="progress-meta" id="progress-meta">0 of 0 chunks</div>
|
||||
|
||||
<div class="actions">
|
||||
<button id="start-button" type="button" disabled>Start</button>
|
||||
<button id="pause-button" type="button" disabled>Pause</button>
|
||||
<button id="resume-button" type="button" disabled>Resume</button>
|
||||
<button id="start-button" type="button" disabled>Start all</button>
|
||||
<button id="pause-button" type="button" disabled>Pause all</button>
|
||||
<button id="resume-button" type="button" disabled>Resume all</button>
|
||||
</div>
|
||||
|
||||
<section class="upload-section" id="upload-section" hidden>
|
||||
<h2>Selected uploads</h2>
|
||||
<ul class="upload-list" id="upload-list"></ul>
|
||||
</section>
|
||||
|
||||
<section class="pending-section" id="pending-section" hidden>
|
||||
<h2>Pending uploads</h2>
|
||||
<h2>Saved upload progress</h2>
|
||||
<ul class="pending-list" id="pending-list"></ul>
|
||||
</section>
|
||||
|
||||
<ol class="event-log" id="event-log" aria-live="polite">
|
||||
<li>Choose a file to begin.</li>
|
||||
<li>Choose files to begin.</li>
|
||||
</ol>
|
||||
</section>
|
||||
</main>
|
||||
|
||||
+61
-21
@@ -71,17 +71,6 @@ h1 {
|
||||
color: var(--muted);
|
||||
}
|
||||
|
||||
.status-pill {
|
||||
flex: 0 0 auto;
|
||||
padding: 6px 10px;
|
||||
border: 1px solid color-mix(in srgb, var(--accent) 34%, transparent);
|
||||
border-radius: 999px;
|
||||
color: var(--accent-strong);
|
||||
background: color-mix(in srgb, var(--accent) 10%, transparent);
|
||||
font-size: 0.875rem;
|
||||
font-weight: 700;
|
||||
}
|
||||
|
||||
.file-picker {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
@@ -99,16 +88,57 @@ h1 {
|
||||
clip: rect(0, 0, 0, 0);
|
||||
}
|
||||
|
||||
.file-summary {
|
||||
.upload-section {
|
||||
display: grid;
|
||||
gap: 4px;
|
||||
padding: 14px;
|
||||
gap: 12px;
|
||||
}
|
||||
|
||||
.upload-list {
|
||||
display: grid;
|
||||
gap: 12px;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
list-style: none;
|
||||
}
|
||||
|
||||
.upload-item {
|
||||
display: grid;
|
||||
gap: 12px;
|
||||
padding: 12px;
|
||||
border: 1px solid var(--line);
|
||||
border-radius: 8px;
|
||||
}
|
||||
|
||||
.file-summary span {
|
||||
.upload-item-blocked {
|
||||
border-color: #f04438;
|
||||
background: rgb(240 68 56 / 8%);
|
||||
}
|
||||
|
||||
.upload-item-blocked .upload-meta span {
|
||||
color: #b42318;
|
||||
}
|
||||
|
||||
.upload-item-header {
|
||||
display: grid;
|
||||
grid-template-columns: minmax(0, 1fr) auto;
|
||||
align-items: start;
|
||||
gap: 12px;
|
||||
}
|
||||
|
||||
.upload-meta {
|
||||
display: grid;
|
||||
gap: 4px;
|
||||
min-width: 0;
|
||||
}
|
||||
|
||||
.upload-meta span {
|
||||
color: var(--muted);
|
||||
font-size: 0.875rem;
|
||||
}
|
||||
|
||||
.upload-progress {
|
||||
display: grid;
|
||||
gap: 6px;
|
||||
}
|
||||
|
||||
.progress-wrap {
|
||||
@@ -126,7 +156,6 @@ h1 {
|
||||
}
|
||||
|
||||
.progress-meta {
|
||||
margin-top: -12px;
|
||||
color: var(--muted);
|
||||
font-size: 0.875rem;
|
||||
}
|
||||
@@ -137,6 +166,13 @@ h1 {
|
||||
gap: 10px;
|
||||
}
|
||||
|
||||
.upload-item-actions {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
justify-content: flex-end;
|
||||
gap: 8px;
|
||||
}
|
||||
|
||||
button {
|
||||
min-width: 96px;
|
||||
min-height: 40px;
|
||||
@@ -195,7 +231,7 @@ h2 {
|
||||
}
|
||||
|
||||
.pending-item strong,
|
||||
.file-summary strong {
|
||||
.upload-meta strong {
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
|
||||
@@ -236,10 +272,6 @@ h2 {
|
||||
display: grid;
|
||||
}
|
||||
|
||||
.status-pill {
|
||||
width: max-content;
|
||||
}
|
||||
|
||||
.upload-panel {
|
||||
padding: 18px;
|
||||
}
|
||||
@@ -247,4 +279,12 @@ h2 {
|
||||
.pending-item {
|
||||
grid-template-columns: 1fr;
|
||||
}
|
||||
|
||||
.upload-item-header {
|
||||
grid-template-columns: 1fr;
|
||||
}
|
||||
|
||||
.upload-item-actions {
|
||||
justify-content: flex-start;
|
||||
}
|
||||
}
|
||||
|
||||
+11
-9
@@ -42,13 +42,16 @@ async fn stores_chunks_and_reports_progress() -> Result<(), Box<dyn std::error::
|
||||
let progress = get_progress(&app, &upload.upload_id).await?;
|
||||
assert_eq!(progress.completed_chunks, vec![0, 1]);
|
||||
|
||||
let chunk_path = temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.join("chunks")
|
||||
.join("000000.part");
|
||||
assert_eq!(tokio::fs::metadata(chunk_path).await?.len(), CHUNK_SIZE);
|
||||
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
|
||||
assert_eq!(
|
||||
tokio::fs::metadata(upload_dir.join(".upload.tmp"))
|
||||
.await?
|
||||
.len(),
|
||||
CHUNK_SIZE + 3
|
||||
);
|
||||
assert!(upload_dir.join("completed").join("000000.done").is_file());
|
||||
assert!(upload_dir.join("completed").join("000001.done").is_file());
|
||||
assert!(!upload_dir.join("chunks").exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -84,8 +87,7 @@ async fn rejects_out_of_range_chunk_index() -> Result<(), Box<dyn std::error::Er
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn accepts_duplicate_chunk_when_existing_length_matches()
|
||||
-> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn accepts_duplicate_completed_chunk() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let upload = create_upload(&app, temp_dir.path(), 4).await?;
|
||||
|
||||
+121
-11
@@ -48,11 +48,11 @@ async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>>
|
||||
b"hello world"
|
||||
);
|
||||
assert!(
|
||||
temp_dir
|
||||
!temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.is_dir()
|
||||
.exists()
|
||||
);
|
||||
|
||||
let duplicate = app
|
||||
@@ -61,7 +61,81 @@ async fn assembles_completed_upload() -> Result<(), Box<dyn std::error::Error>>
|
||||
&format!("/api/uploads/{}/complete", upload.upload_id),
|
||||
)?)
|
||||
.await?;
|
||||
assert_eq!(duplicate.status(), StatusCode::CONFLICT);
|
||||
assert_eq!(duplicate.status(), StatusCode::NOT_FOUND);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parallel_uploads_keep_bytes_separate() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let chunk_size = usize::try_from(CHUNK_SIZE)?;
|
||||
let left_upload = create_upload(&app, "left.bin", CHUNK_SIZE + 4).await?;
|
||||
let right_upload = create_upload(&app, "right.bin", CHUNK_SIZE + 5).await?;
|
||||
|
||||
let mut expected_left = vec![b'l'; chunk_size];
|
||||
expected_left.extend_from_slice(b"eft!");
|
||||
let mut expected_right = vec![b'r'; chunk_size];
|
||||
expected_right.extend_from_slice(b"ight!");
|
||||
|
||||
let left_first = chunk_request(
|
||||
&left_upload.upload_id,
|
||||
0,
|
||||
expected_left[..chunk_size].to_vec(),
|
||||
)?;
|
||||
let left_final = chunk_request(
|
||||
&left_upload.upload_id,
|
||||
1,
|
||||
expected_left[chunk_size..].to_vec(),
|
||||
)?;
|
||||
let right_first = chunk_request(
|
||||
&right_upload.upload_id,
|
||||
0,
|
||||
expected_right[..chunk_size].to_vec(),
|
||||
)?;
|
||||
let right_final = chunk_request(
|
||||
&right_upload.upload_id,
|
||||
1,
|
||||
expected_right[chunk_size..].to_vec(),
|
||||
)?;
|
||||
|
||||
let (left_first, right_first, left_final, right_final) = tokio::join!(
|
||||
app.clone().oneshot(left_first),
|
||||
app.clone().oneshot(right_first),
|
||||
app.clone().oneshot(left_final),
|
||||
app.clone().oneshot(right_final),
|
||||
);
|
||||
|
||||
assert_eq!(left_first?.status(), StatusCode::NO_CONTENT);
|
||||
assert_eq!(right_first?.status(), StatusCode::NO_CONTENT);
|
||||
assert_eq!(left_final?.status(), StatusCode::NO_CONTENT);
|
||||
assert_eq!(right_final?.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let left_complete = empty_request(
|
||||
Method::POST,
|
||||
&format!("/api/uploads/{}/complete", left_upload.upload_id),
|
||||
)?;
|
||||
let right_complete = empty_request(
|
||||
Method::POST,
|
||||
&format!("/api/uploads/{}/complete", right_upload.upload_id),
|
||||
)?;
|
||||
|
||||
let (left_complete, right_complete) = tokio::join!(
|
||||
app.clone().oneshot(left_complete),
|
||||
app.clone().oneshot(right_complete),
|
||||
);
|
||||
|
||||
assert_eq!(left_complete?.status(), StatusCode::OK);
|
||||
assert_eq!(right_complete?.status(), StatusCode::OK);
|
||||
assert_eq!(
|
||||
tokio::fs::read(temp_dir.path().join("complete").join("left.bin")).await?,
|
||||
expected_left
|
||||
);
|
||||
assert_eq!(
|
||||
tokio::fs::read(temp_dir.path().join("complete").join("right.bin")).await?,
|
||||
expected_right
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -98,18 +172,54 @@ async fn rejects_incomplete_upload() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_corrupt_chunk_file() -> Result<(), Box<dyn std::error::Error>> {
|
||||
async fn rejects_completion_that_would_replace_file() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let upload = create_upload(&app, "clash.bin", 8).await?;
|
||||
tokio::fs::write(
|
||||
temp_dir.path().join("complete").join("clash.bin"),
|
||||
b"original",
|
||||
)
|
||||
.await?;
|
||||
|
||||
let response = app
|
||||
.clone()
|
||||
.oneshot(chunk_request(&upload.upload_id, 0, b"incoming".to_vec())?)
|
||||
.await?;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
let response = app
|
||||
.oneshot(empty_request(
|
||||
Method::POST,
|
||||
&format!("/api/uploads/{}/complete", upload.upload_id),
|
||||
)?)
|
||||
.await?;
|
||||
|
||||
assert_eq!(response.status(), StatusCode::CONFLICT);
|
||||
assert_eq!(
|
||||
tokio::fs::read(temp_dir.path().join("complete").join("clash.bin")).await?,
|
||||
b"original"
|
||||
);
|
||||
assert!(
|
||||
temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.exists()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_tampered_temp_upload_file() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let upload = create_upload(&app, "corrupt.bin", 4).await?;
|
||||
|
||||
let chunk_path = temp_dir
|
||||
.path()
|
||||
.join("staging")
|
||||
.join(&upload.upload_id)
|
||||
.join("chunks")
|
||||
.join("000000.part");
|
||||
tokio::fs::write(chunk_path, b"bad").await?;
|
||||
let upload_dir = temp_dir.path().join("staging").join(&upload.upload_id);
|
||||
tokio::fs::write(upload_dir.join(".upload.tmp"), b"bad").await?;
|
||||
tokio::fs::write(upload_dir.join("completed").join("000000.done"), b"").await?;
|
||||
|
||||
let response = app
|
||||
.oneshot(empty_request(
|
||||
|
||||
@@ -22,8 +22,10 @@ async fn serves_index_page() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let body = String::from_utf8(body.to_vec())?;
|
||||
|
||||
assert!(body.contains("<title>upl</title>"));
|
||||
assert!(body.contains("Choose file"));
|
||||
assert!(body.contains("Pending uploads"));
|
||||
assert!(body.contains("Choose files"));
|
||||
assert!(body.contains("Selected uploads"));
|
||||
assert!(body.contains("Saved upload progress"));
|
||||
assert!(!body.contains("Server online"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -43,7 +43,8 @@ async fn creates_upload_metadata_on_disk() -> Result<(), Box<dyn std::error::Err
|
||||
|
||||
let upload_dir = temp_dir.path().join("staging").join(&response.upload_id);
|
||||
let meta_path = upload_dir.join("meta.json");
|
||||
assert!(upload_dir.join("chunks").is_dir());
|
||||
assert!(upload_dir.join(".upload.tmp").is_file());
|
||||
assert!(upload_dir.join("completed").is_dir());
|
||||
assert!(temp_dir.path().join("complete").is_dir());
|
||||
|
||||
let meta: UploadMeta = serde_json::from_slice(&tokio::fs::read(meta_path).await?)?;
|
||||
@@ -76,6 +77,41 @@ async fn rejects_empty_upload_name() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_upload_name_that_already_exists() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let app = test_app(temp_dir.path());
|
||||
let complete_dir = temp_dir.path().join("complete");
|
||||
tokio::fs::create_dir_all(&complete_dir).await?;
|
||||
tokio::fs::write(complete_dir.join("xyz.foo"), b"original").await?;
|
||||
|
||||
let response = app
|
||||
.oneshot(json_request(
|
||||
"/api/uploads",
|
||||
&json!({
|
||||
"name": "xyz.foo",
|
||||
"size": 10,
|
||||
"last_modified": 1_760_000_000_000_i64
|
||||
}),
|
||||
)?)
|
||||
.await?;
|
||||
|
||||
assert_eq!(response.status(), StatusCode::CONFLICT);
|
||||
|
||||
let body = response.into_body().collect().await?.to_bytes();
|
||||
let body: serde_json::Value = serde_json::from_slice(&body)?;
|
||||
assert_eq!(body["error"], "file already exists");
|
||||
assert_eq!(
|
||||
tokio::fs::read(complete_dir.join("xyz.foo")).await?,
|
||||
b"original"
|
||||
);
|
||||
|
||||
let mut staging_entries = tokio::fs::read_dir(temp_dir.path().join("staging")).await?;
|
||||
assert!(staging_entries.next_entry().await?.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn test_app(data_dir: &Path) -> axum::Router {
|
||||
build_router(&AppConfig::new(
|
||||
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
|
||||
|
||||
Reference in New Issue
Block a user