From 476b48fdeca6aa259a70e93db0c4ed17231984d7 Mon Sep 17 00:00:00 2001 From: Casper Nielsen Date: Tue, 12 May 2026 11:55:30 +0200 Subject: [PATCH] fix(1023): enable setting DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES through environment variable Signed-off-by: Casper Nielsen --- dapr/aio/clients/grpc/client.py | 25 ++++++----- dapr/clients/grpc/client.py | 25 ++++++----- dapr/conf/global_settings.py | 4 ++ ext/dapr-ext-grpc/dapr/ext/grpc/app.py | 14 +++++- ext/dapr-ext-grpc/tests/test_app.py | 47 ++++++++++++++++++++ tests/clients/test_dapr_grpc_client.py | 39 +++++++++++++++- tests/clients/test_dapr_grpc_client_async.py | 38 +++++++++++++++- 7 files changed, 166 insertions(+), 26 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 68d032b57..5dc88ea6f 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -144,22 +144,25 @@ def __init__( StreamUnaryClientInterceptor or StreamStreamClientInterceptor, optional): gRPC interceptors. max_grpc_message_length (int, optional): The maximum grpc send and receive - message length in bytes. + message length in bytes. When omitted, the env var + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the + receive limit (matches the Java SDK property of the same name). """ DaprHealth.wait_for_sidecar() self.retry_policy = retry_policy or RetryPolicy() useragent = f'dapr-sdk-python/{__version__}' - if not max_grpc_message_length: - options: List[Tuple[str, Any]] = [ - ('grpc.primary_user_agent', useragent), - ] - else: - options = [ - ('grpc.max_send_message_length', max_grpc_message_length), - ('grpc.max_receive_message_length', max_grpc_message_length), - ('grpc.primary_user_agent', useragent), - ] + options: List[Tuple[str, Any]] = [('grpc.primary_user_agent', useragent)] + if max_grpc_message_length: + options.append(('grpc.max_send_message_length', max_grpc_message_length)) + options.append(('grpc.max_receive_message_length', max_grpc_message_length)) + elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES: + options.append( + ( + 'grpc.max_receive_message_length', + settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES, + ) + ) if not address: address = settings.DAPR_GRPC_ENDPOINT or ( diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index 5ac02f609..70eba4ce9 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -137,23 +137,26 @@ def __init__( StreamUnaryClientInterceptor or StreamStreamClientInterceptor, optional): gRPC interceptors. max_grpc_message_length (int, optional): The maximum grpc send and receive - message length in bytes. + message length in bytes. When omitted, the env var + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the + receive limit (matches the Java SDK property of the same name). retry_policy (RetryPolicy optional): Specifies retry behaviour """ DaprHealth.wait_for_sidecar() self.retry_policy = retry_policy or RetryPolicy() useragent = f'dapr-sdk-python/{__version__}' - if not max_grpc_message_length: - options = [ - ('grpc.primary_user_agent', useragent), - ] - else: - options = [ - ('grpc.max_send_message_length', max_grpc_message_length), # type: ignore - ('grpc.max_receive_message_length', max_grpc_message_length), # type: ignore - ('grpc.primary_user_agent', useragent), - ] + options = [('grpc.primary_user_agent', useragent)] + if max_grpc_message_length: + options.append(('grpc.max_send_message_length', max_grpc_message_length)) # type: ignore + options.append(('grpc.max_receive_message_length', max_grpc_message_length)) # type: ignore + elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES: + options.append( + ( + 'grpc.max_receive_message_length', + settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES, + ) # type: ignore + ) if not address: address = settings.DAPR_GRPC_ENDPOINT or ( diff --git a/dapr/conf/global_settings.py b/dapr/conf/global_settings.py index 5179f6a3a..924117daf 100644 --- a/dapr/conf/global_settings.py +++ b/dapr/conf/global_settings.py @@ -36,6 +36,10 @@ DAPR_HTTP_TIMEOUT_SECONDS = 60 +# Maximum inbound gRPC message size in bytes for the SDK client and ext-grpc callback server. +# 0 leaves the gRPC default (4 MiB) in place. See dapr/python-sdk#1023. +DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES = 0 + # ----- Conversation API settings ------ # Configuration for handling large enums to avoid massive JSON schemas that can exceed LLM token limits diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/app.py b/ext/dapr-ext-grpc/dapr/ext/grpc/app.py index 58e0cdf29..9f68f7a9c 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/app.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/app.py @@ -39,8 +39,11 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs): """Inits App object and creates gRPC server. Args: - max_grpc_messsage_length (int, optional): The maximum grpc send and receive - message length in bytes. Only used when kwargs are not set. + max_grpc_message_length (int, optional): The maximum grpc send and receive + message length in bytes. Only used when kwargs are not set. When this + argument is omitted, the env var + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` is consulted to set the + receive limit (matches the Java SDK property of the same name). kwargs: arguments to grpc.server() """ self._servicer = _CallbackServicer() @@ -52,6 +55,13 @@ def __init__(self, max_grpc_message_length: Optional[int] = None, **kwargs): ('grpc.max_send_message_length', max_grpc_message_length), ('grpc.max_receive_message_length', max_grpc_message_length), ] + elif settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES: + options = [ + ( + 'grpc.max_receive_message_length', + settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES, + ), + ] self._server = grpc.server( # type: ignore futures.ThreadPoolExecutor(max_workers=10), options=options ) diff --git a/ext/dapr-ext-grpc/tests/test_app.py b/ext/dapr-ext-grpc/tests/test_app.py index 315d9e18b..99ee0b37c 100644 --- a/ext/dapr-ext-grpc/tests/test_app.py +++ b/ext/dapr-ext-grpc/tests/test_app.py @@ -14,10 +14,13 @@ """ import unittest +from unittest.mock import MagicMock, patch from cloudevents.sdk.event import v1 from dapr.ext.grpc import App, BindingRequest, InvokeMethodRequest, Rule +from dapr.conf import settings + class AppTests(unittest.TestCase): def setUp(self): @@ -88,3 +91,47 @@ def health_check_cb(): def test_no_health_check(self): registered_cb = self._app._health_check_servicer._health_check_cb self.assertIsNone(registered_cb) + + +class AppGrpcOptionsTests(unittest.TestCase): + """Exercises options passed to grpc.server() based on env var / constructor arg.""" + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + @patch('dapr.ext.grpc.app.grpc.server') + def test_default_no_size_options(self, mock_server): + mock_server.return_value = MagicMock() + + App() + + _, kwargs = mock_server.call_args + options = dict(kwargs.get('options') or []) + self.assertNotIn('grpc.max_send_message_length', options) + self.assertNotIn('grpc.max_receive_message_length', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + @patch('dapr.ext.grpc.app.grpc.server') + def test_env_var_sets_receive_only(self, mock_server): + mock_server.return_value = MagicMock() + + App() + + _, kwargs = mock_server.call_args + options = dict(kwargs.get('options') or []) + self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024) + self.assertNotIn('grpc.max_send_message_length', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + @patch('dapr.ext.grpc.app.grpc.server') + def test_constructor_arg_overrides_env(self, mock_server): + mock_server.return_value = MagicMock() + + App(max_grpc_message_length=32 * 1024 * 1024) + + _, kwargs = mock_server.call_args + options = dict(kwargs.get('options') or []) + self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024) + self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/clients/test_dapr_grpc_client.py b/tests/clients/test_dapr_grpc_client.py index 211b66ab6..06549276b 100644 --- a/tests/clients/test_dapr_grpc_client.py +++ b/tests/clients/test_dapr_grpc_client.py @@ -20,7 +20,7 @@ import time import unittest import uuid -from unittest.mock import patch +from unittest.mock import MagicMock, patch from google.rpc import code_pb2, status_pb2 @@ -1711,6 +1711,43 @@ def test_jobs_error_handling(self): with self.assertRaises(DaprGrpcError): dapr.schedule_job_alpha1(job) + def _capture_grpc_channel_options(self, **client_kwargs) -> dict[str, object]: + with ( + patch('dapr.clients.grpc.client.grpc.secure_channel') as mock_secure, + patch('dapr.clients.grpc.client.grpc.insecure_channel') as mock_insecure, + patch( + 'dapr.clients.grpc.client.grpc.intercept_channel', + side_effect=lambda ch, *_: ch, + ), + ): + mock_secure.return_value = MagicMock(name='secure-channel') + mock_insecure.return_value = MagicMock(name='insecure-channel') + DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}', **client_kwargs) + call = mock_insecure.call_args or mock_secure.call_args + return dict(call.kwargs['options']) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_grpc_channel_options_default(self): + """Without env var or constructor arg, no message size options are set.""" + options = self._capture_grpc_channel_options() + self.assertNotIn('grpc.max_send_message_length', options) + self.assertNotIn('grpc.max_receive_message_length', options) + self.assertIn('grpc.primary_user_agent', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + def test_grpc_channel_options_env_var_sets_receive_only(self): + """DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES sets receive size; send stays at gRPC default.""" + options = self._capture_grpc_channel_options() + self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024) + self.assertNotIn('grpc.max_send_message_length', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + def test_grpc_channel_options_constructor_arg_overrides_env(self): + """Explicit max_grpc_message_length wins over the env var and sets both send + receive.""" + options = self._capture_grpc_channel_options(max_grpc_message_length=32 * 1024 * 1024) + self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024) + self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024) + if __name__ == '__main__': unittest.main() diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index e27b8dc52..6b224f4cb 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -18,7 +18,7 @@ import tempfile import unittest import uuid -from unittest.mock import patch +from unittest.mock import MagicMock, patch from google.rpc import code_pb2, status_pb2 @@ -1763,6 +1763,42 @@ async def test_job_lifecycle(self): await dapr.close() + def _capture_grpc_channel_options(self, **client_kwargs) -> dict[str, object]: + with ( + patch('dapr.aio.clients.grpc.client.grpc.aio.secure_channel') as mock_secure, + patch('dapr.aio.clients.grpc.client.grpc.aio.insecure_channel') as mock_insecure, + ): + mock_secure.return_value = MagicMock(name='secure-channel') + mock_insecure.return_value = MagicMock(name='insecure-channel') + DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}', **client_kwargs) + if mock_insecure.call_args: + args, _ = mock_insecure.call_args + return dict(args[1]) + _, kwargs = mock_secure.call_args + return dict(kwargs['options']) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_grpc_channel_options_default(self): + """Without env var or constructor arg, no message size options are set.""" + options = self._capture_grpc_channel_options() + self.assertNotIn('grpc.max_send_message_length', options) + self.assertNotIn('grpc.max_receive_message_length', options) + self.assertIn('grpc.primary_user_agent', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + def test_grpc_channel_options_env_var_sets_receive_only(self): + """DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES sets receive size; send stays at gRPC default.""" + options = self._capture_grpc_channel_options() + self.assertEqual(options.get('grpc.max_receive_message_length'), 8 * 1024 * 1024) + self.assertNotIn('grpc.max_send_message_length', options) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 8 * 1024 * 1024) + def test_grpc_channel_options_constructor_arg_overrides_env(self): + """Explicit max_grpc_message_length wins over the env var and sets both send + receive.""" + options = self._capture_grpc_channel_options(max_grpc_message_length=32 * 1024 * 1024) + self.assertEqual(options.get('grpc.max_send_message_length'), 32 * 1024 * 1024) + self.assertEqual(options.get('grpc.max_receive_message_length'), 32 * 1024 * 1024) + if __name__ == '__main__': unittest.main()