diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 9fd8cef36f..27b50ea60a 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -923,8 +923,9 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) c.log.Infof("Broker notification of Closed consumer: %d", consumerID) if consumer, ok := c.consumerHandler(consumerID); ok { + c.DeleteConsumeHandler(consumerID) //delete the handle from registry first consumer.ConnectionClosed(closeConsumer) - c.DeleteConsumeHandler(consumerID) + } else { c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer") } diff --git a/pulsar/internal/connection_test.go b/pulsar/internal/connection_test.go index 92831cab93..531be07110 100644 --- a/pulsar/internal/connection_test.go +++ b/pulsar/internal/connection_test.go @@ -224,3 +224,47 @@ func newMockMetrics() *Metrics { }), } } + +type MockConsumerHandler struct { + ConsumerHandler + closeCount int32 + conn *connection + consumerID uint64 + mapCheck bool +} + +func (m *MockConsumerHandler) ConnectionClosed(closeCmd *pb.CommandCloseConsumer) { + atomic.AddInt32(&m.closeCount, 1) + + m.conn.mu.RLock() + _, exists := m.conn.consumerHandlers[m.consumerID] + m.conn.mu.RUnlock() + + if !exists { + m.mapCheck = true + } +} + +func TestConcurrentCloseConsumerRace(t *testing.T) { + c := newTestConnection() + mockConsumerID := uint64(12345) + + mockHandler := &MockConsumerHandler{ + conn: c, + consumerID: mockConsumerID, + } + + err := c.AddConsumeHandler(mockConsumerID, mockHandler) + require.NoError(t, err) + + closeCmd := &pb.CommandCloseConsumer{ + ConsumerId: &mockConsumerID, + } + + c.handleCloseConsumer(closeCmd) + + assert.Equal(t, int32(1), atomic.LoadInt32(&mockHandler.closeCount)) + + // leaves the consumer in the map while ConnectionClosed runs + assert.True(t, mockHandler.mapCheck, "The consumer handler must be removed from the registry map BEFORE ConnectionClosed is executed") +}