Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 86 additions & 82 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ class TriggerCopyJobs(beam.DoFn):
"""

TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
MAX_SOURCES_PER_COPY_JOB = 1200

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment referring to the quota doc https://docs.cloud.google.com/bigquery/quotas#copy_jobs?


def __init__(
self,
Expand Down Expand Up @@ -528,96 +529,97 @@ def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
self.process_one(element_list, job_name_prefix)
else:
for element in element_list:
self.process_one(element, job_name_prefix)
element_list = [element_list]

def process_one(self, element, job_name_prefix):
destination, job_reference = element
if not element_list:
return

Comment thread
stankiewicz marked this conversation as resolved.
copy_to_reference = bigquery_tools.parse_table_reference(destination)
first_destination = element_list[0][0]
copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
if copy_to_reference.projectId is None:
copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

_LOGGER.info(
"Triggering copy job from %s to %s",
copy_from_reference,
copy_to_reference)
copy_from_references = []
for destination, job_reference in element_list:
copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project
copy_from_references.append(copy_from_reference)

wait_for_job, write_disposition = (
self._determine_write_disposition(copy_to_reference))

if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
full_table_ref = '%s:%s.%s' % (
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
Comment on lines +552 to +555

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe use bigquery_tools.get_hashable_destination(copy_to_reference)


project_id = (
copy_to_reference.projectId
if self.load_job_project_id is None else self.load_job_project_id)
copy_job_name = '%s_%s' % (
is_first_time = full_table_ref not in self._observed_tables
if is_first_time:
self._observed_tables.add(full_table_ref)
if self.bq_io_metadata:
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
Comment on lines +558 to +565

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only add it to _observed_tables at the end of the process call, after the copy job completes successfully?


# Split into chunks of MAX_SOURCES_PER_COPY_JOB
chunks = [
copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
for i in range(
0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
]

copy_job_name_base = '%s_%s' % (
job_name_prefix,
_bq_uuid(
'%s:%s.%s' % (
copy_from_reference.projectId,
copy_from_reference.datasetId,
copy_from_reference.tableId)))
job_reference = self.bq_wrapper._insert_copy_job(
project_id,
copy_job_name,
copy_from_reference,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels())

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
self.pending_jobs.append(
GlobalWindows.windowed_value((destination, job_reference)))
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)))
Comment on lines 576 to +580

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use full_table_ref here


def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
"""
Determines the write disposition for a BigQuery copy job,
based on destination.

When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
to the same destination can interfere with each other, truncate data, and
write to the BigQuery table repeatedly. To prevent this, the first copy job
runs with the user's specified write_disposition, but subsequent jobs must
always use WRITE_APPEND. This ensures that subsequent copy jobs do not
clear out data appended by previous jobs.

Args:
copy_to_reference: The reference to the destination table.
project_id = (
copy_to_reference.projectId
if self.load_job_project_id is None else self.load_job_project_id)

Returns:
A tuple containing a boolean indicating whether to wait for the job to
complete and the write disposition to use for the job.
"""
full_table_ref = '%s:%s.%s' % (
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
if full_table_ref not in self._observed_tables:
write_disposition = self.write_disposition
wait_for_job = True
self._observed_tables.add(full_table_ref)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
return wait_for_job, write_disposition
for i, chunk in enumerate(chunks):
if i == 0 and is_first_time:
write_disposition = self.write_disposition
# Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
# This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it.
wait_for_job = (
self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
len(chunks) > 1)
Comment thread
stankiewicz marked this conversation as resolved.
else:
write_disposition = 'WRITE_APPEND'
wait_for_job = False

chunk_job_name = copy_job_name_base
if len(chunks) > 1:
chunk_job_name = f"{copy_job_name_base}_{i}"

_LOGGER.info(
"Triggering copy job %s from %s to %s (write_disposition: %s)",
chunk_job_name, [str(r) for r in chunk],
copy_to_reference,
write_disposition)

job_reference = self.bq_wrapper._insert_copy_job(
project_id,
chunk_job_name,
chunk,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels()
if self.bq_io_metadata else None)

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)

self.pending_jobs.append(
GlobalWindows.windowed_value((first_destination, job_reference)))

def finish_bundle(self):
for windowed_value in self.pending_jobs:
Expand Down Expand Up @@ -744,7 +746,7 @@ def process(
else:
try:
schema = bigquery_tools.table_schema_to_dict(
bigquery_tools.BigQueryWrapper().get_table(
self.bq_wrapper.get_table(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId).schema)
Expand Down Expand Up @@ -855,7 +857,8 @@ def process(self, element):
if latest_partition.can_accept(file_size):
latest_partition.add(file_path, file_size)
else:
partitions.append(latest_partition.files)
if latest_partition.files:
partitions.append(latest_partition.files)
latest_partition = PartitionFiles.Partition(
self.max_partition_size, self.max_files_per_partition)
latest_partition.add(file_path, file_size)
Expand Down Expand Up @@ -1181,12 +1184,13 @@ def _load_data(
# the truncation happens only once. See
# https://github.com/apache/beam/issues/24535.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
finished_temp_tables_load_job_ids_pc
| beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.parse_table_reference(destination).tableId,
bigquery_tools.get_hashable_destination(destination),
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda tableId, batch: list(batch)))
| beam.MapTuple(lambda dest, batch: list(batch)))
else:
# Loads can happen in parallel.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
Loading
Loading