[fix][broker] Fix flaky OneWayReplicatorDeduplicationTest by coalescing snapshot requests#25679
[fix][broker] Fix flaky OneWayReplicatorDeduplicationTest by coalescing snapshot requests#25679Praveenkumar76 wants to merge 1 commit intoapache:masterfrom
Conversation
…ng snapshot requests
The title of the PR should reflect that this is a production code change, not a fix to a flaky test. |
lhotari
left a comment
There was a problem hiding this comment.
I don't think that MessageDuplication needs to be changed. The test itself is invalid and should be fixed instead. Here's the explanation of MessageDuplication, answer from DeepWiki:
MessageDeduplication.takeSnapshot(Position)— Role and CorrectnessRole of the method
The private
takeSnapshot(Position)inorg.apache.pulsar.broker.service.persistent.MessageDeduplicationpersists the current deduplication state to aManagedCursor. The snapshot captures the highest sequence IDs persisted per producer (thehighestSequencedPersistedmap) and marks the cursor at the suppliedPosition, which corresponds to the last confirmed entry in theManagedLedger. This is what allows deduplication state to be recovered correctly after a broker restart or topic unload/reload.When it's invoked
- Periodically, after
snapshotIntervalentries have been persisted (counter-driven, fromrecordMessagePersistedNormal/recordMessagePersistedRepl).- After replaying the cursor on a deduplication status check, if enough entries were processed.
- After purging inactive producers, to persist the trimmed state.
- From a scheduled task driven by
brokerDeduplicationSnapshotFrequencyInSeconds/brokerDeduplicationSnapshotIntervalSeconds, via the publictakeSnapshot()wrapper.- From
BacklogQuotaManagerwhen the deduplication cursor is the slowest consumer and needs to be advanced.Correctness requirements when one is already in progress
Concurrency is gated by an
AtomicBoolean snapshotTaking. If a snapshot is requested while another is in flight, the new request is dropped (not queued) and a warning is logged — there is no waiting and no replacement. The contract relies on two things to stay correct under that drop:
- The snapshot reads
highestSequencedPersistedat invocation time and pairs it with the suppliedPosition(last confirmed entry), so each snapshot is internally consistent.- The underlying
ManagedCursor.markDeleteis monotonic — it never moves the cursor backward — so even though concurrent requests are skipped, missing one is safe: a later snapshot will advance the cursor to a position at least as far as the dropped one would have. Combined with the periodic/scheduled triggers, the cursor is guaranteed to keep moving forward.The practical implication: callers must not assume their requested snapshot actually ran. The mechanism is designed to be lossy-but-monotonic — fine for periodic persistence, but not something to rely on for "snapshot exactly at this position right now" semantics.
Source: DeepWiki query on apache/pulsar
One possible way to address the flaky test problem is to configure brokerDeduplicationSnapshotIntervalSeconds to a low value in the test instead of relying on brokerDeduplicationEntriesInterval. This shouldn't change the intention of the test at all.
Fixes #25141
Motivation
OneWayReplicatorDeduplicationTest.testDeduplicationis flaky and occasionally fails with aConditionTimeoutException.The root cause is a race condition in
MessageDeduplication.takeSnapshot. When multiple snapshot requests occur concurrently, the current implementation uses acompareAndSetguard to allow only one active snapshot. If another request arrives while a snapshot is already in progress, the method immediately returns a completed future, effectively dropping the new request.Since the test depends on the snapshot reaching a specific state, dropping requests leads to inconsistent behavior and eventual timeouts.
Modifications
MessageDeduplication.takeSnapshotto ensure snapshot requests are not silently ignored.nextSnapshotFutureto track pending snapshot requests.CompletableFuture.Verifying this change
This change is already covered by existing tests, such as:
org.apache.pulsar.broker.service.OneWayReplicatorDeduplicationTest.testDeduplicationHighlight of changes:
takeSnapshotto safely coordinate snapshot request batching. This avoids dropping concurrent requests without introducing significant contention or blocking.Does this pull request potentially affect one of the following parts: