Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,25 @@ def __init__(
StreamUnaryClientInterceptor or
StreamStreamClientInterceptor, optional): gRPC interceptors.
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
message length in bytes. When omitted, the env var
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the
receive limit (matches the Java SDK property of the same name).
"""
DaprHealth.wait_for_sidecar()
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options: List[Tuple[str, Any]] = [
('grpc.primary_user_agent', useragent),
]
else:
options = [
('grpc.max_send_message_length', max_grpc_message_length),
('grpc.max_receive_message_length', max_grpc_message_length),
('grpc.primary_user_agent', useragent),
]
options: List[Tuple[str, Any]] = [('grpc.primary_user_agent', useragent)]
if max_grpc_message_length:
options.append(('grpc.max_send_message_length', max_grpc_message_length))
options.append(('grpc.max_receive_message_length', max_grpc_message_length))
elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES:
options.append(
(
'grpc.max_receive_message_length',
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set a value to max_send_message_length in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outbound is supposed to be set to unlimited by default so we'll be able to pass any input size through the outbound channel. If users want to restrict the outbound they can still do so explicitly through the constructor. This conforms to the other SDKs

settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES,
)
)

if not address:
address = settings.DAPR_GRPC_ENDPOINT or (
Expand Down
25 changes: 14 additions & 11 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,26 @@ def __init__(
StreamUnaryClientInterceptor or
StreamStreamClientInterceptor, optional): gRPC interceptors.
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
message length in bytes. When omitted, the env var
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the
receive limit (matches the Java SDK property of the same name).
retry_policy (RetryPolicy optional): Specifies retry behaviour
"""
DaprHealth.wait_for_sidecar()
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
if not max_grpc_message_length:
options = [
('grpc.primary_user_agent', useragent),
]
else:
options = [
('grpc.max_send_message_length', max_grpc_message_length), # type: ignore
('grpc.max_receive_message_length', max_grpc_message_length), # type: ignore
('grpc.primary_user_agent', useragent),
]
options = [('grpc.primary_user_agent', useragent)]
if max_grpc_message_length:
options.append(('grpc.max_send_message_length', max_grpc_message_length)) # type: ignore
options.append(('grpc.max_receive_message_length', max_grpc_message_length)) # type: ignore
elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES:
options.append(
(
'grpc.max_receive_message_length',
settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES,
) # type: ignore
)

