Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889))
- Added log and metrics provider to langchain genai utils handler
([#4214](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4214))
- Added retriever memory search span/event instrumentation aligned with the
GenAI memory semantic convention proposal.
([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from typing import Any, Optional
from typing import Any, Optional, Sequence
from uuid import UUID

from langchain_core.callbacks import BaseCallbackHandler
Expand All @@ -24,14 +24,60 @@
from opentelemetry.instrumentation.langchain.invocation_manager import (
_InvocationManager,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import (
ContentCapturingMode,
Error,
InputMessage,
LLMInvocation,
OutputMessage,
Text,
)
from opentelemetry.util.genai.utils import (
get_content_capturing_mode,
is_experimental_mode,
)

# Attribute keys for the GenAI memory semantic conventions. Each key is read
# from the semconv attributes module when that module defines the constant,
# and otherwise falls back to the literal string given here.
GEN_AI_MEMORY_STORE_ID = getattr(
    GenAI,
    "GEN_AI_MEMORY_STORE_ID",
    "gen_ai.memory.store.id",
)
GEN_AI_MEMORY_STORE_NAME = getattr(
    GenAI,
    "GEN_AI_MEMORY_STORE_NAME",
    "gen_ai.memory.store.name",
)
GEN_AI_MEMORY_QUERY = getattr(
    GenAI,
    "GEN_AI_MEMORY_QUERY",
    "gen_ai.memory.query",
)
GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr(
    GenAI,
    "GEN_AI_MEMORY_SEARCH_RESULT_COUNT",
    "gen_ai.memory.search.result.count",
)
GEN_AI_MEMORY_NAMESPACE = getattr(
    GenAI,
    "GEN_AI_MEMORY_NAMESPACE",
    "gen_ai.memory.namespace",
)

# Operation names: use the enum member's ``.value`` when the semconv module
# exposes it; otherwise use the literal fallback. A bare ``object()`` stands
# in for a missing enum class so the member lookups below resolve to ``None``.
_OPERATION_NAME_VALUES = getattr(GenAI, "GenAiOperationNameValues", object())

_SEARCH_MEMORY_MEMBER = getattr(_OPERATION_NAME_VALUES, "SEARCH_MEMORY", None)
if _SEARCH_MEMORY_MEMBER is None:
    SEARCH_MEMORY_OPERATION = "search_memory"
else:
    SEARCH_MEMORY_OPERATION = _SEARCH_MEMORY_MEMBER.value

_RETRIEVAL_MEMBER = getattr(_OPERATION_NAME_VALUES, "RETRIEVAL", None)
if _RETRIEVAL_MEMBER is None:
    RETRIEVAL_OPERATION = "retrieval"
else:
    RETRIEVAL_OPERATION = _RETRIEVAL_MEMBER.value


class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler):
Expand All @@ -44,6 +90,62 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None:
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()

@staticmethod
def _resolve_retriever_store_name(
serialized: dict[str, Any],
metadata: Optional[dict[str, Any]],
) -> Optional[str]:
if metadata and metadata.get("memory_store_name"):
return str(metadata["memory_store_name"])
if metadata and metadata.get("ls_retriever_name"):
return str(metadata["ls_retriever_name"])
name = serialized.get("name")
return str(name) if isinstance(name, str) and name else None

@staticmethod
def _resolve_retriever_store_id(
serialized: dict[str, Any],
metadata: Optional[dict[str, Any]],
) -> Optional[str]:
if metadata and metadata.get("memory_store_id"):
return str(metadata["memory_store_id"])

serialized_id = serialized.get("id")
if isinstance(serialized_id, str) and serialized_id:
return serialized_id
if isinstance(serialized_id, list) and serialized_id:
try:
return ".".join(str(part) for part in serialized_id)
except TypeError:
return None
return None

@staticmethod
def _should_capture_memory_query() -> bool:
    """Tell whether the retriever query text may be recorded on the span.

    Returns:
        ``True`` only when experimental mode is enabled and the content
        capturing mode is ``SPAN_ONLY`` or ``SPAN_AND_EVENT``. A
        ``ValueError`` raised while reading the capturing mode is treated
        as "do not capture".
    """
    if is_experimental_mode():
        try:
            capture_mode = get_content_capturing_mode()
        except ValueError:
            return False
        span_capturing_modes = (
            ContentCapturingMode.SPAN_ONLY,
            ContentCapturingMode.SPAN_AND_EVENT,
        )
        return capture_mode in span_capturing_modes
    return False

@staticmethod
def _is_memory_retriever(
metadata: Optional[dict[str, Any]],
) -> bool:
"""Detect if a retriever is a memory retriever based on metadata hints."""
if not metadata:
return False
return bool(
metadata.get("memory_store_name")
or metadata.get("memory_store_id")
or metadata.get("is_memory_retriever")
)

def on_chat_model_start(
self,
serialized: dict[str, Any],
Expand Down Expand Up @@ -268,3 +370,100 @@ def on_llm_error(
)
if llm_invocation.span and not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_retriever_start(
    self,
    serialized: Optional[dict[str, Any]],
    query: str,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> None:
    """Start a telemetry invocation for a retriever run.

    The operation name is the search-memory operation when ``metadata``
    carries memory hints and the retrieval operation otherwise. Store name,
    store id, the query text (only when content capture allows it) and the
    memory namespace are attached as attributes when available. When a
    store name was resolved, the span is renamed to
    ``"<operation> <store name>"``. The invocation is then registered under
    ``run_id`` so the end/error callbacks can find it.

    Args:
        serialized: Serialized retriever description; ``None`` is accepted
            and treated as empty.
        query: Query passed to the retriever.
        run_id: Identifier of this retriever run.
        parent_run_id: Identifier of the parent run, if any.
        tags: Unused.
        metadata: Run metadata; ``ls_provider``, ``memory_store_name``,
            ``memory_store_id``, ``ls_retriever_name``,
            ``is_memory_retriever`` and ``memory_namespace`` are read.
        **kwargs: Unused.
    """
    # NOTE(review): LangChain's callback manager may pass ``None`` for
    # ``serialized`` (confirm against supported langchain-core versions);
    # normalise so the lookups below never hit ``None.get``.
    serialized = serialized or {}

    provider = "unknown"
    if metadata is not None:
        provider = metadata.get("ls_provider", "unknown")

    is_memory = self._is_memory_retriever(metadata)
    operation = (
        SEARCH_MEMORY_OPERATION if is_memory else RETRIEVAL_OPERATION
    )

    attributes: dict[str, Any] = {}
    store_name = self._resolve_retriever_store_name(serialized, metadata)
    if store_name:
        attributes[GEN_AI_MEMORY_STORE_NAME] = store_name
    store_id = self._resolve_retriever_store_id(serialized, metadata)
    if store_id:
        attributes[GEN_AI_MEMORY_STORE_ID] = store_id
    # Check the type first so the capture-mode lookup only runs for a
    # non-empty string query.
    if (
        isinstance(query, str)
        and query
        and self._should_capture_memory_query()
    ):
        attributes[GEN_AI_MEMORY_QUERY] = query
    if metadata and metadata.get("memory_namespace"):
        attributes[GEN_AI_MEMORY_NAMESPACE] = metadata["memory_namespace"]

    llm_invocation = LLMInvocation(
        request_model="",
        provider=provider,
        operation_name=operation,
        attributes=attributes,
    )
    llm_invocation = self._telemetry_handler.start_llm(
        invocation=llm_invocation
    )
    if llm_invocation.span and store_name:
        llm_invocation.span.update_name(f"{operation} {store_name}")
    self._invocation_manager.add_invocation_state(
        run_id=run_id,
        parent_run_id=parent_run_id,
        invocation=llm_invocation,
    )

def on_retriever_end(
    self,
    documents: Sequence[Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> None:
    """Finish the telemetry invocation of a retriever run.

    Does nothing when no ``LLMInvocation`` is registered for ``run_id``.
    Otherwise records ``len(documents)`` as the search result count, stops
    the invocation through the telemetry handler, and drops the stored
    state once the span is no longer recording.

    Args:
        documents: Documents returned by the retriever.
        run_id: Identifier of the retriever run.
        parent_run_id: Unused.
        **kwargs: Unused.
    """
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    # ``isinstance`` is False for ``None`` too, so this covers both cases.
    if not isinstance(invocation, LLMInvocation):
        return

    result_count = len(documents)
    invocation.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] = result_count

    invocation = self._telemetry_handler.stop_llm(invocation=invocation)
    if invocation.span and not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_retriever_error(
    self,
    error: BaseException,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> None:
    """Mark the telemetry invocation of a retriever run as failed.

    Does nothing when no ``LLMInvocation`` is registered for ``run_id``.
    Otherwise wraps ``error`` (its message and its type) in an ``Error``,
    fails the invocation through the telemetry handler, and drops the
    stored state once the span is no longer recording.

    Args:
        error: Exception raised by the retriever.
        run_id: Identifier of the retriever run.
        parent_run_id: Unused.
        **kwargs: Unused.
    """
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    # ``isinstance`` is False for ``None`` too, so this covers both cases.
    if not isinstance(invocation, LLMInvocation):
        return

    failure = Error(message=str(error), type=type(error))
    invocation = self._telemetry_handler.fail_llm(
        invocation=invocation,
        error=failure,
    )
    if invocation.span and not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from __future__ import annotations

from unittest import mock
from uuid import uuid4

from opentelemetry.instrumentation.langchain.callback_handler import (
GEN_AI_MEMORY_QUERY,
GEN_AI_MEMORY_SEARCH_RESULT_COUNT,
GEN_AI_MEMORY_STORE_ID,
GEN_AI_MEMORY_STORE_NAME,
RETRIEVAL_OPERATION,
SEARCH_MEMORY_OPERATION,
OpenTelemetryLangChainCallbackHandler,
)
from opentelemetry.util.genai.types import ContentCapturingMode


def _build_handler():
    """Create a callback handler wired to a mocked telemetry handler.

    ``start_llm`` attaches a mock span that reports itself as recording and
    returns the invocation; ``stop_llm`` and ``fail_llm`` return the
    invocation they were given.

    Returns:
        A ``(callback_handler, telemetry_handler_mock)`` tuple.
    """

    def _attach_recording_span(invocation):
        recording_span = mock.Mock()
        recording_span.is_recording.return_value = True
        invocation.span = recording_span
        return invocation

    def _return_invocation_on_stop(invocation):
        return invocation

    def _return_invocation_on_fail(invocation, error):
        return invocation

    telemetry_handler = mock.Mock()
    telemetry_handler.start_llm.side_effect = _attach_recording_span
    telemetry_handler.stop_llm.side_effect = _return_invocation_on_stop
    telemetry_handler.fail_llm.side_effect = _return_invocation_on_fail

    callback_handler = OpenTelemetryLangChainCallbackHandler(
        telemetry_handler
    )
    return callback_handler, telemetry_handler


def test_retriever_defaults_to_retrieval_without_memory_metadata(monkeypatch):
    """A retriever run without memory hints is recorded as 'retrieval'."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    # Metadata carries only a provider, so no memory hint is present.
    handler.on_retriever_start(
        serialized={"name": "PineconeRetriever"},
        query="what is RAG?",
        run_id=run_id,
        metadata={"ls_provider": "pinecone"},
    )

    recorded = handler._invocation_manager.get_invocation(run_id)
    assert recorded is not None
    assert recorded.operation_name == RETRIEVAL_OPERATION
    telemetry_handler.start_llm.assert_called_once()


def test_retriever_uses_search_memory_with_memory_metadata(monkeypatch):
    """A retriever run with ``memory_store_name`` metadata is recorded as
    'search_memory' and carries store name, store id and query attributes."""
    # Enable experimental mode and span content capture so the query text
    # is recorded.
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: True,
    )
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.get_content_capturing_mode",
        lambda: ContentCapturingMode.SPAN_ONLY,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    serialized = {
        "name": "SessionMemoryRetriever",
        "id": ["langchain", "retriever", "session"],
    }
    metadata = {
        "ls_provider": "openai",
        "memory_store_name": "SessionMemoryRetriever",
        "memory_namespace": "user-123",
    }
    handler.on_retriever_start(
        serialized=serialized,
        query="user preferences",
        run_id=run_id,
        metadata=metadata,
    )

    recorded = handler._invocation_manager.get_invocation(run_id)
    assert recorded is not None
    assert recorded.operation_name == SEARCH_MEMORY_OPERATION

    attrs = recorded.attributes
    assert attrs[GEN_AI_MEMORY_STORE_NAME] == "SessionMemoryRetriever"
    assert attrs[GEN_AI_MEMORY_STORE_ID] == "langchain.retriever.session"
    assert attrs[GEN_AI_MEMORY_QUERY] == "user preferences"
    telemetry_handler.start_llm.assert_called_once()


def test_on_retriever_end_sets_search_result_count(monkeypatch):
    """Ending a retriever run records the number of returned documents."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    handler.on_retriever_start(
        serialized={"name": "MemoryRetriever"},
        query="q",
        run_id=run_id,
        metadata={"ls_provider": "openai"},
    )
    returned_documents = [object(), object()]
    handler.on_retriever_end(documents=returned_documents, run_id=run_id)

    telemetry_handler.stop_llm.assert_called_once()
    stop_kwargs = telemetry_handler.stop_llm.call_args.kwargs
    stopped = stop_kwargs["invocation"]
    assert stopped.attributes[GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2


def test_on_retriever_error_fails_invocation(monkeypatch):
    """A retriever error fails the invocation that was started for the run."""
    monkeypatch.setattr(
        "opentelemetry.instrumentation.langchain.callback_handler.is_experimental_mode",
        lambda: False,
    )
    handler, telemetry_handler = _build_handler()
    run_id = uuid4()

    handler.on_retriever_start(
        serialized={"name": "VectorRetriever"},
        query="q",
        run_id=run_id,
        metadata={"ls_provider": "openai"},
    )
    failure = RuntimeError("retrieval failed")
    handler.on_retriever_error(failure, run_id=run_id)

    telemetry_handler.fail_llm.assert_called_once()
    fail_kwargs = telemetry_handler.fail_llm.call_args.kwargs
    failed = fail_kwargs["invocation"]
    assert failed.operation_name == RETRIEVAL_OPERATION
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

- Initial release: Mem0 memory operation instrumentation aligned with GenAI
memory semantic conventions.
([#3250](https://github.com/open-telemetry/semantic-conventions/pull/3250))
Loading
Loading