From 11db224a0e0c12219fd12d5eb931a6078c25d860 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Fri, 17 Apr 2026 17:16:33 +0200 Subject: [PATCH] fix: add Kafka static membership to stop rebalance storm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 0.3.14 removed the close+recreate on ILLEGAL_GENERATION on the assumption librdkafka would auto-rejoin on the next consume(). It doesn't — commits kept failing indefinitely in production. This restores the 0.3.13 recreate-in-place behavior AND adds static membership (KIP-345) via HOSTNAME (K8s pod name). With a stable group.instance.id the reconnecting consumer is recognized as the same instance, so the close+recreate no longer triggers a full-group rebalance that cascades to invalidate all other consumers' commits. Users can override by setting group.instance.id in consumer_config. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 5 +++ bizon/connectors/sources/kafka/src/source.py | 37 +++++++++++++++----- pyproject.toml | 2 +- uv.lock | 2 +- 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81f4459..15b4472 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.3.15] - 2026-04-17 + +### Fixed +- Kafka rebalance storm, take 2: 0.3.14's "log-and-continue on ILLEGAL_GENERATION" was incorrect — librdkafka did not auto-rejoin after a commit-time generation failure, so the consumer stayed permanently in a stale generation and every subsequent commit failed indefinitely. Reverted to the 0.3.13 recreate-in-place behavior AND added **static membership** (KIP-345): `group.instance.id` is now derived from the `HOSTNAME` env var (auto-populated by Kubernetes to the pod name). With a stable instance ID, the close+recreate on `ILLEGAL_GENERATION` reconnects as the same member instead of joining as a new one, so the broker skips the full-group rebalance. This breaks the cascade that made one pod's eviction invalidate all 15 others. Users can override by setting `group.instance.id` explicitly in `consumer_config`. + ## [0.3.14] - 2026-04-17 ### Fixed diff --git a/bizon/connectors/sources/kafka/src/source.py b/bizon/connectors/sources/kafka/src/source.py index 52a5e9e..11614b4 100644 --- a/bizon/connectors/sources/kafka/src/source.py +++ b/bizon/connectors/sources/kafka/src/source.py @@ -1,3 +1,4 @@ +import os import re import traceback from collections.abc import Mapping @@ -124,6 +125,21 @@ def __init__(self, config: KafkaSourceConfig): self.config.consumer_config["group.id"] = self.config.group_id self.config.consumer_config["bootstrap.servers"] = self.config.bootstrap_servers + # Static group membership (KIP-345): when HOSTNAME is set (Kubernetes + # auto-populates it to the pod name), derive a stable group.instance.id + # so that brief disconnects and in-process consumer recreations (e.g. + # commit()'s recovery path on ILLEGAL_GENERATION) rejoin without + # triggering a group-wide rebalance. Users can override by setting + # group.instance.id explicitly in consumer_config. + if "group.instance.id" not in self.config.consumer_config: + hostname = os.environ.get("HOSTNAME") + if hostname: + self.config.consumer_config["group.instance.id"] = f"{self.config.group_id}-{hostname}" + logger.info( + f"Kafka static membership enabled: " + f"group.instance.id={self.config.consumer_config['group.instance.id']}" + ) + # Set the error callback self.config.consumer_config["error_cb"] = on_error @@ -556,12 +572,12 @@ def get(self, pagination: dict = None) -> SourceIteration: def commit(self): """Commit the offsets of the consumer. - On ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID we log and return without closing - or recreating the consumer. librdkafka's consumer group state machine handles - the rejoin internally on the next consume() call, preserving member.id and - avoiding the LeaveGroup -> cluster-wide rebalance cascade that closing would - trigger. Uncommitted records may be reprocessed by the new partition owner - after the rejoin -- this is Bizon's standard at-least-once contract. + On ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID the consumer was evicted from the + group; close it, recreate in place, and let the next subscribe()/assign() + call rejoin. With static membership (group.instance.id) the broker recognizes + the reconnecting instance and avoids a group-wide rebalance cascade. + The uncommitted batch may be reprocessed by the new partition owner (Kafka + at-least-once); downstream must tolerate duplicates. """ try: self.consumer.commit(asynchronous=False) @@ -569,10 +585,15 @@ def commit(self): error_code = e.args[0].code() if e.args else None if error_code in (KafkaError.ILLEGAL_GENERATION, KafkaError.UNKNOWN_MEMBER_ID): logger.warning( - f"Kafka commit skipped - stale generation (code={error_code}): {e}. " - f"librdkafka will rejoin on next consume(); uncommitted records may be " + f"Kafka commit rejected - consumer evicted from group (code={error_code}): {e}. " + f"Recreating consumer in place; previous iteration's records may be " f"reprocessed by the new partition owner (at-least-once)." ) + try: + self.consumer.close() + except Exception as close_err: + logger.warning(f"Error closing evicted consumer: {close_err}") + self.consumer = Consumer(self.config.consumer_config) return logger.error(f"Kafka exception occurred during commit: {e}") logger.info("Gracefully exiting without committing offsets due to Kafka exception") diff --git a/pyproject.toml b/pyproject.toml index 4bee0c3..737a55b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "bizon" -version = "0.3.14" +version = "0.3.15" description = "Extract and load your data reliably from API Clients with native fault-tolerant and checkpointing mechanism." authors = [ { name = "Antoine Balliet", email = "antoine.balliet@gmail.com" }, diff --git a/uv.lock b/uv.lock index 5a491b5..faea9ee 100644 --- a/uv.lock +++ b/uv.lock @@ -55,7 +55,7 @@ wheels = [ [[package]] name = "bizon" -version = "0.3.14" +version = "0.3.15" source = { editable = "." } dependencies = [ { name = "backoff" },