diff --git a/docs/examples/dataset.md b/docs/examples/dataset.md index f8687b8f..fa042a9a 100644 --- a/docs/examples/dataset.md +++ b/docs/examples/dataset.md @@ -282,6 +282,28 @@ print(df) ``` ```` +### Polars LazyFrame with direct S3-compatible API access + +Access dataset files directly via the S3-compatible API as a Polars LazyFrame for efficient lazy evaluation. This method bypasses FoundrySqlServer and works with both regular and hive-partitioned parquet datasets. + +````{tab} v2 +```python +from foundry_dev_tools import FoundryContext +import polars as pl + +ctx = FoundryContext() +ds = ctx.get_dataset_by_path("/path/to/test_dataset") +lazy_df = ds.to_lazy_polars() + +# Perform lazy operations (not executed yet) +result = lazy_df.filter(pl.col("age") > 25).select(["name", "age"]) + +# Execute and collect results +df = result.collect() +print(df) +``` +```` + ### DuckDB Table from Spark SQL dialect Queries the Foundry SQL server with Spark SQL dialect, load arrow stream using [duckdb](https://duckdb.org/). diff --git a/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py b/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py index 8a731878..4aad2eec 100644 --- a/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py +++ b/libs/foundry-dev-tools/src/foundry_dev_tools/resources/dataset.py @@ -14,6 +14,7 @@ from foundry_dev_tools.errors.dataset import ( BranchNotFoundError, DatasetHasNoOpenTransactionError, + DatasetHasNoTransactionsError, DatasetNotFoundError, TransactionTypeMismatchError, ) @@ -261,11 +262,16 @@ def get_transactions( ).json()["values"] ] - def get_last_transaction(self) -> api_types.Transaction | None: - """Returns the last transaction or None if there are no transactions.""" + def get_last_transaction(self, include_open_exclusive_transaction: bool = True) -> api_types.Transaction | None: + """Returns the last transaction or None if there are no transactions. + + Args: + include_open_exclusive_transaction: If True, includes open transactions + in the results. If False, only returns committed transactions. + """ v = self.get_transactions( page_size=1, - include_open_exclusive_transaction=True, + include_open_exclusive_transaction=include_open_exclusive_transaction, ) if v is not None and len(v) > 0: return v[0] @@ -799,6 +805,48 @@ def to_polars(self) -> pl.DataFrame: """ return self.query_foundry_sql("SELECT *", return_type="polars") + def to_lazy_polars(self, transaction_rid: str | None = None) -> pl.LazyFrame: + """Get dataset as a :py:class:`polars.LazyFrame`. + + Returns a lazy polars DataFrame that can be queried efficiently using + polars' lazy evaluation API. The data is accessed directly via the + S3-compatible API without going through FoundrySqlServer. + + Args: + transaction_rid: The transaction RID to read from. If None, uses the + last committed transaction. Useful for reading specific historical + versions of the dataset. + + Returns: + pl.LazyFrame: A lazy polars DataFrame + + Example: + >>> ds = ctx.get_dataset_by_path("/path/to/dataset") + >>> lf = ds.to_lazy_polars() + >>> result = lf.filter(pl.col("age") > 25).select(["name", "age"]) + >>> # Execute and collect results + >>> df = result.collect() + + Note: + This method uses the S3-compatible API to directly access dataset files. + For hive-partitioned datasets, polars will automatically read + the partition structure. + """ + from foundry_dev_tools._optional.polars import pl + + if transaction_rid is None: + maybe_transaction = self.get_last_transaction(include_open_exclusive_transaction=False) + if maybe_transaction is None: + msg = f"Dataset has no committed transactions: {self.rid=}" + raise DatasetHasNoTransactionsError(info=msg) + transaction_rid = maybe_transaction["rid"] + + return pl.scan_parquet( + f"s3://{self.rid}.{transaction_rid}/**/*.parquet", + storage_options=self._context.s3.get_polars_storage_options(), + hive_partitioning=True, + ) + @contextmanager def transaction_context( self, diff --git a/tests/integration/resources/test_dataset.py b/tests/integration/resources/test_dataset.py index 6e202395..00ee93d6 100644 --- a/tests/integration/resources/test_dataset.py +++ b/tests/integration/resources/test_dataset.py @@ -130,3 +130,36 @@ def test_crud_dataset(spark_session, tmp_path): # noqa: PLR0915 # # check that deletion was successful with pytest.raises(DatasetNotFoundError): ds.sync() + + +def test_to_lazy_polars_parquet_dataset(): + ds = TEST_SINGLETON.iris_parquet + lazy_df = ds.to_lazy_polars() + + assert isinstance(lazy_df, pl.LazyFrame) + + df = lazy_df.collect() + assert df.shape == (150, 5) + assert df.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"] + + +def test_to_lazy_polars_parquet_dataset_explicit_transaction(): + ds = TEST_SINGLETON.iris_parquet + lazy_df = ds.to_lazy_polars(ds.get_last_transaction()["rid"]) + + assert isinstance(lazy_df, pl.LazyFrame) + + df = lazy_df.collect() + assert df.shape == (150, 5) + assert df.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"] + + +def test_to_lazy_polars_hive_partitioned(): + ds = TEST_SINGLETON.iris_hive_partitioned + lazy_df = ds.to_lazy_polars() + + assert isinstance(lazy_df, pl.LazyFrame) + + df = lazy_df.collect() + assert df.shape == (150, 5) + assert df.columns == ["sepal_width", "sepal_length", "petal_width", "petal_length", "is_setosa"] diff --git a/tests/integration/utils.py b/tests/integration/utils.py index 69414a8d..e627d550 100644 --- a/tests/integration/utils.py +++ b/tests/integration/utils.py @@ -144,6 +144,79 @@ }, } +IRIS_SCHEMA_HIVE = { + "fieldSchemaList": [ + { + "type": "DOUBLE", + "name": "sepal_width", + "nullable": None, + "userDefinedTypeClass": None, + "customMetadata": {}, + "arraySubtype": None, + "precision": None, + "scale": None, + "mapKeyType": None, + "mapValueType": None, + "subSchemas": None, + }, + { + "type": "DOUBLE", + "name": "sepal_length", + "nullable": None, + "userDefinedTypeClass": None, + "customMetadata": {}, + "arraySubtype": None, + "precision": None, + "scale": None, + "mapKeyType": None, + "mapValueType": None, + "subSchemas": None, + }, + { + "type": "DOUBLE", + "name": "petal_width", + "nullable": None, + "userDefinedTypeClass": None, + "customMetadata": {}, + "arraySubtype": None, + "precision": None, + "scale": None, + "mapKeyType": None, + "mapValueType": None, + "subSchemas": None, + }, + { + "type": "DOUBLE", + "name": "petal_length", + "nullable": None, + "userDefinedTypeClass": None, + "customMetadata": {}, + "arraySubtype": None, + "precision": None, + "scale": None, + "mapKeyType": None, + "mapValueType": None, + "subSchemas": None, + }, + { + "type": "STRING", + "name": "is_setosa", + "nullable": None, + "userDefinedTypeClass": None, + "customMetadata": {}, + "arraySubtype": None, + "precision": None, + "scale": None, + "mapKeyType": None, + "mapValueType": None, + "subSchemas": None, + }, + ], + "primaryKey": None, + "dataFrameReaderClass": "com.palantir.foundry.spark.input.ParquetDataFrameReader", + "customMetadata": {"format": "parquet"}, +} + FOUNDRY_SCHEMA_COMPLEX_DATASET = { "fieldSchemaList": [ { @@ -515,6 +588,30 @@ def iris_no_schema(self) -> Dataset: ) return _iris_no_schema + @cached_property + def iris_hive_partitioned(self) -> Dataset: + _iris_hive_partitioned = self.ctx.get_dataset_by_path( + INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_hive_partitioned", + create_if_not_exist=True, + ) + if _iris_hive_partitioned.__created__: + _ = _iris_hive_partitioned.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_hive_partitioned")) + _iris_hive_partitioned.upload_schema( + _iris_hive_partitioned.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE + ) + return _iris_hive_partitioned + + @cached_property + def iris_parquet(self) -> Dataset: + _iris_parquet = self.ctx.get_dataset_by_path( + INTEGRATION_TEST_COMPASS_ROOT_PATH + "/iris_parquet", + create_if_not_exist=True, + ) + if _iris_parquet.__created__: + _ = _iris_parquet.upload_folder(TEST_FOLDER.joinpath("test_data", "iris", "iris_parquet")) + _iris_parquet.upload_schema(_iris_parquet.get_last_transaction()["rid"], schema=IRIS_SCHEMA_HIVE) + return _iris_parquet + @cached_property def empty_dataset(self) -> Dataset: return self.ctx.get_dataset_by_path( diff --git a/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-setosa/file.parquet b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-setosa/file.parquet new file mode 100644 index 00000000..7ffebed2 Binary files /dev/null and b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-setosa/file.parquet differ diff --git a/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-versicolor/file.parquet b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-versicolor/file.parquet new file mode 100644 index 00000000..2302fa86 Binary files /dev/null and b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-versicolor/file.parquet differ diff --git a/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-virginica/file.parquet b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-virginica/file.parquet new file mode 100644 index 00000000..6864edca Binary files /dev/null and b/tests/test_data/iris/iris_hive_partitioned/is_setosa=Iris-virginica/file.parquet differ diff --git a/tests/test_data/iris/iris_parquet/spark/iris.parquet b/tests/test_data/iris/iris_parquet/spark/iris.parquet new file mode 100644 index 00000000..eb8b03de Binary files /dev/null and b/tests/test_data/iris/iris_parquet/spark/iris.parquet differ diff --git a/tests/unit/resources/test_dataset.py b/tests/unit/resources/test_dataset.py new file mode 100644 index 00000000..12236b61 --- /dev/null +++ b/tests/unit/resources/test_dataset.py @@ -0,0 +1,39 @@ +from unittest import mock + +import pytest + +from foundry_dev_tools.errors.dataset import DatasetHasNoTransactionsError +from foundry_dev_tools.resources.dataset import Dataset + + +def test_to_lazy_polars_no_transaction(): + with mock.patch.object(Dataset, "__created__", True): + ds = Dataset.__new__(Dataset) + ds.rid = "ri.foundry.main.dataset.test-dataset" + ds.path = "/test/dataset/path" + + with mock.patch.object(ds, "get_last_transaction", return_value=None): + with pytest.raises(DatasetHasNoTransactionsError) as exc_info: + ds.to_lazy_polars() + + error_message = str(exc_info.value) + assert "Dataset has no transactions" in error_message + assert ds.rid in error_message + + +def test_to_lazy_polars_transaction_rid_logic(): + with mock.patch.object(Dataset, "__created__", True): + ds = Dataset.__new__(Dataset) + ds.rid = "ri.foundry.main.dataset.abc123" + ds._context = mock.MagicMock() + ds._context.s3.get_polars_storage_options.return_value = {"aws_access_key_id": "test"} + + with mock.patch("foundry_dev_tools._optional.polars.pl.scan_parquet") as mock_scan: + mock_scan.return_value = mock.MagicMock() + ds.to_lazy_polars(transaction_rid="test") + + mock_scan.assert_called_once() + call_args = mock_scan.call_args + assert call_args[0][0] == f"s3://{ds.rid}.test/**/*.parquet" + assert call_args[1]["storage_options"] == ds._context.s3.get_polars_storage_options() + assert call_args[1]["hive_partitioning"] is True