diff --git a/src/cloudevents/core/bindings/http.py b/src/cloudevents/core/bindings/http.py index fbdcc3b0..bcb9805e 100644 --- a/src/cloudevents/core/bindings/http.py +++ b/src/cloudevents/core/bindings/http.py @@ -260,6 +260,42 @@ def from_structured( return event_format.read(event_factory, message.body) +def to_batch(events: list[BaseCloudEvent], event_format: Format) -> HTTPMessage: + """ + Convert a list of CloudEvents to an HTTP batched content mode message. + + The entire batch is serialized into the HTTP body as a JSON array and the + Content-Type header is set to the format's batch media type. + + :param events: The CloudEvents to convert + :param event_format: Format implementation for batch serialization + :return: HTTPMessage with the batch in the body + """ + headers = {CONTENT_TYPE_HEADER: event_format.get_batch_content_type()} + body = event_format.write_batch(events) + return HTTPMessage(headers=headers, body=body) + + +def from_batch( + message: HTTPMessage, + event_format: Format, + event_factory: EventFactory | None = None, +) -> list[BaseCloudEvent]: + """ + Parse an HTTP batched content mode message to a list of CloudEvents. + + Deserializes the batch from the HTTP body using the specified format. When + ``event_factory`` is None, each event's version is auto-detected independently. + + :param message: HTTPMessage to parse + :param event_format: Format implementation for batch deserialization + :param event_factory: Factory to create CloudEvent instances (auto-detected if None) + :return: List of CloudEvent instances + """ + events: list[BaseCloudEvent] = event_format.read_batch(event_factory, message.body) + return events + + def from_http( message: HTTPMessage, event_format: Format, @@ -300,6 +336,19 @@ def from_http( :param event_factory: Factory function to create CloudEvent instances (auto-detected if None) :return: CloudEvent instance """ + content_type = "" + for key, value in message.headers.items(): + if key.lower() == CONTENT_TYPE_HEADER: + content_type = value + break + + batch_content_type = event_format.get_batch_content_type() + if content_type.split(";")[0].strip().lower() == batch_content_type: + raise ValueError( + f"Received a batch payload ('{batch_content_type}'); " + "use from_batch()/from_batch_event() to parse batched CloudEvents" + ) + if any(key.lower().startswith(CE_PREFIX) for key in message.headers.keys()): return from_binary(message, event_format, event_factory) @@ -402,6 +451,38 @@ def from_structured_event( return from_structured(message, event_format, None) +def to_batch_event( + events: list[BaseCloudEvent], + event_format: Format | None = None, +) -> HTTPMessage: + """ + Convenience wrapper for to_batch with JSON format as default. + + :param events: The CloudEvents to convert + :param event_format: Format implementation (defaults to JSONFormat) + :return: HTTPMessage with the batch in the body + """ + if event_format is None: + event_format = JSONFormat() + return to_batch(events, event_format) + + +def from_batch_event( + message: HTTPMessage, + event_format: Format | None = None, +) -> list[BaseCloudEvent]: + """ + Convenience wrapper for from_batch with JSON format and auto-detection. + + :param message: HTTPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: List of CloudEvent instances (version auto-detected per event) + """ + if event_format is None: + event_format = JSONFormat() + return from_batch(message, event_format, None) + + def from_http_event( message: HTTPMessage, event_format: Format | None = None, diff --git a/src/cloudevents/core/formats/base.py b/src/cloudevents/core/formats/base.py index ae2d9d0a..d71d9d54 100644 --- a/src/cloudevents/core/formats/base.py +++ b/src/cloudevents/core/formats/base.py @@ -88,3 +88,35 @@ def get_content_type(self) -> str: :return: Content type string for CloudEvents structured content mode """ ... + + def write_batch(self, events: list[BaseCloudEvent]) -> bytes: + """ + Serialize a list of CloudEvents into a batch wire representation. + + :param events: The CloudEvents to serialize. + :return: The batch serialized as bytes. + """ + ... + + def read_batch( + self, + event_factory: EventFactory | None, + data: str | bytes, + ) -> list[BaseCloudEvent]: + """ + Deserialize a batch wire representation into a list of CloudEvents. + + :param event_factory: Factory to create CloudEvent instances, or None to + auto-detect each element's version. + :param data: The serialized batch as a string or bytes. + :return: The list of CloudEvent instances. + """ + ... + + def get_batch_content_type(self) -> str: + """ + Get the Content-Type header value for batch mode. + + :return: Content type string for CloudEvents batch content mode. + """ + ... diff --git a/src/cloudevents/core/formats/json.py b/src/cloudevents/core/formats/json.py index f20e0538..b8a55c01 100644 --- a/src/cloudevents/core/formats/json.py +++ b/src/cloudevents/core/formats/json.py @@ -45,36 +45,83 @@ def default(self, obj: Any) -> Any: class JSONFormat(Format): CONTENT_TYPE: Final[str] = "application/cloudevents+json" DEFAULT_CONTENT_TYPE: Final[str] = "application/json" + BATCH_CONTENT_TYPE: Final[str] = "application/cloudevents-batch+json" JSON_CONTENT_TYPE_PATTERN: Pattern[str] = re.compile( r"^(application|text)/([a-zA-Z0-9\-\.]+\+)?json(;.*)?$" ) - def read( - self, - event_factory: EventFactory | None, - data: str | bytes, - ) -> BaseCloudEvent: + def _event_to_dict(self, event: BaseCloudEvent) -> dict[str, Any]: """ - Read a CloudEvent from a JSON formatted byte string. + Build the JSON-serializable dict for a single CloudEvent. + + Shared by single-event ``write`` and batch ``write_batch`` so the two + paths cannot drift. + """ + event_data = event.get_data() + event_dict: dict[str, Any] = dict(event.get_attributes()) + specversion = event_dict.get("specversion", SPECVERSION_V1_0) + + if event_data is not None: + if isinstance(event_data, (bytes, bytearray)): + if specversion == SPECVERSION_V0_3: + event_dict["datacontentencoding"] = "base64" + event_dict["data"] = base64.b64encode(event_data).decode("utf-8") + else: + event_dict["data_base64"] = base64.b64encode(event_data).decode( + "utf-8" + ) + else: + datacontenttype = event_dict.get( + "datacontenttype", self.DEFAULT_CONTENT_TYPE + ) + if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype): + event_dict["data"] = event_data + else: + event_dict["data"] = str(event_data) + return event_dict + + def write(self, event: BaseCloudEvent) -> bytes: + """ + Write a CloudEvent to a JSON formatted byte string. Supports both v0.3 and v1.0 CloudEvents: - - v0.3: Uses 'datacontentencoding' attribute with 'data' field + - v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field - v1.0: Uses 'data_base64' field (no datacontentencoding) - :param event_factory: A factory function to create CloudEvent instances. - If None, automatically detects version from 'specversion' field. - :param data: The JSON formatted byte array. - :return: The CloudEvent instance. + :param event: The CloudEvent to write. + :return: The CloudEvent as a JSON formatted byte array. """ - decoded_data: str - if isinstance(data, bytes): - decoded_data = data.decode("utf-8") - else: - decoded_data = data + return dumps(self._event_to_dict(event), cls=_JSONEncoderWithDatetime).encode( + "utf-8" + ) + + def write_batch(self, events: list[BaseCloudEvent]) -> bytes: + """ + Write a list of CloudEvents to a JSON Batch formatted byte string. + + The output is a JSON array whose elements are CloudEvents rendered per the + JSON event format. An empty list serializes to ``b"[]"``. - event_attributes = loads(decoded_data) + :param events: The CloudEvents to write. + :return: The batch as a JSON formatted byte array. + """ + return dumps( + [self._event_to_dict(event) for event in events], + cls=_JSONEncoderWithDatetime, + ).encode("utf-8") - # Auto-detect version if factory not provided + def _dict_to_event( + self, + event_factory: EventFactory | None, + event_attributes: dict[str, Any], + ) -> BaseCloudEvent: + """ + Build a CloudEvent from an already-parsed attributes dict. + + Shared by single-event ``read`` and batch ``read_batch``. Handles version + auto-detection, 'time' parsing, v0.3 'datacontentencoding', and v1.0 + 'data_base64'. + """ if event_factory is None: from cloudevents.core.bindings.common import get_event_factory_for_version @@ -84,23 +131,19 @@ def read( if "time" in event_attributes: event_attributes["time"] = isoparse(event_attributes["time"]) - # Handle data field based on version specversion = event_attributes.get("specversion", SPECVERSION_V1_0) event_data: dict[str, Any] | str | bytes | None = event_attributes.pop( "data", None ) - # v0.3: Check for datacontentencoding attribute if ( specversion == SPECVERSION_V0_3 and "datacontentencoding" in event_attributes ): encoding = event_attributes.get("datacontentencoding", "").lower() if encoding == "base64" and isinstance(event_data, str): - # Decode base64 encoded data in v0.3 event_data = base64.b64decode(event_data) - # v1.0: Check for data_base64 field (when data is None) if event_data is None: event_data_base64 = event_attributes.pop("data_base64", None) if event_data_base64 is not None: @@ -108,43 +151,61 @@ def read( return event_factory(event_attributes, event_data) - def write(self, event: BaseCloudEvent) -> bytes: + def read( + self, + event_factory: EventFactory | None, + data: str | bytes, + ) -> BaseCloudEvent: """ - Write a CloudEvent to a JSON formatted byte string. + Read a CloudEvent from a JSON formatted byte string. Supports both v0.3 and v1.0 CloudEvents: - - v0.3: Uses 'datacontentencoding: base64' with base64-encoded 'data' field + - v0.3: Uses 'datacontentencoding' attribute with 'data' field - v1.0: Uses 'data_base64' field (no datacontentencoding) - :param event: The CloudEvent to write. - :return: The CloudEvent as a JSON formatted byte array. + :param event_factory: A factory function to create CloudEvent instances. + If None, automatically detects version from 'specversion' field. + :param data: The JSON formatted byte array. + :return: The CloudEvent instance. """ - event_data = event.get_data() - event_dict: dict[str, Any] = dict(event.get_attributes()) - specversion = event_dict.get("specversion", SPECVERSION_V1_0) + decoded_data: str + if isinstance(data, bytes): + decoded_data = data.decode("utf-8") + else: + decoded_data = data - if event_data is not None: - if isinstance(event_data, (bytes, bytearray)): - # Handle binary data based on version - if specversion == SPECVERSION_V0_3: - # v0.3: Use datacontentencoding with base64-encoded data field - event_dict["datacontentencoding"] = "base64" - event_dict["data"] = base64.b64encode(event_data).decode("utf-8") - else: - # v1.0: Use data_base64 field - event_dict["data_base64"] = base64.b64encode(event_data).decode( - "utf-8" - ) - else: - datacontenttype = event_dict.get( - "datacontenttype", self.DEFAULT_CONTENT_TYPE - ) - if re.match(JSONFormat.JSON_CONTENT_TYPE_PATTERN, datacontenttype): - event_dict["data"] = event_data - else: - event_dict["data"] = str(event_data) + return self._dict_to_event(event_factory, loads(decoded_data)) + + def read_batch( + self, + event_factory: EventFactory | None, + data: str | bytes, + ) -> list[BaseCloudEvent]: + """ + Read a list of CloudEvents from a JSON Batch formatted byte string. + + The input MUST be a JSON array; each element is parsed per the JSON event + format. When ``event_factory`` is None the version is auto-detected per + element, so a batch may mix v0.3 and v1.0 events. An empty array returns an + empty list. An invalid element propagates its exception and aborts the call. + + :param event_factory: Factory to create CloudEvent instances, or None to + auto-detect each element's version. + :param data: The JSON Batch formatted byte array. + :return: The list of CloudEvent instances. + :raises ValueError: If the body is not a JSON array. + """ + decoded_data: str + if isinstance(data, bytes): + decoded_data = data.decode("utf-8") + else: + decoded_data = data - return dumps(event_dict, cls=_JSONEncoderWithDatetime).encode("utf-8") + parsed = loads(decoded_data) + if not isinstance(parsed, list): + raise ValueError("JSON batch payload must be a JSON array of CloudEvents") + + return [self._dict_to_event(event_factory, item) for item in parsed] def write_data( self, @@ -230,3 +291,11 @@ def get_content_type(self) -> str: :return: Content type string for CloudEvents structured content mode """ return self.CONTENT_TYPE + + def get_batch_content_type(self) -> str: + """ + Get the Content-Type header value for JSON batch mode. + + :return: ``application/cloudevents-batch+json`` + """ + return self.BATCH_CONTENT_TYPE diff --git a/tests/test_core/test_bindings/test_http.py b/tests/test_core/test_bindings/test_http.py index cf46ba4d..4ffadd27 100644 --- a/tests/test_core/test_bindings/test_http.py +++ b/tests/test_core/test_bindings/test_http.py @@ -19,12 +19,16 @@ from cloudevents.core.bindings.http import ( HTTPMessage, + from_batch, + from_batch_event, from_binary, from_binary_event, from_http, from_http_event, from_structured, from_structured_event, + to_batch, + to_batch_event, to_binary, to_binary_event, to_structured, @@ -1139,3 +1143,90 @@ def test_convenience_with_explicit_format_override() -> None: assert recovered.get_type() == event.get_type() assert recovered.get_data() == event.get_data() + + +def test_to_batch_sets_batch_content_type() -> None: + events = [ + create_event({"id": "1"}, {"key": "value1"}), + create_event({"id": "2"}, {"key": "value2"}), + ] + + message = to_batch(events, JSONFormat()) + + assert message.headers["content-type"] == "application/cloudevents-batch+json" + + +def test_to_batch_from_batch_round_trip() -> None: + events = [ + create_event({"id": "1"}, {"key": "value1"}), + create_event({"id": "2"}, {"key": "value2"}), + ] + + message = to_batch(events, JSONFormat()) + parsed = from_batch(message, JSONFormat(), CloudEvent) + + assert len(parsed) == 2 + assert parsed[0].get_id() == "1" + assert parsed[0].get_data() == {"key": "value1"} + assert parsed[1].get_id() == "2" + + +def test_from_batch_empty() -> None: + message = HTTPMessage( + headers={"content-type": "application/cloudevents-batch+json"}, + body=b"[]", + ) + + assert from_batch(message, JSONFormat(), CloudEvent) == [] + + +def test_to_batch_event_defaults_to_json() -> None: + events = [create_event({"id": "1"}, {"key": "value1"})] + + message = to_batch_event(events) + + assert message.headers["content-type"] == "application/cloudevents-batch+json" + + +def test_from_batch_event_round_trip_auto_detect() -> None: + events = [ + create_event({"id": "1"}, {"key": "value1"}), + create_event({"id": "2"}, {"key": "value2"}), + ] + + message = to_batch_event(events) + parsed = from_batch_event(message) + + assert len(parsed) == 2 + assert parsed[0].get_id() == "1" + assert parsed[1].get_id() == "2" + + +def test_from_http_rejects_batch_payload() -> None: + message = HTTPMessage( + headers={"content-type": "application/cloudevents-batch+json"}, + body=b"[]", + ) + + with pytest.raises(ValueError, match="from_batch"): + from_http(message, JSONFormat()) + + +def test_from_http_rejects_batch_payload_with_charset_param() -> None: + message = HTTPMessage( + headers={"content-type": "application/cloudevents-batch+json; charset=utf-8"}, + body=b"[]", + ) + + with pytest.raises(ValueError, match="from_batch"): + from_http(message, JSONFormat()) + + +def test_from_http_rejects_batch_payload_mixed_case_content_type() -> None: + message = HTTPMessage( + headers={"content-type": "Application/CloudEvents-Batch+JSON"}, + body=b"[]", + ) + + with pytest.raises(ValueError, match="from_batch"): + from_http(message, JSONFormat()) diff --git a/tests/test_core/test_format/test_json.py b/tests/test_core/test_format/test_json.py index 9971779b..767e1df5 100644 --- a/tests/test_core/test_format/test_json.py +++ b/tests/test_core/test_format/test_json.py @@ -17,6 +17,7 @@ import pytest +from cloudevents.core.exceptions import BaseCloudEventException from cloudevents.core.formats.json import JSONFormat from cloudevents.core.v1.event import CloudEvent @@ -346,3 +347,134 @@ def test_read_data_json_body(content_type: str) -> None: result = formatter.read_data(body, content_type) assert result == {"key": "value"} + + +def test_write_batch_with_multiple_events() -> None: + formatter = JSONFormat() + event1 = CloudEvent( + attributes={ + "id": "1", + "source": "source", + "type": "type", + "specversion": "1.0", + }, + data={"key": "value1"}, + ) + event2 = CloudEvent( + attributes={ + "id": "2", + "source": "source", + "type": "type", + "specversion": "1.0", + }, + data={"key": "value2"}, + ) + + result = formatter.write_batch([event1, event2]) + + parsed = loads(result.decode("utf-8")) + assert isinstance(parsed, list) + assert len(parsed) == 2 + assert parsed[0]["id"] == "1" + assert parsed[0]["data"] == {"key": "value1"} + assert parsed[1]["id"] == "2" + assert parsed[1]["data"] == {"key": "value2"} + + +def test_write_batch_empty_returns_empty_array() -> None: + formatter = JSONFormat() + assert formatter.write_batch([]) == b"[]" + + +def test_read_batch_round_trip() -> None: + formatter = JSONFormat() + event1 = CloudEvent( + attributes={ + "id": "1", + "source": "source", + "type": "type", + "specversion": "1.0", + }, + data={"key": "value1"}, + ) + event2 = CloudEvent( + attributes={ + "id": "2", + "source": "source", + "type": "type", + "specversion": "1.0", + }, + data={"key": "value2"}, + ) + + serialized = formatter.write_batch([event1, event2]) + events = formatter.read_batch(CloudEvent, serialized) + + assert len(events) == 2 + assert events[0].get_id() == "1" + assert events[0].get_data() == {"key": "value1"} + assert events[1].get_id() == "2" + assert events[1].get_data() == {"key": "value2"} + + +def test_read_batch_empty_array_returns_empty_list() -> None: + formatter = JSONFormat() + assert formatter.read_batch(CloudEvent, b"[]") == [] + + +def test_read_batch_auto_detects_mixed_versions() -> None: + from cloudevents.core.v03.event import CloudEvent as CloudEventV03 + + formatter = JSONFormat() + body = ( + b'[{"id": "1", "source": "source", "type": "type", "specversion": "1.0"},' + b' {"id": "2", "source": "source", "type": "type", "specversion": "0.3"}]' + ) + + events = formatter.read_batch(None, body) + + assert isinstance(events[0], CloudEvent) + assert isinstance(events[1], CloudEventV03) + + +def test_read_batch_with_data_base64_element() -> None: + formatter = JSONFormat() + event = CloudEvent( + attributes={ + "id": "1", + "source": "source", + "type": "type", + "specversion": "1.0", + }, + data=b"binary-data", + ) + + serialized = formatter.write_batch([event]) + events = formatter.read_batch(CloudEvent, serialized) + + assert events[0].get_data() == b"binary-data" + + +def test_read_batch_non_array_body_raises() -> None: + formatter = JSONFormat() + body = b'{"id": "1", "source": "source", "type": "type", "specversion": "1.0"}' + + with pytest.raises(ValueError): + formatter.read_batch(CloudEvent, body) + + +def test_read_batch_invalid_element_aborts() -> None: + formatter = JSONFormat() + # second element is missing the required 'type' attribute + body = ( + b'[{"id": "1", "source": "source", "type": "type", "specversion": "1.0"},' + b' {"id": "2", "source": "source", "specversion": "1.0"}]' + ) + + with pytest.raises(BaseCloudEventException): + formatter.read_batch(CloudEvent, body) + + +def test_get_batch_content_type() -> None: + formatter = JSONFormat() + assert formatter.get_batch_content_type() == "application/cloudevents-batch+json"