Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 51 additions & 12 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -1778,12 +1778,17 @@ def _trace_agent_invocation(
**merged_client_kwargs,
)

inner_response_telemetry_captured_fields: set[str] = set()
inner_response_telemetry_captured_fields_token = INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.set(
inner_response_telemetry_captured_fields
)
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})
if stream:
# Do NOT set the inner-telemetry context vars here: this synchronous run() body executes
# in the CALLER's context, but the ResponseStream may be consumed in a different context
# (e.g. ``stream = agent.run(stream=True)`` then ``await asyncio.create_task(consume(stream))``).
# The cleanup-hook reset (in _finalize_stream) runs in the consuming context, so a token
# created here would raise ``ValueError: <Token ...> was created in a different Context``.
# Instead the tokens are set lazily on the first pull (see _inner_telemetry_pull_context
# below), so set and reset both happen in the consumer's context.
inner_response_telemetry_captured_fields: set[str] = set()
inner_response_telemetry_captured_fields_token: contextvars.Token[set[str] | None] | None = None
inner_accumulated_usage_token: contextvars.Token[UsageDetails | None] | None = None
span = _start_streaming_span(attributes, OtelAttr.AGENT_NAME)

if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages and span.is_recording():
Expand Down Expand Up @@ -1827,8 +1832,9 @@ def _record_duration() -> None:
raise RuntimeError("Streaming telemetry requires a ResponseStream result.")
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
# The contextvars are set lazily on the first pull, which only happens after this
# synchronous setup phase returns the stream, so the tokens are still unset here and
# there is nothing to reset.
Comment on lines +1835 to +1837

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the comments here isn't associated with any actual code. I suspect these are copilot notes. Maybe we can remove.

_close_span()
raise

Expand Down Expand Up @@ -1868,26 +1874,59 @@ async def _finalize_stream() -> None:
except Exception as exception:
capture_exception(span=span, exception=exception, timestamp=time_ns())
finally:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
# Reset only if the lazy set actually ran (it may not have if the stream was
# never pulled). These run in the consuming context — the same context the
# pull-context factory below set the tokens in — so the reset is cross-context safe.
if inner_response_telemetry_captured_fields_token is not None:
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
if inner_accumulated_usage_token is not None:
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
_close_span()

def _inner_telemetry_pull_context() -> contextlib.AbstractContextManager[Any]:
# Invoked at the start of every pull (and during stream resolution), in the
# consuming context. On the first pull it sets the inner-telemetry context vars so
# that set and the reset in _finalize_stream both run in the consumer's context,
# avoiding the cross-context Token reset failure. Setting happens before the
# underlying iterator is pulled, so inner chat completion spans created during the
# pull can still accumulate usage / mark captured fields.
nonlocal inner_response_telemetry_captured_fields_token, inner_accumulated_usage_token
if inner_response_telemetry_captured_fields_token is None:
inner_response_telemetry_captured_fields_token = INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.set(
inner_response_telemetry_captured_fields
)
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})
return _activate_span(span)

# The pull context manager attaches the span around each underlying iterator pull so
# that child spans created during the pull (e.g. inner chat completion spans from the
# underlying ChatTelemetryLayer) are parented under this agent invoke span. Attach and
# detach happen in the same async context as the pull, avoiding cross-context cleanup
# issues. The weakref finalizer ensures the span is closed even if the stream is
# garbage collected without being consumed.
# issues. It also lazily sets the inner-telemetry context vars on the first pull (see
# _inner_telemetry_pull_context). The weakref finalizer ensures the span is closed even
# if the stream is garbage collected without being consumed.
wrapped_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]] = (
result_stream
.with_cleanup_hook(_record_duration)
.with_cleanup_hook(_finalize_stream)
.with_pull_context_manager(lambda: _activate_span(span))
.with_pull_context_manager(_inner_telemetry_pull_context)
)
weakref.finalize(wrapped_stream, _close_span)
return wrapped_stream

