diff --git a/.github/workflows/deploy-dataflow-feature.yaml b/.github/workflows/deploy-dataflow-feature.yaml new file mode 100644 index 0000000..5647c27 --- /dev/null +++ b/.github/workflows/deploy-dataflow-feature.yaml @@ -0,0 +1,110 @@ +name: Deploy Dataflow Feature Pipeline + +on: + workflow_dispatch: + inputs: + environment: + description: "Target environment" + type: choice + options: + - staging + - prod + default: staging + region: + description: "GCP region for Dataflow job" + type: choice + options: + - us-central1 + - us-east1 + - us-west1 + default: us-central1 + worker_machine_type: + description: "Worker VM machine type" + type: choice + options: + - e2-standard-2 + - n1-standard-2 + - e2-standard-4 + - n1-standard-4 + default: e2-standard-2 + online_batch_size: + description: "Max rows per online store write batch" + type: choice + options: + - "50" + - "100" + - "200" + default: "100" + +env: + PROJECT_ID: "deeplearning-sahil" + REGION: ${{ inputs.region }} + PUBSUB_TOPIC: "projects/deeplearning-sahil/topics/iris-inference-data" + TEMP_LOCATION: "gs://sb-vertex/temp" + STAGING_LOCATION: "gs://sb-vertex/staging" + SERVICE_ACCOUNT_EMAIL: "kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com" + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set environment-specific variables + run: | + if [ "${{ inputs.environment }}" = "prod" ]; then + echo "OUTPUT_TABLE=${{ env.PROJECT_ID }}:ml_dataset.iris_features" >> $GITHUB_ENV + echo "JOB_PREFIX=iris-streaming-features" >> $GITHUB_ENV + else + echo "OUTPUT_TABLE=${{ env.PROJECT_ID }}:ml_dataset.iris_features" >> $GITHUB_ENV + echo "JOB_PREFIX=iris-streaming-features-staging" >> $GITHUB_ENV + fi + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + + - name: Install dependencies + run: pip install -e . + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v1 + with: + credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} + + - name: Set up gcloud CLI + uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ env.PROJECT_ID }} + + - name: Deploy Dataflow feature pipeline + run: | + JOB_NAME="${{ env.JOB_PREFIX }}-$(date +%Y%m%d-%H%M%S)" + echo "Submitting ${{ inputs.environment }} job: $JOB_NAME" + + python src/dataflow/iris_feature_pipeline.py \ + --input_topic ${{ env.PUBSUB_TOPIC }} \ + --output_table ${{ env.OUTPUT_TABLE }} \ + --project_id ${{ env.PROJECT_ID }} \ + --region ${{ env.REGION }} \ + --online_batch_size ${{ inputs.online_batch_size }} \ + --runner DataflowRunner \ + --job_name $JOB_NAME \ + --temp_location ${{ env.TEMP_LOCATION }} \ + --staging_location ${{ env.STAGING_LOCATION }} \ + --service_account_email ${{ env.SERVICE_ACCOUNT_EMAIL }} \ + --use_public_ips \ + --worker_machine_type ${{ inputs.worker_machine_type }} \ + --max_num_workers 3 \ + --autoscaling_algorithm THROUGHPUT_BASED \ + --streaming \ + --enable_streaming_engine \ + --experiments use_runner_v2 \ + --no_wait + + echo "Job submitted: $JOB_NAME" + echo "Environment: ${{ inputs.environment }}" + echo "Monitor: https://console.cloud.google.com/dataflow/jobs/${{ env.REGION }}?project=${{ env.PROJECT_ID }}" diff --git a/docs/gcp_feature_platform.md b/docs/gcp_feature_platform.md index 717c1d1..438a092 100644 --- a/docs/gcp_feature_platform.md +++ b/docs/gcp_feature_platform.md @@ -186,6 +186,12 @@ Decouples feature ingestion from inference. This pipeline's only job is to persi - Triggers `FeatureView.sync()` periodically or per-batch so the online store stays fresh - Writes to BQ only (no model calls, no prediction output) +**Create `.github/workflows/deploy-dataflow-feature.yaml`** — GitHub Action to deploy the feature pipeline: +- `workflow_dispatch` with inputs for environment (staging/prod), region, and worker machine type +- Same pattern as existing `deploy-dataflow.yaml` (checkout, Python setup, install, GCP auth, gcloud CLI) +- No Cloud Run service URL needed — this pipeline only writes to BQ, no model calls +- Environment-specific job prefix (`iris-streaming-features-staging` / `iris-streaming-features`) + ### Step 10: Streaming Inference Pipeline (Feature Store → Predictions) This pipeline reads features from the online store and runs inference — fully decoupled from feature ingestion. diff --git a/scripts/deploy_dataflow_feature.sh b/scripts/deploy_dataflow_feature.sh new file mode 100755 index 0000000..674d44f --- /dev/null +++ b/scripts/deploy_dataflow_feature.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Deploy Dataflow streaming job for ingesting Pub/Sub data into the Feature Store +set -e + +# Environment: staging (default) or prod +ENV=${1:-staging} + +# Configuration +PROJECT_ID="deeplearning-sahil" +REGION="us-central1" +PUBSUB_TOPIC="projects/$PROJECT_ID/topics/iris-inference-data" +TEMP_LOCATION="gs://sb-vertex/temp" +STAGING_LOCATION="gs://sb-vertex/staging" +SERVICE_ACCOUNT="kfp-mlops@deeplearning-sahil.iam.gserviceaccount.com" +ONLINE_BATCH_SIZE=100 + +if [ "$ENV" = "prod" ]; then + JOB_PREFIX="iris-streaming-features" + OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features" +else + JOB_PREFIX="iris-streaming-features-staging" + OUTPUT_TABLE="$PROJECT_ID:ml_dataset.iris_features" +fi + +JOB_NAME="$JOB_PREFIX-$(date +%Y%m%d-%H%M%S)" + +echo "Deploying Dataflow feature pipeline ($ENV)..." +echo "Output table: $OUTPUT_TABLE" + +python src/dataflow/iris_feature_pipeline.py \ + --input_topic $PUBSUB_TOPIC \ + --output_table $OUTPUT_TABLE \ + --project_id $PROJECT_ID \ + --region $REGION \ + --online_batch_size $ONLINE_BATCH_SIZE \ + --runner DataflowRunner \ + --job_name $JOB_NAME \ + --temp_location $TEMP_LOCATION \ + --staging_location $STAGING_LOCATION \ + --service_account_email $SERVICE_ACCOUNT \ + --use_public_ips \ + --max_num_workers 3 \ + --autoscaling_algorithm THROUGHPUT_BASED \ + --streaming \ + --enable_streaming_engine \ + --experiments use_runner_v2 \ + --no_wait + +echo "Dataflow job submitted successfully!" +echo "Job name: $JOB_NAME" +echo "Environment: $ENV" +echo "Monitor at: https://console.cloud.google.com/dataflow/jobs/$REGION/$JOB_NAME?project=$PROJECT_ID" +echo "" +echo "To test the pipeline:" +echo "1. Publish messages: python src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py --project-id=$PROJECT_ID" +echo "2. Check feature table: SELECT * FROM ml_dataset.iris_features WHERE source = 'streaming'" +echo "" +echo "To stop the job:" +echo "gcloud dataflow jobs cancel $JOB_NAME --region=$REGION --project=$PROJECT_ID" diff --git a/src/dataflow/iris_feature_pipeline.py b/src/dataflow/iris_feature_pipeline.py new file mode 100644 index 0000000..9d28334 --- /dev/null +++ b/src/dataflow/iris_feature_pipeline.py @@ -0,0 +1,174 @@ +""" +Dataflow streaming pipeline for ingesting Pub/Sub messages into the Feature Store. +Reads from Pub/Sub, renames fields to canonical names, and dual-writes to: + - BQ iris_features table (offline store, for training/batch) + - Bigtable online store (real-time serving, sub-second latency) + +No model calls — this pipeline only persists features. +""" + +import json +import argparse +import logging +import uuid +from datetime import datetime, timezone + +import apache_beam as beam +from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions +from apache_beam.io import ReadFromPubSub, WriteToBigQuery +from apache_beam.transforms.util import BatchElements +from pydantic import ValidationError + +from dataflow.models.iris_schema import PubSubIrisMessage +from dataflow.utils.online_store_writer import WriteToOnlineStore +from ml_pipelines_kfp.log import get_logger + +logger = get_logger(__name__) + +PROJECT_ID = "deeplearning-sahil" +REGION = "us-central1" + +PUBSUB_TO_CANONICAL = { + "sepal_length": "sepal_length_cm", + "sepal_width": "sepal_width_cm", + "petal_length": "petal_length_cm", + "petal_width": "petal_width_cm", +} + +FEATURE_TABLE_SCHEMA = { + "fields": [ + {"name": "sepal_length_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "sepal_width_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_length_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_width_cm", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "species", "type": "STRING", "mode": "NULLABLE"}, + {"name": "source", "type": "STRING", "mode": "REQUIRED"}, + {"name": "entity_id", "type": "STRING", "mode": "REQUIRED"}, + {"name": "feature_timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, + ] +} + + +class ParsePubSubMessage(beam.DoFn): + """Parse and validate JSON message from Pub/Sub using Pydantic schema.""" + + def process(self, element): + try: + message_data = json.loads(element.decode("utf-8")) + validated = PubSubIrisMessage(**message_data) + yield validated.model_dump() + + except ValidationError as e: + logger.warning(f"Invalid message: {e}") + except (json.JSONDecodeError, AttributeError) as e: + logger.error(f"Error parsing message: {e}, message: {element}") + + +class MapToFeatureRow(beam.DoFn): + """Rename validated Pub/Sub fields to canonical names and add Feature Store metadata.""" + + def process(self, element): + row = { + canonical: element[pubsub_key] + for pubsub_key, canonical in PUBSUB_TO_CANONICAL.items() + } + + sample_id = element.get("sample_id") or uuid.uuid4().hex[:8] + row["species"] = None + row["source"] = "streaming" + row["entity_id"] = f"{sample_id}_streaming" + row["feature_timestamp"] = datetime.now(timezone.utc).isoformat() + + yield row + + +def run_pipeline(argv=None): + """Run the Dataflow streaming feature pipeline.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "--input_topic", + required=True, + help="Pub/Sub topic to read from (projects/PROJECT/topics/TOPIC)", + ) + parser.add_argument( + "--output_table", + required=True, + help="BigQuery feature table (PROJECT:DATASET.TABLE)", + ) + parser.add_argument("--project_id", required=True, help="GCP project ID") + parser.add_argument("--region", required=True, help="GCP Region") + parser.add_argument( + "--online_batch_size", + type=int, + default=100, + help="Max rows per online store write batch (default: 100)", + ) + parser.add_argument( + "--online_store_id", + default="ml_online_store", + help="Feature Online Store ID (default: ml_online_store)", + ) + parser.add_argument( + "--feature_view_id", + default="iris_features", + help="Feature View ID (default: iris_features)", + ) + parser.add_argument( + "--no_wait", + action="store_true", + help="Submit the job and exit without waiting for it to finish", + ) + + known_args, pipeline_args = parser.parse_known_args(argv) + logger.info(f"Known args: {known_args}") + logger.info(f"Pipeline args: {pipeline_args}") + + pipeline_options = PipelineOptions(pipeline_args) + + google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) + google_cloud_options.project = known_args.project_id + google_cloud_options.region = known_args.region + + p = beam.Pipeline(options=pipeline_options) + + feature_rows = ( + p + | "Read from Pub/Sub" >> ReadFromPubSub(topic=known_args.input_topic) + | "Parse JSON" >> beam.ParDo(ParsePubSubMessage()) + | "Map to Feature Row" >> beam.ParDo(MapToFeatureRow()) + ) + + feature_rows | "Write to BQ (Offline Store)" >> WriteToBigQuery( + table=known_args.output_table, + schema=FEATURE_TABLE_SCHEMA, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER, + ) + + ( + feature_rows + | "Batch for Online Store" + >> BatchElements( + min_batch_size=1, + max_batch_size=known_args.online_batch_size, + ) + | "Write to Online Store (Bigtable)" + >> beam.ParDo( + WriteToOnlineStore( + project_id=known_args.project_id, + region=known_args.region, + online_store_id=known_args.online_store_id, + feature_view_id=known_args.feature_view_id, + feature_columns=list(PUBSUB_TO_CANONICAL.values()), + ) + ) + ) + + result = p.run() + if not known_args.no_wait: + result.wait_until_finish() + + +if __name__ == "__main__": + run_pipeline() diff --git a/src/dataflow/models/iris_schema.py b/src/dataflow/models/iris_schema.py new file mode 100644 index 0000000..06a3329 --- /dev/null +++ b/src/dataflow/models/iris_schema.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel + + +class PubSubIrisMessage(BaseModel): + """Validates incoming Pub/Sub messages for the Iris feature pipeline.""" + + sepal_length: float + sepal_width: float + petal_length: float + petal_width: float + timestamp: Optional[str] = None + sample_id: Optional[int] = None diff --git a/src/dataflow/utils/online_store_writer.py b/src/dataflow/utils/online_store_writer.py new file mode 100644 index 0000000..8205108 --- /dev/null +++ b/src/dataflow/utils/online_store_writer.py @@ -0,0 +1,69 @@ +import logging + +import apache_beam as beam +from google.cloud.aiplatform_v1 import FeatureOnlineStoreServiceClient +from google.cloud.aiplatform_v1.types import ( + WriteFeatureValuesRequest, + WriteFeatureValuesPayload, + FeatureValue, +) + +logger = logging.getLogger(__name__) + + +class WriteToOnlineStore(beam.DoFn): + """Write a batch of feature rows directly to the Feature Store online store (Bigtable). + + Expects each batch element to be a dict with 'entity_id' and feature columns. + Float features use double_value, string features use string_value. + """ + + def __init__(self, project_id, region, online_store_id, feature_view_id, feature_columns): + self.project_id = project_id + self.region = region + self.online_store_id = online_store_id + self.feature_view_id = feature_view_id + self.feature_columns = feature_columns + + def setup(self): + self._client = FeatureOnlineStoreServiceClient( + client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"} + ) + self._feature_view_name = ( + f"projects/{self.project_id}/locations/{self.region}" + f"/featureOnlineStores/{self.online_store_id}" + f"/featureViews/{self.feature_view_id}" + ) + + def process(self, batch): + payloads = [] + for row in batch: + feature_values = {} + for col in self.feature_columns: + val = row.get(col) + if val is None: + continue + if isinstance(val, (int, float)): + feature_values[col] = FeatureValue(double_value=float(val)) + else: + feature_values[col] = FeatureValue(string_value=str(val)) + + payloads.append( + WriteFeatureValuesPayload( + entity_id=row["entity_id"], + feature_values=feature_values, + ) + ) + + try: + self._client.write_feature_values( + request=WriteFeatureValuesRequest( + feature_view=self._feature_view_name, + payloads=payloads, + ) + ) + logger.info(f"Wrote {len(payloads)} rows to online store") + except Exception as e: + logger.error(f"Online store write failed ({len(payloads)} rows): {e}") + + yield from batch