From 3342f56abab7aaea31251d313faab54a1f8e17e0 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Thu, 26 Mar 2026 13:22:02 -0400 Subject: [PATCH 1/2] fix: Use load jobs instead of stream insert so if script fails previous rows can be deleted --- main.py | 43 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index 6917cd6..c0e5d55 100755 --- a/main.py +++ b/main.py @@ -644,6 +644,7 @@ def load_data( dataset_id: str, transformed_data: dict, snapshot_date: str, + use_streaming_insert: bool = False, ) -> None: """ Load transformed data to BigQuery using the Python client library. @@ -655,6 +656,10 @@ def load_data( 'commits', 'reviewers', 'comments') mapped to lists of row dictionaries snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the caller to avoid date-boundary skew between the existence check and inserts + use_streaming_insert: If True, use the streaming insert API instead of a load + job. Use only against the BigQuery emulator, which does not support load + jobs. In production, load jobs are preferred because rows are immediately + mutable (no streaming buffer restriction). """ if not transformed_data: @@ -674,15 +679,35 @@ def load_data( for row in load_table_data: row["snapshot_date"] = snapshot_date - # Insert rows into BigQuery table_ref = f"{client.project}.{dataset_id}.{table}" logger.info(table_ref) - errors = client.insert_rows_json(table_ref, load_table_data) - if errors: - error_msg = f"BigQuery insert errors for table {table}: {errors}" - logger.error(error_msg) - raise Exception(error_msg) + if use_streaming_insert: + # The BigQuery emulator does not support load jobs; fall back to streaming + # inserts for local testing. The streaming buffer restriction that prevents + # immediate DELETE/UPDATE does not apply to the emulator. + errors = client.insert_rows_json(table_ref, load_table_data) + if errors: + error_msg = f"BigQuery insert errors for table {table}: {errors}" + logger.error(error_msg) + raise Exception(error_msg) + else: + # Load jobs write directly to storage, so rows are immediately mutable — + # this allows DELETE in delete_existing_snapshot() to work without hitting + # the streaming buffer restriction. + job_config = bigquery.LoadJobConfig( + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + load_job = client.load_table_from_json( + load_table_data, table_ref, job_config=job_config + ) + load_job.result() + + if load_job.errors: + error_msg = f"BigQuery load errors for table {table}: {load_job.errors}" + logger.error(error_msg) + raise Exception(error_msg) logger.info( f"Data loading completed successfully for table {table} " @@ -832,7 +857,11 @@ def _refresh() -> None: # Load load_data( - bigquery_client, bigquery_dataset, transformed_data, snapshot_date + bigquery_client, + bigquery_dataset, + transformed_data, + snapshot_date, + use_streaming_insert=bool(emulator_host), ) total_processed += len(chunk) From d33e529fe1cdb692b940ff7d396ddcbf3afa7d57 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Thu, 26 Mar 2026 22:33:02 -0400 Subject: [PATCH 2/2] Moved to a separate function --- main.py | 78 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/main.py b/main.py index c0e5d55..4c4b828 100755 --- a/main.py +++ b/main.py @@ -639,6 +639,51 @@ def delete_existing_snapshot( ) +def _insert_rows_to_table( + client: bigquery.Client, + table: str, + table_ref: str, + rows: list, + use_streaming_insert: bool, +) -> None: + """ + Write rows to a single BigQuery table via streaming insert or load job. + + Args: + client: BigQuery client instance + table: Table name (used only in error messages) + table_ref: Fully-qualified table reference (project.dataset.table) + rows: List of row dicts to write + use_streaming_insert: If True, use streaming insert (emulator only). + If False, use a load job so rows are immediately mutable (no + streaming buffer restriction), allowing DELETE to work right away. + """ + if use_streaming_insert: + # The BigQuery emulator does not support load jobs; fall back to streaming + # inserts for local testing. The streaming buffer restriction that prevents + # immediate DELETE/UPDATE does not apply to the emulator. + errors = client.insert_rows_json(table_ref, rows) + if errors: + error_msg = f"BigQuery insert errors for table {table}: {errors}" + logger.error(error_msg) + raise Exception(error_msg) + else: + # Load jobs write directly to storage, so rows are immediately mutable — + # this allows DELETE in delete_existing_snapshot() to work without hitting + # the streaming buffer restriction. + job_config = bigquery.LoadJobConfig( + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + load_job = client.load_table_from_json(rows, table_ref, job_config=job_config) + load_job.result() + + if load_job.errors: + error_msg = f"BigQuery load errors for table {table}: {load_job.errors}" + logger.error(error_msg) + raise Exception(error_msg) + + def load_data( client: bigquery.Client, dataset_id: str, @@ -682,32 +727,13 @@ def load_data( table_ref = f"{client.project}.{dataset_id}.{table}" logger.info(table_ref) - if use_streaming_insert: - # The BigQuery emulator does not support load jobs; fall back to streaming - # inserts for local testing. The streaming buffer restriction that prevents - # immediate DELETE/UPDATE does not apply to the emulator. - errors = client.insert_rows_json(table_ref, load_table_data) - if errors: - error_msg = f"BigQuery insert errors for table {table}: {errors}" - logger.error(error_msg) - raise Exception(error_msg) - else: - # Load jobs write directly to storage, so rows are immediately mutable — - # this allows DELETE in delete_existing_snapshot() to work without hitting - # the streaming buffer restriction. - job_config = bigquery.LoadJobConfig( - write_disposition=bigquery.WriteDisposition.WRITE_APPEND, - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, - ) - load_job = client.load_table_from_json( - load_table_data, table_ref, job_config=job_config - ) - load_job.result() - - if load_job.errors: - error_msg = f"BigQuery load errors for table {table}: {load_job.errors}" - logger.error(error_msg) - raise Exception(error_msg) + _insert_rows_to_table( + client=client, + table=table, + table_ref=table_ref, + rows=load_table_data, + use_streaming_insert=use_streaming_insert, + ) logger.info( f"Data loading completed successfully for table {table} "