Skip to content

fix: stop Kafka rebalance storm by not recreating consumer on ILLEGAL_GENERATION#74

Merged
anaselmhamdi merged 1 commit into
mainfrom
fix/kafka-rebalance-storm-no-recreate
Apr 17, 2026
Merged

fix: stop Kafka rebalance storm by not recreating consumer on ILLEGAL_GENERATION#74
anaselmhamdi merged 1 commit into
mainfrom
fix/kafka-rebalance-storm-no-recreate

Conversation

@anaselmhamdi
Copy link
Copy Markdown
Collaborator

Summary

Remove the self.consumer.close() + self.consumer = Consumer(...) calls from the ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID branch of KafkaSource.commit(). Replace with a single log-and-return. librdkafka's consumer group state machine already handles rejoin automatically on the next consume() call.

Root cause — production evidence

Observed on conversations-events-bizon-streaming (16 replicas):

  • Every pod shows identical eviction counts (exactly 9 in 5min, 23-26 in 15min)
  • Eviction timestamps match to the millisecond across pods (e.g. 11:52:49.880, 11:54:04.053, 11:55:06.167)
  • One rebalance event every ~35s, affecting all 16 pods simultaneously

This is a self-sustaining cascade:

  1. Pod A commit fails with ILLEGAL_GENERATION
  2. Pod A closes consumer → sends LeaveGroup → coordinator triggers group-wide rebalance
  3. During rebalance, all other pods' generations become stale → their next commits also fail with ILLEGAL_GENERATION
  4. All pods recreate → more LeaveGroups → more rebalances
  5. Fresh consumers rejoin as new dynamic members (no group.instance.id) → yet more rebalances

The storm never dies because each eviction creates more evictions.

Why log-and-return is safe

  • librdkafka preserves the Consumer object's internal state (member.id, metadata cache, connections) across ILLEGAL_GENERATION responses
  • Next consume() automatically does JoinGroup / SyncGroup to rejoin with the same member.id
  • No LeaveGroup is sent, so other pods' generations stay valid
  • The recreate was never preventing duplicates — the uncommitted batch was already in BigQuery (at-least-once). Recreating only triggered the storm.

Test plan

  • uv run pytest tests/connectors/sources/kafka/ — 27 tests pass
  • uv run ruff check && ruff format
  • Deploy to conversations-events pipeline (16 replicas). Within one rebalance cycle (~1 min), grep "Kafka commit skipped" should show a handful of transient events, not the sustained 9-per-5min pattern. grep "Recreating consumer" must be zero. Pod restart count must stay at 0.
  • BigQuery max(__event_timestamp) keeps advancing without stalls

Follow-ups (separate PRs)

  • Add partition.assignment.strategy: cooperative-sticky (incremental rebalance — defense in depth)
  • Opt-in group.instance.id from HOSTNAME env var for static membership
  • Unit test mocking consumer.commit raising CimplKafkaException(ILLEGAL_GENERATION) and asserting self.consumer is the same instance afterward (regression guard)

🤖 Generated with Claude Code

…_GENERATION

Closing and recreating the consumer on every ILLEGAL_GENERATION sends
LeaveGroup, which triggers a group-wide rebalance. That rebalance
invalidates every other consumer's generation and they recreate too —
a self-sustaining cascade. Production with 16 replicas showed all pods
evicting in perfect millisecond-synchrony every ~35s.

librdkafka's consumer group state machine already rejoins automatically
on the next consume() after ILLEGAL_GENERATION, preserving member.id
and leaving the rest of the group undisturbed. Just log and return.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@anaselmhamdi anaselmhamdi merged commit da0ee98 into main Apr 17, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant