-
Notifications
You must be signed in to change notification settings - Fork 32
feat: add to_lazy_polars to Dataset #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
64cf451
08aeb1a
9294e11
d243994
f8c945a
e0a23b6
e6d325d
0a93041
c9cbd5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |||||||||||
| from foundry_dev_tools.errors.dataset import ( | ||||||||||||
| BranchNotFoundError, | ||||||||||||
| DatasetHasNoOpenTransactionError, | ||||||||||||
| DatasetHasNoTransactionsError, | ||||||||||||
| DatasetNotFoundError, | ||||||||||||
| TransactionTypeMismatchError, | ||||||||||||
| ) | ||||||||||||
|
|
@@ -261,11 +262,16 @@ def get_transactions( | |||||||||||
| ).json()["values"] | ||||||||||||
| ] | ||||||||||||
|
|
||||||||||||
| def get_last_transaction(self) -> api_types.Transaction | None: | ||||||||||||
| """Returns the last transaction or None if there are no transactions.""" | ||||||||||||
| def get_last_transaction(self, include_open_exclusive_transaction: bool = True) -> api_types.Transaction | None: | ||||||||||||
| """Returns the last transaction or None if there are no transactions. | ||||||||||||
|
|
||||||||||||
| Args: | ||||||||||||
| include_open_exclusive_transaction: If True, includes open transactions | ||||||||||||
| in the results. If False, only returns committed transactions. | ||||||||||||
| """ | ||||||||||||
| v = self.get_transactions( | ||||||||||||
| page_size=1, | ||||||||||||
| include_open_exclusive_transaction=True, | ||||||||||||
| include_open_exclusive_transaction=include_open_exclusive_transaction, | ||||||||||||
| ) | ||||||||||||
| if v is not None and len(v) > 0: | ||||||||||||
| return v[0] | ||||||||||||
|
|
@@ -799,6 +805,48 @@ def to_polars(self) -> pl.DataFrame: | |||||||||||
| """ | ||||||||||||
| return self.query_foundry_sql("SELECT *", return_type="polars") | ||||||||||||
|
|
||||||||||||
| def to_lazy_polars(self, transaction_rid: str | None = None) -> pl.LazyFrame: | ||||||||||||
| """Get dataset as a :py:class:`polars.LazyFrame`. | ||||||||||||
|
|
||||||||||||
| Returns a lazy polars DataFrame that can be queried efficiently using | ||||||||||||
| polars' lazy evaluation API. The data is accessed directly via the | ||||||||||||
| S3-compatible API without going through FoundrySqlServer. | ||||||||||||
|
|
||||||||||||
| Args: | ||||||||||||
| transaction_rid: The transaction RID to read from. If None, uses the | ||||||||||||
| last committed transaction. Useful for reading specific historical | ||||||||||||
| versions of the dataset. | ||||||||||||
|
|
||||||||||||
| Returns: | ||||||||||||
| pl.LazyFrame: A lazy polars DataFrame | ||||||||||||
|
|
||||||||||||
| Example: | ||||||||||||
| >>> ds = ctx.get_dataset_by_path("/path/to/dataset") | ||||||||||||
| >>> lf = ds.to_lazy_polars() | ||||||||||||
| >>> result = lf.filter(pl.col("age") > 25).select(["name", "age"]) | ||||||||||||
| >>> # Execute and collect results | ||||||||||||
| >>> df = result.collect() | ||||||||||||
|
|
||||||||||||
| Note: | ||||||||||||
| This method uses the S3-compatible API to directly access dataset files. | ||||||||||||
| For hive-partitioned datasets, polars will automatically read | ||||||||||||
| the partition structure. | ||||||||||||
| """ | ||||||||||||
|
Comment on lines +808 to +834
|
||||||||||||
| from foundry_dev_tools._optional.polars import pl | ||||||||||||
|
|
||||||||||||
| if transaction_rid is None: | ||||||||||||
| maybe_transaction = self.get_last_transaction(include_open_exclusive_transaction=False) | ||||||||||||
| if maybe_transaction is None: | ||||||||||||
| msg = f"Dataset has no committed transactions: {self.rid=}" | ||||||||||||
| raise DatasetHasNoTransactionsError(info=msg) | ||||||||||||
|
Comment on lines +840 to +841
|
||||||||||||
| msg = f"Dataset has no committed transactions: {self.rid=}" | |
| raise DatasetHasNoTransactionsError(info=msg) | |
| base_msg = "Dataset has no committed transactions." | |
| info_msg = f"{base_msg} {self.rid=}" | |
| raise DatasetHasNoTransactionsError(message=base_msg, info=info_msg) |
nicornk marked this conversation as resolved.
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| from unittest import mock | ||
|
|
||
| import pytest | ||
|
|
||
| from foundry_dev_tools.errors.dataset import DatasetHasNoTransactionsError | ||
| from foundry_dev_tools.resources.dataset import Dataset | ||
|
|
||
|
|
||
| def test_to_lazy_polars_no_transaction(): | ||
| with mock.patch.object(Dataset, "__created__", True): | ||
| ds = Dataset.__new__(Dataset) | ||
| ds.rid = "ri.foundry.main.dataset.test-dataset" | ||
| ds.path = "/test/dataset/path" | ||
|
|
||
| with mock.patch.object(ds, "get_last_transaction", return_value=None): | ||
| with pytest.raises(DatasetHasNoTransactionsError) as exc_info: | ||
| ds.to_lazy_polars() | ||
|
|
||
| error_message = str(exc_info.value) | ||
| assert "Dataset has no transactions" in error_message | ||
| assert ds.rid in error_message | ||
|
|
||
|
|
||
| def test_to_lazy_polars_transaction_rid_logic(): | ||
| with mock.patch.object(Dataset, "__created__", True): | ||
| ds = Dataset.__new__(Dataset) | ||
| ds.rid = "ri.foundry.main.dataset.abc123" | ||
| ds._context = mock.MagicMock() | ||
| ds._context.s3.get_polars_storage_options.return_value = {"aws_access_key_id": "test"} | ||
|
|
||
| with mock.patch("foundry_dev_tools._optional.polars.pl.scan_parquet") as mock_scan: | ||
| mock_scan.return_value = mock.MagicMock() | ||
| ds.to_lazy_polars(transaction_rid="test") | ||
|
|
||
| mock_scan.assert_called_once() | ||
| call_args = mock_scan.call_args | ||
| assert call_args[0][0] == f"s3://{ds.rid}.test/**/*.parquet" | ||
| assert call_args[1]["storage_options"] == ds._context.s3.get_polars_storage_options() | ||
| assert call_args[1]["hive_partitioning"] is True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new `include_open_exclusive_transaction` param is documented as including "open transactions". Since the underlying API flag is specifically for open exclusive transactions, it would be clearer to mirror that terminology in the docstring to avoid implying it affects other transaction states.