Skip to content

Commit 845c697

Browse files
Bernd VerstCopilot
andcommitted
Modernize PR-new code for Python 3.10+ baseline
Apply three Python 3.10+ idioms to code introduced by this PR: - PEP 604 union syntax: replace Optional[X] with X | None in grpc_options.py and grpc_resiliency.py. - Dataclass tightening: add slots=True, kw_only=True to FailureTracker, GrpcRetryPolicyOptions, GrpcChannelOptions, GrpcWorkerResiliencyOptions, and GrpcClientResiliencyOptions. Update the two positional FailureTracker(...) call sites in client.py to use threshold=... kwargs. - PEP 617 parenthesized context managers: rewrite the 10 chained 'with patch(...), patch(...):' blocks added by this PR in test_client.py. Pre-existing chained sites are left untouched to keep the diff surgical. Internal-only change (no public API or behavior impact). 85/85 resiliency-focused tests pass; 232/8 passed/skipped in the broader non-e2e suite. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 3268b45 commit 845c697

4 files changed

Lines changed: 81 additions & 48 deletions

File tree

durabletask/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def __init__(self, *,
215215
# ``close()`` wait deterministically for an in-flight recreate.
216216
self._recreate_done_event = threading.Event()
217217
self._client_failure_tracker = FailureTracker(
218-
self._resiliency_options.channel_recreate_failure_threshold
218+
threshold=self._resiliency_options.channel_recreate_failure_threshold,
219219
)
220220
self._resiliency_interceptor = ClientResiliencyInterceptor(
221221
self._client_failure_tracker,
@@ -705,7 +705,7 @@ def __init__(self, *,
705705
# ``close()`` await an in-flight recreate deterministically.
706706
self._recreate_done_event = asyncio.Event()
707707
self._client_failure_tracker = FailureTracker(
708-
self._resiliency_options.channel_recreate_failure_threshold
708+
threshold=self._resiliency_options.channel_recreate_failure_threshold,
709709
)
710710
self._resiliency_interceptor = AsyncClientResiliencyInterceptor(
711711
self._client_failure_tracker,

durabletask/grpc_options.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
from dataclasses import dataclass, field
77
import json
8-
from typing import Any, Optional
8+
from typing import Any
99

1010

11-
@dataclass
11+
@dataclass(slots=True, kw_only=True)
1212
class GrpcRetryPolicyOptions:
1313
"""Configuration for transport-level gRPC retries."""
1414

@@ -64,16 +64,16 @@ def to_service_config(self) -> dict[str, Any]:
6464
}
6565

6666

67-
@dataclass
67+
@dataclass(slots=True, kw_only=True)
6868
class GrpcChannelOptions:
6969
"""Configuration for transport-level gRPC channel behavior."""
7070

71-
max_receive_message_length: Optional[int] = None
72-
max_send_message_length: Optional[int] = None
73-
keepalive_time_ms: Optional[int] = None
74-
keepalive_timeout_ms: Optional[int] = None
75-
keepalive_permit_without_calls: Optional[bool] = None
76-
retry_policy: Optional[GrpcRetryPolicyOptions] = None
71+
max_receive_message_length: int | None = None
72+
max_send_message_length: int | None = None
73+
keepalive_time_ms: int | None = None
74+
keepalive_timeout_ms: int | None = None
75+
keepalive_permit_without_calls: bool | None = None
76+
retry_policy: GrpcRetryPolicyOptions | None = None
7777
raw_options: list[tuple[str, Any]] = field(default_factory=list)
7878

7979
def to_grpc_options(self) -> list[tuple[str, Any]]:
@@ -102,7 +102,7 @@ def to_grpc_options(self) -> list[tuple[str, Any]]:
102102
return options
103103

104104

105-
@dataclass
105+
@dataclass(slots=True, kw_only=True)
106106
class GrpcWorkerResiliencyOptions:
107107
"""Configuration for worker-side gRPC resiliency behavior."""
108108

@@ -129,7 +129,7 @@ def __post_init__(self) -> None:
129129
)
130130

131131

132-
@dataclass
132+
@dataclass(slots=True, kw_only=True)
133133
class GrpcClientResiliencyOptions:
134134
"""Configuration for client-side gRPC resiliency behavior."""
135135

durabletask/internal/grpc_resiliency.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import random
55
import threading
66
from dataclasses import dataclass, field
7-
from typing import Callable, Optional
7+
from typing import Callable
88

99
import grpc
1010
import grpc.aio
@@ -35,7 +35,7 @@ def get_full_jitter_delay_seconds(
3535
return random.random() * upper_bound
3636

3737

38-
@dataclass
38+
@dataclass(slots=True, kw_only=True)
3939
class FailureTracker:
4040
"""Counts consecutive transport failures with thread-safe mutation.
4141
@@ -114,7 +114,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
114114
self._record_outcome(client_call_details.method, error)
115115
return response
116116

117-
def _record_outcome(self, method: str, error: Optional[BaseException]) -> None:
117+
def _record_outcome(self, method: str, error: BaseException | None) -> None:
118118
if error is None:
119119
self._failure_tracker.record_success()
120120
return
@@ -162,7 +162,7 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
162162
self._record_outcome(client_call_details.method, None)
163163
return response
164164

165-
def _record_outcome(self, method: str, error: Optional[BaseException]) -> None:
165+
def _record_outcome(self, method: str, error: BaseException | None) -> None:
166166
if error is None:
167167
self._failure_tracker.record_success()
168168
return

tests/durabletask/test_client.py

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -509,8 +509,9 @@ def test_client_stores_resiliency_options_for_recreation():
509509
resiliency = GrpcClientResiliencyOptions(channel_recreate_failure_threshold=7)
510510
channel_options = GrpcChannelOptions(max_receive_message_length=1234)
511511
interceptors = [DefaultClientInterceptorImpl(METADATA)]
512-
with patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()), patch(
513-
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=MagicMock()
512+
with (
513+
patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()),
514+
patch("durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=MagicMock()),
514515
):
515516
client = TaskHubGrpcClient(
516517
host_address="localhost:4001",
@@ -542,12 +543,17 @@ def test_sync_client_recreates_sdk_owned_channel_with_original_transport_inputs(
542543

543544
timer = MagicMock()
544545

545-
with patch(
546+
with (
547+
patch(
546548
"durabletask.client.shared.get_grpc_channel",
547549
side_effect=[first_channel, second_channel],
548-
) as mock_get_channel, patch(
549-
"durabletask.client.stubs.TaskHubSidecarServiceStub", side_effect=[first_stub, second_stub]
550-
), patch("threading.Timer", return_value=timer) as mock_timer:
550+
) as mock_get_channel,
551+
patch(
552+
"durabletask.client.stubs.TaskHubSidecarServiceStub",
553+
side_effect=[first_stub, second_stub],
554+
),
555+
patch("threading.Timer", return_value=timer) as mock_timer,
556+
):
551557
client = TaskHubGrpcClient(
552558
host_address=host_address,
553559
secure_channel=True,
@@ -596,12 +602,17 @@ def test_sync_client_close_closes_retired_channels_immediately():
596602
second_stub.GetInstance.return_value = MagicMock(exists=False)
597603
close_timer = MagicMock(name="close-timer")
598604

599-
with patch(
605+
with (
606+
patch(
600607
"durabletask.client.shared.get_grpc_channel",
601608
side_effect=[first_channel, second_channel],
602-
), patch(
603-
"durabletask.client.stubs.TaskHubSidecarServiceStub", side_effect=[first_stub, second_stub]
604-
), patch("threading.Timer", return_value=close_timer):
609+
),
610+
patch(
611+
"durabletask.client.stubs.TaskHubSidecarServiceStub",
612+
side_effect=[first_stub, second_stub],
613+
),
614+
patch("threading.Timer", return_value=close_timer),
615+
):
605616
client = TaskHubGrpcClient(
606617
resiliency_options=GrpcClientResiliencyOptions(
607618
channel_recreate_failure_threshold=1,
@@ -632,13 +643,17 @@ def test_sync_client_close_closes_all_retired_sdk_channels_immediately():
632643
timer1 = MagicMock(name="close-timer-1")
633644
timer2 = MagicMock(name="close-timer-2")
634645

635-
with patch(
646+
with (
647+
patch(
636648
"durabletask.client.shared.get_grpc_channel",
637649
side_effect=[first_channel, second_channel, third_channel],
638-
), patch(
650+
),
651+
patch(
639652
"durabletask.client.stubs.TaskHubSidecarServiceStub",
640653
side_effect=[first_stub, second_stub, third_stub],
641-
), patch("threading.Timer", side_effect=[timer1, timer2]):
654+
),
655+
patch("threading.Timer", side_effect=[timer1, timer2]),
656+
):
642657
client = TaskHubGrpcClient(
643658
resiliency_options=GrpcClientResiliencyOptions(
644659
channel_recreate_failure_threshold=1,
@@ -681,8 +696,9 @@ def test_sync_client_resets_failure_tracking_after_long_poll_deadline(
681696
stub.GetInstance.side_effect = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
682697
getattr(stub, stub_method_name).side_effect = FakeRpcError(grpc.StatusCode.DEADLINE_EXCEEDED)
683698

684-
with patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()), patch(
685-
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub
699+
with (
700+
patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()),
701+
patch("durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub),
686702
):
687703
client = TaskHubGrpcClient(
688704
resiliency_options=GrpcClientResiliencyOptions(channel_recreate_failure_threshold=2)
@@ -700,9 +716,13 @@ def test_sync_client_does_not_recreate_caller_owned_channel():
700716
stub = MagicMock()
701717
stub.GetInstance.side_effect = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
702718

703-
with patch("durabletask.client.shared.get_grpc_channel") as mock_get_channel, patch(
719+
with (
720+
patch("durabletask.client.shared.get_grpc_channel") as mock_get_channel,
721+
patch(
704722
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub
705-
) as mock_stub, patch("threading.Timer") as mock_timer:
723+
) as mock_stub,
724+
patch("threading.Timer") as mock_timer,
725+
):
706726
client = TaskHubGrpcClient(
707727
channel=provided_channel,
708728
resiliency_options=GrpcClientResiliencyOptions(channel_recreate_failure_threshold=1),
@@ -739,15 +759,20 @@ def test_sync_client_recreate_cooldown_prevents_immediate_repeated_recreation():
739759
timer1 = MagicMock(name="close-timer-1")
740760
timer2 = MagicMock(name="close-timer-2")
741761

742-
with patch(
762+
with (
763+
patch(
743764
"durabletask.client.shared.get_grpc_channel",
744765
side_effect=[first_channel, second_channel, third_channel],
745-
) as mock_get_channel, patch(
766+
) as mock_get_channel,
767+
patch(
746768
"durabletask.client.stubs.TaskHubSidecarServiceStub",
747769
side_effect=[first_stub, second_stub, third_stub],
748-
), patch(
770+
),
771+
patch(
749772
"durabletask.client.time.monotonic", side_effect=[100.0, 101.0, 131.0]
750-
), patch("threading.Timer", side_effect=[timer1, timer2]) as mock_timer:
773+
),
774+
patch("threading.Timer", side_effect=[timer1, timer2]) as mock_timer,
775+
):
751776
client = TaskHubGrpcClient(
752777
host_address=HOST_ADDRESS,
753778
resiliency_options=GrpcClientResiliencyOptions(
@@ -811,8 +836,9 @@ def test_sync_client_resets_failure_tracking_after_success():
811836
MagicMock(exists=False),
812837
]
813838

814-
with patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()), patch(
815-
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub
839+
with (
840+
patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()),
841+
patch("durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub),
816842
):
817843
client = TaskHubGrpcClient(
818844
resiliency_options=GrpcClientResiliencyOptions(channel_recreate_failure_threshold=2)
@@ -831,8 +857,9 @@ def test_sync_client_resets_failure_tracking_after_application_error():
831857
FakeRpcError(grpc.StatusCode.INVALID_ARGUMENT),
832858
]
833859

834-
with patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()), patch(
835-
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub
860+
with (
861+
patch("durabletask.client.shared.get_grpc_channel", return_value=MagicMock()),
862+
patch("durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub),
836863
):
837864
client = TaskHubGrpcClient(
838865
resiliency_options=GrpcClientResiliencyOptions(channel_recreate_failure_threshold=2)
@@ -906,8 +933,9 @@ async def test_async_client_does_not_count_wait_for_orchestration_deadline():
906933
stub.GetInstance = AsyncMock(side_effect=make_aio_rpc_error(grpc.StatusCode.UNAVAILABLE))
907934
stub.WaitForInstanceCompletion = AsyncMock(side_effect=make_aio_rpc_error(grpc.StatusCode.DEADLINE_EXCEEDED))
908935

909-
with patch("durabletask.client.shared.get_async_grpc_channel", return_value=MagicMock()), patch(
910-
"durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub
936+
with (
937+
patch("durabletask.client.shared.get_async_grpc_channel", return_value=MagicMock()),
938+
patch("durabletask.client.stubs.TaskHubSidecarServiceStub", return_value=stub),
911939
):
912940
client = AsyncTaskHubGrpcClient(
913941
resiliency_options=GrpcClientResiliencyOptions(channel_recreate_failure_threshold=2)
@@ -939,15 +967,20 @@ async def blocked_close_retired_channel(self, channel):
939967
await release_cleanup.wait()
940968
await channel.close()
941969

942-
with patch(
970+
with (
971+
patch(
943972
"durabletask.client.shared.get_async_grpc_channel",
944973
side_effect=[first_channel, second_channel],
945-
), patch(
946-
"durabletask.client.stubs.TaskHubSidecarServiceStub", side_effect=[first_stub, second_stub]
947-
), patch.object(
974+
),
975+
patch(
976+
"durabletask.client.stubs.TaskHubSidecarServiceStub",
977+
side_effect=[first_stub, second_stub],
978+
),
979+
patch.object(
948980
AsyncTaskHubGrpcClient,
949981
"_close_retired_channel",
950982
new=blocked_close_retired_channel,
983+
),
951984
):
952985
client = AsyncTaskHubGrpcClient(
953986
resiliency_options=GrpcClientResiliencyOptions(

0 commit comments

Comments
 (0)