From cce7fa9a03b7930e19b83eb75f8679cac378812c Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Tue, 7 Apr 2026 17:14:10 +0200 Subject: [PATCH] fix: revert streaming runner overlap to fix Kafka lag (v0.3.9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #66 (v0.3.8) wrapped destination writes in a ThreadPoolExecutor with the goal of overlapping consume(N+1) with write(N) to keep Kafka polls alive during long BigQuery writes. The implementation waits for pending_future.result() *before* calling source.get(), so consume always runs after the previous write finishes — no actual overlap, just thread overhead, a one-iteration commit delay, and a destination-object thread safety footgun. This restores the simple sequential loop. The max.poll.interval.ms=600000 Kafka consumer default introduced in 0.3.8 is intentionally retained as a safety net against rebalance during long writes. Co-Authored-By: Claude Opus 4.6 (1M context) --- CHANGELOG.md | 5 ++ bizon/engine/runner/adapters/streaming.py | 66 +++++++---------------- pyproject.toml | 2 +- uv.lock | 2 +- 4 files changed, 27 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d36974..7d4dcf9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.3.9] - 2026-04-07 + +### Fixed +- Reverted the streaming runner ThreadPoolExecutor changes from 0.3.8: the intended `consume(N+1)`/`write(N)` overlap was not actually achieved (the loop waited for the previous write before consuming the next batch), adding thread overhead and a one-iteration commit delay without throughput gain. Restored the simple sequential loop. The `max.poll.interval.ms=600000` Kafka consumer default introduced in 0.3.8 is retained as a safety net. + ## [0.3.8] - 2026-04-03 ### Fixed diff --git a/bizon/engine/runner/adapters/streaming.py b/bizon/engine/runner/adapters/streaming.py index 3a41ed0..75126e0 100644 --- a/bizon/engine/runner/adapters/streaming.py +++ b/bizon/engine/runner/adapters/streaming.py @@ -1,6 +1,5 @@ import os import time -from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import List @@ -112,35 +111,13 @@ def run(self) -> RunnerStatus: destination.buffer.buffer_size = 0 # force buffer to be flushed immediately iteration = 0 - executor = ThreadPoolExecutor(max_workers=1) - pending_future = None - should_commit = False while True: if source.config.max_iterations and iteration > source.config.max_iterations: - # Wait for last write before exiting - if pending_future is not None: - pending_future.result() logger.info(f"Max iterations {source.config.max_iterations} reached, terminating stream ...") break with monitor.trace(operation_name="bizon.stream.iteration"): - # === BACKPRESSURE: wait for previous write to finish === - if pending_future is not None: - pending_future.result() # raises if write failed - pending_future = None - - # Commit only after write confirmed successful - if should_commit and os.getenv("ENVIRONMENT") == "production": - try: - source.commit() - except Exception as e: - logger.error(f"Error committing source: {e}") - monitor.track_pipeline_status(PipelineReturnStatus.ERROR) - return RunnerStatus(stream=PipelineReturnStatus.ERROR) - should_commit = False - - # === CONSUME (main thread - keeps heartbeats alive) === source_iteration = source.get() destination_id_indexed_records = {} @@ -158,8 +135,6 @@ def run(self) -> RunnerStatus: else: destination_id_indexed_records[record.destination_id] = [record] - # === PREPARE DATA (main thread - fast) === - write_jobs = [] for destination_id, records in destination_id_indexed_records.items(): df_source_records = StreamingRunner.convert_source_records(records) @@ -171,31 +146,30 @@ def run(self) -> RunnerStatus: df_destination_records = StreamingRunner.convert_to_destination_records( df_source_records, datetime.now(tz=UTC) ) - write_jobs.append( - (destination_id, df_destination_records, len(df_destination_records), dsm_headers) + # Override destination_id + destination.destination_id = destination_id + destination.write_or_buffer_records( + df_destination_records=df_destination_records, + iteration=iteration, + pagination=None, ) + monitor.track_records_synced( + num_records=len(df_destination_records), + destination_id=destination_id, + extra_tags={"destination_id": destination_id}, + headers=dsm_headers, + ) + + if os.getenv("ENVIRONMENT") == "production": + try: + source.commit() + except Exception as e: + logger.error(f"Error committing source: {e}") + monitor.track_pipeline_status(PipelineReturnStatus.ERROR) + return RunnerStatus(stream=PipelineReturnStatus.ERROR) - # === SUBMIT WRITE (background thread - slow BigQuery writes) === - def do_writes(jobs, iter_num): - for dest_id, df_dest, num_records, headers in jobs: - destination.destination_id = dest_id - destination.write_or_buffer_records( - df_destination_records=df_dest, - iteration=iter_num, - pagination=None, - ) - monitor.track_records_synced( - num_records=num_records, - destination_id=dest_id, - extra_tags={"destination_id": dest_id}, - headers=headers, - ) - - pending_future = executor.submit(do_writes, write_jobs, iteration) - should_commit = True iteration += 1 monitor.track_pipeline_status(PipelineReturnStatus.SUCCESS) - executor.shutdown(wait=True) return RunnerStatus(stream=PipelineReturnStatus.SUCCESS) # return when max iterations is reached diff --git a/pyproject.toml b/pyproject.toml index 2675412..034d86e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "bizon" -version = "0.3.8" +version = "0.3.9" description = "Extract and load your data reliably from API Clients with native fault-tolerant and checkpointing mechanism." authors = [ { name = "Antoine Balliet", email = "antoine.balliet@gmail.com" }, diff --git a/uv.lock b/uv.lock index 628fd03..03d983a 100644 --- a/uv.lock +++ b/uv.lock @@ -55,7 +55,7 @@ wheels = [ [[package]] name = "bizon" -version = "0.3.8" +version = "0.3.9" source = { editable = "." } dependencies = [ { name = "backoff" },