fix(pulsar/internal): concurrent double reconnection loop in connecti…#1506

Open
Skywalkr-dev wants to merge 1 commit into apache:master from Skywalkr-dev:fix/consumer-conn-close-race

@Skywalkr-dev Skywalkr-dev commented May 25, 2026

Fixes #1461

Motivation

This pull request fixes a race condition in the connection layer (pulsar/internal/connection.go) in which a single consumer handler can trigger duplicate background reconnection loops (reconnectLoop).
Root Cause:

When the broker sends an explicit CommandCloseConsumer frame, handleCloseConsumer() is invoked.

In the original implementation, the connection fetches the consumer handler from the internal registry map c.consumerHandlers but leaves the entry intact while waiting for the asynchronous consumer.ConnectionClosed(closeConsumer) callback to execute.

If a network drop or socket disconnect occurs during this window, the connection's failsafe Close() routine runs, takes a snapshot of c.consumerHandlers, finds the still-registered consumer, and triggers a duplicate handler.ConnectionClosed(nil) callback.

The target partitionConsumer receives two closure notifications back-to-back, spawning parallel reconnection goroutines that desynchronize internal consumer state, corrupt background tracking, and clear the internal receiver queue redundantly.
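
The interleaving described above can be replayed deterministically with a small model. This is a minimal sketch, not the client's actual code: `handler`, `registry`, `closeAll`, and `simulate` are hypothetical names introduced only for illustration, and the "concurrent" disconnect is placed by hand inside the window rather than raced with real goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a consumer handler. It only counts
// how many times it was told that its connection closed.
type handler struct {
	mu     sync.Mutex
	closed int
}

func (h *handler) ConnectionClosed() {
	h.mu.Lock()
	h.closed++
	h.mu.Unlock()
}

func (h *handler) Closed() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.closed
}

// registry is a simplified model of the connection's consumer handler map.
type registry struct {
	mu       sync.Mutex
	handlers map[uint64]*handler
}

func (r *registry) get(id uint64) (*handler, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	h, ok := r.handlers[id]
	return h, ok
}

func (r *registry) remove(id uint64) {
	r.mu.Lock()
	delete(r.handlers, id)
	r.mu.Unlock()
}

// closeAll models the connection-wide close path: snapshot the registered
// handlers under the lock, then notify each one outside the lock.
func (r *registry) closeAll() {
	r.mu.Lock()
	snapshot := make([]*handler, 0, len(r.handlers))
	for _, h := range r.handlers {
		snapshot = append(snapshot, h)
	}
	r.mu.Unlock()
	for _, h := range snapshot {
		h.ConnectionClosed()
	}
}

// simulate replays one broker-initiated close with a socket drop landing
// inside the window, and returns how many notifications the handler saw.
func simulate(removeFirst bool) int {
	r := &registry{handlers: map[uint64]*handler{1: &handler{}}}
	h, _ := r.get(1)
	if removeFirst {
		r.remove(1) // fixed order: unregister before notifying
	}
	r.closeAll()         // disconnect arrives while the close is in flight
	h.ConnectionClosed() // broker-initiated close notification
	if !removeFirst {
		r.remove(1) // original order: unregister after notifying
	}
	return h.Closed()
}

func main() {
	fmt.Println("notify then remove:", simulate(false))
	fmt.Println("remove then notify:", simulate(true))
}
```

In this model the original ordering delivers two notifications to the same handler, while removing the entry first leaves the snapshot empty and delivers one.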

Modifications

Reordered the calls in handleCloseConsumer in pulsar/internal/connection.go so that the consumer is unregistered from the connection's handler map via c.DeleteConsumeHandler(consumerID) before the asynchronous ConnectionClosed workflow fires.
Added a regression unit test, TestConcurrentCloseConsumerRace, to pulsar/internal/connection_test.go.

Verifying this change

[x] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Added a regression unit test TestConcurrentCloseConsumerRace within pulsar/internal/connection_test.go that asserts the handler is removed from the connection registry map before ConnectionClosed executes to eliminate the concurrency race window.
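
One way to assert "removed before ConnectionClosed executes" is to have the handler inspect the registry from inside its own callback. The sketch below illustrates that idea under stated assumptions: `conn`, `probeHandler`, and `runProbe` are hypothetical names, and this is not the body of TestConcurrentCloseConsumerRace, which is not shown in this description.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a simplified, hypothetical model of a connection that owns a
// registry of consumer handlers keyed by consumer ID.
type conn struct {
	mu       sync.Mutex
	handlers map[uint64]*probeHandler
}

// probeHandler records, at the moment its callback runs, whether it was
// still present in the connection's registry.
type probeHandler struct {
	id              uint64
	conn            *conn
	calls           int
	stillRegistered bool
}

func (c *conn) has(id uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.handlers[id]
	return ok
}

func (c *conn) remove(id uint64) {
	c.mu.Lock()
	delete(c.handlers, id)
	c.mu.Unlock()
}

func (p *probeHandler) ConnectionClosed() {
	p.calls++
	p.stillRegistered = p.conn.has(p.id)
}

// handleCloseConsumer models the fixed ordering: look up the handler,
// unregister it, and only then deliver the close notification.
func (c *conn) handleCloseConsumer(id uint64) {
	c.mu.Lock()
	h, ok := c.handlers[id]
	c.mu.Unlock()
	if !ok {
		return
	}
	c.remove(id)
	h.ConnectionClosed()
}

// runProbe wires one handler into a connection, triggers a broker-initiated
// close for it, and returns the handler so its observations can be checked.
func runProbe() *probeHandler {
	c := &conn{handlers: map[uint64]*probeHandler{}}
	p := &probeHandler{id: 7, conn: c}
	c.handlers[p.id] = p
	c.handleCloseConsumer(p.id)
	return p
}

func main() {
	p := runProbe()
	fmt.Println("calls:", p.calls, "still registered during callback:", p.stillRegistered)
}
```

A check of this shape fails if the removal is moved back after the notification, because the callback would then observe its own entry in the map.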

Does this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): no
The public API: no
The schema: no
The default values of configurations: no
The wire protocol: no

Documentation
Does this pull request introduce a new feature? no

…on engine

Signed-off-by: Skywalkr-dev <snaveenbharath2005@gmail.com>
Development

Successfully merging this pull request may close these issues.

Consumer message loss due to duplicate reconnection
