Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 51 additions & 230 deletions AuRoRA/src/scs/scs/cnc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,6 @@
AuRoRA · Semantic Cognitive System (SCS)

ROS2 node — the brain of GRACE.
Milestone 1: Chatbot with Working Memory (WMC) + Episodic Memory (EMC)

Inference: NVIDIA Cosmos Reason2 2B via vLLM
embedl/Cosmos-Reason2-2B-W4A16-Edge2-FlashHead
Docker: embedl/vllm:latest-jetson-orin-flashhead

Topics:
Sub: /aurora/grace/input (std_msgs/String) — user message
Pub: /aurora/grace/response (std_msgs/String) — streamed response chunks

Response format (JSON on /aurora/grace/response):
{"type": "start", "content": "<first chunk>"}
{"type": "chunk", "content": "<delta>"}
{"type": "end", "content": "<full response>"}
{"type": "error", "content": "<error message>"}
"""

import asyncio
Expand All @@ -38,303 +23,139 @@


# ── vLLM config ───────────────────────────────────────────────────────────────
VLLM_BASE_URL = "http://localhost:8000" # vLLM server (cosmos.sh)
VLLM_BASE_URL = "http://localhost:8000"
VLLM_MODEL = "embedl/Cosmos-Reason2-2B-W4A16-Edge2-FlashHead"
VLLM_MAX_TOKENS = 512 # max tokens per response
VLLM_TEMP = 0.7 # temperature
VLLM_TIMEOUT = 60.0 # seconds before giving up
VLLM_MAX_TOKENS = 512
VLLM_TEMP = 0.7
VLLM_TIMEOUT = 60.0
# ─────────────────────────────────────────────────────────────────────────────

# ── ROS2 topics ───────────────────────────────────────────────────────────────
TOPIC_INPUT = "/cns/neural_input"
TOPIC_IMAGE = "/cns/image_input"
TOPIC_REWARD = "/cns/rl_reward"
TOPIC_RESPONSE = "/gce/response"
# ─────────────────────────────────────────────────────────────────────────────

# ── GRACE personality ─────────────────────────────────────────────────────────
GRACE_SYSTEM_PROMPT = """You are GRACE — Generative Reasoning Agentic Cognitive Entity.
You are the AI mind of AuRoRA, an autonomous robot built by OppaAI in Beautiful British Columbia, Canada.

The person you are talking to is OppaAI, your creator.

Personality:
- Warm, curious, and thoughtful
- Direct and concise — never repeat yourself
- Never use filler phrases like "you know" or "basically"
- Use emojis sparingly, maximum one per response
- Each response must directly answer the question asked

Rules:
- NEVER repeat the same sentence twice in a response
- NEVER start consecutive sentences the same way
- Answer the question directly first, then add context if needed
- Keep responses under 3-4 sentences for simple questions

You are the AI mind of AuRoRA, an autonomous robot.
Current date: {date}
"""
# ─────────────────────────────────────────────────────────────────────────────


class CNC(Node):
"""
Central Neural Core — ROS2 node.

