From c0d295da792e9834ff054ee12c45f48f6dba3f71 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 09:03:55 -0400 Subject: [PATCH 01/14] Switch batch inference to read from Feature Store offline store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Read from canonical feature table (iris_features) with server-side source='batch_input' filter instead of raw BQ tables. Remove the conditional column rename hack — canonical table has consistent names regardless of data source. Co-Authored-By: Claude Opus 4.6 --- .../pipelines/components/inference.py | 41 ++++++++----------- .../pipelines/iris_pipeline_inference.py | 10 ++--- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py index 1e2856f..33fbc37 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py @@ -7,7 +7,7 @@ def inference_model( project_id: str, location: str, bq_dataset: str, - bq_table: str, + bq_feature_table: str, bq_table_predictions: str, model: Input[Model], ): @@ -22,30 +22,21 @@ def inference_model( client = bigquery.Client(project=project_id) - dataset_ref = bigquery.DatasetReference(project_id, bq_dataset) - table_ref = dataset_ref.table(bq_table) - table = bigquery.Table(table_ref) - iterable_table = client.list_rows(table).to_dataframe_iterable() - - dfs = [] - for row in iterable_table: - dfs.append(row) - - df = pd.concat(dfs, ignore_index=True) - - if bq_table == "iris_pubsub_data": - df_cols = df[ - ["sepal_length", "sepal_width", "petal_length", "petal_width"] - ].rename( - columns={ - "sepal_length": "SepalLengthCm", - "sepal_width": "SepalWidthCm", - "petal_length": "PetalLengthCm", - "petal_width": "PetalWidthCm", - } - ) - else: - df_cols = df[["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]] + query = f""" + SELECT * FROM `{project_id}.{bq_dataset}.{bq_feature_table}` + WHERE source = 'batch_input' + """ + df = client.query(query).result().to_dataframe() + + logger.info(f"Loaded {len(df)} batch inference rows from feature store") + + feature_cols = [ + "sepal_length_cm", + "sepal_width_cm", + "petal_length_cm", + "petal_width_cm", + ] + df_cols = df[feature_cols] inf_model = joblib.load(model.path + "/model.joblib") inf_pred = inf_model.predict(df_cols) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py index 459f2f6..b04123c 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py @@ -13,7 +13,7 @@ REGION, SERVICE_ACCOUNT, BQ_DATASET, - BQ_TABLE, + BQ_FEATURE_TABLE, BQ_TABLE_PREDICTIONS, SERVICE_ACCOUNT_PATH, ) @@ -24,7 +24,7 @@ def pipeline( project_id: str, location: str, bq_dataset: str, - bq_table: str, + bq_feature_table: str, bq_table_predictions: str, ): @@ -46,7 +46,7 @@ def pipeline( location=location, model=get_model_op.outputs["latest_model"], bq_dataset=bq_dataset, - bq_table=bq_table, + bq_feature_table=bq_feature_table, bq_table_predictions=bq_table_predictions, ) .set_display_name("Inference Model") @@ -61,7 +61,7 @@ def pipeline( parser.add_argument("--project-id", default=PROJECT_ID) parser.add_argument("--region", default=REGION) parser.add_argument("--bq-dataset", default=BQ_DATASET) - parser.add_argument("--bq-table", default=BQ_TABLE) + parser.add_argument("--bq-feature-table", default=BQ_FEATURE_TABLE) parser.add_argument("--bq-table-predictions", default=BQ_TABLE_PREDICTIONS) parser.add_argument("--service-account-path", default=SERVICE_ACCOUNT_PATH) cli = parser.parse_args() @@ -85,7 +85,7 @@ def pipeline( enable_caching=False, parameter_values={ "bq_dataset": cli.bq_dataset, - "bq_table": cli.bq_table, + "bq_feature_table": cli.bq_feature_table, "bq_table_predictions": cli.bq_table_predictions, "location": cli.region, "project_id": cli.project_id, From aa2a6410e9de0a0c16ee48190cb481ed8767b3c6 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 09:12:21 -0400 Subject: [PATCH 02/14] Add training/inference suffix to pipeline names Append -training and -inference to PIPELINE_NAME in each pipeline file so they show as distinct pipelines in Vertex AI (e.g. pipeline-iris-staging-training, pipeline-iris-staging-inference). Co-Authored-By: Claude Opus 4.6 --- .../iris_xgboost/pipelines/iris_pipeline_inference.py | 6 +++--- .../iris_xgboost/pipelines/iris_pipeline_training.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py index b04123c..27197eb 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py @@ -19,7 +19,7 @@ ) -@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT) +@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-inference", pipeline_root=PIPELINE_ROOT) def pipeline( project_id: str, location: str, @@ -76,10 +76,10 @@ def pipeline( kfp.compiler.Compiler().compile( pipeline_func=pipeline, package_path="pipeline.yaml", - pipeline_name=PIPELINE_NAME, + pipeline_name=f"{PIPELINE_NAME}-inference", ) job = aip.PipelineJob( - display_name=PIPELINE_NAME, + display_name=f"{PIPELINE_NAME}-inference", template_path="pipeline.yaml", pipeline_root=PIPELINE_ROOT, enable_caching=False, diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py index 5602667..e6444ec 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py @@ -28,7 +28,7 @@ def coalesce(*args): return next((a for a in args if a is not None), None) -@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT) +@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-training", pipeline_root=PIPELINE_ROOT) def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table: str): # Import components @@ -121,7 +121,7 @@ def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table: # _constants.IMAGE_NAME needed: deploy.py/schema.py use it as base_image in @component decorator IMAGE_NAME = _constants.IMAGE_NAME = coalesce(cli.image_name, _constants.IMAGE_NAME) FASTAPI_IMAGE_NAME = coalesce(cli.fastapi_image_name, FASTAPI_IMAGE_NAME) - pipeline_name = coalesce(cli.pipeline_name, PIPELINE_NAME) + pipeline_name = coalesce(cli.pipeline_name, f"{PIPELINE_NAME}-training") pipeline_root = coalesce(cli.pipeline_root, PIPELINE_ROOT) bq_dataset = coalesce(cli.bq_dataset, BQ_DATASET) bq_table = coalesce(cli.bq_table, BQ_TABLE) From 2ee625d7313c5fdabf830644f15335e46f4c28de Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 09:53:50 -0400 Subject: [PATCH 03/14] Fix get_model to fetch the latest model version, not the oldest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit list_models returned models in creation order and [0] grabbed the first (oldest) version — trained with CamelCase columns before the feature store migration. Sort by create_time descending so [0] is the most recently registered model. Co-Authored-By: Claude Opus 4.6 --- .../iris_xgboost/pipelines/components/get_model.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py index 2eee776..8e364aa 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py @@ -28,6 +28,7 @@ def get_model( "filter": f"display_name={model_name}", } parent_models = list(client.list_models(request=request)) + parent_models.sort(key=lambda m: m.create_time, reverse=True) parent_model = parent_models[0] if parent_models else None if not parent_model: From bfa47bd567ac5efa3fb79e45dad4654bd3617a5f Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 10:05:30 -0400 Subject: [PATCH 04/14] Simplify Instance model to use canonical feature names only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop CamelCase aliases and ConfigDict — field names match the feature platform directly. No backward compat needed since the model is retrained on canonical names. Co-Authored-By: Claude Opus 4.6 --- .../iris_xgboost/models/instance.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/ml_pipelines_kfp/iris_xgboost/models/instance.py b/src/ml_pipelines_kfp/iris_xgboost/models/instance.py index ab0023e..c443c1b 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/models/instance.py +++ b/src/ml_pipelines_kfp/iris_xgboost/models/instance.py @@ -1,13 +1,10 @@ from __future__ import annotations -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel class Instance(BaseModel): - # Accept both canonical names (sepal_length_cm) and aliases (SepalLengthCm) in input - model_config = ConfigDict(populate_by_name=True) - - sepal_length_cm: float = Field(alias="SepalLengthCm") - sepal_width_cm: float = Field(alias="SepalWidthCm") - petal_length_cm: float = Field(alias="PetalLengthCm") - petal_width_cm: float = Field(alias="PetalWidthCm") + sepal_length_cm: float + sepal_width_cm: float + petal_length_cm: float + petal_width_cm: float From d6c9f6a667a13cda476f783dbcd27a22095c6b1f Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 10:07:03 -0400 Subject: [PATCH 05/14] Update instance schema to canonical feature names Also fix sepal_width_cm type from integer to number to match the other feature fields. Co-Authored-By: Claude Opus 4.6 --- .../schemas/iris_xgboost/vertex/instance.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml b/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml index 668a3e5..bdb8d04 100644 --- a/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml +++ b/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml @@ -1,13 +1,13 @@ title: Instance type: object properties: - SepalLengthCm: + sepal_length_cm: type: number - SepalWidthCm: - type: integer - PetalLengthCm: + sepal_width_cm: type: number - PetalWidthCm: + petal_length_cm: type: number - Species: + petal_width_cm: + type: number + species: type: integer \ No newline at end of file From 1e35bf9c56ba22e3f66b1a51414d406c6ec5eb67 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:10:54 -0400 Subject: [PATCH 06/14] Add notebook to verify model feature names match feature platform Loads the latest registered model from GCS and checks that feature_names_in_ matches the canonical names from the feature store. Co-Authored-By: Claude Opus 4.6 --- .../notebooks/verify_model_features.ipynb | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb diff --git a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb new file mode 100644 index 0000000..c116934 --- /dev/null +++ b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb @@ -0,0 +1,93 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0700e57c", + "source": "# Verify Model Feature Names\n\nLoad the registered model from GCS and inspect the feature names it was trained on.\nUse this to confirm the model expects canonical feature names (`sepal_length_cm`, etc.)\nafter retraining with the feature store pipeline.", + "metadata": {} + }, + { + "cell_type": "markdown", + "id": "31b8841c", + "source": "## Config", + "metadata": {} + }, + { + "cell_type": "code", + "id": "b4150bbb", + "source": "PROJECT = \"deeplearning-sahil\"\nREGION = \"us-central1\"\nMODEL_NAME = \"Iris-Classifier-XGBoost-staging\"", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "fb8aafcc", + "source": "## 1. Find the latest registered model and its artifact URI", + "metadata": {} + }, + { + "cell_type": "code", + "id": "b8214db9", + "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\nparent_models.sort(key=lambda m: m.create_time, reverse=True)\n\nmodel = parent_models[0]\nprint(f\"Model: {model.display_name}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "26d6407a", + "source": "## 2. Download and load model.joblib from GCS", + "metadata": {} + }, + { + "cell_type": "code", + "id": "2e668fc9", + "source": "import fsspec\nimport joblib\n\nmodel_uri = model.artifact_uri.rstrip(\"/\") + \"/model.joblib\"\nprint(f\"Downloading: {model_uri}\")\n\nfs, _ = fsspec.core.url_to_fs(model_uri)\nwith fs.open(model_uri, \"rb\") as f:\n xgb_model = joblib.load(f)\n\nprint(f\"Model type: {type(xgb_model).__name__}\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "0df7aeaf", + "source": "## 3. Inspect feature names the model was trained on", + "metadata": {} + }, + { + "cell_type": "code", + "id": "ca94ea8a", + "source": "print(\"Feature names from training (feature_names_in_):\")\nprint(list(xgb_model.feature_names_in_))\nprint(f\"\\nNumber of features: {xgb_model.n_features_in_}\")\n\nexpected = [\"sepal_length_cm\", \"sepal_width_cm\", \"petal_length_cm\", \"petal_width_cm\"]\nactual = list(xgb_model.feature_names_in_)\n\nif actual == expected:\n print(\"\\n✅ Feature names match canonical names from feature platform\")\nelse:\n print(f\"\\n❌ Feature name mismatch!\")\n print(f\" Expected: {expected}\")\n print(f\" Actual: {actual}\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "8fc0e3df", + "source": "## 4. Test prediction with canonical feature names", + "metadata": {} + }, + { + "cell_type": "code", + "id": "738b3459", + "source": "import pandas as pd\n\nsample = pd.DataFrame([{\n \"sepal_length_cm\": 5.1,\n \"sepal_width_cm\": 3.5,\n \"petal_length_cm\": 1.4,\n \"petal_width_cm\": 0.2,\n}])\n\nprediction = xgb_model.predict(sample)\nprobabilities = xgb_model.predict_proba(sample)\n\nspecies_map = {0: \"Iris-versicolor\", 1: \"Iris-virginica\", 2: \"Iris-setosa\"}\n\nprint(f\"Prediction: {int(prediction[0])} ({species_map.get(int(prediction[0]), 'unknown')})\")\nprint(f\"Probabilities: {probabilities[0].tolist()}\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file From 65c959e6527afb64d54eeb828f0e767ced23498b Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:16:42 -0400 Subject: [PATCH 07/14] Add script to clear pycache and reinstall package in editable mode Prevents stale bytecache or non-editable installs from causing KFP to serialize old component code into pipeline YAML. Co-Authored-By: Claude Opus 4.6 --- scripts/clean_reinstall.sh | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100755 scripts/clean_reinstall.sh diff --git a/scripts/clean_reinstall.sh b/scripts/clean_reinstall.sh new file mode 100755 index 0000000..424c675 --- /dev/null +++ b/scripts/clean_reinstall.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$REPO_ROOT" + +echo "==> Clearing __pycache__ directories..." +find . -name __pycache__ -exec rm -rf {} + 2>/dev/null || true +echo " Done." + +echo "==> Reinstalling package in editable mode..." +pip install -e . +echo " Done." + +echo "" +echo "Package is now installed from source. Any code changes take effect immediately." From eaf73751290bad41990bcca987a5af0faaa9cc5f Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:17:52 -0400 Subject: [PATCH 08/14] Extend Python version constraint to include 3.11 The upper bound <3.11 excluded the local Python 3.11.0, blocking editable installs. Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d4fbe80..7aa6d79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "ml_pipelines_kfp" version = "0.1.0" description = "Add your description here" -requires-python = ">=3.9,<3.11" +requires-python = ">=3.9,<3.12" dependencies = [ "fastapi>=0.111.1", "uvicorn>=0.30.1", From 19e1311f40dd43db2cdb94b2cb9b5115b4396fd5 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:18:41 -0400 Subject: [PATCH 09/14] cleanup --- pipeline-iris-pubsub-inference.yaml | 388 ------------------ .../pipelines/components/get_model.py | 2 - .../pipelines/components/inference.py | 2 - 3 files changed, 392 deletions(-) delete mode 100644 pipeline-iris-pubsub-inference.yaml diff --git a/pipeline-iris-pubsub-inference.yaml b/pipeline-iris-pubsub-inference.yaml deleted file mode 100644 index cf984c3..0000000 --- a/pipeline-iris-pubsub-inference.yaml +++ /dev/null @@ -1,388 +0,0 @@ -# PIPELINE DEFINITION -# Name: pipeline-iris-pubsub-inference -# Inputs: -# batch_size: int [Default: 100.0] -# bq_dataset: str -# bq_table_predictions: str -# location: str -# project_id: str -# pubsub_subscription: str [Default: 'iris-inference-data-sub'] -# pubsub_topic: str [Default: 'iris-inference-data'] -# timeout_seconds: int [Default: 300.0] -components: - comp-get-model: - executorLabel: exec-get-model - inputDefinitions: - parameters: - location: - parameterType: STRING - model_name: - parameterType: STRING - project_id: - parameterType: STRING - outputDefinitions: - artifacts: - latest_model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - comp-inference-model: - executorLabel: exec-inference-model - inputDefinitions: - artifacts: - model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - parameters: - bq_dataset: - parameterType: STRING - bq_table: - parameterType: STRING - bq_table_predictions: - parameterType: STRING - location: - parameterType: STRING - project_id: - parameterType: STRING - comp-pubsub-consumer-op: - executorLabel: exec-pubsub-consumer-op - inputDefinitions: - parameters: - batch_size: - parameterType: NUMBER_INTEGER - bq_dataset: - parameterType: STRING - bq_table: - parameterType: STRING - project_id: - parameterType: STRING - subscription_name: - parameterType: STRING - timeout_seconds: - parameterType: NUMBER_INTEGER - topic_name: - parameterType: STRING - outputDefinitions: - artifacts: - dataset: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 -defaultPipelineRoot: gs://sb-vertex/pipeline_root -deploymentSpec: - executors: - exec-get-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - get_model - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'google-cloud-aiplatform==1.64.0'\ - \ 'fsspec==2024.6.1' 'gcsfs==2024.6.1' 'joblib==1.4.2' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef get_model(\n project_id: str,\n location: str,\n model_name:\ - \ str,\n latest_model: Output[Model],\n):\n from google.cloud import\ - \ aiplatform, aiplatform_v1\n import fsspec\n import gcsfs\n import\ - \ joblib\n\n aiplatform.init(project=project_id, location=location)\n\ - \n\n client = aiplatform_v1.ModelServiceClient(\n client_options={\"\ - api_endpoint\": f\"{location}-aiplatform.googleapis.com\"}\n )\n\n \ - \ request = {\n \"parent\": f\"projects/{project_id}/locations/{location}\"\ - ,\n \"filter\": f\"display_name={model_name}\"\n }\n parent_models\ - \ = list(client.list_models(request=request))\n parent_model = parent_models[0]\ - \ if parent_models else None\n\n if not parent_model:\n print(f\"\ - Could not find model with f{model_name}\")\n return\n\n print(f\"\ - Parent Model - {parent_model}\")\n print(f\"class - {type(parent_model)}\"\ - )\n print(f\"name - {parent_model.name}\")\n print(f\"model path-\ - \ {parent_model.artifact_uri}\")\n print(f\"output model path - {latest_model.path}\"\ - )\n latest_model_path = latest_model.path.replace(\"/gcs/\", \"gs://\"\ - )\n print(f\"output model path cleaned - {latest_model_path}\")\n \ - \ fs, _ = fsspec.core.url_to_fs(parent_model.artifact_uri)\n print(f\"\ - file system: {fs}\")\n fs.copy(parent_model.artifact_uri+\"/\", latest_model.path.replace(\"\ - /gcs/\", \"gs://\"), recursive=True)\n\n" - image: python:3.10 - exec-inference-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - inference_model - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'numpy==1.24.3'\ - \ 'pandas==2.0.3' 'scikit-learn==1.5.1' 'joblib==1.4.2' 'google-cloud-bigquery==3.11.4'\ - \ 'pyarrow==12.0.1' 'db-dtypes==1.1.1' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef inference_model(\n project_id: str,\n location: str,\n\ - \ bq_dataset: str,\n bq_table: str,\n bq_table_predictions: str,\n\ - \ model: Input[Model],\n):\n import joblib\n import pandas as pd\n\ - \ import numpy as np\n from google.cloud import bigquery\n from\ - \ datetime import datetime\n\n client = bigquery.Client(project=project_id)\n\ - \n dataset_ref = bigquery.DatasetReference(project_id, bq_dataset)\n\ - \ table_ref = dataset_ref.table(bq_table)\n table = bigquery.Table(table_ref)\n\ - \ iterable_table = client.list_rows(table).to_dataframe_iterable()\n\n\ - \ dfs = []\n for row in iterable_table:\n dfs.append(row)\n\ - \n df = pd.concat(dfs, ignore_index=True)\n print(df.head())\n \ - \ print(df.columns)\n\n if bq_table == 'iris_pubsub_data':\n df_cols\ - \ = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']].rename(columns={'sepal_length':\ - \ 'SepalLengthCm', 'sepal_width': 'SepalWidthCm', 'petal_length': 'PetalLengthCm',\ - \ 'petal_width': 'PetalWidthCm'})\n else:\n df_cols = df[['SepalLengthCm',\ - \ 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]\n\n print(f\"Model\ - \ Path: {model.path}\")\n inf_model = joblib.load(model.path+'/model.joblib')\n\ - \ inf_pred = inf_model.predict(df_cols)\n print(len(inf_pred))\n \ - \ print(inf_pred[:5])\n\n # Create predictions dataframe\n predictions_df\ - \ = df.copy()\n predictions_df['prediction'] = inf_pred\n predictions_df['prediction_timestamp']\ - \ = datetime.now()\n predictions_df['model_path'] = model.path\n print(len(predictions_df))\n\ - \ # Write predictions to BigQuery\n predictions_table_id = f\"{project_id}.{bq_dataset}.{bq_table_predictions}\"\ - \n print(predictions_table_id)\n\n # try:\n # client.get_table(table_ref)\n\ - \ # # Table exists, use WRITE_APPEND\n # write_disposition\ - \ = \"WRITE_APPEND\"\n # except:\n # # Table doesn't exist, use\ - \ WRITE_TRUNCATE to create it\n # write_disposition = \"WRITE_TRUNCATE\"\ - \n #print(write_disposition)\n\n job_config = bigquery.LoadJobConfig(\n\ - \ write_disposition=\"WRITE_TRUNCATE\"\n #schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],\n\ - \ )\n\n job = client.load_table_from_dataframe(\n predictions_df,\ - \ predictions_table_id, job_config=job_config\n )\n job.result() \ - \ # Wait for the job to complete\n\n print(f\"Loaded {len(predictions_df)}\ - \ rows to {predictions_table_id}\")\n\n" - image: python:3.10 - exec-pubsub-consumer-op: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - pubsub_consumer_op - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'google-cloud-pubsub==2.18.1'\ - \ 'google-cloud-bigquery==3.11.4' 'numpy==1.24.3' 'pandas==2.0.3' 'pyarrow==12.0.1'\ - \ 'google-auth==2.23.3' 'db-dtypes==1.1.1' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pubsub_consumer_op(\n project_id: str,\n topic_name: str,\n\ - \ subscription_name: str,\n bq_dataset: str,\n bq_table: str,\n\ - \ batch_size: int,\n timeout_seconds: int,\n dataset: dsl.Output[dsl.Dataset]\n\ - ):\n import json\n import time\n from datetime import datetime\n\ - \ import logging\n import pandas as pd\n from google.cloud import\ - \ pubsub_v1, bigquery\n from google.cloud.exceptions import NotFound\n\ - \ from concurrent.futures import ThreadPoolExecutor\n\n logging.basicConfig(level=logging.INFO)\n\ - \ logger = logging.getLogger(__name__)\n\n # Initialize clients with\ - \ explicit project ID\n subscriber = pubsub_v1.SubscriberClient()\n \ - \ bq_client = bigquery.Client(project=project_id)\n\n # Create subscription\ - \ path\n subscription_path = subscriber.subscription_path(project_id,\ - \ subscription_name)\n topic_path = subscriber.topic_path(project_id,\ - \ topic_name)\n\n # Create subscription if it doesn't exist\n try:\n\ - \ subscriber.get_subscription(request={\"subscription\": subscription_path})\n\ - \ logger.info(f\"Subscription {subscription_path} already exists\"\ - )\n except Exception:\n try:\n subscriber.create_subscription(\n\ - \ request={\n \"name\": subscription_path,\n\ - \ \"topic\": topic_path,\n \"ack_deadline_seconds\"\ - : 60\n }\n )\n logger.info(f\"Created\ - \ subscription {subscription_path}\")\n except Exception as e:\n\ - \ logger.error(f\"Failed to create subscription: {e}\")\n \ - \ raise\n\n # BigQuery setup\n table_id = f\"{project_id}.{bq_dataset}.{bq_table}\"\ - \n\n # Create BigQuery table if it doesn't exist\n schema = [\n \ - \ bigquery.SchemaField(\"sepal_length\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"sepal_width\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"petal_length\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"petal_width\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"timestamp\", \"TIMESTAMP\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"sample_id\", \"INTEGER\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"ingestion_time\", \"TIMESTAMP\", mode=\"\ - REQUIRED\"),\n bigquery.SchemaField(\"message_id\", \"STRING\", mode=\"\ - REQUIRED\")\n ]\n\n try:\n table = bq_client.get_table(table_id)\n\ - \ logger.info(f\"Table {table_id} already exists\")\n except NotFound:\n\ - \ table = bigquery.Table(table_id, schema=schema)\n table\ - \ = bq_client.create_table(table)\n logger.info(f\"Created table\ - \ {table_id}\")\n\n # Message processing\n consumed_data = []\n \ - \ start_time = time.time()\n\n def callback(message):\n \"\"\"\ - Process individual Pub/Sub message.\"\"\"\n try:\n # Parse\ - \ message data\n data = json.loads(message.data.decode('utf-8'))\n\ - \n # Keep snake_case column names to match table schema\n \ - \ transformed_data = {\n 'sepal_length': data.get('sepal_length'),\n\ - \ 'sepal_width': data.get('sepal_width'), \n \ - \ 'petal_length': data.get('petal_length'),\n 'petal_width':\ - \ data.get('petal_width'),\n 'timestamp': data.get('timestamp'),\n\ - \ 'sample_id': data.get('sample_id'),\n 'ingestion_time':\ - \ datetime.utcnow().isoformat(),\n 'message_id': message.message_id\n\ - \ }\n\n consumed_data.append(transformed_data)\n\n\ - \ logger.info(f\"Consumed message: {data['sample_id']} (ID: {message.message_id})\"\ - )\n\n # Acknowledge the message\n message.ack()\n\n\ - \ except Exception as e:\n logger.error(f\"Error processing\ - \ message: {e}\")\n message.nack()\n\n # Configure flow control\n\ - \ flow_control = pubsub_v1.types.FlowControl(max_messages=batch_size\ - \ * 2)\n\n logger.info(f\"Starting Pub/Sub consumer for topic: {topic_name}\"\ - )\n\n try:\n # Start pulling messages\n streaming_pull_future\ - \ = subscriber.subscribe(\n subscription_path,\n callback=callback,\n\ - \ flow_control=flow_control\n )\n\n logger.info(f\"\ - Listening for messages on {subscription_path}...\")\n\n # Process\ - \ messages until batch size or timeout\n while len(consumed_data)\ - \ < batch_size and (time.time() - start_time) < timeout_seconds:\n \ - \ time.sleep(1) # Check every second\n\n # Cancel the subscriber\n\ - \ streaming_pull_future.cancel()\n\n # Process collected data\n\ - \ if consumed_data:\n df = pd.DataFrame(consumed_data)\n\ - \ df['timestamp'] = pd.to_datetime(df['timestamp'])\n \ - \ df['ingestion_time'] = pd.to_datetime(df['ingestion_time'])\n\n \ - \ # Load to BigQuery\n job_config = bigquery.LoadJobConfig(\n\ - \ write_disposition=\"WRITE_APPEND\",\n schema=schema\n\ - \ )\n\n job = bq_client.load_table_from_dataframe(\n\ - \ df, table_id, job_config=job_config\n )\n \ - \ job.result()\n\n logger.info(f\"Loaded {len(consumed_data)}\ - \ records to BigQuery\")\n else:\n logger.warning(\"No\ - \ messages received within timeout period\")\n\n except Exception as\ - \ e:\n logger.error(f\"Error in Pub/Sub consumer: {e}\")\n \ - \ raise\n\n finally:\n logger.info(\"Pub/Sub consumer finished\"\ - )\n\n # Set output dataset metadata\n dataset.uri = f\"bq://{table_id}\"\ - \n dataset.metadata = {\n \"total_records\": len(consumed_data),\n\ - \ \"table_id\": table_id,\n \"topic\": topic_name,\n \ - \ \"subscription\": subscription_name\n }\n\n" - image: python:3.10-slim -pipelineInfo: - name: pipeline-iris-pubsub-inference -root: - dag: - tasks: - get-model: - cachingOptions: - enableCache: true - componentRef: - name: comp-get-model - inputs: - parameters: - location: - componentInputParameter: location - model_name: - runtimeValue: - constant: Iris-Classifier-XGBoost - project_id: - componentInputParameter: project_id - taskInfo: - name: Get Model - inference-model: - cachingOptions: - enableCache: true - componentRef: - name: comp-inference-model - dependentTasks: - - get-model - - pubsub-consumer-op - inputs: - artifacts: - model: - taskOutputArtifact: - outputArtifactKey: latest_model - producerTask: get-model - parameters: - bq_dataset: - componentInputParameter: bq_dataset - bq_table: - runtimeValue: - constant: iris_pubsub_data - bq_table_predictions: - componentInputParameter: bq_table_predictions - location: - componentInputParameter: location - project_id: - componentInputParameter: project_id - taskInfo: - name: Inference on Pub/Sub Data - pubsub-consumer-op: - cachingOptions: - enableCache: true - componentRef: - name: comp-pubsub-consumer-op - inputs: - parameters: - batch_size: - componentInputParameter: batch_size - bq_dataset: - componentInputParameter: bq_dataset - bq_table: - runtimeValue: - constant: iris_pubsub_data - project_id: - componentInputParameter: project_id - subscription_name: - componentInputParameter: pubsub_subscription - timeout_seconds: - componentInputParameter: timeout_seconds - topic_name: - componentInputParameter: pubsub_topic - taskInfo: - name: Consume Pub/Sub Data - inputDefinitions: - parameters: - batch_size: - defaultValue: 100.0 - isOptional: true - parameterType: NUMBER_INTEGER - bq_dataset: - parameterType: STRING - bq_table_predictions: - parameterType: STRING - location: - parameterType: STRING - project_id: - parameterType: STRING - pubsub_subscription: - defaultValue: iris-inference-data-sub - isOptional: true - parameterType: STRING - pubsub_topic: - defaultValue: iris-inference-data - isOptional: true - parameterType: STRING - timeout_seconds: - defaultValue: 300.0 - isOptional: true - parameterType: NUMBER_INTEGER -schemaVersion: 2.1.0 -sdkVersion: kfp-2.13.0 diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py index 8e364aa..13fec96 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py @@ -11,8 +11,6 @@ def get_model( ): from google.cloud import aiplatform, aiplatform_v1 import fsspec - import gcsfs - import joblib from ml_pipelines_kfp.log import get_logger logger = get_logger(__name__) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py index 33fbc37..449e42f 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py @@ -12,8 +12,6 @@ def inference_model( model: Input[Model], ): import joblib - import pandas as pd - import numpy as np from google.cloud import bigquery from datetime import datetime from ml_pipelines_kfp.log import get_logger From 40d8f56eed26e2b5ea09884c3a5d303f3e9f8593 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:22:15 -0400 Subject: [PATCH 10/14] Use uv for faster editable install in clean_reinstall.sh Co-Authored-By: Claude Opus 4.6 --- scripts/clean_reinstall.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/clean_reinstall.sh b/scripts/clean_reinstall.sh index 424c675..747addc 100755 --- a/scripts/clean_reinstall.sh +++ b/scripts/clean_reinstall.sh @@ -9,7 +9,7 @@ find . -name __pycache__ -exec rm -rf {} + 2>/dev/null || true echo " Done." echo "==> Reinstalling package in editable mode..." -pip install -e . +uv pip install -e . echo " Done." echo "" From 449fa7aabff2927e3427c96d661241e001947318 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:45:23 -0400 Subject: [PATCH 11/14] Fix get_model to fetch latest version, not just parent model list_models returns parent model entries, not versions. Use list_model_versions to get all versions of the model, then sort by create_time to pick the latest one. Co-Authored-By: Claude Opus 4.6 --- .../pipelines/components/get_model.py | 19 +-- .../notebooks/verify_model_features.ipynb | 148 ++++++++++++++---- 2 files changed, 125 insertions(+), 42 deletions(-) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py index 13fec96..8173829 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py @@ -21,24 +21,25 @@ def get_model( client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"} ) - request = { + parent_models = list(client.list_models(request={ "parent": f"projects/{project_id}/locations/{location}", "filter": f"display_name={model_name}", - } - parent_models = list(client.list_models(request=request)) - parent_models.sort(key=lambda m: m.create_time, reverse=True) - parent_model = parent_models[0] if parent_models else None + })) - if not parent_model: + if not parent_models: logger.error(f"Could not find model: {model_name}") return - logger.info(f"Found model: {parent_model.name}, artifact_uri: {parent_model.artifact_uri}") + versions = list(client.list_model_versions(name=parent_models[0].name)) + versions.sort(key=lambda v: v.create_time, reverse=True) + latest = versions[0] + + logger.info(f"Found model: {latest.name}, version: {latest.version_id}, artifact_uri: {latest.artifact_uri}") latest_model_path = latest_model.path.replace("/gcs/", "gs://") - fs, _ = fsspec.core.url_to_fs(parent_model.artifact_uri) + fs, _ = fsspec.core.url_to_fs(latest.artifact_uri) fs.copy( - parent_model.artifact_uri + "/", + latest.artifact_uri + "/", latest_model_path, recursive=True, ) diff --git a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb index c116934..709d3a3 100644 --- a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb +++ b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb @@ -3,88 +3,170 @@ { "cell_type": "markdown", "id": "0700e57c", - "source": "# Verify Model Feature Names\n\nLoad the registered model from GCS and inspect the feature names it was trained on.\nUse this to confirm the model expects canonical feature names (`sepal_length_cm`, etc.)\nafter retraining with the feature store pipeline.", - "metadata": {} + "metadata": {}, + "source": [ + "# Verify Model Feature Names\n", + "\n", + "Load the registered model from GCS and inspect the feature names it was trained on.\n", + "Use this to confirm the model expects canonical feature names (`sepal_length_cm`, etc.)\n", + "after retraining with the feature store pipeline." + ] }, { "cell_type": "markdown", "id": "31b8841c", - "source": "## Config", - "metadata": {} + "metadata": {}, + "source": [ + "## Config" + ] }, { "cell_type": "code", + "execution_count": 9, "id": "b4150bbb", - "source": "PROJECT = \"deeplearning-sahil\"\nREGION = \"us-central1\"\nMODEL_NAME = \"Iris-Classifier-XGBoost-staging\"", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "PROJECT = \"deeplearning-sahil\"\n", + "REGION = \"us-central1\"\n", + "MODEL_NAME = \"Iris-Classifier-XGBoost-staging\"" + ] }, { "cell_type": "markdown", "id": "fb8aafcc", - "source": "## 1. Find the latest registered model and its artifact URI", - "metadata": {} + "metadata": {}, + "source": [ + "## 1. Find the latest registered model and its artifact URI" + ] }, { "cell_type": "code", + "execution_count": null, "id": "b8214db9", - "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\nparent_models.sort(key=lambda m: m.create_time, reverse=True)\n\nmodel = parent_models[0]\nprint(f\"Model: {model.display_name}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\n\nversions = list(client.list_model_versions(name=parent_models[0].name))\nversions.sort(key=lambda v: v.create_time, reverse=True)\nmodel = versions[0]\n\nprint(f\"Model: {model.display_name}\")\nprint(f\"Version: {model.version_id}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")" }, { "cell_type": "markdown", "id": "26d6407a", - "source": "## 2. Download and load model.joblib from GCS", - "metadata": {} + "metadata": {}, + "source": [ + "## 2. Download and load model.joblib from GCS" + ] }, { "cell_type": "code", + "execution_count": 7, "id": "2e668fc9", - "source": "import fsspec\nimport joblib\n\nmodel_uri = model.artifact_uri.rstrip(\"/\") + \"/model.joblib\"\nprint(f\"Downloading: {model_uri}\")\n\nfs, _ = fsspec.core.url_to_fs(model_uri)\nwith fs.open(model_uri, \"rb\") as f:\n xgb_model = joblib.load(f)\n\nprint(f\"Model type: {type(xgb_model).__name__}\")", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Downloading: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-20260613161458/choose-best-model_-2434796601957416960/best_model/model.joblib\n", + "Model type: RandomForestClassifier\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/shlba/Desktop/Docs/Study/code/Machine-learning-Ops-Deployment-Inference/.venv/lib/python3.10/site-packages/sklearn/base.py:380: InconsistentVersionWarning: Trying to unpickle estimator DecisionTreeClassifier from version 1.7.2 when using version 1.6.1. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\n", + "https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\n", + " warnings.warn(\n", + "/Users/shlba/Desktop/Docs/Study/code/Machine-learning-Ops-Deployment-Inference/.venv/lib/python3.10/site-packages/sklearn/base.py:380: InconsistentVersionWarning: Trying to unpickle estimator RandomForestClassifier from version 1.7.2 when using version 1.6.1. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\n", + "https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "import fsspec\n", + "import joblib\n", + "\n", + "model_uri = model.artifact_uri.rstrip(\"/\") + \"/model.joblib\"\n", + "print(f\"Downloading: {model_uri}\")\n", + "\n", + "fs, _ = fsspec.core.url_to_fs(model_uri)\n", + "with fs.open(model_uri, \"rb\") as f:\n", + " xgb_model = joblib.load(f)\n", + "\n", + "print(f\"Model type: {type(xgb_model).__name__}\")" + ] }, { "cell_type": "markdown", "id": "0df7aeaf", - "source": "## 3. Inspect feature names the model was trained on", - "metadata": {} + "metadata": {}, + "source": [ + "## 3. Inspect feature names the model was trained on" + ] }, { "cell_type": "code", + "execution_count": 8, "id": "ca94ea8a", - "source": "print(\"Feature names from training (feature_names_in_):\")\nprint(list(xgb_model.feature_names_in_))\nprint(f\"\\nNumber of features: {xgb_model.n_features_in_}\")\n\nexpected = [\"sepal_length_cm\", \"sepal_width_cm\", \"petal_length_cm\", \"petal_width_cm\"]\nactual = list(xgb_model.feature_names_in_)\n\nif actual == expected:\n print(\"\\n✅ Feature names match canonical names from feature platform\")\nelse:\n print(f\"\\n❌ Feature name mismatch!\")\n print(f\" Expected: {expected}\")\n print(f\" Actual: {actual}\")", "metadata": {}, - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "id": "8fc0e3df", - "source": "## 4. Test prediction with canonical feature names", - "metadata": {} + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Feature names from training (feature_names_in_):\n", + "['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']\n", + "\n", + "Number of features: 4\n", + "\n", + "❌ Feature name mismatch!\n", + " Expected: ['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm']\n", + " Actual: ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']\n" + ] + } + ], + "source": [ + "print(\"Feature names from training (feature_names_in_):\")\n", + "print(list(xgb_model.feature_names_in_))\n", + "print(f\"\\nNumber of features: {xgb_model.n_features_in_}\")\n", + "\n", + "expected = [\"sepal_length_cm\", \"sepal_width_cm\", \"petal_length_cm\", \"petal_width_cm\"]\n", + "actual = list(xgb_model.feature_names_in_)\n", + "\n", + "if actual == expected:\n", + " print(\"\\n✅ Feature names match canonical names from feature platform\")\n", + "else:\n", + " print(f\"\\n❌ Feature name mismatch!\")\n", + " print(f\" Expected: {expected}\")\n", + " print(f\" Actual: {actual}\")" + ] }, { "cell_type": "code", - "id": "738b3459", - "source": "import pandas as pd\n\nsample = pd.DataFrame([{\n \"sepal_length_cm\": 5.1,\n \"sepal_width_cm\": 3.5,\n \"petal_length_cm\": 1.4,\n \"petal_width_cm\": 0.2,\n}])\n\nprediction = xgb_model.predict(sample)\nprobabilities = xgb_model.predict_proba(sample)\n\nspecies_map = {0: \"Iris-versicolor\", 1: \"Iris-virginica\", 2: \"Iris-setosa\"}\n\nprint(f\"Prediction: {int(prediction[0])} ({species_map.get(int(prediction[0]), 'unknown')})\")\nprint(f\"Probabilities: {probabilities[0].tolist()}\")", - "metadata": {}, "execution_count": null, - "outputs": [] + "id": "09405888", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "ml-pipelines-kfp (3.10.0)", "language": "python", "name": "python3" }, "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", "version": "3.10.0" } }, From f083ef7a58de6b4b8b088f0192b469d128139814 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:46:15 -0400 Subject: [PATCH 12/14] Use blessed alias to fetch model instead of sorting by create_time register.py already sets version_aliases=['blessed'] on each uploaded model. Use get_model(name + '@blessed') to directly fetch the blessed version instead of listing all versions and sorting. Co-Authored-By: Claude Opus 4.6 --- .../iris_xgboost/pipelines/components/get_model.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py index 8173829..7fec98f 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py @@ -30,16 +30,14 @@ def get_model( logger.error(f"Could not find model: {model_name}") return - versions = list(client.list_model_versions(name=parent_models[0].name)) - versions.sort(key=lambda v: v.create_time, reverse=True) - latest = versions[0] + blessed = client.get_model(name=parent_models[0].name + "@blessed") - logger.info(f"Found model: {latest.name}, version: {latest.version_id}, artifact_uri: {latest.artifact_uri}") + logger.info(f"Found blessed model: {blessed.name}, version: {blessed.version_id}, artifact_uri: {blessed.artifact_uri}") latest_model_path = latest_model.path.replace("/gcs/", "gs://") - fs, _ = fsspec.core.url_to_fs(latest.artifact_uri) + fs, _ = fsspec.core.url_to_fs(blessed.artifact_uri) fs.copy( - latest.artifact_uri + "/", + blessed.artifact_uri + "/", latest_model_path, recursive=True, ) From 20b36b079ea84c2b1c80369324f502cdcb56ba14 Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:47:25 -0400 Subject: [PATCH 13/14] Update notebook to use blessed alias for model lookup Co-Authored-By: Claude Opus 4.6 --- .../notebooks/verify_model_features.ipynb | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb index 709d3a3..65a7298 100644 --- a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb +++ b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb @@ -36,9 +36,7 @@ "cell_type": "markdown", "id": "fb8aafcc", "metadata": {}, - "source": [ - "## 1. Find the latest registered model and its artifact URI" - ] + "source": "## 1. Find the blessed model and its artifact URI" }, { "cell_type": "code", @@ -46,7 +44,7 @@ "id": "b8214db9", "metadata": {}, "outputs": [], - "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\n\nversions = list(client.list_model_versions(name=parent_models[0].name))\nversions.sort(key=lambda v: v.create_time, reverse=True)\nmodel = versions[0]\n\nprint(f\"Model: {model.display_name}\")\nprint(f\"Version: {model.version_id}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")" + "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\n\nmodel = client.get_model(name=parent_models[0].name + \"@blessed\")\n\nprint(f\"Model: {model.display_name}\")\nprint(f\"Version: {model.version_id}\")\nprint(f\"Aliases: {list(model.version_aliases)}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")" }, { "cell_type": "markdown", @@ -141,14 +139,6 @@ " print(f\" Expected: {expected}\")\n", " print(f\" Actual: {actual}\")" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "09405888", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { From 1867085e5ddfb253ff2a0bf5be32dbb59664bf9b Mon Sep 17 00:00:00 2001 From: Sahil Batra Date: Tue, 16 Jun 2026 11:48:34 -0400 Subject: [PATCH 14/14] update verify feature notebook --- .../notebooks/verify_model_features.ipynb | 67 +++++++++++++++---- 1 file changed, 54 insertions(+), 13 deletions(-) diff --git a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb index 65a7298..eb257fb 100644 --- a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb +++ b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb @@ -22,7 +22,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 19, "id": "b4150bbb", "metadata": {}, "outputs": [], @@ -36,15 +36,50 @@ "cell_type": "markdown", "id": "fb8aafcc", "metadata": {}, - "source": "## 1. Find the blessed model and its artifact URI" + "source": [ + "## 1. Find the blessed model and its artifact URI" + ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 20, "id": "b8214db9", "metadata": {}, - "outputs": [], - "source": "from google.cloud import aiplatform_v1\n\nclient = aiplatform_v1.ModelServiceClient(\n client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n)\n\nparent_models = list(client.list_models(\n request={\n \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n \"filter\": f\"display_name={MODEL_NAME}\",\n }\n))\n\nmodel = client.get_model(name=parent_models[0].name + \"@blessed\")\n\nprint(f\"Model: {model.display_name}\")\nprint(f\"Version: {model.version_id}\")\nprint(f\"Aliases: {list(model.version_aliases)}\")\nprint(f\"Created: {model.create_time}\")\nprint(f\"Artifact URI: {model.artifact_uri}\")" + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Model: Iris-Classifier-XGBoost-staging\n", + "Version: 6\n", + "Aliases: ['blessed']\n", + "Created: 2026-06-13 20:21:22.897585+00:00\n", + "Artifact URI: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-training-20260616112330/choose-best-model_-2956042079337185280/best_model\n" + ] + } + ], + "source": [ + "from google.cloud import aiplatform_v1\n", + "\n", + "client = aiplatform_v1.ModelServiceClient(\n", + " client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n", + ")\n", + "\n", + "parent_models = list(client.list_models(\n", + " request={\n", + " \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n", + " \"filter\": f\"display_name={MODEL_NAME}\",\n", + " }\n", + "))\n", + "\n", + "model = client.get_model(name=parent_models[0].name + \"@blessed\")\n", + "\n", + "print(f\"Model: {model.display_name}\")\n", + "print(f\"Version: {model.version_id}\")\n", + "print(f\"Aliases: {list(model.version_aliases)}\")\n", + "print(f\"Created: {model.create_time}\")\n", + "print(f\"Artifact URI: {model.artifact_uri}\")" + ] }, { "cell_type": "markdown", @@ -56,7 +91,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 21, "id": "2e668fc9", "metadata": {}, "outputs": [ @@ -64,7 +99,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Downloading: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-20260613161458/choose-best-model_-2434796601957416960/best_model/model.joblib\n", + "Downloading: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-training-20260616112330/choose-best-model_-2956042079337185280/best_model/model.joblib\n", "Model type: RandomForestClassifier\n" ] }, @@ -105,7 +140,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 22, "id": "ca94ea8a", "metadata": {}, "outputs": [ @@ -114,13 +149,11 @@ "output_type": "stream", "text": [ "Feature names from training (feature_names_in_):\n", - "['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']\n", + "['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm']\n", "\n", "Number of features: 4\n", "\n", - "❌ Feature name mismatch!\n", - " Expected: ['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm']\n", - " Actual: ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']\n" + "✅ Feature names match canonical names from feature platform\n" ] } ], @@ -139,6 +172,14 @@ " print(f\" Expected: {expected}\")\n", " print(f\" Actual: {actual}\")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a4016ce2", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { @@ -162,4 +203,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +}