Skip to content

Commit 0277629

Browse files
Merge pull request #1 from cogos-dev/feat/dashboard-barge-in
feat: voice dashboard, barge-in detection, kernel integration
2 parents 99a700a + 15e3e0a commit 0277629

17 files changed

Lines changed: 2661 additions & 26 deletions

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ jobs:
5757
r = client.get('/health')
5858
assert r.status_code == 200
5959
data = r.json()
60-
assert data['status'] == 'ok'
61-
assert 'engines_loaded' in data
62-
assert 'vad_loaded' in data
60+
assert data['status'] in ('ok', 'degraded')
61+
assert 'engines' in data
62+
assert 'modalities' in data
6363
6464
# Voices endpoint
6565
r = client.get('/v1/voices')

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,9 @@ __pycache__/
33
*.pyc
44
.DS_Store
55
.claude/
6+
7+
# ONNX Runtime WASM binaries (large, download via setup.sh)
8+
dashboard/vad/*.wasm
9+
dashboard/vad/*.mjs
10+
dashboard/vad/ort.js
11+
dashboard/vad/ort.min.mjs

agent_loop.py

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
"""Agent loop — receives percepts, calls LLM with tools, dispatches actions.
2+
3+
The agent loop is the bridge between the ModalityBus (perception/action)
4+
and the InferenceProvider (thinking). It maintains conversation history
5+
and routes tool calls through the bus.
6+
"""
7+
8+
from __future__ import annotations

import asyncio
import json as _json
import logging
import os
import time
from typing import TYPE_CHECKING

import httpx

from bus import ModalityBus
from modality import CognitiveEvent, CognitiveIntent, ModalityType
from pipeline_state import PipelineState
from providers import AGENT_TOOLS, InferenceProvider

if TYPE_CHECKING:
    from channels import BrowserChannel
25+
26+
logger = logging.getLogger("mod3.agent_loop")

# Base system prompt — kernel context is appended dynamically per request
# (see _fetch_kernel_context, used when assembling the provider call).
_BASE_SYSTEM_PROMPT = (
    "You are Cog, a voice assistant running on Mod³ (Apple Silicon, fully local). "
    "You respond using tool calls. Use speak() for conversational voice responses — "
    "keep them concise, 1-3 sentences. Use send_text() only when the content is "
    "better read than heard (code, lists, links, structured data). "
    "No markdown in speak() text. Speak naturally. "
    "If the user asks something you can't do, say so briefly."
)

# CogOS kernel endpoint for context enrichment (overridable via COGOS_ENDPOINT).
_COGOS_ENDPOINT = os.environ.get("COGOS_ENDPOINT", "http://localhost:6931")

# Bus endpoint for logging exchanges (observation channel).
_COGOS_BUS_ENDPOINT = f"{_COGOS_ENDPOINT}/v1/bus"
43+
44+
45+
def _fetch_kernel_context() -> str:
46+
"""Pull active context from CogOS kernel to enrich the system prompt.
47+
48+
Returns a context block string, or empty string if kernel unavailable.
49+
This is the afferent path: kernel → local model.
50+
"""
51+
try:
52+
resp = httpx.get(f"{_COGOS_ENDPOINT}/health", timeout=2.0)
53+
if resp.status_code != 200:
54+
return ""
55+
health = resp.json()
56+
57+
parts = []
58+
identity = health.get("identity", "cog")
59+
state = health.get("state", "unknown")
60+
parts.append(f"Kernel identity: {identity}, state: {state}")
61+
62+
# Try to get active session context
63+
try:
64+
ctx_resp = httpx.get(f"{_COGOS_ENDPOINT}/v1/context", timeout=2.0)
65+
if ctx_resp.status_code == 200:
66+
ctx = ctx_resp.json()
67+
nucleus = ctx.get("nucleus", "")
68+
if nucleus:
69+
parts.append(f"Active nucleus: {nucleus}")
70+
process_state = ctx.get("state", "")
71+
if process_state:
72+
parts.append(f"Process state: {process_state}")
73+
except Exception:
74+
pass
75+
76+
# Check for barge-in context (what was Claude saying when interrupted?)
77+
signal_file = os.environ.get("BARGEIN_SIGNAL", "/tmp/mod3-barge-in.json")
78+
try:
79+
if os.path.exists(signal_file):
80+
with open(signal_file) as f:
81+
signal = _json.load(f)
82+
interrupted = signal.get("interrupted")
83+
if interrupted:
84+
delivered = interrupted.get("delivered_text", "")
85+
pct = interrupted.get("spoken_pct", 0)
86+
parts.append(
87+
f"[barge-in] Claude's speech was interrupted at {pct * 100:.0f}%. "
88+
f'Delivered: "{delivered}". '
89+
f"The user interrupted to say something — acknowledge and respond to them."
90+
)
91+
except Exception:
92+
pass
93+
94+
if parts:
95+
return "\n\nKernel context:\n" + "\n".join(f"- {p}" for p in parts)
96+
return ""
97+
except Exception:
98+
return ""
99+
100+
101+
def _log_exchange_to_bus(user_text: str, assistant_text: str, provider_name: str):
    """Post one local-model exchange to the CogOS bus (observation channel).

    This is the efferent path: local model → kernel → Claude can observe.
    Delivery is best-effort; any failure is logged at debug level only.
    """
    try:
        httpx.post(
            _COGOS_BUS_ENDPOINT,
            json={
                "type": "modality.voice.exchange",
                "from": f"mod3-reflex:{provider_name}",
                "payload": {
                    "user": user_text,
                    "assistant": assistant_text,
                    "provider": provider_name,
                    "timestamp": time.time(),
                },
            },
            timeout=2.0,
        )
    except Exception as e:
        logger.debug("Failed to log exchange to bus: %s", e)
124+
125+
126+
# Hard cap on retained conversation messages (user + assistant combined).
MAX_HISTORY = 50


class AgentLoop:
    """Conversational agent that receives percepts and acts through the bus.

    One instance serves one channel: percepts arrive via :meth:`handle_event`,
    the provider is called with the agent tool set, and responses are
    dispatched as CognitiveIntents on the ModalityBus (voice) and/or pushed
    straight to the browser channel (text).
    """

    def __init__(
        self,
        bus: ModalityBus,
        provider: InferenceProvider,
        pipeline_state: PipelineState,
        channel_id: str = "",
    ):
        self.bus = bus
        self.provider = provider
        self.pipeline_state = pipeline_state
        self.channel_id = channel_id
        # Rolling chat history, trimmed to MAX_HISTORY entries.
        self.conversation: list[dict[str, str]] = []
        # Attached externally by the owning channel; may remain None.
        self._channel_ref: BrowserChannel | None = None
        # Re-entrancy guard: events arriving mid-processing are dropped.
        self._processing = False

    async def handle_event(self, event: CognitiveEvent) -> None:
        """Entry point — called when a CognitiveEvent arrives from the channel.

        Blank events are ignored; events arriving while a previous one is
        still being processed are dropped with a warning.
        """
        if not event.content.strip():
            return

        if self._processing:
            logger.warning("agent busy, dropping: %s", event.content[:50])
            return

        self._processing = True
        try:
            await self._process(event)
        except Exception as e:
            logger.error("agent_loop error: %s", e, exc_info=True)
            try:
                if self._channel_ref:
                    await self._channel_ref.send_response_text(f"[error: {e}]")
                    await self._channel_ref.send_response_complete()
            except Exception:
                pass  # channel may be dead, don't block finally
        finally:
            self._processing = False

    async def _process(self, event: CognitiveEvent) -> None:
        """Core pipeline: event → provider → tool dispatch → history update."""
        self.conversation.append({"role": "user", "content": event.content})
        self._trim_history()

        t_start = time.perf_counter()

        # Assemble system prompt with kernel context (afferent path).
        # _fetch_kernel_context does blocking HTTP — run it in a worker
        # thread so the event loop keeps serving other channels.
        kernel_ctx = await asyncio.to_thread(_fetch_kernel_context)
        system_prompt = _BASE_SYSTEM_PROMPT + kernel_ctx

        response = await self.provider.chat(
            messages=self.conversation,
            tools=AGENT_TOOLS,
            system=system_prompt,
        )

        t_llm = (time.perf_counter() - t_start) * 1000

        # Dispatch tool calls.
        assistant_parts: list[str] = []

        for tc in response.tool_calls:
            if tc.name == "speak":
                text = tc.arguments.get("text", "")
                if text:
                    assistant_parts.append(text)
                    await self._show_text(text)
                    self._speak(text)
            elif tc.name == "send_text":
                text = tc.arguments.get("text", "")
                if text:
                    assistant_parts.append(text)
                    await self._show_text(text)

        # Fallback: provider returned plain text but no tool calls — auto-speak.
        if not response.tool_calls and response.text:
            text = response.text
            assistant_parts.append(text)
            await self._show_text(text)
            self._speak(text)

        # Update conversation history.
        if assistant_parts:
            assistant_text = " ".join(assistant_parts)
            self.conversation.append(
                {
                    "role": "assistant",
                    "content": assistant_text,
                }
            )

            # Log exchange to CogOS bus (observation channel — Claude can see
            # this). Blocking HTTP, so keep it off the event loop too.
            await asyncio.to_thread(
                _log_exchange_to_bus, event.content, assistant_text, self.provider.name
            )

        # Signal completion with per-turn latency metrics.
        if self._channel_ref:
            await self._channel_ref.send_response_complete(
                metrics={"llm_ms": round(t_llm, 1), "provider": self.provider.name}
            )

    async def _show_text(self, text: str) -> None:
        """Mirror *text* into the chat panel, if a channel is attached."""
        if self._channel_ref:
            await self._channel_ref.send_response_text(text)

    def _speak(self, text: str) -> None:
        """Route *text* through the bus → VoiceEncoder → TTS → channel.deliver.

        Fire-and-forget: bus.act(blocking=False) returns a QueuedJob
        immediately; the OutputQueue drain thread handles TTS encoding and
        delivery.
        """
        # Voice/speed come from the channel config when available, with the
        # same defaults the UI uses.
        cfg = self._channel_ref.config if self._channel_ref else {}
        intent = CognitiveIntent(
            modality=ModalityType.VOICE,
            content=text,
            target_channel=self.channel_id,
            metadata={
                "voice": cfg.get("voice", "bm_lewis"),
                "speed": cfg.get("speed", 1.25),
            },
        )
        self.bus.act(intent, channel=self.channel_id)

    def _trim_history(self) -> None:
        """Keep conversation within MAX_HISTORY messages (drops oldest first)."""
        if len(self.conversation) > MAX_HISTORY:
            self.conversation = self.conversation[-MAX_HISTORY:]

0 commit comments

Comments
 (0)