diff --git a/CLAUDE.md b/CLAUDE.md index 9fcd47e..3ab6388 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -27,7 +27,7 @@ scripts/c64cast.sh -u tr:// clip.mp4 tune.sid # TeensyROM+ over auto-de scripts/c64cast.sh -u u64://192.168.2.64 'https://youtu.be/...' # needs the `yt` extra ``` -**Audio is on by default** (`AudioCfg.enabled` defaults True); `--no-audio` mutes. Flag groups (`-h` shows them grouped): `connection`, `quick playback`, `video input`, `audio`, `vision input`, `playlist`, `introspection`, `debug`. +**Audio is on by default** (`AudioCfg.enabled` defaults True); `--no-audio` mutes. On the U64, video audio defaults to the high-fidelity **Ultimate Audio FPGA PCM sampler** (`[audio].backend = "auto"` → sampler when available; see [sampler.py](c64cast/sampler.py)); `backend = "dac"` forces the lo-fi 4-bit `$D418` DAC (the only path on TeensyROM, and the path for mic/webcam audio everywhere). Flag groups (`-h` shows them grouped): `connection`, `quick playback`, `video input`, `audio`, `vision input`, `playlist`, `introspection`, `debug`. Notable: `--config`, `-v` / `-vv` (info / debug logging), `--log-file PATH` (mirror logs to disk for headless runs). Terminal logging uses `rich.logging.RichHandler` (colored + timestamped) when the `logging` extra is installed; falls back to plain stdlib `StreamHandler` otherwise. The DMA password (if the U64 has one set) is supplied via `C64CAST_DMA_PASSWORD` env var or `[ultimate64] dma_password` in the config — **no CLI flag**, so secrets don't leak into shell history or `ps` output. The env var takes precedence when both are set. @@ -69,6 +69,9 @@ c64cast/ │ delta uploads; REST for read_memory, reset, run_prg │ (BASIC clear loop + SID-player SYS stub), probe ├── audio.py AudioStreamer: NMI + SID DAC + ring buffer + sample tap +├── sampler.py UltimateAudioSampler: U64 "Ultimate Audio" FPGA PCM +│ sampler ($DF20) — hi-fi video audio from a streaming +│ REU ring, zero SID/$D418/NMI/CPU (off the C64 bus) ├── video.py WebcamSource (shared cv2 camera broker) + AVFileSource (PyAV) ├── vision.py VisionController: webcam hand-gestures → pause/skip/cycle │ (MediaPipe HandLandmarker; sibling to keyboard.py) @@ -195,6 +198,18 @@ Sample encoding can optionally apply **TPDF dither** (±1 LSB triangular, contro **`position_seconds()`** is the audio-master clock: `(pushed - queued) / sample_rate`. The C64-side ring buffer adds ~1 s of constant latency past this, harmless for relative sync. +### `sampler.py` — UltimateAudioSampler (U64 "Ultimate Audio" FPGA PCM) + +The U64 firmware exposes a 7-channel **FPGA PCM sampler** at `$DF20-$DFFF` ("Ultimate Audio", Gideon's register API v0.2). It plays 8/16-bit PCM up to 48 kHz **straight out of REU SDRAM with zero SID / `$D418` / NMI / CPU / turbo involvement** — so it's immune to the bus-halt / badline problems the 4-bit DAC fights, and is **vastly higher fidelity**. It's the **default video-audio backend on the U64** ([audio].backend = "auto"); the 4-bit `$D418` DAC stays for TeensyROM (no sampler) and as an opt-in lo-fi path. Mic/webcam audio always uses the DAC. + +`sampler.py` has two halves. **Pure register helpers** (unit-testable): `divider_for_rate(rate)` = `round(6_250_000 / rate)`, `control_byte(...)` (gate b0 / repeat b1 / irq b2 / mode b4-5 [`00`=8-bit, `01`=16-bit LE]), `pack_pcm(int16, bits)` (signed 8-bit, or int16-LE), `channel_register_writes(...)` (the big-endian register byte layout: start `$01000000`+REU offset, length, rate divider, repeat A/B as **byte positions in the sample**), `program_channel`/`gate_off`. **`UltimateAudioSampler`** is the scene-facing object mirroring the slice of `AudioStreamer` scenes call (`sample_rate`, `position_seconds`, `push_samples`, `get_recent_samples`, `stop`, plus no-op `set_pre_emphasis`/`mark_eof`, `is_sampler=True`). + +**Streaming REU ring**: program channel 0 as an A↔B loop over `[ring_base, ring_base+ring_size)` (base `$200000` — above the mic ring `$110000`, below video staging `$E00000`, so it coexists with REU-staged bitmap video; default 1 MiB). `start()` prefills the ring with NEUTRAL (silence) + a prebuffer of real PCM, then gates the loop on and records `gate_time`. A writer thread REUWRITEs decoded PCM **ahead of a wall-clock-computed read head** (`read = (monotonic - gate_time) * actual_rate * bps`, mod ring_size) — wrapping at the boundary, NEUTRAL-padding only past a low watermark (`_lead_panic`, a genuine producer underrun — not whenever the queue is briefly empty). **No servo/governor/NMI**: the FPGA sample clock is crystal-exact, so the read head is *computed*, never read back, and the loop is open-loop + drift-free. `sample_rate` is set to the FPGA's exact `REF/divider` (a <0.5% constant pitch offset from nominal, so A/V is drift-free; `AVFileSource` resamples to it). `position_seconds()` = `clamp(monotonic - gate_time, 0, total)` — same contract as the REU-pump branch, so `VideoScene._clock_s` works unchanged. HW-de-risked on .64 (gapless / correct pitch / no drift over 3 min) before integration. + +`[audio].backend` ("auto"|"dac"|"sampler") is resolved per video scene in `config.build_scene` via `resolve_audio_backend(setting, *, supports_sampler, sampler_available)` (mirrors `resolve_use_reu_staged`): "auto" → sampler iff both true, else dac; explicit "sampler" warns + falls back to dac when unavailable. The sampler is constructed as the scene's audio object (a `VideoScene` drives it polymorphically — `setup()` branches on `isinstance(audio, UltimateAudioSampler)`). `sampler_sample_rate` (default 44100) + `sampler_bits` (default 16) are validated by `config.validate_sampler_cfg`. **fps:** because the sampler is off the C64 bus, sampler-audio bitmap video does **not** get the 4-bit DAC's 20 fps cap (`_frame_push_default_fps` is fed `has_digitized_audio=False`); it defaults to the muted half-rate (30/25) pending an HW fps A/B that may raise it. + +**Provisioning** (`doctor.provision_sampler`/`restore_sampler`, gated on `profile.supports_sampler` + not `--skip-probe` + `_wants_sampler`): enables `Map Ultimate Audio $DF20-DFFF` if disabled and unmutes `Vol Sampler L`/`R` (to `" 0 dB"`) if OFF — live + volatile, restored at teardown (composite-keyed `SystemStack.sampler_restore`). Because the ring lives in REU SDRAM, `_wants_sampler` also pulls the REU into `_wants_reu`, so `provision_reu` enables the REU (16 MB) for a sampler run — which makes `"auto"` video resolve to the tear-free REU bank-swap path (the sampler installs no `$0314` IRQ, so REU-staged video and the sampler coexist with no IRQ contention). `doctor.sampler_is_available(api)` (map enabled + a channel audible) feeds `cli._resolve_sampler_available`, and `_probe_sampler_status` reports the state in `--doctor`. + ### `video.py` — WebcamSource (shared broker) + AVFileSource (PyAV) **`WebcamSource`** is an always-on shared camera broker. A single `cv2.VideoCapture` is single-consumer (every `.read()` consumes the next device frame; concurrent reads from two threads aren't safe), so one background grab thread owns the capture, continuously reads the newest frame, and `read()` hands out an independent **copy** of the latest frame. That lets the webcam scene (when active) and the always-on vision controller (`vision.py`) share **one** physical camera with no contention — and keeps the live-webcam path low-latency (always the freshest frame, stale ones overwritten). `WebcamScene._read_frame()` is unchanged — it still calls `source.read()`. The camera is opened once per stack in `cli.py` when `needs_webcam or cfg.vision.enabled`, stored on `SystemStack.source`, released at teardown. diff --git a/c64cast.schema.json b/c64cast.schema.json index 630c62b..aa165de 100644 --- a/c64cast.schema.json +++ b/c64cast.schema.json @@ -153,9 +153,29 @@ }, "sample_rate": { "type": "integer", - "description": "Audio sample rate in Hz fed to the SID DAC. Default 10500 lifts the Nyquist to ~5.25 kHz so fricatives/sibilants survive (8000 lost them); HW-verified safe on NTSC + PAL with no NMI handler overrun. NTSC can go to ~11025; keep PAL <= ~10500. Rates that overrun the handler are rejected at load (see c64.nmi_rate_safety).", + "description": "Audio sample rate in Hz fed to the SID DAC. Default 10500 lifts the Nyquist to ~5.25 kHz so fricatives/sibilants survive (8000 lost them); HW-verified safe on NTSC + PAL with no NMI handler overrun. NTSC can go to ~11025; keep PAL <= ~10500. Rates that overrun the handler are rejected at load (see c64.nmi_rate_safety). Sampler-backend playback uses [audio].sampler_sample_rate instead.", "default": 10500 }, + "backend": { + "type": "string", + "description": "Video-audio backend: 'auto' (sampler on a capable U64, else DAC), 'dac' (4-bit $D418 NMI DAC, all backends, lo-fi), or 'sampler' (U64 'Ultimate Audio' FPGA PCM, high fidelity, off the C64 bus).", + "enum": [ + "auto", + "dac", + "sampler" + ], + "default": "auto" + }, + "sampler_sample_rate": { + "type": "integer", + "description": "Sample rate (Hz) for the Ultimate Audio sampler backend. 1000..48000; default 44100 (CD quality). The FPGA plays at the nearest divider of its 6.25 MHz reference (a <0.5% constant pitch offset, drift-free).", + "default": 44100 + }, + "sampler_bits": { + "type": "integer", + "description": "PCM bit depth for the Ultimate Audio sampler backend: 8 (signed) or 16 (signed little-endian). Default 16.", + "default": 16 + }, "mic_sensitivity": { "type": "number", "description": "Microphone input gain multiplier.", diff --git a/c64cast/backend.py b/c64cast/backend.py index 93e7295..70ebc5b 100644 --- a/c64cast/backend.py +++ b/c64cast/backend.py @@ -108,6 +108,7 @@ class HardwareProfile: supports_run_prg: bool = True # launch a PRG (clear-loop, SID player) supports_run_crt: bool = True # launch a CRT (cartridge) supports_reu: bool = True # REU writes (use_reu_pump / use_reu_staged) + supports_sampler: bool = False # "Ultimate Audio" FPGA PCM sampler ($DF20) reu_bus_clean: bool = False # REU writes don't perturb the C64 bus/SID writes_are_acked: bool = False # each write returns an ack (=> flush ~free) kernal_irq_intact: bool = True # the kernal IRQ chain runs at bring-up @@ -140,6 +141,7 @@ class HardwareProfile: supports_run_prg=True, supports_run_crt=True, supports_reu=True, + supports_sampler=True, # "Ultimate Audio" FPGA PCM sampler (gated by probe) reu_bus_clean=True, # U64 REUWRITE is an ARM-side memcpy; no bus halt writes_are_acked=False, # socket DMAWRITE is fire-and-forget kernal_irq_intact=True, @@ -169,6 +171,7 @@ class HardwareProfile: supports_run_prg=True, # PostFile + LaunchFile supports_run_crt=True, # RemoteLaunch handles CRT launch supports_reu=False, # no REUWRITE opcode + supports_sampler=False, # no FPGA PCM sampler (Ultimate-only feature) reu_bus_clean=False, writes_are_acked=True, # every write returns Ack/Fail -> flush ~free kernal_irq_intact=True, diff --git a/c64cast/cli.py b/c64cast/cli.py index f22a372..b4b8a92 100644 --- a/c64cast/cli.py +++ b/c64cast/cli.py @@ -515,6 +515,54 @@ def _resolve_reu_available(cfg: cfgmod.Config, api: C64Backend) -> bool: return False +def _resolve_sampler_available(cfg: cfgmod.Config, api: C64Backend) -> bool: + """Decide whether the U64 "Ultimate Audio" FPGA PCM sampler should back + video-scene audio for this system, by asking the U64 whether it's exposed + + routed (mirrors `_resolve_reu_available`). + + Returns False (→ the 4-bit DAC) unless [audio].backend is auto/sampler AND + the backend has the sampler (supports_sampler) AND a probe is allowed AND + the firmware reports it available. `provision_sampler` runs BEFORE this in + build_stack, so a box this run just enabled reads available. Any uncertainty + (forced dac, --skip-probe, no-sampler backend, failed query, mapped-off) + degrades to the DAC so audio is never silently silent.""" + if cfg.audio.backend == "dac": + return False # forced DAC ignores the probe entirely + if not api.profile.supports_sampler: + return False # backend (e.g. TeensyROM) has no FPGA sampler + if cfg.debug.skip_probe: + log.info( + "[audio].backend = %s, but --skip-probe is set — using the 4-bit " + "DAC for video audio (sampler undetected).", + cfg.audio.backend, + ) + return False + from . import doctor + + avail = doctor.sampler_is_available(api) + if avail: + log.info( + "[audio].backend = %s: Ultimate Audio sampler available — " + "high-fidelity video audio (FPGA PCM, off the C64 bus).", + cfg.audio.backend, + ) + return True + if avail is None: + log.warning( + "[audio].backend = %s: could not read the Ultimate Audio sampler " + "state — using the 4-bit DAC for video audio.", + cfg.audio.backend, + ) + else: + log.info( + "[audio].backend = %s: Ultimate Audio sampler not available " + "(map disabled / mixer muted / firmware lacks it) — using the " + "4-bit DAC for video audio.", + cfg.audio.backend, + ) + return False + + def _coerce_reu_for_backend(cfg: cfgmod.Config, api: C64Backend) -> None: """Disable the REU-staged audio/video opt-ins when the backend has no REU. @@ -627,6 +675,10 @@ def build_stack( from . import doctor as _doctor reu_restore = _doctor.provision_reu(api, cfg) + # Auto-enable the Ultimate Audio sampler (map $DF20 + unmute Sampler mixer, + # live + volatile) when a video scene will use it. Runs BEFORE + # _resolve_sampler_available so the probe sees it on; restored at teardown. + sampler_restore = _doctor.provision_sampler(api, cfg) audio = ( AudioStreamer( @@ -647,8 +699,15 @@ def build_stack( ) reu_available = _resolve_reu_available(cfg, api) + sampler_available = _resolve_sampler_available(cfg, api) playlist_scenes = cfgmod.scenes_from_config( - cfg, api, audio, source, is_ensemble=is_ensemble, reu_available=reu_available + cfg, + api, + audio, + source, + is_ensemble=is_ensemble, + reu_available=reu_available, + sampler_available=sampler_available, ) # The system video rate (60 NTSC / 50 PAL) is resolved into the @@ -781,6 +840,8 @@ def build_stack( vision_controller=vision_controller, reu_available=reu_available, reu_restore=reu_restore, + sampler_available=sampler_available, + sampler_restore=sampler_restore, framebuffer=framebuffer, preview_window=preview_window, recorder=recorder, @@ -806,6 +867,8 @@ def teardown_stack(stack: SystemStack) -> None: # Restore any REU config we auto-provisioned, while the REST session is # still open (no-op when nothing was changed; volatile regardless). ("REU restore", lambda: _doctor.restore_reu(stack.api, stack.reu_restore)), + # Same for the Ultimate Audio sampler map/mixer auto-provisioning. + ("sampler restore", lambda: _doctor.restore_sampler(stack.api, stack.sampler_restore)), ("U64 reset", stack.api.reset), ("API close", stack.api.close), ("camera release", lambda: stack.source.release() if stack.source else None), @@ -1030,6 +1093,7 @@ def main(argv=None) -> int: # target system (broken/pitch-dropped audio) before the playlist runs. try: cfgmod.validate_nmi_sample_rate(cfg) + cfgmod.validate_sampler_cfg(cfg) except cfgmod.ConfigError as e: log.error("%s", e) return 5 @@ -1099,6 +1163,7 @@ def main(argv=None) -> int: _st.source, is_ensemble=True, reu_available=_st.reu_available, + sampler_available=_st.sampler_available, ) ) @@ -1132,6 +1197,7 @@ def _on_sighup(_signum, _frame): st.source, is_ensemble=loaded.is_ensemble, reu_available=st.reu_available, + sampler_available=st.sampler_available, ) new_factory = interstitial_factory(st.api, new_cfg.interstitial) st.playlist.request_reload(new_scenes, new_factory) @@ -1165,6 +1231,7 @@ def _on_sighup(_signum, _frame): st.source, is_ensemble=loaded.is_ensemble, reu_available=st.reu_available, + sampler_available=st.sampler_available, ) ) for st, p in zip(stacks, loaded.paths, strict=True) diff --git a/c64cast/config.py b/c64cast/config.py index 88c0a99..503f501 100644 --- a/c64cast/config.py +++ b/c64cast/config.py @@ -31,6 +31,7 @@ from .audio import AudioStreamer from .backend import C64Backend from .modes import DisplayMode + from .sampler import UltimateAudioSampler from .scenes import Scene from .songlengths import LengthsDB from .video import WebcamSource @@ -103,6 +104,13 @@ # (mic when audio is enabled, else silence). A drift test pins this list. _AUDIO_SOURCE_CHOICES = ("none", "mic", "sid") +# Video-audio backend selector ([audio].backend). "dac" = the 4-bit $D418 NMI +# DAC (every backend; lo-fi, bus-coupled). "sampler" = the U64 "Ultimate Audio" +# FPGA PCM sampler (high fidelity, off the C64 bus; U64 only — see sampler.py). +# "auto" = sampler on a sampler-capable U64 with the feature available, else +# dac. A drift test pins this list. +_AUDIO_BACKEND_CHOICES = ("auto", "dac", "sampler") + # The scene types (mirrors validate_scene_cfg). Used by the introspection # layer's `applies_to` filtering; declared here so SceneCfg metadata can name # them symbolically. @@ -338,7 +346,41 @@ class AudioCfg: "the Nyquist to ~5.25 kHz so fricatives/sibilants survive (8000 lost " "them); HW-verified safe on NTSC + PAL with no NMI handler overrun. NTSC " "can go to ~11025; keep PAL <= ~10500. Rates that overrun the handler are " - "rejected at load (see c64.nmi_rate_safety)." + "rejected at load (see c64.nmi_rate_safety). Sampler-backend playback uses " + "[audio].sampler_sample_rate instead." + }, + ) + # Video-audio backend. The sampler (U64 "Ultimate Audio" FPGA PCM, see + # sampler.py) plays straight from REU with zero SID/$D418/NMI/CPU, so it is + # vastly higher fidelity than the 4-bit DAC and immune to the bus-halt + # problems the DAC fights. "auto" picks it on a sampler-capable U64 when the + # feature is available (else falls back to the DAC); "dac" forces the lo-fi + # 4-bit DAC (the only path on TeensyROM); "sampler" forces the sampler and + # warns+falls-back to the DAC if it isn't available. Resolved per video scene + # in build_scene via resolve_audio_backend; mic/webcam audio stays on the DAC. + backend: str = field( + default="auto", + metadata={ + "help": "Video-audio backend: 'auto' (sampler on a capable U64, else " + "DAC), 'dac' (4-bit $D418 NMI DAC, all backends, lo-fi), or 'sampler' " + "(U64 'Ultimate Audio' FPGA PCM, high fidelity, off the C64 bus).", + "choices": _AUDIO_BACKEND_CHOICES, + }, + ) + sampler_sample_rate: int = field( + default=44100, + metadata={ + "help": "Sample rate (Hz) for the Ultimate Audio sampler backend. " + "1000..48000; default 44100 (CD quality). The FPGA plays at the nearest " + "divider of its 6.25 MHz reference (a <0.5% constant pitch offset, " + "drift-free)." + }, + ) + sampler_bits: int = field( + default=16, + metadata={ + "help": "PCM bit depth for the Ultimate Audio sampler backend: 8 (signed) " + "or 16 (signed little-endian). Default 16." }, ) mic_sensitivity: float = field( @@ -2075,6 +2117,40 @@ def resolve_double_buffer( return bool(setting) +def resolve_audio_backend( + setting: str, + *, + supports_sampler: bool, + sampler_available: bool, +) -> str: + """Resolve the [audio].backend selector to a concrete ``"sampler"`` or + ``"dac"`` for video-scene audio (mirrors resolve_use_reu_staged's pattern). + + The sampler is the U64 "Ultimate Audio" FPGA PCM path (sampler.py) — high + fidelity, entirely off the C64 bus. ``supports_sampler`` is the backend + capability (True on the Ultimate, False on TeensyROM); ``sampler_available`` + is the startup probe's verdict that the firmware exposes + routes it. + + * ``"auto"`` → ``"sampler"`` iff both are true, else ``"dac"``. + * ``"sampler"`` → ``"sampler"`` iff both are true; otherwise logs a + warning and degrades to ``"dac"`` (never silently silent). + * ``"dac"`` → always ``"dac"`` (the lo-fi 4-bit $D418 path).""" + if setting == "dac": + return "dac" + if supports_sampler and sampler_available: + return "sampler" + if setting == "sampler": + log.warning( + "[audio].backend = 'sampler' but the Ultimate Audio sampler is " + "unavailable on this system (%s) — falling back to the 4-bit DAC. " + "Enable 'Map Ultimate Audio $DF20-DFFF' (F2 -> C64 and Cartridge " + "Settings) and set Vol Sampler L/R audible (F2 -> Audio Mixer), or " + "set [audio].backend = 'dac' to silence this warning.", + "no sampler support" if not supports_sampler else "feature not enabled", + ) + return "dac" + + def _build_display_mode( name: str, palette_mode: str = "percell", @@ -2602,6 +2678,24 @@ def validate_nmi_sample_rate(cfg: Config) -> None: log.warning("[audio].sample_rate: %s", message) +def validate_sampler_cfg(cfg: Config) -> None: + """Guard the Ultimate Audio sampler settings ([audio].sampler_bits / + sampler_sample_rate). Raises ConfigError on an unusable value. No-op when + audio is disabled; the rate is only *used* when [audio].backend resolves to + the sampler, but validating unconditionally keeps a typo from lurking until + the backend is selected. The ring is length-independent (streaming), so + there is no per-clip overflow check — see sampler.py.""" + if not cfg.audio.enabled: + return + if cfg.audio.sampler_bits not in (8, 16): + raise ConfigError(f"[audio].sampler_bits must be 8 or 16, got {cfg.audio.sampler_bits}") + if not 1000 <= cfg.audio.sampler_sample_rate <= 48000: + raise ConfigError( + "[audio].sampler_sample_rate must be 1000..48000 Hz, got " + f"{cfg.audio.sampler_sample_rate}" + ) + + def validate_scene_cfg(s: SceneCfg, cfg: Config, *, audio_enabled: bool) -> None: """Pre-construction validation for a SceneCfg. @@ -2711,6 +2805,7 @@ def build_scene( *, is_ensemble: bool = False, reu_available: bool = False, + sampler_available: bool = False, ) -> Scene: """Build a single Scene from a SceneCfg. @@ -2731,7 +2826,11 @@ def build_scene( `reu_available` is the startup probe's verdict on whether the U64's REU is enabled; it resolves the [video].use_reu_staged "auto" setting (see resolve_use_reu_staged). Callers that build scenes without a live probe - (validation, doctor) leave it False so auto degrades to host-DMA.""" + (validation, doctor) leave it False so auto degrades to host-DMA. + + `sampler_available` is the probe's verdict on whether the U64's Ultimate + Audio sampler is exposed + routed; it resolves [audio].backend for video + scenes (see resolve_audio_backend). False without a probe → DAC.""" from .scenes import BlankScene, SourceScene, VideoScene, WebcamScene validate_scene_cfg(s, cfg, audio_enabled=audio is not None) @@ -2805,8 +2904,33 @@ def build_scene( backend_supports_reu=backend_supports_reu, ) # Default: audio ON for videos (it's part of the file). - # The user can mute one with `audio = false`. - scene_audio = None if s.audio is False else audio + # The user can mute one with `audio = false`. Distinct name from the + # other branches' `scene_audio` because this one may hold the sampler. + video_audio: AudioStreamer | UltimateAudioSampler | None = ( + None if s.audio is False else audio + ) + # Resolve the video-audio backend. On a sampler-capable U64 with the + # Ultimate Audio sampler available, swap the shared 4-bit DAC streamer + # for a per-scene UltimateAudioSampler (high fidelity, off the C64 bus — + # see sampler.py). It satisfies the same scene-facing audio contract + # (sample_rate / position_seconds / push_samples / stop), so VideoScene + # drives it polymorphically; mic/webcam scenes keep the shared DAC. + using_sampler = False + if video_audio is not None: + backend = resolve_audio_backend( + cfg.audio.backend, + supports_sampler=api.profile.supports_sampler, + sampler_available=sampler_available, + ) + if backend == "sampler": + from .sampler import UltimateAudioSampler + + video_audio = UltimateAudioSampler( + api, + sample_rate=cfg.audio.sampler_sample_rate, + bits=cfg.audio.sampler_bits, + ) + using_sampler = True assert s.file is not None # narrowed by validate_scene_cfg # A single media URL (YouTube et al.) is resolved here — the ONE # resolution path shared with quick playback — so config-driven videos @@ -2827,7 +2951,7 @@ def build_scene( video_name = title scene = VideoScene( api, - scene_audio, + video_audio, mode, file_spec, prepend_alignment_marker=(cfg.audio.source_alignment_marker and cfg.audio.use_reu_pump), @@ -2837,7 +2961,14 @@ def build_scene( if video_name: scene.name = video_name if s.target_fps is None: - fps = _frame_push_default_fps(mode, scene_audio is not None, cfg.ultimate64.system) + # The sampler plays entirely off the C64 bus, so it does NOT impose + # the 4-bit DAC's bitmap fps cap (which exists only because the DAC's + # NMI + ring DMAWRITEs compete with frame uploads for the bus). Treat + # sampler audio as non-digitized so bitmap video gets the muted + # half-rate default (30/25) instead of 20 fps. (An HW fps A/B may + # raise this further — see reference_ultimate_audio_sampler.) + has_digitized_audio = video_audio is not None and not using_sampler + fps = _frame_push_default_fps(mode, has_digitized_audio, cfg.ultimate64.system) if fps is not None: scene.target_fps = fps elif s.type == "waveform": @@ -3055,6 +3186,7 @@ def scenes_from_config( *, is_ensemble: bool = False, reu_available: bool = False, + sampler_available: bool = False, ) -> list[Scene]: """Build the playlist scene list from cfg.scenes. @@ -3071,7 +3203,10 @@ def scenes_from_config( `build_scene` for the rationale. `reu_available` propagates to `build_scene` to resolve the - [video].use_reu_staged "auto" setting (see resolve_use_reu_staged).""" + [video].use_reu_staged "auto" setting (see resolve_use_reu_staged). + + `sampler_available` propagates to `build_scene` to resolve the + [audio].backend selector for video scenes (see resolve_audio_backend).""" from .scenes import VideoScene, WebcamScene from .video import _ensure_pyav @@ -3085,7 +3220,14 @@ def scenes_from_config( base: list[Scene] = [ build_scene( - s, cfg, api, audio, source, is_ensemble=is_ensemble, reu_available=reu_available + s, + cfg, + api, + audio, + source, + is_ensemble=is_ensemble, + reu_available=reu_available, + sampler_available=sampler_available, ) for s in cfg.scenes if not s.follower_only diff --git a/c64cast/doctor.py b/c64cast/doctor.py index 4627376..8a40930 100644 --- a/c64cast/doctor.py +++ b/c64cast/doctor.py @@ -428,6 +428,9 @@ def _probe_connectivity(loaded: LoadResult) -> list[Diagnostic]: # Catches the U2+ "emulated SID disabled" case where every # tune is silent while video + the oscilloscope still work. out.extend(_probe_sid_status(name, cfg, api)) + # Probe the Ultimate Audio sampler state when the config will + # use it for video audio (backend auto/sampler + video scene). + out.extend(_probe_sampler_status(name, cfg, api)) finally: api.close() return out @@ -469,6 +472,15 @@ def _wants_reu(cfg: object) -> tuple[bool, list[str]]: # `is True` excludes both the "auto" string and any other truthy value. if video is not None and getattr(video, "use_reu_staged", False) is True: reasons.append("[video].use_reu_staged = true") + # The Ultimate Audio sampler streams its PCM ring out of REU SDRAM, so a run + # that will use it needs the REU enabled + sized. Provisioning it also makes + # "auto" video resolve to the tear-free REU bank-swap path — and since the + # sampler runs off the C64 bus with no $0314 IRQ, REU-staged video and the + # sampler coexist cleanly (no NMI/IRQ contention). Forward ref to + # _wants_sampler (both are module-level; resolved at call time). + wants_samp, _ = _wants_sampler(cfg) + if wants_samp: + reasons.append("[audio].backend sampler (REU-backed PCM ring)") return bool(reasons), reasons @@ -627,6 +639,173 @@ def reu_is_enabled(api: object) -> bool | None: return section.get(_REU_ENABLED_FIELD) == "Enabled" +# ---- Ultimate Audio FPGA PCM sampler ($DF20-$DFFF) ---------------------- +# The $DF20 I/O map lives in "C64 and Cartridge Settings"; the stereo mixer +# routing/level in "Audio Mixer". The presence of these config keys is how we +# detect that the firmware exposes the sampler at all (sampler.py). +_SAMPLER_MAP_CATEGORY = _REU_CONFIG_CATEGORY # "C64 and Cartridge Settings" +_SAMPLER_MAP_FIELD = "Map Ultimate Audio $DF20-DFFF" +_SAMPLER_MIXER_CATEGORY = "Audio Mixer" +_SAMPLER_VOL_FIELDS = ("Vol Sampler L", "Vol Sampler R") +# The mixer volume enum's audible "0 dB" label. The firmware's volumes[] table +# (u64_config.cc) stores it with a LEADING SPACE (" 0 dB", index 24); the REST +# GET returns it verbatim and the PUT expects the same label, so match it. +_SAMPLER_VOL_AUDIBLE = " 0 dB" +_SAMPLER_VOL_OFF = "OFF" +# Composite restore-key separator: provision_sampler spans two config +# categories (map vs mixer), so the restore dict keys are "category\x1ffield". +_RESTORE_SEP = "\x1f" + + +def read_sampler_config( + api: object, +) -> tuple[bool | None, bool | None, dict[str, str]]: + """Read the U64's Ultimate Audio sampler state over REST. + + Returns ``(present, map_enabled, volumes)``: + * ``present`` — True if the firmware exposes the sampler config keys (it + has the feature), False if absent, None if the REST query failed. + * ``map_enabled`` — the $DF20 I/O-map enable (None when not present). + * ``volumes`` — current ``{field: value}`` for the Sampler mixer channels + (for restore). Reuses `_fetch_config_section` so it tracks firmware + response-shape variants identically to the REU/SID probes.""" + cart, _d1, err1 = _fetch_config_section( + api, _SAMPLER_MAP_CATEGORY, field_hint=_SAMPLER_MAP_FIELD + ) + mixer, _d2, err2 = _fetch_config_section( + api, _SAMPLER_MIXER_CATEGORY, field_hint=_SAMPLER_VOL_FIELDS[0] + ) + if err1 is not None or err2 is not None: + return None, None, {} + map_raw = cart.get(_SAMPLER_MAP_FIELD) + present = (map_raw is not None) and all(f in mixer for f in _SAMPLER_VOL_FIELDS) + if not present: + return False, None, {} + volumes: dict[str, str] = {} + for field in _SAMPLER_VOL_FIELDS: + v = mixer.get(field) + if isinstance(v, str): + volumes[field] = v + return True, (map_raw == "Enabled"), volumes + + +def sampler_is_available(api: object) -> bool | None: + """True iff the firmware exposes the Ultimate Audio sampler AND it is + currently usable (the $DF20 I/O map is enabled and at least one Sampler + mixer channel is not OFF). None when the REST query failed; False when the + feature is absent / mapped-off / muted. + + Used by `cli._resolve_sampler_available` to resolve [audio].backend — None + or False degrades to the 4-bit DAC. Run AFTER `provision_sampler` so a box + this run just enabled reads as available.""" + present, map_enabled, volumes = read_sampler_config(api) + if present is None: + return None + if not present: + return False + audible = any(v != _SAMPLER_VOL_OFF for v in volumes.values()) + return bool(map_enabled) and audible + + +def _wants_sampler(cfg: object) -> tuple[bool, list[str]]: + """Return (wants_sampler, reasons). The run wants the sampler when audio is + enabled, [audio].backend is auto/sampler (not the forced DAC), and there's a + video scene to play through it (the only scene type wired to the sampler).""" + reasons: list[str] = [] + # Duck-type to avoid a circular doctor<->config import (see _wants_reu). + audio = getattr(cfg, "audio", None) + if audio is None or not getattr(audio, "enabled", False): + return False, reasons + backend = getattr(audio, "backend", "auto") + if backend not in ("auto", "sampler"): + return False, reasons + scenes = getattr(cfg, "scenes", None) or [] + if any(getattr(s, "type", None) == "video" for s in scenes): + reasons.append(f"[audio].backend = {backend!r} + video scene(s)") + return bool(reasons), reasons + + +def provision_sampler(api: object, cfg: object) -> dict[str, str] | None: + """Auto-enable the Ultimate Audio sampler for a run that will use it — + LIVE + VOLATILE (mirrors `provision_reu`). Enables the $DF20 I/O map if off + and unmutes the Sampler mixer channels if OFF, capturing the originals for + `restore_sampler` at teardown. Returns the restore dict (composite keys + ``"category\\x1ffield" -> original``) or None when nothing was changed. + + Gated on ``profile.supports_sampler`` + not ``--skip-probe`` + `_wants_sampler`. + The change is NOT saved to flash, so it reverts on power-cycle even if the + restore is missed. Best-effort: a REST failure logs and returns what changed + so far (so teardown still restores it).""" + profile = getattr(api, "profile", None) + if profile is None or not getattr(profile, "supports_sampler", False): + return None + if getattr(getattr(cfg, "debug", None), "skip_probe", False): + return None + wants, reasons = _wants_sampler(cfg) + if not wants: + return None + + import requests + + present, map_enabled, volumes = read_sampler_config(api) + if present is None: + log.warning( + "sampler: config wants the Ultimate Audio sampler (%s) but its state " + "could not be read — leaving it unchanged.", + ", ".join(reasons), + ) + return None + if not present: + # Firmware doesn't expose the sampler; resolve falls back to the DAC. + return None + + restore: dict[str, str] = {} + if not map_enabled: + try: + api.put_config_item(_SAMPLER_MAP_CATEGORY, _SAMPLER_MAP_FIELD, "Enabled") # type: ignore[attr-defined] + except requests.RequestException as e: + log.warning("sampler: could not enable %s over REST: %s", _SAMPLER_MAP_FIELD, e) + return restore or None + restore[f"{_SAMPLER_MAP_CATEGORY}{_RESTORE_SEP}{_SAMPLER_MAP_FIELD}"] = "Disabled" + + for fieldname, cur in volumes.items(): + if cur != _SAMPLER_VOL_OFF: + continue + try: + api.put_config_item(_SAMPLER_MIXER_CATEGORY, fieldname, _SAMPLER_VOL_AUDIBLE) # type: ignore[attr-defined] + except requests.RequestException as e: + log.warning("sampler: could not unmute %s: %s", fieldname, e) + else: + restore[f"{_SAMPLER_MIXER_CATEGORY}{_RESTORE_SEP}{fieldname}"] = cur + + if restore: + log.info( + "sampler: Ultimate Audio enabled for this run (%s) — live, volatile " + "(reverts on power-cycle), restored at teardown.", + ", ".join(reasons), + ) + return restore or None + + +def restore_sampler(api: object, restore: dict[str, str] | None) -> None: + """Put the sampler config fields changed by `provision_sampler` back to + their originals at teardown. No-op when nothing was provisioned. Best-effort + — a failed restore just logs (the change was volatile anyway).""" + if not restore: + return + + import requests + + for key, value in restore.items(): + category, _, fieldname = key.partition(_RESTORE_SEP) + try: + api.put_config_item(category, fieldname, value) # type: ignore[attr-defined] + except requests.RequestException as e: + log.warning("sampler: could not restore %s = %s: %s", fieldname, value, e) + else: + log.info("sampler: restored %s = %s", fieldname, value) + + # The Ultimate's emulated-SID enable lives here. Both U64 and U2+ expose it. _AUDIO_CONFIG_CATEGORY = "Audio Output Settings" _SID_LEFT_FIELD = "SID Left" @@ -877,6 +1056,105 @@ def _probe_reu_status(name: str, cfg: object, api: object) -> list[Diagnostic]: ] +def _probe_sampler_status(name: str, cfg: object, api: object) -> list[Diagnostic]: + """If the config will use the Ultimate Audio sampler for video audio, check + the U64's sampler state via REST. Returns an empty list when not wanted. + Emits: + * ok — sampler mapped + audible (high-fidelity path ready), OR mapped + off / muted but the run will auto-enable it live, OR backend is + 'auto' on hardware without the feature (falls back to the DAC) + * warn — REST query failed, or an explicit 'sampler' on a no-sampler backend + * error — explicit 'sampler' but the U64 firmware lacks the feature + """ + wants, reasons = _wants_sampler(cfg) + if not wants: + return [] + + subject = f"{name} (Ultimate Audio sampler)" + reason_str = ", ".join(reasons) + backend = getattr(getattr(cfg, "audio", None), "backend", "auto") + supports = bool(getattr(getattr(api, "profile", None), "supports_sampler", False)) + + if not supports: + # A non-sampler backend (TeensyROM): 'auto' silently uses the DAC; an + # explicit 'sampler' can't be honored. + if backend == "sampler": + return [ + Diagnostic( + level="warn", + category="connectivity", + subject=subject, + message="[audio].backend = 'sampler' but this backend has no " + "FPGA sampler — video audio uses the 4-bit DAC.", + hint="Set [audio].backend = 'dac' or 'auto' for this backend.", + ) + ] + return [] + + present, map_enabled, volumes = read_sampler_config(api) + if present is None: + return [ + Diagnostic( + level="warn", + category="connectivity", + subject=subject, + message="REST query for the Ultimate Audio sampler state failed.", + hint=f"Config will use the sampler ({reason_str}). If video audio is " + "silent, check F2 -> C64 and Cartridge Settings -> Map Ultimate " + "Audio $DF20-DFFF, and F2 -> Audio Mixer -> Vol Sampler L/R.", + ) + ] + if not present: + if backend == "sampler": + return [ + Diagnostic( + level="error", + category="connectivity", + subject=subject, + message="[audio].backend = 'sampler' but this U64 firmware does " + "not expose the Ultimate Audio sampler.", + hint="Update the U64 firmware, or set [audio].backend = 'dac' / " + "'auto' (auto falls back to the 4-bit DAC).", + ) + ] + return [ + Diagnostic( + level="ok", + category="connectivity", + subject=subject, + message="firmware has no Ultimate Audio sampler; [audio].backend = " + "auto falls back to the 4-bit DAC.", + ) + ] + + audible = any(v != _SAMPLER_VOL_OFF for v in volumes.values()) + if map_enabled and audible: + return [ + Diagnostic( + level="ok", + category="connectivity", + subject=subject, + message=f"Ultimate Audio mapped + audible — high-fidelity video " + f"audio ({reason_str}).", + ) + ] + off_bits = [] + if not map_enabled: + off_bits.append("$DF20 I/O map disabled") + if not audible: + off_bits.append("Sampler mixer channels OFF") + return [ + Diagnostic( + level="ok", + category="connectivity", + subject=subject, + message=f"{' + '.join(off_bits)}; will be enabled live for this run ({reason_str}).", + hint="Auto-enable is volatile (reverts on power-cycle) and restored at " + "teardown. Set [audio].backend = 'dac' to use the 4-bit DAC instead.", + ) + ] + + # --------------------------------------------------------------------------- # Report formatting # --------------------------------------------------------------------------- diff --git a/c64cast/ensemble.py b/c64cast/ensemble.py index 6619bfa..3d293ca 100644 --- a/c64cast/ensemble.py +++ b/c64cast/ensemble.py @@ -68,6 +68,16 @@ class SystemStack: # no-REU backend, or --skip-probe). The change is volatile firmware state, # so a missed restore still clears on the next power-cycle. reu_restore: dict[str, str] | None = None + # Startup probe verdict: True iff the U64's Ultimate Audio FPGA PCM sampler + # is exposed + routed. Resolves [audio].backend for video scenes at + # build-time (incl. SIGHUP/control-plane reloads + ensemble followers). + # False under --skip-probe / a failed query / a no-sampler backend, so the + # backend degrades to the 4-bit DAC rather than producing silence. + sampler_available: bool = False + # Sampler config fields auto-provisioned for this run (doctor.provision_sampler), + # composite-keyed "category\x1ffield" -> original, restored at teardown. None + # when nothing changed (already enabled, no-sampler backend, or --skip-probe). + sampler_restore: dict[str, str] | None = None framebuffer: Framebuffer | None = None preview_window: PreviewWindow | None = None recorder: StreamRecorder | None = None diff --git a/c64cast/sampler.py b/c64cast/sampler.py new file mode 100644 index 0000000..554b172 --- /dev/null +++ b/c64cast/sampler.py @@ -0,0 +1,572 @@ +"""Ultimate Audio FPGA PCM sampler ($DF20-$DFFF) — register helpers + a +streaming REU ring that plays arbitrary-length PCM at full fidelity. + +The U64 firmware exposes a 7-channel FPGA PCM sampler ("Ultimate Audio", +Gideon's register API v0.2, doc in 1541ultimate/doc/ultimate_audio_v0.2.pdf). +It plays 8- or 16-bit PCM up to 48 kHz **directly out of REU SDRAM** with zero +SID / ``$D418`` / NMI / CPU / turbo involvement — so it is immune to every +bus-halt / badline problem the 4-bit ``$D418`` NMI DAC fights, and is vastly +higher fidelity. On the U64 it is the default video-audio backend; the DAC +stays for TeensyROM (no sampler) and as an opt-in lo-fi mode. + +Two halves: + +* **Pure register helpers** (unit-testable, no hardware): the channel register + map, the rate divider, the control byte, the 8/16-bit PCM pack, and a + byte-layout builder for one channel's registers. +* **``UltimateAudioSampler``** — the scene-facing audio object that mirrors the + subset of ``audio.AudioStreamer`` that scenes call (``sample_rate``, + ``position_seconds``, ``stop``, ``push_samples``, ``get_recent_samples``). It + runs a **streaming REU ring** built on the sampler's own A↔B repeat loop: + program channel 0 to loop a region of REU forever while gated, then a writer + thread REUWRITEs decoded PCM *ahead of the computed read head*, wrapping. The + FPGA's sample clock is crystal-exact, so the read head is **computed** from + wall-clock (``(monotonic - gate_time) * rate``), never read back — the loop is + open-loop and drift-free, no servo/governor/NMI needed (much simpler than the + ``$D418`` REU pump). +""" + +from __future__ import annotations + +import logging +import queue +import threading +import time +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from .backend import C64Backend + from .dsp import AudioDSP + +log = logging.getLogger("c64cast.sampler") + +# -------------------------------------------------------------------------- +# Register spec (Ultimate Audio v0.2). Multi-byte fields are BIG-ENDIAN. +# -------------------------------------------------------------------------- +SAMPLER_IO_BASE = 0xDF20 # channel 0 base; reads here give the IRQ status reg +SAMPLER_VERSION_REG = 0xDF21 # reads $10 when the sampler is present +SAMPLER_CHANNEL_STRIDE = 0x20 # each channel occupies 32 consecutive bytes +SAMPLER_NUM_CHANNELS = 7 +SAMPLER_REF_CLOCK = 6_250_000 # the rate divider is REF / sample_rate + +# Channel register offsets (relative to the channel base). +REG_CONTROL = 0x00 +REG_VOLUME = 0x01 # 0..63 +REG_PAN = 0x02 # 7/8 = center, 0 = full left, 15 = full right +REG_START = 0x04 # 4 bytes BE: $01000000 + REU offset +REG_LENGTH = 0x09 # 3 bytes BE: length in bytes (16-bit ⇒ even) +REG_RATE = 0x0E # 2 bytes BE: divider = round(REF / rate) +REG_REPEAT_A = 0x11 # 3 bytes BE: loop revert point (byte offset in sample) +REG_REPEAT_B = 0x15 # 3 bytes BE: loop end point (byte offset in sample) +REG_INT_CLEAR = 0x1F # write 1 = clear this channel's IRQ, $FF = all + +# Control register bits. +CTRL_GATE = 0x01 # 0→1 (re)starts playback from the sample start +CTRL_REPEAT = 0x02 # loop A↔B while gated; on gate-off, play to end then stop +CTRL_INTERRUPT = 0x04 # raise IRQ at end of sample +CTRL_MODE_8BIT = 0x00 # mode b4-5 = 00 +CTRL_MODE_16BIT = 0x10 # mode b4-5 = 01 (little-endian) +CTRL_INTERLEAVE = 0x40 # skip odd samples (stereo-in-REU; unused here) + +# The sample start address selects REU SDRAM via the upper address byte $01; +# the lower 24 bits are the REU offset. (The REU base in U2 SDRAM is $01000000.) +REU_ADDR_SELECT_BYTE = 0x01 + +SAMPLER_VOLUME_MAX = 63 +SAMPLER_PAN_CENTER = 7 + +# -------------------------------------------------------------------------- +# Streaming-ring defaults. +# -------------------------------------------------------------------------- +# REU offset of the ring. Sits above the $D418-DAC mic ring ($110000) and well +# below the REU-staged video region ($E00000), so the sampler ring coexists +# with REU-staged bitmap video. +DEFAULT_RING_BASE = 0x200000 + +# Ring size. The ring is a jitter buffer, NOT the playback latency — latency is +# set by the lead/prebuffer target below, independent of ring size. 1 MiB is +# ~5.9 s of headroom at 16-bit/44.1k while keeping the one-time NEUTRAL prefill +# (~1.3 s at REUWRITE's ~820 KB/s) short. The whole track streams through it. +DEFAULT_RING_SIZE = 0x100000 # 1 MiB + +# Buffering depth: how far the write head leads the read head. This IS the +# push→hear latency (and so the constant A/V offset, harmless like the DAC's +# ~1 s ring latency). Big enough to ride out PyAV decode hiccups + a REUWRITE +# burst landing while the FPGA reads, small enough to keep latency low. +DEFAULT_LEAD_SECONDS = 0.5 + +REU_WRITE_SLICE = 32 * 1024 # cap per REUWRITE so a NEUTRAL pad can't burst huge +SAMPLE_TAP_SIZE = 2048 # most-recent-samples tap for spectrum overlays +_INT16_FULL_SCALE = 32768.0 + + +# -------------------------------------------------------------------------- +# Pure helpers (no hardware) — directly unit-testable. +# -------------------------------------------------------------------------- +def divider_for_rate(rate: float) -> int: + """Sample-rate divider for the FPGA's 6.25 MHz reference (≥ 1).""" + if rate <= 0: + raise ValueError(f"rate must be positive, got {rate}") + return max(1, round(SAMPLER_REF_CLOCK / rate)) + + +def actual_rate_for_divider(divider: int) -> float: + """The exact rate the FPGA plays at for a given divider (REF / divider). + + Differs from the nominal request by < 0.5% (e.g. 44100 → div 142 → + 44014.08 Hz). Feeding samples *at this rate* keeps A/V drift-free; the small + nominal offset is an inaudible constant pitch shift, not a drift.""" + if divider <= 0: + raise ValueError(f"divider must be positive, got {divider}") + return SAMPLER_REF_CLOCK / divider + + +def bytes_per_sample(bits: int) -> int: + if bits == 8: + return 1 + if bits == 16: + return 2 + raise ValueError(f"sampler bits must be 8 or 16, got {bits}") + + +def control_byte( + *, + gate: bool, + repeat: bool = False, + interrupt: bool = False, + bits: int = 16, + interleave: bool = False, +) -> int: + """Assemble the control-register byte from its bit fields.""" + value = 0 + if gate: + value |= CTRL_GATE + if repeat: + value |= CTRL_REPEAT + if interrupt: + value |= CTRL_INTERRUPT + value |= CTRL_MODE_16BIT if bits == 16 else CTRL_MODE_8BIT + if interleave: + value |= CTRL_INTERLEAVE + return value + + +def pack_pcm(samples_int16: np.ndarray, bits: int) -> bytes: + """Pack mono int16 samples to the sampler's PCM byte format. + + 8-bit is **signed** two's-complement (HW-confirmed); 16-bit is signed + little-endian. The int16→int8 step rounds (not truncates) for fidelity.""" + arr = np.asarray(samples_int16) + if bits == 8: + scaled = np.clip(np.rint(arr.astype(np.float32) / 256.0), -128, 127) + return bytes(scaled.astype(np.int8).tobytes()) + if bits == 16: + return bytes(np.ascontiguousarray(arr.astype(" int: + """I/O base address of a sampler channel (0..6).""" + if not 0 <= channel < SAMPLER_NUM_CHANNELS: + raise ValueError(f"channel must be 0..{SAMPLER_NUM_CHANNELS - 1}, got {channel}") + return SAMPLER_IO_BASE + channel * SAMPLER_CHANNEL_STRIDE + + +def _be_bytes(value: int, nbytes: int) -> list[int]: + """Big-endian byte list (high byte first), masked to ``nbytes``.""" + return [(value >> (8 * (nbytes - 1 - i))) & 0xFF for i in range(nbytes)] + + +def channel_register_writes( + *, + reu_offset: int, + length: int, + divider: int, + volume: int, + pan: int, + repeat: bool, + repeat_a: int, + repeat_b: int, +) -> list[tuple[int, list[int]]]: + """Ordered ``(channel-relative offset, [byte values])`` register writes to + program a channel, **excluding** the final control/gate write (issue that + last so playback starts only once every other register is set). + + Pure: builds the exact big-endian byte layout, no hardware. The unit test + pins this layout.""" + start_addr = (REU_ADDR_SELECT_BYTE << 24) | (reu_offset & 0xFFFFFF) + writes: list[tuple[int, list[int]]] = [ + (REG_START, _be_bytes(start_addr, 4)), + (REG_LENGTH, _be_bytes(length, 3)), + (REG_RATE, _be_bytes(divider, 2)), + (REG_VOLUME, [volume & 0x3F]), + (REG_PAN, [pan & 0x0F]), + ] + if repeat: + writes.append((REG_REPEAT_A, _be_bytes(repeat_a, 3))) + writes.append((REG_REPEAT_B, _be_bytes(repeat_b, 3))) + return writes + + +def program_channel( + api: C64Backend, + channel: int, + *, + reu_offset: int, + length: int, + rate: float, + bits: int, + volume: int = SAMPLER_VOLUME_MAX, + pan: int = SAMPLER_PAN_CENTER, + repeat: bool = False, + repeat_a: int = 0, + repeat_b: int = 0, + gate: bool = True, +) -> None: + """Program a sampler channel and (optionally) gate it on. + + All non-control registers are written and flushed first, then the control + byte — so the FPGA never starts playback against a half-written channel.""" + divider = divider_for_rate(rate) + base = channel_base(channel) + for offset, values in channel_register_writes( + reu_offset=reu_offset, + length=length, + divider=divider, + volume=volume, + pan=pan, + repeat=repeat, + repeat_a=repeat_a, + repeat_b=repeat_b, + ): + api.write_regs(f"{base + offset:04X}", *values) + api.flush() + if gate: + ctrl = control_byte(gate=True, repeat=repeat, bits=bits) + api.write_memory(f"{base:04X}", f"{ctrl:02X}") + api.flush() + + +def gate_off(api: C64Backend, channel: int = 0) -> None: + """Clear a channel's control register (gate off → playback stops).""" + api.write_memory(f"{channel_base(channel):04X}", "00") + api.flush() + + +# -------------------------------------------------------------------------- +# Streaming sampler. +# -------------------------------------------------------------------------- +class UltimateAudioSampler: + """Plays arbitrary-length PCM through a streaming REU ring on sampler + channel 0. + + Lifecycle mirrors the scene-facing slice of ``AudioStreamer``: + + sampler = UltimateAudioSampler(api, sample_rate=44100, bits=16) + sampler.start() # prefill + gate the looping ring + ... sampler.push_samples(int16) # writer thread streams it into the ring + sampler.position_seconds() # wall-clock read head → A/V master clock + sampler.stop() # gate off, join the writer + + The ring is the sampler's A↔B loop over ``[ring_base, ring_base+ring_size)``. + A writer thread keeps the write head ~``lead`` bytes ahead of the + computed read head (``read = (monotonic - gate_time) * rate``), wrapping at + ``ring_size`` and NEUTRAL-padding on producer underrun so the FPGA never + reads a stale/lapped byte. No servo: the FPGA clock is exact. + """ + + #: Marker so scenes can duck-type the sampler apart from AudioStreamer + #: (parallel to the streamer's ``use_reu_pump`` attribute) without importing + #: this module — VideoScene.setup branches on ``getattr(audio, "is_sampler")``. + is_sampler = True + + def __init__( + self, + api: C64Backend, + *, + sample_rate: int = 44100, + bits: int = 16, + channel: int = 0, + dsp: AudioDSP | None = None, + volume: int = SAMPLER_VOLUME_MAX, + pan: int = SAMPLER_PAN_CENTER, + ring_base: int = DEFAULT_RING_BASE, + ring_size: int = DEFAULT_RING_SIZE, + lead_seconds: float = DEFAULT_LEAD_SECONDS, + queue_max_chunks: int = 256, + ) -> None: + self.api = api + self.bits = bits + self.channel = channel + self._dsp = dsp + self._volume = volume + self._pan = pan + + self.bps = bytes_per_sample(bits) + self._divider = divider_for_rate(sample_rate) + self._actual_rate = actual_rate_for_divider(self._divider) + # AVFileSource resamples to this; feeding samples at the FPGA's real + # rate is what makes the wall-clock read head drift-free. + self.sample_rate = int(round(self._actual_rate)) + + self.ring_base = ring_base + # Frame-align the ring (16-bit length must be even; the A↔B loop wraps + # exactly at ring_size so it must be a whole number of samples). + self.ring_size = (ring_size // self.bps) * self.bps + self._neutral_unit = b"\x00" * self.bps # signed PCM silence is zero + + lead_bytes = int(self._actual_rate * lead_seconds) * self.bps + # Keep the lead under half the ring so write-ahead can't lap the reader. + self._lead_target = max(self.bps, min(lead_bytes, self.ring_size // 2)) + # Low watermark: the writer only NEUTRAL-pads (inserts silence) once the + # lead drains this low — a genuine producer stall, not a queue that's + # briefly empty because the writer is topping up toward the target. + self._lead_panic = max(self.bps, self._lead_target // 4) + + self._q: queue.Queue[bytes] = queue.Queue(maxsize=queue_max_chunks) + self._writer: threading.Thread | None = None + self._running = False + self._stopped = False + self._eof = False + + self._gate_time = 0.0 + self._written = 0 # absolute bytes written to the ring (monotone) + self._pushed_samples = 0 # total source samples accepted via push_samples + + # Telemetry for the teardown log / de-risk. + self._underrun_pads = 0 + self._lead_min = -1 + self._lead_max = -1 + + # Most-recent-samples tap for spectrum-style overlays. + self._tap_buf = np.zeros(SAMPLE_TAP_SIZE, dtype=np.float32) + self._tap_write = 0 + self._tap_lock = threading.Lock() + + # ---- bring-up --------------------------------------------------------- + def start(self, prebuffer_timeout: float = 2.0) -> None: + """Prefill the ring with silence, prebuffer ``lead`` bytes of real PCM, + then gate the looping channel on. + + Prefilling the whole ring with NEUTRAL guarantees the FPGA never reads + uninitialized REU even under a startup jitter spike; the prebuffer seeds + the write-ahead lead so the writer starts already ahead of the reader.""" + self._prefill_neutral() + + prebuf = self._collect_prebuffer(self._lead_target, prebuffer_timeout) + if prebuf: + self._write_wrapped(0, prebuf) + self._written = len(prebuf) + + program_channel( + self.api, + self.channel, + reu_offset=self.ring_base, + length=self.ring_size, + rate=self._actual_rate, + bits=self.bits, + volume=self._volume, + pan=self._pan, + repeat=True, + repeat_a=0, + repeat_b=self.ring_size, + gate=True, + ) + self._gate_time = time.monotonic() + self._running = True + self._writer = threading.Thread(target=self._writer_loop, name="uaudio-writer", daemon=True) + self._writer.start() + log.info( + "sampler: streaming ring up — %d-bit @ %d Hz (div %d, %.2f Hz actual), " + "ring %d KiB @ $%06X, lead %.2f s", + self.bits, + self.sample_rate, + self._divider, + self._actual_rate, + self.ring_size // 1024, + self.ring_base, + self._lead_target / self.bps / self._actual_rate, + ) + + def _prefill_neutral(self) -> None: + block = self._neutral_unit * (REU_WRITE_SLICE // self.bps) + for off in range(0, self.ring_size, len(block)): + n = min(len(block), self.ring_size - off) + self.api.reu_write(self.ring_base + off, block[:n]) + self.api.flush() + + def _collect_prebuffer(self, want_bytes: int, timeout: float) -> bytes: + """Drain at least ``want_bytes`` of queued PCM, blocking up to + ``timeout`` total for the producer to deliver it (whatever arrived by + then is used — the writer fills the rest ahead of the reader, NEUTRAL + on underrun). Returns **all** collected bytes (never truncated — a + partial-chunk truncation would drop samples and glitch the stream).""" + deadline = time.monotonic() + timeout + chunks: list[bytes] = [] + have = 0 + while have < want_bytes: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + try: + chunk = self._q.get(timeout=remaining) + except queue.Empty: + break + chunks.append(chunk) + have += len(chunk) + return b"".join(chunks) + + # ---- streaming -------------------------------------------------------- + def push_samples(self, samples_int16: np.ndarray) -> None: + """Accept mono int16 from the demuxer; encode + enqueue for the writer. + + Blocks when the queue is full so PyAV naturally throttles to the + playback rate (same backpressure as the DAC's ``push_samples``).""" + if self._stopped: + return + floats = samples_int16.astype(np.float32) / _INT16_FULL_SCALE + self._tap_push(floats) + if self._dsp is not None and self._dsp.active: + floats = self._dsp.process(floats) + out_i16 = np.clip(np.rint(floats * 32767.0), -32768, 32767).astype(np.int16) + self._pushed_samples += int(samples_int16.shape[0]) + self._q.put(pack_pcm(out_i16, self.bits), block=True) + + def mark_eof(self) -> None: + """Source exhausted — clamp ``position_seconds`` to the pushed total so + an over-running wall clock can't desync the (now-ended) video.""" + self._eof = True + + def set_pre_emphasis(self, amount: float | None) -> None: + """No-op: pre-emphasis is a 4-bit-DAC fidelity aid, irrelevant to the + 16-bit sampler path. Present so the sampler satisfies the same + scene-facing contract as AudioStreamer (Scene.setup calls this on the + scene's audio object regardless of backend).""" + + def _read_consumed_bytes(self) -> int: + if not self._running: + return 0 + elapsed = time.monotonic() - self._gate_time + return int(elapsed * self._actual_rate) * self.bps + + def _writer_loop(self) -> None: + while self._running: + lead = self._written - self._read_consumed_bytes() + self._lead_min = lead if self._lead_min < 0 else min(self._lead_min, lead) + self._lead_max = max(self._lead_max, lead) + if lead >= self._lead_target: + # Far enough ahead — idle briefly. The bounded queue + blocking + # push provide producer backpressure, so the lead can't run away. + time.sleep(0.002) + continue + try: + data = self._q.get(timeout=0.02) + except queue.Empty: + # Queue momentarily empty. NEUTRAL-padding is a *last resort* — + # it inserts silence into the stream, so only do it when the + # lead has actually drained to the low watermark (a real + # underrun → without a pad the FPGA would replay stale ring + # data, the "echo" the $D418 pump fought). While the lead is + # still safe, just wait for the producer rather than glitch. + if lead > self._lead_panic: + continue + pad_frames = max(1, (self._lead_target - lead) // self.bps) + pad_frames = min(pad_frames, REU_WRITE_SLICE // self.bps) + data = self._neutral_unit * pad_frames + self._underrun_pads += 1 + self._write_wrapped(self._written % self.ring_size, data) + self._written += len(data) + + def _write_wrapped(self, ring_pos: int, data: bytes) -> None: + """REUWRITE ``data`` into the ring at ``ring_pos``, splitting at the ring + boundary and capping each transfer at one slice.""" + view = memoryview(data) + pos = ring_pos + while view: + room = self.ring_size - pos + n = min(len(view), room, REU_WRITE_SLICE) + self.api.reu_write(self.ring_base + pos, bytes(view[:n])) + view = view[n:] + pos += n + if pos >= self.ring_size: + pos = 0 + + # ---- clock ------------------------------------------------------------ + def position_seconds(self) -> float: + """Wall-clock seconds since the ring was gated on — the heard playback + position (same contract as ``AudioStreamer.position_seconds`` in REU-pump + mode). Clamped to the pushed total after EOF. The FPGA crystal vs the + host monotonic clock differ by ~ppm, so this is drift-free for A/V sync.""" + if not self._running: + return 0.0 + elapsed = time.monotonic() - self._gate_time + if self._eof and self._pushed_samples: + total_s = self._pushed_samples / self._actual_rate + return max(0.0, min(elapsed, total_s)) + return max(0.0, elapsed) + + # ---- sample tap (spectrum overlays) ----------------------------------- + def _tap_push(self, mono_floats: np.ndarray) -> None: + n = mono_floats.shape[0] + with self._tap_lock: + if n >= SAMPLE_TAP_SIZE: + self._tap_buf[:] = mono_floats[-SAMPLE_TAP_SIZE:] + self._tap_write = 0 + return + end = self._tap_write + n + if end <= SAMPLE_TAP_SIZE: + self._tap_buf[self._tap_write : end] = mono_floats + else: + split = SAMPLE_TAP_SIZE - self._tap_write + self._tap_buf[self._tap_write :] = mono_floats[:split] + self._tap_buf[: end - SAMPLE_TAP_SIZE] = mono_floats[split:] + self._tap_write = end % SAMPLE_TAP_SIZE + + def get_recent_samples(self, n: int) -> np.ndarray: + """Most recent ``n`` float samples (oldest first), a fresh copy.""" + n = min(int(n), SAMPLE_TAP_SIZE) + out = np.empty(n, dtype=np.float32) + with self._tap_lock: + w = self._tap_write + start = (w - n) % SAMPLE_TAP_SIZE + tail = SAMPLE_TAP_SIZE - start + if n <= tail: + out[:] = self._tap_buf[start : start + n] + else: + out[:tail] = self._tap_buf[start:] + out[tail:] = self._tap_buf[: n - tail] + return out + + # ---- shutdown --------------------------------------------------------- + def stop(self) -> None: + """Gate the channel off and join the writer thread. Firmware-config + restore (Audio Mixer / I/O map) is separate, in doctor at teardown.""" + self._stopped = True + self._running = False + if self._writer is not None: + self._writer.join(timeout=1.0) + self._writer = None + try: + gate_off(self.api, self.channel) + except Exception as e: # best-effort; teardown must not raise + log.debug("sampler gate-off failed: %s", e) + while not self._q.empty(): + try: + self._q.get_nowait() + except queue.Empty: + break + if self._underrun_pads: + log.warning( + "sampler: %d underrun pads this session (producer stalled)", + self._underrun_pads, + ) + if self._lead_min >= 0: + log.info( + "sampler: write-ahead lead min=%d max=%d bytes (target=%d, ring=%d)", + self._lead_min, + self._lead_max, + self._lead_target, + self.ring_size, + ) diff --git a/c64cast/scenes.py b/c64cast/scenes.py index c842ab0..3fe14a7 100644 --- a/c64cast/scenes.py +++ b/c64cast/scenes.py @@ -46,6 +46,7 @@ from .modes import BitmapDisplayMode, DisplayMode from .palette import ColorFitAccumulator, ColorMapAccumulator from .profiler import get_profiler +from .sampler import UltimateAudioSampler from .video import ( AVFileSource, WebcamSource, @@ -65,6 +66,13 @@ log = logging.getLogger(__name__) +# A scene's audio object is either the shared 4-bit DAC streamer or (video +# scenes on a sampler-capable U64) the Ultimate Audio FPGA sampler. Both +# satisfy the scene-facing contract (sample_rate / position_seconds / stop / +# push_samples / set_pre_emphasis); backend-specific bring-up branches narrow +# via isinstance. +SceneAudio = AudioStreamer | UltimateAudioSampler + _C64_ASPECT = 320 / 200 # Rolling-window auto_fit: how many opening frames to fold into the online @@ -135,7 +143,7 @@ class Scene: def __init__( self, api: C64Backend, - audio: AudioStreamer | None, + audio: SceneAudio | None, display_mode: DisplayMode | None, name: str, ): @@ -291,7 +299,9 @@ def __init__( def setup(self) -> None: super().setup() self.start_time = time.time() - if self.audio: + # The webcam mic path is always the 4-bit DAC streamer (the sampler is a + # video-only backend), so narrow to AudioStreamer for start_mic. + if isinstance(self.audio, AudioStreamer): # Mirror VideoScene: when the display mode installs the # bank-swap merged dispatcher at $0314 (audio_reu_pump_active # flag on a bitmap mode with use_reu_staged), the mic REU pump @@ -535,8 +545,8 @@ def setup(self) -> None: super().setup() self.start_time = time.time() # Only start mic capture if the scene opted in *and* the global - # audio is enabled — same model as WebcamScene. - if self.audio: + # audio is enabled — same model as WebcamScene. Always the DAC streamer. + if isinstance(self.audio, AudioStreamer): self.audio.start_mic( self.audio_cfg.device, self.audio_cfg.mic_sensitivity, self.audio_cfg.noise_gate ) @@ -846,7 +856,7 @@ def competes_for_audio_lock(self) -> bool: def __init__( self, api: C64Backend, - audio: AudioStreamer | None, + audio: SceneAudio | None, display_mode: DisplayMode, file: str, prepend_alignment_marker: bool = False, @@ -1044,7 +1054,15 @@ def setup(self) -> None: self.display_mode.set_color_map(None) has_audio = (self.source.a_stream is not None) and (self.audio is not None) - if has_audio and getattr(self.audio, "use_reu_pump", False): + if has_audio and isinstance(self.audio, UltimateAudioSampler): + # Ultimate Audio FPGA sampler path: bring up the streaming REU ring + # (prefill + gate the A↔B loop), then feed the demuxer's decoded + # int16 straight to its push_samples. No SID/$D418/NMI bring-up — + # the FPGA plays from REU off the C64 bus. position_seconds()/stop() + # are polymorphic, so _clock_s() + teardown() are unchanged. + self.audio.start() + self.source.start(audio_push=self.audio.push_samples) + elif has_audio and getattr(self.audio, "use_reu_pump", False): # REU-staged path: pre-decode entire audio track, 4-bit encode # with the same gain/dither pipeline as the host-DMA path uses # per sample, then upload to REU. Video frames still come from @@ -1052,7 +1070,7 @@ def setup(self) -> None: # demuxer SKIPS audio decode entirely (otherwise it competes # with video decode in the same thread for CPU, causing # noticeable video lag at scene start until the demuxer catches up). - assert self.audio is not None + assert isinstance(self.audio, AudioStreamer) # DAC streamer (not sampler) audio_4bit = self._preencode_audio_for_reu() # Bitmap display modes (hires/mhires) push ~300 KB/sec via host # DMAWRITE which halts the C64 bus ~30 % of the time. NMI service @@ -1093,7 +1111,7 @@ def setup(self) -> None: ) self.source.start(audio_push=None) elif has_audio: - assert self.audio is not None + assert isinstance(self.audio, AudioStreamer) # DAC streamer (not sampler) self.audio.start_for_external_source() self.source.start(audio_push=self.audio.push_samples) else: @@ -1114,7 +1132,9 @@ def _preencode_audio_for_reu(self) -> bytes: brief blip at scene start, then real content begins. Used to anchor Cam Link captures to a known source-timeline-zero for cross-capture comparison.""" - assert self.audio is not None + # The REU-pump pre-encode is a DAC-streamer-only path (the sampler + # streams 16-bit PCM through its own ring); narrow to AudioStreamer. + assert isinstance(self.audio, AudioStreamer) sr = self.audio.sample_rate # Decode full audio to int16 mono at sample rate. int16 = decode_audio_full(self.filepath, sr) @@ -1162,6 +1182,12 @@ def _clock_s(self) -> float: def process_frame(self, current_time: float) -> bool: if self.source is None or self.source.finished: + # Tell a sampler the source is exhausted so position_seconds() + # clamps to the pushed total (no-op for the DAC streamer). Idempotent. + if self.audio is not None: + mark_eof = getattr(self.audio, "mark_eof", None) + if callable(mark_eof): + mark_eof() return False clock_s = self._clock_s() diff --git a/c64cast/wizard.py b/c64cast/wizard.py index 5b526f7..489e954 100644 --- a/c64cast/wizard.py +++ b/c64cast/wizard.py @@ -155,9 +155,12 @@ def _apply_globals( system: str | None = None, audio_enabled: bool | None = None, vision_enabled: bool | None = None, + audio_overrides: dict[str, object] | None = None, ) -> None: """Overlay the essential global settings onto a Config (in place). Each is - skipped when None so callers can leave any at its dataclass default.""" + skipped when None so callers can leave any at its dataclass default. + `audio_overrides` is a `{field: value}` map setattr'd onto cfg.audio (e.g. + backend / sampler_sample_rate / sampler_bits).""" if url: cfg.ultimate64.url = url if system: @@ -166,6 +169,8 @@ def _apply_globals( cfg.audio.enabled = audio_enabled if vision_enabled is not None: cfg.vision.enabled = vision_enabled + for key, value in (audio_overrides or {}).items(): + setattr(cfg.audio, key, value) def build_config( @@ -177,12 +182,18 @@ def build_config( system: str | None = None, audio_enabled: bool | None = None, vision_enabled: bool | None = None, + audio_overrides: dict[str, object] | None = None, ) -> cfgmod.Config: """Assemble a single-scene Config from collected answers. Pure — no I/O — so the wizard's terminal shell stays a thin layer over this.""" cfg = cfgmod.Config() _apply_globals( - cfg, url=url, system=system, audio_enabled=audio_enabled, vision_enabled=vision_enabled + cfg, + url=url, + system=system, + audio_enabled=audio_enabled, + vision_enabled=vision_enabled, + audio_overrides=audio_overrides, ) cfg.scenes = [make_scene(scene_type, scene_fields, overlays)] return cfg @@ -195,6 +206,7 @@ def build_multi_config( system: str | None = None, audio_enabled: bool | None = None, vision_enabled: bool | None = None, + audio_overrides: dict[str, object] | None = None, playlist: dict[str, object] | None = None, interstitial: dict[str, object] | None = None, ) -> cfgmod.Config: @@ -203,7 +215,12 @@ def build_multi_config( setattr onto the matching section. Pure — no I/O.""" cfg = cfgmod.Config() _apply_globals( - cfg, url=url, system=system, audio_enabled=audio_enabled, vision_enabled=vision_enabled + cfg, + url=url, + system=system, + audio_enabled=audio_enabled, + vision_enabled=vision_enabled, + audio_overrides=audio_overrides, ) cfg.scenes = list(scenes) for key, value in (playlist or {}).items(): @@ -527,6 +544,36 @@ def _prompt_one_scene( } +def _prompt_audio_backend(q, audio_enabled: bool) -> dict[str, object] | None: # type: ignore[no-untyped-def] + """Ask the video-audio backend when audio is on (the wizard targets the + Ultimate, which has the FPGA sampler). Returns an `audio_overrides` dict for + `_apply_globals` ({} when audio is off / left at default), or None on cancel.""" + if not audio_enabled: + return {} + backend = q.select( + "Video-audio backend (sampler = U64 Ultimate Audio FPGA PCM, hi-fi; " + "dac = lo-fi 4-bit $D418)", + choices=list(cfgmod._AUDIO_BACKEND_CHOICES), + default="auto", + ).ask() + if backend is None: + return None + overrides: dict[str, object] = {"backend": backend} + if backend == "sampler": + bits = q.select("Sampler PCM bit depth", choices=["16", "8"], default="16").ask() + if bits is None: + return None + rate = q.text("Sampler sample rate (Hz, 1000..48000)", default="44100").ask() + if rate is None: + return None + try: + overrides["sampler_sample_rate"] = int(rate) + except ValueError: + overrides["sampler_sample_rate"] = 44100 + overrides["sampler_bits"] = int(bits) + return overrides + + def _prompt_globals(q) -> tuple[str, str] | None: # type: ignore[no-untyped-def] """Ask the U64 URL + video system. Returns (url, system) or None.""" url = q.text("Ultimate 64 URL", default=cfgmod.Ultimate64Cfg().url).ask() @@ -694,6 +741,9 @@ def _run_single(q, path_arg: str | None) -> tuple[str, bool] | None: # type: ig scene = _prompt_one_scene(q, audio_enabled=None, ask_audio=True) if scene is None: return None + audio_overrides = _prompt_audio_backend(q, bool(scene["audio_enabled"])) + if audio_overrides is None: + return None globals_ = _prompt_globals(q) if globals_ is None: return None @@ -706,6 +756,7 @@ def _run_single(q, path_arg: str | None) -> tuple[str, bool] | None: # type: ig url=url, system=system, audio_enabled=scene["audio_enabled"], # type: ignore[arg-type] + audio_overrides=audio_overrides, ) err = validate(cfg) @@ -725,6 +776,10 @@ def _run_multi(q, path_arg: str | None) -> tuple[str, bool] | None: # type: ign if scenes is None: return None + audio_overrides = _prompt_audio_backend(q, bool(audio_enabled)) + if audio_overrides is None: + return None + opts = _prompt_playlist_opts(q, scenes) if opts is None: return None @@ -747,6 +802,7 @@ def _run_multi(q, path_arg: str | None) -> tuple[str, bool] | None: # type: ign url=url, system=system, audio_enabled=audio_enabled, + audio_overrides=audio_overrides, playlist=playlist_overrides, interstitial=interstitial_overrides, ) diff --git a/config/c64cast.example.toml b/config/c64cast.example.toml index e8761f4..6dda37c 100644 --- a/config/c64cast.example.toml +++ b/config/c64cast.example.toml @@ -209,6 +209,17 @@ double_buffer = "auto" # Host-DMA double-buffer (page flip) for enabled = true # on by default; --no-audio mutes device = -1 # -1 = system default input; -L lists indices sample_rate = 10500 # SID DAC rate; 10500 keeps fricatives (>8000 Nyquist). NTSC≤~11025, PAL≤~10500 +backend = "auto" # video-audio backend: "auto" = U64 "Ultimate + # Audio" FPGA PCM sampler when available (high + # fidelity, off the C64 bus), else the 4-bit + # DAC; "dac" forces the lo-fi $D418 DAC (all + # backends); "sampler" forces the sampler + # (warns + falls back to DAC if unavailable). + # mic/webcam audio always uses the DAC. +sampler_sample_rate = 44100 # Hz for the sampler backend (1000..48000). + # 44100 = CD quality (the FPGA plays at the + # nearest divider of its 6.25 MHz reference). +sampler_bits = 16 # sampler PCM depth: 8 (signed) or 16 (signed LE) mic_sensitivity = 1.5 noise_gate = 0.05 dither = false # TPDF dither pre-quantize. false (default diff --git a/config/examples/README.md b/config/examples/README.md index 914813e..d1c2cc7 100644 --- a/config/examples/README.md +++ b/config/examples/README.md @@ -62,6 +62,7 @@ Exit with `Ctrl+C`. | `scene-blank.toml` | Blank PETSCII canvas + a `big_text` overlay. | | `scene-slideshow.toml` | Cycle through still images from a directory/glob. | | `scene-video.toml` | Video + soundtrack playback. Requires `video` extra. | +| `scene-video-sampler.toml` | Video + hi-fi audio via the U64 Ultimate Audio FPGA sampler. | | `scene-waveform.toml` | SID jukebox + oscilloscope. Requires a `.sid` file. | | `scene-midi.toml` | MIDI → SID synth. Requires `midi` extra + MIDI source. | | `scene-launcher.toml` | Launch a native `.prg`/`.crt` and hand over the machine. | @@ -130,6 +131,7 @@ Five files exercise the audio path; the first three require `[audio] enabled = t | `scene-webcam-audio.toml` | Mic capture → SID DAC (needs `mic` extra) | | `overlay-spectrum_petscii.toml` | Mic capture + visual 8-band FFT (needs `mic`) | | `scene-video.toml` | Video-file soundtrack (needs `video`) | +| `scene-video-sampler.toml` | Video soundtrack via Ultimate Audio FPGA sampler | | `scene-waveform.toml` | Native SID playback of a `.sid` file | | `scene-midi.toml` | MIDI input → in-process SID synth (needs `midi`) | diff --git a/config/examples/scene-video-sampler.toml b/config/examples/scene-video-sampler.toml new file mode 100644 index 0000000..0cfae0b --- /dev/null +++ b/config/examples/scene-video-sampler.toml @@ -0,0 +1,44 @@ +#:schema ../../c64cast.schema.json +# Single-scene demo: video playback with HIGH-FIDELITY audio via the U64's +# "Ultimate Audio" FPGA PCM sampler (instead of the lo-fi 4-bit $D418 DAC). +# +# The sampler plays 8/16-bit PCM straight out of REU SDRAM with ZERO +# SID/$D418/NMI/CPU involvement — so it is immune to the bus-halt problems the +# 4-bit DAC fights, and sounds vastly better. It's U64-only (the TeensyROM has +# no FPGA sampler and stays on the DAC). See sampler.py + docs/caveats.md. +# +# Requires the `video` extra (PyAV): +# pip install -e .[video] +# +# Prerequisites on the U64 (auto-provisioned live for this run when missing, +# and restored at teardown — but you can also set them yourself in the F2 menu): +# * C64 and Cartridge Settings -> Map Ultimate Audio $DF20-DFFF = Enabled +# * Audio Mixer -> Vol Sampler L / Vol Sampler R audible (0 dB, not OFF) +# The sampler's PCM ring lives in REU SDRAM, so the REU is enabled + sized to +# 16 MB for this run too (also auto-provisioned + restored). With the sampler +# off the C64 bus, video defaults to the tear-free REU bank-swap path. +# +# Run: +# python -m c64cast --config config/examples/scene-video-sampler.toml + +[ultimate64] +url = "http://ultimate-64-ii.lan" + +[audio] +enabled = true # required for the soundtrack to be heard +backend = "sampler" # force the FPGA PCM sampler (default "auto" picks + # it automatically on a capable U64) +sampler_sample_rate = 44100 # CD quality +sampler_bits = 16 # 16-bit signed PCM (or 8 for signed 8-bit) + +[playlist] +interleave_videos = false # the single scene IS the video + +[[scenes]] +type = "video" +display = "mhires" # bitmap video; sampler audio leaves the bus free +# `file =` accepts a path, directory, glob, or comma-separated combination; +# each setup picks a random match. Omit it to scan assets/videos/. +file = "assets/videos/your-video.mp4" +# No `duration_s` — video scenes run until EOF (the loader rejects the field); +# single-scene mode loops the file forever. diff --git a/docs/caveats.md b/docs/caveats.md index e56b3bd..22c44a1 100644 --- a/docs/caveats.md +++ b/docs/caveats.md @@ -5,12 +5,12 @@ why. Read this before you spend an evening debugging "it's almost working, but…". For end-user options see [usage.md](usage.md); for the architecture overview see [CLAUDE.md](../CLAUDE.md). -## Audio is intentionally lo-fi +## Audio is intentionally lo-fi (the 4-bit `$D418` DAC) The SID DAC streaming path writes 4-bit samples (0-15) to the SID volume -nibble at $D418 at 8 kHz. That's an objectively bad audio format — but +nibble at $D418 at ~10.5 kHz. That's an objectively bad audio format — but it's the format a real C64 plays back. You can raise `[audio] -sample_rate` in config, but the C64-side NMI is sized for 8 kHz and +sample_rate` in config, but the C64-side NMI is sized for that rate and nothing in the pipeline resamples; a different rate just plays at the wrong pitch. @@ -18,6 +18,40 @@ There is no master volume, no SID filter, no anti-aliasing. The noise gate (`noise_gate`) and pre-DAC gain (`mic_sensitivity`) are the only shaping knobs. Hum and hiss are part of the aesthetic. +## High-fidelity video audio: the Ultimate Audio FPGA sampler (U64) + +On the Ultimate 64 the lo-fi DAC above is **not** the default for *video* +playback. The U64 firmware exposes an "Ultimate Audio" FPGA PCM sampler at +`$DF20-$DFFF` that plays 8/16-bit PCM (up to 48 kHz) **directly out of REU +SDRAM** — the FPGA fetches and converts the samples itself, with **zero** +SID / `$D418` / NMI / CPU / turbo involvement. So it sidesteps every bus-halt +and badline problem the 4-bit DAC fights (it kept the DAC capped at ~16 kHz and +20 fps), and it sounds like an actual sound card instead of a digi-player. + +`[audio].backend` selects it: `"auto"` (default) uses the sampler on a capable +U64 and falls back to the 4-bit DAC otherwise; `"dac"` forces the lo-fi DAC +(the only path on the TeensyROM, which has no FPGA sampler); `"sampler"` forces +it and warns + falls back to the DAC if it isn't available. `sampler_sample_rate` +(default 44100) and `sampler_bits` (8 or 16, default 16) tune quality. Mic and +webcam audio always use the 4-bit DAC. + +Implementation (`c64cast/sampler.py`): a **streaming REU ring**. Channel 0 is +programmed as an A↔B loop over a region of REU; a host writer thread REUWRITEs +decoded PCM ahead of a *wall-clock-computed* read head and wraps. The FPGA +sample clock is crystal-exact, so the read position is computed (never read +back) and the whole thing is open-loop and drift-free — no servo, no governor, +no NMI. The sample rate is the FPGA's exact `6.25 MHz / divider`, a constant +<0.5 % offset from the nominal request (inaudible, and drift-free because A/V +both ride the same clock). The ring lives in REU SDRAM, so a sampler run also +provisions the REU (16 MB) — which makes overlay-free bitmap video resolve to +the tear-free REU bank-swap path; the sampler installs no `$0314` IRQ, so the +two coexist with no contention. + +Prerequisites on the U64 (auto-provisioned live + restored at teardown when +missing, or set them yourself in F2): **C64 and Cartridge Settings → Map Ultimate +Audio $DF20-DFFF = Enabled**, and **Audio Mixer → Vol Sampler L / Vol Sampler R** +audible (0 dB, not OFF). `c64cast --doctor` reports the sampler's state. + ## SID playback uses a C64-side player PRG, not `runners:sidplay` `WaveformScene` deliberately avoids the U64 firmware's diff --git a/docs/usage.md b/docs/usage.md index 479067c..7e87681 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -378,11 +378,22 @@ device = -1 # -1 = system default camera; `--list-device [audio] enabled = true # on by default; --no-audio mutes device = -1 # sounddevice input index; -1 = system default -sample_rate = 8000 # SID DAC sample rate; don't change unless you really mean it +sample_rate = 10500 # 4-bit $D418 DAC rate; don't change unless you really mean it +backend = "auto" # video audio: "auto" (U64 Ultimate Audio FPGA + # sampler when available, else DAC), "dac" + # (lo-fi 4-bit $D418, all backends), "sampler" + # (force the hi-fi FPGA PCM sampler) +sampler_sample_rate = 44100 # sampler backend rate (1000..48000); CD quality +sampler_bits = 16 # sampler PCM depth: 8 (signed) or 16 (signed LE) mic_sensitivity = 1.5 # pre-DAC gain noise_gate = 0.05 # below this RMS, sample is silenced ``` +On the Ultimate 64, `backend = "auto"` plays a video's soundtrack through the +**Ultimate Audio FPGA PCM sampler** — far higher fidelity than the 4-bit DAC +and entirely off the C64 bus. See "High-fidelity video audio" in +[caveats.md](caveats.md). Mic/webcam audio always uses the 4-bit DAC. + ### `[interstitial]` ```toml diff --git a/tests/test_doctor.py b/tests/test_doctor.py index 0717e8a..2eae831 100644 --- a/tests/test_doctor.py +++ b/tests/test_doctor.py @@ -273,10 +273,15 @@ def test_no_reu_request_skips_reu_probe(self): def test_auto_use_reu_staged_is_not_a_hard_requirement(self): """The default `use_reu_staged = "auto"` is self-healing (it falls back to host-DMA when REU is off), so the doctor must NOT demand REU — - even with REU disabled and a bitmap scene, no REU diagnostic fires.""" + even with REU disabled and a bitmap scene, no REU diagnostic fires. + + backend = "dac" isolates this from the sampler path (the sampler is a + separate hard REU reason — its own provisioning test covers that).""" loaded = _load(""" [ultimate64] url = "http://fake" + [audio] + backend = "dac" [video] use_reu_staged = "auto" [[scenes]] @@ -576,11 +581,16 @@ def test_skipped_when_auto_reu_off(self): def test_skipped_without_hard_opt_in(self): """use_reu_staged = "auto" is NOT a hard requirement (it self-heals to - host-DMA double-buffer), so it must not trigger provisioning.""" + host-DMA double-buffer), so it must not trigger provisioning. + + backend = "dac" isolates this from the sampler path (which IS a hard + REU reason — covered by ProvisionSamplerTest).""" api = _FakeApi(reu_status="Disabled", reu_size="2 MB") cfg = _cfg(""" [ultimate64] url = "http://fake" + [audio] + backend = "dac" [video] use_reu_staged = "auto" [[scenes]] diff --git a/tests/test_introspect.py b/tests/test_introspect.py index 5b96bfb..129146c 100644 --- a/tests/test_introspect.py +++ b/tests/test_introspect.py @@ -181,6 +181,16 @@ def test_audio_source_choices_pinned(self): self.assertEqual(meta["choices"], cfgmod._AUDIO_SOURCE_CHOICES) self.assertEqual(meta["applies_to"], ("generative",)) + def test_audio_backend_choices_pinned(self): + # The video-audio backend selector is a fixed literal set (no registry): + # pin it so a new value can't be added to AudioCfg.backend metadata + # without resolve_audio_backend + build_scene learning to honor it. + self.assertEqual(cfgmod._AUDIO_BACKEND_CHOICES, ("auto", "dac", "sampler")) + from dataclasses import fields + + meta = {f.name: f for f in fields(cfgmod.AudioCfg)}["backend"].metadata + self.assertEqual(meta["choices"], cfgmod._AUDIO_BACKEND_CHOICES) + def test_scene_types(self): self.assertEqual(set(cfgmod.SCENE_TYPES), set(introspect.scene_type_names())) diff --git a/tests/test_sampler.py b/tests/test_sampler.py new file mode 100644 index 0000000..cb9d072 --- /dev/null +++ b/tests/test_sampler.py @@ -0,0 +1,473 @@ +"""Unit tests for the Ultimate Audio FPGA PCM sampler (c64cast/sampler.py) and +its config/doctor integration. No hardware: a recording fake backend stands in +for the U64, and the doctor REST queries are mocked.""" + +from __future__ import annotations + +import time +import unittest +from typing import Any, cast +from unittest import mock + +import numpy as np + +from c64cast import config as cfgmod +from c64cast import doctor +from c64cast import sampler as s + + +# --------------------------------------------------------------------------- +# Pure register helpers +# --------------------------------------------------------------------------- +class PureHelperTest(unittest.TestCase): + def test_divider_table_matches_doc(self): + # round(6.25 MHz / rate); 44100 -> 142 is the documented value. + self.assertEqual(s.divider_for_rate(44100), 142) + self.assertEqual(s.divider_for_rate(48000), 130) + self.assertEqual(s.divider_for_rate(8000), 781) + self.assertEqual(s.divider_for_rate(16000), 391) + + def test_divider_rejects_nonpositive(self): + with self.assertRaises(ValueError): + s.divider_for_rate(0) + + def test_actual_rate_roundtrips(self): + div = s.divider_for_rate(44100) + self.assertAlmostEqual(s.actual_rate_for_divider(div), 6_250_000 / 142, places=2) + + def test_bytes_per_sample(self): + self.assertEqual(s.bytes_per_sample(8), 1) + self.assertEqual(s.bytes_per_sample(16), 2) + with self.assertRaises(ValueError): + s.bytes_per_sample(24) + + def test_pack_pcm_8bit_is_signed(self): + arr = np.array([0, 32767, -32768, 256, -256], dtype=np.int16) + out = np.frombuffer(s.pack_pcm(arr, 8), dtype=np.int8) + self.assertEqual(list(out), [0, 127, -128, 1, -1]) + + def test_pack_pcm_16bit_is_le(self): + self.assertEqual(list(s.pack_pcm(np.array([1, -1], dtype=np.int16), 16)), [1, 0, 255, 255]) + + def test_pack_pcm_rejects_bad_bits(self): + with self.assertRaises(ValueError): + s.pack_pcm(np.array([0], dtype=np.int16), 12) + + def test_control_byte_bits(self): + self.assertEqual(s.control_byte(gate=True, repeat=True, bits=16), 0x13) + self.assertEqual(s.control_byte(gate=True, bits=8), 0x01) + self.assertEqual(s.control_byte(gate=False, repeat=True, bits=8), 0x02) + self.assertEqual(s.control_byte(gate=True, interrupt=True, bits=16), 0x15) + + def test_channel_base(self): + self.assertEqual(s.channel_base(0), 0xDF20) + self.assertEqual(s.channel_base(1), 0xDF40) + self.assertEqual(s.channel_base(6), 0xDFE0) + with self.assertRaises(ValueError): + s.channel_base(7) + + def test_channel_register_writes_layout(self): + writes = dict( + s.channel_register_writes( + reu_offset=0x200000, + length=0x100000, + divider=142, + volume=63, + pan=7, + repeat=True, + repeat_a=0, + repeat_b=0x100000, + ) + ) + # Start address = $01000000 + REU offset, big-endian. + self.assertEqual(writes[s.REG_START], [0x01, 0x20, 0x00, 0x00]) + self.assertEqual(writes[s.REG_LENGTH], [0x10, 0x00, 0x00]) + self.assertEqual(writes[s.REG_RATE], [0x00, 0x8E]) # 142 + self.assertEqual(writes[s.REG_VOLUME], [0x3F]) + self.assertEqual(writes[s.REG_PAN], [0x07]) + self.assertEqual(writes[s.REG_REPEAT_A], [0x00, 0x00, 0x00]) + self.assertEqual(writes[s.REG_REPEAT_B], [0x10, 0x00, 0x00]) + + def test_register_writes_omit_repeat_when_off(self): + writes = dict( + s.channel_register_writes( + reu_offset=0, + length=100, + divider=142, + volume=63, + pan=7, + repeat=False, + repeat_a=0, + repeat_b=0, + ) + ) + self.assertNotIn(s.REG_REPEAT_A, writes) + self.assertNotIn(s.REG_REPEAT_B, writes) + + +# --------------------------------------------------------------------------- +# Recording fake backend for the streamer +# --------------------------------------------------------------------------- +class _FakeBackend: + """Records the writes a UltimateAudioSampler issues (reu_write / write_regs / + write_memory / flush). No socket, no REST.""" + + def __init__(self) -> None: + self.reu_writes: list[tuple[int, int]] = [] # (offset, length) + self.reg_writes: list[tuple[str, tuple[int, ...]]] = [] + self.mem_writes: list[tuple[str, str]] = [] + self.flushes = 0 + + def reu_write(self, offset: int, data: bytes) -> None: + self.reu_writes.append((offset, len(data))) + + def write_regs(self, base_addr: str, *values: int) -> None: + self.reg_writes.append((base_addr.upper(), values)) + + def write_memory(self, address: str, data_hex: str) -> None: + self.mem_writes.append((address.upper(), data_hex.upper())) + + def flush(self) -> None: + self.flushes += 1 + + +def _make(api: _FakeBackend, **kw) -> s.UltimateAudioSampler: + """Build a sampler against the recording fake (cast like the audio tests' + `cast(Ultimate64API, FakeAPI())` — the fake duck-types the write surface).""" + return s.UltimateAudioSampler(cast(Any, api), **kw) + + +class StreamerTest(unittest.TestCase): + def test_init_resolves_rate_and_ring(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16, ring_size=4097) + self.assertEqual(smp.bps, 2) + self.assertEqual(smp._divider, 142) + self.assertEqual(smp.sample_rate, round(6_250_000 / 142)) + # Ring frame-aligned (even for 16-bit). + self.assertEqual(smp.ring_size % 2, 0) + self.assertTrue(smp.is_sampler) + + def test_write_wrapped_splits_at_ring_boundary(self): + api = _FakeBackend() + smp = _make(api, sample_rate=44100, bits=8, ring_base=0x200000, ring_size=16) + smp._write_wrapped(10, b"ABCDEF") # 6 bytes from pos 10 in a 16-byte ring + # 6 bytes at base+10, then 0 wrap... 10+6=16 exactly, no wrap. + self.assertEqual(api.reu_writes, [(0x200000 + 10, 6)]) + api.reu_writes.clear() + smp._write_wrapped(12, b"ABCDEF") # crosses: 4 at +12, 2 at +0 + self.assertEqual(api.reu_writes, [(0x200000 + 12, 4), (0x200000, 2)]) + + def test_position_seconds_zero_before_start(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + self.assertEqual(smp.position_seconds(), 0.0) + + def test_position_seconds_tracks_wallclock(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + smp._running = True + smp._gate_time = time.monotonic() - 2.0 + self.assertAlmostEqual(smp.position_seconds(), 2.0, delta=0.2) + + def test_position_clamps_after_eof(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + smp._running = True + smp._gate_time = time.monotonic() - 100.0 + smp._pushed_samples = smp.sample_rate # ~1 s of audio pushed + smp.mark_eof() + self.assertAlmostEqual(smp.position_seconds(), 1.0, delta=0.1) + + def test_read_consumed_bytes_is_frame_aligned(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + smp._running = True + smp._gate_time = time.monotonic() - 1.0 + consumed = smp._read_consumed_bytes() + self.assertEqual(consumed % smp.bps, 0) + self.assertGreater(consumed, 0) + + def test_start_prefills_and_gates_then_stop_gates_off(self): + api = _FakeBackend() + smp = _make( + api, sample_rate=44100, bits=16, ring_base=0x200000, ring_size=8192, lead_seconds=0.01 + ) + # Prime the queue so the prebuffer returns immediately (no 2 s block). + smp.push_samples(np.zeros(2048, dtype=np.int16)) + smp.start(prebuffer_timeout=0.1) + try: + # Prefill wrote the ring (NEUTRAL) before gating. + self.assertTrue(api.reu_writes) + # Control register at $DF20 was written with gate+repeat+mode16 (0x13). + gate_writes = [v for a, v in api.mem_writes if a == "DF20"] + self.assertIn("13", gate_writes) + self.assertTrue(smp._running) + finally: + smp.stop() + # Gate-off wrote $DF20 = 00. + self.assertEqual(api.mem_writes[-1], ("DF20", "00")) + self.assertFalse(smp._running) + + def test_get_recent_samples_returns_pushed(self): + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + smp.push_samples(np.ones(100, dtype=np.int16) * 16384) + recent = smp.get_recent_samples(50) + self.assertEqual(recent.shape, (50,)) + self.assertTrue(np.all(recent > 0.4)) + + def test_set_pre_emphasis_is_noop(self): + # Scene.setup calls this on the audio object regardless of backend. + smp = _make(_FakeBackend(), sample_rate=44100, bits=16) + smp.set_pre_emphasis(0.9) # must not raise + + +# --------------------------------------------------------------------------- +# resolve_audio_backend + validate_sampler_cfg +# --------------------------------------------------------------------------- +class ResolveAudioBackendTest(unittest.TestCase): + def test_auto_picks_sampler_when_available(self): + self.assertEqual( + cfgmod.resolve_audio_backend("auto", supports_sampler=True, sampler_available=True), + "sampler", + ) + + def test_auto_falls_back_to_dac(self): + self.assertEqual( + cfgmod.resolve_audio_backend("auto", supports_sampler=True, sampler_available=False), + "dac", + ) + self.assertEqual( + cfgmod.resolve_audio_backend("auto", supports_sampler=False, sampler_available=False), + "dac", + ) + + def test_dac_is_forced(self): + self.assertEqual( + cfgmod.resolve_audio_backend("dac", supports_sampler=True, sampler_available=True), + "dac", + ) + + def test_explicit_sampler_warns_and_falls_back(self): + with self.assertLogs("c64cast.config", level="WARNING"): + got = cfgmod.resolve_audio_backend( + "sampler", supports_sampler=False, sampler_available=False + ) + self.assertEqual(got, "dac") + + def test_explicit_sampler_succeeds_when_available(self): + self.assertEqual( + cfgmod.resolve_audio_backend("sampler", supports_sampler=True, sampler_available=True), + "sampler", + ) + + +class ValidateSamplerCfgTest(unittest.TestCase): + def _cfg(self, *, bits=16, rate=44100, enabled=True): + cfg = cfgmod.Config() + cfg.audio.enabled = enabled + cfg.audio.sampler_bits = bits + cfg.audio.sampler_sample_rate = rate + return cfg + + def test_valid_passes(self): + cfgmod.validate_sampler_cfg(self._cfg()) # no raise + + def test_bad_bits_rejected(self): + with self.assertRaises(cfgmod.ConfigError): + cfgmod.validate_sampler_cfg(self._cfg(bits=12)) + + def test_out_of_range_rate_rejected(self): + with self.assertRaises(cfgmod.ConfigError): + cfgmod.validate_sampler_cfg(self._cfg(rate=96000)) + with self.assertRaises(cfgmod.ConfigError): + cfgmod.validate_sampler_cfg(self._cfg(rate=10)) + + def test_skipped_when_audio_disabled(self): + # Even an invalid value is ignored when audio is off. + cfgmod.validate_sampler_cfg(self._cfg(bits=99, enabled=False)) + + +# --------------------------------------------------------------------------- +# doctor: availability + provisioning +# --------------------------------------------------------------------------- +class _FakeProfile: + def __init__(self, supports_sampler: bool = True) -> None: + self.supports_sampler = supports_sampler + + +class _FakeRestApi: + """Category-aware fake: read_sampler_config GETs two config sections, so + session.get must return the right one per URL.""" + + def __init__( + self, + *, + present: bool = True, + map_status: str = "Enabled", + vol_l: str = " 0 dB", + vol_r: str = " 0 dB", + supports_sampler: bool = True, + put_error: Exception | None = None, + get_error: Exception | None = None, + ) -> None: + self.base_url = "http://fake" + self.profile = _FakeProfile(supports_sampler) + self.put_calls: list[tuple[str, str, str]] = [] + self._put_error = put_error + cart: dict[str, str] = {} + mixer: dict[str, str] = {} + if present: + cart["Map Ultimate Audio $DF20-DFFF"] = map_status + mixer["Vol Sampler L"] = vol_l + mixer["Vol Sampler R"] = vol_r + self._sections = { + "C64 and Cartridge Settings": cart, + "Audio Mixer": mixer, + } + self.session = mock.MagicMock() + + def _get(url, timeout=3.0): + from urllib.parse import unquote + + if get_error is not None: + raise get_error + cat = unquote(url.split("/v1/configs/")[-1]) + resp = mock.MagicMock() + resp.json.return_value = {cat: self._sections.get(cat, {}), "errors": []} + resp.raise_for_status = mock.MagicMock() + return resp + + self.session.get.side_effect = _get + + def put_config_item( + self, category: str, item: str, value: str, *, timeout: float = 3.0 + ) -> None: + if self._put_error is not None: + raise self._put_error + self.put_calls.append((category, item, value)) + + +def _video_cfg(*, backend="auto", enabled=True, skip_probe=False): + cfg = cfgmod.Config() + cfg.audio.enabled = enabled + cfg.audio.backend = backend + cfg.debug.skip_probe = skip_probe + cfg.scenes = [cfgmod.SceneCfg(type="video", file="x.mp4")] + return cfg + + +class SamplerAvailabilityTest(unittest.TestCase): + def test_available_when_mapped_and_audible(self): + self.assertIs(doctor.sampler_is_available(_FakeRestApi()), True) + + def test_unavailable_when_map_disabled(self): + self.assertIs(doctor.sampler_is_available(_FakeRestApi(map_status="Disabled")), False) + + def test_unavailable_when_muted(self): + self.assertIs(doctor.sampler_is_available(_FakeRestApi(vol_l="OFF", vol_r="OFF")), False) + + def test_audible_when_one_channel_on(self): + self.assertIs(doctor.sampler_is_available(_FakeRestApi(vol_r="OFF")), True) + + def test_unavailable_when_feature_absent(self): + self.assertIs(doctor.sampler_is_available(_FakeRestApi(present=False)), False) + + def test_none_on_query_failure(self): + import requests + + api = _FakeRestApi(get_error=requests.Timeout("read timeout")) + self.assertIsNone(doctor.sampler_is_available(api)) + + +class WantsSamplerTest(unittest.TestCase): + def test_wants_with_auto_and_video(self): + wants, reasons = doctor._wants_sampler(_video_cfg(backend="auto")) + self.assertTrue(wants) + self.assertTrue(reasons) + + def test_wants_with_explicit_sampler(self): + self.assertTrue(doctor._wants_sampler(_video_cfg(backend="sampler"))[0]) + + def test_not_wanted_with_dac(self): + self.assertFalse(doctor._wants_sampler(_video_cfg(backend="dac"))[0]) + + def test_not_wanted_without_audio(self): + self.assertFalse(doctor._wants_sampler(_video_cfg(enabled=False))[0]) + + def test_not_wanted_without_video_scene(self): + cfg = cfgmod.Config() + cfg.audio.enabled = True + cfg.scenes = [cfgmod.SceneCfg(type="waveform", file="t.sid")] + self.assertFalse(doctor._wants_sampler(cfg)[0]) + + +class ProvisionSamplerTest(unittest.TestCase): + def test_noop_when_already_enabled(self): + api = _FakeRestApi(map_status="Enabled", vol_l=" 0 dB", vol_r=" 0 dB") + self.assertIsNone(doctor.provision_sampler(api, _video_cfg())) + self.assertEqual(api.put_calls, []) + + def test_enables_map_when_disabled(self): + api = _FakeRestApi(map_status="Disabled") + restore = doctor.provision_sampler(api, _video_cfg()) + self.assertIsNotNone(restore) + self.assertIn( + ("C64 and Cartridge Settings", "Map Ultimate Audio $DF20-DFFF", "Enabled"), + api.put_calls, + ) + # Restore maps the composite key back to "Disabled". + assert restore is not None + self.assertIn("Disabled", restore.values()) + + def test_unmutes_when_off(self): + api = _FakeRestApi(vol_l="OFF", vol_r="OFF") + restore = doctor.provision_sampler(api, _video_cfg()) + assert restore is not None + unmutes = [c for c in api.put_calls if c[0] == "Audio Mixer"] + self.assertEqual(len(unmutes), 2) + self.assertEqual(list(restore.values()).count("OFF"), 2) + + def test_skipped_on_no_sampler_backend(self): + api = _FakeRestApi(supports_sampler=False, map_status="Disabled") + self.assertIsNone(doctor.provision_sampler(api, _video_cfg())) + self.assertEqual(api.put_calls, []) + + def test_skipped_under_skip_probe(self): + api = _FakeRestApi(map_status="Disabled") + self.assertIsNone(doctor.provision_sampler(api, _video_cfg(skip_probe=True))) + self.assertEqual(api.put_calls, []) + + def test_skipped_when_backend_dac(self): + api = _FakeRestApi(map_status="Disabled") + self.assertIsNone(doctor.provision_sampler(api, _video_cfg(backend="dac"))) + self.assertEqual(api.put_calls, []) + + def test_restore_puts_originals_back(self): + api = _FakeRestApi(map_status="Disabled", vol_l="OFF", vol_r=" 0 dB") + restore = doctor.provision_sampler(api, _video_cfg()) + api.put_calls.clear() + doctor.restore_sampler(api, restore) + # Map restored to Disabled, the muted channel back to OFF. + self.assertIn( + ("C64 and Cartridge Settings", "Map Ultimate Audio $DF20-DFFF", "Disabled"), + api.put_calls, + ) + self.assertIn(("Audio Mixer", "Vol Sampler L", "OFF"), api.put_calls) + + def test_restore_noop_on_none(self): + api = _FakeRestApi() + doctor.restore_sampler(api, None) # must not raise + self.assertEqual(api.put_calls, []) + + +class WantsReuCouplingTest(unittest.TestCase): + """The sampler streams its ring out of REU SDRAM, so a sampler run must + pull the REU into _wants_reu (provisioning + the doctor REU probe).""" + + def test_sampler_makes_wants_reu_true(self): + wants, reasons = doctor._wants_reu(_video_cfg(backend="auto")) + self.assertTrue(wants) + self.assertTrue(any("sampler" in r for r in reasons)) + + def test_dac_video_does_not_want_reu(self): + self.assertFalse(doctor._wants_reu(_video_cfg(backend="dac"))[0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_wizard.py b/tests/test_wizard.py index 05cd112..82b6d9a 100644 --- a/tests/test_wizard.py +++ b/tests/test_wizard.py @@ -319,6 +319,7 @@ def test_headless_webcam_build_writes_valid_config(self): "Display mode": "petscii", "Scene name": "My Scene", "Enable SID audio": True, + "Video-audio backend": "auto", # audio on → backend prompt fires "advanced": False, "Add overlays": True, "Select overlays": lambda choices: [c for c in choices if c.startswith("clock")], @@ -347,6 +348,7 @@ def test_headless_webcam_build_writes_valid_config(self): self.assertEqual(cfg.scenes[0].display, "petscii") self.assertEqual(cfg.scenes[0].name, "My Scene") self.assertTrue(cfg.audio.enabled) + self.assertEqual(cfg.audio.backend, "auto") self.assertEqual(cfg.ultimate64.system, "PAL") self.assertEqual(cfg.scenes[0].overlays[0]["type"], "clock") self.assertIsNone(wizard.validate(cfg))