feat!: multi-threaded pipeline + length-committed/random-access decrypt

Completes the two follow-ups deferred from the v0.10 format/secrets
work: multi-threaded AEAD encrypt/decrypt and a length-committed file
format that enables random-access decryption.

# Format change (file format v2)

Bumps the on-disk header version to 2 and introduces a flag bit
(`FLAG_LENGTH_COMMITTED`, bit 0). When set, an authenticated `u64 LE`
plaintext length is appended to the header after the nonce prefix. v1
files still decrypt unchanged. v2 readers reject unknown flag bits.

The flag is set automatically when the input is a regular file (we
stat the open FD to avoid TOCTOU). Stdin/pipes/FIFOs encrypt as before
with the flag clear. Sequential decrypt cross-checks the produced byte
count against the committed length as defense in depth (the AEAD
already authenticates the value via header AAD, but failing before we
rename the temp file into place is preferable to failing after).

# Random-access decrypt

`fcry -d -i FILE --offset N --length L` seeks directly to the chunk(s)
covering `[N, N+L)` and decrypts only those, without scanning the
predecessors. Requires a seekable file whose header has the
length-committed flag — stdin/pipe-encrypted files cannot use this
path and the CLI rejects it with a clear error.

The chunk layout is fully determined by `chunk_size` and the committed
total length (last chunk's plaintext is
`total - (n_chunks-1)*chunk_size`; its ciphertext length is
`last_pt + 16`). Each chunk's nonce is
`make_nonce(prefix, chunk_index, is_last_chunk)` which matches what
sequential encrypt produced, so plaintext slices come out
bit-identical to a full sequential decrypt.

# Multi-threaded pipeline

New `src/pipeline.rs` implements:

  reader thread → bounded jobs channel → N AEAD workers
                → bounded results channel → writer thread

The reader stays serial (it owns the input handle and uses lookahead
to detect the last chunk). Workers parallelize the AEAD step (each
chunk is independent under STREAM). The writer holds a
`BTreeMap<u32, Vec<u8>>` reorder buffer and only flushes in counter
order. Commit is deferred to the main thread, so a failure anywhere —
reader I/O, AEAD auth, writer I/O — drops `OutSink` without renaming
the temp file into place. The
`atomic_output_no_stale_tmp_on_failure` integration test still
passes.

Channel and reorder capacities scale with worker count (`2*threads`);
peak memory is roughly `chunk_size * 4 * threads`. With 1 MiB chunks
and 8 cores that's ~32 MiB, which we accept.

Default thread count is `std::thread::available_parallelism()`;
override with `-j/--threads N`. `-j 1` keeps the original serial path.
Stdin/stdout streaming works under the parallel path because `Stdin`
(unlocked) is `Send` — only `StdinLock` isn't, so the boxed reader
wraps `Stdin` directly in a `BufReader`.

Adds `crossbeam-channel = "0.5"` for bounded MPMC. The cipher
(`XChaCha20Poly1305`) and the header AAD are shared across workers via
`Arc`; the AEAD's internal key copy is zeroized on drop as before.

# CLI surface

  -j, --threads <N>     worker thread count (default: cores)
      --offset <BYTES>  random-access decrypt: slice start
      --length <BYTES>  random-access decrypt: slice length

`--offset`/`--length` require `--decrypt` and `--input-file` (clap
enforces; we also surface a clean runtime error if only one is
supplied).

# Test plan

* `cargo test` — 5 unit + 27 integration, all green.
* New integration coverage:
  - parallel roundtrip on multi-chunk inputs (`-j 4`)
  - parallel-encrypted ciphertext decrypted serially, and vice-versa
    (output bit-identical regardless of worker count)
  - parallel pipe stdin↔stdout (asserts flag byte is 0 for stdin
    inputs — no length committed without a known size)
  - file inputs auto-commit length (asserts version=2 and flags bit 0
    set in the raw header bytes)
  - random-access slices spanning chunk-aligned, mid-chunk,
    last-chunk, and full-file ranges
  - random-access rejects out-of-range and stdin-encrypted inputs,
    accepts zero-length
  - tampering the committed length byte fails AEAD authentication
  - hand-crafted v1 header still decodes (no flag bit set)
* `cargo clippy --all-targets -- -D warnings` clean.
* `cargo +nightly fmt` clean.