Subscribes to user input, calls Cosmos via vLLM with streaming,
publishes response chunks to the response topic.
Memory is managed entirely through MCC.
"""

def __init__(self):
super().__init__("cnc")
self.get_logger().info("=" * 60)
self.get_logger().info("🧠 CNC — Central Neural Core starting…")
self.get_logger().info("=" * 60)
self.get_logger().info("🧠 CNC starting...")

# ── Memory ────────────────────────────────────────────────
# ── Memory ──
self.mcc = MCC(logger=self.get_logger())

# ── asyncio event loop (runs in dedicated thread) ─────────
# ── asyncio event loop ──
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(
target=self._loop.run_forever,
name="cnc-asyncio",
daemon=True,
)
self._loop_thread = threading.Thread(target=self._loop.run_forever, daemon=True)
self._loop_thread.start()

# ── Thread pool for blocking I/O ──────────────────────────
self._executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="cnc-pool")

# ── vLLM HTTP client (shared, keep-alive) ─────────────────
self._http = httpx.AsyncClient(
base_url=VLLM_BASE_URL,
timeout=httpx.Timeout(VLLM_TIMEOUT),
)
# ── vLLM HTTP client ──
self._http = httpx.AsyncClient(base_url=VLLM_BASE_URL, timeout=httpx.Timeout(VLLM_TIMEOUT))

# ── ROS2 topics ───────────────────────────────────────────
self._sub = self.create_subscription(
String, TOPIC_INPUT, self._on_input, 10
)
# ── ROS2 topics ──
self._sub_input = self.create_subscription(String, TOPIC_INPUT, self._on_input, 10)
self._sub_image = self.create_subscription(String, TOPIC_IMAGE, self._on_image, 10)
self._sub_reward = self.create_subscription(String, TOPIC_REWARD, self._on_reward, 10)
self._pub = self.create_publisher(String, TOPIC_RESPONSE, 10)

# ── Busy flag — one request at a time ─────────────────────
self._busy = False

self.get_logger().info(f"✅ Subscribed : {TOPIC_INPUT}")
self.get_logger().info(f"✅ Publishing : {TOPIC_RESPONSE}")
self.get_logger().info(f"✅ vLLM : {VLLM_BASE_URL}")
self.get_logger().info(f"✅ Model : {VLLM_MODEL}")
self.get_logger().info("=" * 60)
self.get_logger().info("🌸 GRACE is ready")
self.get_logger().info("=" * 60)

# ── ROS2 callback (called from ROS2 executor thread) ──────────────────────
self.get_logger().info("🌸 GRACE is ready and subscribed to Vision/RL topics")

def _on_input(self, msg: String):
"""
ROS2 subscription callback.
Schedules async processing on the asyncio loop — never blocks.
"""
user_input = msg.data.strip()
if not user_input:
return

if self._busy:
self.get_logger().warning("⚠️ CNC busy — dropping input")
self._publish({"type": "error", "content": "GRACE is still thinking…"})
return

if not user_input or self._busy: return
self.get_logger().info(f"📝 Input: {user_input[:80]}")
asyncio.run_coroutine_threadsafe(
self._handle(user_input), self._loop
)
asyncio.run_coroutine_threadsafe(self._handle(user_input), self._loop)
Comment on lines 73 to +77
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find the file and check its content
find . -name "cnc.py" -type f

Repository: OppaAI/AGi

Length of output: 81


🏁 Script executed:

# Check the structure and find _busy usage
rg "_busy" --type python -n -B 2 -A 2 AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 84


🏁 Script executed:

# Also check __init__ to see _busy initialization
rg "def __init__|self._busy" --type python -n -B 1 -A 3 AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 84


🏁 Script executed:

# Get full context of _on_input and _handle methods
sed -n '60,110p' AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 2392


🏁 Script executed:

# Check if there's any other place setting _busy to False
rg "self._busy\s*=" --type python -n AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 84


🏁 Script executed:

# Continue reading the rest of _handle and look for _busy reset
sed -n '110,160p' AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 1775


🏁 Script executed:

# Search for where _busy is set to False or reset
grep -n "_busy = False" AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 118


🏁 Script executed:

# Check destroy_node or shutdown cleanup
grep -n "destroy_node\|def destroy" AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 152


🏁 Script executed:

# Find event loop initialization
grep -n "_loop\|_loop_thread" AuRoRA/src/scs/scs/cnc.py | head -20

Repository: OppaAI/AGi

Length of output: 538


🏁 Script executed:

# Get the full __init__ method to see event loop setup
sed -n '40,80p' AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 1754


🏁 Script executed:

# Verify that call_soon_threadsafe is the correct API for this
grep -n "call_soon_threadsafe\|run_coroutine_threadsafe" AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 360


🏁 Script executed:

# Check if there are any other thread-safety mechanisms or locking
grep -n "Lock\|threading\|mutex" AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 165


🏁 Script executed:

# Verify the logic: check _on_image and _on_reward to see if they can race too
sed -n '79,91p' AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 695


Set _busy atomically before queuing the coroutine.

_busy only becomes True inside _handle(), creating a window where multiple input callbacks can all observe "idle" and enqueue overlapping requests before the first coroutine starts executing on the event loop. With MultiThreadedExecutor, this race condition will cause concurrent request handling, duplicated turns, and overlapping streams.

Suggested fix
     def _on_input(self, msg: String):
         user_input = msg.data.strip()
-        if not user_input or self._busy: return
+        if not user_input:
+            return
         self.get_logger().info(f"📝 Input: {user_input[:80]}")
-        asyncio.run_coroutine_threadsafe(self._handle(user_input), self._loop)
+        def start_if_idle():
+            if self._busy:
+                return
+            self._busy = True
+            self._loop.create_task(self._handle(user_input))
+
+        self._loop.call_soon_threadsafe(start_if_idle)
 
     async def _handle(self, user_input: str):
-        self._busy = True
         try:

Also applies to: 92-93

🧰 Tools
🪛 Ruff (0.15.6)

[error] 75-75: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AuRoRA/src/scs/scs/cnc.py` around lines 73 - 77, Race condition: _busy is
only set inside _handle, so multiple threads can observe idle and enqueue
overlapping coroutines; fix by atomically setting _busy to True before
scheduling the coroutine and only then calling
asyncio.run_coroutine_threadsafe(self._handle(user_input), self._loop). Do this
in _on_input and the analogous callback at the other spot (lines 92-93), and
ensure _handle clears _busy when the request fully finishes (or use an existing
synchronization primitive like a dedicated lock/condition around _busy to make
check-and-set atomic).


def _on_image(self, msg: String):
img_info = msg.data.strip()
if img_info:
self.get_logger().info(f"📸 Image Input: {img_info[:80]}")
# Use 'user' role for external vision input to be compatible with MCC/WMC
asyncio.run_coroutine_threadsafe(self.mcc.add_turn("user", f"[Vision] {img_info}"), self._loop)

# ── Async pipeline ────────────────────────────────────────────────────────
def _on_reward(self, msg: String):
reward_info = msg.data.strip()
if reward_info:
self.get_logger().info(f"💎 RL Reward: {reward_info}")
asyncio.run_coroutine_threadsafe(self.mcc.add_turn("user", f"[Reward] {reward_info}"), self._loop)
Comment on lines +79 to +90
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Keep image/reward traffic out of the normal chat turn history.

AuRoRA/src/scs/scs/mcc.py:116-179 replays every WMC turn verbatim into future prompts, so these callbacks make sensor traffic look like ordinary user utterances on every request. High-rate /cns/image_input or /cns/rl_reward publishers will crowd out real dialogue and can steer the model through a channel that is not actually conversational input.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AuRoRA/src/scs/scs/cnc.py` around lines 79 - 90, The image/reward callbacks
(_on_image and _on_reward) currently call self.mcc.add_turn which inserts sensor
data into the main WMC turn history; stop using add_turn for sensor traffic and
instead record it in a non-replayed channel: either call a new MCC API like
add_auxiliary_turn/add_sensor_input (implement in mcc.py) or call add_turn with
a skip_replay=True flag and update the MCC replay logic to ignore turns with
that flag; update _on_image and _on_reward to use the new method/flag (or drop
the add_turn calls entirely) and ensure mcc.replay (in
AuRoRA/src/scs/scs/mcc.py) filters out those auxiliary/sensor turns so they are
not injected into future prompts.


async def _handle(self, user_input: str):
"""
Full async pipeline for one conversation turn:
1. Add user turn to memory
2. Build context window (WMC + EMC concurrent)
3. Stream Cosmos response
4. Add assistant turn to memory
"""
self._busy = True
full_response = ""

try:
# 1. Store user turn in memory
await self.mcc.add_turn("user", user_input)

# 2. Build context window
memory_context = await self.mcc.build_context(user_input)

# 3. Assemble messages for Cosmos
system_prompt = GRACE_SYSTEM_PROMPT.format(
date=datetime.now().strftime("%Y-%m-%d")
)

system_prompt = GRACE_SYSTEM_PROMPT.format(date=datetime.now().strftime("%Y-%m-%d"))
messages = [{"role": "system", "content": system_prompt}]
messages.extend(memory_context)
messages.append({"role": "user", "content": user_input})
Comment on lines 95 to 101
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The current user message is sent to the model twice.

Line 95 already stores user_input in WMC, and build_context() then returns the full WMC history. Line 101 appends the same turn again, so the active request is duplicated on every prompt and wastes context budget.

Suggested fix
             messages = [{"role": "system", "content": system_prompt}]
             messages.extend(memory_context)
-            messages.append({"role": "user", "content": user_input})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AuRoRA/src/scs/scs/cnc.py` around lines 95 - 101, The user_input is being
duplicated in the outgoing messages because you call self.mcc.add_turn("user",
user_input) then use memory_context = await self.mcc.build_context(user_input)
which returns the full working memory including the newly added turn, and then
you append {"role": "user", "content": user_input} again; remove the duplicated
append by either (A) not appending user_input to messages after
messages.extend(memory_context) or (B) changing build_context(...) to return
history excluding the active turn and keeping the explicit append—pick (A) for
minimal change: after messages.extend(memory_context) do not call
messages.append({"role": "user", "content": user_input}) so the final messages
contain system_prompt + memory_context only.


# 4. Stream from vLLM
full_response = await self._stream_cosmos(messages)

# 5. Store assistant turn in memory
if full_response:
await self.mcc.add_turn("assistant", full_response)

# 6. Log memory stats periodically
self.mcc.log_stats()

# Use safe stats logging
if hasattr(self.mcc, 'log_stats'):
self.mcc.log_stats()

except Exception as exc:
self.get_logger().error(f"❌ CNC handle error: {exc}")
self._publish({"type": "error", "content": f"GRACE error: {exc}"})

self.get_logger().error(f"❌ CNC error: {exc}")
finally:
self._busy = False

async def _stream_cosmos(self, messages: list[dict]) -> str:
"""
Stream Cosmos Reason2 response via vLLM OpenAI-compatible API.
Publishes chunks to ROS2 topic as they arrive.

