Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ As of 2025-08-29, changes are grouped as follows
- Added missing parameter `description` to `DatapointSubscriptionUpdate` object such that it can be updated
in the `client.time_series.subscriptions.update(...)` method.


## [7.81.0] - 2025-08-14
### Added
- [alpha] Alpha support for Streams & Records APIs. Note that both the SDK and API implementation may be changed at any time.

## [7.80.1] - 2025-08-14
### Fixed
- Make CogniteAPIError.response_code non-nullable again, addressing a regression introduced in the previous version.
Expand Down
9 changes: 4 additions & 5 deletions cognite/client/_api/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cognite.client._api_client import APIClient
from cognite.client.data_classes.agents import Agent, AgentList, AgentUpsert
from cognite.client.data_classes.agents.chat import AgentChatResponse, Message, MessageList
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

Expand All @@ -15,6 +15,9 @@
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Agents")
)
class AgentsAPI(APIClient):
_RESOURCE_PATH = "/ai/agents"

Expand Down Expand Up @@ -151,7 +154,6 @@ def upsert(self, agents: AgentUpsert | Sequence[AgentUpsert]) -> Agent | AgentLi


"""
self._warnings.warn()
return self._create_multiple(
list_cls=AgentList,
resource_cls=Agent,
Expand Down Expand Up @@ -189,7 +191,6 @@ def retrieve(

>>> res = client.agents.retrieve(external_ids=["my_agent_1", "my_agent_2"])
"""
self._warnings.warn()
identifiers = IdentifierSequence.load(external_ids=external_ids)
return self._retrieve_multiple(
list_cls=AgentList,
Expand All @@ -214,7 +215,6 @@ def delete(self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bo
>>> client.agents.delete(external_ids="my_agent")

"""
self._warnings.warn()
self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_ids),
wrap_ids=True,
Expand All @@ -236,7 +236,6 @@ def list(self) -> AgentList: # The API does not yet support limit or pagination
>>> agent_list = client.agents.list()

"""
self._warnings.warn()
res = self._get(url_path=self._RESOURCE_PATH)
return AgentList._load(res.json()["items"], cognite_client=self._cognite_client)

Expand Down
4 changes: 4 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from cognite.client._api.data_modeling.data_models import DataModelsAPI
from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.records import RecordsAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

Expand All @@ -26,3 +28,5 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)
self.records = RecordsAPI(config, api_version, cognite_client)
87 changes: 87 additions & 0 deletions cognite/client/_api/data_modeling/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.data_modeling.records import (
LastUpdatedRange,
RecordId,
RecordIngest,
RecordList,
RecordListWithCursor,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
    FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API")
)
class RecordsAPI(APIClient):
    """API client for the alpha Records endpoints; records live under a stream."""

    # The stream external ID is interpolated into this path for every request.
    _RESOURCE_PATH = "/streams/{}/records"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)
        # All Records endpoints are alpha-only; this header opts in explicitly on each call.
        self.__alpha_headers = {"cdf-version": "alpha"}

def ingest(self, stream: str, records: Sequence[RecordIngest]) -> None:
    """Ingest records into a stream.

    Args:
        stream (str): External ID of the stream to ingest into.
        records (Sequence[RecordIngest]): The records to ingest.
    """
    body = {"items": [record.dump(camel_case=True) for record in records]}
    self._post(url_path=self._RESOURCE_PATH.format(stream), json=body, headers=self.__alpha_headers)

def upsert(self, stream: str, records: Sequence[RecordIngest]) -> None:
    """Upsert (create or update) records in a stream.

    Args:
        stream (str): External ID of the stream to upsert into.
        records (Sequence[RecordIngest]): The records to upsert.
    """
    body = {"items": [record.dump(camel_case=True) for record in records]}
    self._post(url_path=self._RESOURCE_PATH.format(stream) + "/upsert", json=body, headers=self.__alpha_headers)

def delete(self, stream: str, ids: RecordId | Sequence[RecordId]) -> None:
    """Delete records from a stream.

    Args:
        stream (str): External ID of the stream to delete from.
        ids (RecordId | Sequence[RecordId]): Id(s) of the record(s) to delete.
    """
    # A lone RecordId is wrapped in a list; assumes RecordId is not itself a
    # Sequence subclass (e.g. a NamedTuple) -- TODO confirm against its definition.
    items = ids if isinstance(ids, Sequence) else [ids]
    body = {"items": [item.dump(camel_case=True) for item in items]}
    self._post(url_path=self._RESOURCE_PATH.format(stream) + "/delete", json=body, headers=self.__alpha_headers)

