[Feature Request]: Multi-Source BigQuery Copy Jobs for Python SDK #38982

@stankiewicz

What would you like to happen?

Goal

Optimize the performance and atomicity of the BigQuery FILE_LOADS copy pipeline branch in the Python SDK by aligning it with the Java SDK's architectural design.

Specifically:

  1. Atomicity: For WRITE_TRUNCATE and WRITE_EMPTY pipelines, group all temporary tables for a single destination and copy them using a single multi-source copy job.
  2. Performance: For WRITE_APPEND pipelines, keep launching all partition copy jobs asynchronously (non-blocking), as proposed in PR 38981.
  3. Compatibility: Implement these changes without modifying the pipeline graph, and isolate them to the TriggerCopyJobs DoFn.

Background & Current State

When writing to multiple or dynamic destinations, or when handling volumes large enough to require splitting into partitions, Apache Beam loads data into unique temporary tables in BigQuery, then triggers copy jobs to merge them into the final destination.

1. Python Copy Configuration Limitation

Currently, the Python SDK's copy tool in bigquery_tools.py only supports a single source table reference (from_table_reference mapped to the singular sourceTable field):

configuration=bigquery.JobConfiguration(
    copy=bigquery.JobConfigurationTableCopy(
        destinationTable=to_table_reference,
        sourceTable=from_table_reference, # Singular
        createDisposition=create_disposition,
        writeDisposition=write_disposition,
    )
)

In contrast, the Java SDK's counterpart in WriteRename.java uses the plural setSourceTables to pass a list of temporary tables directly to a single copy job configuration:

JobConfigurationTableCopy copyConfig =
    new JobConfigurationTableCopy()
        .setSourceTables(tempTables) // Plural list of source tables
        .setDestinationTable(ref);

2. Blocking Execution Model

Currently, TriggerCopyJobs processes copy requests sequentially in process_one. If wait_for_job resolves to True, the worker thread blocks synchronously until the BigQuery copy job completes:

if wait_for_job:
  self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)

3. Pipeline Grouping Modes

TriggerCopyJobs runs in two grouping modes depending on self.write_disposition:

  • For WRITE_TRUNCATE / WRITE_EMPTY: The pipeline groups all partition load job outputs for a destination into a single list before calling TriggerCopyJobs. Therefore, the element_list parameter contains all temporary tables for that destination.
  • For WRITE_APPEND: The pipeline performs no grouping. element_list contains exactly one temporary table reference at a time.

Proposed Requirements

1. Enable Multi-Source Copy in BigQueryWrapper

Modify BigQueryWrapper._insert_copy_job (or introduce an internal helper method) in bigquery_tools.py to handle either a single table reference or a list of table references.

  • If a list of source tables is provided, map it to the sourceTables field of JobConfigurationTableCopy instead of sourceTable.
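
A minimal sketch of that dispatch, using plain dictionaries shaped like the BigQuery REST JobConfigurationTableCopy resource rather than Beam's generated message classes. The helper name build_copy_configuration and the dict-based table references are assumptions made for illustration; they are not existing Beam code.

```python
from typing import Any, Dict, List, Union

# A table reference in REST form, e.g.
# {"projectId": "p", "datasetId": "d", "tableId": "t"}.
TableRef = Dict[str, str]


def build_copy_configuration(
    source: Union[TableRef, List[TableRef]],
    destination: TableRef,
    create_disposition: str = "CREATE_IF_NEEDED",
    write_disposition: str = "WRITE_APPEND",
) -> Dict[str, Any]:
    """Hypothetical helper: build a REST-style copy job configuration.

    A single reference is mapped to `sourceTable`; a list of references
    is mapped to `sourceTables`.
    """
    copy: Dict[str, Any] = {
        "destinationTable": destination,
        "createDisposition": create_disposition,
        "writeDisposition": write_disposition,
    }
    if isinstance(source, list):
        if not source:
            raise ValueError("at least one source table is required")
        copy["sourceTables"] = list(source)  # plural field
    else:
        copy["sourceTable"] = source  # singular field
    return {"copy": copy}
```

Existing callers that pass a single reference would keep producing the same configuration, so the change stays backward compatible under this sketch.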

2. Copy All Temp Tables at Once for Truncate / Empty

Update TriggerCopyJobs.process:

  • When self.write_disposition is WRITE_TRUNCATE or WRITE_EMPTY, element_list is a list of all temporary tables for a destination table.
  • The transform should combine all temporary tables into a single source list and launch a single copy job to the destination.
  • This ensures that the truncation and the copying of all partitions occur in a single atomic BigQuery copy job, matching the atomicity guarantees of the Java SDK.
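
The grouping rule above could be sketched as follows. This is an illustration only: plan_copy_jobs and the use of plain strings for table names are hypothetical, and the real TriggerCopyJobs.process works with table references and job names that are omitted here.

```python
from typing import List, Tuple

# Dispositions for which all temp tables must land in one copy job.
ATOMIC_DISPOSITIONS = {"WRITE_TRUNCATE", "WRITE_EMPTY"}


def plan_copy_jobs(
    write_disposition: str,
    destination: str,
    temp_tables: List[str],
) -> List[Tuple[List[str], str]]:
    """Hypothetical helper: return (sources, destination) pairs, one per job."""
    if not temp_tables:
        return []
    if write_disposition in ATOMIC_DISPOSITIONS:
        # One multi-source job covering every temp table for the destination.
        return [(list(temp_tables), destination)]
    # WRITE_APPEND: one single-source job per temp table.
    return [([table], destination) for table in temp_tables]
```

For example, plan_copy_jobs("WRITE_TRUNCATE", "final", ["tmp1", "tmp2"]) yields one job with both sources, while the same call with "WRITE_APPEND" yields two single-source jobs.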

3. Asynchronous Execution for Append Mode

  • When self.write_disposition is WRITE_APPEND, the copy jobs must be invoked asynchronously (non-blocking) during process (with wait_for_job = False), allowing them to run in parallel (see PR 38981).
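
The non-blocking pattern can be illustrated with a small stand-alone sketch. The issue does not show how PR 38981 implements it, so everything here is an assumption: CopyJobLauncher is a hypothetical class, and the two callables stand in for whatever inserts a copy job and waits on a job reference.

```python
from typing import Callable, List


class CopyJobLauncher:
    """Hypothetical helper: start copy jobs without blocking, wait later."""

    def __init__(
        self,
        insert_job: Callable[[str, str], str],
        wait_for_job: Callable[[str], None],
    ) -> None:
        self._insert_job = insert_job
        self._wait_for_job = wait_for_job
        self.pending: List[str] = []

    def launch(self, source: str, destination: str) -> str:
        # Submit the job and return immediately; do not wait for completion.
        job_id = self._insert_job(source, destination)
        self.pending.append(job_id)
        return job_id

    def wait_all(self) -> List[str]:
        # Block only once every job has been submitted.
        finished = []
        while self.pending:
            job_id = self.pending.pop(0)
            self._wait_for_job(job_id)
            finished.append(job_id)
        return finished
```

Because every job is submitted before any wait begins, the BigQuery-side copies can overlap instead of running one after another. Where the waiting step belongs in the pipeline is left open in this sketch.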

4. Backward Compatibility

To avoid breaking streaming update pipelines, do not modify the pipeline graph; limit changes to the TriggerCopyJobs process implementation.

Issue Priority

Priority: 1 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Prism Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
