Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions .github/workflows/deploy-dataflow-feature.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Manually triggered deployment of the streaming Dataflow feature pipeline.
name: Deploy Dataflow Feature Pipeline

on:
  workflow_dispatch:
    inputs:
      # Drives the environment-specific job prefix chosen by the deploy job.
      environment:
        description: 'Target environment'
        type: choice
        options:
          - staging
          - prod
        default: staging
      # Exposed to every step as the REGION environment variable.
      region:
        description: 'GCP region for Dataflow job'
        type: choice
        options:
          - us-central1
          - us-east1
          - us-west1
        default: us-central1
      # Forwarded to the pipeline as --worker_machine_type.
      worker_machine_type:
        description: 'Worker VM machine type'
        type: choice
        options:
          - e2-standard-2
          - n1-standard-2
          - e2-standard-4
          - n1-standard-4
        default: e2-standard-2
      # Forwarded to the pipeline as --online_batch_size. Quoted so the
      # options stay strings rather than being typed as integers.
      online_batch_size:
        description: 'Max rows per online store write batch'
        type: choice
        options:
          - '50'
          - '100'
          - '200'
        default: '100'

# Workflow-wide settings; every step sees these as environment variables.
env:
  PROJECT_ID: 'deeplearning-sahil'
  # Taken from the workflow_dispatch "region" input.
  REGION: ${{ inputs.region }}
  PUBSUB_TOPIC: 'projects/deeplearning-sahil/topics/iris-inference-data'
  TEMP_LOCATION: 'gs://sb-vertex/temp'
  STAGING_LOCATION: 'gs://sb-vertex/staging'
  SERVICE_ACCOUNT_EMAIL: 'kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com'

jobs:
  # Submits the streaming feature pipeline to Dataflow and exits without
  # waiting for the job to finish (--no_wait).
  deploy:
    runs-on: ubuntu-latest
    # The job only reads the repository; GCP access comes from the
    # service-account key secret, not from the GitHub token.
    permissions:
      contents: read

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      # Exports OUTPUT_TABLE and JOB_PREFIX for the later steps.
      - name: Set environment-specific variables
        env:
          # Inputs are passed through env instead of being interpolated into
          # the script text, so their value can never be parsed as shell code.
          DEPLOY_ENV: ${{ inputs.environment }}
        run: |
          # NOTE(review): staging and prod currently write to the same
          # BigQuery table; only the job prefix differs. Confirm this is
          # intended before relying on staging for isolation.
          if [ "$DEPLOY_ENV" = "prod" ]; then
            echo "OUTPUT_TABLE=${PROJECT_ID}:ml_dataset.iris_features" >> "$GITHUB_ENV"
            echo "JOB_PREFIX=iris-streaming-features" >> "$GITHUB_ENV"
          else
            echo "OUTPUT_TABLE=${PROJECT_ID}:ml_dataset.iris_features" >> "$GITHUB_ENV"
            echo "JOB_PREFIX=iris-streaming-features-staging" >> "$GITHUB_ENV"
          fi

      - name: Set up Python 3.10
        uses: actions/setup-python@v5
        with:
          # Quoted: an unquoted 3.10 would be read as the float 3.1.
          python-version: "3.10"

      - name: Install dependencies
        run: pip install -e .

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}

      - name: Set up gcloud CLI
        uses: google-github-actions/setup-gcloud@v2
        with:
          project_id: ${{ env.PROJECT_ID }}

      - name: Deploy Dataflow feature pipeline
        env:
          DEPLOY_ENV: ${{ inputs.environment }}
          ONLINE_BATCH_SIZE: ${{ inputs.online_batch_size }}
          WORKER_MACHINE_TYPE: ${{ inputs.worker_machine_type }}
        run: |
          # A timestamp suffix gives every submission a distinct job name.
          JOB_NAME="${JOB_PREFIX}-$(date +%Y%m%d-%H%M%S)"
          echo "Submitting ${DEPLOY_ENV} job: ${JOB_NAME}"

          python src/dataflow/iris_feature_pipeline.py \
            --input_topic "$PUBSUB_TOPIC" \
            --output_table "$OUTPUT_TABLE" \
            --project_id "$PROJECT_ID" \
            --region "$REGION" \
            --online_batch_size "$ONLINE_BATCH_SIZE" \
            --runner DataflowRunner \
            --job_name "$JOB_NAME" \
            --temp_location "$TEMP_LOCATION" \
            --staging_location "$STAGING_LOCATION" \
            --service_account_email "$SERVICE_ACCOUNT_EMAIL" \
            --use_public_ips \
            --worker_machine_type "$WORKER_MACHINE_TYPE" \
            --max_num_workers 3 \
            --autoscaling_algorithm THROUGHPUT_BASED \
            --streaming \
            --enable_streaming_engine \
            --experiments use_runner_v2 \
            --no_wait

          echo "Job submitted: ${JOB_NAME}"
          echo "Environment: ${DEPLOY_ENV}"
          echo "Monitor: https://console.cloud.google.com/dataflow/jobs/${REGION}?project=${PROJECT_ID}"
