def _run_parec(self, outputs: MicOutputs) -> None:
    """Capture audio by streaming raw PCM from a ``parec`` subprocess.

    Spawns ``parec`` with signed 16-bit little-endian output at the
    configured sample rate and channel count (optionally bound to
    ``self.config.device``), reads it one frame at a time and forwards each
    frame on ``outputs.audio``. The loop ends when ``self.stop_event`` is
    set or ``parec`` closes its stdout.

    Args:
        outputs: Output channels; only ``outputs.audio`` is written.

    Raises:
        OSError: If the ``parec`` executable cannot be started.
    """
    cmd = [
        "parec",
        "--format=s16le",
        f"--rate={self._sample_rate}",
        f"--channels={self._channels}",
    ]
    device = self.config.device or None
    if device:
        cmd.append(f"--device={device}")

    # s16le: two bytes per sample, per channel.
    frame_bytes = self._frame_samples * self._channels * 2

    # The context manager closes the stdout pipe and reaps the child on
    # exit, so neither the file descriptor nor a zombie process is leaked.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
        stream = proc.stdout
        assert stream is not None  # guaranteed by stdout=PIPE; narrows the type
        try:
            while not self.stop_event.is_set():
                data = stream.read(frame_bytes)
                # A buffered read only comes back short at EOF. Drop the
                # incomplete tail instead of emitting a frame that may not
                # even contain a whole number of samples.
                if not data or len(data) < frame_bytes:
                    break
                outputs.audio.send(
                    AudioFrame.new(
                        data=data,
                        sample_rate=self._sample_rate,
                        channels=self._channels,
                    )
                )
        finally:
            # parec records until told to stop; Popen.__exit__ then waits.
            proc.terminate()
def _run_paplay(self, inputs: SpeakerInputs) -> None:
    """Play incoming audio by piping raw PCM into a ``paplay`` subprocess.

    Each frame from ``inputs.audio`` is converted to PCM16 at the configured
    sample rate / channel count and written to ``paplay``'s stdin. Playback
    stops when the input yields ``None`` or ``paplay`` closes its end of the
    pipe.

    Args:
        inputs: Input channels; only ``inputs.audio`` is read.

    Raises:
        OSError: If the ``paplay`` executable cannot be started.
    """
    cmd = [
        "paplay",
        "--raw",
        "--format=s16le",
        f"--rate={self._sample_rate}",
        f"--channels={self._channels}",
    ]
    device = self.config.device or None
    if device:
        cmd.append(f"--device={device}")

    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    pipe = proc.stdin
    assert pipe is not None  # guaranteed by stdin=PIPE; narrows the type
    try:
        for frame in inputs.audio:
            if frame is None:
                break
            pcm_bytes = frame.get(
                sample_rate=self._sample_rate,
                num_channels=self._channels,
                data_format=AudioDataFormat.PCM16,
            )
            try:
                pipe.write(pcm_bytes)
                # The pipe is a BufferedWriter: without an explicit flush,
                # small frames sit in Python's buffer and reach paplay in
                # bursts, adding latency.
                pipe.flush()
            except BrokenPipeError:
                break
    finally:
        # close() flushes first and can itself fail once paplay has exited;
        # that must not prevent the process from being terminated and reaped.
        try:
            pipe.close()
        except OSError:
            pass
        proc.terminate()
        proc.wait()
class DeepgramASRConfig(BaseModel):
    # Settings for the DeepgramASR component. Apart from api_key_env_var,
    # every field is sent to Deepgram as the query parameter of the same name
    # when DeepgramASR.run() builds the WebSocket URL.

    # Name of the environment variable the API key is read from; run() stops
    # immediately when that variable is unset or empty.
    api_key_env_var: str = "DEEPGRAM_API_KEY"
    model: str = "nova-3"
    language: str = "en"
    # Audio format: incoming frames are converted to PCM16 at this rate and
    # channel count before being sent (the encoding parameter is always
    # "linear16").
    sample_rate: int = 48000
    channels: int = 1
    # Sent to Deepgram, but run() only forwards results marked is_final.
    interim_results: bool = True
    utterance_end_ms: int = 1000
    vad_events: bool = True
    smart_format: bool = True
    # When the first word of a final result carries a speaker id, run()
    # prefixes the emitted text with "[Speaker N]".
    diarize: bool = True
