Skip to content

[feat][pip][consumer] Add Pause/Resume/Paused to Consumer#1507

Open
PavelZeger wants to merge 3 commits into
apache:masterfrom
PavelZeger:pip-1504-add-pause-resume-to-consumer
Open

[feat][pip][consumer] Add Pause/Resume/Paused to Consumer#1507
PavelZeger wants to merge 3 commits into
apache:masterfrom
PavelZeger:pip-1504-add-pause-resume-to-consumer

Conversation

@PavelZeger
Copy link
Copy Markdown
Contributor

@PavelZeger PavelZeger commented May 30, 2026

Fixes #1504

Motivation

The Java client exposes pause() / resume() on every Consumer; the Go client has no equivalent. While paused, a consumer should stop issuing flow permits to the broker so no new
messages are delivered, while already-buffered, in-flight, and previously-granted messages remain available via Receive() / Chan() and ack/nack keep working.

Pause/resume is the standard backpressure pattern for when a downstream is temporarily unavailable (database failover, a "halt ingestion" feature flag, orderly shutdown that finishes in-flight work before closing). The only workaround today is closing and re-subscribing, which is heavyweight (re-resolves the topic, redelivers unacked messages, and can rebalance other consumers in KeyShared/Failover).

The semantics were reviewed and agreed in #1504.

Modifications

  • Added Pause(), Resume(), and Paused() to the Consumer interface pulsar/consumer.go), with GoDoc matching the Java semantics.
  • partitionConsumer (pulsar/consumer_partition.go):
  • Added a paused flag and gated permit flow in flowIfNeed — the single place that returns permits to the broker.
  • When paused, the dispatcher connect/reconnect branch stashes the initial grant into availablePermits instead of sending it, so reconnecting while paused does not resume delivery.
  • resume() flushes only the permits actually owed (via flowIfNeed), mirroring Java's increaseAvailablePermits(cnx, 0), rather than granting a fresh full batch (which would over-grant after a mid-stream pause).
  • Wired the API through all four Consumer implementations: consumer, zeroQueueConsumer, multiTopicConsumer, and regexConsumer. Paused() is reported at the top level ("Pause() called more recently than Resume()").
  • Propagated the paused state to dynamically created children: new partitions (internalTopicSubscribeToPartitions) and newly discovered regex topics (regexConsumer.subscribe) inherit the parent's paused state at creation.
  • For the zero-queue consumer, Receive() is unchanged: the existing permit is gated while paused, so Receive(ctx) blocks until Resume() (or the context is cancelled).

Deviations from the original PIP

The observable behavior agreed in #1504 is unchanged; four implementation details differ from the issue's wording. Resume() flushes only the permits actually owed (and the dispatcher stashes the initial grant while paused) instead of sending a full currentQueueSize batch, which would over-grant after a mid-stream pause. The zero-queue consumer reuses the same flow gate instead of a new resumedCh. The getter is named Paused() rather than IsPaused(). BatchReceive is out of scope, since the Go client doesn't have that method yet.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added TestConsumerPauseResume, TestPartitionedConsumerPauseResume, TestZeroQueueConsumerPauseResume, and TestRegexConsumerPauseResumeInheritsNewTopics in pulsar/consumer_test.go. They verify that a paused consumer stops delivery before draining the full backlog, that Resume() delivers the remainder with no message loss, that Pause()/Resume() are idempotent, that Paused() reflects state, that a newly discovered regex topic inherits the paused state, and that a zero-queue Receive() blocks while paused and returns after resume.
  • Verified locally against Apache Pulsar 4.0.3 standalone (all four tests pass, repeated runs stable);

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: yes — adds Pause(), Resume(), and Paused() to the Consumer interface
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no (uses the existing CommandFlow; pausing simply withholds it)

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? GoDocs (doc comments on the new Consumer interface methods)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[PIP] Add Pause() / Resume() to Consumer

1 participant