-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm_mcp_client.py
More file actions
221 lines (187 loc) · 7.07 KB
/
llm_mcp_client.py
File metadata and controls
221 lines (187 loc) · 7.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
Interactive LLM client that pulls tool definitions from the MCP server and
lets you test them via the OpenAI chat completions API.
"""
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Union
from fastmcp import Client
from openai import OpenAI
import yaml
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
CONFIG_PATH = Path("config.yml")
DEFAULT_MODEL = "gpt-5.1"
EXIT_WORDS = {"exit", "quit", "q"}
def load_api_key() -> str:
    """Return the OpenAI API key, loading .env into the environment first.

    Uses python-dotenv when installed; otherwise falls back to the minimal
    built-in .env parser.

    Raises:
        RuntimeError: when OPENAI_API_KEY is not set after loading .env.
    """
    # Populate os.environ from .env before reading the key.
    if load_dotenv is None:
        _fallback_load_env_file()
    else:
        load_dotenv()

    key = os.getenv("OPENAI_API_KEY")
    if key:
        return key
    raise RuntimeError(
        "OPENAI_API_KEY is missing. Add it to a .env file or export it before running."
    )
def _fallback_load_env_file() -> None:
env_path = Path(".env")
if not env_path.exists():
return
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))
def load_config() -> Dict[str, Any]:
    """Read config.yml and return it as a dict; {} when absent or invalid."""
    if not CONFIG_PATH.exists():
        return {}
    try:
        parsed = yaml.safe_load(CONFIG_PATH.read_text())
    except Exception:
        # Deliberate best-effort: a malformed config must not kill the
        # client — fall back to built-in defaults instead.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def resolve_mcp_transport(config: Dict[str, Any]) -> Union[str, Path]:
    """
    Pick the MCP server target from config.

    A non-empty ``mcp_server_url`` string wins (HTTP transport, the realistic
    deployment). Otherwise fall back to a local script path for stdio.

    Raises:
        FileNotFoundError: when the fallback script path does not exist.
    """
    url = config.get("mcp_server_url")
    if isinstance(url, str):
        trimmed = url.strip()
        if trimmed:
            return trimmed

    script = config.get("mcp_server_path") or "globalapi_mcp_server.py"
    resolved = Path(script).expanduser().resolve()
    if resolved.exists():
        return resolved
    raise FileNotFoundError(
        f"MCP server path not found: {resolved}. "
        "Update config.yml:mcp_server_path or set mcp_server_url."
    )
async def build_openai_tools(mcp_client: Client) -> List[Dict[str, Any]]:
    """Translate the MCP server's tool listing into OpenAI function-tool specs.

    Also prints a one-line summary per tool (name + required parameters) so
    the user can see what is available before asking questions.
    """
    mcp_tools = await mcp_client.list_tools()
    adapted: List[Dict[str, Any]] = []
    print("\nFetched tools from MCP server:")
    for mcp_tool in mcp_tools:
        # A missing/empty schema becomes an empty object schema so the
        # OpenAI API accepts the tool definition.
        params = mcp_tool.inputSchema or {"type": "object", "properties": {}}
        adapted.append(
            {
                "type": "function",
                "function": {
                    "name": mcp_tool.name,
                    "description": mcp_tool.description or "",
                    "parameters": params,
                },
            }
        )
        if isinstance(params, dict):
            required = ", ".join(params.get("required", []))
        else:
            required = ""
        print(f"- {mcp_tool.name} (requires: {required or 'none'})")
    return adapted
async def call_mcp_tool(mcp_client: Client, name: str, arguments: Dict[str, Any]) -> str:
    """Invoke an MCP tool and serialize its result as JSON for the chat history.

    Preference order for the payload: typed .data, then .structured_content,
    then the text of raw content blocks, then a fixed placeholder string.
    """
    result = await mcp_client.call_tool(name=name, arguments=arguments or {})
    if result.data is not None:
        payload: Any = result.data
    elif result.structured_content is not None:
        payload = result.structured_content
    elif result.content:
        # Content blocks may not have .text; fall back to str() per item.
        payload = [getattr(part, "text", str(part)) for part in result.content]
    else:
        payload = "Tool returned no content."
    # default=str keeps non-JSON-native values (dates, custom types) serializable.
    return json.dumps(payload, ensure_ascii=False, default=str)
async def run_conversation_turn(
    llm: OpenAI,
    mcp_client: Client,
    messages: List[Dict[str, Any]],
    tools: List[Dict[str, Any]],
    model_name: str,
) -> None:
    """Drive one user turn end-to-end.

    Repeatedly calls the chat completions API; whenever the model requests
    tool calls, executes them against the MCP server and feeds the results
    back, looping until the model returns a plain answer, which is printed
    and appended to ``messages``.
    """
    while True:
        completion = llm.chat.completions.create(
            model=model_name,
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
        reply = completion.choices[0].message
        calls = reply.tool_calls or []
        text = reply.content or ""

        if not calls:
            # Plain assistant answer: record it and finish the turn.
            messages.append({"role": "assistant", "content": text})
            print(f"\nAssistant: {text}\n")
            return

        # Echo the assistant message (including its tool-call stubs) into
        # history so the follow-up completion sees what it asked for.
        messages.append(
            {
                "role": "assistant",
                "content": text,
                "tool_calls": [
                    {
                        "id": call.id,
                        "type": call.type,
                        "function": {
                            "name": call.function.name,
                            "arguments": call.function.arguments,
                        },
                    }
                    for call in calls
                ],
            }
        )

        for call in calls:
            try:
                args = json.loads(call.function.arguments or "{}")
            except json.JSONDecodeError:
                # Unparseable arguments: call with no args rather than crash.
                args = {}
            print(f"\n> Calling tool '{call.function.name}' with {args}")
            try:
                outcome = await call_mcp_tool(mcp_client, call.function.name, args)
            except Exception as exc:  # pylint: disable=broad-except
                # Surface MCP failures to the model as a tool result so the
                # conversation can continue.
                outcome = f"Error calling tool {call.function.name}: {exc}"
            # NOTE(review): original indentation was lost in extraction —
            # assuming the result is printed for every call, not only on error.
            print(outcome)
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call.id,
                    "content": outcome,
                }
            )
async def main() -> None:
    """Entry point: wire up the OpenAI client and MCP session, then run an
    interactive REPL until the user types an exit word or closes stdin.

    Raises:
        RuntimeError: when OPENAI_API_KEY cannot be loaded.
        FileNotFoundError: when the fallback MCP server script is missing.
    """
    api_key = load_api_key()
    config = load_config()
    model_name = config.get("openai_model") or DEFAULT_MODEL
    mcp_transport = resolve_mcp_transport(config)

    llm = OpenAI(api_key=api_key)

    print("Starting MCP client...")
    print(f"Using OpenAI model: {model_name}")
    print(f"MCP target: {mcp_transport}")

    async with Client(mcp_transport) as mcp_client:
        openai_tools = await build_openai_tools(mcp_client)
        print("\nAsk a question to test the tools. Type 'exit' to quit.")
        messages: List[Dict[str, Any]] = []
        while True:
            # Fix: a bare input() blocks the running event loop (stalling the
            # MCP session, e.g. HTTP keep-alives) — run it in a worker thread.
            try:
                user_input = (await asyncio.to_thread(input, "You: ")).strip()
            except EOFError:
                # Ctrl-D / closed stdin: exit as cleanly as typing 'exit'.
                print("Goodbye.")
                break
            if not user_input:
                continue
            if user_input.lower() in EXIT_WORDS:
                print("Goodbye.")
                break
            messages.append({"role": "user", "content": user_input})
            await run_conversation_turn(
                llm, mcp_client, messages, openai_tools, model_name
            )
# Script entry point: start the asyncio event loop and run the REPL.
if __name__ == "__main__":
    asyncio.run(main())