def filter(
    self,
    stream: str,
    *,
    last_updated_time: LastUpdatedRange | None = None,
    filter: Filter | None = None,
    limit: int | None = DEFAULT_LIMIT_READ,
) -> RecordList:
    """Filter records in a stream.

    Args:
        stream (str): External ID of the stream to query.
        last_updated_time (LastUpdatedRange | None): Only return records whose last-updated
            time falls within this range. NOTE(review): reportedly required for immutable
            streams -- confirm whether a default of None is acceptable there.
        filter (Filter | None): Filter to apply to the records.
        limit (int | None): Maximum number of records to return. Defaults to 25.

    Returns:
        RecordList: The matching records.
    """
    # limit is always sent, even when None (None means "no limit" to the API).
    body: dict[str, Any] = {"limit": limit}
    if last_updated_time is not None:
        body["lastUpdatedTime"] = last_updated_time.dump()
    if filter is not None:
        body["filter"] = filter.dump()
    res = self._post(
        url_path=self._RESOURCE_PATH.format(stream) + "/filter", json=body, headers=self.__alpha_headers
    )
    return RecordList._load(res.json()["items"], cognite_client=self._cognite_client)

def sync(
    self,
    stream: str,
    *,
    filter: Filter | None = None,
    cursor: str | None = None,
    initialize_cursor: str | None = None,
    limit: int | None = DEFAULT_LIMIT_READ,
) -> RecordListWithCursor:
    """Sync (incrementally read) records from a stream.

    Args:
        stream (str): External ID of the stream to sync from.
        filter (Filter | None): Filter to apply to the records.
        cursor (str | None): Cursor from a previous sync call to continue from.
        initialize_cursor (str | None): Where to initialize a new cursor when none is given.
        limit (int | None): Maximum number of records to return. Defaults to 25.

    Returns:
        RecordListWithCursor: The records plus the cursor to pass to the next sync call.
    """
    # Annotated dict[str, Any] for consistency with filter(); limit is always sent.
    body: dict[str, Any] = {"limit": limit}
    if filter is not None:
        body["filter"] = filter.dump()
    if cursor is not None:
        body["cursor"] = cursor
    if initialize_cursor is not None:
        body["initializeCursor"] = initialize_cursor
    res = self._post(url_path=self._RESOURCE_PATH.format(stream) + "/sync", json=body, headers=self.__alpha_headers)
    payload = res.json()
    items = RecordList._load(payload["items"], cognite_client=self._cognite_client)
    return RecordListWithCursor(list(items), cursor=payload.get("nextCursor"), has_next=payload.get("hasNext"))
180 changes: 180 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
from __future__ import annotations

from collections.abc import Iterator, Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations
from cognite.client.utils._identifier import Identifier

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
    # Was feature_name="Records API" -- that name belongs to RecordsAPI; the
    # CHANGELOG lists Streams and Records as distinct features.
    FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Streams")
)
class StreamsAPI(APIClient):
    """API client for the alpha Streams endpoints."""

    _RESOURCE_PATH = "/streams"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)
        # Per review, the API accepts only one stream per create request.
        self._CREATE_LIMIT = 1
        # All Streams endpoints are alpha-only; this header opts in explicitly on each call.
        self.__alpha_headers = {"cdf-version": "alpha"}

@overload
def __call__(self, chunk_size: None = None, limit: int | None = None) -> Iterator[Stream]: ...

@overload
def __call__(self, chunk_size: int, limit: int | None = None) -> Iterator[StreamList]: ...

def __call__(
    self,
    chunk_size: int | None = None,
    limit: int | None = None,
) -> Iterator[Stream] | Iterator[StreamList]:
    """Iterate over streams

    Streams are fetched lazily as iteration proceeds, keeping only a bounded number in memory.

    Args:
        chunk_size (int | None): Number of streams to return in each chunk. Defaults to yielding one stream a time.
        limit (int | None): Maximum number of streams to return. Defaults to returning all items.

    Returns:
        Iterator[Stream] | Iterator[StreamList]: yields Stream one by one if chunk_size is not specified, else StreamList objects.
    """
    return self._list_generator(
        list_cls=StreamList,
        resource_cls=Stream,
        method="GET",
        chunk_size=chunk_size,
        limit=limit,
        headers=self.__alpha_headers,
    )

