-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathnodes.py
More file actions
120 lines (95 loc) · 4.22 KB
/
nodes.py
File metadata and controls
120 lines (95 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Tencent is pleased to support the open source community by making tRPC-Agent-Python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# tRPC-Agent-Python is licensed under Apache-2.0.
"""Node functions for graph interrupt workflow."""
from typing import Any
from typing import Dict
from trpc_agent_sdk.context import InvocationContext
from trpc_agent_sdk.dsl.graph import STATE_KEY_LAST_RESPONSE
from trpc_agent_sdk.dsl.graph import STATE_KEY_NODE_RESPONSES
from trpc_agent_sdk.dsl.graph import STATE_KEY_USER_INPUT
from trpc_agent_sdk.dsl.graph import interrupt
from .state import InterruptState
# Route labels returned by route_after_approval. The same two strings are the
# only approval statuses the nodes in this module recognise.
ROUTE_APPROVED = "approved"
ROUTE_REJECTED = "rejected"
def _normalize_text(text: str) -> str:
    """Return ``text`` without surrounding whitespace; falsy input yields ``""``."""
    if not text:
        return ""
    return text.strip()
def _truncate_text(text: str, max_len: int = 100) -> str:
    """Shorten ``text`` so that it is at most ``max_len`` characters long.

    Text that already fits is returned unchanged. Longer text is cut and
    terminated with ``"..."`` so the result is exactly ``max_len`` characters.
    When ``max_len`` is too small to hold the ellipsis, the text is hard-cut
    to ``max_len`` characters instead (empty string for ``max_len <= 0``).

    Args:
        text: The text to shorten.
        max_len: Maximum length of the returned string.

    Returns:
        ``text`` itself, or a shortened copy no longer than ``max_len``.
    """
    suffix = "..."
    if len(text) <= max_len:
        return text
    if max_len < len(suffix):
        # A negative slice bound would count from the end of the string and
        # produce a result longer than max_len, so cut without the ellipsis.
        return text[:max(max_len, 0)]
    return text[:max_len - len(suffix)] + suffix
def _log_node(node_name: str, message: str) -> None:
    """Print ``message`` to stdout behind a tag that names the node."""
    line = f"[node_execute:{node_name}] {message}"
    print(line)
async def approval_gate(state: InterruptState, ctx: InvocationContext) -> Dict[str, Any]:
    """Pause execution and wait for user approval via graph interrupt.

    The suggested action is taken from the last response in ``state``, then
    from the ``draft_action`` entry of the node responses, and finally from a
    fixed default sentence. It is passed to ``interrupt`` together with the
    user's request.

    The value returned by ``interrupt`` is interpreted fail-closed: the status
    is ``"approved"`` only when the decision explicitly says so. A decision
    with a missing or unrecognised status, or of an unsupported type, is
    treated as ``"rejected"``.

    Args:
        state: Graph state; read for the user input, the last response and
            the node responses.
        ctx: Invocation context; its ``user_id`` and ``session_id`` are
            echoed in the returned ``context_note``.

    Returns:
        A state update with the keys ``suggested_action``,
        ``approval_status``, ``approval_note``, ``summary_request`` and
        ``context_note``.
    """
    request_text = _normalize_text(state.get(STATE_KEY_USER_INPUT, ""))

    suggested_action = _normalize_text(state.get(STATE_KEY_LAST_RESPONSE, ""))
    if not suggested_action:
        # ``or {}`` also covers a key that is present but holds None.
        node_responses = state.get(STATE_KEY_NODE_RESPONSES) or {}
        suggested_action = _normalize_text(node_responses.get("draft_action", ""))
    if not suggested_action:
        suggested_action = "Proceed with a minimal safe default action."

    payload = {
        "title": "Approval Required",
        "request": request_text,
        "suggested_action": suggested_action,
        "options": [ROUTE_APPROVED, ROUTE_REJECTED],
        "tip": "Provide status in FunctionResponse.response, e.g. {'status':'approved','note':'...'}",
    }
    _log_node("approval_gate", f"interrupt_payload={payload}")
    decision = interrupt(payload)

    # Extract the raw status (and optional note) from whichever decision shape
    # was supplied; anything else leaves the status empty and thus rejected.
    raw_status: Any = ""
    note = ""
    if isinstance(decision, dict):
        # No implicit approval: an absent "status" key must not approve.
        raw_status = decision.get("status", "")
        # ``or ""`` keeps a None note from turning into the text "None".
        note = str(decision.get("note") or "").strip()
    elif isinstance(decision, str):
        raw_status = decision

    status = ROUTE_REJECTED
    status_value = str(raw_status).strip().lower()
    if status_value in {ROUTE_APPROVED, ROUTE_REJECTED}:
        status = status_value

    summary_request = (f"User request: {request_text}\n"
                       f"Approved action: {suggested_action}\n"
                       f"Approval note: {note or '(none)'}\n"
                       "Summarize what was approved and what will happen next in 1-2 short sentences.")
    result = {
        "suggested_action": suggested_action,
        "approval_status": status,
        "approval_note": note,
        "summary_request": summary_request,
        "context_note": f"user={ctx.user_id} session={ctx.session_id}",
    }
    _log_node("approval_gate", f"resume_decision={result}")
    return result
def route_after_approval(state: InterruptState) -> str:
    """Route to summary agent only when user approved.

    The ``approval_status`` entry of ``state`` is stripped and lower-cased
    before comparison. A missing, empty or unrecognised status routes to the
    rejected branch, so approval is never assumed.

    Args:
        state: Graph state; only ``approval_status`` is read.

    Returns:
        ``ROUTE_APPROVED`` when the status is approved, otherwise
        ``ROUTE_REJECTED``.
    """
    # Default to "" rather than approved: an absent decision is not approval.
    status = _normalize_text(state.get("approval_status", "")).lower()
    return ROUTE_APPROVED if status == ROUTE_APPROVED else ROUTE_REJECTED
async def finalize_output(state: InterruptState) -> Dict[str, Any]:
    """Build final output based on approval decision.

    Reads ``approval_status``, ``approval_note``, ``suggested_action``,
    ``approval_summary`` and ``context_note`` from ``state`` and renders them
    as a short text report. The status is stripped and lower-cased the same
    way ``route_after_approval`` does; anything other than approved,
    including a missing status, is reported as rejected.

    Args:
        state: Graph state holding the approval result.

    Returns:
        A state update that stores the rendered report under
        ``STATE_KEY_LAST_RESPONSE``.
    """
    status = _normalize_text(state.get("approval_status", "")).lower()
    note = state.get("approval_note", "")
    suggested_action = state.get("suggested_action", "")
    approval_summary = _normalize_text(state.get("approval_summary", ""))

    if status == ROUTE_APPROVED:
        summary = f"Decision: approved\nAction: {suggested_action}"
        if approval_summary:
            summary = f"{summary}\nSummary: {approval_summary}"
    else:
        summary = f"Decision: rejected\nAction blocked: {suggested_action}"
    if note:
        summary = f"{summary}\nNote: {note}"

    # Assemble the report from explicit lines so its layout does not depend on
    # how a multi-line literal happens to be indented in the source file.
    banner = "=============================="
    output = "\n".join([
        banner,
        "Graph Interrupt Result",
        banner,
        summary,
        f"Context: {state.get('context_note', '')}",
    ]).strip()

    result = {STATE_KEY_LAST_RESPONSE: output}
    _log_node("finalize_output", f"return.last_response_len={len(output)}")
    return result