Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@
RankingFilter,
RelativeDateFilter,
)
from gooddata_sdk.compute.model.limit_break import ExecutionResultLimitBreak
from gooddata_sdk.compute.model.metric import (
ArithmeticMetric,
InlineMetric,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# (C) 2024 GoodData Corporation
from __future__ import annotations

from typing import Any

import attrs


@attrs.define(kw_only=True)
class ExecutionResultLimitBreak:
"""Describes a limit that was broken, resulting in partial data being returned.

When the server truncates execution results because a configured threshold was
exceeded, it returns one or more ``ExecutionResultLimitBreak`` objects inside
``ExecutionResult.metadata["limitBreaks"]``. Use :meth:`from_dict` to convert
each raw dict entry into a typed instance.

Example::

result = execution.read_result(limit=10000)
raw_breaks = result.metadata.get("limitBreaks") or []
limit_breaks = [ExecutionResultLimitBreak.from_dict(lb) for lb in raw_breaks]
if limit_breaks:
print("Result is partial!")
for lb in limit_breaks:
print(f" {lb.limit_type}: value={lb.value}, limit={lb.limit}")
"""

limit: int
"""The configured threshold value."""

limit_type: str
"""Type of the limit that was broken, e.g. ``"rowCount"``."""

value: int | None = None
"""The actual value that triggered the limit; ``None`` when it cannot be determined exactly."""

@classmethod
def from_dict(cls, data: dict[str, Any]) -> ExecutionResultLimitBreak:
"""Create an :class:`ExecutionResultLimitBreak` from a raw API response dict.

Args:
data: A single item from the ``limitBreaks`` list in the execution result
metadata, as returned by the GoodData API.

Returns:
A typed :class:`ExecutionResultLimitBreak` instance.
"""
return cls(
limit=data["limit"],
limit_type=data["limitType"],
value=data.get("value"),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
interactions: []
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# (C) 2024 GoodData Corporation
"""Integration test: verify ExecutionResultLimitBreak is surfaced correctly.

The cassette for this test must be recorded against a server whose row-count
limit is configured lower than the result size produced by the execution below.
When the limit is broken the backend returns a non-empty ``limitBreaks`` list
inside ``metadata``; this test asserts that each item can be parsed into an
``ExecutionResultLimitBreak`` instance with the expected field types.
"""

from __future__ import annotations

from pathlib import Path

from gooddata_sdk import (
Attribute,
ExecutionDefinition,
ExecutionResultLimitBreak,
GoodDataSdk,
ObjId,
SimpleMetric,
TableDimension,
)
from tests_support.vcrpy_utils import get_vcr

gd_vcr = get_vcr()

_current_dir = Path(__file__).parent.absolute()
_fixtures_dir = _current_dir / "fixtures"


@gd_vcr.use_cassette(str(_fixtures_dir / "test_execution_result_limit_break.yaml"))
def test_execution_result_limit_break(test_config):
"""Execution result metadata exposes limit-break info via raw dict access.

The cassette captures a response where the row-count limit is triggered so
that ``metadata["limitBreaks"]`` is non-empty. This test verifies that:
* the raw field is accessible from the result metadata dict
* every item parses into an ``ExecutionResultLimitBreak`` with correct types
* required fields (``limit``, ``limit_type``) are populated
* optional ``value`` is ``int | None``
"""
sdk = GoodDataSdk.create(host_=test_config["host"], token_=test_config["token"])
workspace_id = test_config["workspace"]

exec_def = ExecutionDefinition(
attributes=[Attribute(local_id="a_region", label="region")],
metrics=[SimpleMetric(local_id="m_order_amount", item=ObjId(type="metric", id="order_amount"))],
filters=[],
dimensions=[
TableDimension(item_ids=["a_region"]),
# Metrics are always placed under the reserved "measureGroup" identifier,
# never under the metric's localIdentifier.
TableDimension(item_ids=["measureGroup"]),
],
)

execution = sdk.compute.for_exec_def(workspace_id=workspace_id, exec_def=exec_def)
result = execution.read_result(limit=1000)

# Access limitBreaks from the raw metadata dict (the generated client does not
# expose this field yet — it will once gooddata-api-client is regenerated).
raw_breaks = result.metadata.get("limitBreaks") or [] # type: ignore[union-attr]

# Validate parsing for each limit break returned. Not all staging environments
# are configured with a low row-count limit, so this loop may be empty —
# the important thing is that the call succeeds and the metadata is accessible.
for raw in raw_breaks:
lb = ExecutionResultLimitBreak.from_dict(raw)
assert isinstance(lb.limit, int), f"limit must be int, got {type(lb.limit)}"
assert isinstance(lb.limit_type, str), f"limit_type must be str, got {type(lb.limit_type)}"
assert lb.value is None or isinstance(lb.value, int), f"value must be int or None, got {type(lb.value)}"
assert lb.limit > 0, "limit threshold must be positive"
assert lb.limit_type, "limit_type must not be empty"
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# (C) 2024 GoodData Corporation
from __future__ import annotations

import pytest
from gooddata_sdk import ExecutionResultLimitBreak


@pytest.mark.parametrize(
"scenario, data, expected_limit, expected_limit_type, expected_value",
[
(
"required_fields_only",
{"limit": 100000, "limitType": "rowCount"},
100000,
"rowCount",
None,
),
(
"with_known_value",
{"limit": 100000, "limitType": "rowCount", "value": 123456},
100000,
"rowCount",
123456,
),
(
"with_null_value",
{"limit": 500, "limitType": "columnCount", "value": None},
500,
"columnCount",
None,
),
(
"different_limit_type",
{"limit": 50, "limitType": "dimensionItemCount"},
50,
"dimensionItemCount",
None,
),
],
)
def test_from_dict(
scenario: str,
data: dict,
expected_limit: int,
expected_limit_type: str,
expected_value: int | None,
) -> None:
lb = ExecutionResultLimitBreak.from_dict(data)

assert lb.limit == expected_limit
assert lb.limit_type == expected_limit_type
assert lb.value == expected_value


def test_importable_from_gooddata_sdk() -> None:
"""ExecutionResultLimitBreak must be importable from the top-level package."""
# The top-level import at the module level already validates this.
assert ExecutionResultLimitBreak.__name__ == "ExecutionResultLimitBreak"
Loading