Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 0 additions & 388 deletions pipeline-iris-pubsub-inference.yaml

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "ml_pipelines_kfp"
version = "0.1.0"
description = "Add your description here"
requires-python = ">=3.9,<3.11"
requires-python = ">=3.9,<3.12"
dependencies = [
"fastapi>=0.111.1",
"uvicorn>=0.30.1",
Expand Down
16 changes: 16 additions & 0 deletions scripts/clean_reinstall.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Remove Python bytecode caches under the repository root, then reinstall the
# package in editable mode with uv.
#
# Usage: scripts/clean_reinstall.sh   (can be invoked from any directory)
# Requires: `uv` available on PATH.
set -euo pipefail

# Resolve the repository root relative to this script's own location so the
# caller's working directory does not matter.
REPO_ROOT="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)"
cd -- "$REPO_ROOT"

echo "==> Clearing __pycache__ directories..."
# -type d  : only match directories, never a regular file that shares the name.
# -prune   : do not descend into a matched directory that is about to be removed.
# Cleanup is best-effort: a failure is reported on stderr but must not prevent
# the reinstall below from running.
if ! find . -type d -name __pycache__ -prune -exec rm -rf {} +; then
    echo "    Warning: some __pycache__ directories could not be removed." >&2
fi
echo " Done."

echo "==> Reinstalling package in editable mode..."
uv pip install -e .
echo " Done."

echo ""
echo "Package is now installed from source. Any code changes take effect immediately."
13 changes: 5 additions & 8 deletions src/ml_pipelines_kfp/iris_xgboost/models/instance.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from __future__ import annotations

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel


class Instance(BaseModel):
# Accept both canonical names (sepal_length_cm) and aliases (SepalLengthCm) in input
model_config = ConfigDict(populate_by_name=True)

sepal_length_cm: float = Field(alias="SepalLengthCm")
sepal_width_cm: float = Field(alias="SepalWidthCm")
petal_length_cm: float = Field(alias="PetalLengthCm")
petal_width_cm: float = Field(alias="PetalWidthCm")
sepal_length_cm: float
sepal_width_cm: float
petal_length_cm: float
petal_width_cm: float
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ def get_model(
):
from google.cloud import aiplatform, aiplatform_v1
import fsspec
import gcsfs
import joblib
from ml_pipelines_kfp.log import get_logger

logger = get_logger(__name__)
Expand All @@ -23,23 +21,23 @@ def get_model(
client_options={"api_endpoint": f"{location}-aiplatform.googleapis.com"}
)

request = {
parent_models = list(client.list_models(request={
"parent": f"projects/{project_id}/locations/{location}",
"filter": f"display_name={model_name}",
}
parent_models = list(client.list_models(request=request))
parent_model = parent_models[0] if parent_models else None
}))

if not parent_model:
if not parent_models:
logger.error(f"Could not find model: {model_name}")
return

logger.info(f"Found model: {parent_model.name}, artifact_uri: {parent_model.artifact_uri}")
blessed = client.get_model(name=parent_models[0].name + "@blessed")

logger.info(f"Found blessed model: {blessed.name}, version: {blessed.version_id}, artifact_uri: {blessed.artifact_uri}")

latest_model_path = latest_model.path.replace("/gcs/", "gs://")
fs, _ = fsspec.core.url_to_fs(parent_model.artifact_uri)
fs, _ = fsspec.core.url_to_fs(blessed.artifact_uri)
fs.copy(
parent_model.artifact_uri + "/",
blessed.artifact_uri + "/",
latest_model_path,
recursive=True,
)
43 changes: 16 additions & 27 deletions src/ml_pipelines_kfp/iris_xgboost/pipelines/components/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ def inference_model(
project_id: str,
location: str,
bq_dataset: str,
bq_table: str,
bq_feature_table: str,
bq_table_predictions: str,
model: Input[Model],
):
import joblib
import pandas as pd
import numpy as np
from google.cloud import bigquery
from datetime import datetime
from ml_pipelines_kfp.log import get_logger
Expand All @@ -22,30 +20,21 @@ def inference_model(

client = bigquery.Client(project=project_id)

dataset_ref = bigquery.DatasetReference(project_id, bq_dataset)
table_ref = dataset_ref.table(bq_table)
table = bigquery.Table(table_ref)
iterable_table = client.list_rows(table).to_dataframe_iterable()

dfs = []
for row in iterable_table:
dfs.append(row)

df = pd.concat(dfs, ignore_index=True)

if bq_table == "iris_pubsub_data":
df_cols = df[
["sepal_length", "sepal_width", "petal_length", "petal_width"]
].rename(
columns={
"sepal_length": "SepalLengthCm",
"sepal_width": "SepalWidthCm",
"petal_length": "PetalLengthCm",
"petal_width": "PetalWidthCm",
}
)
else:
df_cols = df[["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]]
query = f"""
SELECT * FROM `{project_id}.{bq_dataset}.{bq_feature_table}`
WHERE source = 'batch_input'
"""
df = client.query(query).result().to_dataframe()

logger.info(f"Loaded {len(df)} batch inference rows from feature store")

feature_cols = [
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
]
df_cols = df[feature_cols]

inf_model = joblib.load(model.path + "/model.joblib")
inf_pred = inf_model.predict(df_cols)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
REGION,
SERVICE_ACCOUNT,
BQ_DATASET,
BQ_TABLE,
BQ_FEATURE_TABLE,
BQ_TABLE_PREDICTIONS,
SERVICE_ACCOUNT_PATH,
)


@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-inference", pipeline_root=PIPELINE_ROOT)
def pipeline(
project_id: str,
location: str,
bq_dataset: str,
bq_table: str,
bq_feature_table: str,
bq_table_predictions: str,
):

Expand All @@ -46,7 +46,7 @@ def pipeline(
location=location,
model=get_model_op.outputs["latest_model"],
bq_dataset=bq_dataset,
bq_table=bq_table,
bq_feature_table=bq_feature_table,
bq_table_predictions=bq_table_predictions,
)
.set_display_name("Inference Model")
Expand All @@ -61,7 +61,7 @@ def pipeline(
parser.add_argument("--project-id", default=PROJECT_ID)
parser.add_argument("--region", default=REGION)
parser.add_argument("--bq-dataset", default=BQ_DATASET)
parser.add_argument("--bq-table", default=BQ_TABLE)
parser.add_argument("--bq-feature-table", default=BQ_FEATURE_TABLE)
parser.add_argument("--bq-table-predictions", default=BQ_TABLE_PREDICTIONS)
parser.add_argument("--service-account-path", default=SERVICE_ACCOUNT_PATH)
cli = parser.parse_args()
Expand All @@ -76,16 +76,16 @@ def pipeline(
kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
pipeline_name=PIPELINE_NAME,
pipeline_name=f"{PIPELINE_NAME}-inference",
)
job = aip.PipelineJob(
display_name=PIPELINE_NAME,
display_name=f"{PIPELINE_NAME}-inference",
template_path="pipeline.yaml",
pipeline_root=PIPELINE_ROOT,
enable_caching=False,
parameter_values={
"bq_dataset": cli.bq_dataset,
"bq_table": cli.bq_table,
"bq_feature_table": cli.bq_feature_table,
"bq_table_predictions": cli.bq_table_predictions,
"location": cli.region,
"project_id": cli.project_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def coalesce(*args):
return next((a for a in args if a is not None), None)


@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
@kfp.dsl.pipeline(name=f"{PIPELINE_NAME}-training", pipeline_root=PIPELINE_ROOT)
def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table: str):

# Import components
Expand Down Expand Up @@ -121,7 +121,7 @@ def pipeline(project_id: str, location: str, bq_dataset: str, bq_feature_table:
# _constants.IMAGE_NAME needed: deploy.py/schema.py use it as base_image in @component decorator
IMAGE_NAME = _constants.IMAGE_NAME = coalesce(cli.image_name, _constants.IMAGE_NAME)
FASTAPI_IMAGE_NAME = coalesce(cli.fastapi_image_name, FASTAPI_IMAGE_NAME)
pipeline_name = coalesce(cli.pipeline_name, PIPELINE_NAME)
pipeline_name = coalesce(cli.pipeline_name, f"{PIPELINE_NAME}-training")
pipeline_root = coalesce(cli.pipeline_root, PIPELINE_ROOT)
bq_dataset = coalesce(cli.bq_dataset, BQ_DATASET)
bq_table = coalesce(cli.bq_table, BQ_TABLE)
Expand Down
Loading
Loading