Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/examples/dataset.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,28 @@ print(df)
```
````

### Polars LazyFrame with direct S3-compatible API access

Access dataset files directly via the S3-compatible API as a Polars LazyFrame for efficient lazy evaluation. This method bypasses FoundrySqlServer and works with both regular and hive-partitioned parquet datasets.

````{tab} v2
```python
from foundry_dev_tools import FoundryContext
import polars as pl

ctx = FoundryContext()
ds = ctx.get_dataset_by_path("/path/to/test_dataset")
lazy_df = ds.to_lazy_polars()

# Perform lazy operations (not executed yet)
result = lazy_df.filter(pl.col("age") > 25).select(["name", "age"])

# Execute and collect results
df = result.collect()
print(df)
```
````

### DuckDB Table from Spark SQL dialect

Queries the Foundry SQL server with Spark SQL dialect, load arrow stream using [duckdb](https://duckdb.org/).
Expand Down
54 changes: 51 additions & 3 deletions libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from foundry_dev_tools.errors.dataset import (
BranchNotFoundError,
DatasetHasNoOpenTransactionError,
DatasetHasNoTransactionsError,
DatasetNotFoundError,
TransactionTypeMismatchError,
)
Expand Down Expand Up @@ -261,11 +262,16 @@ def get_transactions(
).json()["values"]
]

def get_last_transaction(self) -> api_types.Transaction | None:
"""Returns the last transaction or None if there are no transactions."""
def get_last_transaction(self, include_open_exclusive_transaction: bool = True) -> api_types.Transaction | None:
"""Returns the last transaction or None if there are no transactions.

Args:
include_open_exclusive_transaction: If True, includes open transactions
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new include_open_exclusive_transaction param is documented as including "open transactions". Since the underlying API flag is specifically for open exclusive transactions, it would be clearer to mirror that terminology in the docstring to avoid implying it affects other transaction states.

Suggested change
include_open_exclusive_transaction: If True, includes open transactions
include_open_exclusive_transaction: If True, includes open exclusive transactions

Copilot uses AI. Check for mistakes.
in the results. If False, only returns committed transactions.
"""
v = self.get_transactions(
page_size=1,
include_open_exclusive_transaction=True,
include_open_exclusive_transaction=include_open_exclusive_transaction,
)
if v is not None and len(v) > 0:
return v[0]
Expand Down Expand Up @@ -799,6 +805,48 @@ def to_polars(self) -> pl.DataFrame:
"""
return self.query_foundry_sql("SELECT *", return_type="polars")

def to_lazy_polars(self, transaction_rid: str | None = None) -> pl.LazyFrame:
    """Get dataset as a :py:class:`polars.LazyFrame`.

    Returns a lazy polars DataFrame that can be queried efficiently using
    polars' lazy evaluation API. The data is accessed directly via the
    S3-compatible API without going through FoundrySqlServer.

    Args:
        transaction_rid: The transaction RID to read from. If None, uses the
            last committed transaction. Useful for reading specific historical
            versions of the dataset.

    Returns:
        pl.LazyFrame: A lazy polars DataFrame

    Raises:
        DatasetHasNoTransactionsError: If ``transaction_rid`` is None and the
            dataset has no committed transactions to read from.

    Example:
        >>> ds = ctx.get_dataset_by_path("/path/to/dataset")
        >>> lf = ds.to_lazy_polars()
        >>> result = lf.filter(pl.col("age") > 25).select(["name", "age"])
        >>> # Execute and collect results
        >>> df = result.collect()

    Note:
        This method uses the S3-compatible API to directly access dataset files.
        For hive-partitioned datasets, polars will automatically read
        the partition structure.
    """
    # Deferred import so polars remains an optional dependency of the package.
    from foundry_dev_tools._optional.polars import pl

    if transaction_rid is None:
        # Only committed transactions are readable: an open exclusive
        # transaction has no finalized files yet, so it is excluded here.
        maybe_transaction = self.get_last_transaction(include_open_exclusive_transaction=False)
        if maybe_transaction is None:
            msg = f"Dataset has no committed transactions: {self.rid=}"
            raise DatasetHasNoTransactionsError(info=msg)
        transaction_rid = maybe_transaction["rid"]

    # The S3-compatible API exposes a transaction as the bucket
    # "<dataset rid>.<transaction rid>"; scan every parquet file inside it.
    # hive_partitioning=True lets polars derive partition columns from the
    # directory structure for hive-partitioned datasets.
    return pl.scan_parquet(
        f"s3://{self.rid}.{transaction_rid}/**/*.parquet",
        storage_options=self._context.s3.get_polars_storage_options(),
        hive_partitioning=True,
    )

@contextmanager
def transaction_context(
self,
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/resources/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,36 @@ def test_crud_dataset(spark_session, tmp_path): # noqa: PLR0915
# # check that deletion was successful
with pytest.raises(DatasetNotFoundError):
ds.sync()


def test_to_lazy_polars_parquet_dataset():
    """A plain parquet dataset is readable as a collectable LazyFrame."""
    dataset = TEST_SINGLETON.iris_parquet

    frame = dataset.to_lazy_polars()
    assert isinstance(frame, pl.LazyFrame)

    collected = frame.collect()
    assert collected.shape == (150, 5)
    assert collected.columns == [
        "sepal_width",
        "sepal_length",
        "petal_width",
        "petal_length",
        "is_setosa",
    ]


def test_to_lazy_polars_parquet_dataset_explicit_transaction():
    """Passing the latest transaction rid explicitly yields the same data."""
    dataset = TEST_SINGLETON.iris_parquet
    latest_rid = dataset.get_last_transaction()["rid"]

    frame = dataset.to_lazy_polars(latest_rid)
    assert isinstance(frame, pl.LazyFrame)

    collected = frame.collect()
    assert collected.shape == (150, 5)
    assert collected.columns == [
        "sepal_width",
        "sepal_length",
        "petal_width",
        "petal_length",
        "is_setosa",
    ]


def test_to_lazy_polars_hive_partitioned():
    """A hive-partitioned dataset collects with partition columns included."""
    dataset = TEST_SINGLETON.iris_hive_partitioned

    frame = dataset.to_lazy_polars()
    assert isinstance(frame, pl.LazyFrame)

    collected = frame.collect()
    assert collected.shape == (150, 5)
    assert collected.columns == [
        "sepal_width",
        "sepal_length",
        "petal_width",
        "petal_length",
        "is_setosa",
    ]
97 changes: 97 additions & 0 deletions tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,79 @@
},
}

def _iris_field(name: str, field_type: str) -> dict:
    """Build one Foundry field-schema entry with all optional attributes unset."""
    return {
        "type": field_type,
        "name": name,
        "nullable": None,
        "userDefinedTypeClass": None,
        "customMetadata": {},
        "arraySubtype": None,
        "precision": None,
        "scale": None,
        "mapKeyType": None,
        "mapValueType": None,
        "subSchemas": None,
    }


# Foundry schema for the iris parquet test datasets: four DOUBLE measurement
# columns plus the STRING "is_setosa" column used as the hive partition key.
IRIS_SCHEMA_HIVE = {
    "fieldSchemaList": [
        _iris_field("sepal_width", "DOUBLE"),
        _iris_field("sepal_length", "DOUBLE"),
        _iris_field("petal_width", "DOUBLE"),
        _iris_field("petal_length", "DOUBLE"),
        _iris_field("is_setosa", "STRING"),
    ],
    "primaryKey": None,
    "dataFrameReaderClass": "com.palantir.foundry.spark.input.ParquetDataFrameReader",
    "customMetadata": {"format": "parquet"},
}

FOUNDRY_SCHEMA_COMPLEX_DATASET = {
"fieldSchemaList": [
{
Expand Down Expand Up @@ -515,6 +588,30 @@ def iris_no_schema(self) -> Dataset:
)
return _iris_no_schema

@cached_property
def iris_hive_partitioned(self) -> Dataset:
    """Hive-partitioned iris dataset; created and populated on first access."""
    dataset = self.ctx.get_dataset_by_path(
        INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_hive_partitioned",
        create_if_not_exist=True,
    )
    if dataset.__created__:
        # Freshly created in Foundry: upload the partitioned test data,
        # then attach the schema to the resulting transaction.
        _ = dataset.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_hive_partitioned"))
        dataset.upload_schema(dataset.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE)
    return dataset

@cached_property
def iris_parquet(self) -> Dataset:
    """Plain (non-partitioned) iris parquet dataset; created on first access."""
    dataset = self.ctx.get_dataset_by_path(
        INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_parquet",
        create_if_not_exist=True,
    )
    if dataset.__created__:
        # Freshly created in Foundry: upload the parquet files, then attach
        # the schema to the resulting transaction.
        _ = dataset.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_parquet"))
        dataset.upload_schema(dataset.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE)
    return dataset

@cached_property
def empty_dataset(self) -> Dataset:
return self.ctx.get_dataset_by_path(
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
39 changes: 39 additions & 0 deletions tests/unit/resources/test_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from unittest import mock

import pytest

from foundry_dev_tools.errors.dataset import DatasetHasNoTransactionsError
from foundry_dev_tools.resources.dataset import Dataset


def test_to_lazy_polars_no_transaction():
    """Without any committed transaction, to_lazy_polars raises with the rid."""
    with mock.patch.object(Dataset, "__created__", True):
        dataset = Dataset.__new__(Dataset)
        dataset.rid = "ri.foundry.main.dataset.test-dataset"
        dataset.path = "/test/dataset/path"

        with mock.patch.object(dataset, "get_last_transaction", return_value=None), pytest.raises(
            DatasetHasNoTransactionsError
        ) as exc_info:
            dataset.to_lazy_polars()

        rendered = str(exc_info.value)
        assert "Dataset has no transactions" in rendered
        assert dataset.rid in rendered


def test_to_lazy_polars_transaction_rid_logic():
    """An explicit transaction_rid is used verbatim to build the s3 scan path."""
    with mock.patch.object(Dataset, "__created__", True):
        dataset = Dataset.__new__(Dataset)
        dataset.rid = "ri.foundry.main.dataset.abc123"
        dataset._context = mock.MagicMock()
        dataset._context.s3.get_polars_storage_options.return_value = {"aws_access_key_id": "test"}

        with mock.patch("foundry_dev_tools._optional.polars.pl.scan_parquet") as scan_parquet:
            scan_parquet.return_value = mock.MagicMock()
            dataset.to_lazy_polars(transaction_rid="test")

            scan_parquet.assert_called_once()
            args, kwargs = scan_parquet.call_args
            assert args[0] == f"s3://{dataset.rid}.test/**/*.parquet"
            assert kwargs["storage_options"] == dataset._context.s3.get_polars_storage_options()
            assert kwargs["hive_partitioning"] is True