
Paused consumption across multiple consumers #857

@rbokade-rbk

Consumption frequently halts and stays paused indefinitely until we restart the pod that owns the partition.

In production, message consumption on a partition stops because PartitionStateManager marks the partition as lost and then skips its messages, assuming the partition has been assigned to a different consumer.

Rebalance triggered: P12 assigned to the same pod:
2025-03-03 07:30:15.219 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Partitions revoked: [app-xxx.consent.consent.receipts-11]
2025-03-03 07:30:15.232 INFO pc-broker-poll io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Assigned 1 total (1 new) partition(s) [app-xxx.consent.consent.receipts-12]

PartitionStateManager declared the partition lost:
2025-03-03 12:07:42.765 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Lost partitions: [app-xxx.consent.consent.receipts-12]

There is no work in the mailbox and there are no in-flight messages, even though the partition has enough lag to process. The last commit was on the 23rd, and consumption stayed paused until the 24th.

Logs: the partition is assigned to this pod, but the logs claim it is assigned to a different consumer.

2025-03-03 12:07:42.765 INFO pc-broker-poll io.confluent.parallelconsumer.state.PartitionStateManager Lost partitions: [app-xxx.consent.consent.receipts-12]
2025-03-03 12:07:43.003 INFO (tenant-config-input-consumer,app-xxx.ds-preference-tenant-config.retry).retry-worker-4 com.onetrust.messaging.consumer.KafkaConsumer Partitions assigned: []
2025-03-03 12:07:44.303 INFO pc-broker-poll io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Assigned 1 total (1 new) partition(s) [app-xxx.consent.consent.receipts-12]
2025-03-04 00:00:19.334 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll completed normally (after timeout of PT2S) and returned 0...
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Poll completed
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Got 0 records in poll result
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Subscriptions are paused: true
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: RUNNING
2025-03-04 00:00:19.335 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll starting with timeout: PT2S
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll completed normally (after timeout of PT2S) and returned 0...
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Poll completed
2025-03-04 00:00:21.336 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Got 0 records in poll result
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Subscriptions are paused: true
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.BrokerPollSystem Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: RUNNING
2025-03-04 00:00:21.337 DEBUG pc-broker-poll io.confluent.parallelconsumer.internal.ConsumerManager Poll starting with timeout: PT2S
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Mailbox results returned null, indicating timeToBlockFor elapsed (which was set as PT4.99999484S)
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor isPoolQueueLow()? workAmountBelowTarget true 0 vs 10;
2025-03-04 00:00:22.440 DEBUG pc-control io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor Will try to get work - target: 20, current queue size: 0, requesting: 20, loading factor: 2
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771160:k:16ffceee). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771160:k:16ffceee)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771169:k:3a1af6a2). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771169:k:3a1af6a2)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771170:k:fc145cb0). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771170:k:fc145cb0)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771215:k:a7bbbfff). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771215:k:a7bbbfff)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771189:k:3c5ce0b3). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771189:k:3c5ce0b3)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771243:k:8e803207). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771243:k:8e803207)
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771168:k:d50689cd). Skipping message - it's partition has already assigned to a different consumer.
2025-03-04 00:00:22.441 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771168:k:d50689cd)
2025-03-04 00:00:22.442 DEBUG pc-control io.confluent.parallelconsumer.state.PartitionState Epoch mismatch 0 vs 2 for record WorkContainer(tp:app-xxx.consent.consent.receipts-12:o:771171:k:20d5aeca). Skipping message - it's partition has already assigned to a different consumer.
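To make the "Epoch mismatch 0 vs 2" lines above easier to read, here is a minimal, hypothetical Java model of epoch-based stale-work filtering. It is not Parallel Consumer's implementation: the class `EpochFilterSketch`, its `Work` record, and the rule that every assignment or loss event bumps the partition epoch (starting at 0) are assumptions for illustration. It reproduces the pattern in the logs: records stamped with epoch 0 are skipped once the partition's current epoch is 2, even though the partition came back to the same consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of epoch-based stale-work filtering.
// NOT Parallel Consumer's implementation; names and epoch rules are assumptions.
class EpochFilterSketch {

    // A queued record, stamped with the partition epoch current when it was polled.
    record Work(String partition, long offset, long epoch) {}

    private final Map<String, Long> epochs = new HashMap<>();

    // Assumption: every assignment or loss event bumps the epoch; the first event yields 0.
    long bumpEpoch(String partition) {
        return epochs.merge(partition, 0L, (old, ignored) -> old + 1);
    }

    // Current epoch of the partition, or -1 if it was never assigned.
    long currentEpoch(String partition) {
        return epochs.getOrDefault(partition, -1L);
    }

    // Work is stale when its stamped epoch differs from the partition's current epoch.
    boolean isStale(Work w) {
        return w.epoch() != currentEpoch(w.partition());
    }

    // Returns runnable work. Stale items are logged and skipped but NOT removed
    // from the caller's queue, so they are skipped again on every pass.
    List<Work> selectRunnable(List<Work> queued) {
        List<Work> runnable = new ArrayList<>();
        for (Work w : queued) {
            if (isStale(w)) {
                System.out.printf("Epoch mismatch %d vs %d for offset %d, skipping%n",
                        w.epoch(), currentEpoch(w.partition()), w.offset());
            } else {
                runnable.add(w);
            }
        }
        return runnable;
    }

    public static void main(String[] args) {
        EpochFilterSketch sketch = new EpochFilterSketch();
        String tp = "receipts-12";
        long firstEpoch = sketch.bumpEpoch(tp); // initial assignment -> epoch 0
        List<Work> queued = List.of(
                new Work(tp, 771160L, firstEpoch),
                new Work(tp, 771169L, firstEpoch));
        sketch.bumpEpoch(tp); // partition lost -> epoch 1
        sketch.bumpEpoch(tp); // reassigned to the same consumer -> epoch 2
        List<Work> runnable = sketch.selectRunnable(queued);
        System.out.println("runnable: " + runnable.size());
    }
}
```

Running `main` prints two `Epoch mismatch 0 vs 2` lines followed by `runnable: 0`. In this sketch the stale items stay queued, so every pass skips them again and nothing runs; whether the real library leaves stale work in its shards this way is not confirmed here.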

The thread dump shows the PC pool threads in the WAITING state:

"(dsCacheSync-in-0,ds-preference-cache-sync).pc-pool-13-thread-1" - Thread t@139
java.lang.Thread.State: WAITING
at java.base@17.0.14/jdk.internal.misc.Unsafe.park(Native Method)
- parking to wait for <70bbade3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.base@17.0.14/java.util.concurrent.locks.LockSupport.park(Unknown Source)
at java.base@17.0.14/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(Unknown Source)
at java.base@17.0.14/java.util.concurrent.ForkJoinPool.unmanagedBlock(Unknown Source)
at java.base@17.0.14/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
at java.base@17.0.14/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
at java.base@17.0.14/java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
at java.base@17.0.14/java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
at java.base@17.0.14/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base@17.0.14/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base@17.0.14/java.lang.Thread.run(Unknown Source)
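The worker stack above ends in `ThreadPoolExecutor.getTask` calling `LinkedBlockingQueue.take`, which is where an idle pool thread parks while no task has been submitted. As a minimal sketch using only the JDK (the class and method names are hypothetical and this is not PC code), the program below runs one trivial task on a one-thread pool and then reports the worker's state once it goes idle. It shows that this stack corresponds to workers with nothing queued, not workers stuck inside user code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: an idle ThreadPoolExecutor worker parks in the WAITING state.
// The pool, class, and method names are illustrative, not taken from PC.
class IdlePoolStateSketch {

    // Runs one trivial task, then reports the worker's state once it goes idle.
    static Thread.State idleWorkerState() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            AtomicReference<Thread> worker = new AtomicReference<>();
            CountDownLatch ran = new CountDownLatch(1);
            pool.execute(() -> {
                worker.set(Thread.currentThread());
                ran.countDown();
            });
            ran.await();
            // After the task returns, the core worker loops back into getTask(),
            // which blocks in LinkedBlockingQueue.take() and parks the thread.
            Thread.State state = worker.get().getState();
            while (state == Thread.State.RUNNABLE) {
                Thread.sleep(10);
                state = worker.get().getState();
            }
            return state;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("idle worker state: " + idleWorkerState());
    }
}
```

On a standard JDK this prints `idle worker state: WAITING`, the same state reported for `pc-pool-13-thread-1` in the dump.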

PC version in production: 0.5.2.8
Upgrade in progress: 0.5.3.2
