Skip to content

Step 9: Streaming feature pipeline (Pub/Sub → Feature Store)#29

Merged
shlbatra merged 7 commits into
mainfrom
feature/fp-step9-streaming-feature-pipeline
Jun 16, 2026
Merged

Step 9: Streaming feature pipeline (Pub/Sub → Feature Store)#29
shlbatra merged 7 commits into
mainfrom
feature/fp-step9-streaming-feature-pipeline

Conversation

@shlbatra

@shlbatra shlbatra commented Jun 16, 2026

Copy link
Copy Markdown
Owner

Summary

  • Add src/dataflow/iris_feature_pipeline.py — Beam streaming pipeline that reads from Pub/Sub, validates messages with Pydantic (dataflow/models/iris_schema.py), renames fields to canonical names, and dual-writes to:
    • BQ (offline store) — WRITE_APPEND for training/batch
    • Bigtable (online store) — direct WriteFeatureValues API for sub-second real-time serving
  • Add src/dataflow/models/iris_schema.pyPubSubIrisMessage Pydantic model for type-safe Pub/Sub message validation
  • Add src/dataflow/utils/online_store_writer.py — reusable WriteToOnlineStore Beam DoFn for direct Bigtable writes
  • Add .github/workflows/deploy-dataflow-feature.yaml — GitHub Action (workflow_dispatch) to deploy the feature pipeline
  • Add scripts/deploy_dataflow_feature.sh — local deploy script

Key design decisions

  • Dual-write instead of periodic sync — writes to BQ and Bigtable in parallel, no 3–10 min sync delay
  • source = "streaming" distinguishes streaming-ingested rows from batch ("training", "batch_input")
  • entity_id = "{sample_id}_streaming" — uses Pub/Sub message sample_id when available, falls back to UUID
  • BatchElements groups rows for efficient online store writes without adding latency at low throughput
  • No model calls — this pipeline only persists features; inference is handled separately

Test plan

  • Deploy to staging via GitHub Action or local script
  • Publish messages with pubsub_producer.py
  • Verify rows appear in iris_features BQ table with source = 'streaming'
  • Verify features are available in online store for real-time lookups

🤖 Generated with Claude Code

shlbatra and others added 7 commits June 16, 2026 12:06
…orkflow

Beam streaming pipeline reads from Pub/Sub, renames fields to canonical
names, writes to iris_features BQ table with WRITE_APPEND (source=streaming),
and triggers FeatureView sync periodically so the online store stays fresh.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds dataflow/schemas.py with PubSubIrisMessage model for type-safe
validation of incoming messages instead of manual field checking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Writes directly to Bigtable via WriteFeatureValues API for sub-second
online serving latency, instead of waiting for periodic FeatureView sync.
BQ write remains for offline training/batch use cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@shlbatra shlbatra merged commit 9c32700 into main Jun 16, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant