Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,4 +431,31 @@ type Consumer interface {

// Name returns the name of consumer.
Name() string

// Pause stops the consumer from requesting more messages from the broker.
//
// Messages already buffered, in-flight, or previously granted to this consumer
// remain available via Receive() / Chan(); acknowledgement still works. This
// might cause Receive() to block until Resume() is called and new messages are
// pushed by the broker. For a zero-queue consumer, Receive blocks until Resume()
// or the context is canceled.
//
// Pausing only withholds flow permits; the consumer stays connected, so it does
// not change subscription membership:
// - Shared: dispatch moves to other consumers once granted permits run out.
// - KeyShared: the paused consumer's keys are not reassigned; their backlog grows.
// - Failover: no failover is triggered; the consumer stays active.
// - Exclusive: the backlog grows.
// In all cases the backlog drains after Resume().
//
// Idempotent.
Pause()

// Resume reverses Pause, letting the consumer request messages again.
//
// Idempotent.
Resume()

// Paused reports whether Pause() has been called more recently than Resume().
Paused() bool
}
23 changes: 23 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type consumer struct {
consumerName string
disableForceTopicCreation bool

paused atomic.Bool

// channel used to deliver message to clients
messageCh chan ConsumerMessage

Expand Down Expand Up @@ -318,6 +320,24 @@ func (c *consumer) Name() string {
return c.consumerName
}

func (c *consumer) Pause() {
c.paused.Store(true)
for _, pc := range c.partitionConsumers() {
pc.pause()
}
}

func (c *consumer) Resume() {
c.paused.Store(false)
for _, pc := range c.partitionConsumers() {
pc.resume()
}
}

func (c *consumer) Paused() bool {
return c.paused.Load()
}

func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
Expand Down Expand Up @@ -436,6 +456,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {

c.consumers.Store(append([]*partitionConsumer(nil), newConsumers...))
for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
if c.paused.Load() {
newConsumers[partitionIdx].pause()
}
newConsumers[partitionIdx].startDispatcher()
}
if newNumPartitions < oldNumPartitions {
Expand Down
21 changes: 21 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
Expand Down Expand Up @@ -78,6 +79,8 @@ type multiTopicConsumer struct {
closeOnce sync.Once
closeCh chan struct{}

paused atomic.Bool

log log.Logger
}

Expand Down Expand Up @@ -360,3 +363,21 @@ func (c *multiTopicConsumer) SeekByTime(_ time.Time) error {
func (c *multiTopicConsumer) Name() string {
return c.consumerName
}

func (c *multiTopicConsumer) Pause() {
c.paused.Store(true)
for _, con := range c.consumers {
con.Pause()
}
}

func (c *multiTopicConsumer) Resume() {
c.paused.Store(false)
for _, con := range c.consumers {
con.Resume()
}
}

func (c *multiTopicConsumer) Paused() bool {
return c.paused.Load()
}
26 changes: 22 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ type partitionConsumer struct {
// After executing seekByTime, the client is unaware of the startMessageId.
// Use this flag to compare markDeletePosition with BrokerLastMessageId when checking hasMoreMessages.
hasSoughtByTime atomic.Bool
ctx context.Context
cancelFunc context.CancelFunc

paused atomic.Bool

ctx context.Context
cancelFunc context.CancelFunc
}

// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
Expand Down Expand Up @@ -306,6 +309,10 @@ func (p *availablePermits) get() int32 {
}

func (p *availablePermits) flowIfNeed() {
if p.pc.paused.Load() {
return
}

// TODO implement a better flow controller
// send more permits if needed
var flowThreshold int32
Expand Down Expand Up @@ -1771,6 +1778,16 @@ func (pc *partitionConsumer) SetRedirectedClusterURI(redirectedClusterURI string
pc.redirectedClusterURI = redirectedClusterURI
}

func (pc *partitionConsumer) pause() {
pc.paused.Store(true)
}

func (pc *partitionConsumer) resume() {
if pc.paused.CompareAndSwap(true, false) {
pc.availablePermits.flowIfNeed()
}
}

// Flow command gives additional permits to send messages to the consumer.
// A typical consumer implementation will use a queue to accumulate these messages
// before the application is ready to consume them. After the consumer is ready,
Expand Down Expand Up @@ -1879,8 +1896,9 @@ func (pc *partitionConsumer) dispatcher() {
}

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer {
if pc.paused.Load() {
pc.availablePermits.add(int32(initialPermits))
} else if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer {
pc.log.WithError(err).Error("unable to send initial permits to broker")
}

Expand Down
32 changes: 30 additions & 2 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

pkgerrors "github.com/pkg/errors"
Expand Down Expand Up @@ -52,6 +53,8 @@ type regexConsumer struct {
consumersLock sync.Mutex
consumers map[string]Consumer

paused atomic.Bool

closeOnce sync.Once
closeCh chan struct{}

Expand Down Expand Up @@ -348,6 +351,28 @@ func (c *regexConsumer) Name() string {
return c.consumerName
}

func (c *regexConsumer) Pause() {
c.consumersLock.Lock()
defer c.consumersLock.Unlock()
c.paused.Store(true)
for _, con := range c.consumers {
con.Pause()
}
}

func (c *regexConsumer) Resume() {
c.consumersLock.Lock()
defer c.consumersLock.Unlock()
c.paused.Store(false)
for _, con := range c.consumers {
con.Resume()
}
}

func (c *regexConsumer) Paused() bool {
return c.paused.Load()
}

func (c *regexConsumer) closed() bool {
select {
case <-c.closeCh:
Expand Down Expand Up @@ -422,8 +447,11 @@ func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRou

c.consumersLock.Lock()
defer c.consumersLock.Unlock()
for t, consumer := range consumers {
c.consumers[t] = consumer
for t, con := range consumers {
if c.paused.Load() {
con.Pause()
}
c.consumers[t] = con
}
}

Expand Down
Loading