diff --git a/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py b/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py index 39db5b5..d8ed824 100644 --- a/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py +++ b/bizon/connectors/destinations/bigquery_streaming_v2/src/destination.py @@ -75,6 +75,10 @@ def __init__( self.bq_storage_client_options = ClientOptions( quota_project_id=self.project_id, ) + # Cache of (temp_table_id, schema_fingerprint) pairs already ensured in this process. + # Prevents calling create_table on every flush, which otherwise hits BigQuery's + # per-table metadata quota (5 ops / 10s) and returns 403 rateLimitExceeded. + self._ensured_tables: set[tuple[str, int]] = set() @property def table_id(self) -> str: @@ -275,32 +279,34 @@ def process_streaming_batch( raise def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -> str: - # Create table if it does not exist (use temp_table_id for staging) + # Ensure the staging table exists — but only once per (table, schema) in this process. + # Calling create_table on every flush otherwise hits BigQuery's per-table metadata + # quota (5 ops / 10s) and the API returns 403 rateLimitExceeded, which the google-cloud-bigquery + # SDK silently retries with exponential backoff. schema = self.get_bigquery_schema() - table = bigquery.Table(self.temp_table_id, schema=schema) - time_partitioning = TimePartitioning( - field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type - ) - table.time_partitioning = time_partitioning + schema_fingerprint = hash(tuple((f.name, f.field_type, f.mode) for f in schema)) + cache_key = (self.temp_table_id, schema_fingerprint) - if self.clustering_keys and self.clustering_keys[self.destination_id]: - table.clustering_fields = self.clustering_keys[self.destination_id] - try: - table = self.bq_client.create_table(table) - except Conflict: - table = self.bq_client.get_table(self.temp_table_id) - # Compare and update schema if needed - existing_fields = {field.name: field for field in table.schema} - new_fields = {field.name: field for field in self.get_bigquery_schema()} - - # Find fields that need to be added - fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields] - - if fields_to_add: - logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}") - updated_schema = table.schema + fields_to_add - table.schema = updated_schema - table = self.bq_client.update_table(table, ["schema"]) + if cache_key not in self._ensured_tables: + table = bigquery.Table(self.temp_table_id, schema=schema) + table.time_partitioning = TimePartitioning( + field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type + ) + if self.clustering_keys and self.clustering_keys[self.destination_id]: + table.clustering_fields = self.clustering_keys[self.destination_id] + try: + self.bq_client.create_table(table) + except Conflict: + existing_table = self.bq_client.get_table(self.temp_table_id) + existing_fields = {field.name: field for field in existing_table.schema} + new_fields = {field.name: field for field in schema} + fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields] + + if fields_to_add: + logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}") + existing_table.schema = existing_table.schema + fields_to_add + self.bq_client.update_table(existing_table, ["schema"]) + self._ensured_tables.add(cache_key) # Create the stream (use temp_table_id for staging) temp_table_parts = self.temp_table_id.split(".") @@ -439,6 +445,7 @@ def finalize(self): ).result() logger.info(f"Deleting temp table {self.temp_table_id} ...") self.bq_client.delete_table(self.temp_table_id, not_found_ok=True) + self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id} return True elif self.sync_metadata.sync_mode == SourceSyncModes.INCREMENTAL: @@ -447,6 +454,7 @@ def finalize(self): self.bq_client.query(f"INSERT INTO {self.table_id} SELECT * FROM {self.temp_table_id}").result() logger.info(f"Deleting incremental temp table {self.temp_table_id} ...") self.bq_client.delete_table(self.temp_table_id, not_found_ok=True) + self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id} return True elif self.sync_metadata.sync_mode == SourceSyncModes.STREAM: