From fb007374196d687fae37895cfb4f3b58e6563603 Mon Sep 17 00:00:00 2001 From: sanshi <1715734693@qq.com> Date: Sat, 27 Jun 2026 15:09:38 +0800 Subject: [PATCH 1/2] Core:Fix concurrent modification of deleteFiles in ManifestFilterManager --- .../apache/iceberg/ManifestFilterManager.java | 40 ++++++++++++++----- .../org/apache/iceberg/TestDeleteFiles.java | 28 +++++++++++++ 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 7d146d924667..d17dbbf7f542 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -22,7 +22,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -82,7 +84,7 @@ public String partition() { private long minSequenceNumber = 0; private boolean failAnyDelete = false; private boolean failMissingDeletePaths = false; - private int duplicateDeleteCount = 0; + private final AtomicInteger duplicateDeleteCount = new AtomicInteger(0); private boolean caseSensitive = true; private boolean allDeletesReferenceManifests = true; // this is only being used for the DeleteManifestFilterManager to detect orphaned DVs for removed @@ -217,6 +219,8 @@ List filterManifests(Schema tableSchema, List manife boolean trustManifestReferences = canTrustManifestReferences(manifests); ManifestFile[] filtered = new ManifestFile[manifests.size()]; + // collect files deleted by expression match across parallel workers, then merge below + Queue expressionDeletedFiles = new ConcurrentLinkedQueue<>(); // open all of the manifest files in parallel, use index to avoid reordering Tasks.range(filtered.length) .stopOnFailure() @@ -225,10 +229,18 @@ List filterManifests(Schema tableSchema, List manife .run( index -> { ManifestFile manifest = - filterManifest(tableSchema, manifests.get(index), trustManifestReferences); + filterManifest( + tableSchema, + manifests.get(index), + trustManifestReferences, + expressionDeletedFiles); filtered[index] = manifest; }); + // merge files deleted by expression into deleteFiles on the single calling thread, + // avoiding any concurrent access to the non-thread-safe deleteFiles set + expressionDeletedFiles.forEach(deleteFiles::add); + validateRequiredDeletes(filtered); return Arrays.asList(filtered); @@ -264,7 +276,7 @@ SnapshotSummary.Builder buildSummary(Iterable manifests) { } } - summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount); + summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount.get()); return summaryBuilder; } @@ -366,7 +378,10 @@ private void invalidateFilteredCache() { * @return a ManifestReader that is a filtered version of the input manifest. */ private ManifestFile filterManifest( - Schema tableSchema, ManifestFile manifest, boolean trustManifestReferences) { + Schema tableSchema, + ManifestFile manifest, + boolean trustManifestReferences, + Queue expressionDeletedFiles) { ManifestFile cached = filteredManifests.get(manifest); if (cached != null) { return cached; @@ -385,7 +400,8 @@ private ManifestFile filterManifest( // manifest without copying data. if a manifest does have a file to remove, this will break // out of the loop and move on to filtering the manifest. if (manifestHasDeletedFiles(evaluator, manifest, reader)) { - ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader); + ManifestFile filtered = + filterManifestWithDeletedFiles(evaluator, manifest, reader, expressionDeletedFiles); replacedManifestsCount.incrementAndGet(); return filtered; } else { @@ -496,7 +512,10 @@ private boolean isDanglingDV(DeleteFile file) { @SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"}) private ManifestFile filterManifestWithDeletedFiles( - PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader reader) { + PartitionAndMetricsEvaluator evaluator, + ManifestFile manifest, + ManifestReader reader, + Queue expressionDeletedFiles) { boolean isDelete = reader.isDeleteManifestReader(); // when this point is reached, there is at least one file that will be deleted in the // manifest. produce a copy of the manifest with all deleted files removed. @@ -533,16 +552,17 @@ private ManifestFile filterManifestWithDeletedFiles( if (allRowsMatch) { writer.delete(entry); F fileCopy = file.copyWithoutStats(); - // add the file here in case it was deleted using an expression. The - // DeleteManifestFilterManager will then remove its matching DV - deleteFiles.add(fileCopy); + // add the file to the queue so the calling thread can later merge it into + // deleteFiles. DeleteManifestFilterManager uses deleteFiles to find and + // remove matching Deletion Vectors for expression-matched deletions. + expressionDeletedFiles.add(fileCopy); if (deletedFiles.contains(file)) { LOG.warn( "Deleting a duplicate path from manifest {}: {}", manifest.path(), file.location()); - duplicateDeleteCount += 1; + duplicateDeleteCount.incrementAndGet(); } else { // only add the file to deletes if it is a new delete // this keeps the snapshot summary accurate for non-duplicate data diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index d7cdd5c5d884..684067f06859 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -26,6 +26,8 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ManifestEntry.Status; @@ -738,6 +740,32 @@ public void removingDataFilesWhenTruncatingAlsoRemovesDVs() { statuses(Status.DELETED, Status.DELETED)); } + @TestTemplate + public void testConcurrentManifestFilteringWithExpression() { + // append each file in a separate commit so each gets its own manifest, + // ensuring all 4 manifests are processed concurrently by the worker pool + commit(table, table.newAppend().appendFile(FILE_A), branch); + commit(table, table.newAppend().appendFile(FILE_B), branch); + commit(table, table.newAppend().appendFile(FILE_C), branch); + commit(table, table.newAppend().appendFile(FILE_D), branch); + + ExecutorService pool = Executors.newFixedThreadPool(4); + try { + commit( + table, + table + .newOverwrite() + .overwriteByRowFilter(Expressions.alwaysTrue()) + .scanManifestsWith(pool), + branch); + } finally { + pool.shutdown(); + } + + assertThat(latestSnapshot(table, branch).summary()) + .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "4"); + } + private static ByteBuffer longToBuffer(long value) { return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); } From 57e0271dab1850619e157b329527e346fb3fb6ef Mon Sep 17 00:00:00 2001 From: sanshi <1715734693@qq.com> Date: Sun, 28 Jun 2026 15:18:49 +0800 Subject: [PATCH 2/2] Improve regression test for concurrent manifest filtering --- .../org/apache/iceberg/TestDeleteFiles.java | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 684067f06859..eaaae29c90cc 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -742,14 +742,22 @@ public void removingDataFilesWhenTruncatingAlsoRemovesDVs() { @TestTemplate public void testConcurrentManifestFilteringWithExpression() { - // append each file in a separate commit so each gets its own manifest, - // ensuring all 4 manifests are processed concurrently by the worker pool - commit(table, table.newAppend().appendFile(FILE_A), branch); - commit(table, table.newAppend().appendFile(FILE_B), branch); - commit(table, table.newAppend().appendFile(FILE_C), branch); - commit(table, table.newAppend().appendFile(FILE_D), branch); - - ExecutorService pool = Executors.newFixedThreadPool(4); + // each file gets its own manifest so all manifests are processed concurrently by the worker + // pool; colliding paths share a String.hashCode() value, concentrating concurrent adds into + // the same DataFileSet hash bucket to expose races in unfixed code deterministically + int fileCount = 128; + for (int i = 0; i < fileCount; i++) { + DataFile file = + DataFiles.builder(SPEC) + .withPath(collidingPath(i)) + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=" + (i % BUCKETS_NUMBER)) + .withRecordCount(1) + .build(); + commit(table, table.newAppend().appendFile(file), branch); + } + + ExecutorService pool = Executors.newFixedThreadPool(16); try { commit( table, @@ -763,7 +771,17 @@ public void testConcurrentManifestFilteringWithExpression() { } assertThat(latestSnapshot(table, branch).summary()) - .containsEntry(SnapshotSummary.DELETED_FILES_PROP, "4"); + .containsEntry(SnapshotSummary.DELETED_FILES_PROP, String.valueOf(fileCount)); + } + + // "Aa" and "BB" are a classic Java hash-collision pair (both hash to 2112), so all generated + // paths share the same String.hashCode() and land in the same DataFileSet hash bucket + private static String collidingPath(int index) { + StringBuilder path = new StringBuilder("/path/to/collide-"); + for (int bit = 0; bit < 7; bit++) { + path.append(((index >> bit) & 1) == 0 ? "Aa" : "BB"); + } + return path.append(".parquet").toString(); } private static ByteBuffer longToBuffer(long value) {