Skip to content

Commit d9b69ee

Browse files
committed
[add] callback-safe transport for audio playback commands.
1 parent be43f87 commit d9b69ee

1 file changed

Lines changed: 300 additions & 1 deletion

File tree

crates/lambda-rs/src/audio/playback.rs

Lines changed: 300 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,17 @@
55
//! This module provides a minimal, backend-agnostic playback facade that
66
//! supports one active `SoundBuffer` at a time.
77
8-
use std::sync::Arc;
8+
use std::{
9+
cell::UnsafeCell,
10+
mem::MaybeUninit,
11+
sync::{
12+
atomic::{
13+
AtomicUsize,
14+
Ordering,
15+
},
16+
Arc,
17+
},
18+
};
919

1020
use crate::audio::{
1121
AudioError,
@@ -15,6 +25,107 @@ use crate::audio::{
1525

1626
const DEFAULT_GAIN_RAMP_FRAMES: usize = 128;
1727

28+
/// A fixed-capacity, single-producer/single-consumer queue.
29+
///
30+
/// The queue is designed for real-time audio callbacks:
31+
/// - `push` and `pop` MUST NOT block.
32+
/// - `pop` MUST NOT allocate.
33+
///
34+
/// # Safety
35+
/// This type is only sound when used as SPSC (exactly one producer thread and
36+
/// one consumer thread).
37+
#[allow(dead_code)]
38+
struct CommandQueue<T, const CAPACITY: usize> {
39+
buffer: [UnsafeCell<MaybeUninit<T>>; CAPACITY],
40+
head: AtomicUsize,
41+
tail: AtomicUsize,
42+
}
43+
44+
unsafe impl<T: Send, const CAPACITY: usize> Send for CommandQueue<T, CAPACITY> {}
45+
unsafe impl<T: Send, const CAPACITY: usize> Sync for CommandQueue<T, CAPACITY> {}
46+
47+
#[allow(dead_code)]
48+
impl<T, const CAPACITY: usize> CommandQueue<T, CAPACITY> {
49+
/// Create a new empty queue.
50+
///
51+
/// # Returns
52+
/// A queue with a fixed capacity.
53+
fn new() -> Self {
54+
assert!(CAPACITY > 0, "command queue capacity must be non-zero");
55+
56+
return Self {
57+
buffer: std::array::from_fn(|_| {
58+
return UnsafeCell::new(MaybeUninit::uninit());
59+
}),
60+
head: AtomicUsize::new(0),
61+
tail: AtomicUsize::new(0),
62+
};
63+
}
64+
65+
/// Attempt to enqueue a value.
66+
///
67+
/// # Arguments
68+
/// - `value`: The value to enqueue.
69+
///
70+
/// # Returns
71+
/// `Ok(())` when the value was enqueued. `Err(value)` when the queue is full.
72+
fn push(&self, value: T) -> Result<(), T> {
73+
let head = self.head.load(Ordering::Acquire);
74+
let tail = self.tail.load(Ordering::Relaxed);
75+
76+
if tail.wrapping_sub(head) >= CAPACITY {
77+
return Err(value);
78+
}
79+
80+
let index = tail % CAPACITY;
81+
let slot = self.buffer[index].get();
82+
unsafe {
83+
(&mut *slot).write(value);
84+
}
85+
86+
self.tail.store(tail.wrapping_add(1), Ordering::Release);
87+
return Ok(());
88+
}
89+
90+
/// Attempt to dequeue a value.
91+
///
92+
/// # Returns
93+
/// `Some(value)` when a value is available, otherwise `None`.
94+
fn pop(&self) -> Option<T> {
95+
let tail = self.tail.load(Ordering::Acquire);
96+
let head = self.head.load(Ordering::Relaxed);
97+
98+
if head == tail {
99+
return None;
100+
}
101+
102+
let index = head % CAPACITY;
103+
let slot = self.buffer[index].get();
104+
let value = unsafe { (&*slot).assume_init_read() };
105+
106+
self.head.store(head.wrapping_add(1), Ordering::Release);
107+
return Some(value);
108+
}
109+
}
110+
111+
impl<T, const CAPACITY: usize> Drop for CommandQueue<T, CAPACITY> {
112+
fn drop(&mut self) {
113+
let tail = self.tail.load(Ordering::Relaxed);
114+
let mut head = self.head.load(Ordering::Relaxed);
115+
116+
while head != tail {
117+
let index = head % CAPACITY;
118+
let slot = self.buffer[index].get();
119+
unsafe {
120+
std::ptr::drop_in_place((&mut *slot).as_mut_ptr());
121+
}
122+
head = head.wrapping_add(1);
123+
}
124+
125+
return;
126+
}
127+
}
128+
18129
/// A linear gain ramp used to de-click transport transitions.
19130
#[derive(Clone, Copy, Debug, PartialEq)]
20131
struct GainRamp {
@@ -304,6 +415,115 @@ impl PlaybackScheduler {
304415
}
305416
}
306417

418+
/// Commands produced by `SoundInstance` transport operations.
419+
#[derive(Debug)]
420+
#[allow(dead_code)]
421+
enum PlaybackCommand {
422+
SetBuffer(Arc<SoundBuffer>),
423+
SetLooping(bool),
424+
Play,
425+
Pause,
426+
Stop,
427+
SetActiveInstanceId(u64),
428+
}
429+
430+
/// A callback-safe controller that drains transport commands and renders audio.
431+
///
432+
/// This type is intended to be owned by the platform audio callback closure.
433+
#[allow(dead_code)]
434+
struct PlaybackController<const COMMAND_CAPACITY: usize> {
435+
command_queue: Arc<CommandQueue<PlaybackCommand, COMMAND_CAPACITY>>,
436+
scheduler: PlaybackScheduler,
437+
active_instance_id: u64,
438+
}
439+
440+
#[allow(dead_code)]
441+
impl<const COMMAND_CAPACITY: usize> PlaybackController<COMMAND_CAPACITY> {
442+
/// Create a controller configured for a fixed output channel count.
443+
///
444+
/// # Arguments
445+
/// - `channels`: Interleaved output channel count.
446+
/// - `command_queue`: Shared producer/consumer command queue.
447+
///
448+
/// # Returns
449+
/// A controller initialized to `Stopped` with no active buffer.
450+
fn new(
451+
channels: usize,
452+
command_queue: Arc<CommandQueue<PlaybackCommand, COMMAND_CAPACITY>>,
453+
) -> Self {
454+
return Self::new_with_ramp_frames(
455+
channels,
456+
DEFAULT_GAIN_RAMP_FRAMES,
457+
command_queue,
458+
);
459+
}
460+
461+
/// Create a controller with an explicit gain ramp length.
462+
///
463+
/// # Arguments
464+
/// - `channels`: Interleaved output channel count.
465+
/// - `ramp_frames`: Gain ramp length in frames.
466+
/// - `command_queue`: Shared producer/consumer command queue.
467+
///
468+
/// # Returns
469+
/// A controller initialized to `Stopped` with no active buffer.
470+
fn new_with_ramp_frames(
471+
channels: usize,
472+
ramp_frames: usize,
473+
command_queue: Arc<CommandQueue<PlaybackCommand, COMMAND_CAPACITY>>,
474+
) -> Self {
475+
return Self {
476+
command_queue,
477+
scheduler: PlaybackScheduler::new_with_ramp_frames(channels, ramp_frames),
478+
active_instance_id: 0,
479+
};
480+
}
481+
482+
/// Drain any pending transport commands.
483+
///
484+
/// # Returns
485+
/// `()` after applying all pending commands.
486+
fn drain_commands(&mut self) {
487+
while let Some(command) = self.command_queue.pop() {
488+
match command {
489+
PlaybackCommand::SetBuffer(buffer) => {
490+
self.scheduler.set_buffer(buffer);
491+
}
492+
PlaybackCommand::SetLooping(looping) => {
493+
self.scheduler.set_looping(looping);
494+
}
495+
PlaybackCommand::Play => {
496+
self.scheduler.play();
497+
}
498+
PlaybackCommand::Pause => {
499+
self.scheduler.pause();
500+
}
501+
PlaybackCommand::Stop => {
502+
self.scheduler.stop();
503+
}
504+
PlaybackCommand::SetActiveInstanceId(instance_id) => {
505+
self.active_instance_id = instance_id;
506+
}
507+
}
508+
}
509+
510+
return;
511+
}
512+
513+
/// Render audio for a callback tick.
514+
///
515+
/// # Arguments
516+
/// - `writer`: Real-time writer for the current callback output buffer.
517+
///
518+
/// # Returns
519+
/// `()` after draining commands and writing the output buffer.
520+
fn render(&mut self, writer: &mut dyn AudioOutputWriter) {
521+
self.drain_commands();
522+
self.scheduler.render(writer);
523+
return;
524+
}
525+
}
526+
307527
/// A queryable playback state for a `SoundInstance`.
308528
///
309529
/// This state is observable from the application thread and is intended to
@@ -525,6 +745,37 @@ impl AudioContext {
525745
mod tests {
526746
use super::*;
527747

748+
/// Command queues MUST preserve FIFO ordering.
749+
#[test]
750+
fn command_queue_preserves_order() {
751+
let queue: CommandQueue<u32, 8> = CommandQueue::new();
752+
753+
queue.push(1).unwrap();
754+
queue.push(2).unwrap();
755+
queue.push(3).unwrap();
756+
757+
assert_eq!(queue.pop(), Some(1));
758+
assert_eq!(queue.pop(), Some(2));
759+
assert_eq!(queue.pop(), Some(3));
760+
assert_eq!(queue.pop(), None);
761+
return;
762+
}
763+
764+
/// Command queues MUST reject pushes when full.
765+
#[test]
766+
fn command_queue_rejects_when_full() {
767+
let queue: CommandQueue<u32, 2> = CommandQueue::new();
768+
769+
assert!(queue.push(10).is_ok());
770+
assert!(queue.push(11).is_ok());
771+
assert!(matches!(queue.push(12), Err(12)));
772+
773+
assert_eq!(queue.pop(), Some(10));
774+
assert_eq!(queue.pop(), Some(11));
775+
assert_eq!(queue.pop(), None);
776+
return;
777+
}
778+
528779
struct TestAudioOutput {
529780
channels: u16,
530781
frames: usize,
@@ -712,4 +963,52 @@ mod tests {
712963
assert!((last - first).abs() <= 1e-6);
713964
return;
714965
}
966+
967+
/// Controllers MUST drain queued commands before rendering audio.
968+
#[test]
969+
fn controller_drains_commands_before_render() {
970+
let command_queue: Arc<CommandQueue<PlaybackCommand, 16>> =
971+
Arc::new(CommandQueue::new());
972+
let buffer = make_test_buffer(vec![0.1, 0.2], 1);
973+
974+
command_queue
975+
.push(PlaybackCommand::SetBuffer(buffer))
976+
.unwrap();
977+
command_queue
978+
.push(PlaybackCommand::SetLooping(true))
979+
.unwrap();
980+
command_queue.push(PlaybackCommand::Play).unwrap();
981+
982+
let mut controller =
983+
PlaybackController::new_with_ramp_frames(1, 0, command_queue);
984+
985+
let mut writer = TestAudioOutput::new(1, 4);
986+
controller.render(&mut writer);
987+
988+
assert!((writer.sample(0, 0) - 0.1).abs() <= 1e-6);
989+
assert!((writer.sample(1, 0) - 0.2).abs() <= 1e-6);
990+
assert!((writer.sample(2, 0) - 0.1).abs() <= 1e-6);
991+
assert!((writer.sample(3, 0) - 0.2).abs() <= 1e-6);
992+
assert_eq!(controller.active_instance_id, 0);
993+
return;
994+
}
995+
996+
/// Controllers MUST update the active instance id when commanded.
997+
#[test]
998+
fn controller_updates_active_instance_id() {
999+
let command_queue: Arc<CommandQueue<PlaybackCommand, 16>> =
1000+
Arc::new(CommandQueue::new());
1001+
1002+
command_queue
1003+
.push(PlaybackCommand::SetActiveInstanceId(42))
1004+
.unwrap();
1005+
1006+
let mut controller =
1007+
PlaybackController::new_with_ramp_frames(1, 0, command_queue);
1008+
1009+
let mut writer = TestAudioOutput::new(1, 1);
1010+
controller.render(&mut writer);
1011+
assert_eq!(controller.active_instance_id, 42);
1012+
return;
1013+
}
7151014
}

0 commit comments

Comments
 (0)