Core: Fix concurrent mutation of deleteFiles during parallel manifest filtering #16984
Open
liucao-dd wants to merge 1 commit into
…nager for large table overwrite
c17686d to
9d16b52
closes #16978
Summary
- Fix a race in `ManifestFilterManager` where parallel manifest filtering concurrently mutates the shared `deleteFiles` set, corrupting its backing store.
- Make `duplicateDeleteCount` thread-safe with `AtomicInteger`.
- Add a regression test that adds to `deleteFiles` concurrently during a full overwrite across many manifests.
Problem
`ManifestFilterManager.filterManifests()` filters manifests in parallel via `Tasks.range(...).executeWith(workerPoolSupplier.get())`. During expression-based deletes, each worker thread calls `deleteFiles.add(fileCopy)` on a shared instance-level set backed by `WrapperSet`/`LinkedHashSet`, which is not thread-safe.
Under heavy concurrent adds (large overwrites spanning many manifests), the backing `LinkedHashMap` can become corrupted. Observed symptoms include `ClassCastException: LinkedHashMap$Entry cannot be cast to HashMap$TreeNode`, `StackOverflowError`, and hangs.
`duplicateDeleteCount` is also incremented from worker threads with a non-atomic `++`, which can lose increments.
Fix
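A minimal, self-contained sketch of the approach this section describes, not the actual diff. `String` paths stand in for file objects, a plain `LinkedHashSet` stands in for `newFileSet()`, and the class name, the `run` helper, and the rule used to count duplicates are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SynchronizedDeleteSetSketch {

  // Returns {unique paths recorded, duplicate adds observed}.
  static int[] run(int workers, int pathsPerWorker) throws InterruptedException {
    // Hypothetical stand-in for deleteFiles: an insertion-ordered set behind a synchronized wrapper.
    Set<String> deleteFiles = Collections.synchronizedSet(new LinkedHashSet<>());
    // Atomic counter instead of a plain int incremented with ++.
    AtomicInteger duplicateDeleteCount = new AtomicInteger();

    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int w = 0; w < workers; w++) {
      pool.submit(() -> {
        // Every worker adds the same paths, so all but one add per path is a duplicate.
        for (int i = 0; i < pathsPerWorker; i++) {
          if (!deleteFiles.add("s3://bucket/data/file-" + i + ".parquet")) {
            duplicateDeleteCount.incrementAndGet();
          }
        }
      });
    }
    pool.shutdown();
    if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("workers did not finish in time");
    }

    // Iteration is not covered by the wrapper, so it is guarded explicitly here.
    int unique = 0;
    synchronized (deleteFiles) {
      for (String ignored : deleteFiles) {
        unique++;
      }
    }
    return new int[] {unique, duplicateDeleteCount.get()};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] result = run(8, 1_000);
    System.out.println("unique=" + result[0] + " duplicates=" + result[1]);
  }
}
```

With 8 workers and 1,000 shared paths, `main` prints `unique=1000 duplicates=7000`, because each synchronized `add` returns `true` exactly once per path.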
Wrap `deleteFiles` with `Collections.synchronizedSet(newFileSet())` so concurrent `add`/`contains` calls from worker threads are safe, and convert `duplicateDeleteCount` to `AtomicInteger`.
`Collections.synchronizedSet` synchronizes individual method calls. Iteration is not auto-synchronized, so iterating `deleteFiles` must remain after the parallel filtering phase (current behavior) or be guarded by `synchronized (deleteFiles)`. This invariant is documented at the field.
Why `Collections.synchronizedSet`
The fix is intentionally minimal and confined to `ManifestFilterManager`, while preserving the existing semantics of `deleteFiles` exactly:
- `newFileSet()` returns a `DataFileSet`/`DeleteFileSet`, which match files by `file.location()` via `WrapperSet`. A raw concurrent set keyed on `DataFile`/`DeleteFile` would use object identity because the file classes do not override `equals`/`hashCode`. That would break the explicit `deleteFile(...)` path: the caller-supplied file instance added to `deleteFiles` would not match the separate file instance read back from a manifest during filtering.
- `WrapperSet` documents that it maintains insertion order. It is a shared `api/` type used outside this path, so re-backing it with a non-ordered concurrent map to fix one caller would silently change that contract for every consumer.
Test plan
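The regression test in this section relies on file locations that share one `String.hashCode()`. How the PR's test builds them is not shown here. One well-known way, sketched below under that caveat, uses the fact that `"Aa"` and `"BB"` have the same hash code, so any two strings built from the same number of such blocks collide as well. The class name, helper names, and the `/data/...parquet` path shape are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class CollidingLocations {

  // Builds 2^blocks distinct names, each a sequence of "Aa"/"BB" blocks.
  // "Aa" and "BB" have equal length and equal hashCode, so every name hashes the same.
  static List<String> collidingNames(int blocks) {
    List<String> names = new ArrayList<>();
    names.add("");
    for (int b = 0; b < blocks; b++) {
      List<String> next = new ArrayList<>(names.size() * 2);
      for (String prefix : names) {
        next.add(prefix + "Aa");
        next.add(prefix + "BB");
      }
      names = next;
    }
    return names;
  }

  // Wraps each name in an identical prefix and suffix, which preserves the collision.
  static List<String> collidingLocations(int blocks) {
    List<String> locations = new ArrayList<>();
    for (String name : collidingNames(blocks)) {
      locations.add("/data/" + name + ".parquet");
    }
    return locations;
  }

  public static void main(String[] args) {
    List<String> locations = collidingLocations(10); // 2^10 = 1024 = 16 * 64 locations
    int distinctHashes = new HashSet<>(locations.stream().map(String::hashCode).toList()).size();
    System.out.println(locations.size() + " locations, " + distinctHashes + " distinct hash code(s)");
  }
}
```

Ten blocks give 1,024 locations, which matches the 16 manifests of 64 files described in this test plan. `main` uses `Stream.toList()`, so it assumes Java 16 or newer.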
- `./gradlew :iceberg-core:test --tests org.apache.iceberg.TestOverwrite`
- `./gradlew :iceberg-core:test --tests org.apache.iceberg.TestDeleteFiles`
- `./gradlew :iceberg-core:test --tests org.apache.iceberg.TestOverwriteWithValidation`
- `./gradlew :iceberg-core:spotlessApply :iceberg-core:spotlessJavaCheck`
`testFullOverwriteAcrossManyManifests` disables manifest merging and appends 16 manifests of 64 files each, using file locations engineered to share one `String.hashCode()` so that the `DataFileSet` wrapper hashes all collide into a single bucket. A full overwrite (`alwaysTrue()`) then forces many worker threads to add to `deleteFiles` concurrently, concentrating contention in that bucket. Without the fix this reproduces the corruption (`ClassCastException` / `StackOverflowError` / hang); with the fix it passes consistently.
AI Disclosure
An LLM was used to research codebase standards, to draft the regression test (which fails without the fix), and to draft the PR description.
Since I opened the issue yesterday, two PRs have already been opened to close it, so I am adding some thoughts:
A deferred-merge alternative, where worker threads collect deleted files into a concurrent structure and the caller merges them into `deleteFiles` after the parallel phase, is also behavior-preserving: expression-deleted files added to `deleteFiles` during filtering are not needed by other workers to decide expression matches, and the actual post-phase consumers (`filesToBeDeleted()` and required-delete validation) run after `Tasks.range(...)` joins. It is a reasonable design with a lock-free hot path that has higher throughput potential, but it is more invasive. This is available in #16980.
My PR prefers the synchronized wrapper for its minimal change and exact preservation of `WrapperSet` semantics; a deferred merge is a sensible alternative if we have evidence of lock contention.
In object-store-backed deployments, each manifest is read from S3/GCS, so millisecond-scale I/O per manifest dominates both the microseconds of CPU and the nanoseconds of locking. So I would expect the lock overhead to be negligible here. If the environment is a local FS with a warm cache, manifests of many tiny entries, and a high core count, then there is merit to the deferred-merge approach.
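For comparison, the deferred-merge idea could look roughly like the sketch below. It is an illustration under assumptions, not the code in #16980: `String` paths stand in for file objects, `ConcurrentLinkedQueue` is one possible choice of concurrent structure, and the class and helper names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DeferredMergeSketch {

  static Set<String> run(int workers, int pathsPerWorker) throws InterruptedException {
    // Workers only append to a lock-free queue; they never touch the final set.
    Queue<String> collected = new ConcurrentLinkedQueue<>();

    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int w = 0; w < workers; w++) {
      final int manifest = w;
      pool.submit(() -> {
        for (int i = 0; i < pathsPerWorker; i++) {
          collected.add("manifest-" + manifest + "/file-" + i + ".parquet");
        }
      });
    }
    pool.shutdown();
    if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("workers did not finish in time");
    }

    // Single-threaded merge after the parallel phase has joined.
    Set<String> deleteFiles = new LinkedHashSet<>();
    deleteFiles.addAll(collected);
    return deleteFiles;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("merged " + run(8, 1_000).size() + " paths");
  }
}
```

One difference from the synchronized wrapper is visible even in this sketch: the merged set's insertion order follows the order in which workers happened to enqueue, so a real implementation that cares about ordering would need per-worker buffers merged in a fixed order, or something similar.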
Deferring the decision to the community and maintainers.