Skip to content

Commit e620020

Browse files
committed
concurrency
1 parent 03d27bd commit e620020

4 files changed

Lines changed: 81 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,22 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v0.4.0
9+
10+
### New
11+
12+
- Added `ConcurrencyOptions` class for fine-grained concurrency control, equivalent to the `ConcurrencyOptions` class in the .NET DurableTask SDK
13+
- Enhanced `TaskHubGrpcWorker` with a `concurrency_options` parameter supporting separate limits for activities and orchestrations (each defaults to 100 * processor_count)
14+
15+
### Breaking Changes
16+
17+
- Removed `max_workers` parameter from `TaskHubGrpcWorker` constructor in favor of the more flexible `ConcurrencyOptions` approach
18+
819
## v0.3.0
920

1021
### New
1122

12-
- Added configurable worker concurrency with `max_workers` parameter in `TaskHubGrpcWorker` constructor - allows customization of ThreadPoolExecutor size (default: 16 workers)
23+
- Added configurable worker concurrency with `max_workers` parameter in `TaskHubGrpcWorker` constructor - allows customization of ThreadPoolExecutor size (default: 16 workers) (NOTE: This parameter was later removed in v0.4.0 in favor of `ConcurrencyOptions`)
1324

1425
### Fixed
1526

durabletask/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@
33

44
"""Durable Task SDK for Python"""
55

6+
from durabletask.worker import ConcurrencyOptions
7+
8+
__all__ = ["ConcurrencyOptions"]
69

710
PACKAGE_NAME = "durabletask"

durabletask/worker.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import concurrent.futures
55
import logging
6+
import os
67
import random
78
from datetime import datetime, timedelta
89
from threading import Event, Thread
@@ -24,6 +25,53 @@
2425
TOutput = TypeVar('TOutput')
2526

2627

28+
class ConcurrencyOptions:
    """Configuration options for controlling concurrency of different work item types.

    This class mirrors the .NET DurableTask SDK's ConcurrencyOptions class,
    providing separate concurrent-processing limits for activity work items
    and orchestration work items.
    """

    def __init__(self,
                 maximum_concurrent_activity_work_items: Optional[int] = None,
                 maximum_concurrent_orchestration_work_items: Optional[int] = None):
        """Initialize concurrency options.

        Args:
            maximum_concurrent_activity_work_items: Maximum number of activity work items
                that can be processed concurrently. Defaults to 100 * processor_count.
            maximum_concurrent_orchestration_work_items: Maximum number of orchestration work items
                that can be processed concurrently. Defaults to 100 * processor_count.

        Raises:
            ValueError: If an explicitly supplied limit is less than 1.
        """
        # os.cpu_count() returns None when the CPU count cannot be determined;
        # fall back to a single processor in that case.
        processor_count = os.cpu_count() or 1
        default_concurrency = 100 * processor_count

        self.maximum_concurrent_activity_work_items = self._resolve_limit(
            "maximum_concurrent_activity_work_items",
            maximum_concurrent_activity_work_items,
            default_concurrency,
        )

        self.maximum_concurrent_orchestration_work_items = self._resolve_limit(
            "maximum_concurrent_orchestration_work_items",
            maximum_concurrent_orchestration_work_items,
            default_concurrency,
        )

    @staticmethod
    def _resolve_limit(name: str, value: Optional[int], default: int) -> int:
        """Return ``value`` if one was supplied, otherwise ``default``.

        Rejects non-positive limits up front: max_total_workers is used as a
        thread pool size, and a pool size below 1 is only rejected later, when
        the pool is created, far away from the misconfiguration.
        """
        if value is None:
            return default
        if value < 1:
            raise ValueError(f"{name} must be a positive integer, got {value!r}")
        return value

    @property
    def max_total_workers(self) -> int:
        """Calculate the number of workers to use for the thread pool.

        Returns the larger of the activity and orchestration limits (not their
        sum).

        NOTE(review): if a single pool serves both work item types, items may
        queue when both limits are reached at the same time - confirm this is
        the intended sizing.
        """
        return max(
            self.maximum_concurrent_activity_work_items,
            self.maximum_concurrent_orchestration_work_items,
        )

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"maximum_concurrent_activity_work_items={self.maximum_concurrent_activity_work_items!r}, "
            f"maximum_concurrent_orchestration_work_items={self.maximum_concurrent_orchestration_work_items!r})"
        )