@staticmethod
def _final_transcript(data: dict) -> str | None:
    """Return the text to emit for one decoded Deepgram message, if any.

    Only ``Results`` messages flagged ``is_final`` with a non-empty
    transcript produce output. When the first word carries a ``speaker``
    value the text is prefixed with ``[Speaker N]``.
    """
    if data.get("type", "") != "Results" or not data.get("is_final", False):
        return None
    alternatives = data.get("channel", {}).get("alternatives", [])
    if not alternatives:
        return None
    best = alternatives[0]
    transcript = best.get("transcript", "").strip()
    if not transcript:
        return None
    # The speaker is taken from the first word only.
    words = best.get("words", [])
    speaker = words[0].get("speaker", None) if words else None
    if speaker is None:
        return transcript
    return f"[Speaker {speaker}] {transcript}"

def run(self, inputs: DeepgramASRInputs, outputs: DeepgramASROutputs) -> None:
    """Stream audio to Deepgram and emit final transcripts as TextFrames.

    Opens one WebSocket connection, starts a daemon thread that reads
    transcription messages, and forwards every frame from ``inputs.audio``
    as PCM16 until the input yields ``None``, ``self.stop_event`` is set, or
    sending fails. Returns early (after printing a message) when the API key
    is missing or the connection cannot be established.

    Args:
        inputs: Input channels; ``inputs.audio`` supplies the AudioFrames.
        outputs: Output channels; final transcripts go to ``outputs.text``.
    """
    import urllib.parse

    import websockets.sync.client as ws_client

    api_key = os.getenv(self.config.api_key_env_var, "")
    if not api_key:
        print(f"[DeepgramASR] {self.config.api_key_env_var} not set — stopping")
        return

    c = self.config

    def flag(value: bool) -> str:
        return "true" if value else "false"

    # urlencode escapes each value, so a model or language string containing
    # '&', '=' or spaces cannot corrupt the query string.
    params = urllib.parse.urlencode(
        {
            "model": c.model,
            "language": c.language,
            "sample_rate": c.sample_rate,
            "channels": c.channels,
            "encoding": "linear16",
            "interim_results": flag(c.interim_results),
            "utterance_end_ms": c.utterance_end_ms,
            "vad_events": flag(c.vad_events),
            "smart_format": flag(c.smart_format),
            "diarize": flag(c.diarize),
        }
    )
    url = f"wss://api.deepgram.com/v1/listen?{params}"

    print(f"[DeepgramASR] Connecting to Deepgram ({c.model})")

    try:
        ws = ws_client.connect(
            url,
            additional_headers={"Authorization": f"Token {api_key}"},
        )
    except Exception as e:  # component boundary: report and stop cleanly
        print(f"[DeepgramASR] Connection failed: {e}")
        # Safe to print: the key travels in the header, not in the URL.
        print(f"[DeepgramASR] URL: {url}")
        return

    print("[DeepgramASR] Connected")

    # Receiver thread: read transcription results from Deepgram
    def recv_loop() -> None:
        try:
            for msg in ws:
                if self.stop_event.is_set():
                    break
                if not isinstance(msg, str):
                    continue
                try:
                    data = json.loads(msg)
                except json.JSONDecodeError as e:
                    # One bad message must not end transcription.
                    print(f"[DeepgramASR] Ignoring malformed message: {e}")
                    continue
                if not isinstance(data, dict):
                    continue
                # Other message types (e.g. "UtteranceEnd") are ignored for
                # now; they could drive turn-taking signals later.
                text = self._final_transcript(data)
                if text is not None:
                    print(f"[DeepgramASR] {text}")
                    outputs.text.send(TextFrame.new(text=text))
        except Exception as e:  # thread boundary: report unless shutting down
            if not self.stop_event.is_set():
                print(f"[DeepgramASR] Receive error: {e}")

    recv_thread = threading.Thread(target=recv_loop, daemon=True)
    recv_thread.start()

    # Send audio frames to Deepgram
    try:
        for frame in inputs.audio:
            if frame is None or self.stop_event.is_set():
                break
            pcm = frame.get(
                sample_rate=c.sample_rate,
                num_channels=c.channels,
                data_format=AudioDataFormat.PCM16,
            )
            try:
                ws.send(pcm)
            except Exception as e:
                if not self.stop_event.is_set():
                    print(f"[DeepgramASR] Send failed: {e}")
                break
    finally:
        # Send close message to Deepgram
        try:
            ws.send(json.dumps({"type": "CloseStream"}))
        except Exception:
            pass  # connection already gone; nothing left to flush
        # NOTE(review): Deepgram is expected to flush pending final results
        # and then close the stream after CloseStream, so give the receiver
        # a moment before closing the socket ourselves — confirm against
        # the Deepgram streaming docs.
        recv_thread.join(timeout=2.0)
        try:
            ws.close()
        except Exception:
            pass  # best-effort close during shutdown
        recv_thread.join(timeout=2.0)
        print("[DeepgramASR] Stopped")
