Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit a48d2af

Browse files
committed
fix: address conflict with main
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2 parents 35e5b8d + cb97b43 commit a48d2af

13 files changed

Lines changed: 643 additions & 235 deletions

File tree

.github/workflows/pr-validation.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ jobs:
4747
go install github.com/dapr/durabletask-go@main
4848
durabletask-go --port 4001 &
4949
tox -e py${{ matrix.python-version }}-e2e
50+
- name: Run examples
51+
run: |
52+
pip install mechanical-markdown
53+
cd examples
54+
durabletask-go --port 4001 &
55+
mm.py README.md
5056
publish:
5157
needs: build
5258
if: startswith(github.ref, 'refs/tags/v')

durabletask/aio/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
2828
try:
2929
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
30+
3031
GrpcInstrumentorClient().instrument()
3132
except ImportError:
3233
pass

durabletask/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
# If `opentelemetry-instrumentation-grpc` is available, enable the gRPC client interceptor
2525
try:
2626
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
27+
2728
GrpcInstrumentorClient().instrument()
2829
except ImportError:
2930
pass
3031

32+
3133
class OrchestrationStatus(Enum):
3234
"""The status of an orchestration instance."""
3335

durabletask/internal/helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,14 +204,19 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
204204

205205

206206
def new_schedule_task_action(
207-
id: int, name: str, encoded_input: Optional[str], router: Optional[pb.TaskRouter] = None
207+
id: int,
208+
name: str,
209+
encoded_input: Optional[str],
210+
router: Optional[pb.TaskRouter] = None,
211+
task_execution_id: str = "",
208212
) -> pb.OrchestratorAction:
209213
return pb.OrchestratorAction(
210214
id=id,
211215
scheduleTask=pb.ScheduleTaskAction(
212216
name=name,
213217
input=get_string_value(encoded_input),
214218
router=router,
219+
taskExecutionId=task_execution_id,
215220
),
216221
router=router,
217222
)

durabletask/internal/shared.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ def get_default_host_address() -> str:
5050
return "localhost:4001"
5151

5252

53+
DEFAULT_GRPC_KEEPALIVE_OPTIONS: tuple[tuple[str, int], ...] = (
54+
("grpc.keepalive_time_ms", 30_000),
55+
("grpc.keepalive_timeout_ms", 10_000),
56+
("grpc.http2.max_pings_without_data", 0),
57+
("grpc.keepalive_permit_without_calls", 1),
58+
)
59+
60+
5361
def get_grpc_channel(
5462
host_address: Optional[str],
5563
secure_channel: bool = False,
@@ -81,10 +89,16 @@ def get_grpc_channel(
8189
host_address = host_address[len(protocol) :]
8290
break
8391

92+
merged = dict(DEFAULT_GRPC_KEEPALIVE_OPTIONS)
93+
if options:
94+
merged.update(dict(options))
95+
merged_options = list(merged.items())
8496
if secure_channel:
85-
channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials(), options=options)
97+
channel = grpc.secure_channel(
98+
host_address, grpc.ssl_channel_credentials(), options=merged_options
99+
)
86100
else:
87-
channel = grpc.insecure_channel(host_address, options=options)
101+
channel = grpc.insecure_channel(host_address, options=merged_options)
88102

89103
# Apply interceptors ONLY if they exist
90104
if interceptors:

durabletask/task.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ def is_replaying(self) -> bool:
7070
pass
7171

7272
@abstractmethod
73-
def set_custom_status(self, custom_status: Any) -> None:
73+
def set_custom_status(self, custom_status: str) -> None:
7474
"""Set the orchestration instance's custom status.
7575
7676
Parameters
7777
----------
78-
custom_status: Any
79-
A JSON-serializable custom status value to set.
78+
custom_status: str
79+
A custom status string to set.
8080
"""
8181
pass
8282

@@ -396,16 +396,24 @@ class RetryableTask(CompletableTask[T]):
396396
def __init__(
397397
self,
398398
retry_policy: RetryPolicy,
399-
action: pb.OrchestratorAction,
400399
start_time: datetime,
401400
is_sub_orch: bool,
401+
task_name: str,
402+
encoded_input: Optional[str] = None,
403+
task_execution_id: str = "",
404+
instance_id: Optional[str] = None,
405+
app_id: Optional[str] = None,
402406
) -> None:
403407
super().__init__()
404-
self._action = action
405408
self._retry_policy = retry_policy
406409
self._attempt_count = 1
407410
self._start_time = start_time
408411
self._is_sub_orch = is_sub_orch
412+
self._task_name = task_name
413+
self._encoded_input = encoded_input
414+
self._task_execution_id = task_execution_id
415+
self._instance_id = instance_id
416+
self._app_id = app_id
409417

410418
def increment_attempt_count(self) -> None:
411419
self._attempt_count += 1
@@ -479,9 +487,10 @@ def when_any(tasks: list[Task]) -> WhenAnyTask:
479487

480488

481489
class ActivityContext:
482-
def __init__(self, orchestration_id: str, task_id: int):
490+
def __init__(self, orchestration_id: str, task_id: int, task_execution_id: str = ""):
483491
self._orchestration_id = orchestration_id
484492
self._task_id = task_id
493+
self._task_execution_id = task_execution_id
485494

486495
@property
487496
def orchestration_id(self) -> str:
@@ -510,6 +519,21 @@ def task_id(self) -> int:
510519
"""
511520
return self._task_id
512521

522+
@property
523+
def task_execution_id(self) -> str:
524+
"""Get the task execution ID associated with this activity invocation.
525+
526+
The task execution ID is a UUID that is stable across retry attempts
527+
of the same activity call. It can be used for idempotency and
528+
deduplication when an activity may be retried.
529+
530+
Returns
531+
-------
532+
str
533+
The task execution ID for this activity invocation.
534+
"""
535+
return self._task_execution_id
536+
513537

514538
# Orchestrators are generators that yield tasks and receive/return any type
515539
Orchestrator = Callable[[OrchestrationContext, TInput], Union[Generator[Task, Any, Any], TOutput]]

0 commit comments

Comments
 (0)