Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import logging
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -46,13 +47,89 @@ class ResourceInfo:
scope_version: str


class ConversationTurn:
    """A single user-assistant conversation turn, with prior history preserved.

    ``input_messages`` holds every input event (``gen_ai.user.message`` and
    ``gen_ai.assistant.message``) in the order observed on the span, so
    downstream consumers can reconstruct the actual conversation flow.
    ``assistant_messages`` holds only the current turn's output, derived from
    ``gen_ai.choice`` events. ``tool_results`` holds any tool outputs observed
    on the span.

    The ``user_message`` attribute is retained as a backwards-compatible alias
    returning the most recent user turn's content; new code should iterate
    ``input_messages`` filtered by role.
    """

    # NOTE(review): the stale ``@dataclass`` decorator and the leftover field
    # annotations (``user_message: str`` etc.) were removed. With the
    # hand-written ``__init__`` below, ``@dataclass`` would treat the
    # ``user_message`` property object as a field default, and the following
    # default-less fields would make class creation raise
    # ``TypeError: non-default argument follows default argument``.

    def __init__(
        self,
        user_message: Optional[str] = None,
        assistant_messages: Optional[List[Dict[str, Any]]] = None,
        tool_results: Optional[List[str]] = None,
        input_messages: Optional[List[Dict[str, Any]]] = None,
    ):
        """Initialize a conversation turn.

        ``input_messages`` is the chronological list of input events
        (``{"role": "user"|"assistant", "content": {...}}``). For backwards
        compatibility, callers may instead pass a legacy ``user_message``
        scalar, which is converted into a single-entry ``input_messages``
        list. Supplying both is an error.

        Raises:
            ValueError: If both ``input_messages`` and ``user_message`` are
                supplied.
        """
        if input_messages is not None and user_message is not None:
            raise ValueError("Provide either input_messages or user_message, not both")
        if input_messages is not None:
            # Defensive copy so later mutation by the caller cannot alias in.
            self.input_messages = list(input_messages)
        elif user_message is not None:
            # Legacy scalar path: wrap in the event-shaped dict used elsewhere.
            self.input_messages = [{"content": {"content": user_message}, "role": "user"}]
        else:
            self.input_messages = []
        self.assistant_messages = list(assistant_messages or [])
        self.tool_results = list(tool_results or [])

    def __repr__(self) -> str:
        """Return a debug representation listing every stored field."""
        return (
            f"ConversationTurn(input_messages={self.input_messages!r}, "
            f"assistant_messages={self.assistant_messages!r}, "
            f"tool_results={self.tool_results!r})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare turns by the full set of instance fields."""
        if not isinstance(other, ConversationTurn):
            return NotImplemented
        return (
            self.input_messages == other.input_messages
            and self.assistant_messages == other.assistant_messages
            and self.tool_results == other.tool_results
        )

    @property
    def user_message(self) -> str:
        """Return the most recent user turn's content (backwards-compatible alias).

        Deprecated: iterate ``input_messages`` to access the full conversation
        history. This alias returns only the last user entry.
        """
        user_entries = [m for m in self.input_messages if m.get("role") == "user"]
        if not user_entries:
            return ""
        # Warn only when information would actually be dropped: multiple user
        # turns, or interleaved assistant history, cannot be represented by a
        # single scalar.
        if len(user_entries) > 1 or any(m.get("role") == "assistant" for m in self.input_messages):
            warnings.warn(
                "ConversationTurn.user_message drops prior user turns and history "
                "when the span carries multiple input events. Iterate "
                "ConversationTurn.input_messages for the full conversation.",
                DeprecationWarning,
                stacklevel=2,
            )
        last = user_entries[-1]
        content = last.get("content")
        # Events usually nest content as {"content": "..."}; tolerate a bare
        # string, and fall back to "" for any other shape.
        if isinstance(content, dict):
            inner = content.get("content", "")
            return inner if isinstance(inner, str) else ""
        return content if isinstance(content, str) else ""


@dataclass
Expand Down Expand Up @@ -191,7 +268,14 @@ def build_conversation_log_record(
metadata: SpanMetadata,
resource_info: ResourceInfo,
) -> Dict[str, Any]:
"""Build ADOT log record for conversation turn."""
"""Build ADOT log record for conversation turn.

``body.input.messages`` carries ``ConversationTurn.input_messages`` in
event arrival order, so downstream consumers reconstruct the exact
conversation the model received (user and prior assistant turns
interleaved). ``body.output.messages`` carries only the current turn's
output.
"""
output_messages = []
for i, msg in enumerate(conversation.assistant_messages):
output_msg = msg.copy()
Expand All @@ -206,7 +290,7 @@ def build_conversation_log_record(

body = {
"output": {"messages": output_messages},
"input": {"messages": [{"content": {"content": conversation.user_message}, "role": "user"}]},
"input": {"messages": list(conversation.input_messages)},
}

return cls._build_log_record_base(metadata, resource_info, body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,29 @@ class StrandsEventParser:

@classmethod
def extract_conversation_turn(cls, events: List[Any]) -> Optional[ConversationTurn]:
"""Extract conversation turn from Strands span events."""
user_message = None
assistant_messages = []
tool_results = []
"""Extract conversation turn from Strands span events.

Per OTel GenAI semantic conventions, ``gen_ai.{user,assistant,tool}.message``
events describe input context (including prior assistant turns replayed
as history), while ``gen_ai.choice`` describes the model's current-turn
output. ``input_messages`` preserves event arrival order so downstream
consumers can reconstruct the actual conversation flow (``[user,
assistant, user, assistant, user]`` rather than role-grouped).
"""
input_messages: List[Dict[str, Any]] = []
assistant_messages: List[Dict[str, Any]] = []
tool_results: List[str] = []

for event in events:
event_attrs = dict(event.attributes) if hasattr(event, "attributes") and event.attributes else {}

match event.name:
case cls.EVENT_USER_MESSAGE:
user_message = event_attrs.get("content", "")
content = event_attrs.get("content", "")
if content:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if content: silently drops empty events. The old scalar path had different behavior — user_message = "" would fail the outer if user_message gate and reject the whole turn. Now empty events are skipped individually. If the intent is to drop empty content, add a log line so it's visible.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logger.debug(...) on all three empty-content skip paths (user, assistant, tool). The outer-gate semantics are also restored — a parse that yields only an empty user event now fails the gate (has_user_input requires a non-empty input_messages entry with role=user), matching the old if user_message behavior.

input_messages.append({"content": {"content": content}, "role": "user"})
else:
logger.debug("Skipping gen_ai.user.message with empty content")

case cls.EVENT_CHOICE:
message = event_attrs.get("message", "")
Expand All @@ -66,16 +78,21 @@ def extract_conversation_turn(cls, events: List[Any]) -> Optional[ConversationTu
case cls.EVENT_ASSISTANT_MESSAGE:
content = event_attrs.get("content", "")
if content:
assistant_messages.append({"content": {"content": content}, "role": "assistant"})
input_messages.append({"content": {"content": content}, "role": "assistant"})
else:
logger.debug("Skipping gen_ai.assistant.message with empty content")

case cls.EVENT_TOOL_MESSAGE:
content = event_attrs.get("content", "")
if content:
tool_results.append(content)
else:
logger.debug("Skipping gen_ai.tool.message with empty content")

if user_message and assistant_messages:
has_user_input = any(m.get("role") == "user" for m in input_messages)
if has_user_input and assistant_messages:
return ConversationTurn(
user_message=user_message,
input_messages=input_messages,
assistant_messages=assistant_messages,
tool_results=tool_results,
)
Expand Down
105 changes: 105 additions & 0 deletions tests/bedrock_agentcore/evaluation/span_to_adot_serializer/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Shared fixtures for span_to_adot_serializer tests."""

from unittest.mock import Mock

import pytest

from bedrock_agentcore.evaluation.span_to_adot_serializer.adot_models import (
ResourceInfo,
SpanMetadata,
)


@pytest.fixture
def mock_span_context():
    """Provide a mock span context with fixed trace/span identifiers."""
    return Mock(
        trace_id=0x1234567890ABCDEF1234567890ABCDEF,
        span_id=0x1234567890ABCDEF,
        trace_flags=1,
    )


@pytest.fixture
def mock_resource():
    """Provide a mock OTel resource carrying a service.name attribute."""
    return Mock(attributes={"service.name": "test-service"})


@pytest.fixture
def mock_instrumentation_scope():
    """Provide a mock instrumentation scope for the Strands agent."""
    scope = Mock(version="1.0.0")
    # `name` is a reserved Mock() constructor kwarg, so assign it afterwards.
    scope.name = "strands.agent"
    return scope


@pytest.fixture
def mock_status():
    """Provide a mock span status whose code stringifies to StatusCode.OK."""
    code = Mock()
    code.__str__ = Mock(return_value="StatusCode.OK")
    return Mock(status_code=code)


@pytest.fixture
def mock_span(mock_span_context, mock_resource, mock_instrumentation_scope, mock_status):
    """Assemble a mock OTel span from the smaller component fixtures."""
    span_kind = Mock()
    span_kind.__str__ = Mock(return_value="SpanKind.INTERNAL")
    span = Mock(
        context=mock_span_context,
        resource=mock_resource,
        instrumentation_scope=mock_instrumentation_scope,
        status=mock_status,
        parent=None,
        start_time=1000000000,
        end_time=2000000000,
        kind=span_kind,
        attributes={"gen_ai.operation.name": "chat"},
        events=[],
    )
    # `name` is a reserved Mock() constructor kwarg, so assign it afterwards.
    span.name = "test-span"
    return span


@pytest.fixture
def mock_event():
    """Return a factory that builds mock span events with a name and attributes."""

    def _make_event(name, attributes):
        event = Mock(attributes=attributes)
        # `name` is a reserved Mock() constructor kwarg, so assign it afterwards.
        event.name = name
        return event

    return _make_event


@pytest.fixture
def span_metadata():
    """Provide a SpanMetadata instance with deterministic test values."""
    fields = {
        "trace_id": "1234567890abcdef1234567890abcdef",
        "span_id": "1234567890abcdef",
        "parent_span_id": None,
        "name": "test-span",
        "start_time": 1000000000,
        "end_time": 2000000000,
        "duration": 1000000000,
        "kind": "INTERNAL",
        "flags": 1,
        "status_code": "OK",
    }
    return SpanMetadata(**fields)


@pytest.fixture
def resource_info():
    """Provide a ResourceInfo instance describing the test service and scope."""
    return ResourceInfo(
        scope_name="strands.agent",
        scope_version="1.0.0",
        resource_attributes={"service.name": "test-service"},
    )
Loading
Loading