diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 9aeabb03c5..9a791b3d2c 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -431,4 +431,31 @@ type Consumer interface { // Name returns the name of consumer. Name() string + + // Pause stops the consumer from requesting more messages from the broker. + // + // Messages already buffered, in-flight, or previously granted to this consumer + // remain available via Receive() / Chan(); acknowledgement still works. This + // might cause Receive() to block until Resume() is called and new messages are + // pushed by the broker. For a zero-queue consumer, Receive blocks until Resume() + // or the context is canceled. + // + // Pausing only withholds flow permits; the consumer stays connected, so it does + // not change subscription membership: + // - Shared: dispatch moves to other consumers once granted permits run out. + // - KeyShared: the paused consumer's keys are not reassigned; their backlog grows. + // - Failover: no failover is triggered; the consumer stays active. + // - Exclusive: the backlog grows. + // In all cases the backlog drains after Resume(). + // + // Idempotent. + Pause() + + // Resume reverses Pause, letting the consumer request messages again. + // + // Idempotent. + Resume() + + // Paused reports whether Pause() has been called more recently than Resume(). + Paused() bool } diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 5848b93c6d..318c0fbe02 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -58,6 +58,8 @@ type consumer struct { consumerName string disableForceTopicCreation bool + paused atomic.Bool + // channel used to deliver message to clients messageCh chan ConsumerMessage @@ -318,6 +320,24 @@ func (c *consumer) Name() string { return c.consumerName } +func (c *consumer) Pause() { + c.paused.Store(true) + for _, pc := range c.partitionConsumers() { + pc.pause() + } +} + +func (c *consumer) Resume() { + c.paused.Store(false) + for _, pc := range c.partitionConsumers() { + pc.resume() + } +} + +func (c *consumer) Paused() bool { + return c.paused.Load() +} + func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) { var wg sync.WaitGroup stopDiscoveryCh := make(chan struct{}) @@ -436,6 +456,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { c.consumers.Store(append([]*partitionConsumer(nil), newConsumers...)) for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ { + if c.paused.Load() { + newConsumers[partitionIdx].pause() + } newConsumers[partitionIdx].startDispatcher() } if newNumPartitions < oldNumPartitions { diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 3e4c8ebd29..23906e67d2 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/apache/pulsar-client-go/pulsar/internal" @@ -78,6 +79,8 @@ type multiTopicConsumer struct { closeOnce sync.Once closeCh chan struct{} + paused atomic.Bool + log log.Logger } @@ -360,3 +363,21 @@ func (c *multiTopicConsumer) SeekByTime(_ time.Time) error { func (c *multiTopicConsumer) Name() string { return c.consumerName } + +func (c *multiTopicConsumer) Pause() { + c.paused.Store(true) + for _, con := range c.consumers { + con.Pause() + } +} + +func (c *multiTopicConsumer) Resume() { + c.paused.Store(false) + for _, con := range c.consumers { + con.Resume() + } +} + +func (c *multiTopicConsumer) Paused() bool { + return c.paused.Load() +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 08e1b36f27..41f76b8922 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -254,8 +254,11 @@ type partitionConsumer struct { // After executing seekByTime, the client is unaware of the startMessageId. // Use this flag to compare markDeletePosition with BrokerLastMessageId when checking hasMoreMessages. hasSoughtByTime atomic.Bool - ctx context.Context - cancelFunc context.CancelFunc + + paused atomic.Bool + + ctx context.Context + cancelFunc context.CancelFunc } // pauseDispatchMessage used to discard the message in the dispatcher goroutine. @@ -306,6 +309,10 @@ func (p *availablePermits) get() int32 { } func (p *availablePermits) flowIfNeed() { + if p.pc.paused.Load() { + return + } + // TODO implement a better flow controller // send more permits if needed var flowThreshold int32 @@ -1771,6 +1778,16 @@ func (pc *partitionConsumer) SetRedirectedClusterURI(redirectedClusterURI string pc.redirectedClusterURI = redirectedClusterURI } +func (pc *partitionConsumer) pause() { + pc.paused.Store(true) +} + +func (pc *partitionConsumer) resume() { + if pc.paused.CompareAndSwap(true, false) { + pc.availablePermits.flowIfNeed() + } +} + // Flow command gives additional permits to send messages to the consumer. // A typical consumer implementation will use a queue to accumulate these messages // before the application is ready to consume them. After the consumer is ready, @@ -1879,8 +1896,9 @@ func (pc *partitionConsumer) dispatcher() { } pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits) - // send initial permits - if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer { + if pc.paused.Load() { + pc.availablePermits.add(int32(initialPermits)) + } else if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer { pc.log.WithError(err).Error("unable to send initial permits to broker") } diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 2548eaa7c1..631ec2b16f 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -25,6 +25,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" pkgerrors "github.com/pkg/errors" @@ -52,6 +53,8 @@ type regexConsumer struct { consumersLock sync.Mutex consumers map[string]Consumer + paused atomic.Bool + closeOnce sync.Once closeCh chan struct{} @@ -348,6 +351,28 @@ func (c *regexConsumer) Name() string { return c.consumerName } +func (c *regexConsumer) Pause() { + c.consumersLock.Lock() + defer c.consumersLock.Unlock() + c.paused.Store(true) + for _, con := range c.consumers { + con.Pause() + } +} + +func (c *regexConsumer) Resume() { + c.consumersLock.Lock() + defer c.consumersLock.Unlock() + c.paused.Store(false) + for _, con := range c.consumers { + con.Resume() + } +} + +func (c *regexConsumer) Paused() bool { + return c.paused.Load() +} + func (c *regexConsumer) closed() bool { select { case <-c.closeCh: @@ -422,8 +447,11 @@ func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRou c.consumersLock.Lock() defer c.consumersLock.Unlock() - for t, consumer := range consumers { - c.consumers[t] = consumer + for t, con := range consumers { + if c.paused.Load() { + con.Pause() + } + c.consumers[t] = con } } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index c3fc82d4dd..40d5db8fe2 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -6391,19 +6391,19 @@ func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) { } func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) { - client, err := NewClient(ClientOptions{URL: serviceURL}) + aClient, err := NewClient(ClientOptions{URL: serviceURL}) require.NoError(t, err) - defer client.Close() + defer aClient.Close() interceptor := &closeInterceptor{fired: make(chan struct{})} - consumer, err := client.Subscribe(ConsumerOptions{ + aConsumer, err := aClient.Subscribe(ConsumerOptions{ Topic: newTopicName(), SubscriptionName: "test-on-close-user", Interceptors: ConsumerInterceptors{interceptor}, }) require.NoError(t, err) - consumer.Close() + aConsumer.Close() select { case <-interceptor.fired: @@ -6412,7 +6412,7 @@ func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) { } assert.Nil(t, interceptor.err, "user-initiated close should report nil cause") - assert.Equal(t, consumer, interceptor.consumer) + assert.Equal(t, aConsumer, interceptor.consumer) } func TestIsNonRetriableSubscribeError(t *testing.T) { @@ -6441,3 +6441,234 @@ func TestIsNonRetriableSubscribeError(t *testing.T) { }) } } + +func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout time.Duration) int { + t.Helper() + count := 0 + for { + ctx, cancel := context.WithTimeout(context.Background(), perMsgTimeout) + msg, err := consumer.Receive(ctx) + cancel() + if err != nil { + return count + } + + ackErr := consumer.Ack(msg) + if ackErr != nil { + return 0 + } + + count++ + } +} + +func drainExactly(t *testing.T, consumer Consumer, want int) { + t.Helper() + for i := 0; i < want; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + msg, err := consumer.Receive(ctx) + cancel() + assert.Nil(t, err) + if err != nil { + return + } + + ackErr := consumer.Ack(msg) + if ackErr != nil { + return + } + } +} + +func TestConsumerPauseResume(t *testing.T) { + aClient, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer aClient.Close() + + topic := newTopicName() + const numMsg = 20 + + aProducer, err := aClient.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer aProducer.Close() + + for i := 0; i < numMsg; i++ { + _, err := aProducer.Send(context.Background(), &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-%d", i)), + }) + assert.Nil(t, err) + } + + aConsumer, err := aClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "pause-resume-sub", + Type: Exclusive, + ReceiverQueueSize: 5, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer aConsumer.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + first, err := aConsumer.Receive(ctx) + cancel() + assert.Nil(t, err) + if err == nil { + assert.NoError(t, aConsumer.Ack(first)) + } + received := 1 + + aConsumer.Pause() + aConsumer.Pause() + assert.True(t, aConsumer.Paused()) + + received += drainUntilTimeout(t, aConsumer, 3*time.Second) + assert.Less(t, received, numMsg, "pause must stop delivery before all messages arrive") + + aConsumer.Resume() + aConsumer.Resume() + assert.False(t, aConsumer.Paused()) + + drainExactly(t, aConsumer, numMsg-received) +} + +func TestPartitionedConsumerPauseResume(t *testing.T) { + aClient, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer aClient.Close() + + topic := newTopicName() + makeHTTPCall(t, http.MethodPut, adminURL+"/admin/v2/persistent/public/default/"+topic+"/partitions", "3") + + const numMsg = 30 + aProducer, err := aClient.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer aProducer.Close() + + for i := 0; i < numMsg; i++ { + _, sendErr := aProducer.Send(context.Background(), &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-%d", i)), + }) + assert.Nil(t, sendErr) + } + + aConsumer, err := aClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "pause-resume-partitioned-sub", + Type: Exclusive, + ReceiverQueueSize: 5, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer aConsumer.Close() + + aConsumer.Pause() + assert.True(t, aConsumer.Paused()) + + receivedWhilePaused := drainUntilTimeout(t, aConsumer, 3*time.Second) + assert.Less(t, receivedWhilePaused, numMsg, "pause must stop delivery before all messages arrive") + + aConsumer.Resume() + assert.False(t, aConsumer.Paused()) + + drainExactly(t, aConsumer, numMsg-receivedWhilePaused) +} + +func TestZeroQueueConsumerPauseResume(t *testing.T) { + aClient, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer aClient.Close() + + topic := newTopicName() + aProducer, err := aClient.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer aProducer.Close() + + aConsumer, err := aClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "zq-pause-sub", + Type: Exclusive, + EnableZeroQueueConsumer: true, + }) + assert.Nil(t, err) + defer aConsumer.Close() + + _, err = aProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("hello")}) + assert.Nil(t, err) + + aConsumer.Pause() + assert.True(t, aConsumer.Paused()) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + msg, err := aConsumer.Receive(ctx) + cancel() + assert.Nil(t, msg) + assert.NotNil(t, err) + + aConsumer.Resume() + assert.False(t, aConsumer.Paused()) + + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + msg, err = aConsumer.Receive(ctx) + cancel() + assert.Nil(t, err) + if assert.NotNil(t, msg) { + assert.Equal(t, "hello", string(msg.Payload())) + ackErr := aConsumer.Ack(msg) + require.NoError(t, ackErr) + } +} + +func TestRegexConsumerPauseResumeInheritsNewTopics(t *testing.T) { + aClient, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer aClient.Close() + + prefix := fmt.Sprintf("persistent://public/default/pause-regex-%d", time.Now().UnixNano()) + aConsumer, err := aClient.Subscribe(ConsumerOptions{ + TopicsPattern: prefix + "-.*", + SubscriptionName: "regex-pause-sub", + Type: Exclusive, + ReceiverQueueSize: 5, + AutoDiscoveryPeriod: 1 * time.Second, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer aConsumer.Close() + + aConsumer.Pause() + assert.True(t, aConsumer.Paused()) + + const numMsg = 20 + aProducer, err := aClient.CreateProducer(ProducerOptions{ + Topic: prefix + "-new", + DisableBatching: true, + }) + assert.Nil(t, err) + defer aProducer.Close() + for i := 0; i < numMsg; i++ { + _, sendErr := aProducer.Send(context.Background(), &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-%d", i)), + }) + assert.Nil(t, sendErr) + } + + time.Sleep(3 * time.Second) + + receivedWhilePaused := drainUntilTimeout(t, aConsumer, 3*time.Second) + assert.Less(t, receivedWhilePaused, numMsg, "newly discovered topic must inherit paused state") + + aConsumer.Resume() + assert.False(t, aConsumer.Paused()) + + drainExactly(t, aConsumer, numMsg-receivedWhilePaused) +} diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go index 20a0944e35..e3d07ad92b 100644 --- a/pulsar/consumer_zero_queue.go +++ b/pulsar/consumer_zero_queue.go @@ -323,3 +323,15 @@ func (z *zeroQueueConsumer) SeekByTime(time time.Time) error { func (z *zeroQueueConsumer) Name() string { return z.consumerName } + +func (z *zeroQueueConsumer) Pause() { + z.pc.pause() +} + +func (z *zeroQueueConsumer) Resume() { + z.pc.resume() +} + +func (z *zeroQueueConsumer) Paused() bool { + return z.pc.paused.Load() +} diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index e7712356f5..7a5273dd57 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -115,6 +115,14 @@ func (c *mockConsumer) Name() string { return "" } +func (c *mockConsumer) Pause() {} + +func (c *mockConsumer) Resume() {} + +func (c *mockConsumer) Paused() bool { + return false +} + func (c *mockConsumer) GetLastMessageIDs() ([]pulsar.TopicMessageID, error) { ids := make([]pulsar.TopicMessageID, 0) return ids, nil