Removes `TODO.md` since both deferred items are now implemented.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 20:33:00 +02:00
parent f72f9034f3
commit 75afadb1ec
10 changed files with 1095 additions and 51 deletions
+377
View File
@@ -0,0 +1,377 @@
// SPDX-License-Identifier: GPL-3.0-only
//! Multi-threaded encrypt/decrypt pipeline.
//!
//! Topology:
//!
//! ```text
//! reader thread → jobs (bounded MPMC) → N AEAD workers →
//! → results (bounded MPMC) → writer thread
//! ```
//!
//! The reader is sequential (one input handle, lookahead detects last chunk),
//! workers parallelize the AEAD step (independent per chunk), and the writer
//! reorders results by counter before writing them to `OutSink`. The job
//! channel is bounded to give backpressure; the writer's reorder buffer is
//! also bounded so a slow worker can stall the pipeline rather than leaking
//! memory.
//!
//! Peak memory ≈ chunk_size × (jobs_capacity + workers + reorder_capacity + 2)
//! — for 1 MiB chunks and 8 cores that's ~32 MiB, which we accept.
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use chacha20poly1305::{XChaCha20Poly1305, aead::AeadInPlace};
use crossbeam_channel::{Receiver, Sender, bounded};
use crate::crypto::{bump_counter, make_nonce};
use crate::error::FcryError;
use crate::header::NONCE_PREFIX_LEN;
use crate::reader::{AheadReader, ReadInfoChunk};
use crate::utils::OutSink;
struct Job {
counter: u32,
last: bool,
buf: Vec<u8>,
}
struct Done {
counter: u32,
buf: Vec<u8>,
}
/// Channel sizing: small multiples of worker count, enough to keep workers
/// fed without unbounded memory.
fn channel_capacity(threads: usize) -> usize {
(threads * 2).max(2)
}
/// Reorder buffer cap: drop into back-pressure once we've held this many
/// out-of-order chunks. With uniform AEAD work this rarely fills.
fn reorder_capacity(threads: usize) -> usize {
(threads * 2).max(2)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn encrypt_parallel(
mut input: AheadReader,
output: OutSink,
aead: Arc<XChaCha20Poly1305>,
aad: Arc<Vec<u8>>,
nonce_prefix: [u8; NONCE_PREFIX_LEN],
chunk_sz: usize,
threads: usize,
expected_length: Option<u64>,
) -> Result<(), FcryError> {
let cap = channel_capacity(threads);
let (jobs_tx, jobs_rx) = bounded::<Job>(cap);
let (done_tx, done_rx) = bounded::<Done>(cap);
// Reader thread: drives AheadReader, dispatches jobs in counter order.
let reader_handle: JoinHandle<Result<u64, FcryError>> = thread::spawn(move || {
let mut counter: u32 = 0;
let mut bytes_seen: u64 = 0;
loop {
let mut buf = vec![0u8; chunk_sz];
match input.read_ahead(&mut buf)? {
ReadInfoChunk::Normal(_) => {
if jobs_tx
.send(Job {
counter,
last: false,
buf,
})
.is_err()
{
return Ok(bytes_seen);
}
bytes_seen = bytes_seen.saturating_add(chunk_sz as u64);
counter = bump_counter(counter)?;
}
ReadInfoChunk::Last(n) => {
buf.truncate(n);
let _ = jobs_tx.send(Job {
counter,
last: true,
buf,
});
bytes_seen = bytes_seen.saturating_add(n as u64);
return Ok(bytes_seen);
}
ReadInfoChunk::Empty => {
buf.clear();
let _ = jobs_tx.send(Job {
counter,
last: true,
buf,
});
return Ok(bytes_seen);
}
}
}
});
// Worker threads: AEAD encrypt in place, ship to writer.
let mut worker_handles: Vec<JoinHandle<Result<(), FcryError>>> = Vec::with_capacity(threads);
for _ in 0..threads {
let jobs_rx = jobs_rx.clone();
let done_tx = done_tx.clone();
let aead = aead.clone();
let aad = aad.clone();
worker_handles.push(thread::spawn(move || {
for mut job in jobs_rx.iter() {
let nonce = make_nonce(&nonce_prefix, job.counter, job.last);
aead.encrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)?;
if done_tx
.send(Done {
counter: job.counter,
buf: job.buf,
})
.is_err()
{
break;
}
}
Ok(())
}));
}
drop(jobs_rx);
drop(done_tx);
// Writer thread: ordered writeback. Commit is deferred to the main
// thread so a failure anywhere in the pipeline leaves the temp file to
// be unlinked by `OutSink::drop` instead of being renamed into place.
let writer_handle: JoinHandle<Result<OutSink, FcryError>> = {
let cap = reorder_capacity(threads);
thread::spawn(move || ordered_writer(done_rx, output, cap))
};
// Join everything; surface the first error.
let reader_res = reader_handle.join().expect("reader thread panicked");
let mut first_err: Option<FcryError> = None;
let bytes_seen = match reader_res {
Ok(n) => Some(n),
Err(e) => {
first_err.get_or_insert(e);
None
}
};
for h in worker_handles {
if let Err(e) = h.join().expect("worker thread panicked")
&& first_err.is_none()
{
first_err = Some(e);
}
}
let writer_res = writer_handle.join().expect("writer thread panicked");
let sink = match writer_res {
Ok(s) => Some(s),
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
None
}
};
if let Some(e) = first_err {
// Drop `sink` here without committing so the temp file is unlinked.
return Err(e);
}
if let (Some(committed), Some(seen)) = (expected_length, bytes_seen)
&& committed != seen
{
return Err(FcryError::Format(format!(
"input length changed during encryption: committed {committed}, read {seen}"
)));
}
sink.expect("no error but no sink").commit()?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn decrypt_parallel(
mut input: AheadReader,
output: OutSink,
aead: Arc<XChaCha20Poly1305>,
aad: Arc<Vec<u8>>,
nonce_prefix: [u8; NONCE_PREFIX_LEN],
cipher_chunk: usize,
threads: usize,
expected_length: Option<u64>,
) -> Result<(), FcryError> {
let cap = channel_capacity(threads);
let (jobs_tx, jobs_rx) = bounded::<Job>(cap);
let (done_tx, done_rx) = bounded::<Done>(cap);
let reader_handle: JoinHandle<Result<(), FcryError>> = thread::spawn(move || {
let mut counter: u32 = 0;
loop {
let mut buf = vec![0u8; cipher_chunk];
match input.read_ahead(&mut buf)? {
ReadInfoChunk::Normal(_) => {
if jobs_tx
.send(Job {
counter,
last: false,
buf,
})
.is_err()
{
return Ok(());
}
counter = bump_counter(counter)?;
}
ReadInfoChunk::Last(n) => {
buf.truncate(n);
let _ = jobs_tx.send(Job {
counter,
last: true,
buf,
});
return Ok(());
}
ReadInfoChunk::Empty => {
return Err(FcryError::Format(
"truncated ciphertext: missing final chunk".into(),
));
}
}
}
});
let mut worker_handles: Vec<JoinHandle<Result<(), FcryError>>> = Vec::with_capacity(threads);
for _ in 0..threads {
let jobs_rx = jobs_rx.clone();
let done_tx = done_tx.clone();
let aead = aead.clone();
let aad = aad.clone();
worker_handles.push(thread::spawn(move || {
for mut job in jobs_rx.iter() {
let nonce = make_nonce(&nonce_prefix, job.counter, job.last);
aead.decrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)?;
if done_tx
.send(Done {
counter: job.counter,
buf: job.buf,
})
.is_err()
{
break;
}
}
Ok(())
}));
}
drop(jobs_rx);
drop(done_tx);
let writer_handle: JoinHandle<Result<(OutSink, u64), FcryError>> = {
let cap = reorder_capacity(threads);
thread::spawn(move || ordered_writer_counted(done_rx, output, cap))
};
let reader_res = reader_handle.join().expect("reader thread panicked");
let mut first_err: Option<FcryError> = None;
if let Err(e) = reader_res {
first_err = Some(e);
}
for h in worker_handles {
if let Err(e) = h.join().expect("worker thread panicked")
&& first_err.is_none()
{
first_err = Some(e);
}
}
let writer_res = writer_handle.join().expect("writer thread panicked");
let written = match writer_res {
Ok((sink, n)) => Some((sink, n)),
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
None
}
};
if let Some(e) = first_err {
return Err(e);
}
let (sink, n) = written.expect("no error but no sink");
if let Some(committed) = expected_length
&& committed != n
{
return Err(FcryError::Format(format!(
"decrypted length {n} disagrees with committed {committed}"
)));
}
sink.commit()?;
Ok(())
}
/// Drain `done_rx` in counter order and stream into `output`. Returns the
/// `OutSink` ownership back to the caller without committing — the caller
/// commits only after every other thread has joined cleanly, so a failure
/// anywhere in the pipeline drops the sink and unlinks the temp file.
fn ordered_writer(
done_rx: Receiver<Done>,
mut output: OutSink,
_cap: usize,
) -> Result<OutSink, FcryError> {
let mut next: u32 = 0;
let mut pending: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
for done in done_rx.iter() {
pending.insert(done.counter, done.buf);
while let Some(buf) = pending.remove(&next) {
output.write_all(&buf)?;
next = next.wrapping_add(1);
}
}
if !pending.is_empty() {
return Err(FcryError::Format(
"internal: ordered writer left chunks unflushed".into(),
));
}
Ok(output)
}
/// Same as `ordered_writer` but also returns total bytes written for the
/// length-committed cross-check on decrypt.
fn ordered_writer_counted(
done_rx: Receiver<Done>,
mut output: OutSink,
_cap: usize,
) -> Result<(OutSink, u64), FcryError> {
let mut next: u32 = 0;
let mut pending: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
let mut total: u64 = 0;
for done in done_rx.iter() {
pending.insert(done.counter, done.buf);
while let Some(buf) = pending.remove(&next) {
output.write_all(&buf)?;
total = total.saturating_add(buf.len() as u64);
next = next.wrapping_add(1);
}
}
if !pending.is_empty() {
return Err(FcryError::Format(
"internal: ordered writer left chunks unflushed".into(),
));
}
Ok((output, total))
}
// Suppress unused warnings on Sender clones inside the worker loop; the
// channel is closed when its last sender is dropped which is what we want.
#[allow(dead_code)]
fn _assert_send_sync<T: Send + Sync>() {}
const _: fn() = || _assert_send_sync::<Sender<Job>>();