From e951933e50316c6e21a312de3d939c2e9892e8dc Mon Sep 17 00:00:00 2001 From: Alan Date: Fri, 15 May 2026 12:51:10 +0200 Subject: [PATCH] feat: purge cascade observed_results + scripts GradientBoosting hors-Docker + delete_iris MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit API : - DELETE /predictions/purge accepte older_than_days=0 (supprime tout, sans seuil) - Suppression en cascade des observed_results liés aux prédictions purgées - PurgeResponse : nouveau champ deleted_observed_results_count - Test test_purge_older_than_days_zero_returns_422 → test_purge_older_than_days_zero_is_valid Scripts documentation : - train_iris_GradientBoosting.py : aligné sur train_iris.py — dotenv, valeurs par défaut, DEBUG dump env, timing _ts(), MLFLOW_HTTP_REQUEST_TIMEOUT, résolution credentials MinIO, substitution hostname hors-Docker, log_metrics batché, infer_signature avec predict_proba - upload_iris_model_GradientBoosting.py : ajout dotenv - delete_iris.py : nouveau script de nettoyage complet (modèle + prédictions + ground truth) avec dry-run par défaut et flag --yes pour exécution réelle - send_ground_truth.py → send_ground_truth_iris.py (renommage) Co-Authored-By: Claude Sonnet 4.6 --- README.md | 2 +- documentation/Scripts/delete_iris.py | 213 ++++++++++++++++++ ...und_truth.py => send_ground_truth_iris.py} | 5 +- .../Scripts/send_predictions_iris.py | 5 +- .../Scripts/train_iris_GradientBoosting.py | 161 +++++++++++-- documentation/Scripts/upload_iris_model.py | 4 +- .../upload_iris_model_GradientBoosting.py | 4 +- src/api/models.py | 5 +- src/api/predict.py | 11 +- src/schemas/prediction.py | 8 +- src/services/db_service.py | 48 ++-- tests/test_predictions_purge.py | 8 +- 12 files changed, 417 insertions(+), 57 deletions(-) create mode 100644 documentation/Scripts/delete_iris.py rename documentation/Scripts/{send_ground_truth.py => send_ground_truth_iris.py} (98%) diff --git a/README.md b/README.md index af4eebb..81b26be 100644 
--- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ bash scripts/init_env.sh # 2. (Optionnel) Supprimer les volumes Postgres existants avant le premier déploiement # À faire si vous repartez de zéro ou si le mot de passe a changé -docker-compose -p predictml-api down -v 2>&1 && echo "=== Volumes supprimés ===" +docker-compose -p predictml-api down -v 2>&1 # 3. Lancer tous les services docker-compose -p predictml-api up -d --build diff --git a/documentation/Scripts/delete_iris.py b/documentation/Scripts/delete_iris.py new file mode 100644 index 0000000..cd8b227 --- /dev/null +++ b/documentation/Scripts/delete_iris.py @@ -0,0 +1,213 @@ +""" +delete_iris.py — Nettoyage complet du modèle iris-classifier via l'API PredictML +================================================================================== + +Ce script supprime dans l'ordre : + 1. Toutes les prédictions du modèle (DELETE /predictions/purge) + 2. Toutes les versions du modèle (DELETE /models/{name}) + +Les observed_results (ground truth) liés aux prédictions purgées sont +supprimés en cascade par l'API lors de l'étape 1. + +Usage : + python delete_iris.py # dry-run → affiche ce qui sera supprimé + python delete_iris.py --yes # supprime sans confirmation + +Variables d'environnement : + API_URL URL de l'API (défaut : http://localhost:8000) + API_TOKEN Token Bearer admin — requis + MODEL_NAME Nom du modèle (défaut : iris-classifier) + +Prérequis Python : + pip install requests python-dotenv + +Ce script supprime la totalité des données, y compris les prédictions récentes +et les observed_results (ground truth) associés. 
+""" + +import json +import os +import sys + +import requests +from dotenv import find_dotenv, load_dotenv + +load_dotenv(find_dotenv()) + +# ── Configuration ───────────────────────────────────────────────────────────── + +API_URL = os.environ.get("API_URL", "http://localhost:8000") +API_TOKEN = os.environ.get("API_TOKEN", os.environ.get("ADMIN_TOKEN", "")) +MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") + +DRY_RUN = "--yes" not in sys.argv + +HEADERS = {"Authorization": f"Bearer {API_TOKEN}"} +HEADERS_JSON = {**HEADERS, "Content-Type": "application/json"} + +# ── Validation ──────────────────────────────────────────────────────────────── + +if not API_TOKEN: + print("❌ API_TOKEN non défini.") + print(" Lancez : API_TOKEN=<votre-token> python delete_iris.py [--yes]") + sys.exit(1) + +# ── 0. Vérification API ─────────────────────────────────────────────────────── + +try: + r = requests.get(f"{API_URL}/health", timeout=5) + r.raise_for_status() + print(f"✅ API accessible : {API_URL}") +except Exception as e: + print(f"❌ API inaccessible ({API_URL}) : {e}") + sys.exit(1) + +print() +if DRY_RUN: + print("=" * 62) + print(" MODE DRY-RUN — aucune suppression (ajoutez --yes pour exécuter)") + print("=" * 62) +else: + print("=" * 62) + print(f" SUPPRESSION RÉELLE pour le modèle : {MODEL_NAME}") + print("=" * 62) + +# ── 1. 
Inventaire — modèles ─────────────────────────────────── + +print(f"\n[1/3] Récupération des versions de '{MODEL_NAME}'…") + +r = requests.get(f"{API_URL}/models", headers=HEADERS, params={"name": MODEL_NAME}, timeout=10) + +if r.status_code == 200: + all_models = r.json() + versions = [m for m in all_models if m.get("name") == MODEL_NAME] + if versions: + print(f" {len(versions)} version(s) trouvée(s) :") + for v in versions: + prod = " [PRODUCTION]" if v.get("is_production") else "" + print(f" • v{v.get('version')}{prod}") + else: + print(f" Aucune version trouvée pour '{MODEL_NAME}'.") +elif r.status_code == 404: + versions = [] + print(f" Aucun modèle '{MODEL_NAME}' trouvé.") +else: + print(f" ⚠️ GET /models a répondu {r.status_code} : {r.text[:120]}") + versions = [] + +# ── 2. Inventaire — prédictions ─────────────────────────────────────────────── + +print(f"\n[2/3] Simulation de purge des prédictions et observed_results (older_than_days=0)…") + +r = requests.delete( + f"{API_URL}/predictions/purge", + headers=HEADERS, + params={"older_than_days": 0, "model_name": MODEL_NAME, "dry_run": True}, + timeout=15, +) + +predictions_to_delete = 0 +linked_gt = 0 +if r.status_code == 200: + purge_preview = r.json() + predictions_to_delete = purge_preview.get("deleted_count", 0) + linked_gt = purge_preview.get("linked_observed_results_count", 0) + oldest_remaining = purge_preview.get("oldest_remaining") + print(f" {predictions_to_delete} prédiction(s) à supprimer") + print(f" {linked_gt} observed_result(s) associé(s) à supprimer") + if oldest_remaining: + print(f" Plus ancienne après purge : {oldest_remaining}") +else: + print(f" ⚠️ DELETE purge dry-run a répondu {r.status_code} : {r.text[:120]}") + +# ── 3. 
Inventaire — observed results ───────────────────────────────────────── + +print(f"\n[3/3] Les observed_results seront supprimés en cascade avec les prédictions.") +observed_total = linked_gt + +# ── Confirmation / sortie dry-run ───────────────────────────────────────────── + +print() +print("─" * 62) +print(f" Résumé de ce qui sera supprimé :") +print(f" • {len(versions)} version(s) du modèle '{MODEL_NAME}'") +print(f" • {predictions_to_delete} prédiction(s)") +print(f" • {observed_total} observed_result(s) (supprimés en cascade)") +print("─" * 62) + +if DRY_RUN: + print() + print(" Relancez avec --yes pour exécuter la suppression :") + print(f" python delete_iris.py --yes") + sys.exit(0) + +print() + +# ── Exécution 1 : purge des prédictions ────────────────────────────────────── + +print(" ÉTAPE 1 — Purge des prédictions…") + +if predictions_to_delete == 0: + print(" ✅ Aucune prédiction à supprimer.") +else: + r = requests.delete( + f"{API_URL}/predictions/purge", + headers=HEADERS, + params={"older_than_days": 0, "model_name": MODEL_NAME, "dry_run": False}, + timeout=30, + ) + + if r.status_code == 200: + result = r.json() + deleted = result.get("deleted_count", 0) + deleted_obs = result.get("deleted_observed_results_count", 0) + print(f" ✅ {deleted} prédiction(s) supprimée(s).") + if deleted_obs: + print(f" ✅ {deleted_obs} observed_result(s) supprimé(s) en cascade.") + else: + print(f" ❌ Erreur {r.status_code} lors de la purge :") + try: + print(f" {json.dumps(r.json(), indent=2, ensure_ascii=False)}") + except Exception: + print(f" {r.text[:300]}") + +# ── Exécution 2 : suppression du modèle ────────────────────────────────────── + +print() +print(f" ÉTAPE 2 — Suppression de toutes les versions de '{MODEL_NAME}'…") + +if not versions: + print(f" ✅ Aucun modèle '{MODEL_NAME}' à supprimer.") +else: + r = requests.delete( + f"{API_URL}/models/{MODEL_NAME}", + headers=HEADERS, + timeout=30, + ) + + if r.status_code == 200: + result = r.json() + 
deleted_v = result.get("deleted_versions", []) + mlflow_del = result.get("mlflow_runs_deleted", []) + minio_del = result.get("minio_objects_deleted", []) + + print(f" ✅ {len(deleted_v)} version(s) supprimée(s) : {deleted_v}") + if mlflow_del: + print(f" {len(mlflow_del)} run(s) MLflow supprimé(s)") + if minio_del: + print(f" {len(minio_del)} objet(s) MinIO supprimé(s)") + elif r.status_code == 404: + print(f" ✅ Modèle '{MODEL_NAME}' déjà absent.") + else: + print(f" ❌ Erreur {r.status_code} lors de la suppression du modèle :") + try: + print(f" {json.dumps(r.json(), indent=2, ensure_ascii=False)}") + except Exception: + print(f" {r.text[:300]}") + +# ── Résumé final ────────────────────────────────────────────────────────────── + +print() +print("=" * 62) +print(" Nettoyage terminé.") +print("=" * 62) diff --git a/documentation/Scripts/send_ground_truth.py b/documentation/Scripts/send_ground_truth_iris.py similarity index 98% rename from documentation/Scripts/send_ground_truth.py rename to documentation/Scripts/send_ground_truth_iris.py index 58c21f8..82ca3aa 100644 --- a/documentation/Scripts/send_ground_truth.py +++ b/documentation/Scripts/send_ground_truth_iris.py @@ -28,10 +28,11 @@ from datetime import datetime, timezone import requests - +from dotenv import find_dotenv, load_dotenv +load_dotenv(find_dotenv()) # ── Configuration ───────────────────────────────────────────────────────────── -API_URL = os.environ.get("API_URL", "http://localhost:8000") +API_URL = os.environ.get("API_URL", "http://localhost:80") API_TOKEN = os.environ.get("API_TOKEN", os.environ.get("ADMIN_TOKEN", "")) MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") diff --git a/documentation/Scripts/send_predictions_iris.py b/documentation/Scripts/send_predictions_iris.py index 73c682f..af77ff1 100644 --- a/documentation/Scripts/send_predictions_iris.py +++ b/documentation/Scripts/send_predictions_iris.py @@ -24,10 +24,11 @@ import sys import requests - +from dotenv import 
find_dotenv, load_dotenv +load_dotenv(find_dotenv()) # ── Configuration ───────────────────────────────────────────────────────────── -API_URL = os.environ.get("API_URL", "http://localhost:8000") +API_URL = os.environ.get("API_URL", "http://localhost:80") API_TOKEN = os.environ.get("API_TOKEN", os.environ.get("ADMIN_TOKEN", "")) MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") diff --git a/documentation/Scripts/train_iris_GradientBoosting.py b/documentation/Scripts/train_iris_GradientBoosting.py index 2aef4e7..0edfc01 100644 --- a/documentation/Scripts/train_iris_GradientBoosting.py +++ b/documentation/Scripts/train_iris_GradientBoosting.py @@ -26,16 +26,20 @@ MODULES AUTORISÉS par le sandbox PredictML ------------------------------------------- - os, sys, json, pickle, datetime, numpy, pandas, sklearn, mlflow + os, sys, json, pickle, datetime, dotenv, numpy, pandas, sklearn, mlflow (subprocess, requests, socket, urllib sont bloqués) """ import json +import logging import os import pickle import sys from datetime import datetime +from dotenv import find_dotenv, load_dotenv +load_dotenv(find_dotenv()) + import numpy as np import pandas as pd from sklearn.datasets import load_iris @@ -46,26 +50,109 @@ try: import mlflow import mlflow.sklearn + # pip version non résolue dans le sandbox → pas actionnable + logging.getLogger("mlflow.utils.environment").setLevel(logging.ERROR) + # gRPC OTLP absent → tracing désactivé silencieusement + logging.getLogger("mlflow.tracing.provider").setLevel(logging.ERROR) _MLFLOW_AVAILABLE = True except ImportError: _MLFLOW_AVAILABLE = False +DEBUG = True + +_T0 = datetime.now() + +def _ts(label: str) -> None: + elapsed = (datetime.now() - _T0).total_seconds() + print(f"[TIMING] {label} — +{elapsed:.2f}s", file=sys.stderr) + # ── 1. 
Variables d'environnement (OBLIGATOIRES) ─────────────────────────────── -TRAIN_START_DATE = os.environ["TRAIN_START_DATE"] -TRAIN_END_DATE = os.environ["TRAIN_END_DATE"] -OUTPUT_MODEL_PATH = os.environ["OUTPUT_MODEL_PATH"] +if DEBUG: + print(f""" + ------------- AVANT SURCHARGE ------------ + [ENV] TRAIN_START_DATE = {os.environ.get("TRAIN_START_DATE")} + [ENV] TRAIN_END_DATE = {os.environ.get("TRAIN_END_DATE")} + [ENV] OUTPUT_MODEL_PATH = {os.environ.get("OUTPUT_MODEL_PATH")} + [ENV] MODEL_NAME = {os.environ.get("MODEL_NAME")} + [ENV] MLFLOW_TRACKING_URI = {os.environ.get("MLFLOW_TRACKING_URI")} + [ENV] MLFLOW_TRACKING_USERNAME = {os.environ.get("MLFLOW_TRACKING_USERNAME")} + [ENV] MLFLOW_TRACKING_PASSWORD = {os.environ.get("MLFLOW_TRACKING_PASSWORD")} + [ENV] MLFLOW_HTTP_REQUEST_TIMEOUT= {os.environ.get("MLFLOW_HTTP_REQUEST_TIMEOUT")} + [ENV] MLFLOW_S3_ENDPOINT_URL = {os.environ.get("MLFLOW_S3_ENDPOINT_URL")} + [ENV] AWS_ACCESS_KEY_ID = {os.environ.get("AWS_ACCESS_KEY_ID")} + [ENV] AWS_SECRET_ACCESS_KEY = {os.environ.get("AWS_SECRET_ACCESS_KEY")} + [ENV] MINIO_ENDPOINT = {os.environ.get("MINIO_ENDPOINT")} + [ENV] MINIO_ROOT_USER = {os.environ.get("MINIO_ROOT_USER")} + [ENV] MINIO_ACCESS_KEY = {os.environ.get("MINIO_ACCESS_KEY")} + [ENV] MINIO_ROOT_PASSWORD = {os.environ.get("MINIO_ROOT_PASSWORD")} + [ENV] MINIO_SECRET_KEY = {os.environ.get("MINIO_SECRET_KEY")} + """) + +TRAIN_START_DATE = os.environ.get("TRAIN_START_DATE", "2025-01-01") +TRAIN_END_DATE = os.environ.get("TRAIN_END_DATE", "2025-02-01") +OUTPUT_MODEL_PATH = os.environ.get("OUTPUT_MODEL_PATH", "default_model_path.pkl") MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "") MLFLOW_TRACKING_USERNAME = os.environ.get("MLFLOW_TRACKING_USERNAME", "") MLFLOW_TRACKING_PASSWORD = os.environ.get("MLFLOW_TRACKING_PASSWORD", "") +# Timeout court pour tous les appels HTTP MLflow (défaut système = 120 s → timeout garanti) 
+os.environ.setdefault("MLFLOW_HTTP_REQUEST_TIMEOUT", "8") + +# Credentials MinIO pour log_model — boto3 lit os.environ directement. +# Sans eux, boto3 explore tous les credential providers (~15s) avant d'abandonner. +# Priorité : AWS_* (standard boto3) > MINIO_ROOT_* (docker-compose) +_minio_endpoint = (os.environ.get("MLFLOW_S3_ENDPOINT_URL", "") + or os.environ.get("MINIO_ENDPOINT", "")) +_minio_access_key = (os.environ.get("AWS_ACCESS_KEY_ID", "") + or os.environ.get("MINIO_ROOT_USER", "")) +_minio_secret_key = (os.environ.get("AWS_SECRET_ACCESS_KEY", "") + or os.environ.get("MINIO_ROOT_PASSWORD", "")) +if _minio_endpoint: + os.environ["MLFLOW_S3_ENDPOINT_URL"] = _minio_endpoint +if _minio_access_key: + os.environ["AWS_ACCESS_KEY_ID"] = _minio_access_key +if _minio_secret_key: + os.environ["AWS_SECRET_ACCESS_KEY"] = _minio_secret_key + +# Hors Docker : substituer les hostnames internes par localhost +# /.dockerenv est créé automatiquement par Docker dans tout container +_in_docker = os.path.exists("/.dockerenv") +if not _in_docker: + if "//mlflow:" in MLFLOW_TRACKING_URI: + MLFLOW_TRACKING_URI = MLFLOW_TRACKING_URI.replace("//mlflow:", "//localhost:") + os.environ["MLFLOW_TRACKING_URI"] = MLFLOW_TRACKING_URI + _s3_url = os.environ.get("MLFLOW_S3_ENDPOINT_URL", "") + if "//minio:" in _s3_url: + _minio_internal_port = os.environ.get("MINIO_INTERNAL_PORT", "9000") + _minio_host_port = os.environ.get("MINIO_PORT", "9010") + os.environ["MLFLOW_S3_ENDPOINT_URL"] = _s3_url.replace( + f"//minio:{_minio_internal_port}", f"//localhost:{_minio_host_port}" + ) + +if DEBUG: + print(f""" + ------------- APRES SURCHARGE ------------ + [ENV] TRAIN_START_DATE = {TRAIN_START_DATE} + [ENV] TRAIN_END_DATE = {TRAIN_END_DATE} + [ENV] OUTPUT_MODEL_PATH = {OUTPUT_MODEL_PATH} + [ENV] MODEL_NAME = {MODEL_NAME} + [ENV] MLFLOW_TRACKING_URI = {MLFLOW_TRACKING_URI} + [ENV] MLFLOW_TRACKING_USERNAME = {MLFLOW_TRACKING_USERNAME} + [ENV] MLFLOW_TRACKING_PASSWORD = {MLFLOW_TRACKING_PASSWORD} + 
[ENV] MLFLOW_S3_ENDPOINT_URL = {os.environ.get("MLFLOW_S3_ENDPOINT_URL")} + [ENV] AWS_ACCESS_KEY_ID = {os.environ.get("AWS_ACCESS_KEY_ID")} + [ENV] AWS_SECRET_ACCESS_KEY = {os.environ.get("AWS_SECRET_ACCESS_KEY")} + """) + print( f"[{MODEL_NAME}] Ré-entraînement du {TRAIN_START_DATE} au {TRAIN_END_DATE}", file=sys.stderr, ) print(f"[{MODEL_NAME}] Sortie : {OUTPUT_MODEL_PATH}", file=sys.stderr) +_ts("démarrage") # ── 2. Chargement des données ───────────────────────────────────────────────── # @@ -83,6 +170,7 @@ # ───────────────────────────────────────────────────────────────────────────── print(f"[{MODEL_NAME}] Chargement du dataset Iris (données synthétiques)…", file=sys.stderr) +_ts("avant chargement données") iris = load_iris() X_full = pd.DataFrame(iris.data, columns=iris.feature_names) @@ -99,6 +187,7 @@ X, y = X_full.iloc[indices], y_full[indices] print(f"[{MODEL_NAME}] {n_samples} exemples retenus sur {len(X_full)} disponibles.", file=sys.stderr) +_ts("après chargement données") if n_samples < 20: print(json.dumps({"error": f"Pas assez de données ({n_samples} exemples < 20 requis)"})) @@ -118,7 +207,9 @@ HYPERPARAMS = {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 3, "random_state": 42} model = GradientBoostingClassifier(**HYPERPARAMS) +_ts("avant fit") model.fit(X_train, y_train) +_ts("après fit") # ── 4. Évaluation ───────────────────────────────────────────────────────────── @@ -127,6 +218,7 @@ f1 = float(f1_score(y_test, y_pred, average="weighted", zero_division=0)) print(f"[{MODEL_NAME}] Accuracy : {acc:.4f} | F1 : {f1:.4f}", file=sys.stderr) +_ts("après évaluation") # ── 5. Sauvegarde du modèle (OBLIGATOIRE) ───────────────────────────────────── @@ -134,6 +226,7 @@ pickle.dump(model, fh) print(f"[{MODEL_NAME}] Modèle sauvegardé → {OUTPUT_MODEL_PATH}", file=sys.stderr) +_ts("après sauvegarde modèle") # ── 6. 
Statistiques pour MLflow et détection de drift ───────────────────────── @@ -162,16 +255,20 @@ if _MLFLOW_AVAILABLE and MLFLOW_TRACKING_URI: try: - if MLFLOW_TRACKING_USERNAME: - os.environ["MLFLOW_TRACKING_USERNAME"] = MLFLOW_TRACKING_USERNAME - os.environ["MLFLOW_TRACKING_PASSWORD"] = MLFLOW_TRACKING_PASSWORD - + _ts("MLflow — set_tracking_uri") mlflow.set_tracking_uri(MLFLOW_TRACKING_URI) + + _ts("MLflow — set_experiment (appel réseau #1)") mlflow.set_experiment(f"predictml/{MODEL_NAME}") + _ts("MLflow — set_experiment OK") run_name = f"{MODEL_NAME}_{TRAIN_START_DATE}_{TRAIN_END_DATE}_gb" + _ts("MLflow — start_run (appel réseau #2)") with mlflow.start_run(run_name=run_name) as run: - # Params — hyperparamètres + contexte temporel + _ts("MLflow — start_run OK") + + # Params — un seul appel réseau + _ts("MLflow — log_params (appel réseau #3)") mlflow.log_params({ "algorithm": "GradientBoosting", "n_estimators": HYPERPARAMS["n_estimators"], @@ -183,24 +280,28 @@ "n_samples_total": n_samples, "test_size": 0.2, }) - - # Métriques scalaires - mlflow.log_metric("accuracy", acc) - mlflow.log_metric("f1_score", f1) - mlflow.log_metric("n_rows_train", float(len(X_train))) - mlflow.log_metric("n_rows_test", float(len(X_test))) - - # Métriques par feature (mean, std, min, max, null_rate) + _ts("MLflow — log_params OK") + + # Toutes les métriques en un seul appel (au lieu de N appels individuels) + all_metrics: dict[str, float] = { + "accuracy": acc, + "f1_score": f1, + "n_rows_train": float(len(X_train)), + "n_rows_test": float(len(X_test)), + } for feat_name, stats in feature_stats.items(): safe = feat_name.replace(" ", "_").replace("(", "").replace(")", "")[:40] for stat_key, val in stats.items(): - mlflow.log_metric(f"feat_{safe}_{stat_key}", float(val)) - - # Distribution des labels + all_metrics[f"feat_{safe}_{stat_key}"] = float(val) for label, ratio in label_distribution.items(): - mlflow.log_metric(f"label_{label}_ratio", float(ratio)) + 
all_metrics[f"label_{label}_ratio"] = float(ratio) + + _ts(f"MLflow — log_metrics x{len(all_metrics)} (appel réseau #4)") + mlflow.log_metrics(all_metrics) + _ts("MLflow — log_metrics OK") - # Tags + # Tags — un seul appel réseau + _ts("MLflow — set_tags (appel réseau #5)") mlflow.set_tags({ "model_name": MODEL_NAME, "algorithm": "GradientBoosting", @@ -208,23 +309,36 @@ "n_features": str(len(feature_names)), "n_classes": str(len(iris.target_names)), }) + _ts("MLflow — set_tags OK") # Artifact — modèle sklearn (dégradation gracieuse si MinIO inaccessible) + # Signature explicite : inputs float64 + outputs proba float (évite le warning + # "integer column" causé par predict() qui retourne des classes entières) + _X_example = X_test.astype("float64") + _signature = mlflow.models.infer_signature( + _X_example, + model.predict_proba(_X_example), + ) + _ts("MLflow — log_model début (appel réseau #6 — MinIO)") try: mlflow.sklearn.log_model( model, artifact_path="model", - input_example=X_test.iloc[:3], + signature=_signature, ) + _ts("MLflow — log_model OK") print(f"[{MODEL_NAME}] Artifact modèle loggué dans MLflow.", file=sys.stderr) except Exception as art_exc: + _ts("MLflow — log_model ÉCHEC") print(f"[{MODEL_NAME}] Artifact ignoré (MinIO inaccessible) : {art_exc}", file=sys.stderr) mlflow_run_id = run.info.run_id + _ts("MLflow — run fermé") print(f"[{MODEL_NAME}] Run MLflow créé : {mlflow_run_id}", file=sys.stderr) except Exception as exc: + _ts("MLflow — EXCEPTION (bloc entier abandonné)") print(f"[{MODEL_NAME}] MLflow indisponible — ré-entraînement continue : {exc}", file=sys.stderr) mlflow_run_id = None @@ -244,4 +358,5 @@ if mlflow_run_id: output["mlflow_run_id"] = mlflow_run_id +_ts("fin script") print(json.dumps(output)) diff --git a/documentation/Scripts/upload_iris_model.py b/documentation/Scripts/upload_iris_model.py index c2735ce..52b23ef 100644 --- a/documentation/Scripts/upload_iris_model.py +++ b/documentation/Scripts/upload_iris_model.py @@ -28,10 
+28,12 @@ import tempfile import requests +from dotenv import find_dotenv, load_dotenv +load_dotenv(find_dotenv()) # ── Configuration ───────────────────────────────────────────────────────────── -API_URL = os.environ.get("API_URL", "http://localhost:8000") +API_URL = os.environ.get("API_URL", "http://localhost:80") API_TOKEN = os.environ.get("API_TOKEN", os.environ.get("ADMIN_TOKEN", "")) MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") diff --git a/documentation/Scripts/upload_iris_model_GradientBoosting.py b/documentation/Scripts/upload_iris_model_GradientBoosting.py index fa2761d..7c01d9a 100644 --- a/documentation/Scripts/upload_iris_model_GradientBoosting.py +++ b/documentation/Scripts/upload_iris_model_GradientBoosting.py @@ -29,10 +29,12 @@ import tempfile import requests +from dotenv import find_dotenv, load_dotenv +load_dotenv(find_dotenv()) # ── Configuration ───────────────────────────────────────────────────────────── -API_URL = os.environ.get("API_URL", "http://localhost:8000") +API_URL = os.environ.get("API_URL", "http://localhost:80") API_TOKEN = os.environ.get("API_TOKEN", os.environ.get("ADMIN_TOKEN", "")) MODEL_NAME = os.environ.get("MODEL_NAME", "iris-classifier") diff --git a/src/api/models.py b/src/api/models.py index 165f40f..482d3e2 100644 --- a/src/api/models.py +++ b/src/api/models.py @@ -206,6 +206,7 @@ async def _get_leaderboard_drift_status( "enum", "dataclasses", "csv", + "dotenv" } @@ -251,7 +252,7 @@ def _validate_train_script(source: str) -> Optional[str]: if top not in _ALLOWED_IMPORT_MODULES: return ( f"Import non autorisé : '{alias.name}'. " - f"Modules autorisés : {sorted(_ALLOWED_IMPORT_MODULES)}" + f"Modules autorisés in _ALLOWED_IMPORT_MODULES: {sorted(_ALLOWED_IMPORT_MODULES)}" ) elif isinstance(node, ast.ImportFrom): if node.module: @@ -259,7 +260,7 @@ def _validate_train_script(source: str) -> Optional[str]: if top not in _ALLOWED_IMPORT_MODULES: return ( f"Import non autorisé : '{node.module}'. 
" - f"Modules autorisés : {sorted(_ALLOWED_IMPORT_MODULES)}" + f"Modules autorisés (_ALLOWED_IMPORT_MODULES) : {sorted(_ALLOWED_IMPORT_MODULES)}" ) required_tokens = { diff --git a/src/api/predict.py b/src/api/predict.py index 2dd2079..8ca702a 100644 --- a/src/api/predict.py +++ b/src/api/predict.py @@ -597,8 +597,8 @@ async def get_anomalous_predictions( async def purge_predictions( older_than_days: int = Query( ..., - ge=1, - description="Supprimer les prédictions plus anciennes que N jours (ex: 90)", + ge=0, + description="Supprimer les prédictions plus anciennes que N jours (0 = toutes)", ), model_name: Optional[str] = Query( None, @@ -612,14 +612,13 @@ async def purge_predictions( db: AsyncSession = Depends(get_db), ): """ - Purge les prédictions plus anciennes que N jours (rétention RGPD). + Purge les prédictions et leurs observed_results associés. - - **older_than_days** : seuil de rétention en jours — obligatoire (ex: 90) + - **older_than_days** : seuil de rétention en jours (0 = tout supprimer) - **model_name** : restreindre la purge à un seul modèle (optionnel) - **dry_run** : `true` par défaut — simulation sans suppression. Passer `dry_run=false` pour supprimer réellement. - La réponse inclut un avertissement (`linked_observed_results_count > 0`) si des prédictions - supprimées sont liées à des observed_results (perte de données de performance). + Supprime en cascade les observed_results (ground truth) liés aux prédictions purgées. Accès réservé aux administrateurs. 
""" diff --git a/src/schemas/prediction.py b/src/schemas/prediction.py index 39175bc..eee4666 100644 --- a/src/schemas/prediction.py +++ b/src/schemas/prediction.py @@ -327,6 +327,10 @@ class PurgeResponse(BaseModel): deleted_count: int = Field( ..., description="Nombre de prédictions supprimées (ou à supprimer en dry_run)" ) + deleted_observed_results_count: int = Field( + 0, + description="Nombre d'observed_results supprimés en cascade (0 en dry_run)", + ) oldest_remaining: Optional[datetime] = Field( None, description="Timestamp de la prédiction la plus ancienne restante après la purge", @@ -335,8 +339,8 @@ class PurgeResponse(BaseModel): linked_observed_results_count: int = Field( ..., description=( - "Nombre de prédictions supprimées liées à des observed_results. " - "Avertissement si > 0 : des données de performance historiques seront perdues." + "Nombre d'observed_results liés aux prédictions à purger " + "(en dry_run : estimation ; après suppression : correspond à deleted_observed_results_count)." ), ) diff --git a/src/services/db_service.py b/src/services/db_service.py index 11e95c2..87f3c03 100644 --- a/src/services/db_service.py +++ b/src/services/db_service.py @@ -1145,14 +1145,15 @@ async def purge_predictions( dry_run: bool = True, ) -> dict: """ - Purge les prédictions plus anciennes que N jours (rétention RGPD). + Purge les prédictions et leurs observed_results associés. + older_than_days=0 supprime tout (pas de seuil temporel). En mode dry_run=True : comptage sans suppression. - En mode dry_run=False : suppression effective et commit. + En mode dry_run=False : suppression des observed_results liés puis des prédictions, puis commit. Returns: - dict avec dry_run, deleted_count, oldest_remaining, - models_affected et linked_observed_results_count. + dict avec dry_run, deleted_count, deleted_observed_results_count, + oldest_remaining, models_affected et linked_observed_results_count. 
""" cutoff = _utcnow() - timedelta(days=older_than_days) @@ -1170,19 +1171,25 @@ async def purge_predictions( ) models_affected = sorted(row[0] for row in models_result.all()) - # Count predictions linked to observed_results (performance data loss warning) - linked_result = await db.execute( - select(func.count(Prediction.id)) - .join( - ObservedResult, - and_( - Prediction.id_obs == ObservedResult.id_obs, - Prediction.model_name == ObservedResult.model_name, - ), - ) + # Collect (id_obs, model_name) pairs of predictions to purge + pairs_result = await db.execute( + select(Prediction.id_obs, Prediction.model_name) .where(and_(*filters)) + .distinct() ) - linked_count = linked_result.scalar() or 0 + pairs = pairs_result.all() + + # Count observed_results linked to those predictions + linked_count = 0 + if pairs: + or_clauses = [ + and_(ObservedResult.id_obs == id_obs, ObservedResult.model_name == mname) + for id_obs, mname in pairs + ] + linked_result = await db.execute( + select(func.count(ObservedResult.id)).where(or_(*or_clauses)) + ) + linked_count = linked_result.scalar() or 0 # Oldest prediction that will remain after the purge remaining_filters: list = [Prediction.timestamp >= cutoff] @@ -1194,13 +1201,24 @@ async def purge_predictions( ) oldest_remaining = oldest_result.scalar() + deleted_observed_results_count = 0 if not dry_run and deleted_count > 0: + # Delete linked observed_results first, then predictions + if pairs: + or_clauses = [ + and_(ObservedResult.id_obs == id_obs, ObservedResult.model_name == mname) + for id_obs, mname in pairs + ] + obs_result = await db.execute(delete(ObservedResult).where(or_(*or_clauses))) + deleted_observed_results_count = obs_result.rowcount or 0 + await db.execute(delete(Prediction).where(and_(*filters))) await db.commit() return { "dry_run": dry_run, "deleted_count": deleted_count, + "deleted_observed_results_count": deleted_observed_results_count, "oldest_remaining": oldest_remaining, "models_affected": 
models_affected, "linked_observed_results_count": linked_count, diff --git a/tests/test_predictions_purge.py b/tests/test_predictions_purge.py index c3b6aed..14c0c72 100644 --- a/tests/test_predictions_purge.py +++ b/tests/test_predictions_purge.py @@ -148,12 +148,16 @@ def test_purge_missing_older_than_days_returns_422(): assert response.status_code == 422 -def test_purge_older_than_days_zero_returns_422(): +def test_purge_older_than_days_zero_is_valid(): + """older_than_days=0 est désormais autorisé — purge tout (dry_run par défaut).""" response = client.delete( "/predictions/purge?older_than_days=0", headers={"Authorization": f"Bearer {ADMIN_TOKEN}"}, ) - assert response.status_code == 422 + assert response.status_code == 200 + data = response.json() + assert data["dry_run"] is True + assert "deleted_observed_results_count" in data # ---------------------------------------------------------------------------