diff --git a/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java b/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java
new file mode 100644
index 000000000000..6ad6bb17f33f
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * A thread-safe set implementation for Iceberg {@link org.apache.iceberg.ContentFile} objects.
+ *
+ *
This class provides a concurrent alternative to {@link DataFileSet} and {@link DeleteFileSet}
+ * for scenarios where files must be collected from multiple worker threads. The standard {@link
+ * WrapperSet} implementations use {@link java.util.LinkedHashSet} internally, which is
+ * not thread-safe and can corrupt when mutated concurrently (see Iceberg issue
+ * #16978).
+ *
+ *
Unlike {@link WrapperSet}, this implementation does not preserve insertion
+ * order. If order is required, collect files in a thread-safe structure and copy to a {@link
+ * DataFileSet} or {@link DeleteFileSet} on the main thread after parallel work completes.
+ *
+ * @param the file type, typically {@link org.apache.iceberg.DataFile} or {@link
+ * org.apache.iceberg.DeleteFile}
+ */
+public class ThreadSafeFileSet implements Set {
+ private final Set delegate;
+
+ @SuppressWarnings("unchecked")
+ public static ThreadSafeFileSet create() {
+ return new ThreadSafeFileSet<>();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ThreadSafeFileSet create(Iterable files) {
+ ThreadSafeFileSet set = new ThreadSafeFileSet<>();
+ files.forEach(set::add);
+ return set;
+ }
+
+ private ThreadSafeFileSet() {
+ this.delegate = Collections.newSetFromMap(Maps.newConcurrentMap());
+ }
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return delegate.contains(o);
+ }
+
+ @Override
+ public Iterator iterator() {
+ return delegate.iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ return delegate.toArray();
+ }
+
+ @Override
+ public T[] toArray(T[] a) {
+ return delegate.toArray(a);
+ }
+
+ @Override
+ public boolean add(F f) {
+ return delegate.add(f);
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return delegate.remove(o);
+ }
+
+ @Override
+ public boolean containsAll(Collection> c) {
+ return delegate.containsAll(c);
+ }
+
+ @Override
+ public boolean addAll(Collection extends F> c) {
+ return delegate.addAll(c);
+ }
+
+ @Override
+ public boolean retainAll(Collection> c) {
+ return delegate.retainAll(c);
+ }
+
+ @Override
+ public boolean removeAll(Collection> c) {
+ return delegate.removeAll(c);
+ }
+
+ @Override
+ public void clear() {
+ delegate.clear();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ThreadSafeFileSet> that = (ThreadSafeFileSet>) o;
+ return delegate.equals(that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
index 7d146d924667..ce92d3d1530e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -49,6 +50,7 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadSafeFileSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +73,12 @@ public String partition() {
private final Map specsById;
private final PartitionSet deleteFilePartitions;
- private final Set deleteFiles = newFileSet();
+ // Thread-safe set for collecting deleted files during parallel manifest filtering.
+ // Uses ConcurrentHashMap-backed set to prevent the concurrent mutation corruption
+ // described in issue #16978. The set is still populated single-threaded after
+ // all parallel filtering completes, but the thread-safe backing provides defense
+ // in depth against future concurrent access patterns.
+ private final Set deleteFiles = newThreadSafeFileSet();
private final Set manifestsWithDeletes = Sets.newHashSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
@@ -93,7 +100,10 @@ public String partition() {
private final Map filteredManifests = Maps.newConcurrentMap();
// tracking where files were deleted to validate retries quickly
- private final Map> filteredManifestToDeletedFiles =
+ // Thread-safe map for collecting per-manifest deleted files during parallel filtering.
+ // ConcurrentHashMap is used because worker threads write to this map from filterManifest()
+ // while the main thread reads from it after all tasks complete (see issue #16978).
+ private final ConcurrentMap> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private final Supplier workerPoolSupplier;
@@ -114,6 +124,24 @@ protected ManifestFilterManager(
protected abstract Set newFileSet();
+ /**
+ * Creates a thread-safe set for collecting files in concurrent contexts.
+ *
+ *
This is used for the {@code deleteFiles} collection in ManifestFilterManager because worker
+ * threads may read this set during parallel manifest filtering (e.g., {@code
+ * manifestHasDeletedFiles} checks {@code deleteFiles.contains(file)}). While the current fix
+ * removes concurrent writes to this set, using a thread-safe backing collection provides defense
+ * in depth against future regressions (see issue #16978).
+ *
+ *
The default implementation returns a {@link ThreadSafeFileSet}. Subclasses may override this
+ * if they require different thread-safe semantics.
+ *
+ * @return a new thread-safe set for collecting files
+ */
+ protected Set newThreadSafeFileSet() {
+ return ThreadSafeFileSet.create();
+ }
+
protected void failAnyDelete() {
this.failAnyDelete = true;
}
@@ -217,6 +245,12 @@ List filterManifests(Schema tableSchema, List manife
boolean trustManifestReferences = canTrustManifestReferences(manifests);
ManifestFile[] filtered = new ManifestFile[manifests.size()];
+
+ // Collect deleted files per-manifest during parallel filtering.
+ // Using a concurrent map allows worker threads to safely append their
+ // local results without mutating shared mutable state (see issue #16978).
+ ConcurrentMap> concurrentDeletedFiles = Maps.newConcurrentMap();
+
// open all of the manifest files in parallel, use index to avoid reordering
Tasks.range(filtered.length)
.stopOnFailure()
@@ -227,13 +261,57 @@ List filterManifests(Schema tableSchema, List manife
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustManifestReferences);
filtered[index] = manifest;
+
+ // After filtering, safely copy the per-manifest deleted files into
+ // the concurrent collection. This replaces the old pattern of
+ // directly mutating the shared deleteFiles set from worker threads.
+ Iterable manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ if (manifestDeletes != null) {
+ List files = Lists.newArrayList(manifestDeletes);
+ if (!files.isEmpty()) {
+ concurrentDeletedFiles.put(manifest, files);
+ }
+ }
});
+ // Merge all per-manifest deleted files into the shared deleteFiles set
+ // after all parallel filtering is complete. This single-threaded merge
+ // prevents the concurrent HashMap/LinkedHashSet corruption that caused
+ // ClassCastException during parallel manifest filtering (issue #16978).
+ mergeDeletedFiles(concurrentDeletedFiles);
+
validateRequiredDeletes(filtered);
return Arrays.asList(filtered);
}
+ /**
+ * Merges per-manifest deleted files into the shared {@code deleteFiles} set.
+ *
+ *
This method must be called after all parallel manifest filtering has completed. It is not
+ * safe to call while worker threads are still processing manifests because {@code deleteFiles} is
+ * backed by a non-thread-safe LinkedHashSet.
+ *
+ * @param concurrentDeletedFiles map from filtered manifest to its deleted files
+ */
+ private void mergeDeletedFiles(ConcurrentMap> concurrentDeletedFiles) {
+ int totalDeleted = 0;
+ for (List manifestDeletes : concurrentDeletedFiles.values()) {
+ for (F file : manifestDeletes) {
+ if (deleteFiles.add(file)) {
+ totalDeleted += 1;
+ }
+ }
+ }
+
+ if (totalDeleted > 0) {
+ LOG.debug(
+ "Merged {} deleted files from {} manifests into deleteFiles",
+ totalDeleted,
+ concurrentDeletedFiles.size());
+ }
+ }
+
// Use the current set of referenced manifests as a source of truth when it's a subset of all
// manifests and all removals which were performed reference manifests.
// If a manifest without live files is not in the trusted referenced set, this means that the
@@ -535,7 +613,9 @@ private ManifestFile filterManifestWithDeletedFiles(
F fileCopy = file.copyWithoutStats();
// add the file here in case it was deleted using an expression. The
// DeleteManifestFilterManager will then remove its matching DV
- deleteFiles.add(fileCopy);
+ // NOTE: deleted files are merged into deleteFiles after all parallel
+ // filtering completes to avoid concurrent mutation races (see
+ // mergeDeletedFiles)
if (deletedFiles.contains(file)) {
LOG.warn(
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java b/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java
new file mode 100644
index 000000000000..48bdb646cd46
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestFilterManagerConcurrency.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for concurrent manifest filtering in ManifestFilterManager.
+ *
+ *
These tests verify that parallel manifest filtering does not corrupt shared mutable state
+ * (specifically the deleteFiles set) when multiple worker threads process manifests concurrently.
+ *
+ * @see Issue #16978
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestManifestFilterManagerConcurrency extends TestBase {
+
+ @Parameter(index = 0)
+ private int formatVersion;
+
+ @Parameter(index = 1)
+ private String branch;
+
+ @Parameters(name = "formatVersion = {0}, branch = {1}")
+ protected static List