Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,25 @@
# pylint: disable=logging-fstring-interpolation
from __future__ import annotations

import asyncio # pylint: disable=do-not-import-asyncio
import os
from typing import Any, AsyncGenerator, Union

from agent_framework import AgentProtocol
from agent_framework.azure import AzureAIAgentClient # pylint: disable=no-name-in-module
from agent_framework import SupportsAgentRun
from opentelemetry import trace

from azure.ai.agentserver.core import AgentRunContext, FoundryCBAgent
from azure.ai.agentserver.core.constants import Constants as AdapterConstants
from azure.ai.agentserver.core.logger import get_logger
from azure.ai.agentserver.core.models import (
CreateResponse,
Response as OpenAIResponse,
ResponseStreamEvent,
)
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential

from .models.agent_framework_input_converters import AgentFrameworkInputConverter
from .models.agent_framework_input_converters import transform_input
from .models.agent_framework_output_non_streaming_converter import (
AgentFrameworkOutputNonStreamingConverter,
)
from .models.agent_framework_output_streaming_converter import AgentFrameworkOutputStreamingConverter
from .models.constants import Constants

logger = get_logger()

Expand All @@ -37,62 +31,45 @@ class AgentFrameworkCBAgent(FoundryCBAgent):
"""
Adapter class for integrating Agent Framework agents with the FoundryCB agent interface.

This class wraps an Agent Framework `AgentProtocol` instance and provides a unified interface
This class wraps an Agent Framework `SupportsAgentRun` instance and provides a unified interface
for running agents in both streaming and non-streaming modes. It handles input and output
conversion between the Agent Framework and the expected formats for FoundryCB agents.

Parameters:
agent (AgentProtocol): An instance of an Agent Framework agent to be adapted.
agent (SupportsAgentRun): An instance of an Agent Framework agent to be adapted.

Usage:
- Instantiate with an Agent Framework agent.
- Call `agent_run` with a `CreateResponse` request body to execute the agent.
- Supports both streaming and non-streaming responses based on the `stream` flag.
"""

def __init__(self, agent: AgentProtocol):
def __init__(self, agent: SupportsAgentRun):
super().__init__()
self.agent = agent
self.tracer = None
logger.info(f"Initialized AgentFrameworkCBAgent with agent: {type(agent).__name__}")

def _resolve_stream_timeout(self, request_body: CreateResponse) -> float:
"""Resolve idle timeout for streaming updates.

Order of precedence:
1) request_body.stream_timeout_s (if provided)
2) env var Constants.AGENTS_ADAPTER_STREAM_TIMEOUT_S
3) Constants.DEFAULT_STREAM_TIMEOUT_S

:param request_body: The CreateResponse request body.
:type request_body: CreateResponse

:return: The resolved stream timeout in seconds.
:rtype: float
"""
override = request_body.get("stream_timeout_s", None)
if override is not None:
return float(override)
env_val = os.getenv(Constants.AGENTS_ADAPTER_STREAM_TIMEOUT_S)
return float(env_val) if env_val is not None else float(Constants.DEFAULT_STREAM_TIMEOUT_S)

def init_tracing(self):
exporter = os.environ.get(AdapterConstants.OTEL_EXPORTER_ENDPOINT)
app_insights_conn_str = os.environ.get(AdapterConstants.APPLICATION_INSIGHTS_CONNECTION_STRING)
project_endpoint = os.environ.get(AdapterConstants.AZURE_AI_PROJECT_ENDPOINT)

if project_endpoint:
project_client = AIProjectClient(endpoint=project_endpoint, credential=DefaultAzureCredential())
agent_client = AzureAIAgentClient(project_client=project_client)
agent_client.setup_azure_ai_observability()
elif exporter or app_insights_conn_str:
os.environ["WORKFLOW_ENABLE_OTEL"] = "true"
from agent_framework.observability import setup_observability

