[feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect#25687
Open
merlimat wants to merge 1 commit intoapache:masterfrom
Open
[feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect#25687merlimat wants to merge 1 commit intoapache:masterfrom
merlimat wants to merge 1 commit intoapache:masterfrom
Conversation
…nnect Replace the TODO in DagWatchClient.connectionClosed() that left long-lived producers / consumers stranded on a stale layout after a transient broker disconnect. The DAG watch is the consumer / producer's only path to learning about split / merge events; without reconnect, a single network blip silently drops the client off future layout updates and never surfaces an error to the application. The new logic mirrors the proven pattern used by ScalableTopicsWatcher (used for the namespace-watcher session) and ScalableConsumerClient (used for the controller session): - start() (initial create) keeps its current behavior: the future is failed on disconnect so producer.create() / consumer.subscribe() fails fast when the broker is unreachable. - After the initial layout has arrived, connectionClosed() schedules a reconnect with exponential backoff (100 ms initial, 30 s max). The reconnect path calls getConnection again, re-registers the same sessionId on the fresh ClientCnx, and re-issues ScalableTopicLookup; the broker re-pushes the current layout via onUpdate, which resets the backoff. - The connect/reconnect logic is shared in a single attach(cnx) method so both paths use identical wiring. Adds a forceCloseConnectionForTesting() hook (mirroring the one on ScalableConsumerClient) for cross-module integration tests. V5DagWatchAutoReconnectTest force-closes the DAG channel underneath a producer and a consumer and asserts each continues operating. A third test reflectively verifies the DagWatchClient's cnx field is repopulated within the backoff window.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Replace the TODO in
DagWatchClient.connectionClosed()that left long-lived producers / consumers stranded on a stale layout after a transient broker disconnect. The DAG watch is the consumer / producer's only path to learning about split / merge events; without reconnect, a single network blip silently drops the client off future layout updates and never surfaces an error to the application.The new logic mirrors the proven pattern used by
ScalableTopicsWatcher(namespace-watcher session) andScalableConsumerClient(controller session):start()(initial create) keeps its current behavior — the future is failed on disconnect soproducer.create()/consumer.subscribe()fails fast when the broker is unreachable.connectionClosed()schedules a reconnect with exponential backoff (100 ms → 30 s). The reconnect path callsgetConnectionagain, re-registers the samesessionIdon the freshClientCnx, and re-issuesScalableTopicLookup; the broker re-pushes the current layout viaonUpdate, which resets the backoff.attach(cnx)method so both paths use identical wiring.Adds a
forceCloseConnectionForTesting()hook (mirroring the one onScalableConsumerClient) for cross-module integration tests.Test plan
V5DagWatchAutoReconnectTest(3 integration tests):cnxfield onDagWatchClientgets re-populated within the backoff window.pulsar-client-v5andpulsar-brokercheckstyle clean.