diff --git a/packages/opentelemetry-instrumentation-crewai/opentelemetry/instrumentation/crewai/instrumentation.py b/packages/opentelemetry-instrumentation-crewai/opentelemetry/instrumentation/crewai/instrumentation.py index 979b92166c..f2792f500a 100644 --- a/packages/opentelemetry-instrumentation-crewai/opentelemetry/instrumentation/crewai/instrumentation.py +++ b/packages/opentelemetry-instrumentation-crewai/opentelemetry/instrumentation/crewai/instrumentation.py @@ -10,6 +10,7 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.crewai.version import __version__ from opentelemetry.semconv._incubating.attributes import ( + error_attributes as ErrorAttributes, gen_ai_attributes as GenAIAttributes, ) from opentelemetry.semconv_ai import SpanAttributes, TraceloopSpanKindValues, Meters @@ -17,6 +18,31 @@ _instruments = ("crewai >= 0.70.0",) +# GenAI memory semantic convention attribute keys (fallback to string +# literals when the installed semconv package doesn't define them yet). +_GEN_AI_OPERATION_NAME = getattr(GenAIAttributes, "GEN_AI_OPERATION_NAME", "gen_ai.operation.name") +_GEN_AI_PROVIDER_NAME = getattr(GenAIAttributes, "GEN_AI_PROVIDER_NAME", "gen_ai.provider.name") +_GEN_AI_MEMORY_SCOPE = getattr(GenAIAttributes, "GEN_AI_MEMORY_SCOPE", "gen_ai.memory.scope") +_GEN_AI_MEMORY_TYPE = getattr(GenAIAttributes, "GEN_AI_MEMORY_TYPE", "gen_ai.memory.type") +_GEN_AI_MEMORY_QUERY = getattr(GenAIAttributes, "GEN_AI_MEMORY_QUERY", "gen_ai.memory.query") +_GEN_AI_MEMORY_CONTENT = getattr(GenAIAttributes, "GEN_AI_MEMORY_CONTENT", "gen_ai.memory.content") +_GEN_AI_MEMORY_NAMESPACE = getattr(GenAIAttributes, "GEN_AI_MEMORY_NAMESPACE", "gen_ai.memory.namespace") +_GEN_AI_MEMORY_SEARCH_RESULT_COUNT = getattr( + GenAIAttributes, "GEN_AI_MEMORY_SEARCH_RESULT_COUNT", "gen_ai.memory.search.result.count" +) +_GEN_AI_MEMORY_UPDATE_STRATEGY = getattr( + GenAIAttributes, "GEN_AI_MEMORY_UPDATE_STRATEGY", "gen_ai.memory.update.strategy" +) +_GEN_AI_MEMORY_IMPORTANCE = getattr(GenAIAttributes, "GEN_AI_MEMORY_IMPORTANCE", "gen_ai.memory.importance") +_ERROR_TYPE = getattr(ErrorAttributes, "ERROR_TYPE", "error.type") + +_PROVIDER = "crewai" + + +def _capture_content() -> bool: + """Check if memory content capture is enabled.""" + return os.environ.get("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "").lower() in ("true", "1") + class CrewAIInstrumentor(BaseInstrumentor): @@ -44,12 +70,40 @@ def _instrument(self, **kwargs): wrap_function_wrapper("crewai.llm", "LLM.call", wrap_llm_call(tracer, duration_histogram, token_histogram)) + # Memory operations (crewai.memory.unified_memory.Memory) + try: + wrap_function_wrapper( + "crewai.memory.unified_memory", "Memory.remember", + wrap_memory_remember(tracer, duration_histogram)) + wrap_function_wrapper( + "crewai.memory.unified_memory", "Memory.recall", + wrap_memory_recall(tracer, duration_histogram)) + wrap_function_wrapper( + "crewai.memory.unified_memory", "Memory.forget", + wrap_memory_forget(tracer, duration_histogram)) + wrap_function_wrapper( + "crewai.memory.unified_memory", "Memory.reset", + wrap_memory_reset(tracer, duration_histogram)) + except Exception: + # CrewAI versions before unified_memory may not have these classes + pass + def _uninstrument(self, **kwargs): unwrap("crewai.crew.Crew", "kickoff") unwrap("crewai.agent.Agent", "execute_task") unwrap("crewai.task.Task", "execute_sync") unwrap("crewai.llm.LLM", "call") + # Memory unwrap (ignore if not wrapped) + try: + from crewai.memory.unified_memory import Memory as UnifiedMemory + unwrap(UnifiedMemory, "remember") + unwrap(UnifiedMemory, "recall") + unwrap(UnifiedMemory, "forget") + unwrap(UnifiedMemory, "reset") + except Exception: + pass + def with_tracer_wrapper(func): """Helper for providing tracer for wrapper functions.""" @@ -199,3 +253,229 @@ def _create_metrics(meter: Meter): ) return token_histogram, duration_histogram + + +# --------------------------------------------------------------------------- +# Memory operation wrappers — aligned with GenAI memory semantic conventions +# --------------------------------------------------------------------------- + + +def _infer_memory_scope(instance) -> str: + """Infer memory scope from the Memory instance or its MemoryScope wrapper.""" + # MemoryScope has a _root attribute like "/agent/1" or "/user/123" + root = getattr(instance, "_root", None) + if root: + parts = root.strip("/").split("/") + if parts: + first = parts[0].lower() + if first in ("user", "agent", "session", "team", "global"): + return first + return "agent" + + +def _infer_memory_type(kwargs) -> str: + """Infer memory type from kwargs categories hint, defaulting to long_term.""" + categories = kwargs.get("categories") + if categories and isinstance(categories, list): + for cat in categories: + cl = str(cat).lower() + if "short" in cl: + return "short_term" + if "entity" in cl: + return "entity" + return "long_term" + + +def _set_memory_error(span, exc): + """Record error details on the span.""" + error_type = type(exc).__qualname__ + span.set_status(Status(StatusCode.ERROR, str(exc))) + set_span_attribute(span, _ERROR_TYPE, error_type) + return error_type + + +def _record_memory_duration(duration_histogram, duration_s, operation, error_type=None): + """Record memory operation duration metric.""" + if not duration_histogram: + return + attrs = { + _GEN_AI_OPERATION_NAME: operation, + GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER, + } + if error_type: + attrs[_ERROR_TYPE] = error_type + duration_histogram.record(max(duration_s, 0.0), attributes=attrs) + + +def wrap_memory_remember(tracer: Tracer, duration_histogram: Histogram): + """Wrap Memory.remember() → update_memory span.""" + def _wrapper(wrapped, instance, args, kwargs): + operation = "update_memory" + span_name = f"{operation} {_PROVIDER}" + error_type = None + start_time = time.time() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, + attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER} + ) as span: + set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation) + set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER) + set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance)) + set_span_attribute(span, _GEN_AI_MEMORY_TYPE, _infer_memory_type(kwargs)) + set_span_attribute(span, _GEN_AI_MEMORY_UPDATE_STRATEGY, "merge") + + # Namespace from source kwarg + source = kwargs.get("source") + if source: + set_span_attribute(span, _GEN_AI_MEMORY_NAMESPACE, str(source)) + + # Scope path + scope = kwargs.get("scope") + if scope: + set_span_attribute(span, "crewai.memory.scope_path", str(scope)) + + importance = kwargs.get("importance") + if importance is not None: + set_span_attribute(span, _GEN_AI_MEMORY_IMPORTANCE, float(importance)) + + # Content (opt-in) + if _capture_content() and args: + content = args[0] if args else kwargs.get("content") + if content and isinstance(content, str): + set_span_attribute(span, _GEN_AI_MEMORY_CONTENT, content) + + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + # MemoryRecord has an id attribute + if result and hasattr(result, "id"): + set_span_attribute(span, "gen_ai.memory.id", str(result.id)) + return result + except Exception as ex: + error_type = _set_memory_error(span, ex) + raise + finally: + _record_memory_duration( + duration_histogram, time.time() - start_time, operation, error_type + ) + return _wrapper + + +def wrap_memory_recall(tracer: Tracer, duration_histogram: Histogram): + """Wrap Memory.recall() → search_memory span.""" + def _wrapper(wrapped, instance, args, kwargs): + operation = "search_memory" + span_name = f"{operation} {_PROVIDER}" + error_type = None + start_time = time.time() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, + attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER} + ) as span: + set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation) + set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER) + set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance)) + set_span_attribute(span, _GEN_AI_MEMORY_TYPE, _infer_memory_type(kwargs)) + + # Query (opt-in) + query = args[0] if args else kwargs.get("query") + if _capture_content() and query and isinstance(query, str): + set_span_attribute(span, _GEN_AI_MEMORY_QUERY, query) + + # Scope path + scope = kwargs.get("scope") + if scope: + set_span_attribute(span, "crewai.memory.scope_path", str(scope)) + + source = kwargs.get("source") + if source: + set_span_attribute(span, _GEN_AI_MEMORY_NAMESPACE, str(source)) + + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + if isinstance(result, list): + set_span_attribute(span, _GEN_AI_MEMORY_SEARCH_RESULT_COUNT, len(result)) + return result + except Exception as ex: + error_type = _set_memory_error(span, ex) + raise + finally: + _record_memory_duration( + duration_histogram, time.time() - start_time, operation, error_type + ) + return _wrapper + + +def wrap_memory_forget(tracer: Tracer, duration_histogram: Histogram): + """Wrap Memory.forget() → delete_memory span.""" + def _wrapper(wrapped, instance, args, kwargs): + operation = "delete_memory" + span_name = f"{operation} {_PROVIDER}" + error_type = None + start_time = time.time() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, + attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER} + ) as span: + set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation) + set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER) + set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance)) + + scope = kwargs.get("scope") + if scope: + set_span_attribute(span, "crewai.memory.scope_path", str(scope)) + + record_ids = kwargs.get("record_ids") + if record_ids and isinstance(record_ids, list) and len(record_ids) == 1: + set_span_attribute(span, "gen_ai.memory.id", str(record_ids[0])) + + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + # forget() returns number of deleted records + if isinstance(result, int): + set_span_attribute(span, "crewai.memory.deleted_count", result) + return result + except Exception as ex: + error_type = _set_memory_error(span, ex) + raise + finally: + _record_memory_duration( + duration_histogram, time.time() - start_time, operation, error_type + ) + return _wrapper + + +def wrap_memory_reset(tracer: Tracer, duration_histogram: Histogram): + """Wrap Memory.reset() → delete_memory span (scope-level wipe).""" + def _wrapper(wrapped, instance, args, kwargs): + operation = "delete_memory" + span_name = f"{operation} {_PROVIDER}" + error_type = None + start_time = time.time() + with tracer.start_as_current_span( + span_name, kind=SpanKind.CLIENT, + attributes={GenAIAttributes.GEN_AI_SYSTEM: _PROVIDER} + ) as span: + set_span_attribute(span, _GEN_AI_OPERATION_NAME, operation) + set_span_attribute(span, _GEN_AI_PROVIDER_NAME, _PROVIDER) + set_span_attribute(span, _GEN_AI_MEMORY_SCOPE, _infer_memory_scope(instance)) + set_span_attribute(span, "crewai.memory.reset", True) + + scope = kwargs.get("scope") + if scope: + set_span_attribute(span, "crewai.memory.scope_path", str(scope)) + + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as ex: + error_type = _set_memory_error(span, ex) + raise + finally: + _record_memory_duration( + duration_histogram, time.time() - start_time, operation, error_type + ) + return _wrapper diff --git a/packages/opentelemetry-instrumentation-crewai/tests/test_memory_instrumentation.py b/packages/opentelemetry-instrumentation-crewai/tests/test_memory_instrumentation.py new file mode 100644 index 0000000000..6cc50ecdb4 --- /dev/null +++ b/packages/opentelemetry-instrumentation-crewai/tests/test_memory_instrumentation.py @@ -0,0 +1,233 @@ +"""Tests for CrewAI memory operation instrumentation. + +These tests verify that Memory.remember, Memory.recall, Memory.forget, +and Memory.reset produce spans with GenAI memory semantic convention attributes. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) +from opentelemetry.trace import SpanKind, StatusCode + +from opentelemetry.instrumentation.crewai.instrumentation import ( + wrap_memory_remember, + wrap_memory_recall, + wrap_memory_forget, + wrap_memory_reset, + _GEN_AI_OPERATION_NAME, + _GEN_AI_PROVIDER_NAME, + _GEN_AI_MEMORY_SCOPE, + _GEN_AI_MEMORY_TYPE, + _GEN_AI_MEMORY_CONTENT, + _GEN_AI_MEMORY_QUERY, + _GEN_AI_MEMORY_SEARCH_RESULT_COUNT, + _GEN_AI_MEMORY_UPDATE_STRATEGY, + _GEN_AI_MEMORY_IMPORTANCE, + _ERROR_TYPE, +) + + +class _InMemoryExporter(SpanExporter): + def __init__(self): + self.spans = [] + + def export(self, spans): + self.spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + def get_finished_spans(self): + return list(self.spans) + + +@pytest.fixture() +def tracer_provider(): + return TracerProvider() + + +@pytest.fixture() +def exporter(tracer_provider): + exp = _InMemoryExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(exp)) + return exp + + +@pytest.fixture() +def tracer(tracer_provider): + return tracer_provider.get_tracer("test") + + +def _get_span(exporter): + spans = exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + return span, dict(span.attributes) + + +class TestMemoryRemember: + def test_basic_remember(self, tracer, exporter): + mock_record = MagicMock() + mock_record.id = "rec-123" + wrapped = MagicMock(return_value=mock_record) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_remember(tracer, None) + result = wrapper( + wrapped, instance, ("Agent met with John",), + {"scope": "/agent/1", "importance": 0.8} + ) + + assert result == mock_record + span, attrs = _get_span(exporter) + assert span.name == "update_memory crewai" + assert span.kind == SpanKind.CLIENT + assert attrs[_GEN_AI_OPERATION_NAME] == "update_memory" + assert attrs[_GEN_AI_PROVIDER_NAME] == "crewai" + assert attrs[_GEN_AI_MEMORY_UPDATE_STRATEGY] == "merge" + assert attrs[_GEN_AI_MEMORY_IMPORTANCE] == 0.8 + assert attrs["gen_ai.memory.id"] == "rec-123" + + def test_remember_infers_scope_from_root(self, tracer, exporter): + wrapped = MagicMock(return_value=MagicMock(id="r1")) + instance = MagicMock() + instance._root = "/user/alice" + + wrapper = wrap_memory_remember(tracer, None) + wrapper(wrapped, instance, ("data",), {}) + + span, attrs = _get_span(exporter) + assert attrs[_GEN_AI_MEMORY_SCOPE] == "user" + + def test_remember_captures_content(self, tracer, exporter, monkeypatch): + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true") + wrapped = MagicMock(return_value=MagicMock(id="r1")) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_remember(tracer, None) + wrapper(wrapped, instance, ("User likes dark mode",), {}) + + span, attrs = _get_span(exporter) + assert attrs[_GEN_AI_MEMORY_CONTENT] == "User likes dark mode" + + def test_remember_no_content_by_default(self, tracer, exporter, monkeypatch): + monkeypatch.delenv("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", raising=False) + wrapped = MagicMock(return_value=MagicMock(id="r1")) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_remember(tracer, None) + wrapper(wrapped, instance, ("secret",), {}) + + span, attrs = _get_span(exporter) + assert _GEN_AI_MEMORY_CONTENT not in attrs + + def test_remember_error(self, tracer, exporter): + wrapped = MagicMock(side_effect=RuntimeError("storage error")) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_remember(tracer, None) + with pytest.raises(RuntimeError, match="storage error"): + wrapper(wrapped, instance, ("data",), {}) + + span, attrs = _get_span(exporter) + assert span.status.status_code == StatusCode.ERROR + assert attrs[_ERROR_TYPE] == "RuntimeError" + + +class TestMemoryRecall: + def test_basic_recall(self, tracer, exporter): + results = [MagicMock(), MagicMock()] + wrapped = MagicMock(return_value=results) + instance = MagicMock() + instance._root = "/agent/bot-1" + + wrapper = wrap_memory_recall(tracer, None) + result = wrapper(wrapped, instance, ("onboarding process",), {"limit": 5}) + + assert result == results + span, attrs = _get_span(exporter) + assert span.name == "search_memory crewai" + assert attrs[_GEN_AI_OPERATION_NAME] == "search_memory" + assert attrs[_GEN_AI_PROVIDER_NAME] == "crewai" + assert attrs[_GEN_AI_MEMORY_SCOPE] == "agent" + assert attrs[_GEN_AI_MEMORY_SEARCH_RESULT_COUNT] == 2 + + def test_recall_captures_query(self, tracer, exporter, monkeypatch): + monkeypatch.setenv("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "true") + wrapped = MagicMock(return_value=[]) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_recall(tracer, None) + wrapper(wrapped, instance, ("find meeting notes",), {}) + + span, attrs = _get_span(exporter) + assert attrs[_GEN_AI_MEMORY_QUERY] == "find meeting notes" + + def test_recall_error(self, tracer, exporter): + wrapped = MagicMock(side_effect=TimeoutError("vector db timeout")) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_recall(tracer, None) + with pytest.raises(TimeoutError): + wrapper(wrapped, instance, ("query",), {}) + + span, attrs = _get_span(exporter) + assert attrs[_ERROR_TYPE] == "TimeoutError" + + +class TestMemoryForget: + def test_basic_forget(self, tracer, exporter): + wrapped = MagicMock(return_value=3) + instance = MagicMock() + instance._root = "/user/alice" + + wrapper = wrap_memory_forget(tracer, None) + result = wrapper(wrapped, instance, (), {"scope": "/user/alice/temp"}) + + assert result == 3 + span, attrs = _get_span(exporter) + assert span.name == "delete_memory crewai" + assert attrs[_GEN_AI_OPERATION_NAME] == "delete_memory" + assert attrs[_GEN_AI_MEMORY_SCOPE] == "user" + assert attrs["crewai.memory.deleted_count"] == 3 + + def test_forget_single_record(self, tracer, exporter): + wrapped = MagicMock(return_value=1) + instance = MagicMock() + instance._root = None + + wrapper = wrap_memory_forget(tracer, None) + wrapper(wrapped, instance, (), {"record_ids": ["rec-42"]}) + + span, attrs = _get_span(exporter) + assert attrs["gen_ai.memory.id"] == "rec-42" + + +class TestMemoryReset: + def test_basic_reset(self, tracer, exporter): + wrapped = MagicMock(return_value=None) + instance = MagicMock() + instance._root = "/agent/bot" + + wrapper = wrap_memory_reset(tracer, None) + wrapper(wrapped, instance, (), {"scope": "/agent/bot"}) + + span, attrs = _get_span(exporter) + assert span.name == "delete_memory crewai" + assert attrs[_GEN_AI_OPERATION_NAME] == "delete_memory" + assert attrs[_GEN_AI_MEMORY_SCOPE] == "agent" + assert attrs["crewai.memory.reset"] is True