feat(core): add JSON Batch format support#299
Conversation
Implement the CloudEvents JSON Batch format (application/cloudevents-batch+json) in the core package. - formats/json.py: extract shared _event_to_dict/_dict_to_event helpers and add BATCH_CONTENT_TYPE, write_batch, read_batch, get_batch_content_type - formats/base.py: extend the Format protocol with the batch methods - bindings/http.py: add to_batch/from_batch and to_batch_event/from_batch_event, and reject batch payloads in from_http with a clear error - tests: cover round-trip, empty batch, mixed 0.3/1.0 versions, data_base64, invalid-element abort, non-array body, and from_http batch rejection Signed-off-by: PlugaruT <plugaru.tudor@protonmail.com>
| batch_content_type = event_format.get_batch_content_type() | ||
| if content_type.split(";")[0].strip().lower() == batch_content_type: | ||
| raise ValueError( | ||
| f"Received a batch payload ('{batch_content_type}'); " | ||
| "use from_batch()/from_batch_event() to parse batched CloudEvents" | ||
| ) |
There was a problem hiding this comment.
what if content type has no ; in it?
get_batch_content_type does not enforce lower(). So we should either do lower() on both or reconsider how this is compared.
And I'd also dump what we compared against what we wanted into the error message, so worth extracting the content_type.split(";")[0].strip().lower() part into a variable.
| return from_structured(message, event_format, None) | ||
|
|
||
|
|
||
| def to_batch_event( |
There was a problem hiding this comment.
I think language-wise it should be to_events_batch. It's not a batch event, it's a batch of events, right?
| return to_batch(events, event_format) | ||
|
|
||
|
|
||
| def from_batch_event( |
There was a problem hiding this comment.
Same applies here then. it's probably from_events_batch
| def write_batch(self, events: list[BaseCloudEvent]) -> bytes: | ||
| """ | ||
| Serialize a list of CloudEvents into a batch wire representation. | ||
|
|
||
| :param events: The CloudEvents to serialize. | ||
| :return: The batch serialized as bytes. | ||
| """ | ||
| ... | ||
|
|
||
| def read_batch( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| data: str | bytes, | ||
| ) -> list[BaseCloudEvent]: | ||
| """ | ||
| Deserialize a batch wire representation into a list of CloudEvents. | ||
|
|
||
| :param event_factory: Factory to create CloudEvent instances, or None to | ||
| auto-detect each element's version. | ||
| :param data: The serialized batch as a string or bytes. | ||
| :return: The list of CloudEvent instances. | ||
| """ | ||
| ... | ||
|
|
||
| def get_batch_content_type(self) -> str: | ||
| """ | ||
| Get the Content-Type header value for batch mode. | ||
|
|
||
| :return: Content type string for CloudEvents batch content mode. | ||
| """ | ||
| ... |
There was a problem hiding this comment.
While this is the first "batch" support in the SDK, we should make sure we make it easy to work with and extend. So I think instead of extending the existing format we should have smth like BatchFormat protocol. This way we would be able to create new formats without having to implement batching right away. This also won't break anyone else's implementation if anyone already relies on Format.
| return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8") | ||
| parsed = loads(decoded_data) | ||
| if not isinstance(parsed, list): | ||
| raise ValueError("JSON batch payload must be a JSON array of CloudEvents") |
There was a problem hiding this comment.
let's tell what the actual type was
|
|
||
| return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8") | ||
| parsed = loads(decoded_data) | ||
| if not isinstance(parsed, list): |
There was a problem hiding this comment.
we should probably check for Sequence, not list.
| if not isinstance(parsed, list): | ||
| raise ValueError("JSON batch payload must be a JSON array of CloudEvents") | ||
|
|
||
| return [self._dict_to_event(event_factory, item) for item in parsed] |
There was a problem hiding this comment.
it is theoretically possible to arrive here with a list of ints or other non-cloud-event types. I.e. we'll bypass the list check from above. And yes, it'll fail inside the _dict_to_event but the failure itself won't be very dev-friendly as we won't have any clue about what item out of all has failed, what the actual problem was, etc. IMO it is a good idea to wrap this into an additional try-catch and provide a more dev-friendly error with more details if smth fails, wdyt?
What
Adds support for the CloudEvents JSON Batch format (
application/cloudevents-batch+json) to thecloudevents.corepackage, across the JSON format layer and the HTTP binding.Batch mode is only defined by the spec for the HTTP protocol binding (via the JSON Batch format); Kafka and AMQP define only structured/binary modes, so this change is intentionally scoped to JSON + HTTP.
Changes
formats/json.py: refactorwrite/readonto shared_event_to_dict/_dict_to_eventhelpers (no behavior change), then addBATCH_CONTENT_TYPE,write_batch,read_batch, andget_batch_content_type.formats/base.py: extend theFormatprotocol withwrite_batch/read_batch/get_batch_content_typeso any format implements the same contract.bindings/http.py: addto_batch/from_batchplus theto_batch_event/from_batch_eventJSON-default wrappers;from_httpnow rejects batch payloads with a clear, case-insensitive error pointing callers tofrom_batch.Behavior
data_base64for v1.0,datacontentencoding: base64for v0.3).ValueError.Tests
Added coverage for round-trip, empty batch, mixed-version batches,
data_base64elements, invalid-element abort, non-array body, andfrom_httpbatch rejection (incl. charset param and mixed-case content type). Fulltests/test_coresuite passes (356 tests);ruffclean.