-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_agent.py
More file actions
143 lines (124 loc) · 6.45 KB
/
mcp_agent.py
File metadata and controls
143 lines (124 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# mcp_agent.py
# 这是一个模拟处理复杂工作流的MCP Agent。
import requests
import json
import uuid
class MCPClient:
"""一个简单的MCP客户端,用于与服务器通信"""
def __init__(self, server_url):
self.server_url = server_url
self.headers = {'Content-Type': 'application/json'}
def _send_request(self, method, params=None):
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": str(uuid.uuid4())
}
print(f"\n[Agent] --> Sending request to server: {method}")
try:
response = requests.post(self.server_url, data=json.dumps(payload), headers=self.headers)
response.raise_for_status()
response_json = response.json()
if "error" in response_json:
raise Exception(f"Server Error: {response_json['error']['message']}")
return response_json.get("result")
except requests.exceptions.RequestException as e:
print(f"[Agent] Communication error: {e}")
return None
except Exception as e:
print(f"[Agent] An error occurred: {e}")
return None
def discover(self):
return self._send_request("mcp/discover")
def run(self, capability, parameters):
params = {"capability": capability, "parameters": parameters}
return self._send_request("mcp/run", params)
class WorkflowAgent:
"""
负责编排和执行整个入职工作流的Agent
"""
def __init__(self, mcp_server_url):
self.client = MCPClient(mcp_server_url)
self.capabilities = None
self.workflow_context = {} # 用于在工作流步骤之间传递数据
def initialize(self):
"""发现服务器能力并准备好工作"""
print("[Agent] Initializing... Discovering server capabilities.")
discovery_result = self.client.discover()
if discovery_result:
self.capabilities = discovery_result.get("capabilities")
print(f"[Agent] Discovery successful. Found {len(self.capabilities)} capabilities.")
return True
print("[Agent] Failed to discover capabilities. Agent cannot start.")
return False
def execute_onboarding_workflow(self, employee_id, full_name, position):
"""
核心工作流逻辑。
在真实场景中,这里的计划可能是由一个大型语言模型(LLM)动态生成的。
此处我们用一个硬编码的计划来模拟。
"""
print("-" * 50)
print(f"[Agent] Starting onboarding workflow for {full_name} ({employee_id})")
print("-" * 50)
# --- 步骤 1: 在HR系统中创建档案 ---
print("\n[PLAN] Step 1: Create employee profile in HRIS.")
profile_params = {"employee_id": employee_id, "full_name": full_name, "position": position}
profile_result = self.client.run("hris_create_employee_profile", profile_params)
if not profile_result: return # 如果失败则终止
self.workflow_context['employee_id'] = profile_result['employee_id']
print(f"[SUCCESS] Profile created for {self.workflow_context['employee_id']}.")
# --- 步骤 2: 查找合适的笔记本电脑 ---
print("\n[PLAN] Step 2: Find a suitable laptop from ITAM.")
# 模拟LLM的决策:软件工程师需要高性能电脑
laptop_req = 'high-performance' if position.lower() == 'software engineer' else 'standard'
laptop_result = self.client.run("itam_find_available_laptop", {"position_requirement": laptop_req})
if not laptop_result: return
self.workflow_context['laptop'] = laptop_result
print(f"[SUCCESS] Found laptop: {laptop_result['model']} (ID: {laptop_result['asset_id']}).")
# --- 步骤 3: 分配笔记本电脑 ---
print("\n[PLAN] Step 3: Assign the laptop to the employee.")
assign_params = {
"asset_id": self.workflow_context['laptop']['asset_id'],
"employee_id": self.workflow_context['employee_id']
}
assign_result = self.client.run("itam_assign_asset_to_employee", assign_params)
if not assign_result: return
print(f"[SUCCESS] {assign_result['status']}.")
# --- 步骤 4: 创建用户账户 ---
print("\n[PLAN] Step 4: Create user accounts in IAM.")
account_params = {"employee_id": employee_id, "full_name": full_name}
account_result = self.client.run("iam_create_user_accounts", account_params)
if not account_result: return
self.workflow_context['accounts'] = account_result
print(f"[SUCCESS] Accounts created: Email -> {account_result['email']}, Jira -> {account_result['jira_account']}.")
# --- 步骤 5: 通知IT支持团队 ---
print("\n[PLAN] Step 5: Notify IT support team.")
it_message = (f"Please prepare laptop {self.workflow_context['laptop']['asset_id']} "
f"({self.workflow_context['laptop']['model']}) for new employee "
f"{full_name} ({employee_id}).")
self.client.run("notifier_send_notification", {"recipient": "it-support@examplecorp.com", "message": it_message})
print(f"[SUCCESS] IT support team notified.")
# --- 步骤 6: 发送欢迎邮件给新员工 ---
print("\n[PLAN] Step 6: Send welcome email to the new employee.")
welcome_message = (f"Welcome to the team, {full_name}!\n\n"
f"Your accounts have been created:\n"
f"- Email: {self.workflow_context['accounts']['email']}\n"
f"- Jira: {self.workflow_context['accounts']['jira_account']}\n\n"
f"IT will contact you regarding your new {self.workflow_context['laptop']['model']}.")
self.client.run("notifier_send_notification", {"recipient": self.workflow_context['accounts']['email'], "message": welcome_message})
print(f"[SUCCESS] Welcome email sent.")
print("\n" + "-" * 50)
print("[Agent] Onboarding workflow completed successfully!")
print("-" * 50)
if __name__ == '__main__':
# 确保你的MCP Server正在运行
SERVER_URL = "http://localhost:8000"
agent = WorkflowAgent(SERVER_URL)
if agent.initialize():
# --- 启动工作流 ---
agent.execute_onboarding_workflow(
employee_id="john.doe",
full_name="John Doe",
position="Software Engineer"
)