From 7dab84ec3d788c00c3208993a43c3d5e87f01a44 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 12:06:22 -0400 Subject: [PATCH 1/7] =?UTF-8?q?Add=20streaming=20feature=20pipeline=20(Pub?= =?UTF-8?q?/Sub=20=E2=86=92=20Feature=20Store)=20and=20deploy=20workflow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Beam streaming pipeline reads from Pub/Sub, renames fields to canonical names, writes to iris_features BQ table with WRITE_APPEND (source=streaming), and triggers FeatureView sync periodically so the online store stays fresh. Co-Authored-By: Claude Opus 4.6 --- .../workflows/deploy-dataflow-feature.yaml | 110 +++++++++ docs/gcp_feature_platform.md | 6 + scripts/deploy_dataflow_feature.sh | 60 +++++ src/dataflow/iris_feature_pipeline.py | 214 ++++++++++++++++++ 4 files changed, 390 insertions(+) create mode 100644 .github/workflows/deploy-dataflow-feature.yaml create mode 100755 scripts/deploy_dataflow_feature.sh create mode 100644 src/dataflow/iris_feature_pipeline.py diff --git a/.github/workflows/deploy-dataflow-feature.yaml b/.github/workflows/deploy-dataflow-feature.yaml new file mode 100644 index 0000000..256c07c --- /dev/null +++ b/.github/workflows/deploy-dataflow-feature.yaml @@ -0,0 +1,110 @@ +name: Deploy Dataflow Feature Pipeline + +on: + workflow_dispatch: + inputs: + environment: + description: "Target environment" + type: choice + options: + - staging + - prod + default: staging + region: + description: "GCP region for Dataflow job" + type: choice + options: + - us-central1 + - us-east1 + - us-west1 + default: us-central1 + worker_machine_type: + description: "Worker VM machine type" + type: choice + options: + - e2-standard-2 + - n1-standard-2 + - e2-standard-4 + - n1-standard-4 + default: e2-standard-2 + sync_interval_secs: + description: "FeatureView sync interval in seconds" + type: choice + options: + - "300" + - "600" + - "900" + default: "300" + +env: + PROJECT_ID: "deeplearning-sahil" + REGION: ${{ inputs.region }} + PUBSUB_TOPIC: "projects/deeplearning-sahil/topics/iris-inference-data" + TEMP_LOCATION: "gs://sb-vertex/temp" + STAGING_LOCATION: "gs://sb-vertex/staging" + SERVICE_ACCOUNT_EMAIL: "kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com" + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set environment-specific variables + run: | + if [ "${{ inputs.environment }}" = "prod" ]; then + echo "OUTPUT_TABLE=${{ env.PROJECT_ID }}:ml_dataset.iris_features" >> $GITHUB_ENV + echo "JOB_PREFIX=iris-streaming-features" >> $GITHUB_ENV + else + echo "OUTPUT_TABLE=${{ env.PROJECT_ID }}:ml_dataset.iris_features" >> $GITHUB_ENV + echo "JOB_PREFIX=iris-streaming-features-staging" >> $GITHUB_ENV + fi + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Install dependencies + run: pip install -e . + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v1 + with: + credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} + + - name: Set up gcloud CLI + uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ env.PROJECT_ID }} + + - name: Deploy Dataflow feature pipeline + run: | + JOB_NAME="${{ env.JOB_PREFIX }}-$(date +%Y%m%d-%H%M%S)" + echo "Submitting ${{ inputs.environment }} job: $JOB_NAME" + + python src/dataflow/iris_feature_pipeline.py \ + --input_topic ${{ env.PUBSUB_TOPIC }} \ + --output_table ${{ env.OUTPUT_TABLE }} \ + --project_id ${{ env.PROJECT_ID }} \ + --region ${{ env.REGION }} \ + --sync_interval_secs ${{ inputs.sync_interval_secs }} \ + --runner DataflowRunner \ + --job_name $JOB_NAME \ + --temp_location ${{ env.TEMP_LOCATION }} \ + --staging_location ${{ env.STAGING_LOCATION }} \ + --service_account_email ${{ env.SERVICE_ACCOUNT_EMAIL }} \ + --use_public_ips \ + --worker_machine_type ${{ inputs.worker_machine_type }} \ + --max_num_workers 3 \ + --autoscaling_algorithm THROUGHPUT_BASED \ + --streaming \ + --enable_streaming_engine \ + --experiments use_runner_v2 \ + --no_wait + + echo "Job submitted: $JOB_NAME" + echo "Environment: ${{ inputs.environment }}" + echo "Monitor: https://console.cloud.google.com/dataflow/jobs/${{ env.REGION }}?project=${{ env.PROJECT_ID }}" diff --git a/docs/gcp_feature_platform.md b/docs/gcp_feature_platform.md index 717c1d1..438a092 100644 --- a/docs/gcp_feature_platform.md +++ b/docs/gcp_feature_platform.md @@ -186,6 +186,12 @@ Decouples feature ingestion from inference. This pipeline's only job is to persi - Triggers `FeatureView.sync()` periodically or per-batch so the online store stays fresh - Writes to BQ only (no model calls, no prediction output) +**Create `.github/workflows/deploy-dataflow-feature.yaml`** — GitHub Action to deploy the feature pipeline: +- `workflow_dispatch` with inputs for environment (staging/prod), region, and worker machine type +- Same pattern as existing `deploy-dataflow.yaml` (checkout, Python setup, install, GCP auth, gcloud CLI) +- No Cloud Run service URL needed — this pipeline only writes to BQ, no model calls +- Environment-specific job prefix (`iris-streaming-features-staging` / `iris-streaming-features`) + ### Step 10: Streaming Inference Pipeline (Feature Store → Predictions) This pipeline reads features from the online store and runs inference — fully decoupled from feature ingestion. diff --git a/scripts/deploy_dataflow_feature.sh b/scripts/deploy_dataflow_feature.sh new file mode 100755 index 0000000..3bc0621 --- /dev/null +++ b/scripts/deploy_dataflow_feature.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Deploy Dataflow streaming job for ingesting Pub/Sub data into the Feature Store +set -e + +# Environment: staging (default) or prod +ENV=${1:-staging} + +# Configuration +PROJECT_ID="deeplearning-sahil" +REGION="us-central1" +PUBSUB_TOPIC="projects/$PROJECT_ID/topics/iris-inference-data" +TEMP_LOCATION="gs://sb-vertex/temp" +STAGING_LOCATION="gs://sb-vertex/staging" +SERVICE_ACCOUNT="kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com" +SYNC_INTERVAL_SECS=300 + +if [ "$ENV" = "prod" ]; then + JOB_PREFIX="iris-streaming-features" + OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features" +else + JOB_PREFIX="iris-streaming-features-staging" + OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features" +fi + +JOB_NAME="$JOB_PREFIX-$(date +%Y%m%d-%H%M%S)" + +echo "Deploying Dataflow feature pipeline ($ENV)..." +echo "Output table: $OUTPUT_TABLE" + +python src/dataflow/iris_feature_pipeline.py \ + --input_topic $PUBSUB_TOPIC \ + --output_table $OUTPUT_TABLE \ + --project_id $PROJECT_ID \ + --region $REGION \ + --sync_interval_secs $SYNC_INTERVAL_SECS \ + --runner DataflowRunner \ + --job_name $JOB_NAME \ + --temp_location $TEMP_LOCATION \ + --staging_location $STAGING_LOCATION \ + --service_account_email $SERVICE_ACCOUNT \ + --use_public_ips \ + --max_num_workers 3 \ + --autoscaling_algorithm THROUGHPUT_BASED \ + --streaming \ + --enable_streaming_engine \ + --experiments use_runner_v2 \ + --no_wait + +echo "Dataflow job submitted successfully!" +echo "Job name: $JOB_NAME" +echo "Environment: $ENV" +echo "Monitor at: https://console.cloud.google.com/dataflow/jobs/$REGION/$JOB_NAME?project=$PROJECT_ID" +echo "" +echo "To test the pipeline:" +echo "1. Publish messages: python src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py --project-id=$PROJECT_ID" +echo "2. Check feature table: SELECT * FROM ml_dataset.iris_features WHERE source = 'streaming'" +echo "" +echo "To stop the job:" +echo "gcloud dataflow jobs cancel $JOB_NAME --region=$REGION --project=$PROJECT_ID" diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py new file mode 100644 index 0000000..e65b3f0 --- /dev/null +++ b/src/dataflow/iris_feature_pipeline.py @@ -0,0 +1,214 @@ +""" +Dataflow streaming pipeline for ingesting Pub/Sub messages into the Feature Store. +Reads from Pub/Sub, renames fields to canonical names, and writes to the +iris_features BQ table with WRITE_APPEND. Periodically triggers a FeatureView +sync so the online store stays fresh. + +No model calls — this pipeline only persists features. +""" + +import json +import argparse +import logging +import uuid + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.io import ReadFromPubSub, WriteToBigQuery +from apache_beam.transforms.window import FixedWindows + +from ml_pipelines_kfp.log import get_logger + +logger = get_logger(__name__) + +PROJECT_ID = "deeplearning-sahil" +REGION = "us-central1" + +PUBSUB_TO_CANONICAL = { + "sepal_length": "sepal_length_cm", + "sepal_width": "sepal_width_cm", + "petal_length": "petal_length_cm", + "petal_width": "petal_width_cm", +} + +FEATURE_TABLE_SCHEMA = { + "fields": [ + {"name": "sepal_length_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "sepal_width_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_length_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_width_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "species", "type": "STRING", "mode": "NULLABLE"}, + {"name": "source", "type": "STRING", "mode": "REQUIRED"}, + {"name": "entity_id", "type": "STRING", "mode": "REQUIRED"}, + {"name": "feature_timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, + ] +} + + +class ParsePubSubMessage(beam.DoFn): + """Parse JSON message from Pub/Sub.""" + + def process(self, element): + try: + message_data = json.loads(element.decode("utf-8")) + + required_fields = [ + "sepal_length", + "sepal_width", + "petal_length", + "petal_width", + ] + if all(field in message_data for field in required_fields): + yield message_data + else: + logger.warning(f"Missing required fields in message: {message_data}") + + except (json.JSONDecodeError, AttributeError) as e: + logger.error(f"Error parsing message: {e}, message: {element}") + + +class MapToFeatureRow(beam.DoFn): + """Rename Pub/Sub fields to canonical names and add Feature Store metadata.""" + + def process(self, element): + from datetime import datetime, timezone + + row = { + canonical: float(element[pubsub_key]) + for pubsub_key, canonical in PUBSUB_TO_CANONICAL.items() + } + + sample_id = element.get("sample_id", uuid.uuid4().hex[:8]) + row["species"] = None + row["source"] = "streaming" + row["entity_id"] = f"{sample_id}_streaming" + row["feature_timestamp"] = datetime.now(timezone.utc).isoformat() + + yield row + + +class TriggerFeatureSync(beam.DoFn): + """Trigger a FeatureView sync after each window completes.""" + + def __init__(self, project_id, region, online_store_id, feature_view_id): + self.project_id = project_id + self.region = region + self.online_store_id = online_store_id + self.feature_view_id = feature_view_id + + def setup(self): + from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient + + self._client = FeatureOnlineStoreAdminServiceClient( + client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} + ) + self._feature_view_name = ( + f"projects/{self.project_id}/locations/{self.region}" + f"/featureOnlineStores/{self.online_store_id}" + f"/featureViews/{self.feature_view_id}" + ) + + def process(self, count): + try: + response = self._client.sync_feature_view( + feature_view=self._feature_view_name + ) + logger.info( + f"FeatureView sync triggered ({count} rows in window): " + f"{response.feature_view_sync}" + ) + except Exception as e: + logger.error(f"FeatureView sync failed: {e}") + yield count + + +def run_pipeline(argv=None): + """Run the Dataflow streaming feature pipeline.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "--input_topic", + required=True, + help="Pub/Sub topic to read from (projects/PROJECT/topics/TOPIC)", + ) + parser.add_argument( + "--output_table", + required=True, + help="BigQuery feature table (PROJECT:DATASET.TABLE)", + ) + parser.add_argument("--project_id", required=True, help="GCP project ID") + parser.add_argument("--region", required=True, help="GCP Region") + parser.add_argument( + "--sync_interval_secs", + type=int, + default=300, + help="How often to trigger FeatureView sync (seconds, default: 300)", + ) + parser.add_argument( + "--online_store_id", + default="ml_online_store", + help="Feature Online Store ID (default: ml_online_store)", + ) + parser.add_argument( + "--feature_view_id", + default="iris_features", + help="Feature View ID (default: iris_features)", + ) + parser.add_argument( + "--no_wait", + action="store_true", + help="Submit the job and exit without waiting for it to finish", + ) + + known_args, pipeline_args = parser.parse_known_args(argv) + logger.info(f"Known args: {known_args}") + logger.info(f"Pipeline args: {pipeline_args}") + + pipeline_options = PipelineOptions(pipeline_args) + + from apache_beam.options.pipeline_options import GoogleCloudOptions + + google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) + google_cloud_options.project = known_args.project_id + google_cloud_options.region = known_args.region + + p = beam.Pipeline(options=pipeline_options) + + feature_rows = ( + p + | "Read from Pub/Sub" >> ReadFromPubSub(topic=known_args.input_topic) + | "Parse JSON" >> beam.ParDo(ParsePubSubMessage()) + | "Map to Feature Row" >> beam.ParDo(MapToFeatureRow()) + ) + + feature_rows | "Write to Feature Table" >> WriteToBigQuery( + table=known_args.output_table, + schema=FEATURE_TABLE_SCHEMA, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER, + ) + + ( + feature_rows + | "Window for Sync" >> beam.WindowInto( + FixedWindows(known_args.sync_interval_secs) + ) + | "Count per Window" >> beam.combiners.Count.Globally().without_defaults() + | "Trigger FeatureView Sync" + >> beam.ParDo( + TriggerFeatureSync( + project_id=known_args.project_id, + region=known_args.region, + online_store_id=known_args.online_store_id, + feature_view_id=known_args.feature_view_id, + ) + ) + ) + + result = p.run() + if not known_args.no_wait: + result.wait_until_finish() + + +if __name__ == "__main__": + run_pipeline() From 21220a58b7dfe7263ef195339ee03f8d48e3231e Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:06:27 -0400 Subject: [PATCH 2/7] Extract Pub/Sub message validation to Pydantic schema Adds dataflow/schemas.py with PubSubIrisMessage model for type-safe validation of incoming messages instead of manual field checking. Co-Authored-By: Claude Opus 4.6 --- src/dataflow/iris_feature_pipeline.py | 26 +++++++++++--------------- src/dataflow/schemas.py | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 15 deletions(-) create mode 100644 src/dataflow/schemas.py diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py index e65b3f0..3fb598c 100644 --- a/src/dataflow/iris_feature_pipeline.py +++ b/src/dataflow/iris_feature_pipeline.py @@ -46,39 +46,35 @@ class ParsePubSubMessage(beam.DoFn): - """Parse JSON message from Pub/Sub.""" + """Parse and validate JSON message from Pub/Sub using Pydantic schema.""" def process(self, element): + from pydantic import ValidationError + from dataflow.schemas import PubSubIrisMessage + try: message_data = json.loads(element.decode("utf-8")) + validated = PubSubIrisMessage(**message_data) + yield validated.model_dump() - required_fields = [ - "sepal_length", - "sepal_width", - "petal_length", - "petal_width", - ] - if all(field in message_data for field in required_fields): - yield message_data - else: - logger.warning(f"Missing required fields in message: {message_data}") - + except ValidationError as e: + logger.warning(f"Invalid message: {e}") except (json.JSONDecodeError, AttributeError) as e: logger.error(f"Error parsing message: {e}, message: {element}") class MapToFeatureRow(beam.DoFn): - """Rename Pub/Sub fields to canonical names and add Feature Store metadata.""" + """Rename validated Pub/Sub fields to canonical names and add Feature Store metadata.""" def process(self, element): from datetime import datetime, timezone row = { - canonical: float(element[pubsub_key]) + canonical: element[pubsub_key] for pubsub_key, canonical in PUBSUB_TO_CANONICAL.items() } - sample_id = element.get("sample_id", uuid.uuid4().hex[:8]) + sample_id = element.get("sample_id") or uuid.uuid4().hex[:8] row["species"] = None row["source"] = "streaming" row["entity_id"] = f"{sample_id}_streaming" diff --git a/src/dataflow/schemas.py b/src/dataflow/schemas.py new file mode 100644 index 0000000..06a3329 --- /dev/null +++ b/src/dataflow/schemas.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel + + +class PubSubIrisMessage(BaseModel): + """Validates incoming Pub/Sub messages for the Iris feature pipeline.""" + + sepal_length: float + sepal_width: float + petal_length: float + petal_width: float + timestamp: Optional[str] = None + sample_id: Optional[int] = None From dd4442667e9457e9466b4882bf96a62441366847 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:07:26 -0400 Subject: [PATCH 3/7] Move Pub/Sub schema to dataflow/models/iris_schema.py Co-Authored-By: Claude Opus 4.6 --- src/dataflow/iris_feature_pipeline.py | 2 +- src/dataflow/{schemas.py => models/iris_schema.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/dataflow/{schemas.py => models/iris_schema.py} (100%) diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py index 3fb598c..dae7d2b 100644 --- a/src/dataflow/iris_feature_pipeline.py +++ b/src/dataflow/iris_feature_pipeline.py @@ -50,7 +50,7 @@ class ParsePubSubMessage(beam.DoFn): def process(self, element): from pydantic import ValidationError - from dataflow.schemas import PubSubIrisMessage + from dataflow.models.iris_schema import PubSubIrisMessage try: message_data = json.loads(element.decode("utf-8")) diff --git a/src/dataflow/schemas.py b/src/dataflow/models/iris_schema.py similarity index 100% rename from src/dataflow/schemas.py rename to src/dataflow/models/iris_schema.py From 9fd433f7c9ac0b5a3578e8f82cf6e1a529a47cc6 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:08:55 -0400 Subject: [PATCH 4/7] Move all imports to top of iris_feature_pipeline.py Co-Authored-By: Claude Opus 4.6 --- src/dataflow/iris_feature_pipeline.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py index dae7d2b..767be3e 100644 --- a/src/dataflow/iris_feature_pipeline.py +++ b/src/dataflow/iris_feature_pipeline.py @@ -11,12 +11,16 @@ import argparse import logging import uuid +from datetime import datetime, timezone import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions from apache_beam.io import ReadFromPubSub, WriteToBigQuery from apache_beam.transforms.window import FixedWindows +from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient +from pydantic import ValidationError +from dataflow.models.iris_schema import PubSubIrisMessage from ml_pipelines_kfp.log import get_logger logger = get_logger(__name__) @@ -49,9 +53,6 @@ class ParsePubSubMessage(beam.DoFn): """Parse and validate JSON message from Pub/Sub using Pydantic schema.""" def process(self, element): - from pydantic import ValidationError - from dataflow.models.iris_schema import PubSubIrisMessage - try: message_data = json.loads(element.decode("utf-8")) validated = PubSubIrisMessage(**message_data) @@ -67,8 +68,6 @@ class MapToFeatureRow(beam.DoFn): """Rename validated Pub/Sub fields to canonical names and add Feature Store metadata.""" def process(self, element): - from datetime import datetime, timezone - row = { canonical: element[pubsub_key] for pubsub_key, canonical in PUBSUB_TO_CANONICAL.items() @@ -93,8 +92,6 @@ def __init__(self, project_id, region, online_store_id, feature_view_id): self.feature_view_id = feature_view_id def setup(self): - from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient - self._client = FeatureOnlineStoreAdminServiceClient( client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} ) @@ -162,8 +159,6 @@ def run_pipeline(argv=None): pipeline_options = PipelineOptions(pipeline_args) - from apache_beam.options.pipeline_options import GoogleCloudOptions - google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) google_cloud_options.project = known_args.project_id google_cloud_options.region = known_args.region From 99220a7c51bd0f43b4c58f2badef156800329ad9 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:12:55 -0400 Subject: [PATCH 5/7] Move TriggerFeatureSync to dataflow/utils/feature_sync.py for reuse Co-Authored-By: Claude Opus 4.6 --- src/dataflow/iris_feature_pipeline.py | 35 +----------------------- src/dataflow/utils/feature_sync.py | 39 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 34 deletions(-) create mode 100644 src/dataflow/utils/feature_sync.py diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py index 767be3e..934e28b 100644 --- a/src/dataflow/iris_feature_pipeline.py +++ b/src/dataflow/iris_feature_pipeline.py @@ -17,10 +17,10 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions from apache_beam.io import ReadFromPubSub, WriteToBigQuery from apache_beam.transforms.window import FixedWindows -from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient from pydantic import ValidationError from dataflow.models.iris_schema import PubSubIrisMessage +from dataflow.utils.feature_sync import TriggerFeatureSync from ml_pipelines_kfp.log import get_logger logger = get_logger(__name__) @@ -82,39 +82,6 @@ def process(self, element): yield row -class TriggerFeatureSync(beam.DoFn): - """Trigger a FeatureView sync after each window completes.""" - - def __init__(self, project_id, region, online_store_id, feature_view_id): - self.project_id = project_id - self.region = region - self.online_store_id = online_store_id - self.feature_view_id = feature_view_id - - def setup(self): - self._client = FeatureOnlineStoreAdminServiceClient( - client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} - ) - self._feature_view_name = ( - f"projects/{self.project_id}/locations/{self.region}" - f"/featureOnlineStores/{self.online_store_id}" - f"/featureViews/{self.feature_view_id}" - ) - - def process(self, count): - try: - response = self._client.sync_feature_view( - feature_view=self._feature_view_name - ) - logger.info( - f"FeatureView sync triggered ({count} rows in window): " - f"{response.feature_view_sync}" - ) - except Exception as e: - logger.error(f"FeatureView sync failed: {e}") - yield count - - def run_pipeline(argv=None): """Run the Dataflow streaming feature pipeline.""" diff --git a/src/dataflow/utils/feature_sync.py b/src/dataflow/utils/feature_sync.py new file mode 100644 index 0000000..eea4165 --- /dev/null +++ b/src/dataflow/utils/feature_sync.py @@ -0,0 +1,39 @@ +import logging + +import apache_beam as beam +from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient + +logger = logging.getLogger(__name__) + + +class TriggerFeatureSync(beam.DoFn): + """Trigger a FeatureView sync after each window completes.""" + + def __init__(self, project_id, region, online_store_id, feature_view_id): + self.project_id = project_id + self.region = region + self.online_store_id = online_store_id + self.feature_view_id = feature_view_id + + def setup(self): + self._client = FeatureOnlineStoreAdminServiceClient( + client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} + ) + self._feature_view_name = ( + f"projects/{self.project_id}/locations/{self.region}" + f"/featureOnlineStores/{self.online_store_id}" + f"/featureViews/{self.feature_view_id}" + ) + + def process(self, count): + try: + response = self._client.sync_feature_view( + feature_view=self._feature_view_name + ) + logger.info( + f"FeatureView sync triggered ({count} rows in window): " + f"{response.feature_view_sync}" + ) + except Exception as e: + logger.error(f"FeatureView sync failed: {e}") + yield count From fb2d247b1f380f592a0266839a5bddc92bd7d472 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:19:27 -0400 Subject: [PATCH 6/7] Replace periodic sync with dual-write to BQ and online store Writes directly to Bigtable via WriteFeatureValues API for sub-second online serving latency, instead of waiting for periodic FeatureView sync. BQ write remains for offline training/batch use cases. Co-Authored-By: Claude Opus 4.6 --- .../workflows/deploy-dataflow-feature.yaml | 14 ++-- scripts/deploy_dataflow_feature.sh | 4 +- src/dataflow/iris_feature_pipeline.py | 30 ++++---- src/dataflow/utils/online_store_writer.py | 69 +++++++++++++++++++ 4 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 src/dataflow/utils/online_store_writer.py diff --git a/.github/workflows/deploy-dataflow-feature.yaml b/.github/workflows/deploy-dataflow-feature.yaml index 256c07c..5647c27 100644 --- a/.github/workflows/deploy-dataflow-feature.yaml +++ b/.github/workflows/deploy-dataflow-feature.yaml @@ -27,14 +27,14 @@ on: - e2-standard-4 - n1-standard-4 default: e2-standard-2 - sync_interval_secs: - description: "FeatureView sync interval in seconds" + online_batch_size: + description: "Max rows per online store write batch" type: choice options: - - "300" - - "600" - - "900" - default: "300" + - "50" + - "100" + - "200" + default: "100" env: PROJECT_ID: "deeplearning-sahil" @@ -90,7 +90,7 @@ jobs: --output_table ${{ env.OUTPUT_TABLE }} \ --project_id ${{ env.PROJECT_ID }} \ --region ${{ env.REGION }} \ - --sync_interval_secs ${{ inputs.sync_interval_secs }} \ + --online_batch_size ${{ inputs.online_batch_size }} \ --runner DataflowRunner \ --job_name $JOB_NAME \ --temp_location ${{ env.TEMP_LOCATION }} \ diff --git a/scripts/deploy_dataflow_feature.sh b/scripts/deploy_dataflow_feature.sh index 3bc0621..674d44f 100755 --- a/scripts/deploy_dataflow_feature.sh +++ b/scripts/deploy_dataflow_feature.sh @@ -13,7 +13,7 @@ PUBSUB_TOPIC="projects/$PROJECT_ID/topics/iris-inference-data" TEMP_LOCATION="gs://sb-vertex/temp" STAGING_LOCATION="gs://sb-vertex/staging" SERVICE_ACCOUNT="kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com" -SYNC_INTERVAL_SECS=300 +ONLINE_BATCH_SIZE=100 if [ "$ENV" = "prod" ]; then JOB_PREFIX="iris-streaming-features" @@ -33,7 +33,7 @@ python src/dataflow/iris_feature_pipeline.py \ --output_table $OUTPUT_TABLE \ --project_id $PROJECT_ID \ --region $REGION \ - --sync_interval_secs $SYNC_INTERVAL_SECS \ + --online_batch_size $ONLINE_BATCH_SIZE \ --runner DataflowRunner \ --job_name $JOB_NAME \ --temp_location $TEMP_LOCATION \ diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py index 934e28b..9d28334 100644 --- a/src/dataflow/iris_feature_pipeline.py +++ b/src/dataflow/iris_feature_pipeline.py @@ -1,8 +1,8 @@ """ Dataflow streaming pipeline for ingesting Pub/Sub messages into the Feature Store. -Reads from Pub/Sub, renames fields to canonical names, and writes to the -iris_features BQ table with WRITE_APPEND. Periodically triggers a FeatureView -sync so the online store stays fresh. +Reads from Pub/Sub, renames fields to canonical names, and dual-writes to: + - BQ iris_features table (offline store, for training/batch) + - Bigtable online store (real-time serving, sub-second latency) No model calls — this pipeline only persists features. """ @@ -16,11 +16,11 @@ import apache_beam as beam from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions from apache_beam.io import ReadFromPubSub, WriteToBigQuery -from apache_beam.transforms.window import FixedWindows +from apache_beam.transforms.util import BatchElements from pydantic import ValidationError from dataflow.models.iris_schema import PubSubIrisMessage -from dataflow.utils.feature_sync import TriggerFeatureSync +from dataflow.utils.online_store_writer import WriteToOnlineStore from ml_pipelines_kfp.log import get_logger logger = get_logger(__name__) @@ -99,10 +99,10 @@ def run_pipeline(argv=None): parser.add_argument("--project_id", required=True, help="GCP project ID") parser.add_argument("--region", required=True, help="GCP Region") parser.add_argument( - "--sync_interval_secs", + "--online_batch_size", type=int, - default=300, - help="How often to trigger FeatureView sync (seconds, default: 300)", + default=100, + help="Max rows per online store write batch (default: 100)", ) parser.add_argument( "--online_store_id", @@ -139,7 +139,7 @@ def run_pipeline(argv=None): | "Map to Feature Row" >> beam.ParDo(MapToFeatureRow()) ) - feature_rows | "Write to Feature Table" >> WriteToBigQuery( + feature_rows | "Write to BQ (Offline Store)" >> WriteToBigQuery( table=known_args.output_table, schema=FEATURE_TABLE_SCHEMA, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, @@ -148,17 +148,19 @@ def run_pipeline(argv=None): ( feature_rows - | "Window for Sync" >> beam.WindowInto( - FixedWindows(known_args.sync_interval_secs) + | "Batch for Online Store" + >> BatchElements( + min_batch_size=1, + max_batch_size=known_args.online_batch_size, ) - | "Count per Window" >> beam.combiners.Count.Globally().without_defaults() - | "Trigger FeatureView Sync" + | "Write to Online Store (Bigtable)" >> beam.ParDo( - TriggerFeatureSync( + WriteToOnlineStore( project_id=known_args.project_id, region=known_args.region, online_store_id=known_args.online_store_id, feature_view_id=known_args.feature_view_id, + feature_columns=list(PUBSUB_TO_CANONICAL.values()), ) ) ) diff --git a/src/dataflow/utils/online_store_writer.py b/src/dataflow/utils/online_store_writer.py new file mode 100644 index 0000000..8205108 --- /dev/null +++ b/src/dataflow/utils/online_store_writer.py @@ -0,0 +1,69 @@ +import logging + +import apache_beam as beam +from google.cloud.aiplatform_v1 import FeatureOnlineStoreServiceClient +from google.cloud.aiplatform_v1.types import ( + WriteFeatureValuesRequest, + WriteFeatureValuesPayload, + FeatureValue, +) + +logger = logging.getLogger(__name__) + + +class WriteToOnlineStore(beam.DoFn): + """Write a batch of feature rows directly to the Feature Store online store (Bigtable). + + Expects each batch element to be a dict with 'entity_id' and feature columns. + Float features use double_value, string features use string_value. + """ + + def __init__(self, project_id, region, online_store_id, feature_view_id, feature_columns): + self.project_id = project_id + self.region = region + self.online_store_id = online_store_id + self.feature_view_id = feature_view_id + self.feature_columns = feature_columns + + def setup(self): + self._client = FeatureOnlineStoreServiceClient( + client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} + ) + self._feature_view_name = ( + f"projects/{self.project_id}/locations/{self.region}" + f"/featureOnlineStores/{self.online_store_id}" + f"/featureViews/{self.feature_view_id}" + ) + + def process(self, batch): + payloads = [] + for row in batch: + feature_values = {} + for col in self.feature_columns: + val = row.get(col) + if val is None: + continue + if isinstance(val, (int, float)): + feature_values[col] = FeatureValue(double_value=float(val)) + else: + feature_values[col] = FeatureValue(string_value=str(val)) + + payloads.append( + WriteFeatureValuesPayload( + entity_id=row["entity_id"], + feature_values=feature_values, + ) + ) + + try: + self._client.write_feature_values( + request=WriteFeatureValuesRequest( + feature_view=self._feature_view_name, + payloads=payloads, + ) + ) + logger.info(f"Wrote {len(payloads)} rows to online store") + except Exception as e: + logger.error(f"Online store write failed ({len(payloads)} rows): {e}") + + yield from batch From 89e2b0a44e95679022de052810a0c5ca13245ba6 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 17:28:10 -0400 Subject: [PATCH 7/7] =?UTF-8?q?Remove=20feature=5Fsync.py=20=E2=80=94=20re?= =?UTF-8?q?placed=20by=20direct=20online=20store=20writes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 --- src/dataflow/utils/feature_sync.py | 39 ------------------------------ 1 file changed, 39 deletions(-) delete mode 100644 src/dataflow/utils/feature_sync.py diff --git a/src/dataflow/utils/feature_sync.py b/src/dataflow/utils/feature_sync.py deleted file mode 100644 index eea4165..0000000 --- a/src/dataflow/utils/feature_sync.py +++ /dev/null @@ -1,39 +0,0 @@ -import logging - -import apache_beam as beam -from google.cloud.aiplatform_v1 import FeatureOnlineStoreAdminServiceClient - -logger = logging.getLogger(__name__) - - -class TriggerFeatureSync(beam.DoFn): - """Trigger a FeatureView sync after each window completes.""" - - def __init__(self, project_id, region, online_store_id, feature_view_id): - self.project_id = project_id - self.region = region - self.online_store_id = online_store_id - self.feature_view_id = feature_view_id - - def setup(self): - self._client = FeatureOnlineStoreAdminServiceClient( - client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} - ) - self._feature_view_name = ( - f"projects/{self.project_id}/locations/{self.region}" - f"/featureOnlineStores/{self.online_store_id}" - f"/featureViews/{self.feature_view_id}" - ) - - def process(self, count): - try: - response = self._client.sync_feature_view( - feature_view=self._feature_view_name - ) - logger.info( - f"FeatureView sync triggered ({count} rows in window): " - f"{response.feature_view_sync}" - ) - except Exception as e: - logger.error(f"FeatureView sync failed: {e}") - yield count