async def _run() -> AgentResponse[Any]:
# Set the inner-telemetry context vars inside the coroutine so the set and the
# reset in `finally` always happen in the same execution context. `run()` is a sync
# method that returns this coroutine, which may be awaited in a different context than
# the one that called `run()` (e.g. `asyncio.create_task(agent.run(...))`, as used by
# BackgroundAgentsProvider). A contextvars.Token can only be reset in the context that
# created it, so setting eagerly in `run()`/`_trace_agent_invocation` and resetting
# here would raise "Token was created in a different Context".
inner_response_telemetry_captured_fields: set[str] = set()
inner_response_telemetry_captured_fields_token = INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.set(
inner_response_telemetry_captured_fields
)
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})
try:
with _get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span:
try:
Expand Down
208 changes: 207 additions & 1 deletion python/packages/core/tests/core/test_observability.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging
from collections.abc import AsyncIterable, Awaitable, Mapping, Sequence
from typing import Any, cast
Expand Down Expand Up @@ -5249,7 +5250,212 @@ class FailingExecuteAgent(AgentTelemetryLayer, _FailingExecuteAgent): # type: i
assert agent_spans[0].status.status_code == StatusCode.ERROR


# region Test heavy operations skipped when span is not recording
@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
async def test_agent_run_contextvars_safe_when_awaited_in_different_context(
span_exporter: InMemorySpanExporter, enable_sensitive_data
):
"""``run()`` is a sync method that returns an awaitable; the telemetry contextvar set and reset
must happen in the same execution context so the returned coroutine can be awaited in a different
context.

Regression for background agents (``BackgroundAgentsProvider``), which do
``asyncio.create_task(agent.run(...))``: ``run()`` executes synchronously in the parent context
while the returned coroutine is awaited in a fresh copied context. If the contextvar token were
created eagerly in the parent context but reset inside the coroutine, this raised
``ValueError: <Token ...> was created in a different Context``.
"""

class _SimpleAgent:
AGENT_PROVIDER_NAME = "test_provider"

def __init__(self):
self._id = "simple"
self._name = "Simple"
self._description = "Agent that returns a response without raising"
self._default_options: dict[str, Any] = {}

@property
def id(self):
return self._id

@property
def name(self):
return self._name

@property
def description(self):
return self._description

@property
def default_options(self):
return self._default_options

def run(self, messages=None, *, stream: bool = False, session=None, **kwargs):
async def _inner() -> AgentResponse:
return AgentResponse(messages=[Message(role="assistant", contents=["hi"])])

return _inner()

class SimpleAgent(AgentTelemetryLayer, _SimpleAgent): # type: ignore[misc]
pass

agent = SimpleAgent()
span_exporter.clear()

# Mimic BackgroundAgentsProvider: invoke run() synchronously in this context, then await the
# returned coroutine inside a separate task (a different/copied context).
awaitable = agent.run(messages="Hello", stream=False)

async def _runner(aw):
return await aw

result = await asyncio.create_task(_runner(awaitable))
assert isinstance(result, AgentResponse)

spans = span_exporter.get_finished_spans()
agent_spans = [s for s in spans if s.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.AGENT_INVOKE_OPERATION] # type: ignore[union-attr] # ty: ignore[unresolved-attribute]
assert len(agent_spans) == 1
assert agent_spans[0].status.status_code != StatusCode.ERROR


@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
async def test_agent_run_error_path_contextvars_safe_when_awaited_in_different_context(
span_exporter: InMemorySpanExporter, enable_sensitive_data
):
"""Error-path variant: the coroutine returned by ``run()`` raises, and is awaited in a different
context via ``asyncio.create_task``. The telemetry contextvars are set and reset inside the
coroutine (its ``finally``), so the reset on the exception path must not raise
``ValueError: <Token ...> was created in a different Context``; the original error must surface.
"""

