-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Python] Optimize BigQuery copy jobs in file loads using multi-source copy #38983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
62d343c
26c6003
8a23f3b
af3b026
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -491,6 +491,7 @@ class TriggerCopyJobs(beam.DoFn): | |
| """ | ||
|
|
||
| TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' | ||
| MAX_SOURCES_PER_COPY_JOB = 1200 | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -528,96 +529,97 @@ def process( | |
| self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None): | ||
| if isinstance(element_list, tuple): | ||
| # Allow this for streaming update compatibility while fixing BEAM-24535. | ||
| self.process_one(element_list, job_name_prefix) | ||
| else: | ||
| for element in element_list: | ||
| self.process_one(element, job_name_prefix) | ||
| element_list = [element_list] | ||
|
|
||
| def process_one(self, element, job_name_prefix): | ||
| destination, job_reference = element | ||
| if not element_list: | ||
| return | ||
|
|
||
|
stankiewicz marked this conversation as resolved.
|
||
| copy_to_reference = bigquery_tools.parse_table_reference(destination) | ||
| first_destination = element_list[0][0] | ||
| copy_to_reference = bigquery_tools.parse_table_reference(first_destination) | ||
| if copy_to_reference.projectId is None: | ||
| copy_to_reference.projectId = vp.RuntimeValueProvider.get_value( | ||
| 'project', str, '') or self.project | ||
|
|
||
| copy_from_reference = bigquery_tools.parse_table_reference(destination) | ||
| copy_from_reference.tableId = job_reference.jobId | ||
| if copy_from_reference.projectId is None: | ||
| copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( | ||
| 'project', str, '') or self.project | ||
|
|
||
| _LOGGER.info( | ||
| "Triggering copy job from %s to %s", | ||
| copy_from_reference, | ||
| copy_to_reference) | ||
| copy_from_references = [] | ||
| for destination, job_reference in element_list: | ||
| copy_from_reference = bigquery_tools.parse_table_reference(destination) | ||
| copy_from_reference.tableId = job_reference.jobId | ||
| if copy_from_reference.projectId is None: | ||
| copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( | ||
| 'project', str, '') or self.project | ||
| copy_from_references.append(copy_from_reference) | ||
|
|
||
| wait_for_job, write_disposition = ( | ||
| self._determine_write_disposition(copy_to_reference)) | ||
|
|
||
| if not self.bq_io_metadata: | ||
| self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) | ||
| full_table_ref = '%s:%s.%s' % ( | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) | ||
|
Comment on lines
+552
to
+555
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: maybe use |
||
|
|
||
| project_id = ( | ||
| copy_to_reference.projectId | ||
| if self.load_job_project_id is None else self.load_job_project_id) | ||
| copy_job_name = '%s_%s' % ( | ||
| is_first_time = full_table_ref not in self._observed_tables | ||
| if is_first_time: | ||
| self._observed_tables.add(full_table_ref) | ||
| if self.bq_io_metadata: | ||
| Lineage.sinks().add( | ||
| 'bigquery', | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) | ||
|
Comment on lines
+558
to
+565
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. should we only track it in |
||
|
|
||
| # Split into chunks of MAX_SOURCES_PER_COPY_JOB | ||
| chunks = [ | ||
| copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB] | ||
| for i in range( | ||
| 0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB) | ||
| ] | ||
|
|
||
| copy_job_name_base = '%s_%s' % ( | ||
| job_name_prefix, | ||
| _bq_uuid( | ||
| '%s:%s.%s' % ( | ||
| copy_from_reference.projectId, | ||
| copy_from_reference.datasetId, | ||
| copy_from_reference.tableId))) | ||
| job_reference = self.bq_wrapper._insert_copy_job( | ||
| project_id, | ||
| copy_job_name, | ||
| copy_from_reference, | ||
| copy_to_reference, | ||
| create_disposition=self.create_disposition, | ||
| write_disposition=write_disposition, | ||
| job_labels=self.bq_io_metadata.add_additional_bq_job_labels()) | ||
|
|
||
| if wait_for_job: | ||
| self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) | ||
| self.pending_jobs.append( | ||
| GlobalWindows.windowed_value((destination, job_reference))) | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId))) | ||
|
Comment on lines
576
to
+580
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: use |
||
|
|
||
| def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: | ||
| """ | ||
| Determines the write disposition for a BigQuery copy job, | ||
| based on destination. | ||
|
|
||
| When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs | ||
| to the same destination can interfere with each other, truncate data, and | ||
| write to the BigQuery table repeatedly. To prevent this, the first copy job | ||
| runs with the user's specified write_disposition, but subsequent jobs must | ||
| always use WRITE_APPEND. This ensures that subsequent copy jobs do not | ||
| clear out data appended by previous jobs. | ||
|
|
||
| Args: | ||
| copy_to_reference: The reference to the destination table. | ||
| project_id = ( | ||
| copy_to_reference.projectId | ||
| if self.load_job_project_id is None else self.load_job_project_id) | ||
|
|
||
| Returns: | ||
| A tuple containing a boolean indicating whether to wait for the job to | ||
| complete and the write disposition to use for the job. | ||
| """ | ||
| full_table_ref = '%s:%s.%s' % ( | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) | ||
| if full_table_ref not in self._observed_tables: | ||
| write_disposition = self.write_disposition | ||
| wait_for_job = True | ||
| self._observed_tables.add(full_table_ref) | ||
| Lineage.sinks().add( | ||
| 'bigquery', | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) | ||
| else: | ||
| wait_for_job = False | ||
| write_disposition = 'WRITE_APPEND' | ||
| return wait_for_job, write_disposition | ||
| for i, chunk in enumerate(chunks): | ||
| if i == 0 and is_first_time: | ||
| write_disposition = self.write_disposition | ||
| # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY. | ||
| # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it. | ||
| wait_for_job = ( | ||
| self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and | ||
| len(chunks) > 1) | ||
|
stankiewicz marked this conversation as resolved.
|
||
| else: | ||
| write_disposition = 'WRITE_APPEND' | ||
| wait_for_job = False | ||
|
|
||
| chunk_job_name = copy_job_name_base | ||
| if len(chunks) > 1: | ||
| chunk_job_name = f"{copy_job_name_base}_{i}" | ||
|
|
||
| _LOGGER.info( | ||
| "Triggering copy job %s from %s to %s (write_disposition: %s)", | ||
| chunk_job_name, [str(r) for r in chunk], | ||
| copy_to_reference, | ||
| write_disposition) | ||
|
|
||
| job_reference = self.bq_wrapper._insert_copy_job( | ||
| project_id, | ||
| chunk_job_name, | ||
| chunk, | ||
| copy_to_reference, | ||
| create_disposition=self.create_disposition, | ||
| write_disposition=write_disposition, | ||
| job_labels=self.bq_io_metadata.add_additional_bq_job_labels() | ||
| if self.bq_io_metadata else None) | ||
|
|
||
| if wait_for_job: | ||
| self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) | ||
|
|
||
| self.pending_jobs.append( | ||
| GlobalWindows.windowed_value((first_destination, job_reference))) | ||
|
|
||
| def finish_bundle(self): | ||
| for windowed_value in self.pending_jobs: | ||
|
|
@@ -744,7 +746,7 @@ def process( | |
| else: | ||
| try: | ||
| schema = bigquery_tools.table_schema_to_dict( | ||
| bigquery_tools.BigQueryWrapper().get_table( | ||
| self.bq_wrapper.get_table( | ||
| project_id=table_reference.projectId, | ||
| dataset_id=table_reference.datasetId, | ||
| table_id=table_reference.tableId).schema) | ||
|
|
@@ -855,7 +857,8 @@ def process(self, element): | |
| if latest_partition.can_accept(file_size): | ||
| latest_partition.add(file_path, file_size) | ||
| else: | ||
| partitions.append(latest_partition.files) | ||
| if latest_partition.files: | ||
| partitions.append(latest_partition.files) | ||
| latest_partition = PartitionFiles.Partition( | ||
| self.max_partition_size, self.max_files_per_partition) | ||
| latest_partition.add(file_path, file_size) | ||
|
|
@@ -1181,12 +1184,13 @@ def _load_data( | |
| # the truncation happens only once. See | ||
| # https://github.com/apache/beam/issues/24535. | ||
| finished_temp_tables_load_job_ids_list_pc = ( | ||
| finished_temp_tables_load_job_ids_pc | beam.MapTuple( | ||
| finished_temp_tables_load_job_ids_pc | ||
| | beam.MapTuple( | ||
| lambda destination, job_reference: ( | ||
| bigquery_tools.parse_table_reference(destination).tableId, | ||
| bigquery_tools.get_hashable_destination(destination), | ||
| (destination, job_reference))) | ||
| | beam.GroupByKey() | ||
| | beam.MapTuple(lambda tableId, batch: list(batch))) | ||
| | beam.MapTuple(lambda dest, batch: list(batch))) | ||
| else: | ||
| # Loads can happen in parallel. | ||
| finished_temp_tables_load_job_ids_list_pc = ( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a comment referring to the BigQuery copy-job quota documentation (https://docs.cloud.google.com/bigquery/quotas#copy_jobs)?