Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

Expand All @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)

def _get_semaphore(
self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"]
Expand Down
106 changes: 106 additions & 0 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.streams import (
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
)
from cognite.client.utils._url import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsRecordsAPI(APIClient):
"""ILA record operations under ``/streams/{streamId}/records/...``."""

_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)

def _records_base(self, stream_external_id: str) -> str:
return interpolate_and_url_encode("/streams/{}/records", stream_external_id)

async def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""`Ingest records <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_ into a stream."""
res = await self._post(
self._records_base(stream_external_id), json=body, semaphore=self._get_semaphore("write")
)
return RecordsIngestResponse._load(res.json())

async def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""`Upsert records <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_ in a mutable stream."""
res = await self._post(
self._records_base(stream_external_id) + "/upsert", json=body, semaphore=self._get_semaphore("write")
)
return RecordsIngestResponse._load(res.json())

async def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
"""`Delete records <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_ from a mutable stream."""
res = await self._post(
self._records_base(stream_external_id) + "/delete", json=body, semaphore=self._get_semaphore("write")
)
return RecordsDeleteResponse._load(res.json())

async def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
"""`Filter records <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_."""
res = await self._post(
self._records_base(stream_external_id) + "/filter", json=body, semaphore=self._get_semaphore("read")
)
return RecordsFilterResponse._load(res.json())

async def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
"""`Aggregate over records <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_."""
res = await self._post(
self._records_base(stream_external_id) + "/aggregate", json=body, semaphore=self._get_semaphore("read")
)
return RecordsAggregateResponse._load(res.json())

async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
"""`Sync records <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_ (cursor-based read)."""
res = await self._post(
self._records_base(stream_external_id) + "/sync", json=body, semaphore=self._get_semaphore("read")
)
return RecordsSyncResponse._load(res.json())

async def ingest_items(
self,
stream_external_id: str,
items: Sequence[Mapping[str, Any]],
) -> RecordsIngestResponse:
"""Ingest records using the ``items`` array shape (1-1000 records per request).

Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...).
This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``.
"""
if not items:
raise ValueError("ingest_items requires at least one record (API allows 1-1000 items per call).")
return await self.ingest(stream_external_id, {"items": [dict(i) for i in items]})

async def upsert_items(
self,
stream_external_id: str,
items: Sequence[Mapping[str, Any]],
) -> RecordsIngestResponse:
"""Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`."""
if not items:
raise ValueError("upsert_items requires at least one record (API allows 1-1000 items per call).")
return await self.upsert(stream_external_id, {"items": [dict(i) for i in items]})

async def delete_items(
self,
stream_external_id: str,
items: Sequence[Mapping[str, Any]],
) -> RecordsDeleteResponse:
"""Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`."""
if not items:
raise ValueError("delete_items requires at least one record identifier.")
return await self.delete(stream_external_id, {"items": [dict(i) for i in items]})
98 changes: 98 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api.data_modeling.records import StreamsRecordsAPI
from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.streams import (
Stream,
StreamList,
StreamWrite,
)
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils._url import interpolate_and_url_encode
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsAPI(APIClient):
"""Streams API (``/streams``) with nested Records API (``/streams/{id}/records``)."""

_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._CREATE_LIMIT = 1
self._DELETE_LIMIT = 1
self.records = StreamsRecordsAPI(config, api_version, cognite_client)

@overload
async def create(self, items: StreamWrite) -> Stream: ...

@overload
async def create(self, items: Sequence[StreamWrite]) -> StreamList: ...

async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

Args:
items (StreamWrite | Sequence[StreamWrite]): One or more streams to create.

Returns:
Stream | StreamList: The created stream or streams.
"""
return await self._create_multiple(
items=items,
list_cls=StreamList,
resource_cls=Stream,
input_resource_cls=StreamWrite,
)

async def list(self) -> StreamList:
"""`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project.

Note:
There is no paging limit parameter: the endpoint returns all streams in the project
(projects are expected to have few streams).

Returns:
StreamList: The streams in the project.
"""
res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read"))
return StreamList._load(res.json()["items"])

async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
"""`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

Args:
stream_external_id (str): Stream external id.
include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing
statistics can be expensive.

Returns:
Stream: The stream metadata (and optionally statistics).
"""
path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id)
params: dict[str, bool] | None = None
if include_statistics is not None:
params = {"includeStatistics": include_statistics}
res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read"))
return Stream._load(res.json())

async def delete(self, external_id: str | SequenceNotStr[str]) -> None:
"""`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_.

Deletion is a soft delete that retains capacity for an extended period; prefer deleting only
when necessary.

Args:
external_id (str | SequenceNotStr[str]): External ID or list of external IDs.
"""
await self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_id),
wrap_ids=True,
)
9 changes: 3 additions & 6 deletions cognite/client/_sync_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
"""
===============================================================================
c76b2b9351d2a5eee6a710fa9893bfa4
584030bc5e2a4b8168f54c101f7f521d
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI
from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI
from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI
from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI
from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI
from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI
from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI
from cognite.client._sync_api.data_modeling.views import SyncViewsAPI
from cognite.client._sync_api_client import SyncAPIClient

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncDataModelingAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""
Expand All @@ -35,3 +31,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None:
self.instances = SyncInstancesAPI(async_client)
self.graphql = SyncDataModelingGraphQLAPI(async_client)
self.statistics = SyncStatisticsAPI(async_client)
self.streams = SyncStreamsAPI(async_client)
115 changes: 115 additions & 0 deletions cognite/client/_sync_api/data_modeling/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
===============================================================================
4d6222c6b1392e5a4e25754a02837f8d
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.data_modeling.streams import (
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
)
from cognite.client.utils._async_helpers import run_sync

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncStreamsRecordsAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""

def __init__(self, async_client: AsyncCogniteClient) -> None:
self.__async_client = async_client

def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""
`Ingest records <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_ into a stream.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.ingest(stream_external_id=stream_external_id, body=body)
)

def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
"""
`Upsert records <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_ in a mutable stream.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.upsert(stream_external_id=stream_external_id, body=body)
)

def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
"""
`Delete records <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_ from a mutable stream.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.delete(stream_external_id=stream_external_id, body=body)
)

def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
"""
`Filter records <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.filter(stream_external_id=stream_external_id, body=body)
)

def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
"""
`Aggregate over records <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.aggregate(
stream_external_id=stream_external_id, body=body
)
)

def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
"""
`Sync records <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_ (cursor-based read).
"""
return run_sync(
self.__async_client.data_modeling.streams.records.sync(stream_external_id=stream_external_id, body=body)
)

def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse:
"""
Ingest records using the ``items`` array shape (1-1000 records per request).

Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...).
This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.ingest_items(
stream_external_id=stream_external_id, items=items
)
)

def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse:
"""
Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.upsert_items(
stream_external_id=stream_external_id, items=items
)
)

def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsDeleteResponse:
"""
Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`.
"""
return run_sync(
self.__async_client.data_modeling.streams.records.delete_items(
stream_external_id=stream_external_id, items=items
)
)
Loading
Loading