From a9f2590672604098371766aa6a6f651aed537563 Mon Sep 17 00:00:00 2001 From: Max Ind Date: Thu, 18 Jun 2026 14:14:02 -0700 Subject: [PATCH] fix(otel): Save user.id in opentelemetry-instrumentation-google-genai emitted logs Prior implementation only covered the natively emitted logs. Co-authored-by: Max Ind PiperOrigin-RevId: 934544198 --- .../adk/telemetry/_experimental_semconv.py | 11 -- src/google/adk/telemetry/_user_id.py | 160 ++++++++++++++++++ src/google/adk/telemetry/tracing.py | 118 +++++-------- .../telemetry/functional_test_helpers.py | 6 + tests/unittests/telemetry/test_spans.py | 71 +------- .../telemetry/test_telemetry_context.py | 57 ++----- 6 files changed, 235 insertions(+), 188 deletions(-) create mode 100644 src/google/adk/telemetry/_user_id.py diff --git a/src/google/adk/telemetry/_experimental_semconv.py b/src/google/adk/telemetry/_experimental_semconv.py index 8f3fa64a9df..b366de77bf5 100644 --- a/src/google/adk/telemetry/_experimental_semconv.py +++ b/src/google/adk/telemetry/_experimental_semconv.py @@ -581,17 +581,6 @@ def _build_completion_span_attributes( # --------------------------------------------------------------------------- -def set_operation_details_common_attributes( - operation_details_common_attributes: MutableMapping[str, AttributeValue], - telemetry_config: TelemetryConfig, - attributes: Mapping[str, AttributeValue], - log_only_attributes: Mapping[str, AttributeValue] | None = None, -) -> None: - operation_details_common_attributes.update(attributes) - if log_only_attributes and telemetry_config.should_add_content_to_logs: - operation_details_common_attributes.update(log_only_attributes) - - def set_operation_details_attributes_from_request( operation_details_attributes: MutableMapping[str, AttributeValue], llm_request: LlmRequest, diff --git a/src/google/adk/telemetry/_user_id.py b/src/google/adk/telemetry/_user_id.py new file mode 100644 index 00000000000..0a15dbed91b --- /dev/null +++ b/src/google/adk/telemetry/_user_id.py 
@@ -0,0 +1,160 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Propagation of the ADK session ``user.id`` onto GenAI telemetry records. + +``user.id`` is propagated on the OTel context for the duration of an inference +span and copied onto the relevant log records by an installed +``LogRecordProcessor``. This single mechanism serves both inference paths: + +* the ADK-native path, where ADK emits the records itself, and +* the delegated path, where ``opentelemetry-instrumentation-google-genai`` + owns the span and the records and ignores the keys ADK stashes on the + context, so ADK cannot tag the records directly. 
+""" + +from __future__ import annotations + +from collections.abc import Iterator +from collections.abc import MutableMapping +from contextlib import contextmanager +import logging +import threading +from typing import TYPE_CHECKING + +from opentelemetry import context as otel_context +from opentelemetry._logs import get_logger_provider +from opentelemetry.context import Context +from opentelemetry.sdk._logs import LogRecordProcessor +from opentelemetry.semconv._incubating.attributes.user_attributes import USER_ID +from typing_extensions import override + +from ._experimental_semconv import COMPLETION_DETAILS_EVENT_NAME +from ._stable_semconv import GEN_AI_USER_MESSAGE_EVENT + +if TYPE_CHECKING: + from opentelemetry.sdk._logs import ReadWriteLogRecord + + from .context import TelemetryConfig + +logger = logging.getLogger("google_adk." + __name__) + +# Unique, process-stable key under which the user id is stashed on the OTel +# context. ``create_key`` appends a uuid, so the key cannot collide with keys +# created elsewhere. +_USER_ID_CONTEXT_KEY = otel_context.create_key("adk-gen-ai-user-id") + +# Event names whose records carry user-authored content and should therefore be +# tagged with ``user.id``. Other records (e.g. ``gen_ai.system.message``, +# ``gen_ai.choice``) are deliberately left untouched so PII is not sprayed +# across every emitted record. +_USER_ID_EVENT_ALLOWLIST = frozenset({ + GEN_AI_USER_MESSAGE_EVENT, + COMPLETION_DETAILS_EVENT_NAME, +}) + +# Guards a single global install of the LogRecordProcessor. +_install_lock = threading.Lock() +_processor_installed = False + + +@contextmanager +def maybe_propagate_user_id_to_records( + user_id: str | None, + telemetry_config: TelemetryConfig, +) -> Iterator[None]: + """Stashes ``user_id`` on the OTel context for the user-id LogRecordProcessor. + + Wraps the whole inference span (both the ADK-native and the delegated paths). 
+ The installed ``_UserIdLogRecordProcessor`` reads the value back off each log + record's captured context and copies it onto ``user.id``. Nothing is stashed + when there is no user id or the per-request config disables content-bearing + logs, so ``user.id`` is only attached when message content is also captured. + """ + _maybe_install_log_record_processor() + + if user_id is None or not telemetry_config.should_add_content_to_logs: + yield + return + token = otel_context.attach( + otel_context.set_value(_USER_ID_CONTEXT_KEY, user_id) + ) + try: + yield + finally: + otel_context.detach(token) + + +def _get_from_context(context: Context | None) -> str | None: + """Type-safe read of the propagated user id from ``context``.""" + value = otel_context.get_value(_USER_ID_CONTEXT_KEY, context) + return value if isinstance(value, str) else None + + +class _UserIdLogRecordProcessor(LogRecordProcessor): + """Copies the context-propagated ``user.id`` onto allowlisted log records. + + The records are emitted while the user-id context is active, and the OTel + ``LogRecord`` snapshots that context at construction, so the user id is + recoverable here from the record's captured context. Records emitted outside + an active user-id context, or whose event name is not allowlisted, are left + untouched. 
+ """ + + @override + def on_emit(self, log_record: ReadWriteLogRecord) -> None: + record = log_record.log_record + if record.event_name not in _USER_ID_EVENT_ALLOWLIST: + return + user_id = _get_from_context(record.context) + if user_id is None: + return + + if isinstance(record.attributes, MutableMapping): + record.attributes[USER_ID] = user_id + else: + record.attributes = { + **(record.attributes or {}), + USER_ID: user_id, + } + + @override + def shutdown(self) -> None: + pass + + @override + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +def _maybe_install_log_record_processor() -> None: + """Installs the user-id LogRecordProcessor once for the process. + + Idempotent: a no-op after the first successful install. Also a no-op while the + global logger provider is still the API-only no-op/proxy provider (which has + no ``add_log_record_processor``); in that case a later call retries once an + SDK logger provider is configured. + """ + global _processor_installed + if _processor_installed: + return + with _install_lock: + if _processor_installed: + return + provider = get_logger_provider() + add_processor = getattr(provider, "add_log_record_processor", None) + if add_processor is None: + return + add_processor(_UserIdLogRecordProcessor()) + _processor_installed = True diff --git a/src/google/adk/telemetry/tracing.py b/src/google/adk/telemetry/tracing.py index 1d379921a8b..98de23f2c7a 100644 --- a/src/google/adk/telemetry/tracing.py +++ b/src/google/adk/telemetry/tracing.py @@ -50,7 +50,6 @@ from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_NAME from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_TYPE from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GenAiSystemValues -from opentelemetry.semconv._incubating.attributes.user_attributes import USER_ID from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE from 
opentelemetry.semconv.schemas import Schemas from opentelemetry.trace import Span @@ -63,7 +62,6 @@ from ._experimental_semconv import maybe_log_completion_details from ._experimental_semconv import set_operation_details_attributes_from_request from ._experimental_semconv import set_operation_details_attributes_from_response -from ._experimental_semconv import set_operation_details_common_attributes from ._serialization import safe_json_serialize from ._stable_semconv import choice_body from ._stable_semconv import GEN_AI_CHOICE_EVENT @@ -74,6 +72,7 @@ from ._stable_semconv import USER_CONTENT_ELIDED from ._stable_semconv import user_message_body from ._token_usage import TokenUsage +from ._user_id import maybe_propagate_user_id_to_records as _maybe_propagate_user_id_to_records from .context import TelemetryConfig # By default some ADK spans include attributes with potential PII data. @@ -567,23 +566,19 @@ def use_generate_content_span( "gcp.vertex.agent.event_id": model_response_event.id, "gcp.vertex.agent.invocation_id": invocation_context.invocation_id, } - log_only_common_attributes = {} - if invocation_context.session.user_id is not None: - log_only_common_attributes[USER_ID] = invocation_context.session.user_id - if _should_emit_native_telemetry(invocation_context.agent): - with _use_native_generate_content_span_stable_semconv( - llm_request=llm_request, - common_attributes=common_attributes, - log_only_common_attributes=log_only_common_attributes, - telemetry_config=telemetry_config, - ) as span: - yield span.span - else: - with _use_extra_generate_content_attributes( - common_attributes, - log_only_extra_attributes=log_only_common_attributes, - ): - yield + with _maybe_propagate_user_id_to_records( + invocation_context.session.user_id, telemetry_config + ): + if _should_emit_native_telemetry(invocation_context.agent): + with _use_native_generate_content_span_stable_semconv( + llm_request=llm_request, + common_attributes=common_attributes, + 
telemetry_config=telemetry_config, + ) as span: + yield span.span + else: + with _use_extra_generate_content_attributes(common_attributes): + yield @asynccontextmanager @@ -608,39 +603,35 @@ async def use_inference_span( "gcp.vertex.agent.event_id": model_response_event.id, "gcp.vertex.agent.invocation_id": invocation_context.invocation_id, } - log_only_common_attributes = {} - if invocation_context.session.user_id is not None: - log_only_common_attributes[USER_ID] = invocation_context.session.user_id - if _should_emit_native_telemetry(invocation_context.agent): - async with _use_native_generate_content_span( - llm_request=llm_request, - common_attributes=common_attributes, - log_only_common_attributes=log_only_common_attributes, - telemetry_config=telemetry_config, - ) as gc_span: - if telemetry_config.should_use_experimental_genai_semconv: - set_operation_details_common_attributes( - gc_span.operation_details_common_attributes, - telemetry_config, - common_attributes, - log_only_attributes=log_only_common_attributes, - ) - try: - yield gc_span - finally: - maybe_log_completion_details( - gc_span.span, - otel_logger, - gc_span.operation_details_attributes, - gc_span.operation_details_common_attributes, - telemetry_config, - ) - else: - with _use_extra_generate_content_attributes( - common_attributes, - log_only_extra_attributes=log_only_common_attributes, - ): - yield + # user.id is propagated on the OTel context and copied onto the relevant log + # records by the installed LogRecordProcessor (see ._user_id). This is the + # single mechanism for both the ADK-native and the delegated inference paths; + # on the delegated path the genai instrumentation library owns the records, so + # ADK cannot set the attribute directly. 
+ with _maybe_propagate_user_id_to_records( + invocation_context.session.user_id, telemetry_config + ): + if _should_emit_native_telemetry(invocation_context.agent): + async with _use_native_generate_content_span( + llm_request=llm_request, + common_attributes=common_attributes, + telemetry_config=telemetry_config, + ) as gc_span: + if telemetry_config.should_use_experimental_genai_semconv: + gc_span.operation_details_common_attributes.update(common_attributes) + try: + yield gc_span + finally: + maybe_log_completion_details( + gc_span.span, + otel_logger, + gc_span.operation_details_attributes, + gc_span.operation_details_common_attributes, + telemetry_config, + ) + else: + with _use_extra_generate_content_attributes(common_attributes): + yield def _instrumented_with_opentelemetry_instrumentation_google_genai() -> bool: @@ -670,7 +661,6 @@ def _should_emit_native_telemetry(agent: BaseAgent) -> bool: @contextmanager def _use_extra_generate_content_attributes( extra_attributes: Mapping[str, AttributeValue], - log_only_extra_attributes: Mapping[str, AttributeValue] | None = None, ): try: from opentelemetry.instrumentation.google_genai import GENERATE_CONTENT_EXTRA_ATTRIBUTES_CONTEXT_KEY @@ -688,18 +678,6 @@ def _use_extra_generate_content_attributes( ctx = otel_context.set_value( GENERATE_CONTENT_EXTRA_ATTRIBUTES_CONTEXT_KEY, extra_attributes ) - if log_only_extra_attributes: - try: - from opentelemetry.instrumentation.google_genai import GENERATE_CONTENT_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY - - ctx = otel_context.set_value( - GENERATE_CONTENT_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY, - log_only_extra_attributes, - context=ctx, - ) - except (ImportError, AttributeError): - pass - tok = otel_context.attach(ctx) try: yield @@ -732,7 +710,6 @@ def _set_common_generate_content_attributes( def _use_native_generate_content_span_stable_semconv( llm_request: LlmRequest, common_attributes: Mapping[str, AttributeValue], - log_only_common_attributes: Mapping[str, 
AttributeValue] | None = None, telemetry_config: TelemetryConfig | None = None, ) -> Iterator[GenerateContentSpan]: telemetry_config = telemetry_config or TelemetryConfig() @@ -753,13 +730,6 @@ def _use_native_generate_content_span_stable_semconv( ) ) user_message_attributes = {GEN_AI_SYSTEM: _guess_gemini_system_name()} - if ( - telemetry_config.should_add_content_to_logs - and log_only_common_attributes - ): - user_id = log_only_common_attributes.get(USER_ID) - if user_id is not None: - user_message_attributes[USER_ID] = user_id for content in llm_request.contents: otel_logger.emit( @@ -778,13 +748,11 @@ async def _use_native_generate_content_span( llm_request: LlmRequest, common_attributes: Mapping[str, AttributeValue], telemetry_config: TelemetryConfig, - log_only_common_attributes: Mapping[str, AttributeValue] | None = None, ) -> AsyncIterator[GenerateContentSpan]: if not telemetry_config.should_use_experimental_genai_semconv: with _use_native_generate_content_span_stable_semconv( llm_request, common_attributes, - log_only_common_attributes=log_only_common_attributes, telemetry_config=telemetry_config, ) as gc_span: yield gc_span diff --git a/tests/unittests/telemetry/functional_test_helpers.py b/tests/unittests/telemetry/functional_test_helpers.py index a778ab2abe3..45d8dffe5af 100644 --- a/tests/unittests/telemetry/functional_test_helpers.py +++ b/tests/unittests/telemetry/functional_test_helpers.py @@ -49,6 +49,7 @@ from google.adk.runners import InMemoryRunner from google.adk.telemetry import node_tracing from google.adk.telemetry import tracing +from google.adk.telemetry._user_id import _UserIdLogRecordProcessor from google.adk.tools.function_tool import FunctionTool from google.adk.workflow._base_node import START from google.adk.workflow._workflow import Workflow @@ -315,6 +316,11 @@ def install_telemetry( ) logger_provider = LoggerProvider() + # Tag records with user.id before they are exported, mirroring the + # process-global install done by 
_maybe_install_log_record_processor(). + # It must run before the exporting processor since SimpleLogRecordProcessor + # exports synchronously in on_emit. + logger_provider.add_log_record_processor(_UserIdLogRecordProcessor()) logger_provider.add_log_record_processor( SimpleLogRecordProcessor(log_exporter) ) diff --git a/tests/unittests/telemetry/test_spans.py b/tests/unittests/telemetry/test_spans.py index 477da7ea11d..247a4e59f60 100644 --- a/tests/unittests/telemetry/test_spans.py +++ b/tests/unittests/telemetry/test_spans.py @@ -27,7 +27,6 @@ from google.adk.models.llm_response import LlmResponse from google.adk.sessions.in_memory_session_service import InMemorySessionService from google.adk.telemetry._experimental_semconv import _safe_json_serialize_no_whitespaces -from google.adk.telemetry.tracing import _use_extra_generate_content_attributes from google.adk.telemetry.tracing import ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS from google.adk.telemetry.tracing import GCP_MCP_SERVER_DESTINATION_ID from google.adk.telemetry.tracing import safe_json_serialize @@ -961,11 +960,11 @@ async def test_generate_content_span( assert len(user_logs) == 2 assert expected_user1_body == user_logs[0].body assert expected_user2_body == user_logs[1].body - expected_user_log_attributes = {GEN_AI_SYSTEM: 'test_system'} - if capture_content and user_id is not None: - expected_user_log_attributes[USER_ID] = user_id + # user.id is no longer baked into the emitted record; it is added downstream + # by the installed _UserIdLogRecordProcessor (which the mocked logger here + # bypasses). See test_functional for the end-to-end user.id assertions. 
for log in user_logs: - assert log.attributes == expected_user_log_attributes + assert log.attributes == {GEN_AI_SYSTEM: 'test_system'} choice_log = next( (lr for lr in log_records if lr.event_name == 'gen_ai.choice'), @@ -1308,14 +1307,10 @@ async def test_generate_content_span_with_experimental_semconv( attributes = operation_details_log.attributes - if ( - capture_content in ['EVENT_ONLY', 'SPAN_AND_EVENT'] - and user_id is not None - ): - assert USER_ID in attributes - assert attributes[USER_ID] == user_id - else: - assert USER_ID not in attributes + # user.id is no longer baked into the emitted record; it is added downstream + # by the installed _UserIdLogRecordProcessor (which the mocked logger here + # bypasses). See test_functional for the end-to-end user.id assertions. + assert USER_ID not in attributes if capture_content in ['SPAN_AND_EVENT', 'EVENT_ONLY']: assert GEN_AI_SYSTEM_INSTRUCTIONS in attributes @@ -1470,56 +1465,6 @@ def test_safe_json_serialize_no_whitespaces_recursion_error_returns_not_serializ assert _safe_json_serialize_no_whitespaces({'a': 1}) == '' -def test_use_extra_generate_content_attributes_upgraded_version(monkeypatch): - # Arrange: Mock the presence of the new event-only context key in the contrib module - from opentelemetry.instrumentation import google_genai - - mock_event_only_key = 'MOCKED_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY' - monkeypatch.setattr( - google_genai, - 'GENERATE_CONTENT_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY', - mock_event_only_key, - raising=False, - ) - - # Act: Run the helper with mock.patch on the otel context - with mock.patch('opentelemetry.context.set_value') as mock_set_value: - with _use_extra_generate_content_attributes( - extra_attributes={'span.attr': 'value'}, - log_only_extra_attributes={USER_ID: 'user_123'}, - ): - pass - - # Assert: Verify set_value was called with the mocked event-only key - mock_set_value.assert_any_call( - mock_event_only_key, - {USER_ID: 'user_123'}, - context=mock.ANY, - ) 
- - -def test_use_extra_generate_content_attributes_older_version(monkeypatch): - # Arrange: Simulate an older version by deleting the key if present - from opentelemetry.instrumentation import google_genai - - if hasattr( - google_genai, 'GENERATE_CONTENT_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY' - ): - monkeypatch.delattr( - google_genai, 'GENERATE_CONTENT_EVENT_ONLY_EXTRA_ATTRIBUTES_CONTEXT_KEY' - ) - - # Act & Assert: Ensure execution does not throw any ImportError/AttributeError - try: - with _use_extra_generate_content_attributes( - extra_attributes={'span.attr': 'value'}, - log_only_extra_attributes={USER_ID: 'user_123'}, - ): - pass - except Exception as e: # pylint: disable=broad-exception-caught - pytest.fail(f'Graceful degradation failed: {e}') - - # --------------------------------------------------------------------------- # Tests for _detect_error_in_response # --------------------------------------------------------------------------- diff --git a/tests/unittests/telemetry/test_telemetry_context.py b/tests/unittests/telemetry/test_telemetry_context.py index b3664f1c2b1..cf1fdcf3fc8 100644 --- a/tests/unittests/telemetry/test_telemetry_context.py +++ b/tests/unittests/telemetry/test_telemetry_context.py @@ -25,7 +25,8 @@ from google.adk.telemetry import ContentCapturingMode from google.adk.telemetry import TelemetryConfig from google.adk.telemetry import tracing -from google.adk.telemetry._experimental_semconv import set_operation_details_common_attributes +from google.adk.telemetry._user_id import _get_from_context +from google.adk.telemetry._user_id import maybe_propagate_user_id_to_records from google.adk.telemetry.context import ADK_TELEMETRY_IGNORE_RUN_CONFIG from google.adk.telemetry.tracing import trace_inference_result from google.genai.types import Part @@ -450,49 +451,27 @@ def test_admin_lock_falls_back_to_env_not_per_request_field( assert cfg.should_add_content_to_legacy_spans is False -# 
--------------------------------------------------------------------------- -# set_operation_details_common_attributes: must honor the per-request config. -# -# log_only_attributes carry PII-ish data (e.g. user_id). The gate must consult -# the per-request TelemetryConfig, not just the process-global env var, or a -# request that opted out via capture_message_content=NO_CONTENT would still leak -# log-only attributes when the host env defaults to a content-capturing mode. -# --------------------------------------------------------------------------- - - -def _run_set_common_attrs( - telemetry_config: Optional[TelemetryConfig], -) -> dict: - """Runs set_operation_details_common_attributes and returns the result map.""" - out: dict = {} - set_operation_details_common_attributes( - out, - telemetry_config or TelemetryConfig(), - {'gen_ai.operation.name': 'chat'}, - log_only_attributes={'gen_ai.user.id': 'user-123'}, - ) - return out - - -def test_set_common_attrs_cfg_no_content_overrides_env_capture( +def test_user_id_propagation_cfg_no_content_overrides_env_capture( monkeypatch: pytest.MonkeyPatch, ): - """Per-request NO_CONTENT suppresses PII-ish log-only attrs even if env opts in. - - Security regression guard: ``log_only_attributes`` (e.g. ``user_id``) must be - gated on the per-request config, not just the process-global env var, or a - request that opted out via ``capture_message_content=NO_CONTENT`` would leak - log-only attributes when the host env defaults to a content-capturing mode. - The functional ``Runner`` tests do not assert on log-only attribute routing, - so this stays as a dedicated unit guard. + """Per-request NO_CONTENT suppresses user.id propagation even if env opts in. 
+ + Security regression guard: ``user.id`` must be gated on the per-request + config, not just the process-global env var, or a request that opted out via + ``capture_message_content=NO_CONTENT`` would leak the user id onto records + when the host env defaults to a content-capturing mode. """ monkeypatch.setenv(_ENV_CAPTURE, 'EVENT_ONLY') - out = _run_set_common_attrs( - TelemetryConfig(capture_message_content=ContentCapturingMode.NO_CONTENT) + no_content = TelemetryConfig( + capture_message_content=ContentCapturingMode.NO_CONTENT ) - assert 'gen_ai.user.id' not in out - # Non-log-only attributes are always set. - assert out['gen_ai.operation.name'] == 'chat' + with maybe_propagate_user_id_to_records('user-123', no_content): + assert _get_from_context(None) is None + + # Sanity check the positive path: when content is captured the user id is + # propagated for the LogRecordProcessor to pick up. + with maybe_propagate_user_id_to_records('user-123', TelemetryConfig()): + assert _get_from_context(None) == 'user-123' # ---------------------------------------------------------------------------