Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
edd8a8d
Core: fix concurrent manifest filtering race in ManifestFilterManager
goingforstudying-ctrl Jun 27, 2026
6675882
fix(test): correct API usage in TestManifestFilterManagerConcurrency
goingforstudying-ctrl Jun 27, 2026
a7cbfd1
docs: fix javadoc reference to ContentFile with fully-qualified link
goingforstudying-ctrl Jun 28, 2026
3a71d1a
Apply spotless formatting fixes
goingforstudying-ctrl Jun 28, 2026
1f701ca
Fix checkstyle: use Maps.newConcurrentMap() instead of new Concurrent…
goingforstudying-ctrl Jun 28, 2026
164d881
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jun 29, 2026
f336f0c
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jun 29, 2026
55f2ccf
fix: replace Guava Maps.newConcurrentMap with standard ConcurrentHashMap
goingforstudying-ctrl Jun 29, 2026
cbc4300
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jun 30, 2026
f5839ce
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jun 30, 2026
0a7e284
fix: use Maps.newConcurrentMap instead of new ConcurrentHashMap
goingforstudying-ctrl Jun 30, 2026
4ab8c2e
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jul 1, 2026
94f689a
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jul 1, 2026
9ba282a
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jul 1, 2026
372ca12
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jul 1, 2026
bbbf363
Fix checkstyle: use Maps.newConcurrentMap() instead of new Concurrent…
goingforstudying-ctrl Jul 2, 2026
2d1f406
Merge branch 'main' into fix/manifest-filter-thread-safety
goingforstudying-ctrl Jul 2, 2026
7e0f822
fix: remove unused ConcurrentHashMap import to pass spotless check
goingforstudying-ctrl Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/ThreadSafeFileSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
* A thread-safe set implementation for Iceberg {@link org.apache.iceberg.ContentFile} objects.
*
* <p>This class provides a concurrent alternative to {@link DataFileSet} and {@link DeleteFileSet}
* for scenarios where files must be collected from multiple worker threads. The standard {@link
* WrapperSet} implementations use {@link java.util.LinkedHashSet} internally, which is
* <strong>not</strong> thread-safe and can corrupt when mutated concurrently (see Iceberg issue
* #16978).
*
* <p>Unlike {@link WrapperSet}, this implementation does <strong>not</strong> preserve insertion
* order. If order is required, collect files in a thread-safe structure and copy to a {@link
* DataFileSet} or {@link DeleteFileSet} on the main thread after parallel work completes.
*
* @param <F> the file type, typically {@link org.apache.iceberg.DataFile} or {@link
* org.apache.iceberg.DeleteFile}
*/
public class ThreadSafeFileSet<F> implements Set<F> {
private final Set<F> delegate;

@SuppressWarnings("unchecked")
public static <F> ThreadSafeFileSet<F> create() {
return new ThreadSafeFileSet<>();
}

@SuppressWarnings("unchecked")
public static <F> ThreadSafeFileSet<F> create(Iterable<F> files) {
ThreadSafeFileSet<F> set = new ThreadSafeFileSet<>();
files.forEach(set::add);
return set;
}

private ThreadSafeFileSet() {
this.delegate = Collections.newSetFromMap(Maps.newConcurrentMap());
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean contains(Object o) {
return delegate.contains(o);
}

@Override
public Iterator<F> iterator() {
return delegate.iterator();
}

@Override
public Object[] toArray() {
return delegate.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}

@Override
public boolean add(F f) {
return delegate.add(f);
}

@Override
public boolean remove(Object o) {
return delegate.remove(o);
}

@Override
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}

@Override
public boolean addAll(Collection<? extends F> c) {
return delegate.addAll(c);
}

@Override
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}

@Override
public boolean removeAll(Collection<?> c) {
return delegate.removeAll(c);
}

@Override
public void clear() {
delegate.clear();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ThreadSafeFileSet<?> that = (ThreadSafeFileSet<?>) o;
return delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return delegate.hashCode();
}

@Override
public String toString() {
return delegate.toString();
}
}
86 changes: 83 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -49,6 +50,7 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadSafeFileSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,7 +73,12 @@ public String partition() {

private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final Set<F> deleteFiles = newFileSet();
// Thread-safe set for collecting deleted files during parallel manifest filtering.
// Uses ConcurrentHashMap-backed set to prevent the concurrent mutation corruption
// described in issue #16978. The set is still populated single-threaded after
// all parallel filtering completes, but the thread-safe backing provides defense
// in depth against future concurrent access patterns.
private final Set<F> deleteFiles = newThreadSafeFileSet();
private final Set<String> manifestsWithDeletes = Sets.newHashSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
Expand All @@ -93,7 +100,10 @@ public String partition() {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
// Thread-safe map for collecting per-manifest deleted files during parallel filtering.
// ConcurrentHashMap is used because worker threads write to this map from filterManifest()
// while the main thread reads from it after all tasks complete (see issue #16978).
private final ConcurrentMap<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;
Expand All @@ -114,6 +124,24 @@ protected ManifestFilterManager(

protected abstract Set<F> newFileSet();

/**
* Creates a thread-safe set for collecting files in concurrent contexts.
*
* <p>This is used for the {@code deleteFiles} collection in ManifestFilterManager because worker
* threads may read this set during parallel manifest filtering (e.g., {@code
* manifestHasDeletedFiles} checks {@code deleteFiles.contains(file)}). While the current fix
* removes concurrent writes to this set, using a thread-safe backing collection provides defense
* in depth against future regressions (see issue #16978).
*
* <p>The default implementation returns a {@link ThreadSafeFileSet}. Subclasses may override this
* if they require different thread-safe semantics.
*
* @return a new thread-safe set for collecting files
*/
protected Set<F> newThreadSafeFileSet() {
return ThreadSafeFileSet.create();
}

protected void failAnyDelete() {
this.failAnyDelete = true;
}
Expand Down Expand Up @@ -217,6 +245,12 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife

boolean trustManifestReferences = canTrustManifestReferences(manifests);
ManifestFile[] filtered = new ManifestFile[manifests.size()];

// Collect deleted files per-manifest during parallel filtering.
// Using a concurrent map allows worker threads to safely append their
// local results without mutating shared mutable state (see issue #16978).
ConcurrentMap<ManifestFile, List<F>> concurrentDeletedFiles = Maps.newConcurrentMap();

// open all of the manifest files in parallel, use index to avoid reordering
Tasks.range(filtered.length)
.stopOnFailure()
Expand All @@ -227,13 +261,57 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustManifestReferences);
filtered[index] = manifest;

// After filtering, safely copy the per-manifest deleted files into
// the concurrent collection. This replaces the old pattern of
// directly mutating the shared deleteFiles set from worker threads.
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
List<F> files = Lists.newArrayList(manifestDeletes);
if (!files.isEmpty()) {
concurrentDeletedFiles.put(manifest, files);
}
}
});