if not address:
address = settings.DAPR_GRPC_ENDPOINT or (
Expand Down
4 changes: 4 additions & 0 deletions dapr/conf/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@

DAPR_HTTP_TIMEOUT_SECONDS = 60

# Maximum inbound gRPC message size in bytes for the SDK client and ext-grpc callback server.
# 0 leaves the gRPC default (4 MiB) in place. See dapr/python-sdk#1023.
DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES = 0

# ----- Conversation API settings ------

# Configuration for handling large enums to avoid massive JSON schemas that can exceed LLM token limits
Expand Down
14 changes: 12 additions & 2 deletions ext/dapr-ext-grpc/dapr/ext/grpc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs):
"""Inits App object and creates gRPC server.

Args:
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes. Only used when kwargs are not set.
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes. Only used when kwargs are not set. When this
argument is omitted, the env var
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the
receive limit (matches the Java SDK property of the same name).
kwargs: arguments to grpc.server()
"""
self._servicer = _CallbackServicer()
Expand All @@ -52,6 +55,13 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs):
('grpc.max_send_message_length', max_grpc_message_length),
('grpc.max_receive_message_length', max_grpc_message_length),
]
elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES:
options = [
(
'grpc.max_receive_message_length',
settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES,
),
]
self._server = grpc.server( # type: ignore
futures.ThreadPoolExecutor(max_workers=10), options=options
)
Expand Down
47 changes: 47 additions & 0 deletions ext/dapr-ext-grpc/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
"""

import unittest
from unittest.mock import MagicMock, patch

from cloudevents.sdk.event import v1
from dapr.ext.grpc import App, BindingRequest, InvokeMethodRequest, Rule

from dapr.conf import settings


class AppTests(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -88,3 +91,47 @@ def health_check_cb():
def test_no_health_check(self):
registered_cb = self._app._health_check_servicer._health_check_cb
self.assertIsNone(registered_cb)


class AppGrpcOptionsTests(unittest.TestCase):
"""Exercises options passed to grpc.server() based on env var / constructor arg."""

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
@patch('dapr.ext.grpc.app.grpc.server')
def test_default_no_size_options(self, mock_server):
mock_server.return_value = MagicMock()

App()

_, kwargs = mock_server.call_args
options = dict(kwargs.get('options') or [])
self.assertNotIn('grpc.max_send_message_length', options)
self.assertNotIn('grpc.max_receive_message_length', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
@patch('dapr.ext.grpc.app.grpc.server')
def test_env_var_sets_receive_only(self, mock_server):
mock_server.return_value = MagicMock()

App()

_, kwargs = mock_server.call_args
options = dict(kwargs.get('options') or [])
self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024)
self.assertNotIn('grpc.max_send_message_length', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
@patch('dapr.ext.grpc.app.grpc.server')
def test_constructor_arg_overrides_env(self, mock_server):
mock_server.return_value = MagicMock()

App(max_grpc_message_length=32 * 1024 * 1024)

_, kwargs = mock_server.call_args
options = dict(kwargs.get('options') or [])
self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024)
self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024)


if __name__ == '__main__':
unittest.main()
39 changes: 38 additions & 1 deletion tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import time
import unittest
import uuid
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from google.rpc import code_pb2, status_pb2

Expand Down Expand Up @@ -1711,6 +1711,43 @@ def test_jobs_error_handling(self):
with self.assertRaises(DaprGrpcError):
dapr.schedule_job_alpha1(job)

def _capture_grpc_channel_options(self, **client_kwargs) -> dict[str, object]:
with (
patch('dapr.clients.grpc.client.grpc.secure_channel') as mock_secure,
patch('dapr.clients.grpc.client.grpc.insecure_channel') as mock_insecure,
patch(
'dapr.clients.grpc.client.grpc.intercept_channel',
side_effect=lambda ch, *_: ch,
),
):
mock_secure.return_value = MagicMock(name='secure-channel')
mock_insecure.return_value = MagicMock(name='insecure-channel')
DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}', **client_kwargs)
call = mock_insecure.call_args or mock_secure.call_args
return dict(call.kwargs['options'])

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
def test_grpc_channel_options_default(self):
"""Without env var or constructor arg, no message size options are set."""
options = self._capture_grpc_channel_options()
self.assertNotIn('grpc.max_send_message_length', options)
self.assertNotIn('grpc.max_receive_message_length', options)
self.assertIn('grpc.primary_user_agent', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
def test_grpc_channel_options_env_var_sets_receive_only(self):
"""DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES sets receive size; send stays at gRPC default."""
options = self._capture_grpc_channel_options()
self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024)
self.assertNotIn('grpc.max_send_message_length', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
def test_grpc_channel_options_constructor_arg_overrides_env(self):
"""Explicit max_grpc_message_length wins over the env var and sets both send + receive."""
options = self._capture_grpc_channel_options(max_grpc_message_length=32 * 1024 * 1024)
self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024)
self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024)


if __name__ == '__main__':
unittest.main()
38 changes: 37 additions & 1 deletion tests/clients/test_dapr_grpc_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import tempfile
import unittest
import uuid
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from google.rpc import code_pb2, status_pb2

Expand Down Expand Up @@ -1763,6 +1763,42 @@ async def test_job_lifecycle(self):

await dapr.close()

def _capture_grpc_channel_options(self, **client_kwargs) -> dict[str, object]:
with (
patch('dapr.aio.clients.grpc.client.grpc.aio.secure_channel') as mock_secure,
patch('dapr.aio.clients.grpc.client.grpc.aio.insecure_channel') as mock_insecure,
):
mock_secure.return_value = MagicMock(name='secure-channel')
mock_insecure.return_value = MagicMock(name='insecure-channel')
DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}', **client_kwargs)
if mock_insecure.call_args:
args, _ = mock_insecure.call_args
return dict(args[1])
_, kwargs = mock_secure.call_args
return dict(kwargs['options'])

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
def test_grpc_channel_options_default(self):
"""Without env var or constructor arg, no message size options are set."""
options = self._capture_grpc_channel_options()
self.assertNotIn('grpc.max_send_message_length', options)
self.assertNotIn('grpc.max_receive_message_length', options)
self.assertIn('grpc.primary_user_agent', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
def test_grpc_channel_options_env_var_sets_receive_only(self):
"""DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES sets receive size; send stays at gRPC default."""
options = self._capture_grpc_channel_options()
self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024)
self.assertNotIn('grpc.max_send_message_length', options)

@patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024)
def test_grpc_channel_options_constructor_arg_overrides_env(self):
"""Explicit max_grpc_message_length wins over the env var and sets both send + receive."""
options = self._capture_grpc_channel_options(max_grpc_message_length=32 * 1024 * 1024)
self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024)
self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024)


if __name__ == '__main__':
unittest.main()
Loading