Returns the full concatenated response string.
"""
payload = {
"model": VLLM_MODEL,
"messages": messages,
"max_tokens": VLLM_MAX_TOKENS,
"temperature": 0.7, # creativity vs consistency
"top_p": 0.9, # nucleus sampling
"top_k": 40, # top-k sampling
"repetition_penalty": 1.15, # penalize repeating phrases
"frequency_penalty": 0.1, # penalize frequent tokens
"presence_penalty": 0.1, # penalize already-mentioned topics
"stream": True,
}

payload = {"model": VLLM_MODEL, "messages": messages, "stream": True}
full_response = ""
is_first = True

try:
async with self._http.stream(
"POST",
"/v1/chat/completions",
json=payload,
) as resp:

if resp.status_code != 200:
err = f"vLLM HTTP {resp.status_code}"
self.get_logger().error(f"❌ {err}")
self._publish({"type": "error", "content": err})
return ""

async with self._http.stream("POST", "/v1/chat/completions", json=payload) as resp:
if resp.status_code != 200: return ""
async for line in resp.aiter_lines():
if not line or not line.startswith("data:"):
continue

if not line.startswith("data:"): continue
data_str = line[len("data:"):].strip()
if data_str == "[DONE]":
break

if data_str == "[DONE]": break
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
continue

# Extract delta content
delta = (
chunk.get("choices", [{}])[0]
.get("delta", {})
.get("content", "")
)
if not delta:
continue

full_response += delta

# Publish chunk
if is_first:
self._publish({"type": "start", "content": delta})
is_first = False
else:
delta = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
full_response += delta
self._publish({"type": "delta", "content": delta})

# Publish done marker with full response
if full_response:
self._publish({"type": "done", "content": full_response})
self.get_logger().info(
f"✅ Response: {len(full_response)} chars"
)
else:
self._publish({"type": "error", "content": "Empty response from Cosmos"})

except httpx.TimeoutException:
err = "Cosmos timeout — vLLM may still be loading"
self.get_logger().error(f"❌ {err}")
self._publish({"type": "error", "content": err})

except Exception as exc:
self.get_logger().error(f"❌ Stream error: {exc}")
self._publish({"type": "error", "content": str(exc)})

except: continue
self._publish({"type": "done", "content": full_response})
except: pass
Comment on lines +120 to +133
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "cnc.py" -type f

Repository: OppaAI/AGi

Length of output: 81


🏁 Script executed:

wc -l ./AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 85


🏁 Script executed:

cat -n ./AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 7049


🏁 Script executed:

rg "_publish.*error" ./AuRoRA/src/scs/scs/

Repository: OppaAI/AGi

Length of output: 36


Publish error events on vLLM failures and handle cancellation properly.

A non-200 response (line 121) returns "" without publishing a terminal "done" event. Both bare except blocks (lines 131 and 133) suppress exceptions and cancellation—from a /gce/response subscriber's perspective, a failed upstream call becomes indistinguishable from a stuck stream (both publish nothing). Additionally, line 133's bare except swallows asyncio.CancelledError, preventing proper cancellation.

Publish an "error" event on non-200 responses and unhandled exceptions, and avoid bare except clauses that catch CancelledError.

🧰 Tools
🪛 Ruff (0.15.6)

[error] 121-121: Multiple statements on one line (colon)

(E701)


[error] 123-123: Multiple statements on one line (colon)

(E701)


[error] 125-125: Multiple statements on one line (colon)

(E701)


[error] 131-131: Do not use bare except

(E722)


[error] 131-131: try-except-continue detected, consider logging the exception

(S112)


[error] 131-131: Multiple statements on one line (colon)

(E701)


[error] 133-133: Do not use bare except

(E722)


[error] 133-133: try-except-pass detected, consider logging the exception

(S110)


[error] 133-133: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AuRoRA/src/scs/scs/cnc.py` around lines 120 - 133, The stream handler should
publish an "error" event on non-200 responses and on unexpected exceptions and
must not swallow asyncio.CancelledError: in the block around self._http.stream,
when resp.status_code != 200 call self._publish({"type":"error","content":
f"non-200 response: {resp.status_code}"}), then return (or publish a terminal
"done" if desired); replace bare except clauses with explicit except
asyncio.CancelledError: raise and except Exception as e:
self._publish({"type":"error","content": str(e)}) (and then optionally publish
{"type":"done","content":full_response} or return), and keep normal success path
publishing deltas and a final {"type":"done","content": full_response};
reference self._http.stream, resp.status_code, self._publish, full_response to
locate the changes.

return full_response

# ── Publisher helper ──────────────────────────────────────────────────────

def _publish(self, payload: dict):
"""Publish a JSON payload to the response topic."""
try:
msg = String()
msg.data = json.dumps(payload)
self._pub.publish(msg)
except Exception as exc:
self.get_logger().error(f"❌ Publish error: {exc}")

# ── Cleanup ───────────────────────────────────────────────────────────────
except: pass

def destroy_node(self):
"""Clean shutdown — close memory and HTTP client."""
self.get_logger().info("🛑 CNC shutting down…")
self.mcc.close()

# Close HTTP client
future = asyncio.run_coroutine_threadsafe(
self._http.aclose(), self._loop
)
try:
future.result(timeout=3.0)
except Exception:
pass

# Stop asyncio loop
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop_thread.join(timeout=3.0)
self._executor.shutdown(wait=False)

super().destroy_node()
Comment on lines 143 to 145
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "cnc.py" -type f

Repository: OppaAI/AGi

Length of output: 81


🏁 Script executed:

cat -n ./AuRoRA/src/scs/scs/cnc.py

Repository: OppaAI/AGi

Length of output: 7049


Close asyncio resources in destroy_node().

__init__() creates self._http (httpx.AsyncClient) and a dedicated event-loop thread (self._loop_thread running self._loop.run_forever()), but destroy_node() only closes mcc. This leaves the HTTP client sockets open and the event loop thread running indefinitely, causing resource leaks across node lifecycle.

Suggested fix
    def destroy_node(self):
-        self.mcc.close()
-        super().destroy_node()
+        try:
+            asyncio.run_coroutine_threadsafe(
+                self._http.aclose(), self._loop
+            ).result(timeout=5)
+        finally:
+            self._loop.call_soon_threadsafe(self._loop.stop)
+            self._loop_thread.join(timeout=5)
+            self.mcc.close()
+            super().destroy_node()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@AuRoRA/src/scs/scs/cnc.py` around lines 143 - 145, destroy_node() currently
only closes self.mcc and leaks the httpx.AsyncClient and the background
event-loop thread created in __init__ (self._http, self._loop_thread and
associated self._loop); update destroy_node to: (1) asynchronously close
self._http (await self._http.aclose() or schedule on the node's loop if
destroy_node is synchronous), (2) stop the event loop (call
self._loop.call_soon_threadsafe(self._loop.stop)) and join/terminate
self._loop_thread, and (3) then call super().destroy_node(); ensure exceptions
from closing are caught/logged so shutdown proceeds cleanly.

self.get_logger().info("✅ CNC shutdown complete")


# ── Entry point ───────────────────────────────────────────────────────────────

def main(args=None):
rclpy.init(args=args)

node = CNC()

executor = MultiThreadedExecutor()
executor.add_node(node)

try:
executor.spin()
except KeyboardInterrupt:
node.get_logger().info("👋 Shutdown requested")
pass
finally:
executor.shutdown()
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()