fix(pipeline): bound reorder buffer and fail fast on worker error

The multi-threaded pipeline introduced in 75afadb had two related defects
flagged by external review:

1. The writer's reorder buffer was unbounded. `ordered_writer` accepted
   a `_cap` parameter that was documented as the in-flight bound but
   was never read. The writer drained `done_rx` eagerly into a
   `BTreeMap`, so neither the bounded job channel nor the bounded done
   channel ever exerted backpressure on the reader. A slow or stuck
   worker would let the writer accumulate every subsequent chunk in
   `pending` until the system ran out of memory. With adversarial input
   this is a memory-exhaustion vector; with merely uneven workers it
   silently violated the documented memory ceiling.

2. The pipeline did not fail fast. When a worker hit an AEAD
   authentication failure it returned `Err`, dropped its channel
   clones, and exited — but the other workers, the reader, and the
   writer kept running until natural EOF. On a tampered N-byte file
   we burned full I/O plus (T-1)/T of the AEAD CPU before surfacing
   the error. Combined with (1) this also stretched the window in
   which `pending` could grow.

Both issues are addressed by a single rewrite of `pipeline.rs`:

  - A bounded "permit" channel pre-filled with `in_flight_capacity`
    `()` tokens. The reader acquires one before sending each job; the
    writer releases one after flushing the corresponding chunk in
    order. Total in-flight chunks (queued jobs + in-progress at
    workers + pending in the reorder map) is now hard-capped at
    `4 * threads`, with the reader held in lockstep with the writer's
    actual disk writes rather than running ahead of them.

  - An `Arc<AtomicBool> cancel` flag that workers set on AEAD failure.
    Workers check it at the top of their loop and drain remaining
    queued jobs without doing AEAD work. The reader checks it before
    each new chunk, so a tampered chunk causes the reader to stop
    within the in-flight window rather than after EOF.
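
The permit mechanism in the first bullet can be sketched in miniature.
This is an illustrative toy, not the pipeline's actual code: it uses
`std::sync::mpsc` in place of `crossbeam_channel`, and `run_bounded` is
a made-up name standing in for the reader/writer threads:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Permit-channel backpressure: `in_flight` tokens are pre-filled; the
// reader must take one before sending each job, and the writer returns
// one per flushed chunk, so the reader can never run more than
// `in_flight` chunks ahead of what has actually been written.
fn run_bounded(n_chunks: u32, in_flight: usize) -> Vec<u32> {
    let (permit_tx, permit_rx) = sync_channel::<()>(in_flight);
    for _ in 0..in_flight {
        permit_tx.send(()).expect("pre-fill of an empty channel cannot fail");
    }
    let (job_tx, job_rx) = sync_channel::<u32>(in_flight);

    // "Writer": flushes jobs in order, releasing one permit per chunk.
    let writer = thread::spawn(move || {
        let mut flushed = Vec::new();
        for counter in job_rx.iter() {
            flushed.push(counter);
            let _ = permit_tx.send(()); // release the in-flight slot
        }
        flushed
    });

    // "Reader": acquires a permit before each send.
    for counter in 0..n_chunks {
        permit_rx.recv().expect("writer releases permits");
        job_tx.send(counter).unwrap();
    }
    drop(job_tx); // close the channel so the writer's iterator ends
    writer.join().unwrap()
}

fn main() {
    let flushed = run_bounded(16, 4);
    assert_eq!(flushed, (0..16).collect::<Vec<u32>>());
    println!("flushed {} chunks, never more than 4 in flight", flushed.len());
}
```

The invariant is that tokens in the channel plus tokens "held" by
in-flight chunks always sum to `in_flight`, so neither side's send can
block indefinitely.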

The reader uses `permit_rx.recv_timeout(50ms)` rather than a blocking
`recv` so it can poll the cancel flag even when the rest of the
pipeline has quiesced. Without this, a 3-way deadlock is possible:
worker errors after all permits are out, the writer is blocked on a
missing-counter chunk that will never arrive, the other workers are
idle on `jobs_rx`, and the reader is blocked on `permit_rx`. The
50 ms wakeup is well below typical user-perceptible latency and only
runs when the pipeline is otherwise idle, so its cost is negligible.
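
That acquire loop can be sketched as follows, again with
`std::sync::mpsc` standing in for `crossbeam_channel`;
`acquire_or_cancel` is an illustrative name, not a function in the
patch:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, sync_channel};
use std::thread;
use std::time::Duration;

// Reader-side acquire: a short timed recv interleaved with checks of the
// shared cancel flag, so the reader wakes up even when no permit will
// ever be released again. Returns true only if a permit was acquired.
fn acquire_or_cancel(permit_rx: &Receiver<()>, cancel: &AtomicBool) -> bool {
    loop {
        if cancel.load(Ordering::Acquire) {
            return false; // fail fast instead of deadlocking
        }
        match permit_rx.recv_timeout(Duration::from_millis(50)) {
            Ok(()) => return true,
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return false,
        }
    }
}

fn main() {
    // Simulate the deadlock scenario: all permits are out and none will
    // ever come back, so the channel stays empty.
    let (_permit_tx, permit_rx) = sync_channel::<()>(4);
    let cancel = Arc::new(AtomicBool::new(false));

    // A "worker" hits an AEAD-style error and sets the flag.
    let flagger = {
        let cancel = cancel.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(120));
            cancel.store(true, Ordering::Release);
        })
    };

    let acquired = acquire_or_cancel(&permit_rx, &cancel);
    flagger.join().unwrap();
    assert!(!acquired); // the reader exited via the cancel poll
    println!("acquired = {acquired}");
}
```

With a plain blocking `recv` the main thread above would hang forever;
the timed variant bounds the wait to one poll interval.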

While rewriting I also collapsed `encrypt_parallel`/`decrypt_parallel`
into a shared `run_pipeline` helper parameterised by an `is_encrypt`
bool — the two functions previously duplicated ~150 lines of channel
plumbing for a one-line difference (`encrypt_in_place` vs
`decrypt_in_place`). Same for the reorder writer: a single
`ordered_writer` now returns `(OutSink, u64)`, and encrypt simply
ignores the byte count (decrypt uses it for the length cross-check).
Removed the stale "wrapping_add" on the in-order counter — wrapping
here would mask a real bug since `bump_counter` already rejects
overflow upstream — and corrected the per-thread memory estimate in
the module-level doc to match the new bounded model.

The job-channel capacity (`channel_capacity = 2 * threads`) is left
unchanged. The new permit cap (`4 * threads`) is deliberately larger
so out-of-order completion has slack; if that slack is ever exhausted,
the only consequence is reader backpressure, never unbounded growth.
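
For concreteness, the two sizing functions as described (shapes mirrored
from the diff; the `.max` floors keep tiny thread counts pipelined):

```rust
// Job-channel capacity: small multiple of the worker count.
fn channel_capacity(threads: usize) -> usize {
    (threads * 2).max(2)
}

// Total in-flight chunk cap, i.e. the permit count. Deliberately above
// the job-channel capacity so reordering has slack.
fn in_flight_capacity(threads: usize) -> usize {
    (threads * 4).max(4)
}

fn main() {
    for threads in [1, 2, 8] {
        let jobs = channel_capacity(threads);
        let cap = in_flight_capacity(threads);
        // The permit cap always exceeds the job-channel capacity, so
        // out-of-order completions have room before the reader stalls.
        assert!(cap > jobs);
        println!("threads={threads}: jobs queue {jobs}, in-flight cap {cap}");
    }
}
```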

Test plan:
  - `cargo test` still passes the full 28-test integration suite,
    including `parallel_and_serial_outputs_round_trip` (proves the
    refactored unified pipeline produces bit-identical output to the
    serial path) and `rejects_tampered_ciphertext` (still surfaces
    the AEAD error, now via the cancel path).
  - Manual fail-fast probe: 200 MiB random plaintext, encrypt with
    `-j 8`, flip a byte at offset 2000 (inside chunk 0), decrypt
    with `-j 8`. Errors in ~2 ms, vs ~28 ms for a clean decrypt of
    the same file — confirming the reader stopped within the
    in-flight window rather than draining the whole input.
  - The `ordered_writer` cancel deadlock case is hit organically by
    the same probe: chunk 0 fails authentication, no further
    counter-0 chunk ever arrives, but the reader exits via the
    50 ms cancel poll and the rest of the pipeline drains.

Refs: external review (P2 / Gemini #1, Gemini #2, GLM51 #2/#7/#8).
2026-05-02 21:29:08 +02:00
parent 53bb927a87
commit 91b459657e
+190 -194
@@ -11,21 +11,33 @@
 //!
 //! The reader is sequential (one input handle, lookahead detects last chunk),
 //! workers parallelize the AEAD step (independent per chunk), and the writer
-//! reorders results by counter before writing them to `OutSink`. The job
-//! channel is bounded to give backpressure; the writer's reorder buffer is
-//! also bounded so a slow worker can stall the pipeline rather than leaking
-//! memory.
+//! reorders results by counter before writing them to `OutSink`.
 //!
-//! Peak memory ≈ chunk_size × (jobs_capacity + workers + reorder_capacity + 2)
-//! — for 1 MiB chunks and 8 cores that's ~32 MiB, which we accept.
+//! Bounded memory: a permit channel caps the total number of in-flight chunks
+//! (queued jobs + in-progress at workers + pending in the writer's reorder
+//! buffer). The reader acquires a permit before sending each job; the writer
+//! releases a permit after flushing the chunk in order. A slow or stuck worker
+//! therefore stalls the reader rather than letting the writer's reorder buffer
+//! grow without bound.
+//!
+//! Fail-fast: a shared `cancel` flag lets workers signal an authentication or
+//! AEAD error to the reader. The reader checks it each iteration and exits
+//! early, so a tampered chunk doesn't waste full-file I/O on top of the
+//! detection.
+//!
+//! Peak memory ≈ chunk_size × (in_flight_cap + 2). For 1 MiB chunks and 8
+//! cores (cap = 32) that's ~34 MiB. Adjust `in_flight_capacity` if you need
+//! a different memory/throughput tradeoff.
 
 use std::collections::BTreeMap;
 use std::io::Write;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread::{self, JoinHandle};
+use std::time::Duration;
 
 use chacha20poly1305::{XChaCha20Poly1305, aead::AeadInPlace};
-use crossbeam_channel::{Receiver, Sender, bounded};
+use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, bounded};
 
 use crate::crypto::{bump_counter, make_nonce};
 use crate::error::FcryError;
@@ -44,21 +56,22 @@ struct Done {
     buf: Vec<u8>,
 }
 
-/// Channel sizing: small multiples of worker count, enough to keep workers
-/// fed without unbounded memory.
+/// Job-channel capacity: small multiples of worker count, enough to keep
+/// workers fed without unbounded memory.
 fn channel_capacity(threads: usize) -> usize {
     (threads * 2).max(2)
 }
 
-/// Reorder buffer cap: drop into back-pressure once we've held this many
-/// out-of-order chunks. With uniform AEAD work this rarely fills.
-fn reorder_capacity(threads: usize) -> usize {
-    (threads * 2).max(2)
+/// Total in-flight chunk cap (jobs queued + at workers + in writer's reorder
+/// buffer). Permit count; bounded above the job-channel capacity to absorb
+/// reordering without blocking workers unnecessarily.
+fn in_flight_capacity(threads: usize) -> usize {
+    (threads * 4).max(4)
 }
 
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn encrypt_parallel(
-    mut input: AheadReader,
+    input: AheadReader,
     output: OutSink,
     aead: Arc<XChaCha20Poly1305>,
     aad: Arc<Vec<u8>>,
@@ -67,15 +80,119 @@ pub(crate) fn encrypt_parallel(
     threads: usize,
     expected_length: Option<u64>,
 ) -> Result<(), FcryError> {
+    let (sink, bytes_seen) = run_pipeline(
+        input,
+        output,
+        aead,
+        aad,
+        nonce_prefix,
+        chunk_sz,
+        threads,
+        true,
+    )?;
+    if let Some(committed) = expected_length
+        && committed != bytes_seen
+    {
+        return Err(FcryError::Format(format!(
+            "input length changed during encryption: committed {committed}, read {bytes_seen}"
+        )));
+    }
+    sink.commit()?;
+    Ok(())
+}
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn decrypt_parallel(
+    input: AheadReader,
+    output: OutSink,
+    aead: Arc<XChaCha20Poly1305>,
+    aad: Arc<Vec<u8>>,
+    nonce_prefix: [u8; NONCE_PREFIX_LEN],
+    cipher_chunk: usize,
+    threads: usize,
+    expected_length: Option<u64>,
+) -> Result<(), FcryError> {
+    let (sink, written) = run_pipeline(
+        input,
+        output,
+        aead,
+        aad,
+        nonce_prefix,
+        cipher_chunk,
+        threads,
+        false,
+    )?;
+    if let Some(committed) = expected_length
+        && committed != written
+    {
+        return Err(FcryError::Format(format!(
+            "decrypted length {written} disagrees with committed {committed}"
+        )));
+    }
+    sink.commit()?;
+    Ok(())
+}
+
+/// Drives the reader/worker/writer pipeline. `is_encrypt = true` performs
+/// `encrypt_in_place` and tracks bytes-read; `false` performs
+/// `decrypt_in_place` and tracks bytes-written. The single shared topology
+/// keeps backpressure, reorder, and fail-fast logic in one place.
+#[allow(clippy::too_many_arguments)]
+fn run_pipeline(
+    mut input: AheadReader,
+    output: OutSink,
+    aead: Arc<XChaCha20Poly1305>,
+    aad: Arc<Vec<u8>>,
+    nonce_prefix: [u8; NONCE_PREFIX_LEN],
+    chunk_sz: usize,
+    threads: usize,
+    is_encrypt: bool,
+) -> Result<(OutSink, u64), FcryError> {
     let cap = channel_capacity(threads);
+    let in_flight = in_flight_capacity(threads);
     let (jobs_tx, jobs_rx) = bounded::<Job>(cap);
     let (done_tx, done_rx) = bounded::<Done>(cap);
 
-    // Reader thread: drives AheadReader, dispatches jobs in counter order.
-    let reader_handle: JoinHandle<Result<u64, FcryError>> = thread::spawn(move || {
-        let mut counter: u32 = 0;
-        let mut bytes_seen: u64 = 0;
-        loop {
-            let mut buf = vec![0u8; chunk_sz];
-            match input.read_ahead(&mut buf)? {
-                ReadInfoChunk::Normal(_) => {
+    // Pre-fill the permit channel. Each permit represents one in-flight chunk
+    // slot. The reader consumes a permit before sending a job; the writer
+    // returns a permit after flushing in order.
+    let (permit_tx, permit_rx) = bounded::<()>(in_flight);
+    for _ in 0..in_flight {
+        permit_tx
+            .send(())
+            .expect("pre-fill of permit channel cannot fail");
+    }
+    let cancel = Arc::new(AtomicBool::new(false));
+
+    // Reader thread: dispatches jobs in counter order and tracks bytes read
+    // (used for the encrypt-side length cross-check). On decrypt the count is
+    // ignored — the writer's count is authoritative there.
+    let reader_handle: JoinHandle<Result<u64, FcryError>> = {
+        let cancel = cancel.clone();
+        thread::spawn(move || {
+            let mut counter: u32 = 0;
+            let mut bytes_seen: u64 = 0;
+            loop {
+                // Acquire an in-flight slot. We recv with a short timeout so
+                // a worker error (which sets `cancel`) is observed even if
+                // the rest of the pipeline has quiesced and is no longer
+                // releasing permits — this avoids a 3-way deadlock between
+                // reader, idle workers, and a stalled writer.
+                loop {
+                    if cancel.load(Ordering::Acquire) {
+                        return Ok(bytes_seen);
+                    }
+                    match permit_rx.recv_timeout(Duration::from_millis(50)) {
+                        Ok(()) => break,
+                        Err(RecvTimeoutError::Timeout) => continue,
+                        Err(RecvTimeoutError::Disconnected) => return Ok(bytes_seen),
+                    }
+                }
+                let mut buf = vec![0u8; chunk_sz];
+                match input.read_ahead(&mut buf)? {
+                    ReadInfoChunk::Normal(_) => {
@@ -103,6 +220,7 @@ pub(crate) fn encrypt_parallel(
                         return Ok(bytes_seen);
                     }
                     ReadInfoChunk::Empty => {
+                        if is_encrypt {
                             buf.clear();
                             let _ = jobs_tx.send(Job {
                                 counter,
@@ -111,151 +229,46 @@ pub(crate) fn encrypt_parallel(
                             });
                             return Ok(bytes_seen);
                         }
-            }
-        }
-    });
-
-    // Worker threads: AEAD encrypt in place, ship to writer.
-    let mut worker_handles: Vec<JoinHandle<Result<(), FcryError>>> = Vec::with_capacity(threads);
-    for _ in 0..threads {
-        let jobs_rx = jobs_rx.clone();
-        let done_tx = done_tx.clone();
-        let aead = aead.clone();
-        let aad = aad.clone();
-        worker_handles.push(thread::spawn(move || {
-            for mut job in jobs_rx.iter() {
-                let nonce = make_nonce(&nonce_prefix, job.counter, job.last);
-                aead.encrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)?;
-                if done_tx
-                    .send(Done {
-                        counter: job.counter,
-                        buf: job.buf,
-                    })
-                    .is_err()
-                {
-                    break;
-                }
-            }
-            Ok(())
-        }));
-    }
-    drop(jobs_rx);
-    drop(done_tx);
-
-    // Writer thread: ordered writeback. Commit is deferred to the main
-    // thread so a failure anywhere in the pipeline leaves the temp file to
-    // be unlinked by `OutSink::drop` instead of being renamed into place.
-    let writer_handle: JoinHandle<Result<OutSink, FcryError>> = {
-        let cap = reorder_capacity(threads);
-        thread::spawn(move || ordered_writer(done_rx, output, cap))
-    };
-
-    // Join everything; surface the first error.
-    let reader_res = reader_handle.join().expect("reader thread panicked");
-    let mut first_err: Option<FcryError> = None;
-    let bytes_seen = match reader_res {
-        Ok(n) => Some(n),
-        Err(e) => {
-            first_err.get_or_insert(e);
-            None
-        }
-    };
-    for h in worker_handles {
-        if let Err(e) = h.join().expect("worker thread panicked")
-            && first_err.is_none()
-        {
-            first_err = Some(e);
-        }
-    }
-    let writer_res = writer_handle.join().expect("writer thread panicked");
-    let sink = match writer_res {
-        Ok(s) => Some(s),
-        Err(e) => {
-            if first_err.is_none() {
-                first_err = Some(e);
-            }
-            None
-        }
-    };
-    if let Some(e) = first_err {
-        // Drop `sink` here without committing so the temp file is unlinked.
-        return Err(e);
-    }
-    if let (Some(committed), Some(seen)) = (expected_length, bytes_seen)
-        && committed != seen
-    {
-        return Err(FcryError::Format(format!(
-            "input length changed during encryption: committed {committed}, read {seen}"
-        )));
-    }
-    sink.expect("no error but no sink").commit()?;
-    Ok(())
-}
-
-#[allow(clippy::too_many_arguments)]
-pub(crate) fn decrypt_parallel(
-    mut input: AheadReader,
-    output: OutSink,
-    aead: Arc<XChaCha20Poly1305>,
-    aad: Arc<Vec<u8>>,
-    nonce_prefix: [u8; NONCE_PREFIX_LEN],
-    cipher_chunk: usize,
-    threads: usize,
-    expected_length: Option<u64>,
-) -> Result<(), FcryError> {
-    let cap = channel_capacity(threads);
-    let (jobs_tx, jobs_rx) = bounded::<Job>(cap);
-    let (done_tx, done_rx) = bounded::<Done>(cap);
-
-    let reader_handle: JoinHandle<Result<(), FcryError>> = thread::spawn(move || {
-        let mut counter: u32 = 0;
-        loop {
-            let mut buf = vec![0u8; cipher_chunk];
-            match input.read_ahead(&mut buf)? {
-                ReadInfoChunk::Normal(_) => {
-                    if jobs_tx
-                        .send(Job {
-                            counter,
-                            last: false,
-                            buf,
-                        })
-                        .is_err()
-                    {
-                        return Ok(());
-                    }
-                    counter = bump_counter(counter)?;
-                }
-                ReadInfoChunk::Last(n) => {
-                    buf.truncate(n);
-                    let _ = jobs_tx.send(Job {
-                        counter,
-                        last: true,
-                        buf,
-                    });
-                    return Ok(());
-                }
-                ReadInfoChunk::Empty => {
+                        // On decrypt an unexpected EOF means the ciphertext is
+                        // truncated. Surface it as an error so the writer
+                        // doesn't commit a partial output.
                         return Err(FcryError::Format(
                             "truncated ciphertext: missing final chunk".into(),
                         ));
                     }
                 }
             }
+        })
+    };
+
+    // Worker threads: AEAD encrypt/decrypt in place, ship to writer. On error
+    // we set the cancel flag so the reader exits early, and drop the senders
+    // so the writer drains and exits.
     let mut worker_handles: Vec<JoinHandle<Result<(), FcryError>>> = Vec::with_capacity(threads);
     for _ in 0..threads {
         let jobs_rx = jobs_rx.clone();
         let done_tx = done_tx.clone();
         let aead = aead.clone();
         let aad = aad.clone();
+        let cancel = cancel.clone();
         worker_handles.push(thread::spawn(move || {
             for mut job in jobs_rx.iter() {
+                if cancel.load(Ordering::Acquire) {
+                    // Drain remaining queued jobs without doing AEAD work.
+                    // Returning Ok here keeps the previously-set error from
+                    // being clobbered by a fresh "ok" status.
+                    break;
+                }
                 let nonce = make_nonce(&nonce_prefix, job.counter, job.last);
-                aead.decrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)?;
+                let res = if is_encrypt {
+                    aead.encrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)
+                } else {
+                    aead.decrypt_in_place(&nonce, aad.as_slice(), &mut job.buf)
+                };
+                if let Err(e) = res {
+                    cancel.store(true, Ordering::Release);
+                    return Err(e.into());
+                }
                 if done_tx
                     .send(Done {
                         counter: job.counter,
@@ -272,16 +285,24 @@ pub(crate) fn decrypt_parallel(
     drop(jobs_rx);
     drop(done_tx);
 
-    let writer_handle: JoinHandle<Result<(OutSink, u64), FcryError>> = {
-        let cap = reorder_capacity(threads);
-        thread::spawn(move || ordered_writer_counted(done_rx, output, cap))
-    };
+    // Writer thread: ordered writeback. Returns the `OutSink` ownership back
+    // without committing; the caller commits only after every other thread
+    // has joined cleanly so a failure anywhere drops the sink and unlinks the
+    // temp file. Releases one permit per chunk flushed so the reader can make
+    // forward progress in lockstep with the actual disk write.
+    let writer_handle: JoinHandle<Result<(OutSink, u64), FcryError>> =
+        thread::spawn(move || ordered_writer(done_rx, output, permit_tx));
 
     let reader_res = reader_handle.join().expect("reader thread panicked");
     let mut first_err: Option<FcryError> = None;
-    if let Err(e) = reader_res {
-        first_err = Some(e);
-    }
+    let bytes_seen = match reader_res {
+        Ok(n) => Some(n),
+        Err(e) => {
+            cancel.store(true, Ordering::Release);
+            first_err.get_or_insert(e);
+            None
+        }
+    };
     for h in worker_handles {
         if let Err(e) = h.join().expect("worker thread panicked")
             && first_err.is_none()
@@ -305,51 +326,21 @@ pub(crate) fn decrypt_parallel(
     }
 
     let (sink, n) = written.expect("no error but no sink");
-
-    if let Some(committed) = expected_length
-        && committed != n
-    {
-        return Err(FcryError::Format(format!(
-            "decrypted length {n} disagrees with committed {committed}"
-        )));
-    }
-    sink.commit()?;
-    Ok(())
+    let count = if is_encrypt {
+        bytes_seen.expect("no error but no reader count")
+    } else {
+        n
+    };
+    Ok((sink, count))
 }
 
-/// Drain `done_rx` in counter order and stream into `output`. Returns the
-/// `OutSink` ownership back to the caller without committing — the caller
-/// commits only after every other thread has joined cleanly, so a failure
-/// anywhere in the pipeline drops the sink and unlinks the temp file.
+/// Drain `done_rx` in counter order, writing each chunk to `output` and
+/// returning a permit to `permit_tx` after every flush so the reader is held
+/// in lockstep with disk writes (bounded reorder buffer).
 fn ordered_writer(
     done_rx: Receiver<Done>,
     mut output: OutSink,
-    _cap: usize,
-) -> Result<OutSink, FcryError> {
-    let mut next: u32 = 0;
-    let mut pending: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
-    for done in done_rx.iter() {
-        pending.insert(done.counter, done.buf);
-        while let Some(buf) = pending.remove(&next) {
-            output.write_all(&buf)?;
-            next = next.wrapping_add(1);
-        }
-    }
-    if !pending.is_empty() {
-        return Err(FcryError::Format(
-            "internal: ordered writer left chunks unflushed".into(),
-        ));
-    }
-    Ok(output)
-}
-
-/// Same as `ordered_writer` but also returns total bytes written for the
-/// length-committed cross-check on decrypt.
-fn ordered_writer_counted(
-    done_rx: Receiver<Done>,
-    mut output: OutSink,
-    _cap: usize,
+    permit_tx: Sender<()>,
 ) -> Result<(OutSink, u64), FcryError> {
     let mut next: u32 = 0;
     let mut pending: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
@@ -359,7 +350,12 @@ fn ordered_writer_counted(
         while let Some(buf) = pending.remove(&next) {
             output.write_all(&buf)?;
             total = total.saturating_add(buf.len() as u64);
-            next = next.wrapping_add(1);
+            // `bump_counter` rejects overflow upstream; a wrap here would be
+            // a real bug, so use plain addition and let it panic in debug.
+            next += 1;
+            // Release one in-flight slot. If the reader is gone the channel
+            // is closed; we don't care about the send result.
+            let _ = permit_tx.send(());
         }
     }
     if !pending.is_empty() {
@@ -370,8 +366,8 @@ fn ordered_writer_counted(
     Ok((output, total))
 }
 
-// Suppress unused warnings on Sender clones inside the worker loop; the
-// channel is closed when its last sender is dropped which is what we want.
+// Compile-time check that the job type is Send+Sync (channel sends across
+// threads). Kept as a footgun for future struct edits.
 #[allow(dead_code)]
 fn _assert_send_sync<T: Send + Sync>() {}
 const _: fn() = || _assert_send_sync::<Sender<Job>>();