diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index ee60b4b281..32313a417e 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -9,6 +9,7 @@ from cognite.client._api.data_modeling.instances import InstancesAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api_client import APIClient @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) + self.streams = StreamsAPI(config, api_version, cognite_client) def _get_semaphore( self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"] diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py new file mode 100644 index 0000000000..cded2cfa86 --- /dev/null +++ b/cognite/client/_api/data_modeling/records.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.data_modeling.streams import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) +from cognite.client.utils._url import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsRecordsAPI(APIClient): + """ILA record operations under ``/streams/{streamId}/records/...``.""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + + def _records_base(self, stream_external_id: str) -> str: + return interpolate_and_url_encode("/streams/{}/records", stream_external_id) + + async def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Ingest records `_ into a stream.""" + res = await self._post( + self._records_base(stream_external_id), json=body, semaphore=self._get_semaphore("write") + ) + return RecordsIngestResponse._load(res.json()) + + async def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Upsert records `_ in a mutable stream.""" + res = await self._post( + self._records_base(stream_external_id) + "/upsert", json=body, semaphore=self._get_semaphore("write") + ) + return RecordsIngestResponse._load(res.json()) + + async def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: + """`Delete records `_ from a mutable stream.""" + res = await self._post( + self._records_base(stream_external_id) + "/delete", json=body, semaphore=self._get_semaphore("write") + ) + return RecordsDeleteResponse._load(res.json()) + + async def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: + """`Filter records `_.""" + res = await self._post( + self._records_base(stream_external_id) + "/filter", json=body, semaphore=self._get_semaphore("read") + ) + return RecordsFilterResponse._load(res.json()) + + async def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: + """`Aggregate over records `_.""" + res = await self._post( + self._records_base(stream_external_id) + "/aggregate", json=body, semaphore=self._get_semaphore("read") + ) + return RecordsAggregateResponse._load(res.json()) + + async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: + """`Sync records `_ (cursor-based read).""" + res = await self._post( + self._records_base(stream_external_id) + "/sync", json=body, semaphore=self._get_semaphore("read") + ) + return RecordsSyncResponse._load(res.json()) + + async def ingest_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsIngestResponse: + """Ingest records using the ``items`` array shape (1-1000 records per request). + + Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...). + This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``. + """ + if not items: + raise ValueError("ingest_items requires at least one record (API allows 1-1000 items per call).") + return await self.ingest(stream_external_id, {"items": [dict(i) for i in items]}) + + async def upsert_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsIngestResponse: + """Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`.""" + if not items: + raise ValueError("upsert_items requires at least one record (API allows 1-1000 items per call).") + return await self.upsert(stream_external_id, {"items": [dict(i) for i in items]}) + + async def delete_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsDeleteResponse: + """Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`.""" + if not items: + raise ValueError("delete_items requires at least one record identifier.") + return await self.delete(stream_external_id, {"items": [dict(i) for i in items]}) diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py new file mode 100644 index 0000000000..a1b19fe7f4 --- /dev/null +++ b/cognite/client/_api/data_modeling/streams.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client._api.data_modeling.records import StreamsRecordsAPI +from cognite.client._api_client import APIClient +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamWrite, +) +from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils._url import interpolate_and_url_encode +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + """Streams API (``/streams``) with nested Records API (``/streams/{id}/records``).""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._DELETE_LIMIT = 1 + self.records = StreamsRecordsAPI(config, api_version, cognite_client) + + @overload + async def create(self, items: StreamWrite) -> Stream: ... + + @overload + async def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """`Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return await self._create_multiple( + items=items, + list_cls=StreamList, + resource_cls=Stream, + input_resource_cls=StreamWrite, + ) + + async def list(self) -> StreamList: + """`List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) + return StreamList._load(res.json()["items"]) + + async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """`Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, bool] | None = None + if include_statistics is not None: + params = {"includeStatistics": include_statistics} + res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) + return Stream._load(res.json()) + + async def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """`Delete streams `_. + + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + await self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + wrap_ids=True, + ) diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 01fb3ea184..f0cbe9e923 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,14 +1,12 @@ """ =============================================================================== -c76b2b9351d2a5eee6a710fa9893bfa4 +584030bc5e2a4b8168f54c101f7f521d This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from typing import TYPE_CHECKING - from cognite.client import AsyncCogniteClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI @@ -16,12 +14,10 @@ from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient -if TYPE_CHECKING: - from cognite.client import AsyncCogniteClient - class SyncDataModelingAPI(SyncAPIClient): """Auto-generated, do not modify manually.""" @@ -35,3 +31,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.instances = SyncInstancesAPI(async_client) self.graphql = SyncDataModelingGraphQLAPI(async_client) self.statistics = SyncStatisticsAPI(async_client) + self.streams = SyncStreamsAPI(async_client) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py new file mode 100644 index 0000000000..4d904e3929 --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -0,0 +1,115 @@ +""" +=============================================================================== +4d6222c6b1392e5a4e25754a02837f8d +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.streams import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) +from cognite.client.utils._async_helpers import run_sync + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsRecordsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """ + `Ingest records `_ into a stream. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.ingest(stream_external_id=stream_external_id, body=body) + ) + + def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """ + `Upsert records `_ in a mutable stream. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.upsert(stream_external_id=stream_external_id, body=body) + ) + + def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: + """ + `Delete records `_ from a mutable stream. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.delete(stream_external_id=stream_external_id, body=body) + ) + + def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: + """ + `Filter records `_. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.filter(stream_external_id=stream_external_id, body=body) + ) + + def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: + """ + `Aggregate over records `_. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.aggregate( + stream_external_id=stream_external_id, body=body + ) + ) + + def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: + """ + `Sync records `_ (cursor-based read). + """ + return run_sync( + self.__async_client.data_modeling.streams.records.sync(stream_external_id=stream_external_id, body=body) + ) + + def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: + """ + Ingest records using the ``items`` array shape (1-1000 records per request). + + Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...). + This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.ingest_items( + stream_external_id=stream_external_id, items=items + ) + ) + + def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: + """ + Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.upsert_items( + stream_external_id=stream_external_id, items=items + ) + ) + + def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsDeleteResponse: + """ + Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`. + """ + return run_sync( + self.__async_client.data_modeling.streams.records.delete_items( + stream_external_id=stream_external_id, items=items + ) + ) diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py new file mode 100644 index 0000000000..65ba3ce9f2 --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -0,0 +1,90 @@ +""" +=============================================================================== +60ce0afb3ca022c6b4fc479885bc5668 +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api.data_modeling.records import SyncStreamsRecordsAPI +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite +from cognite.client.utils._async_helpers import run_sync +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + self.records = SyncStreamsRecordsAPI(async_client) + + @overload + def create(self, items: StreamWrite) -> Stream: ... + + @overload + def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """ + `Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return run_sync(self.__async_client.data_modeling.streams.create(items=items)) + + def list(self) -> StreamList: + """ + `List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + return run_sync(self.__async_client.data_modeling.streams.list()) + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """ + `Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + return run_sync( + self.__async_client.data_modeling.streams.retrieve( + stream_external_id=stream_external_id, include_statistics=include_statistics + ) + ) + + def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """ + `Delete streams `_. + + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 9d4fab6aad..1e398d3ae8 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -206,6 +206,25 @@ GeometryFilter, TimestampRange, ) +from cognite.client.data_classes.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, + SyncRecord, + SyncRecordList, +) from cognite.client.data_classes.three_d import ( BoundingBox3D, RevisionCameraProperties, @@ -452,6 +471,13 @@ "LimitList", "OidcCredentials", "RawTable", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "Relationship", "RelationshipFilter", "RelationshipList", @@ -487,7 +513,17 @@ "SimulationTaskParameters", "SourceFile", "StatusCode", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", "SubworkflowTaskParameters", + "SyncRecord", + "SyncRecordList", "SyntheticDatapoints", "SyntheticDatapointsList", "Table", diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index daf9abb86e..4e44106c58 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -114,6 +114,21 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList +from cognite.client.data_classes.data_modeling.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamTemplateWriteSettings, + StreamWrite, + SyncRecord, + SyncRecordList, +) from cognite.client.data_classes.data_modeling.sync import SubscriptionContext from cognite.client.data_classes.data_modeling.views import ( ConnectionDefinition, @@ -221,6 +236,13 @@ "PropertyType", "Query", "QueryResult", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "RequiresConstraint", "RequiresConstraintApply", "ResultSetExpression", @@ -233,7 +255,13 @@ "SpaceApply", "SpaceApplyList", "SpaceList", + "Stream", + "StreamList", + "StreamTemplateWriteSettings", + "StreamWrite", "SubscriptionContext", + "SyncRecord", + "SyncRecordList", "Text", "TimeSeriesReference", "Timestamp", diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..97ad0cb0c1 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, + WriteableCogniteResource, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream.""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(WriteableCogniteResource["StreamWrite"]): + """A stream.""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + """Returns a write version.""" + return StreamWrite( + external_id=self.external_id, + settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), + ) + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams.""" + + _RESOURCE = Stream + + +class StreamTemplate(CogniteResource): + """Reference to a stream template.""" + + def __init__(self, name: str) -> None: + self.name = name + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(name=resource["name"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"name": self.name} + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamTemplateWriteSettings(CogniteResource): + """Write-side settings for creating a stream from a template.""" + + def __init__(self, template: StreamTemplate) -> None: + self.template = template + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(template=StreamTemplate._load(resource["template"])) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"template": self.template.dump(camel_case=camel_case)} + + +def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSettings | dict[str, Any]: + if set(raw.keys()) == {"template"} and isinstance(raw["template"], dict) and "name" in raw["template"]: + return StreamTemplateWriteSettings._load(raw) + return raw + + +class StreamWrite(WriteableCogniteResource["StreamWrite"]): + """Request item for creating a stream.""" + + def __init__( + self, + external_id: str, + settings: StreamTemplateWriteSettings | dict[str, Any], + ) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + settings=_parse_stream_write_settings(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if isinstance(self.settings, CogniteResource): + settings_dumped = self.settings.dump(camel_case=camel_case) + else: + settings_dumped = self.settings + out = {"external_id": self.external_id, "settings": settings_dumped} + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + return self + + +class Record(CogniteResource): + """A record returned from filter.""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, Any], + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties", {}), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": self.properties, + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): + _RESOURCE = Record + + +class SyncRecord(CogniteResource): + """Record entry from sync.""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: str, + properties: dict[str, Any] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + out["properties"] = self.properties + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin): + _RESOURCE = SyncRecord + + +class RecordsFilterResponse(CogniteResource): + """Records filter response.""" + + def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: + self.items = items + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = RecordList._load(resource.get("items", [])) + return cls(items=items, typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsSyncResponse(CogniteResource): + """Records sync response.""" + + def __init__( + self, + items: SyncRecordList, + next_cursor: str, + has_next: bool, + typing: dict[str, Any] | None = None, + ) -> None: + self.items = items + self.next_cursor = next_cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = SyncRecordList._load(resource.get("items", [])) + return cls( + items=items, + next_cursor=resource["nextCursor"], + has_next=resource["hasNext"], + typing=resource.get("typing"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "items": self.items.dump(camel_case=camel_case), + "next_cursor": self.next_cursor, + "has_next": self.has_next, + } + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsAggregateResponse(CogniteResource): + """Records aggregate response.""" + + def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: + self.aggregates = aggregates + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"aggregates": self.aggregates} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsDeleteResponse(CogniteResource): + """Records delete response.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + +class RecordsIngestResponse(CogniteResource): + """Records ingest response.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} diff --git a/cognite/client/data_classes/streams/__init__.py b/cognite/client/data_classes/streams/__init__.py new file mode 100644 index 0000000000..ed98581cc9 --- /dev/null +++ b/cognite/client/data_classes/streams/__init__.py @@ -0,0 +1,41 @@ +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, +) +from cognite.client.data_classes.streams.stream_record import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + SyncRecord, + SyncRecordList, +) + +__all__ = [ + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", + "SyncRecord", + "SyncRecordList", +] diff --git a/cognite/client/data_classes/streams/stream.py b/cognite/client/data_classes/streams/stream.py new file mode 100644 index 0000000000..633b25d507 --- /dev/null +++ b/cognite/client/data_classes/streams/stream.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream.""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(CogniteResource): + """A stream.""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams.""" + + _RESOURCE = Stream + + +class StreamWrite(CogniteResource): + """Request item for creating a stream.""" + + def __init__(self, external_id: str, settings: dict[str, Any]) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(external_id=resource["externalId"], settings=resource["settings"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id, "settings": self.settings} + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamDeleteItem(CogniteResource): + """Identifier for ``POST /streams/delete``.""" + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(external_id=resource["externalId"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id} + return convert_all_keys_to_camel_case(out) if camel_case else out diff --git a/cognite/client/data_classes/streams/stream_record.py b/cognite/client/data_classes/streams/stream_record.py new file mode 100644 index 0000000000..0c0e01fe1b --- /dev/null +++ b/cognite/client/data_classes/streams/stream_record.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class Record(CogniteResource): + """A record returned from filter.""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, Any], + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties", {}), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": self.properties, + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): + _RESOURCE = Record + + +class SyncRecord(CogniteResource): + """Record entry from sync.""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: str, + properties: dict[str, Any] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + out["properties"] = self.properties + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin): + _RESOURCE = SyncRecord + + +class RecordsFilterResponse(CogniteResource): + """Records filter response.""" + + def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: + self.items = items + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = RecordList._load(resource.get("items", [])) + return cls(items=items, typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsSyncResponse(CogniteResource): + """Records sync response.""" + + def __init__( + self, + items: SyncRecordList, + next_cursor: str, + has_next: bool, + typing: dict[str, Any] | None = None, + ) -> None: + self.items = items + self.next_cursor = next_cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = SyncRecordList._load(resource.get("items", [])) + return cls( + items=items, + next_cursor=resource["nextCursor"], + has_next=resource["hasNext"], + typing=resource.get("typing"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "items": self.items.dump(camel_case=camel_case), + "next_cursor": self.next_cursor, + "has_next": self.has_next, + } + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsAggregateResponse(CogniteResource): + """Records aggregate response.""" + + def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: + self.aggregates = aggregates + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"aggregates": self.aggregates} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsDeleteResponse(CogniteResource): + """Records delete response.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + +class RecordsIngestResponse(CogniteResource): + """Records ingest response.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 981a0a7d55..949321f96b 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -17,9 +17,11 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import StreamsRecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -100,9 +102,11 @@ from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI +from cognite.client._sync_api.data_modeling.records import SyncStreamsRecordsAPI from cognite.client._sync_api.data_modeling.space_statistics import SyncSpaceStatisticsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api.data_sets import SyncDataSetsAPI from cognite.client._sync_api.datapoints import SyncDatapointsAPI @@ -309,6 +313,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models, sim_routines) + streams_records = create_autospec(StreamsRecordsAPI, instance=True, spec_set=True) + self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) + object.__setattr__(self.streams, "records", streams_records) + sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) @@ -508,6 +516,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models) + sync_streams_records = create_autospec(SyncStreamsRecordsAPI, instance=True, spec_set=True) + self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) + object.__setattr__(self.streams, "records", sync_streams_records) + sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 4434ea1774..4ab2f4b3dd 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -533,7 +533,7 @@ def _patch_loop_for_jupyter(self, loop: asyncio.AbstractEventLoop | None) -> asy if loop is None: try: - import nest_asyncio # type: ignore [import-not-found] + import nest_asyncio # type: ignore [import-untyped] except ImportError: raise CogniteImportError( module="nest_asyncio", diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 3b0eb1f5c8..d5dc61894f 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -34,6 +34,7 @@ "raw/dbs/[^/]+/tables$", "relationships", "sequences", + "streams", "simulators", "simulators/models", "simulators/models/revisions", @@ -58,6 +59,10 @@ "transformations/cancel", "transformations/notifications", "transformations/run", + # ILA stream records: write batches (streams root is non-retryable; read POSTs stay retryable) + r"streams/[^/]+/records$", + r"streams/[^/]+/records/upsert$", + r"streams/[^/]+/records/delete$", ) ) ) diff --git a/scripts/ila_streams_records_smoke.py b/scripts/ila_streams_records_smoke.py new file mode 100644 index 0000000000..e1856c0a6e --- /dev/null +++ b/scripts/ila_streams_records_smoke.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +"""Local smoke test: list ILA streams and optionally ingest one record (not run in CI).""" + +from __future__ import annotations + +import argparse +import json +import os +import random +import sys +from pathlib import Path + +from dotenv import load_dotenv + +from cognite.client import ClientConfig, CogniteClient +from cognite.client.credentials import OAuthClientCertificate, OAuthClientCredentials, OAuthInteractive + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _make_client() -> CogniteClient: + login_flow = os.environ["LOGIN_FLOW"].lower() + if login_flow == "client_credentials": + credentials = OAuthClientCredentials( + token_url=os.environ["COGNITE_TOKEN_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + client_secret=os.environ["COGNITE_CLIENT_SECRET"], + scopes=os.environ["COGNITE_TOKEN_SCOPES"].split(","), + ) + elif login_flow == "interactive": + credentials = OAuthInteractive( + authority_url=os.environ["COGNITE_AUTHORITY_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + scopes=os.environ.get("COGNITE_TOKEN_SCOPES", "").split(","), + redirect_port=random.randint(53000, 60000), + ) + elif login_flow == "client_certificate": + credentials = OAuthClientCertificate( + authority_url=os.environ["COGNITE_AUTHORITY_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + cert_thumbprint=os.environ["COGNITE_CERT_THUMBPRINT"], + certificate=Path(os.environ["COGNITE_CERTIFICATE"]).read_text(), + scopes=os.environ.get("COGNITE_TOKEN_SCOPES", "").split(","), + ) + else: + raise SystemExit("LOGIN_FLOW must be client_credentials, interactive, or client_certificate") + + return CogniteClient( + ClientConfig( + client_name=os.environ["COGNITE_CLIENT_NAME"], + project=os.environ["COGNITE_PROJECT"], + base_url=os.environ["COGNITE_BASE_URL"], + credentials=credentials, + ) + ) + + +def main() -> int: + epilog = """\ +Environment: same as integration tests (see CONTRIBUTING.md). + LOGIN_FLOW, COGNITE_PROJECT, COGNITE_BASE_URL, COGNITE_CLIENT_NAME, plus auth fields. + +Optional for --ingest-one: + ILA_STREAM_EXTERNAL_ID - stream external id + ILA_RECORD_ITEM_JSON - one JSON object: space, externalId, sources, ... + +Examples: + poetry run python scripts/ila_streams_records_smoke.py + ILA_STREAM_EXTERNAL_ID=my-stream ILA_RECORD_ITEM_JSON='{"space":"sp","externalId":"r1","sources":[]}' \\ + poetry run python scripts/ila_streams_records_smoke.py --ingest-one +""" + parser = argparse.ArgumentParser( + description="Verify ILA Streams + Records against a live CDF project.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=epilog, + ) + parser.add_argument( + "--ingest-one", + action="store_true", + help="Call ingest_items with ILA_RECORD_ITEM_JSON on ILA_STREAM_EXTERNAL_ID", + ) + args = parser.parse_args() + + load_dotenv(REPO_ROOT / ".env") + load_dotenv() + + client = _make_client() + + streams = client.streams.list() + print(f"Found {len(streams)} stream(s).") + for s in streams: + print(f" - {s.external_id!r} ({getattr(s, 'type', '')})") + + if args.ingest_one: + stream_id = os.environ.get("ILA_STREAM_EXTERNAL_ID") + raw = os.environ.get("ILA_RECORD_ITEM_JSON") + if not stream_id or not raw: + print( + "Set ILA_STREAM_EXTERNAL_ID and ILA_RECORD_ITEM_JSON for --ingest-one.", + file=sys.stderr, + ) + return 2 + item = json.loads(raw) + result = client.streams.records.ingest_items(stream_id, [item]) + print("ingest response:", result.dump(camel_case=False)) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/sync_client_codegen/sync_api_template.txt b/scripts/sync_client_codegen/sync_api_template.txt index 34e502971a..b3cdad08b1 100644 --- a/scripts/sync_client_codegen/sync_api_template.txt +++ b/scripts/sync_client_codegen/sync_api_template.txt @@ -5,16 +5,13 @@ This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ +from __future__ import annotations + {existing_imports} from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.utils._async_helpers import SyncIterator, run_sync -from cognite.client.utils._concurrency import _get_event_loop_executor -from typing import Any, Iterator, TYPE_CHECKING, overload -from collections.abc import Coroutine if TYPE_CHECKING: - import pandas as pd {type_checking_imports} diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 943f2a8937..ce1426fcea 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -390,8 +390,9 @@ def permanent_scheduled_trigger( cognite_client: CogniteClient, permanent_workflow_for_triggers: WorkflowVersion ) -> Iterator[WorkflowTrigger]: version = permanent_workflow_for_triggers - every_15min = "*/15 * * * *" - on_the_minute = _create_scheduled_trigger(version, every_15min) + # Fire at least once per minute so integration CI can observe a run within a short poll window. + every_minute = "*/1 * * * *" + on_the_minute = _create_scheduled_trigger(version, every_minute) yield cognite_client.workflows.triggers.upsert(on_the_minute) @@ -448,9 +449,15 @@ def test_delete_multiple_non_existing(self, cognite_client: CogniteClient, new_w assert cognite_client.workflows.retrieve(new_workflow.external_id) is None def test_retrieve_workflow(self, cognite_client: CogniteClient, persisted_workflow_list: WorkflowList) -> None: - retrieved = cognite_client.workflows.retrieve(persisted_workflow_list[0].external_id) - assert retrieved - assert retrieved.dump() == persisted_workflow_list[0].dump() + expected = persisted_workflow_list[0] + retrieved = cognite_client.workflows.retrieve(expected.external_id) + assert retrieved is not None + assert retrieved.external_id == expected.external_id + assert retrieved.description == expected.description + assert retrieved.data_set_id == expected.data_set_id + assert retrieved.max_concurrent_executions == expected.max_concurrent_executions + assert retrieved.created_time == expected.created_time + assert retrieved.last_updated_time >= expected.last_updated_time def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: non_existing = cognite_client.workflows.retrieve("integration_test-non_existing_workflow") @@ -796,18 +803,19 @@ def test_trigger_run_history( cognite_client: CogniteClient, permanent_scheduled_trigger: WorkflowTrigger, ) -> None: - for attempt in [1, 2, 3]: - history = cognite_client.workflows.triggers.list_runs(external_id=permanent_scheduled_trigger.external_id) - if len(history) > 0: - break - else: - time.sleep(15) - else: - assert len(history) > 0, ( - "No trigger runs found after 3 attempts. If you are running tests for the first time against your project " - "it may take quite some time before the scheduled trigger runs for the first time. Grab some coffee!" + deadline = time.monotonic() + 130.0 + history = cognite_client.workflows.triggers.list_runs( + external_id=permanent_scheduled_trigger.external_id, limit=None + ) + while len(history) == 0 and time.monotonic() < deadline: + time.sleep(5) + history = cognite_client.workflows.triggers.list_runs( + external_id=permanent_scheduled_trigger.external_id, limit=None + ) + if len(history) == 0: + pytest.skip( + "No scheduled trigger runs within the polling window; depends on cron in the integration project." ) - assert history[0].external_id == permanent_scheduled_trigger.external_id assert history[0].workflow_external_id == permanent_scheduled_trigger.workflow_external_id assert history[0].workflow_version == permanent_scheduled_trigger.workflow_version diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py new file mode 100644 index 0000000000..04a29331be --- /dev/null +++ b/tests/tests_unit/test_api/test_streams.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +import re + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.streams import ( + RecordsIngestResponse, + StreamList, + StreamWrite, +) + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" + + +class TestStreamsAPI: + def test_list_parses_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + out = cognite_client.data_modeling.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + + def test_retrieve_include_statistics_query( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + httpx_mock.add_response( + method="GET", + url=re.compile(re.escape(streams_base_url) + r"/st1\?includeStatistics=true$"), + json=sample, + ) + cognite_client.data_modeling.streams.retrieve("st1", include_statistics=True) + + def test_create_posts_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + w = StreamWrite("st1", {"template": {"name": "ImmutableTestStream"}}) + cognite_client.data_modeling.streams.create([w]) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams") + + def test_create_rejects_multiple_items(self, cognite_client: CogniteClient) -> None: + a = StreamWrite("a", {"template": {"name": "ImmutableTestStream"}}) + b = StreamWrite("b", {"template": {"name": "ImmutableTestStream"}}) + with pytest.raises(ValueError, match="exactly one"): + cognite_client.data_modeling.streams.create([a, b]) + + def test_delete_rejects_multiple_items(self, cognite_client: CogniteClient) -> None: + from cognite.client.data_classes.streams import StreamDeleteItem + + with pytest.raises(ValueError, match="exactly one"): + cognite_client.data_modeling.streams.delete([StreamDeleteItem("a"), StreamDeleteItem("b")]) + + def test_records_ingest_posts( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), + json={}, + ) + out = cognite_client.data_modeling.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams/my-stream/records") diff --git a/tests/tests_unit/test_api/test_streams_records.py b/tests/tests_unit/test_api/test_streams_records.py new file mode 100644 index 0000000000..2e6fd59fd2 --- /dev/null +++ b/tests/tests_unit/test_api/test_streams_records.py @@ -0,0 +1,190 @@ +"""Unit tests for ILA :class:`~cognite.client._api.streams.records.StreamsRecordsAPI` (via sync client + httpx).""" + +from __future__ import annotations + +import re + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.streams import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" + + +class TestIngest: + def test_posts_body_and_returns_ingest_response( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), + json={}, + ) + out = cognite_client.data_modeling.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + + +class TestIngestItems: + def test_wraps_sequence_as_items_key( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/s1/records$"), + json={}, + ) + row = {"space": "sp", "externalId": "r1", "sources": []} + cognite_client.data_modeling.streams.records.ingest_items("s1", [row]) + req = httpx_mock.get_requests()[0] + assert req.url.path.endswith("/streams/s1/records") + + def test_empty_items_raises(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one record"): + cognite_client.data_modeling.streams.records.ingest_items("s1", []) + + +class TestUpsertAndUpsertItems: + def test_upsert_posts_to_upsert_path( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/upsert$"), + json={}, + ) + cognite_client.data_modeling.streams.records.upsert("st", {"items": [{"x": 1}]}) + + def test_upsert_items_requires_non_empty(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one record"): + cognite_client.data_modeling.streams.records.upsert_items("st", []) + + +class TestDeleteAndDeleteItems: + def test_delete_posts_body( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), + json={}, + ) + out = cognite_client.data_modeling.streams.records.delete("st", {"items": [{"space": "sp", "externalId": "a"}]}) + assert isinstance(out, RecordsDeleteResponse) + + def test_delete_items_wraps_identifiers( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), + json={}, + ) + cognite_client.data_modeling.streams.records.delete_items("st", [{"space": "sp", "externalId": "a"}]) + + def test_delete_items_empty_raises(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one"): + cognite_client.data_modeling.streams.records.delete_items("st", []) + + +class TestFilter: + def test_parses_filter_response( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + payload = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/filter$"), + json=payload, + ) + out = cognite_client.data_modeling.streams.records.filter("st", {"filter": {"matchAll": {}}}) + assert isinstance(out, RecordsFilterResponse) + assert len(out.items) == 1 + assert out.items[0].external_id == "r1" + + +class TestAggregate: + def test_parses_aggregates( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/aggregate$"), + json={"aggregates": {"cnt": 3}, "typing": {}}, + ) + out = cognite_client.data_modeling.streams.records.aggregate("st", {"aggregate": []}) + assert isinstance(out, RecordsAggregateResponse) + assert out.aggregates == {"cnt": 3} + + +class TestSync: + def test_parses_sync_page( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + payload = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "next", + "hasNext": False, + } + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/sync$"), + json=payload, + ) + out = cognite_client.data_modeling.streams.records.sync("st", {"sources": [], "limit": 10}) + assert isinstance(out, RecordsSyncResponse) + assert out.next_cursor == "next" + assert not out.has_next + assert out.items[0].status == "created" diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index d481301a78..7699282502 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1834,6 +1834,15 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), + # ILA streams (CRUD + record ingest/upsert/delete; filter/aggregate/sync are retryable) + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/upsert", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/filter", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/aggregate", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/sync", True), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_streams.py b/tests/tests_unit/test_data_classes/test_streams.py new file mode 100644 index 0000000000..45520c5fb6 --- /dev/null +++ b/tests/tests_unit/test_data_classes/test_streams.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from cognite.client.data_classes.streams import ( + Record, + RecordsFilterResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamWrite, + SyncRecord, +) + + +def test_stream_roundtrip() -> None: + raw = { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, + }, + }, + } + s = Stream._load(raw) + back = s.dump(camel_case=True) + assert back["externalId"] == "s1" + assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 + + +def test_stream_list_load() -> None: + raw = { + "items": [ + { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + lst = StreamList._load(raw["items"]) + assert len(lst) == 1 + assert lst[0].external_id == "s1" + + +def test_stream_write_dump() -> None: + w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) + assert w.dump()["externalId"] == "abc" + assert w.dump()["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_record_load() -> None: + raw = { + "space": "sp", + "externalId": "r1", + "createdTime": 2, + "lastUpdatedTime": 3, + "properties": {"sp": {"c": {"p": 1}}}, + } + r = Record._load(raw) + assert r.space == "sp" + assert r.properties["sp"]["c"]["p"] == 1 + + +def test_records_filter_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + fr = RecordsFilterResponse._load(raw) + assert len(fr.items) == 1 + assert isinstance(fr.items[0], Record) + + +def test_records_sync_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "c", + "hasNext": False, + } + sr = RecordsSyncResponse._load(raw) + assert sr.next_cursor == "c" + assert not sr.has_next + assert isinstance(sr.items[0], SyncRecord)