Skip to content

Commit d3bf48c

Browse files
committed
feat: add callback parameter to _read_gbq_colab
1 parent f93911c commit d3bf48c

4 files changed

Lines changed: 41 additions & 12 deletions

File tree

packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import functools
1818
import typing
19-
from typing import cast, Any
19+
from typing import Any, cast
2020

2121
import bigframes_vendored.ibis.expr.api as ibis_api
2222
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes

packages/bigframes/bigframes/pandas/io/api.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run(
300300
def _read_gbq_colab( # type: ignore[overload-overlap]
301301
query_or_table: str,
302302
*,
303+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
303304
pyformat_args: Optional[Dict[str, Any]] = ...,
304305
dry_run: Literal[False] = ...,
305306
) -> bigframes.dataframe.DataFrame: ...
@@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap]
309310
def _read_gbq_colab(
310311
query_or_table: str,
311312
*,
313+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
312314
pyformat_args: Optional[Dict[str, Any]] = ...,
313315
dry_run: Literal[True] = ...,
314316
) -> pandas.Series: ...
@@ -317,6 +319,7 @@ def _read_gbq_colab(
317319
def _read_gbq_colab(
318320
query_or_table: str,
319321
*,
322+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
320323
pyformat_args: Optional[Dict[str, Any]] = None,
321324
dry_run: bool = False,
322325
) -> bigframes.dataframe.DataFrame | pandas.Series:
@@ -328,6 +331,8 @@ def _read_gbq_colab(
328331
Args:
329332
query_or_table (str):
330333
SQL query or table ID (table ID not yet supported).
334+
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
335+
Callback to receive query execution events.
331336
pyformat_args (Optional[Dict[str, Any]]):
332337
Parameters to format into the query string.
333338
dry_run (bool):
@@ -379,6 +384,7 @@ def _read_gbq_colab(
379384
return global_session.with_default_session(
380385
bigframes.session.Session._read_gbq_colab,
381386
query_or_table,
387+
callback=callback,
382388
pyformat_args=pyformat_args,
383389
dry_run=dry_run,
384390
)

packages/bigframes/bigframes/session/__init__.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,10 @@ def __init__(
199199
self._clients_provider = clients_provider
200200
self._location = context.location or "US"
201201
else:
202-
credentials, project = (
203-
bigframes._config.auth.resolve_credentials_and_project(context)
204-
)
202+
(
203+
credentials,
204+
project,
205+
) = bigframes._config.auth.resolve_credentials_and_project(context)
205206
if context.location is None:
206207
with bigquery.Client(
207208
project=project,
@@ -584,6 +585,7 @@ def _read_gbq_colab(
584585
self,
585586
query: str,
586587
*,
588+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
587589
pyformat_args: Optional[Dict[str, Any]] = None,
588590
dry_run: Literal[False] = ...,
589591
) -> dataframe.DataFrame: ...
@@ -593,6 +595,7 @@ def _read_gbq_colab(
593595
self,
594596
query: str,
595597
*,
598+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
596599
pyformat_args: Optional[Dict[str, Any]] = None,
597600
dry_run: Literal[True] = ...,
598601
) -> pandas.Series: ...
@@ -601,8 +604,8 @@ def _read_gbq_colab(
601604
def _read_gbq_colab(
602605
self,
603606
query: str,
604-
# TODO: Add a callback parameter that takes some kind of Event object.
605607
*,
608+
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
606609
pyformat_args: Optional[Dict[str, Any]] = None,
607610
dry_run: bool = False,
608611
) -> Union[dataframe.DataFrame, pandas.Series]:
@@ -615,6 +618,8 @@ def _read_gbq_colab(
615618
query (str):
616619
A SQL query string to execute. Results (if any) are turned into
617620
a DataFrame.
621+
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
622+
Callback to receive query execution events.
618623
pyformat_args (dict):
619624
A dictionary of potential variables to replace in ``query``.
620625
Note: strings are _not_ escaped. Use query parameters for these,
@@ -634,13 +639,21 @@ def _read_gbq_colab(
634639
dry_run=dry_run,
635640
)
636641

637-
return self._loader.read_gbq_query(
638-
query=query,
639-
index_col=bigframes.enums.DefaultIndexKind.NULL,
640-
force_total_order=False,
641-
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
642-
allow_large_results=allow_large_results,
643-
)
642+
def _run_query():
643+
return self._loader.read_gbq_query(
644+
query=query,
645+
index_col=bigframes.enums.DefaultIndexKind.NULL,
646+
force_total_order=False,
647+
dry_run=typing.cast(
648+
Union[Literal[False], Literal[True]], dry_run
649+
),
650+
allow_large_results=allow_large_results,
651+
)
652+
653+
if callback is not None:
654+
with self._publisher.subscribe(callback):
655+
return _run_query()
656+
return _run_query()
644657

645658
@overload
646659
def read_gbq_query( # type: ignore[overload-overlap]

packages/bigframes/tests/unit/session/test_read_gbq_colab.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,13 @@ def test_read_gbq_colab_doesnt_set_destination_table():
126126

127127
assert query == "SELECT 'my-test-query';"
128128
assert config.destination is None
129+
130+
131+
def test_read_gbq_colab_with_callback():
132+
"""Make sure callback receives events during execution."""
133+
session = mocks.create_bigquery_session()
134+
callback = mock.Mock()
135+
136+
_ = session._read_gbq_colab("SELECT 'my-test-query';", callback=callback)
137+
138+
assert callback.call_count > 0

0 commit comments

Comments
 (0)