From b575a23c2144573021d2aad0d80626501b28290e Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 00:52:08 -0400 Subject: [PATCH 1/8] feat: Linux PulseAudio support for AudioIn/AudioOut On Linux, use parec/paplay subprocess to access PulseAudio/PipeWire virtual devices (e.g. cable_a.monitor, cable_b) that sounddevice can't see through ALSA. Device dropdown lists PulseAudio sources/sinks via pactl. Falls back to sounddevice on macOS/Windows. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/audio/audio_in.py | 45 +++++++++++++++++++++++++++- backend/src/lib/audio/audio_out.py | 48 +++++++++++++++++++++++++++++- backend/src/lib/audio/devices.py | 36 ++++++++++++++++++++++ 3 files changed, 127 insertions(+), 2 deletions(-) diff --git a/backend/src/lib/audio/audio_in.py b/backend/src/lib/audio/audio_in.py index 61622a6..fc17107 100644 --- a/backend/src/lib/audio/audio_in.py +++ b/backend/src/lib/audio/audio_in.py @@ -1,5 +1,7 @@ from __future__ import annotations +import subprocess +import sys from typing import Any import sounddevice as sd @@ -13,6 +15,8 @@ from src.core.component import ThreadedComponent, Tag from src.core.frames import AudioFrame +_USE_PAREC = sys.platform == "linux" + class AudioInConfig(BaseModel): model_config = ConfigDict(json_schema_extra={"options": {"device": {}}}) @@ -37,9 +41,42 @@ def __init__(self, config: AudioInConfig = AudioInConfig()) -> None: @classmethod def get_options(cls, values: dict[str, Any]) -> dict[str, Any]: + if _USE_PAREC: + from src.lib.audio.devices import list_pulse_sources + + return {"config": {"device": list_pulse_sources()}} return {"config": {"device": list_audio_input_devices()}} - def run(self, inputs: tuple[()], outputs: MicOutputs) -> None: + def _run_parec(self, outputs: MicOutputs) -> None: + device = self.config.device or None + cmd = [ + "parec", + "--format=s16le", + f"--rate={self._sample_rate}", + f"--channels={self._channels}", + ] + if device: + 
cmd.append(f"--device={device}") + + frame_bytes = self._frame_samples * self._channels * 2 + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) + try: + while not self.stop_event.is_set(): + data = proc.stdout.read(frame_bytes) # type: ignore[union-attr] + if not data: + break + outputs.audio.send( + AudioFrame.new( + data=data, + sample_rate=self._sample_rate, + channels=self._channels, + ) + ) + finally: + proc.terminate() + proc.wait() + + def _run_sounddevice(self, outputs: MicOutputs) -> None: with sd.InputStream( samplerate=self._sample_rate, channels=self._channels, @@ -57,3 +94,9 @@ def run(self, inputs: tuple[()], outputs: MicOutputs) -> None: channels=self._channels, ) ) + + def run(self, inputs: tuple[()], outputs: MicOutputs) -> None: + if _USE_PAREC: + self._run_parec(outputs) + else: + self._run_sounddevice(outputs) diff --git a/backend/src/lib/audio/audio_out.py b/backend/src/lib/audio/audio_out.py index 13860a2..428742e 100644 --- a/backend/src/lib/audio/audio_out.py +++ b/backend/src/lib/audio/audio_out.py @@ -1,5 +1,7 @@ from __future__ import annotations +import subprocess +import sys from typing import Any import numpy as np @@ -14,6 +16,8 @@ from src.core.component import ThreadedComponent, Tag from src.core.frames import AudioDataFormat +_USE_PAPLAY = sys.platform == "linux" + class AudioOutConfig(BaseModel): model_config = ConfigDict(json_schema_extra={"options": {"device": {}}}) @@ -36,9 +40,45 @@ def __init__(self, config: AudioOutConfig = AudioOutConfig()) -> None: @classmethod def get_options(cls, values: dict[str, Any]) -> dict[str, Any]: + if _USE_PAPLAY: + from src.lib.audio.devices import list_pulse_sinks + + return {"config": {"device": list_pulse_sinks()}} return {"config": {"device": list_audio_output_devices()}} - def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None: + def _run_paplay(self, inputs: SpeakerInputs) -> None: + device = self.config.device or None + cmd = [ + "paplay", + "--raw", + "--format=s16le", + 
f"--rate={self._sample_rate}", + f"--channels={self._channels}", + ] + if device: + cmd.append(f"--device={device}") + + proc = subprocess.Popen(cmd, stdin=subprocess.PIPE) + try: + for frame in inputs.audio: + if frame is None: + break + pcm_bytes = frame.get( + sample_rate=self._sample_rate, + num_channels=self._channels, + data_format=AudioDataFormat.PCM16, + ) + try: + proc.stdin.write(pcm_bytes) # type: ignore[union-attr] + except BrokenPipeError: + break + finally: + if proc.stdin: + proc.stdin.close() + proc.terminate() + proc.wait() + + def _run_sounddevice(self, inputs: SpeakerInputs) -> None: with sd.OutputStream( samplerate=self._sample_rate, channels=self._channels, @@ -59,3 +99,9 @@ def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None: stream.write( np.frombuffer(pcm_bytes, dtype=np.int16).reshape(-1, self._channels) ) + + def run(self, inputs: SpeakerInputs, outputs: tuple[()]) -> None: + if _USE_PAPLAY: + self._run_paplay(inputs) + else: + self._run_sounddevice(inputs) diff --git a/backend/src/lib/audio/devices.py b/backend/src/lib/audio/devices.py index 1add611..3a90236 100644 --- a/backend/src/lib/audio/devices.py +++ b/backend/src/lib/audio/devices.py @@ -106,6 +106,42 @@ def list_audio_output_devices( return list_audio_devices("output", include_default=include_default) +def _list_pulse_devices(kind: str) -> list[AudioDeviceOption]: + """List PulseAudio sources or sinks via pactl. + + Args: + kind: 'sources' for input devices, 'sinks' for output devices. 
+ """ + import subprocess + + options: list[AudioDeviceOption] = [{"value": "", "label": "System Default"}] + try: + result = subprocess.run( + ["pactl", "list", "short", kind], + capture_output=True, + text=True, + timeout=5, + ) + for line in result.stdout.strip().splitlines(): + parts = line.split("\t") + if len(parts) >= 2: + name = parts[1] + options.append({"value": name, "label": name}) + except Exception: + pass + return options + + +def list_pulse_sources() -> list[AudioDeviceOption]: + """List PulseAudio/PipeWire input sources (including virtual monitors).""" + return _list_pulse_devices("sources") + + +def list_pulse_sinks() -> list[AudioDeviceOption]: + """List PulseAudio/PipeWire output sinks (including virtual sinks).""" + return _list_pulse_devices("sinks") + + def coerce_sounddevice_device(value: str | None) -> int | str | None: if value is None: return None From 8e19fc1c68207e2b14e793546f66c387dcda177e Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 02:19:42 -0400 Subject: [PATCH 2/8] docs: add Linux PulseAudio virtual cable setup for VRChat audio routing Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/user-manual/vrchat-audio-routing.mdx | 53 +++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/docs/user-manual/vrchat-audio-routing.mdx b/docs/user-manual/vrchat-audio-routing.mdx index 524153f..2c33b67 100644 --- a/docs/user-manual/vrchat-audio-routing.mdx +++ b/docs/user-manual/vrchat-audio-routing.mdx @@ -106,6 +106,59 @@ Point both remote components at the game machine's bridge: - `channels`: `1` - `frame_ms`: `20` +## Linux Local Setup + +On Linux (e.g. Linux Mint, Ubuntu), use PulseAudio/PipeWire virtual sinks +instead of VB-Cable. AudioIn and AudioOut automatically use `parec`/`paplay` +on Linux, which can see PulseAudio virtual devices that sounddevice cannot. 
+ +### Create Virtual Cables + +```bash +pactl load-module module-null-sink sink_name=cable_a sink_properties=device.description="Cable-A" +pactl load-module module-null-sink sink_name=cable_b sink_properties=device.description="Cable-B" +``` + +These are temporary (lost on reboot). To make them persistent, add the `load-module` +lines (without the `pactl` prefix) to `~/.config/pulse/default.pa`. + +### Signal Flow + +```text +AudioOut -> cable_a (sink) -> cable_a.monitor (source) -> VRChat microphone +VRChat audio output -> cable_b (sink) -> cable_b.monitor (source) -> AudioIn +``` + +### Setup + +1. Route VRChat's output to `cable_b`: open `pavucontrol`, go to the + **Playback** tab, and change VRChat's output to **Cable-B**. Alternatively, + set `cable_b` as the system default sink before launching VRChat: + ```bash + pactl set-default-sink cable_b + ``` + +2. Route VRChat's mic input to `cable_a.monitor`: open `pavucontrol`, go to + the **Recording** tab, and change VRChat's input source to + **Monitor of Cable-A**. + +3. In OpenNeuro, configure the components: + - `AudioIn` + - `device`: `cable_b.monitor` + - `sample_rate`: `48000` + - `channels`: `2` + - `AudioOut` + - `device`: `cable_a` + - `sample_rate`: `48000` + - `channels`: `1` + +### Optional: Hear Game Audio Locally + +If you still want to hear VRChat through your headset while routing to +OpenNeuro, enable monitoring on `cable_b` in `pavucontrol`: go to the +**Output Devices** tab, find **Cable-B**, click the lock icon, and enable +**Monitor of Cable-B** routed to your real audio device. 
+ ## Recommended Graphs For voice-to-voice VRChat flows: From 5c2c88ecf8a9bb238ac5ceefdecbabb8fe8e37da Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 02:41:40 -0400 Subject: [PATCH 3/8] =?UTF-8?q?feat:=20DeepgramASR=20=E2=80=94=20streaming?= =?UTF-8?q?=20speech-to-text=20with=20VAD=20+=20diarization?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single component replaces separate VAD + ASR pipeline. Streams audio to Deepgram's WebSocket API (nova-3), returns final transcriptions with speaker labels (e.g. "[Speaker 0] hello"). Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/audio/__init__.py | 1 + backend/src/lib/audio/deepgram_asr.py | 155 ++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 backend/src/lib/audio/deepgram_asr.py diff --git a/backend/src/lib/audio/__init__.py b/backend/src/lib/audio/__init__.py index 2666eb2..ace6daf 100644 --- a/backend/src/lib/audio/__init__.py +++ b/backend/src/lib/audio/__init__.py @@ -8,6 +8,7 @@ from src.lib.audio.speaker_browser import SpeakerBrowser as SpeakerBrowser from src.lib.audio.vad import VAD as VAD from src.lib.audio.asr import ASR as ASR +from src.lib.audio.deepgram_asr import DeepgramASR as DeepgramASR from src.lib.audio.tts import TTS as TTS from src.lib.audio.tts_fish import FishTTS as FishTTS from src.lib.audio.sts import STS as STS diff --git a/backend/src/lib/audio/deepgram_asr.py b/backend/src/lib/audio/deepgram_asr.py new file mode 100644 index 0000000..e1e3a08 --- /dev/null +++ b/backend/src/lib/audio/deepgram_asr.py @@ -0,0 +1,155 @@ +"""DeepgramASR — streaming speech-to-text with built-in VAD via Deepgram's WebSocket API. + +Replaces the separate VAD + ASR pipeline with a single component. +Send AudioFrames in, get TextFrames out. 
+""" + +from __future__ import annotations + +import json +import os +import threading +from typing import NamedTuple + +from pydantic import BaseModel + +from src.core.channel import Receiver, Sender +from src.core.component import ThreadedComponent, Tag +from src.core.frames import AudioDataFormat, AudioFrame, TextFrame + + +class DeepgramASRConfig(BaseModel): + api_key_env_var: str = "DEEPGRAM_API_KEY" + model: str = "nova-3" + language: str = "en" + sample_rate: int = 48000 + channels: int = 1 + interim_results: bool = True + utterance_end_ms: int = 1000 + vad_events: bool = True + smart_format: bool = True + diarize: bool = True + + +class DeepgramASRInputs(NamedTuple): + audio: Receiver[AudioFrame] + + +class DeepgramASROutputs(NamedTuple): + text: Sender[TextFrame] + + +class DeepgramASR(ThreadedComponent[DeepgramASRInputs, DeepgramASROutputs]): + tags = Tag(io={"conduit"}, functionality={"audio"}) + description = ( + "Streaming speech-to-text via **Deepgram** WebSocket API with built-in VAD. " + "Replaces separate VAD + ASR components. Send AudioFrames in, get TextFrames out." 
+ ) + + def __init__(self, config: DeepgramASRConfig = DeepgramASRConfig()) -> None: + super().__init__() + self.config = config + + def run(self, inputs: DeepgramASRInputs, outputs: DeepgramASROutputs) -> None: + import websockets.sync.client as ws_client + + api_key = os.getenv(self.config.api_key_env_var, "") + if not api_key: + print(f"[DeepgramASR] {self.config.api_key_env_var} not set — stopping") + return + + c = self.config + params = ( + f"model={c.model}" + f"&language={c.language}" + f"&sample_rate={c.sample_rate}" + f"&channels={c.channels}" + f"&encoding=linear16" + f"&interim_results={'true' if c.interim_results else 'false'}" + f"&utterance_end_ms={c.utterance_end_ms}" + f"&vad_events={'true' if c.vad_events else 'false'}" + f"&smart_format={'true' if c.smart_format else 'false'}" + f"&diarize={'true' if c.diarize else 'false'}" + ) + url = f"wss://api.deepgram.com/v1/listen?{params}" + + print(f"[DeepgramASR] Connecting to Deepgram ({c.model})") + + try: + ws = ws_client.connect( + url, + additional_headers={"Authorization": f"Token {api_key}"}, + ) + except Exception as e: + print(f"[DeepgramASR] Connection failed: {e}") + print(f"[DeepgramASR] URL: {url}") + return + + print("[DeepgramASR] Connected") + + # Receiver thread: read transcription results from Deepgram + def recv_loop() -> None: + try: + for msg in ws: + if self.stop_event.is_set(): + break + if isinstance(msg, str): + data = json.loads(msg) + msg_type = data.get("type", "") + + if msg_type == "Results": + channel = data.get("channel", {}) + alternatives = channel.get("alternatives", []) + if alternatives: + alt = alternatives[0] + transcript = alt.get("transcript", "").strip() + is_final = data.get("is_final", False) + if transcript and is_final: + # Extract speaker from first word's diarization + words = alt.get("words", []) + speaker = ( + words[0].get("speaker", None) if words else None + ) + if speaker is not None: + text = f"[Speaker {speaker}] {transcript}" + else: + text = 
transcript + print(f"[DeepgramASR] {text}") + outputs.text.send(TextFrame.new(text=text)) + + elif msg_type == "UtteranceEnd": + pass # Could be used for turn-taking signals + + except Exception as e: + if not self.stop_event.is_set(): + print(f"[DeepgramASR] Receive error: {e}") + + recv_thread = threading.Thread(target=recv_loop, daemon=True) + recv_thread.start() + + # Send audio frames to Deepgram + try: + for frame in inputs.audio: + if frame is None or self.stop_event.is_set(): + break + pcm = frame.get( + sample_rate=c.sample_rate, + num_channels=c.channels, + data_format=AudioDataFormat.PCM16, + ) + try: + ws.send(pcm) + except Exception: + break + finally: + # Send close message to Deepgram + try: + ws.send(json.dumps({"type": "CloseStream"})) + except Exception: + pass + try: + ws.close() + except Exception: + pass + recv_thread.join(timeout=2.0) + print("[DeepgramASR] Stopped") From ac872e323ccb53b05e84d32cbf8211938e82fbd2 Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 03:33:54 -0400 Subject: [PATCH 4/8] feat: add AgentLoop component (copy of AgentState for future iteration) Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/llm/__init__.py | 1 + backend/src/lib/llm/agent_loop.py | 282 ++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+) create mode 100644 backend/src/lib/llm/agent_loop.py diff --git a/backend/src/lib/llm/__init__.py b/backend/src/lib/llm/__init__.py index d9d96b2..50d8dcb 100644 --- a/backend/src/lib/llm/__init__.py +++ b/backend/src/lib/llm/__init__.py @@ -1,5 +1,6 @@ from src.lib.llm.llm import LLM as LLM from src.lib.llm.agent_state import AgentState as AgentState +from src.lib.llm.agent_loop import AgentLoop as AgentLoop from src.lib.llm.memory import Mem0 as Mem0 from src.lib.llm.character_card import CharacterCard as CharacterCard from src.lib.llm.messages_to_text import MessagesToText as MessagesToText diff --git a/backend/src/lib/llm/agent_loop.py 
b/backend/src/lib/llm/agent_loop.py new file mode 100644 index 0000000..2f5a35a --- /dev/null +++ b/backend/src/lib/llm/agent_loop.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import math +from datetime import datetime +from typing import Any, NamedTuple + +from pydantic import BaseModel + +from src.core.channel import Receiver, Sender +from src.core.component import ThreadedComponent, Tag +from src.core.frames import ( + BodyPoseFrame, + MessageFrame, + ObjectLocationFrame, + TextFrame, + ToolCall, + ToolResult, +) +from src.core.config import PROJECTS_DIR, AppConfig +from src.core.utils import drain + + +class AgentLoopConfig(BaseModel): + system_prompt: str + post_prompt: str = ( + "Do nothing unless the user has talked and you haven't replied, " + "or if you are executing a task." + ) + + +class AgentLoopInputs[T](NamedTuple): + request: Receiver[T] + initial_msgs: Receiver[list[MessageFrame]] | None = None + speech: Receiver[TextFrame] | None = None + feedback: Receiver[TextFrame] | None = None + tool_call: Receiver[ToolCall] | None = None + tool_result: Receiver[ToolResult] | None = None + vision: Receiver[TextFrame] | None = None + pose: Receiver[BodyPoseFrame] | None = None + objects: Receiver[ObjectLocationFrame] | None = None + memory: Receiver[TextFrame] | None = None + pause: Receiver[TextFrame] | None = None + + +class AgentLoopOutputs(NamedTuple): + messages: Sender[list[MessageFrame]] + + +class AgentLoop[T](ThreadedComponent[AgentLoopInputs[T], AgentLoopOutputs]): + """Manages conversation history, optionally enriched by memory and character card.""" + + description = "Tracks and manages **agent conversation state**. Maintains message history enriched by optional *memory* and *character card* inputs, and outputs assembled `MessageFrame` lists for the LLM." 
+ tags = Tag(io={"conduit"}, functionality={"llm"}) + + def __init__(self, config: AgentLoopConfig) -> None: + super().__init__() + self.config = config + self._history: list[MessageFrame] = [ + MessageFrame.new(role="system", content=config.system_prompt) + ] + + @staticmethod + def _heading_from_quat(w: float, x: float, y: float, z: float) -> float: + """Extract heading in degrees (clockwise from +Z) from a Y-up quaternion.""" + fwd_x = 2 * (x * z + w * y) + fwd_z = 1 - 2 * (x * x + y * y) + return -math.degrees(math.atan2(fwd_x, fwd_z)) + + @staticmethod + def _print_message(m: MessageFrame) -> None: + preview = m.content[:120] if m.content else "(no content)" + extra = "" + if m.tool_calls: + for tc in m.tool_calls: + args = tc.arguments[:80] if tc.arguments else "" + extra += f" {tc.name}({args})" + if m.tool_call_id: + extra += f" tool_call_id={m.tool_call_id}" + print(f" [{m.role}] {preview}{extra}") + + def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: + print("[AgentLoop] Starting Agent Loop") + + # Read initial prompts once (constant component, e.g. 
CharacterCard) + if inputs.initial_msgs is not None: + frame = next(inputs.initial_msgs) + if frame is not None: + self._history = frame + self._history + print(f"[AgentLoop] Initial messages loaded ({len(frame)} msgs)") + + # Configure receiver modes for optional inputs + if inputs.speech: + inputs.speech.blocking = False + if inputs.feedback: + inputs.feedback.blocking = False + if inputs.vision: + inputs.vision.newest = True + inputs.vision.blocking = False + if inputs.memory: + inputs.memory.blocking = False + if inputs.tool_call: + inputs.tool_call.blocking = False + if inputs.tool_result: + inputs.tool_result.blocking = False + if inputs.objects is not None: + inputs.objects.newest = True + inputs.objects.blocking = False + if inputs.pose is not None: + inputs.pose.newest = True + inputs.pose.blocking = False + if inputs.pause is not None: + inputs.pause.newest = True + inputs.pause.blocking = False + + # Buffer tool_calls until their matching tool_result arrives + pending_tool_calls: dict[str, ToolCall] = {} + + # Block on request, drain others on each trigger + for req in inputs.request: + if req is None: + break + + # Check for pause signal — drain all queued requests and wait for next + if inputs.pause is not None: + p = next(inputs.pause, None) + if p is not None: + print("[AgentLoop] Paused — draining requests") + continue + + # Drain everything except tool results and vision + for speech, feedback, memory, tc in drain( + inputs.speech, + inputs.feedback, + inputs.memory, + inputs.tool_call, + ): + if speech is not None: + ts = datetime.fromtimestamp(speech.pts / 1e9).strftime("%H:%M:%S") + msg = MessageFrame.new(role="user", content=f"[{ts}] {speech.text}") + self._history.append(msg) + self._print_message(msg) + if feedback is not None: + ts = datetime.fromtimestamp(feedback.pts / 1e9).strftime("%H:%M:%S") + msg = MessageFrame.new( + role="assistant", content=f"[{ts}] {feedback.text}" + ) + self._history.append(msg) + self._print_message(msg) + if 
memory is not None: + ts = datetime.fromtimestamp(memory.pts / 1e9).strftime("%H:%M:%S") + msg = MessageFrame.new( + role="system", content=f"[{ts}] {memory.text}" + ) + self._history.append(msg) + self._print_message(msg) + if tc is not None: + # Tool call in chronological position + placeholder result + ts = datetime.fromtimestamp(tc.pts / 1e9).strftime("%H:%M:%S") + msg_tc = MessageFrame.new( + role="assistant", + content="", + tool_calls=[tc], + ) + self._history.append(msg_tc) + self._print_message(msg_tc) + msg_tr = MessageFrame.new( + role="tool", + content="(pending)", + tool_call_id=tc.call_id, + ) + self._history.append(msg_tr) + pending_tool_calls[tc.call_id] = tc + + # Drain tool results and replace placeholders + if inputs.tool_result is not None: + for tr in inputs.tool_result: + if tr is None: + break + ts = datetime.fromtimestamp(tr.pts / 1e9).strftime("%H:%M:%S") + for i, m in enumerate(self._history): + if m.tool_call_id == tr.call_id and m.content == "(pending)": + self._history[i] = MessageFrame.new( + role="tool", + content=tr.content, + tool_call_id=tr.call_id, + ) + self._print_message(self._history[i]) + pending_tool_calls.pop(tr.call_id, None) + break + + # Build final messages: history + msgs = self._history.copy() + + # Latest vision caption (transient, not in history) + if inputs.vision is not None: + vision_frame = next(inputs.vision, None) + if vision_frame is not None: + ts = datetime.fromtimestamp(vision_frame.pts / 1e9).strftime( + "%H:%M:%S" + ) + msgs.append( + MessageFrame.new( + role="system", + content=f"[{ts}] {vision_frame.text}", + ) + ) + + # Latest object locations (transient, not in history) + if inputs.objects is not None: + obj_frame = next(inputs.objects, None) + if obj_frame is not None and len(obj_frame.labels) > 0: + lines = [] + for i in range(len(obj_frame.labels)): + x, y, z = obj_frame.positions[i] + lines.append( + f' "{obj_frame.labels[i]}" at ({x:.2f}, {y:.2f}, {z:.2f})' + ) + msgs.append( + MessageFrame.new( 
+ role="system", + content="[Currently visible objects]\n" + "\n".join(lines), + ) + ) + + # Read agent spatial state (position + direction) + if inputs.pose is not None: + pose_frame = next(inputs.pose) + if pose_frame is not None: + poses = pose_frame.get() + waist = poses.get("waist") + if waist is not None: + px, py, pz = waist.pos_x, waist.pos_y, waist.pos_z + heading = self._heading_from_quat( + waist.rot_w, waist.rot_x, waist.rot_y, waist.rot_z + ) + msgs.append( + MessageFrame.new( + role="system", + content=f"[Position (y-up): x={px:.2f}, y={py:.2f}, z={pz:.2f}]\n" + f"[Heading (from +Z clockwise): {heading:.0f}°]", + ) + ) + + # Post-prompt (always last) + if self.config.post_prompt: + msgs.append( + MessageFrame.new(role="system", content=self.config.post_prompt) + ) + + self._dump_messages(msgs) + outputs.messages.send(msgs) + + print("[AgentLoop] Agent Loop stopped") + + @staticmethod + def _dump_messages(msgs: list[MessageFrame]) -> None: + """Serialize messages (as the LLM sees them) to the current project dir.""" + import json + + config = AppConfig.load_config() + if not config.current_project: + return + path = PROJECTS_DIR / config.current_project / "messages.json" + serialized = [] + for m in msgs: + msg: dict[str, Any] = {"role": m.role, "content": m.content} + if m.tool_calls: + msg["tool_calls"] = [ + { + "id": tc.call_id, + "type": "function", + "function": { + "name": tc.name, + "arguments": tc.arguments, + }, + } + for tc in m.tool_calls + ] + if m.tool_call_id: + msg["tool_call_id"] = m.tool_call_id + serialized.append(msg) + path.write_text(json.dumps(serialized, indent=2, ensure_ascii=False)) From 83252b936c919d2ee3067cf013e9965f27a2cbc9 Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 03:35:53 -0400 Subject: [PATCH 5/8] refactor: AgentLoop uses while loop instead of iterating on request Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/llm/agent_loop.py | 13 ++++--------- 1 file 
changed, 4 insertions(+), 9 deletions(-) diff --git a/backend/src/lib/llm/agent_loop.py b/backend/src/lib/llm/agent_loop.py index 2f5a35a..3a51ef3 100644 --- a/backend/src/lib/llm/agent_loop.py +++ b/backend/src/lib/llm/agent_loop.py @@ -28,8 +28,7 @@ class AgentLoopConfig(BaseModel): ) -class AgentLoopInputs[T](NamedTuple): - request: Receiver[T] +class AgentLoopInputs(NamedTuple): initial_msgs: Receiver[list[MessageFrame]] | None = None speech: Receiver[TextFrame] | None = None feedback: Receiver[TextFrame] | None = None @@ -46,7 +45,7 @@ class AgentLoopOutputs(NamedTuple): messages: Sender[list[MessageFrame]] -class AgentLoop[T](ThreadedComponent[AgentLoopInputs[T], AgentLoopOutputs]): +class AgentLoop(ThreadedComponent[AgentLoopInputs, AgentLoopOutputs]): """Manages conversation history, optionally enriched by memory and character card.""" description = "Tracks and manages **agent conversation state**. Maintains message history enriched by optional *memory* and *character card* inputs, and outputs assembled `MessageFrame` lists for the LLM." 
@@ -115,12 +114,8 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: # Buffer tool_calls until their matching tool_result arrives pending_tool_calls: dict[str, ToolCall] = {} - # Block on request, drain others on each trigger - for req in inputs.request: - if req is None: - break - - # Check for pause signal — drain all queued requests and wait for next + while not self.stop_event.is_set(): + # Check for pause signal if inputs.pause is not None: p = next(inputs.pause, None) if p is not None: From cc9a4fc8a1bec22e5b997d446270b0d683af4005 Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 03:42:30 -0400 Subject: [PATCH 6/8] refactor: AgentLoop internalizes LLM via OpenAI Responses API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removes messages output, adds token/text/tool_calls/eos outputs (same as LLM) - Calls OpenAI Responses API directly with streaming - Recursive _call_llm for tool call → tool result → LLM loop - Accepts ToolDef inputs for function calling - Drains speech/feedback/vision/objects/pose/memory each iteration - Only calls LLM when new user input arrives Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/llm/agent_loop.py | 388 +++++++++++++++++------------- 1 file changed, 225 insertions(+), 163 deletions(-) diff --git a/backend/src/lib/llm/agent_loop.py b/backend/src/lib/llm/agent_loop.py index 3a51ef3..256638c 100644 --- a/backend/src/lib/llm/agent_loop.py +++ b/backend/src/lib/llm/agent_loop.py @@ -1,93 +1,130 @@ from __future__ import annotations +import json import math +import os +import time from datetime import datetime from typing import Any, NamedTuple +from openai import OpenAI from pydantic import BaseModel from src.core.channel import Receiver, Sender from src.core.component import ThreadedComponent, Tag from src.core.frames import ( BodyPoseFrame, + EOS, MessageFrame, ObjectLocationFrame, TextFrame, ToolCall, - 
ToolResult, + ToolDef, ) from src.core.config import PROJECTS_DIR, AppConfig from src.core.utils import drain class AgentLoopConfig(BaseModel): - system_prompt: str + model: str = "gpt-4.1" + system_prompt: str = "You are a helpful assistant." post_prompt: str = ( "Do nothing unless the user has talked and you haven't replied, " "or if you are executing a task." ) + temperature: float | None = None + top_p: float | None = None + max_tokens: int | None = None class AgentLoopInputs(NamedTuple): initial_msgs: Receiver[list[MessageFrame]] | None = None speech: Receiver[TextFrame] | None = None feedback: Receiver[TextFrame] | None = None - tool_call: Receiver[ToolCall] | None = None - tool_result: Receiver[ToolResult] | None = None + tool_result: Receiver[TextFrame] | None = None vision: Receiver[TextFrame] | None = None pose: Receiver[BodyPoseFrame] | None = None objects: Receiver[ObjectLocationFrame] | None = None memory: Receiver[TextFrame] | None = None - pause: Receiver[TextFrame] | None = None + tools: Receiver[ToolDef] | None = None class AgentLoopOutputs(NamedTuple): - messages: Sender[list[MessageFrame]] + token: Sender[TextFrame | EOS] + text: Sender[TextFrame] | None = None + tool_calls: Sender[ToolCall] | None = None + eos: Sender[EOS] | None = None class AgentLoop(ThreadedComponent[AgentLoopInputs, AgentLoopOutputs]): - """Manages conversation history, optionally enriched by memory and character card.""" + """Agentic loop: manages conversation state and calls the LLM directly. - description = "Tracks and manages **agent conversation state**. Maintains message history enriched by optional *memory* and *character card* inputs, and outputs assembled `MessageFrame` lists for the LLM." + Uses the OpenAI Responses API with streaming. Drains speech, feedback, + vision, and other inputs each iteration, builds the message context, + calls the model, and streams tokens + tool calls to outputs. 
+ """ + + description = ( + "Self-contained **agentic loop** with built-in LLM. " + "Manages conversation history, drains multimodal inputs, " + "and streams tokens and tool calls using the OpenAI Responses API." + ) tags = Tag(io={"conduit"}, functionality={"llm"}) def __init__(self, config: AgentLoopConfig) -> None: super().__init__() self.config = config - self._history: list[MessageFrame] = [ - MessageFrame.new(role="system", content=config.system_prompt) - ] + self._client = OpenAI() + self._input: list[dict[str, Any]] = [] + self._tool_defs: list[dict[str, Any]] = [] + + # -- Helpers -- @staticmethod def _heading_from_quat(w: float, x: float, y: float, z: float) -> float: - """Extract heading in degrees (clockwise from +Z) from a Y-up quaternion.""" fwd_x = 2 * (x * z + w * y) fwd_z = 1 - 2 * (x * x + y * y) return -math.degrees(math.atan2(fwd_x, fwd_z)) - @staticmethod - def _print_message(m: MessageFrame) -> None: - preview = m.content[:120] if m.content else "(no content)" - extra = "" - if m.tool_calls: - for tc in m.tool_calls: - args = tc.arguments[:80] if tc.arguments else "" - extra += f" {tc.name}({args})" - if m.tool_call_id: - extra += f" tool_call_id={m.tool_call_id}" - print(f" [{m.role}] {preview}{extra}") + def _append_msg(self, role: str, content: str) -> None: + self._input.append({"role": role, "content": content}) + preview = content[:120] + print(f" [{role}] {preview}") + + # -- Main loop -- def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: - print("[AgentLoop] Starting Agent Loop") + print("[AgentLoop] Starting") + + # System prompt + self._input = [{"role": "system", "content": self.config.system_prompt}] - # Read initial prompts once (constant component, e.g. CharacterCard) + # Read initial prompts once (e.g. 
CharacterCard) if inputs.initial_msgs is not None: frame = next(inputs.initial_msgs) if frame is not None: - self._history = frame + self._history - print(f"[AgentLoop] Initial messages loaded ({len(frame)} msgs)") + for m in frame: + self._append_msg(m.role, m.content) - # Configure receiver modes for optional inputs + # Collect tool definitions once + if inputs.tools is not None: + inputs.tools.blocking = False + for td in inputs.tools: + if td is None: + break + self._tool_defs.append( + { + "type": "function", + "name": td.name, + "description": td.description, + "parameters": td.parameters, + **({"strict": td.strict} if td.strict is not None else {}), + } + ) + if self._tool_defs: + print(f"[AgentLoop] Tools: {[t['name'] for t in self._tool_defs]}") + + # Configure receiver modes if inputs.speech: inputs.speech.blocking = False if inputs.feedback: @@ -97,8 +134,6 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: inputs.vision.blocking = False if inputs.memory: inputs.memory.blocking = False - if inputs.tool_call: - inputs.tool_call.blocking = False if inputs.tool_result: inputs.tool_result.blocking = False if inputs.objects is not None: @@ -107,171 +142,198 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: if inputs.pose is not None: inputs.pose.newest = True inputs.pose.blocking = False - if inputs.pause is not None: - inputs.pause.newest = True - inputs.pause.blocking = False - - # Buffer tool_calls until their matching tool_result arrives - pending_tool_calls: dict[str, ToolCall] = {} while not self.stop_event.is_set(): - # Check for pause signal - if inputs.pause is not None: - p = next(inputs.pause, None) - if p is not None: - print("[AgentLoop] Paused — draining requests") - continue - - # Drain everything except tool results and vision - for speech, feedback, memory, tc in drain( + # Drain inputs into history + has_new_input = False + + for speech, feedback, memory in drain( inputs.speech, 
inputs.feedback, inputs.memory, - inputs.tool_call, ): if speech is not None: ts = datetime.fromtimestamp(speech.pts / 1e9).strftime("%H:%M:%S") - msg = MessageFrame.new(role="user", content=f"[{ts}] {speech.text}") - self._history.append(msg) - self._print_message(msg) + self._append_msg("user", f"[{ts}] {speech.text}") + has_new_input = True if feedback is not None: ts = datetime.fromtimestamp(feedback.pts / 1e9).strftime("%H:%M:%S") - msg = MessageFrame.new( - role="assistant", content=f"[{ts}] {feedback.text}" - ) - self._history.append(msg) - self._print_message(msg) + self._append_msg("assistant", f"[{ts}] {feedback.text}") if memory is not None: ts = datetime.fromtimestamp(memory.pts / 1e9).strftime("%H:%M:%S") - msg = MessageFrame.new( - role="system", content=f"[{ts}] {memory.text}" - ) - self._history.append(msg) - self._print_message(msg) - if tc is not None: - # Tool call in chronological position + placeholder result - ts = datetime.fromtimestamp(tc.pts / 1e9).strftime("%H:%M:%S") - msg_tc = MessageFrame.new( - role="assistant", - content="", - tool_calls=[tc], - ) - self._history.append(msg_tc) - self._print_message(msg_tc) - msg_tr = MessageFrame.new( - role="tool", - content="(pending)", - tool_call_id=tc.call_id, - ) - self._history.append(msg_tr) - pending_tool_calls[tc.call_id] = tc + self._append_msg("system", f"[{ts}] {memory.text}") + + # Nothing new — sleep briefly to avoid busy loop + if not has_new_input: + self.stop_event.wait(0.1) + continue + + # Build transient context (not persisted in history) + transient: list[dict[str, Any]] = [] - # Drain tool results and replace placeholders - if inputs.tool_result is not None: - for tr in inputs.tool_result: - if tr is None: - break - ts = datetime.fromtimestamp(tr.pts / 1e9).strftime("%H:%M:%S") - for i, m in enumerate(self._history): - if m.tool_call_id == tr.call_id and m.content == "(pending)": - self._history[i] = MessageFrame.new( - role="tool", - content=tr.content, - 
tool_call_id=tr.call_id, - ) - self._print_message(self._history[i]) - pending_tool_calls.pop(tr.call_id, None) - break - - # Build final messages: history - msgs = self._history.copy() - - # Latest vision caption (transient, not in history) if inputs.vision is not None: - vision_frame = next(inputs.vision, None) - if vision_frame is not None: - ts = datetime.fromtimestamp(vision_frame.pts / 1e9).strftime( - "%H:%M:%S" - ) - msgs.append( - MessageFrame.new( - role="system", - content=f"[{ts}] {vision_frame.text}", - ) + v = next(inputs.vision, None) + if v is not None: + ts = datetime.fromtimestamp(v.pts / 1e9).strftime("%H:%M:%S") + transient.append( + {"role": "system", "content": f"[{ts}] {v.text}"} ) - # Latest object locations (transient, not in history) if inputs.objects is not None: - obj_frame = next(inputs.objects, None) - if obj_frame is not None and len(obj_frame.labels) > 0: - lines = [] - for i in range(len(obj_frame.labels)): - x, y, z = obj_frame.positions[i] - lines.append( - f' "{obj_frame.labels[i]}" at ({x:.2f}, {y:.2f}, {z:.2f})' - ) - msgs.append( - MessageFrame.new( - role="system", - content="[Currently visible objects]\n" + "\n".join(lines), - ) + obj = next(inputs.objects, None) + if obj is not None and len(obj.labels) > 0: + lines = [ + f' "{obj.labels[i]}" at ({obj.positions[i][0]:.2f}, {obj.positions[i][1]:.2f}, {obj.positions[i][2]:.2f})' + for i in range(len(obj.labels)) + ] + transient.append( + { + "role": "system", + "content": "[Currently visible objects]\n" + + "\n".join(lines), + } ) - # Read agent spatial state (position + direction) if inputs.pose is not None: - pose_frame = next(inputs.pose) - if pose_frame is not None: - poses = pose_frame.get() + pose = next(inputs.pose, None) + if pose is not None: + poses = pose.get() waist = poses.get("waist") if waist is not None: - px, py, pz = waist.pos_x, waist.pos_y, waist.pos_z heading = self._heading_from_quat( waist.rot_w, waist.rot_x, waist.rot_y, waist.rot_z ) - msgs.append( - 
MessageFrame.new( - role="system", - content=f"[Position (y-up): x={px:.2f}, y={py:.2f}, z={pz:.2f}]\n" + transient.append( + { + "role": "system", + "content": f"[Position (y-up): x={waist.pos_x:.2f}, y={waist.pos_y:.2f}, z={waist.pos_z:.2f}]\n" f"[Heading (from +Z clockwise): {heading:.0f}°]", - ) + } ) - # Post-prompt (always last) if self.config.post_prompt: - msgs.append( - MessageFrame.new(role="system", content=self.config.post_prompt) + transient.append( + {"role": "system", "content": self.config.post_prompt} ) - self._dump_messages(msgs) - outputs.messages.send(msgs) + # Call LLM + api_input = self._input + transient + self._call_llm(api_input, inputs, outputs) - print("[AgentLoop] Agent Loop stopped") + print("[AgentLoop] Stopped") - @staticmethod - def _dump_messages(msgs: list[MessageFrame]) -> None: - """Serialize messages (as the LLM sees them) to the current project dir.""" - import json - - config = AppConfig.load_config() - if not config.current_project: - return - path = PROJECTS_DIR / config.current_project / "messages.json" - serialized = [] - for m in msgs: - msg: dict[str, Any] = {"role": m.role, "content": m.content} - if m.tool_calls: - msg["tool_calls"] = [ + def _call_llm( + self, + api_input: list[dict[str, Any]], + inputs: AgentLoopInputs, + outputs: AgentLoopOutputs, + ) -> None: + """Stream a single LLM response, emit tokens/tool calls, handle tool loop.""" + kwargs: dict[str, Any] = { + "model": self.config.model, + "input": api_input, + "stream": True, + } + if self.config.temperature is not None: + kwargs["temperature"] = self.config.temperature + if self.config.top_p is not None: + kwargs["top_p"] = self.config.top_p + if self.config.max_tokens is not None: + kwargs["max_output_tokens"] = self.config.max_tokens + if self._tool_defs: + kwargs["tools"] = self._tool_defs + kwargs["tool_choice"] = "auto" + + stream = self._client.responses.create(**kwargs) + + full_text = "" + tool_calls: list[dict[str, str]] = [] + + for event in 
stream: + if self.stop_event.is_set(): + break + + if event.type == "response.output_text.delta": + full_text += event.delta + outputs.token.send(TextFrame.new(text=event.delta)) + + elif event.type == "response.function_call_arguments.delta": + # Accumulate function call arguments + if not tool_calls or tool_calls[-1].get("_done"): + tool_calls.append( + {"call_id": "", "name": "", "arguments": ""} + ) + tool_calls[-1]["arguments"] += event.delta + + elif event.type == "response.output_item.added": + if hasattr(event, "item") and event.item.type == "function_call": + tool_calls.append( + { + "call_id": event.item.call_id, + "name": event.item.name, + "arguments": "", + } + ) + + elif event.type == "response.output_item.done": + if ( + hasattr(event, "item") + and event.item.type == "function_call" + and tool_calls + ): + tool_calls[-1]["call_id"] = event.item.call_id + tool_calls[-1]["name"] = event.item.name + tool_calls[-1]["arguments"] = event.item.arguments + tool_calls[-1]["_done"] = True # type: ignore[assignment] + + # Emit completed text + if full_text: + self._append_msg("assistant", full_text) + if outputs.text is not None: + outputs.text.send(TextFrame.new(text=full_text)) + + # Emit tool calls + if tool_calls and outputs.tool_calls is not None: + for tc in tool_calls: + outputs.tool_calls.send( + ToolCall.new( + call_id=tc["call_id"], + name=tc["name"], + arguments=tc["arguments"], + ) + ) + # Add function_call to input for next turn + self._input.append( { - "id": tc.call_id, - "type": "function", - "function": { - "name": tc.name, - "arguments": tc.arguments, - }, + "type": "function_call", + "call_id": tc["call_id"], + "name": tc["name"], + "arguments": tc["arguments"], } - for tc in m.tool_calls - ] - if m.tool_call_id: - msg["tool_call_id"] = m.tool_call_id - serialized.append(msg) - path.write_text(json.dumps(serialized, indent=2, ensure_ascii=False)) + ) + + # Wait for tool results and add them + if inputs.tool_result is not None: + 
inputs.tool_result.blocking = True + for tc in tool_calls: + result = next(inputs.tool_result, None) + if result is None: + break + self._input.append( + { + "type": "function_call_output", + "call_id": tc["call_id"], + "output": result.text, + } + ) + inputs.tool_result.blocking = False + + # Loop back — call LLM again with tool results + if not self.stop_event.is_set(): + self._call_llm(self._input, inputs, outputs) + return + + # EOS + outputs.token.send(EOS.END) + if outputs.eos is not None: + outputs.eos.send(EOS.END) From 90301f5a22097c7e0d0b3a640b339c958779ad16 Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 04:40:07 -0400 Subject: [PATCH 7/8] feat: AgentLoop with internalized LLM, vision input, diary tool, scratchpad tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AgentLoop calls OpenAI Responses API directly (no external LLM component) - Accepts VideoFrame input, encodes as base64 JPEG for vision - Built-in diary tool for tracking mental state (handled internally) - ScratchpadTool component with read/update tools - Continuous loop with no speech gating — agent is always active - No recursion — flat loop with fresh transient context each iteration - System prompt emphasizes embodiment and vision grounding Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/llm/__init__.py | 1 + backend/src/lib/llm/agent_loop.py | 243 +++++++++++++++---------- backend/src/lib/llm/scratchpad_tool.py | 79 ++++++++ 3 files changed, 224 insertions(+), 99 deletions(-) create mode 100644 backend/src/lib/llm/scratchpad_tool.py diff --git a/backend/src/lib/llm/__init__.py b/backend/src/lib/llm/__init__.py index 50d8dcb..24585ef 100644 --- a/backend/src/lib/llm/__init__.py +++ b/backend/src/lib/llm/__init__.py @@ -9,3 +9,4 @@ from src.lib.llm.talk_tool import TalkTool as TalkTool from src.lib.llm.think_tool import ThinkTool as ThinkTool from src.lib.llm.movement_tool import 
MovementTool as MovementTool +from src.lib.llm.scratchpad_tool import ScratchpadTool as ScratchpadTool diff --git a/backend/src/lib/llm/agent_loop.py b/backend/src/lib/llm/agent_loop.py index 256638c..0a718db 100644 --- a/backend/src/lib/llm/agent_loop.py +++ b/backend/src/lib/llm/agent_loop.py @@ -1,12 +1,14 @@ from __future__ import annotations +import base64 +import io import json import math -import os -import time from datetime import datetime from typing import Any, NamedTuple +from PIL import Image + from openai import OpenAI from pydantic import BaseModel @@ -20,29 +22,53 @@ TextFrame, ToolCall, ToolDef, + VideoFrame, + VideoDataFormat, ) -from src.core.config import PROJECTS_DIR, AppConfig from src.core.utils import drain +_DEFAULT_SYSTEM_PROMPT = """\ +You are an embodied AI agent operating inside a real-time multimodal pipeline. + +You exist in a physical or virtual environment. You can see through a camera, \ +hear through a microphone, and act through tools (speaking, moving, etc.). + +# Vision +Each turn, you may receive an image — this is your current view of the world. \ +Use it to ground your responses in what you actually see. \ +Do not hallucinate objects or spatial relationships that are not visible. + +# Speech +User speech arrives as transcribed text with timestamps and speaker diarization. There could be more than one speaker.\ +You should use your vision to help you ground who actually said the latest message from the user. \ +Respond naturally and conversationally. Keep responses brief unless asked to elaborate. + +# Tools +You have access to tools for interacting with the environment. \ +Use them when the user asks you to act, not just describe. \ +When you call a tool, wait for the result before continuing. + +# Guidelines +- Be proactive. Do things when you are along. Think constantly but only talk when the latest user message hasn't been responded to.\ +- If you see something relevant to the conversation, mention it naturally. 
+- Do not repeat yourself or narrate your own actions unless asked. +""" + +_DEFAULT_POST_PROMPT = "Do nothing unless the user has talked and you haven't replied, " + + class AgentLoopConfig(BaseModel): model: str = "gpt-4.1" - system_prompt: str = "You are a helpful assistant." - post_prompt: str = ( - "Do nothing unless the user has talked and you haven't replied, " - "or if you are executing a task." - ) - temperature: float | None = None - top_p: float | None = None - max_tokens: int | None = None + system_prompt: str = _DEFAULT_SYSTEM_PROMPT + post_prompt: str = _DEFAULT_POST_PROMPT class AgentLoopInputs(NamedTuple): initial_msgs: Receiver[list[MessageFrame]] | None = None speech: Receiver[TextFrame] | None = None - feedback: Receiver[TextFrame] | None = None tool_result: Receiver[TextFrame] | None = None - vision: Receiver[TextFrame] | None = None + video: Receiver[VideoFrame] | None = None pose: Receiver[BodyPoseFrame] | None = None objects: Receiver[ObjectLocationFrame] | None = None memory: Receiver[TextFrame] | None = None @@ -59,9 +85,9 @@ class AgentLoopOutputs(NamedTuple): class AgentLoop(ThreadedComponent[AgentLoopInputs, AgentLoopOutputs]): """Agentic loop: manages conversation state and calls the LLM directly. - Uses the OpenAI Responses API with streaming. Drains speech, feedback, - vision, and other inputs each iteration, builds the message context, - calls the model, and streams tokens + tool calls to outputs. + Uses the OpenAI Responses API with streaming. Drains speech and other + inputs each iteration, builds the message context, calls the model, + and streams tokens + tool calls to outputs. """ description = ( @@ -71,6 +97,29 @@ class AgentLoop(ThreadedComponent[AgentLoopInputs, AgentLoopOutputs]): ) tags = Tag(io={"conduit"}, functionality={"llm"}) + _DIARY_TOOL: dict[str, Any] = { + "type": "function", + "name": "diary", + "description": ( + "Write a diary entry to track your mental state. 
" + "Record what you see, where you are, what you're doing, " + "how you feel, your current goal, and any observations. " + "Call this every few turns to maintain self-awareness. " + "Your diary entries stay in your history so you can always " + "look back at what you were thinking." + ), + "parameters": { + "type": "object", + "properties": { + "entry": { + "type": "string", + "description": "A natural language snapshot of your current mental state.", + }, + }, + "required": ["entry"], + }, + } + def __init__(self, config: AgentLoopConfig) -> None: super().__init__() self.config = config @@ -91,20 +140,28 @@ def _append_msg(self, role: str, content: str) -> None: preview = content[:120] print(f" [{role}] {preview}") + @staticmethod + def _encode_frame(frame: VideoFrame) -> str: + """Encode a VideoFrame as a base64 JPEG data URL.""" + img = Image.fromarray(frame.get(VideoDataFormat.RGB)) + buf = io.BytesIO() + img.save(buf, format="JPEG", quality=80) + b64 = base64.b64encode(buf.getvalue()).decode("ascii") + return f"data:image/jpeg;base64,{b64}" + # -- Main loop -- def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: print("[AgentLoop] Starting") - # System prompt - self._input = [{"role": "system", "content": self.config.system_prompt}] + self._input = [] # Read initial prompts once (e.g. 
CharacterCard) if inputs.initial_msgs is not None: frame = next(inputs.initial_msgs) if frame is not None: for m in frame: - self._append_msg(m.role, m.content) + self._append_msg(m.role, m.content or "") # Collect tool definitions once if inputs.tools is not None: @@ -121,21 +178,18 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: **({"strict": td.strict} if td.strict is not None else {}), } ) - if self._tool_defs: - print(f"[AgentLoop] Tools: {[t['name'] for t in self._tool_defs]}") + # Always include the built-in diary tool + self._tool_defs.append(self._DIARY_TOOL) + print(f"[AgentLoop] Tools: {[t['name'] for t in self._tool_defs]}") # Configure receiver modes if inputs.speech: inputs.speech.blocking = False - if inputs.feedback: - inputs.feedback.blocking = False - if inputs.vision: - inputs.vision.newest = True - inputs.vision.blocking = False if inputs.memory: inputs.memory.blocking = False - if inputs.tool_result: - inputs.tool_result.blocking = False + if inputs.video is not None: + inputs.video.newest = True + inputs.video.blocking = False if inputs.objects is not None: inputs.objects.newest = True inputs.objects.blocking = False @@ -144,39 +198,33 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: inputs.pose.blocking = False while not self.stop_event.is_set(): - # Drain inputs into history - has_new_input = False - - for speech, feedback, memory in drain( - inputs.speech, - inputs.feedback, - inputs.memory, - ): + # Drain any new speech into history + for (speech,) in drain(inputs.speech): if speech is not None: ts = datetime.fromtimestamp(speech.pts / 1e9).strftime("%H:%M:%S") self._append_msg("user", f"[{ts}] {speech.text}") - has_new_input = True - if feedback is not None: - ts = datetime.fromtimestamp(feedback.pts / 1e9).strftime("%H:%M:%S") - self._append_msg("assistant", f"[{ts}] {feedback.text}") - if memory is not None: - ts = datetime.fromtimestamp(memory.pts / 1e9).strftime("%H:%M:%S") - 
self._append_msg("system", f"[{ts}] {memory.text}") - - # Nothing new — sleep briefly to avoid busy loop - if not has_new_input: - self.stop_event.wait(0.1) - continue # Build transient context (not persisted in history) transient: list[dict[str, Any]] = [] - if inputs.vision is not None: - v = next(inputs.vision, None) - if v is not None: - ts = datetime.fromtimestamp(v.pts / 1e9).strftime("%H:%M:%S") + # Latest video frame — what the agent currently sees + if inputs.video is not None: + vf = next(inputs.video, None) + if vf is not None: transient.append( - {"role": "system", "content": f"[{ts}] {v.text}"} + { + "role": "user", + "content": [ + { + "type": "input_image", + "image_url": self._encode_frame(vf), + }, + { + "type": "input_text", + "text": "[This is what you currently see]", + }, + ], + } ) if inputs.objects is not None: @@ -212,9 +260,7 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: ) if self.config.post_prompt: - transient.append( - {"role": "system", "content": self.config.post_prompt} - ) + transient.append({"role": "system", "content": self.config.post_prompt}) # Call LLM api_input = self._input + transient @@ -231,15 +277,11 @@ def _call_llm( """Stream a single LLM response, emit tokens/tool calls, handle tool loop.""" kwargs: dict[str, Any] = { "model": self.config.model, + "instructions": self.config.system_prompt, "input": api_input, "stream": True, + "reasoning": {"effort": "low"}, } - if self.config.temperature is not None: - kwargs["temperature"] = self.config.temperature - if self.config.top_p is not None: - kwargs["top_p"] = self.config.top_p - if self.config.max_tokens is not None: - kwargs["max_output_tokens"] = self.config.max_tokens if self._tool_defs: kwargs["tools"] = self._tool_defs kwargs["tool_choice"] = "auto" @@ -260,9 +302,7 @@ def _call_llm( elif event.type == "response.function_call_arguments.delta": # Accumulate function call arguments if not tool_calls or tool_calls[-1].get("_done"): - 
tool_calls.append( - {"call_id": "", "name": "", "arguments": ""} - ) + tool_calls.append({"call_id": "", "name": "", "arguments": ""}) tool_calls[-1]["arguments"] += event.delta elif event.type == "response.output_item.added": @@ -292,46 +332,51 @@ def _call_llm( if outputs.text is not None: outputs.text.send(TextFrame.new(text=full_text)) - # Emit tool calls - if tool_calls and outputs.tool_calls is not None: - for tc in tool_calls: - outputs.tool_calls.send( - ToolCall.new( - call_id=tc["call_id"], - name=tc["name"], - arguments=tc["arguments"], - ) - ) - # Add function_call to input for next turn + # Process tool calls + for tc in tool_calls: + self._input.append( + { + "type": "function_call", + "call_id": tc["call_id"], + "name": tc["name"], + "arguments": tc["arguments"], + } + ) + + if tc["name"] == "diary": + # Built-in: handle internally + try: + entry = json.loads(tc["arguments"]).get("entry", "") + except (json.JSONDecodeError, KeyError): + entry = tc["arguments"] + print(f"[AgentLoop] Diary: {entry[:120]}") self._input.append( { - "type": "function_call", + "type": "function_call_output", "call_id": tc["call_id"], - "name": tc["name"], - "arguments": tc["arguments"], + "output": "ok", } ) - - # Wait for tool results and add them - if inputs.tool_result is not None: - inputs.tool_result.blocking = True - for tc in tool_calls: - result = next(inputs.tool_result, None) - if result is None: - break - self._input.append( - { - "type": "function_call_output", - "call_id": tc["call_id"], - "output": result.text, - } + else: + # External tool: emit and wait for result + if outputs.tool_calls is not None: + outputs.tool_calls.send( + ToolCall.new( + call_id=tc["call_id"], + name=tc["name"], + arguments=tc["arguments"], + ) ) - inputs.tool_result.blocking = False - - # Loop back — call LLM again with tool results - if not self.stop_event.is_set(): - self._call_llm(self._input, inputs, outputs) - return + if inputs.tool_result is not None: + result = 
next(inputs.tool_result, None) + if result is not None: + self._input.append( + { + "type": "function_call_output", + "call_id": tc["call_id"], + "output": result.text, + } + ) # EOS outputs.token.send(EOS.END) diff --git a/backend/src/lib/llm/scratchpad_tool.py b/backend/src/lib/llm/scratchpad_tool.py new file mode 100644 index 0000000..ee83057 --- /dev/null +++ b/backend/src/lib/llm/scratchpad_tool.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import json +from typing import NamedTuple + +from src.core.channel import Receiver, Sender +from src.core.component import ThreadedComponent, Tag +from src.core.frames import ToolCall, ToolDef, ToolResult + + +class ScratchpadToolInputs(NamedTuple): + tool_call: Receiver[ToolCall] + + +class ScratchpadToolOutputs(NamedTuple): + tool_def: Sender[ToolDef] + tool_result: Sender[ToolResult] + + +class ScratchpadTool(ThreadedComponent[ScratchpadToolInputs, ScratchpadToolOutputs]): + description = "Persistent scratchpad the agent can **read** and **update** to maintain working notes across turns." + tags = Tag(io={"conduit"}, functionality={"llm"}) + + def __init__(self) -> None: + super().__init__() + self._content: str = "" + + def setup(self, outputs: ScratchpadToolOutputs) -> None: + outputs.tool_def.send( + ToolDef.new( + name="scratchpad_update", + description="Overwrite the scratchpad with new content. 
Use this to keep track of plans, observations, or any working notes you want to persist across turns.", + parameters={ + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "The new scratchpad content (replaces everything).", + }, + }, + "required": ["content"], + }, + ) + ) + outputs.tool_def.send( + ToolDef.new( + name="scratchpad_read", + description="Read the current scratchpad content.", + parameters={ + "type": "object", + "properties": {}, + }, + ) + ) + + def run(self, inputs: ScratchpadToolInputs, outputs: ScratchpadToolOutputs) -> None: + for call in inputs.tool_call: + if call is None: + break + + if call.name == "scratchpad_update": + try: + args = json.loads(call.arguments) + self._content = args.get("content", "") + except (json.JSONDecodeError, AttributeError): + self._content = call.arguments + print(f"[Scratchpad] Updated: {self._content[:80]}") + outputs.tool_result.send( + ToolResult.new(call_id=call.call_id, content="ok") + ) + + elif call.name == "scratchpad_read": + print(f"[Scratchpad] Read: {self._content[:80]}") + outputs.tool_result.send( + ToolResult.new( + call_id=call.call_id, + content=self._content or "(empty)", + ) + ) From 90544903a95deff997f5e908f59a21cb74fdccf9 Mon Sep 17 00:00:00 2001 From: xingjianll <4396kevinliu@gmail.com> Date: Mon, 6 Apr 2026 14:14:15 -0400 Subject: [PATCH 8/8] feat: AgentLoop ToolResult type + head heading + debug prints - Use ToolResult frame instead of TextFrame for tool_result input - Use result.call_id instead of local tc dict for correct pairing - Add head look heading alongside body heading in pose context - Add debug prints for pose data Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/src/lib/llm/agent_loop.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/backend/src/lib/llm/agent_loop.py b/backend/src/lib/llm/agent_loop.py index 0a718db..64ceef5 100644 --- a/backend/src/lib/llm/agent_loop.py +++ 
b/backend/src/lib/llm/agent_loop.py @@ -22,6 +22,7 @@ TextFrame, ToolCall, ToolDef, + ToolResult, VideoFrame, VideoDataFormat, ) @@ -67,7 +68,7 @@ class AgentLoopConfig(BaseModel): class AgentLoopInputs(NamedTuple): initial_msgs: Receiver[list[MessageFrame]] | None = None speech: Receiver[TextFrame] | None = None - tool_result: Receiver[TextFrame] | None = None + tool_result: Receiver[ToolResult] | None = None video: Receiver[VideoFrame] | None = None pose: Receiver[BodyPoseFrame] | None = None objects: Receiver[ObjectLocationFrame] | None = None @@ -247,15 +248,27 @@ def run(self, inputs: AgentLoopInputs, outputs: AgentLoopOutputs) -> None: if pose is not None: poses = pose.get() waist = poses.get("waist") + head = poses.get("head") + print(f"[AgentLoop] waist={waist} head={head}") if waist is not None: - heading = self._heading_from_quat( + body_heading = self._heading_from_quat( waist.rot_w, waist.rot_x, waist.rot_y, waist.rot_z ) + lines = [ + f"[Position (y-up): x={waist.pos_x:.2f}, y={waist.pos_y:.2f}, z={waist.pos_z:.2f}]", + f"[Body heading (from +Z clockwise): {body_heading:.0f}°]", + ] + if head is not None: + look_heading = self._heading_from_quat( + head.rot_w, head.rot_x, head.rot_y, head.rot_z + ) + lines.append(f"[Look heading (from +Z clockwise): {look_heading:.0f}°]") + pose_content = "\n".join(lines) + print(f"[AgentLoop] Pose context: {pose_content}") transient.append( { "role": "system", - "content": f"[Position (y-up): x={waist.pos_x:.2f}, y={waist.pos_y:.2f}, z={waist.pos_z:.2f}]\n" - f"[Heading (from +Z clockwise): {heading:.0f}°]", + "content": pose_content, } ) @@ -373,8 +386,8 @@ def _call_llm( self._input.append( { "type": "function_call_output", - "call_id": tc["call_id"], - "output": result.text, + "call_id": result.call_id, + "output": result.content, } )