Skip to content
1 change: 1 addition & 0 deletions backend/src/lib/audio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from src.lib.audio.speaker_browser import SpeakerBrowser as SpeakerBrowser
from src.lib.audio.vad import VAD as VAD
from src.lib.audio.asr import ASR as ASR
from src.lib.audio.deepgram_asr import DeepgramASR as DeepgramASR
from src.lib.audio.tts import TTS as TTS
from src.lib.audio.tts_fish import FishTTS as FishTTS
from src.lib.audio.sts import STS as STS
Expand Down
45 changes: 44 additions & 1 deletion backend/src/lib/audio/audio_in.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import subprocess
import sys
from typing import Any

import sounddevice as sd
Expand All @@ -13,6 +15,8 @@
from src.core.component import ThreadedComponent, Tag
from src.core.frames import AudioFrame

_USE_PAREC = sys.platform == "linux"


class AudioInConfig(BaseModel):
model_config = ConfigDict(json_schema_extra={"options": {"device": {}}})
Expand All @@ -37,9 +41,42 @@ def __init__(self, config: AudioInConfig = AudioInConfig()) -> None:

@classmethod
def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
if _USE_PAREC:
from src.lib.audio.devices import list_pulse_sources

return {"config": {"device": list_pulse_sources()}}
return {"config": {"device": list_audio_input_devices()}}

def run(self, inputs: tuple[()], outputs: MicOutputs) -> None:
def _run_parec(self, outputs: MicOutputs) -> None:
device = self.config.device or None
cmd = [
"parec",
"--format=s16le",
f"--rate={self._sample_rate}",
f"--channels={self._channels}",
]
if device:
cmd.append(f"--device={device}")

frame_bytes = self._frame_samples * self._channels * 2
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
try:
while not self.stop_event.is_set():
data = proc.stdout.read(frame_bytes) # type: ignore[union-attr]
if not data:
break
outputs.audio.send(
AudioFrame.new(
data=data,
sample_rate=self._sample_rate,
channels=self._channels,
)
)
finally:
proc.terminate()
proc.wait()

def _run_sounddevice(self, outputs: MicOutputs) -> None:
with sd.InputStream(
samplerate=self._sample_rate,
channels=self._channels,
Expand All @@ -57,3 +94,9 @@ def run(self, inputs: tuple[()], outputs: MicOutputs) -> None:
channels=self._channels,
)
)

def run(self, inputs: tuple[()], outputs: MicOutputs) -> None:
if _USE_PAREC:
self._run_parec(outputs)
else:
self._run_sounddevice(outputs)
48 changes: 47 additions & 1 deletion backend/src/lib/audio/audio_out.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import subprocess
import sys
from typing import Any

import numpy as np
Expand All @@ -14,6 +16,8 @@
from src.core.component import ThreadedComponent, Tag
from src.core.frames import AudioDataFormat

_USE_PAPLAY = sys.platform == "linux"


class AudioOutConfig(BaseModel):
model_config = ConfigDict(json_schema_extra={"options": {"device": {}}})
Expand All @@ -36,9 +40,45 @@ def __init__(self, config: AudioOutConfig = AudioOutConfig()) -> None:

@classmethod
def get_options(cls, values: dict[str, Any]) -> dict[str, Any]:
if _USE_PAPLAY:
from src.lib.audio.devices import list_pulse_sinks

return {"config": {"device": list_pulse_sinks()}}
return {"config": {"device": list_audio_output_devices()}}

def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None:
def _run_paplay(self, inputs: SpeakerInputs) -> None:
device = self.config.device or None
cmd = [
"paplay",
"--raw",
"--format=s16le",
f"--rate={self._sample_rate}",
f"--channels={self._channels}",
]
if device:
cmd.append(f"--device={device}")

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
try:
for frame in inputs.audio:
if frame is None:
break
pcm_bytes = frame.get(
sample_rate=self._sample_rate,
num_channels=self._channels,
data_format=AudioDataFormat.PCM16,
)
try:
proc.stdin.write(pcm_bytes) # type: ignore[union-attr]
except BrokenPipeError:
break
finally:
if proc.stdin:
proc.stdin.close()
proc.terminate()
proc.wait()

