Step 9: Streaming feature pipeline (Pub/Sub → Feature Store)#29
Merged
Conversation
…orkflow Beam streaming pipeline reads from Pub/Sub, renames fields to canonical names, writes to iris_features BQ table with WRITE_APPEND (source=streaming), and triggers FeatureView sync periodically so the online store stays fresh. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds dataflow/schemas.py with PubSubIrisMessage model for type-safe validation of incoming messages instead of manual field checking. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Writes directly to Bigtable via WriteFeatureValues API for sub-second online serving latency, instead of waiting for periodic FeatureView sync. BQ write remains for offline training/batch use cases. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
src/dataflow/iris_feature_pipeline.py— Beam streaming pipeline that reads from Pub/Sub, validates messages with Pydantic (dataflow/models/iris_schema.py), renames fields to canonical names, and dual-writes to:WRITE_APPENDfor training/batchWriteFeatureValuesAPI for sub-second real-time servingsrc/dataflow/models/iris_schema.py—PubSubIrisMessagePydantic model for type-safe Pub/Sub message validationsrc/dataflow/utils/online_store_writer.py— reusableWriteToOnlineStoreBeam DoFn for direct Bigtable writes.github/workflows/deploy-dataflow-feature.yaml— GitHub Action (workflow_dispatch) to deploy the feature pipelinescripts/deploy_dataflow_feature.sh— local deploy scriptKey design decisions
source = "streaming"distinguishes streaming-ingested rows from batch ("training","batch_input")entity_id = "{sample_id}_streaming"— uses Pub/Sub messagesample_idwhen available, falls back to UUIDBatchElementsgroups rows for efficient online store writes without adding latency at low throughputTest plan
pubsub_producer.pyiris_featuresBQ table withsource = 'streaming'🤖 Generated with Claude Code