fix: revert streaming runner overlap to fix Kafka lag (v0.3.9) #67

Merged: anaselmhamdi merged 1 commit into main from revert/streaming-runner-overlap-0.3.9 on Apr 7, 2026

@anaselmhamdi (Collaborator) commented Apr 7, 2026

Summary

Reverts the ThreadPoolExecutor-based streaming runner changes from #66 / v0.3.8. Bumps version to 0.3.9. Keeps max.poll.interval.ms=600000 from #66 — only the runner threading is reverted.

Why

#66 was meant to overlap consume(N+1) with write(N) so Kafka polls stay alive during long BigQuery writes. The implementation does not actually overlap them — the loop body waits on pending_future.result() before calling source.get():

if pending_future is not None:
    pending_future.result()       # blocks for the entire previous BQ write
    ... commit ...
source_iteration = source.get()   # only consumes AFTER the wait
pending_future = executor.submit(do_writes, ...)

So consume(N+1) always runs after write(N) finishes — same wall-clock as the old sequential code, plus:

  • ThreadPoolExecutor + closure allocation per iteration
  • A one-iteration commit delay (offsets lag by one batch)
  • Background thread mutating destination.destination_id (latent thread-safety footgun)
  • executor.shutdown(wait=True) was unreachable in production (no max_iterations)
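
To make the "same wall-clock" claim concrete, here is a minimal timing sketch. It is not the bizon runner: `consume`, `write`, and the sleep durations are hypothetical stand-ins for `source.get()` and the BigQuery write, and only the loop shape mirrors the excerpt above.

```python
import time
from concurrent.futures import ThreadPoolExecutor

CONSUME_S = 0.05  # assumed duration of one source.get() call
WRITE_S = 0.10    # assumed duration of one destination write


def consume():
    """Hypothetical stand-in for source.get()."""
    time.sleep(CONSUME_S)
    return "batch"


def write(batch):
    """Hypothetical stand-in for the destination write."""
    time.sleep(WRITE_S)


def sequential(iterations):
    """Plain consume -> write loop, timed."""
    start = time.perf_counter()
    for _ in range(iterations):
        write(consume())
    return time.perf_counter() - start


def wait_then_consume(iterations):
    """Same shape as the reverted loop: wait on the previous write, then consume."""
    start = time.perf_counter()
    pending = None
    with ThreadPoolExecutor(max_workers=1) as executor:
        for _ in range(iterations):
            if pending is not None:
                pending.result()      # blocks until the previous write is done
            batch = consume()         # so this never runs alongside a write
            pending = executor.submit(write, batch)
        # leaving the with-block waits for the final write
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential:        {sequential(4):.2f}s")
    print(f"wait_then_consume: {wait_then_consume(4):.2f}s")
```

Because every consume starts only after the previous write has finished, both loops form one serial chain of consume and write steps, so neither can finish faster than `iterations * (CONSUME_S + WRITE_S)`.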

This coincides with a "stabilization paradox": #60, #62, #63, #64 and #65 fixed several pipeline-crashing bugs in the same window, which removed the crash-and-restart cycle that previously masked how slow steady-state throughput is. Kafka pipelines now run continuously at their (slow) baseline rate, so lag is finally visible.

This PR restores the simple sequential consume → write → commit loop that was in place pre-#66. It does not attempt to design proper consume-during-write overlap — that's a separate, larger conversation that wants real production write-time metrics first.
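
The shape of a sequential consume → write → commit loop can be sketched as follows. This is an illustration under assumptions, not the code in streaming.py: `FakeSource`, `FakeDestination`, `run_sequential`, and the batch dict layout are all hypothetical names invented for the example.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSource:
    """Hypothetical stand-in for a Kafka source; not bizon's API."""
    batches: list
    committed: list = field(default_factory=list)

    def get(self):
        # Return the next batch, or None when nothing is left.
        return self.batches.pop(0) if self.batches else None

    def commit(self, batch):
        self.committed.append(batch["offset"])


@dataclass
class FakeDestination:
    """Hypothetical stand-in for a destination such as BigQuery."""
    written: list = field(default_factory=list)

    def write(self, batch):
        self.written.extend(batch["records"])


def run_sequential(source, destination):
    """Consume, write, then commit, strictly in that order for each batch."""
    while True:
        batch = source.get()
        if batch is None:
            break
        destination.write(batch)  # if this raises, the offset is never committed
        source.commit(batch)      # the commit covers the batch just written


if __name__ == "__main__":
    src = FakeSource(batches=[
        {"offset": 1, "records": ["a", "b"]},
        {"offset": 2, "records": ["c"]},
    ])
    dst = FakeDestination()
    run_sequential(src, dst)
    print(dst.written, src.committed)
```

In this shape the committed offset never runs ahead of what has been written, and it does not trail by a batch either, which is the one-iteration delay the deferred-commit version introduced.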

What's kept from v0.3.8

  • max.poll.interval.ms=600000 (10 min) in bizon/connectors/sources/kafka/src/config.py. Still useful as a belt-and-braces safety net against rebalance during long writes, even though the runner no longer claims to overlap.
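
For orientation, a consumer configuration carrying this setting might look like the sketch below. It assumes a librdkafka-style configuration dict; the broker address, group id, and the `write_fits_poll_interval` helper are placeholders invented for the example and do not reflect the contents of config.py.

```python
MAX_POLL_INTERVAL_MS = 600_000  # 10 minutes, the value kept from v0.3.8

# Hypothetical consumer settings; only max.poll.interval.ms comes from this PR.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "example-group",            # placeholder consumer group
    "max.poll.interval.ms": MAX_POLL_INTERVAL_MS,
}


def write_fits_poll_interval(write_seconds, config=consumer_config):
    """True if a write of this duration finishes before the poll deadline."""
    return write_seconds * 1000 < config["max.poll.interval.ms"]


if __name__ == "__main__":
    print(write_fits_poll_interval(120))  # a 2-minute write
    print(write_fits_poll_interval(900))  # a 15-minute write
```

With a sequential loop the consumer does not poll while a write is in progress, so any write longer than this interval can still trigger a rebalance; the setting raises that threshold but does not remove it.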

What's reverted

  • All threading-related code in bizon/engine/runner/adapters/streaming.py (ThreadPoolExecutor import, pending_future / should_commit locals, do_writes closure, the deferred-commit logic, the unreachable executor.shutdown).

Test plan

  • uv run ruff format + ruff check on streaming.py — clean
  • uv run pytest tests/connectors/sources/kafka/ — 27 passed
  • uv run pytest tests/connectors/sources/kafka/test_kafka_decode.py — 21 passed
  • Module import sanity check — no leftover ThreadPoolExecutor / pending_future / should_commit symbols in StreamingRunner.run
  • Diff verified to be the inverse of 9ad0ed5 for streaming.py
  • Stage canary on one Kafka consumer group, watch kafka_consumer_lag for 30–60 min vs the v0.3.8 baseline before promoting
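
The leftover-symbol check in the list above could be scripted roughly as follows. This is a sketch of one way to do it, not the check that was actually run: the helper names are hypothetical, and the import path in the final comment is inferred from the file path mentioned in this PR.

```python
import inspect

# Names that should no longer appear after the revert (taken from this PR).
FORBIDDEN = ("ThreadPoolExecutor", "pending_future", "should_commit")


def leftover_symbols_in_text(source_text, forbidden=FORBIDDEN):
    """Return the forbidden names that still occur in the given source text."""
    return [name for name in forbidden if name in source_text]


def leftover_symbols(func, forbidden=FORBIDDEN):
    """Same check, applied to the source code of a function or method."""
    return leftover_symbols_in_text(inspect.getsource(func), forbidden)


# Assumed usage (module path inferred, not confirmed by the PR):
#   from bizon.engine.runner.adapters.streaming import StreamingRunner
#   assert leftover_symbols(StreamingRunner.run) == []
```

A plain substring match also flags names that appear only in comments or docstrings, which is the stricter behaviour one would want when verifying a revert.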

(Pre-existing test failures unrelated to this change: missing optional deps pika, datadog, psycopg2, and live BQ tests requiring credentials.)

Out of scope (deliberate)

  • Optimizing the per-row protobuf dict conversion introduced in #62 ("fix: convert dict/list to JSON strings before protobuf serialization", v0.3.4) at destination.py:183. It's correct and load-bearing for Debezium CDC; revisit only if metrics later prove it's the bottleneck.
  • Changing batch_size / consumer_timeout defaults.
  • Designing real consume-during-write overlap (heartbeat thread, partition pause/resume, etc.) — follow-up with metrics in hand.

🤖 Generated with Claude Code

PR #66 (v0.3.8) wrapped destination writes in a ThreadPoolExecutor with
the goal of overlapping consume(N+1) with write(N) to keep Kafka polls
alive during long BigQuery writes. The implementation waits for
pending_future.result() *before* calling source.get(), so consume always
runs after the previous write finishes — no actual overlap, just thread
overhead, a one-iteration commit delay, and a destination-object thread
safety footgun.

This restores the simple sequential loop. The max.poll.interval.ms=600000
Kafka consumer default introduced in 0.3.8 is intentionally retained as a
safety net against rebalance during long writes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@anaselmhamdi anaselmhamdi merged commit 1ccd0a1 into main Apr 7, 2026
1 check passed