7 changes: 3 additions & 4 deletions temporalio/bridge/worker.py
@@ -315,7 +315,6 @@ async def encode_completion(
     encode_headers: bool,
 ) -> None:
     """Encode all payloads in the completion."""
-    if data_converter._encode_payload_has_effect:
-        await CommandAwarePayloadVisitor(
-            skip_search_attributes=True, skip_headers=not encode_headers
-        ).visit(_Visitor(data_converter._encode_payload_sequence), completion)
+    await CommandAwarePayloadVisitor(
+        skip_search_attributes=True, skip_headers=not encode_headers
+    ).visit(_Visitor(data_converter._encode_payload_sequence), completion)
2 changes: 1 addition & 1 deletion temporalio/client.py
@@ -6837,7 +6837,7 @@ async def _apply_headers(
 ) -> None:
     if source is None:
         return
-    if encode_headers and data_converter._encode_payload_has_effect:
+    if encode_headers:
         for payload in source.values():
             payload.CopyFrom(await data_converter._encode_payload(payload))
     temporalio.common._apply_headers(source, dest)
74 changes: 65 additions & 9 deletions temporalio/converter.py
@@ -1207,6 +1207,20 @@ def __init__(self) -> None:
         super().__init__(encode_common_attributes=True)
 
 
+@dataclass(frozen=True)
+class PayloadLimitsConfig:
+    """Configuration for when payload sizes exceed limits."""

Review comment: "uploaded payload sizes exceed the temporal server's limits." would be clearer.

+    memo_upload_error_limit: int | None = None
+    """The limit at which a memo size error is created."""
+    memo_upload_warning_limit: int = 2 * 1024
+    """The limit at which a memo size warning is created."""
+    payload_upload_error_limit: int | None = None
+    """The limit at which a payloads size error is created."""

Review comment: I thought this was supposed to be a server setting. It seems dangerous to let users do anything other than turn it off, as it could get out of sync with the server. We should also document what happens when you disable it: the payload will go to the server and get an error there anyway. I feel that turning it off is only here as a killswitch in case there is a backward-compatibility issue; do you agree?
The docs should also cover how to change the server limit for open source.

Reply (Contributor Author): If a newer version of the SDK that has payload limit checks is used with a server version that doesn't provide the limit information, or (going back even farther) doesn't support describing the namespace at all, would you expect error limit checks to be enabled in the SDK? If not, then I think we can change this to just a boolean disable flag, or we tell folks to upgrade the server to get the more desirable behavior. Otherwise, a setting is necessary to set the limit for the worker.

+    payload_upload_warning_limit: int = 512 * 1024
+    """The limit at which a payloads size warning is created."""
+
+
 @dataclass(frozen=True)
 class DataConverter(WithSerializationContext):
     """Data converter for converting and encoding payloads to/from Python values.
@@ -1230,6 +1244,9 @@ class DataConverter(WithSerializationContext):
     failure_converter: FailureConverter = dataclasses.field(init=False)
     """Failure converter created from the :py:attr:`failure_converter_class`."""
 
+    payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
+    """Settings for payload size limits."""
+
     default: ClassVar[DataConverter]
     """Singleton default data converter."""
 
@@ -1367,35 +1384,42 @@ async def _encode_memo(
     async def _encode_memo_existing(
         self, source: Mapping[str, Any], memo: temporalio.api.common.v1.Memo
     ):
+        payloads = []
         for k, v in source.items():
             payload = v
             if not isinstance(v, temporalio.api.common.v1.Payload):
                 payload = (await self.encode([v]))[0]
             memo.fields[k].CopyFrom(payload)
+            payloads.append(payload)
+        # Memos have their field payloads validated all together in one unit
+        self._validate_limits(
+            payloads,
+            self.payload_limits.memo_upload_error_limit,
+            self.payload_limits.memo_upload_warning_limit,
+            "Memo size exceeded the warning limit.",
+        )

     async def _encode_payload(
         self, payload: temporalio.api.common.v1.Payload
     ) -> temporalio.api.common.v1.Payload:
         if self.payload_codec:
             payload = (await self.payload_codec.encode([payload]))[0]
+        self._validate_payload_limits([payload])
         return payload
 
     async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
         if self.payload_codec:
             await self.payload_codec.encode_wrapper(payloads)
+        self._validate_payload_limits(payloads.payloads)

     async def _encode_payload_sequence(
         self, payloads: Sequence[temporalio.api.common.v1.Payload]
     ) -> list[temporalio.api.common.v1.Payload]:
-        if not self.payload_codec:
-            return list(payloads)
-        return await self.payload_codec.encode(payloads)
-
-    # Temporary shortcircuit detection while the _encode_* methods may no-op if
-    # a payload codec is not configured. Remove once those paths have more to them.
-    @property
-    def _encode_payload_has_effect(self) -> bool:
-        return self.payload_codec is not None
+        encoded_payloads = list(payloads)
+        if self.payload_codec:
+            encoded_payloads = await self.payload_codec.encode(encoded_payloads)
+        self._validate_payload_limits(encoded_payloads)
+        return encoded_payloads

     async def _decode_payload(
         self, payload: temporalio.api.common.v1.Payload

@@ -1452,6 +1476,38 @@ async def _apply_to_failure_payloads(
         if failure.HasField("cause"):
             await DataConverter._apply_to_failure_payloads(failure.cause, cb)
 
+    def _validate_payload_limits(
+        self,
+        payloads: Sequence[temporalio.api.common.v1.Payload],
+    ):
+        self._validate_limits(
+            payloads,
+            self.payload_limits.payload_upload_error_limit,
+            self.payload_limits.payload_upload_warning_limit,
+            "Payloads size exceeded the warning limit.",
+        )

+    def _validate_limits(
+        self,
+        payloads: Sequence[temporalio.api.common.v1.Payload],
+        error_limit: int | None,
+        warning_limit: int,
+        warning_message: str,
+    ):
+        total_size = sum(payload.ByteSize() for payload in payloads)
+
+        if error_limit and error_limit > 0 and total_size > error_limit:
+            raise temporalio.exceptions.PayloadSizeError(
+                size=total_size,
+                limit=error_limit,
+            )
+
+        if warning_limit and warning_limit > 0 and total_size > warning_limit:
+            # TODO: Use a context aware logger to log extra information about workflow/activity/etc
+            warnings.warn(
+                f"{warning_message} Size: {total_size} bytes, Limit: {warning_limit} bytes"
+            )

Review comment (on the TODO above): still TODO right?


 DefaultPayloadConverter.default_encoding_payload_converters = (
     BinaryNullPayloadConverter(),
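
For illustration only, a rough sketch of the thresholds above, assuming the defaults in PayloadLimitsConfig; _validate_payload_limits is private, so this pokes an internal purely to demonstrate the warning-versus-error behavior.

import dataclasses
import warnings

import temporalio.converter
import temporalio.exceptions
from temporalio.api.common.v1 import Payload

converter = temporalio.converter.DataConverter.default
# A ~600 KiB payload: over the 512 KiB default warning limit, and since no
# error limit is set by default, this only warns.
big = Payload(data=b"x" * (600 * 1024))

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    converter._validate_payload_limits([big])
assert any("warning limit" in str(w.message) for w in caught)

# With an explicit error limit, the same payload raises PayloadSizeError.
strict = dataclasses.replace(
    converter,
    payload_limits=temporalio.converter.PayloadLimitsConfig(
        payload_upload_error_limit=512 * 1024
    ),
)
try:
    strict._validate_payload_limits([big])
except temporalio.exceptions.PayloadSizeError as err:
    print(f"size={err.payloads_size} limit={err.payloads_limit}")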
25 changes: 25 additions & 0 deletions temporalio/exceptions.py
@@ -446,3 +446,28 @@ def is_cancelled_exception(exception: BaseException) -> bool:
             and isinstance(exception.cause, CancelledError)
         )
     )
+
+
+class PayloadSizeError(TemporalError):
+    """Error raised when a payloads size exceeds the payload size limits."""
+
+    def __init__(self, size: int, limit: int):
+        """Initialize a payloads size limit error.
+
+        Args:
+            size: Actual payloads size in bytes.
+            limit: Payloads size limit in bytes.
+        """
+        super().__init__("Payloads size exceeded the error limit")
+        self._size = size
+        self._limit = limit
+
+    @property
+    def payloads_size(self) -> int:
+        """Actual payloads size in bytes."""
+        return self._size
+
+    @property
+    def payloads_limit(self) -> int:
+        """Payloads size limit in bytes."""
+        return self._limit

Review comment (on the error message above): similar points here.
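
Sketch of a call site that handles the new error, under the assumption that client-side error limits are enabled on the data converter; "MyWorkflow", the workflow ID, and the task queue are hypothetical.

import temporalio.exceptions
from temporalio.client import Client

async def start_with_large_arg(client: Client, arg: bytes) -> None:
    try:
        # If the encoded argument exceeds the configured error limit, this
        # raises before the request ever reaches the server.
        await client.start_workflow(
            "MyWorkflow",
            arg,
            id="payload-size-demo",
            task_queue="demo-task-queue",
        )
    except temporalio.exceptions.PayloadSizeError as err:
        print(f"Too large: {err.payloads_size} bytes (limit {err.payloads_limit})")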
13 changes: 13 additions & 0 deletions temporalio/worker/_activity.py
@@ -380,6 +380,19 @@ async def _handle_start_activity_task(
                     temporalio.exceptions.CancelledError("Cancelled"),
                     completion.result.cancelled.failure,
                 )
+            elif isinstance(
+                err,
+                temporalio.exceptions.PayloadSizeError,
+            ):
+                temporalio.activity.logger.warning(
+                    "Activity task failed: payloads size exceeded the error limit. Size: %d bytes, Limit: %d bytes",
+                    err.payloads_size,
+                    err.payloads_limit,
+                    extra={"__temporal_error_identifier": "ActivityFailure"},
+                )
+                await data_converter.encode_failure(
+                    err, completion.result.failed.failure
+                )
             else:
                 if (
                     isinstance(

Review comment (Member): Note, this doesn't catch cases where encoding the failure includes payloads that are too large; you may want to handle this error in the outer except. This can happen when application error details are too large, or when the stack trace is too large and is moved to encoded attributes (see temporalio/features#597).
16 changes: 16 additions & 0 deletions temporalio/worker/_workflow.py
@@ -23,6 +23,7 @@
 import temporalio.converter
 import temporalio.exceptions
 import temporalio.workflow
+from temporalio.api.enums.v1 import WorkflowTaskFailedCause
 from temporalio.bridge.worker import PollShutdownError
 
 from . import _command_aware_visitor
@@ -372,6 +373,21 @@ async def _handle_activation(
                     data_converter,
                     encode_headers=self._encode_headers,
                 )
+            except temporalio.exceptions.PayloadSizeError as err:
+                # TODO: Would like to use temporalio.workflow.logger here, but
+                # that requires being in the workflow event loop. Possibly refactor
+                # the logger core functionality into a shareable class and update
+                # LoggerAdapter to be a decorator.
+                logger.warning(
+                    "Workflow task failed: payloads size exceeded the error limit. Size: %d bytes, Limit: %d bytes",
+                    err.payloads_size,
+                    err.payloads_limit,
+                )
+                completion.failed.Clear()
+                await data_converter.encode_failure(err, completion.failed.failure)
+                completion.failed.force_cause = (
+                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_PAYLOADS_TOO_LARGE
+                )
             except Exception as err:
                 logger.exception(
                     "Failed encoding completion on workflow with run ID %s", act.run_id