-
Notifications
You must be signed in to change notification settings - Fork 109
test: add multi-turn conversation serialization integration test #455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| """Integration test: multi-turn Strands conversation history preservation. | ||
|
|
||
| Ticket V2197498381 reported that the span serializer dropped all but the most | ||
| recent user turn and mixed prior assistant turns into the current-turn output. | ||
| This test runs a real multi-turn Strands agent, captures the emitted OTel | ||
| spans, and asserts that `convert_strands_to_adot` serializes the full | ||
| conversation in chronological order. | ||
|
|
||
| Run with: | ||
| AWS_PROFILE=<profile> BEDROCK_TEST_REGION=us-west-2 \ | ||
| uv run pytest tests_integ/evaluation/integrations/strands_agents_evals/\ | ||
| test_multi_turn_conversation_serialization.py -xvs --log-cli-level=INFO | ||
| """ | ||
|
|
||
| import logging | ||
| import os | ||
|
|
||
| import pytest | ||
| from strands import Agent | ||
| from strands_evals.telemetry import StrandsEvalsTelemetry | ||
|
|
||
| from bedrock_agentcore.evaluation.span_to_adot_serializer import convert_strands_to_adot | ||
|
|
||
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
| REGION = os.environ.get("BEDROCK_TEST_REGION", "us-west-2") | ||
|
|
||
| pytestmark = pytest.mark.filterwarnings("ignore::UserWarning:pydantic.main") | ||
|
|
||
|
|
||
| @pytest.mark.integration | ||
| class TestMultiTurnConversationSerialization: | ||
| """End-to-end validation that multi-turn history survives serialization.""" | ||
|
|
||
| def _run_multi_turn_agent(self, turns: list[str]) -> list: | ||
| """Run an agent through several user turns and return the captured spans.""" | ||
| telemetry = StrandsEvalsTelemetry().setup_in_memory_exporter() | ||
| agent = Agent( | ||
| system_prompt=( | ||
| "You are a helpful assistant. Keep responses short — one sentence. " | ||
| "Remember details the user shares across turns." | ||
| ), | ||
| ) | ||
| for user_msg in turns: | ||
| response = agent(user_msg) | ||
| logger.info("user=%r -> assistant=%r", user_msg, str(response)[:80]) | ||
|
|
||
| return list(telemetry.in_memory_exporter.get_finished_spans()) | ||
|
|
||
| @staticmethod | ||
| def _flatten(msg: dict) -> str: | ||
| """Extract the text payload from a serialized message entry.""" | ||
| content = msg.get("content", "") | ||
| if isinstance(content, dict): | ||
| for key in ("content", "message"): | ||
| val = content.get(key) | ||
| if isinstance(val, str): | ||
| return val | ||
| return str(content) | ||
| return str(content) | ||
|
|
||
| def _find_multi_turn_chat_record(self, adot_docs: list, raw_spans: list) -> tuple[dict, int]: | ||
| """Locate the ADOT log record derived from the final multi-turn Strands chat span. | ||
|
|
||
| Strands emits three span kinds per turn; only ``chat`` spans carry the | ||
| full conversation history as span events. This test exercises that span | ||
| specifically. Returns the log record and the number of prior | ||
| assistant-message history events observed on the source span. | ||
| """ | ||
| chat_spans = [s for s in raw_spans if s.name == "chat"] | ||
| assert chat_spans, f"Expected at least one Strands 'chat' span, got span names: {[s.name for s in raw_spans]}" | ||
| final_chat_span = chat_spans[-1] | ||
| history_asst_events = [e for e in final_chat_span.events if e.name == "gen_ai.assistant.message"] | ||
| expected_history_count = len(history_asst_events) | ||
|
|
||
| trace_id = format(final_chat_span.context.trace_id, "032x") | ||
| span_id = format(final_chat_span.context.span_id, "016x") | ||
| matching = [ | ||
| d | ||
| for d in adot_docs | ||
| if d.get("traceId") == trace_id | ||
| and d.get("spanId") == span_id | ||
| and "body" in d | ||
| and "input" in d.get("body", {}) | ||
| ] | ||
| assert matching, f"Could not find log record for final 'chat' span traceId={trace_id} spanId={span_id}" | ||
| return matching[0], expected_history_count | ||
|
|
||
| def test_multi_turn_history_preserved_in_adot_serialization(self): | ||
| """The final-turn chat span must serialize full conversation history.""" | ||
| turns = [ | ||
| "My name is Taro.", | ||
| "I live in Seattle.", | ||
| "I work on distributed systems.", | ||
| "What is my name and where do I live?", | ||
| ] | ||
|
|
||
| raw_spans = self._run_multi_turn_agent(turns) | ||
| assert raw_spans, "Expected Strands to emit OTel spans" | ||
|
|
||
| adot_docs = convert_strands_to_adot(raw_spans) | ||
| assert adot_docs, "convert_strands_to_adot returned nothing" | ||
|
|
||
| record, expected_history_count = self._find_multi_turn_chat_record(adot_docs, raw_spans) | ||
| input_messages = record["body"]["input"]["messages"] | ||
| output_messages = record["body"]["output"]["messages"] | ||
|
|
||
| user_entries = [m for m in input_messages if m.get("role") == "user"] | ||
| assistant_entries = [m for m in input_messages if m.get("role") == "assistant"] | ||
|
|
||
| user_texts = [self._flatten(m) for m in user_entries] | ||
| assistant_texts = [self._flatten(m) for m in assistant_entries] | ||
| output_texts = [self._flatten(m) for m in output_messages] | ||
|
|
||
| logger.info("final chat span expected prior-assistant history events: %d", expected_history_count) | ||
| logger.info("final-turn input.messages user entries: %d", len(user_entries)) | ||
| logger.info("final-turn input.messages assistant entries: %d", len(assistant_entries)) | ||
| logger.info("final-turn output.messages entries: %d", len(output_messages)) | ||
| for i, t in enumerate(user_texts): | ||
| logger.info(" user[%d]: %r", i, t[:100]) | ||
| for i, t in enumerate(assistant_texts): | ||
| logger.info(" history-assistant[%d]: %r", i, t[:100]) | ||
| for i, t in enumerate(output_texts): | ||
| logger.info(" output[%d]: %r", i, t[:100]) | ||
|
|
||
| # Bug 1 regression: every prior user turn must survive serialization. | ||
| assert len(user_entries) == len(turns), ( | ||
| f"Pre-fix bug dropped all but newest user turn. Expected {len(turns)} user entries " | ||
| f"on the final chat span, got {len(user_entries)}. texts={user_texts}" | ||
| ) | ||
| joined_users = " | ".join(user_texts) | ||
| for probe in ("Taro", "Seattle", "distributed systems"): | ||
| assert probe in joined_users, ( | ||
| f"Expected earlier user turn containing {probe!r} to survive; " | ||
| f"final-turn user entries were: {user_texts}" | ||
| ) | ||
|
|
||
| # Bug 2 regression: prior assistant turns must land in input, not output. | ||
| assert len(assistant_entries) == expected_history_count, ( | ||
| f"Expected {expected_history_count} prior assistant turns in input.messages " | ||
| f"(matching the gen_ai.assistant.message events on the span); " | ||
| f"got {len(assistant_entries)}" | ||
| ) | ||
|
|
||
| # Output on the last turn must be exactly one entry (gen_ai.choice), not | ||
| # N+1 entries from history being mixed in. | ||
| assert len(output_messages) == 1, ( | ||
| f"Pre-fix bug mixed history into output.messages. Expected exactly 1 output " | ||
| f"entry on final turn, got {len(output_messages)}: {output_texts}" | ||
| ) | ||
|
|
||
| # Output must not contain stale assistant-history content. | ||
| joined_output = " ".join(output_texts).lower() | ||
| assert joined_output, "output.messages was empty" | ||
| for stale in ( | ||
| "nice to meet you, taro", | ||
| "got it, taro from seattle", | ||
| "that's interesting work", | ||
| ): | ||
| assert stale not in joined_output, ( | ||
| f"output.messages contains stale history entry containing {stale!r}: {output_texts}" | ||
| ) | ||
|
|
||
| # Chronological ordering: roles must interleave, not group by role. | ||
| roles_in_order = [m["role"] for m in input_messages] | ||
| # For N user turns with N-1 prior assistant turns, the expected order is | ||
| # user, assistant, user, assistant, ..., user. | ||
| expected_pattern = [] | ||
| for i in range(len(turns)): | ||
| expected_pattern.append("user") | ||
| if i < expected_history_count: | ||
| expected_pattern.append("assistant") | ||
| assert roles_in_order == expected_pattern, ( | ||
| f"Expected chronological role interleaving {expected_pattern}; got {roles_in_order}. " | ||
| f"Pre-fix role-grouped ordering would have emitted " | ||
| f"[{'user, ' * len(turns)}{'assistant, ' * expected_history_count}] instead." | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential contradiction with PR #454's builder ordering. This asserts chronological interleaving: However, input_messages = [{...} for user_msg in conversation.user_messages] # all users
input_messages.extend(conversation.history_messages) # then all assistantsThat would produce Can you confirm: does |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: These stale-content assertions are fragile — they assume specific model responses (
"nice to meet you, taro", etc.) that Claude may not produce. When the model responds differently, these pass vacuously without actually testing anything. Thelen(output_messages) == 1assertion above already structurally validates that history isn't leaking into output. Consider removing these or replacing with a structural check (e.g., verify no output text substring appears in any input entry).