setup_observability(
enable_sensitive_data=True,
otlp_endpoint=exporter,
applicationinsights_connection_string=app_insights_conn_str,

if app_insights_conn_str:
from agent_framework.observability import create_resource, enable_instrumentation

from azure.monitor.opentelemetry import configure_azure_monitor

configure_azure_monitor(
connection_string=app_insights_conn_str,
resource=create_resource(),
)
enable_instrumentation(enable_sensitive_data=True)
elif exporter:
from agent_framework.observability import configure_otel_providers

os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", exporter)
configure_otel_providers(enable_sensitive_data=True)

self.tracer = trace.get_tracer(__name__)

async def agent_run(
Expand All @@ -102,10 +79,7 @@ async def agent_run(
AsyncGenerator[ResponseStreamEvent, Any],
]:
logger.info(f"Starting agent_run with stream={context.stream}")
request_input = context.request.get("input")

input_converter = AgentFrameworkInputConverter()
message = input_converter.transform_input(request_input)
message = transform_input(context.request.get("input"))
logger.debug(f"Transformed input message type: {type(message)}")

# Use split converters
Expand All @@ -115,24 +89,12 @@ async def agent_run(

async def stream_updates():
update_count = 0
timeout_s = self._resolve_stream_timeout(context.request)
logger.info("Starting streaming with idle-timeout=%.2fs", timeout_s)
for ev in streaming_converter.initial_events():
yield ev

# Iterate with per-update timeout; terminate if idle too long
aiter = self.agent.run_stream(message).__aiter__()
while True:
try:
update = await asyncio.wait_for(aiter.__anext__(), timeout=timeout_s)
except StopAsyncIteration:
logger.debug("Agent streaming iterator finished (StopAsyncIteration)")
break
except asyncio.TimeoutError:
logger.warning("Streaming idle timeout reached (%.1fs); terminating stream.", timeout_s)
for ev in streaming_converter.completion_events():
yield ev
return
# agent.run(stream=True) returns a ResponseStream (async iterable)
response_stream = self.agent.run(message, stream=True)
async for update in response_stream:
update_count += 1
transformed = streaming_converter.transform_output_for_streaming(update)
for event in transformed:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,120 +1,139 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=too-many-nested-blocks,too-many-return-statements,too-many-branches
# mypy: disable-error-code="no-redef"
from __future__ import annotations

from typing import Dict, List

from agent_framework import ChatMessage, Role as ChatRole
from agent_framework._types import TextContent
from agent_framework import Content, Message

from azure.ai.agentserver.core.logger import get_logger

logger = get_logger()


class AgentFrameworkInputConverter:
def transform_input( # pylint: disable=too-many-return-statements
input_item: str | List[Dict] | None,
) -> str | Message | list[str | Message] | None:
"""Normalize inputs for agent.run.

Accepts: str | List | None
Returns: None | str | ChatMessage | list[str] | list[ChatMessage]
"""

def transform_input(
self,
input: str | List[Dict] | None,
) -> str | ChatMessage | list[str] | list[ChatMessage] | None:
logger.debug("Transforming input of type: %s", type(input))
Returns: None | str | Message | list[str] | list[Message]

if input is None:
:param input_item: The raw input to normalize.
:type input_item: str or List[Dict] or None
"""
logger.debug("Transforming input of type: %s", type(input_item))

if input_item is None:
return None

if isinstance(input_item, str):
return input_item

try:
if isinstance(input_item, list):
messages: list[str | Message] = []

for item in input_item:
match item:
# Case 1: ImplicitUserMessage — no "role" or "type" key
case {"content": content} if "role" not in item and "type" not in item:
messages.extend(_parse_implicit_user_content(content))

# Case 2: Explicit message with role
case {"type": "message", "role": role, "content": content}:
_parse_explicit_message(role, content, messages)

# Determine the most natural return type
if not messages:
return None
if len(messages) == 1:
return messages[0]
if all(isinstance(m, str) for m in messages):
return [m for m in messages if isinstance(m, str)]
if all(isinstance(m, Message) for m in messages):
return [m for m in messages if isinstance(m, Message)]

# Mixed content: coerce Message to str by extracting text content parts
return _coerce_to_strings(messages)

raise TypeError(f"Unsupported input type: {type(input_item)}")
except Exception as e:
logger.debug("Error processing messages: %s", e, exc_info=True)
raise Exception(f"Error processing messages: {e}") from e # pylint: disable=broad-exception-raised


def _parse_implicit_user_content(content: str | list | None) -> list[str]:
"""Extract text from an implicit user message (no role/type keys).

:param content: The content to parse.
:type content: str or list or None
:return: A list of extracted text strings.
:rtype: list[str]
"""
match content:
case str():
return [content]
case list():
text_parts = [_extract_input_text(item) for item in content]
joined = " ".join(t for t in text_parts if t)
return [joined] if joined else []
case _:
return []


def _parse_explicit_message(role: str, content: str | list | None, sink: list[str | Message]) -> None:
"""Parse an explicit message dict and append to sink.

:param role: The role of the message sender.
:type role: str
:param content: The message content.
:type content: str or list or None
:param sink: The list to append parsed messages to.
:type sink: list[str | Message]
"""
match role:
case "user" | "assistant" | "system" | "tool":
pass
case _:
raise ValueError(f"Unsupported message role: {role!r}")

content_text = ""
match content:
case str():
content_text = content
case list():
text_parts = [_extract_input_text(item) for item in content]
content_text = " ".join(t for t in text_parts if t)

if content_text:
sink.append(Message(role=role, contents=[Content.from_text(content_text)]))


def _coerce_to_strings(messages: list[str | Message]) -> list[str | Message]:
"""Coerce a mixed list of str/Message into all strings.

:param messages: The mixed list of strings and Messages.
:type messages: list[str | Message]
:return: A list with Messages coerced to strings.
:rtype: list[str | Message]
"""
result: list[str | Message] = []
for msg in messages:
match msg:
case Message():
text_parts = [c.text for c in (getattr(msg, "contents", None) or []) if c.type == "text"]
result.append(" ".join(text_parts) if text_parts else str(msg))
case str():
result.append(msg)
return result


def _extract_input_text(content_item: Dict) -> str | None:
match content_item:
case {"type": "input_text", "text": str() as text}:
return text
case _:
return None

if isinstance(input, str):
return input

try:
if isinstance(input, list):
messages: list[str | ChatMessage] = []

for item in input:
# Case 1: ImplicitUserMessage with content as str or list of ItemContentInputText
if self._is_implicit_user_message(item):
content = item.get("content", None)
if isinstance(content, str):
messages.append(content)
elif isinstance(content, list):
text_parts: list[str] = []
for content_item in content:
text_content = self._extract_input_text(content_item)
if text_content:
text_parts.append(text_content)
if text_parts:
messages.append(" ".join(text_parts))

# Case 2: Explicit message params (user/assistant/system)
elif (
item.get("type") == "message"
and item.get("role") is not None
and item.get("content") is not None
):
role_map = {
"user": ChatRole.USER,
"assistant": ChatRole.ASSISTANT,
"system": ChatRole.SYSTEM,
}
role = role_map.get(item.get("role", "user"), ChatRole.USER)

content_text = ""
item_content = item.get("content", None)
if item_content and isinstance(item_content, list):
text_parts: list[str] = []
for content_item in item_content:
item_text = self._extract_input_text(content_item)
if item_text:
text_parts.append(item_text)
content_text = " ".join(text_parts) if text_parts else ""
elif item_content and isinstance(item_content, str):
content_text = str(item_content)

if content_text:
messages.append(ChatMessage(role=role, text=content_text))

# Determine the most natural return type
if not messages:
return None
if len(messages) == 1:
return messages[0]
if all(isinstance(m, str) for m in messages):
return [m for m in messages if isinstance(m, str)]
if all(isinstance(m, ChatMessage) for m in messages):
return [m for m in messages if isinstance(m, ChatMessage)]

# Mixed content: coerce ChatMessage to str by extracting TextContent parts
result: list[str] = []
for msg in messages:
if isinstance(msg, ChatMessage):
text_parts: list[str] = []
for c in getattr(msg, "contents", []) or []:
if isinstance(c, TextContent):
text_parts.append(c.text)
result.append(" ".join(text_parts) if text_parts else str(msg))
else:
result.append(str(msg))
return result

raise TypeError(f"Unsupported input type: {type(input)}")
except Exception as e:
logger.error("Error processing messages: %s", e, exc_info=True)
raise Exception(f"Error processing messages: {e}") from e # pylint: disable=broad-exception-raised

def _is_implicit_user_message(self, item: Dict) -> bool:
return "content" in item and "role" not in item and "type" not in item

def _extract_input_text(self, content_item: Dict) -> str:
if content_item.get("type") == "input_text" and "text" in content_item:
text_content = content_item.get("text")
if isinstance(text_content, str):
return text_content
return None # type: ignore
Loading
Loading