Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/cloudevents/core/bindings/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,42 @@ def from_structured(
return event_format.read(event_factory, message.body)


def to_batch(events: list[BaseCloudEvent], event_format: Format) -> HTTPMessage:
"""
Convert a list of CloudEvents to an HTTP batched content mode message.

The entire batch is serialized into the HTTP body as a JSON array and the
Content-Type header is set to the format's batch media type.

:param events: The CloudEvents to convert
:param event_format: Format implementation for batch serialization
:return: HTTPMessage with the batch in the body
"""
headers = {CONTENT_TYPE_HEADER: event_format.get_batch_content_type()}
body = event_format.write_batch(events)
return HTTPMessage(headers=headers, body=body)


def from_batch(
message: HTTPMessage,
event_format: Format,
event_factory: EventFactory | None = None,
) -> list[BaseCloudEvent]:
"""
Parse an HTTP batched content mode message to a list of CloudEvents.

Deserializes the batch from the HTTP body using the specified format. When
``event_factory`` is None, each event's version is auto-detected independently.

:param message: HTTPMessage to parse
:param event_format: Format implementation for batch deserialization
:param event_factory: Factory to create CloudEvent instances (auto-detected if None)
:return: List of CloudEvent instances
"""
events: list[BaseCloudEvent] = event_format.read_batch(event_factory, message.body)
return events


def from_http(
message: HTTPMessage,
event_format: Format,
Expand Down Expand Up @@ -300,6 +336,19 @@ def from_http(
:param event_factory: Factory function to create CloudEvent instances (auto-detected if None)
:return: CloudEvent instance
"""
content_type = ""
for key, value in message.headers.items():
if key.lower() == CONTENT_TYPE_HEADER:
content_type = value
break

batch_content_type = event_format.get_batch_content_type()
if content_type.split(";")[0].strip().lower() == batch_content_type:
raise ValueError(
f"Received a batch payload ('{batch_content_type}'); "
"use from_batch()/from_batch_event() to parse batched CloudEvents"
)
Comment on lines +345 to +350

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if content type has no ; in it?

get_batch_content_type does not enforce lower(). So we should either do lower() on both or reconsider how this is compared.

And I'd also dump what we compared against what we wanted into the error message, so worth extracting the content_type.split(";")[0].strip().lower() part into a variable.


if any(key.lower().startswith(CE_PREFIX) for key in message.headers.keys()):
return from_binary(message, event_format, event_factory)

Expand Down Expand Up @@ -402,6 +451,38 @@ def from_structured_event(
return from_structured(message, event_format, None)


def to_batch_event(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think language-wise it should be to_events_batch. It's not a batch event, it's a batch of events, right?

events: list[BaseCloudEvent],
event_format: Format | None = None,
) -> HTTPMessage:
"""
Convenience wrapper for to_batch with JSON format as default.

:param events: The CloudEvents to convert
:param event_format: Format implementation (defaults to JSONFormat)
:return: HTTPMessage with the batch in the body
"""
if event_format is None:
event_format = JSONFormat()
return to_batch(events, event_format)


def from_batch_event(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies here then. it's probably from_events_batch

message: HTTPMessage,
event_format: Format | None = None,
) -> list[BaseCloudEvent]:
"""
Convenience wrapper for from_batch with JSON format and auto-detection.

:param message: HTTPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: List of CloudEvent instances (version auto-detected per event)
"""
if event_format is None:
event_format = JSONFormat()
return from_batch(message, event_format, None)


def from_http_event(
message: HTTPMessage,
event_format: Format | None = None,
Expand Down
32 changes: 32 additions & 0 deletions src/cloudevents/core/formats/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,35 @@ def get_content_type(self) -> str:
:return: Content type string for CloudEvents structured content mode
"""
...

def write_batch(self, events: list[BaseCloudEvent]) -> bytes:
"""
Serialize a list of CloudEvents into a batch wire representation.

:param events: The CloudEvents to serialize.
:return: The batch serialized as bytes.
"""
...

def read_batch(
self,
event_factory: EventFactory | None,
data: str | bytes,
) -> list[BaseCloudEvent]:
"""
Deserialize a batch wire representation into a list of CloudEvents.

:param event_factory: Factory to create CloudEvent instances, or None to
auto-detect each element's version.
:param data: The serialized batch as a string or bytes.
:return: The list of CloudEvent instances.
"""
...

def get_batch_content_type(self) -> str:
"""
Get the Content-Type header value for batch mode.

:return: Content type string for CloudEvents batch content mode.
"""
...
Comment on lines +92 to +122

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is the first "batch" support in the SDK, we should make sure we make it easy to work with and extend. So I think instead of extending the existing format we should have smth like BatchFormat protocol. This way we would be able to create new formats without having to implement batching right away. This also won't break anyone else's implementation if anyone already relies on Format.

171 changes: 120 additions & 51 deletions src/cloudevents/core/formats/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,83 @@ def default(self, obj: Any) -> Any:
class JSONFormat(Format):
CONTENT_TYPE: Final[str] = "application/cloudevents+json"
DEFAULT_CONTENT_TYPE: Final[str] = "application/json"
BATCH_CONTENT_TYPE: Final[str] = "application/cloudevents-batch+json"
JSON_CONTENT_TYPE_PATTERN: Pattern[str] = re.compile(
r"^(application|text)/([a-zA-Z0-9\-\.]+\+)?json(;.*)?$"
)

def read(
self,
event_factory: EventFactory | None,
data: str | bytes,
) -> BaseCloudEvent:
def _event_to_dict(self, event: BaseCloudEvent) -> dict[str, Any]:
"""
Read a CloudEvent from a JSON formatted byte string.
Build the JSON-serializable dict for a single CloudEvent.

Shared by single-event ``write`` and batch ``write_batch`` so the two
paths cannot drift.
"""
event_data = event.get_data()
event_dict: dict[str, Any] = dict(event.get_attributes())
specversion = event_dict.get("specversion", SPECVERSION_V1_0)

if event_data is not None:
if isinstance(event_data, (bytes, bytearray)):
if specversion == SPECVERSION_V0_3:
event_dict["datacontentencoding"] = "base64"
event_dict["data"] = base64.b64encode(event_data).decode("utf-8")
else:
event_dict["data_base64"] = base64.b64encode(event_data).decode(
"utf-8"
)
else:
datacontenttype = event_dict.get(
"datacontenttype", self.DEFAULT_CONTENT_TYPE
)
if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype):
event_dict["data"] = event_data
else:
event_dict["data"] = str(event_data)
return event_dict

def write(self, event: BaseCloudEvent) -> bytes:
"""
Write a CloudEvent to a JSON formatted byte string.

Supports both v0.3 and v1.0 CloudEvents:
- v0.3: Uses 'datacontentencoding' attribute with 'data' field
- v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field
- v1.0: Uses 'data_base64' field (no datacontentencoding)

:param event_factory: A factory function to create CloudEvent instances.
If None, automatically detects version from 'specversion' field.
:param data: The JSON formatted byte array.
:return: The CloudEvent instance.
:param event: The CloudEvent to write.
:return: The CloudEvent as a JSON formatted byte array.
"""
decoded_data: str
if isinstance(data, bytes):
decoded_data = data.decode("utf-8")
else:
decoded_data = data
return dumps(self._event_to_dict(event), cls=_JSONEncoderWithDatetime).encode(
"utf-8"
)

def write_batch(self, events: list[BaseCloudEvent]) -> bytes:
"""
Write a list of CloudEvents to a JSON Batch formatted byte string.

The output is a JSON array whose elements are CloudEvents rendered per the
JSON event format. An empty list serializes to ``b"[]"``.

event_attributes = loads(decoded_data)
:param events: The CloudEvents to write.
:return: The batch as a JSON formatted byte array.
"""
return dumps(
[self._event_to_dict(event) for event in events],
cls=_JSONEncoderWithDatetime,
).encode("utf-8")

# Auto-detect version if factory not provided
def _dict_to_event(
self,
event_factory: EventFactory | None,
event_attributes: dict[str, Any],
) -> BaseCloudEvent:
"""
Build a CloudEvent from an already-parsed attributes dict.

Shared by single-event ``read`` and batch ``read_batch``. Handles version
auto-detection, 'time' parsing, v0.3 'datacontentencoding', and v1.0
'data_base64'.
"""
if event_factory is None:
from cloudevents.core.bindings.common import get_event_factory_for_version

Expand All @@ -84,67 +131,81 @@ def read(
if "time" in event_attributes:
event_attributes["time"] = isoparse(event_attributes["time"])

# Handle data field based on version
specversion = event_attributes.get("specversion", SPECVERSION_V1_0)
event_data: dict[str, Any] | str | bytes | None = event_attributes.pop(
"data", None
)

# v0.3: Check for datacontentencoding attribute
if (
specversion == SPECVERSION_V0_3
and "datacontentencoding" in event_attributes
):
encoding = event_attributes.get("datacontentencoding", "").lower()
if encoding == "base64" and isinstance(event_data, str):
# Decode base64 encoded data in v0.3
event_data = base64.b64decode(event_data)

# v1.0: Check for data_base64 field (when data is None)
if event_data is None:
event_data_base64 = event_attributes.pop("data_base64", None)
if event_data_base64 is not None:
event_data = base64.b64decode(event_data_base64)

return event_factory(event_attributes, event_data)

def write(self, event: BaseCloudEvent) -> bytes:
def read(
self,
event_factory: EventFactory | None,
data: str | bytes,
) -> BaseCloudEvent:
"""
Write a CloudEvent to a JSON formatted byte string.
Read a CloudEvent from a JSON formatted byte string.

Supports both v0.3 and v1.0 CloudEvents:
- v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field
- v0.3: Uses 'datacontentencoding' attribute with 'data' field
- v1.0: Uses 'data_base64' field (no datacontentencoding)

:param event: The CloudEvent to write.
:return: The CloudEvent as a JSON formatted byte array.
:param event_factory: A factory function to create CloudEvent instances.
If None, automatically detects version from 'specversion' field.
:param data: The JSON formatted byte array.
:return: The CloudEvent instance.
"""
event_data = event.get_data()
event_dict: dict[str, Any] = dict(event.get_attributes())
specversion = event_dict.get("specversion", SPECVERSION_V1_0)
decoded_data: str
if isinstance(data, bytes):
decoded_data = data.decode("utf-8")
else:
decoded_data = data

if event_data is not None:
if isinstance(event_data, (bytes, bytearray)):
# Handle binary data based on version
if specversion == SPECVERSION_V0_3:
# v0.3: Use datacontentencoding with base64-encoded data field
event_dict["datacontentencoding"] = "base64"
event_dict["data"] = base64.b64encode(event_data).decode("utf-8")
else:
# v1.0: Use data_base64 field
event_dict["data_base64"] = base64.b64encode(event_data).decode(
"utf-8"
)
else:
datacontenttype = event_dict.get(
"datacontenttype", self.DEFAULT_CONTENT_TYPE
)
if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype):
event_dict["data"] = event_data
else:
event_dict["data"] = str(event_data)
return self._dict_to_event(event_factory, loads(decoded_data))

def read_batch(
self,
event_factory: EventFactory | None,
data: str | bytes,
) -> list[BaseCloudEvent]:
"""
Read a list of CloudEvents from a JSON Batch formatted byte string.

The input MUST be a JSON array; each element is parsed per the JSON event
format. When ``event_factory`` is None the version is auto-detected per
element, so a batch may mix v0.3 and v1.0 events. An empty array returns an
empty list. An invalid element propagates its exception and aborts the call.

:param event_factory: Factory to create CloudEvent instances, or None to
auto-detect each element's version.
:param data: The JSON Batch formatted byte array.
:return: The list of CloudEvent instances.
:raises ValueError: If the body is not a JSON array.
"""
decoded_data: str
if isinstance(data, bytes):
decoded_data = data.decode("utf-8")
else:
decoded_data = data

return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8")
parsed = loads(decoded_data)
if not isinstance(parsed, list):
Comment thread
xSAVIKx marked this conversation as resolved.
raise ValueError("JSON batch payload must be a JSON array of CloudEvents")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's tell what the actual type was


return [self._dict_to_event(event_factory, item) for item in parsed]

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is theoretically possible to arrive here with a list of ints or other non-cloud-event types. I.e. we'll bypass the list check from above. And yes, it'll fail inside the _dict_to_event but the failure itself won't be very dev-friendly as we won't have any clue about what item out of all has failed, what the actual problem was, etc. IMO it is a good idea to wrap this into an additional try-catch and provide a more dev-friendly error with more details if smth fails, wdyt?


def write_data(
self,
Expand Down Expand Up @@ -230,3 +291,11 @@ def get_content_type(self) -> str:
:return: Content type string for CloudEvents structured content mode
"""
return self.CONTENT_TYPE

def get_batch_content_type(self) -> str:
"""
Get the Content-Type header value for JSON batch mode.

:return: ``application/cloudevents-batch+json``
"""
return self.BATCH_CONTENT_TYPE
Loading
Loading