Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.api;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.maintenance.operator.DVPosition;
import org.apache.iceberg.flink.maintenance.operator.DVWriteResult;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertCommitter;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertDVWriter;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertPKIndex;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertPlan;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertPlanner;
import org.apache.iceberg.flink.maintenance.operator.EqualityConvertReader;
import org.apache.iceberg.flink.maintenance.operator.IndexCommand;
import org.apache.iceberg.flink.maintenance.operator.ReadCommand;
import org.apache.iceberg.flink.maintenance.operator.SerializedEqualityValues;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.NestedField;

/**
* Creates the equality delete to DV conversion data stream. Runs a single iteration of the
* conversion for every {@link Trigger} event.
*
* <p>The pipeline reads equality delete files from a staging branch, converts them to deletion
* vectors (DVs) using a primary key index stored in Flink state, and commits the data files and DVs
* to the target branch.
*
* <p>The conversion is split into parallel stages:
*
* <ol>
* <li>Planner (p=1): scans staging branch, emits file-level ReadCommands with phase timestamps
* <li>Reader (p=N): reads files, emits row-level IndexCommands
* <li>PKIndex (p=N): maintains PK index shards, resolves equality deletes to DV positions
* <li>DVWriter (p=N, keyed by data file path): buffers positions per file, writes Puffin DVs
* inline
* <li>Committer (p=1): commits data files and DVs to the target branch
* </ol>
*
* <p>Mutual exclusion with concurrent maintenance tasks (e.g. compaction) is enforced by the Flink
* maintenance framework lock.
*/
@Experimental
public class ConvertEqualityDeletes {
static final String PLANNER_TASK_NAME = "EqConvert Planner";
static final String READER_TASK_NAME = "EqConvert Reader";
static final String PK_INDEX_TASK_NAME = "EqConvert PKIndex";
static final String DV_WRITER_TASK_NAME = "EqConvert DVWriter";
static final String UPSTREAM_ABORT_TASK_NAME = "EqConvert UpstreamAbort";
static final String COMMIT_TASK_NAME = "EqConvert Commit";
static final String AGGREGATOR_TASK_NAME = "EqConvert Aggregator";

private ConvertEqualityDeletes() {}

public static Builder builder() {
return new Builder();
}

public static class Builder extends MaintenanceTaskBuilder<Builder> {
private String stagingBranch;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private List<String> equalityFieldColumns = Collections.emptyList();

@Override
String maintenanceTaskName() {
return "ConvertEqualityDeletes";
}

/** Sets the staging branch name that holds the equality delete files and data files. */
public Builder stagingBranch(String newStagingBranch) {
this.stagingBranch = newStagingBranch;
return this;
}

/**
* Sets the target branch where converted data files and DVs are committed. Defaults to the main
* branch.
*/
public Builder targetBranch(String newTargetBranch) {
this.targetBranch = newTargetBranch;
return this;
}

/**
* Sets the equality field columns used by the worker index. Required. Must match the equality
* field columns the writer uses for staging eq-delete files. Mirrors {@link
* org.apache.iceberg.flink.sink.IcebergSink.Builder#equalityFieldColumns}.
*
* <p>The partition source columns of an equality delete's spec must be a subset of these
* columns. Writes via Flink's IcebergSink already ensure this.
*/
public Builder equalityFieldColumns(List<String> columns) {
Preconditions.checkNotNull(columns, "equalityFieldColumns must not be null");
Preconditions.checkArgument(!columns.isEmpty(), "equalityFieldColumns must not be empty");
this.equalityFieldColumns = ImmutableList.copyOf(columns);
return this;
}

@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
Preconditions.checkNotNull(stagingBranch, "stagingBranch must be set");
Preconditions.checkArgument(
!equalityFieldColumns.isEmpty(), "equalityFieldColumns must be set on the builder");
Set<Integer> eqFieldIds = resolveEqualityFieldIds();

// Planner (p=1): emits ReadCommands with phase timestamps and watermarks
SingleOutputStreamOperator<ReadCommand> planned =
setSlotSharingGroup(
trigger
.transform(
operatorName(PLANNER_TASK_NAME),
TypeInformation.of(ReadCommand.class),
new EqualityConvertPlanner(
tableName(),
taskName(),
tableLoader(),
stagingBranch,
targetBranch,
eqFieldIds))
.uid(PLANNER_TASK_NAME + uidSuffix())
.forceNonParallel());

// Reader (p=N): reads files, emits IndexCommands
SingleOutputStreamOperator<IndexCommand> index =
setSlotSharingGroup(
planned
.rebalance()
.process(
new EqualityConvertReader(
tableLoader(), eqFieldIds, stagingBranch.equals(targetBranch)))
.name(operatorName(READER_TASK_NAME))
.uid(READER_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

// Broadcast from the planner to the PKIndex to clear the entire index
BroadcastStream<IndexCommand> clearIndexBroadcast =
planned
.getSideOutput(EqualityConvertPlanner.CLEAR_BROADCAST_STREAM)
.broadcast(EqualityConvertPKIndex.CLEAR_BROADCAST_DESCRIPTOR);

// PKIndex (p=N): keyed by full PK, phase-aware buffering.
SingleOutputStreamOperator<DVPosition> dvPositions =
setSlotSharingGroup(
index
.keyBy(IndexCommand::key, TypeInformation.of(SerializedEqualityValues.class))
.connect(clearIndexBroadcast)
.process(new EqualityConvertPKIndex(stagingBranch.equals(targetBranch)))
.name(operatorName(PK_INDEX_TASK_NAME))
.uid(PK_INDEX_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

// Reader-side abort signals bypass the PKIndex and feed the DVWriter directly, so a reader
// failure can short-circuit the cycle without waiting on a keyed shuffle. This is not a full
// short-circuit: the abort is keyed by data file path (empty for ABORT), so only one resolver
// subtask observes it; the others still write their buffered DVs, which the committer then
// drops.
DataStream<DVPosition> readerAborts =
index.getSideOutput(EqualityConvertReader.READER_ABORT_STREAM);
DataStream<DVPosition> dvPositionsWithAborts = dvPositions.union(readerAborts);

// Metadata side output from planner
DataStream<EqualityConvertPlan> metadata =
planned.getSideOutput(EqualityConvertPlanner.METADATA_STREAM);

// DVWriter (p=N, keyed by data file path): groups positions per file, writes Puffin DV
// files inline, emits a DVWriteResult per cycle. Plan metadata broadcast so every subtask
// sees it.
SingleOutputStreamOperator<DVWriteResult> resolved =
setSlotSharingGroup(
dvPositionsWithAborts
.keyBy(DVPosition::dataFilePath)
.connect(metadata.broadcast())
.transform(
operatorName(DV_WRITER_TASK_NAME),
TypeInformation.of(DVWriteResult.class),
new EqualityConvertDVWriter(
tableName(), taskName(), tableLoader(), targetBranch))
.uid(DV_WRITER_TASK_NAME + uidSuffix())
.setParallelism(parallelism()));

// Upstream errors become abort signals so a partial read never commits. The same error side
// outputs also feed the aggregator below to surface the exception in TaskResult; the two
// consumers serve different purposes and must both exist.
DataStream<DVWriteResult> upstreamAborts =
setSlotSharingGroup(
index
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
.union(dvPositions.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.map(e -> DVWriteResult.ABORT)
.returns(TypeInformation.of(DVWriteResult.class))
.name(operatorName(UPSTREAM_ABORT_TASK_NAME))
.uid(UPSTREAM_ABORT_TASK_NAME + uidSuffix())
.forceNonParallel());

// Committer (p=1): commits data files + DVs to main.
SingleOutputStreamOperator<Trigger> committed =
setSlotSharingGroup(
resolved
.union(upstreamAborts)
.connect(metadata)
.transform(
operatorName(COMMIT_TASK_NAME),
TypeInformation.of(Trigger.class),
new EqualityConvertCommitter(
tableName(), taskName(), tableLoader(), stagingBranch, targetBranch))
.uid(COMMIT_TASK_NAME + uidSuffix())
.forceNonParallel());

// Aggregator (p=1): collects errors and emits TaskResult.
return setSlotSharingGroup(
committed
.connect(
planned
.getSideOutput(TaskResultAggregator.ERROR_STREAM)
.union(index.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.union(dvPositions.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.union(resolved.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.union(committed.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
.transform(
operatorName(AGGREGATOR_TASK_NAME),
TypeInformation.of(TaskResult.class),
new TaskResultAggregator(tableName(), taskName(), index()))
.uid(AGGREGATOR_TASK_NAME + uidSuffix())
.forceNonParallel());
}

private Set<Integer> resolveEqualityFieldIds() {
if (!tableLoader().isOpen()) {
tableLoader().open();
}

Table table = tableLoader().loadTable();
int formatVersion = TableUtil.formatVersion(table);
Preconditions.checkArgument(
formatVersion >= 3,
"ConvertEqualityDeletes requires table format version >= 3 (DVs), "
+ "but table '%s' is version %s",
tableName(),
formatVersion);

Schema schema = table.schema();
List<Integer> fieldIds = Lists.newArrayListWithCapacity(equalityFieldColumns.size());
for (String column : equalityFieldColumns) {
NestedField field = schema.findField(column);
Preconditions.checkArgument(
field != null,
"Equality field column '%s' not found in table schema %s",
column,
schema);
fieldIds.add(field.fieldId());
}

return ImmutableSet.copyOf(fieldIds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public class EqualityConvertCommitter extends AbstractStreamOperator<Trigger>

private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertCommitter.class);

static final String COMMITTED_STAGING_SNAPSHOT_PROPERTY = "equality-convert-staging-snapshot";
public static final String COMMITTED_STAGING_SNAPSHOT_PROPERTY =
"equality-convert-staging-snapshot";

private static final String ADDED_DV_NUM_METRIC = "addedDvNum";
private static final String COMMIT_DURATION_MS_METRIC = "commitDurationMs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotChanges;
Expand Down Expand Up @@ -515,6 +516,7 @@ private StagingInputs retrieveStagingFiles(Snapshot stagingSnapshot) {
deleteFile.location(),
deleteFieldIds,
eqFieldIds);
validateDeleteSpecPartitionColumns(stagingSnapshot, deleteFile);
eqDeleteFiles.add(deleteFile);
} else if (ContentFileUtil.isDV(deleteFile)) {
stagingDVFiles.add(deleteFile);
Expand All @@ -531,6 +533,24 @@ private StagingInputs retrieveStagingFiles(Snapshot stagingSnapshot) {
return new StagingInputs(newDataFiles, stagingDVFiles, eqDeleteFiles);
}

/**
 * Verifies that every partition field in the spec of the given equality delete file is sourced
 * from one of the configured equality fields.
 *
 * @param stagingSnapshot staging snapshot that added the delete file; used for error reporting
 * @param deleteFile equality delete file whose partition spec is validated
 * @throws IllegalStateException if the delete file's spec id is not known to the table, or if the
 *     spec partitions by a source column that is not an equality field
 */
private void validateDeleteSpecPartitionColumns(Snapshot stagingSnapshot, DeleteFile deleteFile) {
  PartitionSpec spec = table.specs().get(deleteFile.specId());
  // The spec lookup returns null for an unknown id; report it with context instead of an NPE.
  Preconditions.checkState(
      spec != null,
      "Staging snapshot %s on branch '%s' contains an equality delete file %s "
          + "with unknown partition spec id %s",
      stagingSnapshot.snapshotId(),
      stagingBranch,
      deleteFile.location(),
      deleteFile.specId());

  for (PartitionField field : spec.fields()) {
    Preconditions.checkState(
        eqFieldIds.contains(field.sourceId()),
        "Staging snapshot %s on branch '%s' contains an equality delete file %s under spec %s, "
            + "which partitions by field '%s' (source id %s) that is not an equality field %s. "
            + "Partition columns must be a subset of the equality fields.",
        stagingSnapshot.snapshotId(),
        stagingBranch,
        deleteFile.location(),
        spec.specId(),
        field.name(),
        field.sourceId(),
        eqFieldIds);
  }
}

/** Files added by one staging snapshot, classified for cycle emission. */
private record StagingInputs(
List<DataFile> newDataFiles,
Expand Down
Loading