def __iter__(self) -> Iterator[Stream]:
    """Iterate over streams

    Streams are fetched lazily as iteration proceeds, keeping only a bounded number in memory.

    Returns:
        Iterator[Stream]: yields Streams one by one.
    """
    # Delegates to __call__ with its defaults: one stream at a time, no limit.
    return self(chunk_size=None, limit=None)

def retrieve(self, external_id: str) -> Stream | None:
    """`Retrieve a stream. <https://developer.cognite.com/api#tag/Streams/operation/byStreamIdsStreams>`_

    Args:
        external_id (str): External ID of the stream to retrieve.

    Returns:
        Stream | None: Requested stream or None if it does not exist.

    Examples:

        Retrieve a stream by external id:

        >>> from cognite.client import CogniteClient
        >>> client = CogniteClient()
        >>> res = client.data_modeling.streams.retrieve(external_id="myStream")
    """
    # Docstring previously showed a `streams=` kwarg and multi-ID retrieval,
    # neither of which this single-ID signature supports.
    identifier = Identifier.load(external_id=external_id)
    return self._retrieve(identifier=identifier, cls=Stream, headers=self.__alpha_headers)

def delete(self, external_id: str) -> None:
    """`Delete a stream <https://developer.cognite.com/api#tag/Streams/operation/deleteStreamsV3>`_

    Args:
        external_id (str): External ID of the stream to delete.

    Examples:

        Delete a stream by external id:

        >>> from cognite.client import CogniteClient
        >>> client = CogniteClient()
        >>> client.data_modeling.streams.delete(external_id="myStream")
    """
    # NOTE(review): deviates from the usual POST-with-items delete convention;
    # the Streams API takes a DELETE request per stream (API bug filed).
    # Docstring previously said "one or more streams" with a list example --
    # the signature only accepts a single external ID.
    self._delete(url_path=f"{self._RESOURCE_PATH}/{external_id}", headers=self.__alpha_headers)

def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> StreamList:
    """`List streams <https://developer.cognite.com/api#tag/Streams/operation/listStreamsV3>`_

    Args:
        limit (int | None): Maximum number of streams to return. Defaults to 10. Set to -1, float("inf") or None to return all items.

    Returns:
        StreamList: List of requested streams

    Examples:

        List up to five streams:

        >>> from cognite.client import CogniteClient
        >>> client = CogniteClient()
        >>> stream_list = client.data_modeling.streams.list(limit=5)

        Iterate over streams:

        >>> for stream in client.data_modeling.streams:
        ...     stream  # do something with the stream

        Iterate over chunks of streams to reduce memory load:

        >>> for stream_list in client.data_modeling.streams(chunk_size=2500):
        ...     stream_list  # do something with the streams
    """
    # Example caption previously said "filter on max start time" -- this method
    # exposes no filtering parameters, only a limit.
    return self._list(
        list_cls=StreamList, resource_cls=Stream, method="GET", limit=limit, headers=self.__alpha_headers
    )

@overload
def create(self, streams: Sequence[StreamWrite]) -> StreamList: ...

@overload
def create(self, streams: StreamWrite) -> Stream: ...

def create(self, streams: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
    """`Create one or more streams. <https://developer.cognite.com/api#tag/Streams/operation/ApplyStreams>`_

    Note: the SDK sends one request per stream (``_CREATE_LIMIT = 1``), so a
    sequence of streams results in multiple API calls.

    Args:
        streams (StreamWrite | Sequence[StreamWrite]): Stream(s) to create.

    Returns:
        Stream | StreamList: Created stream(s)

    Examples:

        Create new streams:

        >>> from cognite.client import CogniteClient
        >>> from cognite.client.data_classes.data_modeling.streams import StreamWrite
        >>> client = CogniteClient()
        >>> streams = [StreamWrite(stream="myStream", description="My first stream", name="My Stream"),
        ...     StreamWrite(stream="myOtherStream", description="My second stream", name="My Other Stream")]
        >>> res = client.data_modeling.streams.create(streams)
    """
    # Args docstring was garbled ("Stream or streams of streamsda to create or update").
    return self._create_multiple(
        list_cls=StreamList,
        resource_cls=Stream,
        items=streams,
        input_resource_cls=StreamWrite,
        headers=self.__alpha_headers,
    )
Loading