From 9ace68b869a8a7b8bbd61ba3928d114cb8603b39 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 13 Apr 2026 22:16:25 +0000
Subject: [PATCH 1/3] perf(bigframes): Improve write api upload throughput

---
 .../bigframes/bigframes/session/loader.py | 47 ++++++++++---------
 1 file changed, 25 insertions(+), 22 deletions(-)

diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py
index b0a9e0a1ed31..af318d56dee5 100644
--- a/packages/bigframes/bigframes/session/loader.py
+++ b/packages/bigframes/bigframes/session/loader.py
@@ -52,7 +52,7 @@
 import pandas
 import pyarrow as pa
 from google.cloud import bigquery_storage_v1
-from google.cloud.bigquery_storage_v1 import types as bq_storage_types
+from google.cloud.bigquery_storage_v1 import types as bq_storage_types, writer as bq_storage_writer
 
 import bigframes._tools
 import bigframes._tools.strings
@@ -520,38 +520,41 @@ def write_data(
         )
         serialized_schema = schema.serialize().to_pybytes()
 
-        def stream_worker(work: Iterator[pa.RecordBatch]) -> str:
+        def stream_worker(work: Iterator[pa.RecordBatch], max_outstanding: int = 5) -> str:
             requested_stream = bq_storage_types.WriteStream(
                 type_=bq_storage_types.WriteStream.Type.PENDING
             )
             stream = self._write_client.create_write_stream(
                 parent=parent, write_stream=requested_stream
             )
+            base_request = bq_storage_types.AppendRowsRequest(
+                write_stream=stream.name,
+            )
+            base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema
+
+            stream_manager = bq_storage_writer.AppendRowsStream(
+                client=self._write_client, initial_request_template=base_request
+            )
             stream_name = stream.name
+            current_offset = 0
+            futures: list[bq_storage_writer.AppendRowsFuture] = []
 
-            def request_generator():
-                current_offset = 0
-                for batch in work:
-                    request = bq_storage_types.AppendRowsRequest(
-                        write_stream=stream.name, offset=current_offset
-                    )
+            for batch in work:
+                if len(futures) >= max_outstanding:
+                    futures.pop(0).result()
 
-                    request.arrow_rows.writer_schema.serialized_schema = (
-                        serialized_schema
-                    )
-                    request.arrow_rows.rows.serialized_record_batch = (
-                        batch.serialize().to_pybytes()
-                    )
+                request = bq_storage_types.AppendRowsRequest(offset=current_offset)
+                request.arrow_rows.rows.serialized_record_batch = (
+                    batch.serialize().to_pybytes()
+                )
 
-                    yield request
-                    current_offset += batch.num_rows
+                futures.append(stream_manager.send(request))
+                current_offset += batch.num_rows
 
-            responses = self._write_client.append_rows(requests=request_generator())
-            for resp in responses:
-                if resp.row_errors:
-                    raise ValueError(
-                        f"Errors in stream {stream_name}: {resp.row_errors}"
-                    )
+            for future in futures:
+                future.result()
+
+            stream_manager.close()
 
             self._write_client.finalize_write_stream(name=stream_name)
             return stream_name
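The gist of patch 1: instead of handing the write client one blocking request generator, the worker pushes appends through a bq_storage_writer.AppendRowsStream and caps the number of unacknowledged requests, so serializing the next batch overlaps with in-flight RPCs. Below is a minimal standalone sketch of that bounded-pipeline pattern; the function name append_batches and its client/stream_name parameters are illustrative, not names from the patch.

from typing import Iterable

import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer


def append_batches(
    client: bigquery_storage_v1.BigQueryWriteClient,
    stream_name: str,
    schema: pa.Schema,
    batches: Iterable[pa.RecordBatch],
    max_outstanding: int = 5,
) -> int:
    """Append Arrow batches with at most max_outstanding unacked requests."""
    # The Arrow schema travels once, via the template merged into every append.
    template = types.AppendRowsRequest(write_stream=stream_name)
    template.arrow_rows.writer_schema.serialized_schema = (
        schema.serialize().to_pybytes()
    )

    manager = writer.AppendRowsStream(
        client=client, initial_request_template=template
    )

    offset = 0
    in_flight: list[writer.AppendRowsFuture] = []
    for batch in batches:
        # Backpressure: block on the oldest future before sending another.
        if len(in_flight) >= max_outstanding:
            in_flight.pop(0).result()

        request = types.AppendRowsRequest(offset=offset)
        request.arrow_rows.rows.serialized_record_batch = (
            batch.serialize().to_pybytes()
        )
        in_flight.append(manager.send(request))
        offset += batch.num_rows

    # Drain everything still pending before closing the connection.
    for future in in_flight:
        future.result()
    manager.close()
    return offset

Waiting on the oldest future is a deliberately simple backpressure scheme, and because each request carries an explicit offset, the PENDING stream rejects out-of-order or duplicate appends rather than silently misordering rows.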
From 09acc1dfefe440c7533f763357346a40ffd9295b Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 13 Apr 2026 23:15:12 +0000
Subject: [PATCH 2/3] linted

---
 packages/bigframes/bigframes/session/loader.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py
index af318d56dee5..d3fb1377ba2c 100644
--- a/packages/bigframes/bigframes/session/loader.py
+++ b/packages/bigframes/bigframes/session/loader.py
@@ -52,7 +52,10 @@
 import pandas
 import pyarrow as pa
 from google.cloud import bigquery_storage_v1
-from google.cloud.bigquery_storage_v1 import types as bq_storage_types, writer as bq_storage_writer
+from google.cloud.bigquery_storage_v1 import (
+    types as bq_storage_types,
+    writer as bq_storage_writer,
+)
 
 import bigframes._tools
 import bigframes._tools.strings
@@ -520,7 +523,9 @@ def write_data(
         )
         serialized_schema = schema.serialize().to_pybytes()
 
-        def stream_worker(work: Iterator[pa.RecordBatch], max_outstanding: int = 5) -> str:
+        def stream_worker(
+            work: Iterator[pa.RecordBatch], max_outstanding: int = 5
+        ) -> str:
             requested_stream = bq_storage_types.WriteStream(
                 type_=bq_storage_types.WriteStream.Type.PENDING
             )

From 3486b31937e55bda90995a37fe69bf103e196823 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 14 Apr 2026 19:53:13 +0000
Subject: [PATCH 3/3] handle row_errors

---
 packages/bigframes/bigframes/session/loader.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py
index d3fb1377ba2c..960208063105 100644
--- a/packages/bigframes/bigframes/session/loader.py
+++ b/packages/bigframes/bigframes/session/loader.py
@@ -546,7 +546,11 @@ def stream_worker(
 
             for batch in work:
                 if len(futures) >= max_outstanding:
-                    futures.pop(0).result()
+                    row_errors = futures.pop(0).result().row_errors
+                    if row_errors:
+                        raise ValueError(
+                            f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
+                        )
 
                 request = bq_storage_types.AppendRowsRequest(offset=current_offset)
                 request.arrow_rows.rows.serialized_record_batch = (
@@ -557,7 +561,11 @@ def stream_worker(
                 current_offset += batch.num_rows
 
             for future in futures:
-                future.result()
+                row_errors = future.result().row_errors
+                if row_errors:
+                    raise ValueError(
+                        f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
+                    )
 
             stream_manager.close()
             self._write_client.finalize_write_stream(name=stream_name)
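One possible follow-up to patch 3, sketched here rather than taken from the series: the row_errors check now appears at both drain points, so it could be hoisted into a helper. The name _check_append_result is hypothetical, and the bigframes.constants import path is an assumption about where loader.py gets FEEDBACK_LINK.

from google.cloud.bigquery_storage_v1 import writer as bq_storage_writer

from bigframes import constants  # assumed import path for FEEDBACK_LINK


def _check_append_result(future: bq_storage_writer.AppendRowsFuture) -> None:
    # Block until the append is acknowledged, then surface per-row failures.
    row_errors = future.result().row_errors
    if row_errors:
        raise ValueError(
            f"Problem loading rows: {row_errors}. {constants.FEEDBACK_LINK}"
        )

Both call sites would then collapse to _check_append_result(futures.pop(0)) and _check_append_result(future), keeping the error text in one place.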