Payload limit configuration and validation #1288
The first file in the diff adds the configuration dataclass and wires size validation into the data converter:

```diff
@@ -1207,6 +1207,20 @@ def __init__(self) -> None:
         super().__init__(encode_common_attributes=True)
 
 
+@dataclass(frozen=True)
+class PayloadLimitsConfig:
+    """Configuration for when payload sizes exceed limits."""
+
+    memo_upload_error_limit: int | None = None
+    """The limit at which a memo size error is created."""
+    memo_upload_warning_limit: int = 2 * 1024
+    """The limit at which a memo size warning is created."""
+    payload_upload_error_limit: int | None = None
+    """The limit at which a payloads size error is created."""
```
Reviewer: I thought this was supposed to be a server setting. It seems dangerous to let users do anything other than turn it off, since the value could get out of sync with the server. We should also document what happens if you disable it: the payload still goes to the server, which rejects it with an error anyway. I feel that turning it off is only here as a kill switch in case there is a backward-compatibility issue. Do you agree?

Author (Contributor): If a newer version of the SDK that has payload limit checks is used with a server version that doesn't provide the limit information, or (going back even farther) doesn't support describing the namespace at all, would you expect error limit checks to be enabled in the SDK? If not, then I think we can change this to just a boolean disable-type flag, or tell folks to upgrade the server to get this more desirable behavior. Otherwise, a setting is necessary to set the limit for the worker.
```diff
+    payload_upload_warning_limit: int = 512 * 1024
+    """The limit at which a payloads size warning is created."""
+
+
 @dataclass(frozen=True)
 class DataConverter(WithSerializationContext):
     """Data converter for converting and encoding payloads to/from Python values.
```
```diff
@@ -1230,6 +1244,9 @@ class DataConverter(WithSerializationContext):
     failure_converter: FailureConverter = dataclasses.field(init=False)
     """Failure converter created from the :py:attr:`failure_converter_class`."""
 
+    payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
+    """Settings for payload size limits."""
+
     default: ClassVar[DataConverter]
     """Singleton default data converter."""
 
```
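For context, a caller could override these limits when building a converter. The following is a hedged usage sketch: the `temporalio.converter` import path and passing the result via the client's `data_converter` option are assumptions based on the existing SDK layout, not something this diff shows.

```python
import dataclasses

# Assumed import path; this PR's diff does not name the module explicitly.
from temporalio.converter import DataConverter, PayloadLimitsConfig

# Sketch: warn above 1 MiB and raise PayloadSizeError above 2 MiB for payload
# uploads. Leaving an error limit as None keeps that check disabled, and a
# warning limit of 0 disables the warning check entirely.
converter = dataclasses.replace(
    DataConverter.default,
    payload_limits=PayloadLimitsConfig(
        payload_upload_warning_limit=1024 * 1024,
        payload_upload_error_limit=2 * 1024 * 1024,
    ),
)
```

The resulting converter would then be supplied wherever a `DataConverter` is accepted today, for example the client's `data_converter` option.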
```diff
@@ -1367,35 +1384,42 @@ async def _encode_memo(
     async def _encode_memo_existing(
         self, source: Mapping[str, Any], memo: temporalio.api.common.v1.Memo
     ):
+        payloads = []
         for k, v in source.items():
             payload = v
             if not isinstance(v, temporalio.api.common.v1.Payload):
                 payload = (await self.encode([v]))[0]
             memo.fields[k].CopyFrom(payload)
+            payloads.append(payload)
+        # Memos have their field payloads validated all together in one unit
+        self._validate_limits(
+            payloads,
+            self.payload_limits.memo_upload_error_limit,
+            self.payload_limits.memo_upload_warning_limit,
+            "Memo size exceeded the warning limit.",
+        )
 
     async def _encode_payload(
         self, payload: temporalio.api.common.v1.Payload
     ) -> temporalio.api.common.v1.Payload:
         if self.payload_codec:
             payload = (await self.payload_codec.encode([payload]))[0]
+        self._validate_payload_limits([payload])
         return payload
 
     async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
         if self.payload_codec:
             await self.payload_codec.encode_wrapper(payloads)
+        self._validate_payload_limits(payloads.payloads)
 
     async def _encode_payload_sequence(
         self, payloads: Sequence[temporalio.api.common.v1.Payload]
     ) -> list[temporalio.api.common.v1.Payload]:
-        if not self.payload_codec:
-            return list(payloads)
-        return await self.payload_codec.encode(payloads)
-
-    # Temporary shortcircuit detection while the _encode_* methods may no-op if
-    # a payload codec is not configured. Remove once those paths have more to them.
-    @property
-    def _encode_payload_has_effect(self) -> bool:
-        return self.payload_codec is not None
+        encoded_payloads = list(payloads)
+        if self.payload_codec:
+            encoded_payloads = await self.payload_codec.encode(encoded_payloads)
+        self._validate_payload_limits(encoded_payloads)
+        return encoded_payloads
 
     async def _decode_payload(
         self, payload: temporalio.api.common.v1.Payload
```
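Because the default configuration only sets warning limits (both `*_error_limit` fields default to `None`), exceeding a limit surfaces through Python's `warnings` machinery rather than failing the call. A hedged test-setup sketch, assuming the warning message text used in this diff, shows how a test suite could escalate those warnings into failures:

```python
import warnings

# Assumes the warning text added in this PR, e.g.
# "Memo size exceeded the warning limit. Size: ... bytes, Limit: ... bytes".
# Matching it as a regex turns oversized-upload warnings into test errors.
warnings.filterwarnings(
    "error",
    message=r".*size exceeded the warning limit.*",
)
```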
```diff
@@ -1452,6 +1476,38 @@ async def _apply_to_failure_payloads(
         if failure.HasField("cause"):
             await DataConverter._apply_to_failure_payloads(failure.cause, cb)
 
+    def _validate_payload_limits(
+        self,
+        payloads: Sequence[temporalio.api.common.v1.Payload],
+    ):
+        self._validate_limits(
+            payloads,
+            self.payload_limits.payload_upload_error_limit,
+            self.payload_limits.payload_upload_warning_limit,
+            "Payloads size exceeded the warning limit.",
+        )
+
+    def _validate_limits(
+        self,
+        payloads: Sequence[temporalio.api.common.v1.Payload],
+        error_limit: int | None,
+        warning_limit: int,
+        warning_message: str,
+    ):
+        total_size = sum(payload.ByteSize() for payload in payloads)
+
+        if error_limit and error_limit > 0 and total_size > error_limit:
+            raise temporalio.exceptions.PayloadSizeError(
+                size=total_size,
+                limit=error_limit,
+            )
+
+        if warning_limit and warning_limit > 0 and total_size > warning_limit:
+            # TODO: Use a context aware logger to log extra information about workflow/activity/etc
```
Reviewer: Still TODO, right?
```diff
+            warnings.warn(
+                f"{warning_message} Size: {total_size} bytes, Limit: {warning_limit} bytes"
+            )
+
+
 DefaultPayloadConverter.default_encoding_payload_converters = (
     BinaryNullPayloadConverter(),
```
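One detail worth noting from `_validate_limits` above: the size being compared is the summed protobuf `ByteSize()` of the encoded payloads, which includes payload metadata and proto framing, not just the raw `data` bytes. A small illustrative sketch (the literal values are arbitrary):

```python
from temporalio.api.common.v1 import Payload

# ByteSize() is the serialized protobuf size: data bytes plus metadata entries
# (such as the "encoding" key) plus field framing, so it is slightly larger
# than len(payload.data) alone.
p = Payload(metadata={"encoding": b"json/plain"}, data=b'"hello world"')
print(p.ByteSize())   # serialized proto size used by the limit checks
print(len(p.data))    # raw data bytes only
```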
The second file in the diff adds the new exception type:

```diff
@@ -446,3 +446,28 @@ def is_cancelled_exception(exception: BaseException) -> bool:
             and isinstance(exception.cause, CancelledError)
         )
     )
+
+
+class PayloadSizeError(TemporalError):
+    """Error raised when payloads size exceeds payload size limits."""
+
+    def __init__(self, size: int, limit: int):
+        """Initialize a payloads limit error.
+
+        Args:
+            size: Actual payloads size in bytes.
+            limit: Payloads size limit in bytes.
+        """
+        super().__init__("Payloads size exceeded the error limit")
```
Reviewer: Similar points here.
```diff
+        self._size = size
+        self._limit = limit
+
+    @property
+    def payloads_size(self) -> int:
+        """Actual payloads size in bytes."""
+        return self._size
+
+    @property
+    def payloads_limit(self) -> int:
+        """Payloads size limit in bytes."""
+        return self._limit
```
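With an error limit configured, application code can catch the new exception around calls that upload payloads. A hedged sketch: the client handle, workflow name, and argument below are hypothetical; `start_workflow` is the existing client API.

```python
import temporalio.exceptions

# Hypothetical call site: `client` is a connected temporalio.client.Client and
# `big_argument` is an oversized workflow input. Run inside an async function.
try:
    await client.start_workflow(
        "my-workflow",
        big_argument,
        id="my-workflow-id",
        task_queue="my-task-queue",
    )
except temporalio.exceptions.PayloadSizeError as err:
    print(
        f"Upload rejected locally: {err.payloads_size} bytes "
        f"(limit {err.payloads_limit} bytes)"
    )
```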
The third file in the diff updates the activity worker to handle the new error when completing an activity task:

```diff
@@ -380,6 +380,19 @@ async def _handle_start_activity_task(
                     temporalio.exceptions.CancelledError("Cancelled"),
                     completion.result.cancelled.failure,
                 )
+            elif isinstance(
```
Member: Note, this doesn't catch cases where encoding the failure itself includes payloads that are too large; may want to handle this error in the outer …
```diff
+                err,
+                temporalio.exceptions.PayloadSizeError,
+            ):
+                temporalio.activity.logger.warning(
+                    "Activity task failed: payloads size exceeded the error limit. Size: %d bytes, Limit: %d bytes",
+                    err.payloads_size,
+                    err.payloads_limit,
+                    extra={"__temporal_error_identifier": "ActivityFailure"},
+                )
+                await data_converter.encode_failure(
+                    err, completion.result.failed.failure
+                )
             else:
                 if (
                     isinstance(
```
Reviewer: "uploaded payload sizes exceed the temporal server's limits." would be clearer.