6 changes: 6 additions & 0 deletions docs/gcp_feature_platform.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ Decouples feature ingestion from inference. This pipeline's only job is to persi
- Triggers `FeatureView.sync()` periodically or per-batch so the online store stays fresh
- Writes to BQ only (no model calls, no prediction output)

**Create `.github/workflows/deploy-dataflow-feature.yaml`** — GitHub Action to deploy the feature pipeline:
- `workflow_dispatch` with inputs for environment (staging/prod), region, worker machine type, and online-store write batch size
- Same pattern as existing `deploy-dataflow.yaml` (checkout, Python setup, install, GCP auth, gcloud CLI)
- No Cloud Run service URL needed — this pipeline only persists features (BigQuery offline store and the online store) and makes no model calls
- Environment-specific job prefix (`iris-streaming-features-staging` / `iris-streaming-features`)

### Step 10: Streaming Inference Pipeline (Feature Store → Predictions)

This pipeline reads features from the online store and runs inference — fully decoupled from feature ingestion.
Expand Down
60 changes: 60 additions & 0 deletions scripts/deploy_dataflow_feature.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash
#
# Deploy the Dataflow streaming job that ingests Pub/Sub data into the
# Feature Store.
#
# Usage: deploy_dataflow_feature.sh [staging|prod]   (default: staging)

# Abort on command failure, on use of an unset variable, and on a failure
# anywhere in a pipeline.
set -euo pipefail

# Environment: staging (default) or prod
ENV="${1:-staging}"

# Configuration
PROJECT_ID="deeplearning-sahil"
REGION="us-central1"
PUBSUB_TOPIC="projects/$PROJECT_ID/topics/iris-inference-data"
TEMP_LOCATION="gs://sb-vertex/temp"
STAGING_LOCATION="gs://sb-vertex/staging"
SERVICE_ACCOUNT="kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com"
ONLINE_BATCH_SIZE=100

# Reject unknown environments instead of silently treating a typo such as
# "production" as staging.
# NOTE(review): both environments currently write to the same BigQuery table;
# only the job prefix differs. Confirm this is intended.
case "$ENV" in
  prod)
    JOB_PREFIX="iris-streaming-features"
    OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features"
    ;;
  staging)
    JOB_PREFIX="iris-streaming-features-staging"
    OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features"
    ;;
  *)
    echo "Unknown environment '$ENV' (expected 'staging' or 'prod')" >&2
    exit 1
    ;;
esac

# A timestamp suffix gives every submission a distinct job name.
JOB_NAME="$JOB_PREFIX-$(date +%Y%m%d-%H%M%S)"

echo "Deploying Dataflow feature pipeline ($ENV)..."
echo "Output table: $OUTPUT_TABLE"

# Every expansion is quoted so values are passed as single arguments.
python src/dataflow/iris_feature_pipeline.py \
  --input_topic "$PUBSUB_TOPIC" \
  --output_table "$OUTPUT_TABLE" \
  --project_id "$PROJECT_ID" \
  --region "$REGION" \
  --online_batch_size "$ONLINE_BATCH_SIZE" \
  --runner DataflowRunner \
  --job_name "$JOB_NAME" \
  --temp_location "$TEMP_LOCATION" \
  --staging_location "$STAGING_LOCATION" \
  --service_account_email "$SERVICE_ACCOUNT" \
  --use_public_ips \
  --max_num_workers 3 \
  --autoscaling_algorithm THROUGHPUT_BASED \
  --streaming \
  --enable_streaming_engine \
  --experiments use_runner_v2 \
  --no_wait

