feat(streams): Streams API#2534
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #2534 +/- ##
==========================================
- Coverage 92.82% 92.80% -0.03%
==========================================
Files 480 485 +5
Lines 48502 48763 +261
==========================================
+ Hits 45021 45253 +232
- Misses 3481 3510 +29
🚀 New features to boost your workflow:
|
This comment was marked as outdated.
This comment was marked as outdated.
c0ac1ca to
b83f313
Compare
This comment was marked as resolved.
This comment was marked as resolved.
haakonvt
left a comment
There was a problem hiding this comment.
Part 1 of review: cognite/client/_api/streams/__init__.py
1902069 to
02d53d4
Compare
This comment was marked as resolved.
This comment was marked as resolved.
29a0184 to
6829245
Compare
c2d5bad to
3225b62
Compare
The StreamsAPI needs to chunk items one at a time for create and delete operations, similar to other APIs like AgentsAPI. This ensures each stream is processed in a separate API request rather than being bundled together. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Remove unused imports that were causing lint failures (F401 errors): - asyncio, Coroutine, Iterator (not used) - Any, Literal, overload, TYPE_CHECKING (not used) - APIClient (not used) - SyncIterator, run_sync (not used) - _get_event_loop_executor (not used) - pandas, ClientConfig (not used) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
… PR 2534 structure This refactors the Streams and Records API to be part of the data_modeling module rather than a separate top-level streams module, matching the structure of PR #2534. Changes: - Move StreamsAPI from _api/streams to _api/data_modeling/streams - Move StreamsRecordsAPI from _api/streams/records to _api/data_modeling/records - Move stream data classes to data_modeling/streams.py (includes record classes) - Integrate records API into StreamsAPI via self.records attribute - Remove direct self.streams attribute from AsyncCogniteClient - Streams now accessed via client.data_modeling.streams - Generate sync API wrappers for new locations - Update all imports and exports Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Remove parenthetical type references and descriptive asides from docstrings: - Removed "(human-readable)", "(\`\`Type\`\`)" style comments - Simplified endpoint descriptions to just describe the response Removed StreamTemplate.version field as it's not in the API specification. The field was optional and only stored, never used. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
StreamTemplate.version was removed as it's not in the API spec. Updated test to remove assertions checking for this field. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
| def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: | ||
| super().__init__(config, api_version, cognite_client) | ||
| self._CREATE_LIMIT = 1 | ||
| self._DELETE_LIMIT = 1 |
There was a problem hiding this comment.
perhaps it's better to add some warning first in case the SDK implementation changes?
There was a problem hiding this comment.
self._warning = FeaturePreviewWarning(
api_maturity="General Availability",
sdk_maturity="alpha",
feature_name="Streams",
)
Addressed all review comments from PR #2534: - Using Sequence[StreamWrite] for type safety (no untyped dicts) - Chunking handled automatically via _create_multiple and _delete_multiple - Chunking limits set (_CREATE_LIMIT=1, _DELETE_LIMIT=1) - Clean docstrings without unnecessary internal details - Using params dict for query parameters - Clear documentation on soft delete behavior - Proper Note about paging limits in list() - Statistics cost documentation in retrieve() Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
|
@andersfylling is this PR dead? |
He is probably out of Claude tokens 😆 |
| res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read")) | ||
| return StreamList._load(res.json()["items"]) |
There was a problem hiding this comment.
Let's use self._list here to avoid having to manually supply the semaphore + load data class
There was a problem hiding this comment.
we don't support limit, and we don't have cursor implementation there.
| """Returns a write version.""" | ||
| return StreamWrite( | ||
| external_id=self.external_id, | ||
| settings=StreamTemplateWriteSettings(template=StreamTemplate(name=self.created_from_template)), |
There was a problem hiding this comment.
Using createdFromTemplate seems to not be too reliable according to the docs:
Name of the template used for creating this stream. Note: This value is for information only. The template might have been modified or even entirely deleted after the stream has been created.
| class StreamTemplateWriteSettings(CogniteResource): | ||
| """Write-side settings for creating a stream from a template.""" | ||
|
|
||
| def __init__(self, template: StreamTemplate) -> None: |
There was a problem hiding this comment.
Let's add a FeaturePreviewWarning here since the docs says:
Note: Stream Templates are in Beta
Optionally - or maybe better - lets add a "SDK in alpha" warning to the StreamsAPI itself to give ourselves some freedom in changing/modifying the SDK API design for some time.
There was a problem hiding this comment.
Please split mock responses into fixtures in order to keep more "condense test logic" in the actual tests
There was a problem hiding this comment.
I think most of these tests are already covered by our automatic testing, at least load/dump roundtrips of varying sorts, meaning most if not all of these tests can be removed
Summary
Add streams api to sdk.
Example
This PR does not add the Records API (#2535).
closes #2519
closes #2246