diff --git a/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java b/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java new file mode 100644 index 000000000000..6ad6bb17f33f --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * A thread-safe set implementation for Iceberg {@link org.apache.iceberg.ContentFile} objects. + * + *

This class provides a concurrent alternative to {@link DataFileSet} and {@link DeleteFileSet} + * for scenarios where files must be collected from multiple worker threads. The standard {@link + * WrapperSet} implementations use {@link java.util.LinkedHashSet} internally, which is + * not thread-safe and can corrupt when mutated concurrently (see Iceberg issue + * #16978). + * + *

Unlike {@link WrapperSet}, this implementation does not preserve insertion + * order. If order is required, collect files in a thread-safe structure and copy to a {@link + * DataFileSet} or {@link DeleteFileSet} on the main thread after parallel work completes. + * + * @param the file type, typically {@link org.apache.iceberg.DataFile} or {@link + * org.apache.iceberg.DeleteFile} + */ +public class ThreadSafeFileSet implements Set { + private final Set delegate; + + @SuppressWarnings("unchecked") + public static ThreadSafeFileSet create() { + return new ThreadSafeFileSet<>(); + } + + @SuppressWarnings("unchecked") + public static ThreadSafeFileSet create(Iterable files) { + ThreadSafeFileSet set = new ThreadSafeFileSet<>(); + files.forEach(set::add); + return set; + } + + private ThreadSafeFileSet() { + this.delegate = Collections.newSetFromMap(Maps.newConcurrentMap()); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return delegate.contains(o); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return delegate.toArray(a); + } + + @Override + public boolean add(F f) { + return delegate.add(f); + } + + @Override + public boolean remove(Object o) { + return delegate.remove(o); + } + + @Override + public boolean containsAll(Collection c) { + return delegate.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + return delegate.addAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return delegate.retainAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return delegate.removeAll(c); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ThreadSafeFileSet that = (ThreadSafeFileSet) o; + return delegate.equals(that.delegate); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public String toString() { + return delegate.toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 7d146d924667..ce92d3d1530e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -49,6 +50,7 @@ import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadSafeFileSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +73,12 @@ public String partition() { private final Map specsById; private final PartitionSet deleteFilePartitions; - private final Set deleteFiles = newFileSet(); + // Thread-safe set for collecting deleted files during parallel manifest filtering. + // Uses ConcurrentHashMap-backed set to prevent the concurrent mutation corruption + // described in issue #16978. The set is still populated single-threaded after + // all parallel filtering completes, but the thread-safe backing provides defense + // in depth against future concurrent access patterns. + private final Set deleteFiles = newThreadSafeFileSet(); private final Set manifestsWithDeletes = Sets.newHashSet(); private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); @@ -93,7 +100,10 @@ public String partition() { private final Map filteredManifests = Maps.newConcurrentMap(); // tracking where files were deleted to validate retries quickly - private final Map> filteredManifestToDeletedFiles = + // Thread-safe map for collecting per-manifest deleted files during parallel filtering. + // ConcurrentHashMap is used because worker threads write to this map from filterManifest() + // while the main thread reads from it after all tasks complete (see issue #16978). + private final ConcurrentMap> filteredManifestToDeletedFiles = Maps.newConcurrentMap(); private final Supplier workerPoolSupplier; @@ -114,6 +124,24 @@ protected ManifestFilterManager( protected abstract Set newFileSet(); + /** + * Creates a thread-safe set for collecting files in concurrent contexts. + * + *

This is used for the {@code deleteFiles} collection in ManifestFilterManager because worker + * threads may read this set during parallel manifest filtering (e.g., {@code + * manifestHasDeletedFiles} checks {@code deleteFiles.contains(file)}). While the current fix + * removes concurrent writes to this set, using a thread-safe backing collection provides defense + * in depth against future regressions (see issue #16978). + * + *

The default implementation returns a {@link ThreadSafeFileSet}. Subclasses may override this + * if they require different thread-safe semantics. + * + * @return a new thread-safe set for collecting files + */ + protected Set newThreadSafeFileSet() { + return ThreadSafeFileSet.create(); + } + protected void failAnyDelete() { this.failAnyDelete = true; } @@ -217,6 +245,12 @@ List filterManifests(Schema tableSchema, List manife boolean trustManifestReferences = canTrustManifestReferences(manifests); ManifestFile[] filtered = new ManifestFile[manifests.size()]; + + // Collect deleted files per-manifest during parallel filtering. + // Using a concurrent map allows worker threads to safely append their + // local results without mutating shared mutable state (see issue #16978). + ConcurrentMap> concurrentDeletedFiles = Maps.newConcurrentMap(); + // open all of the manifest files in parallel, use index to avoid reordering Tasks.range(filtered.length) .stopOnFailure() @@ -227,13 +261,57 @@ List filterManifests(Schema tableSchema, List manife ManifestFile manifest = filterManifest(tableSchema, manifests.get(index), trustManifestReferences); filtered[index] = manifest; + + // After filtering, safely copy the per-manifest deleted files into + // the concurrent collection. This replaces the old pattern of + // directly mutating the shared deleteFiles set from worker threads. + Iterable manifestDeletes = filteredManifestToDeletedFiles.get(manifest); + if (manifestDeletes != null) { + List files = Lists.newArrayList(manifestDeletes); + if (!files.isEmpty()) { + concurrentDeletedFiles.put(manifest, files); + } + } }); + // Merge all per-manifest deleted files into the shared deleteFiles set + // after all parallel filtering is complete. This single-threaded merge + // prevents the concurrent HashMap/LinkedHashSet corruption that caused + // ClassCastException during parallel manifest filtering (issue #16978). + mergeDeletedFiles(concurrentDeletedFiles); + validateRequiredDeletes(filtered); return Arrays.asList(filtered); } + /** + * Merges per-manifest deleted files into the shared {@code deleteFiles} set. + * + *

This method must be called after all parallel manifest filtering has completed. It is not + * safe to call while worker threads are still processing manifests because {@code deleteFiles} is + * backed by a non-thread-safe LinkedHashSet. + * + * @param concurrentDeletedFiles map from filtered manifest to its deleted files + */ + private void mergeDeletedFiles(ConcurrentMap> concurrentDeletedFiles) { + int totalDeleted = 0; + for (List manifestDeletes : concurrentDeletedFiles.values()) { + for (F file : manifestDeletes) { + if (deleteFiles.add(file)) { + totalDeleted += 1; + } + } + } + + if (totalDeleted > 0) { + LOG.debug( + "Merged {} deleted files from {} manifests into deleteFiles", + totalDeleted, + concurrentDeletedFiles.size()); + } + } + // Use the current set of referenced manifests as a source of truth when it's a subset of all // manifests and all removals which were performed reference manifests. // If a manifest without live files is not in the trusted referenced set, this means that the @@ -535,7 +613,9 @@ private ManifestFile filterManifestWithDeletedFiles( F fileCopy = file.copyWithoutStats(); // add the file here in case it was deleted using an expression. The // DeleteManifestFilterManager will then remove its matching DV - deleteFiles.add(fileCopy); + // NOTE: deleted files are merged into deleteFiles after all parallel + // filtering completes to avoid concurrent mutation races (see + // mergeDeletedFiles) if (deletedFiles.contains(file)) { LOG.warn( diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java b/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java new file mode 100644 index 000000000000..48bdb646cd46 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for concurrent manifest filtering in ManifestFilterManager. + * + *

These tests verify that parallel manifest filtering does not corrupt shared mutable state + * (specifically the deleteFiles set) when multiple worker threads process manifests concurrently. + * + * @see Issue #16978 + */ +@ExtendWith(ParameterizedTestExtension.class) +public class TestManifestFilterManagerConcurrency extends TestBase { + + @Parameter(index = 0) + private int formatVersion; + + @Parameter(index = 1) + private String branch; + + @Parameters(name = "formatVersion = {0}, branch = {1}") + protected static List parameters() { + return TestHelpers.ALL_VERSIONS.stream() + .flatMap( + v -> + Stream.of( + new Object[] {v, SnapshotRef.MAIN_BRANCH}, new Object[] {v, "testBranch"})) + .collect(Collectors.toList()); + } + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("data").build(); + + private static final int MANIFEST_COUNT = 32; + private static final int FILES_PER_MANIFEST = 4; + + @TestTemplate + public void testParallelOverwriteDoesNotCorruptDeleteFiles() { + // Create a table with many manifests to ensure parallel processing + table = create(SCHEMA, SPEC); + + // Append files across multiple manifests by committing in batches + for (int m = 0; m < MANIFEST_COUNT; m++) { + int manifestIndex = m; + List files = + IntStream.range(0, FILES_PER_MANIFEST) + .mapToObj( + f -> + DataFiles.builder(SPEC) + .withPath("/path/to/data-" + manifestIndex + "-" + f + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=batch" + manifestIndex) + .withRecordCount(1) + .build()) + .collect(Collectors.toList()); + + AppendFiles append = table.newAppend(); + files.forEach(append::appendFile); + commit(table, append, branch); + } + + // Verify we have the expected number of manifests + assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(MANIFEST_COUNT); + + // Perform a full overwrite using a row filter that matches all rows. + // This triggers parallel manifest filtering and previously caused + // concurrent mutation of the shared deleteFiles set. + OverwriteFiles overwrite = table.newOverwrite().overwriteByRowFilter(Expressions.alwaysTrue()); + + // The overwrite should complete without throwing ClassCastException + // or any other concurrent modification error + assertThatNoException().isThrownBy(() -> commit(table, overwrite, branch)); + + // Verify the overwrite succeeded and all old files were deleted + assertThat(table.currentSnapshot().addedDataFiles(table.io())).isEmpty(); + assertThat(table.currentSnapshot().removedDataFiles(table.io())).isEmpty(); + } + + @TestTemplate + public void testParallelDeleteByExpressionCollectsAllFiles() { + table = create(SCHEMA, SPEC); + + // Append files across multiple manifests + for (int m = 0; m < MANIFEST_COUNT; m++) { + int manifestIndex = m; + List files = + IntStream.range(0, FILES_PER_MANIFEST) + .mapToObj( + f -> + DataFiles.builder(SPEC) + .withPath("/path/to/data-" + manifestIndex + "-" + f + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data=batch" + manifestIndex) + .withRecordCount(1) + .build()) + .collect(Collectors.toList()); + + AppendFiles append = table.newAppend(); + files.forEach(append::appendFile); + commit(table, append, branch); + } + + // Delete files by partition expression that matches all partitions + DeleteFiles delete = table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()); + + assertThatNoException().isThrownBy(() -> commit(table, delete, branch)); + + // After deleting all files, the table should have no data files + assertThat(table.currentSnapshot().dataManifests(table.io())).isEmpty(); + } +}