Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit cdffd0a

Browse files
committed
chore: Use exposed class for schedule_new_orchestration instead of using protos
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 4242d46 commit cdffd0a

2 files changed

Lines changed: 27 additions & 4 deletions

File tree

durabletask/aio/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
1818
from durabletask.aio.internal.shared import ClientInterceptor, get_grpc_aio_channel
1919
from durabletask.client import (
20+
OrchestrationIdReusePolicy,
2021
OrchestrationState,
2122
OrchestrationStatus,
2223
TInput,
@@ -81,7 +82,7 @@ async def schedule_new_orchestration(
8182
input: Optional[TInput] = None,
8283
instance_id: Optional[str] = None,
8384
start_at: Optional[datetime] = None,
84-
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
85+
reuse_id_policy: Optional[OrchestrationIdReusePolicy] = None,
8586
) -> str:
8687
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
8788

@@ -93,7 +94,7 @@ async def schedule_new_orchestration(
9394
else None,
9495
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
9596
version=helpers.get_string_value(None),
96-
orchestrationIdReusePolicy=reuse_id_policy,
97+
orchestrationIdReusePolicy=reuse_id_policy._to_pb() if reuse_id_policy else None,
9798
)
9899

99100
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")

durabletask/client.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,28 @@ def __str__(self):
4747
return helpers.get_orchestration_status_str(self.value)
4848

4949

50+
class OrchestrationIdReuseAction(Enum):
51+
"""Action to take when scheduling an orchestration whose ID already exists."""
52+
53+
ERROR = pb.ERROR
54+
IGNORE = pb.IGNORE
55+
TERMINATE = pb.TERMINATE
56+
57+
58+
@dataclass
59+
class OrchestrationIdReusePolicy:
60+
"""Policy controlling what happens when a new orchestration is scheduled with an ID that already exists."""
61+
62+
action: OrchestrationIdReuseAction
63+
operation_status: list[OrchestrationStatus]
64+
65+
def _to_pb(self) -> pb.OrchestrationIdReusePolicy:
66+
return pb.OrchestrationIdReusePolicy(
67+
operationStatus=[s.value for s in self.operation_status],
68+
action=self.action.value,
69+
)
70+
71+
5072
@dataclass
5173
class OrchestrationState:
5274
instance_id: str
@@ -166,7 +188,7 @@ def schedule_new_orchestration(
166188
input: Optional[TInput] = None,
167189
instance_id: Optional[str] = None,
168190
start_at: Optional[datetime] = None,
169-
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
191+
reuse_id_policy: Optional[OrchestrationIdReusePolicy] = None,
170192
) -> str:
171193
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
172194

@@ -180,7 +202,7 @@ def schedule_new_orchestration(
180202
input=input_pb,
181203
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
182204
version=wrappers_pb2.StringValue(value=""),
183-
orchestrationIdReusePolicy=reuse_id_policy,
205+
orchestrationIdReusePolicy=reuse_id_policy._to_pb() if reuse_id_policy else None,
184206
)
185207

186208
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")

0 commit comments

Comments
 (0)