[feat][pip][consumer] Add Pause/Resume/Paused to Consumer#1507
Open
PavelZeger wants to merge 3 commits into
Open
[feat][pip][consumer] Add Pause/Resume/Paused to Consumer#1507PavelZeger wants to merge 3 commits into
PavelZeger wants to merge 3 commits into
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #1504
Motivation
The Java client exposes
pause()/resume()on everyConsumer; the Go client has no equivalent. While paused, a consumer should stop issuing flow permits to the broker so no newmessages are delivered, while already-buffered, in-flight, and previously-granted messages remain available via
Receive()/Chan()and ack/nack keep working.Pause/resume is the standard backpressure pattern for when a downstream is temporarily unavailable (database failover, a "halt ingestion" feature flag, orderly shutdown that finishes in-flight work before closing). The only workaround today is closing and re-subscribing, which is heavyweight (re-resolves the topic, redelivers unacked messages, and can rebalance other consumers in
KeyShared/Failover).The semantics were reviewed and agreed in #1504.
Modifications
Pause(),Resume(), andPaused()to theConsumerinterfacepulsar/consumer.go), with GoDoc matching the Java semantics.partitionConsumer(pulsar/consumer_partition.go):pausedflag and gated permit flow inflowIfNeed— the single place that returns permits to the broker.availablePermitsinstead of sending it, so reconnecting while paused does not resume delivery.resume()flushes only the permits actually owed (viaflowIfNeed), mirroring Java'sincreaseAvailablePermits(cnx, 0), rather than granting a fresh full batch (which would over-grant after a mid-stream pause).Consumerimplementations:consumer,zeroQueueConsumer,multiTopicConsumer, andregexConsumer.Paused()is reported at the top level ("Pause()called more recently thanResume()").internalTopicSubscribeToPartitions) and newly discovered regex topics (regexConsumer.subscribe) inherit the parent's paused state at creation.Receive()is unchanged: the existing permit is gated while paused, soReceive(ctx)blocks untilResume()(or the context is cancelled).Deviations from the original PIP
The observable behavior agreed in #1504 is unchanged; four implementation details differ from the issue's wording.
Resume()flushes only the permits actually owed (and the dispatcher stashes the initial grant while paused) instead of sending a fullcurrentQueueSizebatch, which would over-grant after a mid-stream pause. The zero-queue consumer reuses the same flow gate instead of a newresumedCh. The getter is namedPaused()rather thanIsPaused().BatchReceiveis out of scope, since the Go client doesn't have that method yet.Verifying this change
This change added tests and can be verified as follows:
TestConsumerPauseResume,TestPartitionedConsumerPauseResume,TestZeroQueueConsumerPauseResume, andTestRegexConsumerPauseResumeInheritsNewTopicsinpulsar/consumer_test.go. They verify that a paused consumer stops delivery before draining the full backlog, thatResume()delivers the remainder with no message loss, thatPause()/Resume()are idempotent, thatPaused()reflects state, that a newly discovered regex topic inherits the paused state, and that a zero-queueReceive()blocks while paused and returns after resume.Does this pull request potentially affect one of the following parts:
Pause(),Resume(), andPaused()to theConsumerinterfaceCommandFlow; pausing simply withholds it)Documentation
Consumerinterface methods)