diff --git a/README.md b/README.md index 49bf0247..088c6f93 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ BRAINTRUST_API_KEY= braintrust eval tutorial_eval.py | [Strands](py/src/braintrust/integrations/strands/) | Yes | `strands-agents>=1.20.0` | | [Agno](py/src/braintrust/integrations/agno/) | Yes | `agno>=2.1.0` | | [AgentScope](py/src/braintrust/integrations/agentscope/) | Yes | `agentscope>=1.0.0` | +| [Temporal](py/src/braintrust/integrations/temporal/) | Yes | `temporalio>=1.19.0` | | [pytest plugin](py/src/braintrust/wrappers/pytest_plugin/README.md) | No | `pytest>=8` | ## Documentation diff --git a/py/noxfile.py b/py/noxfile.py index f08d3153..08a292de 100644 --- a/py/noxfile.py +++ b/py/noxfile.py @@ -444,7 +444,7 @@ def test_mistral(session, version): def test_temporal(session, version): _install_test_deps(session) _install_matrix_dep(session, "temporalio", version) - _run_tests(session, "braintrust/contrib/temporal") + _run_tests(session, f"{INTEGRATION_DIR}/temporal") PYTEST_VERSIONS = _get_matrix_versions("pytest-matrix") diff --git a/py/pyproject.toml b/py/pyproject.toml index 29ea13f1..bcb62a48 100644 --- a/py/pyproject.toml +++ b/py/pyproject.toml @@ -55,6 +55,8 @@ otel = [ performance = [ "orjson; platform_python_implementation != 'PyPy'", ] +# Deprecated: Temporal is now available via braintrust.auto_instrument(); +# keep this extra as a compatibility alias until a future major release. temporal = [ "temporalio>=1.19.0; python_version>='3.10'", ] diff --git a/py/src/braintrust/auto.py b/py/src/braintrust/auto.py index 117afeac..a30c1bf6 100644 --- a/py/src/braintrust/auto.py +++ b/py/src/braintrust/auto.py @@ -27,6 +27,7 @@ OpenRouterIntegration, PydanticAIIntegration, StrandsIntegration, + TemporalIntegration, ) from braintrust.integrations.base import BaseIntegration @@ -68,6 +69,7 @@ def auto_instrument( autogen: bool = True, crewai: bool = True, strands: bool = True, + temporal: bool = True, ) -> dict[str, bool]: """ Auto-instrument supported AI/ML libraries for Braintrust tracing. @@ -98,6 +100,7 @@ def auto_instrument( autogen: Enable AutoGen instrumentation (default: True) crewai: Enable CrewAI instrumentation (default: True) strands: Enable Strands Agents instrumentation (default: True) + temporal: Enable Temporal instrumentation (default: True) Returns: Dict mapping integration name to whether it was successfully instrumented. @@ -183,6 +186,8 @@ def auto_instrument( results["crewai"] = _instrument_integration(CrewAIIntegration) if strands: results["strands"] = _instrument_integration(StrandsIntegration) + if temporal: + results["temporal"] = _instrument_integration(TemporalIntegration) return results diff --git a/py/src/braintrust/contrib/temporal/__init__.py b/py/src/braintrust/contrib/temporal/__init__.py index b25a3946..8b46a971 100644 --- a/py/src/braintrust/contrib/temporal/__init__.py +++ b/py/src/braintrust/contrib/temporal/__init__.py @@ -1,441 +1,20 @@ -"""Braintrust integration for Temporal workflows and activities. +"""Deprecated compatibility exports for Braintrust Temporal integration. -This module provides Temporal integration that automatically traces workflow executions -and activities in Braintrust. To use this integration, install braintrust with the -temporal extra: - - pip install braintrust[temporal] - -Components ----------- - -There are two main components: - -- **BraintrustPlugin**: Use this for both Temporal clients and workers. It's a convenience - wrapper that automatically configures the interceptor and sandbox settings. - -- **BraintrustInterceptor**: The underlying interceptor. You can use this directly if you - need more control, but ``BraintrustPlugin`` is recommended for most use cases. - -Worker Setup ------------- - -Use ``BraintrustPlugin`` when creating a worker:: - - import braintrust - from braintrust.contrib.temporal import BraintrustPlugin - from temporalio.client import Client - from temporalio.worker import Worker - - braintrust.init_logger(project="my-project") - - client = await Client.connect("localhost:7233") - - worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - activities=[my_activity], - plugins=[BraintrustPlugin()], - ) - - await worker.run() - -Client Setup ------------- - -Use ``BraintrustPlugin`` when creating a client to propagate span context to workflows:: - - import braintrust - from braintrust.contrib.temporal import BraintrustPlugin - from temporalio.client import Client - - braintrust.init_logger(project="my-project") - - client = await Client.connect( - "localhost:7233", - plugins=[BraintrustPlugin()], - ) - - # Spans created around workflow calls will be linked as parents - with braintrust.start_span(name="my-operation") as span: - result = await client.execute_workflow( - MyWorkflow.run, - args, - id="workflow-id", - task_queue="my-queue", - ) - -What Gets Traced ----------------- - -The integration will automatically: - -- Trace workflow executions -- Trace all activity executions -- Trace local activities -- Maintain parent-child relationships between client calls, workflows, and activities -- Handle child workflows -- Respect Temporal replay safety (no duplicate spans during replay) +Prefer `braintrust.auto_instrument()` for automatic setup, or import explicit helpers +from `braintrust.integrations.temporal`. """ -import dataclasses -from collections.abc import Mapping -from typing import Any - -import braintrust -import temporalio.activity -import temporalio.api.common.v1 -import temporalio.client -import temporalio.converter -import temporalio.worker -import temporalio.workflow -from temporalio.plugin import SimplePlugin -from temporalio.worker import WorkflowRunner -from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner - - -# Braintrust dynamically chooses its context implementation at runtime based on -# BRAINTRUST_OTEL_COMPAT environment variable. When first accessed, it reads -# os.environ which is restricted in the sandbox. Therefore if the first use -# is inside the sandbox, it will fail. So we eagerly reference it here to -# force initialization at import time (before sandbox evaluation). -try: - braintrust.current_span() -except Exception: - # It's okay if this fails (e.g., no logger initialized yet) - pass - -# Store module-level reference to braintrust.current_span to avoid re-importing -# inside extern functions (which can trigger sandbox restrictions) -_current_span = braintrust.current_span - -# Header key for passing span context between client, workflows, and activities -_HEADER_KEY = "_braintrust-span" - - -class BraintrustInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): - """Braintrust interceptor for tracing Temporal workflows and activities. - - This interceptor can be used with both Temporal clients and workers to automatically - trace workflow executions and activity runs. It maintains proper parent-child - relationships in the trace hierarchy and respects Temporal's replay safety requirements. - - The interceptor: - - Creates spans for workflow executions (using sandbox_unrestricted) - - Captures activity execution as spans with metadata - - Propagates span context from client → workflow → activities - - Handles both regular activities and local activities - - Supports child workflows - - Logs errors from failed activities and workflows - - Ensures replay safety (no duplicate spans during workflow replay) - """ - - def __init__(self, logger: Any | None = None) -> None: - """Initialize interceptor. - - Args: - logger: Optional background logger for testing. - """ - self.payload_converter = temporalio.converter.PayloadConverter.default - self._bg_logger = logger - # Capture logger instance at init time for cross-thread use - if logger: - braintrust.logger._state._override_bg_logger.logger = logger - self._logger = braintrust.current_logger() - - def _get_logger(self) -> Any | None: - """Get logger for creating spans. - - Sets thread-local override if background logger provided (for testing), - then returns captured logger instance. - """ - if self._bg_logger: - braintrust.logger._state._override_bg_logger.logger = self._bg_logger - return self._logger - - def intercept_client(self, next: temporalio.client.OutboundInterceptor) -> temporalio.client.OutboundInterceptor: - """Intercept client calls to propagate span context to workflows.""" - return _BraintrustClientOutboundInterceptor(next, self) - - def intercept_activity( - self, next: temporalio.worker.ActivityInboundInterceptor - ) -> temporalio.worker.ActivityInboundInterceptor: - """Intercept activity executions to create activity spans.""" - return _BraintrustActivityInboundInterceptor(next, self) - - def workflow_interceptor_class( - self, input: temporalio.worker.WorkflowInterceptorClassInput - ) -> type["BraintrustWorkflowInboundInterceptor"] | None: - """Return workflow interceptor class to propagate context to activities.""" - input.unsafe_extern_functions["__braintrust_get_logger"] = self._get_logger - return BraintrustWorkflowInboundInterceptor - - def _span_context_to_headers( - self, - span_context: dict[str, Any], - headers: Mapping[str, temporalio.api.common.v1.Payload], - ) -> Mapping[str, temporalio.api.common.v1.Payload]: - """Add span context to headers.""" - if span_context: - payloads = self.payload_converter.to_payloads([span_context]) - if payloads: - headers = { - **headers, - _HEADER_KEY: payloads[0], - } - return headers - - def _span_context_from_headers( - self, headers: Mapping[str, temporalio.api.common.v1.Payload] - ) -> dict[str, Any] | None: - """Extract span context from headers.""" - if _HEADER_KEY not in headers: - return None - header_payload = headers.get(_HEADER_KEY) - if not header_payload: - return None - payloads = self.payload_converter.from_payloads([header_payload]) - if not payloads: - return None - return payloads[0] if payloads[0] else None - - -class _BraintrustClientOutboundInterceptor(temporalio.client.OutboundInterceptor): - """Client interceptor that propagates span context to workflows.""" - - def __init__(self, next: temporalio.client.OutboundInterceptor, root: BraintrustInterceptor) -> None: - super().__init__(next) - self.root = root - - async def start_workflow( - self, input: temporalio.client.StartWorkflowInput - ) -> temporalio.client.WorkflowHandle[Any, Any]: - # Get current span context and add it to workflow headers - current_span = _current_span() - if current_span: - span_context = current_span.export() - input.headers = self.root._span_context_to_headers(span_context, input.headers) - - return await super().start_workflow(input) - - -class _BraintrustActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): - """Activity interceptor that creates spans for activity executions.""" - - def __init__( - self, - next: temporalio.worker.ActivityInboundInterceptor, - root: BraintrustInterceptor, - ) -> None: - super().__init__(next) - self.root = root - - async def execute_activity(self, input: temporalio.worker.ExecuteActivityInput) -> Any: - info = temporalio.activity.info() - - # Extract parent span context from headers - parent_span_context = self.root._span_context_from_headers(input.headers) - - logger = self.root._get_logger() - if not logger: - return await super().execute_activity(input) - - # Create Braintrust span for activity execution, linked to workflow span - span = logger.start_span( - name=f"temporal.activity.{info.activity_type}", - type="task", - parent=parent_span_context or None, - metadata={ - "temporal.activity_type": info.activity_type, - "temporal.activity_id": info.activity_id, - "temporal.workflow_id": info.workflow_id, - "temporal.workflow_run_id": info.workflow_run_id, - }, - ) - span.set_current() - - try: - result = await super().execute_activity(input) - return result - except Exception as e: - span.log(error=str(e)) - raise - finally: - span.unset_current() - span.end() - - -class BraintrustWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): - """Workflow interceptor that creates workflow spans and propagates context to activities. - - This interceptor creates a span for the workflow execution using sandbox_unrestricted - to bypass Temporal's sandbox restrictions. The workflow span is the parent for all - activities and child workflows executed within it. - """ - - def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None: - super().__init__(next) - self.payload_converter = temporalio.converter.PayloadConverter.default - self._parent_span_context: dict[str, Any] | None = None - - def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: - super().init(_BraintrustWorkflowOutboundInterceptor(outbound, self)) - - async def execute_workflow(self, input: temporalio.worker.ExecuteWorkflowInput) -> Any: - # Extract parent span context from workflow headers (set by client) - parent_span_context = None - if _HEADER_KEY in input.headers: - header_payload = input.headers.get(_HEADER_KEY) - if header_payload: - payloads = self.payload_converter.from_payloads([header_payload]) - if payloads: - parent_span_context = payloads[0] - - # Store parent span context for activities (will be overwritten if we create a workflow span) - self._parent_span_context = parent_span_context - - # Create a span for the workflow execution using sandbox_unrestricted - # to bypass the sandbox restrictions on logger state access - span = None - if not temporalio.workflow.unsafe.is_replaying(): - with temporalio.workflow.unsafe.sandbox_unrestricted(): - # Get logger via extern function (supports test logger parameter) - get_logger = temporalio.workflow.extern_functions()["__braintrust_get_logger"] - logger = get_logger() - - if logger: - info = temporalio.workflow.info() - span = logger.start_span( - name=f"temporal.workflow.{info.workflow_type}", - type="task", - parent=parent_span_context or None, - metadata={ - "temporal.workflow_type": info.workflow_type, - "temporal.workflow_id": info.workflow_id, - "temporal.run_id": info.run_id, - }, - ) - span.set_current() - - # Update parent span context for activities - self._parent_span_context = span.export() - - try: - result = await super().execute_workflow(input) - return result - except Exception as e: - if span: - with temporalio.workflow.unsafe.sandbox_unrestricted(): - span.log(error=str(e)) - raise - finally: - if span: - with temporalio.workflow.unsafe.sandbox_unrestricted(): - span.unset_current() - span.end() - - -class _BraintrustWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor): - """Outbound workflow interceptor that propagates span context to activities.""" - - def __init__( - self, - next: temporalio.worker.WorkflowOutboundInterceptor, - root: BraintrustWorkflowInboundInterceptor, - ) -> None: - super().__init__(next) - self.root = root - - def _add_span_context_to_headers( - self, headers: Mapping[str, temporalio.api.common.v1.Payload] - ) -> Mapping[str, temporalio.api.common.v1.Payload]: - """Add parent span context to headers if available. - - Note: We always pass span context through headers, even during replay, - so activities can maintain proper parent-child relationships. The replay - safety is handled in the activity interceptor, which only creates spans - when the activity actually executes (not during replay). - """ - if self.root._parent_span_context: - payloads = self.root.payload_converter.to_payloads([self.root._parent_span_context]) - if payloads: - return {**headers, _HEADER_KEY: payloads[0]} - return headers - - def start_activity(self, input: temporalio.worker.StartActivityInput) -> temporalio.workflow.ActivityHandle: - input.headers = self._add_span_context_to_headers(input.headers) - return super().start_activity(input) - - def start_local_activity( - self, input: temporalio.worker.StartLocalActivityInput - ) -> temporalio.workflow.ActivityHandle: - input.headers = self._add_span_context_to_headers(input.headers) - return super().start_local_activity(input) - - def start_child_workflow( - self, input: temporalio.worker.StartChildWorkflowInput - ) -> temporalio.workflow.ChildWorkflowHandle: - input.headers = self._add_span_context_to_headers(input.headers) - return super().start_child_workflow(input) - - -def _modify_workflow_runner(existing: WorkflowRunner | None) -> WorkflowRunner | None: - """Add braintrust to sandbox passthrough modules.""" - if isinstance(existing, SandboxedWorkflowRunner): - new_restrictions = existing.restrictions.with_passthrough_modules("braintrust") - return dataclasses.replace(existing, restrictions=new_restrictions) - return existing - - -class BraintrustPlugin(SimplePlugin): - """Braintrust plugin for Temporal that automatically configures tracing. - - This plugin simplifies Braintrust integration with Temporal by: - - Automatically adding BraintrustInterceptor to the worker - - Configuring the sandbox to allow braintrust imports without unsafe.imports_passed_through() - - Example usage: - from braintrust.contrib.temporal import BraintrustPlugin - from temporalio.worker import Worker - - worker = Worker( - client, - task_queue="my-queue", - workflows=[MyWorkflow], - activities=[my_activity], - plugins=[BraintrustPlugin()], - ) - - Requires temporalio >= 1.19.0. - """ - - def __init__(self, logger: Any | None = None) -> None: - """Initialize the Braintrust plugin. - - Args: - logger: Optional background logger for testing. - """ - interceptor = BraintrustInterceptor(logger=logger) - import inspect +import warnings - params = inspect.signature(SimplePlugin.__init__).parameters - # temporalio plugin APIs differ by version. Newer releases may expose - # a merged `interceptors` kwarg, while older releases keep separate - # client/worker interceptor kwargs. Build kwargs dynamically so pylint - # does not validate unsupported keywords against the installed version. - kwargs: dict[str, Any] = { - "name": "braintrust", - "workflow_runner": _modify_workflow_runner, - } - if "interceptors" in params: - kwargs["interceptors"] = [interceptor] - else: - kwargs["client_interceptors"] = [interceptor] - kwargs["worker_interceptors"] = [interceptor] +warnings.warn( + "braintrust.contrib.temporal is deprecated and will be removed in a future release. " + "Use braintrust.auto_instrument() or braintrust.integrations.temporal instead.", + DeprecationWarning, + stacklevel=2, +) - super().__init__(**kwargs) +from braintrust.integrations.temporal import BraintrustInterceptor, BraintrustPlugin # noqa: E402 __all__ = ["BraintrustInterceptor", "BraintrustPlugin"] diff --git a/py/src/braintrust/integrations/__init__.py b/py/src/braintrust/integrations/__init__.py index 417dd09d..425f7703 100644 --- a/py/src/braintrust/integrations/__init__.py +++ b/py/src/braintrust/integrations/__init__.py @@ -17,6 +17,7 @@ from .openrouter import OpenRouterIntegration from .pydantic_ai import PydanticAIIntegration from .strands import StrandsIntegration +from .temporal import TemporalIntegration __all__ = [ @@ -39,4 +40,5 @@ "OpenRouterIntegration", "PydanticAIIntegration", "StrandsIntegration", + "TemporalIntegration", ] diff --git a/py/src/braintrust/integrations/auto_test_scripts/test_auto_temporal.py b/py/src/braintrust/integrations/auto_test_scripts/test_auto_temporal.py new file mode 100644 index 00000000..091ad7e7 --- /dev/null +++ b/py/src/braintrust/integrations/auto_test_scripts/test_auto_temporal.py @@ -0,0 +1,16 @@ +"""Test auto_instrument for Temporal.""" + +from braintrust.auto import auto_instrument +from braintrust.integrations.temporal import BraintrustPlugin, setup_temporal + + +results = auto_instrument() +assert results.get("temporal") == True + +results2 = auto_instrument() +assert results2.get("temporal") == True + +assert setup_temporal() == True +assert BraintrustPlugin is not None + +print("SUCCESS") diff --git a/py/src/braintrust/integrations/temporal/__init__.py b/py/src/braintrust/integrations/temporal/__init__.py new file mode 100644 index 00000000..9cfe364f --- /dev/null +++ b/py/src/braintrust/integrations/temporal/__init__.py @@ -0,0 +1,43 @@ +"""Braintrust integration for Temporal workflows and activities.""" + +import logging +from typing import TYPE_CHECKING, Any + +from braintrust.logger import NOOP_SPAN, current_span, init_logger + +from .integration import TemporalIntegration + + +if TYPE_CHECKING: + from .plugin import BraintrustInterceptor, BraintrustPlugin + + +logger = logging.getLogger(__name__) + +__all__ = [ + "BraintrustInterceptor", + "BraintrustPlugin", + "TemporalIntegration", + "setup_temporal", +] + + +def setup_temporal( + api_key: str | None = None, + project_id: str | None = None, + project_name: str | None = None, +) -> bool: + """Set up Braintrust auto-instrumentation for Temporal.""" + span = current_span() + if span == NOOP_SPAN: + init_logger(project=project_name, api_key=api_key, project_id=project_id) + + return TemporalIntegration.setup() + + +def __getattr__(name: str) -> Any: + if name in {"BraintrustInterceptor", "BraintrustPlugin"}: + from .plugin import BraintrustInterceptor, BraintrustPlugin + + return {"BraintrustInterceptor": BraintrustInterceptor, "BraintrustPlugin": BraintrustPlugin}[name] + raise AttributeError(name) diff --git a/py/src/braintrust/integrations/temporal/integration.py b/py/src/braintrust/integrations/temporal/integration.py new file mode 100644 index 00000000..6ac41c1d --- /dev/null +++ b/py/src/braintrust/integrations/temporal/integration.py @@ -0,0 +1,14 @@ +"""Temporal integration orchestration.""" + +from braintrust.integrations.base import BaseIntegration + +from .patchers import ClientConnectPatcher, WorkerInitPatcher + + +class TemporalIntegration(BaseIntegration): + """Braintrust instrumentation for Temporal workflows and activities.""" + + name = "temporal" + import_names = ("temporalio",) + min_version = "1.19.0" + patchers = (ClientConnectPatcher, WorkerInitPatcher) diff --git a/py/src/braintrust/integrations/temporal/patchers.py b/py/src/braintrust/integrations/temporal/patchers.py new file mode 100644 index 00000000..ee478a20 --- /dev/null +++ b/py/src/braintrust/integrations/temporal/patchers.py @@ -0,0 +1,58 @@ +"""Temporal patchers and public helpers.""" + +from typing import Any + +from braintrust.integrations.base import FunctionWrapperPatcher + + +def _plugin_classes() -> tuple[type[Any], type[Any]]: + from .plugin import BraintrustInterceptor, BraintrustPlugin + + return BraintrustInterceptor, BraintrustPlugin + + +def _has_braintrust_plugin(plugin: Any) -> bool: + BraintrustInterceptor, BraintrustPlugin = _plugin_classes() + if isinstance(plugin, BraintrustPlugin): + return True + interceptors = [] + for attr in ("interceptors", "client_interceptors", "worker_interceptors"): + interceptors.extend(getattr(plugin, attr, None) or []) + return any(isinstance(interceptor, BraintrustInterceptor) for interceptor in interceptors) + + +def _with_braintrust_plugin(plugins: Any | None) -> list[Any]: + """Return plugins with a Braintrust plugin appended unless already present.""" + plugin_list = list(plugins or []) + if any(_has_braintrust_plugin(plugin) for plugin in plugin_list): + return plugin_list + _, BraintrustPlugin = _plugin_classes() + return [*plugin_list, BraintrustPlugin()] + + +def _client_connect_wrapper(wrapped: Any, instance: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + kwargs["plugins"] = _with_braintrust_plugin(kwargs.get("plugins")) + return wrapped(*args, **kwargs) + + +def _worker_init_wrapper(wrapped: Any, instance: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> Any: + kwargs["plugins"] = _with_braintrust_plugin(kwargs.get("plugins")) + return wrapped(*args, **kwargs) + + +class ClientConnectPatcher(FunctionWrapperPatcher): + """Patch Temporal client connections to install BraintrustPlugin.""" + + name = "temporal.client.connect" + target_module = "temporalio.client" + target_path = "Client.connect" + wrapper = _client_connect_wrapper + + +class WorkerInitPatcher(FunctionWrapperPatcher): + """Patch Temporal workers to install BraintrustPlugin.""" + + name = "temporal.worker.init" + target_module = "temporalio.worker" + target_path = "Worker.__init__" + wrapper = _worker_init_wrapper diff --git a/py/src/braintrust/integrations/temporal/plugin.py b/py/src/braintrust/integrations/temporal/plugin.py new file mode 100644 index 00000000..b25a3946 --- /dev/null +++ b/py/src/braintrust/integrations/temporal/plugin.py @@ -0,0 +1,441 @@ +"""Braintrust integration for Temporal workflows and activities. + +This module provides Temporal integration that automatically traces workflow executions +and activities in Braintrust. To use this integration, install braintrust with the +temporal extra: + + pip install braintrust[temporal] + +Components +---------- + +There are two main components: + +- **BraintrustPlugin**: Use this for both Temporal clients and workers. It's a convenience + wrapper that automatically configures the interceptor and sandbox settings. + +- **BraintrustInterceptor**: The underlying interceptor. You can use this directly if you + need more control, but ``BraintrustPlugin`` is recommended for most use cases. + +Worker Setup +------------ + +Use ``BraintrustPlugin`` when creating a worker:: + + import braintrust + from braintrust.contrib.temporal import BraintrustPlugin + from temporalio.client import Client + from temporalio.worker import Worker + + braintrust.init_logger(project="my-project") + + client = await Client.connect("localhost:7233") + + worker = Worker( + client, + task_queue="my-queue", + workflows=[MyWorkflow], + activities=[my_activity], + plugins=[BraintrustPlugin()], + ) + + await worker.run() + +Client Setup +------------ + +Use ``BraintrustPlugin`` when creating a client to propagate span context to workflows:: + + import braintrust + from braintrust.contrib.temporal import BraintrustPlugin + from temporalio.client import Client + + braintrust.init_logger(project="my-project") + + client = await Client.connect( + "localhost:7233", + plugins=[BraintrustPlugin()], + ) + + # Spans created around workflow calls will be linked as parents + with braintrust.start_span(name="my-operation") as span: + result = await client.execute_workflow( + MyWorkflow.run, + args, + id="workflow-id", + task_queue="my-queue", + ) + +What Gets Traced +---------------- + +The integration will automatically: + +- Trace workflow executions +- Trace all activity executions +- Trace local activities +- Maintain parent-child relationships between client calls, workflows, and activities +- Handle child workflows +- Respect Temporal replay safety (no duplicate spans during replay) +""" + +import dataclasses +from collections.abc import Mapping +from typing import Any + +import braintrust +import temporalio.activity +import temporalio.api.common.v1 +import temporalio.client +import temporalio.converter +import temporalio.worker +import temporalio.workflow +from temporalio.plugin import SimplePlugin +from temporalio.worker import WorkflowRunner +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner + + +# Braintrust dynamically chooses its context implementation at runtime based on +# BRAINTRUST_OTEL_COMPAT environment variable. When first accessed, it reads +# os.environ which is restricted in the sandbox. Therefore if the first use +# is inside the sandbox, it will fail. So we eagerly reference it here to +# force initialization at import time (before sandbox evaluation). +try: + braintrust.current_span() +except Exception: + # It's okay if this fails (e.g., no logger initialized yet) + pass + +# Store module-level reference to braintrust.current_span to avoid re-importing +# inside extern functions (which can trigger sandbox restrictions) +_current_span = braintrust.current_span + +# Header key for passing span context between client, workflows, and activities +_HEADER_KEY = "_braintrust-span" + + +class BraintrustInterceptor(temporalio.client.Interceptor, temporalio.worker.Interceptor): + """Braintrust interceptor for tracing Temporal workflows and activities. + + This interceptor can be used with both Temporal clients and workers to automatically + trace workflow executions and activity runs. It maintains proper parent-child + relationships in the trace hierarchy and respects Temporal's replay safety requirements. + + The interceptor: + - Creates spans for workflow executions (using sandbox_unrestricted) + - Captures activity execution as spans with metadata + - Propagates span context from client → workflow → activities + - Handles both regular activities and local activities + - Supports child workflows + - Logs errors from failed activities and workflows + - Ensures replay safety (no duplicate spans during workflow replay) + """ + + def __init__(self, logger: Any | None = None) -> None: + """Initialize interceptor. + + Args: + logger: Optional background logger for testing. + """ + self.payload_converter = temporalio.converter.PayloadConverter.default + self._bg_logger = logger + # Capture logger instance at init time for cross-thread use + if logger: + braintrust.logger._state._override_bg_logger.logger = logger + self._logger = braintrust.current_logger() + + def _get_logger(self) -> Any | None: + """Get logger for creating spans. + + Sets thread-local override if background logger provided (for testing), + then returns captured logger instance. + """ + if self._bg_logger: + braintrust.logger._state._override_bg_logger.logger = self._bg_logger + return self._logger + + def intercept_client(self, next: temporalio.client.OutboundInterceptor) -> temporalio.client.OutboundInterceptor: + """Intercept client calls to propagate span context to workflows.""" + return _BraintrustClientOutboundInterceptor(next, self) + + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + """Intercept activity executions to create activity spans.""" + return _BraintrustActivityInboundInterceptor(next, self) + + def workflow_interceptor_class( + self, input: temporalio.worker.WorkflowInterceptorClassInput + ) -> type["BraintrustWorkflowInboundInterceptor"] | None: + """Return workflow interceptor class to propagate context to activities.""" + input.unsafe_extern_functions["__braintrust_get_logger"] = self._get_logger + return BraintrustWorkflowInboundInterceptor + + def _span_context_to_headers( + self, + span_context: dict[str, Any], + headers: Mapping[str, temporalio.api.common.v1.Payload], + ) -> Mapping[str, temporalio.api.common.v1.Payload]: + """Add span context to headers.""" + if span_context: + payloads = self.payload_converter.to_payloads([span_context]) + if payloads: + headers = { + **headers, + _HEADER_KEY: payloads[0], + } + return headers + + def _span_context_from_headers( + self, headers: Mapping[str, temporalio.api.common.v1.Payload] + ) -> dict[str, Any] | None: + """Extract span context from headers.""" + if _HEADER_KEY not in headers: + return None + header_payload = headers.get(_HEADER_KEY) + if not header_payload: + return None + payloads = self.payload_converter.from_payloads([header_payload]) + if not payloads: + return None + return payloads[0] if payloads[0] else None + + +class _BraintrustClientOutboundInterceptor(temporalio.client.OutboundInterceptor): + """Client interceptor that propagates span context to workflows.""" + + def __init__(self, next: temporalio.client.OutboundInterceptor, root: BraintrustInterceptor) -> None: + super().__init__(next) + self.root = root + + async def start_workflow( + self, input: temporalio.client.StartWorkflowInput + ) -> temporalio.client.WorkflowHandle[Any, Any]: + # Get current span context and add it to workflow headers + current_span = _current_span() + if current_span: + span_context = current_span.export() + input.headers = self.root._span_context_to_headers(span_context, input.headers) + + return await super().start_workflow(input) + + +class _BraintrustActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): + """Activity interceptor that creates spans for activity executions.""" + + def __init__( + self, + next: temporalio.worker.ActivityInboundInterceptor, + root: BraintrustInterceptor, + ) -> None: + super().__init__(next) + self.root = root + + async def execute_activity(self, input: temporalio.worker.ExecuteActivityInput) -> Any: + info = temporalio.activity.info() + + # Extract parent span context from headers + parent_span_context = self.root._span_context_from_headers(input.headers) + + logger = self.root._get_logger() + if not logger: + return await super().execute_activity(input) + + # Create Braintrust span for activity execution, linked to workflow span + span = logger.start_span( + name=f"temporal.activity.{info.activity_type}", + type="task", + parent=parent_span_context or None, + metadata={ + "temporal.activity_type": info.activity_type, + "temporal.activity_id": info.activity_id, + "temporal.workflow_id": info.workflow_id, + "temporal.workflow_run_id": info.workflow_run_id, + }, + ) + span.set_current() + + try: + result = await super().execute_activity(input) + return result + except Exception as e: + span.log(error=str(e)) + raise + finally: + span.unset_current() + span.end() + + +class BraintrustWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): + """Workflow interceptor that creates workflow spans and propagates context to activities. + + This interceptor creates a span for the workflow execution using sandbox_unrestricted + to bypass Temporal's sandbox restrictions. The workflow span is the parent for all + activities and child workflows executed within it. + """ + + def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None: + super().__init__(next) + self.payload_converter = temporalio.converter.PayloadConverter.default + self._parent_span_context: dict[str, Any] | None = None + + def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: + super().init(_BraintrustWorkflowOutboundInterceptor(outbound, self)) + + async def execute_workflow(self, input: temporalio.worker.ExecuteWorkflowInput) -> Any: + # Extract parent span context from workflow headers (set by client) + parent_span_context = None + if _HEADER_KEY in input.headers: + header_payload = input.headers.get(_HEADER_KEY) + if header_payload: + payloads = self.payload_converter.from_payloads([header_payload]) + if payloads: + parent_span_context = payloads[0] + + # Store parent span context for activities (will be overwritten if we create a workflow span) + self._parent_span_context = parent_span_context + + # Create a span for the workflow execution using sandbox_unrestricted + # to bypass the sandbox restrictions on logger state access + span = None + if not temporalio.workflow.unsafe.is_replaying(): + with temporalio.workflow.unsafe.sandbox_unrestricted(): + # Get logger via extern function (supports test logger parameter) + get_logger = temporalio.workflow.extern_functions()["__braintrust_get_logger"] + logger = get_logger() + + if logger: + info = temporalio.workflow.info() + span = logger.start_span( + name=f"temporal.workflow.{info.workflow_type}", + type="task", + parent=parent_span_context or None, + metadata={ + "temporal.workflow_type": info.workflow_type, + "temporal.workflow_id": info.workflow_id, + "temporal.run_id": info.run_id, + }, + ) + span.set_current() + + # Update parent span context for activities + self._parent_span_context = span.export() + + try: + result = await super().execute_workflow(input) + return result + except Exception as e: + if span: + with temporalio.workflow.unsafe.sandbox_unrestricted(): + span.log(error=str(e)) + raise + finally: + if span: + with temporalio.workflow.unsafe.sandbox_unrestricted(): + span.unset_current() + span.end() + + +class _BraintrustWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor): + """Outbound workflow interceptor that propagates span context to activities.""" + + def __init__( + self, + next: temporalio.worker.WorkflowOutboundInterceptor, + root: BraintrustWorkflowInboundInterceptor, + ) -> None: + super().__init__(next) + self.root = root + + def _add_span_context_to_headers( + self, headers: Mapping[str, temporalio.api.common.v1.Payload] + ) -> Mapping[str, temporalio.api.common.v1.Payload]: + """Add parent span context to headers if available. + + Note: We always pass span context through headers, even during replay, + so activities can maintain proper parent-child relationships. The replay + safety is handled in the activity interceptor, which only creates spans + when the activity actually executes (not during replay). + """ + if self.root._parent_span_context: + payloads = self.root.payload_converter.to_payloads([self.root._parent_span_context]) + if payloads: + return {**headers, _HEADER_KEY: payloads[0]} + return headers + + def start_activity(self, input: temporalio.worker.StartActivityInput) -> temporalio.workflow.ActivityHandle: + input.headers = self._add_span_context_to_headers(input.headers) + return super().start_activity(input) + + def start_local_activity( + self, input: temporalio.worker.StartLocalActivityInput + ) -> temporalio.workflow.ActivityHandle: + input.headers = self._add_span_context_to_headers(input.headers) + return super().start_local_activity(input) + + def start_child_workflow( + self, input: temporalio.worker.StartChildWorkflowInput + ) -> temporalio.workflow.ChildWorkflowHandle: + input.headers = self._add_span_context_to_headers(input.headers) + return super().start_child_workflow(input) + + +def _modify_workflow_runner(existing: WorkflowRunner | None) -> WorkflowRunner | None: + """Add braintrust to sandbox passthrough modules.""" + if isinstance(existing, SandboxedWorkflowRunner): + new_restrictions = existing.restrictions.with_passthrough_modules("braintrust") + return dataclasses.replace(existing, restrictions=new_restrictions) + return existing + + +class BraintrustPlugin(SimplePlugin): + """Braintrust plugin for Temporal that automatically configures tracing. + + This plugin simplifies Braintrust integration with Temporal by: + - Automatically adding BraintrustInterceptor to the worker + - Configuring the sandbox to allow braintrust imports without unsafe.imports_passed_through() + + Example usage: + from braintrust.contrib.temporal import BraintrustPlugin + from temporalio.worker import Worker + + worker = Worker( + client, + task_queue="my-queue", + workflows=[MyWorkflow], + activities=[my_activity], + plugins=[BraintrustPlugin()], + ) + + Requires temporalio >= 1.19.0. + """ + + def __init__(self, logger: Any | None = None) -> None: + """Initialize the Braintrust plugin. + + Args: + logger: Optional background logger for testing. + """ + interceptor = BraintrustInterceptor(logger=logger) + import inspect + + params = inspect.signature(SimplePlugin.__init__).parameters + + # temporalio plugin APIs differ by version. Newer releases may expose + # a merged `interceptors` kwarg, while older releases keep separate + # client/worker interceptor kwargs. Build kwargs dynamically so pylint + # does not validate unsupported keywords against the installed version. + kwargs: dict[str, Any] = { + "name": "braintrust", + "workflow_runner": _modify_workflow_runner, + } + if "interceptors" in params: + kwargs["interceptors"] = [interceptor] + else: + kwargs["client_interceptors"] = [interceptor] + kwargs["worker_interceptors"] = [interceptor] + + super().__init__(**kwargs) + + +__all__ = ["BraintrustInterceptor", "BraintrustPlugin"] diff --git a/py/src/braintrust/contrib/temporal/test_temporal.py b/py/src/braintrust/integrations/temporal/test_temporal.py similarity index 95% rename from py/src/braintrust/contrib/temporal/test_temporal.py rename to py/src/braintrust/integrations/temporal/test_temporal.py index 03999310..5a1a3fbc 100644 --- a/py/src/braintrust/contrib/temporal/test_temporal.py +++ b/py/src/braintrust/integrations/temporal/test_temporal.py @@ -8,6 +8,7 @@ import pytest import pytest_asyncio +from braintrust.integrations.test_utils import verify_autoinstrument_script pytest.importorskip("temporalio") @@ -19,7 +20,7 @@ import temporalio.testing import temporalio.worker import temporalio.workflow -from braintrust.contrib.temporal import BraintrustInterceptor, BraintrustPlugin +from braintrust.integrations.temporal import BraintrustInterceptor, BraintrustPlugin from braintrust.test_helpers import init_test_logger from temporalio.common import RetryPolicy from temporalio.worker import Worker @@ -220,6 +221,23 @@ async def run(self, input: TaskInput) -> int: return child_result +class TestAutoInstrumentation: + """Tests for Temporal auto-instrumentation helpers.""" + + def test_auto_instrument_temporal_subprocess(self): + verify_autoinstrument_script("test_auto_temporal.py") + + def test_contrib_temporal_compat_import_deprecated(self): + with pytest.warns(DeprecationWarning, match="braintrust.contrib.temporal is deprecated"): + import importlib + import sys + + sys.modules.pop("braintrust.contrib.temporal", None) + compat = importlib.import_module("braintrust.contrib.temporal") + + assert compat.BraintrustPlugin is BraintrustPlugin + + # Integration Tests