def _list_pulse_devices(kind: str) -> list[AudioDeviceOption]:
    """List PulseAudio sources or sinks via ``pactl list short``.

    The result always starts with a "System Default" entry whose value is
    the empty string. Listing is best-effort: if ``pactl`` is missing, times
    out, exits with an error or prints undecodable output, only that default
    entry is returned.

    Args:
        kind: 'sources' for input devices, 'sinks' for output devices.

    Returns:
        Device options with ``value`` and ``label`` both set to the device
        name reported by ``pactl``.
    """
    import subprocess

    options: list[AudioDeviceOption] = [{"value": "", "label": "System Default"}]
    try:
        result = subprocess.run(
            ["pactl", "list", "short", kind],
            capture_output=True,
            text=True,
            timeout=5,
            check=True,  # a failing pactl has nothing trustworthy on stdout
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: pactl not installed; SubprocessError: timeout or non-zero
        # exit; ValueError: output could not be decoded as text.
        return options

    seen: set[str] = set()
    for line in result.stdout.splitlines():
        # Columns are tab-separated: index, name, ...
        parts = line.split("\t")
        if len(parts) < 2:
            continue
        name = parts[1].strip()
        if name and name not in seen:
            seen.add(name)
            options.append({"value": name, "label": name})
    return options


def list_pulse_sources() -> list[AudioDeviceOption]:
    """List PulseAudio/PipeWire input sources (including virtual monitors)."""
    return _list_pulse_devices("sources")


def list_pulse_sinks() -> list[AudioDeviceOption]:
    """List PulseAudio/PipeWire output sinks (including virtual sinks)."""
    return _list_pulse_devices("sinks")
# Default system prompt sent as the `instructions` of every model call.
# A trailing backslash joins two source lines WITHOUT inserting anything, so
# each continued line must end in a space, and list items must end in a real
# newline — otherwise sentences and bullets run together in the prompt.
_DEFAULT_SYSTEM_PROMPT = """\
You are an embodied AI agent operating inside a real-time multimodal pipeline.

You exist in a physical or virtual environment. You can see through a camera, \
hear through a microphone, and act through tools (speaking, moving, etc.).

# Vision
Each turn, you may receive an image — this is your current view of the world. \
Use it to ground your responses in what you actually see. \
Do not hallucinate objects or spatial relationships that are not visible.

# Speech
User speech arrives as transcribed text with timestamps and speaker diarization. There could be more than one speaker. \
You should use your vision to help you ground who actually said the latest message from the user. \
Respond naturally and conversationally. Keep responses brief unless asked to elaborate.

# Tools
You have access to tools for interacting with the environment. \
Use them when the user asks you to act, not just describe. \
When you call a tool, wait for the result before continuing.

# Guidelines
- Be proactive. Do things when you are alone. Think constantly but only talk when the latest user message hasn't been responded to.
- If you see something relevant to the conversation, mention it naturally.
- Do not repeat yourself or narrate your own actions unless asked.
"""

# Appended as a trailing system message on every turn (see AgentLoop.run).
_DEFAULT_POST_PROMPT = "Do nothing unless the user has talked and you haven't replied."
@staticmethod
def _heading_from_quat(w: float, x: float, y: float, z: float) -> float:
    """Convert a rotation quaternion into a heading angle in degrees.

    The X and Z components of the +Z axis rotated by the quaternion are
    computed, and the negated ``atan2`` of that pair is returned, so the
    identity rotation yields a heading of zero.

    Args:
        w: Scalar part of the quaternion.
        x: X component of the vector part.
        y: Y component of the vector part.
        z: Z component of the vector part.

    Returns:
        The heading in degrees.
    """
    forward_x_component = 2 * (x * z + w * y)
    forward_z_component = 1 - 2 * (x * x + y * y)
    yaw_radians = math.atan2(forward_x_component, forward_z_component)
    return -math.degrees(yaw_radians)
def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None:
    """Run the agent loop until ``self.stop_event`` is set.

    Seeds the history from ``inputs.initial_msgs``, collects tool
    definitions once, switches the optional receivers to non-blocking (and,
    for video/objects/pose, newest-only) mode, then loops: drain new speech
    into the history, build per-turn transient context (image, visible
    objects, pose, post prompt) and call the model.

    Args:
        inputs: Optional input channels; every field may be ``None``.
        outputs: Output channels, passed through to ``_call_llm``.
    """
    print("[AgentLoop] Starting")

    # Start every run from a clean slate. Without resetting the tool list a
    # second run would register every tool (and the diary tool) twice.
    self._input = []
    self._tool_defs = []

    # Read initial prompts once (e.g. CharacterCard)
    if inputs.initial_msgs is not None:
        initial = next(inputs.initial_msgs, None)
        if initial is not None:
            for m in initial:
                self._append_msg(m.role, m.content or "")

    # Collect tool definitions once
    if inputs.tools is not None:
        inputs.tools.blocking = False
        for td in inputs.tools:
            if td is None:
                break
            self._tool_defs.append(
                {
                    "type": "function",
                    "name": td.name,
                    "description": td.description,
                    "parameters": td.parameters,
                    **({"strict": td.strict} if td.strict is not None else {}),
                }
            )
    # Always include the built-in diary tool
    self._tool_defs.append(self._DIARY_TOOL)
    print(f"[AgentLoop] Tools: {[t['name'] for t in self._tool_defs]}")

    # Configure receiver modes. Compare against None explicitly so the
    # result never depends on how a Receiver defines its own truthiness.
    if inputs.speech is not None:
        inputs.speech.blocking = False
    if inputs.memory is not None:
        inputs.memory.blocking = False
    if inputs.video is not None:
        inputs.video.newest = True
        inputs.video.blocking = False
    if inputs.objects is not None:
        inputs.objects.newest = True
        inputs.objects.blocking = False
    if inputs.pose is not None:
        inputs.pose.newest = True
        inputs.pose.blocking = False

    while not self.stop_event.is_set():
        # Drain any new speech into history
        if inputs.speech is not None:
            for (speech,) in drain(inputs.speech):
                if speech is not None:
                    # NOTE(review): assumes pts is nanoseconds since the
                    # epoch — confirm against the frame producer.
                    ts = datetime.fromtimestamp(speech.pts / 1e9).strftime("%H:%M:%S")
                    self._append_msg("user", f"[{ts}] {speech.text}")

        # Build transient context (not persisted in history)
        transient: list[dict[str, Any]] = []

        # Latest video frame — what the agent currently sees
        if inputs.video is not None:
            vf = next(inputs.video, None)
            if vf is not None:
                transient.append(
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "input_image",
                                "image_url": self._encode_frame(vf),
                            },
                            {
                                "type": "input_text",
                                "text": "[This is what you currently see]",
                            },
                        ],
                    }
                )

        if inputs.objects is not None:
            obj = next(inputs.objects, None)
            if obj is not None and len(obj.labels) > 0:
                lines = [
                    f' "{label}" at ({pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f})'
                    for label, pos in zip(obj.labels, obj.positions)
                ]
                transient.append(
                    {
                        "role": "system",
                        "content": "[Currently visible objects]\n"
                        + "\n".join(lines),
                    }
                )

        if inputs.pose is not None:
            pose = next(inputs.pose, None)
            if pose is not None:
                poses = pose.get()
                waist = poses.get("waist")
                head = poses.get("head")
                print(f"[AgentLoop] waist={waist} head={head}")
                if waist is not None:
                    body_heading = self._heading_from_quat(
                        waist.rot_w, waist.rot_x, waist.rot_y, waist.rot_z
                    )
                    lines = [
                        f"[Position (y-up): x={waist.pos_x:.2f}, y={waist.pos_y:.2f}, z={waist.pos_z:.2f}]",
                        f"[Body heading (from +Z clockwise): {body_heading:.0f}°]",
                    ]
                    if head is not None:
                        look_heading = self._heading_from_quat(
                            head.rot_w, head.rot_x, head.rot_y, head.rot_z
                        )
                        lines.append(f"[Look heading (from +Z clockwise): {look_heading:.0f}°]")
                    pose_content = "\n".join(lines)
                    print(f"[AgentLoop] Pose context: {pose_content}")
                    transient.append(
                        {
                            "role": "system",
                            "content": pose_content,
                        }
                    )

        if self.config.post_prompt:
            transient.append({"role": "system", "content": self.config.post_prompt})

        # Call LLM
        # NOTE(review): nothing throttles this loop, so the model is called
        # back-to-back even when no input changed — confirm that is intended.
        api_input = self._input + transient
        self._call_llm(api_input, inputs, outputs)

    print("[AgentLoop] Stopped")