// Merge all per-manifest deleted files into the shared deleteFiles set
// after all parallel filtering is complete. This single-threaded merge
// prevents the concurrent HashMap/LinkedHashSet corruption that caused
// ClassCastException during parallel manifest filtering (issue #16978).
mergeDeletedFiles(concurrentDeletedFiles);

validateRequiredDeletes(filtered);

return Arrays.asList(filtered);
}

/**
* Merges per-manifest deleted files into the shared {@code deleteFiles} set.
*
* <p>This method must be called after all parallel manifest filtering has completed. It is not
* safe to call while worker threads are still processing manifests because {@code deleteFiles} is
* backed by a non-thread-safe LinkedHashSet.
*
* @param concurrentDeletedFiles map from filtered manifest to its deleted files
*/
private void mergeDeletedFiles(ConcurrentMap<ManifestFile, List<F>> concurrentDeletedFiles) {
int totalDeleted = 0;
for (List<F> manifestDeletes : concurrentDeletedFiles.values()) {
for (F file : manifestDeletes) {
if (deleteFiles.add(file)) {
totalDeleted += 1;
}
}
}

if (totalDeleted > 0) {
LOG.debug(
"Merged {} deleted files from {} manifests into deleteFiles",
totalDeleted,
concurrentDeletedFiles.size());
}
}

// Use the current set of referenced manifests as a source of truth when it's a subset of all
// manifests and all removals which were performed reference manifests.
// If a manifest without live files is not in the trusted referenced set, this means that the
Expand Down Expand Up @@ -535,7 +613,9 @@ private ManifestFile filterManifestWithDeletedFiles(
F fileCopy = file.copyWithoutStats();
// add the file here in case it was deleted using an expression. The
// DeleteManifestFilterManager will then remove its matching DV
deleteFiles.add(fileCopy);
// NOTE: deleted files are merged into deleteFiles after all parallel
// filtering completes to avoid concurrent mutation races (see
// mergeDeletedFiles)

if (deletedFiles.contains(file)) {
LOG.warn(
Expand Down
Loading
Loading