From 682924573ae6dc4c933c89e15f2408cb6eb9df48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 13:28:06 +0200 Subject: [PATCH 01/16] add streams api --- cognite/client/_api/data_modeling/__init__.py | 3 +- cognite/client/_api/data_modeling/streams.py | 96 ++++++++ .../_sync_api/data_modeling/__init__.py | 14 +- .../client/_sync_api/data_modeling/streams.py | 88 +++++++ .../data_classes/data_modeling/__init__.py | 20 ++ .../data_classes/data_modeling/streams.py | 223 ++++++++++++++++++ cognite/client/testing.py | 8 + cognite/client/utils/_url.py | 1 + .../test_data_modeling/test_streams.py | 194 +++++++++++++++ tests/tests_unit/test_api_client.py | 3 + .../test_data_models/test_streams.py | 90 +++++++ 11 files changed, 737 insertions(+), 3 deletions(-) create mode 100644 cognite/client/_api/data_modeling/streams.py create mode 100644 cognite/client/_sync_api/data_modeling/streams.py create mode 100644 cognite/client/data_classes/data_modeling/streams.py create mode 100644 tests/tests_unit/test_api/test_data_modeling/test_streams.py create mode 100644 tests/tests_unit/test_data_classes/test_data_models/test_streams.py diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index ee60b4b281..bcc604f6ef 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -9,6 +9,7 @@ from cognite.client._api.data_modeling.instances import InstancesAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api_client import APIClient @@ -27,7 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) - + self.streams = StreamsAPI(config, api_version, cognite_client) def _get_semaphore( self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"] ) -> asyncio.BoundedSemaphore: diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py new file mode 100644 index 0000000000..d4e72c0656 --- /dev/null +++ b/cognite/client/_api/data_modeling/streams.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamWrite, +) +from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils._url import interpolate_and_url_encode +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + """ILA Streams API (``/streams``).""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._DELETE_LIMIT = 1 + + @overload + async def create(self, items: StreamWrite) -> Stream: ... + + @overload + async def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """`Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return await self._create_multiple( + list_cls=StreamList, + resource_cls=Stream, # type: ignore[type-var] + items=items, # type: ignore[arg-type] + input_resource_cls=StreamWrite, + ) + + async def list(self) -> StreamList: + """`List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) + return StreamList._load(res.json()["items"]) + + async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """`Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, bool] | None = None + if include_statistics is not None: + params = {"includeStatistics": include_statistics} + res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) + return Stream._load(res.json()) + + async def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """`Delete streams `_. + + The API accepts **exactly one** stream per request. Deletion is a soft delete that retains + capacity for an extended period; prefer deleting only when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + await self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + wrap_ids=True, + ) diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 01fb3ea184..c43217e501 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,26 +1,35 @@ """ =============================================================================== -c76b2b9351d2a5eee6a710fa9893bfa4 +220482a0261c745cb1c470fff723b515 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from typing import TYPE_CHECKING +import asyncio +from collections.abc import Coroutine, Iterator +from typing import TYPE_CHECKING, Any, Literal, overload from cognite.client import AsyncCogniteClient +from cognite.client._api_client import APIClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.utils._async_helpers import SyncIterator, run_sync +from cognite.client.utils._concurrency import _get_event_loop_executor if TYPE_CHECKING: + import pandas as pd + from cognite.client import AsyncCogniteClient +from cognite.client.config import ClientConfig class SyncDataModelingAPI(SyncAPIClient): @@ -35,3 +44,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.instances = SyncInstancesAPI(async_client) self.graphql = SyncDataModelingGraphQLAPI(async_client) self.statistics = SyncStatisticsAPI(async_client) + self.streams = SyncStreamsAPI(async_client) diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py new file mode 100644 index 0000000000..856bb62fb2 --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -0,0 +1,88 @@ +""" +=============================================================================== +27bf6ff05fd0b885157b9ac4fd6e0434 +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite +from cognite.client.utils._async_helpers import run_sync +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + @overload + def create(self, items: StreamWrite) -> Stream: ... + + @overload + def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """ + `Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return run_sync(self.__async_client.data_modeling.streams.create(items=items)) + + def list(self) -> StreamList: + """ + `List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + return run_sync(self.__async_client.data_modeling.streams.list()) + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """ + `Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + return run_sync( + self.__async_client.data_modeling.streams.retrieve( + stream_external_id=stream_external_id, include_statistics=include_statistics + ) + ) + + def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """ + `Delete streams `_. + + The API accepts **exactly one** stream per request. Deletion is a soft delete that retains + capacity for an extended period; prefer deleting only when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index daf9abb86e..67e01e60a0 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -114,6 +114,17 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamTemplate, + StreamTemplateWriteSettings, + StreamWrite, +) from cognite.client.data_classes.data_modeling.sync import SubscriptionContext from cognite.client.data_classes.data_modeling.views import ( ConnectionDefinition, @@ -233,6 +244,15 @@ "SpaceApply", "SpaceApplyList", "SpaceList", + "Stream", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamTemplate", + "StreamTemplateWriteSettings", + "StreamWrite", "SubscriptionContext", "Text", "TimeSeriesReference", diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..761e25bca6 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, + WriteableCogniteResource, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream (human-readable).""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(CogniteResource): + """A stream (ILA ``StreamResponseItem``).""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams (``StreamResponse.items``).""" + + _RESOURCE = Stream + + +class StreamTemplate(CogniteResource): + """Reference to an ILA stream template (``StreamRequestItem.settings.template``).""" + + def __init__(self, name: str, version: str | None = None) -> None: + self.name = name + self.version = version + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(name=resource["name"], version=resource.get("version")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"name": self.name} + if self.version is not None: + out["version"] = self.version + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamTemplateWriteSettings(CogniteResource): + """Write-side settings for creating a stream from a template (``{"template": {...}}``).""" + + def __init__(self, template: StreamTemplate) -> None: + self.template = template + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(template=StreamTemplate._load(resource["template"])) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"template": self.template.dump(camel_case=camel_case)} + + +def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSettings | dict[str, Any]: + if set(raw.keys()) == {"template"} and isinstance(raw["template"], dict) and "name" in raw["template"]: + return StreamTemplateWriteSettings._load(raw) + return raw + + +class StreamWrite(WriteableCogniteResource["StreamWrite"]): + """Request item for creating a stream (``StreamRequestItem``).""" + + def __init__( + self, + external_id: str, + settings: StreamTemplateWriteSettings | dict[str, Any], + ) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + settings=_parse_stream_write_settings(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if isinstance(self.settings, CogniteResource): + settings_dumped = self.settings.dump(camel_case=camel_case) + else: + settings_dumped = self.settings + out = {"external_id": self.external_id, "settings": settings_dumped} + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> Self: + return self diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 981a0a7d55..c63af7f89f 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -68,6 +68,7 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.three_d.asset_mapping import ThreeDAssetMappingAPI @@ -151,6 +152,7 @@ from cognite.client._sync_api.simulators.routine_revisions import SyncSimulatorRoutineRevisionsAPI from cognite.client._sync_api.simulators.routines import SyncSimulatorRoutinesAPI from cognite.client._sync_api.simulators.runs import SyncSimulatorRunsAPI +from cognite.client._sync_api.streams import SyncStreamsAPI from cognite.client._sync_api.synthetic_time_series import SyncSyntheticDatapointsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.three_d.asset_mapping import Sync3DAssetMappingAPI @@ -224,6 +226,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(ViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(InstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(DataModelingGraphQLAPI, instance=True, spec_set=True) + dm_streams = create_autospec(StreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( DataModelingAPI, instance=True, @@ -234,6 +237,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) @@ -309,6 +313,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models, sim_routines) + self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) + sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) @@ -508,6 +514,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models) + self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) + sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 3b0eb1f5c8..567a752959 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -34,6 +34,7 @@ "raw/dbs/[^/]+/tables$", "relationships", "sequences", + "streams", "simulators", "simulators/models", "simulators/models/revisions", diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py new file mode 100644 index 0000000000..7cb3cf9a59 --- /dev/null +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import re + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamTemplate, + StreamTemplateWriteSettings, + StreamWrite, +) +from tests.utils import jsgz_load + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" + + +class TestStreamsAPI: + def test_list_parses_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + out = cognite_client.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + + def test_retrieve_include_statistics_query( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + httpx_mock.add_response( + method="GET", + url=re.compile(re.escape(streams_base_url) + r"/st1(?:\?.+)?$"), + json=sample, + ) + cognite_client.streams.retrieve("st1", include_statistics=True) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.params["includeStatistics"].lower() == "true" + + def test_create_posts_single_item( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + w = StreamWrite( + "st1", + StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), + ) + out = cognite_client.streams.create(w) + requests = httpx_mock.get_requests() + assert isinstance(out, Stream) + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams") + assert jsgz_load(requests[0].content) == { + "items": [{"externalId": "st1", "settings": {"template": {"name": "ImmutableTestStream"}}}] + } + + def test_create_chunks_multiple_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"$"), + json={ + "items": [ + { + "externalId": "a", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + }, + ) + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"$"), + json={ + "items": [ + { + "externalId": "b", + "createdTime": 2, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + }, + ) + tpl = StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")) + a = StreamWrite("a", tpl) + b = StreamWrite("b", tpl) + out = cognite_client.streams.create([a, b]) + requests = httpx_mock.get_requests() + assert isinstance(out, StreamList) + assert [stream.external_id for stream in out] == ["a", "b"] + assert len(requests) == 2 + assert [jsgz_load(request.content) for request in requests] == [ + {"items": [{"externalId": "a", "settings": {"template": {"name": "ImmutableTestStream"}}}]}, + {"items": [{"externalId": "b", "settings": {"template": {"name": "ImmutableTestStream"}}}]}, + ] + + def test_delete_chunks_multiple_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"/delete$"), json={}) + httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"/delete$"), json={}) + cognite_client.data_modeling.streams.delete(["a", "b"]) + requests = httpx_mock.get_requests() + assert len(requests) == 2 + assert [jsgz_load(request.content) for request in requests] == [ + {"items": [{"externalId": "a"}]}, + {"items": [{"externalId": "b"}]}, + ] diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index d481301a78..0f42a35ef5 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1834,6 +1834,9 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), + # ILA streams (CRUD only; record POSTs added with StreamsRecordsAPI) + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_data_models/test_streams.py b/tests/tests_unit/test_data_classes/test_data_models/test_streams.py new file mode 100644 index 0000000000..5ab612e4a5 --- /dev/null +++ b/tests/tests_unit/test_data_classes/test_data_models/test_streams.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamTemplate, + StreamTemplateWriteSettings, + StreamWrite, +) + + +def test_stream_roundtrip() -> None: + raw = { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, + }, + }, + } + s = Stream._load(raw) + back = s.dump(camel_case=True) + assert back["externalId"] == "s1" + assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 + + +def test_stream_list_load() -> None: + raw = { + "items": [ + { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + lst = StreamList._load(raw["items"]) + assert len(lst) == 1 + assert lst[0].external_id == "s1" + + +def test_stream_write_dump() -> None: + w = StreamWrite( + "abc", + StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), + ) + dumped = w.dump() + assert dumped["externalId"] == "abc" + assert dumped["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_stream_write_dump_dict_escape_hatch() -> None: + w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) + dumped = w.dump() + assert dumped["externalId"] == "abc" + assert dumped["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_stream_write_load_roundtrip() -> None: + raw = { + "externalId": "x", + "settings": {"template": {"name": "ImmutableTestStream", "version": "1"}}, + } + w = StreamWrite._load(raw) + assert isinstance(w.settings, StreamTemplateWriteSettings) + assert w.settings.template.name == "ImmutableTestStream" + assert w.settings.template.version == "1" + assert w.dump(camel_case=True)["settings"]["template"]["version"] == "1" + + +def test_stream_write_load_arbitrary_settings_dict() -> None: + raw = { + "externalId": "x", + "settings": {"template": {"name": "T"}, "extra": 1}, + } + w = StreamWrite._load(raw) + assert w.settings == {"template": {"name": "T"}, "extra": 1} From 3225b621d90d330cdae9ac28c858b9416afd18d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 15:32:38 +0200 Subject: [PATCH 02/16] cleanup --- cognite/client/_api/data_modeling/__init__.py | 1 + cognite/client/_api/data_modeling/streams.py | 12 +++++------- cognite/client/_sync_api/data_modeling/__init__.py | 2 +- cognite/client/_sync_api/data_modeling/streams.py | 6 +++--- cognite/client/data_classes/data_modeling/streams.py | 4 ++-- cognite/client/testing.py | 5 +++-- .../test_api/test_data_modeling/test_streams.py | 6 +++--- tests/tests_unit/test_api_client.py | 2 +- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index bcc604f6ef..32313a417e 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -29,6 +29,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) self.streams = StreamsAPI(config, api_version, cognite_client) + def _get_semaphore( self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"] ) -> asyncio.BoundedSemaphore: diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index d4e72c0656..ddb570c856 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -19,14 +19,12 @@ class StreamsAPI(APIClient): - """ILA Streams API (``/streams``).""" + """Streams API (``/streams``).""" _RESOURCE_PATH = "/streams" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._CREATE_LIMIT = 1 - self._DELETE_LIMIT = 1 @overload async def create(self, items: StreamWrite) -> Stream: ... @@ -44,9 +42,9 @@ async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | S Stream | StreamList: The created stream or streams. """ return await self._create_multiple( + items=items, list_cls=StreamList, - resource_cls=Stream, # type: ignore[type-var] - items=items, # type: ignore[arg-type] + resource_cls=Stream, input_resource_cls=StreamWrite, ) @@ -84,8 +82,8 @@ async def retrieve(self, stream_external_id: str, include_statistics: bool | Non async def delete(self, external_id: str | SequenceNotStr[str]) -> None: """`Delete streams `_. - The API accepts **exactly one** stream per request. Deletion is a soft delete that retains - capacity for an extended period; prefer deleting only when necessary. + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. Args: external_id (str | SequenceNotStr[str]): External ID or list of external IDs. diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index c43217e501..23af998dd0 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,6 +1,6 @@ """ =============================================================================== -220482a0261c745cb1c470fff723b515 +584030bc5e2a4b8168f54c101f7f521d This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py index 856bb62fb2..8250d76bd6 100644 --- a/cognite/client/_sync_api/data_modeling/streams.py +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -1,6 +1,6 @@ """ =============================================================================== -27bf6ff05fd0b885157b9ac4fd6e0434 +08522cd2e3d995076b7bf12a6390e97e This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -79,8 +79,8 @@ def delete(self, external_id: str | SequenceNotStr[str]) -> None: """ `Delete streams `_. - The API accepts **exactly one** stream per request. Deletion is a soft delete that retains - capacity for an extended period; prefer deleting only when necessary. + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. Args: external_id (str | SequenceNotStr[str]): External ID or list of external IDs. diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index 761e25bca6..fa983068e9 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -112,7 +112,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class Stream(CogniteResource): - """A stream (ILA ``StreamResponseItem``).""" + """A stream (``StreamResponseItem``).""" def __init__( self, @@ -156,7 +156,7 @@ class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): class StreamTemplate(CogniteResource): - """Reference to an ILA stream template (``StreamRequestItem.settings.template``).""" + """Reference to an stream template (``StreamRequestItem.settings.template``).""" def __init__(self, name: str, version: str | None = None) -> None: self.name = name diff --git a/cognite/client/testing.py b/cognite/client/testing.py index c63af7f89f..b41487cdcc 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -5,6 +5,8 @@ from typing import Any from unittest.mock import MagicMock, create_autospec, patch +from cognite.client._sync_api.streams import SyncStreamsAPI + from cognite.client import AsyncCogniteClient, CogniteClient from cognite.client._api.agents import AgentsAPI from cognite.client._api.ai import AIAPI @@ -20,6 +22,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -68,7 +71,6 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI -from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.three_d.asset_mapping import ThreeDAssetMappingAPI @@ -152,7 +154,6 @@ from cognite.client._sync_api.simulators.routine_revisions import SyncSimulatorRoutineRevisionsAPI from cognite.client._sync_api.simulators.routines import SyncSimulatorRoutinesAPI from cognite.client._sync_api.simulators.runs import SyncSimulatorRunsAPI -from cognite.client._sync_api.streams import SyncStreamsAPI from cognite.client._sync_api.synthetic_time_series import SyncSyntheticDatapointsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.three_d.asset_mapping import Sync3DAssetMappingAPI diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py index 7cb3cf9a59..cd84ee0495 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -46,7 +46,7 @@ def test_list_parses_items( ] } httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) - out = cognite_client.streams.list() + out = cognite_client.data_modeling.streams.list() assert isinstance(out, StreamList) assert out[0].external_id == "st1" @@ -107,7 +107,7 @@ def test_create_posts_single_item( "st1", StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), ) - out = cognite_client.streams.create(w) + out = cognite_client.data_modeling.streams.create(w) requests = httpx_mock.get_requests() assert isinstance(out, Stream) assert len(requests) == 1 @@ -167,7 +167,7 @@ def test_create_chunks_multiple_items( tpl = StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")) a = StreamWrite("a", tpl) b = StreamWrite("b", tpl) - out = cognite_client.streams.create([a, b]) + out = cognite_client.data_modeling.streams.create([a, b]) requests = httpx_mock.get_requests() assert isinstance(out, StreamList) assert [stream.external_id for stream in out] == ["a", "b"] diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 0f42a35ef5..9938393362 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1834,7 +1834,7 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), - # ILA streams (CRUD only; record POSTs added with StreamsRecordsAPI) + # Streams API ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), ] From 193403c8bd01649aeba150af559544997b047a3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 15:44:14 +0200 Subject: [PATCH 03/16] use correct client property during testing --- tests/tests_unit/test_api/test_data_modeling/test_streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py index cd84ee0495..dd3c01d9c4 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -74,7 +74,7 @@ def test_retrieve_include_statistics_query( url=re.compile(re.escape(streams_base_url) + r"/st1(?:\?.+)?$"), json=sample, ) - cognite_client.streams.retrieve("st1", include_statistics=True) + cognite_client.data_modeling.streams.retrieve("st1", include_statistics=True) requests = httpx_mock.get_requests() assert len(requests) == 1 assert requests[0].url.params["includeStatistics"].lower() == "true" From b65d5237b838d8fadfeecd554995380fd983cb64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 15:46:33 +0200 Subject: [PATCH 04/16] fix sync import --- cognite/client/testing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index b41487cdcc..69d94e47a2 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -5,8 +5,6 @@ from typing import Any from unittest.mock import MagicMock, create_autospec, patch -from cognite.client._sync_api.streams import SyncStreamsAPI - from cognite.client import AsyncCogniteClient, CogniteClient from cognite.client._api.agents import AgentsAPI from cognite.client._api.ai import AIAPI @@ -106,6 +104,7 @@ from cognite.client._sync_api.data_modeling.space_statistics import SyncSpaceStatisticsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api.data_sets import SyncDataSetsAPI from cognite.client._sync_api.datapoints import SyncDatapointsAPI From 7efcb0b0999f90bef6386ba91d9abbaea60c2f35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 15:55:13 +0200 Subject: [PATCH 05/16] fix sync mock client --- cognite/client/testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 69d94e47a2..3a8a3fe582 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -429,6 +429,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(SyncViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(SyncInstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(SyncDataModelingGraphQLAPI, instance=True, spec_set=True) + dm_streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( SyncDataModelingAPI, instance=True, @@ -439,6 +440,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) @@ -514,8 +516,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models) - self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) - sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) From cfbca3f67bc2325129663b76e2f8b2bcc6576c3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 15:59:32 +0200 Subject: [PATCH 06/16] remove incorrect streams attribute --- cognite/client/testing.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 3a8a3fe582..90ed51e389 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -313,8 +313,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models, sim_routines) - self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) - sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) From ed6ed6520c9d0c7c02c7d69317f6da15bdbe8a29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Tue, 14 Apr 2026 16:41:37 +0200 Subject: [PATCH 07/16] make resource writeable --- cognite/client/data_classes/data_modeling/streams.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index fa983068e9..8b0b67a232 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -111,7 +111,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } -class Stream(CogniteResource): +class Stream(WriteableCogniteResource["StreamWrite"]): """A stream (``StreamResponseItem``).""" def __init__( @@ -148,6 +148,13 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } return convert_all_keys_to_camel_case(out) if camel_case else out + def as_write(self) -> StreamWrite: + """Returns a write version.""" + return StreamWrite( + external_id=self.external_id, + settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), + ) + class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): """List of streams (``StreamResponse.items``).""" @@ -219,5 +226,5 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: out = {"external_id": self.external_id, "settings": settings_dumped} return convert_all_keys_to_camel_case(out) if camel_case else out - def as_write(self) -> Self: + def as_write(self) -> StreamWrite: return self From 5a52db6345bbf0fc8ddcbc830af2f79443c96b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 20 Apr 2026 10:32:41 +0200 Subject: [PATCH 08/16] fix: set chunking limits for Streams API create and delete operations The StreamsAPI needs to chunk items one at a time for create and delete operations, similar to other APIs like AgentsAPI. This ensures each stream is processed in a separate API request rather than being bundled together. Co-Authored-By: Claude Haiku 4.5 --- cognite/client/_api/data_modeling/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index ddb570c856..85078f1e40 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -25,6 +25,8 @@ class StreamsAPI(APIClient): def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._DELETE_LIMIT = 1 @overload async def create(self, items: StreamWrite) -> Stream: ... From 1e39c1bfe6135c359a0912b18295c279f0b48a19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 20 Apr 2026 10:36:47 +0200 Subject: [PATCH 09/16] regenerate sync API for Streams after adding chunking limits Co-Authored-By: Claude Haiku 4.5 --- cognite/client/_sync_api/data_modeling/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py index 8250d76bd6..3b0c741572 100644 --- a/cognite/client/_sync_api/data_modeling/streams.py +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -1,6 +1,6 @@ """ =============================================================================== -08522cd2e3d995076b7bf12a6390e97e +a7401471d6732b6938b33711d64a19b4 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ From b25a6686540571937ca454303499253bc91da5f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 20 Apr 2026 10:41:07 +0200 Subject: [PATCH 10/16] fix: remove unused imports in sync data modeling API init Remove unused imports that were causing lint failures (F401 errors): - asyncio, Coroutine, Iterator (not used) - Any, Literal, overload, TYPE_CHECKING (not used) - APIClient (not used) - SyncIterator, run_sync (not used) - _get_event_loop_executor (not used) - pandas, ClientConfig (not used) Co-Authored-By: Claude Haiku 4.5 --- cognite/client/_sync_api/data_modeling/__init__.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 23af998dd0..f0cbe9e923 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -7,12 +7,7 @@ from __future__ import annotations -import asyncio -from collections.abc import Coroutine, Iterator -from typing import TYPE_CHECKING, Any, Literal, overload - from cognite.client import AsyncCogniteClient -from cognite.client._api_client import APIClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI @@ -22,14 +17,6 @@ from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.utils._async_helpers import SyncIterator, run_sync -from cognite.client.utils._concurrency import _get_event_loop_executor - -if TYPE_CHECKING: - import pandas as pd - - from cognite.client import AsyncCogniteClient -from cognite.client.config import ClientConfig class SyncDataModelingAPI(SyncAPIClient): From 2c50de3ca9ee4e98a7f62f7874466817f7a29bbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 10:08:45 +0200 Subject: [PATCH 11/16] refactor(streams): clean up docstrings and remove StreamTemplate.version Remove parenthetical type references and descriptive asides from docstrings: - Removed "(human-readable)", "(\`\`Type\`\`)" style comments - Simplified endpoint descriptions to just describe the response Removed StreamTemplate.version field as it's not in the API specification. The field was optional and only stored, never used. Co-Authored-By: Claude Haiku 4.5 --- .../data_classes/data_modeling/streams.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index 8b0b67a232..03c9806bc6 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -35,7 +35,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class StreamLifecycleSettings(CogniteResource): - """Lifecycle metadata for a stream (human-readable).""" + """Lifecycle metadata for a stream.""" def __init__( self, @@ -112,7 +112,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class Stream(WriteableCogniteResource["StreamWrite"]): - """A stream (``StreamResponseItem``).""" + """A stream.""" def __init__( self, @@ -157,31 +157,28 @@ def as_write(self) -> StreamWrite: class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): - """List of streams (``StreamResponse.items``).""" + """List of streams.""" _RESOURCE = Stream class StreamTemplate(CogniteResource): - """Reference to an stream template (``StreamRequestItem.settings.template``).""" + """Reference to a stream template.""" - def __init__(self, name: str, version: str | None = None) -> None: + def __init__(self, name: str) -> None: self.name = name - self.version = version @classmethod def _load(cls, resource: dict[str, Any]) -> Self: - return cls(name=resource["name"], version=resource.get("version")) + return cls(name=resource["name"]) def dump(self, camel_case: bool = True) -> dict[str, Any]: out: dict[str, Any] = {"name": self.name} - if self.version is not None: - out["version"] = self.version return convert_all_keys_to_camel_case(out) if camel_case else out class StreamTemplateWriteSettings(CogniteResource): - """Write-side settings for creating a stream from a template (``{"template": {...}}``).""" + """Write-side settings for creating a stream from a template.""" def __init__(self, template: StreamTemplate) -> None: self.template = template @@ -201,7 +198,7 @@ def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSett class StreamWrite(WriteableCogniteResource["StreamWrite"]): - """Request item for creating a stream (``StreamRequestItem``).""" + """Request item for creating a stream.""" def __init__( self, From 8cee7234701bb8217e270c58f6d1e1cf6f001705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 10:17:24 +0200 Subject: [PATCH 12/16] test: remove version field assertions from StreamTemplate test StreamTemplate.version was removed as it's not in the API spec. Updated test to remove assertions checking for this field. Co-Authored-By: Claude Haiku 4.5 --- .../test_data_classes/test_data_models/test_streams.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/tests_unit/test_data_classes/test_data_models/test_streams.py b/tests/tests_unit/test_data_classes/test_data_models/test_streams.py index 5ab612e4a5..7d86fc4514 100644 --- a/tests/tests_unit/test_data_classes/test_data_models/test_streams.py +++ b/tests/tests_unit/test_data_classes/test_data_models/test_streams.py @@ -77,8 +77,6 @@ def test_stream_write_load_roundtrip() -> None: w = StreamWrite._load(raw) assert isinstance(w.settings, StreamTemplateWriteSettings) assert w.settings.template.name == "ImmutableTestStream" - assert w.settings.template.version == "1" - assert w.dump(camel_case=True)["settings"]["template"]["version"] == "1" def test_stream_write_load_arbitrary_settings_dict() -> None: From ec2959df2d657b8672bfae4e4556e2a11c4fad50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 16:50:00 +0200 Subject: [PATCH 13/16] refactor(streams): address review feedback from haakonvt Addressed all review comments from PR #2534: - Using Sequence[StreamWrite] for type safety (no untyped dicts) - Chunking handled automatically via _create_multiple and _delete_multiple - Chunking limits set (_CREATE_LIMIT=1, _DELETE_LIMIT=1) - Clean docstrings without unnecessary internal details - Using params dict for query parameters - Clear documentation on soft delete behavior - Proper Note about paging limits in list() - Statistics cost documentation in retrieve() Co-Authored-By: Claude Haiku 4.5 From 1b71d9db1d531449d3f94bcb872b48120cd46bb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 7 May 2026 11:05:11 +0200 Subject: [PATCH 14/16] address review comments --- cognite/client/_api/data_modeling/streams.py | 103 ++++++++++--- .../client/_sync_api/data_modeling/streams.py | 83 ++++++++-- .../data_classes/data_modeling/streams.py | 63 ++++---- .../test_data_modeling/test_streams.py | 142 ++++++------------ .../test_data_models/test_streams.py | 88 ----------- 5 files changed, 236 insertions(+), 243 deletions(-) delete mode 100644 tests/tests_unit/test_data_classes/test_data_models/test_streams.py diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index 85078f1e40..419c4ff36b 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -9,8 +9,8 @@ StreamList, StreamWrite, ) -from cognite.client.utils._identifier import IdentifierSequence -from cognite.client.utils._url import interpolate_and_url_encode +from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._identifier import Identifier, IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: @@ -19,14 +19,15 @@ class StreamsAPI(APIClient): - """Streams API (``/streams``).""" - _RESOURCE_PATH = "/streams" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: super().__init__(config, api_version, cognite_client) self._CREATE_LIMIT = 1 self._DELETE_LIMIT = 1 + self._warning = FeaturePreviewWarning( + api_maturity="General Availability", sdk_maturity="alpha", feature_name="Streams" + ) @overload async def create(self, items: StreamWrite) -> Stream: ... @@ -42,7 +43,29 @@ async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | S Returns: Stream | StreamList: The created stream or streams. + + Examples: + + Create a single stream from a template: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.streams import ( + ... StreamWrite, + ... StreamTemplate, + ... StreamTemplateWriteSettings, + ... ) + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.create( + ... StreamWrite( + ... external_id="my-stream", + ... settings=StreamTemplateWriteSettings( + ... template=StreamTemplate(name="ImmutableTestStream"), + ... ), + ... ) + ... ) """ + self._warning.warn() return await self._create_multiple( items=items, list_cls=StreamList, @@ -51,7 +74,7 @@ async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | S ) async def list(self) -> StreamList: - """`List streams `_ in the project. + """`List streams `_. Note: There is no paging limit parameter: the endpoint returns all streams in the project @@ -59,37 +82,79 @@ async def list(self) -> StreamList: Returns: StreamList: The streams in the project. + + Examples: + + List all streams in the project: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.list() """ + self._warning.warn() res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) return StreamList._load(res.json()["items"]) - async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + async def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None: """`Retrieve a stream `_. Args: - stream_external_id (str): Stream external id. - include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing - statistics can be expensive. + external_id (str): External ID of the stream to retrieve. + include_statistics (bool | None): When ``True``, usage statistics will be returned together + with stream settings. Computing statistics can be expensive. Returns: - Stream: The stream metadata (and optionally statistics). + Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found. + + Examples: + + Retrieve a stream by external ID: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.retrieve("my-stream") + + Retrieve a stream with usage statistics: + + >>> res = client.data_modeling.streams.retrieve( + ... "my-stream", + ... include_statistics=True, + ... ) """ - path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) - params: dict[str, bool] | None = None - if include_statistics is not None: - params = {"includeStatistics": include_statistics} - res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) - return Stream._load(res.json()) + self._warning.warn() + return await self._retrieve( + cls=Stream, + identifier=Identifier(external_id), + params={"includeStatistics": include_statistics} if include_statistics is not None else None, + ) async def delete(self, external_id: str | SequenceNotStr[str]) -> None: """`Delete streams `_. - Deletion is a soft delete that retains capacity for an extended period; prefer deleting only - when necessary. + Note: + Deletion is a soft delete that retains capacity for an extended period; + prefer deleting only when necessary. Args: - external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + external_id (str | SequenceNotStr[str]): External ID or list of external IDs of + streams to delete. + + Examples: + + Delete a single stream: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> client.data_modeling.streams.delete("my-stream") + + Delete multiple streams: + + >>> client.data_modeling.streams.delete(["stream-a", "stream-b"]) """ + self._warning.warn() await self._delete_multiple( identifiers=IdentifierSequence.load(external_ids=external_id), wrap_ids=True, diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py index 3b0c741572..433688cfb3 100644 --- a/cognite/client/_sync_api/data_modeling/streams.py +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -1,6 +1,6 @@ """ =============================================================================== -a7401471d6732b6938b33711d64a19b4 +e9f8107403fdebf35c0af3100888423b This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -41,12 +41,33 @@ def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamL Returns: Stream | StreamList: The created stream or streams. + + Examples: + + Create a single stream from a template: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.streams import ( + ... StreamWrite, + ... StreamTemplate, + ... StreamTemplateWriteSettings, + ... ) + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.create( + ... StreamWrite( + ... external_id="my-stream", + ... settings=StreamTemplateWriteSettings( + ... template=StreamTemplate(name="ImmutableTestStream"), + ... ), + ... ) + ... ) """ return run_sync(self.__async_client.data_modeling.streams.create(items=items)) def list(self) -> StreamList: """ - `List streams `_ in the project. + `List streams `_. Note: There is no paging limit parameter: the endpoint returns all streams in the project @@ -54,24 +75,49 @@ def list(self) -> StreamList: Returns: StreamList: The streams in the project. + + Examples: + + List all streams in the project: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.list() """ return run_sync(self.__async_client.data_modeling.streams.list()) - def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None: """ `Retrieve a stream `_. Args: - stream_external_id (str): Stream external id. - include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing - statistics can be expensive. + external_id (str): External ID of the stream to retrieve. + include_statistics (bool | None): When ``True``, usage statistics will be returned together + with stream settings. Computing statistics can be expensive. Returns: - Stream: The stream metadata (and optionally statistics). + Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found. + + Examples: + + Retrieve a stream by external ID: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.retrieve("my-stream") + + Retrieve a stream with usage statistics: + + >>> res = client.data_modeling.streams.retrieve( + ... "my-stream", + ... include_statistics=True, + ... ) """ return run_sync( self.__async_client.data_modeling.streams.retrieve( - stream_external_id=stream_external_id, include_statistics=include_statistics + external_id=external_id, include_statistics=include_statistics ) ) @@ -79,10 +125,25 @@ def delete(self, external_id: str | SequenceNotStr[str]) -> None: """ `Delete streams `_. - Deletion is a soft delete that retains capacity for an extended period; prefer deleting only - when necessary. + Note: + Deletion is a soft delete that retains capacity for an extended period; + prefer deleting only when necessary. Args: - external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + external_id (str | SequenceNotStr[str]): External ID or list of external IDs of + streams to delete. + + Examples: + + Delete a single stream: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> client.data_modeling.streams.delete("my-stream") + + Delete multiple streams: + + >>> client.data_modeling.streams.delete(["stream-a", "stream-b"]) """ return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index 03c9806bc6..a5fe136a73 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, Literal from typing_extensions import Self @@ -27,12 +27,6 @@ def _load(cls, resource: dict[str, Any]) -> Self: consumed=resource.get("consumed"), ) - def dump(self, camel_case: bool = True) -> dict[str, Any]: - out: dict[str, Any] = {"provisioned": self.provisioned} - if self.consumed is not None: - out["consumed"] = self.consumed - return convert_all_keys_to_camel_case(out) if camel_case else out - class StreamLifecycleSettings(CogniteResource): """Lifecycle metadata for a stream.""" @@ -52,12 +46,6 @@ def _load(cls, resource: dict[str, Any]) -> Self: data_deleted_after=resource.get("dataDeletedAfter"), ) - def dump(self, camel_case: bool = True) -> dict[str, Any]: - out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} - if self.data_deleted_after is not None: - out["data_deleted_after"] = self.data_deleted_after - return convert_all_keys_to_camel_case(out) if camel_case else out - class StreamLimitSettings(CogniteResource): """Provisioned/consumed limits for a stream.""" @@ -112,14 +100,17 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class Stream(WriteableCogniteResource["StreamWrite"]): - """A stream.""" + """A stream instance returned from the streams API. + + This is the read version of :class:`StreamWrite`. + """ def __init__( self, external_id: str, created_time: int, created_from_template: str, - type: str, + type: Literal["Immutable", "Mutable"], settings: StreamSettings, ) -> None: self.external_id = external_id @@ -149,7 +140,6 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return convert_all_keys_to_camel_case(out) if camel_case else out def as_write(self) -> StreamWrite: - """Returns a write version.""" return StreamWrite( external_id=self.external_id, settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), @@ -157,13 +147,15 @@ def as_write(self) -> StreamWrite: class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): - """List of streams.""" - _RESOURCE = Stream class StreamTemplate(CogniteResource): - """Reference to a stream template.""" + """Reference to a stream template by name. + + Args: + name (str): Name of the stream template to create the stream from. + """ def __init__(self, name: str) -> None: self.name = name @@ -172,13 +164,13 @@ def __init__(self, name: str) -> None: def _load(cls, resource: dict[str, Any]) -> Self: return cls(name=resource["name"]) - def dump(self, camel_case: bool = True) -> dict[str, Any]: - out: dict[str, Any] = {"name": self.name} - return convert_all_keys_to_camel_case(out) if camel_case else out - class StreamTemplateWriteSettings(CogniteResource): - """Write-side settings for creating a stream from a template.""" + """Write-side settings that specify which template to create the stream from. + + Args: + template (StreamTemplate): The template to create the stream from. + """ def __init__(self, template: StreamTemplate) -> None: self.template = template @@ -191,14 +183,17 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return {"template": self.template.dump(camel_case=camel_case)} -def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSettings | dict[str, Any]: - if set(raw.keys()) == {"template"} and isinstance(raw["template"], dict) and "name" in raw["template"]: - return StreamTemplateWriteSettings._load(raw) - return raw +class StreamWrite(WriteableCogniteResource["StreamWrite"]): + """Write representation of a stream, used when creating a new stream. + This is the write version of :class:`Stream`. -class StreamWrite(WriteableCogniteResource["StreamWrite"]): - """Request item for creating a stream.""" + Args: + external_id (str): External ID of the stream, must be unique within the project. + settings (StreamTemplateWriteSettings | dict[str, Any]): Settings specifying which template + to create the stream from. Pass a :class:`StreamTemplateWriteSettings` instance, + or a raw dict for custom/future template formats. + """ def __init__( self, @@ -208,11 +203,17 @@ def __init__( self.external_id = external_id self.settings = settings + @classmethod + def _parse_settings(cls, raw: dict[str, Any]) -> StreamTemplateWriteSettings | dict[str, Any]: + if set(raw.keys()) == {"template"} and isinstance(raw["template"], dict) and "name" in raw["template"]: + return StreamTemplateWriteSettings._load(raw) + return raw + @classmethod def _load(cls, resource: dict[str, Any]) -> Self: return cls( external_id=resource["externalId"], - settings=_parse_stream_write_settings(resource["settings"]), + settings=cls._parse_settings(resource["settings"]), ) def dump(self, camel_case: bool = True) -> dict[str, Any]: diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py index dd3c01d9c4..2f8cbf9301 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +from collections.abc import Callable import pytest from pytest_httpx import HTTPXMock @@ -21,31 +22,47 @@ def streams_base_url(async_client: AsyncCogniteClient) -> str: return async_client.data_modeling.streams._base_url_with_base_path + "/streams" +@pytest.fixture +def make_stream_response() -> Callable[..., dict]: + def _make(external_id: str = "st1", created_time: int = 10) -> dict: + return { + "externalId": external_id, + "createdTime": created_time, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + + return _make + + +@pytest.fixture +def stream_response(make_stream_response: Callable[..., dict]) -> dict: + return make_stream_response() + + +@pytest.fixture +def stream_list_response(stream_response: dict) -> dict: + return {"items": [stream_response]} + + class TestStreamsAPI: def test_list_parses_items( self, cognite_client: CogniteClient, httpx_mock: HTTPXMock, streams_base_url: str, + stream_list_response: dict, ) -> None: - sample = { - "items": [ - { - "externalId": "st1", - "createdTime": 10, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } - ] - } - httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + httpx_mock.add_response( + method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=stream_list_response + ) out = cognite_client.data_modeling.streams.list() assert isinstance(out, StreamList) assert out[0].external_id == "st1" @@ -55,24 +72,12 @@ def test_retrieve_include_statistics_query( cognite_client: CogniteClient, httpx_mock: HTTPXMock, streams_base_url: str, + stream_response: dict, ) -> None: - sample = { - "externalId": "st1", - "createdTime": 10, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } httpx_mock.add_response( method="GET", url=re.compile(re.escape(streams_base_url) + r"/st1(?:\?.+)?$"), - json=sample, + json=stream_response, ) cognite_client.data_modeling.streams.retrieve("st1", include_statistics=True) requests = httpx_mock.get_requests() @@ -84,25 +89,11 @@ def test_create_posts_single_item( cognite_client: CogniteClient, httpx_mock: HTTPXMock, streams_base_url: str, + stream_list_response: dict, ) -> None: - sample = { - "items": [ - { - "externalId": "st1", - "createdTime": 10, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } - ] - } - httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=stream_list_response + ) w = StreamWrite( "st1", StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), @@ -121,49 +112,11 @@ def test_create_chunks_multiple_items( cognite_client: CogniteClient, httpx_mock: HTTPXMock, streams_base_url: str, + make_stream_response: Callable[..., dict], ) -> None: - httpx_mock.add_response( - method="POST", - url=re.compile(re.escape(streams_base_url) + r"$"), - json={ - "items": [ - { - "externalId": "a", - "createdTime": 1, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } - ] - }, - ) - httpx_mock.add_response( - method="POST", - url=re.compile(re.escape(streams_base_url) + r"$"), - json={ - "items": [ - { - "externalId": "b", - "createdTime": 2, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } - ] - }, - ) + url_pattern = re.compile(re.escape(streams_base_url) + r"$") + httpx_mock.add_response(method="POST", url=url_pattern, json={"items": [make_stream_response("a", 1)]}) + httpx_mock.add_response(method="POST", url=url_pattern, json={"items": [make_stream_response("b", 2)]}) tpl = StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")) a = StreamWrite("a", tpl) b = StreamWrite("b", tpl) @@ -183,8 +136,9 @@ def test_delete_chunks_multiple_items( httpx_mock: HTTPXMock, streams_base_url: str, ) -> None: - httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"/delete$"), json={}) - httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"/delete$"), json={}) + url_pattern = re.compile(re.escape(streams_base_url) + r"/delete$") + httpx_mock.add_response(method="POST", url=url_pattern, json={}) + httpx_mock.add_response(method="POST", url=url_pattern, json={}) cognite_client.data_modeling.streams.delete(["a", "b"]) requests = httpx_mock.get_requests() assert len(requests) == 2 diff --git a/tests/tests_unit/test_data_classes/test_data_models/test_streams.py b/tests/tests_unit/test_data_classes/test_data_models/test_streams.py deleted file mode 100644 index 7d86fc4514..0000000000 --- a/tests/tests_unit/test_data_classes/test_data_models/test_streams.py +++ /dev/null @@ -1,88 +0,0 @@ -from __future__ import annotations - -from cognite.client.data_classes.data_modeling.streams import ( - Stream, - StreamList, - StreamTemplate, - StreamTemplateWriteSettings, - StreamWrite, -) - - -def test_stream_roundtrip() -> None: - raw = { - "externalId": "s1", - "createdTime": 1, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, - }, - }, - } - s = Stream._load(raw) - back = s.dump(camel_case=True) - assert back["externalId"] == "s1" - assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 - - -def test_stream_list_load() -> None: - raw = { - "items": [ - { - "externalId": "s1", - "createdTime": 1, - "createdFromTemplate": "ImmutableTestStream", - "type": "Immutable", - "settings": { - "lifecycle": {"retainedAfterSoftDelete": "P1D"}, - "limits": { - "maxRecordsTotal": {"provisioned": 1000.0}, - "maxGigaBytesTotal": {"provisioned": 1.0}, - }, - }, - } - ] - } - lst = StreamList._load(raw["items"]) - assert len(lst) == 1 - assert lst[0].external_id == "s1" - - -def test_stream_write_dump() -> None: - w = StreamWrite( - "abc", - StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), - ) - dumped = w.dump() - assert dumped["externalId"] == "abc" - assert dumped["settings"]["template"]["name"] == "ImmutableTestStream" - - -def test_stream_write_dump_dict_escape_hatch() -> None: - w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) - dumped = w.dump() - assert dumped["externalId"] == "abc" - assert dumped["settings"]["template"]["name"] == "ImmutableTestStream" - - -def test_stream_write_load_roundtrip() -> None: - raw = { - "externalId": "x", - "settings": {"template": {"name": "ImmutableTestStream", "version": "1"}}, - } - w = StreamWrite._load(raw) - assert isinstance(w.settings, StreamTemplateWriteSettings) - assert w.settings.template.name == "ImmutableTestStream" - - -def test_stream_write_load_arbitrary_settings_dict() -> None: - raw = { - "externalId": "x", - "settings": {"template": {"name": "T"}, "extra": 1}, - } - w = StreamWrite._load(raw) - assert w.settings == {"template": {"name": "T"}, "extra": 1} From 1e6937ccc658fdceba7edd47dec210ce7e4538cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 7 May 2026 11:35:47 +0200 Subject: [PATCH 15/16] fix sphinx doc generation --- cognite/client/_cognite_client.py | 2 ++ docs/source/data_modeling.rst | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index f5ce8238ce..c2f6413b79 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -52,6 +52,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI + from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.datapoints import DatapointsAPI from cognite.client._api.datapoints_subscriptions import DatapointsSubscriptionAPI @@ -425,6 +426,7 @@ def _make_accessors_for_building_docs() -> None: AsyncCogniteClient.data_modeling.graphql = DataModelingGraphQLAPI # type: ignore AsyncCogniteClient.data_modeling.statistics = StatisticsAPI # type: ignore AsyncCogniteClient.data_modeling.statistics.spaces = SpaceStatisticsAPI # type: ignore + AsyncCogniteClient.data_modeling.streams = StreamsAPI # type: ignore AsyncCogniteClient.documents = DocumentsAPI # type: ignore AsyncCogniteClient.documents.previews = DocumentPreviewAPI # type: ignore AsyncCogniteClient.workflows = WorkflowAPI # type: ignore diff --git a/docs/source/data_modeling.rst b/docs/source/data_modeling.rst index 377ebf6ccb..af2bb93b23 100644 --- a/docs/source/data_modeling.rst +++ b/docs/source/data_modeling.rst @@ -286,6 +286,23 @@ Data modeling statistics data classes .. currentmodule:: cognite.client +Streams +------- +.. autosummary:: + :methods: + :toctree: generated/ + :template: custom-automethods-template.rst + + AsyncCogniteClient.data_modeling.streams + +Streams data classes +^^^^^^^^^^^^^^^^^^^^ +.. automodule:: cognite.client.data_classes.data_modeling.streams + :members: + :show-inheritance: + +.. currentmodule:: cognite.client + GraphQL ------- .. autosummary:: From 987b7d111d4b3011384b140fb3bda9b4a5d2f0d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 7 May 2026 11:46:15 +0200 Subject: [PATCH 16/16] ensure we run tests on docstrings --- tests/tests_unit/test_docstring_examples.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index 0beb410979..ebc313620f 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -29,7 +29,16 @@ units, workflows, ) -from cognite.client._api.data_modeling import containers, data_models, graphql, instances, spaces, statistics, views +from cognite.client._api.data_modeling import ( + containers, + data_models, + graphql, + instances, + spaces, + statistics, + streams, + views, +) from cognite.client._api.hosted_extractors import destinations, jobs, mappings, sources from cognite.client._api.postgres_gateway import tables as postgres_gateway_tables from cognite.client._api.postgres_gateway import users as postgres_gateway_users @@ -120,6 +129,7 @@ def test_data_modeling(self) -> None: run_docstring_tests(spaces) run_docstring_tests(graphql) run_docstring_tests(statistics) + run_docstring_tests(streams) def test_datapoint_subscriptions(self) -> None: run_docstring_tests(datapoints_subscriptions)