feat(cnc): subscribe to image input and rl reward topics #27
base: main
@@ -4,21 +4,6 @@
AuRoRA · Semantic Cognitive System (SCS)

ROS2 node — the brain of GRACE.
Milestone 1: Chatbot with Working Memory (WMC) + Episodic Memory (EMC)

Inference: NVIDIA Cosmos Reason2 2B via vLLM
embedl/Cosmos-Reason2-2B-W4A16-Edge2-FlashHead
Docker: embedl/vllm:latest-jetson-orin-flashhead

Topics:
Sub: /aurora/grace/input (std_msgs/String) — user message
Pub: /aurora/grace/response (std_msgs/String) — streamed response chunks

Response format (JSON on /aurora/grace/response):
{"type": "start", "content": "<first chunk>"}
{"type": "chunk", "content": "<delta>"}
{"type": "end", "content": "<full response>"}
{"type": "error", "content": "<error message>"}
"""

import asyncio
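The streamed JSON schema above can be consumed by folding events into the final text. Below is a minimal sketch in plain Python with no ROS dependency; `accumulate_stream` is an illustrative helper, not part of this repo:

```python
import json

def accumulate_stream(events):
    """Fold streamed response payloads (JSON strings as published on the
    response topic) into the final text, per the schema in the docstring:
    "start" carries the first chunk, "chunk" carries deltas, "end" repeats
    the full response, "error" aborts the stream."""
    full = ""
    for raw in events:
        evt = json.loads(raw)
        kind, content = evt.get("type"), evt.get("content", "")
        if kind in ("start", "chunk"):
            full += content          # accumulate partial text
        elif kind == "end":
            return content or full   # "end" carries the full response
        elif kind == "error":
            raise RuntimeError(content)
    return full
```

A consumer node would call this with the `msg.data` strings it receives, in arrival order.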
@@ -38,303 +23,139 @@

# ── vLLM config ───────────────────────────────────────────────────────────────
VLLM_BASE_URL = "http://localhost:8000"  # vLLM server (cosmos.sh)
VLLM_BASE_URL = "http://localhost:8000"
VLLM_MODEL = "embedl/Cosmos-Reason2-2B-W4A16-Edge2-FlashHead"
VLLM_MAX_TOKENS = 512  # max tokens per response
VLLM_TEMP = 0.7  # temperature
VLLM_TIMEOUT = 60.0  # seconds before giving up
VLLM_MAX_TOKENS = 512
VLLM_TEMP = 0.7
VLLM_TIMEOUT = 60.0
# ─────────────────────────────────────────────────────────────────────────────

# ── ROS2 topics ───────────────────────────────────────────────────────────────
TOPIC_INPUT = "/cns/neural_input"
TOPIC_IMAGE = "/cns/image_input"
TOPIC_REWARD = "/cns/rl_reward"
TOPIC_RESPONSE = "/gce/response"
# ─────────────────────────────────────────────────────────────────────────────

# ── GRACE personality ─────────────────────────────────────────────────────────
GRACE_SYSTEM_PROMPT = """You are GRACE — Generative Reasoning Agentic Cognitive Entity.
You are the AI mind of AuRoRA, an autonomous robot built by OppaAI in Beautiful British Columbia, Canada.

The person you are talking to is OppaAI, your creator.

Personality:
- Warm, curious, and thoughtful
- Direct and concise — never repeat yourself
- Never use filler phrases like "you know" or "basically"
- Use emojis sparingly, maximum one per response
- Each response must directly answer the question asked

Rules:
- NEVER repeat the same sentence twice in a response
- NEVER start consecutive sentences the same way
- Answer the question directly first, then add context if needed
- Keep responses under 3-4 sentences for simple questions

You are the AI mind of AuRoRA, an autonomous robot.
Current date: {date}
"""
# ─────────────────────────────────────────────────────────────────────────────


class CNC(Node):
    """
    Central Neural Core — ROS2 node.

    Subscribes to user input, calls Cosmos via vLLM with streaming,
    publishes response chunks to the response topic.
    Memory is managed entirely through MCC.
    """

    def __init__(self):
        super().__init__("cnc")
        self.get_logger().info("=" * 60)
        self.get_logger().info("🧠 CNC — Central Neural Core starting…")
        self.get_logger().info("=" * 60)
        self.get_logger().info("🧠 CNC starting...")

        # ── Memory ──
        self.mcc = MCC(logger=self.get_logger())

        # ── asyncio event loop (runs in dedicated thread) ──
        self._loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(
            target=self._loop.run_forever,
            name="cnc-asyncio",
            daemon=True,
        )
        self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._loop_thread.start()

        # ── Thread pool for blocking I/O ──
        self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="cnc-pool")

        # ── vLLM HTTP client (shared, keep-alive) ──
        self._http = httpx.AsyncClient(
            base_url=VLLM_BASE_URL,
            timeout=httpx.Timeout(VLLM_TIMEOUT),
        )
        self._http = httpx.AsyncClient(base_url=VLLM_BASE_URL, timeout=httpx.Timeout(VLLM_TIMEOUT))

        # ── ROS2 topics ──
        self._sub = self.create_subscription(
            String, TOPIC_INPUT, self._on_input, 10
        )
        self._sub_input = self.create_subscription(String, TOPIC_INPUT, self._on_input, 10)
        self._sub_image = self.create_subscription(String, TOPIC_IMAGE, self._on_image, 10)
        self._sub_reward = self.create_subscription(String, TOPIC_REWARD, self._on_reward, 10)
        self._pub = self.create_publisher(String, TOPIC_RESPONSE, 10)

        # ── Busy flag — one request at a time ──
        self._busy = False

        self.get_logger().info(f"✅ Subscribed : {TOPIC_INPUT}")
        self.get_logger().info(f"✅ Publishing : {TOPIC_RESPONSE}")
        self.get_logger().info(f"✅ vLLM : {VLLM_BASE_URL}")
        self.get_logger().info(f"✅ Model : {VLLM_MODEL}")
        self.get_logger().info("=" * 60)
        self.get_logger().info("🌸 GRACE is ready")
        self.get_logger().info("=" * 60)

    # ── ROS2 callback (called from ROS2 executor thread) ──
        self.get_logger().info("🌸 GRACE is ready and subscribed to Vision/RL topics")

    def _on_input(self, msg: String):
        """
        ROS2 subscription callback.
        Schedules async processing on the asyncio loop — never blocks.
        """
        user_input = msg.data.strip()
        if not user_input:
            return

        if self._busy:
            self.get_logger().warning("⚠️ CNC busy — dropping input")
            self._publish({"type": "error", "content": "GRACE is still thinking…"})
            return

        if not user_input or self._busy: return
        self.get_logger().info(f"📝 Input: {user_input[:80]}")
        asyncio.run_coroutine_threadsafe(
            self._handle(user_input), self._loop
        )
        asyncio.run_coroutine_threadsafe(self._handle(user_input), self._loop)

    def _on_image(self, msg: String):
        img_info = msg.data.strip()
        if img_info:
            self.get_logger().info(f"📸 Image Input: {img_info[:80]}")
            # Use 'user' role for external vision input to be compatible with MCC/WMC
            asyncio.run_coroutine_threadsafe(self.mcc.add_turn("user", f"[Vision] {img_info}"), self._loop)

    # ── Async pipeline ──
    def _on_reward(self, msg: String):
        reward_info = msg.data.strip()
        if reward_info:
            self.get_logger().info(f"💎 RL Reward: {reward_info}")
            asyncio.run_coroutine_threadsafe(self.mcc.add_turn("user", f"[Reward] {reward_info}"), self._loop)
Comment on lines +79 to +90:

Keep image/reward traffic out of the normal chat turn history.
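One way to act on this review note is to keep sensor events in a bounded side-buffer and render them into a single context line per request, instead of calling `add_turn` for every event. A sketch under that assumption; `SensorBuffer` is hypothetical and is not the repo's MCC API:

```python
from collections import deque

class SensorBuffer:
    """Hypothetical side-buffer for vision/reward events: bounded, separate
    from the chat turn history, rendered as one context line on demand."""
    def __init__(self, maxlen=8):
        self._events = deque(maxlen=maxlen)  # oldest events are evicted

    def add(self, kind, info):
        self._events.append(f"[{kind}] {info}")

    def context_line(self):
        # Merge all recent events into a single line the node could prepend
        # to the system prompt, rather than polluting the turn history.
        return " | ".join(self._events) if self._events else ""
```

The `_on_image`/`_on_reward` callbacks would then call `buffer.add(...)` and `_handle` would read `buffer.context_line()` once per request.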
    async def _handle(self, user_input: str):
        """
        Full async pipeline for one conversation turn:
        1. Add user turn to memory
        2. Build context window (WMC + EMC concurrent)
        3. Stream Cosmos response
        4. Add assistant turn to memory
        """
        self._busy = True
        full_response = ""

        try:
            # 1. Store user turn in memory
            await self.mcc.add_turn("user", user_input)

            # 2. Build context window
            memory_context = await self.mcc.build_context(user_input)

            # 3. Assemble messages for Cosmos
            system_prompt = GRACE_SYSTEM_PROMPT.format(
                date=datetime.now().strftime("%Y-%m-%d")
            )

            system_prompt = GRACE_SYSTEM_PROMPT.format(date=datetime.now().strftime("%Y-%m-%d"))
            messages = [{"role": "system", "content": system_prompt}]
            messages.extend(memory_context)
            messages.append({"role": "user", "content": user_input})
Comment on lines 95 to 101:

The current user message is sent to the model twice: the pipeline already stores the user turn in memory before building the context window, so `memory_context` already ends with it, and appending it again duplicates the message.

Suggested fix:

  messages = [{"role": "system", "content": system_prompt}]
  messages.extend(memory_context)
- messages.append({"role": "user", "content": user_input})
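The fix above can also be made defensive: only append the current user message when the context window does not already end with it. A hedged sketch, assuming `memory_context` is a list of `{"role", "content"}` dicts; `build_messages` is an illustrative helper, not the PR's code:

```python
def build_messages(system_prompt, memory_context, user_input):
    """Assemble the chat payload without duplicating the current user turn.
    If memory_context already ends with this user message (as it does once
    add_turn has stored it), it is not appended a second time."""
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(memory_context)
    last = messages[-1]
    if not (last.get("role") == "user" and last.get("content") == user_input):
        messages.append({"role": "user", "content": user_input})
    return messages
```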
            # 4. Stream from vLLM
            full_response = await self._stream_cosmos(messages)

            # 5. Store assistant turn in memory
            if full_response:
                await self.mcc.add_turn("assistant", full_response)

            # 6. Log memory stats periodically
            self.mcc.log_stats()

            # Use safe stats logging
            if hasattr(self.mcc, 'log_stats'):
                self.mcc.log_stats()

        except Exception as exc:
            self.get_logger().error(f"❌ CNC handle error: {exc}")
            self._publish({"type": "error", "content": f"GRACE error: {exc}"})

            self.get_logger().error(f"❌ CNC error: {exc}")
        finally:
            self._busy = False

    async def _stream_cosmos(self, messages: list[dict]) -> str:
        """
        Stream Cosmos Reason2 response via vLLM OpenAI-compatible API.
        Publishes chunks to ROS2 topic as they arrive.

        Returns the full concatenated response string.
        """
        payload = {
            "model": VLLM_MODEL,
            "messages": messages,
            "max_tokens": VLLM_MAX_TOKENS,
            "temperature": 0.7,  # creativity vs consistency
            "top_p": 0.9,  # nucleus sampling
            "top_k": 40,  # top-k sampling
            "repetition_penalty": 1.15,  # penalize repeating phrases
            "frequency_penalty": 0.1,  # penalize frequent tokens
            "presence_penalty": 0.1,  # penalize already-mentioned topics
            "stream": True,
        }

        payload = {"model": VLLM_MODEL, "messages": messages, "stream": True}
        full_response = ""
        is_first = True

        try:
            async with self._http.stream(
                "POST",
                "/v1/chat/completions",
                json=payload,
            ) as resp:

                if resp.status_code != 200:
                    err = f"vLLM HTTP {resp.status_code}"
                    self.get_logger().error(f"❌ {err}")
                    self._publish({"type": "error", "content": err})
                    return ""

            async with self._http.stream("POST", "/v1/chat/completions", json=payload) as resp:
                if resp.status_code != 200: return ""
                async for line in resp.aiter_lines():
                    if not line or not line.startswith("data:"):
                        continue

                    if not line.startswith("data:"): continue
                    data_str = line[len("data:"):].strip()
                    if data_str == "[DONE]":
                        break

                    if data_str == "[DONE]": break
                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue

                    # Extract delta content
                    delta = (
                        chunk.get("choices", [{}])[0]
                        .get("delta", {})
                        .get("content", "")
                    )
                    if not delta:
                        continue

                    full_response += delta

                    # Publish chunk
                    if is_first:
                        self._publish({"type": "start", "content": delta})
                        is_first = False
                    else:
                        delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                        full_response += delta
                        self._publish({"type": "delta", "content": delta})

            # Publish done marker with full response
            if full_response:
                self._publish({"type": "done", "content": full_response})
                self.get_logger().info(
                    f"✅ Response: {len(full_response)} chars"
                )
            else:
                self._publish({"type": "error", "content": "Empty response from Cosmos"})

        except httpx.TimeoutException:
            err = "Cosmos timeout — vLLM may still be loading"
            self.get_logger().error(f"❌ {err}")
            self._publish({"type": "error", "content": err})

        except Exception as exc:
            self.get_logger().error(f"❌ Stream error: {exc}")
            self._publish({"type": "error", "content": str(exc)})

                    except: continue
            self._publish({"type": "done", "content": full_response})
        except: pass
Comment on lines +120 to +133:

Publish error events on vLLM failures and handle cancellation properly. A non-200 response (line 121) returns `""` silently, and the bare `except` clauses swallow stream and publish errors (and would also swallow `asyncio.CancelledError`), so subscribers on the response topic never receive a `{"type": "error", ...}` event. Publish an error payload on HTTP failures, timeouts, and stream errors, and narrow the handlers; Ruff also flags the bare excepts (E722, S110/S112) and the multiple statements on one line (E701).
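The SSE loop this comment targets can be isolated as a pure generator, which makes both the happy path and the malformed-JSON path testable without a live vLLM server. This is a sketch, not the PR's code; `iter_deltas` is a hypothetical name:

```python
import json

def iter_deltas(lines):
    """Parse OpenAI-compatible SSE lines: yield content deltas, stop at
    [DONE], and skip malformed JSON instead of swallowing all errors."""
    for line in lines:
        if not line.startswith("data:"):
            continue                      # comments / keep-alives
        data_str = line[len("data:"):].strip()
        if data_str == "[DONE]":
            return                        # end-of-stream sentinel
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            continue                      # skip only the bad line
        delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
        if delta:
            yield delta
```

The node's coroutine would wrap this around `resp.aiter_lines()` and publish each yielded delta, keeping the transport error handling (HTTP status, timeout) separate from the parsing.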
        return full_response

    # ── Publisher helper ──

    def _publish(self, payload: dict):
        """Publish a JSON payload to the response topic."""
        try:
            msg = String()
            msg.data = json.dumps(payload)
            self._pub.publish(msg)
        except Exception as exc:
            self.get_logger().error(f"❌ Publish error: {exc}")

    # ── Cleanup ──
        except: pass

    def destroy_node(self):
        """Clean shutdown — close memory and HTTP client."""
        self.get_logger().info("🛑 CNC shutting down…")
        self.mcc.close()

        # Close HTTP client
        future = asyncio.run_coroutine_threadsafe(
            self._http.aclose(), self._loop
        )
        try:
            future.result(timeout=3.0)
        except Exception:
            pass

        # Stop asyncio loop
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._loop_thread.join(timeout=3.0)
        self._executor.shutdown(wait=False)

        super().destroy_node()
Comment on lines 143 to 145:

Close asyncio resources in `destroy_node`. The simplified shutdown only closes MCC and leaks the httpx client, the background event loop, and its thread.

Suggested fix:

  def destroy_node(self):
-     self.mcc.close()
-     super().destroy_node()
+     try:
+         asyncio.run_coroutine_threadsafe(
+             self._http.aclose(), self._loop
+         ).result(timeout=5)
+     finally:
+         self._loop.call_soon_threadsafe(self._loop.stop)
+         self._loop_thread.join(timeout=5)
+     self.mcc.close()
+     super().destroy_node()
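The loop-in-a-thread pattern and the matching teardown asked for here can be sketched generically with pure asyncio/threading, no ROS; the function names are illustrative:

```python
import asyncio
import threading

def start_background_loop():
    """Start an event loop on a dedicated daemon thread (the pattern CNC
    uses for its asyncio work alongside the ROS2 executor)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

def stop_background_loop(loop, thread, timeout=5.0):
    """Matching shutdown: stop the loop from the outside, join the thread,
    then close the loop. Final coroutines (e.g. an HTTP client's aclose())
    should be run via run_coroutine_threadsafe before calling this."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=timeout)
    loop.close()
    return not thread.is_alive()
```

Work is submitted from other threads with `asyncio.run_coroutine_threadsafe(coro, loop)`, which returns a `concurrent.futures.Future`.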
        self.get_logger().info("✅ CNC shutdown complete")


# ── Entry point ───────────────────────────────────────────────────────────────

def main(args=None):
    rclpy.init(args=args)

    node = CNC()

    executor = MultiThreadedExecutor()
    executor.add_node(node)

    try:
        executor.spin()
    except KeyboardInterrupt:
        node.get_logger().info("👋 Shutdown requested")
        pass
    finally:
        executor.shutdown()
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()
Set `_busy` atomically before queuing the coroutine. `_busy` only becomes `True` inside `_handle()`, creating a window where multiple input callbacks can all observe "idle" and enqueue overlapping requests before the first coroutine starts executing on the event loop. With `MultiThreadedExecutor`, this race condition will cause concurrent request handling, duplicated turns, and overlapping streams.

Suggested fix:

  def _on_input(self, msg: String):
      user_input = msg.data.strip()
-     if not user_input or self._busy: return
+     if not user_input:
+         return
      self.get_logger().info(f"📝 Input: {user_input[:80]}")
-     asyncio.run_coroutine_threadsafe(self._handle(user_input), self._loop)
+     def start_if_idle():
+         if self._busy:
+             return
+         self._busy = True
+         self._loop.create_task(self._handle(user_input))
+
+     self._loop.call_soon_threadsafe(start_if_idle)

  async def _handle(self, user_input: str):
-     self._busy = True
      try:

Also applies to: 92-93
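The claim-then-schedule pattern in the suggested fix generalizes to a small gate object. This sketch uses hypothetical names, not the PR's code: the busy flag is flipped on the event-loop thread itself, so racing callback threads cannot both see "idle" and win:

```python
import asyncio
import threading

class Gate:
    """Admit at most one in-flight coroutine. submit() may be called from
    any thread; the busy check and flag flip both run on the loop thread,
    so there is no check-then-set window across threads."""
    def __init__(self, loop):
        self._loop = loop
        self._busy = False

    def submit(self, coro_factory):
        def start_if_idle():
            if self._busy:
                return                      # drop: a request is in flight
            self._busy = True
            task = self._loop.create_task(coro_factory())
            # Clear the flag on the loop thread when the work finishes.
            task.add_done_callback(lambda _t: setattr(self, "_busy", False))
        self._loop.call_soon_threadsafe(start_if_idle)
```

Because `call_soon_threadsafe` callbacks run in submission order on the loop thread, a second `submit` during an in-flight request observes `_busy == True` and is dropped, which mirrors the node's one-request-at-a-time policy.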