Skip to content

Add micro-batching and async HTTP to Dataflow streaming pipeline#10

Merged
shlbatra merged 2 commits into
mainfrom
feature/micro-batch-async-inference
Jun 15, 2026
Merged

Add micro-batching and async HTTP to Dataflow streaming pipeline#10
shlbatra merged 2 commits into
mainfrom
feature/micro-batch-async-inference

Conversation

@shlbatra

@shlbatra shlbatra commented Jun 14, 2026

Copy link
Copy Markdown
Owner

Summary

Replace the single-message synchronous HTTP pipeline with micro-batching and async HTTP to improve streaming inference throughput by ~20-200x. At high traffic, up to 50 messages are grouped into a single /predict call. At low traffic, partial batches flush after 1 second so no message waits indefinitely.

Changes

Pipeline (src/dataflow/iris_streaming_pipeline.py)

  • Replace CallFastAPIService (1 msg → 1 sync HTTP call) with BatchCallFastAPIService (up to 50 msgs → 1 async HTTP call)
  • Add BatchElements with max_batch_duration_secs=1 for stateful micro-batching across Beam bundles — avoids the problem where default BatchElements is a no-op on Dataflow streaming at low volume
  • Use aiohttp with TCPConnector(limit=4) for async HTTP, overlapping concurrent batch calls within a worker
  • Dedicated asyncio event loop in setup()/teardown() lifecycle (standard pattern for async I/O in Beam DoFns)
  • Add --batch_size (default 50) and --max_batch_duration_secs (default 1.0) CLI args for tuning
  • Remove unused requests and window imports

Deploy workflow (.github/workflows/deploy-dataflow.yaml)

  • Pass --batch_size 50 and --max_batch_duration_secs 1.0 to the pipeline

Docs

  • Update docs/scale_inference_batching.md plan to reflect windowless approach with max_batch_duration_secs
  • Update Readme.md streaming architecture section with batching and async HTTP details

Testing

  • Syntax check (python -m py_compile src/dataflow/iris_streaming_pipeline.py)
  • Existing tests pass (python -m pytest test/)
  • Local DirectRunner test with Pub/Sub producer — verify predictions land in staging BigQuery table
  • Deploy to staging via Deploy Dataflow Streaming workflow with environment=staging
  • Verify batch sizes in logs (Batch prediction failed (N instances) pattern — N should be ~50 at load, 1-few at low traffic)
  • Compare Dataflow job metrics: elements/sec, system lag, worker CPU
  • Verify no data loss: Pub/Sub acked count matches BigQuery row count over a time window
  • Promote to prod after staging validation

@shlbatra shlbatra changed the title support micro batching Add micro-batching and async HTTP to Dataflow streaming pipeline Jun 14, 2026
The `with beam.Pipeline()` context manager calls wait_until_finish(),
which blocks forever for streaming pipelines. Use explicit pipeline.run()
with conditional wait gated on --no_wait.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@shlbatra shlbatra merged commit 69cd9f9 into main Jun 15, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant