From 5c870f80d80fd740230ce07c8a48b3f2ca9ed688 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Fri, 17 Apr 2026 12:46:22 +0200 Subject: [PATCH] fix: don't crash streaming pipeline on Kafka consumer eviction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID, close the evicted consumer, recreate a fresh one in place, and continue the loop instead of re-raising. Next subscribe()/assign() rejoins the group. Uncommitted records may be reprocessed — consistent with Bizon's at-least-once contract. Bump to 0.3.13. Co-Authored-By: Claude Opus 4.7 --- CHANGELOG.md | 5 +++++ bizon/connectors/sources/kafka/src/source.py | 17 ++++++++++------- pyproject.toml | 2 +- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7f2580..0fa72e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.3.13] - 2026-04-17 + +### Changed +- Kafka source no longer crashes the streaming pipeline when commit fails with `ILLEGAL_GENERATION` / `UNKNOWN_MEMBER_ID` (consumer evicted from the group). Previously the source closed the consumer and re-raised, causing the runner to exit with `SOURCE_ERROR` and the pod to be restarted by Kubernetes. The source now closes the evicted consumer, recreates a fresh one in place, and returns — the next iteration's `subscribe()` / `assign()` rejoins the group cleanly. Uncommitted records from the failed batch may be reprocessed by the new partition owner, which is consistent with Bizon's at-least-once delivery contract. + ## [0.3.12] - 2026-04-17 ### Fixed diff --git a/bizon/connectors/sources/kafka/src/source.py b/bizon/connectors/sources/kafka/src/source.py index 179f4ea..7c2294e 100644 --- a/bizon/connectors/sources/kafka/src/source.py +++ b/bizon/connectors/sources/kafka/src/source.py @@ -559,18 +559,21 @@ def commit(self): self.consumer.commit(asynchronous=False) except CimplKafkaException as e: error_code = e.args[0].code() if e.args else None - # Consumer has been evicted from the group. Further processing causes duplicate - # writes since the new partition owner is already processing these messages. - # Close gracefully and raise so the runner restarts the pod with a fresh consumer. if error_code in (KafkaError.ILLEGAL_GENERATION, KafkaError.UNKNOWN_MEMBER_ID): - logger.error( + # Consumer was evicted from the group. Close it, recreate in place, and + # let the pipeline continue — next subscribe()/assign() rejoins the group. + # The uncommitted batch may be reprocessed by the new partition owner + # (Kafka at-least-once); downstream must tolerate duplicates. + logger.warning( f"Kafka commit rejected - consumer evicted from group (code={error_code}): {e}. " - f"Closing consumer and exiting for clean restart." + f"Recreating consumer in place; previous iteration's records may be " + f"reprocessed by the new partition owner (at-least-once)." ) try: self.consumer.close() except Exception as close_err: - logger.warning(f"Error closing consumer during eviction handling: {close_err}") - raise + logger.warning(f"Error closing evicted consumer: {close_err}") + self.consumer = Consumer(self.config.consumer_config) + return logger.error(f"Kafka exception occurred during commit: {e}") logger.info("Gracefully exiting without committing offsets due to Kafka exception") diff --git a/pyproject.toml b/pyproject.toml index 70ac496..eddba67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "bizon" -version = "0.3.12" +version = "0.3.13" description = "Extract and load your data reliably from API Clients with native fault-tolerant and checkpointing mechanism." authors = [ { name = "Antoine Balliet", email = "antoine.balliet@gmail.com" },