feat(core): add JSON Batch format support #299

Open
PlugaruT wants to merge 1 commit into cloudevents:main from PlugaruT:feat/json-batch-core

@PlugaruT

Contributor

What

Adds support for the CloudEvents JSON Batch format (application/cloudevents-batch+json) to the cloudevents.core package, across the JSON format layer and the HTTP binding.

Batch mode is only defined by the spec for the HTTP protocol binding (via the JSON Batch format); Kafka and AMQP define only structured/binary modes, so this change is intentionally scoped to JSON + HTTP.

Changes

  • formats/json.py: refactor write/read onto shared _event_to_dict/_dict_to_event helpers (no behavior change), then add BATCH_CONTENT_TYPE, write_batch, read_batch, and get_batch_content_type.
  • formats/base.py: extend the Format protocol with write_batch/read_batch/get_batch_content_type so any format implements the same contract.
  • bindings/http.py: add to_batch/from_batch plus the to_batch_event/from_batch_event JSON-default wrappers; from_http now rejects batch payloads with a clear, case-insensitive error pointing callers to from_batch.

Behavior

  • Empty array ↔ empty list.
  • Per-element version auto-detection — a batch may mix v0.3 and v1.0 events.
  • Binary data round-trips (data_base64 for v1.0, datacontentencoding: base64 for v0.3).
  • A malformed/invalid element aborts the whole call (no partial results).
  • A non-array body raises ValueError.
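The behavior listed above can be illustrated with a minimal sketch. It models events as plain dicts and uses only the standard `json` module; the names `write_batch_sketch`, `read_batch_sketch`, and `REQUIRED` are hypothetical and are not the functions added in this PR.

```python
from __future__ import annotations

import json

# Attributes required by both CloudEvents v0.3 and v1.0.
REQUIRED = ("id", "source", "type", "specversion")


def write_batch_sketch(events: list[dict]) -> bytes:
    """Serialize event dicts as a single JSON array (empty list -> b"[]")."""
    return json.dumps(events).encode("utf-8")


def read_batch_sketch(data: str | bytes) -> list[dict]:
    """Parse a JSON array of event dicts, all or nothing."""
    parsed = json.loads(data)
    if not isinstance(parsed, list):
        raise ValueError("batch payload must be a JSON array")
    events = []
    for item in parsed:
        # Any invalid element aborts the whole call: no partial results.
        if not isinstance(item, dict) or any(key not in item for key in REQUIRED):
            raise ValueError("invalid batch element")
        events.append(item)
    return events
```

Because each element carries its own `specversion`, a batch mixing v0.3 and v1.0 events passes through this sketch unchanged.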

Tests

Added coverage for round-trip, empty batch, mixed-version batches, data_base64 elements, invalid-element abort, non-array body, and from_http batch rejection (incl. charset param and mixed-case content type). Full tests/test_core suite passes (356 tests); ruff clean.

Implement the CloudEvents JSON Batch format
(application/cloudevents-batch+json) in the core package.

- formats/json.py: extract shared _event_to_dict/_dict_to_event helpers
  and add BATCH_CONTENT_TYPE, write_batch, read_batch, get_batch_content_type
- formats/base.py: extend the Format protocol with the batch methods
- bindings/http.py: add to_batch/from_batch and to_batch_event/from_batch_event,
  and reject batch payloads in from_http with a clear error
- tests: cover round-trip, empty batch, mixed 0.3/1.0 versions, data_base64,
  invalid-element abort, non-array body, and from_http batch rejection

Signed-off-by: PlugaruT <plugaru.tudor@protonmail.com>
@PlugaruT marked this pull request as ready for review June 27, 2026 14:44
@PlugaruT requested a review from a team June 27, 2026 14:45

@xSAVIKx left a comment

Member

@PlugaruT, please take a look at some comments and suggestions noted below. IMO the biggest part here is to split the Format/BatchFormat to ease future development.

Comment on lines +345 to +350
    batch_content_type = event_format.get_batch_content_type()
    if content_type.split(";")[0].strip().lower() == batch_content_type:
        raise ValueError(
            f"Received a batch payload ('{batch_content_type}'); "
            "use from_batch()/from_batch_event() to parse batched CloudEvents"
        )

What if the content type has no ; in it?

get_batch_content_type does not enforce lower(), so we should either call lower() on both sides or reconsider how this is compared.

I'd also include both the value we compared and the value we expected in the error message, so it is worth extracting the content_type.split(";")[0].strip().lower() part into a variable.
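A minimal sketch of that suggestion, assuming hypothetical helper names `media_type` and `reject_batch` (neither is part of the PR). Both sides go through the same normalization, and the error reports what was received and what it matched. Note that `str.split(";")[0]` returns the whole string when there is no `;`, so a bare content type is handled by the same expression.

```python
def media_type(content_type: str) -> str:
    """Return the lower-cased media type with any parameters stripped."""
    return content_type.split(";")[0].strip().lower()


def reject_batch(content_type: str, batch_content_type: str) -> None:
    """Raise ValueError if content_type names the batch media type."""
    received = media_type(content_type)
    expected = media_type(batch_content_type)  # normalize both sides
    if received == expected:
        raise ValueError(
            f"Received a batch payload (Content-Type '{received}' matches "
            f"batch content type '{expected}'); "
            "use from_batch()/from_batch_event() to parse batched CloudEvents"
        )
```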

    return from_structured(message, event_format, None)


def to_batch_event(

I think language-wise it should be to_events_batch. It's not a batch event, it's a batch of events, right?

    return to_batch(events, event_format)


def from_batch_event(

The same applies here, then: it's probably from_events_batch.

Comment on lines +92 to +122
    def write_batch(self, events: list[BaseCloudEvent]) -> bytes:
        """
        Serialize a list of CloudEvents into a batch wire representation.

        :param events: The CloudEvents to serialize.
        :return: The batch serialized as bytes.
        """
        ...

    def read_batch(
        self,
        event_factory: EventFactory | None,
        data: str | bytes,
    ) -> list[BaseCloudEvent]:
        """
        Deserialize a batch wire representation into a list of CloudEvents.

        :param event_factory: Factory to create CloudEvent instances, or None to
            auto-detect each element's version.
        :param data: The serialized batch as a string or bytes.
        :return: The list of CloudEvent instances.
        """
        ...

    def get_batch_content_type(self) -> str:
        """
        Get the Content-Type header value for batch mode.

        :return: Content type string for CloudEvents batch content mode.
        """
        ...

While this is the first "batch" support in the SDK, we should make sure it is easy to work with and extend. So I think that instead of extending the existing Format we should have something like a BatchFormat protocol. This way we would be able to create new formats without having to implement batching right away. It also won't break anyone's existing implementation that already relies on Format.
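One way such a split could look, as a rough sketch only: the `Format` below is a simplified stand-in (plain dicts, no event factory) rather than the SDK's real protocol, and `SingleOnlyFormat`, `JsonSketchFormat`, and `supports_batch` are hypothetical names.

```python
from __future__ import annotations

import json
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class Format(Protocol):
    """Simplified stand-in for the existing single-event contract."""

    def write(self, event: dict[str, Any]) -> bytes: ...

    def read(self, data: str | bytes) -> dict[str, Any]: ...


@runtime_checkable
class BatchFormat(Protocol):
    """Optional batch capability, declared separately from Format."""

    def write_batch(self, events: list[dict[str, Any]]) -> bytes: ...

    def read_batch(self, data: str | bytes) -> list[dict[str, Any]]: ...

    def get_batch_content_type(self) -> str: ...


class SingleOnlyFormat:
    """A format written before batching existed; it still satisfies Format."""

    def write(self, event: dict[str, Any]) -> bytes:
        return json.dumps(event).encode("utf-8")

    def read(self, data: str | bytes) -> dict[str, Any]:
        return json.loads(data)


class JsonSketchFormat(SingleOnlyFormat):
    """A format that opts in to batching by adding the batch methods."""

    def write_batch(self, events: list[dict[str, Any]]) -> bytes:
        return json.dumps(events).encode("utf-8")

    def read_batch(self, data: str | bytes) -> list[dict[str, Any]]:
        return json.loads(data)

    def get_batch_content_type(self) -> str:
        return "application/cloudevents-batch+json"


def supports_batch(fmt: object) -> bool:
    """Structural check: does fmt provide all BatchFormat methods?"""
    return isinstance(fmt, BatchFormat)
```

With this shape, a binding could call `supports_batch(event_format)` before attempting batch mode, and formats that implement only `Format` keep working unchanged.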

        return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8")
        parsed = loads(decoded_data)
        if not isinstance(parsed, list):
            raise ValueError("JSON batch payload must be a JSON array of CloudEvents")

Let's report what the actual type was.
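For instance, the message could name the Python type that the JSON decoded to. This is a sketch with a hypothetical standalone function `parse_batch_array`, not the PR's read_batch:

```python
from __future__ import annotations

import json


def parse_batch_array(data: str | bytes) -> list:
    """Decode JSON and insist on a top-level array, naming the actual type."""
    parsed = json.loads(data)
    if not isinstance(parsed, list):
        raise ValueError(
            "JSON batch payload must be a JSON array of CloudEvents, "
            f"got {type(parsed).__name__}"
        )
    return parsed
```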


        return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8")
        parsed = loads(decoded_data)
        if not isinstance(parsed, list):

We should probably check for Sequence, not list.
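One thing to watch if the check is widened: `str` is itself a `collections.abc.Sequence`, so a top-level JSON string would pass a bare Sequence check. The sketch below uses a hypothetical helper `is_event_array` that excludes string-like types; with the standard `json` module and its default decoder, a JSON array always decodes to a `list` anyway.

```python
import json
from collections.abc import Sequence


def is_event_array(parsed: object) -> bool:
    """True for list-like sequences, False for str/bytes and non-sequences."""
    return isinstance(parsed, Sequence) and not isinstance(
        parsed, (str, bytes, bytearray)
    )


# A JSON string decodes to str, which is a Sequence but not an array of events.
decoded_string = json.loads('"abc"')
decoded_array = json.loads("[1, 2]")
```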

        if not isinstance(parsed, list):
            raise ValueError("JSON batch payload must be a JSON array of CloudEvents")

        return [self._dict_to_event(event_factory, item) for item in parsed]

It is theoretically possible to arrive here with a list of ints or other non-CloudEvent values, since those pass the list check above. It will fail inside _dict_to_event, but the failure won't be very dev-friendly: we won't know which item failed or what the actual problem was. IMO it is a good idea to wrap this in an additional try/except and raise a more dev-friendly error with more details if something fails, wdyt?
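A sketch of what that wrapping might look like. Here `parse_elements` is a hypothetical helper, and `parse_one` stands in for a call such as `_dict_to_event` with the factory already bound; the exact exceptions the real helper raises are not known from this thread, so the sketch catches `Exception` broadly.

```python
from typing import Any, Callable


def parse_elements(items: list, parse_one: Callable[[dict], Any]) -> list:
    """Parse each batch element, reporting the failing index on error."""
    events = []
    for index, item in enumerate(items):
        if not isinstance(item, dict):
            raise ValueError(
                f"Batch element at index {index} must be a JSON object, "
                f"got {type(item).__name__}"
            )
        try:
            events.append(parse_one(item))
        except Exception as exc:
            # Chain the original error so the root cause stays visible.
            raise ValueError(
                f"Failed to parse batch element at index {index}: {exc!r}"
            ) from exc
    return events
```

The whole call still aborts on the first bad element, so the "no partial results" behavior is preserved; only the error becomes more specific.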
