Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,9 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)

if consumer, ok := c.consumerHandler(consumerID); ok {
c.DeleteConsumeHandler(consumerID) //delete the handle from registry first
consumer.ConnectionClosed(closeConsumer)
c.DeleteConsumeHandler(consumerID)

} else {
c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
}
Expand Down
44 changes: 44 additions & 0 deletions pulsar/internal/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,47 @@
}),
}
}

type MockConsumerHandler struct {
ConsumerHandler
closeCount int32
conn *connection
consumerID uint64
mapCheck bool
}

func (m *MockConsumerHandler) ConnectionClosed(closeCmd *pb.CommandCloseConsumer) {

Check failure on line 236 in pulsar/internal/connection_test.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'closeCmd' seems to be unused, consider removing or renaming it as _ (revive)
atomic.AddInt32(&m.closeCount, 1)

Check failure on line 238 in pulsar/internal/connection_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (goimports)
m.conn.mu.RLock()
_, exists := m.conn.consumerHandlers[m.consumerID]
m.conn.mu.RUnlock()

if !exists {
m.mapCheck = true
}
}

func TestConcurrentCloseConsumerRace(t *testing.T) {
c := newTestConnection()
mockConsumerID := uint64(12345)

mockHandler := &MockConsumerHandler{
conn: c,
consumerID: mockConsumerID,
}

err := c.AddConsumeHandler(mockConsumerID, mockHandler)
require.NoError(t, err)

closeCmd := &pb.CommandCloseConsumer{
ConsumerId: &mockConsumerID,
}

c.handleCloseConsumer(closeCmd)

assert.Equal(t, int32(1), atomic.LoadInt32(&mockHandler.closeCount))

// leaves the consumer in the map while ConnectionClosed runs
assert.True(t, mockHandler.mapCheck, "The consumer handler must be removed from the registry map BEFORE ConnectionClosed is executed")

Check failure on line 269 in pulsar/internal/connection_test.go

View workflow job for this annotation

GitHub Actions / lint

The line is 135 characters long, which exceeds the maximum of 120 characters. (lll)
}
Loading