diff --git a/main.py b/main.py
index 6917cd6..4c4b828 100755
--- a/main.py
+++ b/main.py
@@ -639,11 +639,57 @@ def delete_existing_snapshot(
     )
 
 
+def _insert_rows_to_table(
+    client: bigquery.Client,
+    table: str,
+    table_ref: str,
+    rows: list,
+    use_streaming_insert: bool,
+) -> None:
+    """
+    Write rows to a single BigQuery table via streaming insert or load job.
+
+    Args:
+        client: BigQuery client instance
+        table: Table name (used only in error messages)
+        table_ref: Fully-qualified table reference (project.dataset.table)
+        rows: List of row dicts to write
+        use_streaming_insert: If True, use streaming insert (emulator only).
+            If False, use a load job so rows are immediately mutable (no
+            streaming buffer restriction), allowing DELETE to work right away.
+    """
+    if use_streaming_insert:
+        # The BigQuery emulator does not support load jobs; fall back to streaming
+        # inserts for local testing. The streaming buffer restriction that prevents
+        # immediate DELETE/UPDATE does not apply to the emulator.
+        errors = client.insert_rows_json(table_ref, rows)
+        if errors:
+            error_msg = f"BigQuery insert errors for table {table}: {errors}"
+            logger.error(error_msg)
+            raise Exception(error_msg)
+    else:
+        # Load jobs write directly to storage, so rows are immediately mutable —
+        # this allows DELETE in delete_existing_snapshot() to work without hitting
+        # the streaming buffer restriction.
+        job_config = bigquery.LoadJobConfig(
+            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+        )
+        load_job = client.load_table_from_json(rows, table_ref, job_config=job_config)
+        load_job.result()
+
+        if load_job.errors:
+            error_msg = f"BigQuery load errors for table {table}: {load_job.errors}"
+            logger.error(error_msg)
+            raise Exception(error_msg)
+
+
 def load_data(
     client: bigquery.Client,
     dataset_id: str,
     transformed_data: dict,
     snapshot_date: str,
+    use_streaming_insert: bool = False,
 ) -> None:
     """
     Load transformed data to BigQuery using the Python client library.
@@ -655,6 +701,10 @@ def load_data(
            'commits', 'reviewers', 'comments') mapped to lists of row dictionaries
        snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by
            the caller to avoid date-boundary skew between the existence check and inserts
+        use_streaming_insert: If True, use the streaming insert API instead of a load
+            job. Use only against the BigQuery emulator, which does not support load
+            jobs. In production, load jobs are preferred because rows are immediately
+            mutable (no streaming buffer restriction).
     """
 
     if not transformed_data:
@@ -674,15 +724,16 @@ def load_data(
         for row in load_table_data:
             row["snapshot_date"] = snapshot_date
 
-        # Insert rows into BigQuery
         table_ref = f"{client.project}.{dataset_id}.{table}"
         logger.info(table_ref)
 
-        errors = client.insert_rows_json(table_ref, load_table_data)
-        if errors:
-            error_msg = f"BigQuery insert errors for table {table}: {errors}"
-            logger.error(error_msg)
-            raise Exception(error_msg)
+        _insert_rows_to_table(
+            client=client,
+            table=table,
+            table_ref=table_ref,
+            rows=load_table_data,
+            use_streaming_insert=use_streaming_insert,
+        )
 
         logger.info(
             f"Data loading completed successfully for table {table} "
@@ -832,7 +883,11 @@ def _refresh() -> None:
 
         # Load
         load_data(
-            bigquery_client, bigquery_dataset, transformed_data, snapshot_date
+            bigquery_client,
+            bigquery_dataset,
+            transformed_data,
+            snapshot_date,
+            use_streaming_insert=bool(emulator_host),
         )
 
         total_processed += len(chunk)