Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,11 +639,57 @@ def delete_existing_snapshot(
)


def _insert_rows_to_table(
    client: bigquery.Client,
    table: str,
    table_ref: str,
    rows: list,
    use_streaming_insert: bool,
) -> None:
    """
    Write rows to a single BigQuery table via streaming insert or load job.

    Args:
        client: BigQuery client instance
        table: Table name (used only in error messages)
        table_ref: Fully-qualified table reference (project.dataset.table)
        rows: List of row dicts to write
        use_streaming_insert: If True, use streaming insert (emulator only).
            If False, use a load job so rows are immediately mutable (no
            streaming buffer restriction), allowing DELETE to work right away.

    Raises:
        RuntimeError: If the streaming insert or load job reports errors.
            RuntimeError (not a generic Exception) is raised so that the
            top-level handler in main(), which catches RuntimeError, can
            turn failures into a clean exit code instead of an uncaught
            traceback.
    """
    if use_streaming_insert:
        # The BigQuery emulator does not support load jobs; fall back to streaming
        # inserts for local testing. The streaming buffer restriction that prevents
        # immediate DELETE/UPDATE does not apply to the emulator.
        errors = client.insert_rows_json(table_ref, rows)
        if errors:
            error_msg = f"BigQuery insert errors for table {table}: {errors}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)
    else:
        # Load jobs write directly to storage, so rows are immediately mutable —
        # this allows DELETE in delete_existing_snapshot() to work without hitting
        # the streaming buffer restriction.
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        )
        load_job = client.load_table_from_json(rows, table_ref, job_config=job_config)
        # result() blocks until the job completes (and raises on hard job
        # failure); load_job.errors additionally surfaces per-row errors.
        load_job.result()

        if load_job.errors:
            error_msg = f"BigQuery load errors for table {table}: {load_job.errors}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)
Comment on lines +681 to +684
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the streaming-insert branch: raising a generic Exception here bypasses main()'s RuntimeError handler and will likely result in an uncaught exception traceback. Prefer raising RuntimeError (or another exception type that main() handles) for consistent failure behavior.

Copilot uses AI. Check for mistakes.


def load_data(
client: bigquery.Client,
dataset_id: str,
transformed_data: dict,
snapshot_date: str,
use_streaming_insert: bool = False,
) -> None:
"""
Load transformed data to BigQuery using the Python client library.
Expand All @@ -655,6 +701,10 @@ def load_data(
'commits', 'reviewers', 'comments') mapped to lists of row dictionaries
snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the
caller to avoid date-boundary skew between the existence check and inserts
use_streaming_insert: If True, use the streaming insert API instead of a load
job. Use only against the BigQuery emulator, which does not support load
jobs. In production, load jobs are preferred because rows are immediately
mutable (no streaming buffer restriction).
"""

if not transformed_data:
Expand All @@ -674,15 +724,16 @@ def load_data(
for row in load_table_data:
row["snapshot_date"] = snapshot_date

# Insert rows into BigQuery
table_ref = f"{client.project}.{dataset_id}.{table}"
logger.info(table_ref)
errors = client.insert_rows_json(table_ref, load_table_data)

if errors:
error_msg = f"BigQuery insert errors for table {table}: {errors}"
logger.error(error_msg)
raise Exception(error_msg)
_insert_rows_to_table(
client=client,
table=table,
table_ref=table_ref,
rows=load_table_data,
use_streaming_insert=use_streaming_insert,
)

logger.info(
f"Data loading completed successfully for table {table} "
Expand Down Expand Up @@ -832,7 +883,11 @@ def _refresh() -> None:

# Load
load_data(
bigquery_client, bigquery_dataset, transformed_data, snapshot_date
bigquery_client,
bigquery_dataset,
transformed_data,
snapshot_date,
use_streaming_insert=bool(emulator_host),
)

total_processed += len(chunk)
Expand Down
Loading