From fcb85decad2642ad45fabf975c8e81daea324439 Mon Sep 17 00:00:00 2001 From: OpenSauce Date: Wed, 3 Jun 2026 23:03:53 +0100 Subject: [PATCH 1/4] perf(rt): eliminate permit_alloc sites and dealloc-on-RT in the engine (#239) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove all six `permit_alloc` escape hatches from `audio/engine.rs` and route the remaining drop-on-RT cases through `rt_drop.retire`, so the real-time process path and message-handler arms allocate and deallocate nothing. - peak_meter: replace `Arc<ArcSwap<PeakMeterInfo>>` with three atomics (`AtomicU32` x2 + `AtomicBool`) — no per-block `Arc::new`. - recorder: pre-allocate a recycling buffer pool in `Recorder::new` (sized via a new `max_block_samples` arg) instead of `Vec::with_capacity` per block. - pitch shifter: construct `PitchShifter` off the RT thread in `EngineHandle::set_pitch_shift` and ship it as `SetPitchShift(Option<Box<PitchShifter>>)`; the RT side just swaps + retires. - samplers: hold as `Box<Samplers>` so `SetSamplers` swaps pointers and retires the old box directly (no `Box::new` type-erasure on RT). - IR cabinet: hold convolver as `Box<Convolver>`; `SwapIrConvolver` swaps in place and retires the whole `PreparedIr` (old convolver + name) off-RT. - input filters: `SetInputFilters` now `mem::replace` + retire instead of dropping the previous filters on assignment. - chain: reserve `DEFAULT_CHAIN_CAPACITY` up front so `AddStage` doesn't realloc. Tests: add a `message_arms` module that queues each message off-thread and drains it via `handle_messages()` inside the alloc audit (defeats warm-up blindness); add a metronome processing test; wrap the standalone tuner and samplers tests. Move `assert_no_alloc` to dev-dependencies (test-only now). 
--- rustortion-core/Cargo.toml | 4 +- rustortion-core/benches/common/mod.rs | 2 +- rustortion-core/src/amp/chain.rs | 24 ++- rustortion-core/src/audio/engine.rs | 99 ++++++---- rustortion-core/src/audio/peak_meter.rs | 66 +++++-- rustortion-core/src/audio/recorder.rs | 65 ++++++- rustortion-core/src/ir/cabinet.rs | 22 ++- rustortion-core/src/ir/load_service.rs | 2 +- rustortion-core/tests/no_alloc.rs | 246 ++++++++++++++++++++++-- rustortion-plugin/src/ir_helper.rs | 2 +- rustortion-standalone/src/gui/app.rs | 13 +- rustortion-standalone/tests/engine.rs | 39 ++-- 12 files changed, 476 insertions(+), 108 deletions(-) diff --git a/rustortion-core/Cargo.toml b/rustortion-core/Cargo.toml index e0f5932..136228d 100644 --- a/rustortion-core/Cargo.toml +++ b/rustortion-core/Cargo.toml @@ -16,12 +16,14 @@ anyhow = "1.0" rustfft = "6.4" realfft = "3.5" arc-swap = "1.8" -assert_no_alloc = { version = "1.1", features = ["warn_debug"] } nam-rs = "0.2.0" [dev-dependencies] criterion = { version = "0.8", features = ["html_reports"] } tempfile = "3.24" +# Test-only: RT-path allocation auditing (tests/no_alloc.rs). `warn_debug` +# makes it count violations instead of aborting the test binary. 
+assert_no_alloc = { version = "1.1", features = ["warn_debug"] } [[bench]] name = "impulse_responses" diff --git a/rustortion-core/benches/common/mod.rs b/rustortion-core/benches/common/mod.rs index 06d758d..89f0936 100644 --- a/rustortion-core/benches/common/mod.rs +++ b/rustortion-core/benches/common/mod.rs @@ -24,7 +24,7 @@ pub fn create_test_cabinet(ir_length: usize, sample_rate: usize) -> IrCabinet { let mut convolver = Convolver::new_fir(max_ir_samples); convolver.set_ir(&ir_samples).unwrap(); - cabinet.swap_convolver(convolver); + cabinet.set_convolver(convolver); cabinet } diff --git a/rustortion-core/src/amp/chain.rs b/rustortion-core/src/amp/chain.rs index 12a6bfa..f685474 100644 --- a/rustortion-core/src/amp/chain.rs +++ b/rustortion-core/src/amp/chain.rs @@ -5,16 +5,34 @@ struct BypassableStage { bypassed: bool, } +/// Default stage capacity reserved up front so that adding/inserting stages on +/// the RT thread (`AddStage`) doesn't reallocate the backing `Vec` until the +/// chain grows past this many stages. +pub const DEFAULT_CHAIN_CAPACITY: usize = 16; + // AmplifierChain holds a sequence of processing stages. -#[derive(Default)] pub struct AmplifierChain { stages: Vec, } +impl Default for AmplifierChain { + fn default() -> Self { + Self::new() + } +} + impl AmplifierChain { #[must_use] - pub const fn new() -> Self { - Self { stages: Vec::new() } + pub fn new() -> Self { + Self::with_capacity(DEFAULT_CHAIN_CAPACITY) + } + + /// Create a chain that can hold `capacity` stages before reallocating. 
+ #[must_use] + pub fn with_capacity(capacity: usize) -> Self { + Self { + stages: Vec::with_capacity(capacity), + } } pub fn add_stage(&mut self, stage: Box) { diff --git a/rustortion-core/src/audio/engine.rs b/rustortion-core/src/audio/engine.rs index 3089b4a..8531923 100644 --- a/rustortion-core/src/audio/engine.rs +++ b/rustortion-core/src/audio/engine.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use assert_no_alloc::permit_alloc; use crossbeam::channel::{Receiver, Sender, bounded}; use log::{debug, error}; @@ -17,7 +16,10 @@ use crate::tuner::Tuner; pub struct PreparedIr { pub name: String, - pub convolver: Convolver, + /// Boxed so it can be swapped into the cabinet on the RT thread without + /// reallocating, and so the whole `PreparedIr` (old convolver + name) can + /// be retired off the RT thread in one piece. + pub convolver: Box, } pub enum EngineMessage { @@ -35,7 +37,9 @@ pub enum EngineMessage { SetIrBypass(bool), SetIrGain(f32), SetTunerEnabled(bool), - SetPitchShift(i32), + /// Carries a fully-constructed pitch shifter (built off the RT thread), or + /// `None` to disable pitch shifting (the `0` semitones bypass case). + SetPitchShift(Option>), SetStageBypassed(usize, bool), SetSamplers(Box), } @@ -49,12 +53,14 @@ pub struct Engine { engine_receiver: Receiver, /// Handle for sending arbitrary objects off the RT thread for deallocation. rt_drop: RtDropHandle, - samplers: Samplers, + /// Boxed so swapping samplers (sample-rate / buffer changes) on the RT + /// thread exchanges pointers and retires the old box directly. + samplers: Box, tuner: Option, recorder: Option, peak_meter: Option, metronome: Option, - pitch_shifter: Option, + pitch_shifter: Option>, input_highpass: Option>, input_lowpass: Option>, /// When true, skip tuner, peak meter, recorder, and metronome processing. 
@@ -83,7 +89,7 @@ impl Engine { ir_cabinet, engine_receiver, rt_drop, - samplers, + samplers: Box::new(samplers), tuner: Some(tuner), recorder: None, peak_meter: Some(peak_meter), @@ -115,7 +121,7 @@ impl Engine { ir_cabinet, engine_receiver, rt_drop: rt_drop_handle, - samplers, + samplers: Box::new(samplers), tuner: None, recorder: None, peak_meter: None, @@ -162,9 +168,7 @@ impl Engine { } if let Some(ref mut shifter) = self.pitch_shifter { - // FIXME(no_alloc): PitchShifter::process_block uses Vec scratch - // buffers internally — see audio/pitch_shifter.rs. - permit_alloc(|| shifter.process_block(output)); + shifter.process_block(output); } if let Some(ref mut cab) = self.ir_cabinet { @@ -172,17 +176,13 @@ impl Engine { } if let Some(ref mut peak_meter) = self.peak_meter { - // FIXME(no_alloc): PeakMeter::process does Arc::new(PeakMeterInfo) - // every block — see audio/peak_meter.rs:62. - permit_alloc(|| peak_meter.process(output)); + peak_meter.process(output); } if !self.lightweight && let Some(recorder) = self.recorder.as_mut() { - // FIXME(no_alloc): Recorder::record_block does Vec::with_capacity - // per block — see audio/recorder.rs:47. - permit_alloc(|| recorder.record_block(output))?; + recorder.record_block(output)?; } Ok(()) @@ -286,16 +286,26 @@ impl Engine { } } EngineMessage::SetInputFilters(hp, lp) => { - self.input_highpass = hp; - self.input_lowpass = lp; + // Retire the previous filters off the RT thread instead of + // dropping them here on direct assignment. 
+ if let Some(old) = std::mem::replace(&mut self.input_highpass, hp) { + self.rt_drop.retire(old); + } + if let Some(old) = std::mem::replace(&mut self.input_lowpass, lp) { + self.rt_drop.retire(old); + } debug!("Updated input filters"); } - EngineMessage::SwapIrConvolver(prepared) => { + EngineMessage::SwapIrConvolver(mut prepared) => { if let Some(ref mut cab) = self.ir_cabinet { debug!("IR convolver swapped: {}", prepared.name); - let old = cab.swap_convolver(prepared.convolver); - permit_alloc(|| self.rt_drop.retire(Box::new(old))); + // Swap the new convolver in; `prepared` is left holding + // the old convolver. Retire the whole `PreparedIr` (old + // convolver + name `String`) off the RT thread so + // nothing deallocates here. + cab.swap_convolver(&mut prepared.convolver); } + self.rt_drop.retire(prepared); } EngineMessage::ClearIr => { if let Some(ref mut cab) = self.ir_cabinet { @@ -326,12 +336,12 @@ impl Engine { EngineMessage::StopRecording => { self.handle_stop_recording(); } - EngineMessage::SetPitchShift(semitones) => { - self.handle_pitch_shift(semitones); + EngineMessage::SetPitchShift(shifter) => { + self.handle_pitch_shift(shifter); } EngineMessage::SetSamplers(new_samplers) => { - let old = std::mem::replace(&mut self.samplers, *new_samplers); - permit_alloc(|| self.rt_drop.retire(Box::new(old))); + let old = std::mem::replace(&mut self.samplers, new_samplers); + self.rt_drop.retire(old); debug!("Samplers swapped"); } } @@ -364,19 +374,15 @@ impl Engine { self.recorder = None; } - fn handle_pitch_shift(&mut self, semitones: i32) { - if semitones == 0 { - self.pitch_shifter = None; - debug!("Pitch shift disabled (bypass)"); - } else if let Some(ref mut shifter) = self.pitch_shifter { - shifter.set_semitones(semitones as f32); - debug!("Pitch shift set to {semitones} semitones"); - } else { - // FIXME(no_alloc): PitchShifter::new allocates FFT scratch buffers - // — see audio/pitch_shifter.rs. 
- self.pitch_shifter = Some(permit_alloc(|| PitchShifter::new(semitones as f32))); - debug!("Pitch shift set to {semitones} semitones"); + fn handle_pitch_shift(&mut self, shifter: Option>) { + // The shifter (if any) is constructed off the RT thread in + // `EngineHandle::set_pitch_shift`; here we just swap it in and retire + // the previous one off the RT thread. + let old = std::mem::replace(&mut self.pitch_shifter, shifter); + if let Some(old) = old { + self.rt_drop.retire(old); } + debug!("Pitch shifter updated"); } } @@ -448,8 +454,14 @@ impl EngineHandle { } pub fn set_pitch_shift(&self, semitones: i32) { - let update = EngineMessage::SetPitchShift(semitones); - self.send(update); + // Construct the pitch shifter here (GUI thread) so the RT thread never + // allocates its FFT plans / scratch buffers. `0` semitones == bypass. + let shifter = if semitones == 0 { + None + } else { + Some(Box::new(PitchShifter::new(semitones as f32))) + }; + self.send(EngineMessage::SetPitchShift(shifter)); } pub fn set_stage_bypassed(&self, idx: usize, bypassed: bool) { @@ -461,8 +473,13 @@ impl EngineHandle { self.send(update); } - pub fn start_recording(&self, sample_rate: usize, output_dir: &str) -> Result<()> { - let recorder = Recorder::new(sample_rate as u32, output_dir)?; + pub fn start_recording( + &self, + sample_rate: usize, + output_dir: &str, + max_block_samples: usize, + ) -> Result<()> { + let recorder = Recorder::new(sample_rate as u32, output_dir, max_block_samples)?; let update = EngineMessage::StartRecording(recorder); self.send(update); diff --git a/rustortion-core/src/audio/peak_meter.rs b/rustortion-core/src/audio/peak_meter.rs index 6fa3c30..cb95ce1 100644 --- a/rustortion-core/src/audio/peak_meter.rs +++ b/rustortion-core/src/audio/peak_meter.rs @@ -1,17 +1,59 @@ -use arc_swap::ArcSwap; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; const CLIP_THRESHOLD: f32 = 0.95; +/// Lock-free, allocation-free shared peak-meter readout. 
+/// +/// `PeakMeterInfo` is plain data with no need for reference counting, so the +/// three fields are stored as atomics rather than swapped behind an `Arc`. +/// This keeps `PeakMeter::process` (called per audio block on the RT thread) +/// free of the `Arc::new` allocation the previous `ArcSwap` design incurred. +/// +/// `f32` values are stored as their bit patterns. All access is `Relaxed`: +/// the fields are independent and a momentarily-torn read across them is +/// cosmetically irrelevant for a level meter. +struct PeakMeterShared { + peak_db: AtomicU32, + peak_linear: AtomicU32, + is_clipping: AtomicBool, +} + +impl PeakMeterShared { + fn new() -> Self { + let default = PeakMeterInfo::default(); + Self { + peak_db: AtomicU32::new(default.peak_db.to_bits()), + peak_linear: AtomicU32::new(default.peak_linear.to_bits()), + is_clipping: AtomicBool::new(default.is_clipping), + } + } + + fn store(&self, peak_db: f32, peak_linear: f32, is_clipping: bool) { + self.peak_db.store(peak_db.to_bits(), Ordering::Relaxed); + self.peak_linear + .store(peak_linear.to_bits(), Ordering::Relaxed); + self.is_clipping.store(is_clipping, Ordering::Relaxed); + } + + fn load(&self) -> PeakMeterInfo { + PeakMeterInfo { + peak_db: f32::from_bits(self.peak_db.load(Ordering::Relaxed)), + peak_linear: f32::from_bits(self.peak_linear.load(Ordering::Relaxed)), + is_clipping: self.is_clipping.load(Ordering::Relaxed), + } + } +} + pub struct PeakMeter { current_peak: f32, samples_since_peak: usize, peak_hold_samples: usize, - info: Arc>, + shared: Arc, } pub struct PeakMeterHandle { - info: Arc>, + shared: Arc, } #[derive(Debug, Clone, Default)] @@ -23,16 +65,16 @@ pub struct PeakMeterInfo { impl PeakMeter { pub fn new(sample_rate: usize) -> (Self, PeakMeterHandle) { - let info = Arc::new(ArcSwap::from_pointee(PeakMeterInfo::default())); + let shared = Arc::new(PeakMeterShared::new()); ( Self { current_peak: 0.0, samples_since_peak: 0, peak_hold_samples: sample_rate * 2, // 2 Seconds - 
info: Arc::clone(&info), + shared: Arc::clone(&shared), }, - PeakMeterHandle { info }, + PeakMeterHandle { shared }, ) } @@ -59,23 +101,21 @@ impl PeakMeter { let is_clipping = self.current_peak >= CLIP_THRESHOLD; - self.info.store(Arc::new(PeakMeterInfo { - peak_db, - peak_linear: self.current_peak, - is_clipping, - })); + self.shared.store(peak_db, self.current_peak, is_clipping); } pub fn reset(&mut self) { self.current_peak = 0.0; self.samples_since_peak = 0; - self.info.store(Arc::new(PeakMeterInfo::default())); + let default = PeakMeterInfo::default(); + self.shared + .store(default.peak_db, default.peak_linear, default.is_clipping); } } impl PeakMeterHandle { pub fn get_info(&self) -> PeakMeterInfo { - self.info.load().as_ref().clone() + self.shared.load() } } diff --git a/rustortion-core/src/audio/recorder.rs b/rustortion-core/src/audio/recorder.rs index ecb1d33..1ea8d2f 100644 --- a/rustortion-core/src/audio/recorder.rs +++ b/rustortion-core/src/audio/recorder.rs @@ -5,30 +5,57 @@ use log::{error, info}; use std::{fs, thread}; type AudioBlock = Vec; -const BLOCK_CHANNEL_CAPACITY: usize = 32; +const BLOCK_CHANNEL_CAPACITY: usize = 64; +/// Number of pre-allocated buffers cycling between the RT thread and the writer. +/// Sized to twice the in-flight channel capacity so a free buffer is always +/// available to `record_block` (it never starves the pool) — the blocking +/// `send` remains the sole backpressure throttle, exactly as before, only now +/// without the per-block `Vec` allocation. +const BUFFER_POOL_SIZE: usize = BLOCK_CHANNEL_CAPACITY * 2; pub struct Recorder { recorder_sender: Sender, + /// Pool of emptied buffers returned by the writer thread for reuse, so + /// `record_block` never allocates on the RT thread. + recycle_receiver: Receiver, + /// Largest input block (in samples) the pre-allocated buffers can hold + /// without reallocating. Blocks larger than this are dropped. 
+ max_block_samples: usize, handle: thread::JoinHandle<()>, } impl Recorder { /// Creates a new Recorder instance. - pub fn new(sample_rate: u32, record_dir: &str) -> Result { + /// + /// `max_block_samples` is the largest input block size the recorder will be + /// asked to handle; the buffer pool is pre-sized to it so that + /// `record_block` performs no allocation on the RT thread. + pub fn new(sample_rate: u32, record_dir: &str, max_block_samples: usize) -> Result { let (recorder_sender, recorder_receiver) = bounded::(BLOCK_CHANNEL_CAPACITY); + let (recycle_sender, recycle_receiver) = bounded::(BUFFER_POOL_SIZE); fs::create_dir_all(record_dir)?; + // Pre-allocate the buffer pool. Each input sample becomes two + // interleaved stereo `i16`s, so size for `max_block_samples * 2`. + for _ in 0..BUFFER_POOL_SIZE { + // Can't fail: the channel is empty and sized to match the loop. + let _ = recycle_sender.try_send(AudioBlock::with_capacity(max_block_samples * 2)); + } + let filename = format!( "{record_dir}/recording_{}.wav", chrono::Local::now().format("%Y%m%d_%H%M%S") ); info!("Recording to: {filename}"); - let handle = - thread::spawn(move || run_writer_thread(sample_rate, filename, recorder_receiver)); + let handle = thread::spawn(move || { + run_writer_thread(sample_rate, filename, recorder_receiver, &recycle_sender); + }); Ok(Self { recorder_sender, + recycle_receiver, + max_block_samples, handle, }) } @@ -43,8 +70,22 @@ impl Recorder { } /// Record block takes a slice of f32 samples and sends them to the writer thread. + /// + /// Reuses a pre-allocated buffer from the recycle pool, so it never + /// allocates. A block larger than the pool was sized for is dropped (it + /// would otherwise force a reallocation). The pool is sized so a free buffer + /// is always available; the blocking `send` provides backpressure (as + /// before) without the per-block `Vec` allocation. 
If the pool is somehow + /// momentarily empty (writer stalled) the block is dropped rather than + /// blocking — and a stalled writer means a full `send` would block anyway. pub fn record_block(&self, samples: &[f32]) -> Result<()> { - let mut block = AudioBlock::with_capacity(samples.len() * 2); + if samples.len() > self.max_block_samples { + return Ok(()); + } + let Ok(mut block) = self.recycle_receiver.try_recv() else { + return Ok(()); + }; + block.clear(); for sample in samples { let v = (sample * i16::MAX as f32).clamp(i16::MIN as f32, i16::MAX as f32) as i16; block.push(v); @@ -57,7 +98,12 @@ impl Recorder { } /// Runs the writer thread, that writes audio blocks received over its channel to a WAV file. -fn run_writer_thread(sample_rate: u32, filename: String, recorder_receiver: Receiver) { +fn run_writer_thread( + sample_rate: u32, + filename: String, + recorder_receiver: Receiver, + recycle_sender: &Sender, +) { let spec = hound::WavSpec { channels: 2, sample_rate, @@ -79,6 +125,9 @@ fn run_writer_thread(sample_rate: u32, filename: String, recorder_receiver: Rece error!("Failed to write sample to WAV file '{filename}': {e}"); } } + // Return the buffer to the pool for reuse. If the pool is full or the + // RT side has gone away, just drop it. 
+ let _ = recycle_sender.try_send(block); } if let Err(e) = writer.finalize() { @@ -105,10 +154,10 @@ mod tests { let temp_dir = TempDir::new()?; let record_dir = temp_dir.path().to_str().unwrap(); - let recorder = Recorder::new(SAMPLE_RATE, record_dir)?; + let block_size = 256; + let recorder = Recorder::new(SAMPLE_RATE, record_dir, block_size)?; let total_samples = (SAMPLE_RATE as f32 * DURATION_SECS) as usize; - let block_size = 256; let mut generated_samples = 0; while generated_samples < total_samples { diff --git a/rustortion-core/src/ir/cabinet.rs b/rustortion-core/src/ir/cabinet.rs index d145f4e..9e5c57e 100644 --- a/rustortion-core/src/ir/cabinet.rs +++ b/rustortion-core/src/ir/cabinet.rs @@ -15,7 +15,10 @@ pub enum ConvolverType { pub const DEFAULT_MAX_IR_MS: usize = 50; pub struct IrCabinet { - convolver: Convolver, + /// Boxed so the convolver can be swapped in/out on the RT thread by + /// exchanging pointers (`swap_convolver`) without moving the heavy + /// convolver struct or allocating to type-erase it for `rt_drop`. + convolver: Box, bypassed: bool, output_gain: f32, @@ -23,10 +26,10 @@ pub struct IrCabinet { impl IrCabinet { pub fn new(convolver_type: ConvolverType, max_ir_samples: usize) -> Self { - let convolver = match convolver_type { + let convolver = Box::new(match convolver_type { ConvolverType::Fir => Convolver::new_fir(max_ir_samples), ConvolverType::TwoStage => Convolver::new_two_stage(), - }; + }); debug!("IrCabinet created: {convolver_type:?} convolver, max {max_ir_samples} samples"); @@ -37,8 +40,17 @@ impl IrCabinet { } } - pub const fn swap_convolver(&mut self, convolver: Convolver) -> Convolver { - std::mem::replace(&mut self.convolver, convolver) + /// RT-safe convolver swap: exchanges the cabinet's convolver with `other` + /// in place. Neither side allocates or deallocates — the caller is left + /// holding the previous convolver (e.g. to retire it off the RT thread). 
+ pub const fn swap_convolver(&mut self, other: &mut Box) { + std::mem::swap(&mut self.convolver, other); + } + + /// Install a convolver by value, reusing the existing heap allocation. + /// Intended for setup and tests, not the RT thread. + pub fn set_convolver(&mut self, convolver: Convolver) { + *self.convolver = convolver; } pub fn clear_convolver(&mut self) { diff --git a/rustortion-core/src/ir/load_service.rs b/rustortion-core/src/ir/load_service.rs index d8934b6..26b70d6 100644 --- a/rustortion-core/src/ir/load_service.rs +++ b/rustortion-core/src/ir/load_service.rs @@ -128,7 +128,7 @@ pub fn spawn( build_convolver(coefficients, convolver_type, max_ir_samples); let prepared = PreparedIr { name: name.clone(), - convolver, + convolver: Box::new(convolver), }; engine_handle.swap_ir_convolver(prepared); diff --git a/rustortion-core/tests/no_alloc.rs b/rustortion-core/tests/no_alloc.rs index 57afc20..af047fe 100644 --- a/rustortion-core/tests/no_alloc.rs +++ b/rustortion-core/tests/no_alloc.rs @@ -28,7 +28,7 @@ use rustortion_core::amp::stages::poweramp::{PowerAmpStage, PowerAmpType}; use rustortion_core::amp::stages::preamp::PreampStage; use rustortion_core::amp::stages::reverb::ReverbStage; use rustortion_core::amp::stages::tonestack::{ToneStackModel, ToneStackStage}; -use rustortion_core::audio::engine::{Engine, EngineHandle}; +use rustortion_core::audio::engine::{Engine, EngineHandle, PreparedIr}; use rustortion_core::audio::peak_meter::PeakMeter; use rustortion_core::audio::rt_drop::{RtDropHandle, RtDropReceiver}; use rustortion_core::audio::samplers::Samplers; @@ -346,7 +346,7 @@ mod ir_cabinet { // out of scope — see extras::peak_meter_does_not_allocate. 
let max_ir_samples = (SAMPLE_RATE * DEFAULT_MAX_IR_MS) / 1000; let mut cabinet = IrCabinet::new(ConvolverType::Fir, max_ir_samples); - cabinet.swap_convolver(make_fir_convolver()); + cabinet.set_convolver(make_fir_convolver()); let (mut engine, _handle, _rx) = plugin_engine_with_ir(1.0, cabinet); let (input, mut output) = buffers(); @@ -358,7 +358,7 @@ mod ir_cabinet { // Covers: IrCabinet + TwoStageConvolver (FFT) process_block. let max_ir_samples = (SAMPLE_RATE * DEFAULT_MAX_IR_MS) / 1000; let mut cabinet = IrCabinet::new(ConvolverType::TwoStage, max_ir_samples); - cabinet.swap_convolver(make_two_stage_convolver()); + cabinet.set_convolver(make_two_stage_convolver()); let (mut engine, _handle, _rx) = plugin_engine_with_ir(1.0, cabinet); let (input, mut output) = buffers(); @@ -378,7 +378,7 @@ mod ir_cabinet { let mut cabinet = IrCabinet::new(ConvolverType::Fir, max_ir_samples); let mut convolver = Convolver::new_fir(max_ir_samples); convolver.set_ir(&ir_samples).unwrap(); - cabinet.swap_convolver(convolver); + cabinet.set_convolver(convolver); let (mut engine, _handle, _rx) = plugin_engine_with_ir(1.0, cabinet); let (input, mut output) = buffers(); @@ -435,19 +435,20 @@ mod extras { #[test] fn pitch_shifter_does_not_allocate() { - // Covers: PitchShifter::new (one-shot at SetPitchShift drain) AND - // process_block (per audio block). Both allocate; engine.rs gates - // them with permit_alloc + FIXME (pitch_shifter.rs). - // - // We send `set_pitch_shift` *inside* the assert scope so the - // construction is observed — the warm-up call would otherwise drain - // the message before the assertion starts. + // Covers: the RT-side SetPitchShift drain (mem::replace + rt_drop.retire + // of the previous shifter) AND PitchShifter::process_block (per audio + // block). 
The shifter itself is now constructed off the RT thread inside + // EngineHandle::set_pitch_shift, so we call that *before* the assert + // scope (GUI-thread work) and assert that draining the message and + // running the shifter on the hot path allocates nothing. let (mut engine, handle, _rx) = plugin_engine(1.0); let (input, mut output) = buffers(); engine.process(&input, &mut output).unwrap(); + // Construct + queue the shifter off the asserted path. + handle.set_pitch_shift(7); + let violations = check_no_alloc(|| { - handle.set_pitch_shift(7); for _ in 0..16 { engine.process(&input, &mut output).unwrap(); } @@ -461,16 +462,229 @@ mod extras { #[test] fn recorder_record_block_does_not_allocate() { // Covers: Recorder::record_block called from Engine::process under - // lightweight=false. The recorder allocates internally; engine.rs - // wraps that call in permit_alloc with a FIXME pointing to - // recorder.rs:47. + // lightweight=false. The recorder reuses a pre-allocated buffer pool + // (see recorder.rs) so the RT-thread call allocates nothing. let (mut engine, handle) = full_engine(1.0, None); let tmp = tempfile::tempdir().unwrap(); handle - .start_recording(SAMPLE_RATE, tmp.path().to_str().unwrap()) + .start_recording(SAMPLE_RATE, tmp.path().to_str().unwrap(), BUFFER_SIZE) .unwrap(); let (input, mut output) = buffers(); assert_engine_alloc_free(&mut engine, &input, &mut output, 32); } } + +// --------------------------------------------------------------------------- +// Message-handler arms exercised *inside* the asserted scope +// --------------------------------------------------------------------------- + +mod message_arms { + //! Each test queues an `EngineMessage` and then drains it via + //! `engine.handle_messages()` *inside* `check_no_alloc`, so any allocation + //! or deallocation performed by the message handler on the RT thread is + //! caught. + //! + //! This guards against "warm-up blindness": the shared + //! 
`assert_engine_alloc_free` helper drains pending messages in its + //! pre-assertion warm-up `process()` call, which would hide allocations in + //! `handle_messages`. + //! + //! Building and *sending* the message (`EngineHandle::*`) is GUI-thread work + //! — it boxes the payload and `try_send`s it — and so is done *before* the + //! scope. Only the RT-side drain is asserted. We drain via + //! `handle_messages()` rather than `process()` so the audit is scoped to the + //! handlers and isn't perturbed by a freshly swapped-in samplers/convolver's + //! first-`process()` cold-start (a separate, pre-existing cost covered by + //! the steady-state hot-path tests). + + use super::*; + + /// Queue `setup` messages, drain + warm up outside the audit, then drain + /// `body`'s queued message under `check_no_alloc`. Returns the violations. + fn assert_drain_alloc_free( + engine: &mut Engine, + input: &[f32], + output: &mut [f32], + queue: impl FnOnce(), + ) -> u32 { + // Drain any setup messages and amortise first-call work outside the + // audit. + engine.process(input, output).unwrap(); + // GUI-thread work: build + send the message under test. + queue(); + // Assert only the RT-side drain. + check_no_alloc(|| engine.handle_messages()) + } + + #[test] + fn add_stage_drain_does_not_allocate() { + // AddStage inserts into AmplifierChain.stages. The chain reserves + // DEFAULT_CHAIN_CAPACITY up front so the insert does not reallocate. + let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.add_stage(0, Box::new(LevelStage::new(0.5))); + }); + assert_eq!( + violations, 0, + "AddStage drain allocated {violations} time(s)" + ); + } + + #[test] + fn replace_stage_drain_does_not_allocate() { + // ReplaceStage swaps the stage in place and retires the old one via + // rt_drop (a non-allocating try_send). 
+ let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + handle.add_stage(0, Box::new(LevelStage::new(0.5))); + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.replace_stage(0, Box::new(LevelStage::new(0.25))); + }); + assert_eq!( + violations, 0, + "ReplaceStage drain allocated {violations} time(s)" + ); + } + + #[test] + fn remove_stage_drain_does_not_allocate() { + // RemoveStage removes from the Vec and retires the old stage via + // rt_drop. Vec::remove does not reallocate. + let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + handle.add_stage(0, Box::new(LevelStage::new(0.5))); + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.remove_stage(0); + }); + assert_eq!( + violations, 0, + "RemoveStage drain allocated {violations} time(s)" + ); + } + + #[test] + fn set_input_filters_drain_does_not_allocate() { + // SetInputFilters now retires the previous filters via rt_drop rather + // than dropping them on assignment. Install filters first so the + // asserted swap exercises the retire path with non-None old values. 
+ let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + handle.set_input_filters( + Some(Box::new(FilterStage::new( + FilterType::Highpass, + 80.0, + SAMPLE_RATE_F32, + ))), + Some(Box::new(FilterStage::new( + FilterType::Lowpass, + 8000.0, + SAMPLE_RATE_F32, + ))), + ); + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.set_input_filters( + Some(Box::new(FilterStage::new( + FilterType::Highpass, + 100.0, + SAMPLE_RATE_F32, + ))), + Some(Box::new(FilterStage::new( + FilterType::Lowpass, + 6000.0, + SAMPLE_RATE_F32, + ))), + ); + }); + assert_eq!( + violations, 0, + "SetInputFilters drain allocated {violations} time(s)" + ); + } + + #[test] + fn set_samplers_drain_does_not_allocate() { + // SetSamplers swaps the boxed samplers and retires the old box directly + // (no Box::new type-erasure on the RT thread). + let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + let samplers = Samplers::new(BUFFER_SIZE, 2.0, SAMPLE_RATE).unwrap(); + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.set_samplers(samplers); + }); + assert_eq!( + violations, 0, + "SetSamplers drain allocated {violations} time(s)" + ); + } + + #[test] + fn swap_ir_convolver_drain_does_not_allocate() { + // SwapIrConvolver swaps the boxed convolver in place and retires the + // whole PreparedIr (old convolver + name String) via rt_drop. 
+ let max_ir_samples = (SAMPLE_RATE * DEFAULT_MAX_IR_MS) / 1000; + let mut cabinet = IrCabinet::new(ConvolverType::Fir, max_ir_samples); + cabinet.set_convolver(make_fir_convolver()); + let (mut engine, handle, _rx) = plugin_engine_with_ir(1.0, cabinet); + let (input, mut output) = buffers(); + let prepared = PreparedIr { + name: "swap-test".to_string(), + convolver: Box::new(make_fir_convolver()), + }; + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.swap_ir_convolver(prepared); + }); + assert_eq!( + violations, 0, + "SwapIrConvolver drain allocated {violations} time(s)" + ); + } + + #[test] + fn set_tuner_enabled_drain_does_not_allocate() { + // SetTunerEnabled(true) flips a bool; the subsequent tuner.process + // appends into a pre-allocated 4096-sample buffer. Stay well under the + // detection threshold so no AMDF run (which allocates a readout) fires. + // This one drives process() (not just the drain) to also cover the + // tuner buffer-growth path. + let (mut engine, handle) = full_engine(1.0, None); + let (input, mut output) = buffers(); + engine.process(&input, &mut output).unwrap(); + + let violations = check_no_alloc(|| { + handle.set_tuner_enabled(true); + for _ in 0..8 { + engine.process(&input, &mut output).unwrap(); + } + }); + assert_eq!( + violations, 0, + "SetTunerEnabled drain allocated {violations} time(s)" + ); + } +} + +// --------------------------------------------------------------------------- +// Metronome +// --------------------------------------------------------------------------- + +#[test] +fn metronome_processing_does_not_allocate() { + // Covers: Metronome::process_block, the per-block work behind + // Engine::process_metronome (driven on the RT thread for the standalone + // metronome output port). 
+ let mut metronome = Metronome::new(120.0, SAMPLE_RATE); + metronome.toggle_metronome(); // enable + let mut output = vec![0.0_f32; BUFFER_SIZE]; + + let violations = check_no_alloc(|| { + for _ in 0..32 { + metronome.process_block(&mut output); + } + }); + assert_eq!( + violations, 0, + "metronome processing allocated {violations} time(s)" + ); +} diff --git a/rustortion-plugin/src/ir_helper.rs b/rustortion-plugin/src/ir_helper.rs index 4def729..bf975c9 100644 --- a/rustortion-plugin/src/ir_helper.rs +++ b/rustortion-plugin/src/ir_helper.rs @@ -35,7 +35,7 @@ fn set_ir_samples(handle: &EngineHandle, name: &str, ir_samples: &[f32], sample_ } else { handle.swap_ir_convolver(PreparedIr { name: name.to_string(), - convolver, + convolver: Box::new(convolver), }); } } diff --git a/rustortion-standalone/src/gui/app.rs b/rustortion-standalone/src/gui/app.rs index d8c420d..ee1388d 100644 --- a/rustortion-standalone/src/gui/app.rs +++ b/rustortion-standalone/src/gui/app.rs @@ -304,14 +304,13 @@ impl AmplifierApp { match message { Message::StartRecording => { let sample_rate = self.shared.backend.manager().sample_rate(); + let buffer_size = self.shared.backend.manager().buffer_size(); let recording_dir = &self.settings.recording_dir; - if let Err(e) = self - .shared - .backend - .manager() - .engine() - .start_recording(sample_rate, recording_dir) - { + if let Err(e) = self.shared.backend.manager().engine().start_recording( + sample_rate, + recording_dir, + buffer_size, + ) { error!("Failed to start recording: {e}"); } else { self.shared.is_recording = true; diff --git a/rustortion-standalone/tests/engine.rs b/rustortion-standalone/tests/engine.rs index 4039469..5e24136 100644 --- a/rustortion-standalone/tests/engine.rs +++ b/rustortion-standalone/tests/engine.rs @@ -230,22 +230,32 @@ fn samplers_preserve_tone_signal() -> Result<()> { }) .collect(); - // FFT samplers have a delay so we need to prime them. 
+ // FFT samplers have a delay so we need to prime them (this also + // amortises rubato's first-call scratch setup before the audit). for _ in 0..10 { samplers.copy_input(&input)?; let _ = samplers.upsample()?; let _ = samplers.downsample()?; } - samplers.copy_input(&input)?; - - let upsampled = samplers.upsample()?; - let upsampled_rms = - (upsampled.iter().map(|x| x * x).sum::() / upsampled.len() as f32).sqrt(); - - let downsampled = samplers.downsample()?; - let downsampled_rms = - (downsampled.iter().map(|x| x * x).sum::() / downsampled.len() as f32).sqrt(); + // The steady-state up/downsample hot path must not allocate on the RT + // thread. Measure the RMS values inside the asserted scope. + let mut upsampled_rms = 0.0f32; + let mut downsampled_rms = 0.0f32; + let violations = check_no_alloc(|| { + samplers.copy_input(&input).unwrap(); + let upsampled = samplers.upsample().unwrap(); + upsampled_rms = + (upsampled.iter().map(|x| x * x).sum::() / upsampled.len() as f32).sqrt(); + + let downsampled = samplers.downsample().unwrap(); + downsampled_rms = + (downsampled.iter().map(|x| x * x).sum::() / downsampled.len() as f32).sqrt(); + }); + assert_eq!( + violations, 0, + "{oversample}x oversample: samplers allocated on RT path" + ); let input_rms = (input.iter().map(|x| x * x).sum::() / input.len() as f32).sqrt(); @@ -289,7 +299,14 @@ fn engine_tuner_enabled_no_output() -> Result<()> { let input = vec![0.0f32; BUFFER_SIZE]; let mut output = vec![0.0f32; BUFFER_SIZE]; - engine.process(&input, &mut output)?; + + // With the tuner enabled, process() runs the tuner and returns early. The + // tuner appends one block into its pre-allocated 4096-sample buffer, so the + // RT path allocates nothing. 
+ let violations = check_no_alloc(|| { + engine.process(&input, &mut output).unwrap(); + }); + assert_eq!(violations, 0, "tuner-enabled process allocated on RT path"); assert!(output.iter().all(|&x| x == 0.0), "expected silent output"); From 46db4069529a4beea35e7e0726603c8c556ae4af Mon Sep 17 00:00:00 2001 From: OpenSauce Date: Wed, 3 Jun 2026 23:26:45 +0100 Subject: [PATCH 2/4] refactor(recorder): make record_block fully non-blocking (RT-safe) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up on the #239 recorder fix. The previous step removed the per-block `Vec` allocation but kept a blocking `send` for backpressure — which can park the audio thread on disk I/O (and crossbeam's parking allocates). Blocking is as much an RT hazard as allocating. Make the handoff fully non-blocking: `record_block` `try_send`s a pooled buffer and, if the writer has fallen behind, drops the block and records an overrun instead of blocking. The audio output thread is never held hostage by the disk — a disk stall now degrades the recording (a gap) rather than the live output (an xrun), matching how DAWs behave. - Size the buffer pool / handoff channel by audio rate (`BUFFER_SECONDS` of slack) so the writer can lag several seconds before anything is dropped — bounded (no RT alloc) but effectively unbounded in practice (~1.5 MB). - Add an `overruns()` counter so dropped audio is observable rather than silent. - test_recorder verifies the buffered path stays lossless under load. 
--- rustortion-core/src/audio/recorder.rs | 90 ++++++++++++++++++++------- 1 file changed, 67 insertions(+), 23 deletions(-) diff --git a/rustortion-core/src/audio/recorder.rs b/rustortion-core/src/audio/recorder.rs index 1ea8d2f..dee82ec 100644 --- a/rustortion-core/src/audio/recorder.rs +++ b/rustortion-core/src/audio/recorder.rs @@ -1,26 +1,37 @@ use anyhow::Result; -use crossbeam::channel::{Receiver, Sender, bounded}; +use crossbeam::channel::{Receiver, Sender, TrySendError, bounded}; use hound::WavWriter; use log::{error, info}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{fs, thread}; type AudioBlock = Vec; -const BLOCK_CHANNEL_CAPACITY: usize = 64; -/// Number of pre-allocated buffers cycling between the RT thread and the writer. -/// Sized to twice the in-flight channel capacity so a free buffer is always -/// available to `record_block` (it never starves the pool) — the blocking -/// `send` remains the sole backpressure throttle, exactly as before, only now -/// without the per-block `Vec` allocation. -const BUFFER_POOL_SIZE: usize = BLOCK_CHANNEL_CAPACITY * 2; + +/// Pre-allocate enough buffering for this many seconds of audio. Bounded (so the +/// RT thread never allocates), but sized so large it's effectively unbounded in +/// practice: the writer would have to fall this far behind — a multi-second disk +/// stall — before `record_block` drops anything. At stereo 16-bit this is only +/// ~`BUFFER_SECONDS * sample_rate * 4` bytes (≈1.5 MB for 8 s @ 48 kHz). +const BUFFER_SECONDS: usize = 8; +/// Floor on the buffer size in blocks, in case the host block size is huge. +const MIN_BUFFER_BLOCKS: usize = 16; pub struct Recorder { + /// Non-blocking handoff of filled buffers to the writer thread. recorder_sender: Sender, /// Pool of emptied buffers returned by the writer thread for reuse, so /// `record_block` never allocates on the RT thread. 
recycle_receiver: Receiver, + /// Used to return a buffer to the pool when the handoff channel is full. + recycle_sender: Sender, /// Largest input block (in samples) the pre-allocated buffers can hold /// without reallocating. Blocks larger than this are dropped. max_block_samples: usize, + /// Count of blocks dropped because the writer couldn't keep up (disk + /// stall). The RT thread never blocks on the writer — it drops instead — + /// so this surfaces any lost audio. + overruns: Arc, handle: thread::JoinHandle<()>, } @@ -31,13 +42,21 @@ impl Recorder { /// asked to handle; the buffer pool is pre-sized to it so that /// `record_block` performs no allocation on the RT thread. pub fn new(sample_rate: u32, record_dir: &str, max_block_samples: usize) -> Result { - let (recorder_sender, recorder_receiver) = bounded::(BLOCK_CHANNEL_CAPACITY); - let (recycle_sender, recycle_receiver) = bounded::(BUFFER_POOL_SIZE); + // Size the buffer pool / handoff channel by time so it absorbs several + // seconds of writer lag before ever dropping a block. Both the channel + // and the pool hold the same number of buffers so the producer never + // starves the pool while the channel still has room. + let buffer_blocks = (BUFFER_SECONDS * sample_rate as usize) + .div_ceil(max_block_samples.max(1)) + .max(MIN_BUFFER_BLOCKS); + + let (recorder_sender, recorder_receiver) = bounded::(buffer_blocks); + let (recycle_sender, recycle_receiver) = bounded::(buffer_blocks); fs::create_dir_all(record_dir)?; // Pre-allocate the buffer pool. Each input sample becomes two // interleaved stereo `i16`s, so size for `max_block_samples * 2`. - for _ in 0..BUFFER_POOL_SIZE { + for _ in 0..buffer_blocks { // Can't fail: the channel is empty and sized to match the loop. 
let _ = recycle_sender.try_send(AudioBlock::with_capacity(max_block_samples * 2)); } @@ -48,18 +67,32 @@ impl Recorder { ); info!("Recording to: {filename}"); + let writer_recycle_sender = recycle_sender.clone(); let handle = thread::spawn(move || { - run_writer_thread(sample_rate, filename, recorder_receiver, &recycle_sender); + run_writer_thread( + sample_rate, + filename, + recorder_receiver, + &writer_recycle_sender, + ); }); Ok(Self { recorder_sender, recycle_receiver, + recycle_sender, max_block_samples, + overruns: Arc::new(AtomicU64::new(0)), handle, }) } + /// Number of audio blocks dropped because the writer thread fell behind. + /// Zero in normal operation; non-zero indicates the disk couldn't keep up. + pub fn overruns(&self) -> u64 { + self.overruns.load(Ordering::Relaxed) + } + /// Stops the recording and waits for the writer thread to finish. /// This is needed for WAV files to be finalized properly. pub fn stop(self) -> Result<()> { @@ -69,20 +102,22 @@ impl Recorder { .map_err(|e| anyhow::anyhow!("Writer thread panicked (join failed): {e:?}")) } - /// Record block takes a slice of f32 samples and sends them to the writer thread. + /// Record a block of `f32` samples by handing a filled buffer to the writer + /// thread. /// - /// Reuses a pre-allocated buffer from the recycle pool, so it never - /// allocates. A block larger than the pool was sized for is dropped (it - /// would otherwise force a reallocation). The pool is sized so a free buffer - /// is always available; the blocking `send` provides backpressure (as - /// before) without the per-block `Vec` allocation. If the pool is somehow - /// momentarily empty (writer stalled) the block is dropped rather than - /// blocking — and a stalled writer means a full `send` would block anyway. + /// Real-time safe: it never allocates and never blocks. 
It takes a + pre-allocated buffer from the recycle pool, fills it, and `try_send`s it + to the writer (which buffers in memory and flushes to disk on its own + thread). If the writer has fallen behind — pool empty or handoff channel + full — the block is dropped and an overrun is recorded rather than + stalling the audio thread on disk I/O. pub fn record_block(&self, samples: &[f32]) -> Result<()> { if samples.len() > self.max_block_samples { + self.overruns.fetch_add(1, Ordering::Relaxed); return Ok(()); } let Ok(mut block) = self.recycle_receiver.try_recv() else { + self.overruns.fetch_add(1, Ordering::Relaxed); return Ok(()); }; block.clear(); @@ -91,9 +126,18 @@ impl Recorder { block.push(v); block.push(v); } - self.recorder_sender - .send(block) - .map_err(|e| anyhow::anyhow!("Failed to send audio block on channel: {e}")) + match self.recorder_sender.try_send(block) { + Ok(()) => Ok(()), + Err(TrySendError::Full(block)) => { + // Writer behind: return the buffer to the pool, drop the audio. + let _ = self.recycle_sender.try_send(block); + self.overruns.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => { + Err(anyhow::anyhow!("recorder writer thread has stopped")) + } + } } } From dafb090aa95ce8290e333e4f83c6c4bfced1255f Mon Sep 17 00:00:00 2001 From: OpenSauce Date: Fri, 5 Jun 2026 20:34:10 +0100 Subject: [PATCH 3/4] fix(rt): close remaining dealloc/alloc-on-RT gaps - rt_drop: retire() dropped the Box on the RT thread when the bounded channel was full or disconnected (free() on RT). Leak it via mem::forget instead and surface the event through a leaked() counter; bump channel capacity 16 -> 64. - recorder: record_block constructed an anyhow error (alloc) on RT when the writer thread had exited. Count it as an overrun instead; return () rather than Result. - recorder pool: size for max(period, MAX_BUFFER_FRAMES) so a mid-recording JACK buffer-size increase doesn't drop every block.
--- rustortion-core/src/audio/engine.rs | 2 +- rustortion-core/src/audio/recorder.rs | 20 +++++--- rustortion-core/src/audio/rt_drop.rs | 68 ++++++++++++++++++++++--- rustortion-standalone/src/audio/jack.rs | 5 +- rustortion-standalone/src/gui/app.rs | 13 ++++- 5 files changed, 90 insertions(+), 18 deletions(-) diff --git a/rustortion-core/src/audio/engine.rs b/rustortion-core/src/audio/engine.rs index 8531923..5e5d55f 100644 --- a/rustortion-core/src/audio/engine.rs +++ b/rustortion-core/src/audio/engine.rs @@ -182,7 +182,7 @@ impl Engine { if !self.lightweight && let Some(recorder) = self.recorder.as_mut() { - recorder.record_block(output)?; + recorder.record_block(output); } Ok(()) diff --git a/rustortion-core/src/audio/recorder.rs b/rustortion-core/src/audio/recorder.rs index dee82ec..6006afa 100644 --- a/rustortion-core/src/audio/recorder.rs +++ b/rustortion-core/src/audio/recorder.rs @@ -110,15 +110,17 @@ impl Recorder { /// to the writer (which buffers in memory and flushes to disk on its own /// thread). If the writer has fallen behind — pool empty or handoff channel /// full — the block is dropped and an overrun is recorded rather than - /// stalling the audio thread on disk I/O. - pub fn record_block(&self, samples: &[f32]) -> Result<()> { + /// stalling the audio thread on disk I/O. If the writer thread has exited + /// (e.g. it failed to create the WAV file), every block is likewise counted + /// as an overrun; the climbing count surfaces the failure off-RT. 
+ pub fn record_block(&self, samples: &[f32]) { if samples.len() > self.max_block_samples { self.overruns.fetch_add(1, Ordering::Relaxed); - return Ok(()); + return; } let Ok(mut block) = self.recycle_receiver.try_recv() else { self.overruns.fetch_add(1, Ordering::Relaxed); - return Ok(()); + return; }; block.clear(); for sample in samples { @@ -127,15 +129,17 @@ impl Recorder { block.push(v); } match self.recorder_sender.try_send(block) { - Ok(()) => Ok(()), + Ok(()) => {} Err(TrySendError::Full(block)) => { // Writer behind: return the buffer to the pool, drop the audio. let _ = self.recycle_sender.try_send(block); self.overruns.fetch_add(1, Ordering::Relaxed); - Ok(()) } Err(TrySendError::Disconnected(_)) => { - Err(anyhow::anyhow!("recorder writer thread has stopped")) + // Writer thread is gone. Constructing an `anyhow` error here + // would allocate on the RT thread; count it as an overrun like + // any other lost block instead. + self.overruns.fetch_add(1, Ordering::Relaxed); } } } @@ -215,7 +219,7 @@ mod tests { block.push(sample); } - recorder.record_block(&block)?; + recorder.record_block(&block); generated_samples += samples_to_generate; } diff --git a/rustortion-core/src/audio/rt_drop.rs b/rustortion-core/src/audio/rt_drop.rs index e88f15b..22cc7af 100644 --- a/rustortion-core/src/audio/rt_drop.rs +++ b/rustortion-core/src/audio/rt_drop.rs @@ -1,4 +1,11 @@ use crossbeam::channel::{Receiver, Sender, bounded}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Capacity of the RT-drop channel. Sized generously so that a burst of stage +/// edits / IR swaps retired in a single `handle_messages` drain doesn't fill it +/// while the background drop thread is briefly descheduled. +const RT_DROP_CAPACITY: usize = 64; /// Handle for sending objects off the RT thread for deallocation. /// @@ -6,6 +13,11 @@ use crossbeam::channel::{Receiver, Sender, bounded}; /// drops these objects, keeping deallocations off the RT thread. 
pub struct RtDropHandle { drop_tx: Sender>, + /// Count of objects leaked (not dropped) because the channel was full or + /// disconnected. Non-zero means the background drop thread couldn't keep up + /// and memory was intentionally leaked to preserve the no-dealloc-on-RT + /// guarantee. + leaked: Arc, } /// Receiving end of the RT drop channel. @@ -20,14 +32,36 @@ pub struct RtDropReceiver { impl RtDropHandle { /// Create a paired handle and receiver for RT-safe object disposal. pub fn new() -> (Self, RtDropReceiver) { - let (drop_tx, drop_rx) = bounded(16); - (Self { drop_tx }, RtDropReceiver { drop_rx }) + let (drop_tx, drop_rx) = bounded(RT_DROP_CAPACITY); + ( + Self { + drop_tx, + leaked: Arc::new(AtomicU64::new(0)), + }, + RtDropReceiver { drop_rx }, + ) } /// Send an object to be dropped on a background thread. /// Uses `try_send` to never block the RT thread. + /// + /// If the channel is full (drop thread fell behind) or disconnected (drop + /// thread gone), the object is **leaked** rather than dropped here. Dropping + /// it would run the allocator's `free` on the RT thread, reintroducing + /// exactly the deallocation this mechanism exists to eliminate. The leak is + /// surfaced via [`leaked`](Self::leaked) so it isn't silent. pub fn retire(&self, value: Box) { - let _ = self.drop_tx.try_send(value); + if let Err(err) = self.drop_tx.try_send(value) { + std::mem::forget(err.into_inner()); + self.leaked.fetch_add(1, Ordering::Relaxed); + } + } + + /// Number of objects leaked because the drop channel was full or + /// disconnected. Zero in normal operation; non-zero indicates the + /// background drop thread couldn't keep up. 
+ pub fn leaked(&self) -> u64 { + self.leaked.load(Ordering::Relaxed) } } @@ -45,6 +79,7 @@ impl RtDropReceiver { #[cfg(test)] mod tests { use super::*; + use std::sync::atomic::AtomicUsize; #[test] fn retire_sends_value_to_receiver() { @@ -56,11 +91,32 @@ mod tests { } #[test] - fn retire_does_not_block_when_full() { + fn retire_leaks_instead_of_dropping_when_full() { + struct DropCounter(Arc); + impl Drop for DropCounter { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::Relaxed); + } + } + + // Keep the receiver alive so failures are `Full`, not `Disconnected`. let (handle, _rx) = RtDropHandle::new(); - for i in 0..20 { - let boxed: Box = Box::new(i); + let drops = Arc::new(AtomicUsize::new(0)); + let overflow = 5; + + for _ in 0..(RT_DROP_CAPACITY + overflow) { + let boxed: Box = Box::new(DropCounter(drops.clone())); handle.retire(boxed); } + + // The overflow items were leaked (forgotten), not dropped on this + // thread. The first `RT_DROP_CAPACITY` are still queued in the channel + // (alive), so nothing has been deallocated yet. + assert_eq!(handle.leaked(), overflow as u64); + assert_eq!( + drops.load(Ordering::Relaxed), + 0, + "overflow items must be leaked, not deallocated on the RT thread" + ); } } diff --git a/rustortion-standalone/src/audio/jack.rs b/rustortion-standalone/src/audio/jack.rs index d84aff4..c08adac 100644 --- a/rustortion-standalone/src/audio/jack.rs +++ b/rustortion-standalone/src/audio/jack.rs @@ -40,7 +40,10 @@ impl jack::NotificationHandler for NotificationHandler { } impl ProcessHandler { - const MAX_BUFFER_FRAMES: usize = 8192; + /// Largest JACK period (in frames) sized for without reallocating. Also + /// used to size the recorder's buffer pool so a mid-recording buffer-size + /// increase up to this bound doesn't start dropping blocks. 
+ pub const MAX_BUFFER_FRAMES: usize = 8192; pub fn new(client: &Client, audio_engine: Engine) -> Result { let ports = Ports::new(client).context("failed to create audio ports")?; diff --git a/rustortion-standalone/src/gui/app.rs b/rustortion-standalone/src/gui/app.rs index ee1388d..55f2730 100644 --- a/rustortion-standalone/src/gui/app.rs +++ b/rustortion-standalone/src/gui/app.rs @@ -304,12 +304,21 @@ impl AmplifierApp { match message { Message::StartRecording => { let sample_rate = self.shared.backend.manager().sample_rate(); - let buffer_size = self.shared.backend.manager().buffer_size(); + // Size the recorder pool for the worst-case JACK period, not the + // current one: JACK can raise the buffer size mid-recording, and + // a pool sized to the smaller period would then drop every block + // as an overrun. See `ProcessHandler::MAX_BUFFER_FRAMES`. + let max_block_samples = self + .shared + .backend + .manager() + .buffer_size() + .max(crate::audio::jack::ProcessHandler::MAX_BUFFER_FRAMES); let recording_dir = &self.settings.recording_dir; if let Err(e) = self.shared.backend.manager().engine().start_recording( sample_rate, recording_dir, - buffer_size, + max_block_samples, ) { error!("Failed to start recording: {e}"); } else { From 83d0ff7af0f972866cc919ff950a22f3a5415900 Mon Sep 17 00:00:00 2001 From: OpenSauce Date: Fri, 5 Jun 2026 20:52:19 +0100 Subject: [PATCH 4/4] fix(rt): hard-cap chain length so AddStage never reallocates on RT insert_stage (the RT AddStage path) reallocated the stages Vec once a chain grew past the reserved capacity, reintroducing alloc (and dealloc of the old buffer) on the audio thread. A standalone user could trigger it by adding stages live past the reserve. - Reserve raised 16 -> 64 (~1.5 KB) and made the hard cap. - insert_stage now refuses to grow past reserved capacity, returning the rejected stage so the engine retires it via rt_drop instead of freeing it on RT. 
- UI caps the chain at DEFAULT_CHAIN_CAPACITY and disables Add Stage at the cap. - no_alloc: add add_stage_at_capacity_does_not_allocate covering the capacity-crossing drain. --- rustortion-core/src/amp/chain.rs | 30 +++++++++++++++++++++-------- rustortion-core/src/audio/engine.rs | 10 ++++++++-- rustortion-core/tests/no_alloc.rs | 23 +++++++++++++++++++++- rustortion-ui/src/app.rs | 25 +++++++++++++++--------- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/rustortion-core/src/amp/chain.rs b/rustortion-core/src/amp/chain.rs index f685474..6729e12 100644 --- a/rustortion-core/src/amp/chain.rs +++ b/rustortion-core/src/amp/chain.rs @@ -5,10 +5,13 @@ struct BypassableStage { bypassed: bool, } -/// Default stage capacity reserved up front so that adding/inserting stages on -/// the RT thread (`AddStage`) doesn't reallocate the backing `Vec` until the -/// chain grows past this many stages. -pub const DEFAULT_CHAIN_CAPACITY: usize = 16; +/// Stage capacity reserved up front, and the hard cap on chain length. +/// +/// `insert_stage` (the RT `AddStage` path) never grows the backing `Vec` past +/// its reserved capacity, so it never reallocates on the RT thread; the UI +/// enforces the same number as the maximum stage count. ~24 bytes per slot, so +/// ~1.5 KB reserved. +pub const DEFAULT_CHAIN_CAPACITY: usize = 64; // AmplifierChain holds a sequence of processing stages. pub struct AmplifierChain { @@ -80,8 +83,18 @@ impl AmplifierChain { self.stages.get(idx).map(|s| s.inner.get_parameter(name)) } - /// Insert a stage at the given index. - pub fn insert_stage(&mut self, idx: usize, stage: Box) { + /// Insert a stage at the given index **without reallocating**. + /// + /// This runs on the RT thread (via `AddStage`). 
If the chain is already at + /// its reserved capacity, the stage is **returned** rather than inserted, so + /// the caller can dispose of it off the RT thread (growing the `Vec` here + /// would allocate, and dropping the rejected box here would free, on the + /// audio thread). Returns `None` when the stage was inserted. + #[must_use] + pub fn insert_stage(&mut self, idx: usize, stage: Box) -> Option> { + if self.stages.len() == self.stages.capacity() { + return Some(stage); + } let idx = idx.min(self.stages.len()); self.stages.insert( idx, @@ -90,6 +103,7 @@ impl AmplifierChain { bypassed: false, }, ); + None } /// Remove and return the stage at the given index. @@ -162,7 +176,7 @@ mod tests { let mut chain = AmplifierChain::new(); chain.add_stage(make_level(1.0)); chain.add_stage(make_level(1.0)); - chain.insert_stage(1, make_level(0.5)); + assert!(chain.insert_stage(1, make_level(0.5)).is_none()); let out = chain.process(1.0); assert!((out - 0.5).abs() < 1e-6); } @@ -257,7 +271,7 @@ mod tests { let mut chain = AmplifierChain::new(); chain.add_stage(make_level(1.0)); chain.set_bypassed(0, true); - chain.insert_stage(0, make_level(0.5)); // insert before bypassed + assert!(chain.insert_stage(0, make_level(0.5)).is_none()); // insert before bypassed // idx 0 = new (active, 0.5x), idx 1 = old (bypassed, 1.0x) let out = chain.process(1.0); assert!( diff --git a/rustortion-core/src/audio/engine.rs b/rustortion-core/src/audio/engine.rs index 5e5d55f..21ac102 100644 --- a/rustortion-core/src/audio/engine.rs +++ b/rustortion-core/src/audio/engine.rs @@ -263,8 +263,14 @@ impl Engine { } } EngineMessage::AddStage(idx, stage) => { - self.chain.insert_stage(idx, stage); - debug!("Added stage at index {idx}"); + if let Some(rejected) = self.chain.insert_stage(idx, stage) { + // Chain is at its reserved capacity. Retire the rejected + // stage off the RT thread rather than dropping (freeing) + // it here. The UI caps stage count, so this is a backstop. 
+ self.rt_drop.retire(rejected); + } else { + debug!("Added stage at index {idx}"); + } } EngineMessage::RemoveStage(idx) => { if let Some(old) = self.chain.remove_stage(idx) { diff --git a/rustortion-core/tests/no_alloc.rs b/rustortion-core/tests/no_alloc.rs index af047fe..873e823 100644 --- a/rustortion-core/tests/no_alloc.rs +++ b/rustortion-core/tests/no_alloc.rs @@ -14,7 +14,7 @@ use assert_no_alloc::{AllocDisabler, assert_no_alloc, reset_violation_count, violation_count}; use hound::{WavSpec, WavWriter}; -use rustortion_core::amp::chain::AmplifierChain; +use rustortion_core::amp::chain::{AmplifierChain, DEFAULT_CHAIN_CAPACITY}; use rustortion_core::amp::stages::Stage; use rustortion_core::amp::stages::clipper::ClipperType; use rustortion_core::amp::stages::compressor::CompressorStage; @@ -532,6 +532,27 @@ mod message_arms { ); } + #[test] + fn add_stage_at_capacity_does_not_allocate() { + // Fill the chain to its reserved capacity (off-audit), then assert the + // capacity-crossing AddStage drain neither reallocates nor deallocates: + // the chain rejects the stage instead of growing its Vec, and the engine + // retires the rejected box via rt_drop rather than freeing it on RT. 
+ let (mut engine, handle, _rx) = plugin_engine(1.0); + let (input, mut output) = buffers(); + for _ in 0..DEFAULT_CHAIN_CAPACITY { + handle.add_stage(0, Box::new(LevelStage::new(1.0))); + engine.process(&input, &mut output).unwrap(); + } + let violations = assert_drain_alloc_free(&mut engine, &input, &mut output, || { + handle.add_stage(0, Box::new(LevelStage::new(0.5))); + }); + assert_eq!( + violations, 0, + "at-capacity AddStage drain allocated {violations} time(s)" + ); + } + #[test] fn replace_stage_drain_does_not_allocate() { // ReplaceStage swaps the stage in place and retires the old one via diff --git a/rustortion-ui/src/app.rs b/rustortion-ui/src/app.rs index 6ae2594..8713be0 100644 --- a/rustortion-ui/src/app.rs +++ b/rustortion-ui/src/app.rs @@ -22,6 +22,7 @@ use crate::stages::{ }; use crate::tabs::Tab; use crate::tr; +use rustortion_core::amp::chain::DEFAULT_CHAIN_CAPACITY; use rustortion_core::preset::InputFilterConfig; const REBUILD_INTERVAL: Duration = Duration::from_millis(100); @@ -90,14 +91,18 @@ impl SharedApp { } Message::RebuildTick => self.flush_dirty_params(), Message::AddStage => { - self.flush_dirty_params(); - let new_stage = StageConfig::from(self.selected_stage_type); - let category = new_stage.category(); - let insert_idx = self.category_end_index(category); - self.stages.insert(insert_idx, new_stage); - self.collapsed_stages.insert(insert_idx, false); - self.backend.add_stage(insert_idx, &self.stages[insert_idx]); - self.backend.persist_chain_state(&self.stages); + // Cap the chain so the engine's stage list never has to grow on + // the RT thread. See `DEFAULT_CHAIN_CAPACITY`. 
+ if self.stages.len() < DEFAULT_CHAIN_CAPACITY { + self.flush_dirty_params(); + let new_stage = StageConfig::from(self.selected_stage_type); + let category = new_stage.category(); + let insert_idx = self.category_end_index(category); + self.stages.insert(insert_idx, new_stage); + self.collapsed_stages.insert(insert_idx, false); + self.backend.add_stage(insert_idx, &self.stages[insert_idx]); + self.backend.persist_chain_state(&self.stages); + } } Message::RemoveStage(idx) => { if idx < self.stages.len() { @@ -506,9 +511,11 @@ impl SharedApp { available_types.first().copied() }; + // Disable "Add Stage" once the chain hits its capacity cap. + let add_msg = (self.stages.len() < DEFAULT_CHAIN_CAPACITY).then_some(Message::AddStage); row![ pick_list(available_types, selected, Message::StageTypeSelected), - button(tr!(add_stage)).on_press(Message::AddStage), + button(tr!(add_stage)).on_press_maybe(add_msg), ] .spacing(SPACING_NORMAL) .align_y(Alignment::Center)