Skip to content

Commit 2b783e3

Browse files
committed
refactor: enable sqlglot as the default compiler with failsafe mechanism
1 parent a58063c commit 2b783e3

File tree

4 files changed

+238
-70
lines changed

4 files changed

+238
-70
lines changed

packages/bigframes/bigframes/core/compile/__init__.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,30 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from typing import Any
16+
from typing import Literal
1717

18-
from bigframes import options
1918
from bigframes.core.compile.api import test_only_ibis_inferred_schema
2019
from bigframes.core.compile.configs import CompileRequest, CompileResult
2120

2221

23-
def compiler() -> Any:
24-
"""Returns the appropriate compiler module based on session options."""
25-
if options.experiments.sql_compiler == "experimental":
22+
def compile_sql(
23+
request: CompileRequest,
24+
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
25+
) -> CompileResult:
26+
"""Compiles a BigFrameNode according to the request into SQL."""
27+
if compiler_name == "sqlglot":
2628
import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler
2729

28-
return sqlglot_compiler
30+
return sqlglot_compiler.compile_sql(request)
2931
else:
3032
import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler
3133

32-
return ibis_compiler
34+
return ibis_compiler.compile_sql(request)
3335

3436

3537
__all__ = [
3638
"test_only_ibis_inferred_schema",
3739
"CompileRequest",
3840
"CompileResult",
39-
"compiler",
41+
"compile_sql",
4042
]

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 117 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import concurrent.futures
1818
import math
1919
import threading
20+
import uuid
21+
import warnings
2022
from typing import Literal, Mapping, Optional, Sequence, Tuple
2123

2224
import google.api_core.exceptions
2325
import google.cloud.bigquery.job as bq_job
2426
import google.cloud.bigquery.table as bq_table
2527
import google.cloud.bigquery_storage_v1
28+
import google.cloud.exceptions
2629
from google.cloud import bigquery
2730

2831
import bigframes
@@ -124,9 +127,7 @@ def to_sql(
124127
else array_value.node
125128
)
126129
node = self._substitute_large_local_sources(node)
127-
compiled = compile.compiler().compile_sql(
128-
compile.CompileRequest(node, sort_rows=ordered)
129-
)
130+
compiled = self._compile(node, ordered=ordered)
130131
return compiled.sql
131132

