diff --git a/.github/workflows/test_toolbox_dsl.yml b/.github/workflows/test_toolbox_dsl.yml
index 0946fca0..670e6710 100644
--- a/.github/workflows/test_toolbox_dsl.yml
+++ b/.github/workflows/test_toolbox_dsl.yml
@@ -29,7 +29,7 @@ jobs:
       run: |
         set -o errexit
         python -m pip install --upgrade pip
-        python -m pip install pytest pyyaml jinja2 jsonpath_ng
+        python -m pip install -e .[testing]
     - name: Run projects/core/tests
       run: |
diff --git a/.gitignore b/.gitignore
index 3419d170..96cf6d34 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,6 +37,7 @@ htmlcov/
 .coverage
 .coverage.*
 .cache
+.caliper_cache/
 nosetests.xml
 coverage.xml
 *.cover
diff --git a/projects/caliper/README.md b/projects/caliper/README.md
index 984062d8..46c17665 100644
--- a/projects/caliper/README.md
+++ b/projects/caliper/README.md
@@ -1,8 +1,30 @@
-# Caliper (file export)
+# Caliper
 
-This package is trimmed to **file artifact export** (MLflow) and **orchestration** helpers used by FORGE project CI (e.g. `projects.skeleton`).
+Artifact post-processing: parse labeled test trees, visualize results, compute KPIs, and export to OpenSearch / S3 / MLflow.
 
-- CLI: `caliper artifacts export ...` (see `--help`)
-- Code: `projects.caliper.engine.file_export` and `projects.caliper.orchestration`
+**Specification**: [specs/009-artifact-post-processing/spec.md](../../specs/009-artifact-post-processing/spec.md)
 
-For product requirements, see the FORGE `specs/` tree (not modified here per project policy).
+## CLI
+
+- `--artifacts-dir` (`--base-dir`): root directory of the **test artifact tree** (whose subdirectories contain `__test_labels__.yaml`). Manifest YAML is discovered here unless `--postprocess-config` points elsewhere.
+- `--plugin-module` (`--plugin`): dotted Python **import path** of the plugin module (must expose `get_plugin()`); overrides `plugin_module` in the manifest when both are set.
+
+```bash
+caliper --artifacts-dir /path/to/artifacts parse
+caliper --plugin-module my_package.caliper_plugin --artifacts-dir /path visualize \
+    --output-dir ./out --report-group default
+```
+
+Install optional backends: `pip install -e '.[caliper]'`
+
+## Commands
+
+| Command | Purpose |
+|---------|---------|
+| `parse` | Traverse, parse, write parse cache |
+| `visualize` | Plots + HTML from unified model |
+| `kpi generate` / `import` / `export` / `analyze` | Canonical KPI pipeline |
+| `artifacts export` | File upload to S3 / MLflow |
+| `ai-eval-export` | AI evaluation JSON |
+
+See [quickstart.md](../../specs/009-artifact-post-processing/quickstart.md) and [plan.md](../../specs/009-artifact-post-processing/plan.md).
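> Editor's note: the README and CLI only name the plugin contract indirectly (a module exposing `get_plugin()`). As an illustration, a minimal plugin module under the hypothetical import path `my_package.caliper_plugin` used in the README example could look like the sketch below. It relies only on the `PostProcessingPlugin`, `ParseResult`, `TestBaseNode`, and `UnifiedResultRecord` types added in `projects/caliper/engine/model.py` later in this diff; the class name and the `artifact_count` metric are placeholders, not part of the change.

```python
# Hypothetical module my_package/caliper_plugin.py — illustrative sketch only.
from __future__ import annotations

from pathlib import Path

from projects.caliper.engine.model import (
    ParseResult,
    PostProcessingPlugin,
    TestBaseNode,
    UnifiedResultRecord,
)


class _ExamplePlugin(PostProcessingPlugin):
    """Implements the required parse hook; visualize/KPI hooks keep the base-class defaults."""

    def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult:
        records = [
            UnifiedResultRecord(
                test_base_path=str(node.directory.relative_to(base_dir)),
                distinguishing_labels=dict(node.labels),
                # A real plugin would parse the artifact files here; this sketch only counts them.
                metrics={"artifact_count": len(node.artifact_paths)},
            )
            for node in nodes
        ]
        return ParseResult(records=records)


def get_plugin() -> PostProcessingPlugin:
    # Entry point looked up by projects.caliper.engine.load_plugin.load_plugin().
    return _ExamplePlugin()
```

> With such a module importable, `caliper --plugin-module my_package.caliper_plugin --artifacts-dir ... parse` works without a manifest; alternatively, a `caliper.yaml` at the artifact tree root declaring `plugin_module: my_package.caliper_plugin` (one of the manifest names accepted in `plugin_config.py` below) lets the flag be omitted.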
diff --git a/projects/caliper/cli/main.py b/projects/caliper/cli/main.py index 538b678b..64ec2545 100644 --- a/projects/caliper/cli/main.py +++ b/projects/caliper/cli/main.py @@ -1,31 +1,365 @@ -"""Caliper CLI — file artifact export only (trimmed).""" +"""Caliper CLI.""" from __future__ import annotations import sys from pathlib import Path -from typing import NoReturn +from typing import Any, NoReturn import click import yaml +from projects.caliper.engine.ai_eval import run_ai_eval_export from projects.caliper.engine.file_export.artifacts_export_run import run_artifacts_export from projects.caliper.engine.file_export.mlflow_config import load_mlflow_config_yaml +from projects.caliper.engine.kpi.analyze import run_analyze +from projects.caliper.engine.kpi.generate import run_kpi_generate +from projects.caliper.engine.kpi.import_export import ( + export_kpis_to_index, + import_kpis_snapshot, + load_kpis_jsonl, +) +from projects.caliper.engine.load_plugin import load_plugin +from projects.caliper.engine.parse import run_parse +from projects.caliper.engine.plugin_config import resolve_plugin_module_string +from projects.caliper.engine.visualize import run_visualize + +_ARTIFACTS_DIR_HELP = ( + "Root directory of the test artifact tree (directories containing " + "__test_labels__.yaml). Optional manifest files (e.g. caliper.yaml) are searched here " + "unless --postprocess-config is set." +) +_PLUGIN_MODULE_HELP = ( + "Python import path of the Caliper plugin module (must expose get_plugin()). " + "Names the plugin implementation; overrides plugin_module in the manifest when both " + "are set." +) +_POSTPROCESS_CONFIG_HELP = ( + "Path to the post-processing manifest (YAML). If omitted, conventional filenames " + "are searched under the artifact tree (--artifacts-dir)." +) + + +def _root_obj(ctx: click.Context) -> dict[str, Any]: + while ctx.parent is not None: + ctx = ctx.parent + return ctx.obj def _exit_with_help(ctx: click.Context, message: str, code: int = 1) -> NoReturn: + """Print error line and this command's --help text.""" click.echo(f"Error: {message}\n", err=True) click.echo(ctx.get_help(), err=True) ctx.exit(code) +def _require_artifacts_dir(ctx: click.Context) -> Path: + obj = _root_obj(ctx) + bd = obj.get("base_dir") + if bd is None: + _exit_with_help( + ctx, + "This command requires the test artifact tree root: " + "`--artifacts-dir DIR` or `--base-dir DIR` " + "(before or after the subcommand).", + code=1, + ) + return bd # type: ignore[return-value] + + +def _apply_workspace_cli_overrides( + ctx: click.Context, + *, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module_override: str | None, +) -> None: + """Merge subcommand-level workspace flags into the root context (group options win if unset).""" + obj = _root_obj(ctx) + if artifacts_dir is not None: + obj["base_dir"] = artifacts_dir + if postprocess_config is not None: + obj["postprocess_config"] = postprocess_config + if plugin_module_override is not None: + obj["plugin_cli"] = plugin_module_override + + +def _workspace_cli_options(cmd: Any) -> Any: + """Repeat global workspace options so they may appear after the subcommand.""" + opts = ( + click.option( + "--artifacts-dir", + "--base-dir", + "artifacts_dir", + type=click.Path(path_type=Path, exists=True), + default=None, + help=( + "Test artifact tree root (same meaning as before COMMAND). " + "Repeat here if you prefer flags after the subcommand." 
+ ), + ), + click.option( + "--postprocess-config", + type=click.Path(path_type=Path, dir_okay=False, exists=True), + default=None, + help=_POSTPROCESS_CONFIG_HELP + " Overrides the global option when set here.", + ), + click.option( + "--plugin-module", + "--plugin", + "plugin_module_override", + metavar="MODULE", + default=None, + help="Plugin import path; same as global --plugin-module / --plugin.", + ), + ) + for opt in reversed(opts): + cmd = opt(cmd) + return cmd + + +def _plugin_tuple(ctx: click.Context) -> tuple[str, Any]: + base_dir = _require_artifacts_dir(ctx) + obj = _root_obj(ctx) + pc: Path | None = obj["postprocess_config"] + cli_p: str | None = obj["plugin_cli"] + try: + mod, _manifest_path = resolve_plugin_module_string( + base_dir=base_dir, + postprocess_config=pc, + cli_plugin=cli_p, + ) + except (ValueError, FileNotFoundError) as e: + _exit_with_help(ctx, str(e), code=1) + try: + plugin = load_plugin(mod) + except RuntimeError as e: + _exit_with_help(ctx, str(e), code=2) + return mod, plugin + + @click.group(context_settings={"help_option_names": ["-h", "--help"]}) -def main() -> None: - """Caliper — file artifact export.""" +@click.option( + "--artifacts-dir", + "--base-dir", + "artifacts_dir", + type=click.Path(path_type=Path, exists=True), + default=None, + help=_ARTIFACTS_DIR_HELP, +) +@click.option( + "--postprocess-config", + type=click.Path(path_type=Path, dir_okay=False, exists=True), + default=None, + help=_POSTPROCESS_CONFIG_HELP, +) +@click.option( + "--plugin-module", + "--plugin", + "plugin_module", + metavar="MODULE", + default=None, + help=_PLUGIN_MODULE_HELP, +) +@click.pass_context +def main( + ctx: click.Context, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module: str | None, +) -> None: + """Caliper — artifact post-processing.""" + ctx.ensure_object(dict) + ctx.obj["base_dir"] = artifacts_dir + ctx.obj["postprocess_config"] = postprocess_config + ctx.obj["plugin_cli"] = plugin_module + + +@main.command("parse") +@_workspace_cli_options +@click.option("--no-cache", is_flag=True, help="Force full parse.") +@click.option( + "--cache-dir", + type=click.Path(path_type=Path), + default=None, + help="Override cache file path.", +) +@click.pass_context +def parse_cmd( + ctx: click.Context, + no_cache: bool, + cache_dir: Path | None, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module_override: str | None, +) -> None: + _apply_workspace_cli_overrides( + ctx, + artifacts_dir=artifacts_dir, + postprocess_config=postprocess_config, + plugin_module_override=plugin_module_override, + ) + mod, plugin = _plugin_tuple(ctx) + artifact_root: Path = _root_obj(ctx)["base_dir"] + try: + model = run_parse( + base_dir=artifact_root, + plugin_module=mod, + plugin=plugin, + use_cache=not no_cache, + ) + except Exception as e: # noqa: BLE001 + click.echo(f"parse failed: {e}", err=True) + sys.exit(2) + click.echo( + f"Parsed {len(model.unified_result_records)} record(s); cache={model.parse_cache_ref}" + ) + + +@main.command("visualize") +@_workspace_cli_options +@click.option("--reports", default=None, help="Comma-separated report ids.") +@click.option("--report-group", default=None) +@click.option("--visualize-config", type=click.Path(path_type=Path), default=None) +@click.option("--include-label", multiple=True) +@click.option("--exclude-label", multiple=True) +@click.option( + "--output-dir", + type=click.Path(path_type=Path), + required=True, +) +@click.pass_context +def visualize_cmd( + ctx: click.Context, + 
reports: str | None, + report_group: str | None, + visualize_config: Path | None, + include_label: tuple[str, ...], + exclude_label: tuple[str, ...], + output_dir: Path, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module_override: str | None, +) -> None: + _apply_workspace_cli_overrides( + ctx, + artifacts_dir=artifacts_dir, + postprocess_config=postprocess_config, + plugin_module_override=plugin_module_override, + ) + mod, plugin = _plugin_tuple(ctx) + artifact_root: Path = _root_obj(ctx)["base_dir"] + try: + paths = run_visualize( + base_dir=artifact_root, + plugin_module=mod, + plugin=plugin, + output_dir=output_dir, + reports_csv=reports, + report_group=report_group, + visualize_config_path=visualize_config, + include_pairs=include_label, + exclude_pairs=exclude_label, + use_cache=True, + cache_path=None, + ) + except Exception as e: # noqa: BLE001 + click.echo(f"visualize failed: {e}", err=True) + sys.exit(2) + click.echo("Wrote: " + ", ".join(paths)) + + +@main.group("kpi") +@click.pass_context +def kpi_group(ctx: click.Context) -> None: + """KPI generate/import/export/analyze.""" + + +@kpi_group.command("generate") +@_workspace_cli_options +@click.option("--output", type=click.Path(path_type=Path), required=True) +@click.pass_context +def kpi_generate( + ctx: click.Context, + output: Path, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module_override: str | None, +) -> None: + _apply_workspace_cli_overrides( + ctx, + artifacts_dir=artifacts_dir, + postprocess_config=postprocess_config, + plugin_module_override=plugin_module_override, + ) + mod, plugin = _plugin_tuple(ctx) + artifact_root: Path = _root_obj(ctx)["base_dir"] + try: + run_kpi_generate( + base_dir=artifact_root, + plugin_module=mod, + plugin=plugin, + output=output, + use_cache=True, + cache_path=None, + ) + except Exception as e: # noqa: BLE001 + click.echo(f"kpi generate failed: {e}", err=True) + sys.exit(2) + click.echo(f"Wrote KPIs to {output}") + + +@kpi_group.command("import") +@click.option("--snapshot", type=click.Path(path_type=Path), required=True) +@click.pass_context +def kpi_import(ctx: click.Context, snapshot: Path) -> None: + try: + import_kpis_snapshot(snapshot_path=snapshot) + except Exception as e: # noqa: BLE001 + click.echo(f"kpi import failed: {e}", err=True) + sys.exit(3) + click.echo(f"Wrote snapshot {snapshot}") + + +@kpi_group.command("export") +@click.option("--input", "input_path", type=click.Path(path_type=Path), required=True) +@click.option("--dry-run", is_flag=True) +@click.pass_context +def kpi_export(ctx: click.Context, input_path: Path, dry_run: bool) -> None: + try: + rows = load_kpis_jsonl(input_path) + if dry_run: + click.echo(f"Would export {len(rows)} records") + return + export_kpis_to_index(rows) + except Exception as e: # noqa: BLE001 + click.echo(f"kpi export failed: {e}", err=True) + sys.exit(3) + click.echo("Export complete") + + +@kpi_group.command("analyze") +@click.option("--current", type=click.Path(path_type=Path), required=True) +@click.option("--baseline", type=click.Path(path_type=Path), required=True) +@click.option("--output", type=click.Path(path_type=Path), required=True) +@click.pass_context +def kpi_analyze( + ctx: click.Context, + current: Path, + baseline: Path, + output: Path, +) -> None: + try: + run_analyze(current_path=current, baseline_path=baseline, output_path=output) + except Exception as e: # noqa: BLE001 + click.echo(f"kpi analyze failed: {e}", err=True) + sys.exit(3) + click.echo(f"Wrote 
{output}") @main.group("artifacts") -def artifacts_group() -> None: +@click.pass_context +def artifacts_group(ctx: click.Context) -> None: """File artifact export.""" @@ -44,7 +378,7 @@ def artifacts_group() -> None: "mlflow_tracking_uri", default=None, envvar="MLFLOW_TRACKING_URI", - help="MLflow tracking server URI (for mlflow backend).", + help="MLflow tracking server URI (required for mlflow backend unless MLFLOW_TRACKING_URI is set).", ) @click.option("--mlflow-experiment", default=None, envvar="MLFLOW_EXPERIMENT_NAME") @click.option("--mlflow-run-id", default=None, envvar="MLFLOW_RUN_ID") @@ -52,40 +386,53 @@ def artifacts_group() -> None: "--mlflow-run-name", default=None, envvar="CALIPER_MLFLOW_RUN_NAME", - help="Display name for a new MLflow run.", + help=( + "Display name for a new MLflow run (ignored when --mlflow-run-id is set; " + "otherwise MLflow assigns a random name)." + ), ) @click.option( "--mlflow-insecure-tls", is_flag=True, - help="Do not verify TLS for the MLflow tracking server.", + help="Do not verify TLS for the MLflow tracking server (self-signed / private CA). " + "Equivalent to MLFLOW_TRACKING_INSECURE_TLS=true.", ) @click.option( "--mlflow-secrets", "mlflow_secrets_path", type=click.Path(path_type=Path, exists=True), default=None, - help="YAML with credentials: tracking_uri, token or username/password, TLS options.", + help=( + "YAML with credentials only: tracking_uri, token or username/password, TLS options. " + "Keep separate from --mlflow-config. Values apply only for this process." + ), ) @click.option( "--mlflow-config", "mlflow_config_path", type=click.Path(path_type=Path, exists=True), default=None, - help="YAML with non-secret MLflow settings (experiment, run_name, run_id, ...).", + help=( + "YAML with non-secret MLflow settings: tracking_uri (optional), experiment, " + "run_name, run_id. Separate from --mlflow-secrets; secrets file wins on overlapping keys." + ), ) @click.option("--dry-run", is_flag=True) @click.option( "-v", "--verbose", is_flag=True, - help="Print detailed progress on stderr (no secrets).", + help="Print detailed progress and configuration on stderr (no secrets).", ) @click.option( "--status-yaml", "status_yaml_path", type=click.Path(path_type=Path), default=None, - help="Write a YAML summary of export outcomes per backend.", + help=( + "Write a YAML summary of export outcomes, including MLflow run_url and experiment_url " + "when the mlflow backend succeeds." 
+ ), ) @click.option( "--upload-workers", @@ -148,8 +495,45 @@ def artifacts_export( sys.exit(code) +@main.command("ai-eval-export") +@_workspace_cli_options +@click.option("--output", type=click.Path(path_type=Path), required=True) +@click.pass_context +def ai_eval_export( + ctx: click.Context, + output: Path, + artifacts_dir: Path | None, + postprocess_config: Path | None, + plugin_module_override: str | None, +) -> None: + _apply_workspace_cli_overrides( + ctx, + artifacts_dir=artifacts_dir, + postprocess_config=postprocess_config, + plugin_module_override=plugin_module_override, + ) + mod, plugin = _plugin_tuple(ctx) + artifact_root: Path = _root_obj(ctx)["base_dir"] + try: + run_ai_eval_export( + base_dir=artifact_root, + plugin_module=mod, + plugin=plugin, + output=output, + use_cache=True, + cache_path=None, + ) + except Exception as e: # noqa: BLE001 + click.echo(f"ai-eval-export failed: {e}", err=True) + sys.exit(2) + click.echo(f"Wrote {output}") + + def run_cli() -> None: + """Invoke CLI; on missing required options, print subcommand help.""" try: + # standalone_mode=False returns exit codes instead of calling sys.exit; + # propagate them so failures are non-zero (e.g. ctx.exit(1) from _exit_with_help). rv = main.main(standalone_mode=False, prog_name="caliper") if isinstance(rv, int) and rv != 0: sys.exit(rv) diff --git a/projects/caliper/dash_app/kpi_view.py b/projects/caliper/dash_app/kpi_view.py new file mode 100644 index 00000000..e195fc4f --- /dev/null +++ b/projects/caliper/dash_app/kpi_view.py @@ -0,0 +1,42 @@ +"""Minimal Dash read-only KPI table (FR-010 baseline).""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def build_layout(kpi_jsonl_path: Path) -> Any: + """Return Dash layout from a KPI JSONL file (no server run here).""" + from dash import html # noqa: PLC0415 + + rows: list[dict[str, Any]] = [] + if kpi_jsonl_path.is_file(): + for line in kpi_jsonl_path.read_text(encoding="utf-8").splitlines(): + if line.strip(): + rows.append(json.loads(line)) + + table = html.Table( + [html.Tr([html.Th("kpi_id"), html.Th("value"), html.Th("run_id")])] + + [ + html.Tr( + [ + html.Td(r.get("kpi_id", "")), + html.Td(str(r.get("value", ""))), + html.Td(r.get("run_id", "")), + ] + ) + for r in rows[:500] + ] + ) + return html.Div([html.H3("Caliper — KPI snapshot"), table]) + + +def make_app(kpi_jsonl_path: Path) -> Any: + """Construct a Dash app for local use.""" + from dash import Dash # noqa: PLC0415 + + app = Dash(__name__, title="FORGE KPI view") + app.layout = build_layout(kpi_jsonl_path) + return app diff --git a/projects/caliper/engine/__init__.py b/projects/caliper/engine/__init__.py index a757e4a1..fb7b93be 100644 --- a/projects/caliper/engine/__init__.py +++ b/projects/caliper/engine/__init__.py @@ -1 +1 @@ -"""Caliper engine: file export to MLflow, and orchestration helpers.""" +"""Post-processing engine (models, parse, cache, KPIs, file export).""" diff --git a/projects/caliper/engine/ai_eval.py b/projects/caliper/engine/ai_eval.py new file mode 100644 index 00000000..bb9790dc --- /dev/null +++ b/projects/caliper/engine/ai_eval.py @@ -0,0 +1,33 @@ +"""AI agent evaluation JSON (FR-011).""" + +from __future__ import annotations + +from pathlib import Path + +from projects.caliper.engine.parse import run_parse +from projects.caliper.engine.validation import load_schema, schema_path, validate_instance + + +def run_ai_eval_export( + *, + base_dir: Path, + plugin_module: str, + plugin: object, + output: Path, + 
use_cache: bool,
+    cache_path: Path | None = None,  # accepted for CLI parity; caching is handled inside run_parse
+) -> dict[str, object]:
+    model = run_parse(
+        base_dir=base_dir,
+        plugin_module=plugin_module,
+        plugin=plugin,
+        use_cache=use_cache,
+    )
+    build = plugin.build_ai_eval_payload
+    payload = build(model)
+    schema = load_schema(schema_path("ai_eval_payload.schema.json"))
+    validate_instance(payload, schema, "AI eval payload")
+    import json  # noqa: PLC0415
+
+    output.parent.mkdir(parents=True, exist_ok=True)
+    output.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
+    return payload
diff --git a/projects/caliper/engine/cache.py b/projects/caliper/engine/cache.py
new file mode 100644
index 00000000..d8f87719
--- /dev/null
+++ b/projects/caliper/engine/cache.py
@@ -0,0 +1,180 @@
+"""Parse cache read/write with input fingerprint (FR-016)."""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import os
+from pathlib import Path
+from typing import Any
+
+CACHE_SCHEMA_VERSION = "1"
+
+
+def fingerprint_base_dir(base_dir: Path, plugin_module: str) -> str:
+    """Stable hash over file paths + mtimes under base_dir."""
+    base_dir = base_dir.resolve()
+    entries: list[tuple[str, int, int]] = []
+    for root, _dirs, files in os.walk(base_dir):
+        for name in sorted(files):
+            p = Path(root) / name
+            try:
+                st = p.stat()
+            except OSError:
+                continue
+            rel = str(p.relative_to(base_dir))
+            entries.append((rel, int(st.st_mtime_ns), st.st_size))
+    entries.sort()
+    payload = json.dumps(
+        {"plugin_module": plugin_module, "files": entries},
+        sort_keys=True,
+    )
+    return hashlib.sha256(payload.encode()).hexdigest()
+
+
+def fingerprint_test_base(test_base_dir: Path, plugin_module: str) -> str:
+    """Stable hash over file paths + mtimes under a single test base directory."""
+    test_base_dir = test_base_dir.resolve()
+    entries: list[tuple[str, int, int]] = []
+    for root, _dirs, files in os.walk(test_base_dir):
+        for name in sorted(files):
+            p = Path(root) / name
+            try:
+                st = p.stat()
+            except OSError:
+                continue
+            rel = str(p.relative_to(test_base_dir))
+            entries.append((rel, int(st.st_mtime_ns), st.st_size))
+    entries.sort()
+    payload = json.dumps(
+        {"plugin_module": plugin_module, "test_base": str(test_base_dir), "files": entries},
+        sort_keys=True,
+    )
+    return hashlib.sha256(payload.encode()).hexdigest()
+
+
+def default_cache_path(base_dir: Path, plugin_module: str) -> Path:
+    safe = plugin_module.replace(".", "_")
+    return base_dir.resolve() / ".caliper_cache" / f"{safe}_v{CACHE_SCHEMA_VERSION}.json"
+
+
+def cache_path_for_test_base(test_base_dir: Path, plugin_module: str) -> Path:
+    """Generate cache path for a specific test base directory."""
+    safe = plugin_module.replace(".", "_")
+    return test_base_dir.resolve() / ".caliper_cache" / f"{safe}_v{CACHE_SCHEMA_VERSION}.json"
+
+
+def write_test_base_cache(
+    test_base_dir: Path,
+    *,
+    plugin_module: str,
+    test_base_records: list,
+    fingerprint: str,
+) -> Path:
+    """Write cache for a single test base."""
+
+    def rec_to_dict(r: Any) -> dict[str, Any]:
+        """Convert UnifiedResultRecord to dict."""
+        return {
+            "test_base_path": r.test_base_path,
+            "distinguishing_labels": r.distinguishing_labels,
+            "metrics": r.metrics,
+            "run_identity": r.run_identity,
+            "parse_notes": r.parse_notes,
+        }
+
+    cache_path = cache_path_for_test_base(test_base_dir, plugin_module)
+    doc = {
+        "schema_version": CACHE_SCHEMA_VERSION,
+        "plugin_module": plugin_module,
+        "test_base_dir": str(test_base_dir),
+        "input_fingerprint": fingerprint,
+        "records": [rec_to_dict(r) for r in test_base_records],
+    }
+    
cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_text(json.dumps(doc, indent=2), encoding="utf-8") + return cache_path + + +def read_test_base_cache(test_base_dir: Path, plugin_module: str) -> dict[str, Any] | None: + """Read cache for a single test base and convert records back to objects.""" + from projects.caliper.engine.model import UnifiedResultRecord # noqa: PLC0415 + + def dict_to_rec(d: dict[str, Any]) -> UnifiedResultRecord: + """Convert dict back to UnifiedResultRecord.""" + return UnifiedResultRecord( + test_base_path=d["test_base_path"], + distinguishing_labels=d["distinguishing_labels"], + metrics=d["metrics"], + run_identity=d["run_identity"], + parse_notes=d["parse_notes"], + ) + + cache_path = cache_path_for_test_base(test_base_dir, plugin_module) + if not cache_path.is_file(): + return None + + raw_data = json.loads(cache_path.read_text(encoding="utf-8")) + + # Convert record dicts back to UnifiedResultRecord objects + if "records" in raw_data: + raw_data["records"] = [dict_to_rec(r) for r in raw_data["records"]] + + return raw_data + + +def test_base_cache_is_valid( + cached: dict[str, Any], + *, + expected_fingerprint: str, + plugin_module: str, + test_base_dir: Path, +) -> bool: + """Check if test base cache is valid.""" + if cached.get("schema_version") != CACHE_SCHEMA_VERSION: + return False + if cached.get("plugin_module") != plugin_module: + return False + if cached.get("test_base_dir") != str(test_base_dir): + return False + if cached.get("input_fingerprint") != expected_fingerprint: + return False + return True + + +def write_cache( + path: Path, + *, + unified_model_dict: dict[str, Any], + fingerprint: str, + plugin_module: str, +) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + doc = { + "schema_version": CACHE_SCHEMA_VERSION, + "plugin_module": plugin_module, + "input_fingerprint": fingerprint, + "unified_model": unified_model_dict, + } + path.write_text(json.dumps(doc, indent=2), encoding="utf-8") + + +def read_cache(path: Path) -> dict[str, Any] | None: + if not path.is_file(): + return None + return json.loads(path.read_text(encoding="utf-8")) + + +def cache_is_valid( + cached: dict[str, Any], + *, + expected_fingerprint: str, + plugin_module: str, +) -> bool: + if cached.get("schema_version") != CACHE_SCHEMA_VERSION: + return False + if cached.get("plugin_module") != plugin_module: + return False + if cached.get("input_fingerprint") != expected_fingerprint: + return False + return True diff --git a/projects/caliper/engine/kpi/__init__.py b/projects/caliper/engine/kpi/__init__.py new file mode 100644 index 00000000..12f75aec --- /dev/null +++ b/projects/caliper/engine/kpi/__init__.py @@ -0,0 +1 @@ +"""KPI generation, OpenSearch, regression.""" diff --git a/projects/caliper/engine/kpi/analyze.py b/projects/caliper/engine/kpi/analyze.py new file mode 100644 index 00000000..8bb1a8da --- /dev/null +++ b/projects/caliper/engine/kpi/analyze.py @@ -0,0 +1,80 @@ +"""Regression analysis vs baseline KPI set.""" + +from __future__ import annotations + +from typing import Any + +from projects.caliper.engine.kpi.import_export import load_kpis_jsonl +from projects.caliper.engine.kpi.rules import DEFAULT_RULE +from projects.caliper.engine.model import RegressionFinding + + +def _direction_worse( + current: float, + baseline: float, + *, + higher_is_better: bool, +) -> bool: + if higher_is_better: + return current < baseline + return current > baseline + + +def run_analyze( + *, + current_path: Any, + baseline_path: Any, + output_path: 
Any, +) -> list[RegressionFinding]: + current = load_kpis_jsonl(current_path) + baseline = load_kpis_jsonl(baseline_path) + base_by_id = {b["kpi_id"]: b for b in baseline} + findings: list[RegressionFinding] = [] + rule = DEFAULT_RULE + for c in current: + kid = c["kpi_id"] + if kid not in base_by_id: + continue + b = base_by_id[kid] + try: + cv = float(c["value"]) + bv = float(b["value"]) + except (TypeError, ValueError): + continue + higher = str(c.get("labels", {}).get("higher_is_better", "true")).lower() in ( + "1", + "true", + "yes", + ) + worse = _direction_worse(cv, bv, higher_is_better=higher) + rel = abs(cv - bv) / (abs(bv) + 1e-9) + status = "ok" + if worse and rel > rule.max_relative_regression: + status = "regression" + elif not worse and rel > rule.max_relative_regression: + status = "improvement" + findings.append( + RegressionFinding( + kpi_id=kid, + current_value=cv, + baseline_value=bv, + direction="higher_better" if higher else "lower_better", + status=status, + ) + ) + import json # noqa: PLC0415 + + out = { + "findings": [ + { + "kpi_id": f.kpi_id, + "current_value": f.current_value, + "baseline_value": f.baseline_value, + "direction": f.direction, + "status": f.status, + } + for f in findings + ] + } + output_path.write_text(json.dumps(out, indent=2), encoding="utf-8") + return findings diff --git a/projects/caliper/engine/kpi/catalog.py b/projects/caliper/engine/kpi/catalog.py new file mode 100644 index 00000000..55e41c38 --- /dev/null +++ b/projects/caliper/engine/kpi/catalog.py @@ -0,0 +1,9 @@ +"""Resolve KPI catalog from plugin.""" + +from __future__ import annotations + +from projects.caliper.engine.model import PostProcessingPlugin + + +def get_catalog(plugin: PostProcessingPlugin) -> list[dict[str, object]]: + return plugin.kpi_catalog() diff --git a/projects/caliper/engine/kpi/generate.py b/projects/caliper/engine/kpi/generate.py new file mode 100644 index 00000000..9e63c55f --- /dev/null +++ b/projects/caliper/engine/kpi/generate.py @@ -0,0 +1,36 @@ +"""Emit canonical KPI JSONL.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from projects.caliper.engine.parse import run_parse +from projects.caliper.engine.validation import load_schema, schema_path, validate_instance + + +def run_kpi_generate( + *, + base_dir: Path, + plugin_module: str, + plugin: object, + output: Path | None, + use_cache: bool, + cache_path: Path | None, +) -> list[dict[str, Any]]: + model = run_parse( + base_dir=base_dir, + plugin_module=plugin_module, + plugin=plugin, + use_cache=use_cache, + ) + compute = plugin.compute_kpis + rows: list[dict[str, Any]] = compute(model) + kpi_schema = load_schema(schema_path("kpi_record.schema.json")) + for row in rows: + validate_instance(row, kpi_schema, "KPI record") + text = "\n".join(json.dumps(r, ensure_ascii=False) for r in rows) + ("\n" if rows else "") + if output: + output.write_text(text, encoding="utf-8") + return rows diff --git a/projects/caliper/engine/kpi/import_export.py b/projects/caliper/engine/kpi/import_export.py new file mode 100644 index 00000000..26998283 --- /dev/null +++ b/projects/caliper/engine/kpi/import_export.py @@ -0,0 +1,47 @@ +"""KPI import from / export to OpenSearch.""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +from projects.caliper.engine.kpi.opensearch_client import build_client + + +def export_kpis_to_index( + records: list[dict[str, Any]], + *, + index: str | None = None, +) -> 
None: + client = build_client() + idx = index or os.environ.get("OPENSEARCH_KPI_INDEX", "forge-kpis") + for rec in records: + # Use kpi_id + run_id + timestamp as id + doc_id = f"{rec.get('kpi_id')}-{rec.get('run_id')}-{rec.get('timestamp')}" + client.index(index=idx, id=doc_id, body=rec, refresh=True) + + +def import_kpis_snapshot( + *, + snapshot_path: Path, + index: str | None = None, + max_hits: int = 10_000, +) -> list[dict[str, Any]]: + """Download KPIs from OpenSearch into snapshot file.""" + client = build_client() + idx = index or os.environ.get("OPENSEARCH_KPI_INDEX", "forge-kpis") + body = {"size": max_hits, "query": {"match_all": {}}} + resp = client.search(index=idx, body=body) + hits = [h["_source"] for h in resp.get("hits", {}).get("hits", [])] + snapshot_path.write_text( + "\n".join(json.dumps(h, ensure_ascii=False) for h in hits) + ("\n" if hits else ""), + encoding="utf-8", + ) + return hits + + +def load_kpis_jsonl(path: Path) -> list[dict[str, Any]]: + lines = path.read_text(encoding="utf-8").strip().splitlines() + return [json.loads(line) for line in lines if line.strip()] diff --git a/projects/caliper/engine/kpi/opensearch_client.py b/projects/caliper/engine/kpi/opensearch_client.py new file mode 100644 index 00000000..9d9fa2b7 --- /dev/null +++ b/projects/caliper/engine/kpi/opensearch_client.py @@ -0,0 +1,35 @@ +"""OpenSearch client from environment.""" + +from __future__ import annotations + +import os +from typing import Any + + +def build_client() -> Any: + """Create OpenSearch client; requires opensearch-py and env configuration.""" + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise RuntimeError( + "opensearch-py is required for KPI import/export. " + "Install with: pip install -e '.[caliper]'" + ) from e + + hosts_env = os.environ.get("OPENSEARCH_HOSTS", "localhost:9200") + host_list: list[dict[str, int | str]] = [] + for h in hosts_env.split(","): + h = h.strip() + if ":" in h: + host, port_s = h.rsplit(":", 1) + host_list.append({"host": host, "port": int(port_s)}) + else: + host_list.append({"host": h, "port": 9200}) + + return OpenSearch( + hosts=host_list, + http_compress=True, + use_ssl=os.environ.get("OPENSEARCH_USE_SSL", "").lower() in ("1", "true", "yes"), + verify_certs=os.environ.get("OPENSEARCH_VERIFY_CERTS", "true").lower() + not in ("0", "false", "no"), + ) diff --git a/projects/caliper/engine/kpi/rules.py b/projects/caliper/engine/kpi/rules.py new file mode 100644 index 00000000..58fa227e --- /dev/null +++ b/projects/caliper/engine/kpi/rules.py @@ -0,0 +1,15 @@ +"""Default regression rules (threshold relative delta).""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class RegressionRule: + """Simple relative threshold: flag if change exceeds fraction.""" + + max_relative_regression: float = 0.1 # 10% worse + + +DEFAULT_RULE = RegressionRule() diff --git a/projects/caliper/engine/label_filters.py b/projects/caliper/engine/label_filters.py new file mode 100644 index 00000000..96b8c45a --- /dev/null +++ b/projects/caliper/engine/label_filters.py @@ -0,0 +1,49 @@ +"""Include/exclude filters on distinguishing labels.""" + +from __future__ import annotations + +from typing import Any + +LabelMap = dict[str, Any] + + +def parse_filter_kv(pairs: tuple[str, ...]) -> dict[str, str]: + out: dict[str, str] = {} + for p in pairs: + if "=" not in p: + raise ValueError(f"Invalid filter (expected KEY=VALUE): {p}") + k, v = p.split("=", 1) + out[k.strip()] = v.strip() + return out + + +def 
matches_filters( + labels: LabelMap, + *, + include: dict[str, str], + exclude: dict[str, str], +) -> bool: + """Exclude wins on conflict; include requires all pairs to match when non-empty.""" + for k, v in exclude.items(): + if labels.get(k) == v: + return False + if not include: + return True + return all(labels.get(k) == v for k, v in include.items()) + + +def filter_records( + records: list[Any], + *, + include: dict[str, str], + exclude: dict[str, str], +) -> list[Any]: + out: list[Any] = [] + for r in records: + if matches_filters( + getattr(r, "distinguishing_labels", {}), + include=include, + exclude=exclude, + ): + out.append(r) + return out diff --git a/projects/caliper/engine/load_plugin.py b/projects/caliper/engine/load_plugin.py new file mode 100644 index 00000000..7804da65 --- /dev/null +++ b/projects/caliper/engine/load_plugin.py @@ -0,0 +1,40 @@ +"""Load plugin module and validate required callables.""" + +from __future__ import annotations + +import importlib +from typing import Any, cast + +from projects.caliper.engine.model import PostProcessingPlugin + + +def load_plugin(module_path: str) -> PostProcessingPlugin: + """ + FR-014: import plugin module; fail with actionable error. + + Convention: module exposes ``get_plugin() -> PostProcessingPlugin`` or + attribute ``plugin`` implementing the protocol. + """ + try: + mod = importlib.import_module(module_path) + except ModuleNotFoundError as e: + raise RuntimeError( + f"Cannot import plugin module {module_path!r}: {e}. " + "Check PYTHONPATH and package installation." + ) from e + except Exception as e: # noqa: BLE001 — surface import-time errors + raise RuntimeError(f"Failed loading plugin module {module_path!r}: {e}") from e + + plugin: Any = None + if hasattr(mod, "get_plugin"): + plugin = mod.get_plugin() + elif hasattr(mod, "plugin"): + plugin = mod.plugin + else: + raise RuntimeError( + f"Plugin module {module_path!r} must define get_plugin() or 'plugin' " + "returning a PostProcessingPlugin implementation." 
+ ) + if plugin is None: + raise RuntimeError(f"Plugin module {module_path!r} returned no plugin.") + return cast(PostProcessingPlugin, plugin) diff --git a/projects/caliper/engine/model.py b/projects/caliper/engine/model.py index d725f639..eddb8e37 100644 --- a/projects/caliper/engine/model.py +++ b/projects/caliper/engine/model.py @@ -1,12 +1,53 @@ -"""Types shared by the file export pipeline (artifact uploads).""" +"""Unified run model and plugin protocol for post-processing.""" from __future__ import annotations +from abc import ABC, abstractmethod from dataclasses import dataclass, field from pathlib import Path from typing import Any +@dataclass +class TestBaseNode: + """Directory containing __test_labels__.yaml.""" + + directory: Path + labels: dict[str, Any] + artifact_paths: list[Path] = field(default_factory=list) + + +@dataclass +class UnifiedResultRecord: + """One parsed facet / test result with distinguishing labels.""" + + test_base_path: str + distinguishing_labels: dict[str, Any] + metrics: dict[str, Any] + run_identity: dict[str, Any] = field(default_factory=dict) + parse_notes: list[str] = field(default_factory=list) + + +@dataclass +class ParseResult: + """Output of plugin parse pass.""" + + records: list[UnifiedResultRecord] + warnings: list[str] = field(default_factory=list) + + +@dataclass +class UnifiedRunModel: + """Reloadable unified representation after parsing.""" + + plugin_module: str + base_directory: str + test_nodes: list[TestBaseNode] + unified_result_records: list[UnifiedResultRecord] + parse_cache_ref: str | None = None + schema_version: str = "1" + + @dataclass class FileExportManifest: """Files to upload to external backends.""" @@ -18,9 +59,51 @@ class FileExportManifest: @dataclass class FileExportBackendResult: - """Per-backend export outcome.""" + """Per-backend outcome.""" backend: str status: str # success | failure | skipped detail: str = "" - metadata: dict[str, Any] | None = field(default=None) + metadata: dict[str, Any] | None = None + + +@dataclass +class RegressionFinding: + """Single KPI regression outcome.""" + + kpi_id: str + current_value: Any + baseline_value: Any + direction: str + status: str + + +class PostProcessingPlugin(ABC): + """Project plugin: parse required; other hooks optional with defaults.""" + + @abstractmethod + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + """Parse each labeled test base into unified records.""" + + def visualize( + self, + model: UnifiedRunModel, + output_dir: Path, + report_ids: list[str] | None, + group_id: str | None, + visualize_config: dict[str, Any] | None, + ) -> list[str]: + """Write plots/HTML; return list of created file paths.""" + return [] + + def kpi_catalog(self) -> list[dict[str, Any]]: + """KPI definitions (id, name, unit, higher_is_better).""" + return [] + + def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, Any]]: + """Return canonical-shaped KPI dicts (pre-validated).""" + return [] + + def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]: + """Structured JSON for AI agent evaluation.""" + return {"schema_version": "1", "run_id": "", "metrics": {}} diff --git a/projects/caliper/engine/parse.py b/projects/caliper/engine/parse.py new file mode 100644 index 00000000..a8b11e21 --- /dev/null +++ b/projects/caliper/engine/parse.py @@ -0,0 +1,98 @@ +"""Parse orchestration: traverse → plugin → unified model → cache.""" + +from __future__ import annotations + +from pathlib import Path + +from projects.caliper.engine.cache 
import ( + cache_path_for_test_base, + fingerprint_test_base, + read_test_base_cache, + test_base_cache_is_valid, + write_test_base_cache, +) +from projects.caliper.engine.model import UnifiedRunModel +from projects.caliper.engine.traverse import discover_test_bases + +# Validation functions no longer needed - using per-test-base caching + + +def run_parse( + *, + base_dir: Path, + plugin_module: str, + plugin: object, + use_cache: bool, + force_report_partial: bool = True, +) -> UnifiedRunModel: + """ + Run full parse or load valid cache. + + plugin must implement parse(base_dir, nodes). + """ + base_dir = base_dir.resolve() + + # Discover test bases + nodes = discover_test_bases(base_dir) + + # Always use per-test-base caching + all_records = [] + cache_refs = [] + all_warnings = [] + + parse_fn = plugin.parse + + for node in nodes: + test_base_dir = node.directory + cache_file = cache_path_for_test_base(test_base_dir, plugin_module) + fp = fingerprint_test_base(test_base_dir, plugin_module) + + # Try to load from cache + cached_records = None + if use_cache: + raw = read_test_base_cache(test_base_dir, plugin_module) + if raw is not None and test_base_cache_is_valid( + raw, + expected_fingerprint=fp, + plugin_module=plugin_module, + test_base_dir=test_base_dir, + ): + cached_records = raw["records"] + + if cached_records is not None: + # Use cached records + all_records.extend(cached_records) + cache_refs.append(str(cache_file)) + else: + # Parse this test base + result = parse_fn(base_dir, [node]) # Parse just this node + records = result.records + warnings = getattr(result, "warnings", []) + + all_records.extend(records) + all_warnings.extend(warnings) + + # Write cache for this test base + cache_file = write_test_base_cache( + test_base_dir, + plugin_module=plugin_module, + test_base_records=records, + fingerprint=fp, + ) + cache_refs.append(str(cache_file)) + + # Create unified model with all records + cache_ref_summary = f"per-test-base: {len(cache_refs)} cache files" + model = UnifiedRunModel( + plugin_module=plugin_module, + base_directory=str(base_dir), + test_nodes=nodes, + unified_result_records=all_records, + parse_cache_ref=cache_ref_summary, + ) + + if all_warnings and force_report_partial: + for w in all_warnings: + print(f"[parse warning] {w}") # noqa: T201 — CLI feedback + + return model diff --git a/projects/caliper/engine/plugin_config.py b/projects/caliper/engine/plugin_config.py new file mode 100644 index 00000000..d5ee94cb --- /dev/null +++ b/projects/caliper/engine/plugin_config.py @@ -0,0 +1,77 @@ +"""Locate post-processing manifest and resolve plugin module.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import yaml + +MANIFEST_FILENAMES = ("caliper.yaml", "forge-postprocess.yaml", "postprocess.yaml") + +# Keep in sync with CLI flags in projects/caliper/cli/main.py +_CLI_ARTIFACT_TREE = "`--artifacts-dir` / `--base-dir`" +_CLI_PLUGIN_MODULE = "`--plugin-module` / `--plugin`" + + +def load_manifest_file(path: Path) -> dict[str, Any]: + """Load YAML manifest; raise ValueError on invalid content.""" + raw = path.read_text(encoding="utf-8") + data = yaml.safe_load(raw) + if not isinstance(data, dict): + raise ValueError(f"Manifest must be a mapping at top level: {path}") + return data + + +def _find_manifest_under_base(base_dir: Path) -> Path | None: + base_dir = base_dir.resolve() + for name in MANIFEST_FILENAMES: + candidate = base_dir / name + if candidate.is_file(): + return candidate + return None + + +def 
resolve_manifest_path( + base_dir: Path, + postprocess_config: Path | None, +) -> tuple[Path | None, dict[str, Any] | None]: + """Return (path, data) if a manifest was found/loaded.""" + if postprocess_config is not None: + p = postprocess_config.resolve() + if not p.is_file(): + raise FileNotFoundError(f"Post-processing manifest not found: {p}") + return p, load_manifest_file(p) + found = _find_manifest_under_base(base_dir) + if found is None: + return None, None + return found, load_manifest_file(found) + + +def resolve_plugin_module_string( + *, + base_dir: Path, + postprocess_config: Path | None, + cli_plugin: str | None, +) -> tuple[str, Path | None]: + """ + FR-002 resolution: CLI plugin module flag overrides manifest ``plugin_module``. + Returns (module_string, manifest_path_or_none). + """ + manifest_path, data = resolve_manifest_path(base_dir, postprocess_config) + if cli_plugin: + return cli_plugin.strip(), manifest_path + if data is None: + raise ValueError( + "No plugin module: set plugin_module in " + f"{'/'.join(MANIFEST_FILENAMES)} under {_CLI_ARTIFACT_TREE}, " + f"or pass {_CLI_PLUGIN_MODULE}, " + "or use --postprocess-config PATH with a manifest that declares plugin_module." + ) + mod = data.get("plugin_module") + if not mod or not isinstance(mod, str): + raise ValueError( + f"Manifest {manifest_path} must declare a non-empty string 'plugin_module', " + f"or pass {_CLI_PLUGIN_MODULE}." + ) + return mod.strip(), manifest_path diff --git a/projects/caliper/engine/traverse.py b/projects/caliper/engine/traverse.py new file mode 100644 index 00000000..088232f0 --- /dev/null +++ b/projects/caliper/engine/traverse.py @@ -0,0 +1,54 @@ +"""Discover test base directories via __test_labels__.yaml.""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +import yaml + +from projects.caliper.engine.model import TestBaseNode + +MARKER = "__test_labels__.yaml" + + +def discover_test_bases(base_dir: Path) -> list[TestBaseNode]: + """Walk base_dir; each directory containing MARKER becomes a TestBaseNode.""" + base_dir = base_dir.resolve() + if not base_dir.is_dir(): + raise FileNotFoundError(f"Base directory does not exist: {base_dir}") + + nodes: list[TestBaseNode] = [] + for dirpath, _dirnames, filenames in os.walk(base_dir, topdown=True): + if MARKER not in filenames: + continue + path = Path(dirpath) + marker_path = path / MARKER + labels = _load_labels(marker_path) + nodes.append( + TestBaseNode( + directory=path, + labels=labels, + artifact_paths=_list_files_under(path, exclude_marker=True), + ) + ) + return sorted(nodes, key=lambda n: str(n.directory)) + + +def _load_labels(path: Path) -> dict[str, Any]: + raw = path.read_text(encoding="utf-8") + data = yaml.safe_load(raw) + if data is None: + return {} + if not isinstance(data, dict): + raise ValueError(f"Invalid {MARKER}: top level must be a mapping: {path}") + return data + + +def _list_files_under(dirpath: Path, *, exclude_marker: bool) -> list[Path]: + out: list[Path] = [] + for p in sorted(dirpath.rglob("*")): + if p.is_file() and (not exclude_marker or p.name != MARKER): + out.append(p) + return out diff --git a/projects/caliper/engine/validation.py b/projects/caliper/engine/validation.py new file mode 100644 index 00000000..9b4e8623 --- /dev/null +++ b/projects/caliper/engine/validation.py @@ -0,0 +1,89 @@ +"""jsonschema validation helpers (FR-012).""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import jsonschema + 
+from projects.caliper.engine.model import UnifiedRunModel + + +def load_schema(path: Path) -> dict[str, Any]: + return json.loads(path.read_text(encoding="utf-8")) + + +def validate_instance(instance: Any, schema: dict[str, Any], what: str) -> None: + try: + jsonschema.validate(instance=instance, schema=schema) + except jsonschema.ValidationError as e: + raise ValueError(f"{what} validation failed: {e.message}") from e + + +def schema_path(package_relative: str) -> Path: + """Path under schemas/ next to this package.""" + here = Path(__file__).resolve().parent.parent + return here / "schemas" / package_relative + + +def model_to_jsonable(model: UnifiedRunModel) -> dict[str, Any]: + """Serialize unified model for cache (simplified).""" + + def node_to_dict(n: Any) -> dict[str, Any]: + return { + "directory": str(n.directory), + "labels": n.labels, + "artifact_paths": [str(p) for p in n.artifact_paths], + } + + def rec_to_dict(r: Any) -> dict[str, Any]: + return { + "test_base_path": r.test_base_path, + "distinguishing_labels": r.distinguishing_labels, + "metrics": r.metrics, + "run_identity": r.run_identity, + "parse_notes": r.parse_notes, + } + + return { + "plugin_module": model.plugin_module, + "base_directory": model.base_directory, + "test_nodes": [node_to_dict(n) for n in model.test_nodes], + "unified_result_records": [rec_to_dict(r) for r in model.unified_result_records], + "schema_version": model.schema_version, + } + + +def model_from_jsonable(data: dict[str, Any]) -> UnifiedRunModel: + from projects.caliper.engine.model import ( # noqa: PLC0415 + TestBaseNode, + UnifiedResultRecord, + ) + + nodes = [ + TestBaseNode( + directory=Path(n["directory"]), + labels=n["labels"], + artifact_paths=[Path(p) for p in n.get("artifact_paths", [])], + ) + for n in data["test_nodes"] + ] + records = [ + UnifiedResultRecord( + test_base_path=r["test_base_path"], + distinguishing_labels=r["distinguishing_labels"], + metrics=r["metrics"], + run_identity=r.get("run_identity", {}), + parse_notes=r.get("parse_notes", []), + ) + for r in data["unified_result_records"] + ] + return UnifiedRunModel( + plugin_module=data["plugin_module"], + base_directory=data["base_directory"], + test_nodes=nodes, + unified_result_records=records, + schema_version=data.get("schema_version", "1"), + ) diff --git a/projects/caliper/engine/visualize.py b/projects/caliper/engine/visualize.py new file mode 100644 index 00000000..8b9c4405 --- /dev/null +++ b/projects/caliper/engine/visualize.py @@ -0,0 +1,101 @@ +"""Visualization orchestration.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import yaml + +from projects.caliper.engine.label_filters import filter_records, parse_filter_kv +from projects.caliper.engine.parse import run_parse + + +def resolve_visualize_config( + base_dir: Path, + explicit_path: Path | None, +) -> dict[str, Any] | None: + + def _load(p: Path) -> dict[str, Any] | None: + data = yaml.safe_load(p.read_text(encoding="utf-8")) + if data is None: + return None + if not isinstance(data, dict): + raise ValueError(f"Visualize config must be a mapping at top level: {p}") + return data + + if explicit_path is not None: + p = explicit_path.resolve() + if not p.is_file(): + raise FileNotFoundError(f"Visualize config not found: {p}") + return _load(p) + + for name in ("visualize-groups.yaml", "visualize-groups.yml"): + cand = base_dir / name + if cand.is_file(): + return _load(cand) + return None + + +def resolve_report_ids( + *, + reports_csv: str | None, + 
report_group: str | None, + config: dict[str, Any] | None, +) -> list[str]: + if reports_csv: + return [x.strip() for x in reports_csv.split(",") if x.strip()] + if report_group and config: + groups = config.get("groups", {}) + if report_group not in groups: + raise ValueError(f"Unknown report group: {report_group!r}") + g = groups[report_group] + if isinstance(g, list): + return [str(x) for x in g] + if isinstance(g, str): + return [g] + raise ValueError("Provide --reports or --report-group with a valid visualize config.") + + +def run_visualize( + *, + base_dir: Path, + plugin_module: str, + plugin: object, + output_dir: Path, + reports_csv: str | None, + report_group: str | None, + visualize_config_path: Path | None, + include_pairs: tuple[str, ...], + exclude_pairs: tuple[str, ...], + use_cache: bool, + cache_path: Path | None, +) -> list[str]: + model = run_parse( + base_dir=base_dir, + plugin_module=plugin_module, + plugin=plugin, + use_cache=use_cache, + ) + inc = parse_filter_kv(include_pairs) + exc = parse_filter_kv(exclude_pairs) + model.unified_result_records = filter_records( + model.unified_result_records, + include=inc, + exclude=exc, + ) + cfg = resolve_visualize_config(base_dir, visualize_config_path) + ids = resolve_report_ids( + reports_csv=reports_csv, + report_group=report_group, + config=cfg, + ) + output_dir.mkdir(parents=True, exist_ok=True) + viz = plugin.visualize + return viz( + model, + output_dir, + ids, + report_group, + cfg, + ) diff --git a/projects/caliper/orchestration/postprocess.py b/projects/caliper/orchestration/postprocess.py new file mode 100644 index 00000000..43ee2909 --- /dev/null +++ b/projects/caliper/orchestration/postprocess.py @@ -0,0 +1,292 @@ +""" +Config-driven Caliper parse / visualize for FORGE orchestration. + +KPI generation, export, and regression analyze are reserved in config but **not** implemented +here; step entries document ``skipped`` stubs so callers keep a stable ``steps`` shape. + +Computes ``final_status`` from the FORGE test phase outcome plus parse/visualize results. +""" + +from __future__ import annotations + +import logging +import traceback +from pathlib import Path +from typing import Any + +from pydantic import ValidationError + +from projects.caliper.engine.load_plugin import load_plugin +from projects.caliper.engine.parse import run_parse +from projects.caliper.engine.plugin_config import resolve_plugin_module_string +from projects.caliper.engine.visualize import run_visualize +from projects.caliper.orchestration.postprocess_config import ( + CaliperOrchestrationPostprocessConfig, +) +from projects.caliper.orchestration.postprocess_outcome import ( + TestPhaseOutcome, + compute_final_postprocess_status, +) + +logger = logging.getLogger(__name__) + +_STUB_REASON_KPI = ( + "orchestration stub: KPI steps are not wired here (use Caliper CLI or extend orchestration)." +) +_STUB_REASON_ANALYZE = "orchestration stub: regression analyze is not wired here (use Caliper CLI or extend orchestration)." 
+ + +def _resolve_paths( + postprocess_config: CaliperOrchestrationPostprocessConfig, + *, + artifacts_dir: Path, +) -> tuple[Path, Path | None, Path | None]: + manifest_path = ( + Path(postprocess_config.postprocess_config).expanduser().resolve() + if postprocess_config.postprocess_config + else None + ) + # Always use default cache behavior - store cache files with each test result + cache_path = None + return artifacts_dir.resolve(), manifest_path, cache_path + + +def _resolve_visualize_output_dir( + raw: str | None, +) -> Path: + if raw is None or not str(raw).strip(): + raise ValueError( + "caliper.postprocess.visualize.output_dir is required when no explicit visualize_output_directory is provided" + ) + p = Path(raw).expanduser() + if p.is_absolute(): + return p.resolve() + raise ValueError("caliper.postprocess.visualize.output_dir must be an absolute path") + + +def _resolve_visualize_config_path( + raw: str | None, + *, + artifact_tree: Path, +) -> Path | None: + if raw is None or not str(raw).strip(): + return None + p = Path(raw).expanduser() + if p.is_absolute(): + return p.resolve() + return (artifact_tree / p).resolve() + + +def _load_plugin( + postprocess_config: CaliperOrchestrationPostprocessConfig, + *, + tree_root: Path, + manifest_path: Path | None, +) -> tuple[str, object]: + mod_str, _manifest = resolve_plugin_module_string( + base_dir=tree_root, + postprocess_config=manifest_path, + cli_plugin=postprocess_config.plugin_module, + ) + return mod_str, load_plugin(mod_str) + + +def _stub_kpi_generate(postprocess_config: CaliperOrchestrationPostprocessConfig) -> dict[str, Any]: + if not postprocess_config.kpi.enabled: + return {"status": "skipped", "reason": "kpi disabled"} + if not postprocess_config.kpi.generate.enabled: + return {"status": "skipped", "reason": "kpi.generate disabled"} + return {"status": "skipped", "reason": _STUB_REASON_KPI} + + +def _stub_kpi_export(postprocess_config: CaliperOrchestrationPostprocessConfig) -> dict[str, Any]: + if not postprocess_config.kpi.enabled: + return {"status": "skipped", "reason": "kpi disabled"} + if not postprocess_config.kpi.export.enabled: + return {"status": "skipped", "reason": "kpi.export disabled"} + return {"status": "skipped", "reason": _STUB_REASON_KPI} + + +def _stub_analyze(postprocess_config: CaliperOrchestrationPostprocessConfig) -> dict[str, Any]: + if not postprocess_config.analyze.enabled: + return {"status": "skipped", "reason": "analyze disabled"} + return {"status": "skipped", "reason": _STUB_REASON_ANALYZE} + + +def run_postprocess_from_orchestration_config( + postprocess_config_raw: dict[str, Any] | None, + *, + artifacts_dir: Path, + visualize_output_dir: Path | None = None, + test_outcome: TestPhaseOutcome | None = None, +) -> dict[str, Any]: + """ + Run enabled parse / visualize steps and compute ``final_status``. + + KPI and analyze sections only emit stub ``steps`` entries (never failures). + + Parse/visualize use ``artifacts_dir`` and ``visualize_output_dir``. 
+ """ + outcome = test_outcome or TestPhaseOutcome("NOT_AVAILABLE") + steps: dict[str, Any] = {} + parse_failed = False + visualize_failed = False + + try: + postprocess_config = CaliperOrchestrationPostprocessConfig.model_validate( + postprocess_config_raw or {} + ) + except ValidationError as e: + logger.error("Invalid caliper postprocess config: %s", e) + raise + + test_block = {"phase": outcome.phase, "message": outcome.message} + + if not postprocess_config.enabled: + logger.info("caliper.postprocess.enabled is false — skipping post-processing steps") + final = compute_final_postprocess_status( + test_outcome=outcome, + parse_failed=False, + visualize_failed=False, + kpi_generate_failed=False, + kpi_export_failed=False, + analyze_failed=False, + has_regression=False, + has_improvement=False, + ) + return { + "final_status": final, + "test_phase": test_block, + "steps": {}, + } + + tree_root, manifest_path, cache_path = _resolve_paths( + postprocess_config, artifacts_dir=artifacts_dir + ) + + def _run_parse() -> dict[str, Any]: + nonlocal parse_failed + if not postprocess_config.parse.enabled: + return {"status": "skipped", "reason": "parse disabled"} + try: + mod_str, plugin = _load_plugin( + postprocess_config, tree_root=tree_root, manifest_path=manifest_path + ) + model = run_parse( + base_dir=tree_root, + plugin_module=mod_str, + plugin=plugin, + use_cache=not postprocess_config.parse.no_cache, + ) + return { + "status": "ok", + "plugin_module": mod_str, + "record_count": len(model.unified_result_records), + "parse_cache_ref": model.parse_cache_ref, + } + except Exception as e: # noqa: BLE001 + parse_failed = True + logger.exception("Caliper parse failed") + return {"status": "failure", "detail": str(e), "traceback": traceback.format_exc()} + + def _run_visualize() -> dict[str, Any]: + nonlocal visualize_failed + if not postprocess_config.visualize.enabled: + return {"status": "skipped", "reason": "visualize disabled"} + try: + mod_str, plugin = _load_plugin( + postprocess_config, tree_root=tree_root, manifest_path=manifest_path + ) + viz_cfg_path = _resolve_visualize_config_path( + postprocess_config.visualize.visualize_config, + artifact_tree=tree_root, + ) + if visualize_output_dir is not None: + output_dir = visualize_output_dir.expanduser().resolve() + else: + output_dir = _resolve_visualize_output_dir( + postprocess_config.visualize.output_dir, + ) + paths = run_visualize( + base_dir=tree_root, + plugin_module=mod_str, + plugin=plugin, + output_dir=output_dir, + reports_csv=postprocess_config.visualize.reports, + report_group=postprocess_config.visualize.report_group, + visualize_config_path=viz_cfg_path, + include_pairs=tuple(postprocess_config.visualize.include_labels), + exclude_pairs=tuple(postprocess_config.visualize.exclude_labels), + use_cache=not postprocess_config.parse.no_cache, + cache_path=cache_path, + ) + # Convert paths to relative paths from output_dir + relative_paths = [] + for path in paths: + try: + path_obj = Path(path) + relative_path = path_obj.relative_to(output_dir) + relative_paths.append(str(relative_path)) + except ValueError: + # If path is not under output_dir, keep as-is + relative_paths.append(str(path)) + + return { + "status": "ok", + "plugin_module": mod_str, + "output_dir": str(output_dir), + "paths": relative_paths, + } + except Exception as e: # noqa: BLE001 + visualize_failed = True + logger.exception("Caliper visualize failed") + return {"status": "failure", "detail": str(e), "traceback": traceback.format_exc()} + + any_step = ( + 
postprocess_config.parse.enabled + or postprocess_config.visualize.enabled + or postprocess_config.kpi.enabled + or postprocess_config.analyze.enabled + ) + if not any_step: + logger.info("caliper.postprocess: no parse/visualize/kpi/analyze steps enabled") + final = compute_final_postprocess_status( + test_outcome=outcome, + parse_failed=False, + visualize_failed=False, + kpi_generate_failed=False, + kpi_export_failed=False, + analyze_failed=False, + has_regression=False, + has_improvement=False, + ) + return {"final_status": final, "test_phase": test_block, "steps": {}} + + if postprocess_config.parse.enabled: + steps["parse"] = _run_parse() + if postprocess_config.visualize.enabled: + steps["visualize"] = _run_visualize() + + if postprocess_config.kpi.enabled: + steps["kpi_generate"] = _stub_kpi_generate(postprocess_config) + steps["kpi_export"] = _stub_kpi_export(postprocess_config) + + if postprocess_config.analyze.enabled: + steps["analyze"] = _stub_analyze(postprocess_config) + + final = compute_final_postprocess_status( + test_outcome=outcome, + parse_failed=parse_failed, + visualize_failed=visualize_failed, + kpi_generate_failed=False, + kpi_export_failed=False, + analyze_failed=False, + has_regression=False, + has_improvement=False, + ) + + return { + "final_status": final, + "test_phase": test_block, + "steps": steps, + } diff --git a/projects/caliper/orchestration/postprocess_config.py b/projects/caliper/orchestration/postprocess_config.py new file mode 100644 index 00000000..5eacd732 --- /dev/null +++ b/projects/caliper/orchestration/postprocess_config.py @@ -0,0 +1,155 @@ +""" +Pydantic models for Caliper parse / visualize / KPI steps driven from ``caliper.postprocess``. +""" + +from __future__ import annotations + +from typing import Self + +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + + +class CaliperOrchestrationParseSection(BaseModel): + """``caliper.postprocess.parse``.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + no_cache: bool = False + + +class CaliperOrchestrationVisualizeSection(BaseModel): + """``caliper.postprocess.visualize`` — same semantics as ``caliper visualize``.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + output_dir: str | None = Field( + default=None, + description=("Directory for HTML/plots. 
Must be an absolute path."), + ) + reports: str | None = Field( + default=None, + description="Comma-separated report ids or list of report ids (alternative to report_group).", + ) + report_group: str | None = Field( + default=None, + description="Group id from visualize-groups.yaml under the artifact tree.", + ) + visualize_config: str | None = Field( + default=None, + description="Path to visualize-groups YAML; default search under artifact tree.", + ) + include_labels: list[str] = Field(default_factory=list) + exclude_labels: list[str] = Field(default_factory=list) + + @field_validator("reports", mode="before") + @classmethod + def _convert_reports_list(cls, v): + """Convert list of reports to comma-separated string.""" + if isinstance(v, list): + return ",".join(str(item) for item in v) + return v + + +class CaliperOrchestrationKpiGenerateSection(BaseModel): + """Emit KPI JSONL via plugin ``compute_kpis``.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + output: str | None = Field( + default="kpis.jsonl", + description="Filename or path; relative paths resolve under the post-processing artifact dir.", + ) + + +class CaliperOrchestrationKpiExportSection(BaseModel): + """Push KPI rows to OpenSearch (requires env/client setup).""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + + +class CaliperOrchestrationKpiSection(BaseModel): + """``caliper.postprocess.kpi``.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + generate: CaliperOrchestrationKpiGenerateSection = Field( + default_factory=CaliperOrchestrationKpiGenerateSection + ) + export: CaliperOrchestrationKpiExportSection = Field( + default_factory=CaliperOrchestrationKpiExportSection + ) + + +class CaliperOrchestrationAnalyzeSection(BaseModel): + """``caliper.postprocess.analyze`` — regression vs baseline KPI JSONL.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + baseline: str | None = Field( + default=None, + description="Baseline KPI JSONL path (relative → artifact tree root unless absolute).", + ) + output: str | None = Field( + default="kpi_analyze.json", + description="Written under post-processing artifact dir when relative.", + ) + + @model_validator(mode="after") + def _baseline_when_enabled(self) -> Self: + if self.enabled and not (self.baseline and str(self.baseline).strip()): + raise ValueError( + "caliper.postprocess.analyze.enabled requires non-empty baseline path." + ) + return self + + +class CaliperOrchestrationPostprocessConfig(BaseModel): + """``caliper.postprocess`` — parse, visualize, optional KPI + regression.""" + + model_config = ConfigDict(extra="ignore") + + enabled: bool = Field(True, description="Master switch for the whole post-processing pipeline.") + + artifacts_dir: str | None = Field( + default=None, + description=( + "Root of the Caliper artifact tree; when null, callers typically use " + "ARTIFACT_BASE_DIR or override via CLI." 
+ ), + ) + plugin_module: str | None = Field( + default=None, + description="Plugin import path; overrides manifest plugin_module when set.", + ) + postprocess_config: str | None = Field( + default=None, + description="Explicit path to caliper.yaml manifest.", + ) + parse: CaliperOrchestrationParseSection = Field( + default_factory=CaliperOrchestrationParseSection + ) + visualize: CaliperOrchestrationVisualizeSection = Field( + default_factory=CaliperOrchestrationVisualizeSection + ) + kpi: CaliperOrchestrationKpiSection = Field(default_factory=CaliperOrchestrationKpiSection) + analyze: CaliperOrchestrationAnalyzeSection = Field( + default_factory=CaliperOrchestrationAnalyzeSection + ) + + @model_validator(mode="after") + def _visualize_needs_selector(self) -> Self: + if not self.visualize.enabled: + return self + if not (self.visualize.reports or self.visualize.report_group): + raise ValueError( + "caliper.postprocess.visualize.enabled requires " + "`reports` (comma-separated) or `report_group`." + ) + return self diff --git a/projects/caliper/orchestration/postprocess_outcome.py b/projects/caliper/orchestration/postprocess_outcome.py new file mode 100644 index 00000000..1cdbfe92 --- /dev/null +++ b/projects/caliper/orchestration/postprocess_outcome.py @@ -0,0 +1,61 @@ +"""Aggregated post-processing outcome after FORGE test phase + Caliper steps.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Literal + +# Ordered worst-first for documentation; see compute_final_postprocess_status(). +FINAL_TEST_FAILED = "test_failed" +FINAL_PARSE_VISUALIZE_FAILED = "parse_visualize_failed" +FINAL_KPI_PIPELINE_FAILED = "kpi_pipeline_failed" +FINAL_PERFORMANCE_REGRESSION = "performance_regression" +FINAL_PERFORMANCE_INCREASE = "performance_increase" +FINAL_SUCCESS = "success" + +FinalPostprocessStatus = Literal[ + "success", + "test_failed", + "parse_visualize_failed", + "kpi_pipeline_failed", + "performance_increase", + "performance_regression", +] + + +@dataclass(frozen=True) +class TestPhaseOutcome: + """Result of the orchestration test phase (before Caliper post-processing).""" + + phase: Literal["SUCCESS", "FAILED", "NOT_AVAILABLE"] + message: str | None = None + + +def compute_final_postprocess_status( + *, + test_outcome: TestPhaseOutcome, + parse_failed: bool, + visualize_failed: bool, + kpi_generate_failed: bool, + kpi_export_failed: bool, + analyze_failed: bool, + has_regression: bool, + has_improvement: bool, +) -> FinalPostprocessStatus: + """ + Single authoritative label for CI / dashboards. + + Priority (first match wins): test failure → parse/viz failure → KPI/analyze pipeline failure → + regression → improvement → success. 
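+
+    Example (illustrative): ``test_outcome.phase == "FAILED"`` together with
+    ``has_regression=True`` still yields ``test_failed``, because the test failure
+    outranks the regression in this priority order.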
+ """ + if test_outcome.phase == "FAILED": + return FINAL_TEST_FAILED + if parse_failed or visualize_failed: + return FINAL_PARSE_VISUALIZE_FAILED + if kpi_generate_failed or kpi_export_failed or analyze_failed: + return FINAL_KPI_PIPELINE_FAILED + if has_regression: + return FINAL_PERFORMANCE_REGRESSION + if has_improvement: + return FINAL_PERFORMANCE_INCREASE + return FINAL_SUCCESS diff --git a/projects/caliper/schemas/ai_eval_payload.schema.json b/projects/caliper/schemas/ai_eval_payload.schema.json new file mode 100644 index 00000000..722e5052 --- /dev/null +++ b/projects/caliper/schemas/ai_eval_payload.schema.json @@ -0,0 +1,15 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "required": ["schema_version", "run_id"], + "properties": { + "schema_version": { "type": "string" }, + "run_id": { "type": "string" }, + "metrics": { "type": "object" }, + "optional": { + "description": "Explicit nulls for missing optional agent metrics", + "type": "object" + } + }, + "additionalProperties": true +} diff --git a/projects/caliper/schemas/kpi_record.schema.json b/projects/caliper/schemas/kpi_record.schema.json new file mode 100644 index 00000000..a502db0c --- /dev/null +++ b/projects/caliper/schemas/kpi_record.schema.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "required": [ + "schema_version", + "kpi_id", + "value", + "unit", + "run_id", + "timestamp", + "labels", + "source" + ], + "properties": { + "schema_version": { "type": "string" }, + "kpi_id": { "type": "string" }, + "value": {}, + "unit": { "type": "string" }, + "run_id": { "type": "string" }, + "timestamp": { "type": "string" }, + "labels": { "type": "object" }, + "source": { + "type": "object", + "properties": { + "test_base_path": { "type": "string" }, + "plugin_module": { "type": "string" } + } + } + }, + "additionalProperties": true +} diff --git a/projects/caliper/tests/__init__.py b/projects/caliper/tests/__init__.py new file mode 100644 index 00000000..22c8eaa0 --- /dev/null +++ b/projects/caliper/tests/__init__.py @@ -0,0 +1 @@ +"""Caliper tests and stub plugins.""" diff --git a/projects/caliper/tests/fixtures/minimal_tree/caliper.yaml b/projects/caliper/tests/fixtures/minimal_tree/caliper.yaml new file mode 100644 index 00000000..3a75eba6 --- /dev/null +++ b/projects/caliper/tests/fixtures/minimal_tree/caliper.yaml @@ -0,0 +1 @@ +plugin_module: projects.caliper.tests.stub_plugin diff --git a/projects/caliper/tests/fixtures/minimal_tree/team_a/__test_labels__.yaml b/projects/caliper/tests/fixtures/minimal_tree/team_a/__test_labels__.yaml new file mode 100644 index 00000000..953fa0ed --- /dev/null +++ b/projects/caliper/tests/fixtures/minimal_tree/team_a/__test_labels__.yaml @@ -0,0 +1,4 @@ +version: "1" +labels: + model: stub-model + facet: team-a diff --git a/projects/caliper/tests/fixtures/minimal_tree/team_a/metrics.json b/projects/caliper/tests/fixtures/minimal_tree/team_a/metrics.json new file mode 100644 index 00000000..cf4fe55d --- /dev/null +++ b/projects/caliper/tests/fixtures/minimal_tree/team_a/metrics.json @@ -0,0 +1 @@ +{"throughput": 42.5} diff --git a/projects/caliper/tests/stub_plugin.py b/projects/caliper/tests/stub_plugin.py new file mode 100644 index 00000000..54fd2fb6 --- /dev/null +++ b/projects/caliper/tests/stub_plugin.py @@ -0,0 +1,115 @@ +"""Stub post-processing plugin for tests and demos.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path + +from 
projects.caliper.engine.model import ( + ParseResult, + PostProcessingPlugin, + TestBaseNode, + UnifiedResultRecord, + UnifiedRunModel, +) + + +class StubPlugin(PostProcessingPlugin): + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + records: list[UnifiedResultRecord] = [] + warnings: list[str] = [] + for node in nodes: + labels = ( + node.labels.get("labels") + if isinstance(node.labels.get("labels"), dict) + else node.labels + ) + raw = {} + for p in node.artifact_paths: + if p.name == "metrics.json": + import json # noqa: PLC0415 + + try: + raw = json.loads(p.read_text(encoding="utf-8")) + except json.JSONDecodeError as e: + warnings.append(f"Malformed JSON {p}: {e}") + raw = {"_error": "partial_parse"} + dist = labels if isinstance(labels, dict) else {} + records.append( + UnifiedResultRecord( + test_base_path=str(node.directory), + distinguishing_labels=dict(dist) if dist else {"facet": "default"}, + metrics=raw or {"throughput": 1.0}, + run_identity={"stub": True}, + parse_notes=[], + ) + ) + return ParseResult(records=records, warnings=warnings) + + def visualize( + self, + model: UnifiedRunModel, + output_dir: Path, + report_ids: list[str] | None, + group_id: str | None, + visualize_config: dict[str, object] | None, + ) -> list[str]: + output_dir.mkdir(parents=True, exist_ok=True) + html_path = output_dir / "report.html" + rid = ",".join(report_ids or []) or "default" + html_path.write_text( + f"
<html><body>{rid}</body></html>
", + encoding="utf-8", + ) + return [str(html_path)] + + def kpi_catalog(self) -> list[dict[str, object]]: + return [ + { + "kpi_id": "throughput_rps", + "name": "Throughput", + "unit": "req/s", + "higher_is_better": True, + } + ] + + def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, object]]: + ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + out: list[dict[str, object]] = [] + for r in model.unified_result_records: + m = r.metrics.get("throughput", 0.0) + try: + val = float(m) + except (TypeError, ValueError): + val = 0.0 + out.append( + { + "schema_version": "1", + "kpi_id": "throughput_rps", + "value": val, + "unit": "req/s", + "run_id": r.test_base_path, + "timestamp": ts, + "labels": { + **r.distinguishing_labels, + "higher_is_better": True, + }, + "source": { + "test_base_path": r.test_base_path, + "plugin_module": model.plugin_module, + }, + } + ) + return out + + def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, object]: + return { + "schema_version": "1", + "run_id": model.base_directory, + "metrics": {"records": len(model.unified_result_records)}, + "optional": {}, + } + + +def get_plugin() -> PostProcessingPlugin: + return StubPlugin() diff --git a/projects/core/ci_entrypoint/fournos.py b/projects/core/ci_entrypoint/fournos.py index 278e62f8..96dfb1e4 100644 --- a/projects/core/ci_entrypoint/fournos.py +++ b/projects/core/ci_entrypoint/fournos.py @@ -8,6 +8,7 @@ import logging import os +import shutil from pathlib import Path import yaml @@ -136,11 +137,17 @@ def parse_and_save_pr_arguments_fournos(): metadata_dir.mkdir(parents=True, exist_ok=True) # Load FournosJob YAML - fjob, fjob_spec = load_fjob_yaml(metadata_dir.parent / "fournos_fjob.yaml") + fournos_fjob = metadata_dir.parent / "fournos_fjob.yaml" + fjob, fjob_spec = load_fjob_yaml(fournos_fjob) if not fjob_spec: raise ValueError("FournosJob YAML not found, cannot parse FOURNOS PR arguments") + # Move fournos_fjob to metadata_dir + fournos_fjob_dest = metadata_dir / "fournos_fjob.yaml" + shutil.move(str(fournos_fjob), str(fournos_fjob_dest)) + logger.info(f"Moved FournosJob YAML from {fournos_fjob} to {fournos_fjob_dest}") + try: prepare_vault(fjob_spec) process_fjob_environment(fjob_spec) diff --git a/projects/core/library/config.py b/projects/core/library/config.py index d84005c4..3bf2ad7f 100644 --- a/projects/core/library/config.py +++ b/projects/core/library/config.py @@ -10,6 +10,8 @@ import jsonpath_ng import yaml +from projects.core.ci_entrypoint.prepare_ci import CI_METADATA_DIRNAME + from . 
import env logger = logging.getLogger(__name__) @@ -203,6 +205,9 @@ def apply_preset(self, name): raise ValueError(f"No preset found with name '{name}'") logger.info(f"Applying preset '{name}' ==> {values}") + dest_txt = env.ARTIFACT_DIR / CI_METADATA_DIRNAME / "presets_applied.txt" + dest_txt.parent.mkdir(parents=True, exist_ok=True) + for key, value in values.items(): if key == "extends": for extend_name in value: @@ -211,7 +216,8 @@ def apply_preset(self, name): msg = f"preset[{name}] {key} --> {value}" logger.info(msg) - with open(env.ARTIFACT_DIR / "presets_applied", "a") as f: + + with open(dest_txt, "a") as f: print(msg, file=f) self.set_config(key, value, print=False) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 101ebc23..52d2944a 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -52,11 +52,13 @@ def _update_fjob_export_status(status: dict): fjob_data["status"] = {} if "engineStatus" not in fjob_data["status"]: fjob_data["status"]["engineStatus"] = {} - if "status" not in fjob_data["status"]["engineStatus"]: - fjob_data["status"]["engineStatus"]["status"] = {} + if "forge" not in fjob_data["status"]["engineStatus"]: + fjob_data["status"]["engineStatus"]["forge"] = {} + if "status" not in fjob_data["status"]["engineStatus"]["forge"]: + fjob_data["status"]["engineStatus"]["forge"]["status"] = {} # Update with export-artifacts status - fjob_data["status"]["engineStatus"]["export-artifacts"] = status + fjob_data["status"]["engineStatus"]["forge"]["exportArtifacts"] = status # Patch the fjob patch_data = {"status": fjob_data["status"]} diff --git a/projects/core/library/postprocess.py b/projects/core/library/postprocess.py new file mode 100644 index 00000000..94656efc --- /dev/null +++ b/projects/core/library/postprocess.py @@ -0,0 +1,283 @@ +""" +Shared Caliper parse / visualize orchestration for FORGE projects. + +Registers a :mod:`click` subcommand that reads ``caliper.postprocess`` from project config and runs +:func:`projects.caliper.orchestration.postprocess.run_postprocess_from_orchestration_config`. +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any + +import click +import yaml +from pydantic import ValidationError + +from projects.caliper.orchestration.postprocess import ( + run_postprocess_from_orchestration_config, +) +from projects.caliper.orchestration.postprocess_config import ( + CaliperOrchestrationPostprocessConfig, +) +from projects.caliper.orchestration.postprocess_outcome import TestPhaseOutcome +from projects.core.library import ci as ci_lib +from projects.core.library import config, env +from projects.core.library.reports_index import generate_caliper_reports_index + +logger = logging.getLogger(__name__) + + +def run_and_postprocess(test_func, *args, **kwargs): + """ + Wrapper that runs a test function and handles outcome tracking with Caliper postprocessing. + + This wrapper: + 1. Executes the provided test function with given arguments + 2. Captures test outcomes (success, failure, exception details) + 3. Runs Caliper postprocessing with the test outcome + 4. Returns 1 if postprocessing fails when test succeeds + 5. 
Properly chains exceptions if both test and postprocess fail + + Args: + test_func: Callable test function to execute + *args: Positional arguments to pass to test_func + **kwargs: Keyword arguments to pass to test_func + + Returns: + The return value from the test function, or 1 if postprocessing fails + + Raises: + The original exception from test_func. If both test and postprocess fail, + exceptions are properly chained. If only postprocessing fails, returns 1. + """ + artifact_base_dir = Path(env.ARTIFACT_DIR).resolve() + + exc_msg: str | None = None + ret: int | None = None + original_exc: BaseException | None = None # Store the test exception + + try: + ret = test_func(*args, **kwargs) + return ret + except BaseException as e: + exc_msg = str(e) + original_exc = e # Capture before the name is cleared + raise + finally: + # Determine test outcome based on exception/return code + if exc_msg is not None: + outcome = TestPhaseOutcome("FAILED", exc_msg) + elif ret == 0: + outcome = TestPhaseOutcome("SUCCESS") + elif ret is None: + outcome = TestPhaseOutcome("FAILED", "test aborted without exit code") + else: + outcome = TestPhaseOutcome("FAILED", f"exit_code={ret}") + + # Run postprocessing and check status for failures + try: + status = run_postprocess_after_test(artifact_base_dir, test_outcome=outcome) + + # Check if postprocessing failed + final_status = status.get("final_status") + if final_status and "failed" in final_status: + if original_exc is not None: + # Both test and postprocess failed: log both issues + logger.error( + "Both test and postprocessing failed (final_status: %s)", final_status + ) + raise # Re-raise the original test exception + else: + # Only postprocess failed: return failure code + logger.error( + "Test succeeded but postprocessing failed (final_status: %s) - returning exit code 1", + final_status, + ) + return 1 + + except Exception as postprocess_exc: + logger.exception("Caliper postprocess after test failed with exception") + if original_exc is not None: + # Both test and postprocess failed: chain so both are visible in the traceback + raise postprocess_exc from original_exc + else: + # Only postprocess failed: return failure code instead of raising + logger.error( + "Test succeeded but postprocessing failed with exception - returning exit code 1" + ) + return 1 + + +def run_postprocess_after_test( + artifact_root: Path | os.PathLike[str] | str | None, + *, + test_outcome: TestPhaseOutcome | None = None, +) -> None: + """ + Run Caliper post-processing after the orchestration test phase. + + Uses ``artifact_root`` (typically :data:`env.ARTIFACT_BASE_DIR`) as the Caliper artifact tree, + and :func:`env.NextArtifactDir` ``(\"postprocessing\")`` as the workspace for visualize output, + KPI JSONL, and regression artifacts. + + ``test_outcome`` feeds ``final_status`` computation together with parse/visualize/KPI outcomes. 
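+
+    Typical call (sketch, mirroring how :func:`run_and_postprocess` invokes it)::
+
+        run_postprocess_after_test(
+            Path(env.ARTIFACT_DIR).resolve(),
+            test_outcome=TestPhaseOutcome("SUCCESS"),
+        )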
+ """ + try: + postprocess_config_raw = config.project.get_config("caliper.postprocess", print=False) or {} + postprocess_config = CaliperOrchestrationPostprocessConfig.model_validate( + postprocess_config_raw + ) + except ValidationError as e: + logger.error("Invalid caliper.postprocess config: %s", e) + raise + + if not postprocess_config.enabled: + logger.info("Caliper post-processing disabled (caliper.postprocess.enabled: false).") + return + + artifact_root_path = Path(artifact_root).resolve() if artifact_root is not None else None + + with env.NextArtifactDir("postprocessing"): + workspace = Path(env.ARTIFACT_DIR).resolve() + logger.info( + "Running Caliper postprocess (artifacts=%s, workspace=%s, test_phase=%s)", + artifact_root_path, + workspace, + test_outcome.phase if test_outcome else "SUCCESS", + ) + status = run_orchestration_postprocess( + artifact_dir=artifact_root_path, + visualize_output_dir=workspace, + test_outcome=test_outcome, + ) + logger.info( + "Caliper postprocess finished:\n%s", + yaml.dump(status, indent=2, default_flow_style=False, sort_keys=False), + ) + + # Generate reports index if visualization was successful + try: + index_path = generate_caliper_reports_index(status, workspace, "reports_index.html") + if index_path: + logger.info("Generated reports index at %s", index_path) + except Exception as e: + logger.warning("Failed to generate reports index: %s", e) + + return status + + +def resolve_caliper_postprocess_artifacts_dir( + *, + artifact_dir: Path | None, + postprocess_config: CaliperOrchestrationPostprocessConfig, +) -> Path: + """ + Resolve the Caliper **artifact tree** root. + + Precedence: explicit ``artifact_dir``, ``caliper.postprocess.artifacts_dir`` + """ + if artifact_dir is not None: + return artifact_dir.expanduser().resolve() + + if postprocess_config.artifacts_dir and postprocess_config.artifacts_dir.strip(): + return Path(postprocess_config.artifacts_dir).expanduser().resolve() + + raise ValueError( + "Caliper postprocess requires the artifact tree root: use --artifact-dir, " + "set caliper.postprocess.artifacts_dir in project config, or set ARTIFACT_BASE_DIR." 
+ ) + + +def run_orchestration_postprocess( + *, + artifact_dir: Path | None, + visualize_output_dir: Path | None = None, + test_outcome: TestPhaseOutcome | None = None, +) -> dict[str, Any]: + """Load ``caliper.postprocess`` from project config and run enabled post-processing steps.""" + + try: + postprocess_config_raw = config.project.get_config("caliper.postprocess", print=False) or {} + postprocess_config = CaliperOrchestrationPostprocessConfig.model_validate( + postprocess_config_raw + ) + except ValidationError as e: + logger.error("Invalid caliper.postprocess config: %s", e) + raise + + artifacts_dir = resolve_caliper_postprocess_artifacts_dir( + artifact_dir=artifact_dir, + postprocess_config=postprocess_config, + ) + + result = run_postprocess_from_orchestration_config( + postprocess_config_raw, + artifacts_dir=artifacts_dir, + visualize_output_dir=visualize_output_dir, + test_outcome=test_outcome, + ) + + status_base = visualize_output_dir + if status_base is None: + return result + + status_path = Path(status_base) / "caliper_postprocess_status.yaml" + try: + status_path.parent.mkdir(parents=True, exist_ok=True) + status_path.write_text( + yaml.dump(result, indent=2, default_flow_style=False, sort_keys=False), + encoding="utf-8", + ) + logger.info("Wrote postprocess status YAML to %s", status_path) + except OSError as e: + logger.warning("Could not write %s: %s", status_path, e) + + return result + + +@click.command("postprocess") +@click.option( + "--artifact-dir", + "artifact_dir", + type=click.Path(path_type=Path, exists=True, file_okay=False, dir_okay=True), + default=None, + help=( + "Caliper artifact tree root (directories with __test_labels__.yaml). " + "Overrides caliper.postprocess.artifacts_dir and ARTIFACT_BASE_DIR when set." + ), +) +@click.option( + "--output-dir", + "output_dir", + type=click.Path(path_type=Path, exists=False, file_okay=False, dir_okay=True), + default=None, + help=( + "Output directory, where the post processing results will be stored. " + "Overrides caliper.postprocess.artifacts_dir and ARTIFACT_BASE_DIR when set." + ), +) +@click.pass_context +@ci_lib.safe_ci_command +def postprocess_command(_ctx, artifact_dir: Path | None, output_dir: Path | None): + """Run the post-processing pipeline.""" + + status = run_orchestration_postprocess( + artifact_dir=artifact_dir, + test_outcome=TestPhaseOutcome("NOT_AVAILABLE"), + visualize_output_dir=output_dir, + ) + logger.info("Caliper postprocess status:\n" + yaml.dump(status, indent=2)) + + # Generate reports index if output directory is specified + if output_dir: + try: + index_path = generate_caliper_reports_index(status, output_dir, "reports_index.html") + if index_path: + logger.info("Generated reports index at %s", index_path) + except Exception as e: + logger.warning("Failed to generate reports index: %s", e) + + return 0 diff --git a/projects/core/library/reports_index.py b/projects/core/library/reports_index.py new file mode 100644 index 00000000..3b0e8dd5 --- /dev/null +++ b/projects/core/library/reports_index.py @@ -0,0 +1,157 @@ +""" +Generate HTML index pages for Caliper reports. + +Inspired by topsail/testing/utils/generate_plot_index.py but adapted for +Caliper postprocessing workflow. 
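+
+Typical usage (sketch, as wired up in ``projects.core.library.postprocess``)::
+
+    index_path = generate_caliper_reports_index(status, output_dir, "reports_index.html")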
+""" + +import logging +from datetime import datetime +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + + +def generate_caliper_reports_index( + status: dict[str, Any], output_dir: Path, index_filename: str = "index.html" +) -> Path | None: + """ + Generate an HTML index page with links to all Caliper reports. + + Args: + status: Caliper postprocess status dict with steps information + output_dir: Directory where reports are located and where index will be written + index_filename: Name of the index file to generate (default: "index.html") + + Returns: + Path to the generated index file, or None if no reports found + """ + output_dir = Path(output_dir).resolve() + index_path = output_dir / index_filename + + # Extract report information from status + visualize_step = status.get("steps", {}).get("visualize", {}) + if visualize_step.get("status") != "ok": + logger.info("No successful visualize step found, skipping index generation") + return None + + # Look for HTML files in the output directory + html_files = [] + json_files = [] + + ignored_files = {index_filename, "caliper_postprocess_status.yaml"} + + for html_file in sorted(output_dir.glob("*.html")): + if html_file.name not in ignored_files: + html_files.append(html_file) + + for json_file in sorted(output_dir.glob("*.json")): + if json_file.name not in ignored_files: + json_files.append(json_file) + + # Check subdirectories for additional reports + for subdir in sorted(output_dir.glob("*/")): + if subdir.is_dir(): + for html_file in sorted(subdir.glob("*.html")): + if html_file.name not in ignored_files: + html_files.append(html_file) + for json_file in sorted(subdir.glob("*.json")): + if json_file.name not in ignored_files: + json_files.append(json_file) + + if not html_files and not json_files: + logger.info("No HTML or JSON reports found, skipping index generation") + return None + + # Generate HTML content + html_content = _generate_index_html_content( + html_files=html_files, json_files=json_files, output_dir=output_dir, status=status + ) + + # Write index file + try: + index_path.write_text(html_content, encoding="utf-8") + logger.info("Generated Caliper reports index at %s", index_path) + return index_path + except OSError as e: + logger.warning("Failed to write reports index to %s: %s", index_path, e) + return None + + +def _generate_index_html_content( + html_files: list[Path], json_files: list[Path], output_dir: Path, status: dict[str, Any] +) -> str: + """Generate the HTML content for the reports index.""" + + # Get test outcome and timing information + test_phase = status.get("test_phase", {}) + test_status = test_phase.get("phase", "UNKNOWN") + final_status = status.get("final_status", "unknown") + + # Build HTML content + html_parts = [ + "", + "", + "", + " ", + "Generated on {timestamp} by FORGE Caliper
", + "", + "", + ] + ) + + return "\n".join(html_parts) diff --git a/projects/core/notifications/send.py b/projects/core/notifications/send.py index 48911826..ef1ad6b5 100644 --- a/projects/core/notifications/send.py +++ b/projects/core/notifications/send.py @@ -165,13 +165,48 @@ def get_common_message(finish_reason: str, status: str, get_link, get_italics, g message += f""" • Link to the {get_link("test results", "", is_dir=True)}. """ - if (pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) / "reports_index.html").exists(): - message += f""" + # Check for Caliper postprocess status and generated reports + artifact_dir = pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) + caliper_status_path = None + + # Search for caliper_postprocess_status.yaml in artifact directory and subdirectories + for status_file in artifact_dir.glob("**/caliper_postprocess_status.yaml"): + caliper_status_path = status_file + break + + if caliper_status_path and caliper_status_path.exists(): + try: + with open(caliper_status_path) as f: + caliper_status = yaml.safe_load(f) + + visualize_step = caliper_status.get("steps", {}).get("visualize", {}) + if visualize_step.get("status") == "ok" and visualize_step.get("paths"): + paths = visualize_step["paths"] + message += f""" +• Generated {len(paths)} Caliper report(s): +""" + for path in paths: + # Just list the path, no link + message += f" - {path}\n" + + # Also link to the reports index if it exists + reports_index_path = artifact_dir / "reports_index.html" + if reports_index_path.exists(): + message += f""" • Link to the {get_link("reports index", "reports_index.html")}. +""" + else: + message += """ +• Caliper postprocess completed but no reports generated. +""" + except Exception as e: + logger.warning("Failed to parse caliper_postprocess_status.yaml: %s", e) + message += """ +• No reports generated... """ else: message += """ -• No reports index generated... +• No reports generated... """ if ( @@ -525,11 +560,14 @@ def get_bold(text): status_icon = ":no-red-circle:" if summary.get("failures") else ":done-circle-check:" + reports_index_link = "" + if (pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) / "reports_index.html").exists(): + reports_index_link = f"• Link to the {get_link('reports index', 'reports_index.html')}.\n" + return f"""{status_icon} {get_bold(summary["message"])} • Link to the {get_link("test results", "", is_dir=True)}. -• Link to the {get_link("reports index", "reports_index.html")}. 
- +{reports_index_link} - `{summary["entries_count"]}` entries were tested against `{summary["kpis_count"]}` KPIs - `{summary["failures"]}` failed - `{summary["no_history"]}` had no history diff --git a/projects/skeleton/orchestration/cli.py b/projects/skeleton/orchestration/cli.py index a2a4f5e1..f1633288 100755 --- a/projects/skeleton/orchestration/cli.py +++ b/projects/skeleton/orchestration/cli.py @@ -13,6 +13,7 @@ from projects.core.library import config from projects.core.library.cli import safe_cli_command +from projects.core.library.postprocess import postprocess_command logger = logging.getLogger(__name__) @@ -65,5 +66,8 @@ def cleanup(ctx): sys.exit(exit_code) +main.add_command(postprocess_command) + + if __name__ == "__main__": main() diff --git a/projects/skeleton/orchestration/config.yaml b/projects/skeleton/orchestration/config.yaml index f88a16b6..fc99b7c9 100644 --- a/projects/skeleton/orchestration/config.yaml +++ b/projects/skeleton/orchestration/config.yaml @@ -12,7 +12,7 @@ skeleton: extra_properties: {} # for the preset to give any extra property deep_testing: false test: - duration_seconds: 30 # Default test duration in seconds + duration_seconds: 0 # Default test duration in seconds caliper: export: @@ -38,3 +38,30 @@ caliper: tags: {} parameters: {} metrics: {} + + postprocess: + enabled: true + artifacts_dir: null + plugin_module: projects.skeleton.postprocess.default.plugin + postprocess_config: null + parse: + enabled: true + no_cache: false + visualize: + enabled: true + output_dir: null + reports: [summary_table, throughput_chart] + report_group: null + + kpi: + enabled: false + generate: + enabled: false + output: kpis.jsonl + export: + enabled: false + + analyze: + enabled: false + baseline: null + output: kpi_analyze.json diff --git a/projects/skeleton/orchestration/test_skeleton.py b/projects/skeleton/orchestration/test_skeleton.py index 50831ec4..fe4b56bc 100644 --- a/projects/skeleton/orchestration/test_skeleton.py +++ b/projects/skeleton/orchestration/test_skeleton.py @@ -1,3 +1,4 @@ +import json import logging import pathlib import signal @@ -6,11 +7,39 @@ import yaml from projects.core.library import config, env, run, vault +from projects.core.library.postprocess import run_and_postprocess from projects.skeleton.toolbox.cluster_info.main import run as cluster_info logger = logging.getLogger(__name__) +def seed_skeleton_caliper_artifacts() -> pathlib.Path: + """ + Create minimal Caliper inputs under the FORGE artifact root: + + * ``__test_labels__.yaml`` + ``metrics.json`` per scenario (required by the skeleton plugin). 
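+
+    Resulting layout for the built-in ``smoke`` and ``load`` scenarios::
+
+        <ARTIFACT_DIR>/smoke/__test_labels__.yaml
+        <ARTIFACT_DIR>/smoke/metrics.json
+        <ARTIFACT_DIR>/load/__test_labels__.yaml
+        <ARTIFACT_DIR>/load/metrics.json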
+ """ + demo_dir = env.ARTIFACT_DIR + FAKE_DATA = ( + ("smoke", 120.5, 8.2), + ("load", 87.0, 22.1), + ) + for scenario, throughput, latency_ms in FAKE_DATA: + d = demo_dir / scenario + d.mkdir(parents=True, exist_ok=True) + (d / "__test_labels__.yaml").write_text( + yaml.dump({"labels": {"scenario": scenario}}, sort_keys=False), + encoding="utf-8", + ) + (d / "metrics.json").write_text( + json.dumps({"throughput": throughput, "latency_ms": latency_ms}), + encoding="utf-8", + ) + + logger.info("Seeded Caliper demo tree under %s", demo_dir) + return demo_dir + + def _signal_handler_sigint(sig, frame): """Sample SIGINT signal handler for skeleton project.""" env.reset_artifact_dir() @@ -48,12 +77,42 @@ def init(skip_vault_init=False, strict_vault_validation=True): def test(): - logger.info("=== Skeleton Project Test Phase ===") + """Main test function that wraps do_test() with outcome postprocessing.""" + return run_and_postprocess(do_test) + +def skeleton_take_time(): # Get test duration configuration test_duration = config.project.get_config("skeleton.test.duration_seconds") logger.info(f"Test duration: {test_duration} seconds") + start_time = time.time() + test_iteration = 0 + + logger.info(f"Starting {test_duration}s test loop...") + + # Run timed test loop + while time.time() - start_time < test_duration: + test_iteration += 1 + elapsed = time.time() - start_time + remaining = test_duration - elapsed + + logger.info( + f"Test iteration {test_iteration} - Elapsed: {elapsed:.1f}s, Remaining: {remaining:.1f}s" + ) + + # Simulate some test work with explicit waiting message + wait_time = min(30.0, remaining) + logger.info(f"⏳ Waiting {wait_time:.1f}s before next iteration...") + time.sleep(wait_time) + + elapsed_total = time.time() - start_time + logger.info(f"✅ Completed {test_iteration} test iterations in {elapsed_total:.1f}s") + + +def do_test(): + logger.info("=== Skeleton Project Test Phase ===") + if config.project.get_config("skeleton.deep_testing"): logger.warning("Running the (fake) deep testing ...") else: @@ -77,28 +136,10 @@ def test(): logger.info("") logger.info(f"Fake test configuration:\n{yaml_cfg}") - # Run timed test loop - start_time = time.time() - test_iteration = 0 + skeleton_take_time() - logger.info(f"Starting {test_duration}s test loop...") - - while time.time() - start_time < test_duration: - test_iteration += 1 - elapsed = time.time() - start_time - remaining = test_duration - elapsed - - logger.info( - f"Test iteration {test_iteration} - Elapsed: {elapsed:.1f}s, Remaining: {remaining:.1f}s" - ) - - # Simulate some test work with explicit waiting message - wait_time = min(30.0, remaining) - logger.info(f"⏳ Waiting {wait_time:.1f}s before next iteration...") - time.sleep(wait_time) - - elapsed_total = time.time() - start_time - logger.info(f"✅ Completed {test_iteration} test iterations in {elapsed_total:.1f}s") + with env.NextArtifactDir("skeleton_seed_data_for_caliper_postprocessing"): + seed_skeleton_caliper_artifacts() if not config.project.get_config("skeleton.collect_cluster_info"): logger.warning("⚠️ Cluster information gathering not enabled. 
Returning early.") @@ -137,7 +178,7 @@ def resolve_hardware_request(hardware_spec: dict): logger.info("Hardware resolution: stub implementation - no changes made") # Stub implementation - could be extended to: - # - Read hardware config from project config + # - Read hardware config from project configuration # - Set hardware requirements based on workload needs # - Handle different hardware profiles (GPU, CPU, memory requirements) # - Example: return {"gpu": {"type": "nvidia-tesla-v100", "count": 1}, "memory": "32Gi"} diff --git a/projects/skeleton/postprocess/__init__.py b/projects/skeleton/postprocess/__init__.py new file mode 100644 index 00000000..d6ef977c --- /dev/null +++ b/projects/skeleton/postprocess/__init__.py @@ -0,0 +1 @@ +"""Skeleton Caliper post-processing sample plugin.""" diff --git a/projects/skeleton/postprocess/default/__init__.py b/projects/skeleton/postprocess/default/__init__.py new file mode 100644 index 00000000..127e3baf --- /dev/null +++ b/projects/skeleton/postprocess/default/__init__.py @@ -0,0 +1 @@ +"""Default sample Caliper plugin + demo artifact layout for Skeleton.""" diff --git a/projects/skeleton/postprocess/default/caliper.yaml b/projects/skeleton/postprocess/default/caliper.yaml new file mode 100644 index 00000000..f0a42bcf --- /dev/null +++ b/projects/skeleton/postprocess/default/caliper.yaml @@ -0,0 +1 @@ +plugin_module: projects.skeleton.postprocess.default.plugin diff --git a/projects/skeleton/postprocess/default/demo_run/load/__test_labels__.yaml b/projects/skeleton/postprocess/default/demo_run/load/__test_labels__.yaml new file mode 100644 index 00000000..c4596951 --- /dev/null +++ b/projects/skeleton/postprocess/default/demo_run/load/__test_labels__.yaml @@ -0,0 +1,2 @@ +labels: + scenario: load diff --git a/projects/skeleton/postprocess/default/demo_run/load/metrics.json b/projects/skeleton/postprocess/default/demo_run/load/metrics.json new file mode 100644 index 00000000..c7eccb96 --- /dev/null +++ b/projects/skeleton/postprocess/default/demo_run/load/metrics.json @@ -0,0 +1 @@ +{"throughput": 87.0, "latency_ms": 22.1} diff --git a/projects/skeleton/postprocess/default/demo_run/smoke/__test_labels__.yaml b/projects/skeleton/postprocess/default/demo_run/smoke/__test_labels__.yaml new file mode 100644 index 00000000..9bb11d1c --- /dev/null +++ b/projects/skeleton/postprocess/default/demo_run/smoke/__test_labels__.yaml @@ -0,0 +1,2 @@ +labels: + scenario: smoke diff --git a/projects/skeleton/postprocess/default/demo_run/smoke/metrics.json b/projects/skeleton/postprocess/default/demo_run/smoke/metrics.json new file mode 100644 index 00000000..943a2df0 --- /dev/null +++ b/projects/skeleton/postprocess/default/demo_run/smoke/metrics.json @@ -0,0 +1 @@ +{"throughput": 120.5, "latency_ms": 8.2} diff --git a/projects/skeleton/postprocess/default/plugin.py b/projects/skeleton/postprocess/default/plugin.py new file mode 100644 index 00000000..254c05c2 --- /dev/null +++ b/projects/skeleton/postprocess/default/plugin.py @@ -0,0 +1,218 @@ +"""Sample Caliper PostProcessingPlugin for Skeleton (`projects/skeleton/postprocess/default`).""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from projects.caliper.engine.model import ( + ParseResult, + PostProcessingPlugin, + TestBaseNode, + UnifiedResultRecord, + UnifiedRunModel, +) + + +def _labels_from_node(node: TestBaseNode) -> dict[str, Any]: + raw = node.labels + inner = raw.get("labels") + if isinstance(inner, 
dict): + return dict(inner) + if isinstance(raw, dict): + return dict(raw) + return {"facet": "default"} + + +class SkeletonDefaultPlugin(PostProcessingPlugin): + """ + Parses per-test directories containing ``metrics.json`` (simple numeric mapping). + + Visual reports (Plotly HTML): + + * ``summary_table`` — tabular view of scenarios and metrics. + * ``throughput_chart`` — bar chart of ``throughput`` when present. + """ + + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + records: list[UnifiedResultRecord] = [] + warnings: list[str] = [] + for node in nodes: + metrics: dict[str, Any] = {} + for p in node.artifact_paths: + if p.name != "metrics.json": + continue + try: + metrics = json.loads(p.read_text(encoding="utf-8")) + if not isinstance(metrics, dict): + warnings.append(f"{p}: metrics.json must be a JSON object") + metrics = {} + except json.JSONDecodeError as e: + warnings.append(f"Malformed JSON {p}: {e}") + metrics = {"_parse_error": True} + break + labels = _labels_from_node(node) + records.append( + UnifiedResultRecord( + test_base_path=str(node.directory.relative_to(base_dir.resolve())), + distinguishing_labels=labels, + metrics=dict(metrics) if metrics else {"throughput": 0.0}, + run_identity={"skeleton_sample": True}, + parse_notes=[], + ) + ) + return ParseResult(records=records, warnings=warnings) + + def visualize( + self, + model: UnifiedRunModel, + output_dir: Path, + report_ids: list[str] | None, + group_id: str | None, + visualize_config: dict[str, Any] | None, + ) -> list[str]: + import html as html_lib + + import plotly.graph_objects as go + + output_dir.mkdir(parents=True, exist_ok=True) + paths: list[str] = [] + wanted = frozenset(report_ids or ()) + + if "summary_table" in wanted: + rows = [] + for r in model.unified_result_records: + scenario = r.distinguishing_labels.get("scenario", "") + tp = r.metrics.get("throughput", "") + lat = r.metrics.get("latency_ms", "") + rows.append( + "| test_base_path | scenario | throughput | latency_ms | " + "
|---|