74+
2775
class _Registry:
2876

2977
orchestrators: dict[str, task.Orchestrator]
@@ -93,14 +141,16 @@ def __init__(self, *,
93141
log_formatter: Optional[logging.Formatter] = None,
94142
secure_channel: bool = False,
95143
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
96-
max_workers: Optional[int] = None):
144+
concurrency_options: Optional[ConcurrencyOptions] = None):
97145
self._registry = _Registry()
98146
self._host_address = host_address if host_address else shared.get_default_host_address()
99147
self._logger = shared.get_logger("worker", log_handler, log_formatter)
100148
self._shutdown = Event()
101149
self._is_running = False
102150
self._secure_channel = secure_channel
103-
self._max_workers = max_workers if max_workers is not None else 16
151+
152+
# Use provided concurrency options or create default ones
153+
self._concurrency_options = concurrency_options if concurrency_options is not None else ConcurrencyOptions()
104154

105155
# Determine the interceptors to use
106156
if interceptors is not None:
@@ -112,6 +162,11 @@ def __init__(self, *,
112162
else:
113163
self._interceptors = None
114164

165+
@property
def concurrency_options(self) -> ConcurrencyOptions:
    """Return the ConcurrencyOptions object stored on this worker."""
    configured = self._concurrency_options
    return configured
169+
115170
def __enter__(self):
116171
return self
117172

@@ -211,7 +266,7 @@ def should_invalidate_connection(rpc_error: grpc.RpcError) -> bool:
211266

212267
# TODO: Investigate whether asyncio could be used to enable greater concurrency for async activity
213268
# functions. We'd need to know ahead of time whether a function is async or not.
214-
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_workers, thread_name_prefix="DurableTask") as executor:
269+
with concurrent.futures.ThreadPoolExecutor(max_workers=self._concurrency_options.max_total_workers, thread_name_prefix="DurableTask") as executor:
215270
while not self._shutdown.is_set():
216271
# Ensure we have a valid connection before attempting work
217272
if current_stub is None:
@@ -231,7 +286,13 @@ def should_invalidate_connection(rpc_error: grpc.RpcError) -> bool:
231286
# Type assertion since we know current_stub is not None at this point
232287
assert current_stub is not None, "current_stub should not be None at this point"
233288
stub = current_stub # Local reference for type safety
234-
self._response_stream = stub.GetWorkItems(pb.GetWorkItemsRequest())
289+
290+
# Create GetWorkItemsRequest with concurrency limits
291+
get_work_items_request = pb.GetWorkItemsRequest(
292+
maxConcurrentOrchestrationWorkItems=self._concurrency_options.maximum_concurrent_orchestration_work_items,
293+
maxConcurrentActivityWorkItems=self._concurrency_options.maximum_concurrent_activity_work_items
294+
)
295+
self._response_stream = stub.GetWorkItems(get_work_items_request)
235296
self._logger.info(f'Successfully connected to {self._host_address}. Waiting for work items...')
236297

237298
# Process work items concurrently as they arrive

examples/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ In some cases, the sample may require command-line parameters or user inputs. In
2424

2525
- [Activity sequence](./activity_sequence.py): Orchestration that schedules three activity calls in a sequence.
2626
- [Fan-out/fan-in](./fanout_fanin.py): Orchestration that schedules a dynamic number of activity calls in parallel, waits for all of them to complete, and then performs an aggregation on the results.
27-
- [Human interaction](./human_interaction.py): Orchestration that waits for a human to approve an order before continuing.
27+
- [Human interaction](./human_interaction.py): Orchestration that waits for a human to approve an order before continuing.

0 commit comments

Comments
 (0)