Commit 5758091

berndverst (Bernd Verst), Copilot, and andystaples authored
Add gRPC client and worker connection resiliency (#135)
* Add gRPC resiliency design spec
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Ignore local worktrees
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add gRPC resiliency implementation plan
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add gRPC resiliency option types
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add grpc resiliency validation tests
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Thread gRPC resiliency options through constructors
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Strengthen retained client state tests
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add shared gRPC resiliency helpers
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add completion long-poll resiliency test
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add grpc resiliency edge-case tests
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Harden worker gRPC stream reconnect behavior
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix worker channel cleanup on teardown
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add worker silent disconnect tests

  Extend worker resiliency coverage with an end-to-end silent-disconnect
  recovery test and an explicit reconnect backoff assertion.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add sync client gRPC channel recreation
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Reset sync client long-poll failure tracking
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add sync client recreation input test
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add sync client recreation test coverage
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add async client gRPC channel recreation
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add async channel recreation transport test
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Add gRPC connection resiliency
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Remove repo-wide pytest importlib addopts
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Update gRPC resiliency plan tracking
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix worker channel retirement for in-flight completions
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix worker shutdown channel draining
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Rename Azure Managed gRPC resiliency test module
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix sync client channel cleanup
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Address automated review feedback
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Remove superpowers docs from PR
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Address andystaples PR review feedback

  - Make FailureTracker thread-safe with an internal lock so multi-threaded
    sync clients can't race the consecutive-failure counter (review [3/10]).
  - Track _AsyncWorkerManager pool shutdown via an explicit _pool_is_shutdown
    flag instead of reading ThreadPoolExecutor._shutdown (CPython private API,
    review [4/10]).
  - Collapse identical wrap_execution/wrap_cancellation closures in the worker
    stream loop into a single wrap_with_release helper (review [5/10]).
  - Promote the retired-channel close delay and jitter exponent cap to named
    module-level constants (review [7/10]).
  - Key _InFlightChannelTracker on the channel object instead of id(channel)
    so the lifetime invariant is local to the tracker (review [9/10]).
  - Rename TaskHubGrpcWorker._can_recreate_channel() to the existing
    _owns_channel attribute used by the clients, so both files use the same
    name for the same concept (review [2/10]).
  - Add regression tests for FailureTracker concurrency and for thread-pool
    recreation after manager shutdown.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Refactor client resiliency to use UnaryUnaryClientInterceptor

  Centralize client failure tracking and channel-recreate triggering in a
  `ClientResiliencyInterceptor` (sync) and `AsyncClientResiliencyInterceptor`
  (async) instead of the per-call `_invoke_unary` indirection. This addresses
  feedback [1/10] on PR #135: resiliency wiring now lives in one place and the
  call sites read as normal stub calls.

  - Add `ClientResiliencyInterceptor` and `AsyncClientResiliencyInterceptor`
    in `durabletask/internal/grpc_resiliency.py`.
  - Switch `LONG_POLL_METHODS` and `is_client_transport_failure` to use full
    gRPC method paths (`/TaskHubSidecarService/...`) so the interceptor can
    match the `method` field on `ClientCallDetails` directly.
  - Wire the resiliency interceptor into `TaskHubGrpcClient` and
    `AsyncTaskHubGrpcClient`: it is always prepended (defensive copy of any
    user interceptors) and re-applied on every channel recreate so all unary
    calls flow through it.
  - Remove both `_invoke_unary` methods and revert all 34 call sites to
    ordinary `self._stub.MethodName(req)` (or `await ...` for async).
  - Caller-owned channels (sync and async) deliberately bypass the resiliency
    interceptor since they are never recreated; this preserves the caller's
    exact channel reference and avoids `grpc.aio`'s lack of a public
    `intercept_channel` equivalent.
  - Add test shims (`_ResilientSyncTestStub`/`_ResilientAsyncTestStub` plus
    `install_resilient_test_stubs`) so tests that patch
    `stubs.TaskHubSidecarServiceStub` with `MagicMock` still observe the
    failure-tracking pipeline.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Use inspect.isawaitable in AsyncClientResiliencyInterceptor

  Replace the ad-hoc `hasattr(result, '__await__')` check in
  `AsyncClientResiliencyInterceptor._record_outcome` with the canonical
  `inspect.isawaitable` predicate, and tighten the `on_recreate` callback
  annotation to `Callable[[], Union[None, Awaitable[object]]]` so it reflects
  the actual contract (sync callbacks return None, async callbacks return an
  Awaitable that we await).

  Addresses the github-code-quality 'Statement has no effect' warning surfaced
  on PR #135 by making the awaitable check explicit and type-driven.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Discard awaitable result with explicit underscore in resiliency interceptor

  CodeQL's `py/ineffectual-statement` heuristic re-flagged `await result` in
  `AsyncClientResiliencyInterceptor._record_outcome` after the previous fix:
  the rule treats expression statements whose value is discarded as unused,
  and does not recognise that `await` is always a side-effecting suspension
  point (the whole purpose of the call is to run the async recreate callback
  to completion). Rewriting the line as `_ = await result` keeps the exact
  same runtime behaviour but documents the intent (return value intentionally
  discarded) and satisfies the linter.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fire-and-forget channel recreate; hoist client state; tighten async interceptor exception handling

  Addresses three follow-up review comments on the resiliency interceptor
  refactor:

  [1/3] Channel recreate now runs fire-and-forget (daemon thread for sync,
  asyncio.create_task for async). The original RPC error propagates to the
  caller without being delayed by DNS, TLS handshake, or contention on
  _recreate_lock. A client-side single-flight guard avoids spawning duplicate
  work when many failures land in a burst; the existing cooldown still
  prevents thrash. close() waits for any in-flight recreate to finish so the
  teardown path stays deterministic. A _recreate_done_event (test seam) lets
  tests synchronise on completion without polling.

  [2/3] Hoisted _closing, _recreate_lock, _last_recreate_time,
  _retired_channels / _retired_channel_close_tasks above
  ClientResiliencyInterceptor construction in both __init__ methods so the
  bound recreate callback is safe to invoke at any time during construction.

  [3/3] AsyncClientResiliencyInterceptor now uses 'except Exception' (so
  asyncio.CancelledError, KeyboardInterrupt and SystemExit propagate
  unchanged) and mirrors the sync interceptor's policy by resetting the
  failure counter on non-AioRpcError exceptions. _record_outcome is now
  synchronous on both interceptors because the on_recreate callback no longer
  awaits the recreate.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Modernize PR-new code for Python 3.10+ baseline

  Apply three Python 3.10+ idioms to code introduced by this PR:

  - PEP 604 union syntax: replace Optional[X] with X | None in
    grpc_options.py and grpc_resiliency.py.
  - Dataclass tightening: add slots=True, kw_only=True to FailureTracker,
    GrpcRetryPolicyOptions, GrpcChannelOptions, GrpcWorkerResiliencyOptions,
    and GrpcClientResiliencyOptions. Update the two positional
    FailureTracker(...) call sites in client.py to use threshold=... kwargs.
  - PEP 617 parenthesized context managers: rewrite the 10 chained
    'with patch(...), patch(...):' blocks added by this PR in test_client.py.
    Pre-existing chained sites are left untouched to keep the diff surgical.

  Internal-only change (no public API or behavior impact). 85/85
  resiliency-focused tests pass; 232/8 passed/skipped in the broader non-e2e
  suite.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Bernd Verst <beverst@microsoft.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: andystaples <77818326+andystaples@users.noreply.github.com>
1 parent 75e916b commit 5758091

15 files changed

Lines changed: 2844 additions & 99 deletions

.gitignore

Lines changed: 3 additions & 1 deletion
@@ -131,5 +131,7 @@ dmypy.json

 # IDEs
 .idea
+.worktrees/
+docs/superpowers/

-coverage.lcov
+coverage.lcov

CHANGELOG.md

Lines changed: 26 additions & 2 deletions
@@ -20,8 +20,32 @@ ADDED
   `TaskHubGrpcClient`, `AsyncTaskHubGrpcClient`, and `TaskHubGrpcWorker` to
   support pre-configured channel passthrough and low-level gRPC channel
   customization.
-- Added `get_orchestration_history()` and `list_instance_ids()` to the sync and async gRPC clients.
-- Added in-memory backend support for `StreamInstanceHistory` and `ListInstanceIds` so local orchestration tests can retrieve history and page terminal instance IDs by completion window.
+- Added `GrpcWorkerResiliencyOptions` and `GrpcClientResiliencyOptions`, plus
+  `resiliency_options` constructor parameters on `TaskHubGrpcClient`,
+  `AsyncTaskHubGrpcClient`, and `TaskHubGrpcWorker`, to configure hello
+  deadlines, silent-disconnect detection, reconnect backoff, and channel
+  recreation thresholds for SDK-managed gRPC connections.
+- Added `get_orchestration_history()` and `list_instance_ids()` to the sync
+  and async gRPC clients.
+- Added in-memory backend support for `StreamInstanceHistory` and
+  `ListInstanceIds` so local orchestration tests can retrieve history and page
+  terminal instance IDs by completion window.
+
+FIXED
+
+- Improved `TaskHubGrpcWorker` recovery from stale or disconnected gRPC streams
+  so configured hello timeouts apply on fresh connections, received work resets
+  failure tracking, SDK-owned channels are refreshed and cleaned up safely, and
+  caller-owned channels are never recreated or closed during reconnects.
+- Fixed `TaskHubGrpcWorker` so in-flight and queued work item completions keep
+  draining across graceful gRPC stream resets and worker shutdown before the
+  worker retires an SDK-owned channel.
+- Improved sync and async gRPC clients so repeated transport failures recreate
+  SDK-owned channels, while long-poll deadlines, successful replies, and
+  application-level RPC errors do not trigger unnecessary channel replacement.
+- Fixed `TaskHubGrpcClient.close()` so explicit sync client shutdown now closes
+  any previously retired SDK-owned gRPC channels immediately instead of waiting
+  for the delayed cleanup timer.

 ## v1.4.0

durabletask-azuremanaged/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -14,6 +14,10 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
   `DurableTaskSchedulerClient`, `AsyncDurableTaskSchedulerClient`, and
   `DurableTaskSchedulerWorker` to allow combining custom gRPC interceptors with
   DTS defaults and to support pre-configured/customized gRPC channels.
+- Added pass-through `resiliency_options` support on
+  `DurableTaskSchedulerClient`, `AsyncDurableTaskSchedulerClient`, and
+  `DurableTaskSchedulerWorker` so Azure Managed applications can use the core
+  SDK's gRPC resiliency option types through their constructors.
 - Added `workerid` gRPC metadata on Durable Task Scheduler worker calls for
   improved worker identity and observability.
 - Improved sync access token refresh concurrency handling to avoid duplicate

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 10 additions & 1 deletion
@@ -15,7 +15,10 @@
     DTSDefaultClientInterceptorImpl,
 )
 from durabletask.client import AsyncTaskHubGrpcClient, TaskHubGrpcClient
-from durabletask.grpc_options import GrpcChannelOptions
+from durabletask.grpc_options import (
+    GrpcChannelOptions,
+    GrpcClientResiliencyOptions,
+)
 import durabletask.internal.shared as shared
 from durabletask.payload.store import PayloadStore

@@ -30,6 +33,7 @@ def __init__(self, *,
                  secure_channel: bool = True,
                  interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
                  channel_options: Optional[GrpcChannelOptions] = None,
+                 resiliency_options: Optional[GrpcClientResiliencyOptions] = None,
                  default_version: Optional[str] = None,
                  payload_store: Optional[PayloadStore] = None,
                  log_handler: Optional[logging.Handler] = None,
@@ -54,6 +58,7 @@ def __init__(self, *,
             log_formatter=log_formatter,
             interceptors=resolved_interceptors,
             channel_options=channel_options,
+            resiliency_options=resiliency_options,
             default_version=default_version,
             payload_store=payload_store)

@@ -74,6 +79,8 @@ class AsyncDurableTaskSchedulerClient(AsyncTaskHubGrpcClient):
             If None, anonymous authentication will be used.
         secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
             Defaults to True.
+        resiliency_options (Optional[GrpcClientResiliencyOptions], optional): Client-side
+            gRPC resiliency settings forwarded to the base async client.
         default_version (Optional[str], optional): Default version string for orchestrations.
         payload_store (Optional[PayloadStore], optional): A payload store for
             externalizing large payloads. If None, payloads are sent inline.
@@ -104,6 +111,7 @@ def __init__(self, *,
                  secure_channel: bool = True,
                  interceptors: Optional[Sequence[shared.AsyncClientInterceptor]] = None,
                  channel_options: Optional[GrpcChannelOptions] = None,
+                 resiliency_options: Optional[GrpcClientResiliencyOptions] = None,
                  default_version: Optional[str] = None,
                  payload_store: Optional[PayloadStore] = None,
                  log_handler: Optional[logging.Handler] = None,
@@ -128,5 +136,6 @@ def __init__(self, *,
             log_formatter=log_formatter,
             interceptors=resolved_interceptors,
             channel_options=channel_options,
+            resiliency_options=resiliency_options,
             default_version=default_version,
             payload_store=payload_store)
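The hunks above thread `resiliency_options` through the Azure Managed client constructors unchanged. The pattern itself can be shown with stand-in classes. Nothing below imports the real SDK: `ResiliencyOptionsStandIn`, its `failure_threshold` field, and both client classes are hypothetical, and the fallback to defaults when `None` is passed is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResiliencyOptionsStandIn:
    """Hypothetical stand-in for an options type; the field is invented."""

    failure_threshold: int = 3


class BaseClientStandIn:
    """Plays the role of the core client that owns the resiliency logic."""

    def __init__(self, *, host_address: str,
                 resiliency_options: Optional[ResiliencyOptionsStandIn] = None):
        self.host_address = host_address
        # Assumption: the base class falls back to defaults when given None.
        self.resiliency_options = resiliency_options or ResiliencyOptionsStandIn()


class SchedulerClientStandIn(BaseClientStandIn):
    """Plays the role of the subclass that only forwards the keyword."""

    def __init__(self, *, host_address: str, taskhub: str,
                 resiliency_options: Optional[ResiliencyOptionsStandIn] = None):
        self.taskhub = taskhub
        # Pass-through: the subclass neither copies nor inspects the options.
        super().__init__(host_address=host_address,
                         resiliency_options=resiliency_options)
```

Because the subclass forwards the same object, any validation or defaulting stays in one place in the base class, which appears to be the intent of the "pass-through" wording in the changelog.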

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 8 additions & 1 deletion
@@ -13,7 +13,10 @@

 from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
     DTSDefaultClientInterceptorImpl
-from durabletask.grpc_options import GrpcChannelOptions
+from durabletask.grpc_options import (
+    GrpcChannelOptions,
+    GrpcWorkerResiliencyOptions,
+)
 import durabletask.internal.shared as shared
 from durabletask.payload.store import PayloadStore
 from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
@@ -34,6 +37,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
             If None, anonymous authentication will be used.
         secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
             Defaults to True.
+        resiliency_options (Optional[GrpcWorkerResiliencyOptions], optional): Worker-side
+            gRPC resiliency settings forwarded to the base worker.
         concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
             for controlling worker concurrency limits. If None, default concurrency
             settings will be used.
@@ -74,6 +79,7 @@ def __init__(self, *,
                  secure_channel: bool = True,
                  interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
                  channel_options: Optional[GrpcChannelOptions] = None,
+                 resiliency_options: Optional[GrpcWorkerResiliencyOptions] = None,
                  concurrency_options: Optional[ConcurrencyOptions] = None,
                  payload_store: Optional[PayloadStore] = None,
                  log_handler: Optional[logging.Handler] = None,
@@ -101,6 +107,7 @@ def __init__(self, *,
             log_formatter=log_formatter,
             interceptors=resolved_interceptors,
             channel_options=channel_options,
+            resiliency_options=resiliency_options,
             concurrency_options=concurrency_options,
             # DTS natively supports long timers so chunking is unnecessary
             maximum_timer_interval=None,