def _call_llm(
    self,
    api_input: list[dict[str, Any]],
    inputs: AgentLoopInputs,
    outputs: AgentLoopOutputs,
) -> None:
    """Stream a single LLM response, emit tokens/tool calls, handle tool loop.

    Text deltas are forwarded on ``outputs.token`` as they arrive, and the
    complete text is appended to the history and sent on ``outputs.text``.
    Every completed function call is recorded in the history together with
    an output item: the built-in ``diary`` tool is answered locally, any
    other call is emitted on ``outputs.tool_calls`` and answered from
    ``inputs.tool_result``. An ``EOS.END`` marker is always sent last.

    Args:
        api_input: Persistent history plus this turn's transient context.
        inputs: Input channels; only ``inputs.tool_result`` is read.
        outputs: Output channels for tokens, text, tool calls and EOS.
    """
    kwargs: dict[str, Any] = {
        "model": self.config.model,
        "instructions": self.config.system_prompt,
        "input": api_input,
        "stream": True,
    }
    # NOTE(review): the API is understood to reject `reasoning.effort` for
    # non-reasoning models such as the default "gpt-4.1", so it is only sent
    # to model families known to accept it — confirm and extend as needed.
    if self.config.model.startswith(("o1", "o3", "o4", "gpt-5")):
        kwargs["reasoning"] = {"effort": "low"}
    if self._tool_defs:
        kwargs["tools"] = self._tool_defs
        kwargs["tool_choice"] = "auto"

    stream = self._client.responses.create(**kwargs)

    text_parts: list[str] = []
    tool_calls: list[dict[str, Any]] = []

    for event in stream:
        if self.stop_event.is_set():
            break

        if event.type == "response.output_text.delta":
            text_parts.append(event.delta)
            outputs.token.send(TextFrame.new(text=event.delta))

        elif event.type == "response.function_call_arguments.delta":
            # Accumulate function call arguments
            if not tool_calls or tool_calls[-1].get("_done"):
                tool_calls.append({"call_id": "", "name": "", "arguments": ""})
            tool_calls[-1]["arguments"] += event.delta

        elif event.type == "response.output_item.added":
            if hasattr(event, "item") and event.item.type == "function_call":
                tool_calls.append(
                    {
                        "call_id": event.item.call_id,
                        "name": event.item.name,
                        "arguments": "",
                    }
                )

        elif event.type == "response.output_item.done":
            if (
                hasattr(event, "item")
                and event.item.type == "function_call"
                and tool_calls
            ):
                # The finished item carries the authoritative values.
                tool_calls[-1]["call_id"] = event.item.call_id
                tool_calls[-1]["name"] = event.item.name
                tool_calls[-1]["arguments"] = event.item.arguments
                tool_calls[-1]["_done"] = True

    # Emit completed text
    full_text = "".join(text_parts)
    if full_text:
        self._append_msg("assistant", full_text)
        if outputs.text is not None:
            outputs.text.send(TextFrame.new(text=full_text))

    # Process tool calls
    for tc in tool_calls:
        if not tc.get("_done"):
            # The stream was cut short (stop requested): the arguments are
            # incomplete, so the call is neither recorded nor executed.
            continue

        self._input.append(
            {
                "type": "function_call",
                "call_id": tc["call_id"],
                "name": tc["name"],
                "arguments": tc["arguments"],
            }
        )

        if tc["name"] == "diary":
            # Built-in: handle internally
            try:
                entry = json.loads(tc["arguments"]).get("entry", "")
            except (json.JSONDecodeError, AttributeError, TypeError):
                # Not JSON, or JSON that is not an object: log it raw.
                entry = tc["arguments"]
            print(f"[AgentLoop] Diary: {str(entry)[:120]}")
            output = "ok"
        else:
            # External tool: emit and wait for result
            output = None
            if outputs.tool_calls is not None:
                outputs.tool_calls.send(
                    ToolCall.new(
                        call_id=tc["call_id"],
                        name=tc["name"],
                        arguments=tc["arguments"],
                    )
                )
                if inputs.tool_result is not None:
                    result = next(inputs.tool_result, None)
                    if result is not None:
                        output = result.content
            if output is None:
                output = "(no result: tool unavailable)"

        # Every recorded function_call gets exactly one output carrying the
        # same call_id. A call left without an output stays in the history
        # and would make every later request invalid.
        self._input.append(
            {
                "type": "function_call_output",
                "call_id": tc["call_id"],
                "output": output,
            }
        )

    # EOS
    outputs.token.send(EOS.END)
    if outputs.eos is not None:
        outputs.eos.send(EOS.END)
