diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index 3db24e484e..911d0445fc 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -13,3 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889)) - Added log and metrics provider to langchain genai utils handler ([#4214](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4214)) +- Added retriever memory search span/event instrumentation aligned with the + GenAI memory semantic convention proposal. + ([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 8f642567ca..e7ca7356ea 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import Any, Optional +from typing import Any, Optional, Sequence from uuid import UUID from langchain_core.callbacks import BaseCallbackHandler @@ -24,14 +24,60 @@ from opentelemetry.instrumentation.langchain.invocation_manager import ( _InvocationManager, ) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( + 
ContentCapturingMode, Error, InputMessage, LLMInvocation, OutputMessage, Text, ) +from opentelemetry.util.genai.utils import ( + get_content_capturing_mode, + is_experimental_mode, +) + +GEN_AI_MEMORY_STORE_ID = getattr( + GenAI, "GEN_AI_MEMORY_STORE_ID", "gen_ai.memory.store.id" +) +GEN_AI_MEMORY_STORE_NAME = getattr( + GenAI, "GEN_AI_MEMORY_STORE_NAME", "gen_ai.memory.store.name" +) +GEN_AI_MEMORY_QUERY = getattr( + GenAI, "GEN_AI_MEMORY_QUERY", "gen_ai.memory.query" +) +GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr( + GenAI, + "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", + "gen_ai.memory.search.result.count", +) +GEN_AI_MEMORY_NAMESPACE = getattr( + GenAI, "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace" +) + +_SEARCH_MEMORY_MEMBER = getattr( + getattr(GenAI, "GenAiOperationNameValues", object()), + "SEARCH_MEMORY", + None, +) +SEARCH_MEMORY_OPERATION = ( + _SEARCH_MEMORY_MEMBER.value + if _SEARCH_MEMORY_MEMBER is not None + else "search_memory" +) + +_RETRIEVAL_MEMBER = getattr( + getattr(GenAI, "GenAiOperationNameValues", object()), + "RETRIEVAL", + None, +) +RETRIEVAL_OPERATION = ( + _RETRIEVAL_MEMBER.value if _RETRIEVAL_MEMBER is not None else "retrieval" +) class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): @@ -44,6 +90,62 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None: self._telemetry_handler = telemetry_handler self._invocation_manager = _InvocationManager() + @staticmethod + def _resolve_retriever_store_name( + serialized: dict[str, Any], + metadata: Optional[dict[str, Any]], + ) -> Optional[str]: + if metadata and metadata.get("memory_store_name"): + return str(metadata["memory_store_name"]) + if metadata and metadata.get("ls_retriever_name"): + return str(metadata["ls_retriever_name"]) + name = serialized.get("name") + return str(name) if isinstance(name, str) and name else None + + @staticmethod + def _resolve_retriever_store_id( + serialized: dict[str, Any], + metadata: Optional[dict[str, Any]], + ) -> Optional[str]: + if 
metadata and metadata.get("memory_store_id"): + return str(metadata["memory_store_id"]) + + serialized_id = serialized.get("id") + if isinstance(serialized_id, str) and serialized_id: + return serialized_id + if isinstance(serialized_id, list) and serialized_id: + try: + return ".".join(str(part) for part in serialized_id) + except TypeError: + return None + return None + + @staticmethod + def _should_capture_memory_query() -> bool: + if not is_experimental_mode(): + return False + try: + mode = get_content_capturing_mode() + except ValueError: + return False + return mode in ( + ContentCapturingMode.SPAN_ONLY, + ContentCapturingMode.SPAN_AND_EVENT, + ) + + @staticmethod + def _is_memory_retriever( + metadata: Optional[dict[str, Any]], + ) -> bool: + """Detect if a retriever is a memory retriever based on metadata hints.""" + if not metadata: + return False + return bool( + metadata.get("memory_store_name") + or metadata.get("memory_store_id") + or metadata.get("is_memory_retriever") + ) + def on_chat_model_start( self, serialized: dict[str, Any], @@ -268,3 +370,100 @@ def on_llm_error( ) if llm_invocation.span and not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) + + def on_retriever_start( + self, + serialized: dict[str, Any], + query: str, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + provider = "unknown" + if metadata is not None: + provider = metadata.get("ls_provider", "unknown") + + attributes: dict[str, Any] = {} + is_memory = self._is_memory_retriever(metadata) + operation = ( + SEARCH_MEMORY_OPERATION if is_memory else RETRIEVAL_OPERATION + ) + + if store_name := self._resolve_retriever_store_name( + serialized, metadata + ): + attributes[GEN_AI_MEMORY_STORE_NAME] = store_name + if store_id := self._resolve_retriever_store_id(serialized, metadata): + 
attributes[GEN_AI_MEMORY_STORE_ID] = store_id + if ( + query + and self._should_capture_memory_query() + and isinstance(query, str) + ): + attributes[GEN_AI_MEMORY_QUERY] = query + if metadata and metadata.get("memory_namespace"): + attributes[GEN_AI_MEMORY_NAMESPACE] = metadata["memory_namespace"] + + llm_invocation = LLMInvocation( + request_model="", + provider=provider, + operation_name=operation, + attributes=attributes, + ) + llm_invocation = self._telemetry_handler.start_llm( + invocation=llm_invocation + ) + if llm_invocation.span and store_name: + llm_invocation.span.update_name(f"{operation} {store_name}") + self._invocation_manager.add_invocation_state( + run_id=run_id, + parent_run_id=parent_run_id, + invocation=llm_invocation, + ) + + def on_retriever_end( + self, + documents: Sequence[Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) + if llm_invocation is None or not isinstance( + llm_invocation, LLMInvocation + ): + return + + llm_invocation.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] = len( + documents + ) + llm_invocation = self._telemetry_handler.stop_llm( + invocation=llm_invocation + ) + if llm_invocation.span and not llm_invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) + + def on_retriever_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> None: + llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) + if llm_invocation is None or not isinstance( + llm_invocation, LLMInvocation + ): + return + + error_otel = Error(message=str(error), type=type(error)) + llm_invocation = self._telemetry_handler.fail_llm( + invocation=llm_invocation, error=error_otel + ) + if llm_invocation.span and not llm_invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) 
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py new file mode 100644 index 0000000000..484c0819e2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_callback_handler_memory.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +from unittest import mock +from uuid import uuid4 + +from opentelemetry.instrumentation.langchain.callback_handler import ( + GEN_AI_MEMORY_QUERY, + GEN_AI_MEMORY_SEARCH_RESULT_COUNT, + GEN_AI_MEMORY_STORE_ID, + GEN_AI_MEMORY_STORE_NAME, + RETRIEVAL_OPERATION, + SEARCH_MEMORY_OPERATION, + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.util.genai.types import ContentCapturingMode + + +def _build_handler(): + telemetry_handler = mock.Mock() + + def _start(invocation): + span = mock.Mock() + span.is_recording.return_value = True + invocation.span = span + return invocation + + telemetry_handler.start_llm.side_effect = _start + telemetry_handler.stop_llm.side_effect = lambda invocation: invocation + telemetry_handler.fail_llm.side_effect = ( + lambda invocation, error: invocation + ) + return ( + OpenTelemetryLangChainCallbackHandler(telemetry_handler), + telemetry_handler, + ) + + +def test_retriever_defaults_to_retrieval_without_memory_metadata(monkeypatch): + """Retrievers without memory metadata should emit 'retrieval' operation.""" + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "PineconeRetriever"}, + query="what is RAG?", + run_id=run_id, + metadata={"ls_provider": "pinecone"}, + ) + + invocation = handler._invocation_manager.get_invocation(run_id) + assert invocation is not None + assert invocation.operation_name == 
RETRIEVAL_OPERATION + telemetry_handler.start_llm.assert_called_once() + + +def test_retriever_uses_search_memory_with_memory_metadata(monkeypatch): + """Retrievers with memory_store_name in metadata should emit 'search_memory'.""" + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: True, + ) + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.get_content_capturing_mode", + lambda: ContentCapturingMode.SPAN_ONLY, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={ + "name": "SessionMemoryRetriever", + "id": ["langchain", "retriever", "session"], + }, + query="user preferences", + run_id=run_id, + metadata={ + "ls_provider": "openai", + "memory_store_name": "SessionMemoryRetriever", + "memory_namespace": "user-123", + }, + ) + + invocation = handler._invocation_manager.get_invocation(run_id) + assert invocation is not None + assert invocation.operation_name == SEARCH_MEMORY_OPERATION + assert ( + invocation.attributes[GEN_AI_MEMORY_STORE_NAME] + == "SessionMemoryRetriever" + ) + assert ( + invocation.attributes[GEN_AI_MEMORY_STORE_ID] + == "langchain.retriever.session" + ) + assert invocation.attributes[GEN_AI_MEMORY_QUERY] == "user preferences" + telemetry_handler.start_llm.assert_called_once() + + +def test_on_retriever_end_sets_search_result_count(monkeypatch): + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "MemoryRetriever"}, + query="q", + run_id=run_id, + metadata={"ls_provider": "openai"}, + ) + handler.on_retriever_end(documents=[object(), object()], run_id=run_id) + + telemetry_handler.stop_llm.assert_called_once() + stop_invocation = telemetry_handler.stop_llm.call_args.kwargs["invocation"] + assert 
stop_invocation.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2 + + +def test_on_retriever_error_fails_invocation(monkeypatch): + handler, telemetry_handler = _build_handler() + monkeypatch.setattr( + "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode", + lambda: False, + ) + + run_id = uuid4() + handler.on_retriever_start( + serialized={"name": "VectorRetriever"}, + query="q", + run_id=run_id, + metadata={"ls_provider": "openai"}, + ) + handler.on_retriever_error(RuntimeError("retrieval failed"), run_id=run_id) + + telemetry_handler.fail_llm.assert_called_once() + fail_invocation = telemetry_handler.fail_llm.call_args.kwargs["invocation"] + assert fail_invocation.operation_name == RETRIEVAL_OPERATION diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-mem0/CHANGELOG.md new file mode 100644 index 0000000000..b22e5f411d --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## Unreleased + +- Initial release: Mem0 memory operation instrumentation aligned with GenAI + memory semantic conventions. 
+ ([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/LICENSE b/instrumentation-genai/opentelemetry-instrumentation-mem0/LICENSE new file mode 100644 index 0000000000..eba7b73574 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/LICENSE @@ -0,0 +1,7 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + See https://www.apache.org/licenses/LICENSE-2.0 for the full license text. diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/README.rst b/instrumentation-genai/opentelemetry-instrumentation-mem0/README.rst new file mode 100644 index 0000000000..6a605b14e7 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/README.rst @@ -0,0 +1,39 @@ +OpenTelemetry Mem0 Instrumentation +=================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-mem0.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-mem0/ + +This library allows tracing Mem0 memory operations (add, search, update, +delete) using OpenTelemetry, emitting spans with GenAI memory semantic +convention attributes. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-mem0 + +Usage +----- + +.. 
code-block:: python + + from opentelemetry.instrumentation.mem0 import Mem0Instrumentor + from mem0 import Memory + + Mem0Instrumentor().instrument() + + m = Memory() + m.add("I prefer dark mode", user_id="alice") + results = m.search("preferences", user_id="alice") + +References +---------- + +* `OpenTelemetry Project <https://opentelemetry.io/>`_ +* `Mem0 <https://github.com/mem0ai/mem0>`_ +* `GenAI Memory Semantic Conventions <https://github.com/open-telemetry/semantic-conventions/pull/3250>`_ diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-mem0/pyproject.toml new file mode 100644 index 0000000000..e01ddef095 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/pyproject.toml @@ -0,0 +1,61 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "opentelemetry-instrumentation-mem0" +dynamic = ["version"] +description = "OpenTelemetry Mem0 instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] +dependencies = [ + "opentelemetry-api ~= 1.37", + "opentelemetry-instrumentation ~= 0.58b0", + "opentelemetry-semantic-conventions ~= 0.58b0", + "opentelemetry-util-genai ~= 0.58b0", +] + +[project.optional-dependencies] +instruments = [ + "mem0ai >= 0.1.0", +] + +[project.entry-points.opentelemetry_instrumentor] +mem0 = "opentelemetry.instrumentation.mem0:Mem0Instrumentor" + +[project.urls] +Homepage = 
"https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-mem0" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/mem0/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", + "/examples", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/__init__.py new file mode 100644 index 0000000000..145fac804a --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/__init__.py @@ -0,0 +1,124 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenTelemetry Mem0 Instrumentation +=================================== + +Instrumentation for the Mem0 Python SDK memory operations, aligned with +the `GenAI memory semantic conventions +`_. + +Usage +----- + +.. 
code-block:: python + + from opentelemetry.instrumentation.mem0 import Mem0Instrumentor + from mem0 import Memory + + Mem0Instrumentor().instrument() + + m = Memory() + m.add("I prefer dark mode", user_id="alice") + results = m.search("preferences", user_id="alice") + +Configuration +------------- + +Memory content capture can be enabled by setting the environment variable: +``OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true`` + +API +--- +""" + +from typing import Any, Collection + +from wrapt import wrap_function_wrapper + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.mem0.package import _instruments +from opentelemetry.instrumentation.mem0.patch import ( + wrap_memory_add, + wrap_memory_delete, + wrap_memory_delete_all, + wrap_memory_get_all, + wrap_memory_search, + wrap_memory_update, +) +from opentelemetry.instrumentation.utils import unwrap + + +class Mem0Instrumentor(BaseInstrumentor): + """An instrumentor for the Mem0 Python SDK. + + Traces Mem0 memory operations (add, search, update, delete) and emits + spans with GenAI memory semantic convention attributes plus a + ``gen_ai.client.operation.duration`` histogram metric. 
+ """ + + def __init__(self) -> None: + super().__init__() + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any) -> None: + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.add", + wrapper=wrap_memory_add(tracer_provider, meter_provider), + ) + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.search", + wrapper=wrap_memory_search(tracer_provider, meter_provider), + ) + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.update", + wrapper=wrap_memory_update(tracer_provider, meter_provider), + ) + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.delete", + wrapper=wrap_memory_delete(tracer_provider, meter_provider), + ) + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.delete_all", + wrapper=wrap_memory_delete_all(tracer_provider, meter_provider), + ) + wrap_function_wrapper( + module="mem0.memory.main", + name="Memory.get_all", + wrapper=wrap_memory_get_all(tracer_provider, meter_provider), + ) + + def _uninstrument(self, **kwargs: Any) -> None: + import mem0.memory.main # noqa: PLC0415 + + for method in ( + "add", + "search", + "update", + "delete", + "delete_all", + "get_all", + ): + unwrap(mem0.memory.main.Memory, method) diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/package.py b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/package.py new file mode 100644 index 0000000000..2d4c16baf3 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/package.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the 
License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +_instruments = ("mem0ai >= 0.1.0",) diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/patch.py b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/patch.py new file mode 100644 index 0000000000..1e52404691 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/patch.py @@ -0,0 +1,447 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wrapping functions for Mem0 Memory class methods. + +Each wrapper emits a CLIENT span with GenAI memory semantic convention +attributes, an ``error.type`` attribute on failure, and records a +``gen_ai.client.operation.duration`` histogram metric. 
+""" + +from __future__ import annotations + +import os +import timeit +from typing import Any, Callable, Optional + +from opentelemetry import trace +from opentelemetry.metrics import MeterProvider, get_meter +from opentelemetry.semconv._incubating.attributes import ( + error_attributes as ErrorAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.trace import SpanKind, StatusCode +from opentelemetry.util.genai.instruments import create_duration_histogram + +_INSTRUMENTATION_NAME = "opentelemetry.instrumentation.mem0" + +# --------------------------------------------------------------------------- +# Attribute constants — resolved at import time so they stay in sync with +# whichever semconv version is installed. +# --------------------------------------------------------------------------- + + +def _attr(name: str, fallback: str) -> str: + return getattr(GenAIAttributes, name, fallback) + + +GEN_AI_OPERATION_NAME = _attr("GEN_AI_OPERATION_NAME", "gen_ai.operation.name") +GEN_AI_SYSTEM = _attr("GEN_AI_SYSTEM", "gen_ai.system") +GEN_AI_PROVIDER_NAME = _attr("GEN_AI_PROVIDER_NAME", "gen_ai.provider.name") +GEN_AI_MEMORY_STORE_ID = _attr( + "GEN_AI_MEMORY_STORE_ID", "gen_ai.memory.store.id" +) +GEN_AI_MEMORY_STORE_NAME = _attr( + "GEN_AI_MEMORY_STORE_NAME", "gen_ai.memory.store.name" +) +GEN_AI_MEMORY_ID = _attr("GEN_AI_MEMORY_ID", "gen_ai.memory.id") +GEN_AI_MEMORY_TYPE = _attr("GEN_AI_MEMORY_TYPE", "gen_ai.memory.type") +GEN_AI_MEMORY_SCOPE = _attr("GEN_AI_MEMORY_SCOPE", "gen_ai.memory.scope") +GEN_AI_MEMORY_QUERY = _attr("GEN_AI_MEMORY_QUERY", "gen_ai.memory.query") +GEN_AI_MEMORY_CONTENT = _attr("GEN_AI_MEMORY_CONTENT", "gen_ai.memory.content") +GEN_AI_MEMORY_NAMESPACE = _attr( + "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace" +) +GEN_AI_MEMORY_SEARCH_RESULT_COUNT = _attr( + "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", "gen_ai.memory.search.result.count" +) 
+GEN_AI_MEMORY_UPDATE_STRATEGY = _attr( + "GEN_AI_MEMORY_UPDATE_STRATEGY", "gen_ai.memory.update.strategy" +) +ERROR_TYPE = getattr(ErrorAttributes, "ERROR_TYPE", "error.type") + +_PROVIDER = "mem0" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _capture_content() -> bool: + return os.environ.get( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "" + ).lower() in ("true", "1") + + +def _scope_from_kwargs(kwargs: dict[str, Any]) -> Optional[str]: + """Infer memory scope from Mem0 filter kwargs.""" + if kwargs.get("user_id"): + return "user" + if kwargs.get("agent_id"): + return "agent" + if kwargs.get("run_id"): + return "session" + return None + + +def _namespace_from_kwargs(kwargs: dict[str, Any]) -> Optional[str]: + """Build namespace string from Mem0 identity kwargs.""" + user_id = kwargs.get("user_id") + agent_id = kwargs.get("agent_id") + if user_id: + return f"user:{user_id}" + if agent_id: + return f"agent:{agent_id}" + return None + + +def _set_common_attributes( + span: trace.Span, + operation: str, + kwargs: dict[str, Any], +) -> None: + """Set attributes shared by all memory operations.""" + span.set_attribute(GEN_AI_OPERATION_NAME, operation) + span.set_attribute(GEN_AI_SYSTEM, _PROVIDER) + span.set_attribute(GEN_AI_PROVIDER_NAME, _PROVIDER) + + scope = _scope_from_kwargs(kwargs) + if scope: + span.set_attribute(GEN_AI_MEMORY_SCOPE, scope) + + namespace = _namespace_from_kwargs(kwargs) + if namespace: + span.set_attribute(GEN_AI_MEMORY_NAMESPACE, namespace) + + +def _set_error(span: trace.Span, exc: BaseException) -> str: + """Record error details on the span and return the error type string.""" + error_type = type(exc).__qualname__ + span.set_status(StatusCode.ERROR, str(exc)) + span.set_attribute(ERROR_TYPE, error_type) + span.record_exception(exc) + return error_type + + +def _record_duration( + 
duration_histogram, + duration_s: float, + operation: str, + error_type: Optional[str] = None, +) -> None: + """Record the operation duration metric.""" + if duration_histogram is None: + return + attrs: dict[str, Any] = { + GEN_AI_OPERATION_NAME: operation, + GEN_AI_SYSTEM: _PROVIDER, + } + if error_type: + attrs[ERROR_TYPE] = error_type + duration_histogram.record(max(duration_s, 0.0), attributes=attrs) + + +def _result_count(result: Any) -> Optional[int]: + """Extract result count from a Mem0 response (dict or list).""" + if isinstance(result, dict) and "results" in result: + return len(result["results"]) + if isinstance(result, list): + return len(result) + return None + + +def _first_memory_id(result: Any) -> Optional[str]: + """Extract the first memory id from an add/update result.""" + if isinstance(result, dict) and result.get("results"): + for item in result["results"]: + if isinstance(item, dict) and item.get("id"): + return str(item["id"]) + return None + + +# --------------------------------------------------------------------------- +# Wrapper factories +# --------------------------------------------------------------------------- + + +def wrap_memory_add( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + span_name = f"update_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "update_memory", kwargs) + # Mem0 add() is an upsert + span.set_attribute(GEN_AI_MEMORY_UPDATE_STRATEGY, "merge") + + if _capture_content() and args: + messages = 
args[0] if args else kwargs.get("messages") + if messages and isinstance(messages, str): + span.set_attribute(GEN_AI_MEMORY_CONTENT, messages) + + try: + result = wrapped(*args, **kwargs) + except Exception as exc: + error_type = _set_error(span, exc) + raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "update_memory", + error_type, + ) + + mem_id = _first_memory_id(result) + if mem_id: + span.set_attribute(GEN_AI_MEMORY_ID, mem_id) + + return result + + return wrapper + + +def wrap_memory_search( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + span_name = f"search_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "search_memory", kwargs) + + query = args[0] if args else kwargs.get("query") + if _capture_content() and query and isinstance(query, str): + span.set_attribute(GEN_AI_MEMORY_QUERY, query) + + try: + result = wrapped(*args, **kwargs) + except Exception as exc: + error_type = _set_error(span, exc) + raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "search_memory", + error_type, + ) + + count = _result_count(result) + if count is not None: + span.set_attribute(GEN_AI_MEMORY_SEARCH_RESULT_COUNT, count) + + return result + + return wrapper + + +def wrap_memory_update( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, 
tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + memory_id = args[0] if args else kwargs.get("memory_id") + span_name = f"update_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "update_memory", kwargs) + span.set_attribute(GEN_AI_MEMORY_UPDATE_STRATEGY, "overwrite") + if memory_id: + span.set_attribute(GEN_AI_MEMORY_ID, str(memory_id)) + + if _capture_content(): + data = kwargs.get("data") + if data and isinstance(data, str): + span.set_attribute(GEN_AI_MEMORY_CONTENT, data) + + try: + result = wrapped(*args, **kwargs) + except Exception as exc: + error_type = _set_error(span, exc) + raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "update_memory", + error_type, + ) + + return result + + return wrapper + + +def wrap_memory_delete( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + memory_id = args[0] if args else kwargs.get("memory_id") + span_name = f"delete_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "delete_memory", kwargs) + if memory_id: + span.set_attribute(GEN_AI_MEMORY_ID, str(memory_id)) + + try: + result = wrapped(*args, **kwargs) + except Exception as 
exc: + error_type = _set_error(span, exc) + raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "delete_memory", + error_type, + ) + + return result + + return wrapper + + +def wrap_memory_delete_all( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + span_name = f"delete_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "delete_memory", kwargs) + + try: + result = wrapped(*args, **kwargs) + except Exception as exc: + error_type = _set_error(span, exc) + raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "delete_memory", + error_type, + ) + + return result + + return wrapper + + +def wrap_memory_get_all( + tracer_provider: Optional[trace.TracerProvider] = None, + meter_provider: Optional[MeterProvider] = None, +) -> Callable: + tracer = trace.get_tracer( + _INSTRUMENTATION_NAME, tracer_provider=tracer_provider + ) + meter = get_meter(_INSTRUMENTATION_NAME, meter_provider=meter_provider) + duration_histogram = create_duration_histogram(meter) + + def wrapper( + wrapped: Callable, instance: Any, args: Any, kwargs: Any + ) -> Any: + span_name = f"search_memory {_PROVIDER}" + error_type = None + start = timeit.default_timer() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT + ) as span: + _set_common_attributes(span, "search_memory", kwargs) + + try: + result = wrapped(*args, **kwargs) + except Exception as exc: + error_type = _set_error(span, exc) + 
raise + finally: + _record_duration( + duration_histogram, + timeit.default_timer() - start, + "search_memory", + error_type, + ) + + count = _result_count(result) + if count is not None: + span.set_attribute(GEN_AI_MEMORY_SEARCH_RESULT_COUNT, count) + + return result + + return wrapper diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/version.py b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/version.py new file mode 100644 index 0000000000..e7bf4a48eb --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/src/opentelemetry/instrumentation/mem0/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.1b0.dev" diff --git a/instrumentation-genai/opentelemetry-instrumentation-mem0/tests/test_mem0_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-mem0/tests/test_mem0_instrumentation.py new file mode 100644 index 0000000000..2404cfbabb --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-mem0/tests/test_mem0_instrumentation.py @@ -0,0 +1,312 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for Mem0 memory operation instrumentation.""" + +from __future__ import annotations + +from unittest import mock + +import pytest + +from opentelemetry.instrumentation.mem0.patch import ( + ERROR_TYPE, + GEN_AI_MEMORY_CONTENT, + GEN_AI_MEMORY_ID, + GEN_AI_MEMORY_NAMESPACE, + GEN_AI_MEMORY_QUERY, + GEN_AI_MEMORY_SCOPE, + GEN_AI_MEMORY_SEARCH_RESULT_COUNT, + GEN_AI_MEMORY_UPDATE_STRATEGY, + GEN_AI_OPERATION_NAME, + GEN_AI_PROVIDER_NAME, + GEN_AI_SYSTEM, + wrap_memory_add, + wrap_memory_delete, + wrap_memory_delete_all, + wrap_memory_get_all, + wrap_memory_search, + wrap_memory_update, +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) +from opentelemetry.trace import SpanKind, StatusCode + + +class _InMemoryExporter(SpanExporter): + def __init__(self): + self.spans = [] + + def export(self, spans): + self.spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + def get_finished_spans(self): + return list(self.spans) + + +@pytest.fixture() +def tracer_provider(): + provider = TracerProvider() + return provider + + +@pytest.fixture() +def exporter(tracer_provider): + exp = _InMemoryExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(exp)) + return exp + + +@pytest.fixture() +def metric_reader(): + return InMemoryMetricReader() + + +@pytest.fixture() +def 
meter_provider(metric_reader): + return MeterProvider(metric_readers=[metric_reader]) + + +def _get_attrs(exporter): + spans = exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + return span, {k: v for k, v in span.attributes.items()} + + +def _get_duration_metric(metric_reader): + """Return the recorded duration data points.""" + metrics = metric_reader.get_metrics_data() + for resource_metric in metrics.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + if metric.name == "gen_ai.client.operation.duration": + return metric.data.data_points + return [] + + +class TestMemoryAdd: + def test_basic_add( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock(return_value={"results": [{"id": "mem-1"}]}) + wrapper = wrap_memory_add(tracer_provider, meter_provider) + result = wrapper( + wrapped, None, ("I like dark mode",), {"user_id": "alice"} + ) + + assert result == {"results": [{"id": "mem-1"}]} + wrapped.assert_called_once() + + span, attrs = _get_attrs(exporter) + assert span.name == "update_memory mem0" + assert span.kind == SpanKind.CLIENT + assert attrs[GEN_AI_OPERATION_NAME] == "update_memory" + assert attrs[GEN_AI_SYSTEM] == "mem0" + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + assert attrs[GEN_AI_MEMORY_SCOPE] == "user" + assert attrs[GEN_AI_MEMORY_NAMESPACE] == "user:alice" + assert attrs[GEN_AI_MEMORY_ID] == "mem-1" + assert attrs[GEN_AI_MEMORY_UPDATE_STRATEGY] == "merge" + + # Duration metric recorded + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 + + def test_add_captures_content_when_enabled( + self, tracer_provider, exporter, meter_provider, monkeypatch + ): + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true" + ) + wrapped = mock.Mock(return_value={"results": []}) + wrapper = wrap_memory_add(tracer_provider, meter_provider) + wrapper(wrapped, None, ("I like dark mode",), 
{"user_id": "bob"}) + + span, attrs = _get_attrs(exporter) + assert attrs[GEN_AI_MEMORY_CONTENT] == "I like dark mode" + + def test_add_does_not_capture_content_by_default( + self, tracer_provider, exporter, meter_provider, monkeypatch + ): + monkeypatch.delenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", raising=False + ) + wrapped = mock.Mock(return_value={"results": []}) + wrapper = wrap_memory_add(tracer_provider, meter_provider) + wrapper(wrapped, None, ("secret data",), {"user_id": "bob"}) + + span, attrs = _get_attrs(exporter) + assert GEN_AI_MEMORY_CONTENT not in attrs + + def test_add_error_records_exception_and_type( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock(side_effect=RuntimeError("db down")) + wrapper = wrap_memory_add(tracer_provider, meter_provider) + + with pytest.raises(RuntimeError, match="db down"): + wrapper(wrapped, None, ("data",), {"user_id": "alice"}) + + span, attrs = _get_attrs(exporter) + assert span.status.status_code == StatusCode.ERROR + assert attrs[ERROR_TYPE] == "RuntimeError" + + # Duration metric still recorded on error + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 + + +class TestMemorySearch: + def test_basic_search( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock( + return_value={"results": [{"id": "r1"}, {"id": "r2"}]} + ) + wrapper = wrap_memory_search(tracer_provider, meter_provider) + result = wrapper(wrapped, None, ("preferences",), {"user_id": "alice"}) + + assert result == {"results": [{"id": "r1"}, {"id": "r2"}]} + + span, attrs = _get_attrs(exporter) + assert span.name == "search_memory mem0" + assert attrs[GEN_AI_OPERATION_NAME] == "search_memory" + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + assert attrs[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2 + assert attrs[GEN_AI_MEMORY_SCOPE] == "user" + + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 + + def 
test_search_captures_query_when_enabled( + self, tracer_provider, exporter, meter_provider, monkeypatch + ): + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true" + ) + wrapped = mock.Mock(return_value=[]) + wrapper = wrap_memory_search(tracer_provider, meter_provider) + wrapper(wrapped, None, ("my query",), {"agent_id": "bot-1"}) + + span, attrs = _get_attrs(exporter) + assert attrs[GEN_AI_MEMORY_QUERY] == "my query" + assert attrs[GEN_AI_MEMORY_SCOPE] == "agent" + + def test_search_list_result( + self, tracer_provider, exporter, meter_provider + ): + wrapped = mock.Mock( + return_value=[{"id": "a"}, {"id": "b"}, {"id": "c"}] + ) + wrapper = wrap_memory_search(tracer_provider, meter_provider) + wrapper(wrapped, None, ("q",), {}) + + span, attrs = _get_attrs(exporter) + assert attrs[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 3 + + +class TestMemoryUpdate: + def test_basic_update( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock(return_value={"status": "ok"}) + wrapper = wrap_memory_update(tracer_provider, meter_provider) + result = wrapper(wrapped, None, ("mem-42",), {"data": "new content"}) + + assert result == {"status": "ok"} + + span, attrs = _get_attrs(exporter) + assert span.name == "update_memory mem0" + assert attrs[GEN_AI_OPERATION_NAME] == "update_memory" + assert attrs[GEN_AI_MEMORY_ID] == "mem-42" + assert attrs[GEN_AI_MEMORY_UPDATE_STRATEGY] == "overwrite" + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 + + def test_update_captures_content_when_enabled( + self, tracer_provider, exporter, meter_provider, monkeypatch + ): + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true" + ) + wrapped = mock.Mock(return_value={"status": "ok"}) + wrapper = wrap_memory_update(tracer_provider, meter_provider) + wrapper(wrapped, None, ("mem-42",), {"data": "updated content"}) + + span, attrs = 
_get_attrs(exporter) + assert attrs[GEN_AI_MEMORY_CONTENT] == "updated content" + + +class TestMemoryDelete: + def test_basic_delete( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock(return_value=None) + wrapper = wrap_memory_delete(tracer_provider, meter_provider) + wrapper(wrapped, None, ("mem-99",), {}) + + span, attrs = _get_attrs(exporter) + assert span.name == "delete_memory mem0" + assert attrs[GEN_AI_OPERATION_NAME] == "delete_memory" + assert attrs[GEN_AI_MEMORY_ID] == "mem-99" + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 + + +class TestMemoryDeleteAll: + def test_delete_all_with_user( + self, tracer_provider, exporter, meter_provider + ): + wrapped = mock.Mock(return_value=None) + wrapper = wrap_memory_delete_all(tracer_provider, meter_provider) + wrapper(wrapped, None, (), {"user_id": "alice"}) + + span, attrs = _get_attrs(exporter) + assert span.name == "delete_memory mem0" + assert attrs[GEN_AI_OPERATION_NAME] == "delete_memory" + assert attrs[GEN_AI_MEMORY_SCOPE] == "user" + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + + +class TestMemoryGetAll: + def test_get_all( + self, tracer_provider, exporter, meter_provider, metric_reader + ): + wrapped = mock.Mock(return_value={"results": [{"id": "a"}]}) + wrapper = wrap_memory_get_all(tracer_provider, meter_provider) + wrapper(wrapped, None, (), {"user_id": "alice"}) + + span, attrs = _get_attrs(exporter) + assert span.name == "search_memory mem0" + assert attrs[GEN_AI_OPERATION_NAME] == "search_memory" + assert attrs[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 1 + assert attrs[GEN_AI_PROVIDER_NAME] == "mem0" + + points = _get_duration_metric(metric_reader) + assert len(points) >= 1 diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md index 95f69d6ded..19d2f24dc0 100644 
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - Document official package metadata and README for the OpenAI Agents instrumentation. ([#3859](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3859)) +- Add support for GenAI memory operation spans and attributes (`search_memory`, + `update_memory`, `delete_memory`, `create_memory_store`, + `delete_memory_store`). + ([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250)) ## Version 0.1.0 (2025-10-15) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py index d1dce8ec5e..54b1c15cfa 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py @@ -107,14 +107,27 @@ class GenAIProvider: _OPERATION_VALUES = _enum_values(GenAIAttributes.GenAiOperationNameValues) +def _operation_value(member: str, fallback: str) -> str: + return _OPERATION_VALUES.get(member, fallback) + + class GenAIOperationName: - CHAT = _OPERATION_VALUES["CHAT"] - GENERATE_CONTENT = _OPERATION_VALUES["GENERATE_CONTENT"] - TEXT_COMPLETION = _OPERATION_VALUES["TEXT_COMPLETION"] - EMBEDDINGS = _OPERATION_VALUES["EMBEDDINGS"] - CREATE_AGENT = _OPERATION_VALUES["CREATE_AGENT"] - INVOKE_AGENT = _OPERATION_VALUES["INVOKE_AGENT"] - EXECUTE_TOOL = _OPERATION_VALUES["EXECUTE_TOOL"] + CHAT = _operation_value("CHAT", "chat") + 
GENERATE_CONTENT = _operation_value("GENERATE_CONTENT", "generate_content") + TEXT_COMPLETION = _operation_value("TEXT_COMPLETION", "text_completion") + EMBEDDINGS = _operation_value("EMBEDDINGS", "embeddings") + CREATE_AGENT = _operation_value("CREATE_AGENT", "create_agent") + INVOKE_AGENT = _operation_value("INVOKE_AGENT", "invoke_agent") + EXECUTE_TOOL = _operation_value("EXECUTE_TOOL", "execute_tool") + SEARCH_MEMORY = _operation_value("SEARCH_MEMORY", "search_memory") + UPDATE_MEMORY = _operation_value("UPDATE_MEMORY", "update_memory") + DELETE_MEMORY = _operation_value("DELETE_MEMORY", "delete_memory") + CREATE_MEMORY_STORE = _operation_value( + "CREATE_MEMORY_STORE", "create_memory_store" + ) + DELETE_MEMORY_STORE = _operation_value( + "DELETE_MEMORY_STORE", "delete_memory_store" + ) # Operations below are not yet covered by the spec but remain for backwards compatibility TRANSCRIPTION = "transcription" SPEECH = "speech_generation" @@ -129,6 +142,14 @@ class GenAIOperationName: "agentspan": INVOKE_AGENT, } + MEMORY_OPERATIONS = { + SEARCH_MEMORY, + UPDATE_MEMORY, + DELETE_MEMORY, + CREATE_MEMORY_STORE, + DELETE_MEMORY_STORE, + } + _OUTPUT_VALUES = _enum_values(GenAIAttributes.GenAiOutputTypeValues) @@ -220,6 +241,36 @@ def _attr(name: str, fallback: str) -> str: "GEN_AI_OUTPUT_MESSAGES", "gen_ai.output.messages" ) GEN_AI_DATA_SOURCE_ID = _attr("GEN_AI_DATA_SOURCE_ID", "gen_ai.data_source.id") +GEN_AI_MEMORY_STORE_ID = _attr( + "GEN_AI_MEMORY_STORE_ID", "gen_ai.memory.store.id" +) +GEN_AI_MEMORY_STORE_NAME = _attr( + "GEN_AI_MEMORY_STORE_NAME", "gen_ai.memory.store.name" +) +GEN_AI_MEMORY_ID = _attr("GEN_AI_MEMORY_ID", "gen_ai.memory.id") +GEN_AI_MEMORY_TYPE = _attr("GEN_AI_MEMORY_TYPE", "gen_ai.memory.type") +GEN_AI_MEMORY_SCOPE = _attr("GEN_AI_MEMORY_SCOPE", "gen_ai.memory.scope") +GEN_AI_MEMORY_CONTENT = _attr("GEN_AI_MEMORY_CONTENT", "gen_ai.memory.content") +GEN_AI_MEMORY_QUERY = _attr("GEN_AI_MEMORY_QUERY", "gen_ai.memory.query") 
+GEN_AI_MEMORY_NAMESPACE = _attr( + "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace" +) +GEN_AI_MEMORY_SEARCH_RESULT_COUNT = _attr( + "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", "gen_ai.memory.search.result.count" +) +GEN_AI_MEMORY_SEARCH_SIMILARITY_THRESHOLD = _attr( + "GEN_AI_MEMORY_SEARCH_SIMILARITY_THRESHOLD", + "gen_ai.memory.search.similarity.threshold", +) +GEN_AI_MEMORY_EXPIRATION_DATE = _attr( + "GEN_AI_MEMORY_EXPIRATION_DATE", "gen_ai.memory.expiration_date" +) +GEN_AI_MEMORY_IMPORTANCE = _attr( + "GEN_AI_MEMORY_IMPORTANCE", "gen_ai.memory.importance" +) +GEN_AI_MEMORY_UPDATE_STRATEGY = _attr( + "GEN_AI_MEMORY_UPDATE_STRATEGY", "gen_ai.memory.update.strategy" +) # The semantic conventions currently expose multiple usage token attributes; we retain the # completion/prompt aliases for backwards compatibility where used. @@ -395,6 +446,7 @@ def get_span_name( model: Optional[str] = None, agent_name: Optional[str] = None, tool_name: Optional[str] = None, + memory_store_name: Optional[str] = None, ) -> str: """Generate spec-compliant span name based on operation type.""" base_name = operation_name @@ -420,6 +472,13 @@ def get_span_name( if operation_name == GenAIOperationName.HANDOFF: return f"{base_name} {agent_name}" if agent_name else base_name + if operation_name in GenAIOperationName.MEMORY_OPERATIONS: + return ( + f"{base_name} {memory_store_name}" + if memory_store_name + else base_name + ) + return base_name @@ -1258,6 +1317,9 @@ def _sanitize_usage_payload(usage: Any) -> None: def _get_span_kind(self, span_data: Any) -> SpanKind: """Determine appropriate span kind based on span data type.""" + operation_name = self._extract_operation_name(span_data) + if operation_name in GenAIOperationName.MEMORY_OPERATIONS: + return SpanKind.CLIENT if _is_instance_of(span_data, FunctionSpanData): return SpanKind.INTERNAL # Tool execution is internal if _is_instance_of( @@ -1353,9 +1415,16 @@ def on_span_start(self, span: Span[Any]) -> None: if 
_is_instance_of(span.span_data, FunctionSpanData) else None ) + memory_store_name = self._get_memory_store_name(span.span_data) # Generate spec-compliant span name - span_name = get_span_name(operation_name, model, agent_name, tool_name) + span_name = get_span_name( + operation_name, + model, + agent_name, + tool_name, + memory_store_name, + ) attributes = { GEN_AI_PROVIDER_NAME: self.system_name, @@ -1436,21 +1505,30 @@ def on_span_end(self, span: Span[Any]) -> None: otel_span.set_attribute(key, value) attributes[key] = value - if _is_instance_of( - span.span_data, (GenerationSpanData, ResponseSpanData) - ): - operation_name = attributes.get(GEN_AI_OPERATION_NAME) + operation_name = attributes.get(GEN_AI_OPERATION_NAME) + if operation_name: model_for_name = attributes.get(GEN_AI_REQUEST_MODEL) or ( attributes.get(GEN_AI_RESPONSE_MODEL) ) - if operation_name and model_for_name: - agent_name_for_name = attributes.get(GEN_AI_AGENT_NAME) - tool_name_for_name = attributes.get(GEN_AI_TOOL_NAME) + agent_name_for_name = attributes.get(GEN_AI_AGENT_NAME) + tool_name_for_name = attributes.get(GEN_AI_TOOL_NAME) + memory_store_name_for_name = attributes.get( + GEN_AI_MEMORY_STORE_NAME + ) + should_update_name = False + if _is_instance_of( + span.span_data, (GenerationSpanData, ResponseSpanData) + ): + should_update_name = model_for_name is not None + elif operation_name in GenAIOperationName.MEMORY_OPERATIONS: + should_update_name = memory_store_name_for_name is not None + if should_update_name: new_name = get_span_name( operation_name, model_for_name, agent_name_for_name, tool_name_for_name, + memory_store_name_for_name, ) if new_name != otel_span.name: otel_span.update_name(new_name) @@ -1506,8 +1584,364 @@ def force_flush(self) -> None: """Force flush (no-op for this processor).""" pass + @staticmethod + def _normalize_operation_name(operation: Any) -> Optional[str]: + if not isinstance(operation, str): + return None + normalized = "_".join( + 
operation.strip().lower().replace("-", "_").split() + ) + operation_map = { + "chat": GenAIOperationName.CHAT, + "generate_content": GenAIOperationName.GENERATE_CONTENT, + "text_completion": GenAIOperationName.TEXT_COMPLETION, + "embeddings": GenAIOperationName.EMBEDDINGS, + "create": GenAIOperationName.CREATE_AGENT, + "create_agent": GenAIOperationName.CREATE_AGENT, + "invoke": GenAIOperationName.INVOKE_AGENT, + "invoke_agent": GenAIOperationName.INVOKE_AGENT, + "execute_tool": GenAIOperationName.EXECUTE_TOOL, + "search_memory": GenAIOperationName.SEARCH_MEMORY, + "memory_search": GenAIOperationName.SEARCH_MEMORY, + "update_memory": GenAIOperationName.UPDATE_MEMORY, + "upsert_memory": GenAIOperationName.UPDATE_MEMORY, + "delete_memory": GenAIOperationName.DELETE_MEMORY, + "create_memory_store": GenAIOperationName.CREATE_MEMORY_STORE, + "delete_memory_store": GenAIOperationName.DELETE_MEMORY_STORE, + "transcription": GenAIOperationName.TRANSCRIPTION, + "speech_generation": GenAIOperationName.SPEECH, + "guardrail_check": GenAIOperationName.GUARDRAIL, + "agent_handoff": GenAIOperationName.HANDOFF, + } + if normalized in operation_map: + return operation_map[normalized] + + for candidate, value in operation_map.items(): + if normalized.startswith(f"{candidate}_"): + return value + return None + + @staticmethod + def _span_data_payload(span_data: Any) -> dict[str, Any]: + payload: dict[str, Any] = {} + export = getattr(span_data, "export", None) + if not callable(export): + return payload + try: + exported = export() + except Exception: + return payload + if not isinstance(exported, dict): + return payload + + payload.update(exported) + nested_data = exported.get("data") + if isinstance(nested_data, dict): + payload.update(nested_data) + return payload + + @staticmethod + def _first_non_none(*values: Any) -> Any: + for value in values: + if value is not None: + return value + return None + + @staticmethod + def _to_int(value: Any) -> Optional[int]: + if value is None: 
+ return None + if isinstance(value, bool): + return int(value) + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + if isinstance(value, str): + try: + return int(value) + except ValueError: + return None + return None + + @staticmethod + def _to_float(value: Any) -> Optional[float]: + if value is None: + return None + if isinstance(value, (int, float)) and not isinstance(value, bool): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + return None + return None + + def _extract_operation_name(self, span_data: Any) -> Optional[str]: + payload = self._span_data_payload(span_data) + candidates = ( + getattr(span_data, "operation", None), + getattr(span_data, "operation_name", None), + payload.get("operation"), + payload.get("operation_name"), + payload.get(GEN_AI_OPERATION_NAME), + payload.get("gen_ai.operation.name"), + payload.get("name"), + getattr(span_data, "name", None), + payload.get("type"), + getattr(span_data, "type", None), + ) + for candidate in candidates: + normalized = self._normalize_operation_name(candidate) + if normalized: + return normalized + return None + + @staticmethod + def _memory_value( + span_data: Any, + payload: dict[str, Any], + *keys: str, + ) -> Any: + for key in keys: + value = getattr(span_data, key, None) + if value is not None: + return value + for key in keys: + value = payload.get(key) + if value is not None: + return value + return None + + def _get_memory_store_name(self, span_data: Any) -> Optional[str]: + payload = self._span_data_payload(span_data) + memory_store = payload.get("memory_store") + memory_store_name = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_store_name", + "store_name", + GEN_AI_MEMORY_STORE_NAME, + "gen_ai.memory.store.name", + ), + memory_store.get("name") + if isinstance(memory_store, dict) + else None, + ) + return ( + str(memory_store_name) if memory_store_name is not None 
else None + ) + + def _get_attributes_from_memory_span_data( + self, + span_data: Any, + operation_name: str, + ) -> Iterator[tuple[str, AttributeValue]]: + payload = self._span_data_payload(span_data) + memory_store = payload.get("memory_store") + memory_item = payload.get("memory") + + if not isinstance(memory_store, dict): + memory_store = {} + if not isinstance(memory_item, dict): + memory_item = {} + + yield GEN_AI_OPERATION_NAME, operation_name + + store_id = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_store_id", + "store_id", + GEN_AI_MEMORY_STORE_ID, + "gen_ai.memory.store.id", + ), + memory_store.get("id"), + ) + if store_id is not None: + yield GEN_AI_MEMORY_STORE_ID, str(store_id) + + store_name = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_store_name", + "store_name", + GEN_AI_MEMORY_STORE_NAME, + "gen_ai.memory.store.name", + ), + memory_store.get("name"), + ) + if store_name is not None: + yield GEN_AI_MEMORY_STORE_NAME, str(store_name) + + memory_id = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_id", + GEN_AI_MEMORY_ID, + "gen_ai.memory.id", + ), + memory_item.get("id"), + ) + if memory_id is not None: + yield GEN_AI_MEMORY_ID, str(memory_id) + + memory_type = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_type", + GEN_AI_MEMORY_TYPE, + "gen_ai.memory.type", + ), + memory_item.get("type"), + ) + if memory_type is not None: + yield GEN_AI_MEMORY_TYPE, str(memory_type) + + memory_scope = self._memory_value( + span_data, + payload, + "memory_scope", + GEN_AI_MEMORY_SCOPE, + "gen_ai.memory.scope", + ) + if memory_scope is not None: + yield GEN_AI_MEMORY_SCOPE, str(memory_scope) + + memory_namespace = self._memory_value( + span_data, + payload, + "memory_namespace", + GEN_AI_MEMORY_NAMESPACE, + "gen_ai.memory.namespace", + ) + if memory_namespace is not None: + yield GEN_AI_MEMORY_NAMESPACE, str(memory_namespace) + + 
query_value = self._memory_value( + span_data, + payload, + "memory_query", + "query", + GEN_AI_MEMORY_QUERY, + "gen_ai.memory.query", + ) + if query_value is not None and self.include_sensitive_data: + yield GEN_AI_MEMORY_QUERY, str(query_value) + + result_count = self._to_int( + self._memory_value( + span_data, + payload, + "result_count", + "results_count", + "memory_search_result_count", + GEN_AI_MEMORY_SEARCH_RESULT_COUNT, + "gen_ai.memory.search.result.count", + ) + ) + if result_count is not None: + yield GEN_AI_MEMORY_SEARCH_RESULT_COUNT, result_count + + similarity_threshold = self._to_float( + self._memory_value( + span_data, + payload, + "similarity_threshold", + "score_threshold", + "memory_similarity_threshold", + GEN_AI_MEMORY_SEARCH_SIMILARITY_THRESHOLD, + "gen_ai.memory.search.similarity.threshold", + ) + ) + if similarity_threshold is not None: + yield ( + GEN_AI_MEMORY_SEARCH_SIMILARITY_THRESHOLD, + similarity_threshold, + ) + + update_strategy = self._memory_value( + span_data, + payload, + "update_strategy", + "strategy", + GEN_AI_MEMORY_UPDATE_STRATEGY, + "gen_ai.memory.update.strategy", + ) + if update_strategy is not None: + yield GEN_AI_MEMORY_UPDATE_STRATEGY, str(update_strategy) + + memory_content = self._first_non_none( + self._memory_value( + span_data, + payload, + "memory_content", + GEN_AI_MEMORY_CONTENT, + "gen_ai.memory.content", + ), + memory_item.get("content"), + memory_item.get("value"), + ) + if memory_content is not None and self.include_sensitive_data: + if isinstance(memory_content, (dict, list)): + yield GEN_AI_MEMORY_CONTENT, safe_json_dumps(memory_content) + else: + yield GEN_AI_MEMORY_CONTENT, str(memory_content) + + expiration_date = self._memory_value( + span_data, + payload, + "expiration_date", + "expires_at", + GEN_AI_MEMORY_EXPIRATION_DATE, + "gen_ai.memory.expiration_date", + ) + if expiration_date is not None: + yield GEN_AI_MEMORY_EXPIRATION_DATE, str(expiration_date) + + importance = self._to_float( + 
self._memory_value( + span_data, + payload, + "importance", + GEN_AI_MEMORY_IMPORTANCE, + "gen_ai.memory.importance", + ) + ) + if importance is not None: + yield GEN_AI_MEMORY_IMPORTANCE, importance + + agent_id = self._memory_value( + span_data, + payload, + "agent_id", + GEN_AI_AGENT_ID, + "gen_ai.agent.id", + ) + if agent_id is not None: + yield GEN_AI_AGENT_ID, str(agent_id) + + conversation_id = self._memory_value( + span_data, + payload, + "conversation_id", + "session_id", + "thread_id", + GEN_AI_CONVERSATION_ID, + "gen_ai.conversation.id", + ) + if conversation_id is not None: + yield GEN_AI_CONVERSATION_ID, str(conversation_id) + def _get_operation_name(self, span_data: Any) -> str: """Determine operation name from span data type.""" + if operation_name := self._extract_operation_name(span_data): + return operation_name if _is_instance_of(span_data, GenerationSpanData): # Check if it's embeddings if hasattr(span_data, "embedding_dimension"): @@ -1519,17 +1953,6 @@ def _get_operation_name(self, span_data: Any) -> str: return GenAIOperationName.CHAT return GenAIOperationName.TEXT_COMPLETION if _is_instance_of(span_data, AgentSpanData): - # Could be create_agent or invoke_agent based on context - operation = getattr(span_data, "operation", None) - normalized = ( - operation.strip().lower() - if isinstance(operation, str) - else None - ) - if normalized in {"create", "create_agent"}: - return GenAIOperationName.CREATE_AGENT - if normalized in {"invoke", "invoke_agent"}: - return GenAIOperationName.INVOKE_AGENT return GenAIOperationName.INVOKE_AGENT if _is_instance_of(span_data, FunctionSpanData): return GenAIOperationName.EXECUTE_TOOL @@ -1576,6 +1999,13 @@ def _extract_genai_attributes( for key, value in self._get_server_attributes().items(): yield key, value + operation_name = self._get_operation_name(span_data) + if operation_name in GenAIOperationName.MEMORY_OPERATIONS: + yield from self._get_attributes_from_memory_span_data( + span_data, operation_name + 
) + return + # Process different span types if _is_instance_of(span_data, GenerationSpanData): yield from self._get_attributes_from_generation_span_data( diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py index b2c8c7c8f3..be4eff9fb0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py @@ -55,6 +55,11 @@ class _GenAiOperationNameValues(Enum): CREATE_AGENT = "create_agent" INVOKE_AGENT = "invoke_agent" EXECUTE_TOOL = "execute_tool" + SEARCH_MEMORY = "search_memory" + UPDATE_MEMORY = "update_memory" + DELETE_MEMORY = "delete_memory" + CREATE_MEMORY_STORE = "create_memory_store" + DELETE_MEMORY_STORE = "delete_memory_store" class _GenAiOutputTypeValues(Enum): TEXT = "text" @@ -174,6 +179,15 @@ def test_operation_and_span_naming(processor_setup): == sp.GenAIOperationName.INVOKE_AGENT ) + class MemorySpanData: + operation = "search_memory" + + memory_span = MemorySpanData() + assert ( + processor._get_operation_name(memory_span) + == sp.GenAIOperationName.SEARCH_MEMORY + ) + function_data = FunctionSpanData() assert ( processor._get_operation_name(function_data) @@ -194,6 +208,7 @@ class UnknownSpanData: assert processor._get_span_kind(GenerationSpanData()) is SpanKind.CLIENT assert processor._get_span_kind(FunctionSpanData()) is SpanKind.INTERNAL + assert processor._get_span_kind(memory_span) is SpanKind.CLIENT assert ( sp.get_span_name(sp.GenAIOperationName.CHAT, model="gpt-4o") @@ -213,6 +228,13 @@ class UnknownSpanData: sp.get_span_name(sp.GenAIOperationName.CREATE_AGENT, agent_name=None) == "create_agent" ) + assert ( + sp.get_span_name( + sp.GenAIOperationName.SEARCH_MEMORY, + memory_store_name="session-memory", + ) + == 
"search_memory session-memory" + ) def test_attribute_builders(processor_setup): @@ -370,6 +392,46 @@ def __init__(self) -> None: assert function_attrs[sp.GEN_AI_TOOL_CALL_RESULT] == {"temperature": 70} assert function_attrs[sp.GEN_AI_OUTPUT_TYPE] == sp.GenAIOutputType.JSON + class MemorySearchSpanData: + operation = "search_memory" + conversation_id = "thread-123" + + @staticmethod + def export(): + return { + "data": { + "memory_store": { + "id": "ms-1", + "name": "session-store", + }, + "query": "weather preferences", + "result_count": 2, + "similarity_threshold": 0.8, + "memory_namespace": "user-42", + } + } + + class MemorySpan: + def __init__(self) -> None: + self.span_data = MemorySearchSpanData() + + memory_attrs = _collect( + processor._extract_genai_attributes( + MemorySpan(), sp.ContentPayload(), None + ) + ) + assert ( + memory_attrs[sp.GEN_AI_OPERATION_NAME] + == sp.GenAIOperationName.SEARCH_MEMORY + ) + assert memory_attrs[sp.GEN_AI_MEMORY_STORE_ID] == "ms-1" + assert memory_attrs[sp.GEN_AI_MEMORY_STORE_NAME] == "session-store" + assert memory_attrs[sp.GEN_AI_MEMORY_QUERY] == "weather preferences" + assert memory_attrs[sp.GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2 + assert memory_attrs[sp.GEN_AI_MEMORY_SEARCH_SIMILARITY_THRESHOLD] == 0.8 + assert memory_attrs[sp.GEN_AI_MEMORY_NAMESPACE] == "user-42" + assert memory_attrs[sp.GEN_AI_CONVERSATION_ID] == "thread-123" + def test_extract_genai_attributes_unknown_type(processor_setup): processor, _ = processor_setup diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_zz_coverage_improvements.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_zz_coverage_improvements.py index 4d8a84b94a..cc7333deb2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_zz_coverage_improvements.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_zz_coverage_improvements.py @@ -342,6 
+342,14 @@ def test_span_name_unknown_operation(self): name = sp.get_span_name("unknown_operation") assert name == "unknown_operation" + def test_span_name_memory_operation(self): + sp, _ = _get_modules() + name = sp.get_span_name( + "search_memory", + memory_store_name="customer-store", + ) + assert name == "search_memory customer-store" + class TestInferOutputType: """Tests for _infer_output_type method."""