diff --git a/pipeline-iris-pubsub-inference.yaml b/pipeline-iris-pubsub-inference.yaml deleted file mode 100644 index cf984c3..0000000 --- a/pipeline-iris-pubsub-inference.yaml +++ /dev/null @@ -1,388 +0,0 @@ -# PIPELINE DEFINITION -# Name: pipeline-iris-pubsub-inference -# Inputs: -# batch_size: int [Default: 100.0] -# bq_dataset: str -# bq_table_predictions: str -# location: str -# project_id: str -# pubsub_subscription: str [Default: 'iris-inference-data-sub'] -# pubsub_topic: str [Default: 'iris-inference-data'] -# timeout_seconds: int [Default: 300.0] -components: - comp-get-model: - executorLabel: exec-get-model - inputDefinitions: - parameters: - location: - parameterType: STRING - model_name: - parameterType: STRING - project_id: - parameterType: STRING - outputDefinitions: - artifacts: - latest_model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - comp-inference-model: - executorLabel: exec-inference-model - inputDefinitions: - artifacts: - model: - artifactType: - schemaTitle: system.Model - schemaVersion: 0.0.1 - parameters: - bq_dataset: - parameterType: STRING - bq_table: - parameterType: STRING - bq_table_predictions: - parameterType: STRING - location: - parameterType: STRING - project_id: - parameterType: STRING - comp-pubsub-consumer-op: - executorLabel: exec-pubsub-consumer-op - inputDefinitions: - parameters: - batch_size: - parameterType: NUMBER_INTEGER - bq_dataset: - parameterType: STRING - bq_table: - parameterType: STRING - project_id: - parameterType: STRING - subscription_name: - parameterType: STRING - timeout_seconds: - parameterType: NUMBER_INTEGER - topic_name: - parameterType: STRING - outputDefinitions: - artifacts: - dataset: - artifactType: - schemaTitle: system.Dataset - schemaVersion: 0.0.1 -defaultPipelineRoot: gs://sb-vertex/pipeline_root -deploymentSpec: - executors: - exec-get-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - get_model - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'google-cloud-aiplatform==1.64.0'\ - \ 'fsspec==2024.6.1' 'gcsfs==2024.6.1' 'joblib==1.4.2' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef get_model(\n project_id: str,\n location: str,\n model_name:\ - \ str,\n latest_model: Output[Model],\n):\n from google.cloud import\ - \ aiplatform, aiplatform_v1\n import fsspec\n import gcsfs\n import\ - \ joblib\n\n aiplatform.init(project=project_id, location=location)\n\ - \n\n client = aiplatform_v1.ModelServiceClient(\n client_options={\"\ - api_endpoint\": f\"{location}-aiplatform.googleapis.com\"}\n )\n\n \ - \ request = {\n \"parent\": f\"projects/{project_id}/locations/{location}\"\ - ,\n \"filter\": f\"display_name={model_name}\"\n }\n parent_models\ - \ = list(client.list_models(request=request))\n parent_model = parent_models[0]\ - \ if parent_models else None\n\n if not parent_model:\n print(f\"\ - Could not find model with f{model_name}\")\n return\n\n print(f\"\ - Parent Model - {parent_model}\")\n print(f\"class - {type(parent_model)}\"\ - )\n print(f\"name - {parent_model.name}\")\n print(f\"model path-\ - \ {parent_model.artifact_uri}\")\n print(f\"output model path - {latest_model.path}\"\ - )\n latest_model_path = latest_model.path.replace(\"/gcs/\", \"gs://\"\ - )\n print(f\"output model path cleaned - {latest_model_path}\")\n \ - \ fs, _ = fsspec.core.url_to_fs(parent_model.artifact_uri)\n print(f\"\ - file system: {fs}\")\n fs.copy(parent_model.artifact_uri+\"/\", latest_model.path.replace(\"\ - /gcs/\", \"gs://\"), recursive=True)\n\n" - image: python:3.10 - exec-inference-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - inference_model - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'numpy==1.24.3'\ - \ 'pandas==2.0.3' 'scikit-learn==1.5.1' 'joblib==1.4.2' 'google-cloud-bigquery==3.11.4'\ - \ 'pyarrow==12.0.1' 'db-dtypes==1.1.1' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef inference_model(\n project_id: str,\n location: str,\n\ - \ bq_dataset: str,\n bq_table: str,\n bq_table_predictions: str,\n\ - \ model: Input[Model],\n):\n import joblib\n import pandas as pd\n\ - \ import numpy as np\n from google.cloud import bigquery\n from\ - \ datetime import datetime\n\n client = bigquery.Client(project=project_id)\n\ - \n dataset_ref = bigquery.DatasetReference(project_id, bq_dataset)\n\ - \ table_ref = dataset_ref.table(bq_table)\n table = bigquery.Table(table_ref)\n\ - \ iterable_table = client.list_rows(table).to_dataframe_iterable()\n\n\ - \ dfs = []\n for row in iterable_table:\n dfs.append(row)\n\ - \n df = pd.concat(dfs, ignore_index=True)\n print(df.head())\n \ - \ print(df.columns)\n\n if bq_table == 'iris_pubsub_data':\n df_cols\ - \ = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']].rename(columns={'sepal_length':\ - \ 'SepalLengthCm', 'sepal_width': 'SepalWidthCm', 'petal_length': 'PetalLengthCm',\ - \ 'petal_width': 'PetalWidthCm'})\n else:\n df_cols = df[['SepalLengthCm',\ - \ 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]\n\n print(f\"Model\ - \ Path: {model.path}\")\n inf_model = joblib.load(model.path+'/model.joblib')\n\ - \ inf_pred = inf_model.predict(df_cols)\n print(len(inf_pred))\n \ - \ print(inf_pred[:5])\n\n # Create predictions dataframe\n predictions_df\ - \ = df.copy()\n predictions_df['prediction'] = inf_pred\n predictions_df['prediction_timestamp']\ - \ = datetime.now()\n predictions_df['model_path'] = model.path\n print(len(predictions_df))\n\ - \ # Write predictions to BigQuery\n predictions_table_id = f\"{project_id}.{bq_dataset}.{bq_table_predictions}\"\ - \n print(predictions_table_id)\n\n # try:\n # client.get_table(table_ref)\n\ - \ # # Table exists, use WRITE_APPEND\n # write_disposition\ - \ = \"WRITE_APPEND\"\n # except:\n # # Table doesn't exist, use\ - \ WRITE_TRUNCATE to create it\n # write_disposition = \"WRITE_TRUNCATE\"\ - \n #print(write_disposition)\n\n job_config = bigquery.LoadJobConfig(\n\ - \ write_disposition=\"WRITE_TRUNCATE\"\n #schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],\n\ - \ )\n\n job = client.load_table_from_dataframe(\n predictions_df,\ - \ predictions_table_id, job_config=job_config\n )\n job.result() \ - \ # Wait for the job to complete\n\n print(f\"Loaded {len(predictions_df)}\ - \ rows to {predictions_table_id}\")\n\n" - image: python:3.10 - exec-pubsub-consumer-op: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - pubsub_consumer_op - command: - - sh - - -c - - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ - \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.13.0'\ - \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ - \ python3 -m pip install --quiet --no-warn-script-location 'google-cloud-pubsub==2.18.1'\ - \ 'google-cloud-bigquery==3.11.4' 'numpy==1.24.3' 'pandas==2.0.3' 'pyarrow==12.0.1'\ - \ 'google-auth==2.23.3' 'db-dtypes==1.1.1' && \"$0\" \"$@\"\n" - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pubsub_consumer_op(\n project_id: str,\n topic_name: str,\n\ - \ subscription_name: str,\n bq_dataset: str,\n bq_table: str,\n\ - \ batch_size: int,\n timeout_seconds: int,\n dataset: dsl.Output[dsl.Dataset]\n\ - ):\n import json\n import time\n from datetime import datetime\n\ - \ import logging\n import pandas as pd\n from google.cloud import\ - \ pubsub_v1, bigquery\n from google.cloud.exceptions import NotFound\n\ - \ from concurrent.futures import ThreadPoolExecutor\n\n logging.basicConfig(level=logging.INFO)\n\ - \ logger = logging.getLogger(__name__)\n\n # Initialize clients with\ - \ explicit project ID\n subscriber = pubsub_v1.SubscriberClient()\n \ - \ bq_client = bigquery.Client(project=project_id)\n\n # Create subscription\ - \ path\n subscription_path = subscriber.subscription_path(project_id,\ - \ subscription_name)\n topic_path = subscriber.topic_path(project_id,\ - \ topic_name)\n\n # Create subscription if it doesn't exist\n try:\n\ - \ subscriber.get_subscription(request={\"subscription\": subscription_path})\n\ - \ logger.info(f\"Subscription {subscription_path} already exists\"\ - )\n except Exception:\n try:\n subscriber.create_subscription(\n\ - \ request={\n \"name\": subscription_path,\n\ - \ \"topic\": topic_path,\n \"ack_deadline_seconds\"\ - : 60\n }\n )\n logger.info(f\"Created\ - \ subscription {subscription_path}\")\n except Exception as e:\n\ - \ logger.error(f\"Failed to create subscription: {e}\")\n \ - \ raise\n\n # BigQuery setup\n table_id = f\"{project_id}.{bq_dataset}.{bq_table}\"\ - \n\n # Create BigQuery table if it doesn't exist\n schema = [\n \ - \ bigquery.SchemaField(\"sepal_length\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"sepal_width\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"petal_length\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"petal_width\", \"FLOAT\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"timestamp\", \"TIMESTAMP\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"sample_id\", \"INTEGER\", mode=\"REQUIRED\"\ - ),\n bigquery.SchemaField(\"ingestion_time\", \"TIMESTAMP\", mode=\"\ - REQUIRED\"),\n bigquery.SchemaField(\"message_id\", \"STRING\", mode=\"\ - REQUIRED\")\n ]\n\n try:\n table = bq_client.get_table(table_id)\n\ - \ logger.info(f\"Table {table_id} already exists\")\n except NotFound:\n\ - \ table = bigquery.Table(table_id, schema=schema)\n table\ - \ = bq_client.create_table(table)\n logger.info(f\"Created table\ - \ {table_id}\")\n\n # Message processing\n consumed_data = []\n \ - \ start_time = time.time()\n\n def callback(message):\n \"\"\"\ - Process individual Pub/Sub message.\"\"\"\n try:\n # Parse\ - \ message data\n data = json.loads(message.data.decode('utf-8'))\n\ - \n # Keep snake_case column names to match table schema\n \ - \ transformed_data = {\n 'sepal_length': data.get('sepal_length'),\n\ - \ 'sepal_width': data.get('sepal_width'), \n \ - \ 'petal_length': data.get('petal_length'),\n 'petal_width':\ - \ data.get('petal_width'),\n 'timestamp': data.get('timestamp'),\n\ - \ 'sample_id': data.get('sample_id'),\n 'ingestion_time':\ - \ datetime.utcnow().isoformat(),\n 'message_id': message.message_id\n\ - \ }\n\n consumed_data.append(transformed_data)\n\n\ - \ logger.info(f\"Consumed message: {data['sample_id']} (ID: {message.message_id})\"\ - )\n\n # Acknowledge the message\n message.ack()\n\n\ - \ except Exception as e:\n logger.error(f\"Error processing\ - \ message: {e}\")\n message.nack()\n\n # Configure flow control\n\ - \ flow_control = pubsub_v1.types.FlowControl(max_messages=batch_size\ - \ * 2)\n\n logger.info(f\"Starting Pub/Sub consumer for topic: {topic_name}\"\ - )\n\n try:\n # Start pulling messages\n streaming_pull_future\ - \ = subscriber.subscribe(\n subscription_path,\n callback=callback,\n\ - \ flow_control=flow_control\n )\n\n logger.info(f\"\ - Listening for messages on {subscription_path}...\")\n\n # Process\ - \ messages until batch size or timeout\n while len(consumed_data)\ - \ < batch_size and (time.time() - start_time) < timeout_seconds:\n \ - \ time.sleep(1) # Check every second\n\n # Cancel the subscriber\n\ - \ streaming_pull_future.cancel()\n\n # Process collected data\n\ - \ if consumed_data:\n df = pd.DataFrame(consumed_data)\n\ - \ df['timestamp'] = pd.to_datetime(df['timestamp'])\n \ - \ df['ingestion_time'] = pd.to_datetime(df['ingestion_time'])\n\n \ - \ # Load to BigQuery\n job_config = bigquery.LoadJobConfig(\n\ - \ write_disposition=\"WRITE_APPEND\",\n schema=schema\n\ - \ )\n\n job = bq_client.load_table_from_dataframe(\n\ - \ df, table_id, job_config=job_config\n )\n \ - \ job.result()\n\n logger.info(f\"Loaded {len(consumed_data)}\ - \ records to BigQuery\")\n else:\n logger.warning(\"No\ - \ messages received within timeout period\")\n\n except Exception as\ - \ e:\n logger.error(f\"Error in Pub/Sub consumer: {e}\")\n \ - \ raise\n\n finally:\n logger.info(\"Pub/Sub consumer finished\"\ - )\n\n # Set output dataset metadata\n dataset.uri = f\"bq://{table_id}\"\ - \n dataset.metadata = {\n \"total_records\": len(consumed_data),\n\ - \ \"table_id\": table_id,\n \"topic\": topic_name,\n \ - \ \"subscription\": subscription_name\n }\n\n" - image: python:3.10-slim -pipelineInfo: - name: pipeline-iris-pubsub-inference -root: - dag: - tasks: - get-model: - cachingOptions: - enableCache: true - componentRef: - name: comp-get-model - inputs: - parameters: - location: - componentInputParameter: location - model_name: - runtimeValue: - constant: Iris-Classifier-XGBoost - project_id: - componentInputParameter: project_id - taskInfo: - name: Get Model - inference-model: - cachingOptions: - enableCache: true - componentRef: - name: comp-inference-model - dependentTasks: - - get-model - - pubsub-consumer-op - inputs: - artifacts: - model: - taskOutputArtifact: - outputArtifactKey: latest_model - producerTask: get-model - parameters: - bq_dataset: - componentInputParameter: bq_dataset - bq_table: - runtimeValue: - constant: iris_pubsub_data - bq_table_predictions: - componentInputParameter: bq_table_predictions - location: - componentInputParameter: location - project_id: - componentInputParameter: project_id - taskInfo: - name: Inference on Pub/Sub Data - pubsub-consumer-op: - cachingOptions: - enableCache: true - componentRef: - name: comp-pubsub-consumer-op - inputs: - parameters: - batch_size: - componentInputParameter: batch_size - bq_dataset: - componentInputParameter: bq_dataset - bq_table: - runtimeValue: - constant: iris_pubsub_data - project_id: - componentInputParameter: project_id - subscription_name: - componentInputParameter: pubsub_subscription - timeout_seconds: - componentInputParameter: timeout_seconds - topic_name: - componentInputParameter: pubsub_topic - taskInfo: - name: Consume Pub/Sub Data - inputDefinitions: - parameters: - batch_size: - defaultValue: 100.0 - isOptional: true - parameterType: NUMBER_INTEGER - bq_dataset: - parameterType: STRING - bq_table_predictions: - parameterType: STRING - location: - parameterType: STRING - project_id: - parameterType: STRING - pubsub_subscription: - defaultValue: iris-inference-data-sub - isOptional: true - parameterType: STRING - pubsub_topic: - defaultValue: iris-inference-data - isOptional: true - parameterType: STRING - timeout_seconds: - defaultValue: 300.0 - isOptional: true - parameterType: NUMBER_INTEGER -schemaVersion: 2.1.0 -sdkVersion: kfp-2.13.0 diff --git a/pyproject.toml b/pyproject.toml index d4fbe80..7aa6d79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "ml_pipelines_kfp" version = "0.1.0" description = "Add your description here" -requires-python = ">=3.9,<3.11" +requires-python = ">=3.9,<3.12" dependencies = [ "fastapi>=0.111.1", "uvicorn>=0.30.1", diff --git a/scripts/clean_reinstall.sh b/scripts/clean_reinstall.sh new file mode 100755 index 0000000..747addc --- /dev/null +++ b/scripts/clean_reinstall.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$REPO_ROOT" + +echo "==> Clearing __pycache__ directories..." +find . -name __pycache__ -exec rm -rf {} + 2>/dev/null || true +echo " Done." + +echo "==> Reinstalling package in editable mode..." +uv pip install -e . +echo " Done." + +echo "" +echo "Package is now installed from source. Any code changes take effect immediately." diff --git a/src/ml_pipelines_kfp/iris_xgboost/models/instance.py b/src/ml_pipelines_kfp/iris_xgboost/models/instance.py index ab0023e..c443c1b 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/models/instance.py +++ b/src/ml_pipelines_kfp/iris_xgboost/models/instance.py @@ -1,13 +1,10 @@ from __future__ import annotations -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel class Instance(BaseModel): - # Accept both canonical names (sepal_length_cm) and aliases (SepalLengthCm) in input - model_config = ConfigDict(populate_by_name=True) - - sepal_length_cm: float = Field(alias="SepalLengthCm") - sepal_width_cm: float = Field(alias="SepalWidthCm") - petal_length_cm: float = Field(alias="PetalLengthCm") - petal_width_cm: float = Field(alias="PetalWidthCm") + sepal_length_cm: float + sepal_width_cm: float + petal_length_cm: float + petal_width_cm: float diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py index 2eee776..7fec98f 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/get_model.py @@ -11,8 +11,6 @@ def get_model( ): from google.cloud import aiplatform, aiplatform_v1 import fsspec - import gcsfs - import joblib from ml_pipelines_kfp.log import get_logger logger = get_logger(__name__) @@ -23,23 +21,23 @@ def get_model( client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"} ) - request = { + parent_models = list(client.list_models(request={ "parent": f"projects/{project_id}/locations/{location}", "filter": f"display_name={model_name}", - } - parent_models = list(client.list_models(request=request)) - parent_model = parent_models[0] if parent_models else None + })) - if not parent_model: + if not parent_models: logger.error(f"Could not find model: {model_name}") return - logger.info(f"Found model: {parent_model.name}, artifact_uri: {parent_model.artifact_uri}") + blessed = client.get_model(name=parent_models[0].name + "@blessed") + + logger.info(f"Found blessed model: {blessed.name}, version: {blessed.version_id}, artifact_uri: {blessed.artifact_uri}") latest_model_path = latest_model.path.replace("/gcs/", "gs://") - fs, _ = fsspec.core.url_to_fs(parent_model.artifact_uri) + fs, _ = fsspec.core.url_to_fs(blessed.artifact_uri) fs.copy( - parent_model.artifact_uri + "/", + blessed.artifact_uri + "/", latest_model_path, recursive=True, ) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py index 1e2856f..449e42f 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py @@ -7,13 +7,11 @@ def inference_model( project_id: str, location: str, bq_dataset: str, - bq_table: str, + bq_feature_table: str, bq_table_predictions: str, model: Input[Model], ): import joblib - import pandas as pd - import numpy as np from google.cloud import bigquery from datetime import datetime from ml_pipelines_kfp.log import get_logger @@ -22,30 +20,21 @@ def inference_model( client = bigquery.Client(project=project_id) - dataset_ref = bigquery.DatasetReference(project_id, bq_dataset) - table_ref = dataset_ref.table(bq_table) - table = bigquery.Table(table_ref) - iterable_table = client.list_rows(table).to_dataframe_iterable() - - dfs = [] - for row in iterable_table: - dfs.append(row) - - df = pd.concat(dfs, ignore_index=True) - - if bq_table == "iris_pubsub_data": - df_cols = df[ - ["sepal_length", "sepal_width", "petal_length", "petal_width"] - ].rename( - columns={ - "sepal_length": "SepalLengthCm", - "sepal_width": "SepalWidthCm", - "petal_length": "PetalLengthCm", - "petal_width": "PetalWidthCm", - } - ) - else: - df_cols = df[["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]] + query = f""" + SELECT * FROM `{project_id}.{bq_dataset}.{bq_feature_table}` + WHERE source = 'batch_input' + """ + df = client.query(query).result().to_dataframe() + + logger.info(f"Loaded {len(df)} batch inference rows from feature store") + + feature_cols = [ + "sepal_length_cm", + "sepal_width_cm", + "petal_length_cm", + "petal_width_cm", + ] + df_cols = df[feature_cols] inf_model = joblib.load(model.path + "/model.joblib") inf_pred = inf_model.predict(df_cols) diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py index 459f2f6..27197eb 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_inference.py @@ -13,18 +13,18 @@ REGION, SERVICE_ACCOUNT, BQ_DATASET, - BQ_TABLE, + BQ_FEATURE_TABLE, BQ_TABLE_PREDICTIONS, SERVICE_ACCOUNT_PATH, ) -@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT) +@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-inference", pipeline_root=PIPELINE_ROOT) def pipeline( project_id: str, location: str, bq_dataset: str, - bq_table: str, + bq_feature_table: str, bq_table_predictions: str, ): @@ -46,7 +46,7 @@ def pipeline( location=location, model=get_model_op.outputs["latest_model"], bq_dataset=bq_dataset, - bq_table=bq_table, + bq_feature_table=bq_feature_table, bq_table_predictions=bq_table_predictions, ) .set_display_name("Inference Model") @@ -61,7 +61,7 @@ def pipeline( parser.add_argument("--project-id", default=PROJECT_ID) parser.add_argument("--region", default=REGION) parser.add_argument("--bq-dataset", default=BQ_DATASET) - parser.add_argument("--bq-table", default=BQ_TABLE) + parser.add_argument("--bq-feature-table", default=BQ_FEATURE_TABLE) parser.add_argument("--bq-table-predictions", default=BQ_TABLE_PREDICTIONS) parser.add_argument("--service-account-path", default=SERVICE_ACCOUNT_PATH) cli = parser.parse_args() @@ -76,16 +76,16 @@ def pipeline( kfp.compiler.Compiler().compile( pipeline_func=pipeline, package_path="pipeline.yaml", - pipeline_name=PIPELINE_NAME, + pipeline_name=f"{PIPELINE_NAME}-inference", ) job = aip.PipelineJob( - display_name=PIPELINE_NAME, + display_name=f"{PIPELINE_NAME}-inference", template_path="pipeline.yaml", pipeline_root=PIPELINE_ROOT, enable_caching=False, parameter_values={ "bq_dataset": cli.bq_dataset, - "bq_table": cli.bq_table, + "bq_feature_table": cli.bq_feature_table, "bq_table_predictions": cli.bq_table_predictions, "location": cli.region, "project_id": cli.project_id, diff --git a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py index 5602667..e6444ec 100644 --- a/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py +++ b/src/ml_pipelines_kfp/iris_xgboost/pipelines/iris_pipeline_training.py @@ -28,7 +28,7 @@ def coalesce(*args): return next((a for a in args if a is not None), None) -@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT) +@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-training", pipeline_root=PIPELINE_ROOT) def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table: str): # Import components @@ -121,7 +121,7 @@ def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table: # _constants.IMAGE_NAME needed: deploy.py/schema.py use it as base_image in @component decorator IMAGE_NAME = _constants.IMAGE_NAME = coalesce(cli.image_name, _constants.IMAGE_NAME) FASTAPI_IMAGE_NAME = coalesce(cli.fastapi_image_name, FASTAPI_IMAGE_NAME) - pipeline_name = coalesce(cli.pipeline_name, PIPELINE_NAME) + pipeline_name = coalesce(cli.pipeline_name, f"{PIPELINE_NAME}-training") pipeline_root = coalesce(cli.pipeline_root, PIPELINE_ROOT) bq_dataset = coalesce(cli.bq_dataset, BQ_DATASET) bq_table = coalesce(cli.bq_table, BQ_TABLE) diff --git a/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb new file mode 100644 index 0000000..eb257fb --- /dev/null +++ b/src/ml_pipelines_kfp/notebooks/verify_model_features.ipynb @@ -0,0 +1,206 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0700e57c", + "metadata": {}, + "source": [ + "# Verify Model Feature Names\n", + "\n", + "Load the registered model from GCS and inspect the feature names it was trained on.\n", + "Use this to confirm the model expects canonical feature names (`sepal_length_cm`, etc.)\n", + "after retraining with the feature store pipeline." + ] + }, + { + "cell_type": "markdown", + "id": "31b8841c", + "metadata": {}, + "source": [ + "## Config" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "b4150bbb", + "metadata": {}, + "outputs": [], + "source": [ + "PROJECT = \"deeplearning-sahil\"\n", + "REGION = \"us-central1\"\n", + "MODEL_NAME = \"Iris-Classifier-XGBoost-staging\"" + ] + }, + { + "cell_type": "markdown", + "id": "fb8aafcc", + "metadata": {}, + "source": [ + "## 1. Find the blessed model and its artifact URI" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "b8214db9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Model: Iris-Classifier-XGBoost-staging\n", + "Version: 6\n", + "Aliases: ['blessed']\n", + "Created: 2026-06-13 20:21:22.897585+00:00\n", + "Artifact URI: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-training-20260616112330/choose-best-model_-2956042079337185280/best_model\n" + ] + } + ], + "source": [ + "from google.cloud import aiplatform_v1\n", + "\n", + "client = aiplatform_v1.ModelServiceClient(\n", + " client_options={\"api_endpoint\": f\"{REGION}-aiplatform.googleapis.com\"}\n", + ")\n", + "\n", + "parent_models = list(client.list_models(\n", + " request={\n", + " \"parent\": f\"projects/{PROJECT}/locations/{REGION}\",\n", + " \"filter\": f\"display_name={MODEL_NAME}\",\n", + " }\n", + "))\n", + "\n", + "model = client.get_model(name=parent_models[0].name + \"@blessed\")\n", + "\n", + "print(f\"Model: {model.display_name}\")\n", + "print(f\"Version: {model.version_id}\")\n", + "print(f\"Aliases: {list(model.version_aliases)}\")\n", + "print(f\"Created: {model.create_time}\")\n", + "print(f\"Artifact URI: {model.artifact_uri}\")" + ] + }, + { + "cell_type": "markdown", + "id": "26d6407a", + "metadata": {}, + "source": [ + "## 2. Download and load model.joblib from GCS" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "2e668fc9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Downloading: gs://sb-vertex/staging/pipeline_root/57434141298/pipeline-iris-staging-training-20260616112330/choose-best-model_-2956042079337185280/best_model/model.joblib\n", + "Model type: RandomForestClassifier\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/shlba/Desktop/Docs/Study/code/Machine-learning-Ops-Deployment-Inference/.venv/lib/python3.10/site-packages/sklearn/base.py:380: InconsistentVersionWarning: Trying to unpickle estimator DecisionTreeClassifier from version 1.7.2 when using version 1.6.1. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\n", + "https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\n", + " warnings.warn(\n", + "/Users/shlba/Desktop/Docs/Study/code/Machine-learning-Ops-Deployment-Inference/.venv/lib/python3.10/site-packages/sklearn/base.py:380: InconsistentVersionWarning: Trying to unpickle estimator RandomForestClassifier from version 1.7.2 when using version 1.6.1. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\n", + "https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "import fsspec\n", + "import joblib\n", + "\n", + "model_uri = model.artifact_uri.rstrip(\"/\") + \"/model.joblib\"\n", + "print(f\"Downloading: {model_uri}\")\n", + "\n", + "fs, _ = fsspec.core.url_to_fs(model_uri)\n", + "with fs.open(model_uri, \"rb\") as f:\n", + " xgb_model = joblib.load(f)\n", + "\n", + "print(f\"Model type: {type(xgb_model).__name__}\")" + ] + }, + { + "cell_type": "markdown", + "id": "0df7aeaf", + "metadata": {}, + "source": [ + "## 3. Inspect feature names the model was trained on" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "ca94ea8a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Feature names from training (feature_names_in_):\n", + "['sepal_length_cm', 'sepal_width_cm', 'petal_length_cm', 'petal_width_cm']\n", + "\n", + "Number of features: 4\n", + "\n", + "āœ… Feature names match canonical names from feature platform\n" + ] + } + ], + "source": [ + "print(\"Feature names from training (feature_names_in_):\")\n", + "print(list(xgb_model.feature_names_in_))\n", + "print(f\"\\nNumber of features: {xgb_model.n_features_in_}\")\n", + "\n", + "expected = [\"sepal_length_cm\", \"sepal_width_cm\", \"petal_length_cm\", \"petal_width_cm\"]\n", + "actual = list(xgb_model.feature_names_in_)\n", + "\n", + "if actual == expected:\n", + " print(\"\\nāœ… Feature names match canonical names from feature platform\")\n", + "else:\n", + " print(f\"\\nāŒ Feature name mismatch!\")\n", + " print(f\" Expected: {expected}\")\n", + " print(f\" Actual: {actual}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a4016ce2", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ml-pipelines-kfp (3.10.0)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml b/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml index 668a3e5..bdb8d04 100644 --- a/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml +++ b/src/ml_pipelines_kfp/schemas/iris_xgboost/vertex/instance.yaml @@ -1,13 +1,13 @@ title: Instance type: object properties: - SepalLengthCm: + sepal_length_cm: type: number - SepalWidthCm: - type: integer - PetalLengthCm: + sepal_width_cm: type: number - PetalWidthCm: + petal_length_cm: type: number - Species: + petal_width_cm: + type: number + species: type: integer \ No newline at end of file