From 2c51f05a378d1615c34db33166e56cd4761b3d07 Mon Sep 17 00:00:00 2001
From: Pasquale Grimaldi
Date: Tue, 10 Mar 2026 23:02:36 +0100
Subject: [PATCH] add Qwen model support and Bedrock API Keys authentication
- Enable Qwen models on Bedrock (add routing in chat_completion)
- Add AWS_BEARER_TOKEN_BEDROCK env var for bearer token authentication
- Implement Converse API for bearer token auth (works with Claude and Qwen)
- Handle tool calls and tool results in Converse API format
- Fall back to boto3/aioboto3 when bearer token is not set
---
src/openagents/lms/providers.py | 203 ++++++++++++++++++++++++++++++--
1 file changed, 195 insertions(+), 8 deletions(-)
diff --git a/src/openagents/lms/providers.py b/src/openagents/lms/providers.py
index 04907cf74..f4b27650f 100644
--- a/src/openagents/lms/providers.py
+++ b/src/openagents/lms/providers.py
@@ -233,15 +233,19 @@ class BedrockProvider(BaseModelProvider):
def __init__(self, model_name: str, region: Optional[str] = None, **kwargs):
self.model_name = model_name
self.region = region or os.getenv("AWS_DEFAULT_REGION", "us-east-1")
+ self.bearer_token = os.getenv("AWS_BEARER_TOKEN_BEDROCK")
+ self.session = None
+
+ if not self.bearer_token:
+ try:
+ import aioboto3
+ except ImportError:
+ raise ImportError(
+ "aioboto3 package is required for async Bedrock provider (unless using AWS_BEARER_TOKEN_BEDROCK). "
+ "Install with: pip install aioboto3"
+ )
+ self.session = aioboto3.Session()
- try:
- import aioboto3
- except ImportError:
- raise ImportError(
- "aioboto3 package is required for async Bedrock provider. Install with: pip install aioboto3"
- )
-
- self.session = aioboto3.Session()
async def chat_completion(
self,
@@ -252,6 +256,8 @@ async def chat_completion(
# Format depends on the specific model
if "claude" in self.model_name.lower():
return await self._claude_bedrock_completion(messages, tools)
+ elif "qwen" in self.model_name.lower():
+ return await self._qwen_bedrock_completion(messages, tools)
else:
raise NotImplementedError(
f"Model {self.model_name} not yet supported in Bedrock provider"
@@ -263,6 +269,10 @@ async def _claude_bedrock_completion(
tools: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Handle Claude models on Bedrock."""
+ # Convert to Converse API format for bearer token, or invoke_model for boto3
+ if self.bearer_token:
+ return await self._claude_bedrock_bearer(messages, tools)
+
# Convert to Claude Bedrock format
claude_messages = []
system_message = None
@@ -322,6 +332,183 @@ async def _claude_bedrock_completion(
return result
+ async def _claude_bedrock_bearer(
+ self,
+ messages: List[Dict[str, Any]],
+ tools: Optional[List[Dict[str, Any]]] = None,
+ ) -> Dict[str, Any]:
+ """Handle Claude models on Bedrock using bearer token (Converse API)."""
+ import aiohttp
+
+ # Build Converse API request body
+ converse_messages = []
+ system_parts = []
+
+ for msg in messages:
+ if msg["role"] == "system":
+ system_parts.append({"text": msg["content"]})
+ elif msg["role"] == "tool":
+ # Converse API expects tool results inside a "user" role message
+ tool_result = {
+ "toolResult": {
+ "toolUseId": msg.get("tool_call_id", ""),
+ "content": [{"text": msg.get("content", "")}]
+ }
+ }
+ converse_messages.append({
+ "role": "user",
+ "content": [tool_result]
+ })
+ elif msg["role"] == "assistant" and msg.get("tool_calls"):
+ # Convert assistant tool_calls to Converse format
+ content = []
+ if msg.get("content"):
+ content.append({"text": msg["content"]})
+ for tc in msg["tool_calls"]:
+ content.append({
+ "toolUse": {
+ "toolUseId": tc.get("id", ""),
+ "name": tc.get("name", tc.get("function", {}).get("name", "")),
+                        "input": json.loads(tc.get("arguments", tc.get("function", {}).get("arguments", "{}")) or "{}")
+ }
+ })
+ converse_messages.append({"role": "assistant", "content": content})
+ else:
+ converse_messages.append({
+ "role": msg["role"],
+ "content": [{"text": msg.get("content", "") or ""}]
+ })
+
+ body = {"messages": converse_messages}
+
+ if system_parts:
+ body["system"] = system_parts
+
+ if tools:
+ tool_config = {"tools": []}
+ for tool in tools:
+ tool_spec = {
+ "name": tool.get("name", ""),
+ "description": tool.get("description", ""),
+ "inputSchema": {
+ "json": tool.get("input_schema", tool.get("parameters", {"type": "object", "properties": {}}))
+ }
+ }
+ tool_config["tools"].append({"toolSpec": tool_spec})
+ body["toolConfig"] = tool_config
+
+ url = f"https://bedrock-runtime.{self.region}.amazonaws.com/model/{self.model_name}/converse"
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ url,
+ json=body,
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {self.bearer_token}",
+ },
+ ) as resp:
+ if resp.status != 200:
+ error_text = await resp.text()
+ raise RuntimeError(f"Bedrock API error ({resp.status}): {error_text}")
+ response_body = await resp.json()
+
+ # Parse Converse API response
+ result = {"content": "", "tool_calls": []}
+
+ output = response_body.get("output", {})
+ message = output.get("message", {})
+ for content_block in message.get("content", []):
+ if "text" in content_block:
+ result["content"] += content_block["text"]
+ elif "toolUse" in content_block:
+ tool_use = content_block["toolUse"]
+ result["tool_calls"].append({
+ "id": tool_use["toolUseId"],
+ "name": tool_use["name"],
+ "arguments": json.dumps(tool_use["input"]),
+ })
+
+ # Extract token usage
+ usage = response_body.get("usage", {})
+ if usage:
+ input_tokens = usage.get("inputTokens")
+ output_tokens = usage.get("outputTokens")
+ result["usage"] = {
+ "input_tokens": input_tokens,
+ "output_tokens": output_tokens,
+ "total_tokens": (input_tokens or 0) + (output_tokens or 0) if input_tokens or output_tokens else None,
+ }
+
+ return result
+
+ async def _qwen_bedrock_completion(
+ self,
+ messages: List[Dict[str, Any]],
+ tools: Optional[List[Dict[str, Any]]] = None,
+ ) -> Dict[str, Any]:
+ """Handle Qwen models on Bedrock (OpenAI-compatible format)."""
+ # Use Converse API with bearer token if available
+ if self.bearer_token:
+ return await self._claude_bedrock_bearer(messages, tools)
+
+ # Qwen uses OpenAI-compatible message format
+ qwen_messages = []
+
+ for msg in messages:
+ qwen_messages.append({"role": msg["role"], "content": msg["content"]})
+
+ body = {
+ "messages": qwen_messages,
+ "max_tokens": 4096,
+ }
+
+ if tools:
+ body["tools"] = [{"type": "function", "function": tool} for tool in tools]
+ body["tool_choice"] = "auto"
+
+ async with self.session.client(
+ "bedrock-runtime", region_name=self.region
+ ) as client:
+ response = await client.invoke_model(
+ modelId=self.model_name, body=json.dumps(body)
+ )
+
+        response_body = json.loads(await response["body"].read())
+
+ # Standardize response format (OpenAI-compatible response)
+ result = {"content": "", "tool_calls": []}
+
+ # Handle OpenAI-compatible response format
+ if "choices" in response_body:
+ choice = response_body["choices"][0]
+ message = choice.get("message", {})
+ result["content"] = message.get("content", "")
+
+ if "tool_calls" in message and message["tool_calls"]:
+ for tool_call in message["tool_calls"]:
+ result["tool_calls"].append(
+ {
+ "id": tool_call.get("id", ""),
+ "name": tool_call["function"]["name"],
+ "arguments": tool_call["function"]["arguments"],
+ }
+ )
+ # Handle direct content response
+ elif "content" in response_body:
+ result["content"] = response_body["content"]
+
+ # Extract token usage
+ usage = response_body.get("usage", {})
+ if usage:
+ result["usage"] = {
+ "input_tokens": usage.get("prompt_tokens"),
+ "output_tokens": usage.get("completion_tokens"),
+ "total_tokens": usage.get("total_tokens"),
+ }
+
+ return result
+
def format_tools(self, tools: List[Any]) -> List[Dict[str, Any]]:
"""Format tools for Bedrock."""
formatted_tools = []