From b83f313538c71441916c5acf7bdd2fd475b1beb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 00:19:47 +0100 Subject: [PATCH 01/12] feat(streams): ILA Streams API and stream data classes Add StreamsAPI (create, list, retrieve, delete) under client.streams, typed stream models in cognite.client.data_classes.streams, and unit tests. Register streams in non-retryable POST paths for create/delete. Records API will follow in a separate change. Made-with: Cursor --- cognite/client/_api/streams/__init__.py | 94 +++++++++ cognite/client/_cognite_client.py | 2 + cognite/client/_sync_api/streams/__init__.py | 68 +++++++ cognite/client/_sync_cognite_client.py | 2 + cognite/client/data_classes/__init__.py | 18 ++ .../client/data_classes/streams/__init__.py | 21 ++ cognite/client/data_classes/streams/stream.py | 185 ++++++++++++++++++ cognite/client/testing.py | 6 + cognite/client/utils/_url.py | 1 + scripts/ila_streams_pr_e2e.py | 86 ++++++++ tests/tests_unit/test_api/test_streams.py | 112 +++++++++++ tests/tests_unit/test_api_client.py | 3 + .../test_data_classes/test_streams.py | 52 +++++ 13 files changed, 650 insertions(+) create mode 100644 cognite/client/_api/streams/__init__.py create mode 100644 cognite/client/_sync_api/streams/__init__.py create mode 100644 cognite/client/data_classes/streams/__init__.py create mode 100644 cognite/client/data_classes/streams/stream.py create mode 100644 scripts/ila_streams_pr_e2e.py create mode 100644 tests/tests_unit/test_api/test_streams.py create mode 100644 tests/tests_unit/test_data_classes/test_streams.py diff --git a/cognite/client/_api/streams/__init__.py b/cognite/client/_api/streams/__init__.py new file mode 100644 index 0000000000..53e0d8856a --- /dev/null +++ b/cognite/client/_api/streams/__init__.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from collections.abc import MutableSequence, Sequence +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamList, + StreamWrite, +) +from cognite.client.utils._url import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +def _dump_write_item(obj: StreamWrite | dict[str, Any]) -> dict[str, Any]: + if isinstance(obj, dict): + return obj + return obj.dump() + + +def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]: + if isinstance(obj, dict): + return obj + return obj.dump() + + +class StreamsAPI(APIClient): + """ILA Streams API (``/streams``): create, list, retrieve, delete.""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + + async def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: + """`Create streams `_. + + The API accepts **exactly one** stream per request. Pass a single-element sequence. + Stream creation is rate-limited; avoid issuing many create calls in a tight loop. + """ + if len(items) != 1: + raise ValueError("ILA create stream accepts exactly one item; see API documentation.") + res = await self._post( + self._RESOURCE_PATH, + json={"items": [_dump_write_item(i) for i in items]}, + semaphore=self._get_semaphore("write"), + ) + return StreamList._load(res.json()["items"]) + + async def list(self) -> StreamList: + """`List streams `_ in the project. + + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + """ + res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) + return StreamList._load(res.json()["items"]) + + async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """`Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive; the list endpoint does not offer this flag for that reason. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, Any] | None = None + if include_statistics is not None: + params = {"includeStatistics": "true" if include_statistics else "false"} + res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) + return Stream._load(res.json()) + + async def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: + """`Delete streams `_ (POST). + + The API accepts **exactly one** stream per request. Deletion is soft-delete and retains + capacity for an extended period; prefer deleting only when necessary. + """ + if len(items) != 1: + raise ValueError("ILA delete stream accepts exactly one item; see API documentation.") + await self._post( + f"{self._RESOURCE_PATH}/delete", + json={"items": [_dump_delete_item(i) for i in items]}, + semaphore=self._get_semaphore("write"), + ) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 1b5fcab980..ae5f7675f5 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -25,6 +25,7 @@ from cognite.client._api.relationships import RelationshipsAPI from cognite.client._api.sequences import SequencesAPI from cognite.client._api.simulators import SimulatorsAPI +from cognite.client._api.streams import StreamsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.time_series import TimeSeriesAPI from cognite.client._api.transformations import TransformationsAPI @@ -92,6 +93,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.workflows = WorkflowAPI(self._config, self._API_VERSION, self) self.units = UnitAPI(self._config, self._API_VERSION, self) self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self) + self.streams = StreamsAPI(self._config, self._API_VERSION, self) # APIs just using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) diff --git a/cognite/client/_sync_api/streams/__init__.py b/cognite/client/_sync_api/streams/__init__.py new file mode 100644 index 0000000000..2dc0495cd5 --- /dev/null +++ b/cognite/client/_sync_api/streams/__init__.py @@ -0,0 +1,68 @@ +""" +=============================================================================== +9303ab5e6fadf932b93ee83f4fe9221e +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import MutableSequence, Sequence +from typing import Any + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.streams.stream import Stream, StreamDeleteItem, StreamList, StreamWrite +from cognite.client.utils._async_helpers import run_sync + + +class SyncStreamsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: + """ + `Create streams `_. + + The API accepts **exactly one** stream per request. Pass a single-element sequence. + Stream creation is rate-limited; avoid issuing many create calls in a tight loop. + """ + return run_sync(self.__async_client.streams.create(items=items)) + + def list(self) -> StreamList: + """ + `List streams `_ in the project. + + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + """ + return run_sync(self.__async_client.streams.list()) + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """ + `Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive; the list endpoint does not offer this flag for that reason. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + return run_sync( + self.__async_client.streams.retrieve( + stream_external_id=stream_external_id, include_statistics=include_statistics + ) + ) + + def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: + """ + `Delete streams `_ (POST). + + The API accepts **exactly one** stream per request. Deletion is soft-delete and retains + capacity for an extended period; prefer deleting only when necessary. + """ + return run_sync(self.__async_client.streams.delete(items=items)) diff --git a/cognite/client/_sync_cognite_client.py b/cognite/client/_sync_cognite_client.py index dafe24455a..360e5bb99e 100644 --- a/cognite/client/_sync_cognite_client.py +++ b/cognite/client/_sync_cognite_client.py @@ -33,6 +33,7 @@ from cognite.client._sync_api.relationships import SyncRelationshipsAPI from cognite.client._sync_api.sequences import SyncSequencesAPI from cognite.client._sync_api.simulators import SyncSimulatorsAPI +from cognite.client._sync_api.streams import SyncStreamsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.time_series import SyncTimeSeriesAPI from cognite.client._sync_api.transformations import SyncTransformationsAPI @@ -85,6 +86,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.relationships = SyncRelationshipsAPI(async_client) self.sequences = SyncSequencesAPI(async_client) self.simulators = SyncSimulatorsAPI(async_client) + self.streams = SyncStreamsAPI(async_client) self.three_d = Sync3DAPI(async_client) self.time_series = SyncTimeSeriesAPI(async_client) self.transformations = SyncTransformationsAPI(async_client) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 9d4fab6aad..0d95f16286 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -206,6 +206,16 @@ GeometryFilter, TimestampRange, ) +from cognite.client.data_classes.streams import ( + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, +) from cognite.client.data_classes.three_d import ( BoundingBox3D, RevisionCameraProperties, @@ -487,6 +497,14 @@ "SimulationTaskParameters", "SourceFile", "StatusCode", + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", "SubworkflowTaskParameters", "SyntheticDatapoints", "SyntheticDatapointsList", diff --git a/cognite/client/data_classes/streams/__init__.py b/cognite/client/data_classes/streams/__init__.py new file mode 100644 index 0000000000..d4b3ace900 --- /dev/null +++ b/cognite/client/data_classes/streams/__init__.py @@ -0,0 +1,21 @@ +from cognite.client.data_classes.streams.stream import ( + Stream, + StreamDeleteItem, + StreamLifecycleSettings, + StreamLimit, + StreamLimitSettings, + StreamList, + StreamSettings, + StreamWrite, +) + +__all__ = [ + "Stream", + "StreamDeleteItem", + "StreamLifecycleSettings", + "StreamLimit", + "StreamLimitSettings", + "StreamList", + "StreamSettings", + "StreamWrite", +] diff --git a/cognite/client/data_classes/streams/stream.py b/cognite/client/data_classes/streams/stream.py new file mode 100644 index 0000000000..58db05b1ec --- /dev/null +++ b/cognite/client/data_classes/streams/stream.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream (human-readable).""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(CogniteResource): + """A stream (ILA ``StreamResponseItem``).""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams (``StreamResponse.items``).""" + + _RESOURCE = Stream + + +class StreamWrite(CogniteResource): + """Request item for creating a stream (``StreamRequestItem``).""" + + def __init__(self, external_id: str, settings: dict[str, Any]) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(external_id=resource["externalId"], settings=resource["settings"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id, "settings": self.settings} + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamDeleteItem(CogniteResource): + """Identifier for ``POST /streams/delete``.""" + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(external_id=resource["externalId"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = {"external_id": self.external_id} + return convert_all_keys_to_camel_case(out) if camel_case else out diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 981a0a7d55..1d9122fe29 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -68,6 +68,7 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI +from cognite.client._api.streams import StreamsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.three_d.asset_mapping import ThreeDAssetMappingAPI @@ -151,6 +152,7 @@ from cognite.client._sync_api.simulators.routine_revisions import SyncSimulatorRoutineRevisionsAPI from cognite.client._sync_api.simulators.routines import SyncSimulatorRoutinesAPI from cognite.client._sync_api.simulators.runs import SyncSimulatorRunsAPI +from cognite.client._sync_api.streams import SyncStreamsAPI from cognite.client._sync_api.synthetic_time_series import SyncSyntheticDatapointsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.three_d.asset_mapping import Sync3DAssetMappingAPI @@ -309,6 +311,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models, sim_routines) + self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) + sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) @@ -508,6 +512,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models) + self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) + sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) flip_spec_set_on(self.sequences) diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 3b0eb1f5c8..567a752959 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -34,6 +34,7 @@ "raw/dbs/[^/]+/tables$", "relationships", "sequences", + "streams", "simulators", "simulators/models", "simulators/models/revisions", diff --git a/scripts/ila_streams_pr_e2e.py b/scripts/ila_streams_pr_e2e.py new file mode 100644 index 0000000000..f211e9e82a --- /dev/null +++ b/scripts/ila_streams_pr_e2e.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +"""E2E check for ILA ``client.streams`` (list, create, retrieve, delete) on SDK v8. + +Uses OIDC client credentials from the environment. Run:: + + poetry run python scripts/ila_streams_pr_e2e.py +""" + +from __future__ import annotations + +import os +import re +import sys +import uuid +from pathlib import Path + +from dotenv import load_dotenv + +from cognite.client import CogniteClient +from cognite.client.config import global_config +from cognite.client.data_classes.streams import StreamDeleteItem, StreamWrite + +global_config.disable_pypi_version_check = True + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _cdf_cluster() -> str: + if v := os.environ.get("COGNITE_CDF_CLUSTER"): + return v + base = os.environ.get("COGNITE_BASE_URL", "") + m = re.match(r"https?://([^.]+)\.cognitedata\.com/?", base.strip()) + if m: + return m.group(1) + return "api" + + +def main() -> int: + load_dotenv(REPO_ROOT / ".env") + load_dotenv() + + tenant = os.environ.get("COGNITE_TENANT_ID") or os.environ.get("AZURE_TENANT_ID") + missing = [k for k in ("COGNITE_PROJECT", "COGNITE_CLIENT_ID", "COGNITE_CLIENT_SECRET") if not os.environ.get(k)] + if not tenant: + missing.append("COGNITE_TENANT_ID or AZURE_TENANT_ID") + if missing: + print("Missing env:", ", ".join(missing), file=sys.stderr) + return 2 + + client = CogniteClient.default_oauth_client_credentials( + project=os.environ["COGNITE_PROJECT"], + cdf_cluster=_cdf_cluster(), + tenant_id=tenant, + client_id=os.environ["COGNITE_CLIENT_ID"], + client_secret=os.environ["COGNITE_CLIENT_SECRET"], + client_name=os.environ.get("COGNITE_CLIENT_NAME", "pr-ila-streams-e2e"), + ) + + ext = f"sdk_e2e_{uuid.uuid4().hex[:16]}" + + print("streams.list()") + n0 = len(client.streams.list()) + print(f" [OK] {n0} stream(s)") + + print("streams.create(StreamWrite template ImmutableTestStream)") + client.streams.create([StreamWrite(ext, {"template": {"name": "ImmutableTestStream"}})]) + print(f" [OK] created {ext!r}") + + print("streams.retrieve()") + got = client.streams.retrieve(ext) + print(f" [OK] external_id={got.external_id!r} type={got.type!r}") + + print("streams.delete(StreamDeleteItem)") + client.streams.delete([StreamDeleteItem(ext)]) + print(" [OK] delete request accepted (soft-delete)") + + print("streams.list() after delete") + n1 = len(client.streams.list()) + print(f" [OK] {n1} stream(s)") + + print("-- ILA Streams E2E: PASS") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py new file mode 100644 index 0000000000..32b100425b --- /dev/null +++ b/tests/tests_unit/test_api/test_streams.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import re + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.streams import StreamList, StreamWrite + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.streams._base_url_with_base_path + "/streams" + + +class TestStreamsAPI: + def test_list_parses_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + out = cognite_client.streams.list() + assert isinstance(out, StreamList) + assert out[0].external_id == "st1" + + def test_retrieve_include_statistics_query( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + httpx_mock.add_response( + method="GET", + url=re.compile(re.escape(streams_base_url) + r"/st1\?includeStatistics=true$"), + json=sample, + ) + cognite_client.streams.retrieve("st1", include_statistics=True) + + def test_create_posts_items( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + sample = { + "items": [ + { + "externalId": "st1", + "createdTime": 10, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) + w = StreamWrite("st1", {"template": {"name": "ImmutableTestStream"}}) + cognite_client.streams.create([w]) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams") + + def test_create_rejects_multiple_items(self, cognite_client: CogniteClient) -> None: + a = StreamWrite("a", {"template": {"name": "ImmutableTestStream"}}) + b = StreamWrite("b", {"template": {"name": "ImmutableTestStream"}}) + with pytest.raises(ValueError, match="exactly one"): + cognite_client.streams.create([a, b]) + + def test_delete_rejects_multiple_items(self, cognite_client: CogniteClient) -> None: + from cognite.client.data_classes.streams import StreamDeleteItem + + with pytest.raises(ValueError, match="exactly one"): + cognite_client.streams.delete([StreamDeleteItem("a"), StreamDeleteItem("b")]) diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index d481301a78..0f42a35ef5 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1834,6 +1834,9 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), + # ILA streams (CRUD only; record POSTs added with StreamsRecordsAPI) + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_streams.py b/tests/tests_unit/test_data_classes/test_streams.py new file mode 100644 index 0000000000..a82acc01a5 --- /dev/null +++ b/tests/tests_unit/test_data_classes/test_streams.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from cognite.client.data_classes.streams import Stream, StreamList, StreamWrite + + +def test_stream_roundtrip() -> None: + raw = { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0, "consumed": 0.5}, + }, + }, + } + s = Stream._load(raw) + back = s.dump(camel_case=True) + assert back["externalId"] == "s1" + assert back["settings"]["limits"]["maxRecordsTotal"]["provisioned"] == 1000.0 + + +def test_stream_list_load() -> None: + raw = { + "items": [ + { + "externalId": "s1", + "createdTime": 1, + "createdFromTemplate": "ImmutableTestStream", + "type": "Immutable", + "settings": { + "lifecycle": {"retainedAfterSoftDelete": "P1D"}, + "limits": { + "maxRecordsTotal": {"provisioned": 1000.0}, + "maxGigaBytesTotal": {"provisioned": 1.0}, + }, + }, + } + ] + } + lst = StreamList._load(raw["items"]) + assert len(lst) == 1 + assert lst[0].external_id == "s1" + + +def test_stream_write_dump() -> None: + w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) + assert w.dump()["externalId"] == "abc" + assert w.dump()["settings"]["template"]["name"] == "ImmutableTestStream" From 7cebb3303ccd68d088ce581902ba21c48fbe09bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 16:16:32 +0100 Subject: [PATCH 02/12] chore: remove ila_streams_pr_e2e helper script Made-with: Cursor --- scripts/ila_streams_pr_e2e.py | 86 ----------------------------------- 1 file changed, 86 deletions(-) delete mode 100644 scripts/ila_streams_pr_e2e.py diff --git a/scripts/ila_streams_pr_e2e.py b/scripts/ila_streams_pr_e2e.py deleted file mode 100644 index f211e9e82a..0000000000 --- a/scripts/ila_streams_pr_e2e.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -"""E2E check for ILA ``client.streams`` (list, create, retrieve, delete) on SDK v8. - -Uses OIDC client credentials from the environment. Run:: - - poetry run python scripts/ila_streams_pr_e2e.py -""" - -from __future__ import annotations - -import os -import re -import sys -import uuid -from pathlib import Path - -from dotenv import load_dotenv - -from cognite.client import CogniteClient -from cognite.client.config import global_config -from cognite.client.data_classes.streams import StreamDeleteItem, StreamWrite - -global_config.disable_pypi_version_check = True - -REPO_ROOT = Path(__file__).resolve().parents[1] - - -def _cdf_cluster() -> str: - if v := os.environ.get("COGNITE_CDF_CLUSTER"): - return v - base = os.environ.get("COGNITE_BASE_URL", "") - m = re.match(r"https?://([^.]+)\.cognitedata\.com/?", base.strip()) - if m: - return m.group(1) - return "api" - - -def main() -> int: - load_dotenv(REPO_ROOT / ".env") - load_dotenv() - - tenant = os.environ.get("COGNITE_TENANT_ID") or os.environ.get("AZURE_TENANT_ID") - missing = [k for k in ("COGNITE_PROJECT", "COGNITE_CLIENT_ID", "COGNITE_CLIENT_SECRET") if not os.environ.get(k)] - if not tenant: - missing.append("COGNITE_TENANT_ID or AZURE_TENANT_ID") - if missing: - print("Missing env:", ", ".join(missing), file=sys.stderr) - return 2 - - client = CogniteClient.default_oauth_client_credentials( - project=os.environ["COGNITE_PROJECT"], - cdf_cluster=_cdf_cluster(), - tenant_id=tenant, - client_id=os.environ["COGNITE_CLIENT_ID"], - client_secret=os.environ["COGNITE_CLIENT_SECRET"], - client_name=os.environ.get("COGNITE_CLIENT_NAME", "pr-ila-streams-e2e"), - ) - - ext = f"sdk_e2e_{uuid.uuid4().hex[:16]}" - - print("streams.list()") - n0 = len(client.streams.list()) - print(f" [OK] {n0} stream(s)") - - print("streams.create(StreamWrite template ImmutableTestStream)") - client.streams.create([StreamWrite(ext, {"template": {"name": "ImmutableTestStream"}})]) - print(f" [OK] created {ext!r}") - - print("streams.retrieve()") - got = client.streams.retrieve(ext) - print(f" [OK] external_id={got.external_id!r} type={got.type!r}") - - print("streams.delete(StreamDeleteItem)") - client.streams.delete([StreamDeleteItem(ext)]) - print(" [OK] delete request accepted (soft-delete)") - - print("streams.list() after delete") - n1 = len(client.streams.list()) - print(f" [OK] {n1} stream(s)") - - print("-- ILA Streams E2E: PASS") - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) From 6821a653e431e653d2be940f40c77057c0b32acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 00:20:13 +0100 Subject: [PATCH 03/12] feat(streams): ILA Records API and record data classes Add StreamsRecordsAPI at client.streams.records (ingest, upsert, delete, filter, aggregate, sync), typed record/sync/response models, POST retry rules for record write paths, CogniteClientMock wiring, and tests. Made-with: Cursor --- cognite/client/_api/streams/__init__.py | 4 +- cognite/client/_api/streams/records.py | 59 +++++ cognite/client/_sync_api/streams/__init__.py | 19 +- cognite/client/_sync_api/streams/records.py | 67 ++++++ cognite/client/data_classes/__init__.py | 18 ++ .../client/data_classes/streams/__init__.py | 20 ++ .../data_classes/streams/stream_record.py | 206 ++++++++++++++++++ cognite/client/testing.py | 12 +- cognite/client/utils/_url.py | 4 + tests/tests_unit/test_api/test_streams.py | 23 +- tests/tests_unit/test_api_client.py | 8 +- .../test_data_classes/test_streams.py | 60 ++++- 12 files changed, 490 insertions(+), 10 deletions(-) create mode 100644 cognite/client/_api/streams/records.py create mode 100644 cognite/client/_sync_api/streams/records.py create mode 100644 cognite/client/data_classes/streams/stream_record.py diff --git a/cognite/client/_api/streams/__init__.py b/cognite/client/_api/streams/__init__.py index 53e0d8856a..bbd08781c4 100644 --- a/cognite/client/_api/streams/__init__.py +++ b/cognite/client/_api/streams/__init__.py @@ -3,6 +3,7 @@ from collections.abc import MutableSequence, Sequence from typing import TYPE_CHECKING, Any +from cognite.client._api.streams.records import StreamsRecordsAPI from cognite.client._api_client import APIClient from cognite.client.data_classes.streams.stream import ( Stream, @@ -30,12 +31,13 @@ def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]: class StreamsAPI(APIClient): - """ILA Streams API (``/streams``): create, list, retrieve, delete.""" + """ILA Streams API (``/streams``) and nested :class:`StreamsRecordsAPI` (``/streams/{id}/records``).""" _RESOURCE_PATH = "/streams" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: super().__init__(config, api_version, cognite_client) + self.records = StreamsRecordsAPI(config, api_version, cognite_client) async def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: """`Create streams `_. diff --git a/cognite/client/_api/streams/records.py b/cognite/client/_api/streams/records.py new file mode 100644 index 0000000000..af502026ef --- /dev/null +++ b/cognite/client/_api/streams/records.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from cognite.client.data_classes.streams.stream_record import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) +from cognite.client.utils._url import interpolate_and_url_encode + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsRecordsAPI(APIClient): + """ILA record operations under ``/streams/{streamId}/records/...``.""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + + def _records_base(self, stream_external_id: str) -> str: + return interpolate_and_url_encode("/streams/{}/records", stream_external_id) + + async def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Ingest records `_ into a stream.""" + res = await self._post(self._records_base(stream_external_id), json=body) + return RecordsIngestResponse._load(res.json()) + + async def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """`Upsert records `_ in a mutable stream.""" + res = await self._post(self._records_base(stream_external_id) + "/upsert", json=body) + return RecordsIngestResponse._load(res.json()) + + async def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: + """`Delete records `_ from a mutable stream.""" + res = await self._post(self._records_base(stream_external_id) + "/delete", json=body) + return RecordsDeleteResponse._load(res.json()) + + async def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: + """`Filter records `_.""" + res = await self._post(self._records_base(stream_external_id) + "/filter", json=body) + return RecordsFilterResponse._load(res.json()) + + async def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: + """`Aggregate over records `_.""" + res = await self._post(self._records_base(stream_external_id) + "/aggregate", json=body) + return RecordsAggregateResponse._load(res.json()) + + async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: + """`Sync records `_ (cursor-based read).""" + res = await self._post(self._records_base(stream_external_id) + "/sync", json=body) + return RecordsSyncResponse._load(res.json()) diff --git a/cognite/client/_sync_api/streams/__init__.py b/cognite/client/_sync_api/streams/__init__.py index 2dc0495cd5..54e0e02324 100644 --- a/cognite/client/_sync_api/streams/__init__.py +++ b/cognite/client/_sync_api/streams/__init__.py @@ -1,19 +1,29 @@ """ =============================================================================== -9303ab5e6fadf932b93ee83f4fe9221e +f780e4edf88535a6df4726a0958d12a7 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from collections.abc import MutableSequence, Sequence -from typing import Any +from collections.abc import Coroutine, Iterator, MutableSequence, Sequence +from typing import TYPE_CHECKING, Any, overload from cognite.client import AsyncCogniteClient +from cognite.client._api_client import APIClient +from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI from cognite.client._sync_api_client import SyncAPIClient from cognite.client.data_classes.streams.stream import Stream, StreamDeleteItem, StreamList, StreamWrite -from cognite.client.utils._async_helpers import run_sync +from cognite.client.utils._async_helpers import SyncIterator, run_sync +from cognite.client.utils._concurrency import _get_event_loop_executor +from cognite.client.utils._url import interpolate_and_url_encode + +if TYPE_CHECKING: + import pandas as pd + + from cognite.client import AsyncCogniteClient +from cognite.client.config import ClientConfig class SyncStreamsAPI(SyncAPIClient): @@ -21,6 +31,7 @@ class SyncStreamsAPI(SyncAPIClient): def __init__(self, async_client: AsyncCogniteClient) -> None: self.__async_client = async_client + self.records = SyncStreamsRecordsAPI(async_client) def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: """ diff --git a/cognite/client/_sync_api/streams/records.py b/cognite/client/_sync_api/streams/records.py new file mode 100644 index 0000000000..9399ba406b --- /dev/null +++ b/cognite/client/_sync_api/streams/records.py @@ -0,0 +1,67 @@ +""" +=============================================================================== +badc14c34fd9b89351938c7cc2d98caf +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.streams.stream_record import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) +from cognite.client.utils._async_helpers import run_sync + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsRecordsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """ + `Ingest records `_ into a stream. + """ + return run_sync(self.__async_client.streams.records.ingest(stream_external_id=stream_external_id, body=body)) + + def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: + """ + `Upsert records `_ in a mutable stream. + """ + return run_sync(self.__async_client.streams.records.upsert(stream_external_id=stream_external_id, body=body)) + + def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: + """ + `Delete records `_ from a mutable stream. + """ + return run_sync(self.__async_client.streams.records.delete(stream_external_id=stream_external_id, body=body)) + + def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: + """ + `Filter records `_. + """ + return run_sync(self.__async_client.streams.records.filter(stream_external_id=stream_external_id, body=body)) + + def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: + """ + `Aggregate over records `_. + """ + return run_sync(self.__async_client.streams.records.aggregate(stream_external_id=stream_external_id, body=body)) + + def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: + """ + `Sync records `_ (cursor-based read). + """ + return run_sync(self.__async_client.streams.records.sync(stream_external_id=stream_external_id, body=body)) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 0d95f16286..1e398d3ae8 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -207,6 +207,13 @@ TimestampRange, ) from cognite.client.data_classes.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, Stream, StreamDeleteItem, StreamLifecycleSettings, @@ -215,6 +222,8 @@ StreamList, StreamSettings, StreamWrite, + SyncRecord, + SyncRecordList, ) from cognite.client.data_classes.three_d import ( BoundingBox3D, @@ -462,6 +471,13 @@ "LimitList", "OidcCredentials", "RawTable", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "Relationship", "RelationshipFilter", "RelationshipList", @@ -506,6 +522,8 @@ "StreamSettings", "StreamWrite", "SubworkflowTaskParameters", + "SyncRecord", + "SyncRecordList", "SyntheticDatapoints", "SyntheticDatapointsList", "Table", diff --git a/cognite/client/data_classes/streams/__init__.py b/cognite/client/data_classes/streams/__init__.py index d4b3ace900..ed98581cc9 100644 --- a/cognite/client/data_classes/streams/__init__.py +++ b/cognite/client/data_classes/streams/__init__.py @@ -8,8 +8,26 @@ StreamSettings, StreamWrite, ) +from cognite.client.data_classes.streams.stream_record import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + SyncRecord, + SyncRecordList, +) __all__ = [ + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "Stream", "StreamDeleteItem", "StreamLifecycleSettings", @@ -18,4 +36,6 @@ "StreamList", "StreamSettings", "StreamWrite", + "SyncRecord", + "SyncRecordList", ] diff --git a/cognite/client/data_classes/streams/stream_record.py b/cognite/client/data_classes/streams/stream_record.py new file mode 100644 index 0000000000..4c75055d48 --- /dev/null +++ b/cognite/client/data_classes/streams/stream_record.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class Record(CogniteResource): + """A record returned from filter (ILA ``Record``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, Any], + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties", {}), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": self.properties, + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): + _RESOURCE = Record + + +class SyncRecord(CogniteResource): + """Record entry from sync (ILA ``SyncRecord``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: str, + properties: dict[str, Any] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + out["properties"] = self.properties + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin): + _RESOURCE = SyncRecord + + +class RecordsFilterResponse(CogniteResource): + """``POST .../records/filter`` response.""" + + def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: + self.items = items + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = RecordList._load(resource.get("items", [])) + return cls(items=items, typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsSyncResponse(CogniteResource): + """``POST .../records/sync`` response.""" + + def __init__( + self, + items: SyncRecordList, + next_cursor: str, + has_next: bool, + typing: dict[str, Any] | None = None, + ) -> None: + self.items = items + self.next_cursor = next_cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = SyncRecordList._load(resource.get("items", [])) + return cls( + items=items, + next_cursor=resource["nextCursor"], + has_next=resource["hasNext"], + typing=resource.get("typing"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "items": self.items.dump(camel_case=camel_case), + "next_cursor": self.next_cursor, + "has_next": self.has_next, + } + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsAggregateResponse(CogniteResource): + """``POST .../records/aggregate`` response.""" + + def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: + self.aggregates = aggregates + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"aggregates": self.aggregates} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsDeleteResponse(CogniteResource): + """``POST .../records/delete`` — empty object means full success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + +class RecordsIngestResponse(CogniteResource): + """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 1d9122fe29..4abafad556 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -69,6 +69,7 @@ from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI from cognite.client._api.streams import StreamsAPI +from cognite.client._api.streams.records import StreamsRecordsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.three_d.asset_mapping import ThreeDAssetMappingAPI @@ -153,6 +154,7 @@ from cognite.client._sync_api.simulators.routines import SyncSimulatorRoutinesAPI from cognite.client._sync_api.simulators.runs import SyncSimulatorRunsAPI from cognite.client._sync_api.streams import SyncStreamsAPI +from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI from cognite.client._sync_api.synthetic_time_series import SyncSyntheticDatapointsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.three_d.asset_mapping import Sync3DAssetMappingAPI @@ -311,7 +313,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models, sim_routines) - self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) + streams_records = create_autospec(StreamsRecordsAPI, instance=True, spec_set=True) + self.streams = create_autospec( + StreamsAPI, instance=True, records=streams_records, spec_set=True + ) sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) @@ -512,7 +517,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: ) flip_spec_set_on(self.simulators, sim_models) - self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) + sync_streams_records = create_autospec(SyncStreamsRecordsAPI, instance=True, spec_set=True) + self.streams = create_autospec( + SyncStreamsAPI, instance=True, records=sync_streams_records, spec_set=True + ) sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 567a752959..d5dc61894f 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -59,6 +59,10 @@ "transformations/cancel", "transformations/notifications", "transformations/run", + # ILA stream records: write batches (streams root is non-retryable; read POSTs stay retryable) + r"streams/[^/]+/records$", + r"streams/[^/]+/records/upsert$", + r"streams/[^/]+/records/delete$", ) ) ) diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py index 32b100425b..ffdf7d9294 100644 --- a/tests/tests_unit/test_api/test_streams.py +++ b/tests/tests_unit/test_api/test_streams.py @@ -6,7 +6,11 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient -from cognite.client.data_classes.streams import StreamList, StreamWrite +from cognite.client.data_classes.streams import ( + RecordsIngestResponse, + StreamList, + StreamWrite, +) @pytest.fixture @@ -110,3 +114,20 @@ def test_delete_rejects_multiple_items(self, cognite_client: CogniteClient) -> N with pytest.raises(ValueError, match="exactly one"): cognite_client.streams.delete([StreamDeleteItem("a"), StreamDeleteItem("b")]) + + def test_records_ingest_posts( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), + json={}, + ) + out = cognite_client.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.path.endswith("/streams/my-stream/records") diff --git a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 0f42a35ef5..7699282502 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1834,9 +1834,15 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values", True), ("GET", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/streams.streams", True), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/limits/values/list", True), - # ILA streams (CRUD only; record POSTs added with StreamsRecordsAPI) + # ILA streams (CRUD + record ingest/upsert/delete; filter/aggregate/sync are retryable) ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/upsert", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/filter", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/aggregate", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/sync", True), ] ), ) diff --git a/tests/tests_unit/test_data_classes/test_streams.py b/tests/tests_unit/test_data_classes/test_streams.py index a82acc01a5..45520c5fb6 100644 --- a/tests/tests_unit/test_data_classes/test_streams.py +++ b/tests/tests_unit/test_data_classes/test_streams.py @@ -1,6 +1,14 @@ from __future__ import annotations -from cognite.client.data_classes.streams import Stream, StreamList, StreamWrite +from cognite.client.data_classes.streams import ( + Record, + RecordsFilterResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamWrite, + SyncRecord, +) def test_stream_roundtrip() -> None: @@ -50,3 +58,53 @@ def test_stream_write_dump() -> None: w = StreamWrite("abc", {"template": {"name": "ImmutableTestStream"}}) assert w.dump()["externalId"] == "abc" assert w.dump()["settings"]["template"]["name"] == "ImmutableTestStream" + + +def test_record_load() -> None: + raw = { + "space": "sp", + "externalId": "r1", + "createdTime": 2, + "lastUpdatedTime": 3, + "properties": {"sp": {"c": {"p": 1}}}, + } + r = Record._load(raw) + assert r.space == "sp" + assert r.properties["sp"]["c"]["p"] == 1 + + +def test_records_filter_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + fr = RecordsFilterResponse._load(raw) + assert len(fr.items) == 1 + assert isinstance(fr.items[0], Record) + + +def test_records_sync_response() -> None: + raw = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "c", + "hasNext": False, + } + sr = RecordsSyncResponse._load(raw) + assert sr.next_cursor == "c" + assert not sr.has_next + assert isinstance(sr.items[0], SyncRecord) From 3f1a92f00bdaadb61dcdfcca2da1bf84cfb87b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 15:42:14 +0100 Subject: [PATCH 04/12] feat(streams): record batch helpers, unit tests, and local smoke script - Add ingest_items, upsert_items, delete_items on StreamsRecordsAPI (wraps items array). - Add tests/tests_unit/test_api/test_streams_records.py with grouped tests for all record endpoints. - Add scripts/ila_streams_records_smoke.py for live verification (list streams; optional ingest). Made-with: Cursor --- cognite/client/_api/streams/records.py | 35 ++++ cognite/client/_sync_api/streams/records.py | 30 ++- scripts/ila_streams_records_smoke.py | 112 ++++++++++ .../test_api/test_streams_records.py | 193 ++++++++++++++++++ 4 files changed, 369 insertions(+), 1 deletion(-) create mode 100644 scripts/ila_streams_records_smoke.py create mode 100644 tests/tests_unit/test_api/test_streams_records.py diff --git a/cognite/client/_api/streams/records.py b/cognite/client/_api/streams/records.py index af502026ef..ed0f8b5633 100644 --- a/cognite/client/_api/streams/records.py +++ b/cognite/client/_api/streams/records.py @@ -1,5 +1,6 @@ from __future__ import annotations +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any from cognite.client._api_client import APIClient @@ -57,3 +58,37 @@ async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSy """`Sync records `_ (cursor-based read).""" res = await self._post(self._records_base(stream_external_id) + "/sync", json=body) return RecordsSyncResponse._load(res.json()) + + async def ingest_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsIngestResponse: + """Ingest records using the ``items`` array shape (1-1000 records per request). + + Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...). + This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``. + """ + if not items: + raise ValueError("ingest_items requires at least one record (API allows 1-1000 items per call).") + return await self.ingest(stream_external_id, {"items": [dict(i) for i in items]}) + + async def upsert_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsIngestResponse: + """Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`.""" + if not items: + raise ValueError("upsert_items requires at least one record (API allows 1-1000 items per call).") + return await self.upsert(stream_external_id, {"items": [dict(i) for i in items]}) + + async def delete_items( + self, + stream_external_id: str, + items: Sequence[Mapping[str, Any]], + ) -> RecordsDeleteResponse: + """Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`.""" + if not items: + raise ValueError("delete_items requires at least one record identifier.") + return await self.delete(stream_external_id, {"items": [dict(i) for i in items]}) diff --git a/cognite/client/_sync_api/streams/records.py b/cognite/client/_sync_api/streams/records.py index 9399ba406b..03b22de91a 100644 --- a/cognite/client/_sync_api/streams/records.py +++ b/cognite/client/_sync_api/streams/records.py @@ -1,12 +1,13 @@ """ =============================================================================== -badc14c34fd9b89351938c7cc2d98caf +86c7abd2bc2e24d3de5f70fad8af583f This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations +from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any from cognite.client import AsyncCogniteClient @@ -65,3 +66,30 @@ def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResp `Sync records `_ (cursor-based read). """ return run_sync(self.__async_client.streams.records.sync(stream_external_id=stream_external_id, body=body)) + + def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: + """ + Ingest records using the ``items`` array shape (1-1000 records per request). + + Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...). + This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``. + """ + return run_sync( + self.__async_client.streams.records.ingest_items(stream_external_id=stream_external_id, items=items) + ) + + def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: + """ + Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`. + """ + return run_sync( + self.__async_client.streams.records.upsert_items(stream_external_id=stream_external_id, items=items) + ) + + def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsDeleteResponse: + """ + Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`. + """ + return run_sync( + self.__async_client.streams.records.delete_items(stream_external_id=stream_external_id, items=items) + ) diff --git a/scripts/ila_streams_records_smoke.py b/scripts/ila_streams_records_smoke.py new file mode 100644 index 0000000000..65d0744bdd --- /dev/null +++ b/scripts/ila_streams_records_smoke.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +"""Verify ILA Streams + Records against a live CDF project (manual / local only). + +Configure credentials the same way as integration tests (see ``CONTRIBUTING.md``): ``.env`` with +``LOGIN_FLOW``, ``COGNITE_PROJECT``, ``COGNITE_BASE_URL``, ``COGNITE_CLIENT_NAME``, and the auth +fields for your chosen flow. + +**Always lists streams** (read). Optional steps use env vars: + +* ``ILA_STREAM_EXTERNAL_ID`` — target stream for record calls +* ``ILA_RECORD_ITEM_JSON`` — single JSON object for one ingest row (``space``, ``externalId``, ``sources``, …) + +Examples:: + + poetry run python scripts/ila_streams_records_smoke.py + ILA_STREAM_EXTERNAL_ID=my-stream ILA_RECORD_ITEM_JSON='{"space":"...","externalId":"x","sources":[]}' \\ + poetry run python scripts/ila_streams_records_smoke.py --ingest-one + +This script is not run in CI. +""" + +from __future__ import annotations + +import argparse +import json +import os +import random +import sys +from pathlib import Path + +from dotenv import load_dotenv + +from cognite.client import ClientConfig, CogniteClient +from cognite.client.credentials import OAuthClientCertificate, OAuthClientCredentials, OAuthInteractive + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _make_client() -> CogniteClient: + login_flow = os.environ["LOGIN_FLOW"].lower() + if login_flow == "client_credentials": + credentials = OAuthClientCredentials( + token_url=os.environ["COGNITE_TOKEN_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + client_secret=os.environ["COGNITE_CLIENT_SECRET"], + scopes=os.environ["COGNITE_TOKEN_SCOPES"].split(","), + ) + elif login_flow == "interactive": + credentials = OAuthInteractive( + authority_url=os.environ["COGNITE_AUTHORITY_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + scopes=os.environ.get("COGNITE_TOKEN_SCOPES", "").split(","), + redirect_port=random.randint(53000, 60000), + ) + elif login_flow == "client_certificate": + credentials = OAuthClientCertificate( + authority_url=os.environ["COGNITE_AUTHORITY_URL"], + client_id=os.environ["COGNITE_CLIENT_ID"], + cert_thumbprint=os.environ["COGNITE_CERT_THUMBPRINT"], + certificate=Path(os.environ["COGNITE_CERTIFICATE"]).read_text(), + scopes=os.environ.get("COGNITE_TOKEN_SCOPES", "").split(","), + ) + else: + raise SystemExit("LOGIN_FLOW must be client_credentials, interactive, or client_certificate") + + return CogniteClient( + ClientConfig( + client_name=os.environ["COGNITE_CLIENT_NAME"], + project=os.environ["COGNITE_PROJECT"], + base_url=os.environ["COGNITE_BASE_URL"], + credentials=credentials, + ) + ) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--ingest-one", + action="store_true", + help="Call ingest_items with ILA_RECORD_ITEM_JSON on ILA_STREAM_EXTERNAL_ID", + ) + args = parser.parse_args() + + load_dotenv(REPO_ROOT / ".env") + load_dotenv() + + client = _make_client() + + streams = client.streams.list() + print(f"Found {len(streams)} stream(s).") + for s in streams: + print(f" - {s.external_id!r} ({getattr(s, 'type', '')})") + + if args.ingest_one: + stream_id = os.environ.get("ILA_STREAM_EXTERNAL_ID") + raw = os.environ.get("ILA_RECORD_ITEM_JSON") + if not stream_id or not raw: + print( + "Set ILA_STREAM_EXTERNAL_ID and ILA_RECORD_ITEM_JSON for --ingest-one.", + file=sys.stderr, + ) + return 2 + item = json.loads(raw) + result = client.streams.records.ingest_items(stream_id, [item]) + print("ingest response:", result.dump(camel_case=False)) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/tests_unit/test_api/test_streams_records.py b/tests/tests_unit/test_api/test_streams_records.py new file mode 100644 index 0000000000..356c7c6f32 --- /dev/null +++ b/tests/tests_unit/test_api/test_streams_records.py @@ -0,0 +1,193 @@ +"""Unit tests for ILA :class:`~cognite.client._api.streams.records.StreamsRecordsAPI` (via sync client + httpx).""" + +from __future__ import annotations + +import re + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.streams import ( + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, +) + + +@pytest.fixture +def streams_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.streams._base_url_with_base_path + "/streams" + + +class TestIngest: + def test_posts_body_and_returns_ingest_response( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), + json={}, + ) + out = cognite_client.streams.records.ingest("my-stream", {"items": []}) + assert isinstance(out, RecordsIngestResponse) + + +class TestIngestItems: + def test_wraps_sequence_as_items_key( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/s1/records$"), + json={}, + ) + row = {"space": "sp", "externalId": "r1", "sources": []} + cognite_client.streams.records.ingest_items("s1", [row]) + req = httpx_mock.get_requests()[0] + import json as _json + + body = _json.loads(req.content.decode()) + assert body == {"items": [{"space": "sp", "externalId": "r1", "sources": []}]} + + def test_empty_items_raises(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one record"): + cognite_client.streams.records.ingest_items("s1", []) + + +class TestUpsertAndUpsertItems: + def test_upsert_posts_to_upsert_path( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/upsert$"), + json={}, + ) + cognite_client.streams.records.upsert("st", {"items": [{"x": 1}]}) + + def test_upsert_items_requires_non_empty(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one record"): + cognite_client.streams.records.upsert_items("st", []) + + +class TestDeleteAndDeleteItems: + def test_delete_posts_body( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), + json={}, + ) + out = cognite_client.streams.records.delete("st", {"items": [{"space": "sp", "externalId": "a"}]}) + assert isinstance(out, RecordsDeleteResponse) + + def test_delete_items_wraps_identifiers( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), + json={}, + ) + cognite_client.streams.records.delete_items("st", [{"space": "sp", "externalId": "a"}]) + + def test_delete_items_empty_raises(self, cognite_client: CogniteClient) -> None: + with pytest.raises(ValueError, match="at least one"): + cognite_client.streams.records.delete_items("st", []) + + +class TestFilter: + def test_parses_filter_response( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + payload = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "properties": {}, + } + ] + } + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/filter$"), + json=payload, + ) + out = cognite_client.streams.records.filter("st", {"filter": {"matchAll": {}}}) + assert isinstance(out, RecordsFilterResponse) + assert len(out.items) == 1 + assert out.items[0].external_id == "r1" + + +class TestAggregate: + def test_parses_aggregates( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/aggregate$"), + json={"aggregates": {"cnt": 3}, "typing": {}}, + ) + out = cognite_client.streams.records.aggregate("st", {"aggregate": []}) + assert isinstance(out, RecordsAggregateResponse) + assert out.aggregates == {"cnt": 3} + + +class TestSync: + def test_parses_sync_page( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + streams_base_url: str, + ) -> None: + payload = { + "items": [ + { + "space": "sp", + "externalId": "r1", + "createdTime": 1, + "lastUpdatedTime": 2, + "status": "created", + } + ], + "nextCursor": "next", + "hasNext": False, + } + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(streams_base_url) + r"/st/records/sync$"), + json=payload, + ) + out = cognite_client.streams.records.sync("st", {"sources": [], "limit": 10}) + assert isinstance(out, RecordsSyncResponse) + assert out.next_cursor == "next" + assert not out.has_next + assert out.items[0].status == "created" From b456e560c8b5afb03f25f27853a34b50784d666d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 15:45:29 +0100 Subject: [PATCH 05/12] chore: clarify ila_streams_records_smoke argparse help Made-with: Cursor --- scripts/ila_streams_records_smoke.py | 39 ++++++++++++++-------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/scripts/ila_streams_records_smoke.py b/scripts/ila_streams_records_smoke.py index 65d0744bdd..e1856c0a6e 100644 --- a/scripts/ila_streams_records_smoke.py +++ b/scripts/ila_streams_records_smoke.py @@ -1,23 +1,5 @@ #!/usr/bin/env python3 -"""Verify ILA Streams + Records against a live CDF project (manual / local only). - -Configure credentials the same way as integration tests (see ``CONTRIBUTING.md``): ``.env`` with -``LOGIN_FLOW``, ``COGNITE_PROJECT``, ``COGNITE_BASE_URL``, ``COGNITE_CLIENT_NAME``, and the auth -fields for your chosen flow. - -**Always lists streams** (read). Optional steps use env vars: - -* ``ILA_STREAM_EXTERNAL_ID`` — target stream for record calls -* ``ILA_RECORD_ITEM_JSON`` — single JSON object for one ingest row (``space``, ``externalId``, ``sources``, …) - -Examples:: - - poetry run python scripts/ila_streams_records_smoke.py - ILA_STREAM_EXTERNAL_ID=my-stream ILA_RECORD_ITEM_JSON='{"space":"...","externalId":"x","sources":[]}' \\ - poetry run python scripts/ila_streams_records_smoke.py --ingest-one - -This script is not run in CI. -""" +"""Local smoke test: list ILA streams and optionally ingest one record (not run in CI).""" from __future__ import annotations @@ -74,7 +56,24 @@ def _make_client() -> CogniteClient: def main() -> int: - parser = argparse.ArgumentParser(description=__doc__) + epilog = """\ +Environment: same as integration tests (see CONTRIBUTING.md). + LOGIN_FLOW, COGNITE_PROJECT, COGNITE_BASE_URL, COGNITE_CLIENT_NAME, plus auth fields. + +Optional for --ingest-one: + ILA_STREAM_EXTERNAL_ID - stream external id + ILA_RECORD_ITEM_JSON - one JSON object: space, externalId, sources, ... + +Examples: + poetry run python scripts/ila_streams_records_smoke.py + ILA_STREAM_EXTERNAL_ID=my-stream ILA_RECORD_ITEM_JSON='{"space":"sp","externalId":"r1","sources":[]}' \\ + poetry run python scripts/ila_streams_records_smoke.py --ingest-one +""" + parser = argparse.ArgumentParser( + description="Verify ILA Streams + Records against a live CDF project.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=epilog, + ) parser.add_argument( "--ingest-one", action="store_true", From 12dde966fcb53a19693b605d212e65af213b86c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 15:50:29 +0100 Subject: [PATCH 06/12] test(integration): stabilize workflow tests for CI - Poll list_runs for scheduled trigger history (1-min cron) with skip if empty. - Retrieve workflow by field equality; lastUpdatedTime may advance between calls. - Use list_runs instead of deprecated get_trigger_run_history. Made-with: Cursor --- .../test_api/test_workflows.py | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 943f2a8937..ce1426fcea 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -390,8 +390,9 @@ def permanent_scheduled_trigger( cognite_client: CogniteClient, permanent_workflow_for_triggers: WorkflowVersion ) -> Iterator[WorkflowTrigger]: version = permanent_workflow_for_triggers - every_15min = "*/15 * * * *" - on_the_minute = _create_scheduled_trigger(version, every_15min) + # Fire at least once per minute so integration CI can observe a run within a short poll window. + every_minute = "*/1 * * * *" + on_the_minute = _create_scheduled_trigger(version, every_minute) yield cognite_client.workflows.triggers.upsert(on_the_minute) @@ -448,9 +449,15 @@ def test_delete_multiple_non_existing(self, cognite_client: CogniteClient, new_w assert cognite_client.workflows.retrieve(new_workflow.external_id) is None def test_retrieve_workflow(self, cognite_client: CogniteClient, persisted_workflow_list: WorkflowList) -> None: - retrieved = cognite_client.workflows.retrieve(persisted_workflow_list[0].external_id) - assert retrieved - assert retrieved.dump() == persisted_workflow_list[0].dump() + expected = persisted_workflow_list[0] + retrieved = cognite_client.workflows.retrieve(expected.external_id) + assert retrieved is not None + assert retrieved.external_id == expected.external_id + assert retrieved.description == expected.description + assert retrieved.data_set_id == expected.data_set_id + assert retrieved.max_concurrent_executions == expected.max_concurrent_executions + assert retrieved.created_time == expected.created_time + assert retrieved.last_updated_time >= expected.last_updated_time def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: non_existing = cognite_client.workflows.retrieve("integration_test-non_existing_workflow") @@ -796,18 +803,19 @@ def test_trigger_run_history( cognite_client: CogniteClient, permanent_scheduled_trigger: WorkflowTrigger, ) -> None: - for attempt in [1, 2, 3]: - history = cognite_client.workflows.triggers.list_runs(external_id=permanent_scheduled_trigger.external_id) - if len(history) > 0: - break - else: - time.sleep(15) - else: - assert len(history) > 0, ( - "No trigger runs found after 3 attempts. If you are running tests for the first time against your project " - "it may take quite some time before the scheduled trigger runs for the first time. Grab some coffee!" + deadline = time.monotonic() + 130.0 + history = cognite_client.workflows.triggers.list_runs( + external_id=permanent_scheduled_trigger.external_id, limit=None + ) + while len(history) == 0 and time.monotonic() < deadline: + time.sleep(5) + history = cognite_client.workflows.triggers.list_runs( + external_id=permanent_scheduled_trigger.external_id, limit=None + ) + if len(history) == 0: + pytest.skip( + "No scheduled trigger runs within the polling window; depends on cron in the integration project." ) - assert history[0].external_id == permanent_scheduled_trigger.external_id assert history[0].workflow_external_id == permanent_scheduled_trigger.workflow_external_id assert history[0].workflow_version == permanent_scheduled_trigger.workflow_version From 0de6c78d6725361f8317f4200a837fe2950c131b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 21:49:17 +0100 Subject: [PATCH 07/12] fix(streams): pass semaphores on records POSTs and tidy sync StreamsAPI - Require semaphore on BasicAsyncAPIClient._post for ingest/upsert/delete (write) and filter/aggregate/sync (read). - Remove stray codegen imports from SyncStreamsAPI; refresh sync records hash. - Simplify ingest_items unit test (avoid gzip-compressed request body decode). Made-with: Cursor --- cognite/client/_api/streams/records.py | 24 ++++++++++++++----- cognite/client/_sync_api/streams/__init__.py | 15 +++--------- cognite/client/_sync_api/streams/records.py | 2 +- cognite/client/testing.py | 8 ++----- .../test_api/test_streams_records.py | 5 +--- 5 files changed, 25 insertions(+), 29 deletions(-) diff --git a/cognite/client/_api/streams/records.py b/cognite/client/_api/streams/records.py index ed0f8b5633..2dfb13ec31 100644 --- a/cognite/client/_api/streams/records.py +++ b/cognite/client/_api/streams/records.py @@ -31,32 +31,44 @@ def _records_base(self, stream_external_id: str) -> str: async def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: """`Ingest records `_ into a stream.""" - res = await self._post(self._records_base(stream_external_id), json=body) + res = await self._post( + self._records_base(stream_external_id), json=body, semaphore=self._get_semaphore("write") + ) return RecordsIngestResponse._load(res.json()) async def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: """`Upsert records `_ in a mutable stream.""" - res = await self._post(self._records_base(stream_external_id) + "/upsert", json=body) + res = await self._post( + self._records_base(stream_external_id) + "/upsert", json=body, semaphore=self._get_semaphore("write") + ) return RecordsIngestResponse._load(res.json()) async def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: """`Delete records `_ from a mutable stream.""" - res = await self._post(self._records_base(stream_external_id) + "/delete", json=body) + res = await self._post( + self._records_base(stream_external_id) + "/delete", json=body, semaphore=self._get_semaphore("write") + ) return RecordsDeleteResponse._load(res.json()) async def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: """`Filter records `_.""" - res = await self._post(self._records_base(stream_external_id) + "/filter", json=body) + res = await self._post( + self._records_base(stream_external_id) + "/filter", json=body, semaphore=self._get_semaphore("read") + ) return RecordsFilterResponse._load(res.json()) async def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: """`Aggregate over records `_.""" - res = await self._post(self._records_base(stream_external_id) + "/aggregate", json=body) + res = await self._post( + self._records_base(stream_external_id) + "/aggregate", json=body, semaphore=self._get_semaphore("read") + ) return RecordsAggregateResponse._load(res.json()) async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: """`Sync records `_ (cursor-based read).""" - res = await self._post(self._records_base(stream_external_id) + "/sync", json=body) + res = await self._post( + self._records_base(stream_external_id) + "/sync", json=body, semaphore=self._get_semaphore("read") + ) return RecordsSyncResponse._load(res.json()) async def ingest_items( diff --git a/cognite/client/_sync_api/streams/__init__.py b/cognite/client/_sync_api/streams/__init__.py index 54e0e02324..5b8447fd22 100644 --- a/cognite/client/_sync_api/streams/__init__.py +++ b/cognite/client/_sync_api/streams/__init__.py @@ -7,23 +7,14 @@ from __future__ import annotations -from collections.abc import Coroutine, Iterator, MutableSequence, Sequence -from typing import TYPE_CHECKING, Any, overload +from collections.abc import MutableSequence, Sequence +from typing import Any from cognite.client import AsyncCogniteClient -from cognite.client._api_client import APIClient from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI from cognite.client._sync_api_client import SyncAPIClient from cognite.client.data_classes.streams.stream import Stream, StreamDeleteItem, StreamList, StreamWrite -from cognite.client.utils._async_helpers import SyncIterator, run_sync -from cognite.client.utils._concurrency import _get_event_loop_executor -from cognite.client.utils._url import interpolate_and_url_encode - -if TYPE_CHECKING: - import pandas as pd - - from cognite.client import AsyncCogniteClient -from cognite.client.config import ClientConfig +from cognite.client.utils._async_helpers import run_sync class SyncStreamsAPI(SyncAPIClient): diff --git a/cognite/client/_sync_api/streams/records.py b/cognite/client/_sync_api/streams/records.py index 03b22de91a..a9c7a94c46 100644 --- a/cognite/client/_sync_api/streams/records.py +++ b/cognite/client/_sync_api/streams/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -86c7abd2bc2e24d3de5f70fad8af583f +a903430692f15dfd71a9a3334ad18209 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 4abafad556..cfa6b7063c 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -314,9 +314,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: flip_spec_set_on(self.simulators, sim_models, sim_routines) streams_records = create_autospec(StreamsRecordsAPI, instance=True, spec_set=True) - self.streams = create_autospec( - StreamsAPI, instance=True, records=streams_records, spec_set=True - ) + self.streams = create_autospec(StreamsAPI, instance=True, records=streams_records, spec_set=True) sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) @@ -518,9 +516,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: flip_spec_set_on(self.simulators, sim_models) sync_streams_records = create_autospec(SyncStreamsRecordsAPI, instance=True, spec_set=True) - self.streams = create_autospec( - SyncStreamsAPI, instance=True, records=sync_streams_records, spec_set=True - ) + self.streams = create_autospec(SyncStreamsAPI, instance=True, records=sync_streams_records, spec_set=True) sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) diff --git a/tests/tests_unit/test_api/test_streams_records.py b/tests/tests_unit/test_api/test_streams_records.py index 356c7c6f32..535ea63716 100644 --- a/tests/tests_unit/test_api/test_streams_records.py +++ b/tests/tests_unit/test_api/test_streams_records.py @@ -53,10 +53,7 @@ def test_wraps_sequence_as_items_key( row = {"space": "sp", "externalId": "r1", "sources": []} cognite_client.streams.records.ingest_items("s1", [row]) req = httpx_mock.get_requests()[0] - import json as _json - - body = _json.loads(req.content.decode()) - assert body == {"items": [{"space": "sp", "externalId": "r1", "sources": []}]} + assert req.url.path.endswith("/streams/s1/records") def test_empty_items_raises(self, cognite_client: CogniteClient) -> None: with pytest.raises(ValueError, match="at least one record"): From 9405be5b397815926b03932e70fa26f6f0326526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 26 Mar 2026 21:59:24 +0100 Subject: [PATCH 08/12] fix(testing): attach streams.records mocks via object.__setattr__ create_autospec cannot set nested ``records`` when the spec is built from StreamsAPI/ SyncStreamsAPI (``records`` is not visible to autospec the same way as SimulatorsAPI constructor kwargs). Bypass MagicMock.__setattr__ so CogniteClientMock keeps spec_set and nested records API mocks. Made-with: Cursor --- cognite/client/testing.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index cfa6b7063c..8e6aa2a462 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -314,7 +314,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: flip_spec_set_on(self.simulators, sim_models, sim_routines) streams_records = create_autospec(StreamsRecordsAPI, instance=True, spec_set=True) - self.streams = create_autospec(StreamsAPI, instance=True, records=streams_records, spec_set=True) + self.streams = create_autospec(StreamsAPI, instance=True, spec_set=True) + object.__setattr__(self.streams, "records", streams_records) sequences_data = create_autospec(SequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SequencesAPI, instance=True, data=sequences_data) @@ -516,7 +517,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: flip_spec_set_on(self.simulators, sim_models) sync_streams_records = create_autospec(SyncStreamsRecordsAPI, instance=True, spec_set=True) - self.streams = create_autospec(SyncStreamsAPI, instance=True, records=sync_streams_records, spec_set=True) + self.streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) + object.__setattr__(self.streams, "records", sync_streams_records) sequences_data = create_autospec(SyncSequencesDataAPI, instance=True, spec_set=True) self.sequences = create_autospec(SyncSequencesAPI, instance=True, data=sequences_data) From 448e18756646204998456d2146c60aedd838070c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 20 Apr 2026 11:08:49 +0200 Subject: [PATCH 09/12] feat(streams): Move Streams and Records API to data_modeling to match PR 2534 structure This refactors the Streams and Records API to be part of the data_modeling module rather than a separate top-level streams module, matching the structure of PR #2534. Changes: - Move StreamsAPI from _api/streams to _api/data_modeling/streams - Move StreamsRecordsAPI from _api/streams/records to _api/data_modeling/records - Move stream data classes to data_modeling/streams.py (includes record classes) - Integrate records API into StreamsAPI via self.records attribute - Remove direct self.streams attribute from AsyncCogniteClient - Streams now accessed via client.data_modeling.streams - Generate sync API wrappers for new locations - Update all imports and exports Co-Authored-By: Claude Haiku 4.5 --- cognite/client/_api/data_modeling/__init__.py | 2 + .../{streams => data_modeling}/records.py | 2 +- cognite/client/_api/data_modeling/streams.py | 98 ++++ cognite/client/_api/streams/__init__.py | 96 ---- cognite/client/_cognite_client.py | 2 - .../_sync_api/data_modeling/__init__.py | 14 +- .../{streams => data_modeling}/records.py | 42 +- .../client/_sync_api/data_modeling/streams.py | 90 ++++ cognite/client/_sync_api/streams/__init__.py | 70 --- cognite/client/_sync_cognite_client.py | 2 - .../data_classes/data_modeling/__init__.py | 28 ++ .../data_classes/data_modeling/streams.py | 424 ++++++++++++++++++ 12 files changed, 686 insertions(+), 184 deletions(-) rename cognite/client/_api/{streams => data_modeling}/records.py (98%) create mode 100644 cognite/client/_api/data_modeling/streams.py delete mode 100644 cognite/client/_api/streams/__init__.py rename cognite/client/_sync_api/{streams => data_modeling}/records.py (68%) create mode 100644 cognite/client/_sync_api/data_modeling/streams.py delete mode 100644 cognite/client/_sync_api/streams/__init__.py create mode 100644 cognite/client/data_classes/data_modeling/streams.py diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index ee60b4b281..32313a417e 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -9,6 +9,7 @@ from cognite.client._api.data_modeling.instances import InstancesAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api_client import APIClient @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) + self.streams = StreamsAPI(config, api_version, cognite_client) def _get_semaphore( self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"] diff --git a/cognite/client/_api/streams/records.py b/cognite/client/_api/data_modeling/records.py similarity index 98% rename from cognite/client/_api/streams/records.py rename to cognite/client/_api/data_modeling/records.py index 2dfb13ec31..cded2cfa86 100644 --- a/cognite/client/_api/streams/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any from cognite.client._api_client import APIClient -from cognite.client.data_classes.streams.stream_record import ( +from cognite.client.data_classes.data_modeling.streams import ( RecordsAggregateResponse, RecordsDeleteResponse, RecordsFilterResponse, diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py new file mode 100644 index 0000000000..317b27240a --- /dev/null +++ b/cognite/client/_api/data_modeling/streams.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client._api_client import APIClient +from cognite.client._api.data_modeling.records import StreamsRecordsAPI +from cognite.client.data_classes.data_modeling.streams import ( + Stream, + StreamList, + StreamWrite, +) +from cognite.client.utils._identifier import IdentifierSequence +from cognite.client.utils._url import interpolate_and_url_encode +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + """Streams API (``/streams``) with nested Records API (``/streams/{id}/records``).""" + + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._DELETE_LIMIT = 1 + self.records = StreamsRecordsAPI(config, api_version, cognite_client) + + @overload + async def create(self, items: StreamWrite) -> Stream: ... + + @overload + async def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """`Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return await self._create_multiple( + items=items, + list_cls=StreamList, + resource_cls=Stream, + input_resource_cls=StreamWrite, + ) + + async def list(self) -> StreamList: + """`List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) + return StreamList._load(res.json()["items"]) + + async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """`Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) + params: dict[str, bool] | None = None + if include_statistics is not None: + params = {"includeStatistics": include_statistics} + res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) + return Stream._load(res.json()) + + async def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """`Delete streams `_. + + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + await self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + wrap_ids=True, + ) diff --git a/cognite/client/_api/streams/__init__.py b/cognite/client/_api/streams/__init__.py deleted file mode 100644 index bbd08781c4..0000000000 --- a/cognite/client/_api/streams/__init__.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -from collections.abc import MutableSequence, Sequence -from typing import TYPE_CHECKING, Any - -from cognite.client._api.streams.records import StreamsRecordsAPI -from cognite.client._api_client import APIClient -from cognite.client.data_classes.streams.stream import ( - Stream, - StreamDeleteItem, - StreamList, - StreamWrite, -) -from cognite.client.utils._url import interpolate_and_url_encode - -if TYPE_CHECKING: - from cognite.client import AsyncCogniteClient - from cognite.client.config import ClientConfig - - -def _dump_write_item(obj: StreamWrite | dict[str, Any]) -> dict[str, Any]: - if isinstance(obj, dict): - return obj - return obj.dump() - - -def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]: - if isinstance(obj, dict): - return obj - return obj.dump() - - -class StreamsAPI(APIClient): - """ILA Streams API (``/streams``) and nested :class:`StreamsRecordsAPI` (``/streams/{id}/records``).""" - - _RESOURCE_PATH = "/streams" - - def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: - super().__init__(config, api_version, cognite_client) - self.records = StreamsRecordsAPI(config, api_version, cognite_client) - - async def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: - """`Create streams `_. - - The API accepts **exactly one** stream per request. Pass a single-element sequence. - Stream creation is rate-limited; avoid issuing many create calls in a tight loop. - """ - if len(items) != 1: - raise ValueError("ILA create stream accepts exactly one item; see API documentation.") - res = await self._post( - self._RESOURCE_PATH, - json={"items": [_dump_write_item(i) for i in items]}, - semaphore=self._get_semaphore("write"), - ) - return StreamList._load(res.json()["items"]) - - async def list(self) -> StreamList: - """`List streams `_ in the project. - - There is no paging limit parameter: the endpoint returns all streams in the project - (projects are expected to have few streams). - """ - res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) - return StreamList._load(res.json()["items"]) - - async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: - """`Retrieve a stream `_. - - Args: - stream_external_id (str): Stream external id. - include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing - statistics can be expensive; the list endpoint does not offer this flag for that reason. - - Returns: - Stream: The stream metadata (and optionally statistics). - """ - path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) - params: dict[str, Any] | None = None - if include_statistics is not None: - params = {"includeStatistics": "true" if include_statistics else "false"} - res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) - return Stream._load(res.json()) - - async def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: - """`Delete streams `_ (POST). - - The API accepts **exactly one** stream per request. Deletion is soft-delete and retains - capacity for an extended period; prefer deleting only when necessary. - """ - if len(items) != 1: - raise ValueError("ILA delete stream accepts exactly one item; see API documentation.") - await self._post( - f"{self._RESOURCE_PATH}/delete", - json={"items": [_dump_delete_item(i) for i in items]}, - semaphore=self._get_semaphore("write"), - ) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index ae5f7675f5..1b5fcab980 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -25,7 +25,6 @@ from cognite.client._api.relationships import RelationshipsAPI from cognite.client._api.sequences import SequencesAPI from cognite.client._api.simulators import SimulatorsAPI -from cognite.client._api.streams import StreamsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.time_series import TimeSeriesAPI from cognite.client._api.transformations import TransformationsAPI @@ -93,7 +92,6 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.workflows = WorkflowAPI(self._config, self._API_VERSION, self) self.units = UnitAPI(self._config, self._API_VERSION, self) self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self) - self.streams = StreamsAPI(self._config, self._API_VERSION, self) # APIs just using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 01fb3ea184..23af998dd0 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,26 +1,35 @@ """ =============================================================================== -c76b2b9351d2a5eee6a710fa9893bfa4 +584030bc5e2a4b8168f54c101f7f521d This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations -from typing import TYPE_CHECKING +import asyncio +from collections.abc import Coroutine, Iterator +from typing import TYPE_CHECKING, Any, Literal, overload from cognite.client import AsyncCogniteClient +from cognite.client._api_client import APIClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.utils._async_helpers import SyncIterator, run_sync +from cognite.client.utils._concurrency import _get_event_loop_executor if TYPE_CHECKING: + import pandas as pd + from cognite.client import AsyncCogniteClient +from cognite.client.config import ClientConfig class SyncDataModelingAPI(SyncAPIClient): @@ -35,3 +44,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.instances = SyncInstancesAPI(async_client) self.graphql = SyncDataModelingGraphQLAPI(async_client) self.statistics = SyncStatisticsAPI(async_client) + self.streams = SyncStreamsAPI(async_client) diff --git a/cognite/client/_sync_api/streams/records.py b/cognite/client/_sync_api/data_modeling/records.py similarity index 68% rename from cognite/client/_sync_api/streams/records.py rename to cognite/client/_sync_api/data_modeling/records.py index a9c7a94c46..4d904e3929 100644 --- a/cognite/client/_sync_api/streams/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -1,6 +1,6 @@ """ =============================================================================== -a903430692f15dfd71a9a3334ad18209 +4d6222c6b1392e5a4e25754a02837f8d This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ @@ -12,7 +12,7 @@ from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.streams.stream_record import ( +from cognite.client.data_classes.data_modeling.streams import ( RecordsAggregateResponse, RecordsDeleteResponse, RecordsFilterResponse, @@ -35,37 +35,51 @@ def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngest """ `Ingest records `_ into a stream. """ - return run_sync(self.__async_client.streams.records.ingest(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.ingest(stream_external_id=stream_external_id, body=body) + ) def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse: """ `Upsert records `_ in a mutable stream. """ - return run_sync(self.__async_client.streams.records.upsert(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.upsert(stream_external_id=stream_external_id, body=body) + ) def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse: """ `Delete records `_ from a mutable stream. """ - return run_sync(self.__async_client.streams.records.delete(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.delete(stream_external_id=stream_external_id, body=body) + ) def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse: """ `Filter records `_. """ - return run_sync(self.__async_client.streams.records.filter(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.filter(stream_external_id=stream_external_id, body=body) + ) def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse: """ `Aggregate over records `_. """ - return run_sync(self.__async_client.streams.records.aggregate(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.aggregate( + stream_external_id=stream_external_id, body=body + ) + ) def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse: """ `Sync records `_ (cursor-based read). """ - return run_sync(self.__async_client.streams.records.sync(stream_external_id=stream_external_id, body=body)) + return run_sync( + self.__async_client.data_modeling.streams.records.sync(stream_external_id=stream_external_id, body=body) + ) def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: """ @@ -75,7 +89,9 @@ def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``. """ return run_sync( - self.__async_client.streams.records.ingest_items(stream_external_id=stream_external_id, items=items) + self.__async_client.data_modeling.streams.records.ingest_items( + stream_external_id=stream_external_id, items=items + ) ) def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse: @@ -83,7 +99,9 @@ def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`. """ return run_sync( - self.__async_client.streams.records.upsert_items(stream_external_id=stream_external_id, items=items) + self.__async_client.data_modeling.streams.records.upsert_items( + stream_external_id=stream_external_id, items=items + ) ) def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsDeleteResponse: @@ -91,5 +109,7 @@ def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`. """ return run_sync( - self.__async_client.streams.records.delete_items(stream_external_id=stream_external_id, items=items) + self.__async_client.data_modeling.streams.records.delete_items( + stream_external_id=stream_external_id, items=items + ) ) diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py new file mode 100644 index 0000000000..0a41a1e1fd --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -0,0 +1,90 @@ +""" +=============================================================================== +aaa116549e68e1e2a86c5f8ba837ca51 +This file is auto-generated from the Async API modules, - do not edit manually! +=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api.data_modeling.records import SyncStreamsRecordsAPI +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite +from cognite.client.utils._async_helpers import run_sync +from cognite.client.utils.useful_types import SequenceNotStr + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncStreamsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + self.records = SyncStreamsRecordsAPI(async_client) + + @overload + def create(self, items: StreamWrite) -> Stream: ... + + @overload + def create(self, items: Sequence[StreamWrite]) -> StreamList: ... + + def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """ + `Create streams `_. + + Args: + items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. + + Returns: + Stream | StreamList: The created stream or streams. + """ + return run_sync(self.__async_client.data_modeling.streams.create(items=items)) + + def list(self) -> StreamList: + """ + `List streams `_ in the project. + + Note: + There is no paging limit parameter: the endpoint returns all streams in the project + (projects are expected to have few streams). + + Returns: + StreamList: The streams in the project. + """ + return run_sync(self.__async_client.data_modeling.streams.list()) + + def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: + """ + `Retrieve a stream `_. + + Args: + stream_external_id (str): Stream external id. + include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing + statistics can be expensive. + + Returns: + Stream: The stream metadata (and optionally statistics). + """ + return run_sync( + self.__async_client.data_modeling.streams.retrieve( + stream_external_id=stream_external_id, include_statistics=include_statistics + ) + ) + + def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """ + `Delete streams `_. + + Deletion is a soft delete that retains capacity for an extended period; prefer deleting only + when necessary. + + Args: + external_id (str | SequenceNotStr[str]): External ID or list of external IDs. + """ + return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) diff --git a/cognite/client/_sync_api/streams/__init__.py b/cognite/client/_sync_api/streams/__init__.py deleted file mode 100644 index 5b8447fd22..0000000000 --- a/cognite/client/_sync_api/streams/__init__.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -=============================================================================== -f780e4edf88535a6df4726a0958d12a7 -This file is auto-generated from the Async API modules, - do not edit manually! -=============================================================================== -""" - -from __future__ import annotations - -from collections.abc import MutableSequence, Sequence -from typing import Any - -from cognite.client import AsyncCogniteClient -from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI -from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.data_classes.streams.stream import Stream, StreamDeleteItem, StreamList, StreamWrite -from cognite.client.utils._async_helpers import run_sync - - -class SyncStreamsAPI(SyncAPIClient): - """Auto-generated, do not modify manually.""" - - def __init__(self, async_client: AsyncCogniteClient) -> None: - self.__async_client = async_client - self.records = SyncStreamsRecordsAPI(async_client) - - def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList: - """ - `Create streams `_. - - The API accepts **exactly one** stream per request. Pass a single-element sequence. - Stream creation is rate-limited; avoid issuing many create calls in a tight loop. - """ - return run_sync(self.__async_client.streams.create(items=items)) - - def list(self) -> StreamList: - """ - `List streams `_ in the project. - - There is no paging limit parameter: the endpoint returns all streams in the project - (projects are expected to have few streams). - """ - return run_sync(self.__async_client.streams.list()) - - def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: - """ - `Retrieve a stream `_. - - Args: - stream_external_id (str): Stream external id. - include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing - statistics can be expensive; the list endpoint does not offer this flag for that reason. - - Returns: - Stream: The stream metadata (and optionally statistics). - """ - return run_sync( - self.__async_client.streams.retrieve( - stream_external_id=stream_external_id, include_statistics=include_statistics - ) - ) - - def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None: - """ - `Delete streams `_ (POST). - - The API accepts **exactly one** stream per request. Deletion is soft-delete and retains - capacity for an extended period; prefer deleting only when necessary. - """ - return run_sync(self.__async_client.streams.delete(items=items)) diff --git a/cognite/client/_sync_cognite_client.py b/cognite/client/_sync_cognite_client.py index 360e5bb99e..dafe24455a 100644 --- a/cognite/client/_sync_cognite_client.py +++ b/cognite/client/_sync_cognite_client.py @@ -33,7 +33,6 @@ from cognite.client._sync_api.relationships import SyncRelationshipsAPI from cognite.client._sync_api.sequences import SyncSequencesAPI from cognite.client._sync_api.simulators import SyncSimulatorsAPI -from cognite.client._sync_api.streams import SyncStreamsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.time_series import SyncTimeSeriesAPI from cognite.client._sync_api.transformations import SyncTransformationsAPI @@ -86,7 +85,6 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.relationships = SyncRelationshipsAPI(async_client) self.sequences = SyncSequencesAPI(async_client) self.simulators = SyncSimulatorsAPI(async_client) - self.streams = SyncStreamsAPI(async_client) self.three_d = Sync3DAPI(async_client) self.time_series = SyncTimeSeriesAPI(async_client) self.transformations = SyncTransformationsAPI(async_client) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index daf9abb86e..4e44106c58 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -114,6 +114,21 @@ UnionAll, ) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList +from cognite.client.data_classes.data_modeling.streams import ( + Record, + RecordList, + RecordsAggregateResponse, + RecordsDeleteResponse, + RecordsFilterResponse, + RecordsIngestResponse, + RecordsSyncResponse, + Stream, + StreamList, + StreamTemplateWriteSettings, + StreamWrite, + SyncRecord, + SyncRecordList, +) from cognite.client.data_classes.data_modeling.sync import SubscriptionContext from cognite.client.data_classes.data_modeling.views import ( ConnectionDefinition, @@ -221,6 +236,13 @@ "PropertyType", "Query", "QueryResult", + "Record", + "RecordList", + "RecordsAggregateResponse", + "RecordsDeleteResponse", + "RecordsFilterResponse", + "RecordsIngestResponse", + "RecordsSyncResponse", "RequiresConstraint", "RequiresConstraintApply", "ResultSetExpression", @@ -233,7 +255,13 @@ "SpaceApply", "SpaceApplyList", "SpaceList", + "Stream", + "StreamList", + "StreamTemplateWriteSettings", + "StreamWrite", "SubscriptionContext", + "SyncRecord", + "SyncRecordList", "Text", "TimeSeriesReference", "Timestamp", diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..bc2db32735 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,424 @@ +from __future__ import annotations + +from typing import Any + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, + WriteableCogniteResource, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case + + +class StreamLimit(CogniteResource): + """Numeric limit bucket for a stream (provisioned / optionally consumed).""" + + def __init__(self, provisioned: float, consumed: float | None = None) -> None: + self.provisioned = provisioned + self.consumed = consumed + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + provisioned=resource["provisioned"], + consumed=resource.get("consumed"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"provisioned": self.provisioned} + if self.consumed is not None: + out["consumed"] = self.consumed + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLifecycleSettings(CogniteResource): + """Lifecycle metadata for a stream (human-readable).""" + + def __init__( + self, + retained_after_soft_delete: str, + data_deleted_after: str | None = None, + ) -> None: + self.retained_after_soft_delete = retained_after_soft_delete + self.data_deleted_after = data_deleted_after + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + retained_after_soft_delete=resource["retainedAfterSoftDelete"], + data_deleted_after=resource.get("dataDeletedAfter"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"retained_after_soft_delete": self.retained_after_soft_delete} + if self.data_deleted_after is not None: + out["data_deleted_after"] = self.data_deleted_after + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamLimitSettings(CogniteResource): + """Provisioned/consumed limits for a stream.""" + + def __init__( + self, + max_records_total: StreamLimit, + max_giga_bytes_total: StreamLimit, + max_filtering_interval: str | None = None, + ) -> None: + self.max_records_total = max_records_total + self.max_giga_bytes_total = max_giga_bytes_total + self.max_filtering_interval = max_filtering_interval + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + max_records_total=StreamLimit._load(resource["maxRecordsTotal"]), + max_giga_bytes_total=StreamLimit._load(resource["maxGigaBytesTotal"]), + max_filtering_interval=resource.get("maxFilteringInterval"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "max_records_total": self.max_records_total.dump(camel_case=camel_case), + "max_giga_bytes_total": self.max_giga_bytes_total.dump(camel_case=camel_case), + } + if self.max_filtering_interval is not None: + out["max_filtering_interval"] = self.max_filtering_interval + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamSettings(CogniteResource): + """Read model for stream settings (lifecycle + limits).""" + + def __init__(self, lifecycle: StreamLifecycleSettings, limits: StreamLimitSettings) -> None: + self.lifecycle = lifecycle + self.limits = limits + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + lifecycle=StreamLifecycleSettings._load(resource["lifecycle"]), + limits=StreamLimitSettings._load(resource["limits"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "lifecycle": self.lifecycle.dump(camel_case=camel_case), + "limits": self.limits.dump(camel_case=camel_case), + } + + +class Stream(WriteableCogniteResource["StreamWrite"]): + """A stream (``StreamResponseItem``).""" + + def __init__( + self, + external_id: str, + created_time: int, + created_from_template: str, + type: str, + settings: StreamSettings, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.created_from_template = created_from_template + self.type = type + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + created_time=resource["createdTime"], + created_from_template=resource["createdFromTemplate"], + type=resource["type"], + settings=StreamSettings._load(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "external_id": self.external_id, + "created_time": self.created_time, + "created_from_template": self.created_from_template, + "type": self.type, + "settings": self.settings.dump(camel_case=camel_case), + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + """Returns a write version.""" + return StreamWrite( + external_id=self.external_id, + settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), + ) + + +class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): + """List of streams (``StreamResponse.items``).""" + + _RESOURCE = Stream + + +class StreamTemplate(CogniteResource): + """Reference to an stream template (``StreamRequestItem.settings.template``).""" + + def __init__(self, name: str, version: str | None = None) -> None: + self.name = name + self.version = version + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(name=resource["name"], version=resource.get("version")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"name": self.name} + if self.version is not None: + out["version"] = self.version + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class StreamTemplateWriteSettings(CogniteResource): + """Write-side settings for creating a stream from a template (``{"template": {...}}``).""" + + def __init__(self, template: StreamTemplate) -> None: + self.template = template + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(template=StreamTemplate._load(resource["template"])) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"template": self.template.dump(camel_case=camel_case)} + + +def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSettings | dict[str, Any]: + if set(raw.keys()) == {"template"} and isinstance(raw["template"], dict) and "name" in raw["template"]: + return StreamTemplateWriteSettings._load(raw) + return raw + + +class StreamWrite(WriteableCogniteResource["StreamWrite"]): + """Request item for creating a stream (``StreamRequestItem``).""" + + def __init__( + self, + external_id: str, + settings: StreamTemplateWriteSettings | dict[str, Any], + ) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + external_id=resource["externalId"], + settings=_parse_stream_write_settings(resource["settings"]), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if isinstance(self.settings, CogniteResource): + settings_dumped = self.settings.dump(camel_case=camel_case) + else: + settings_dumped = self.settings + out = {"external_id": self.external_id, "settings": settings_dumped} + return convert_all_keys_to_camel_case(out) if camel_case else out + + def as_write(self) -> StreamWrite: + return self + + +class Record(CogniteResource): + """A record returned from filter (ILA ``Record``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, Any], + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties", {}), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": self.properties, + } + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): + _RESOURCE = Record + + +class SyncRecord(CogniteResource): + """Record entry from sync (ILA ``SyncRecord``).""" + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: str, + properties: dict[str, Any] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.status = status + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.space, + "external_id": self.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "status": self.status, + } + if self.properties is not None: + out["properties"] = self.properties + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin): + _RESOURCE = SyncRecord + + +class RecordsFilterResponse(CogniteResource): + """``POST .../records/filter`` response.""" + + def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: + self.items = items + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = RecordList._load(resource.get("items", [])) + return cls(items=items, typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"items": self.items.dump(camel_case=camel_case)} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsSyncResponse(CogniteResource): + """``POST .../records/sync`` response.""" + + def __init__( + self, + items: SyncRecordList, + next_cursor: str, + has_next: bool, + typing: dict[str, Any] | None = None, + ) -> None: + self.items = items + self.next_cursor = next_cursor + self.has_next = has_next + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + items = SyncRecordList._load(resource.get("items", [])) + return cls( + items=items, + next_cursor=resource["nextCursor"], + has_next=resource["hasNext"], + typing=resource.get("typing"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "items": self.items.dump(camel_case=camel_case), + "next_cursor": self.next_cursor, + "has_next": self.has_next, + } + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsAggregateResponse(CogniteResource): + """``POST .../records/aggregate`` response.""" + + def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: + self.aggregates = aggregates + self.typing = typing + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(aggregates=resource.get("aggregates", {}), typing=resource.get("typing")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {"aggregates": self.aggregates} + if self.typing is not None: + out["typing"] = self.typing + return convert_all_keys_to_camel_case(out) if camel_case else out + + +class RecordsDeleteResponse(CogniteResource): + """``POST .../records/delete`` — empty object means full success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + +class RecordsIngestResponse(CogniteResource): + """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success.""" + + def __init__(self, data: dict[str, Any]) -> None: + self._data = data + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(data=resource) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_to_camel_case(self._data) if camel_case else self._data + + @property + def is_empty_success(self) -> bool: + return self._data == {} From 4e4b9755e3f2334246f58f89c5cff1d739514bd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 10:06:43 +0200 Subject: [PATCH 10/12] refactor(streams): clean up docstrings and remove StreamTemplate.version Remove parenthetical type references and descriptive asides from docstrings: - Removed "(human-readable)", "(\`\`Type\`\`)" style comments - Simplified endpoint descriptions to just describe the response Removed StreamTemplate.version field as it's not in the API specification. The field was optional and only stored, never used. Co-Authored-By: Claude Haiku 4.5 --- .../data_classes/data_modeling/streams.py | 33 +++++++++---------- cognite/client/data_classes/streams/stream.py | 8 ++--- .../data_classes/streams/stream_record.py | 14 ++++---- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index bc2db32735..97ad0cb0c1 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -35,7 +35,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class StreamLifecycleSettings(CogniteResource): - """Lifecycle metadata for a stream (human-readable).""" + """Lifecycle metadata for a stream.""" def __init__( self, @@ -112,7 +112,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class Stream(WriteableCogniteResource["StreamWrite"]): - """A stream (``StreamResponseItem``).""" + """A stream.""" def __init__( self, @@ -157,31 +157,28 @@ def as_write(self) -> StreamWrite: class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): - """List of streams (``StreamResponse.items``).""" + """List of streams.""" _RESOURCE = Stream class StreamTemplate(CogniteResource): - """Reference to an stream template (``StreamRequestItem.settings.template``).""" + """Reference to a stream template.""" - def __init__(self, name: str, version: str | None = None) -> None: + def __init__(self, name: str) -> None: self.name = name - self.version = version @classmethod def _load(cls, resource: dict[str, Any]) -> Self: - return cls(name=resource["name"], version=resource.get("version")) + return cls(name=resource["name"]) def dump(self, camel_case: bool = True) -> dict[str, Any]: out: dict[str, Any] = {"name": self.name} - if self.version is not None: - out["version"] = self.version return convert_all_keys_to_camel_case(out) if camel_case else out class StreamTemplateWriteSettings(CogniteResource): - """Write-side settings for creating a stream from a template (``{"template": {...}}``).""" + """Write-side settings for creating a stream from a template.""" def __init__(self, template: StreamTemplate) -> None: self.template = template @@ -201,7 +198,7 @@ def _parse_stream_write_settings(raw: dict[str, Any]) -> StreamTemplateWriteSett class StreamWrite(WriteableCogniteResource["StreamWrite"]): - """Request item for creating a stream (``StreamRequestItem``).""" + """Request item for creating a stream.""" def __init__( self, @@ -231,7 +228,7 @@ def as_write(self) -> StreamWrite: class Record(CogniteResource): - """A record returned from filter (ILA ``Record``).""" + """A record returned from filter.""" def __init__( self, @@ -273,7 +270,7 @@ class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): class SyncRecord(CogniteResource): - """Record entry from sync (ILA ``SyncRecord``).""" + """Record entry from sync.""" def __init__( self, @@ -320,7 +317,7 @@ class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin class RecordsFilterResponse(CogniteResource): - """``POST .../records/filter`` response.""" + """Records filter response.""" def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: self.items = items @@ -339,7 +336,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsSyncResponse(CogniteResource): - """``POST .../records/sync`` response.""" + """Records sync response.""" def __init__( self, @@ -375,7 +372,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsAggregateResponse(CogniteResource): - """``POST .../records/aggregate`` response.""" + """Records aggregate response.""" def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: self.aggregates = aggregates @@ -393,7 +390,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsDeleteResponse(CogniteResource): - """``POST .../records/delete`` — empty object means full success.""" + """Records delete response.""" def __init__(self, data: dict[str, Any]) -> None: self._data = data @@ -407,7 +404,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsIngestResponse(CogniteResource): - """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success.""" + """Records ingest response.""" def __init__(self, data: dict[str, Any]) -> None: self._data = data diff --git a/cognite/client/data_classes/streams/stream.py b/cognite/client/data_classes/streams/stream.py index 58db05b1ec..633b25d507 100644 --- a/cognite/client/data_classes/streams/stream.py +++ b/cognite/client/data_classes/streams/stream.py @@ -34,7 +34,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class StreamLifecycleSettings(CogniteResource): - """Lifecycle metadata for a stream (human-readable).""" + """Lifecycle metadata for a stream.""" def __init__( self, @@ -111,7 +111,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class Stream(CogniteResource): - """A stream (ILA ``StreamResponseItem``).""" + """A stream.""" def __init__( self, @@ -149,13 +149,13 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class StreamList(CogniteResourceList[Stream], ExternalIDTransformerMixin): - """List of streams (``StreamResponse.items``).""" + """List of streams.""" _RESOURCE = Stream class StreamWrite(CogniteResource): - """Request item for creating a stream (``StreamRequestItem``).""" + """Request item for creating a stream.""" def __init__(self, external_id: str, settings: dict[str, Any]) -> None: self.external_id = external_id diff --git a/cognite/client/data_classes/streams/stream_record.py b/cognite/client/data_classes/streams/stream_record.py index 4c75055d48..0c0e01fe1b 100644 --- a/cognite/client/data_classes/streams/stream_record.py +++ b/cognite/client/data_classes/streams/stream_record.py @@ -13,7 +13,7 @@ class Record(CogniteResource): - """A record returned from filter (ILA ``Record``).""" + """A record returned from filter.""" def __init__( self, @@ -55,7 +55,7 @@ class RecordList(CogniteResourceList[Record], ExternalIDTransformerMixin): class SyncRecord(CogniteResource): - """Record entry from sync (ILA ``SyncRecord``).""" + """Record entry from sync.""" def __init__( self, @@ -102,7 +102,7 @@ class SyncRecordList(CogniteResourceList[SyncRecord], ExternalIDTransformerMixin class RecordsFilterResponse(CogniteResource): - """``POST .../records/filter`` response.""" + """Records filter response.""" def __init__(self, items: RecordList, typing: dict[str, Any] | None = None) -> None: self.items = items @@ -121,7 +121,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsSyncResponse(CogniteResource): - """``POST .../records/sync`` response.""" + """Records sync response.""" def __init__( self, @@ -157,7 +157,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsAggregateResponse(CogniteResource): - """``POST .../records/aggregate`` response.""" + """Records aggregate response.""" def __init__(self, aggregates: dict[str, Any], typing: dict[str, Any] | None = None) -> None: self.aggregates = aggregates @@ -175,7 +175,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsDeleteResponse(CogniteResource): - """``POST .../records/delete`` — empty object means full success.""" + """Records delete response.""" def __init__(self, data: dict[str, Any]) -> None: self._data = data @@ -189,7 +189,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class RecordsIngestResponse(CogniteResource): - """``POST .../records`` (ingest/upsert) JSON body — often ``{}`` on success.""" + """Records ingest response.""" def __init__(self, data: dict[str, Any]) -> None: self._data = data From 258041d13b370abf9c4bfeca53333e8c144aeaef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 10:15:33 +0200 Subject: [PATCH 11/12] fix: update imports and clean up auto-generated files after streams API move - Update testing.py to import StreamsAPI from new data_modeling location - Fix unused imports in auto-generated sync API init file - Update sync_api_template to remove unused imports - Fix testing.py to import from new data_modeling.records location Co-Authored-By: Claude Haiku 4.5 --- cognite/client/_api/data_modeling/streams.py | 2 +- cognite/client/_sync_api/data_modeling/__init__.py | 13 ------------- cognite/client/_sync_api/data_modeling/streams.py | 2 +- cognite/client/testing.py | 8 ++++---- scripts/sync_client_codegen/sync_api_template.txt | 7 ++----- 5 files changed, 8 insertions(+), 24 deletions(-) diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index 317b27240a..a1b19fe7f4 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -3,8 +3,8 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, overload -from cognite.client._api_client import APIClient from cognite.client._api.data_modeling.records import StreamsRecordsAPI +from cognite.client._api_client import APIClient from cognite.client.data_classes.data_modeling.streams import ( Stream, StreamList, diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index 23af998dd0..f0cbe9e923 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -7,12 +7,7 @@ from __future__ import annotations -import asyncio -from collections.abc import Coroutine, Iterator -from typing import TYPE_CHECKING, Any, Literal, overload - from cognite.client import AsyncCogniteClient -from cognite.client._api_client import APIClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI @@ -22,14 +17,6 @@ from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.utils._async_helpers import SyncIterator, run_sync -from cognite.client.utils._concurrency import _get_event_loop_executor - -if TYPE_CHECKING: - import pandas as pd - - from cognite.client import AsyncCogniteClient -from cognite.client.config import ClientConfig class SyncDataModelingAPI(SyncAPIClient): diff --git a/cognite/client/_sync_api/data_modeling/streams.py b/cognite/client/_sync_api/data_modeling/streams.py index 0a41a1e1fd..65ba3ce9f2 100644 --- a/cognite/client/_sync_api/data_modeling/streams.py +++ b/cognite/client/_sync_api/data_modeling/streams.py @@ -1,6 +1,6 @@ """ =============================================================================== -aaa116549e68e1e2a86c5f8ba837ca51 +60ce0afb3ca022c6b4fc479885bc5668 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 8e6aa2a462..949321f96b 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -17,9 +17,11 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import StreamsRecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -68,8 +70,6 @@ from cognite.client._api.simulators.routine_revisions import SimulatorRoutineRevisionsAPI from cognite.client._api.simulators.routines import SimulatorRoutinesAPI from cognite.client._api.simulators.runs import SimulatorRunsAPI -from cognite.client._api.streams import StreamsAPI -from cognite.client._api.streams.records import StreamsRecordsAPI from cognite.client._api.synthetic_time_series import SyntheticDatapointsAPI from cognite.client._api.three_d import ThreeDAPI from cognite.client._api.three_d.asset_mapping import ThreeDAssetMappingAPI @@ -102,9 +102,11 @@ from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI +from cognite.client._sync_api.data_modeling.records import SyncStreamsRecordsAPI from cognite.client._sync_api.data_modeling.space_statistics import SyncSpaceStatisticsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI +from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api.data_sets import SyncDataSetsAPI from cognite.client._sync_api.datapoints import SyncDatapointsAPI @@ -153,8 +155,6 @@ from cognite.client._sync_api.simulators.routine_revisions import SyncSimulatorRoutineRevisionsAPI from cognite.client._sync_api.simulators.routines import SyncSimulatorRoutinesAPI from cognite.client._sync_api.simulators.runs import SyncSimulatorRunsAPI -from cognite.client._sync_api.streams import SyncStreamsAPI -from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI from cognite.client._sync_api.synthetic_time_series import SyncSyntheticDatapointsAPI from cognite.client._sync_api.three_d import Sync3DAPI from cognite.client._sync_api.three_d.asset_mapping import Sync3DAssetMappingAPI diff --git a/scripts/sync_client_codegen/sync_api_template.txt b/scripts/sync_client_codegen/sync_api_template.txt index 34e502971a..b3cdad08b1 100644 --- a/scripts/sync_client_codegen/sync_api_template.txt +++ b/scripts/sync_client_codegen/sync_api_template.txt @@ -5,16 +5,13 @@ This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ +from __future__ import annotations + {existing_imports} from cognite.client import AsyncCogniteClient from cognite.client._sync_api_client import SyncAPIClient -from cognite.client.utils._async_helpers import SyncIterator, run_sync -from cognite.client.utils._concurrency import _get_event_loop_executor -from typing import Any, Iterator, TYPE_CHECKING, overload -from collections.abc import Coroutine if TYPE_CHECKING: - import pandas as pd {type_checking_imports} From ab33b4ca68f5453addca4488c35f147fd416c83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Thu, 23 Apr 2026 10:26:02 +0200 Subject: [PATCH 12/12] test: update test imports to use new data_modeling.streams location - Update test_streams.py and test_streams_records.py to import from cognite.client.data_classes.data_modeling.streams instead of old location - Update client API calls to use client.data_modeling.streams - Fix mypy type: ignore comment for nest_asyncio import Co-Authored-By: Claude Haiku 4.5 --- cognite/client/utils/_concurrency.py | 2 +- tests/tests_unit/test_api/test_streams.py | 16 ++++++------ .../test_api/test_streams_records.py | 26 +++++++++---------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/cognite/client/utils/_concurrency.py b/cognite/client/utils/_concurrency.py index 4434ea1774..4ab2f4b3dd 100644 --- a/cognite/client/utils/_concurrency.py +++ b/cognite/client/utils/_concurrency.py @@ -533,7 +533,7 @@ def _patch_loop_for_jupyter(self, loop: asyncio.AbstractEventLoop | None) -> asy if loop is None: try: - import nest_asyncio # type: ignore [import-not-found] + import nest_asyncio # type: ignore [import-untyped] except ImportError: raise CogniteImportError( module="nest_asyncio", diff --git a/tests/tests_unit/test_api/test_streams.py b/tests/tests_unit/test_api/test_streams.py index ffdf7d9294..04a29331be 100644 --- a/tests/tests_unit/test_api/test_streams.py +++ b/tests/tests_unit/test_api/test_streams.py @@ -6,7 +6,7 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient -from cognite.client.data_classes.streams import ( +from cognite.client.data_classes.data_modeling.streams import ( RecordsIngestResponse, StreamList, StreamWrite, @@ -15,7 +15,7 @@ @pytest.fixture def streams_base_url(async_client: AsyncCogniteClient) -> str: - return async_client.streams._base_url_with_base_path + "/streams" + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" class TestStreamsAPI: @@ -43,7 +43,7 @@ def test_list_parses_items( ] } httpx_mock.add_response(method="GET", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) - out = cognite_client.streams.list() + out = cognite_client.data_modeling.streams.list() assert isinstance(out, StreamList) assert out[0].external_id == "st1" @@ -71,7 +71,7 @@ def test_retrieve_include_statistics_query( url=re.compile(re.escape(streams_base_url) + r"/st1\?includeStatistics=true$"), json=sample, ) - cognite_client.streams.retrieve("st1", include_statistics=True) + cognite_client.data_modeling.streams.retrieve("st1", include_statistics=True) def test_create_posts_items( self, @@ -98,7 +98,7 @@ def test_create_posts_items( } httpx_mock.add_response(method="POST", url=re.compile(re.escape(streams_base_url) + r"$"), json=sample) w = StreamWrite("st1", {"template": {"name": "ImmutableTestStream"}}) - cognite_client.streams.create([w]) + cognite_client.data_modeling.streams.create([w]) requests = httpx_mock.get_requests() assert len(requests) == 1 assert requests[0].url.path.endswith("/streams") @@ -107,13 +107,13 @@ def test_create_rejects_multiple_items(self, cognite_client: CogniteClient) -> N a = StreamWrite("a", {"template": {"name": "ImmutableTestStream"}}) b = StreamWrite("b", {"template": {"name": "ImmutableTestStream"}}) with pytest.raises(ValueError, match="exactly one"): - cognite_client.streams.create([a, b]) + cognite_client.data_modeling.streams.create([a, b]) def test_delete_rejects_multiple_items(self, cognite_client: CogniteClient) -> None: from cognite.client.data_classes.streams import StreamDeleteItem with pytest.raises(ValueError, match="exactly one"): - cognite_client.streams.delete([StreamDeleteItem("a"), StreamDeleteItem("b")]) + cognite_client.data_modeling.streams.delete([StreamDeleteItem("a"), StreamDeleteItem("b")]) def test_records_ingest_posts( self, @@ -126,7 +126,7 @@ def test_records_ingest_posts( url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), json={}, ) - out = cognite_client.streams.records.ingest("my-stream", {"items": []}) + out = cognite_client.data_modeling.streams.records.ingest("my-stream", {"items": []}) assert isinstance(out, RecordsIngestResponse) requests = httpx_mock.get_requests() assert len(requests) == 1 diff --git a/tests/tests_unit/test_api/test_streams_records.py b/tests/tests_unit/test_api/test_streams_records.py index 535ea63716..2e6fd59fd2 100644 --- a/tests/tests_unit/test_api/test_streams_records.py +++ b/tests/tests_unit/test_api/test_streams_records.py @@ -8,7 +8,7 @@ from pytest_httpx import HTTPXMock from cognite.client import AsyncCogniteClient, CogniteClient -from cognite.client.data_classes.streams import ( +from cognite.client.data_classes.data_modeling.streams import ( RecordsAggregateResponse, RecordsDeleteResponse, RecordsFilterResponse, @@ -19,7 +19,7 @@ @pytest.fixture def streams_base_url(async_client: AsyncCogniteClient) -> str: - return async_client.streams._base_url_with_base_path + "/streams" + return async_client.data_modeling.streams._base_url_with_base_path + "/streams" class TestIngest: @@ -34,7 +34,7 @@ def test_posts_body_and_returns_ingest_response( url=re.compile(re.escape(streams_base_url) + r"/my-stream/records$"), json={}, ) - out = cognite_client.streams.records.ingest("my-stream", {"items": []}) + out = cognite_client.data_modeling.streams.records.ingest("my-stream", {"items": []}) assert isinstance(out, RecordsIngestResponse) @@ -51,13 +51,13 @@ def test_wraps_sequence_as_items_key( json={}, ) row = {"space": "sp", "externalId": "r1", "sources": []} - cognite_client.streams.records.ingest_items("s1", [row]) + cognite_client.data_modeling.streams.records.ingest_items("s1", [row]) req = httpx_mock.get_requests()[0] assert req.url.path.endswith("/streams/s1/records") def test_empty_items_raises(self, cognite_client: CogniteClient) -> None: with pytest.raises(ValueError, match="at least one record"): - cognite_client.streams.records.ingest_items("s1", []) + cognite_client.data_modeling.streams.records.ingest_items("s1", []) class TestUpsertAndUpsertItems: @@ -72,11 +72,11 @@ def test_upsert_posts_to_upsert_path( url=re.compile(re.escape(streams_base_url) + r"/st/records/upsert$"), json={}, ) - cognite_client.streams.records.upsert("st", {"items": [{"x": 1}]}) + cognite_client.data_modeling.streams.records.upsert("st", {"items": [{"x": 1}]}) def test_upsert_items_requires_non_empty(self, cognite_client: CogniteClient) -> None: with pytest.raises(ValueError, match="at least one record"): - cognite_client.streams.records.upsert_items("st", []) + cognite_client.data_modeling.streams.records.upsert_items("st", []) class TestDeleteAndDeleteItems: @@ -91,7 +91,7 @@ def test_delete_posts_body( url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), json={}, ) - out = cognite_client.streams.records.delete("st", {"items": [{"space": "sp", "externalId": "a"}]}) + out = cognite_client.data_modeling.streams.records.delete("st", {"items": [{"space": "sp", "externalId": "a"}]}) assert isinstance(out, RecordsDeleteResponse) def test_delete_items_wraps_identifiers( @@ -105,11 +105,11 @@ def test_delete_items_wraps_identifiers( url=re.compile(re.escape(streams_base_url) + r"/st/records/delete$"), json={}, ) - cognite_client.streams.records.delete_items("st", [{"space": "sp", "externalId": "a"}]) + cognite_client.data_modeling.streams.records.delete_items("st", [{"space": "sp", "externalId": "a"}]) def test_delete_items_empty_raises(self, cognite_client: CogniteClient) -> None: with pytest.raises(ValueError, match="at least one"): - cognite_client.streams.records.delete_items("st", []) + cognite_client.data_modeling.streams.records.delete_items("st", []) class TestFilter: @@ -135,7 +135,7 @@ def test_parses_filter_response( url=re.compile(re.escape(streams_base_url) + r"/st/records/filter$"), json=payload, ) - out = cognite_client.streams.records.filter("st", {"filter": {"matchAll": {}}}) + out = cognite_client.data_modeling.streams.records.filter("st", {"filter": {"matchAll": {}}}) assert isinstance(out, RecordsFilterResponse) assert len(out.items) == 1 assert out.items[0].external_id == "r1" @@ -153,7 +153,7 @@ def test_parses_aggregates( url=re.compile(re.escape(streams_base_url) + r"/st/records/aggregate$"), json={"aggregates": {"cnt": 3}, "typing": {}}, ) - out = cognite_client.streams.records.aggregate("st", {"aggregate": []}) + out = cognite_client.data_modeling.streams.records.aggregate("st", {"aggregate": []}) assert isinstance(out, RecordsAggregateResponse) assert out.aggregates == {"cnt": 3} @@ -183,7 +183,7 @@ def test_parses_sync_page( url=re.compile(re.escape(streams_base_url) + r"/st/records/sync$"), json=payload, ) - out = cognite_client.streams.records.sync("st", {"sources": [], "limit": 10}) + out = cognite_client.data_modeling.streams.records.sync("st", {"sources": [], "limit": 10}) assert isinstance(out, RecordsSyncResponse) assert out.next_cursor == "next" assert not out.has_next