diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fa72e3..81f4459 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.3.14] - 2026-04-17 + +### Fixed +- Kafka consumer rebalance storm: on `ILLEGAL_GENERATION` / `UNKNOWN_MEMBER_ID` during commit, the source no longer closes and recreates the Consumer. Closing sends `LeaveGroup`, which triggers a group-wide rebalance that invalidates every other consumer's generation, causing them to recreate too — a self-sustaining cascade observed in production with 16 replicas evicting in perfect millisecond-synchrony every ~35s. librdkafka's group state machine already handles the rejoin automatically on the next `consume()` call, preserving `member.id` and keeping the rest of the group undisturbed. Commit errors are now log-and-continue for this error class. + ## [0.3.13] - 2026-04-17 ### Changed diff --git a/bizon/connectors/sources/kafka/src/source.py b/bizon/connectors/sources/kafka/src/source.py index 7c2294e..52a5e9e 100644 --- a/bizon/connectors/sources/kafka/src/source.py +++ b/bizon/connectors/sources/kafka/src/source.py @@ -554,26 +554,25 @@ def get(self, pagination: dict = None) -> SourceIteration: return self.read_topics_manually(pagination) def commit(self): - """Commit the offsets of the consumer""" + """Commit the offsets of the consumer. + + On ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID we log and return without closing + or recreating the consumer. librdkafka's consumer group state machine handles + the rejoin internally on the next consume() call, preserving member.id and + avoiding the LeaveGroup -> cluster-wide rebalance cascade that closing would + trigger. Uncommitted records may be reprocessed by the new partition owner + after the rejoin -- this is Bizon's standard at-least-once contract. + """ try: self.consumer.commit(asynchronous=False) except CimplKafkaException as e: error_code = e.args[0].code() if e.args else None if error_code in (KafkaError.ILLEGAL_GENERATION, KafkaError.UNKNOWN_MEMBER_ID): - # Consumer was evicted from the group. Close it, recreate in place, and - # let the pipeline continue — next subscribe()/assign() rejoins the group. - # The uncommitted batch may be reprocessed by the new partition owner - # (Kafka at-least-once); downstream must tolerate duplicates. logger.warning( - f"Kafka commit rejected - consumer evicted from group (code={error_code}): {e}. " - f"Recreating consumer in place; previous iteration's records may be " + f"Kafka commit skipped - stale generation (code={error_code}): {e}. " + f"librdkafka will rejoin on next consume(); uncommitted records may be " f"reprocessed by the new partition owner (at-least-once)." ) - try: - self.consumer.close() - except Exception as close_err: - logger.warning(f"Error closing evicted consumer: {close_err}") - self.consumer = Consumer(self.config.consumer_config) return logger.error(f"Kafka exception occurred during commit: {e}") logger.info("Gracefully exiting without committing offsets due to Kafka exception") diff --git a/pyproject.toml b/pyproject.toml index eddba67..4bee0c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "bizon" -version = "0.3.13" +version = "0.3.14" description = "Extract and load your data reliably from API Clients with native fault-tolerant and checkpointing mechanism." authors = [ { name = "Antoine Balliet", email = "antoine.balliet@gmail.com" }, diff --git a/uv.lock b/uv.lock index 658f229..5a491b5 100644 --- a/uv.lock +++ b/uv.lock @@ -55,7 +55,7 @@ wheels = [ [[package]] name = "bizon" -version = "0.3.12" +version = "0.3.14" source = { editable = "." } dependencies = [ { name = "backoff" },