From 62d343ce68fdfa7a5c670171871d1b66246f5bbe Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 16 Jun 2026 11:24:31 +0200 Subject: [PATCH 1/4] [GCP] Optimize BigQuery TriggerCopyJobs performance for WRITE_APPEND --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 4e45d0324ee2..2baca5f9bd2b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -607,7 +607,7 @@ def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: copy_to_reference.tableId) if full_table_ref not in self._observed_tables: write_disposition = self.write_disposition - wait_for_job = True + wait_for_job = (self.write_disposition != 'WRITE_APPEND') self._observed_tables.add(full_table_ref) Lineage.sinks().add( 'bigquery', From 26c60038c58646179479f3a7a1924a7ba8170a51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Tue, 16 Jun 2026 12:08:32 +0200 Subject: [PATCH 2/4] Update sdks/python/apache_beam/io/gcp/bigquery_file_loads.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 2baca5f9bd2b..bf8fad104848 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -607,7 +607,7 @@ def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: copy_to_reference.tableId) if full_table_ref not in self._observed_tables: write_disposition = self.write_disposition - wait_for_job = (self.write_disposition != 'WRITE_APPEND') + wait_for_job = (self.write_disposition != BigQueryDisposition.WRITE_APPEND) self._observed_tables.add(full_table_ref) Lineage.sinks().add( 'bigquery', From 8a23f3b9d3bf991601b0ba56e6162591acf4ec58 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 16 Jun 2026 11:22:03 +0000 Subject: [PATCH 3/4] format --- sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index bf8fad104848..ef15f7b96217 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -607,7 +607,8 @@ def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: copy_to_reference.tableId) if full_table_ref not in self._observed_tables: write_disposition = self.write_disposition - wait_for_job = (self.write_disposition != BigQueryDisposition.WRITE_APPEND) + wait_for_job = ( + self.write_disposition != BigQueryDisposition.WRITE_APPEND) self._observed_tables.add(full_table_ref) Lineage.sinks().add( 'bigquery', From af3b0267190bdcfa2b06a7cc266b2c73b53ce2cc Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Tue, 16 Jun 2026 13:05:49 +0000 Subject: [PATCH 4/4] Optimize BigQuery copy jobs in file loads using multi-source copy Updates BigQuery file loads in Python SDK to use multi-source copy jobs when copying temporary tables to the final destination table. * Update BigQueryWrapper._insert_copy_job to support a list of source tables, utilizing BigQuery's multi-source copy capability. * Update TriggerCopyJobs to process temporary tables in batch, splitting them into chunks of 1,200 (BigQuery limit) and triggering multi-source copy jobs. * Implement inline wait for the first chunk in TriggerCopyJobs when write disposition is WRITE_TRUNCATE or WRITE_EMPTY and there are multiple chunks. This ensures the destination table is initialized by the first job before subsequent chunks append to it. * Fix grouping key in _load_data for WRITE_TRUNCATE/WRITE_EMPTY to use the full hashable destination instead of just tableId, preventing incorrect grouping of tables with the same name in different datasets. * Fix TriggerLoadJobs to use bq_wrapper with mock client in tests, resolving credential refresh warnings. * Fix PartitionFiles to avoid yielding empty partitions when a file exceeds limits. TAG=agy CONV=126370d2-f42e-4132-a237-16bd5ccf72a3 --- .../apache_beam/io/gcp/bigquery_file_loads.py | 169 ++++++++------- .../io/gcp/bigquery_file_loads_test.py | 205 ++++++++++++++---- .../apache_beam/io/gcp/bigquery_tools.py | 18 +- 3 files changed, 256 insertions(+), 136 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index ef15f7b96217..0451a2db1857 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -491,6 +491,7 @@ class TriggerCopyJobs(beam.DoFn): """ TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' + MAX_SOURCES_PER_COPY_JOB = 1200 def __init__( self, @@ -528,97 +529,97 @@ def process( self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None): if isinstance(element_list, tuple): # Allow this for streaming update compatibility while fixing BEAM-24535. - self.process_one(element_list, job_name_prefix) - else: - for element in element_list: - self.process_one(element, job_name_prefix) + element_list = [element_list] - def process_one(self, element, job_name_prefix): - destination, job_reference = element + if not element_list: + return - copy_to_reference = bigquery_tools.parse_table_reference(destination) + first_destination = element_list[0][0] + copy_to_reference = bigquery_tools.parse_table_reference(first_destination) if copy_to_reference.projectId is None: copy_to_reference.projectId = vp.RuntimeValueProvider.get_value( 'project', str, '') or self.project - copy_from_reference = bigquery_tools.parse_table_reference(destination) - copy_from_reference.tableId = job_reference.jobId - if copy_from_reference.projectId is None: - copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( - 'project', str, '') or self.project - - _LOGGER.info( - "Triggering copy job from %s to %s", - copy_from_reference, - copy_to_reference) + copy_from_references = [] + for destination, job_reference in element_list: + copy_from_reference = bigquery_tools.parse_table_reference(destination) + copy_from_reference.tableId = job_reference.jobId + if copy_from_reference.projectId is None: + copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( + 'project', str, '') or self.project + copy_from_references.append(copy_from_reference) - wait_for_job, write_disposition = ( - self._determine_write_disposition(copy_to_reference)) - - if not self.bq_io_metadata: - self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) + full_table_ref = '%s:%s.%s' % ( + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId) - project_id = ( - copy_to_reference.projectId - if self.load_job_project_id is None else self.load_job_project_id) - copy_job_name = '%s_%s' % ( + is_first_time = full_table_ref not in self._observed_tables + if is_first_time: + self._observed_tables.add(full_table_ref) + if self.bq_io_metadata: + Lineage.sinks().add( + 'bigquery', + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId) + + # Split into chunks of MAX_SOURCES_PER_COPY_JOB + chunks = [ + copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB] + for i in range( + 0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB) + ] + + copy_job_name_base = '%s_%s' % ( job_name_prefix, _bq_uuid( '%s:%s.%s' % ( - copy_from_reference.projectId, - copy_from_reference.datasetId, - copy_from_reference.tableId))) - job_reference = self.bq_wrapper._insert_copy_job( - project_id, - copy_job_name, - copy_from_reference, - copy_to_reference, - create_disposition=self.create_disposition, - write_disposition=write_disposition, - job_labels=self.bq_io_metadata.add_additional_bq_job_labels()) - - if wait_for_job: - self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) - self.pending_jobs.append( - GlobalWindows.windowed_value((destination, job_reference))) + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId))) - def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: - """ - Determines the write disposition for a BigQuery copy job, - based on destination. - - When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs - to the same destination can interfere with each other, truncate data, and - write to the BigQuery table repeatedly. To prevent this, the first copy job - runs with the user's specified write_disposition, but subsequent jobs must - always use WRITE_APPEND. This ensures that subsequent copy jobs do not - clear out data appended by previous jobs. - - Args: - copy_to_reference: The reference to the destination table. + project_id = ( + copy_to_reference.projectId + if self.load_job_project_id is None else self.load_job_project_id) - Returns: - A tuple containing a boolean indicating whether to wait for the job to - complete and the write disposition to use for the job. - """ - full_table_ref = '%s:%s.%s' % ( - copy_to_reference.projectId, - copy_to_reference.datasetId, - copy_to_reference.tableId) - if full_table_ref not in self._observed_tables: - write_disposition = self.write_disposition - wait_for_job = ( - self.write_disposition != BigQueryDisposition.WRITE_APPEND) - self._observed_tables.add(full_table_ref) - Lineage.sinks().add( - 'bigquery', - copy_to_reference.projectId, - copy_to_reference.datasetId, - copy_to_reference.tableId) - else: - wait_for_job = False - write_disposition = 'WRITE_APPEND' - return wait_for_job, write_disposition + for i, chunk in enumerate(chunks): + if i == 0 and is_first_time: + write_disposition = self.write_disposition + # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY. + # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it. + wait_for_job = ( + self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and + len(chunks) > 1) + else: + write_disposition = 'WRITE_APPEND' + wait_for_job = False + + chunk_job_name = copy_job_name_base + if len(chunks) > 1: + chunk_job_name = f"{copy_job_name_base}_{i}" + + _LOGGER.info( + "Triggering copy job %s from %s to %s (write_disposition: %s)", + chunk_job_name, [str(r) for r in chunk], + copy_to_reference, + write_disposition) + + job_reference = self.bq_wrapper._insert_copy_job( + project_id, + chunk_job_name, + chunk, + copy_to_reference, + create_disposition=self.create_disposition, + write_disposition=write_disposition, + job_labels=self.bq_io_metadata.add_additional_bq_job_labels() + if self.bq_io_metadata else None) + + if wait_for_job: + self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) + + self.pending_jobs.append( + GlobalWindows.windowed_value((first_destination, job_reference))) def finish_bundle(self): for windowed_value in self.pending_jobs: @@ -745,7 +746,7 @@ def process( else: try: schema = bigquery_tools.table_schema_to_dict( - bigquery_tools.BigQueryWrapper().get_table( + self.bq_wrapper.get_table( project_id=table_reference.projectId, dataset_id=table_reference.datasetId, table_id=table_reference.tableId).schema) @@ -856,7 +857,8 @@ def process(self, element): if latest_partition.can_accept(file_size): latest_partition.add(file_path, file_size) else: - partitions.append(latest_partition.files) + if latest_partition.files: + partitions.append(latest_partition.files) latest_partition = PartitionFiles.Partition( self.max_partition_size, self.max_files_per_partition) latest_partition.add(file_path, file_size) @@ -1182,12 +1184,13 @@ def _load_data( # the truncation happens only once. See # https://github.com/apache/beam/issues/24535. finished_temp_tables_load_job_ids_list_pc = ( - finished_temp_tables_load_job_ids_pc | beam.MapTuple( + finished_temp_tables_load_job_ids_pc + | beam.MapTuple( lambda destination, job_reference: ( - bigquery_tools.parse_table_reference(destination).tableId, + bigquery_tools.get_hashable_destination(destination), (destination, job_reference))) | beam.GroupByKey() - | beam.MapTuple(lambda tableId, batch: list(batch))) + | beam.MapTuple(lambda dest, batch: list(batch))) else: # Loads can happen in parallel. finished_temp_tables_load_job_ids_list_pc = ( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 191719e6a208..47c1ce5ea1bb 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -924,69 +924,180 @@ def dynamic_destination_resolver(element, *side_inputs): write_disposition=BigQueryDisposition.WRITE_TRUNCATE)) from apache_beam.io.gcp.internal.clients.bigquery import TableReference - mock_insert_copy_job.assert_has_calls( - [ - call( - 'project1', - mock.ANY, + mock_insert_copy_job.assert_has_calls([ + call( + 'project1', + mock.ANY, + [ TableReference( datasetId='dataset1', projectId='project1', tableId='job_name1'), - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='table1'), - create_disposition=None, - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - call( - 'project1', - mock.ANY, TableReference( datasetId='dataset1', projectId='project1', tableId='job_name1'), - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='table1'), - create_disposition=None, - write_disposition='WRITE_APPEND', - job_labels={'step_name': 'bigquerybatchfileloads'}), - call( - 'project1', - mock.ANY, + ], + TableReference( + datasetId='dataset1', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + call( + 'project1', + mock.ANY, + [ TableReference( datasetId='dataset2', projectId='project1', tableId='job_name1'), - TableReference( - datasetId='dataset2', - projectId='project1', - tableId='table1'), - create_disposition=None, - # Previously this was `WRITE_APPEND`. - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - call( - 'project1', - mock.ANY, + ], + TableReference( + datasetId='dataset2', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + call( + 'project1', + mock.ANY, + [ TableReference( datasetId='dataset3', projectId='project1', tableId='job_name1'), - TableReference( - datasetId='dataset3', - projectId='project1', - tableId='table1'), - create_disposition=None, - # Previously this was `WRITE_APPEND`. - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - ], - any_order=True) - self.assertEqual(4, mock_insert_copy_job.call_count) + ], + TableReference( + datasetId='dataset3', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + ], + any_order=True) + self.assertEqual(3, mock_insert_copy_job.call_count) + + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper.wait_for_bq_job') + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._insert_copy_job') + def test_copy_jobs_splitting( + self, mock_insert_copy_job, mock_wait_for_bq_job): + destination = 'project1:dataset1.table1' + + from apache_beam.io.gcp.bigquery_file_loads import TriggerCopyJobs + original_max_sources = TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB + TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB = 2 + + try: + job_reference = bigquery_api.JobReference() + job_reference.projectId = 'project1' + job_reference.jobId = 'job_name1' + result_job = mock.Mock() + result_job.jobReference = job_reference + + mock_job = mock.Mock() + mock_job.status.state = 'DONE' + mock_job.status.errorResult = None + mock_job.jobReference = job_reference + + bq_client = mock.Mock() + bq_client.jobs.Get.return_value = mock_job + bq_client.jobs.Insert.return_value = result_job + bq_client.tables.Delete.return_value = None + mock_insert_copy_job.return_value = job_reference + temp_dir = self._new_tempdir() + + with TestPipeline('FnApiRunner') as p: + _ = ( + p + | beam.Create([ + { + 'name': 'a' + }, + { + 'name': 'b' + }, + { + 'name': 'c' + }, + { + 'name': 'd' + }, + { + 'name': 'e' + }, + ], + reshuffle=False) + | bqfl.BigQueryBatchFileLoads( + destination, + custom_gcs_temp_location=temp_dir, + test_client=bq_client, + validate=False, + temp_file_format=bigquery_tools.FileFormat.JSON, + max_file_size=10, + max_partition_size=10, + max_files_per_partition=1, + write_disposition=BigQueryDisposition.WRITE_TRUNCATE)) + + self.assertEqual(3, mock_insert_copy_job.call_count) + + from apache_beam.io.gcp.internal.clients.bigquery import TableReference + expected_calls = [ + call( + 'project1', + mock.ANY, + [ + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='job_name1'), + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='job_name1'), + ], + TableReference( + datasetId='dataset1', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_TRUNCATE', + job_labels=mock.ANY), + call( + 'project1', + mock.ANY, + [ + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='job_name1'), + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='job_name1'), + ], + TableReference( + datasetId='dataset1', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_APPEND', + job_labels=mock.ANY), + call( + 'project1', + mock.ANY, + [ + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='job_name1'), + ], + TableReference( + datasetId='dataset1', projectId='project1', tableId='table1'), + create_disposition=None, + write_disposition='WRITE_APPEND', + job_labels=mock.ANY), + ] + mock_insert_copy_job.assert_has_calls(expected_calls, any_order=True) + self.assertEqual(9, mock_wait_for_bq_job.call_count) + + finally: + TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB = original_max_sources @parameterized.expand([ param(is_streaming=False, with_auto_sharding=False, compat_version=None), diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 8dd58cd55a01..491b7a39b0b7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -506,16 +506,22 @@ def _insert_copy_job( reference = bigquery.JobReference() reference.jobId = job_id reference.projectId = project_id + + copy_config = bigquery.JobConfigurationTableCopy( + destinationTable=to_table_reference, + createDisposition=create_disposition, + writeDisposition=write_disposition, + ) + if isinstance(from_table_reference, list): + copy_config.sourceTables = from_table_reference + else: + copy_config.sourceTable = from_table_reference + request = bigquery.BigqueryJobsInsertRequest( projectId=project_id, job=bigquery.Job( configuration=bigquery.JobConfiguration( - copy=bigquery.JobConfigurationTableCopy( - destinationTable=to_table_reference, - sourceTable=from_table_reference, - createDisposition=create_disposition, - writeDisposition=write_disposition, - ), + copy=copy_config, labels=_build_job_labels(job_labels), ), jobReference=reference,