Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 87 additions & 82 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ class TriggerCopyJobs(beam.DoFn):
"""

TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
MAX_SOURCES_PER_COPY_JOB = 1200

def __init__(
self,
Expand Down Expand Up @@ -528,96 +529,98 @@ def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
self.process_one(element_list, job_name_prefix)
else:
for element in element_list:
self.process_one(element, job_name_prefix)
element_list = [element_list]

def process_one(self, element, job_name_prefix):
destination, job_reference = element
if not element_list:
return
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

copy_to_reference = bigquery_tools.parse_table_reference(destination)
first_destination = element_list[0][0]
copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
if copy_to_reference.projectId is None:
copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

_LOGGER.info(
"Triggering copy job from %s to %s",
copy_from_reference,
copy_to_reference)

wait_for_job, write_disposition = (
self._determine_write_disposition(copy_to_reference))

if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

project_id = (
copy_to_reference.projectId
if self.load_job_project_id is None else self.load_job_project_id)
copy_job_name = '%s_%s' % (
job_name_prefix,
_bq_uuid(
'%s:%s.%s' % (
copy_from_reference.projectId,
copy_from_reference.datasetId,
copy_from_reference.tableId)))
job_reference = self.bq_wrapper._insert_copy_job(
project_id,
copy_job_name,
copy_from_reference,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels())

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
self.pending_jobs.append(
GlobalWindows.windowed_value((destination, job_reference)))
copy_from_references = []
for destination, job_reference in element_list:
copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project
copy_from_references.append(copy_from_reference)

def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
"""
Determines the write disposition for a BigQuery copy job,
based on destination.

When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
to the same destination can interfere with each other, truncate data, and
write to the BigQuery table repeatedly. To prevent this, the first copy job
runs with the user's specified write_disposition, but subsequent jobs must
always use WRITE_APPEND. This ensures that subsequent copy jobs do not
clear out data appended by previous jobs.

Args:
copy_to_reference: The reference to the destination table.

Returns:
A tuple containing a boolean indicating whether to wait for the job to
complete and the write disposition to use for the job.
"""
full_table_ref = '%s:%s.%s' % (
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
if full_table_ref not in self._observed_tables:
write_disposition = self.write_disposition
wait_for_job = True
is_first_time = full_table_ref not in self._observed_tables
if is_first_time:
self._observed_tables.add(full_table_ref)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
return wait_for_job, write_disposition
if self.bq_io_metadata:
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)

# Split into chunks of MAX_SOURCES_PER_COPY_JOB
chunks = [
copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
for i in range(
0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
]

copy_job_name_base = '%s_%s' % (
job_name_prefix,
_bq_uuid(
'%s:%s.%s' % (
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)))

project_id = (
copy_to_reference.projectId
if self.load_job_project_id is None else self.load_job_project_id)

for i, chunk in enumerate(chunks):
if i == 0 and is_first_time:
write_disposition = self.write_disposition
# Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
# This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it.
wait_for_job = (
self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
len(chunks) > 1)
else:
write_disposition = 'WRITE_APPEND'
wait_for_job = False

chunk_job_name = copy_job_name_base
if len(chunks) > 1:
chunk_job_name = f"{copy_job_name_base}_{i}"

_LOGGER.info(
"Triggering copy job %s from %s to %s (write_disposition: %s)",
chunk_job_name, [str(r) for r in chunk],
copy_to_reference,
write_disposition)

job_reference = self.bq_wrapper._insert_copy_job(
project_id,
chunk_job_name,
chunk,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels()
if self.bq_io_metadata else None)

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)

self.pending_jobs.append(
GlobalWindows.windowed_value((first_destination, job_reference)))

def finish_bundle(self):
for windowed_value in self.pending_jobs:
Expand Down Expand Up @@ -855,7 +858,8 @@ def process(self, element):
if latest_partition.can_accept(file_size):
latest_partition.add(file_path, file_size)
else:
partitions.append(latest_partition.files)
if latest_partition.files:
partitions.append(latest_partition.files)
latest_partition = PartitionFiles.Partition(
self.max_partition_size, self.max_files_per_partition)
latest_partition.add(file_path, file_size)
Expand Down Expand Up @@ -1181,12 +1185,13 @@ def _load_data(
# the truncation happens only once. See
# https://github.com/apache/beam/issues/24535.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
finished_temp_tables_load_job_ids_pc
| beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.parse_table_reference(destination).tableId,
bigquery_tools.get_hashable_destination(destination),
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda tableId, batch: list(batch)))
| beam.MapTuple(lambda dest, batch: list(batch)))
else:
# Loads can happen in parallel.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
Loading
Loading