132133
def execute(
@@ -242,46 +243,55 @@ def _export_gbq(
242243
# validate destination table
243244
existing_table = self._maybe_find_existing_table(spec)
244245

245-
compiled = compile.compiler().compile_sql(
246-
compile.CompileRequest(plan, sort_rows=False)
247-
)
248-
sql = compiled.sql
246+
def run_with_compiler(compiler_name, compiler_id=None):
247+
compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
248+
sql = compiled.sql
249249

250-
if (existing_table is not None) and _is_schema_match(
251-
existing_table.schema, array_value.schema
252-
):
253-
# b/409086472: Uses DML for table appends and replacements to avoid
254-
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
255-
# https://cloud.google.com/bigquery/quotas#standard_tables
256-
job_config = bigquery.QueryJobConfig()
250+
if (existing_table is not None) and _is_schema_match(
251+
existing_table.schema, array_value.schema
252+
):
253+
# b/409086472: Uses DML for table appends and replacements to avoid
254+
# BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
255+
# https://cloud.google.com/bigquery/quotas#standard_tables
256+
job_config = bigquery.QueryJobConfig()
257+
258+
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
259+
if spec.if_exists == "append":
260+
sql = sg_sql.to_sql(
261+
sg_sql.insert(ir.expr.as_select_all(), spec.table)
262+
)
263+
else: # for "replace"
264+
assert spec.if_exists == "replace"
265+
sql = sg_sql.to_sql(
266+
sg_sql.replace(ir.expr.as_select_all(), spec.table)
267+
)
268+
else:
269+
dispositions = {
270+
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
271+
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
272+
"append": bigquery.WriteDisposition.WRITE_APPEND,
273+
}
274+
job_config = bigquery.QueryJobConfig(
275+
write_disposition=dispositions[spec.if_exists],
276+
destination=spec.table,
277+
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
278+
)
257279

258-
ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
259-
if spec.if_exists == "append":
260-
sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
261-
else: # for "replace"
262-
assert spec.if_exists == "replace"
263-
sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
264-
else:
265-
dispositions = {
266-
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
267-
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
268-
"append": bigquery.WriteDisposition.WRITE_APPEND,
269-
}
270-
job_config = bigquery.QueryJobConfig(
271-
write_disposition=dispositions[spec.if_exists],
272-
destination=spec.table,
273-
clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
280+
# Attach data type usage to the job labels
281+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
282+
job_config.labels["bigframes-compiler"] = (
283+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
284+
)
285+
# TODO(swast): plumb through the api_name of the user-facing api that
286+
# caused this query.
287+
iterator, job = self._run_execute_query(
288+
sql=sql,
289+
job_config=job_config,
290+
session=array_value.session,
274291
)
292+
return iterator, job
275293

276-
# Attach data type usage to the job labels
277-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
278-
# TODO(swast): plumb through the api_name of the user-facing api that
279-
# caused this query.
280-
iterator, job = self._run_execute_query(
281-
sql=sql,
282-
job_config=job_config,
283-
session=array_value.session,
284-
)
294+
iterator, job = self._compile_with_fallback(run_with_compiler)
285295

286296
has_special_dtype_col = any(
287297
t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -410,6 +420,43 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
410420
self.prepare_plan(array_value.node)
411421
)
412422

423+
def _compile(
424+
self,
425+
node: nodes.BigFrameNode,
426+
*,
427+
ordered: bool = False,
428+
peek: Optional[int] = None,
429+
materialize_all_order_keys: bool = False,
430+
compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
431+
) -> compile.CompileResult:
432+
return compile.compile_sql(
433+
compile.CompileRequest(
434+
node,
435+
sort_rows=ordered,
436+
peek_count=peek,
437+
materialize_all_order_keys=materialize_all_order_keys,
438+
),
439+
compiler_name=compiler_name,
440+
)
441+
442+
def _compile_with_fallback(self, run_fn):
443+
compiler_option = bigframes.options.experiments.sql_compiler
444+
if compiler_option == "legacy":
445+
return run_fn("ibis")
446+
elif compiler_option == "experimental":
447+
return run_fn("sqlglot")
448+
else: # stable
449+
compiler_id = f"{uuid.uuid1().hex[:12]}"
450+
try:
451+
return run_fn("sqlglot", compiler_id=compiler_id)
452+
except google.cloud.exceptions.BadRequest as e:
453+
msg = bfe.format_message(
454+
f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
455+
f"Falling back to ibis. Details: {e.message}"
456+
)
457+
warnings.warn(msg, category=UserWarning)
458+
return run_fn("ibis", compiler_id=compiler_id)
459+
413460
def prepare_plan(
414461
self,
415462
plan: nodes.BigFrameNode,
@@ -604,34 +651,43 @@ def _execute_plan_gbq(
604651
]
605652
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
606653

607-
compiled = compile.compiler().compile_sql(
608-
compile.CompileRequest(
654+
def run_with_compiler(compiler_name, compiler_id=None):
655+
compiled = self._compile(
609656
plan,
610-
sort_rows=ordered,
611-
peek_count=peek,
657+
ordered=ordered,
658+
peek=peek,
612659
materialize_all_order_keys=(cache_spec is not None),
660+
compiler_name=compiler_name,
613661
)
614-
)
615-
# might have more columns than og schema, for hidden ordering columns
616-
compiled_schema = compiled.sql_schema
662+
# might have more columns than the original schema, due to hidden ordering columns
663+
compiled_schema = compiled.sql_schema
617664

618-
destination_table: Optional[bigquery.TableReference] = None
665+
destination_table: Optional[bigquery.TableReference] = None
619666

620-
job_config = bigquery.QueryJobConfig()
621-
if create_table:
622-
destination_table = self.storage_manager.create_temp_table(
623-
compiled_schema, cluster_cols
667+
job_config = bigquery.QueryJobConfig()
668+
if create_table:
669+
destination_table = self.storage_manager.create_temp_table(
670+
compiled_schema, cluster_cols
671+
)
672+
job_config.destination = destination_table
673+
674+
# Attach data type usage to the job labels
675+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
676+
job_config.labels["bigframes-compiler"] = (
677+
f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
624678
)
625-
job_config.destination = destination_table
626-
627-
# Attach data type usage to the job labels
628-
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
629-
iterator, query_job = self._run_execute_query(
630-
sql=compiled.sql,
631-
job_config=job_config,
632-
query_with_job=(destination_table is not None),
633-
session=plan.session,
634-
)
679+
iterator, query_job = self._run_execute_query(
680+
sql=compiled.sql,
681+
job_config=job_config,
682+
query_with_job=(destination_table is not None),
683+
session=plan.session,
684+
)
685+
return iterator, query_job, compiled
686+
687+
iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)
688+
689+
# might have more columns than the original schema, due to hidden ordering columns
690+
compiled_schema = compiled.sql_schema
635691

636692
# we could actually cache even when caching is not explicitly requested, but we are being conservative for now
637693
result_bq_data = None

packages/bigframes/noxfile.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,6 @@ def cover(session):
445445
omitted_paths = [
446446
# non-prod, unit tested
447447
"bigframes/core/compile/polars/*",
448-
"bigframes/core/compile/sqlglot/*",
449448
# untested
450449
"bigframes/streaming/*",
451450
# utils
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
17+
import google.cloud.bigquery as bigquery
18+
import google.cloud.exceptions
19+
import pyarrow as pa
20+
import pytest
21+
22+
import bigframes
23+
import bigframes.core.nodes as nodes
24+
import bigframes.core.schema as schemata
25+
from bigframes.session.bq_caching_executor import BigQueryCachingExecutor
26+
27+
28+
@pytest.fixture
29+
def mock_executor():
30+
bqclient = mock.create_autospec(bigquery.Client)
31+
bqclient.project = "test-project"
32+
storage_manager = mock.Mock()
33+
bqstoragereadclient = mock.Mock()
34+
loader = mock.Mock()
35+
publisher = mock.Mock()
36+
return BigQueryCachingExecutor(
37+
bqclient, storage_manager, bqstoragereadclient, loader, publisher=publisher
38+
)
39+
40+
41+
def test_compiler_with_fallback_legacy(mock_executor):
42+
run_fn = mock.Mock()
43+
with bigframes.option_context("experiments.sql_compiler", "legacy"):
44+
mock_executor._compile_with_fallback(run_fn)
45+
run_fn.assert_called_once_with("ibis")
46+
47+
48+
def test_compiler_with_fallback_experimental(mock_executor):
49+
run_fn = mock.Mock()
50+
with bigframes.option_context("experiments.sql_compiler", "experimental"):
51+
mock_executor._compile_with_fallback(run_fn)
52+
run_fn.assert_called_once_with("sqlglot")
53+
54+
55+
def test_compiler_with_fallback_stable_success(mock_executor):
56+
run_fn = mock.Mock()
57+
with bigframes.option_context("experiments.sql_compiler", "stable"):
58+
mock_executor._compile_with_fallback(run_fn)
59+
run_fn.assert_called_once_with("sqlglot", compiler_id=mock.ANY)
60+
61+
62+
def test_compiler_execute_plan_gbq_fallback_labels(mock_executor):
63+
plan = mock.create_autospec(nodes.BigFrameNode)
64+
plan.schema = schemata.ArraySchema(tuple())
65+
plan.session = None
66+
67+
# Mock prepare_plan
68+
mock_executor.prepare_plan = mock.Mock(return_value=plan)
69+
70+
# Mock _compile
71+
from bigframes.core.compile.configs import CompileResult
72+
73+
fake_compiled = CompileResult(
74+
sql="SELECT 1", sql_schema=[], row_order=None, encoded_type_refs="fake_refs"
75+
)
76+
mock_executor._compile = mock.Mock(return_value=fake_compiled)
77+
78+
# Mock _run_execute_query to fail the first time, then succeed
79+
mock_iterator = mock.Mock()
80+
mock_iterator.total_rows = 0
81+
mock_iterator.to_arrow.return_value = pa.Table.from_arrays([], names=[])
82+
mock_query_job = mock.Mock(spec=bigquery.QueryJob)
83+
mock_query_job.destination = None
84+
85+
error = google.cloud.exceptions.BadRequest("failed")
86+
error.job = mock.Mock(spec=bigquery.QueryJob) # type: ignore
87+
error.job.job_id = "failed_job_id" # type: ignore
88+
89+
mock_executor._run_execute_query = mock.Mock(
90+
side_effect=[error, (mock_iterator, mock_query_job)]
91+
)
92+
93+
with (
94+
bigframes.option_context("experiments.sql_compiler", "stable"),
95+
pytest.warns(UserWarning, match="Falling back to ibis"),
96+
):
97+
mock_executor._execute_plan_gbq(plan, ordered=False, must_create_table=False)
98+
99+
# Verify labels for both calls
100+
assert mock_executor._run_execute_query.call_count == 2
101+
102+
call_1_kwargs = mock_executor._run_execute_query.call_args_list[0][1]
103+
call_2_kwargs = mock_executor._run_execute_query.call_args_list[1][1]
104+
105+
label_1 = call_1_kwargs["job_config"].labels["bigframes-compiler"]
106+
label_2 = call_2_kwargs["job_config"].labels["bigframes-compiler"]
107+
108+
assert label_1.startswith("sqlglot-")
109+
assert label_2.startswith("ibis-")
110+
# Both should have the same compiler_id suffix
111+
assert label_1.split("-")[1] == label_2.split("-")[1]

0 commit comments

Comments
 (0)