class _FailingRunAgent:
AGENT_PROVIDER_NAME = "test_provider"

def __init__(self):
self._id = "failing_run"
self._name = "Failing Run"
self._description = "Agent whose run coroutine raises"
self._default_options: dict[str, Any] = {}

@property
def id(self):
return self._id

@property
def name(self):
return self._name

@property
def description(self):
return self._description

@property
def default_options(self):
return self._default_options

def run(self, messages=None, *, stream: bool = False, session=None, **kwargs):
async def _inner() -> AgentResponse:
raise RuntimeError("run failed")

return _inner()

class FailingRunAgent(AgentTelemetryLayer, _FailingRunAgent): # type: ignore[misc]
pass

agent = FailingRunAgent()
span_exporter.clear()

awaitable = agent.run(messages="Hello", stream=False)

async def _runner(aw):
return await aw

# The original RuntimeError must propagate unchanged — not a cross-context ValueError from the
# contextvar reset in the coroutine's finally block.
with pytest.raises(RuntimeError, match="run failed"):
await asyncio.create_task(_runner(awaitable))

spans = span_exporter.get_finished_spans()
agent_spans = [s for s in spans if s.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.AGENT_INVOKE_OPERATION] # type: ignore[union-attr] # ty: ignore[unresolved-attribute]
assert len(agent_spans) == 1
assert agent_spans[0].status.status_code == StatusCode.ERROR


@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
async def test_agent_streaming_contextvars_safe_when_consumed_in_different_context(
span_exporter: InMemorySpanExporter, enable_sensitive_data
):
"""``run(stream=True)`` returns a ``ResponseStream`` synchronously, but its cleanup hooks (which
reset the telemetry contextvars) run when the stream is *consumed* — possibly in a different
context (e.g. ``stream = agent.run(stream=True)`` then ``await asyncio.create_task(consume(stream))``).

The contextvars are therefore set lazily on the first pull, in the consuming context, so the set
and the reset both run there. Otherwise this raised
``ValueError: <Token ...> was created in a different Context``.
"""
from agent_framework import AgentResponseUpdate

class _StreamingAgent:
AGENT_PROVIDER_NAME = "test_provider"

def __init__(self):
self._id = "streaming_xctx"
self._name = "Streaming XCtx"
self._description = "Streaming agent for cross-context consumption"
self._default_options: dict[str, Any] = {}

@property
def id(self):
return self._id

@property
def name(self):
return self._name

@property
def description(self):
return self._description

@property
def default_options(self):
return self._default_options

def run(self, messages=None, *, stream: bool = False, session=None, **kwargs):
if stream:

async def _stream():
yield AgentResponseUpdate(contents=[Content.from_text("Hello ")], role="assistant")
yield AgentResponseUpdate(contents=[Content.from_text("World")], role="assistant")

return ResponseStream(_stream(), finalizer=AgentResponse.from_updates)
raise NotImplementedError

class StreamingAgent(AgentTelemetryLayer, _StreamingAgent): # type: ignore[misc]
pass

agent = StreamingAgent()
span_exporter.clear()

# Create the stream synchronously in this context, then consume it inside a separate task (a
# different/copied context) — mirroring how a caller might hand the stream off to be drained.
stream = agent.run(messages="Hello", stream=True)

async def _consume(s):
collected = []
async for update in s:
collected.append(update)
await s.get_final_response()
return collected

updates = await asyncio.create_task(_consume(stream))
assert len(updates) == 2

spans = span_exporter.get_finished_spans()
agent_spans = [s for s in spans if s.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.AGENT_INVOKE_OPERATION] # type: ignore[union-attr] # ty: ignore[unresolved-attribute]
assert len(agent_spans) == 1
assert agent_spans[0].status.status_code != StatusCode.ERROR


#
# When ``ENABLE_INSTRUMENTATION`` is on (the default) but no OpenTelemetry
# tracer provider has been configured, the global provider is the
Expand Down
Loading
Loading