-
Notifications
You must be signed in to change notification settings - Fork 37
feat(streams): Streams API #2534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6829245
3225b62
193403c
b65d523
7efcb0b
cfbca3f
ed6ed65
5a52db6
1e39c1b
b25a668
04f1ec5
71c5d9e
2c50de3
8cee723
ec2959d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import TYPE_CHECKING, overload | ||
|
|
||
| from cognite.client._api_client import APIClient | ||
| from cognite.client.data_classes.data_modeling.streams import ( | ||
| Stream, | ||
| StreamList, | ||
| StreamWrite, | ||
| ) | ||
| from cognite.client.utils._identifier import IdentifierSequence | ||
| from cognite.client.utils._url import interpolate_and_url_encode | ||
| from cognite.client.utils.useful_types import SequenceNotStr | ||
|
|
||
| if TYPE_CHECKING: | ||
| from cognite.client import AsyncCogniteClient | ||
| from cognite.client.config import ClientConfig | ||
|
|
||
|
|
||
| class StreamsAPI(APIClient): | ||
| """Streams API (``/streams``).""" | ||
|
|
||
| _RESOURCE_PATH = "/streams" | ||
|
|
||
| def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: | ||
| super().__init__(config, api_version, cognite_client) | ||
| self._CREATE_LIMIT = 1 | ||
| self._DELETE_LIMIT = 1 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps it's better to add some warning first in case the SDK implementation changes?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| @overload | ||
| async def create(self, items: StreamWrite) -> Stream: ... | ||
|
|
||
| @overload | ||
| async def create(self, items: Sequence[StreamWrite]) -> StreamList: ... | ||
|
|
||
| async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: | ||
| """`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_. | ||
|
|
||
| Args: | ||
| items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. | ||
|
|
||
| Returns: | ||
| Stream | StreamList: The created stream or streams. | ||
|
andersfylling marked this conversation as resolved.
|
||
| """ | ||
| return await self._create_multiple( | ||
| items=items, | ||
| list_cls=StreamList, | ||
| resource_cls=Stream, | ||
| input_resource_cls=StreamWrite, | ||
| ) | ||
|
|
||
| async def list(self) -> StreamList: | ||
| """`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project. | ||
|
andersfylling marked this conversation as resolved.
|
||
|
|
||
| Note: | ||
| There is no paging limit parameter: the endpoint returns all streams in the project | ||
| (projects are expected to have few streams). | ||
|
|
||
| Returns: | ||
| StreamList: The streams in the project. | ||
|
andersfylling marked this conversation as resolved.
|
||
| """ | ||
| res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) | ||
| return StreamList._load(res.json()["items"]) | ||
|
Comment on lines
+63
to
+64
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't support |
||
|
|
||
| async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: | ||
| """`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_. | ||
|
|
||
| Args: | ||
| stream_external_id (str): Stream external id. | ||
| include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing | ||
| statistics can be expensive. | ||
|
andersfylling marked this conversation as resolved.
|
||
|
|
||
| Returns: | ||
| Stream: The stream metadata (and optionally statistics). | ||
|
andersfylling marked this conversation as resolved.
|
||
| """ | ||
| path = interpolate_and_url_encode(f"{self._RESOURCE_PATH}/{{}}", stream_external_id) | ||
| params: dict[str, bool] | None = None | ||
| if include_statistics is not None: | ||
| params = {"includeStatistics": include_statistics} | ||
| res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read")) | ||
| return Stream._load(res.json()) | ||
|
andersfylling marked this conversation as resolved.
|
||
|
|
||
| async def delete(self, external_id: str | SequenceNotStr[str]) -> None: | ||
| """`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_. | ||
|
|
||
| Deletion is a soft delete that retains capacity for an extended period; prefer deleting only | ||
| when necessary. | ||
|
|
||
| Args: | ||
| external_id (str | SequenceNotStr[str]): External ID or list of external IDs. | ||
|
andersfylling marked this conversation as resolved.
|
||
| """ | ||
| await self._delete_multiple( | ||
| identifiers=IdentifierSequence.load(external_ids=external_id), | ||
| wrap_ids=True, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| """ | ||
| =============================================================================== | ||
| a7401471d6732b6938b33711d64a19b4 | ||
| This file is auto-generated from the Async API modules, - do not edit manually! | ||
| =============================================================================== | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import TYPE_CHECKING, overload | ||
|
|
||
| from cognite.client import AsyncCogniteClient | ||
| from cognite.client._sync_api_client import SyncAPIClient | ||
| from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite | ||
| from cognite.client.utils._async_helpers import run_sync | ||
| from cognite.client.utils.useful_types import SequenceNotStr | ||
|
|
||
| if TYPE_CHECKING: | ||
| from cognite.client import AsyncCogniteClient | ||
|
|
||
|
|
||
| class SyncStreamsAPI(SyncAPIClient): | ||
| """Auto-generated, do not modify manually.""" | ||
|
|
||
| def __init__(self, async_client: AsyncCogniteClient) -> None: | ||
| self.__async_client = async_client | ||
|
|
||
| @overload | ||
| def create(self, items: StreamWrite) -> Stream: ... | ||
|
|
||
| @overload | ||
| def create(self, items: Sequence[StreamWrite]) -> StreamList: ... | ||
|
|
||
| def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: | ||
| """ | ||
| `Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_. | ||
|
|
||
| Args: | ||
| items (StreamWrite | Sequence[StreamWrite]): One or more streams to create. | ||
|
|
||
| Returns: | ||
| Stream | StreamList: The created stream or streams. | ||
| """ | ||
| return run_sync(self.__async_client.data_modeling.streams.create(items=items)) | ||
|
|
||
| def list(self) -> StreamList: | ||
| """ | ||
| `List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project. | ||
|
|
||
| Note: | ||
| There is no paging limit parameter: the endpoint returns all streams in the project | ||
| (projects are expected to have few streams). | ||
|
|
||
| Returns: | ||
| StreamList: The streams in the project. | ||
| """ | ||
| return run_sync(self.__async_client.data_modeling.streams.list()) | ||
|
|
||
| def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream: | ||
|
andersfylling marked this conversation as resolved.
|
||
| """ | ||
| `Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_. | ||
|
|
||
| Args: | ||
| stream_external_id (str): Stream external id. | ||
| include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing | ||
| statistics can be expensive. | ||
|
|
||
| Returns: | ||
| Stream: The stream metadata (and optionally statistics). | ||
| """ | ||
| return run_sync( | ||
| self.__async_client.data_modeling.streams.retrieve( | ||
| stream_external_id=stream_external_id, include_statistics=include_statistics | ||
| ) | ||
| ) | ||
|
|
||
| def delete(self, external_id: str | SequenceNotStr[str]) -> None: | ||
| """ | ||
| `Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_. | ||
|
|
||
| Deletion is a soft delete that retains capacity for an extended period; prefer deleting only | ||
| when necessary. | ||
|
|
||
| Args: | ||
| external_id (str | SequenceNotStr[str]): External ID or list of external IDs. | ||
| """ | ||
| return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id)) | ||
Uh oh!
There was an error while loading. Please reload this page.