Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand Down Expand Up @@ -82,7 +84,7 @@ public String partition() {
private long minSequenceNumber = 0;
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private final AtomicInteger duplicateDeleteCount = new AtomicInteger(0);
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
// this is only being used for the DeleteManifestFilterManager to detect orphaned DVs for removed
Expand Down Expand Up @@ -217,6 +219,8 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife

boolean trustManifestReferences = canTrustManifestReferences(manifests);
ManifestFile[] filtered = new ManifestFile[manifests.size()];
// collect files deleted by expression match across parallel workers, then merge below
Queue<F> expressionDeletedFiles = new ConcurrentLinkedQueue<>();
// open all of the manifest files in parallel, use index to avoid reordering
Tasks.range(filtered.length)
.stopOnFailure()
Expand All @@ -225,10 +229,18 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
.run(
index -> {
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustManifestReferences);
filterManifest(
tableSchema,
manifests.get(index),
trustManifestReferences,
expressionDeletedFiles);
filtered[index] = manifest;
});

// merge files deleted by expression into deleteFiles on the single calling thread,
// avoiding any concurrent access to the non-thread-safe deleteFiles set
expressionDeletedFiles.forEach(deleteFiles::add);

validateRequiredDeletes(filtered);

return Arrays.asList(filtered);
Expand Down Expand Up @@ -264,7 +276,7 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
}
}

summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount.get());

return summaryBuilder;
}
Expand Down Expand Up @@ -366,7 +378,10 @@ private void invalidateFilteredCache() {
* @return a ManifestReader that is a filtered version of the input manifest.
*/
private ManifestFile filterManifest(
Schema tableSchema, ManifestFile manifest, boolean trustManifestReferences) {
Schema tableSchema,
ManifestFile manifest,
boolean trustManifestReferences,
Queue<F> expressionDeletedFiles) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
Expand All @@ -385,7 +400,8 @@ private ManifestFile filterManifest(
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
if (manifestHasDeletedFiles(evaluator, manifest, reader)) {
ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader);
ManifestFile filtered =
filterManifestWithDeletedFiles(evaluator, manifest, reader, expressionDeletedFiles);
replacedManifestsCount.incrementAndGet();
return filtered;
} else {
Expand Down Expand Up @@ -496,7 +512,10 @@ private boolean isDanglingDV(DeleteFile file) {

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
private ManifestFile filterManifestWithDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader<F> reader) {
PartitionAndMetricsEvaluator evaluator,
ManifestFile manifest,
ManifestReader<F> reader,
Queue<F> expressionDeletedFiles) {
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
Expand Down Expand Up @@ -533,16 +552,17 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);
F fileCopy = file.copyWithoutStats();
// add the file here in case it was deleted using an expression. The
// DeleteManifestFilterManager will then remove its matching DV
deleteFiles.add(fileCopy);
// add the file to the queue so the calling thread can later merge it into
// deleteFiles. DeleteManifestFilterManager uses deleteFiles to find and
// remove matching Deletion Vectors for expression-matched deletions.
expressionDeletedFiles.add(fileCopy);

if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
file.location());
duplicateDeleteCount += 1;
duplicateDeleteCount.incrementAndGet();
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
Expand Down
46 changes: 46 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ManifestEntry.Status;
Expand Down Expand Up @@ -738,6 +740,50 @@ public void removingDataFilesWhenTruncatingAlsoRemovesDVs() {
statuses(Status.DELETED, Status.DELETED));
}

@TestTemplate
public void testConcurrentManifestFilteringWithExpression() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up! Since I opened #16978 yesterday and also opened #16984 with a different fix, want to acknowledge that your approach is lock-free on the hot path and has higher throughput potential.

I was thinking that in object-store-backed deployments, each manifest is read from S3/GCS, so millisecond-scale I/O per manifest dominates both the microseconds of CPU and the nanoseconds of lock. Hence I went with a simpler one-liner fix to use synchronizedSet.

The one thing I’d suggest is strengthening the regression test. With only 4 manifests / 4 files, the test may pass even without the fix because it does not create enough concurrent pressure on the underlying LinkedHashSet. In #16984 I used many files across many manifests with colliding file-location hashes to make the unfixed race much easier to reproduce. I’ll defer to maintainers on which implementation they prefer, but wanted to share this since we’re addressing the same issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing #16984 and the collidingLocation technique — forcing all paths into the same hash bucket is a much more reliable way to reproduce the race than simply increasing file count, and we will adapt that approach in our test.

One note on the synchronizedSet approach: as your own comment acknowledges, synchronizedSet only protects individual add/contains calls — iteration is not covered. validateRequiredDeletes uses containsAll and buildSummary iterates over deleteFiles, both of which are safe today only because they run after filtering completes. That's a correctness constraint buried in call-site ordering rather than enforced by the data structure itself.

Our approach (collect into a ConcurrentLinkedQueue during parallel filtering, then merge into deleteFiles on the calling thread after Tasks.range returns) avoids this entirely — deleteFiles is never written concurrently, so iteration safety is guaranteed structurally rather than by convention.

// each file gets its own manifest so all manifests are processed concurrently by the worker
// pool; colliding paths share a String.hashCode() value, concentrating concurrent adds into
// the same DataFileSet hash bucket to expose races in unfixed code deterministically
int fileCount = 128;
for (int i = 0; i < fileCount; i++) {
DataFile file =
DataFiles.builder(SPEC)
.withPath(collidingPath(i))
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=" + (i % BUCKETS_NUMBER))
.withRecordCount(1)
.build();
commit(table, table.newAppend().appendFile(file), branch);
}

ExecutorService pool = Executors.newFixedThreadPool(16);
try {
commit(
table,
table
.newOverwrite()
.overwriteByRowFilter(Expressions.alwaysTrue())
.scanManifestsWith(pool),
branch);
} finally {
pool.shutdown();
}

assertThat(latestSnapshot(table, branch).summary())
.containsEntry(SnapshotSummary.DELETED_FILES_PROP, String.valueOf(fileCount));
}

// "Aa" and "BB" are a classic Java hash-collision pair (both hash to 2112), so all generated
// paths share the same String.hashCode() and land in the same DataFileSet hash bucket
private static String collidingPath(int index) {
StringBuilder path = new StringBuilder("/path/to/collide-");
for (int bit = 0; bit < 7; bit++) {
path.append(((index >> bit) & 1) == 0 ? "Aa" : "BB");
}
return path.append(".parquet").toString();
}

private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}
Expand Down