diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index ee60b4b281..32313a417e 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -9,6 +9,7 @@ from cognite.client._api.data_modeling.instances import InstancesAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api_client import APIClient @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) + self.streams = StreamsAPI(config, api_version, cognite_client) def _get_semaphore( self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"] diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py new file mode 100644 index 0000000000..b558de20e2 --- /dev/null +++ b/cognite/client/_api/data_modeling/streams.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamWrite, +) +from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._identifier import Identifier, IdentifierSequence +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._DELETE_LIMIT = 1 + self._warning = FeaturePreviewWarning( + api_maturity="General Availability", sdk_maturity="alpha", feature_name="Streams" + ) + + @overload + async def create(self, items: StreamWrite) -> Stream: ... + + @overload + async def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """`Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + + Examples: + + Create a single stream from a template: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.streams import ( + ... StreamWrite, + ... StreamTemplate, + ... StreamTemplateWriteSettings, + ... ) + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.create( + ... StreamWrite( + ... external_id="my-stream", + ... settings=StreamTemplateWriteSettings( + ... template=StreamTemplate(name="ImmutableTestStream"), + ... ), + ... ) + ... ) + """ + self._warning.warn() + return await self._create_multiple( + items=items, + list_cls=StreamList, + resource_cls=Stream, + input_resource_cls=StreamWrite, + ) + + async def list(self) -> StreamList: + """`List streams `_. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + + Examples: + + List all streams in the project: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.list() + """ + self._warning.warn() + return await self._list(method="GET", list_cls=StreamList, resource_cls=Stream) + + async def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None: + """`Retrieve a stream `_. + + Args: + external_id (str): External ID of the stream to retrieve. + include_statistics (bool | None): When ``True``, usage statistics will be returned together + with stream settings. Computing statistics can be expensive. + + Returns: + Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found. + + Examples: + + Retrieve a stream by external ID: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.retrieve("my-stream") + + Retrieve a stream with usage statistics: + + >>> res = client.data_modeling.streams.retrieve( + ... "my-stream", + ... include_statistics=True, + ... ) + """ + self._warning.warn() + return await self._retrieve( + cls=Stream, + identifier=Identifier(external_id), + params={"includeStatistics": include_statistics} if include_statistics is not None else None, + ) + + async def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """`Delete streams `_. + + Note: + Deletion is a soft delete that retains capacity for an extended period; + prefer deleting only when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs of + streams to delete. + + Examples: + + Delete a single stream: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> client.data_modeling.streams.delete("my-stream") + + Delete multiple streams: + + >>> client.data_modeling.streams.delete(["stream-a", "stream-b"]) + """ + self._warning.warn() + await self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + wrap_ids=True, + ) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index f5ce8238ce..c2f6413b79 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -52,6 +52,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI + from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.datapoints import DatapointsAPI from cognite.client._api.datapoints_subscriptions import DatapointsSubscriptionAPI @@ -425,6 +426,7 @@ def _make_accessors_for_building_docs() -> None: AsyncCogniteClient.data_modeling.graphql = DataModelingGraphQLAPI # type: ignore AsyncCogniteClient.data_modeling.statistics = StatisticsAPI # type: ignore AsyncCogniteClient.data_modeling.statistics.spaces = SpaceStatisticsAPI # type: ignore + AsyncCogniteClient.data_modeling.streams = StreamsAPI # type: ignore AsyncCogniteClient.documents = DocumentsAPI # type: ignore AsyncCogniteClient.documents.previews = DocumentPreviewAPI # type: ignore AsyncCogniteClient.workflows = WorkflowAPI # type: ignore diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 01fb3ea184..f0cbe9e923 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,14 +1,12 @@ """ =============================================================================== -c76b2b9351d2a5eee6a710fa9893bfa4 +584030bc5e2a4b8168f54c101f7f521d This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from typing import TYPE_CHECKING - from cognite.client import AsyncCogniteClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI @@ -16,12 +14,10 @@ from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient -if TYPE_CHECKING: - from cognite.client import AsyncCogniteClient - class SyncDataModelingAPI(SyncAPIClient): """Auto-generated, do not modify manually.""" @@ -35,3 +31,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.instances = SyncInstancesAPI(async_client) self.graphql = SyncDataModelingGraphQLAPI(async_client) self.statistics = SyncStatisticsAPI(async_client) + self.streams = SyncStreamsAPI(async_client) diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py new file mode 100644 index 0000000000..81cfb292a6 --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -0,0 +1,149 @@ +""" +=============================================================================== +3bfd805fbceb341bb437635ac632d5ad +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite +from cognite.client.utils._async_helpers import run_sync +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + @overload + def create(self, items: StreamWrite) -> Stream: ... + + @overload + def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """ + `Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + + Examples: + + Create a single stream from a template: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.streams import ( + ... StreamWrite, + ... StreamTemplate, + ... StreamTemplateWriteSettings, + ... ) + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.create( + ... StreamWrite( + ... external_id="my-stream", + ... settings=StreamTemplateWriteSettings( + ... template=StreamTemplate(name="ImmutableTestStream"), + ... ), + ... ) + ... ) + """ + return run_sync(self.__async_client.data_modeling.streams.create(items=items)) + + def list(self) -> StreamList: + """ + `List streams `_. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + + Examples: + + List all streams in the project: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.list() + """ + return run_sync(self.__async_client.data_modeling.streams.list()) + + def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None: + """ + `Retrieve a stream `_. + + Args: + external_id (str): External ID of the stream to retrieve. + include_statistics (bool | None): When ``True``, usage statistics will be returned together + with stream settings. Computing statistics can be expensive. + + Returns: + Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found. + + Examples: + + Retrieve a stream by external ID: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> res = client.data_modeling.streams.retrieve("my-stream") + + Retrieve a stream with usage statistics: + + >>> res = client.data_modeling.streams.retrieve( + ... "my-stream", + ... include_statistics=True, + ... ) + """ + return run_sync( + self.__async_client.data_modeling.streams.retrieve( + external_id=external_id, include_statistics=include_statistics + ) + ) + + def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """ + `Delete streams `_. + + Note: + Deletion is a soft delete that retains capacity for an extended period; + prefer deleting only when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs of + streams to delete. + + Examples: + + Delete a single stream: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> # async_client = AsyncCogniteClient() # another option + >>> client.data_modeling.streams.delete("my-stream") + + Delete multiple streams: + + >>> client.data_modeling.streams.delete(["stream-a", "stream-b"]) + """ + return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index ba6ce16cab..0fab81400f 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -119,6 +119,17 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamTemplate, + StreamTemplateWriteSettings, + StreamWrite, +) from cognite.client.data_classes.data_modeling.sync import SubscriptionContext from cognite.client.data_classes.data_modeling.views import ( ConnectionDefinition, @@ -243,6 +254,15 @@ "SpaceApply", "SpaceApplyList", "SpaceList", + "Stream", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamTemplate", + "StreamTemplateWriteSettings", + "StreamWrite", "SubscriptionContext", "Text", "TimeSeriesReference", diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..a8d39018f5 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,216 @@ +from __future__ import annotations + +from typing import Any, Literal + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, + WriteableCogniteResource, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream.""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(WriteableCogniteResource["StreamWrite"]): + """A stream instance returned from the streams API. + + This is the read version of :class:`StreamWrite`. + """ + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: Literal["Immutable", "Mutable"], + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + return StreamWrite( + external_id=self.external_id, + settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), + ) + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + _RESOURCE = Stream + + +class StreamTemplate(CogniteResource): + """Reference to a stream template by name. + + Args: + name (str): Name of the stream template to create the stream from. + """ + + def __init__(self, name: str) -> None: + self.name = name + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(name=resource["name"]) + + +class StreamTemplateWriteSettings(CogniteResource): + """Write-side settings that specify which template to create the stream from. + + Args: + template (StreamTemplate): The template to create the stream from. + """ + + def __init__(self, template: StreamTemplate) -> None: + self.template = template + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(template=StreamTemplate._load(resource["template"])) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"template": self.template.dump(camel_case=camel_case)} + + +class StreamWrite(WriteableCogniteResource["StreamWrite"]): + """Write representation of a stream, used when creating a new stream. + + This is the write version of :class:`Stream`. + + Args: + external_id (str): External ID of the stream, must be unique within the project. + settings (StreamTemplateWriteSettings): Settings specifying which template to create the stream from. + """ + + def __init__( + self, + external_id: str, + settings: StreamTemplateWriteSettings, + ) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + settings=StreamTemplateWriteSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id, "settings": self.settings.dump(camel_case=camel_case)} + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + return self diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 981a0a7d55..90ed51e389 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -20,6 +20,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -103,6 +104,7 @@ from cognite.client._sync_api.data_modeling.space_statistics import SyncSpaceStatisticsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api.data_sets import SyncDataSetsAPI from cognite.client._sync_api.datapoints import SyncDatapointsAPI @@ -224,6 +226,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(ViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(InstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(DataModelingGraphQLAPI, instance=True, spec_set=True) + dm_streams = create_autospec(StreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( DataModelingAPI, instance=True, @@ -234,6 +237,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) @@ -423,6 +427,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(SyncViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(SyncInstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(SyncDataModelingGraphQLAPI, instance=True, spec_set=True) + dm_streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( SyncDataModelingAPI, instance=True, @@ -433,6 +438,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 3b0eb1f5c8..567a752959 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -34,6 +34,7 @@ "raw/dbs/[^/]+/tables$", "relationships", "sequences", + "streams", "simulators", "simulators/models", "simulators/models/revisions", diff --git a/docs/source/data_modeling.rst b/docs/source/data_modeling.rst index 377ebf6ccb..af2bb93b23 100644 --- a/docs/source/data_modeling.rst +++ b/docs/source/data_modeling.rst @@ -286,6 +286,23 @@ Data modeling statistics data classes .. currentmodule:: cognite.client +Streams +------- +.. autosummary:: + :methods: + :toctree: generated/ + :template: custom-automethods-template.rst + + AsyncCogniteClient.data_modeling.streams + +Streams data classes +^^^^^^^^^^^^^^^^^^^^ +.. automodule:: cognite.client.data_classes.data_modeling.streams + :members: + :show-inheritance: + +.. currentmodule:: cognite.client + GraphQL ------- .. autosummary:: diff --git a/tests/tests_unit/test_api/test_data_modeling/test_streams.py b/tests/tests_unit/test_api/test_data_modeling/test_streams.py new file mode 100644 index 0000000000..4228cfe4dc --- /dev/null +++ b/tests/tests_unit/test_api/test_data_modeling/test_streams.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import re +from collections.abc import Callable + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamTemplate, + StreamTemplateWriteSettings, + StreamWrite, +) +from tests.utils import jsgz_load + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" + + +@pytest.fixture +def make_stream_response() -> Callable[..., dict]: + def _make(external_id: str = "st1", created_time: int = 10) -> dict: + return { + "externalId": external_id, + "createdTime": created_time, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + + return _make + + +@pytest.fixture +def stream_response(make_stream_response: Callable[..., dict]) -> dict: + return make_stream_response() + + +@pytest.fixture +def stream_list_response(stream_response: dict) -> dict: + return {"items": [stream_response]} + + +class TestStreamsAPI: + def test_list_parses_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + stream_list_response: dict, + ) -> None: + httpx_mock.add_response( + method="GET", url=re.compile(re.escape(streams_base_url) + r"(?:\?.+)?$"), json=stream_list_response + ) + out = cognite_client.data_modeling.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + + def test_retrieve_include_statistics_query( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + stream_response: dict, + ) -> None: + httpx_mock.add_response( + method="GET", + url=re.compile(re.escape(streams_base_url) + r"/st1(?:\?.+)?$"), + json=stream_response, + ) + cognite_client.data_modeling.streams.retrieve("st1", include_statistics=True) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.params["includeStatistics"].lower() == "true" + + def test_create_posts_single_item( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + stream_list_response: dict, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=stream_list_response + ) + w = StreamWrite( + "st1", + StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")), + ) + out = cognite_client.data_modeling.streams.create(w) + requests = httpx_mock.get_requests() + assert isinstance(out, Stream) + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams") + assert jsgz_load(requests[0].content) == { + "items": [{"externalId": "st1", "settings": {"template": {"name": "ImmutableTestStream"}}}] + } + + def test_create_chunks_multiple_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + make_stream_response: Callable[..., dict], + ) -> None: + url_pattern = re.compile(re.escape(streams_base_url) + r"$") + httpx_mock.add_response(method="POST", url=url_pattern, json={"items": [make_stream_response("a", 1)]}) + httpx_mock.add_response(method="POST", url=url_pattern, json={"items": [make_stream_response("b", 2)]}) + tpl = StreamTemplateWriteSettings(StreamTemplate("ImmutableTestStream")) + a = StreamWrite("a", tpl) + b = StreamWrite("b", tpl) + out = cognite_client.data_modeling.streams.create([a, b]) + requests = httpx_mock.get_requests() + assert isinstance(out, StreamList) + assert [stream.external_id for stream in out] == ["a", "b"] + assert len(requests) == 2 + assert [jsgz_load(request.content) for request in requests] == [ + {"items": [{"externalId": "a", "settings": {"template": {"name": "ImmutableTestStream"}}}]}, + {"items": [{"externalId": "b", "settings": {"template": {"name": "ImmutableTestStream"}}}]}, + ] + + def test_delete_chunks_multiple_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + url_pattern = re.compile(re.escape(streams_base_url) + r"/delete$") + httpx_mock.add_response(method="POST", url=url_pattern, json={}) + httpx_mock.add_response(method="POST", url=url_pattern, json={}) + cognite_client.data_modeling.streams.delete(["a", "b"]) + requests = httpx_mock.get_requests() + assert len(requests) == 2 + assert [jsgz_load(request.content) for request in requests] == [ + {"items": [{"externalId": "a"}]}, + {"items": [{"externalId": "b"}]}, + ] diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index d707a29feb..0ddd5354d8 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1838,6 +1838,9 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), + # Streams API + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), ] ), ) diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index 0beb410979..ebc313620f 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -29,7 +29,16 @@ units, workflows, ) -from cognite.client._api.data_modeling import containers, data_models, graphql, instances, spaces, statistics, views +from cognite.client._api.data_modeling import ( + containers, + data_models, + graphql, + instances, + spaces, + statistics, + streams, + views, +) from cognite.client._api.hosted_extractors import destinations, jobs, mappings, sources from cognite.client._api.postgres_gateway import tables as postgres_gateway_tables from cognite.client._api.postgres_gateway import users as postgres_gateway_users @@ -120,6 +129,7 @@ def test_data_modeling(self) -> None: run_docstring_tests(spaces) run_docstring_tests(graphql) run_docstring_tests(statistics) + run_docstring_tests(streams) def test_datapoint_subscriptions(self) -> None: run_docstring_tests(datapoints_subscriptions)