def run(self, inputs: ScratchpadToolInputs, outputs: ScratchpadToolOutputs) -> None:
    """Serve scratchpad tool calls until the input yields ``None``.

    ``scratchpad_update`` replaces the stored text and answers ``"ok"``.
    ``scratchpad_read`` answers with the stored text, or ``"(empty)"`` when
    nothing is stored. Calls with any other name are ignored and get no
    result.

    Args:
        inputs: Input channels; ``inputs.tool_call`` supplies the calls.
        outputs: Output channels; results go to ``outputs.tool_result``.
    """
    for call in inputs.tool_call:
        if call is None:
            break

        if call.name == "scratchpad_update":
            try:
                content = json.loads(call.arguments).get("content", "")
            except (json.JSONDecodeError, AttributeError, TypeError):
                # Not JSON, or JSON that is not an object: keep it verbatim.
                content = call.arguments
            # Arguments are model-generated, so "content" may be null or a
            # non-string value. Normalise it here so the slice below (and
            # later reads) cannot crash this worker and leave the caller
            # waiting for a result that never comes.
            if content is None:
                content = ""
            elif not isinstance(content, str):
                content = json.dumps(content, ensure_ascii=False, default=str)
            self._content = content
            print(f"[Scratchpad] Updated: {self._content[:80]}")
            outputs.tool_result.send(
                ToolResult.new(call_id=call.call_id, content="ok")
            )

        elif call.name == "scratchpad_read":
            print(f"[Scratchpad] Read: {self._content[:80]}")
            outputs.tool_result.send(
                ToolResult.new(
                    call_id=call.call_id,
                    content=self._content or "(empty)",
                )
            )
