From 411beb7038565720321b13c70e540f8e5507d489 Mon Sep 17 00:00:00 2001 From: Nagkumar Arkalgud Date: Tue, 24 Feb 2026 09:38:47 -0800 Subject: [PATCH] Add GenAI memory span instrumentation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../CHANGELOG.md | 3 + .../langchain/callback_handler.py | 197 +++++++++++++++++- .../tests/test_callback_handler_memory.py | 141 +++++++++++++ 3 files changed, 340 insertions(+), 1 deletion(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index 3db24e484e..911d0445fc 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -13,3 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889)) - Added log and metrics provider to langchain genai utils handler ([#4214](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4214)) +- Added retriever memory search span/event instrumentation aligned with the + GenAI memory semantic convention proposal. + ([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 8f642567ca..ee48ce3c5f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import Any, Optional +from typing import Any, Optional, Sequence from uuid import UUID from langchain_core.callbacks import BaseCallbackHandler @@ -24,14 +24,60 @@ from opentelemetry.instrumentation.langchain.invocation_manager import ( _InvocationManager, ) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( + ContentCapturingMode, Error, InputMessage, LLMInvocation, OutputMessage, Text, ) +from opentelemetry.util.genai.utils import ( + get_content_capturing_mode, + is_experimental_mode, +) + +GEN_AI_MEMORY_STORE_ID = getattr( + GenAI, "GEN_AI_MEMORY_STORE_ID", "gen_ai.memory.store.id" +) +GEN_AI_MEMORY_STORE_NAME = getattr( + GenAI, "GEN_AI_MEMORY_STORE_NAME", "gen_ai.memory.store.name" +) +GEN_AI_MEMORY_QUERY = getattr( + GenAI, "GEN_AI_MEMORY_QUERY", "gen_ai.memory.query" +) +GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr( + GenAI, + "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", + "gen_ai.memory.search.result.count", +) +GEN_AI_MEMORY_NAMESPACE = getattr( + GenAI, "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace" +) + +_SEARCH_MEMORY_MEMBER = getattr( + getattr(GenAI, "GenAiOperationNameValues", object()), + "SEARCH_MEMORY", + None, +) +SEARCH_MEMORY_OPERATION = ( + _SEARCH_MEMORY_MEMBER.value + if _SEARCH_MEMORY_MEMBER is not None + else "search_memory" +) + +_RETRIEVAL_MEMBER = getattr( + getattr(GenAI, "GenAiOperationNameValues", object()), + "RETRIEVAL", + None, +) +RETRIEVAL_OPERATION = ( + _RETRIEVAL_MEMBER.value if _RETRIEVAL_MEMBER is not None else "retrieval" +) class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): @@ -44,6 +90,62 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None: self._telemetry_handler = telemetry_handler self._invocation_manager = _InvocationManager() + @staticmethod + def _resolve_retriever_store_name( + serialized: dict[str, Any], + metadata: Optional[dict[str, Any]], + ) -> Optional[str]: + if metadata and metadata.get("memory_store_name"): + return str(metadata["memory_store_name"]) + if metadata and metadata.get("ls_retriever_name"): + return str(metadata["ls_retriever_name"]) + name = serialized.get("name") + return str(name) if isinstance(name, str) and name else None + + @staticmethod + def _resolve_retriever_store_id( + serialized: dict[str, Any], + metadata: Optional[dict[str, Any]], + ) -> Optional[str]: + if metadata and metadata.get("memory_store_id"): + return str(metadata["memory_store_id"]) + + serialized_id = serialized.get("id") + if isinstance(serialized_id, str) and serialized_id: + return serialized_id + if isinstance(serialized_id, list) and serialized_id: + try: + return ".".join(str(part) for part in serialized_id) # type: ignore[reportUnknownArgumentType, reportUnknownVariableType] + except TypeError: + return None + return None + + @staticmethod + def _should_capture_memory_query() -> bool: + if not is_experimental_mode(): + return False + try: + mode = get_content_capturing_mode() + except ValueError: + return False + return mode in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + + @staticmethod + def _is_memory_retriever( + metadata: Optional[dict[str, Any]], + ) -> bool: + """Detect if a retriever is a memory retriever based on metadata hints.""" + if not metadata: + return False + return bool( + metadata.get("memory_store_name") + or metadata.get("memory_store_id") + or metadata.get("is_memory_retriever") + ) + def on_chat_model_start( self, serialized: dict[str, Any], @@ -268,3 +370,96 @@ def on_llm_error( ) if llm_invocation.span and not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) + + def on_retriever_start( + self, + serialized: dict[str, Any], + query: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + provider = "unknown" + if metadata is not None: + provider = metadata.get("ls_provider", "unknown") + + attributes: dict[str, Any] = {} + is_memory = self._is_memory_retriever(metadata) + operation = ( + SEARCH_MEMORY_OPERATION if is_memory else RETRIEVAL_OPERATION + ) + + if store_name := self._resolve_retriever_store_name( + serialized, metadata + ): + attributes[GEN_AI_MEMORY_STORE_NAME] = store_name + if store_id := self._resolve_retriever_store_id(serialized, metadata): + attributes[GEN_AI_MEMORY_STORE_ID] = store_id + if query and self._should_capture_memory_query(): + attributes[GEN_AI_MEMORY_QUERY] = query + if metadata and metadata.get("memory_namespace"): + attributes[GEN_AI_MEMORY_NAMESPACE] = metadata["memory_namespace"] + + llm_invocation = LLMInvocation( + request_model="", + provider=provider, + operation_name=operation, + attributes=attributes, + ) + llm_invocation = self._telemetry_handler.start_llm( + invocation=llm_invocation + ) + if llm_invocation.span and store_name: + llm_invocation.span.update_name(f"{operation} {store_name}") + self._invocation_manager.add_invocation_state( + run_id=run_id, + parent_run_id=parent_run_id, + invocation=llm_invocation, + ) + + def on_retriever_end( + self, + documents: Sequence[Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) + if llm_invocation is None or not isinstance( + llm_invocation, LLMInvocation + ): + return + + llm_invocation.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] = len( + documents + ) + llm_invocation = self._telemetry_handler.stop_llm( + invocation=llm_invocation + ) + if llm_invocation.span and not llm_invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) + + def on_retriever_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) + if llm_invocation is None or not isinstance( + llm_invocation, LLMInvocation + ): + return + + error_otel = Error(message=str(error), type=type(error)) + llm_invocation = self._telemetry_handler.fail_llm( + invocation=llm_invocation, error=error_otel + ) + if llm_invocation.span and not llm_invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py new file mode 100644 index 0000000000..484c0819e2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +from unittest import mock +from uuid import uuid4 + +from opentelemetry.instrumentation.langchain.callback_handler import ( + GEN_AI_MEMORY_QUERY, + GEN_AI_MEMORY_SEARCH_RESULT_COUNT, + GEN_AI_MEMORY_STORE_ID, + GEN_AI_MEMORY_STORE_NAME, + RETRIEVAL_OPERATION, + SEARCH_MEMORY_OPERATION, + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.util.genai.types import ContentCapturingMode + + +def _build_handler(): + telemetry_handler = mock.Mock() + + def _start(invocation): + span = mock.Mock() + span.is_recording.return_value = True + invocation.span = span + return invocation + + telemetry_handler.start_llm.side_effect = _start + telemetry_handler.stop_llm.side_effect = lambda invocation: invocation + telemetry_handler.fail_llm.side_effect = ( + lambda invocation, error: invocation + ) + return ( + OpenTelemetryLangChainCallbackHandler(telemetry_handler), + telemetry_handler, + ) + + +def test_retriever_defaults_to_retrieval_without_memory_metadata(monkeypatch): + """Retrievers without memory metadata should emit 'retrieval' operation.""" + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "PineconeRetriever"}, + query="what is RAG?", + run_id=run_id, + metadata={"ls_provider": "pinecone"}, + ) + + invocation = handler._invocation_manager.get_invocation(run_id) + assert invocation is not None + assert invocation.operation_name == RETRIEVAL_OPERATION + telemetry_handler.start_llm.assert_called_once() + + +def test_retriever_uses_search_memory_with_memory_metadata(monkeypatch): + """Retrievers with memory_store_name in metadata should emit 'search_memory'.""" + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: True, + ) + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.get_content_capturing_mode", + lambda: ContentCapturingMode.SPAN_ONLY, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={ + "name": "SessionMemoryRetriever", + "id": ["langchain", "retriever", "session"], + }, + query="user preferences", + run_id=run_id, + metadata={ + "ls_provider": "openai", + "memory_store_name": "SessionMemoryRetriever", + "memory_namespace": "user-123", + }, + ) + + invocation = handler._invocation_manager.get_invocation(run_id) + assert invocation is not None + assert invocation.operation_name == SEARCH_MEMORY_OPERATION + assert ( + invocation.attributes[GEN_AI_MEMORY_STORE_NAME] + == "SessionMemoryRetriever" + ) + assert ( + invocation.attributes[GEN_AI_MEMORY_STORE_ID] + == "langchain.retriever.session" + ) + assert invocation.attributes[GEN_AI_MEMORY_QUERY] == "user preferences" + telemetry_handler.start_llm.assert_called_once() + + +def test_on_retriever_end_sets_search_result_count(monkeypatch): + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "MemoryRetriever"}, + query="q", + run_id=run_id, + metadata={"ls_provider": "openai"}, + ) + handler.on_retriever_end(documents=[object(), object()], run_id=run_id) + + telemetry_handler.stop_llm.assert_called_once() + stop_invocation = telemetry_handler.stop_llm.call_args.kwargs["invocation"] + assert stop_invocation.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2 + + +def test_on_retriever_error_fails_invocation(monkeypatch): + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "VectorRetriever"}, + query="q", + run_id=run_id, + metadata={"ls_provider": "openai"}, + ) + handler.on_retriever_error(RuntimeError("retrieval failed"), run_id=run_id) + + telemetry_handler.fail_llm.assert_called_once() + fail_invocation = telemetry_handler.fail_llm.call_args.kwargs["invocation"] + assert fail_invocation.operation_name == RETRIEVAL_OPERATION