Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 82f0095

Browse files
committed
add grpc options with retry/keepalive possible via env vars
Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
1 parent 9bba479 commit 82f0095

4 files changed

Lines changed: 327 additions & 9 deletions

File tree

durabletask/aio/internal/shared.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# Copyright (c) The Dapr Authors.
22
# Licensed under the MIT License.
33

4-
from typing import Optional, Sequence, Union
4+
from typing import Optional, Sequence, Union, Any
55

66
import grpc
77
from grpc import aio as grpc_aio
88

99
from durabletask.internal.shared import (
1010
get_default_host_address,
11+
build_grpc_channel_options,
1112
SECURE_PROTOCOLS,
1213
INSECURE_PROTOCOLS,
1314
)
@@ -24,8 +25,9 @@
2425
def get_grpc_aio_channel(
2526
host_address: Optional[str],
2627
secure_channel: bool = False,
27-
interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc_aio.Channel:
28-
28+
interceptors: Optional[Sequence[ClientInterceptor]] = None,
29+
options: Optional[Sequence[tuple[str, Any]]] = None,
30+
) -> grpc_aio.Channel:
2931
if host_address is None:
3032
host_address = get_default_host_address()
3133

@@ -41,9 +43,21 @@ def get_grpc_aio_channel(
4143
host_address = host_address[len(protocol):]
4244
break
4345

46+
# Build channel options (merge provided options with env-driven keepalive/retry)
47+
channel_options = build_grpc_channel_options(options)
48+
49+
# Create the base channel
4450
if secure_channel:
45-
channel = grpc_aio.secure_channel(host_address, grpc.ssl_channel_credentials(), interceptors=interceptors)
51+
if channel_options is not None:
52+
channel = grpc_aio.secure_channel(
53+
host_address, grpc.ssl_channel_credentials(), interceptors=interceptors, options=channel_options,
54+
)
55+
else:
56+
channel = grpc_aio.secure_channel(host_address, grpc.ssl_channel_credentials(), interceptors=interceptors)
4657
else:
47-
channel = grpc_aio.insecure_channel(host_address, interceptors=interceptors)
58+
if channel_options is not None:
59+
channel = grpc_aio.insecure_channel(host_address, interceptors=interceptors, options=channel_options)
60+
else:
61+
channel = grpc_aio.insecure_channel(host_address, interceptors=interceptors)
4862

4963
return channel

durabletask/internal/shared.py

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import dataclasses
55
import json
66
import logging
7+
import os
78
from types import SimpleNamespace
8-
from typing import Any, Optional, Sequence, Union
9+
from typing import Any, Optional, Sequence, Union, Iterable
910

1011
import grpc
1112

@@ -24,6 +25,126 @@
2425
INSECURE_PROTOCOLS = ["http://", "grpc://"]
2526

2627

28+
def _get_env_bool(name: str, default: bool) -> bool:
29+
""" helper to convert the environment variable to a bool"""
30+
val = os.environ.get(name)
31+
if val is None:
32+
return default
33+
return val.strip().lower() in {"1", "true", "t", "yes", "y"}
34+
35+
36+
def _get_env_int(name: str, default: int) -> int:
37+
""" helper to convert the env var to an int or if we could not to the default value given """
38+
val = os.environ.get(name)
39+
if val is None:
40+
return default
41+
try:
42+
return int(val)
43+
except Exception:
44+
return default
45+
46+
47+
def _get_env_float(name: str, default: float) -> float:
48+
""" helper to convert the env var to a float or if we could not to the default value given """
49+
val = os.environ.get(name)
50+
if val is None:
51+
return default
52+
try:
53+
return float(val)
54+
except Exception:
55+
return default
56+
57+
58+
def _get_env_csv(name: str, default_csv: str) -> list[str]:
59+
""" helper to convert the env var to a list or if we could not to the default value given """
60+
val = os.environ.get(name, default_csv)
61+
return [s.strip().upper() for s in val.split(",") if s.strip()]
62+
63+
64+
def get_grpc_keepalive_options() -> list[tuple[str, Any]]:
65+
"""Build gRPC keepalive channel options from environment variables.
66+
67+
Environment variables (defaults in parentheses):
68+
- DAPR_GRPC_KEEPALIVE_ENABLED (false)
69+
- DAPR_GRPC_KEEPALIVE_TIME_MS (120000)
70+
- DAPR_GRPC_KEEPALIVE_TIMEOUT_MS (20000)
71+
- DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS (false)
72+
"""
73+
enabled = _get_env_bool("DAPR_GRPC_KEEPALIVE_ENABLED", False)
74+
if not enabled:
75+
return []
76+
time_ms = _get_env_int("DAPR_GRPC_KEEPALIVE_TIME_MS", 120000)
77+
timeout_ms = _get_env_int("DAPR_GRPC_KEEPALIVE_TIMEOUT_MS", 20000)
78+
permit_without_calls = (
79+
1 if _get_env_bool("DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS", False) else 0
80+
)
81+
return [
82+
("grpc.keepalive_time_ms", time_ms),
83+
("grpc.keepalive_timeout_ms", timeout_ms),
84+
("grpc.keepalive_permit_without_calls", permit_without_calls),
85+
]
86+
87+
88+
def get_grpc_retry_service_config_option() -> Optional[tuple[str, str]]:
89+
"""Return ("grpc.service_config", json_str) if retry is enabled via env; else None.
90+
91+
Environment variables (defaults in parentheses):
92+
- DAPR_GRPC_RETRY_ENABLED (false)
93+
- DAPR_GRPC_RETRY_MAX_ATTEMPTS (4)
94+
- DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS (100)
95+
- DAPR_GRPC_RETRY_MAX_BACKOFF_MS (1000)
96+
- DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER (2.0)
97+
- DAPR_GRPC_RETRY_CODES (UNAVAILABLE,DEADLINE_EXCEEDED)
98+
"""
99+
enabled = _get_env_bool("DAPR_GRPC_RETRY_ENABLED", False)
100+
if not enabled:
101+
return None
102+
103+
max_attempts = _get_env_int("DAPR_GRPC_RETRY_MAX_ATTEMPTS", 4)
104+
initial_backoff_ms = _get_env_int("DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS", 100)
105+
max_backoff_ms = _get_env_int("DAPR_GRPC_RETRY_MAX_BACKOFF_MS", 1000)
106+
backoff_multiplier = _get_env_float("DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER", 2.0)
107+
codes = _get_env_csv("DAPR_GRPC_RETRY_CODES", "UNAVAILABLE,DEADLINE_EXCEEDED")
108+
109+
# service_config ref => https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto#L44
110+
service_config = {
111+
"methodConfig": [
112+
{
113+
"name": [{"service": ""}],
114+
"retryPolicy": {
115+
"maxAttempts": max_attempts,
116+
"initialBackoff": f"{initial_backoff_ms / 1000.0}s",
117+
"maxBackoff": f"{max_backoff_ms / 1000.0}s",
118+
"backoffMultiplier": backoff_multiplier,
119+
"retryableStatusCodes": codes,
120+
},
121+
}
122+
]
123+
}
124+
# we are not applying retry throttling policy (but a user can pass the whole option string via options)
125+
return "grpc.service_config", json.dumps(service_config)
126+
127+
128+
def build_grpc_channel_options(
129+
base_options: Optional[Iterable[tuple[str, Any]]] = None,
130+
) -> Optional[list[tuple[str, Any]]]:
131+
"""Combine base options + env-driven keepalive and retry service config.
132+
133+
The returned list is safe to pass as the `options` argument to grpc.secure_channel/insecure_channel.
134+
"""
135+
combined: list[tuple[str, Any]] = []
136+
if base_options:
137+
combined.extend(list(base_options))
138+
139+
keepalive = get_grpc_keepalive_options()
140+
if keepalive:
141+
combined.extend(keepalive)
142+
retry_opt = get_grpc_retry_service_config_option()
143+
if retry_opt is not None:
144+
combined.append(retry_opt)
145+
return combined if combined else None
146+
147+
27148
def get_default_host_address() -> str:
28149
"""Resolve the default Durable Task sidecar address.
29150
@@ -53,7 +174,9 @@ def get_default_host_address() -> str:
53174
def get_grpc_channel(
54175
host_address: Optional[str],
55176
secure_channel: bool = False,
56-
interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:
177+
interceptors: Optional[Sequence[ClientInterceptor]] = None,
178+
options: Optional[Sequence[tuple[str, Any]]] = None,
179+
) -> grpc.Channel:
57180
if host_address is None:
58181
host_address = get_default_host_address()
59182

@@ -71,11 +194,22 @@ def get_grpc_channel(
71194
host_address = host_address[len(protocol):]
72195
break
73196

197+
# Build channel options (merge provided options with env-driven keepalive/retry)
198+
channel_options = build_grpc_channel_options(options)
199+
74200
# Create the base channel
75201
if secure_channel:
76-
channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
202+
if channel_options is not None:
203+
channel = grpc.secure_channel(
204+
host_address, grpc.ssl_channel_credentials(), options=channel_options
205+
)
206+
else:
207+
channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials())
77208
else:
78-
channel = grpc.insecure_channel(host_address)
209+
if channel_options is not None:
210+
channel = grpc.insecure_channel(host_address, options=channel_options)
211+
else:
212+
channel = grpc.insecure_channel(host_address)
79213

80214
# Apply interceptors ONLY if they exist
81215
if interceptors:
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import json
2+
from unittest.mock import ANY, patch
3+
4+
import pytest
5+
6+
from durabletask.aio.internal.shared import get_grpc_aio_channel
7+
8+
HOST_ADDRESS = "localhost:50051"
9+
10+
11+
def _find_option(options, key):
12+
for k, v in options:
13+
if k == key:
14+
return v
15+
raise AssertionError(f"Option with key {key} not found in options: {options}")
16+
17+
18+
def test_aio_channel_passes_base_options_and_max_lengths():
19+
base_options = [
20+
("grpc.max_send_message_length", 4321),
21+
("grpc.max_receive_message_length", 8765),
22+
("grpc.primary_user_agent", "durabletask-aio-tests"),
23+
]
24+
with patch("durabletask.aio.internal.shared.grpc_aio.insecure_channel") as mock_channel:
25+
get_grpc_aio_channel(HOST_ADDRESS, False, options=base_options)
26+
# Ensure called with options kwarg
27+
assert mock_channel.call_count == 1
28+
args, kwargs = mock_channel.call_args
29+
assert args[0] == HOST_ADDRESS
30+
assert "options" in kwargs
31+
opts = kwargs["options"]
32+
# Check our base options made it through
33+
assert ("grpc.max_send_message_length", 4321) in opts
34+
assert ("grpc.max_receive_message_length", 8765) in opts
35+
assert ("grpc.primary_user_agent", "durabletask-aio-tests") in opts
36+
37+
38+
def test_aio_channel_merges_env_keepalive_and_retry(monkeypatch: pytest.MonkeyPatch):
39+
# Enable keepalive and retry via env
40+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_ENABLED", "true")
41+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_TIME_MS", "101000")
42+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_TIMEOUT_MS", "30303")
43+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS", "true")
44+
45+
monkeypatch.setenv("DAPR_GRPC_RETRY_ENABLED", "true")
46+
monkeypatch.setenv("DAPR_GRPC_RETRY_MAX_ATTEMPTS", "7")
47+
monkeypatch.setenv("DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS", "150")
48+
monkeypatch.setenv("DAPR_GRPC_RETRY_MAX_BACKOFF_MS", "3000")
49+
monkeypatch.setenv("DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER", "2.25")
50+
monkeypatch.setenv("DAPR_GRPC_RETRY_CODES", "UNAVAILABLE,RESOURCE_EXHAUSTED")
51+
52+
base_options = [("another.custom.opt", 99)]
53+
54+
with patch("durabletask.aio.internal.shared.grpc_aio.insecure_channel") as mock_channel:
55+
get_grpc_aio_channel(HOST_ADDRESS, False, options=base_options)
56+
57+
args, kwargs = mock_channel.call_args
58+
assert args[0] == HOST_ADDRESS
59+
assert "options" in kwargs
60+
opts = kwargs["options"]
61+
62+
# Base option present
63+
assert ("another.custom.opt", 99) in opts
64+
65+
# Keepalive options present
66+
assert ("grpc.keepalive_time_ms", 101000) in opts
67+
assert ("grpc.keepalive_timeout_ms", 30303) in opts
68+
assert ("grpc.keepalive_permit_without_calls", 1) in opts # true -> 1
69+
70+
# Retry service config present and parses correctly
71+
svc_cfg_str = _find_option(opts, "grpc.service_config")
72+
svc_cfg = json.loads(svc_cfg_str)
73+
assert "methodConfig" in svc_cfg and isinstance(svc_cfg["methodConfig"], list)
74+
retry_policy = svc_cfg["methodConfig"][0]["retryPolicy"]
75+
assert retry_policy["maxAttempts"] == 7
76+
assert retry_policy["initialBackoff"] == f"{150/1000.0}s"
77+
assert retry_policy["maxBackoff"] == f"{3000/1000.0}s"
78+
assert retry_policy["backoffMultiplier"] == 2.25
79+
assert "RESOURCE_EXHAUSTED" in retry_policy["retryableStatusCodes"]
80+
81+
82+
def test_aio_secure_channel_receives_options_when_secure_true():
83+
base_options = [("grpc.max_receive_message_length", 999999)]
84+
with patch("durabletask.aio.internal.shared.grpc_aio.secure_channel") as mock_channel, \
85+
patch("grpc.ssl_channel_credentials") as mock_credentials:
86+
get_grpc_aio_channel(HOST_ADDRESS, True, options=base_options)
87+
args, kwargs = mock_channel.call_args
88+
assert args[0] == HOST_ADDRESS
89+
assert args[1] == mock_credentials.return_value
90+
assert ("grpc.max_receive_message_length", 999999) in kwargs.get("options", [])
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import json
2+
from unittest.mock import ANY, patch
3+
4+
import pytest
5+
6+
from durabletask.internal.shared import get_grpc_channel
7+
8+
HOST_ADDRESS = "localhost:50051"
9+
10+
11+
def _find_option(options, key):
12+
for k, v in options:
13+
if k == key:
14+
return v
15+
raise AssertionError(f"Option with key {key} not found in options: {options}")
16+
17+
18+
def test_sync_channel_passes_base_options_and_max_lengths():
19+
base_options = [
20+
("grpc.max_send_message_length", 1234),
21+
("grpc.max_receive_message_length", 5678),
22+
("grpc.primary_user_agent", "durabletask-tests"),
23+
]
24+
with patch("grpc.insecure_channel") as mock_channel:
25+
get_grpc_channel(HOST_ADDRESS, False, options=base_options)
26+
# Ensure called with options kwarg
27+
assert mock_channel.call_count == 1
28+
args, kwargs = mock_channel.call_args
29+
assert args[0] == HOST_ADDRESS
30+
assert "options" in kwargs
31+
opts = kwargs["options"]
32+
# Check our base options made it through
33+
assert ("grpc.max_send_message_length", 1234) in opts
34+
assert ("grpc.max_receive_message_length", 5678) in opts
35+
assert ("grpc.primary_user_agent", "durabletask-tests") in opts
36+
37+
38+
def test_sync_channel_merges_env_keepalive_and_retry(monkeypatch: pytest.MonkeyPatch):
39+
# Enable keepalive and retry via env
40+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_ENABLED", "true")
41+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_TIME_MS", "111000")
42+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_TIMEOUT_MS", "22222")
43+
monkeypatch.setenv("DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS", "true")
44+
45+
monkeypatch.setenv("DAPR_GRPC_RETRY_ENABLED", "true")
46+
monkeypatch.setenv("DAPR_GRPC_RETRY_MAX_ATTEMPTS", "5")
47+
monkeypatch.setenv("DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS", "250")
48+
monkeypatch.setenv("DAPR_GRPC_RETRY_MAX_BACKOFF_MS", "2000")
49+
monkeypatch.setenv("DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER", "1.5")
50+
monkeypatch.setenv("DAPR_GRPC_RETRY_CODES", "UNAVAILABLE,DEADLINE_EXCEEDED,ABORTED")
51+
52+
base_options = [("custom.opt", 42)]
53+
54+
with patch("grpc.insecure_channel") as mock_channel:
55+
get_grpc_channel(HOST_ADDRESS, False, options=base_options)
56+
57+
args, kwargs = mock_channel.call_args
58+
assert args[0] == HOST_ADDRESS
59+
assert "options" in kwargs
60+
opts = kwargs["options"]
61+
62+
# Base option present
63+
assert ("custom.opt", 42) in opts
64+
65+
# Keepalive options present
66+
assert ("grpc.keepalive_time_ms", 111000) in opts
67+
assert ("grpc.keepalive_timeout_ms", 22222) in opts
68+
assert ("grpc.keepalive_permit_without_calls", 1) in opts # true -> 1
69+
70+
# Retry service config present and parses correctly
71+
svc_cfg_str = _find_option(opts, "grpc.service_config")
72+
svc_cfg = json.loads(svc_cfg_str)
73+
assert "methodConfig" in svc_cfg and isinstance(svc_cfg["methodConfig"], list)
74+
retry_policy = svc_cfg["methodConfig"][0]["retryPolicy"]
75+
assert retry_policy["maxAttempts"] == 5
76+
assert retry_policy["initialBackoff"] == f"{250/1000.0}s"
77+
assert retry_policy["maxBackoff"] == f"{2000/1000.0}s"
78+
assert retry_policy["backoffMultiplier"] == 1.5
79+
# Codes are upper-cased list
80+
assert "ABORTED" in retry_policy["retryableStatusCodes"]

0 commit comments

Comments
 (0)