diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 7d146d924667..d5c7b1a31226 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,7 +72,10 @@ public String partition() { private final Map specsById; private final PartitionSet deleteFilePartitions; - private final Set deleteFiles = newFileSet(); + // Manifests are filtered in parallel and workers add expression-deleted files to this set. + // SynchronizedSet protects individual operations; iteration must remain after filtering completes + // or be guarded by synchronized(deleteFiles). + private final Set deleteFiles = Collections.synchronizedSet(newFileSet()); private final Set manifestsWithDeletes = Sets.newHashSet(); private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); @@ -82,7 +86,7 @@ public String partition() { private long minSequenceNumber = 0; private boolean failAnyDelete = false; private boolean failMissingDeletePaths = false; - private int duplicateDeleteCount = 0; + private final AtomicInteger duplicateDeleteCount = new AtomicInteger(0); private boolean caseSensitive = true; private boolean allDeletesReferenceManifests = true; // this is only being used for the DeleteManifestFilterManager to detect orphaned DVs for removed @@ -264,7 +268,7 @@ SnapshotSummary.Builder buildSummary(Iterable manifests) { } } - summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount); + summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount.get()); return summaryBuilder; } @@ -542,7 +546,7 @@ private ManifestFile filterManifestWithDeletedFiles( "Deleting a duplicate path from manifest {}: {}", manifest.path(), file.location()); - duplicateDeleteCount += 1; + duplicateDeleteCount.incrementAndGet(); } else { // only add the file to deletes if it is a new delete // this keeps the snapshot summary accurate for non-duplicate data diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java index efd4935a4b9f..8d436ee6cc1d 100644 --- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java +++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; import static org.apache.iceberg.expressions.Expressions.and; import static org.apache.iceberg.expressions.Expressions.equal; import static org.apache.iceberg.expressions.Expressions.lessThan; @@ -335,4 +336,55 @@ public void testValidatedOverwriteWithAppendSuccess() { assertThat(latestSnapshot(base, branch).snapshotId()).isEqualTo(baseId); } + + @TestTemplate + public void testFullOverwriteAcrossManyManifests() { + // A full overwrite filters existing manifests in parallel and records every removed file in + // ManifestFilterManager's shared deleteFiles set. The colliding file locations below + // concentrate many concurrent adds in the same hash bucket, guarding against regressions that + // mutate the + // set without synchronization. + table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit(); + + int manifestCount = 16; + int filesPerManifest = 64; + + // FILE_0_TO_4 and FILE_5_TO_9 were appended together in createTestTable. + int liveFiles = 2; + int globalIndex = 0; + for (int i = 0; i < manifestCount; i++) { + AppendFiles append = table.newAppend(); + for (int j = 0; j < filesPerManifest; j++) { + append.appendFile( + DataFiles.builder(PARTITION_BY_DATE) + .withPath(collidingLocation(globalIndex)) + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withPartitionPath("date=2018-06-08") + .build()); + liveFiles += 1; + globalIndex += 1; + } + commit(table, append, branch); + } + + Snapshot overwriteSnapshot = + commit(table, table.newOverwrite().overwriteByRowFilter(alwaysTrue()), branch); + + assertThat(overwriteSnapshot.operation()).isEqualTo(DataOperations.DELETE); + assertThat(overwriteSnapshot.summary()) + .containsEntry(SnapshotSummary.DELETED_FILES_PROP, String.valueOf(liveFiles)) + .doesNotContainKey(SnapshotSummary.DELETED_DUPLICATE_FILES); + } + + // Builds unique file locations with identical String.hashCode() values. DataFileSet hashes by + // location, so these paths exercise contention in ManifestFilterManager's shared deleteFiles set. + private static String collidingLocation(int index) { + StringBuilder location = new StringBuilder("/path/to/collide-"); + for (int bit = 0; bit < 12; bit++) { + location.append(((index >> bit) & 1) == 0 ? "Aa" : "BB"); + } + return location.toString(); + } }