Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(
self.bq_storage_client_options = ClientOptions(
quota_project_id=self.project_id,
)
# Cache of (temp_table_id, schema_fingerprint) pairs already ensured in this process.
# Prevents calling create_table on every flush, which otherwise hits BigQuery's
# per-table metadata quota (5 ops / 10s) and returns 403 rateLimitExceeded.
self._ensured_tables: set[tuple[str, int]] = set()

@property
def table_id(self) -> str:
Expand Down Expand Up @@ -275,32 +279,34 @@ def process_streaming_batch(
raise

def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -> str:
# Create table if it does not exist (use temp_table_id for staging)
# Ensure the staging table exists — but only once per (table, schema) in this process.
# Calling create_table on every flush otherwise hits BigQuery's per-table metadata
# quota (5 ops / 10s) and the API returns 403 rateLimitExceeded, which the google-cloud-bigquery
# SDK silently retries with exponential backoff.
schema = self.get_bigquery_schema()
table = bigquery.Table(self.temp_table_id, schema=schema)
time_partitioning = TimePartitioning(
field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type
)
table.time_partitioning = time_partitioning
schema_fingerprint = hash(tuple((f.name, f.field_type, f.mode) for f in schema))
cache_key = (self.temp_table_id, schema_fingerprint)

if self.clustering_keys and self.clustering_keys[self.destination_id]:
table.clustering_fields = self.clustering_keys[self.destination_id]
try:
table = self.bq_client.create_table(table)
except Conflict:
table = self.bq_client.get_table(self.temp_table_id)
# Compare and update schema if needed
existing_fields = {field.name: field for field in table.schema}
new_fields = {field.name: field for field in self.get_bigquery_schema()}

# Find fields that need to be added
fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields]

if fields_to_add:
logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}")
updated_schema = table.schema + fields_to_add
table.schema = updated_schema
table = self.bq_client.update_table(table, ["schema"])
if cache_key not in self._ensured_tables:
table = bigquery.Table(self.temp_table_id, schema=schema)
table.time_partitioning = TimePartitioning(
field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type
)
if self.clustering_keys and self.clustering_keys[self.destination_id]:
table.clustering_fields = self.clustering_keys[self.destination_id]
try:
self.bq_client.create_table(table)
except Conflict:
existing_table = self.bq_client.get_table(self.temp_table_id)
existing_fields = {field.name: field for field in existing_table.schema}
new_fields = {field.name: field for field in schema}
fields_to_add = [field for name, field in new_fields.items() if name not in existing_fields]

if fields_to_add:
logger.warning(f"Adding new fields to table schema: {[field.name for field in fields_to_add]}")
existing_table.schema = existing_table.schema + fields_to_add
self.bq_client.update_table(existing_table, ["schema"])
self._ensured_tables.add(cache_key)

# Create the stream (use temp_table_id for staging)
temp_table_parts = self.temp_table_id.split(".")
Expand Down Expand Up @@ -439,6 +445,7 @@ def finalize(self):
).result()
logger.info(f"Deleting temp table {self.temp_table_id} ...")
self.bq_client.delete_table(self.temp_table_id, not_found_ok=True)
self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id}
return True

elif self.sync_metadata.sync_mode == SourceSyncModes.INCREMENTAL:
Expand All @@ -447,6 +454,7 @@ def finalize(self):
self.bq_client.query(f"INSERT INTO {self.table_id} SELECT * FROM {self.temp_table_id}").result()
logger.info(f"Deleting incremental temp table {self.temp_table_id} ...")
self.bq_client.delete_table(self.temp_table_id, not_found_ok=True)
self._ensured_tables = {k for k in self._ensured_tables if k[0] != self.temp_table_id}
return True

elif self.sync_metadata.sync_mode == SourceSyncModes.STREAM:
Expand Down
Loading