echo "Dataflow job submitted successfully!"
echo "Job name: $JOB_NAME"
echo "Environment: $ENV"
# NOTE(review): console job pages appear to be keyed by job ID rather than job
# name; confirm this link resolves.
echo "Monitor at: https://console.cloud.google.com/dataflow/jobs/$REGION/$JOB_NAME?project=$PROJECT_ID"
echo ""
echo "To test the pipeline:"
echo "1. Publish messages: python src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py --project-id=$PROJECT_ID"
echo "2. Check feature table: SELECT * FROM ml_dataset.iris_features WHERE source = 'streaming'"
echo ""
# `gcloud dataflow jobs cancel` takes a job ID, so the hint first shows how to
# look the ID up from the job name.
echo "To stop the job (cancel takes the job ID, not the job name):"
echo "gcloud dataflow jobs list --region=$REGION --project=$PROJECT_ID --filter=\"name=$JOB_NAME\" --format=\"value(id)\""
echo "gcloud dataflow jobs cancel JOB_ID --region=$REGION --project=$PROJECT_ID"
174 changes: 174 additions & 0 deletions src/dataflow/iris_feature_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
Dataflow streaming pipeline for ingesting Pub/Sub messages into the Feature Store.
Reads from Pub/Sub, renames fields to canonical names, and dual-writes to:
- BQ iris_features table (offline store, for training/batch)
- Bigtable online store (real-time serving, sub-second latency)

