What would you like to happen?
Goal
Optimize the performance and atomicity of the BigQuery FILE_LOADS copy pipeline branch in the Python SDK by aligning it with the Java SDK's architectural design.
Specifically:
- Atomicity: For WRITE_TRUNCATE and WRITE_EMPTY pipelines, group all temporary tables for a single destination and copy them using a single multi-source copy job.
- Performance: For WRITE_APPEND pipelines, keep running all partition copy jobs asynchronously (non-blocking), as proposed in PR 38981.
- Compatibility: Implement these changes without modifying the pipeline graph; isolate the changes to the TriggerCopyJobs DoFn.
Background & Current State
When writing to multiple/dynamic destinations or handling large volumes requiring partition split, Apache Beam loads data into unique temporary tables in BigQuery, then triggers copy jobs to merge them into the final destination.
1. Python Copy Configuration Limitation
Currently, the Python SDK's copy tool in bigquery_tools.py only supports a single source table reference (from_table_reference mapped to the singular sourceTable field):
    configuration=bigquery.JobConfiguration(
        copy=bigquery.JobConfigurationTableCopy(
            destinationTable=to_table_reference,
            sourceTable=from_table_reference,  # Singular
            createDisposition=create_disposition,
            writeDisposition=write_disposition,
        )
    )
In contrast, the Java SDK's counterpart in WriteRename.java uses the plural setSourceTables to pass a list of temporary tables directly to a single copy job configuration:
    JobConfigurationTableCopy copyConfig =
        new JobConfigurationTableCopy()
            .setSourceTables(tempTables) // Plural list of source tables
            .setDestinationTable(ref);
2. Blocking Execution Model
Currently, TriggerCopyJobs processes copy requests sequentially in process_one. If wait_for_job resolves to True, the worker thread blocks until the BigQuery copy job completes:
    if wait_for_job:
        self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
3. Pipeline Grouping Modes
TriggerCopyJobs runs in two grouping modes depending on self.write_disposition:
- For WRITE_TRUNCATE / WRITE_EMPTY: The pipeline groups all partition load job outputs for a destination into a single list before calling TriggerCopyJobs. Therefore, the element_list parameter contains all temporary tables for that destination.
- For WRITE_APPEND: The pipeline performs no grouping. element_list contains exactly one temporary table reference at a time.
Proposed Requirements
1. Enable Multi-Source Copy in BigQueryWrapper
Modify BigQueryWrapper._insert_copy_job (or introduce an internal helper method) in bigquery_tools.py to handle either a single table reference or a list of table references.
- If a list of source tables is provided, map it to the sourceTables field of JobConfigurationTableCopy instead of sourceTable.
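A minimal sketch of the field selection described above. It uses plain dictionaries shaped like the BigQuery REST `configuration.copy` body instead of the SDK's generated message classes, so it runs without Beam installed. The helper name `build_copy_config` and its default dispositions are hypothetical and are not part of bigquery_tools.py.

```python
from typing import Any, Dict, List, Union

# A table reference shaped like the REST API's TableReference object.
TableRef = Dict[str, str]


def build_copy_config(
    from_table_reference: Union[TableRef, List[TableRef]],
    to_table_reference: TableRef,
    create_disposition: str = "CREATE_IF_NEEDED",
    write_disposition: str = "WRITE_APPEND",
) -> Dict[str, Any]:
    """Hypothetical helper: build the `copy` section of a job configuration."""
    copy: Dict[str, Any] = {
        "destinationTable": to_table_reference,
        "createDisposition": create_disposition,
        "writeDisposition": write_disposition,
    }
    if isinstance(from_table_reference, (list, tuple)):
        if not from_table_reference:
            raise ValueError("At least one source table is required.")
        # Several sources: use the plural field so one job copies them all.
        copy["sourceTables"] = list(from_table_reference)
    else:
        # One source: keep the existing singular field.
        copy["sourceTable"] = from_table_reference
    return {"copy": copy}
```

Accepting both shapes in one helper keeps existing single-table callers unchanged while letting TriggerCopyJobs pass a list.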
2. Copy All Temp Tables at Once for Truncate / Empty
Update TriggerCopyJobs.process:
- When self.write_disposition is WRITE_TRUNCATE or WRITE_EMPTY, element_list is a list of all temporary tables for a destination table.
- The transform should combine all temporary tables into a single source list and launch a single copy job to the destination.
- This ensures that the truncation and copying of all partitions occur in a single atomic BigQuery transaction, matching the atomicity guarantees of the Java SDK.
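The grouping rule above could be sketched as follows. The element shape is an assumption for illustration: each entry of `element_list` is treated as a `(destination, temp_table)` pair of strings, which may differ from what TriggerCopyJobs actually receives. The function name `plan_copy_jobs` is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def plan_copy_jobs(
    element_list: Iterable[Tuple[str, str]],
    write_disposition: str,
) -> List[Tuple[str, List[str]]]:
    """Return (destination, source_tables) pairs, one per copy job to launch."""
    if write_disposition in ("WRITE_TRUNCATE", "WRITE_EMPTY"):
        # One multi-source job per destination, so truncation and copy
        # of all partitions happen in the same job.
        grouped: Dict[str, List[str]] = defaultdict(list)
        for destination, temp_table in element_list:
            grouped[destination].append(temp_table)
        return list(grouped.items())
    # WRITE_APPEND: one single-source job per temporary table.
    return [(destination, [temp_table])
            for destination, temp_table in element_list]
```

With two temporary tables for the same destination, the truncate path yields one planned job and the append path yields two.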
3. Asynchronous Execution for Append Mode
- When self.write_disposition is WRITE_APPEND, the copy jobs must be invoked asynchronously (non-blocking) during process (with wait_for_job = False), allowing parallel execution; see PR 38981.
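To illustrate the difference between the blocking and non-blocking behavior, here is a small sketch with a fake client that only records call order. `FakeCopyClient`, `start_copy`, `wait`, and `trigger_copies` are hypothetical stand-ins, not Beam or BigQuery APIs, and the sketch does not reflect the actual change in PR 38981. It assumes job completion is awaited elsewhere after the job IDs are returned.

```python
from typing import Iterable, List, Tuple


class FakeCopyClient:
    """Hypothetical stand-in for a BigQuery wrapper; records calls in order."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []
        self._next_id = 0

    def start_copy(self, source: str, destination: str) -> str:
        job_id = f"copy_{self._next_id}"
        self._next_id += 1
        self.calls.append(("start", job_id))
        return job_id

    def wait(self, job_id: str) -> None:
        self.calls.append(("wait", job_id))


def trigger_copies(
    client: FakeCopyClient,
    pairs: Iterable[Tuple[str, str]],
    wait_for_job: bool = False,
) -> List[str]:
    """Start one copy job per (source, destination) pair."""
    job_ids = []
    for source, destination in pairs:
        job_id = client.start_copy(source, destination)
        if wait_for_job:
            # Blocking mode: the next job cannot start until this returns.
            client.wait(job_id)
        job_ids.append(job_id)
    return job_ids
```

With `wait_for_job=False`, every job is started before any wait occurs, so the jobs can run in parallel on the BigQuery side.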
4. Backward Compatibility
To avoid breaking streaming update pipelines, do not modify the pipeline graph; limit changes to the TriggerCopyJobs process implementation.
Issue Priority
Priority: 1 (default / most feature requests should be filed as P2)
Issue Components