Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 73c141e

Browse files
committed
Hook otel instrumentation when available
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 6554a4f commit 73c141e

3 files changed

Lines changed: 63 additions & 23 deletions

File tree

durabletask/aio/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
new_orchestration_state,
2525
)
2626

27+
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
28+
try:
29+
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
30+
GrpcInstrumentorClient().instrument()
31+
except ImportError:
32+
pass
33+
2734

2835
class AsyncTaskHubGrpcClient:
2936
def __init__(

durabletask/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
TInput = TypeVar("TInput")
2222
TOutput = TypeVar("TOutput")
2323

24+
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
25+
try:
26+
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
27+
GrpcInstrumentorClient().instrument()
28+
except ImportError:
29+
pass
2430

2531
class OrchestrationStatus(Enum):
2632
"""The status of an orchestration instance."""

durabletask/worker.py

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Licensed under the MIT License.
33

44
import asyncio
5+
import contextlib
56
import inspect
67
import logging
78
import os
@@ -26,6 +27,17 @@
2627
TInput = TypeVar("TInput")
2728
TOutput = TypeVar("TOutput")
2829

30+
# If `opentelemetry-sdk` is available, enable the tracer
31+
try:
32+
from opentelemetry import trace
33+
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
34+
35+
otel_propagator = TraceContextTextMapPropagator()
36+
otel_tracer = trace.get_tracer(__name__)
37+
except ImportError:
38+
otel_tracer = None
39+
40+
2941

3042
def _log_all_threads(logger: logging.Logger, context: str = ""):
3143
"""Helper function to log all currently active threads for debugging."""
@@ -759,31 +771,46 @@ def _execute_activity(
759771
completionToken,
760772
):
761773
instance_id = req.orchestrationInstance.instanceId
762-
try:
763-
executor = _ActivityExecutor(self._registry, self._logger)
764-
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
765-
res = pb.ActivityResponse(
766-
instanceId=instance_id,
767-
taskId=req.taskId,
768-
result=ph.get_string_value(result),
769-
completionToken=completionToken,
770-
)
771-
except Exception as ex:
772-
res = pb.ActivityResponse(
773-
instanceId=instance_id,
774-
taskId=req.taskId,
775-
failureDetails=ph.new_failure_details(ex),
776-
completionToken=completionToken,
777-
)
778774

779-
try:
780-
stub.CompleteActivityTask(res)
781-
except grpc.RpcError as rpc_error: # type: ignore
782-
self._handle_grpc_execution_error(rpc_error, "activity")
783-
except Exception as ex:
784-
self._logger.exception(
785-
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
775+
if otel_tracer is not None:
776+
span_context = otel_tracer.start_as_current_span(
777+
name=f'activity: {req.name}',
778+
context=otel_propagator.extract(carrier={"traceparent": req.parentTraceContext.traceParent}),
779+
attributes={
780+
"durabletask.task.instance_id": instance_id,
781+
"durabletask.task.id": req.taskId,
782+
"durabletask.activity.name": req.name,
783+
}
786784
)
785+
else:
786+
span_context = contextlib.nullcontext()
787+
788+
with span_context:
789+
try:
790+
executor = _ActivityExecutor(self._registry, self._logger)
791+
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
792+
res = pb.ActivityResponse(
793+
instanceId=instance_id,
794+
taskId=req.taskId,
795+
result=ph.get_string_value(result),
796+
completionToken=completionToken,
797+
)
798+
except Exception as ex:
799+
res = pb.ActivityResponse(
800+
instanceId=instance_id,
801+
taskId=req.taskId,
802+
failureDetails=ph.new_failure_details(ex),
803+
completionToken=completionToken,
804+
)
805+
806+
try:
807+
stub.CompleteActivityTask(res)
808+
except grpc.RpcError as rpc_error: # type: ignore
809+
self._handle_grpc_execution_error(rpc_error, "activity")
810+
except Exception as ex:
811+
self._logger.exception(
812+
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"
813+
)
787814

788815

789816
class _RuntimeOrchestrationContext(

0 commit comments

Comments
 (0)