-
Notifications
You must be signed in to change notification settings - Fork 61
feat(core): add JSON Batch format support #299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -260,6 +260,42 @@ def from_structured( | |
| return event_format.read(event_factory, message.body) | ||
|
|
||
|
|
||
| def to_batch(events: list[BaseCloudEvent], event_format: Format) -> HTTPMessage: | ||
| """ | ||
| Convert a list of CloudEvents to an HTTP batched content mode message. | ||
|
|
||
| The entire batch is serialized into the HTTP body as a JSON array and the | ||
| Content-Type header is set to the format's batch media type. | ||
|
|
||
| :param events: The CloudEvents to convert | ||
| :param event_format: Format implementation for batch serialization | ||
| :return: HTTPMessage with the batch in the body | ||
| """ | ||
| headers = {CONTENT_TYPE_HEADER: event_format.get_batch_content_type()} | ||
| body = event_format.write_batch(events) | ||
| return HTTPMessage(headers=headers, body=body) | ||
|
|
||
|
|
||
| def from_batch( | ||
| message: HTTPMessage, | ||
| event_format: Format, | ||
| event_factory: EventFactory | None = None, | ||
| ) -> list[BaseCloudEvent]: | ||
| """ | ||
| Parse an HTTP batched content mode message to a list of CloudEvents. | ||
|
|
||
| Deserializes the batch from the HTTP body using the specified format. When | ||
| ``event_factory`` is None, each event's version is auto-detected independently. | ||
|
|
||
| :param message: HTTPMessage to parse | ||
| :param event_format: Format implementation for batch deserialization | ||
| :param event_factory: Factory to create CloudEvent instances (auto-detected if None) | ||
| :return: List of CloudEvent instances | ||
| """ | ||
| events: list[BaseCloudEvent] = event_format.read_batch(event_factory, message.body) | ||
| return events | ||
|
|
||
|
|
||
| def from_http( | ||
| message: HTTPMessage, | ||
| event_format: Format, | ||
|
|
@@ -300,6 +336,19 @@ def from_http( | |
| :param event_factory: Factory function to create CloudEvent instances (auto-detected if None) | ||
| :return: CloudEvent instance | ||
| """ | ||
| content_type = "" | ||
| for key, value in message.headers.items(): | ||
| if key.lower() == CONTENT_TYPE_HEADER: | ||
| content_type = value | ||
| break | ||
|
|
||
| batch_content_type = event_format.get_batch_content_type() | ||
| if content_type.split(";")[0].strip().lower() == batch_content_type: | ||
| raise ValueError( | ||
| f"Received a batch payload ('{batch_content_type}'); " | ||
| "use from_batch()/from_batch_event() to parse batched CloudEvents" | ||
| ) | ||
|
|
||
| if any(key.lower().startswith(CE_PREFIX) for key in message.headers.keys()): | ||
| return from_binary(message, event_format, event_factory) | ||
|
|
||
|
|
@@ -402,6 +451,38 @@ def from_structured_event( | |
| return from_structured(message, event_format, None) | ||
|
|
||
|
|
||
| def to_batch_event( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think language-wise it should be |
||
| events: list[BaseCloudEvent], | ||
| event_format: Format | None = None, | ||
| ) -> HTTPMessage: | ||
| """ | ||
| Convenience wrapper for to_batch with JSON format as default. | ||
|
|
||
| :param events: The CloudEvents to convert | ||
| :param event_format: Format implementation (defaults to JSONFormat) | ||
| :return: HTTPMessage with the batch in the body | ||
| """ | ||
| if event_format is None: | ||
| event_format = JSONFormat() | ||
| return to_batch(events, event_format) | ||
|
|
||
|
|
||
| def from_batch_event( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same applies here then. it's probably |
||
| message: HTTPMessage, | ||
| event_format: Format | None = None, | ||
| ) -> list[BaseCloudEvent]: | ||
| """ | ||
| Convenience wrapper for from_batch with JSON format and auto-detection. | ||
|
|
||
| :param message: HTTPMessage to parse | ||
| :param event_format: Format implementation (defaults to JSONFormat) | ||
| :return: List of CloudEvent instances (version auto-detected per event) | ||
| """ | ||
| if event_format is None: | ||
| event_format = JSONFormat() | ||
| return from_batch(message, event_format, None) | ||
|
|
||
|
|
||
| def from_http_event( | ||
| message: HTTPMessage, | ||
| event_format: Format | None = None, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,3 +88,35 @@ def get_content_type(self) -> str: | |
| :return: Content type string for CloudEvents structured content mode | ||
| """ | ||
| ... | ||
|
|
||
| def write_batch(self, events: list[BaseCloudEvent]) -> bytes: | ||
| """ | ||
| Serialize a list of CloudEvents into a batch wire representation. | ||
|
|
||
| :param events: The CloudEvents to serialize. | ||
| :return: The batch serialized as bytes. | ||
| """ | ||
| ... | ||
|
|
||
| def read_batch( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| data: str | bytes, | ||
| ) -> list[BaseCloudEvent]: | ||
| """ | ||
| Deserialize a batch wire representation into a list of CloudEvents. | ||
|
|
||
| :param event_factory: Factory to create CloudEvent instances, or None to | ||
| auto-detect each element's version. | ||
| :param data: The serialized batch as a string or bytes. | ||
| :return: The list of CloudEvent instances. | ||
| """ | ||
| ... | ||
|
|
||
| def get_batch_content_type(self) -> str: | ||
| """ | ||
| Get the Content-Type header value for batch mode. | ||
|
|
||
| :return: Content type string for CloudEvents batch content mode. | ||
| """ | ||
| ... | ||
|
Comment on lines
+92
to
+122
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While this is the first "batch" support in the SDK, we should make sure we make it easy to work with and extend. So I think instead of extending the existing |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,36 +45,83 @@ def default(self, obj: Any) -> Any: | |
| class JSONFormat(Format): | ||
| CONTENT_TYPE: Final[str] = "application/cloudevents+json" | ||
| DEFAULT_CONTENT_TYPE: Final[str] = "application/json" | ||
| BATCH_CONTENT_TYPE: Final[str] = "application/cloudevents-batch+json" | ||
| JSON_CONTENT_TYPE_PATTERN: Pattern[str] = re.compile( | ||
| r"^(application|text)/([a-zA-Z0-9\-\.]+\+)?json(;.*)?$" | ||
| ) | ||
|
|
||
| def read( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| data: str | bytes, | ||
| ) -> BaseCloudEvent: | ||
| def _event_to_dict(self, event: BaseCloudEvent) -> dict[str, Any]: | ||
| """ | ||
| Read a CloudEvent from a JSON formatted byte string. | ||
| Build the JSON-serializable dict for a single CloudEvent. | ||
|
|
||
| Shared by single-event ``write`` and batch ``write_batch`` so the two | ||
| paths cannot drift. | ||
| """ | ||
| event_data = event.get_data() | ||
| event_dict: dict[str, Any] = dict(event.get_attributes()) | ||
| specversion = event_dict.get("specversion", SPECVERSION_V1_0) | ||
|
|
||
| if event_data is not None: | ||
| if isinstance(event_data, (bytes, bytearray)): | ||
| if specversion == SPECVERSION_V0_3: | ||
| event_dict["datacontentencoding"] = "base64" | ||
| event_dict["data"] = base64.b64encode(event_data).decode("utf-8") | ||
| else: | ||
| event_dict["data_base64"] = base64.b64encode(event_data).decode( | ||
| "utf-8" | ||
| ) | ||
| else: | ||
| datacontenttype = event_dict.get( | ||
| "datacontenttype", self.DEFAULT_CONTENT_TYPE | ||
| ) | ||
| if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype): | ||
| event_dict["data"] = event_data | ||
| else: | ||
| event_dict["data"] = str(event_data) | ||
| return event_dict | ||
|
|
||
| def write(self, event: BaseCloudEvent) -> bytes: | ||
| """ | ||
| Write a CloudEvent to a JSON formatted byte string. | ||
|
|
||
| Supports both v0.3 and v1.0 CloudEvents: | ||
| - v0.3: Uses 'datacontentencoding' attribute with 'data' field | ||
| - v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field | ||
| - v1.0: Uses 'data_base64' field (no datacontentencoding) | ||
|
|
||
| :param event_factory: A factory function to create CloudEvent instances. | ||
| If None, automatically detects version from 'specversion' field. | ||
| :param data: The JSON formatted byte array. | ||
| :return: The CloudEvent instance. | ||
| :param event: The CloudEvent to write. | ||
| :return: The CloudEvent as a JSON formatted byte array. | ||
| """ | ||
| decoded_data: str | ||
| if isinstance(data, bytes): | ||
| decoded_data = data.decode("utf-8") | ||
| else: | ||
| decoded_data = data | ||
| return dumps(self._event_to_dict(event), cls=_JSONEncoderWithDatetime).encode( | ||
| "utf-8" | ||
| ) | ||
|
|
||
| def write_batch(self, events: list[BaseCloudEvent]) -> bytes: | ||
| """ | ||
| Write a list of CloudEvents to a JSON Batch formatted byte string. | ||
|
|
||
| The output is a JSON array whose elements are CloudEvents rendered per the | ||
| JSON event format. An empty list serializes to ``b"[]"``. | ||
|
|
||
| event_attributes = loads(decoded_data) | ||
| :param events: The CloudEvents to write. | ||
| :return: The batch as a JSON formatted byte array. | ||
| """ | ||
| return dumps( | ||
| [self._event_to_dict(event) for event in events], | ||
| cls=_JSONEncoderWithDatetime, | ||
| ).encode("utf-8") | ||
|
|
||
| # Auto-detect version if factory not provided | ||
| def _dict_to_event( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| event_attributes: dict[str, Any], | ||
| ) -> BaseCloudEvent: | ||
| """ | ||
| Build a CloudEvent from an already-parsed attributes dict. | ||
|
|
||
| Shared by single-event ``read`` and batch ``read_batch``. Handles version | ||
| auto-detection, 'time' parsing, v0.3 'datacontentencoding', and v1.0 | ||
| 'data_base64'. | ||
| """ | ||
| if event_factory is None: | ||
| from cloudevents.core.bindings.common import get_event_factory_for_version | ||
|
|
||
|
|
@@ -84,67 +131,81 @@ def read( | |
| if "time" in event_attributes: | ||
| event_attributes["time"] = isoparse(event_attributes["time"]) | ||
|
|
||
| # Handle data field based on version | ||
| specversion = event_attributes.get("specversion", SPECVERSION_V1_0) | ||
| event_data: dict[str, Any] | str | bytes | None = event_attributes.pop( | ||
| "data", None | ||
| ) | ||
|
|
||
| # v0.3: Check for datacontentencoding attribute | ||
| if ( | ||
| specversion == SPECVERSION_V0_3 | ||
| and "datacontentencoding" in event_attributes | ||
| ): | ||
| encoding = event_attributes.get("datacontentencoding", "").lower() | ||
| if encoding == "base64" and isinstance(event_data, str): | ||
| # Decode base64 encoded data in v0.3 | ||
| event_data = base64.b64decode(event_data) | ||
|
|
||
| # v1.0: Check for data_base64 field (when data is None) | ||
| if event_data is None: | ||
| event_data_base64 = event_attributes.pop("data_base64", None) | ||
| if event_data_base64 is not None: | ||
| event_data = base64.b64decode(event_data_base64) | ||
|
|
||
| return event_factory(event_attributes, event_data) | ||
|
|
||
| def write(self, event: BaseCloudEvent) -> bytes: | ||
| def read( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| data: str | bytes, | ||
| ) -> BaseCloudEvent: | ||
| """ | ||
| Write a CloudEvent to a JSON formatted byte string. | ||
| Read a CloudEvent from a JSON formatted byte string. | ||
|
|
||
| Supports both v0.3 and v1.0 CloudEvents: | ||
| - v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field | ||
| - v0.3: Uses 'datacontentencoding' attribute with 'data' field | ||
| - v1.0: Uses 'data_base64' field (no datacontentencoding) | ||
|
|
||
| :param event: The CloudEvent to write. | ||
| :return: The CloudEvent as a JSON formatted byte array. | ||
| :param event_factory: A factory function to create CloudEvent instances. | ||
| If None, automatically detects version from 'specversion' field. | ||
| :param data: The JSON formatted byte array. | ||
| :return: The CloudEvent instance. | ||
| """ | ||
| event_data = event.get_data() | ||
| event_dict: dict[str, Any] = dict(event.get_attributes()) | ||
| specversion = event_dict.get("specversion", SPECVERSION_V1_0) | ||
| decoded_data: str | ||
| if isinstance(data, bytes): | ||
| decoded_data = data.decode("utf-8") | ||
| else: | ||
| decoded_data = data | ||
|
|
||
| if event_data is not None: | ||
| if isinstance(event_data, (bytes, bytearray)): | ||
| # Handle binary data based on version | ||
| if specversion == SPECVERSION_V0_3: | ||
| # v0.3: Use datacontentencoding with base64-encoded data field | ||
| event_dict["datacontentencoding"] = "base64" | ||
| event_dict["data"] = base64.b64encode(event_data).decode("utf-8") | ||
| else: | ||
| # v1.0: Use data_base64 field | ||
| event_dict["data_base64"] = base64.b64encode(event_data).decode( | ||
| "utf-8" | ||
| ) | ||
| else: | ||
| datacontenttype = event_dict.get( | ||
| "datacontenttype", self.DEFAULT_CONTENT_TYPE | ||
| ) | ||
| if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype): | ||
| event_dict["data"] = event_data | ||
| else: | ||
| event_dict["data"] = str(event_data) | ||
| return self._dict_to_event(event_factory, loads(decoded_data)) | ||
|
|
||
| def read_batch( | ||
| self, | ||
| event_factory: EventFactory | None, | ||
| data: str | bytes, | ||
| ) -> list[BaseCloudEvent]: | ||
| """ | ||
| Read a list of CloudEvents from a JSON Batch formatted byte string. | ||
|
|
||
| The input MUST be a JSON array; each element is parsed per the JSON event | ||
| format. When ``event_factory`` is None the version is auto-detected per | ||
| element, so a batch may mix v0.3 and v1.0 events. An empty array returns an | ||
| empty list. An invalid element propagates its exception and aborts the call. | ||
|
|
||
| :param event_factory: Factory to create CloudEvent instances, or None to | ||
| auto-detect each element's version. | ||
| :param data: The JSON Batch formatted byte array. | ||
| :return: The list of CloudEvent instances. | ||
| :raises ValueError: If the body is not a JSON array. | ||
| """ | ||
| decoded_data: str | ||
| if isinstance(data, bytes): | ||
| decoded_data = data.decode("utf-8") | ||
| else: | ||
| decoded_data = data | ||
|
|
||
| return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8") | ||
| parsed = loads(decoded_data) | ||
| if not isinstance(parsed, list): | ||
|
xSAVIKx marked this conversation as resolved.
|
||
| raise ValueError("JSON batch payload must be a JSON array of CloudEvents") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's tell what the actual type was |
||
|
|
||
| return [self._dict_to_event(event_factory, item) for item in parsed] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is theoretically possible to arrive here with a list of ints or other non-cloud-event types. I.e. we'll bypass the |
||
|
|
||
| def write_data( | ||
| self, | ||
|
|
@@ -230,3 +291,11 @@ def get_content_type(self) -> str: | |
| :return: Content type string for CloudEvents structured content mode | ||
| """ | ||
| return self.CONTENT_TYPE | ||
|
|
||
| def get_batch_content_type(self) -> str: | ||
| """ | ||
| Get the Content-Type header value for JSON batch mode. | ||
|
|
||
| :return: ``application/cloudevents-batch+json`` | ||
| """ | ||
| return self.BATCH_CONTENT_TYPE | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if content type has no
;in it?get_batch_content_typedoes not enforcelower(). So we should either dolower()on both or reconsider how this is compared.And I'd also dump what we compared against what we wanted into the error message, so worth extracting the
content_type.split(";")[0].strip().lower()part into a variable.