fix: stop Kafka rebalance storm by not recreating consumer on ILLEGAL_GENERATION #74
Merged
…_GENERATION

Closing and recreating the consumer on every ILLEGAL_GENERATION sends LeaveGroup, which triggers a group-wide rebalance. That rebalance invalidates every other consumer's generation and they recreate too: a self-sustaining cascade. Production with 16 replicas showed all pods evicting in perfect millisecond-synchrony every ~35s.

librdkafka's consumer group state machine already rejoins automatically on the next consume() after ILLEGAL_GENERATION, preserving member.id and leaving the rest of the group undisturbed. Just log and return.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Remove the `self.consumer.close()` + `self.consumer = Consumer(...)` calls from the `ILLEGAL_GENERATION` / `UNKNOWN_MEMBER_ID` branch of `KafkaSource.commit()`. Replace them with a single log-and-return. librdkafka's consumer group state machine already handles rejoin automatically on the next `consume()` call.
Root cause: production evidence
Observed on `conversations-events-bizon-streaming` (16 replicas):
- All pods evicted in millisecond-synchrony (`11:52:49.880`, `11:54:04.053`, `11:55:06.167`)
This is a self-sustaining cascade:
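1. A consumer gets `ILLEGAL_GENERATION` on commit
2. It closes and recreates its consumer, which sends `LeaveGroup` → coordinator triggers group-wide rebalance
3. The rebalance invalidates every other consumer's generation, so they get `ILLEGAL_GENERATION` too
4. They recreate as well and send their own `LeaveGroup`s → more rebalances
5. The recreated consumers join as new members (no `group.instance.id`) → yet more rebalances
The storm never dies because each eviction creates more evictions.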
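The branch that breaks this cascade can be sketched as follows. This is a minimal illustration, not the actual `KafkaSource` code: `KafkaSourceSketch`, `FakeConsumer`, `FakeKafkaError`, and `FakeKafkaException` are hypothetical stand-ins that only mimic the shape of the confluent-kafka client (an exception whose `args[0]` exposes `code()`), so the block runs without a broker. The numeric values are the Kafka protocol error codes for `ILLEGAL_GENERATION` and `UNKNOWN_MEMBER_ID`.

```python
import logging

logger = logging.getLogger("kafka_source_sketch")

# Kafka protocol error codes for the two group-membership errors.
ILLEGAL_GENERATION = 22
UNKNOWN_MEMBER_ID = 25


class FakeKafkaError:
    """Hypothetical stand-in for a Kafka error object exposing code()."""

    def __init__(self, code):
        self._code = code

    def code(self):
        return self._code


class FakeKafkaException(Exception):
    """Hypothetical stand-in: args[0] carries the error object."""


class FakeConsumer:
    """Hypothetical consumer whose commit() can be made to fail."""

    def __init__(self, fail_with=None):
        self.fail_with = fail_with
        self.closed = False

    def commit(self, asynchronous=False):
        if self.fail_with is not None:
            raise FakeKafkaException(FakeKafkaError(self.fail_with))

    def close(self):
        # On a real consumer this is the call that sends LeaveGroup.
        self.closed = True


class KafkaSourceSketch:
    """Assumed shape of the source; only the commit path is shown."""

    def __init__(self, consumer):
        self.consumer = consumer

    def commit(self):
        try:
            self.consumer.commit(asynchronous=False)
        except FakeKafkaException as exc:
            code = exc.args[0].code()
            if code in (ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID):
                # Log and return: no close(), no new consumer, so no
                # LeaveGroup is sent and the group is left undisturbed.
                logger.warning("Kafka commit skipped (error code %s)", code)
                return
            raise  # unrelated errors still propagate
```

After a failed commit in this sketch, `self.consumer` is still the same object and `close()` was never called, which is the property a regression guard would assert.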
Why log-and-return is safe
- librdkafka's consumer group state machine already handles `ILLEGAL_GENERATION` responses
- The next `consume()` automatically does `JoinGroup`/`SyncGroup` to rejoin with the same member.id
- No `LeaveGroup` is sent, so other pods' generations stay valid
Test plan
- `uv run pytest tests/connectors/sources/kafka/`: 27 tests pass
- `uv run ruff check && ruff format`
- On the `conversations-events` pipeline (16 replicas): within one rebalance cycle (~1 min), `grep "Kafka commit skipped"` should show a handful of transient events, not the sustained 9-per-5min pattern. `grep "Recreating consumer"` must return zero matches. Pod restart count must stay at 0.
- `max(__event_timestamp)` keeps advancing without stalls
Follow-ups (separate PRs)
- `partition.assignment.strategy: cooperative-sticky` (incremental rebalance, defense in depth)
- `group.instance.id` from the `HOSTNAME` env var for static membership
- Test with `consumer.commit` raising `CimplKafkaException(ILLEGAL_GENERATION)` and asserting `self.consumer` is the same instance afterward (regression guard)
🤖 Generated with Claude Code
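The first two follow-ups are configuration changes, and a rough sketch of how they might be applied is shown here. `build_consumer_config` is a hypothetical helper, not something in this PR; `partition.assignment.strategy` and `group.instance.id` are librdkafka configuration properties, and the resulting dict is assumed to be passed on to the consumer constructor.

```python
import os


def build_consumer_config(base, environ=os.environ):
    """Return a copy of `base` with the follow-up settings applied.

    Hypothetical helper for illustration only.
    """
    config = dict(base)
    # Incremental (cooperative) rebalancing.
    config["partition.assignment.strategy"] = "cooperative-sticky"
    # Static membership: derive a stable instance id from the pod hostname.
    hostname = environ.get("HOSTNAME")
    if hostname:
        config["group.instance.id"] = hostname
    return config
```

Leaving `group.instance.id` unset when `HOSTNAME` is missing keeps the consumer on dynamic membership instead of failing at startup; whether that fallback is desirable is a design choice for the follow-up PR.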