Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -71,7 +72,10 @@ public String partition() {

private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final Set<F> deleteFiles = newFileSet();
// Manifests are filtered in parallel and workers add expression-deleted files to this set.
// SynchronizedSet protects individual operations; iteration must remain after filtering completes
// or be guarded by synchronized(deleteFiles).
private final Set<F> deleteFiles = Collections.synchronizedSet(newFileSet());
private final Set<String> manifestsWithDeletes = Sets.newHashSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
Expand All @@ -82,7 +86,7 @@ public String partition() {
private long minSequenceNumber = 0;
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private final AtomicInteger duplicateDeleteCount = new AtomicInteger(0);
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
// this is only being used for the DeleteManifestFilterManager to detect orphaned DVs for removed
Expand Down Expand Up @@ -264,7 +268,7 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
}
}

summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount.get());

return summaryBuilder;
}
Expand Down Expand Up @@ -542,7 +546,7 @@ private ManifestFile filterManifestWithDeletedFiles(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
file.location());
duplicateDeleteCount += 1;
duplicateDeleteCount.incrementAndGet();
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
Expand Down
52 changes: 52 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestOverwrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.lessThan;
Expand Down Expand Up @@ -335,4 +336,55 @@ public void testValidatedOverwriteWithAppendSuccess() {

assertThat(latestSnapshot(base, branch).snapshotId()).isEqualTo(baseId);
}

@TestTemplate
public void testFullOverwriteAcrossManyManifests() {
// A full overwrite filters existing manifests in parallel and records every removed file in
// ManifestFilterManager's shared deleteFiles set. The colliding file locations below
// concentrate many concurrent adds in the same hash bucket, guarding against regressions that
// mutate the
// set without synchronization.
table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();

int manifestCount = 16;
int filesPerManifest = 64;

// FILE_0_TO_4 and FILE_5_TO_9 were appended together in createTestTable.
int liveFiles = 2;
int globalIndex = 0;
for (int i = 0; i < manifestCount; i++) {
AppendFiles append = table.newAppend();
for (int j = 0; j < filesPerManifest; j++) {
append.appendFile(
DataFiles.builder(PARTITION_BY_DATE)
.withPath(collidingLocation(globalIndex))
.withFormat(FileFormat.PARQUET)
.withFileSizeInBytes(10)
.withRecordCount(1)
.withPartitionPath("date=2018-06-08")
.build());
liveFiles += 1;
globalIndex += 1;
}
commit(table, append, branch);
}

Snapshot overwriteSnapshot =
commit(table, table.newOverwrite().overwriteByRowFilter(alwaysTrue()), branch);

assertThat(overwriteSnapshot.operation()).isEqualTo(DataOperations.DELETE);
assertThat(overwriteSnapshot.summary())
.containsEntry(SnapshotSummary.DELETED_FILES_PROP, String.valueOf(liveFiles))
.doesNotContainKey(SnapshotSummary.DELETED_DUPLICATE_FILES);
}

// Builds unique file locations with identical String.hashCode() values. DataFileSet hashes by
// location, so these paths exercise contention in ManifestFilterManager's shared deleteFiles set.
private static String collidingLocation(int index) {
StringBuilder location = new StringBuilder("/path/to/collide-");
for (int bit = 0; bit < 12; bit++) {
location.append(((index >> bit) & 1) == 0 ? "Aa" : "BB");
}
return location.toString();
}
}