No model calls — this pipeline only persists features.
"""

import json
import argparse
import logging
import uuid
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.transforms.util import BatchElements
from pydantic import ValidationError

from dataflow.models.iris_schema import PubSubIrisMessage
from dataflow.utils.online_store_writer import WriteToOnlineStore
from ml_pipelines_kfp.log import get_logger

logger = get_logger(__name__)

# Project and region constants. The pipeline code visible in this module takes
# both values from the --project_id / --region flags instead of reading these.
PROJECT_ID = "deeplearning-sahil"
REGION = "us-central1"

# Pub/Sub message field -> canonical feature column name.
PUBSUB_TO_CANONICAL = dict(
    sepal_length="sepal_length_cm",
    sepal_width="sepal_width_cm",
    petal_length="petal_length_cm",
    petal_width="petal_width_cm",
)

# BigQuery schema of the feature table, expanded from (name, type, mode) rows.
FEATURE_TABLE_SCHEMA = {
    "fields": [
        {"name": column, "type": bq_type, "mode": bq_mode}
        for column, bq_type, bq_mode in (
            ("sepal_length_cm", "FLOAT", "REQUIRED"),
            ("sepal_width_cm", "FLOAT", "REQUIRED"),
            ("petal_length_cm", "FLOAT", "REQUIRED"),
            ("petal_width_cm", "FLOAT", "REQUIRED"),
            ("species", "STRING", "NULLABLE"),
            ("source", "STRING", "REQUIRED"),
            ("entity_id", "STRING", "REQUIRED"),
            ("feature_timestamp", "TIMESTAMP", "REQUIRED"),
        )
    ]
}


class ParsePubSubMessage(beam.DoFn):
    """Parse and validate a JSON message from Pub/Sub using the Pydantic schema.

    Yields the validated message as a plain dict. Messages that cannot be
    decoded, parsed or validated are logged and dropped, so a single malformed
    message does not raise out of the DoFn.
    """

    def process(self, element):
        """Decode one Pub/Sub payload and emit it as a validated dict.

        Args:
            element: Raw message payload; expected to be UTF-8 encoded JSON
                bytes holding a JSON object.

        Yields:
            dict: ``PubSubIrisMessage.model_dump()`` of the validated message.
            Nothing is yielded for a message that fails decoding or validation.
        """
        try:
            message_data = json.loads(element.decode("utf-8"))
            validated = PubSubIrisMessage(**message_data)
        except ValidationError as e:
            # Well-formed JSON object whose fields do not satisfy the schema.
            logger.warning(f"Invalid message: {e}")
            return
        except (
            json.JSONDecodeError,  # payload is not valid JSON
            UnicodeDecodeError,  # payload is not valid UTF-8
            AttributeError,  # element has no .decode (not bytes)
            TypeError,  # JSON value is not an object, so ** unpacking fails
        ) as e:
            logger.error(f"Error parsing message: {e}, message: {element}")
            return

        # Emitted outside the try block so the handlers above only ever cover
        # decoding and validation.
        yield validated.model_dump()


class MapToFeatureRow(beam.DoFn):
    """Rename validated Pub/Sub fields to canonical names and add Feature Store metadata."""

    def process(self, element):
        """Build one feature-table row from a validated message dict.

        Args:
            element: Dict holding every key of ``PUBSUB_TO_CANONICAL`` and,
                optionally, ``sample_id``.

        Yields:
            dict: The measurements under their canonical column names plus
            ``species`` (always ``None``), ``source`` (``"streaming"``),
            ``entity_id`` and ``feature_timestamp``.
        """
        row = {
            canonical: element[pubsub_key]
            for pubsub_key, canonical in PUBSUB_TO_CANONICAL.items()
        }

        # Only a missing id gets a random fallback. Testing truthiness instead
        # would discard the valid id 0 and give that sample a random entity_id.
        sample_id = element.get("sample_id")
        if sample_id is None:
            sample_id = uuid.uuid4().hex[:8]

        row["species"] = None
        row["source"] = "streaming"
        row["entity_id"] = f"{sample_id}_streaming"
        # Processing time in UTC; any timestamp carried in the message is not used.
        row["feature_timestamp"] = datetime.now(timezone.utc).isoformat()

        yield row


def run_pipeline(argv=None):
    """Build and submit the streaming feature pipeline.

    Flags declared here are consumed by this function; every other flag is
    handed to Beam's ``PipelineOptions`` unchanged.

    Args:
        argv: Optional argument list; ``None`` makes argparse read ``sys.argv``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--input_topic",
        required=True,
        help="Pub/Sub topic to read from (projects/PROJECT/topics/TOPIC)",
    )
    arg_parser.add_argument(
        "--output_table",
        required=True,
        help="BigQuery feature table (PROJECT:DATASET.TABLE)",
    )
    arg_parser.add_argument("--project_id", required=True, help="GCP project ID")
    arg_parser.add_argument("--region", required=True, help="GCP Region")
    arg_parser.add_argument(
        "--online_batch_size",
        type=int,
        default=100,
        help="Max rows per online store write batch (default: 100)",
    )
    arg_parser.add_argument(
        "--online_store_id",
        default="ml_online_store",
        help="Feature Online Store ID (default: ml_online_store)",
    )
    arg_parser.add_argument(
        "--feature_view_id",
        default="iris_features",
        help="Feature View ID (default: iris_features)",
    )
    arg_parser.add_argument(
        "--no_wait",
        action="store_true",
        help="Submit the job and exit without waiting for it to finish",
    )

    cli_args, beam_args = arg_parser.parse_known_args(argv)
    logger.info(f"Known args: {cli_args}")
    logger.info(f"Pipeline args: {beam_args}")

    # Project and region come from the explicit flags parsed above.
    options = PipelineOptions(beam_args)
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = cli_args.project_id
    gcp_options.region = cli_args.region

    pipeline = beam.Pipeline(options=options)

    # Shared front half: read, validate, then map to feature rows.
    raw_messages = pipeline | "Read from Pub/Sub" >> ReadFromPubSub(
        topic=cli_args.input_topic
    )
    validated = raw_messages | "Parse JSON" >> beam.ParDo(ParsePubSubMessage())
    feature_rows = validated | "Map to Feature Row" >> beam.ParDo(MapToFeatureRow())

    # Branch 1: append rows to the existing BigQuery table (never created here).
    feature_rows | "Write to BQ (Offline Store)" >> WriteToBigQuery(
        table=cli_args.output_table,
        schema=FEATURE_TABLE_SCHEMA,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    )

    # Branch 2: batch the same rows and hand them to the online-store writer.
    row_batches = feature_rows | "Batch for Online Store" >> BatchElements(
        min_batch_size=1,
        max_batch_size=cli_args.online_batch_size,
    )
    online_writer = WriteToOnlineStore(
        project_id=cli_args.project_id,
        region=cli_args.region,
        online_store_id=cli_args.online_store_id,
        feature_view_id=cli_args.feature_view_id,
        feature_columns=list(PUBSUB_TO_CANONICAL.values()),
    )
    row_batches | "Write to Online Store (Bigtable)" >> beam.ParDo(online_writer)

    job = pipeline.run()
    if cli_args.no_wait:
        # Fire-and-forget: leave the job running and return right away.
        return
    job.wait_until_finish()


# Script entry point: with no argv passed, run_pipeline lets argparse read the
# command-line arguments itself.
if __name__ == "__main__":
    run_pipeline()
16 changes: 16 additions & 0 deletions src/dataflow/models/iris_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations

from typing import Optional

from pydantic import BaseModel


class PubSubIrisMessage(BaseModel):
    """Validates incoming Pub/Sub messages for the Iris feature pipeline.

    The four measurement fields declare no default; ``timestamp`` and
    ``sample_id`` default to ``None`` when the message omits them.
    """

    # Flower measurements.
    # NOTE(review): presumably centimetres, since the feature pipeline renames
    # these fields to ``*_cm`` columns -- confirm with the message producer.
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float
    # Optional timestamp carried in the message, kept as a string; no format
    # is enforced by this schema.
    timestamp: Optional[str] = None
    # Optional integer identifier of the sample.
    sample_id: Optional[int] = None
Loading
Loading