From fd730f8218310ac4d7ae16283d6df9a2812678df Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 18:06:08 -0400 Subject: [PATCH 1/2] Fix pubsub producer script path to work from any directory Resolves script path relative to script location instead of cwd, fixing 'No such file or directory' when running from scripts/. Co-Authored-By: Claude Opus 4.6 --- scripts/deploy_dataflow_feature.sh | 2 +- scripts/run_pubsub_producer.sh | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/deploy_dataflow_feature.sh b/scripts/deploy_dataflow_feature.sh index 674d44f..97092c7 100755 --- a/scripts/deploy_dataflow_feature.sh +++ b/scripts/deploy_dataflow_feature.sh @@ -53,7 +53,7 @@ echo "Environment: $ENV" echo "Monitor at: https://console.cloud.google.com/dataflow/jobs/$REGION/$JOB_NAME?project=$PROJECT_ID" echo "" echo "To test the pipeline:" -echo "1. Publish messages: python src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py --project-id=$PROJECT_ID" +echo "1. Publish messages: ./scripts/run_pubsub_producer.sh" echo "2. Check feature table: SELECT * FROM ml_dataset.iris_features WHERE source = 'streaming'" echo "" echo "To stop the job:" diff --git a/scripts/run_pubsub_producer.sh b/scripts/run_pubsub_producer.sh index 0abf3de..ebd785e 100755 --- a/scripts/run_pubsub_producer.sh +++ b/scripts/run_pubsub_producer.sh @@ -3,6 +3,8 @@ # Generate random Iris data and publish to Pub/Sub for streaming inference testing set -e +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + PROJECT_ID="deeplearning-sahil" TOPIC="iris-inference-data" BATCH_SIZE=${1:-10} @@ -22,7 +24,7 @@ if [ -n "$DURATION" ]; then DURATION_ARG="--duration $DURATION" fi -python scripts/pubsub_producer.py \ +python "$SCRIPT_DIR/pubsub_producer.py" \ --project-id "$PROJECT_ID" \ --topic "$TOPIC" \ --batch-size "$BATCH_SIZE" \ From 163b7b8a7a96be22d381a3434313c7854e7bac55 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 18:07:46 -0400 Subject: [PATCH 2/2] Update README with correct Pub/Sub producer instructions and feature pipeline Co-Authored-By: Claude Opus 4.6 --- Readme.md | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/Readme.md b/Readme.md index 1da0762..75a6a45 100644 --- a/Readme.md +++ b/Readme.md @@ -180,7 +180,19 @@ ENVIRONMENT=prod python src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipelin Predictions are written to `ml_dataset.iris_predictions`. -### 5. Real-time Streaming Inference +### 5. Streaming Feature Ingestion + +Deploy a Dataflow streaming job that ingests Pub/Sub messages into the Feature Store (dual-writes to BQ offline store and Bigtable online store): + +```bash +# Staging +./scripts/deploy_dataflow_feature.sh staging + +# Production +./scripts/deploy_dataflow_feature.sh prod +``` + +### 6. Real-time Streaming Inference Deploy a Dataflow streaming job for real-time inference: @@ -192,12 +204,20 @@ Deploy a Dataflow streaming job for real-time inference: ./scripts/deploy_dataflow_streaming.sh prod ``` -Start generating test data: +### 7. Publish Pub/Sub Test Events + +Generate random Iris data and publish to Pub/Sub for testing streaming pipelines: ```bash -python src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py --project-id=deeplearning-sahil +# Default: batch_size=10, delay=5s, runs indefinitely +./scripts/run_pubsub_producer.sh + +# Custom: batch_size=20, delay=2s, duration=60s +./scripts/run_pubsub_producer.sh 20 2 60 ``` +This can be run from any directory — the script resolves paths automatically. + ## Development ### Code Quality