From 7284f6d488257ae843a9d6e23359c1be60f45ff2 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Wed, 8 Apr 2026 16:22:53 +0200 Subject: [PATCH] fix: cache ensured tables in BigQuery streaming v2 to stop 403 storm load_to_bigquery_via_streaming unconditionally called create_table on every flush, so each bizon.stream.iteration issued one POST /tables per destination_id. With ~15 CDC tables routed through one destination, this blew past BigQuery's per-table metadata quota (5 ops / 10s) and returned HTTP 403 rateLimitExceeded. The google-cloud-bigquery SDK silently retried those 403s via DEFAULT_RETRY, so data kept flowing but every iteration burned wall-clock in backoff and flooded Datadog with red spans. Cache (temp_table_id, schema_fingerprint) pairs on the destination instance and short-circuit the create_table/Conflict/schema-reconcile block when we've already ensured a table with that exact schema this process lifetime. Drop matching cache entries in finalize() after delete_table in FULL_REFRESH and INCREMENTAL modes so a same-process re-run recreates the temp table. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../bigquery_streaming_v2/src/destination.py | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py b/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py index 39db5b5..d8ed824 100644 --- a/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py +++ b/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py @@ -75,6 +75,10 @@ def __init__( self.bq_storage_client_options = ClientOptions( quota_project_id=self.project_id, ) + # Cache of (temp_table_id, schema_fingerprint) pairs already ensured in this process. + # Prevents calling create_table on every flush, which otherwise hits BigQuery's + # per-table metadata quota (5 ops / 10s) and returns 403 rateLimitExceeded. + self._ensured_tables: set[tuple[str, int]] = set() @property def table_id(self) -> str: @@ -275,32 +279,34 @@ def process_streaming_batch( raise def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -> str: - # Create table if it does not exist (use temp_table_id for staging) + # Ensure the staging table exists — but only once per (table, schema) in this process. + # Calling create_table on every flush otherwise hits BigQuery's per-table metadata + # quota (5 ops / 10s) and the API returns 403 rateLimitExceeded, which the google-cloud-bigquery + # SDK silently retries with exponential backoff. schema = self.get_bigquery_schema() - table = bigquery.Table(self.temp_table_id, schema=schema) - time_partitioning = TimePartitioning( - field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type - ) - table.time_partitioning = time_partitioning + schema_fingerprint = hash(tuple((f.name, f.field_type, f.mode) for f in schema)) + cache_key = (self.temp_table_id, schema_fingerprint) - if self.clustering_keys and self.clustering_keys[self.destination_id]: - table.clustering_fields = self.clustering_keys[self.destination_id] - try: - table = self.bq_client.create_table(table) - except Conflict: - table = self.bq_client.get_table(self.temp_table_id) - # Compare and update schema if needed - existing_fields = {field.name: field for field in table.schema} - new_fields = {field.name: field for field in self.get_bigquery_schema()} - - # Find fields that need to be added - fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields] - - if fields_to_add: - logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}") - updated_schema = table.schema + fields_to_add - table.schema = updated_schema - table = self.bq_client.update_table(table, ["schema"]) + if cache_key not in self._ensured_tables: + table = bigquery.Table(self.temp_table_id, schema=schema) + table.time_partitioning = TimePartitioning( + field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type + ) + if self.clustering_keys and self.clustering_keys[self.destination_id]: + table.clustering_fields = self.clustering_keys[self.destination_id] + try: + self.bq_client.create_table(table) + except Conflict: + existing_table = self.bq_client.get_table(self.temp_table_id) + existing_fields = {field.name: field for field in existing_table.schema} + new_fields = {field.name: field for field in schema} + fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields] + + if fields_to_add: + logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}") + existing_table.schema = existing_table.schema + fields_to_add + self.bq_client.update_table(existing_table, ["schema"]) + self._ensured_tables.add(cache_key) # Create the stream (use temp_table_id for staging) temp_table_parts = self.temp_table_id.split(".") @@ -439,6 +445,7 @@ def finalize(self): ).result() logger.info(f"Deleting temp table {self.temp_table_id} ...") self.bq_client.delete_table(self.temp_table_id, not_found_ok=True) + self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id} return True elif self.sync_metadata.sync_mode == SourceSyncModes.INCREMENTAL: @@ -447,6 +454,7 @@ def finalize(self): self.bq_client.query(f"INSERT INTO {self.table_id} SELECT * FROM {self.temp_table_id}").result() logger.info(f"Deleting incremental temp table {self.temp_table_id} ...") self.bq_client.delete_table(self.temp_table_id, not_found_ok=True) + self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id} return True elif self.sync_metadata.sync_mode == SourceSyncModes.STREAM: