From e0e8e8cff330084445d90d22668d950ccd4b0757 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Sun, 14 Jun 2026 18:44:41 -0400 Subject: [PATCH 1/2] fix missing region in Dataflow pipeline options --region was consumed by argparse as a known arg but never forwarded to Beam's GoogleCloudOptions, causing DataflowRunner validation to fail. Co-Authored-By: Claude Opus 4.6 --- dataflow/iris_streaming_pipeline.py | 203 ++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 dataflow/iris_streaming_pipeline.py diff --git a/dataflow/iris_streaming_pipeline.py b/dataflow/iris_streaming_pipeline.py new file mode 100644 index 0000000..563090d --- /dev/null +++ b/dataflow/iris_streaming_pipeline.py @@ -0,0 +1,203 @@ +""" +Dataflow streaming pipeline for real-time Iris inference. +Reads from Pub/Sub, calls FastAPI ML service deployed via Kubeflow, writes predictions to BigQuery. +""" + +import json +import argparse +from typing import Any, Dict, List +import requests +import time + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms import window +from apache_beam.io import ReadFromPubSub, WriteToBigQuery + +from ml_pipelines_kfp.log import get_logger + +logger = get_logger(__name__) + +PROJECT_ID = "deeplearning-sahil" +REGION = "us-central1" +MODEL_NAME = "Iris-Classifier-XGBoost" +FASTAPI_SERVICE_NAME = "iris-classifier-xgboost-service" + +PREDICTION_SCHEMA = { + "fields": [ + {"name": "sepal_length", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "sepal_width", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_length", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "petal_width", "type": "FLOAT", "mode": "REQUIRED"}, + {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, + {"name": "sample_id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "prediction", "type": "STRING", "mode": "REQUIRED"}, + {"name": "prediction_timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, + {"name": "model_service", "type": "STRING", "mode": "REQUIRED"}, + {"name": "processing_time", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "dataflow_processing_time", "type": "TIMESTAMP", "mode": "REQUIRED"}, + ] +} + + +class ParsePubSubMessage(beam.DoFn): + """Parse JSON message from Pub/Sub.""" + + def process(self, element): + try: + message_data = json.loads(element.decode("utf-8")) + + required_fields = [ + "sepal_length", + "sepal_width", + "petal_length", + "petal_width", + ] + if all(field in message_data for field in required_fields): + yield message_data + else: + logger.warning(f"Missing required fields in message: {message_data}") + + except (json.JSONDecodeError, AttributeError) as e: + logger.error(f"Error parsing message: {e}, message: {element}") + + +class CallFastAPIService(beam.DoFn): + """Call FastAPI ML service for inference.""" + + def __init__(self, service_url: str): + self.service_url = service_url + self.predict_url = f"{service_url}/predict" + + def process(self, element): + import time + from datetime import datetime + import requests + + start_time = time.time() + + try: + payload = { + "instances": [ + { + "SepalLengthCm": element["sepal_length"], + "SepalWidthCm": element["sepal_width"], + "PetalLengthCm": element["petal_length"], + "PetalWidthCm": element["petal_width"], + } + ] + } + + response = requests.post(self.predict_url, json=payload, timeout=30) + response.raise_for_status() + + result_data = response.json() + predictions = result_data.get("predictions", []) + + if predictions: + prediction_result = predictions[0] + predicted_class = str(prediction_result.get("prediction", "unknown")) + else: + predicted_class = "unknown" + + processing_time = time.time() - start_time + + result = { + "sepal_length": element["sepal_length"], + "sepal_width": element["sepal_width"], + "petal_length": element["petal_length"], + "petal_width": element["petal_width"], + "timestamp": element.get("timestamp", datetime.utcnow().isoformat()), + "sample_id": element.get("sample_id", 0), + "prediction": predicted_class, + "prediction_timestamp": datetime.utcnow().isoformat(), + "model_service": self.service_url, + "processing_time": processing_time, + } + + logger.info( + f"Prediction for sample {element.get('sample_id')}: {predicted_class}" + ) + yield result + + except Exception as e: + logger.error(f"Error calling FastAPI service: {e}, element: {element}") + yield { + "sepal_length": element.get("sepal_length", 0.0), + "sepal_width": element.get("sepal_width", 0.0), + "petal_length": element.get("petal_length", 0.0), + "petal_width": element.get("petal_width", 0.0), + "timestamp": element.get("timestamp", datetime.utcnow().isoformat()), + "sample_id": element.get("sample_id", 0), + "prediction": "ERROR", + "prediction_timestamp": datetime.utcnow().isoformat(), + "model_service": f"ERROR: {str(e)}", + "processing_time": time.time() - start_time, + } + + +class AddProcessingMetadata(beam.DoFn): + """Add processing metadata to records.""" + + def process(self, element): + from datetime import datetime + + element["dataflow_processing_time"] = datetime.utcnow().isoformat() + + yield element + + +def run_pipeline(argv=None): + """Run the Dataflow streaming pipeline.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + "--input_topic", + required=True, + help="Pub/Sub topic to read from (projects/PROJECT/topics/TOPIC)", + ) + parser.add_argument( + "--output_table", + required=True, + help="BigQuery table to write to (PROJECT:DATASET.TABLE)", + ) + parser.add_argument("--project_id", required=True, help="Project ID") + parser.add_argument("--region", required=True, help="GCP Region") + parser.add_argument("--service_url", required=True, help="FastAPI service URL") + + known_args, pipeline_args = parser.parse_known_args(argv) + logger.info(f"Known args: {known_args}") + logger.info(f"Pipeline args: {pipeline_args}") + + pipeline_options = PipelineOptions(pipeline_args) + + from apache_beam.options.pipeline_options import GoogleCloudOptions + + google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) + google_cloud_options.project = known_args.project_id + google_cloud_options.region = known_args.region + + with beam.Pipeline(options=pipeline_options) as pipeline: + + predictions = ( + pipeline + | "Read from Pub/Sub" >> ReadFromPubSub(topic=known_args.input_topic) + | "Parse JSON" >> beam.ParDo(ParsePubSubMessage()) + | "Call FastAPI Service" + >> beam.ParDo(CallFastAPIService(known_args.service_url)) + | "Add Metadata" >> beam.ParDo(AddProcessingMetadata()) + | "Write to BigQuery" + >> WriteToBigQuery( + table=known_args.output_table, + schema=PREDICTION_SCHEMA, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + additional_bq_parameters={ + "timePartitioning": {"type": "DAY", "field": "prediction_timestamp"} + }, + ) + ) + + +if __name__ == "__main__": + run_pipeline() From 7d90504041fe426daf62ca03e97317ba00d5f12b Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Sun, 14 Jun 2026 18:45:52 -0400 Subject: [PATCH 2/2] remanining changes --- .github/workflows/deploy-dataflow.yaml | 2 +- .../iris_xgboost => scripts}/bq_dataloader.py | 0 .../iris_xgboost => scripts}/load_data.sh | 0 .../pubsub_producer.py | 0 .../dataflow}/iris_streaming_pipeline.py | 0 .../dataflow/iris_streaming_pipeline.py | 202 ------------------ .../test_inference.ipynb | 0 test/test_pubsub.py | 2 +- 8 files changed, 2 insertions(+), 204 deletions(-) rename {src/ml_pipelines_kfp/iris_xgboost => scripts}/bq_dataloader.py (100%) rename {src/ml_pipelines_kfp/iris_xgboost => scripts}/load_data.sh (100%) rename {src/ml_pipelines_kfp/iris_xgboost => scripts}/pubsub_producer.py (100%) rename {dataflow => src/dataflow}/iris_streaming_pipeline.py (100%) delete mode 100644 src/ml_pipelines_kfp/dataflow/iris_streaming_pipeline.py rename src/ml_pipelines_kfp/{iris_xgboost/pipelines => notebooks}/test_inference.ipynb (100%) diff --git a/.github/workflows/deploy-dataflow.yaml b/.github/workflows/deploy-dataflow.yaml index bc2a8fd..67331df 100644 --- a/.github/workflows/deploy-dataflow.yaml +++ b/.github/workflows/deploy-dataflow.yaml @@ -72,7 +72,7 @@ jobs: JOB_NAME="${{ env.JOB_PREFIX }}-$(date +%Y%m%d-%H%M%S)" echo "Submitting ${{ inputs.environment }} job: $JOB_NAME" - python src/ml_pipelines_kfp/dataflow/iris_streaming_pipeline.py \ + python src/dataflow/iris_streaming_pipeline.py \ --input_topic ${{ env.PUBSUB_TOPIC }} \ --output_table ${{ env.OUTPUT_TABLE }} \ --project_id ${{ env.PROJECT_ID }} \ diff --git a/src/ml_pipelines_kfp/iris_xgboost/bq_dataloader.py b/scripts/bq_dataloader.py similarity index 100% rename from src/ml_pipelines_kfp/iris_xgboost/bq_dataloader.py rename to scripts/bq_dataloader.py diff --git a/src/ml_pipelines_kfp/iris_xgboost/load_data.sh b/scripts/load_data.sh similarity index 100% rename from src/ml_pipelines_kfp/iris_xgboost/load_data.sh rename to scripts/load_data.sh diff --git a/src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py b/scripts/pubsub_producer.py similarity index 100% rename from src/ml_pipelines_kfp/iris_xgboost/pubsub_producer.py rename to scripts/pubsub_producer.py diff --git a/dataflow/iris_streaming_pipeline.py b/src/dataflow/iris_streaming_pipeline.py similarity index 100% rename from dataflow/iris_streaming_pipeline.py rename to src/dataflow/iris_streaming_pipeline.py diff --git a/src/ml_pipelines_kfp/dataflow/iris_streaming_pipeline.py b/src/ml_pipelines_kfp/dataflow/iris_streaming_pipeline.py deleted file mode 100644 index 57943c8..0000000 --- a/src/ml_pipelines_kfp/dataflow/iris_streaming_pipeline.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -Dataflow streaming pipeline for real-time Iris inference. -Reads from Pub/Sub, calls FastAPI ML service deployed via Kubeflow, writes predictions to BigQuery. -""" - -import json -import argparse -from typing import Any, Dict, List -import requests -import time - -import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.transforms import window -from apache_beam.io import ReadFromPubSub, WriteToBigQuery - -from ml_pipelines_kfp.log import get_logger - -logger = get_logger(__name__) - -PROJECT_ID = "deeplearning-sahil" -REGION = "us-central1" -MODEL_NAME = "Iris-Classifier-XGBoost" -FASTAPI_SERVICE_NAME = "iris-classifier-xgboost-service" - -PREDICTION_SCHEMA = { - "fields": [ - {"name": "sepal_length", "type": "FLOAT", "mode": "REQUIRED"}, - {"name": "sepal_width", "type": "FLOAT", "mode": "REQUIRED"}, - {"name": "petal_length", "type": "FLOAT", "mode": "REQUIRED"}, - {"name": "petal_width", "type": "FLOAT", "mode": "REQUIRED"}, - {"name": "timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, - {"name": "sample_id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "prediction", "type": "STRING", "mode": "REQUIRED"}, - {"name": "prediction_timestamp", "type": "TIMESTAMP", "mode": "REQUIRED"}, - {"name": "model_service", "type": "STRING", "mode": "REQUIRED"}, - {"name": "processing_time", "type": "FLOAT", "mode": "NULLABLE"}, - {"name": "dataflow_processing_time", "type": "TIMESTAMP", "mode": "REQUIRED"}, - ] -} - - -class ParsePubSubMessage(beam.DoFn): - """Parse JSON message from Pub/Sub.""" - - def process(self, element): - try: - message_data = json.loads(element.decode("utf-8")) - - required_fields = [ - "sepal_length", - "sepal_width", - "petal_length", - "petal_width", - ] - if all(field in message_data for field in required_fields): - yield message_data - else: - logger.warning(f"Missing required fields in message: {message_data}") - - except (json.JSONDecodeError, AttributeError) as e: - logger.error(f"Error parsing message: {e}, message: {element}") - - -class CallFastAPIService(beam.DoFn): - """Call FastAPI ML service for inference.""" - - def __init__(self, service_url: str): - self.service_url = service_url - self.predict_url = f"{service_url}/predict" - - def process(self, element): - import time - from datetime import datetime - import requests - - start_time = time.time() - - try: - payload = { - "instances": [ - { - "SepalLengthCm": element["sepal_length"], - "SepalWidthCm": element["sepal_width"], - "PetalLengthCm": element["petal_length"], - "PetalWidthCm": element["petal_width"], - } - ] - } - - response = requests.post(self.predict_url, json=payload, timeout=30) - response.raise_for_status() - - result_data = response.json() - predictions = result_data.get("predictions", []) - - if predictions: - prediction_result = predictions[0] - predicted_class = str(prediction_result.get("prediction", "unknown")) - else: - predicted_class = "unknown" - - processing_time = time.time() - start_time - - result = { - "sepal_length": element["sepal_length"], - "sepal_width": element["sepal_width"], - "petal_length": element["petal_length"], - "petal_width": element["petal_width"], - "timestamp": element.get("timestamp", datetime.utcnow().isoformat()), - "sample_id": element.get("sample_id", 0), - "prediction": predicted_class, - "prediction_timestamp": datetime.utcnow().isoformat(), - "model_service": self.service_url, - "processing_time": processing_time, - } - - logger.info( - f"Prediction for sample {element.get('sample_id')}: {predicted_class}" - ) - yield result - - except Exception as e: - logger.error(f"Error calling FastAPI service: {e}, element: {element}") - yield { - "sepal_length": element.get("sepal_length", 0.0), - "sepal_width": element.get("sepal_width", 0.0), - "petal_length": element.get("petal_length", 0.0), - "petal_width": element.get("petal_width", 0.0), - "timestamp": element.get("timestamp", datetime.utcnow().isoformat()), - "sample_id": element.get("sample_id", 0), - "prediction": "ERROR", - "prediction_timestamp": datetime.utcnow().isoformat(), - "model_service": f"ERROR: {str(e)}", - "processing_time": time.time() - start_time, - } - - -class AddProcessingMetadata(beam.DoFn): - """Add processing metadata to records.""" - - def process(self, element): - from datetime import datetime - - element["dataflow_processing_time"] = datetime.utcnow().isoformat() - - yield element - - -def run_pipeline(argv=None): - """Run the Dataflow streaming pipeline.""" - - parser = argparse.ArgumentParser() - parser.add_argument( - "--input_topic", - required=True, - help="Pub/Sub topic to read from (projects/PROJECT/topics/TOPIC)", - ) - parser.add_argument( - "--output_table", - required=True, - help="BigQuery table to write to (PROJECT:DATASET.TABLE)", - ) - parser.add_argument("--project_id", required=True, help="Project ID") - parser.add_argument("--region", required=True, help="GCP Region") - parser.add_argument("--service_url", required=True, help="FastAPI service URL") - - known_args, pipeline_args = parser.parse_known_args(argv) - logger.info(f"Known args: {known_args}") - logger.info(f"Pipeline args: {pipeline_args}") - - pipeline_options = PipelineOptions(pipeline_args) - - from apache_beam.options.pipeline_options import GoogleCloudOptions - - google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) - google_cloud_options.project = known_args.project_id - - with beam.Pipeline(options=pipeline_options) as pipeline: - - predictions = ( - pipeline - | "Read from Pub/Sub" >> ReadFromPubSub(topic=known_args.input_topic) - | "Parse JSON" >> beam.ParDo(ParsePubSubMessage()) - | "Call FastAPI Service" - >> beam.ParDo(CallFastAPIService(known_args.service_url)) - | "Add Metadata" >> beam.ParDo(AddProcessingMetadata()) - | "Write to BigQuery" - >> WriteToBigQuery( - table=known_args.output_table, - schema=PREDICTION_SCHEMA, - write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - additional_bq_parameters={ - "timePartitioning": {"type": "DAY", "field": "prediction_timestamp"} - }, - ) - ) - - -if __name__ == "__main__": - run_pipeline() diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/test_inference.ipynb b/src/ml_pipelines_kfp/notebooks/test_inference.ipynb similarity index 100% rename from src/ml_pipelines_kfp/iris_xgboost/pipelines/test_inference.ipynb rename to src/ml_pipelines_kfp/notebooks/test_inference.ipynb diff --git a/test/test_pubsub.py b/test/test_pubsub.py index 1b2dde6..c8a72f5 100755 --- a/test/test_pubsub.py +++ b/test/test_pubsub.py @@ -18,7 +18,7 @@ PUBSUB_SUBSCRIPTION, PROJECT_ID ) -from ml_pipelines_kfp.iris_xgboost.pubsub_producer import IrisDataPubSubProducer +from scripts.pubsub_producer import IrisDataPubSubProducer def test_pubsub_connection():