Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.3.9] - 2026-04-07

### Fixed
- Reverted the streaming runner `ThreadPoolExecutor` changes from 0.3.8. The intended `consume(N+1)`/`write(N)` overlap was never actually achieved, because the loop waited for the previous write to finish before consuming the next batch; the change only added thread overhead and a one-iteration commit delay, with no throughput gain. Restored the simple sequential loop. The `max.poll.interval.ms=600000` Kafka consumer default introduced in 0.3.8 is retained as a safety net.

## [0.3.8] - 2026-04-03

### Fixed
Expand Down
66 changes: 20 additions & 46 deletions bizon/engine/runner/adapters/streaming.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List

Expand Down Expand Up @@ -112,35 +111,13 @@ def run(self) -> RunnerStatus:

destination.buffer.buffer_size = 0 # force buffer to be flushed immediately
iteration = 0
executor = ThreadPoolExecutor(max_workers=1)
pending_future = None
should_commit = False

while True:
if source.config.max_iterations and iteration > source.config.max_iterations:
# Wait for last write before exiting
if pending_future is not None:
pending_future.result()
logger.info(f"Max iterations {source.config.max_iterations} reached, terminating stream ...")
break

with monitor.trace(operation_name="bizon.stream.iteration"):
# === BACKPRESSURE: wait for previous write to finish ===
if pending_future is not None:
pending_future.result() # raises if write failed
pending_future = None

# Commit only after write confirmed successful
if should_commit and os.getenv("ENVIRONMENT") == "production":
try:
source.commit()
except Exception as e:
logger.error(f"Error committing source: {e}")
monitor.track_pipeline_status(PipelineReturnStatus.ERROR)
return RunnerStatus(stream=PipelineReturnStatus.ERROR)
should_commit = False

# === CONSUME (main thread - keeps heartbeats alive) ===
source_iteration = source.get()

destination_id_indexed_records = {}
Expand All @@ -158,8 +135,6 @@ def run(self) -> RunnerStatus:
else:
destination_id_indexed_records[record.destination_id] = [record]

# === PREPARE DATA (main thread - fast) ===
write_jobs = []
for destination_id, records in destination_id_indexed_records.items():
df_source_records = StreamingRunner.convert_source_records(records)

Expand All @@ -171,31 +146,30 @@ def run(self) -> RunnerStatus:
df_destination_records = StreamingRunner.convert_to_destination_records(
df_source_records, datetime.now(tz=UTC)
)
write_jobs.append(
(destination_id, df_destination_records, len(df_destination_records), dsm_headers)
# Override destination_id
destination.destination_id = destination_id
destination.write_or_buffer_records(
df_destination_records=df_destination_records,
iteration=iteration,
pagination=None,
)
monitor.track_records_synced(
num_records=len(df_destination_records),
destination_id=destination_id,
extra_tags={"destination_id": destination_id},
headers=dsm_headers,
)

if os.getenv("ENVIRONMENT") == "production":
try:
source.commit()
except Exception as e:
logger.error(f"Error committing source: {e}")
monitor.track_pipeline_status(PipelineReturnStatus.ERROR)
return RunnerStatus(stream=PipelineReturnStatus.ERROR)

# === SUBMIT WRITE (background thread - slow BigQuery writes) ===
def do_writes(jobs, iter_num):
for dest_id, df_dest, num_records, headers in jobs:
destination.destination_id = dest_id
destination.write_or_buffer_records(
df_destination_records=df_dest,
iteration=iter_num,
pagination=None,
)
monitor.track_records_synced(
num_records=num_records,
destination_id=dest_id,
extra_tags={"destination_id": dest_id},
headers=headers,
)

pending_future = executor.submit(do_writes, write_jobs, iteration)
should_commit = True
iteration += 1

monitor.track_pipeline_status(PipelineReturnStatus.SUCCESS)

executor.shutdown(wait=True)
return RunnerStatus(stream=PipelineReturnStatus.SUCCESS) # return when max iterations is reached
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "bizon"
version = "0.3.8"
version = "0.3.9"
description = "Extract and load your data reliably from API Clients with native fault-tolerant and checkpointing mechanism."
authors = [
{ name = "Antoine Balliet", email = "antoine.balliet@gmail.com" },
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading