-
Notifications
You must be signed in to change notification settings - Fork 37
feat(records): alpha support for streams and records #2246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
bf7e159
5ee67ff
8867493
e3086e3
a804832
a625536
08ed751
55ab1b5
6ea6ae8
7ebb47a
bf85d13
44708f7
8704f93
74cd61d
46f6623
5760a0b
bcd743f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Sequence | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| from cognite.client._api_client import APIClient | ||
| from cognite.client._constants import DEFAULT_LIMIT_READ | ||
| from cognite.client.data_classes.data_modeling.records import ( | ||
| LastUpdatedRange, | ||
| RecordId, | ||
| RecordIngest, | ||
| RecordList, | ||
| RecordListWithCursor, | ||
| ) | ||
| from cognite.client.data_classes.filters import Filter | ||
| from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations | ||
|
|
||
| if TYPE_CHECKING: | ||
| from cognite.client import CogniteClient | ||
| from cognite.client.config import ClientConfig | ||
|
|
||
|
|
||
| @warn_on_all_method_invocations( | ||
| FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API") | ||
| ) | ||
| class RecordsAPI(APIClient): | ||
| _RESOURCE_PATH = "/streams/{}/records" | ||
|
|
||
| def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: | ||
| super().__init__(config, api_version, cognite_client) | ||
| self.__alpha_headers = { | ||
| "cdf-version": "alpha", | ||
| } | ||
|
|
||
| def ingest(self, stream: str, records: Sequence[RecordIngest]) -> None: | ||
| body = {"items": [record.dump(camel_case=True) for record in records]} | ||
| self._post(url_path=self._RESOURCE_PATH.format(stream), json=body, headers=self.__alpha_headers) | ||
|
|
||
| def upsert(self, stream: str, records: Sequence[RecordIngest]) -> None: | ||
| body = {"items": [record.dump(camel_case=True) for record in records]} | ||
| self._post(url_path=self._RESOURCE_PATH.format(stream) + "/upsert", json=body, headers=self.__alpha_headers) | ||
|
|
||
| def delete(self, stream: str, ids: RecordId | Sequence[RecordId]) -> None: | ||
| items = ids if isinstance(ids, Sequence) else [ids] | ||
| body = {"items": [item.dump(camel_case=True) for item in items]} | ||
| self._post(url_path=self._RESOURCE_PATH.format(stream) + "/delete", json=body, headers=self.__alpha_headers) | ||
|
|
||
| def filter( | ||
| self, | ||
| stream: str, | ||
| *, | ||
| last_updated_time: LastUpdatedRange | None = None, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Food for thought. This parameter is required for immutable streams. Should we have a default value for it, or should we maybe require an explicit |
||
| filter: Filter | None = None, | ||
| limit: int | None = DEFAULT_LIMIT_READ, | ||
| ) -> RecordList: | ||
| body: dict[str, Any] = {} | ||
| if last_updated_time is not None: | ||
| body["lastUpdatedTime"] = last_updated_time.dump() | ||
| if filter is not None: | ||
| body["filter"] = filter.dump() | ||
| body["limit"] = limit | ||
| res = self._post( | ||
| url_path=self._RESOURCE_PATH.format(stream) + "/filter", json=body, headers=self.__alpha_headers | ||
| ) | ||
| return RecordList._load(res.json()["items"], cognite_client=self._cognite_client) | ||
|
|
||
| def sync( | ||
| self, | ||
| stream: str, | ||
| *, | ||
| filter: Filter | None = None, | ||
| cursor: str | None = None, | ||
| initialize_cursor: str | None = None, | ||
| limit: int | None = DEFAULT_LIMIT_READ, | ||
| ) -> RecordListWithCursor: | ||
| body: dict = {} | ||
| if filter is not None: | ||
| body["filter"] = filter.dump() | ||
| if cursor is not None: | ||
| body["cursor"] = cursor | ||
| if initialize_cursor is not None: | ||
| body["initializeCursor"] = initialize_cursor | ||
| body["limit"] = limit | ||
| res = self._post(url_path=self._RESOURCE_PATH.format(stream) + "/sync", json=body, headers=self.__alpha_headers) | ||
| payload = res.json() | ||
| items = RecordList._load(payload["items"], cognite_client=self._cognite_client) | ||
| return RecordListWithCursor(list(items), cursor=payload.get("nextCursor"), has_next=payload.get("hasNext")) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Iterator, Sequence | ||
| from typing import TYPE_CHECKING, overload | ||
|
|
||
| from cognite.client._api_client import APIClient | ||
| from cognite.client._constants import DEFAULT_LIMIT_READ | ||
| from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite | ||
| from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations | ||
| from cognite.client.utils._identifier import Identifier | ||
|
|
||
| if TYPE_CHECKING: | ||
| from cognite.client import CogniteClient | ||
| from cognite.client.config import ClientConfig | ||
|
|
||
|
|
||
| @warn_on_all_method_invocations( | ||
| FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API") | ||
| ) | ||
| class StreamsAPI(APIClient): | ||
| _RESOURCE_PATH = "/streams" | ||
|
|
||
| def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: | ||
| super().__init__(config, api_version, cognite_client) | ||
| self._CREATE_LIMIT = 1 | ||
| self.__alpha_headers = { | ||
| "cdf-version": "alpha", | ||
| } | ||
|
|
||
| @overload | ||
| def __call__( | ||
| self, | ||
| chunk_size: None = None, | ||
| limit: int | None = None, | ||
| ) -> Iterator[Stream]: ... | ||
|
|
||
| @overload | ||
| def __call__( | ||
| self, | ||
| chunk_size: int, | ||
| limit: int | None = None, | ||
| ) -> Iterator[StreamList]: ... | ||
|
|
||
| def __call__( | ||
| self, | ||
| chunk_size: int | None = None, | ||
| limit: int | None = None, | ||
| ) -> Iterator[Stream] | Iterator[StreamList]: | ||
| """Iterate over streams | ||
|
|
||
| Fetches streams as they are iterated over, so you keep a limited number of streams in memory. | ||
|
|
||
| Args: | ||
| chunk_size (int | None): Number of streams to return in each chunk. Defaults to yielding one stream at a time. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't fully understand what this thing does, so maybe my worries are groundless. But streams endpoint will have pretty strict rate/concurrency limits. Why have a chunk size of 1, when by default a project can have no more than 10 streams? Maybe set something like 20 to avoid unnecessary API calls? |
||
| limit (int | None): Maximum number of streams to return. Defaults to returning all items. | ||
|
|
||
| Returns: | ||
| Iterator[Stream] | Iterator[StreamList]: yields Stream one by one if chunk_size is not specified, else StreamList objects. | ||
| """ | ||
| return self._list_generator( | ||
| list_cls=StreamList, | ||
| resource_cls=Stream, | ||
| method="GET", | ||
| chunk_size=chunk_size, | ||
| limit=limit, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: we do not have limit on listStream atm. https://api-docs.cogheim.net/redoc/#tag/Streams/operation/listStream |
||
| headers=self.__alpha_headers, | ||
| ) | ||
|
|
||
| def __iter__(self) -> Iterator[Stream]: | ||
| """Iterate over streams | ||
|
|
||
| Fetches streams as they are iterated over, so you keep a limited number of streams in memory. | ||
|
|
||
| Returns: | ||
| Iterator[Stream]: yields Streams one by one. | ||
| """ | ||
| return self() | ||
|
|
||
| def retrieve(self, external_id: str) -> Stream | None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about |
||
| """`Retrieve a stream. <https://developer.cognite.com/api#tag/Streams/operation/byStreamIdsStreams>`_ | ||
|
|
||
| Args: | ||
| external_id (str): The external ID of the stream to retrieve. | ||
|
|
||
| Returns: | ||
| Stream | None: Requested stream or None if it does not exist. | ||
|
|
||
| Examples: | ||
|
|
||
| >>> from cognite.client import CogniteClient | ||
| >>> client = CogniteClient() | ||
| >>> res = client.data_modeling.streams.retrieve(external_id='myStream') | ||
|
|
||
| Retrieve another stream by external ID: | ||
|
|
||
| >>> res = client.data_modeling.streams.retrieve(external_id="MyOtherStream") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is concerning. How many IDs can be passed this way? Our endpoint allows retrieving only 1 stream by ID, so this results in multiple API invocations, right? And stream endpoints have very strict limits: https://cognitedata.atlassian.net/wiki/spaces/RPILA/pages/4797431945/Design+ILA+rate+and+concurrency+limits#Stream-endpoint-limits Invoking this method with >5 IDs basically guarantees throttling. |
||
|
|
||
| """ | ||
| identifier = Identifier.load(external_id=external_id) | ||
| return self._retrieve(identifier=identifier, cls=Stream, headers=self.__alpha_headers) | ||
|
|
||
| def delete(self, external_id: str) -> None: | ||
| """`Delete one or more streams <https://developer.cognite.com/api#tag/Streams/operation/deleteStreamsV3>`_ | ||
|
|
||
| Args: | ||
| external_id (str): External ID of the stream to delete. | ||
| Examples: | ||
|
|
||
| Delete a stream by external ID: | ||
|
|
||
| >>> from cognite.client import CogniteClient | ||
| >>> client = CogniteClient() | ||
| >>> client.data_modeling.streams.delete(external_id="myStream") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Food for thought. Streams are intended to be long-lived, and customers should think twice before creating or deleting them. A deleted stream will stay for up to 6 weeks in a soft-deleted state, all this time consuming capacity, incurring costs and preventing another stream with the same name from being created. Should we really allow customers to delete multiple streams with 1 request? |
||
| """ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note: Streams api deviate from cognite api standard on delete (bug has been opened on this), which is generally to do post with items list under , instead we have a issue a DELETE request. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we only accept one item in the list of streams to be deleted and created btw.. |
||
| self._delete(url_path=f"{self._RESOURCE_PATH}/{external_id}", headers=self.__alpha_headers) | ||
|
|
||
| def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> StreamList: | ||
| """`List streams <https://developer.cognite.com/api#tag/Streams/operation/listStreamsV3>`_ | ||
|
|
||
| Args: | ||
| limit (int | None): Maximum number of streams to return. Defaults to 10. Set to -1, float("inf") or None to return all items. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned by Andreas, there is no limit. The endpoint returns all the streams there are, as a project isn't supposed to have many. |
||
|
|
||
| Returns: | ||
| StreamList: List of requested streams | ||
|
|
||
| Examples: | ||
|
|
||
| List streams: | ||
|
|
||
| >>> from cognite.client import CogniteClient | ||
| >>> client = CogniteClient() | ||
| >>> stream_list = client.data_modeling.streams.list(limit=5) | ||
|
|
||
| Iterate over streams: | ||
|
|
||
| >>> for stream in client.data_modeling.streams: | ||
| ... stream # do something with the stream | ||
|
|
||
| Iterate over chunks of streams to reduce memory load: | ||
|
|
||
| >>> for stream_list in client.data_modeling.streams(chunk_size=5): | ||
| ... stream_list # do something with the streams | ||
| """ | ||
| return self._list( | ||
| list_cls=StreamList, resource_cls=Stream, method="GET", limit=limit, headers=self.__alpha_headers | ||
| ) | ||
|
|
||
| @overload | ||
| def create(self, streams: Sequence[StreamWrite]) -> StreamList: ... | ||
|
|
||
| @overload | ||
| def create(self, streams: StreamWrite) -> Stream: ... | ||
|
|
||
| def create(self, streams: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Food for thought. Streams are intended to be long-lived, and customers should think twice before creating or deleting them. A deleted stream will stay for up to 6 weeks in a soft-deleted state, all this time consuming capacity, incurring costs and preventing another stream with the same name from being created. Should we really allow customers to create multiple streams with 1 request? Especially considering that the endpoint will have only 1 rps limit, so calling this method with multiple stream names would automatically mean throttling. |
||
| """`Create one or more streams. <https://developer.cognite.com/api#tag/Streams/operation/ApplyStreams>`_ | ||
|
|
||
| Args: | ||
| streams (StreamWrite | Sequence[StreamWrite]): The stream or streams to create or update. | ||
|
|
||
| Returns: | ||
| Stream | StreamList: Created stream(s) | ||
|
|
||
| Examples: | ||
|
|
||
| Create new streams: | ||
|
|
||
| >>> from cognite.client import CogniteClient | ||
| >>> from cognite.client.data_classes.data_modeling.streams import StreamWrite | ||
| >>> client = CogniteClient() | ||
| >>> streams = [StreamWrite(stream="myStream", description="My first stream", name="My Stream"), | ||
| ... StreamWrite(stream="myOtherStream", description="My second stream", name="My Other Stream")] | ||
| >>> res = client.data_modeling.streams.create(streams) | ||
| """ | ||
| return self._create_multiple( | ||
| list_cls=StreamList, | ||
| resource_cls=Stream, | ||
| items=streams, | ||
| input_resource_cls=StreamWrite, | ||
| headers=self.__alpha_headers, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about aggregate endpoint?