Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 33 additions & 141 deletions src/cuga/backend/cuga_graph/nodes/answer/final_answer.py
Original file line number Diff line number Diff line change
@@ -1,151 +1,43 @@
import functools
import json
from typing import Literal, Dict, Callable

from langchain_core.messages import AIMessage
from langgraph.types import Command

from cuga.backend.activity_tracker.tracker import ActivityTracker, Step
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.final_answer_agent import FinalAnswerAgent
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.prompts.load_prompt import FinalAnswerOutput
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import (
create_save_reuse_action,
create_get_more_utterances,
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.final_answer_agent import (
FinalAnswerAgent,
)
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.prompts.load_prompt import (
FinalAnswerOutput,
)
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.state.agent_state import AgentState
from cuga.config import settings
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds, MessagePrefixes

tracker = ActivityTracker()

# Feature flag for human-in-the-loop functionality
ENABLE_SAVE_REUSE = settings.features.save_reuse


class HumanInTheLoopHandler:
"""Simple handler for human-in-the-loop interactions"""

def __init__(self):
self._action_handlers: Dict[str, Callable] = {
ActionIds.SAVE_REUSE: self._handle_save_reuse,
ActionIds.SAVE_REUSE_INTENT: self._handle_save_reuse_intent,
}

def handle_human_response(self, state: AgentState, node_name: str) -> Command:
"""Handle any human response based on action_id"""
action_id = state.hitl_response.action_id

if action_id in self._action_handlers:
return self._action_handlers[action_id](state, node_name)

# Default fallback
return Command(update=state.model_dump(), goto=NodeNames.END)

def add_action_handler(self, action_id: str, handler: Callable):
"""Add a custom action handler"""
self._action_handlers[action_id] = handler

def _handle_save_reuse(self, state: AgentState, node_name: str) -> Command:
"""Handle save/reuse action - get more utterances"""
state.hitl_action = create_get_more_utterances()
state.sender = node_name
return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)

def _handle_save_reuse_intent(self, state: AgentState, node_name: str) -> Command:
"""Handle save/reuse intent - go to reuse agent"""
state.sender = node_name
return Command(update=state.model_dump(), goto=NodeNames.REUSE_AGENT)
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames


class FinalAnswerNode(BaseNode):
def __init__(self, final_answer_agent: FinalAnswerAgent):
super().__init__()
self.final_answer_agent = final_answer_agent
self.hitl_handler = HumanInTheLoopHandler()

self.node = functools.partial(
FinalAnswerNode.node_handler,
agent=self.final_answer_agent,
name=self.final_answer_agent.name,
hitl_handler=self.hitl_handler,
"""Node responsible for generating the final answer."""

def __init__(self, final_answer_agent: FinalAnswerAgent) -> None:
super().__init__(final_answer_agent)

async def _invoke(
self,
state: AgentState,
agent: FinalAnswerAgent,
name: str,
) -> Command:
"""
Invoke the final answer agent to generate the response.

This method is called by `BaseNode.node_handler`.
"""
await self._generate_and_process_response(
state=state,
agent=agent,
name=name,
response_parser=FinalAnswerOutput,
)

@staticmethod
async def node_handler(
state: AgentState, agent: FinalAnswerAgent, name: str, hitl_handler: HumanInTheLoopHandler
) -> Command[Literal["__end__", "SuggestHumanActions", "ReuseAgent"]]:
# Handle human responses (only if HITL is enabled)
if ENABLE_SAVE_REUSE and state.sender == NodeNames.WAIT_FOR_RESPONSE:
return hitl_handler.handle_human_response(state, name)

# Handle direct chat calls (no processing needed)
if state.sender == NodeNames.CHAT_AGENT:
state.sender = name
final_answer_content = state.chat_agent_messages[-1].content
state.final_answer = final_answer_content
final_answer_output = FinalAnswerOutput(
thoughts=["Chat response provided directly."], final_answer=final_answer_content
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)

# Handle TaskAnalyzerAgent when final_answer is already set (no apps matched)
if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer:
state.sender = name
final_answer_output = FinalAnswerOutput(
thoughts=[
"No applications matched the request. Providing available applications information."
],
final_answer=state.final_answer,
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)
if state.sender == NodeNames.CUGA_LITE:
state.sender = name
state.final_answer = state.final_answer
state.sender = name
final_answer_output = FinalAnswerOutput(
thoughts=[],
final_answer=state.final_answer,
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)
# Main processing: generate final answer
await FinalAnswerNode._generate_final_answer(state, agent, name)

# Route based on sender (only suggest human actions if HITL is enabled)
# Allow save/reuse from both PlanControllerAgent (task decomposition mode) and ChatAgent (chat mode)
if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT:
state.hitl_action = create_save_reuse_action()
state.sender = name
return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)
else:
return Command(update=state.model_dump(), goto=NodeNames.END)

@staticmethod
async def _generate_final_answer(state: AgentState, agent: FinalAnswerAgent, name: str):
"""Generate and process the final answer"""
# Run the agent
response = await agent.run(state)
state.messages.append(response)

# Parse and process output
final_answer_output = FinalAnswerOutput(**json.loads(response.content))

# Add to chat if enabled
if settings.features.chat:
chat_message = f"{MessagePrefixes.ANSWER_PREFIX}{final_answer_output.final_answer}"
state.append_to_last_chat_message(chat_message)

# Track the step
tracker.collect_step(Step(name=name, data=final_answer_output.model_dump_json()))

# Replace variables and update state
final_answer_output.final_answer = state.variables_manager.replace_variables_placeholders(
final_answer_output.final_answer
state.sender = name
return Command(
update=state.model_dump(),
goto=NodeNames.END,
)
state.final_answer = final_answer_output.final_answer

Loading