Core: Fix concurrent modification of deleteFiles in ManifestFilterManager #16980
lilei1128 wants to merge 2 commits into
    }

    @TestTemplate
    public void testConcurrentManifestFilteringWithExpression() {
Thanks for picking this up! Since I opened #16978 yesterday and also opened #16984 with a different fix, I want to acknowledge that your approach is lock-free on the hot path and has higher throughput potential.
My thinking was that in object-store-backed deployments, each manifest is read from S3/GCS, so millisecond-scale I/O per manifest dominates both the microseconds of CPU work and the nanoseconds of lock overhead. Hence I went with a simpler one-line fix that uses synchronizedSet.
The one thing I’d suggest is strengthening the regression test. With only 4 manifests / 4 files, the test may pass even without the fix because it does not create enough concurrent pressure on the underlying LinkedHashSet. In #16984 I used many files across many manifests with colliding file-location hashes to make the unfixed race much easier to reproduce. I’ll defer to maintainers on which implementation they prefer, but wanted to share this since we’re addressing the same issue.
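For readers comparing the two fixes, here is a minimal sketch of what the synchronizedSet option looks like in plain Java. It is not the code from #16984: the class name, the `String` elements standing in for data files, and the executor standing in for the worker pool are all assumptions made for illustration. It also shows the manual locking that the `Collections.synchronizedSet` contract requires when iterating.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; Strings stand in for data files.
final class SynchronizedSetSketch {

  // Adds `files` distinct locations from `threads` workers into a synchronized set.
  static Set<String> addConcurrently(int files, int threads) {
    Set<String> deleteFiles = Collections.synchronizedSet(new LinkedHashSet<>());
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < files; i++) {
      String location = "file-" + i;
      pool.execute(() -> deleteFiles.add(location)); // each add takes the wrapper's lock
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return deleteFiles;
  }

  // Iteration is not protected by the wrapper; the caller must hold the set's lock.
  static int countMatching(Set<String> synchronizedSet, String prefix) {
    int count = 0;
    synchronized (synchronizedSet) {
      for (String location : synchronizedSet) {
        if (location.startsWith(prefix)) {
          count++;
        }
      }
    }
    return count;
  }
}
```

Calling `SynchronizedSetSketch.addConcurrently(128, 16)` should return a set of 128 entries; any later loop over that set needs the `synchronized` block if writers could still be active.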
Thanks for sharing #16984 and the collidingLocation technique — forcing all paths into the same hash bucket is a much more reliable way to reproduce the race than simply increasing file count, and we will adapt that approach in our test.
One note on the synchronizedSet approach: as your own comment acknowledges, synchronizedSet only protects individual add/contains calls — iteration is not covered. validateRequiredDeletes uses containsAll and buildSummary iterates over deleteFiles, both of which are safe today only because they run after filtering completes. That's a correctness constraint buried in call-site ordering rather than enforced by the data structure itself.
Our approach (collect into a ConcurrentLinkedQueue during parallel filtering, then merge into deleteFiles on the calling thread after Tasks.range returns) avoids this entirely — deleteFiles is never written concurrently, so iteration safety is guaranteed structurally rather than by convention.
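To make the collect-then-merge idea concrete, here is a rough, self-contained sketch. It is not the PR's code: a plain `ExecutorService` stands in for `Tasks.range`, `String` stands in for the data file type, and the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the collect-then-merge pattern.
final class CollectThenMergeSketch {

  static Set<String> filterInParallel(int manifests, int threads) {
    // Plain, non-thread-safe set: only the calling thread ever touches it.
    Set<String> deleteFiles = new LinkedHashSet<>();
    // Workers write matches here; ConcurrentLinkedQueue is safe for concurrent adds.
    Queue<String> matched = new ConcurrentLinkedQueue<>();

    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < manifests; i++) {
      String location = "manifest-" + i + "/data.parquet";
      pool.execute(() -> matched.add(location));
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }

    // Merge on the calling thread, after all workers have completed.
    deleteFiles.addAll(matched);
    return deleteFiles;
  }
}
```

Because `deleteFiles` is only written after the pool has terminated, later iteration over it needs no locking in this sketch.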
When multiple worker threads processed manifests in parallel via
Tasks.range(), expression-matched files were written directly to the
non-thread-safe deleteFiles set, causing data races and incorrect
deleted-file counts.
Close #16978
Fix by collecting expression-deleted files into a ConcurrentLinkedQueue
during parallel filtering, then merging into deleteFiles on the calling
thread after all workers complete. Also convert duplicateDeleteCount
from int to AtomicInteger to safely handle concurrent increments across
worker threads.
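As an illustration of the counter change, the sketch below increments an `AtomicInteger` from several workers. The field name mirrors `duplicateDeleteCount`, but the surrounding class, method, and executor setup are assumptions and not the code in this PR. A plain `int` incremented with `++` in the same position could lose updates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a counter shared by worker threads.
final class DuplicateCounterSketch {

  static int countConcurrently(int tasks, int incrementsPerTask, int threads) {
    AtomicInteger duplicateDeleteCount = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < tasks; t++) {
      pool.execute(() -> {
        for (int i = 0; i < incrementsPerTask; i++) {
          duplicateDeleteCount.incrementAndGet(); // atomic read-modify-write
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return duplicateDeleteCount.get();
  }
}
```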
Add a regression test that appends 128 files in separate commits (one
manifest each) and deletes all via overwriteByRowFilter with a 16-thread
pool. File paths are constructed using the "Aa"/"BB" Java hash-collision
pair so all locations share the same String.hashCode(), concentrating
concurrent adds into the same DataFileSet hash bucket and making the
race deterministically reproducible on unfixed code.
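For reference, the "Aa"/"BB" trick relies on both strings having the same `String.hashCode()` (2112), so any equal-length sequence of these two blocks hashes identically. The sketch below generates such locations. The helper name and the path prefix and suffix are assumptions for illustration and are not taken from the test in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds distinct strings sharing one String.hashCode().
final class CollidingLocationSketch {

  static List<String> collidingLocations(int count) {
    // Number of "Aa"/"BB" blocks needed to encode `count` distinct values.
    int blocks = 1;
    while ((1 << blocks) < count) {
      blocks++;
    }
    List<String> locations = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      StringBuilder sb = new StringBuilder("/path/to/data-");
      for (int b = 0; b < blocks; b++) {
        // Each bit of i picks one of the two colliding blocks.
        sb.append(((i >> b) & 1) == 0 ? "Aa" : "BB");
      }
      sb.append(".parquet");
      locations.add(sb.toString());
    }
    return locations;
  }
}
```

With `count = 128` this uses seven blocks per name, and every returned string reports the same `hashCode()` while remaining distinct under `equals`.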