def _run_sounddevice(self, inputs: SpeakerInputs) -> None:
with sd.OutputStream(
samplerate=self._sample_rate,
channels=self._channels,
Expand All @@ -59,3 +99,9 @@ def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None:
stream.write(
np.frombuffer(pcm_bytes, dtype=np.int16).reshape(-1, self._channels)
)

def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None:
if _USE_PAPLAY:
self._run_paplay(inputs)
else:
self._run_sounddevice(inputs)
155 changes: 155 additions & 0 deletions backend/src/lib/audio/deepgram_asr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""DeepgramASR — streaming speech-to-text with built-in VAD via Deepgram's WebSocket API.

Replaces the separate VAD + ASR pipeline with a single component.
Send AudioFrames in, get TextFrames out.
"""

from __future__ import annotations

import json
import os
import threading
from typing import NamedTuple

from pydantic import BaseModel

from src.core.channel import Receiver, Sender
from src.core.component import ThreadedComponent, Tag
from src.core.frames import AudioDataFormat, AudioFrame, TextFrame


class DeepgramASRConfig(BaseModel):
api_key_env_var: str = "DEEPGRAM_API_KEY"
model: str = "nova-3"
language: str = "en"
sample_rate: int = 48000
channels: int = 1
interim_results: bool = True
utterance_end_ms: int = 1000
vad_events: bool = True
smart_format: bool = True
diarize: bool = True


class DeepgramASRInputs(NamedTuple):
audio: Receiver[AudioFrame]


class DeepgramASROutputs(NamedTuple):
text: Sender[TextFrame]


class DeepgramASR(ThreadedComponent[DeepgramASRInputs, DeepgramASROutputs]):
tags = Tag(io={"conduit"}, functionality={"audio"})
description = (
"Streaming speech-to-text via **Deepgram** WebSocket API with built-in VAD. "
"Replaces separate VAD + ASR components. Send AudioFrames in, get TextFrames out."
)

def __init__(self, config: DeepgramASRConfig = DeepgramASRConfig()) -> None:
super().__init__()
self.config = config

def run(self, inputs: DeepgramASRInputs, outputs: DeepgramASROutputs) -> None:
import websockets.sync.client as ws_client

api_key = os.getenv(self.config.api_key_env_var, "")
if not api_key:
print(f"[DeepgramASR] {self.config.api_key_env_var} not set — stopping")
return

c = self.config
params = (
f"model={c.model}"
f"&language={c.language}"
f"&sample_rate={c.sample_rate}"
f"&channels={c.channels}"
f"&encoding=linear16"
f"&interim_results={'true' if c.interim_results else 'false'}"
f"&utterance_end_ms={c.utterance_end_ms}"
f"&vad_events={'true' if c.vad_events else 'false'}"
f"&smart_format={'true' if c.smart_format else 'false'}"
f"&diarize={'true' if c.diarize else 'false'}"
)
url = f"wss://api.deepgram.com/v1/listen?{params}"

print(f"[DeepgramASR] Connecting to Deepgram ({c.model})")

try:
ws = ws_client.connect(
url,
additional_headers={"Authorization": f"Token {api_key}"},
)
except Exception as e:
print(f"[DeepgramASR] Connection failed: {e}")
print(f"[DeepgramASR] URL: {url}")
return

print("[DeepgramASR] Connected")

# Receiver thread: read transcription results from Deepgram
def recv_loop() -> None:
try:
for msg in ws:
if self.stop_event.is_set():
break
if isinstance(msg, str):
data = json.loads(msg)
msg_type = data.get("type", "")

if msg_type == "Results":
channel = data.get("channel", {})
alternatives = channel.get("alternatives", [])
if alternatives:
alt = alternatives[0]
transcript = alt.get("transcript", "").strip()
is_final = data.get("is_final", False)
if transcript and is_final:
# Extract speaker from first word's diarization
words = alt.get("words", [])
speaker = (
words[0].get("speaker", None) if words else None
)
if speaker is not None:
text = f"[Speaker {speaker}] {transcript}"
else:
text = transcript
print(f"[DeepgramASR] {text}")
outputs.text.send(TextFrame.new(text=text))

elif msg_type == "UtteranceEnd":
pass # Could be used for turn-taking signals

except Exception as e:
if not self.stop_event.is_set():
print(f"[DeepgramASR] Receive error: {e}")

recv_thread = threading.Thread(target=recv_loop, daemon=True)
recv_thread.start()

# Send audio frames to Deepgram
try:
for frame in inputs.audio:
if frame is None or self.stop_event.is_set():
break
pcm = frame.get(
sample_rate=c.sample_rate,
num_channels=c.channels,
data_format=AudioDataFormat.PCM16,
)
try:
ws.send(pcm)
except Exception:
break
finally:
# Send close message to Deepgram
try:
ws.send(json.dumps({"type": "CloseStream"}))
except Exception:
pass
try:
ws.close()
except Exception:
pass
recv_thread.join(timeout=2.0)
print("[DeepgramASR] Stopped")
36 changes: 36 additions & 0 deletions backend/src/lib/audio/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,42 @@ def list_audio_output_devices(
return list_audio_devices("output", include_default=include_default)


def _list_pulse_devices(kind: str) -> list[AudioDeviceOption]:
"""List PulseAudio sources or sinks via pactl.

Args:
kind: 'sources' for input devices, 'sinks' for output devices.
"""
import subprocess

options: list[AudioDeviceOption] = [{"value": "", "label": "System Default"}]
try:
result = subprocess.run(
["pactl", "list", "short", kind],
capture_output=True,
text=True,
timeout=5,
)
for line in result.stdout.strip().splitlines():
parts = line.split("\t")
if len(parts) >= 2:
name = parts[1]
options.append({"value": name, "label": name})
except Exception:
pass
return options


def list_pulse_sources() -> list[AudioDeviceOption]:
"""List PulseAudio/PipeWire input sources (including virtual monitors)."""
return _list_pulse_devices("sources")


def list_pulse_sinks() -> list[AudioDeviceOption]:
"""List PulseAudio/PipeWire output sinks (including virtual sinks)."""
return _list_pulse_devices("sinks")


def coerce_sounddevice_device(value: str | None) -> int | str | None:
if value is None:
return None
Expand Down
2 changes: 2 additions & 0 deletions backend/src/lib/llm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from src.lib.llm.llm import LLM as LLM
from src.lib.llm.agent_state import AgentState as AgentState
from src.lib.llm.agent_loop import AgentLoop as AgentLoop
from src.lib.llm.memory import Mem0 as Mem0
from src.lib.llm.character_card import CharacterCard as CharacterCard
from src.lib.llm.messages_to_text import MessagesToText as MessagesToText
Expand All @@ -8,3 +9,4 @@
from src.lib.llm.talk_tool import TalkTool as TalkTool
from src.lib.llm.think_tool import ThinkTool as ThinkTool
from src.lib.llm.movement_tool import MovementTool as MovementTool
from src.lib.llm.scratchpad_tool import ScratchpadTool as ScratchpadTool
Loading
Loading