Conversation
📝 Walkthrough

Adds a Kaizen MCP integration: fetches task-specific guidelines to append to dynamically built prompts and conditionally persists chat trajectories (async or sync) based on new kaizen settings and task outcome. Includes config validators, settings, unit tests, and wiring into CugaLite graph/node flows.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CugaLite as CugaLite Graph
    participant KaizenInteg as KaizenIntegration
    participant Transport as SSE Transport
    participant KaizenMCP as Kaizen MCP Server
    rect rgba(100, 200, 150, 0.5)
        Note over CugaLite,KaizenMCP: Guideline fetch and injection
        CugaLite->>KaizenInteg: get_guidelines(task_description)
        activate KaizenInteg
        KaizenInteg->>KaizenInteg: is_enabled()?
        alt enabled
            KaizenInteg->>Transport: create SSE client
            KaizenInteg->>KaizenMCP: call_tool("get_guidelines", {task})
            KaizenMCP-->>KaizenInteg: guidelines
            KaizenInteg-->>CugaLite: guidelines (or None)
        else disabled
            KaizenInteg-->>CugaLite: None
        end
        deactivate KaizenInteg
        CugaLite->>CugaLite: append guidelines to special_instructions
    end
    rect rgba(150, 150, 200, 0.5)
        Note over CugaLite,KaizenMCP: Trajectory persistence
        CugaLite->>KaizenInteg: save_trajectory(chat_messages, task_id, success)
        activate KaizenInteg
        KaizenInteg->>KaizenInteg: is_enabled()? should_save?
        alt should save
            KaizenInteg->>KaizenInteg: _convert_messages -> json
            alt async_save
                KaizenInteg->>KaizenInteg: asyncio.create_task(call_tool...)
            else sync
                KaizenInteg->>Transport: create SSE client
                KaizenInteg->>KaizenMCP: call_tool("save_trajectory", {trajectory_data, task_id})
                KaizenMCP-->>KaizenInteg: response
            end
        else skip
            KaizenInteg-->>CugaLite: no-op
        end
        deactivate KaizenInteg
    end
```
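The `_convert_messages -> json` step in the persistence flow above can be sketched as a simple mapping from chat message objects to OpenAI-style role/content dicts. The classes below are minimal stand-ins for LangChain's `HumanMessage`/`AIMessage`/`SystemMessage`; the real converter in `kaizen_integration.py` may handle additional types (e.g., tool messages):

```python
import json

# Minimal stand-ins for LangChain's message classes (illustrative only).
class HumanMessage:
    def __init__(self, content: str) -> None:
        self.content = content

class AIMessage:
    def __init__(self, content: str) -> None:
        self.content = content

class SystemMessage:
    def __init__(self, content: str) -> None:
        self.content = content

ROLE_BY_TYPE = {HumanMessage: "user", AIMessage: "assistant", SystemMessage: "system"}

def convert_messages(chat_messages):
    """Map chat messages to OpenAI-style role/content dicts, skipping unknown types."""
    out = []
    for msg in chat_messages:
        role = ROLE_BY_TYPE.get(type(msg))
        if role is not None:
            out.append({"role": role, "content": msg.content})
    return out

trajectory_json = json.dumps(
    convert_messages([SystemMessage("be helpful"), HumanMessage("hi"), AIMessage("hello")])
)
print(trajectory_json)
```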
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_graph.py`:
- Around line 726-729: The code currently logs the full injected prompt via
logger.debug when building kaizen_section and special_instructions_final;
replace the debug log with a non-sensitive summary that only records the
injection event and the length (or character count) of
special_instructions_final rather than its contents. Locate the block that
builds kaizen_section and appends it to special_instructions_final (references:
kaizen_section and special_instructions_final) and change logger.debug(f"Kaizen:
Full special_instructions with guidelines...\n{special_instructions_final}") to
something like a single-line debug/info stating the injection occurred and
reporting len(special_instructions_final) (or another size metric), keeping the
existing logger.info("Kaizen: Injected guidelines into system prompt") call.
In `@src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py`:
- Around line 395-396: The success flag currently uses only
self._has_error(state.final_answer or "") and can miss failures recorded in
state.error; update the derivation of success to consider both signals (e.g.,
set success = not (self._has_error(state.final_answer or "") or
bool(state.error))) so runs with state.error set are treated as failures and
trigger save_on_failure; adjust the block around task_id and success
(references: state.error, state.final_answer, self._has_error, success)
accordingly.
- Around line 397-402: The background save task started in CugaLiteNode (the
call to _asyncio.create_task(KaizenIntegration.save_trajectory(...))) must be
kept as a strong reference to avoid premature GC; add a self._background_tasks =
set() in CugaLiteNode.__init__, then replace the fire-and-forget create_task
call by assigning the returned Task to a variable, add a done callback that
removes the task from self._background_tasks, and add the task to
self._background_tasks; reference the CugaLiteNode class, the __init__ method,
and the create_task invocation/KaizenIntegration.save_trajectory call when
making this change.
In `@src/cuga/backend/kaizen/kaizen_integration.py`:
- Line 10: The Kaizen RPC/SSE calls must be bounded to avoid hanging; wrap the
calls to self._call_tool(...) and the SSE connect/call sequence inside
asyncio.wait_for with a configurable timeout (e.g., KAIZEN_RPC_TIMEOUT or a
method param) in get_guidelines() and the other similar call site (the SSE
connect/call block), catch asyncio.TimeoutError and treat it as non-fatal by
logging and returning a safe default (e.g., None or an empty guideline list) so
the Lite task proceeds; ensure asyncio is imported and the timeout value is
documented/configurable on the KaizenIntegration/client class.
- Around line 86-91: The debug call currently logs raw serialized chat payload
(logger.debug using trajectory_json), which can leak user data; remove or
replace that raw payload with non-sensitive metadata. Update the code around the
logger calls (the logger.info/logger.debug block that references
openai_messages, trajectory_json, task_id, success) to log only counts/ids
(e.g., len(openai_messages), truncated task_id, success) or a redacted/hash-safe
summary of trajectory_json (no raw content or secrets). Ensure logger.debug no
longer outputs trajectory_json raw content and instead uses a safe summary or
redaction before logging.
In `@src/cuga/backend/kaizen/tests/test_kaizen_integration.py`:
- Around line 13-15: Remove the unused imports MagicMock and ToolMessage from
the test module to satisfy linter Ruff; locate the import line in
src/cuga/backend/kaizen/tests/test_kaizen_integration.py where MagicMock and
ToolMessage are imported and delete those two symbols so only AsyncMock, patch
and the used message classes (HumanMessage, AIMessage, SystemMessage) remain.
In `@src/cuga/settings.toml`:
- Around line 131-137: The Kaizen integration defaults to enabled in the
[kaizen] TOML block (key: enabled) which contradicts the PR intent to require
opt-in; change the default to disabled by setting enabled = false in the
[kaizen] section and keep other keys (url, lite_mode_only, save_on_success,
save_on_failure, async_save) unchanged so fresh installs and CI won't attempt
the local SSE endpoint unless users explicitly enable Kaizen.
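With that fix applied, the `[kaizen]` block would look roughly like the fragment below. Only `enabled = false` comes from the review; the URL, timeout, and other values are illustrative placeholders, not the actual shipped defaults:

```toml
[kaizen]
enabled = false          # opt-in: fresh installs and CI skip the SSE endpoint
url = "http://localhost:8000/sse"  # placeholder for the local Kaizen MCP server
lite_mode_only = true    # illustrative value
save_on_success = true   # illustrative value
save_on_failure = true   # illustrative value
async_save = true        # illustrative value
timeout = 30             # seconds; suggested by the timeout review comments
```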
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 491045ad-5ee7-466a-8318-1dc4b630734b
📒 Files selected for processing (8)
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_graph.py
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py
- src/cuga/backend/kaizen/__init__.py
- src/cuga/backend/kaizen/kaizen_integration.py
- src/cuga/backend/kaizen/tests/__init__.py
- src/cuga/backend/kaizen/tests/test_kaizen_integration.py
- src/cuga/config.py
- src/cuga/settings.toml
Please handle ruff issues
…ctory saving, and injected guidelines.
cfe6eee to 60ffb41
♻️ Duplicate comments (4)
src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py (2)
397-398: ⚠️ Potential issue | 🟠 Major

Include `state.error` when determining the Kaizen success flag.

Currently, `success` is derived only from `self._has_error(state.final_answer or "")`. However, the Lite graph can record execution failures in `state.error` without necessarily reflecting them in `final_answer`. This could label failed runs as successful, bypassing `save_on_failure` logic.

🛠️ Suggested fix

```diff
  task_id = state.sub_task or tracker.task_id or "unknown"
- success = not self._has_error(state.final_answer or "")
+ success = not (state.error or self._has_error(state.final_answer or ""))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py` around lines 397 - 398, The success flag currently computed in the block that sets task_id and success (using self._has_error(state.final_answer or "")) must also consider state.error; update the logic that sets success (the variable named success) to treat any non-empty state.error as a failure in addition to self._has_error on state.final_answer, so that runs with execution errors are not marked successful and thus still trigger save_on_failure handling.
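The suggested derivation can be exercised in isolation. `State` and `_has_error` below are simplified stand-ins for the real CugaLiteNode members, not the actual implementation:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class State:
    """Simplified stand-in for the Lite graph state."""
    final_answer: Optional[str] = None
    error: Optional[str] = None

def _has_error(text: str) -> bool:
    # Placeholder heuristic; the real check is CugaLiteNode._has_error.
    return "error" in text.lower()

def derive_success(state: State) -> bool:
    # A run fails if the final answer looks like an error OR the graph
    # recorded an execution failure in state.error.
    return not (_has_error(state.final_answer or "") or bool(state.error))

print(derive_success(State(final_answer="All done")))             # True
print(derive_success(State(final_answer="All done", error="x")))  # False
print(derive_success(State(final_answer="Error: tool failed")))   # False
```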
399-402: ⚠️ Potential issue | 🟠 Major

Retain a strong reference to the background save task.

`asyncio.create_task()` without storing the returned `Task` makes it eligible for garbage collection before completion. This is also flagged by Ruff (RUF006). Follow the pattern in `tracker.py` by storing background tasks in a set with a done callback.

🛠️ Suggested fix

Add to `CugaLiteNode.__init__`:

```python
self._background_tasks: set = set()
```

Then update the async save block:

```diff
  if settings.kaizen.async_save:
-     _asyncio.create_task(KaizenIntegration.save_trajectory(state.chat_messages, task_id, success))
+     task = _asyncio.create_task(
+         KaizenIntegration.save_trajectory(list(state.chat_messages), task_id, success)
+     )
+     self._background_tasks.add(task)
+     task.add_done_callback(self._background_tasks.discard)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py` around lines 399-402: the background save task created when settings.kaizen.async_save is true is not retained and can be GC'd; update CugaLiteNode to hold a set of background tasks (e.g., add self._background_tasks: set = set() in CugaLiteNode.__init__), and when creating the task for KaizenIntegration.save_trajectory use _asyncio.create_task(...), add the returned Task to self._background_tasks, and attach a done callback that removes the task from the set (follow the pattern used in tracker.py) so the task is strongly referenced until completion.

src/cuga/backend/kaizen/kaizen_integration.py (2)
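The retention pattern recommended in the background-task finding above (and used in `tracker.py`, per the review) looks like this in isolation; `Node` and `_save_trajectory` are simplified stand-ins for the real classes:

```python
import asyncio

class Node:
    """Minimal stand-in for CugaLiteNode showing the task-retention pattern."""

    def __init__(self) -> None:
        # Strong references keep fire-and-forget tasks alive until they finish.
        self._background_tasks: set = set()

    async def _save_trajectory(self, payload: str) -> str:
        await asyncio.sleep(0)  # placeholder for the real Kaizen RPC
        return f"saved:{payload}"

    def schedule_save(self, payload: str) -> asyncio.Task:
        task = asyncio.create_task(self._save_trajectory(payload))
        self._background_tasks.add(task)
        # discard() removes the task once it completes, avoiding unbounded growth.
        task.add_done_callback(self._background_tasks.discard)
        return task

async def main() -> None:
    node = Node()
    task = node.schedule_save("demo")
    assert task in node._background_tasks
    result = await task
    print(result)  # saved:demo
    assert task not in node._background_tasks

asyncio.run(main())
```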
85-90: ⚠️ Potential issue | 🟠 Major

Avoid logging the raw trajectory payload.

Line 90 logs `trajectory_json[:500]`, which can capture user prompts, secrets, or tool outputs even at debug level. The info log on lines 85-89 already provides sufficient metadata (message count, char length, truncated task_id, success flag).

🛠️ Suggested fix

```diff
  logger.info(
      f"Kaizen: Saving trajectory ({len(openai_messages)} messages, "
      f"{len(trajectory_json)} chars, "
      f"task_id={task_id[:80]}, success={success})"
  )
- logger.debug(f"Kaizen: trajectory_data preview: {trajectory_json[:500]}")
+ logger.debug(
+     f"Kaizen: trajectory_data prepared ({len(openai_messages)} messages, "
+     f"{len(trajectory_json)} chars)"
+ )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/kaizen/kaizen_integration.py` around lines 85 - 90, Remove the debug logging of raw trajectory payload (the logger.debug call that prints trajectory_json[:500]) to avoid leaking prompts/secrets; instead log only safe metadata or a non-sensitive artifact (e.g., lengths, counts, truncated task_id, or a hash) using the existing variables openai_messages, len(trajectory_json), task_id and success. Locate the logger.debug line that references trajectory_json and either delete it or replace it with a safe statement (e.g., logging "trajectory preview redacted" or a SHA256/hex digest of trajectory_json) so no raw user/tool content is emitted.
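A metadata-only summary along the lines suggested above can be built like this; `safe_trajectory_summary` is an illustrative helper (including the short SHA-256 fingerprint), not the actual code:

```python
import hashlib
import json

def safe_trajectory_summary(openai_messages, trajectory_json: str,
                            task_id: str, success: bool) -> str:
    """Build a log line with only non-sensitive metadata (counts, sizes, a digest)."""
    # A truncated hash lets operators correlate log lines without exposing content.
    digest = hashlib.sha256(trajectory_json.encode("utf-8")).hexdigest()[:12]
    return (
        f"Kaizen: trajectory prepared ({len(openai_messages)} messages, "
        f"{len(trajectory_json)} chars, sha256={digest}, "
        f"task_id={task_id[:80]}, success={success})"
    )

messages = [{"role": "user", "content": "book a flight"}]
payload = json.dumps(messages)
line = safe_trajectory_summary(messages, payload, "task-123", True)
print(line)
assert "book a flight" not in line  # raw content never reaches the logs
```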
147-151: ⚠️ Potential issue | 🟠 Major

Bound the Kaizen RPC with a timeout.

The `_call_tool()` method creates an SSE connection and awaits the tool call without any timeout. Per the module's "non-fatal" contract, a hung Kaizen endpoint should not stall CugaLite tasks indefinitely.

🛠️ Suggested fix

```diff
+ import asyncio
+
  @classmethod
  async def _call_tool(cls, tool_name: str, args: dict):
      ...
      from fastmcp import Client
      from fastmcp.client.transports import SSETransport
      from mcp.types import TextContent

      url = settings.kaizen.url
      transport = SSETransport(url)
+     timeout_s = getattr(settings, "kaizen", None)
+     timeout_s = getattr(timeout_s, "timeout", 30) if timeout_s else 30
-     async with Client(transport) as client:
-         result = await client.call_tool(tool_name, args)
+     async def _invoke():
+         async with Client(transport) as client:
+             return await client.call_tool(tool_name, args)
+
+     try:
+         result = await asyncio.wait_for(_invoke(), timeout=timeout_s)
+     except asyncio.TimeoutError:
+         logger.warning(f"Kaizen {tool_name} timed out after {timeout_s}s (non-fatal)")
+         return None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/kaizen/kaizen_integration.py` around lines 147 - 151, The _call_tool function currently awaits client.call_tool(...) with no bound; wrap the call in asyncio.wait_for using a configurable timeout (e.g., settings.kaizen.timeout) to prevent hangs: create transport = SSETransport(url) and keep the async with Client(transport) as client block, then replace result = await client.call_tool(tool_name, args) with result = await asyncio.wait_for(client.call_tool(tool_name, args), timeout=settings.kaizen.timeout). Add an except asyncio.TimeoutError handler that logs the timeout (including tool_name, args/context) and returns a non-fatal value or raises a controlled exception per the module contract so CugaLite tasks aren't stalled; ensure the async with still closes transport/client properly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py`:
- Around line 397-398: The success flag currently computed in the block that
sets task_id and success (using self._has_error(state.final_answer or "")) must
also consider state.error; update the logic that sets success (the variable
named success) to treat any non-empty state.error as a failure in addition to
self._has_error on state.final_answer, so that runs with execution errors are
not marked successful and thus still trigger save_on_failure handling.
- Around line 399-402: The background save task created when
settings.kaizen.async_save is true is not retained and can be GC'd; update
CugaLiteNode to hold a set of background tasks (e.g., add
self._background_tasks: set = set() in CugaLiteNode.__init__) and when creating
the task for KaizenIntegration.save_trajectory use _asyncio.create_task(...) and
add the returned Task to self._background_tasks, attaching a done callback that
removes the task from the set (follow the pattern used in tracker.py) so the
task is strongly referenced until completion.
In `@src/cuga/backend/kaizen/kaizen_integration.py`:
- Around line 85-90: Remove the debug logging of raw trajectory payload (the
logger.debug call that prints trajectory_json[:500]) to avoid leaking
prompts/secrets; instead log only safe metadata or a non-sensitive artifact
(e.g., lengths, counts, truncated task_id, or a hash) using the existing
variables openai_messages, len(trajectory_json), task_id and success. Locate the
logger.debug line that references trajectory_json and either delete it or
replace it with a safe statement (e.g., logging "trajectory preview redacted" or
a SHA256/hex digest of trajectory_json) so no raw user/tool content is emitted.
- Around line 147-151: The _call_tool function currently awaits
client.call_tool(...) with no bound; wrap the call in asyncio.wait_for using a
configurable timeout (e.g., settings.kaizen.timeout) to prevent hangs: create
transport = SSETransport(url) and keep the async with Client(transport) as
client block, then replace result = await client.call_tool(tool_name, args) with
result = await asyncio.wait_for(client.call_tool(tool_name, args),
timeout=settings.kaizen.timeout). Add an except asyncio.TimeoutError handler
that logs the timeout (including tool_name, args/context) and returns a
non-fatal value or raises a controlled exception per the module contract so
CugaLite tasks aren't stalled; ensure the async with still closes
transport/client properly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d8b0acd8-3490-4776-af8e-ae442c275ac9
📒 Files selected for processing (8)
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_graph.py
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py
- src/cuga/backend/kaizen/__init__.py
- src/cuga/backend/kaizen/kaizen_integration.py
- src/cuga/backend/kaizen/tests/__init__.py
- src/cuga/backend/kaizen/tests/test_kaizen_integration.py
- src/cuga/config.py
- src/cuga/settings.toml
✅ Files skipped from review due to trivial changes (1)
- src/cuga/config.py
🚧 Files skipped from review as they are similar to previous changes (2)
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_graph.py
- src/cuga/settings.toml
♻️ Duplicate comments (2)
src/cuga/backend/kaizen/kaizen_integration.py (2)
85-90: ⚠️ Potential issue | 🟠 Major

Don't log raw trajectory payloads.

Line 90 writes a serialized chat history preview into the logs. Even at debug level, this can capture user prompts, secrets, and tool output, creating a compliance/privacy risk. Log only metadata (counts, lengths, IDs) instead.

🔒 Proposed fix to remove sensitive data from logs

```diff
  logger.info(
      f"Kaizen: Saving trajectory ({len(openai_messages)} messages, "
      f"{len(trajectory_json)} chars, "
      f"task_id={task_id[:80]}, success={success})"
  )
- logger.debug(f"Kaizen: trajectory_data preview: {trajectory_json[:500]}")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/kaizen/kaizen_integration.py` around lines 85 - 90, The current logger.debug call exposes raw serialized trajectory payload (trajectory_json) which may include sensitive user data; update the Kaizen logging in the save trajectory block that uses logger.info and logger.debug so it no longer emits raw trajectory_json—remove or replace the logger.debug(f"Kaizen: trajectory_data preview: {trajectory_json[:500]}") with a metadata-only log such as counts/lengths or a non-reversible fingerprint (e.g., len(trajectory_json), number of messages from openai_messages, and a truncated/hashed task_id or a fixed "<redacted>" marker) while keeping the existing logger.info call that logs len(openai_messages), len(trajectory_json), task_id[:80], and success.
148-161: ⚠️ Potential issue | 🟠 Major

SSE connection establishment is still unbounded.

The timeout now correctly wraps `client.call_tool()`, but the connection is established by `async with Client(transport)` (line 151), which happens outside the `wait_for`. If the Kaizen endpoint is slow to accept the SSE connection, this can still block indefinitely, breaking the "non-fatal" contract. Wrap the entire connection and call sequence inside the timeout:

⏱️ Proposed fix to bound the entire SSE sequence

```diff
  url = settings.kaizen.url
  transport = SSETransport(url)
+ timeout = settings.kaizen.timeout
- async with Client(transport) as client:
-     try:
-         result = await asyncio.wait_for(
-             client.call_tool(tool_name, args), timeout=settings.kaizen.timeout
-         )
-     except asyncio.TimeoutError:
-         logger.warning(
-             f"Kaizen MCP call timed out after {settings.kaizen.timeout}s: "
-             f"tool={tool_name}, args={args}"
-         )
-         return None
+ async def _invoke():
+     async with Client(transport) as client:
+         return await client.call_tool(tool_name, args)
+
+ try:
+     result = await asyncio.wait_for(_invoke(), timeout=timeout)
+ except asyncio.TimeoutError:
+     logger.warning(
+         f"Kaizen MCP call timed out after {timeout}s: "
+         f"tool={tool_name}, args={args}"
+     )
+     return None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/kaizen/kaizen_integration.py` around lines 148 - 161, The SSE connection (async with Client(transport)) is created outside the asyncio.wait_for, so a slow/blocked SSE handshake can hang; wrap the entire connect-and-call sequence in the timeout by moving the async with Client(transport): ... client.call_tool(...) block inside the asyncio.wait_for scope (i.e., call a coroutine that performs the async with Client(transport) and then awaits client.call_tool(tool_name, args)) and pass that coroutine to asyncio.wait_for using settings.kaizen.timeout so both connection establishment and the call are bounded.
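The "bound the whole sequence" fix above can be demonstrated without fastmcp. `FakeClient` below stands in for `fastmcp.Client`, with the SSE handshake simulated in `__aenter__`; because the entire connect-and-call coroutine runs inside `wait_for`, a slow handshake is bounded just like the RPC itself:

```python
import asyncio

class FakeClient:
    """Stand-in for fastmcp.Client: connection setup happens in __aenter__."""

    def __init__(self, connect_delay: float = 0.0):
        self._connect_delay = connect_delay

    async def __aenter__(self):
        await asyncio.sleep(self._connect_delay)  # simulates the SSE handshake
        return self

    async def __aexit__(self, *exc):
        return False

    async def call_tool(self, name: str, args: dict):
        return {"tool": name, "args": args}

async def call_tool_bounded(client: FakeClient, name: str, args: dict, timeout: float):
    # The whole connect-and-call sequence runs inside wait_for, so a slow
    # handshake is bounded too, not just the tool call.
    async def _invoke():
        async with client as c:
            return await c.call_tool(name, args)
    try:
        return await asyncio.wait_for(_invoke(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # non-fatal per the module contract

async def main():
    ok = await call_tool_bounded(FakeClient(), "get_guidelines", {"task": "t"}, 1.0)
    print(ok)
    slow = await call_tool_bounded(FakeClient(connect_delay=10), "get_guidelines", {}, 0.05)
    print(slow)  # None

asyncio.run(main())
```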
🧹 Nitpick comments (1)
src/cuga/backend/kaizen/kaizen_integration.py (1)
42-51: Broad exception catch is intentional, but consider narrowing.

The `except Exception` at line 49 aligns with the "non-fatal" contract documented in the module docstring. However, catching all exceptions can mask programming errors (e.g., `TypeError` from incorrect arguments). Consider catching more specific exceptions like `(ConnectionError, TimeoutError, OSError, json.JSONDecodeError)` to let unexpected bugs surface during development.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cuga/backend/kaizen/kaizen_integration.py` around lines 42 - 51, The current broad except in the classmethod that calls cls._call_tool("get_guidelines", ...) should be narrowed so only expected non-fatal errors are swallowed; replace the blanket "except Exception" with a specific tuple like except (ConnectionError, TimeoutError, OSError, json.JSONDecodeError) as e to preserve the non-fatal contract while allowing programming errors (e.g., TypeError, NameError) to surface during development; ensure json.JSONDecodeError is imported/available and keep the existing logger.warning and return None behavior for those caught exceptions.
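A narrowed catch per the nitpick might look like the sketch below; `get_guidelines_safe`, `flaky`, and `buggy` are illustrative stand-ins, not the real module code:

```python
import asyncio
import json
import logging

logger = logging.getLogger("kaizen")

# Expected, non-fatal transport/parse failures; programming errors still surface.
NON_FATAL_ERRORS = (ConnectionError, TimeoutError, OSError, json.JSONDecodeError)

async def get_guidelines_safe(call_tool, task: str):
    try:
        return await call_tool("get_guidelines", {"task": task})
    except NON_FATAL_ERRORS as e:
        logger.warning("Kaizen get_guidelines failed (non-fatal): %s", e)
        return None

async def flaky(_tool, _args):
    raise ConnectionError("endpoint down")  # swallowed as non-fatal

async def buggy(_tool, _args):
    raise TypeError("programming error")  # NOT swallowed

async def main():
    assert await get_guidelines_safe(flaky, "t") is None
    try:
        await get_guidelines_safe(buggy, "t")
    except TypeError:
        print("TypeError surfaced")

asyncio.run(main())
```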
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/cuga/backend/kaizen/kaizen_integration.py`:
- Around line 85-90: The current logger.debug call exposes raw serialized
trajectory payload (trajectory_json) which may include sensitive user data;
update the Kaizen logging in the save trajectory block that uses logger.info and
logger.debug so it no longer emits raw trajectory_json—remove or replace the
logger.debug(f"Kaizen: trajectory_data preview: {trajectory_json[:500]}") with a
metadata-only log such as counts/lengths or a non-reversible fingerprint (e.g.,
len(trajectory_json), number of messages from openai_messages, and a
truncated/hashed task_id or a fixed "<redacted>" marker) while keeping the
existing logger.info call that logs len(openai_messages), len(trajectory_json),
task_id[:80], and success.
- Around line 148-161: The SSE connection (async with Client(transport)) is
created outside the asyncio.wait_for, so a slow/blocked SSE handshake can hang;
wrap the entire connect-and-call sequence in the timeout by moving the async
with Client(transport): ... client.call_tool(...) block inside the
asyncio.wait_for scope (i.e., call a coroutine that performs the async with
Client(transport) and then awaits client.call_tool(tool_name, args)) and pass
that coroutine to asyncio.wait_for using settings.kaizen.timeout so both
connection establishment and the call are bounded.
---
Nitpick comments:
In `@src/cuga/backend/kaizen/kaizen_integration.py`:
- Around line 42-51: The current broad except in the classmethod that calls
cls._call_tool("get_guidelines", ...) should be narrowed so only expected
non-fatal errors are swallowed; replace the blanket "except Exception" with a
specific tuple like except (ConnectionError, TimeoutError, OSError,
json.JSONDecodeError) as e to preserve the non-fatal contract while allowing
programming errors (e.g., TypeError, NameError) to surface during development;
ensure json.JSONDecodeError is imported/available and keep the existing
logger.warning and return None behavior for those caught exceptions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c2d61463-2dff-47bf-a815-5414c1b4547b
📒 Files selected for processing (4)
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py
- src/cuga/backend/kaizen/kaizen_integration.py
- src/cuga/backend/kaizen/tests/test_kaizen_integration.py
- src/cuga/settings.toml
🚧 Files skipped from review as they are similar to previous changes (2)
- src/cuga/settings.toml
- src/cuga/backend/cuga_graph/nodes/cuga_lite/cuga_lite_node.py
@gaodan-fang @gjt-prog can u provide simple instructions how to run kaizen, also do u think we need to update the ui when memory triggers etc to be able to address things related to kaizen
Yes, we are working on the instructions. Can you elaborate more about the UI triggering part? Do you mean showing memories generated by kaizen?
@gaodan-fang meaning that we can view some nice output next to the agent's output, like what happened that relates to memory
please resolve the conflict, and also connect this PR to the issue
- Combined auth validators from main with Kaizen integration validators
- Kept both sets of configuration options for compatibility
resolve #109
Add Kaizen integration to CugaLite mode, enabling the agent to learn from past task trajectories and receive context-aware guidelines at task start.
What it does:
Fetches Kaizen guidelines before each CugaLite task and injects them into the system prompt
Saves agent trajectories to Kaizen after task completion for tip generation
Configurable via [kaizen] section in settings.toml (disabled by default)
Design decisions:
Standalone MCP client (not reusing existing tool registry) for reliability and isolation
Fresh SSE connection per call to avoid stale connection issues
Zero changes to Kaizen codebase required
Summary by CodeRabbit
New Features
Tests