Bug 2026725: Use load jobs instead of stream insert so if script fails previous rows can be deleted #16
Merged
```diff
@@ -639,11 +639,57 @@ def delete_existing_snapshot(
     )


+def _insert_rows_to_table(
+    client: bigquery.Client,
+    table: str,
+    table_ref: str,
+    rows: list,
+    use_streaming_insert: bool,
+) -> None:
+    """
+    Write rows to a single BigQuery table via streaming insert or load job.
+
+    Args:
+        client: BigQuery client instance
+        table: Table name (used only in error messages)
+        table_ref: Fully-qualified table reference (project.dataset.table)
+        rows: List of row dicts to write
+        use_streaming_insert: If True, use streaming insert (emulator only).
+            If False, use a load job so rows are immediately mutable (no
+            streaming buffer restriction), allowing DELETE to work right away.
+    """
+    if use_streaming_insert:
+        # The BigQuery emulator does not support load jobs; fall back to streaming
+        # inserts for local testing. The streaming buffer restriction that prevents
+        # immediate DELETE/UPDATE does not apply to the emulator.
+        errors = client.insert_rows_json(table_ref, rows)
+        if errors:
+            error_msg = f"BigQuery insert errors for table {table}: {errors}"
+            logger.error(error_msg)
+            raise Exception(error_msg)
+    else:
+        # Load jobs write directly to storage, so rows are immediately mutable —
+        # this allows DELETE in delete_existing_snapshot() to work without hitting
+        # the streaming buffer restriction.
+        job_config = bigquery.LoadJobConfig(
+            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+        )
+        load_job = client.load_table_from_json(rows, table_ref, job_config=job_config)
+        load_job.result()
+
+        if load_job.errors:
+            error_msg = f"BigQuery load errors for table {table}: {load_job.errors}"
+            logger.error(error_msg)
+            raise Exception(error_msg)
```
Comment on lines +681 to +684
```diff
 def load_data(
     client: bigquery.Client,
     dataset_id: str,
     transformed_data: dict,
     snapshot_date: str,
+    use_streaming_insert: bool = False,
 ) -> None:
     """
     Load transformed data to BigQuery using the Python client library.

@@ -655,6 +701,10 @@ def load_data(
         'commits', 'reviewers', 'comments') mapped to lists of row dictionaries
         snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the
             caller to avoid date-boundary skew between the existence check and inserts
+        use_streaming_insert: If True, use the streaming insert API instead of a load
+            job. Use only against the BigQuery emulator, which does not support load
+            jobs. In production, load jobs are preferred because rows are immediately
+            mutable (no streaming buffer restriction).
     """

     if not transformed_data:
```
```diff
@@ -674,15 +724,16 @@ def load_data(
         for row in load_table_data:
             row["snapshot_date"] = snapshot_date

         # Insert rows into BigQuery
         table_ref = f"{client.project}.{dataset_id}.{table}"
         logger.info(table_ref)
-        errors = client.insert_rows_json(table_ref, load_table_data)
-
-        if errors:
-            error_msg = f"BigQuery insert errors for table {table}: {errors}"
-            logger.error(error_msg)
-            raise Exception(error_msg)
+        _insert_rows_to_table(
+            client=client,
+            table=table,
+            table_ref=table_ref,
+            rows=load_table_data,
+            use_streaming_insert=use_streaming_insert,
+        )

         logger.info(
             f"Data loading completed successfully for table {table} "
```
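The error handling that `load_data` now delegates can be exercised without a real BigQuery backend. The sketch below mirrors only the streaming branch of `_insert_rows_to_table` against an invented `StubClient`; the stub's return convention (a list of per-row error dicts, empty on success) matches `insert_rows_json`, but the stub itself is not part of the PR.

```python
class StubClient:
    """Minimal stand-in for bigquery.Client, exposing only insert_rows_json."""

    def __init__(self, errors):
        self._errors = errors

    def insert_rows_json(self, table_ref, rows):
        # The real client returns a list of per-row error dicts; empty means success.
        return self._errors


def insert_streaming(client, table, table_ref, rows):
    # Mirrors the use_streaming_insert=True branch of _insert_rows_to_table.
    errors = client.insert_rows_json(table_ref, rows)
    if errors:
        raise Exception(f"BigQuery insert errors for table {table}: {errors}")


insert_streaming(StubClient([]), "commits", "proj.ds.commits", [{"id": 1}])  # succeeds
try:
    insert_streaming(StubClient([{"index": 0}]), "commits", "proj.ds.commits", [{"id": 1}])
except Exception as exc:
    print("raised:", exc)
```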
|
@@ -832,7 +883,11 @@ def _refresh() -> None: | |
|
|
||
| # Load | ||
| load_data( | ||
| bigquery_client, bigquery_dataset, transformed_data, snapshot_date | ||
| bigquery_client, | ||
| bigquery_dataset, | ||
| transformed_data, | ||
| snapshot_date, | ||
| use_streaming_insert=bool(emulator_host), | ||
| ) | ||
|
|
||
| total_processed += len(chunk) | ||
|
|
||
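The `_refresh()` change derives the flag from the configured emulator host, so streaming inserts are selected exactly when an emulator is in use. A minimal sketch of that wiring, assuming the host comes from an environment variable (the name `BIGQUERY_EMULATOR_HOST` is an assumption; the diff only shows an `emulator_host` variable):

```python
def use_streaming_insert_flag(env: dict) -> bool:
    # An unset or empty emulator host means production BigQuery, so load jobs
    # are used; any non-empty host selects streaming inserts for the emulator.
    emulator_host = env.get("BIGQUERY_EMULATOR_HOST", "")
    return bool(emulator_host)


print(use_streaming_insert_flag({}))  # False
print(use_streaming_insert_flag({"BIGQUERY_EMULATOR_HOST": "localhost:9050"}))  # True
```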
These branches raise a generic Exception, but main() only catches RuntimeError. If an insert/load fails, this will escape the top-level handler and produce an uncaught exception traceback instead of returning a clean exit code via main(). Consider raising RuntimeError (or a dedicated custom exception) here so failures are handled consistently.