
[fix][broker] Fix flaky OneWayReplicatorDeduplicationTest by coalescing snapshot requests#25679

Open
Praveenkumar76 wants to merge 1 commit into apache:master from cognitree:fix/flaky-deduplication-test

Conversation

@Praveenkumar76
Contributor

Fixes #25141

Motivation

OneWayReplicatorDeduplicationTest.testDeduplication is flaky and occasionally fails with a ConditionTimeoutException.

The root cause is a race condition in MessageDeduplication.takeSnapshot. When multiple snapshot requests occur concurrently, the current implementation uses a compareAndSet guard to allow only one active snapshot. If another request arrives while a snapshot is already in progress, the method immediately returns a completed future, effectively dropping the new request.

Since the test depends on the snapshot reaching a specific state, dropping requests leads to inconsistent behavior and eventual timeouts.
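The drop behavior described above can be sketched as follows. This is a simplified illustration, not the actual Pulsar code: the snapshot operation is abstracted behind a supplier, and the class name is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Illustrative sketch of the existing compareAndSet guard pattern.
class SnapshotGuard {
    private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);
    private final Supplier<CompletableFuture<Void>> snapshotOp;

    SnapshotGuard(Supplier<CompletableFuture<Void>> snapshotOp) {
        this.snapshotOp = snapshotOp;
    }

    CompletableFuture<Void> takeSnapshot() {
        if (!snapshotTaking.compareAndSet(false, true)) {
            // Another snapshot is in flight: this request is silently dropped
            // and the caller gets an already-completed future.
            return CompletableFuture.completedFuture(null);
        }
        return snapshotOp.get().whenComplete((v, e) -> snapshotTaking.set(false));
    }
}
```

A caller that awaits the returned future cannot tell whether its snapshot actually ran, which is exactly what the flaky test trips over.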

Modifications

  • Implemented request coalescing in MessageDeduplication.takeSnapshot to ensure snapshot requests are not silently ignored.
  • Introduced a nextSnapshotFuture to track pending snapshot requests.
  • When a snapshot is already in progress, subsequent requests are grouped into a shared CompletableFuture.
  • After the current snapshot completes, exactly one additional snapshot is triggered to cover all queued requests.
  • Ensured minimal synchronization to avoid performance impact while maintaining correctness.
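The coalescing scheme above can be sketched roughly as follows. This is a standalone illustration with an assumed class name and an abstracted snapshot operation, not the actual change to MessageDeduplication.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative sketch: queued requests share one future, and exactly one
// follow-up snapshot runs after the in-flight snapshot completes.
class CoalescingSnapshots {
    private final Supplier<CompletableFuture<Void>> snapshotOp;
    private CompletableFuture<Void> currentSnapshot;
    private CompletableFuture<Void> nextSnapshotFuture;

    CoalescingSnapshots(Supplier<CompletableFuture<Void>> snapshotOp) {
        this.snapshotOp = snapshotOp;
    }

    synchronized CompletableFuture<Void> takeSnapshot() {
        if (currentSnapshot != null && !currentSnapshot.isDone()) {
            if (nextSnapshotFuture == null) {
                // First queued request: schedule exactly one follow-up snapshot.
                nextSnapshotFuture = new CompletableFuture<>();
                currentSnapshot.whenComplete((v, e) -> runQueued());
            }
            return nextSnapshotFuture; // later requests coalesce into the same future
        }
        currentSnapshot = snapshotOp.get();
        return currentSnapshot;
    }

    private void runQueued() {
        CompletableFuture<Void> pending;
        CompletableFuture<Void> next;
        synchronized (this) {
            pending = nextSnapshotFuture;
            nextSnapshotFuture = null;
            next = snapshotOp.get();
            currentSnapshot = next;
        }
        next.whenComplete((v, e) -> {
            if (e != null) pending.completeExceptionally(e);
            else pending.complete(v);
        });
    }
}
```

The single `synchronized` guard is the "lightweight synchronization" mentioned below: it is only held to inspect and swap future references, never across the snapshot I/O itself.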

Verifying this change

  • Verified that the flaky test now runs consistently without failures.
  • Tested by introducing artificial delays in snapshot execution to reproduce the race condition.
  • Confirmed that queued requests are properly handled and no longer dropped.

This change is already covered by existing tests, such as:

  • org.apache.pulsar.broker.service.OneWayReplicatorDeduplicationTest.testDeduplication

Highlight of changes:

  • Threading model: Added lightweight synchronization in takeSnapshot to safely coordinate snapshot request batching. This avoids dropping concurrent requests without introducing significant contention or blocking.

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@lhotari
Member

lhotari commented May 5, 2026

The root cause is a race condition in MessageDeduplication.takeSnapshot. When multiple snapshot requests occur concurrently, the current implementation uses a compareAndSet guard to allow only one active snapshot. If another request arrives while a snapshot is already in progress, the method immediately returns a completed future, effectively dropping the new request.

The title of the PR should reflect that this is a production code change, not a fix to a flaky test.

Member

@lhotari left a comment


I don't think that MessageDeduplication needs to be changed. The test itself is invalid and should be fixed instead. Here's an explanation of MessageDeduplication, an answer from DeepWiki:

MessageDeduplication.takeSnapshot(Position) — Role and Correctness

Role of the method

The private takeSnapshot(Position) in org.apache.pulsar.broker.service.persistent.MessageDeduplication persists the current deduplication state to a ManagedCursor. The snapshot captures the highest sequence IDs persisted per producer (the highestSequencedPersisted map) and marks the cursor at the supplied Position, which corresponds to the last confirmed entry in the ManagedLedger. This is what allows deduplication state to be recovered correctly after a broker restart or topic unload/reload.

When it's invoked

  • Periodically, after snapshotInterval entries have been persisted (counter-driven, from recordMessagePersistedNormal / recordMessagePersistedRepl).
  • After replaying the cursor on a deduplication status check, if enough entries were processed.
  • After purging inactive producers, to persist the trimmed state.
  • From a scheduled task driven by brokerDeduplicationSnapshotFrequencyInSeconds / brokerDeduplicationSnapshotIntervalSeconds, via the public takeSnapshot() wrapper.
  • From BacklogQuotaManager when the deduplication cursor is the slowest consumer and needs to be advanced.

Correctness requirements when one is already in progress

Concurrency is gated by an AtomicBoolean snapshotTaking. If a snapshot is requested while another is in flight, the new request is dropped (not queued) and a warning is logged — there is no waiting and no replacement. The contract relies on two things to stay correct under that drop:

  1. The snapshot reads highestSequencedPersisted at invocation time and pairs it with the supplied Position (last confirmed entry), so each snapshot is internally consistent.
  2. The underlying ManagedCursor.markDelete is monotonic — it never moves the cursor backward — so even though concurrent requests are skipped, missing one is safe: a later snapshot will advance the cursor to a position at least as far as the dropped one would have. Combined with the periodic/scheduled triggers, the cursor is guaranteed to keep moving forward.

The practical implication: callers must not assume their requested snapshot actually ran. The mechanism is designed to be lossy-but-monotonic — fine for periodic persistence, but not something to rely on for "snapshot exactly at this position right now" semantics.
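The monotonicity argument can be illustrated with a toy cursor. This is illustrative only; the real ManagedCursor API is much richer, and the class and names here are made up.

```java
// Toy model of the "lossy but monotonic" property: the mark-delete position
// never moves backward, so skipping a snapshot request is safe — a later
// snapshot advances the cursor at least as far as the dropped one would have.
class MonotonicCursor {
    private long markDeletePosition = -1;

    synchronized boolean markDelete(long position) {
        if (position <= markDeletePosition) {
            return false; // stale or duplicate position: no-op, never move back
        }
        markDeletePosition = position;
        return true;
    }

    synchronized long position() {
        return markDeletePosition;
    }
}
```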


Source: DeepWiki query on apache/pulsar

One possible way to address the flaky test problem is to configure brokerDeduplicationSnapshotIntervalSeconds to a low value in the test instead of relying on brokerDeduplicationEntriesInterval. This shouldn't change the intention of the test at all.
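In the test setup, that suggestion would look roughly like the following fragment. The setter name is an assumption derived from the configuration key via the usual Lombok-generated accessors on ServiceConfiguration:

```java
// Hypothetical test-setup fragment: drive snapshots by time instead of entry count.
ServiceConfiguration conf = pulsar.getConfiguration();
conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
```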



Development

Successfully merging this pull request may close these issues.

Flaky-test: OneWayReplicatorDeduplicationTest.testDeduplication