+ +### Create Virtual Cables + +```bash +pactl load-module module-null-sink sink_name=cable_a sink_properties=device.description="Cable-A" +pactl load-module module-null-sink sink_name=cable_b sink_properties=device.description="Cable-B" +``` + +These are temporary (lost on reboot). To make them persistent, add the same +`load-module module-null-sink ...` lines, without the leading `pactl`, to +`~/.config/pulse/default.pa`. + +### Signal Flow + +```text +AudioOut -> cable_a (sink) -> cable_a.monitor (source) -> VRChat microphone +VRChat audio output -> cable_b (sink) -> cable_b.monitor (source) -> AudioIn +``` + +### Setup + +1. Route VRChat's output to `cable_b`: open `pavucontrol`, go to the + **Playback** tab, and change VRChat's output to **Cable-B**. Alternatively, + set `cable_b` as the system default sink before launching VRChat: + ```bash + pactl set-default-sink cable_b + ``` + +2. Route VRChat's mic input to `cable_a.monitor`: open `pavucontrol`, go to + the **Recording** tab, and change VRChat's input source to + **Monitor of Cable-A**. + +3. In OpenNeuro, configure the components: + - `AudioIn` + - `device`: `cable_b.monitor` + - `sample_rate`: `48000` + - `channels`: `2` + - `AudioOut` + - `device`: `cable_a` + - `sample_rate`: `48000` + - `channels`: `1` + +### Optional: Hear Game Audio Locally + +If you still want to hear VRChat through your headset while routing to +OpenNeuro, enable monitoring on `cable_b` in `pavucontrol`: go to the +**Output Devices** tab, find **Cable-B**, click the lock icon, and enable +**Monitor of Cable-B** routed to your real audio device. + ## Recommended Graphs For voice-to-voice VRChat flows: