diff --git a/pyproject.toml b/pyproject.toml index a1cdc631..024ad38e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,11 +101,13 @@ markers = [ "ebpf: Tests requiring eBPF daemon/root privileges", "cloud: Tests for cloud storage operations", "happy_path: Happy path tests for core functionality", + "diagnostic: Opt-in diagnostics or aspirational performance budgets outside the default gate", + "large_pipeline: Stress-style pipeline coverage with larger DAG fixtures", "ray_e2e: Ray end-to-end tests requiring a running Docker cluster", "ray_contract: User-facing Ray contract tests using `roar run ray job submit ...`", "ray_diagnostic: Diagnostic Ray tests that intentionally inspect internal runtime details", ] -addopts = "-v --strict-markers -n auto --dist loadfile --ignore=tests/ebpf --ignore=tests/live_glaas --ignore=tests/benchmarks --ignore=tests/integration --ignore=tests/e2e --ignore-glob=tests/backends/*/e2e --ignore-glob=tests/backends/*/live" +addopts = "-v --strict-markers -n auto --dist loadfile --ignore=tests/ebpf --ignore=tests/live_glaas --ignore=tests/benchmarks --ignore=tests/e2e --ignore=tests/integration/test_cli_startup.py --ignore=tests/execution/runtime/test_sitecustomize_perf.py --ignore-glob=tests/backends/*/e2e --ignore-glob=tests/backends/*/live" timeout = 60 filterwarnings = [ "ignore::DeprecationWarning", diff --git a/roar/application/labels.py b/roar/application/labels.py index 093e52bd..1bec06d3 100644 --- a/roar/application/labels.py +++ b/roar/application/labels.py @@ -14,8 +14,9 @@ from typing import Any, Protocol from ..db.context import DatabaseContext +from ..execution.recording.dataset_metadata import AUTO_DATASET_LABEL_KEYS -RESERVED_LABEL_KEYS = {"dataset.type", "dataset.modality"} +RESERVED_LABEL_KEYS = set(AUTO_DATASET_LABEL_KEYS) @dataclass(frozen=True) diff --git a/roar/cli/commands/auth.py b/roar/cli/commands/auth.py index 855cc6c5..5467b3b9 100644 --- a/roar/cli/commands/auth.py +++ b/roar/cli/commands/auth.py @@ -116,10 +116,9 @@ def auth_register() -> None: @auth.command("test") def auth_test() -> None: """Test connection to GLaaS server.""" - # Get GLaaS server URL from config - glaas_url = config_get("glaas.url") - if not glaas_url: - glaas_url = os.environ.get("GLAAS_URL") + from ...integrations.glaas import get_glaas_url + + glaas_url = get_glaas_url() if not glaas_url: raise click.ClickException( @@ -201,7 +200,9 @@ def auth_test() -> None: @auth.command("status") def auth_status() -> None: """Show current auth status.""" - glaas_url = config_get("glaas.url") or os.environ.get("GLAAS_URL") + from ...integrations.glaas import get_glaas_url + + glaas_url = get_glaas_url() key_info = _find_ssh_pubkey() click.echo("GLaaS Auth Status") diff --git a/roar/db/repositories/session.py b/roar/db/repositories/session.py index 590ac323..c005634d 100644 --- a/roar/db/repositories/session.py +++ b/roar/db/repositories/session.py @@ -377,7 +377,11 @@ def update_current_step(self, session_id: int, step_number: int) -> None: self._session.flush() def update_git_commits( - self, session_id: int, git_commit: str, update_start: bool = False + self, + session_id: int, + git_commit: str | None, + update_start: bool = False, + git_repo: str | None = None, ) -> None: """ Update git commit references for a session. @@ -386,14 +390,19 @@ def update_git_commits( session_id: Session ID git_commit: Git commit hash update_start: Whether to update start commit if not set + git_repo: Git repository URL/path to persist on the session """ session = self._session.get(Session, session_id) if not session: return - if update_start and not session.git_commit_start: + if git_repo and not session.git_repo: + session.git_repo = git_repo + + if git_commit and update_start and not session.git_commit_start: session.git_commit_start = git_commit - session.git_commit_end = git_commit + if git_commit: + session.git_commit_end = git_commit self._session.flush() def rename_step( diff --git a/roar/db/services/job_recording.py b/roar/db/services/job_recording.py index 682939b0..c012a410 100644 --- a/roar/db/services/job_recording.py +++ b/roar/db/services/job_recording.py @@ -137,7 +137,7 @@ def record_job( # Handle session assignment session_id, step_number = self._assign_to_session( - assign_to_session, step_identity, git_commit + assign_to_session, step_identity, git_commit, git_repo ) # Create the job record @@ -193,6 +193,7 @@ def _assign_to_session( assign: bool, step_identity: str, git_commit: str | None, + git_repo: str | None, ) -> tuple[int | None, int | None]: """Handle session assignment and step numbering.""" if not assign: @@ -209,8 +210,13 @@ def _assign_to_session( self._session_repo.update_current_step(session_id, step_number) - if git_commit: - self._session_repo.update_git_commits(session_id, git_commit, update_start=True) + if git_commit or git_repo: + self._session_repo.update_git_commits( + session_id, + git_commit, + update_start=True, + git_repo=git_repo, + ) return session_id, step_number diff --git a/roar/execution/provenance/data_loader.py b/roar/execution/provenance/data_loader.py index b322e1a7..de701702 100644 --- a/roar/execution/provenance/data_loader.py +++ b/roar/execution/provenance/data_loader.py @@ -45,7 +45,16 @@ def load_tracer_data(self, path: str) -> TracerData: """ self.logger.debug("Loading tracer data from: %s", path) with open(path, "rb") as f: - data = msgpack.unpack(f, raw=False) + raw_data = f.read() + + try: + data = msgpack.unpackb(raw_data, raw=False) + except msgpack.ExtraData: + stripped = raw_data.lstrip() + if not stripped.startswith((b"{", b"[")): + raise + self.logger.debug("Tracer report was JSON-encoded; falling back to JSON parsing") + data = json.loads(raw_data.decode("utf-8")) self.logger.debug("Tracer data parsed successfully: %d keys", len(data)) files = self._normalize_files(data) diff --git a/roar/execution/recording/dataset_metadata.py b/roar/execution/recording/dataset_metadata.py index 35e35848..8fbca5f8 100644 --- a/roar/execution/recording/dataset_metadata.py +++ b/roar/execution/recording/dataset_metadata.py @@ -1,10 +1,24 @@ -"""Helpers for attaching dataset identity labels to composite artifact metadata.""" +"""Helpers for attaching dataset identity metadata and labels to composite artifacts.""" from __future__ import annotations from typing import Any from urllib.parse import urlparse +from .dataset_profile import build_dataset_profile + +AUTO_DATASET_LABEL_KEYS = frozenset( + { + "dataset.type", + "dataset.id", + "dataset.fingerprint", + "dataset.fingerprint_algorithm", + "dataset.split", + "dataset.version_hint", + "dataset.modality", + } +) + def find_matching_identifier( root_path: str, dataset_identifiers: list[dict[str, Any]] @@ -50,3 +64,45 @@ def build_dataset_metadata(identifier: dict[str, Any]) -> dict[str, Any]: if value is not None: meta[key] = value return meta + + +def build_dataset_label_metadata( + identifier: dict[str, Any], + *, + components: list[dict[str, Any]] | None = None, + component_count_total: int | None = None, +) -> dict[str, Any]: + """Build the system-managed label document for a detected dataset artifact. + + The label payload is intentionally smaller and more stable than the full + dataset metadata blob. It captures the artifact's dataset identity and the + most queryable derived characteristics for local labels and future sync. + """ + dataset: dict[str, Any] = {"type": "dataset"} + + value = identifier.get("dataset_id") + if value is not None: + dataset["id"] = value + + value = identifier.get("dataset_fingerprint") + if value is not None: + dataset["fingerprint"] = value + + value = identifier.get("dataset_fingerprint_algorithm") + if value is not None: + dataset["fingerprint_algorithm"] = value + + value = identifier.get("split") + if value is not None: + dataset["split"] = value + + value = identifier.get("version_hint") + if value is not None: + dataset["version_hint"] = value + + profile = build_dataset_profile(components or [], total_components=component_count_total) + modality = profile.get("modality_hint") if isinstance(profile, dict) else None + if isinstance(modality, str) and modality: + dataset["modality"] = modality + + return {"dataset": dataset} diff --git a/roar/execution/recording/job_recording.py b/roar/execution/recording/job_recording.py index 0bd77424..37f21700 100644 --- a/roar/execution/recording/job_recording.py +++ b/roar/execution/recording/job_recording.py @@ -27,7 +27,12 @@ from ...db.context import optional_repo from ...db.hashing import hash_files_blake3 from .dataset_identifier import DatasetIdentifierInferer -from .dataset_metadata import build_dataset_metadata, find_matching_identifier +from .dataset_metadata import ( + AUTO_DATASET_LABEL_KEYS, + build_dataset_label_metadata, + build_dataset_metadata, + find_matching_identifier, +) if TYPE_CHECKING: from ...core.models.run import RunContext @@ -215,10 +220,7 @@ def record( """Create a job and link precomputed input/output artifacts.""" resolved_session_id = session_id if resolved_session_id is None: - active_session = db_ctx.sessions.get_active() - if active_session is None: - raise ValueError("No active session") - resolved_session_id = int(active_session["id"]) + resolved_session_id = int(db_ctx.sessions.get_or_create_active()) step_number = db_ctx.sessions.get_next_step_number(resolved_session_id) job_id, recorded_job_uid = db_ctx.jobs.create( @@ -352,8 +354,14 @@ def materialize( } } matching = find_matching_identifier(str(root), dataset_identifiers) + dataset_label_metadata: dict[str, Any] = {} if matching is not None: meta_dict["dataset"] = build_dataset_metadata(matching) + dataset_label_metadata = build_dataset_label_metadata( + matching, + components=list(composite.payload.get("components") or []), + component_count_total=composite.component_count_total, + ) metadata = json.dumps(meta_dict) artifact_id, _created = db_ctx.artifacts.register( hashes={"composite-blake3": composite.digest}, @@ -362,6 +370,12 @@ def materialize( source_type=composite.payload.get("source_type"), metadata=metadata, ) + if dataset_label_metadata: + self._sync_dataset_labels( + db_ctx, + artifact_id=artifact_id, + dataset_label_metadata=dataset_label_metadata, + ) composite_repo.upsert_details( artifact_id=artifact_id, components=list(composite.payload.get("components") or []), @@ -482,6 +496,76 @@ def _is_path_under_root(path: Path, root: Path) -> bool: except ValueError: return False + @staticmethod + def _sync_dataset_labels( + db_ctx: Any, + *, + artifact_id: str, + dataset_label_metadata: dict[str, Any], + ) -> None: + labels_repo = cast(Any, optional_repo(db_ctx, "labels")) + if labels_repo is None or not dataset_label_metadata: + return + + current = labels_repo.get_current("artifact", artifact_id=artifact_id) + current_metadata = current.get("metadata") if isinstance(current, dict) else {} + if not isinstance(current_metadata, dict): + current_metadata = {} + + merged = CompositeOutputMaterializer._merge_dataset_labels( + current_metadata, + dataset_label_metadata, + ) + if merged == current_metadata: + return + + labels_repo.create_version("artifact", merged, artifact_id=artifact_id) + + @staticmethod + def _merge_dataset_labels(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]: + merged = CompositeOutputMaterializer._remove_label_paths( + current, + AUTO_DATASET_LABEL_KEYS, + ) + return CompositeOutputMaterializer._deep_merge(merged, patch) + + @staticmethod + def _deep_merge(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]: + merged = json.loads(json.dumps(current)) + for key, value in patch.items(): + existing = merged.get(key) + if isinstance(existing, dict) and isinstance(value, dict): + merged[key] = CompositeOutputMaterializer._deep_merge(existing, value) + else: + merged[key] = value + return merged + + @staticmethod + def _remove_label_paths( + metadata: dict[str, Any], reserved_paths: set[str] | frozenset[str] + ) -> dict[str, Any]: + cleaned = json.loads(json.dumps(metadata)) + for path in reserved_paths: + CompositeOutputMaterializer._remove_nested(cleaned, path.split(".")) + return cleaned + + @staticmethod + def _remove_nested(root: dict[str, Any], path: list[str]) -> None: + if not path: + return + key = path[0] + if key not in root: + return + if len(path) == 1: + root.pop(key, None) + return + child = root.get(key) + if not isinstance(child, dict): + return + CompositeOutputMaterializer._remove_nested(child, path[1:]) + if not child: + root.pop(key, None) + class ExecutionJobRecorder: """Persist a traced execution and return reporting payload pieces.""" diff --git a/tests/application/get/test_service.py b/tests/application/get/test_service.py index 94be19a0..dc3ac6ff 100644 --- a/tests/application/get/test_service.py +++ b/tests/application/get/test_service.py @@ -133,7 +133,7 @@ def test_get_artifacts_skips_active_session_check_on_dry_run(tmp_path: Path) -> recorder_cls.return_value.record.assert_not_called() -def test_get_artifacts_requires_active_session_for_real_downloads(tmp_path: Path) -> None: +def test_get_artifacts_surfaces_recorder_errors(tmp_path: Path) -> None: parsed_source = MagicMock(is_prefix=False, scheme="s3") db_ctx = MagicMock() db_ctx.__enter__.return_value = db_ctx @@ -141,7 +141,7 @@ def test_get_artifacts_requires_active_session_for_real_downloads(tmp_path: Path service = MagicMock() service.get.return_value = GetTransferResult(success=True, downloaded_files=[]) recorder = MagicMock() - recorder.record.side_effect = ValueError("No active session") + recorder.record.side_effect = ValueError("Recorder failed") with ( patch("roar.application.get.service.bootstrap"), @@ -153,7 +153,7 @@ def test_get_artifacts_requires_active_session_for_real_downloads(tmp_path: Path patch("roar.application.get.service.LocalJobRecorder", return_value=recorder), ): resolve_git_state.return_value.commit = "deadbeef" - with pytest.raises(ValueError, match="No active session"): + with pytest.raises(ValueError, match="Recorder failed"): get_artifacts(_request(tmp_path)) diff --git a/tests/conftest.py b/tests/conftest.py index 8fdbd62f..f0313f3f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,14 +8,83 @@ """ import os +import shutil import subprocess import sys +import tempfile from collections.abc import Callable +from functools import lru_cache from pathlib import Path +try: + import fcntl +except ImportError: # pragma: no cover - non-Unix fallback + fcntl = None + import pytest REPO_ROOT = Path(__file__).resolve().parents[1] +RUST_MANIFEST = REPO_ROOT / "rust" / "Cargo.toml" +RELEASE_BIN_DIR = REPO_ROOT / "rust" / "target" / "release" +PACKAGE_BIN_DIR = REPO_ROOT / "roar" / "bin" + + +def _repo_local_binary_dirs() -> list[str]: + dirs: list[str] = [] + for directory in (RELEASE_BIN_DIR, PACKAGE_BIN_DIR): + if directory.is_dir(): + dirs.append(str(directory)) + return dirs + + +def _repo_local_ptrace_exists() -> bool: + return (RELEASE_BIN_DIR / "roar-tracer").exists() or (PACKAGE_BIN_DIR / "roar-tracer").exists() + + +@lru_cache(maxsize=1) +def _ensure_repo_local_ptrace_tracer() -> None: + if not sys.platform.startswith("linux"): + return + if _repo_local_ptrace_exists(): + return + + cargo = shutil.which("cargo") + if cargo is None: + raise RuntimeError( + "Integration tests need a repo-local roar-tracer, but `cargo` was not found on PATH." + ) + + lock_path = Path(tempfile.gettempdir()) / "roar-test-suite-optimizations-roar-tracer.lock" + with lock_path.open("w", encoding="utf-8") as lock_file: + if fcntl is not None: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + try: + if _repo_local_ptrace_exists(): + return + result = subprocess.run( + [ + cargo, + "build", + "--release", + "--manifest-path", + str(RUST_MANIFEST), + "-p", + "roar-tracer", + ], + cwd=REPO_ROOT, + capture_output=True, + text=True, + env=dict(os.environ), + ) + if result.returncode != 0: + raise RuntimeError( + "Failed to build a repo-local roar-tracer for integration tests.\n" + f"stdout:\n{result.stdout or ''}\n" + f"stderr:\n{result.stderr or ''}" + ) + finally: + if fcntl is not None: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) def _subprocess_env() -> dict[str, str]: @@ -28,14 +97,27 @@ def _subprocess_env() -> dict[str, str]: env["PYTHONPATH"] = ( f"{repo_root}{os.pathsep}{current_pythonpath}" if current_pythonpath else repo_root ) + repo_binary_dirs = _repo_local_binary_dirs() + current_path = env.get("PATH", "") + path_entries = current_path.split(os.pathsep) if current_path else [] + new_entries = [entry for entry in repo_binary_dirs if entry not in path_entries] + if new_entries: + env["PATH"] = ( + os.pathsep.join([*new_entries, *path_entries]) + if path_entries + else os.pathsep.join(new_entries) + ) return env os.environ["PYTHONPATH"] = _subprocess_env()["PYTHONPATH"] +os.environ["PATH"] = _subprocess_env()["PATH"] def _run_roar_cmd(*args: str, cwd: Path, check: bool = True) -> subprocess.CompletedProcess: """Run a roar command using the current Python interpreter.""" + if args and args[0] in {"run", "build"}: + _ensure_repo_local_ptrace_tracer() command = [sys.executable, "-m", "roar", *args] result = subprocess.run( command, diff --git a/tests/execution/runtime/test_sitecustomize_perf.py b/tests/execution/runtime/test_sitecustomize_perf.py index b44c091e..29e035e6 100644 --- a/tests/execution/runtime/test_sitecustomize_perf.py +++ b/tests/execution/runtime/test_sitecustomize_perf.py @@ -1,4 +1,10 @@ -"""Unit tests for sitecustomize.py startup performance optimizations.""" +""" +Opt-in diagnostic tests for sitecustomize.py startup performance. + +These checks track coarse performance budgets and are intentionally excluded +from the default pytest profile. Run them explicitly when working on startup +or atexit overhead in the runtime inject path. +""" import os import subprocess @@ -6,12 +12,15 @@ import time from pathlib import Path +import pytest + from roar.execution.runtime.inject.tracker import ( get_installed_packages, get_used_packages, ) INJECT_DIR = Path(__file__).resolve().parents[3] / "roar" / "execution" / "runtime" / "inject" +pytestmark = pytest.mark.diagnostic def _roar_env(*, log_file: str | None = None) -> dict: diff --git a/tests/happy_path/test_dag_command.py b/tests/happy_path/test_dag_command.py index ff9274d5..df2115a4 100644 --- a/tests/happy_path/test_dag_command.py +++ b/tests/happy_path/test_dag_command.py @@ -9,44 +9,53 @@ import pytest -@pytest.mark.happy_path -class TestDagCommand: - """Test roar dag command functionality.""" +def _run_preprocess(roar_cli, git_commit, python_exe) -> None: + result = roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") + assert result.returncode == 0 + git_commit("After preprocess") - def test_dag_shows_linear_pipeline( - self, - temp_git_repo, - roar_cli, - git_commit, - sample_scripts, - sample_data, - python_exe, - ): - """ - Run preprocess -> train and verify dag output shows 2 steps. - Given: A linear pipeline with 2 steps - When: Running roar dag - Then: Output should show both steps with proper metrics - """ - # Step 1: Preprocess - result = roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - assert result.returncode == 0 - git_commit("After preprocess") +def _run_train(roar_cli, git_commit, python_exe) -> None: + result = roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") + assert result.returncode == 0 + git_commit("After train") - # Step 2: Train - result = roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - assert result.returncode == 0 - git_commit("After train") - # Run dag command - result = roar_cli("dag", "--no-color") - assert result.returncode == 0 +def _run_evaluate(roar_cli, git_commit, python_exe) -> None: + result = roar_cli("run", python_exe, "evaluate.py", "model.pkl", "test.csv", "metrics.json") + assert result.returncode == 0 + git_commit("After evaluate") - output = result.stdout - assert "Pipeline: 2 steps" in output - assert "@1" in output - assert "@2" in output + +def _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + updated_input: str, +) -> None: + (temp_git_repo / "input.csv").write_text(updated_input) + git_commit("Modified input") + + result = roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") + assert result.returncode == 0 + git_commit("Rerun preprocess") + + +def _write_script(temp_git_repo, git_commit, name: str, body: str, commit_message: str) -> None: + (temp_git_repo / name).write_text(body) + git_commit(commit_message) + + +def _run_roar_and_commit(roar_cli, git_commit, commit_message: str, *args: str) -> None: + result = roar_cli(*args) + assert result.returncode == 0 + git_commit(commit_message) + + +@pytest.mark.happy_path +class TestDagCommand: + """Test roar dag command functionality.""" def test_dag_json_output( self, @@ -65,9 +74,7 @@ def test_dag_json_output( Then: Output should be valid JSON with expected structure """ # Run a simple step - result = roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - assert result.returncode == 0 - git_commit("After preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) # Run dag with JSON output result = roar_cli("dag", "--json") @@ -108,19 +115,15 @@ def test_dag_shows_stale_steps( Then: The train step should be marked as stale """ # Initial pipeline - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - # Modify preprocess output - (temp_git_repo / "input.csv").write_text("id,value\n1,modified\n2,data\n") - git_commit("Modified input") - - # Rerun preprocess - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,modified\n2,data\n", + ) # Check dag - train should be stale result = roar_cli("dag", "--json") @@ -150,8 +153,7 @@ def test_dag_no_color_output( When: Running roar dag --no-color Then: Output should not contain ANSI escape codes """ - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) result = roar_cli("dag", "--no-color") assert result.returncode == 0 @@ -176,14 +178,16 @@ def test_dag_expanded_view( Then: All executions should be visible """ # Initial run - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess 1") + _run_preprocess(roar_cli, git_commit, python_exe) # Modify and rerun - (temp_git_repo / "input.csv").write_text("id,value\n1,modified\n") - git_commit("Modified input") - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess 2") + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,modified\n", + ) # Check expanded view result = roar_cli("dag", "--expanded", "--json") @@ -196,46 +200,6 @@ def test_dag_expanded_view( preprocess_nodes = [n for n in dag_data["nodes"] if "preprocess.py" in n["command"]] assert len(preprocess_nodes) == 2 - def test_dag_shows_diamond_pattern( - self, - temp_git_repo, - roar_cli, - git_commit, - sample_scripts, - sample_data, - python_exe, - ): - """ - Verify dag correctly shows diamond pattern dependencies. - - Given: A diamond pipeline pattern - When: Running roar dag --json - Then: Dependencies should be correctly tracked - """ - # Create diamond: input -> features_a/features_b -> combine - roar_cli("run", python_exe, "extract_features_a.py", "input.csv", "features_a.csv") - git_commit("After features_a") - - roar_cli("run", python_exe, "extract_features_b.py", "input.csv", "features_b.csv") - git_commit("After features_b") - - roar_cli( - "run", python_exe, "combine.py", "features_a.csv", "features_b.csv", "combined.json" - ) - git_commit("After combine") - - result = roar_cli("dag", "--json") - assert result.returncode == 0 - - dag_data = json.loads(result.stdout) - assert dag_data["total_steps"] == 3 - - # Find the combine step - combine_step = next((n for n in dag_data["nodes"] if "combine.py" in n["command"]), None) - assert combine_step is not None - assert combine_step["metrics"]["consumed"] == 2 - assert len(combine_step["dependencies"]) == 2 - def test_dag_empty_session( self, temp_git_repo, @@ -280,8 +244,7 @@ def test_dag_multi_model_comparison( """ # Create model training scripts for model_type in ["rf", "xgb", "nn"]: - script = temp_git_repo / f"train_{model_type}.py" - script.write_text(f''' + (temp_git_repo / f"train_{model_type}.py").write_text(f''' import sys import json @@ -300,8 +263,11 @@ def test_dag_multi_model_comparison( git_commit("Add model training scripts") # Create select_best script - select_best = temp_git_repo / "select_best.py" - select_best.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "select_best.py", + """ import sys import json @@ -316,22 +282,31 @@ def test_dag_multi_model_comparison( json.dump({"selected": "rf", "models": combined}, f) print("Selected best model -> model.pkl") -""") - git_commit("Add select_best script") +""", + "Add select_best script", + ) # Run training jobs in parallel (different models) - roar_cli("run", python_exe, "train_rf.py", "model_rf.pkl") - git_commit("After train_rf") - - roar_cli("run", python_exe, "train_xgb.py", "model_xgb.pkl") - git_commit("After train_xgb") - - roar_cli("run", python_exe, "train_nn.py", "model_nn.pkl") - git_commit("After train_nn") + _run_roar_and_commit( + roar_cli, git_commit, "After train_rf", "run", python_exe, "train_rf.py", "model_rf.pkl" + ) + _run_roar_and_commit( + roar_cli, + git_commit, + "After train_xgb", + "run", + python_exe, + "train_xgb.py", + "model_xgb.pkl", + ) + _run_roar_and_commit( + roar_cli, git_commit, "After train_nn", "run", python_exe, "train_nn.py", "model_nn.pkl" + ) # Select best - roar_cli("run", python_exe, "select_best.py") - git_commit("After select_best") + _run_roar_and_commit( + roar_cli, git_commit, "After select_best", "run", python_exe, "select_best.py" + ) # Check DAG result = roar_cli("dag", "--json") @@ -364,17 +339,42 @@ def test_dag_feature_engineering_fan_in( """ # Use existing extract_features_* scripts from sample_scripts # Run feature extraction (fan-out from input) - roar_cli("run", python_exe, "extract_features_a.py", "input.csv", "features_a.csv") - git_commit("After features_a") - - roar_cli("run", python_exe, "extract_features_b.py", "input.csv", "features_b.csv") - git_commit("After features_b") - - roar_cli("run", python_exe, "extract_features_c.py", "input.csv", "features_c.csv") - git_commit("After features_c") + _run_roar_and_commit( + roar_cli, + git_commit, + "After features_a", + "run", + python_exe, + "extract_features_a.py", + "input.csv", + "features_a.csv", + ) + _run_roar_and_commit( + roar_cli, + git_commit, + "After features_b", + "run", + python_exe, + "extract_features_b.py", + "input.csv", + "features_b.csv", + ) + _run_roar_and_commit( + roar_cli, + git_commit, + "After features_c", + "run", + python_exe, + "extract_features_c.py", + "input.csv", + "features_c.csv", + ) # Combine features (fan-in) - roar_cli( + _run_roar_and_commit( + roar_cli, + git_commit, + "After combine", "run", python_exe, "combine.py", @@ -383,11 +383,18 @@ def test_dag_feature_engineering_fan_in( "features_c.csv", "combined.json", ) - git_commit("After combine") # Train on combined features - roar_cli("run", python_exe, "train.py", "combined.json", "model.pkl") - git_commit("After train") + _run_roar_and_commit( + roar_cli, + git_commit, + "After train", + "run", + python_exe, + "train.py", + "combined.json", + "model.pkl", + ) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -418,8 +425,7 @@ def test_dag_ensemble_model( """ # Create base model training scripts for model_name in ["a", "b", "c"]: - script = temp_git_repo / f"train_model_{model_name}.py" - script.write_text(f""" + (temp_git_repo / f"train_model_{model_name}.py").write_text(f""" import sys import json @@ -435,8 +441,11 @@ def test_dag_ensemble_model( git_commit("Add base model scripts") # Create ensemble script - ensemble = temp_git_repo / "train_ensemble.py" - ensemble.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "train_ensemble.py", + """ import json models = {} @@ -453,22 +462,25 @@ def test_dag_ensemble_model( json.dump(ensemble_model, f) print("Trained ensemble model") -""") - git_commit("Add ensemble script") +""", + "Add ensemble script", + ) # Train base models - roar_cli("run", python_exe, "train_model_a.py") - git_commit("After model_a") - - roar_cli("run", python_exe, "train_model_b.py") - git_commit("After model_b") - - roar_cli("run", python_exe, "train_model_c.py") - git_commit("After model_c") + _run_roar_and_commit( + roar_cli, git_commit, "After model_a", "run", python_exe, "train_model_a.py" + ) + _run_roar_and_commit( + roar_cli, git_commit, "After model_b", "run", python_exe, "train_model_b.py" + ) + _run_roar_and_commit( + roar_cli, git_commit, "After model_c", "run", python_exe, "train_model_c.py" + ) # Train ensemble - roar_cli("run", python_exe, "train_ensemble.py") - git_commit("After ensemble") + _run_roar_and_commit( + roar_cli, git_commit, "After ensemble", "run", python_exe, "train_ensemble.py" + ) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -501,8 +513,11 @@ def test_dag_build_steps( Then: @B prefix should appear for build steps """ # Create a simple build script - build_script = temp_git_repo / "setup_env.py" - build_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "setup_env.py", + """ import json # Simulate setting up environment @@ -511,16 +526,21 @@ def test_dag_build_steps( json.dump(config, f) print("Environment setup complete") -""") - git_commit("Add build script") +""", + "Add build script", + ) # Run build step - roar_cli("build", python_exe, "setup_env.py") - git_commit("After build") + _run_roar_and_commit( + roar_cli, git_commit, "After build", "build", python_exe, "setup_env.py" + ) # Run a regular step that uses the config - run_script = temp_git_repo / "use_config.py" - run_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "use_config.py", + """ import json with open("config.json", "r") as f: @@ -530,11 +550,13 @@ def test_dag_build_steps( json.dump({"used_config": config}, f) print("Used config") -""") - git_commit("Add use_config script") +""", + "Add use_config script", + ) - roar_cli("run", python_exe, "use_config.py") - git_commit("After use_config") + _run_roar_and_commit( + roar_cli, git_commit, "After use_config", "run", python_exe, "use_config.py" + ) result = roar_cli("dag", "--no-color") assert result.returncode == 0 @@ -561,21 +583,16 @@ def test_dag_cascade_invalidation( Then: Downstream steps should be marked stale with correct stale_count """ # Build pipeline: preprocess -> train -> evaluate - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - roar_cli("run", python_exe, "evaluate.py", "model.pkl", "test.csv", "metrics.json") - git_commit("After evaluate") - - # Modify input and rerun preprocess - (temp_git_repo / "input.csv").write_text("id,value\n1,new_data\n2,changed\n") - git_commit("Modified input") - - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _run_evaluate(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,new_data\n2,changed\n", + ) # Check cascade invalidation result = roar_cli("dag", "--json") @@ -609,8 +626,11 @@ def test_dag_partial_branch_invalidation( Then: Only affected branch should be marked stale """ # Create split script - split_script = temp_git_repo / "split.py" - split_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "split.py", + """ import sys with open("input.csv", "r") as f: @@ -624,46 +644,55 @@ def test_dag_partial_branch_invalidation( f.write(data[len(data)//2:]) print("Split input into part_a and part_b") -""") - git_commit("Add split script") +""", + "Add split script", + ) # Build diamond: split -> (train_a, train_b) -> merge - roar_cli("run", python_exe, "split.py") - git_commit("After split") + _run_roar_and_commit(roar_cli, git_commit, "After split", "run", python_exe, "split.py") # Create train_a script - train_a = temp_git_repo / "train_a.py" - train_a.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "train_a.py", + """ import json with open("part_a.csv", "r") as f: data = f.read() with open("model_a.pkl", "w") as f: json.dump({"model": "a", "hash": hash(data)}, f) print("Trained model_a") -""") - git_commit("Add train_a script") +""", + "Add train_a script", + ) - roar_cli("run", python_exe, "train_a.py") - git_commit("After train_a") + _run_roar_and_commit(roar_cli, git_commit, "After train_a", "run", python_exe, "train_a.py") # Create train_b script - train_b = temp_git_repo / "train_b.py" - train_b.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "train_b.py", + """ import json with open("part_b.csv", "r") as f: data = f.read() with open("model_b.pkl", "w") as f: json.dump({"model": "b", "hash": hash(data)}, f) print("Trained model_b") -""") - git_commit("Add train_b script") +""", + "Add train_b script", + ) - roar_cli("run", python_exe, "train_b.py") - git_commit("After train_b") + _run_roar_and_commit(roar_cli, git_commit, "After train_b", "run", python_exe, "train_b.py") # Create merge script - merge_script = temp_git_repo / "merge_models.py" - merge_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "merge_models.py", + """ import json with open("model_a.pkl", "r") as f: model_a = json.load(f) @@ -672,18 +701,19 @@ def test_dag_partial_branch_invalidation( with open("final_model.pkl", "w") as f: json.dump({"a": model_a, "b": model_b}, f) print("Merged models") -""") - git_commit("Add merge script") +""", + "Add merge script", + ) - roar_cli("run", python_exe, "merge_models.py") - git_commit("After merge") + _run_roar_and_commit( + roar_cli, git_commit, "After merge", "run", python_exe, "merge_models.py" + ) # Rerun only train_a (branch A) (temp_git_repo / "part_a.csv").write_text("modified,data\n") git_commit("Modified part_a") - roar_cli("run", python_exe, "train_a.py") - git_commit("Rerun train_a") + _run_roar_and_commit(roar_cli, git_commit, "Rerun train_a", "run", python_exe, "train_a.py") # Check partial invalidation result = roar_cli("dag", "--json") @@ -713,8 +743,11 @@ def test_dag_checkpoint_recovery( Then: Both executions should appear in expanded view """ # Create a script that can fail - fail_script = temp_git_repo / "might_fail.py" - fail_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "might_fail.py", + """ import sys import os @@ -733,19 +766,34 @@ def test_dag_checkpoint_recovery( with open(output_file, "w") as f: f.write(data.upper()) print(f"Success: wrote {output_file}") -""") - git_commit("Add might_fail script") +""", + "Add might_fail script", + ) # First run (success for setup) - roar_cli("run", python_exe, "might_fail.py", "output.json") - git_commit("After first run") + _run_roar_and_commit( + roar_cli, + git_commit, + "After first run", + "run", + python_exe, + "might_fail.py", + "output.json", + ) # Modify input and rerun (temp_git_repo / "input.csv").write_text("id,value\n1,retry\n") git_commit("Modified input") - roar_cli("run", python_exe, "might_fail.py", "output.json") - git_commit("After second run") + _run_roar_and_commit( + roar_cli, + git_commit, + "After second run", + "run", + python_exe, + "might_fail.py", + "output.json", + ) # Check expanded view shows both executions result = roar_cli("dag", "--expanded", "--json") @@ -796,8 +844,9 @@ def test_dag_large_pipeline( # Run all steps for i in range(1, 13): - roar_cli("run", python_exe, f"step_{i}.py") - git_commit(f"After step_{i}") + _run_roar_and_commit( + roar_cli, git_commit, f"After step_{i}", "run", python_exe, f"step_{i}.py" + ) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -843,8 +892,9 @@ def test_dag_deep_nesting( # Run all levels for i in range(1, 7): - roar_cli("run", python_exe, f"level_{i}.py") - git_commit(f"After level_{i}") + _run_roar_and_commit( + roar_cli, git_commit, f"After level_{i}", "run", python_exe, f"level_{i}.py" + ) result = roar_cli("dag", "--no-color") assert result.returncode == 0 @@ -885,11 +935,8 @@ def test_dag_named_steps( the JSON structure includes the field. """ # Run steps - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) # Verify JSON includes step_name field in structure result = roar_cli("dag", "--json") @@ -924,8 +971,11 @@ def test_dag_multiple_artifacts( Then: All artifacts should be listed """ # Create script that produces multiple outputs - multi_output = temp_git_repo / "multi_output.py" - multi_output.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "multi_output.py", + """ import json with open("input.csv", "r") as f: @@ -942,11 +992,13 @@ def test_dag_multiple_artifacts( json.dump({"part": 3, "data": data[20:]}, f) print("Produced 3 outputs") -""") - git_commit("Add multi_output script") +""", + "Add multi_output script", + ) - roar_cli("run", python_exe, "multi_output.py") - git_commit("After multi_output") + _run_roar_and_commit( + roar_cli, git_commit, "After multi_output", "run", python_exe, "multi_output.py" + ) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -979,8 +1031,11 @@ def test_dag_long_command_truncation( Then: Command should be truncated with '...' """ # Create script with long name and arguments - long_script = temp_git_repo / "very_long_script_name_that_goes_on_and_on.py" - long_script.write_text(""" + _write_script( + temp_git_repo, + git_commit, + "very_long_script_name_that_goes_on_and_on.py", + """ import sys with open("input.csv", "r") as f: @@ -990,13 +1045,21 @@ def test_dag_long_command_truncation( f.write(data + " " + " ".join(sys.argv[1:])) print("Done") -""") - git_commit("Add long script") +""", + "Add long script", + ) # Run with very long arguments long_args = ["--parameter-" + str(i) + "=value" + str(i) for i in range(1, 20)] - roar_cli("run", python_exe, "very_long_script_name_that_goes_on_and_on.py", *long_args) - git_commit("After long command") + _run_roar_and_commit( + roar_cli, + git_commit, + "After long command", + "run", + python_exe, + "very_long_script_name_that_goes_on_and_on.py", + *long_args, + ) result = roar_cli("dag", "--no-color") assert result.returncode == 0 @@ -1033,8 +1096,7 @@ def test_dag_artifact_states_in_json( When: Running roar dag --json Then: Artifacts should have state, artifact_id, consumer_steps, is_terminal fields """ - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -1069,18 +1131,15 @@ def test_dag_stale_artifact_state( Then: Artifacts from stale steps should have state="stale" """ # Build pipeline - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - # Modify input and rerun preprocess to make train stale - (temp_git_repo / "input.csv").write_text("id,value\n1,modified\n2,data\n") - git_commit("Modified input") - - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,modified\n2,data\n", + ) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -1109,11 +1168,8 @@ def test_dag_show_artifacts_option( When: Running roar dag --show-artifacts --json Then: Intermediate artifacts should be included """ - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) # Without --show-artifacts, only terminal artifacts shown result = roar_cli("dag", "--json") @@ -1150,18 +1206,15 @@ def test_dag_stale_only_option( Then: Only stale steps and artifacts should be shown """ # Build pipeline - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - # Rerun preprocess to make train stale - (temp_git_repo / "input.csv").write_text("id,value\n1,new\n") - git_commit("Modified input") - - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,new\n", + ) # Without --stale-only result = roar_cli("dag", "--json") @@ -1196,11 +1249,8 @@ def test_dag_artifact_consumer_tracking( When: Running roar dag --show-artifacts --json Then: processed.csv artifact should have train step in consumer_steps """ - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) result = roar_cli("dag", "--show-artifacts", "--json") assert result.returncode == 0 @@ -1234,18 +1284,15 @@ def test_dag_text_output_shows_artifact_states( Then: Stale artifacts should show [stale] marker """ # Build pipeline - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - # Rerun preprocess to make train stale - (temp_git_repo / "input.csv").write_text("id,value\n1,modified\n") - git_commit("Modified input") - - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,modified\n", + ) result = roar_cli("dag", "--no-color") assert result.returncode == 0 @@ -1270,8 +1317,7 @@ def test_dag_active_artifact_state( When: Running roar dag --json Then: Terminal artifacts should have state="active" """ - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) result = roar_cli("dag", "--json") assert result.returncode == 0 @@ -1303,21 +1349,16 @@ def test_dag_superseded_propagates_downstream( Expected: artifact_a = superseded, artifact_b = superseded """ # Build initial pipeline: preprocess -> train -> evaluate - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("After preprocess") - - roar_cli("run", python_exe, "train.py", "processed.csv", "model.pkl") - git_commit("After train") - - roar_cli("run", python_exe, "evaluate.py", "model.pkl", "test.csv", "metrics.json") - git_commit("After evaluate") - - # Modify input and rerun preprocess (this creates a superseded version) - (temp_git_repo / "input.csv").write_text("id,value\n1,new_data\n2,changed\n") - git_commit("Modified input") - - roar_cli("run", python_exe, "preprocess.py", "input.csv", "processed.csv") - git_commit("Rerun preprocess") + _run_preprocess(roar_cli, git_commit, python_exe) + _run_train(roar_cli, git_commit, python_exe) + _run_evaluate(roar_cli, git_commit, python_exe) + _rerun_preprocess_with_modified_input( + temp_git_repo, + roar_cli, + git_commit, + python_exe, + "id,value\n1,new_data\n2,changed\n", + ) # Check expanded view - artifacts from superseded executions should be superseded result = roar_cli("dag", "--expanded", "--show-artifacts", "--json") diff --git a/tests/happy_path/test_label_command.py b/tests/happy_path/test_label_command.py index 3f50b405..f321b7a2 100644 --- a/tests/happy_path/test_label_command.py +++ b/tests/happy_path/test_label_command.py @@ -55,6 +55,57 @@ def _artifact_label_rows_by_hash( class TestLabelCommand: """CLI product-path tests for label lifecycle behavior.""" + def test_detected_dataset_composite_artifact_gets_auto_labels( + self, + temp_git_repo, + roar_cli, + git_commit, + python_exe, + ): + dataset_script = temp_git_repo / "emit_dataset.py" + dataset_script.write_text( + "\n".join( + [ + "from __future__ import annotations", + "", + "from pathlib import Path", + "import argparse", + "", + "parser = argparse.ArgumentParser()", + "parser.add_argument('--output-dir', required=True)", + "args = parser.parse_args()", + "", + "root = Path(args.output_dir)", + "(root / 'train').mkdir(parents=True, exist_ok=True)", + "(root / 'train' / 'part-00000.csv').write_text('value\\n1\\n', encoding='utf-8')", + "(root / 'train' / 'part-00001.csv').write_text('value\\n2\\n', encoding='utf-8')", + ] + ), + encoding="utf-8", + ) + git_commit("Add dataset emitter") + + result = roar_cli("run", python_exe, "emit_dataset.py", "--output-dir", "dataset") + assert result.returncode == 0 + + dataset_root = temp_git_repo / "dataset" + + label_show = _assert_ok(roar_cli("label", "show", "artifact", "dataset", check=False)) + assert f"dataset.id={dataset_root.resolve().as_uri()}" in label_show + assert "dataset.modality=tabular" in label_show + assert "dataset.type=dataset" in label_show + + show_output = _assert_ok(roar_cli("show", "dataset", check=False)) + assert "Labels:" in show_output + assert f"dataset.id={dataset_root.resolve().as_uri()}" in show_output + assert "dataset.modality=tabular" in show_output + assert "dataset.type=dataset" in show_output + + rows = _artifact_label_rows(temp_git_repo, dataset_root) + assert rows[0][1]["dataset"]["type"] == "dataset" + assert rows[0][1]["dataset"]["id"] == dataset_root.resolve().as_uri() + assert rows[0][1]["dataset"]["modality"] == "tabular" + def test_artifact_label_set_patches_current_document_and_preserves_history( self, temp_git_repo, diff --git a/tests/integration/fake_glaas.py b/tests/integration/fake_glaas.py new file mode 100644 index 00000000..b727ce70 --- /dev/null +++ b/tests/integration/fake_glaas.py @@ -0,0 +1,246 @@ +"""Minimal fake GLaaS server for local publish-path integration tests.""" + +from __future__ import annotations + +import json +import re +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any + + +class _FakeGlaasServer(ThreadingHTTPServer): + def __init__(self) -> None: + super().__init__(("127.0.0.1", 0), _FakeGlaasHandler) + self.health_checks = 0 + self.session_registrations: list[dict[str, Any]] = [] + self.job_batches: list[dict[str, Any]] = [] + self.job_creates: list[dict[str, Any]] = [] + self.artifact_batches: list[list[dict[str, Any]]] = [] + self.input_links: list[dict[str, Any]] = [] + self.output_links: list[dict[str, Any]] = [] + self.label_syncs: list[list[dict[str, Any]]] = [] + self.composite_registrations: list[dict[str, Any]] = [] + self.artifacts_by_digest: dict[str, dict[str, Any]] = {} + self.artifact_dags_by_digest: dict[str, dict[str, Any]] = {} + self._next_job_id = 1 + + @property + def base_url(self) -> str: + return f"http://127.0.0.1:{self.server_address[1]}" + + def allocate_job_id(self) -> int: + job_id = self._next_job_id + self._next_job_id += 1 + return job_id + + +class _FakeGlaasHandler(BaseHTTPRequestHandler): + server: _FakeGlaasServer + + def _read_json(self) -> dict[str, Any]: + content_length = int(self.headers.get("Content-Length", "0")) + if content_length <= 0: + return {} + raw = self.rfile.read(content_length) + if not raw: + return {} + return json.loads(raw.decode("utf-8")) + + def _write_json(self, status_code: int, payload: dict[str, Any]) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path == "/api/v1/health": + self.server.health_checks += 1 + self._write_json(200, {"success": True, "status": "healthy"}) + return + + artifact_match = re.fullmatch(r"/api/v1/artifacts/([0-9a-f]+)", self.path) + if artifact_match: + prefix = artifact_match.group(1) + for digest in sorted(self.server.artifacts_by_digest): + if digest.startswith(prefix): + self._write_json(200, {"hash": digest}) + return + self._write_json(404, {"error": f"Artifact not found: {prefix}"}) + return + + artifact_dag_match = re.fullmatch(r"/api/v1/artifacts/([0-9a-f]+)/dag", self.path) + if artifact_dag_match: + prefix = artifact_dag_match.group(1) + for digest in sorted(self.server.artifact_dags_by_digest): + if digest.startswith(prefix): + self._write_json(200, self.server.artifact_dags_by_digest[digest]) + return + self._write_json(404, {"error": f"Artifact DAG not found: {prefix}"}) + return + + self._write_json(404, {"error": f"Unhandled GET path: {self.path}"}) + + def do_POST(self) -> None: + payload = self._read_json() + + if self.path == "/api/v1/sessions": + self.server.session_registrations.append(payload) + session_hash = str(payload.get("hash", "")) + self._write_json( + 200, + { + "hash": session_hash, + "url": f"{self.server.base_url}/dag/{session_hash}", + "created": True, + }, + ) + return + + if self.path == "/api/v1/artifacts/batch": + artifacts = payload.get("artifacts", []) + if isinstance(artifacts, list): + self.server.artifact_batches.append(artifacts) + for artifact in artifacts: + hashes = artifact.get("hashes", []) + if not isinstance(hashes, list): + continue + for entry in hashes: + if not isinstance(entry, dict): + continue + digest = entry.get("digest") + if isinstance(digest, str) and digest: + self.server.artifacts_by_digest[digest] = artifact + self._write_json(200, {"created": len(artifacts), "existing": 0}) + return + + if self.path == "/api/v1/labels/sync": + labels = payload.get("labels", []) + if isinstance(labels, list): + self.server.label_syncs.append(labels) + self._write_json( + 200, + {"created": 0, "updated": 0, "unchanged": len(labels)}, + ) + return + + if self.path == "/api/v1/artifacts/composites": + self.server.composite_registrations.append(payload) + self._write_json( + 200, + { + "artifact_id": f"composite-{len(self.server.composite_registrations)}", + "created": True, + }, + ) + return + + batch_match = re.fullmatch(r"/api/v1/sessions/([0-9a-f]+)/jobs/batch", self.path) + if batch_match: + session_hash = batch_match.group(1) + jobs = payload.get("jobs", []) + if not isinstance(jobs, list): + jobs = [] + self.server.job_batches.append({"session_hash": session_hash, "jobs": jobs}) + job_ids = [self.server.allocate_job_id() for _job in jobs] + self._write_json(200, {"job_ids": job_ids, "errors": []}) + return + + job_match = re.fullmatch(r"/api/v1/sessions/([0-9a-f]+)/jobs", self.path) + if job_match: + session_hash = job_match.group(1) + self.server.job_creates.append({"session_hash": session_hash, "job": payload}) + self._write_json(200, {"id": self.server.allocate_job_id()}) + return + + input_match = re.fullmatch(r"/api/v1/sessions/([0-9a-f]+)/jobs/([^/]+)/inputs", self.path) + if input_match: + session_hash, job_uid = input_match.groups() + artifacts = payload.get("artifacts", []) + if not isinstance(artifacts, list): + artifacts = [] + self.server.input_links.append( + {"session_hash": session_hash, "job_uid": job_uid, "artifacts": artifacts} + ) + self._write_json(200, {"job_uid": job_uid, "inputs_linked": len(artifacts)}) + return + + output_match = re.fullmatch( + r"/api/v1/sessions/([0-9a-f]+)/jobs/([^/]+)/outputs", + self.path, + ) + if output_match: + session_hash, job_uid = output_match.groups() + artifacts = payload.get("artifacts", []) + if not isinstance(artifacts, list): + artifacts = [] + self.server.output_links.append( + {"session_hash": session_hash, "job_uid": job_uid, "artifacts": artifacts} + ) + self._write_json(200, {"job_uid": job_uid, "outputs_linked": len(artifacts)}) + return + + self._write_json(404, {"error": f"Unhandled POST path: {self.path}"}) + + def log_message(self, format: str, *args: object) -> None: + """Suppress default stderr logging for integration tests.""" + + +class FakeGlaasServer: + """Context-managed fake GLaaS server.""" + + def __init__(self) -> None: + self._server = _FakeGlaasServer() + self._thread: threading.Thread | None = None + + @property + def base_url(self) -> str: + return self._server.base_url + + @property + def health_checks(self) -> int: + return self._server.health_checks + + @property + def session_registrations(self) -> list[dict[str, Any]]: + return self._server.session_registrations + + @property + def job_batches(self) -> list[dict[str, Any]]: + return self._server.job_batches + + @property + def job_creates(self) -> list[dict[str, Any]]: + return self._server.job_creates + + @property + def artifact_batches(self) -> list[list[dict[str, Any]]]: + return self._server.artifact_batches + + @property + def input_links(self) -> list[dict[str, Any]]: + return self._server.input_links + + @property + def output_links(self) -> list[dict[str, Any]]: + return self._server.output_links + + @property + def label_syncs(self) -> list[list[dict[str, Any]]]: + return self._server.label_syncs + + def set_artifact_dag(self, digest: str, dag: dict[str, Any]) -> None: + self._server.artifact_dags_by_digest[digest] = dag + + def __enter__(self) -> FakeGlaasServer: + self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) + self._thread.start() + return self + + def __exit__(self, *exc_info: object) -> None: + self._server.shutdown() + self._server.server_close() + if self._thread is not None: + self._thread.join(timeout=5) diff --git a/tests/integration/test_auth_cli.py b/tests/integration/test_auth_cli.py new file mode 100644 index 00000000..2f0db30d --- /dev/null +++ b/tests/integration/test_auth_cli.py @@ -0,0 +1,177 @@ +"""Product-path coverage for the `roar auth` CLI.""" + +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import sys +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +class _FakeGlaasAuthServer(ThreadingHTTPServer): + auth_status_code = 404 + last_authorization: str | None = None + + +class _FakeGlaasAuthHandler(BaseHTTPRequestHandler): + server: _FakeGlaasAuthServer + + def do_GET(self) -> None: + if self.path == "/api/v1/health": + self._write_json(200, {"status": "ok"}) + return + + if self.path == "/api/v1/artifacts/00000000": + self.server.last_authorization = self.headers.get("Authorization") + if self.server.auth_status_code == 404 and self.server.last_authorization: + self._write_json(404, {"detail": "Not found"}) + return + + self._write_json(401, {"detail": "Unknown key"}) + return + + self._write_json(404, {"detail": "Not found"}) + + def log_message(self, format: str, *args: object) -> None: + return + + def _write_json(self, status: int, payload: dict[str, str]) -> None: + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + +def _run_roar_auth( + *args: str, cwd: Path, env_overrides: dict[str, str] +) -> subprocess.CompletedProcess[str]: + env = dict(os.environ) + env.update(env_overrides) + return subprocess.run( + [sys.executable, "-m", "roar", "auth", *args], + cwd=cwd, + capture_output=True, + text=True, + env=env, + ) + + +@pytest.fixture +def ssh_keypair(tmp_path: Path) -> Path: + if shutil.which("ssh-keygen") is None: + pytest.skip("ssh-keygen is required for auth CLI tests") + + key_path = tmp_path / "id_ed25519" + subprocess.run( + ["ssh-keygen", "-q", "-t", "ed25519", "-N", "", "-f", str(key_path), "-C", "roar-test"], + check=True, + capture_output=True, + ) + return key_path + + +@pytest.fixture +def fake_glaas_auth_server() -> _FakeGlaasAuthServer: + server = _FakeGlaasAuthServer(("127.0.0.1", 0), _FakeGlaasAuthHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + yield server + finally: + server.shutdown() + server.server_close() + thread.join() + + +def test_auth_register_prints_public_key(temp_git_repo: Path, ssh_keypair: Path) -> None: + pubkey_path = Path(f"{ssh_keypair}.pub") + pubkey_content = pubkey_path.read_text().strip() + + result = _run_roar_auth( + "register", + cwd=temp_git_repo, + env_overrides={"ROAR_SSH_KEY": str(ssh_keypair)}, + ) + + assert result.returncode == 0, result.stderr + assert "Your SSH public key:" in result.stdout + assert pubkey_content in result.stdout + assert f"Path: {pubkey_path}" in result.stdout + + +def test_auth_status_prefers_env_glaas_url( + temp_git_repo: Path, + ssh_keypair: Path, + fake_glaas_auth_server: _FakeGlaasAuthServer, +) -> None: + fake_glaas_url = f"http://127.0.0.1:{fake_glaas_auth_server.server_address[1]}" + result = _run_roar_auth( + "status", + cwd=temp_git_repo, + env_overrides={ + "ROAR_SSH_KEY": str(ssh_keypair), + "GLAAS_URL": fake_glaas_url, + }, + ) + + assert result.returncode == 0, result.stderr + assert f"Server URL: {fake_glaas_url}" in result.stdout + assert f"SSH key: {ssh_keypair}.pub" in result.stdout + assert "Fingerprint: SHA256:" in result.stdout + assert "https://api.glaas.ai" not in result.stdout + + +def test_auth_test_succeeds_against_local_server( + temp_git_repo: Path, + ssh_keypair: Path, + fake_glaas_auth_server: _FakeGlaasAuthServer, +) -> None: + fake_glaas_url = f"http://127.0.0.1:{fake_glaas_auth_server.server_address[1]}" + result = _run_roar_auth( + "test", + cwd=temp_git_repo, + env_overrides={ + "ROAR_SSH_KEY": str(ssh_keypair), + "GLAAS_URL": fake_glaas_url, + }, + ) + + assert result.returncode == 0, result.stderr + assert f"Testing connection to {fake_glaas_url}..." in result.stdout + assert "Server is reachable." in result.stdout + assert "Testing authentication..." in result.stdout + assert "Authentication successful!" in result.stdout + assert fake_glaas_auth_server.last_authorization is not None + assert "Signature keyid=" in fake_glaas_auth_server.last_authorization + + +def test_auth_test_surfaces_unauthorized_response( + temp_git_repo: Path, + ssh_keypair: Path, + fake_glaas_auth_server: _FakeGlaasAuthServer, +) -> None: + fake_glaas_auth_server.auth_status_code = 401 + fake_glaas_url = f"http://127.0.0.1:{fake_glaas_auth_server.server_address[1]}" + result = _run_roar_auth( + "test", + cwd=temp_git_repo, + env_overrides={ + "ROAR_SSH_KEY": str(ssh_keypair), + "GLAAS_URL": fake_glaas_url, + }, + ) + + combined_output = f"{result.stdout}\n{result.stderr}" + assert result.returncode != 0 + assert "Authentication failed: Unknown key" in combined_output + assert fake_glaas_auth_server.last_authorization is not None diff --git a/tests/integration/test_cli_startup.py b/tests/integration/test_cli_startup.py index 3d94cb1e..ad29b970 100644 --- a/tests/integration/test_cli_startup.py +++ b/tests/integration/test_cli_startup.py @@ -1,7 +1,9 @@ """ -Integration test for CLI startup performance. +Opt-in diagnostic test for CLI startup performance. -Verifies that the CLI starts quickly, especially for --help. +This file is intentionally excluded from the default pytest profile because it +tracks an aspirational latency budget rather than a stable product contract. +Run it explicitly when working on CLI startup time. """ import subprocess diff --git a/tests/integration/test_cli_state_commands.py b/tests/integration/test_cli_state_commands.py new file mode 100644 index 00000000..2a3dc32a --- /dev/null +++ b/tests/integration/test_cli_state_commands.py @@ -0,0 +1,195 @@ +"""Product-path coverage for local stateful CLI commands.""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +def _query_rows(repo_path: Path, sql: str, params: tuple[object, ...] = ()) -> list[sqlite3.Row]: + db_path = repo_path / ".roar" / "roar.db" + assert db_path.exists(), ".roar/roar.db not found" + with sqlite3.connect(str(db_path)) as conn: + conn.row_factory = sqlite3.Row + return conn.execute(sql, params).fetchall() + + +def _active_session(repo_path: Path) -> sqlite3.Row: + rows = _query_rows( + repo_path, + "SELECT id, is_active, current_step FROM sessions WHERE is_active = 1 ORDER BY id DESC LIMIT 1", + ) + assert rows, "Expected an active session" + return rows[0] + + +def test_config_set_get_and_list_round_trip(temp_git_repo: Path, roar_cli) -> None: + set_result = roar_cli("config", "set", "output.quiet", "true") + assert set_result.returncode == 0 + assert "Set output.quiet = True" in set_result.stdout + + get_result = roar_cli("config", "get", "output.quiet") + assert get_result.returncode == 0 + assert "output.quiet: True" in get_result.stdout + + list_result = roar_cli("config", "list") + assert list_result.returncode == 0 + assert "output.quiet" in list_result.stdout + + +def test_env_set_get_list_and_unset_round_trip(temp_git_repo: Path, roar_cli) -> None: + set_result = roar_cli("env", "set", "FOO", "bar") + assert set_result.returncode == 0 + assert "Set FOO=bar" in set_result.stdout + + get_result = roar_cli("env", "get", "FOO") + assert get_result.returncode == 0 + assert get_result.stdout.strip() == "bar" + + list_result = roar_cli("env", "list") + assert list_result.returncode == 0 + assert "FOO=bar" in list_result.stdout + + unset_result = roar_cli("env", "unset", "FOO") + assert unset_result.returncode == 0 + assert "Unset FOO" in unset_result.stdout + + list_after_unset = roar_cli("env", "list") + assert list_after_unset.returncode == 0 + assert "No environment variables set." in list_after_unset.stdout + + +def test_reset_creates_and_rotates_active_session(temp_git_repo: Path, roar_cli) -> None: + sessions_before_reset = _query_rows( + temp_git_repo, + "SELECT id, is_active, current_step FROM sessions ORDER BY id", + ) + assert sessions_before_reset == [] + + first_reset_result = roar_cli("reset", "-y") + assert first_reset_result.returncode == 0 + assert "Created new session" in first_reset_result.stdout + + first_active_session = _active_session(temp_git_repo) + assert first_active_session["current_step"] == 1 + + second_reset_result = roar_cli("reset", "-y") + assert second_reset_result.returncode == 0 + assert "Current session has 0 step(s)." in second_reset_result.stdout + assert f"Deactivated session {first_active_session['id']}." in second_reset_result.stdout + assert "Created new session" in second_reset_result.stdout + + sessions_after_reset = _query_rows( + temp_git_repo, + "SELECT id, is_active, current_step FROM sessions ORDER BY id", + ) + active_sessions = [row for row in sessions_after_reset if row["is_active"] == 1] + assert len(active_sessions) == 1 + assert active_sessions[0]["id"] != first_active_session["id"] + assert active_sessions[0]["current_step"] == 1 + + previous_session = next( + row for row in sessions_after_reset if row["id"] == first_active_session["id"] + ) + assert previous_session["is_active"] == 0 + + log_result = roar_cli("log") + assert log_result.returncode == 0 + assert "No log entries found." in log_result.stdout + + +def test_reset_rotates_active_session_after_recorded_job( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, +) -> None: + script = temp_git_repo / "simple.py" + script.write_text("with open('result.txt', 'w') as f: f.write('ok')\n") + git_commit("Add reset fixture") + + run_result = roar_cli("run", python_exe, "simple.py") + assert run_result.returncode == 0 + assert (temp_git_repo / "result.txt").exists() + + previous_active = _active_session(temp_git_repo) + jobs_before_reset = _query_rows( + temp_git_repo, + "SELECT id FROM jobs WHERE session_id = ? ORDER BY id", + (previous_active["id"],), + ) + assert len(jobs_before_reset) == 1 + + reset_result = roar_cli("reset", "-y") + assert reset_result.returncode == 0 + assert "Current session has 1 step(s)." in reset_result.stdout + assert f"Deactivated session {previous_active['id']}." in reset_result.stdout + assert "Created new session" in reset_result.stdout + + active_after_reset = _active_session(temp_git_repo) + assert active_after_reset["id"] != previous_active["id"] + assert active_after_reset["current_step"] == 1 + + log_result = roar_cli("log") + assert log_result.returncode == 0 + assert "No log entries found." in log_result.stdout + + +def test_pop_reports_when_no_active_session(temp_git_repo: Path, roar_cli) -> None: + pop_result = roar_cli("pop", "-y") + assert pop_result.returncode == 0 + assert "No active session." in pop_result.stdout + + +def test_pop_reports_when_active_session_has_no_jobs(temp_git_repo: Path, roar_cli) -> None: + reset_result = roar_cli("reset", "-y") + assert reset_result.returncode == 0 + + pop_result = roar_cli("pop", "-y") + assert pop_result.returncode == 0 + assert "No jobs in the active session." in pop_result.stdout + + +def test_pop_removes_latest_job_and_output_file( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, +) -> None: + output_path = temp_git_repo / "result.txt" + script = temp_git_repo / "simple.py" + script.write_text("with open('result.txt', 'w') as f: f.write('ok')\n") + git_commit("Add pop fixture") + + run_result = roar_cli("run", python_exe, "simple.py") + assert run_result.returncode == 0 + assert output_path.exists() + + active_session = _active_session(temp_git_repo) + jobs_before_pop = _query_rows( + temp_git_repo, + "SELECT id FROM jobs WHERE session_id = ? ORDER BY id", + (active_session["id"],), + ) + assert len(jobs_before_pop) == 1 + + pop_result = roar_cli("pop", "-y") + assert pop_result.returncode == 0 + assert "Removed job" in pop_result.stdout + assert "Deleted 1 output file(s)." in pop_result.stdout + assert not output_path.exists() + + jobs_after_pop = _query_rows( + temp_git_repo, + "SELECT id FROM jobs WHERE session_id = ? ORDER BY id", + (active_session["id"],), + ) + assert jobs_after_pop == [] + + pop_again_result = roar_cli("pop", "-y") + assert pop_again_result.returncode == 0 + assert "No jobs in the active session." in pop_again_result.stdout diff --git a/tests/integration/test_get_cli_integration.py b/tests/integration/test_get_cli_integration.py new file mode 100644 index 00000000..86e3db09 --- /dev/null +++ b/tests/integration/test_get_cli_integration.py @@ -0,0 +1,164 @@ +"""Product-path coverage for the local `roar get` CLI.""" + +from __future__ import annotations + +import json +import sqlite3 +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +class _DownloadHandler(BaseHTTPRequestHandler): + """Serve static download payloads for local `roar get` tests.""" + + def _write_headers(self, status_code: int, content: bytes) -> None: + self.send_response(status_code) + self.send_header("Content-Length", str(len(content))) + self.end_headers() + + def do_GET(self) -> None: + content = self.server.payloads.get(self.path) # type: ignore[attr-defined] + if content is None: + self._write_headers(404, b"") + return + + self._write_headers(200, content) + self.wfile.write(content) + + def do_HEAD(self) -> None: + content = self.server.payloads.get(self.path) # type: ignore[attr-defined] + if content is None: + self._write_headers(404, b"") + return + + self._write_headers(200, content) + + def log_message(self, format: str, *args: object) -> None: + """Suppress test-server stderr noise.""" + + +class _DownloadServer: + def __init__(self, payloads: dict[str, bytes]): + self._server = ThreadingHTTPServer(("127.0.0.1", 0), _DownloadHandler) + self._server.payloads = payloads # type: ignore[attr-defined] + self._thread: threading.Thread | None = None + + def url_for(self, path: str) -> str: + return f"http://127.0.0.1:{self._server.server_address[1]}{path}" + + def __enter__(self) -> _DownloadServer: + self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) + self._thread.start() + return self + + def __exit__(self, *exc_info: object) -> None: + self._server.shutdown() + self._server.server_close() + if self._thread is not None: + self._thread.join(timeout=5) + + +@pytest.fixture +def download_server() -> _DownloadServer: + with _DownloadServer( + { + "/artifacts/model.bin": b"pretrained-weights", + "/artifacts/weights.bin": b"weights-for-dry-run", + } + ) as server: + yield server + + +def _query_rows(repo_path: Path, sql: str, params: tuple[object, ...] = ()) -> list[sqlite3.Row]: + db_path = repo_path / ".roar" / "roar.db" + assert db_path.exists(), ".roar/roar.db not found" + with sqlite3.connect(str(db_path)) as conn: + conn.row_factory = sqlite3.Row + return conn.execute(sql, params).fetchall() + + +def test_get_downloads_default_filename_and_records_local_job( + temp_git_repo: Path, + roar_cli, + download_server: _DownloadServer, +) -> None: + source_url = download_server.url_for("/artifacts/model.bin") + + result = roar_cli("get", source_url, "-m", "download model") + + assert result.returncode == 0 + assert f"Downloaded 1 file(s) from {source_url}" in result.stdout + assert "Job created: step" in result.stdout + + downloaded_path = temp_git_repo / "model.bin" + assert downloaded_path.exists() + assert downloaded_path.read_bytes() == b"pretrained-weights" + + jobs = _query_rows( + temp_git_repo, + "SELECT id, job_type, command, metadata FROM jobs ORDER BY id", + ) + assert len(jobs) == 1 + assert jobs[0]["job_type"] == "get" + assert source_url in jobs[0]["command"] + + metadata = json.loads(jobs[0]["metadata"]) + assert metadata["get"]["source"] == source_url + assert metadata["get"]["message"] == "download model" + + +def test_get_dry_run_does_not_download_or_record_job( + temp_git_repo: Path, + roar_cli, + download_server: _DownloadServer, +) -> None: + source_url = download_server.url_for("/artifacts/weights.bin") + + result = roar_cli("get", source_url, "--dry-run") + + assert result.returncode == 0 + assert "Dry run - would download:" in result.stdout + assert str(temp_git_repo / "weights.bin") in result.stdout + assert not (temp_git_repo / "weights.bin").exists() + + jobs = _query_rows(temp_git_repo, "SELECT id FROM jobs ORDER BY id") + assert jobs == [] + + +def test_get_then_run_creates_local_lineage_chain( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, + download_server: _DownloadServer, +) -> None: + script = temp_git_repo / "consume_model.py" + script.write_text( + "from pathlib import Path\n" + "data = Path('model.bin').read_bytes()\n" + "Path('report.bin').write_bytes(data + b'::consumed')\n" + ) + git_commit("Add consumer script") + + source_url = download_server.url_for("/artifacts/model.bin") + get_result = roar_cli("get", source_url) + assert get_result.returncode == 0 + assert (temp_git_repo / "model.bin").read_bytes() == b"pretrained-weights" + + git_commit("Commit downloaded artifact") + + run_result = roar_cli("run", python_exe, "consume_model.py") + assert run_result.returncode == 0 + assert (temp_git_repo / "report.bin").read_bytes() == b"pretrained-weights::consumed" + + lineage_result = roar_cli("lineage", "report.bin") + lineage = json.loads(lineage_result.stdout) + + commands = [job["command"] for job in lineage["jobs"]] + assert any("roar get" in command and source_url in command for command in commands) + assert any("consume_model.py" in command for command in commands) diff --git a/tests/integration/test_init_cli.py b/tests/integration/test_init_cli.py new file mode 100644 index 00000000..7651e2d9 --- /dev/null +++ b/tests/integration/test_init_cli.py @@ -0,0 +1,65 @@ +"""Product-path coverage for the `roar init` CLI.""" + +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +def _run_roar_init(*args: str, cwd: Path) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, "-m", "roar", "init", *args], + cwd=cwd, + capture_output=True, + text=True, + env=dict(os.environ), + ) + + +def test_init_with_yes_adds_roar_to_gitignore(tmp_path: Path) -> None: + subprocess.run(["git", "init"], cwd=tmp_path, check=True, capture_output=True) + gitignore_path = tmp_path / ".gitignore" + gitignore_path.write_text("# test ignore rules\n") + + result = _run_roar_init("-y", cwd=tmp_path) + + assert result.returncode == 0, result.stderr + assert (tmp_path / ".roar").is_dir() + assert (tmp_path / ".roar" / "roar.db").is_file() + assert (tmp_path / ".roar" / "config.toml").is_file() + assert "Added .roar/ to .gitignore" in result.stdout + assert gitignore_path.read_text().endswith(".roar/\n") + + +def test_init_with_no_preserves_gitignore(tmp_path: Path) -> None: + subprocess.run(["git", "init"], cwd=tmp_path, check=True, capture_output=True) + gitignore_path = tmp_path / ".gitignore" + original_gitignore = "# ignore build artifacts\n" + gitignore_path.write_text(original_gitignore) + + result = _run_roar_init("-n", cwd=tmp_path) + + assert result.returncode == 0, result.stderr + assert (tmp_path / ".roar").is_dir() + assert "Skipped .gitignore update." in result.stdout + assert gitignore_path.read_text() == original_gitignore + + +def test_init_with_path_initializes_target_directory(tmp_path: Path) -> None: + project_dir = tmp_path / "project" + project_dir.mkdir() + + result = _run_roar_init("--path", str(project_dir), "-n", cwd=tmp_path) + + assert result.returncode == 0, result.stderr + assert (project_dir / ".roar").is_dir() + assert (project_dir / ".roar" / "roar.db").is_file() + assert (project_dir / ".roar" / "config.toml").is_file() + assert not (tmp_path / ".roar").exists() + assert "Created" in result.stdout diff --git a/tests/integration/test_proxy_cli_integration.py b/tests/integration/test_proxy_cli_integration.py new file mode 100644 index 00000000..20dc94bb --- /dev/null +++ b/tests/integration/test_proxy_cli_integration.py @@ -0,0 +1,40 @@ +"""Product-path coverage for `roar proxy` configuration commands.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +def test_proxy_enable_disable_round_trip_updates_status_and_config( + temp_git_repo: Path, + roar_cli, +) -> None: + config_path = temp_git_repo / ".roar" / "config.toml" + + initial_status = roar_cli("proxy") + assert initial_status.returncode == 0 + assert "Proxy enabled: False" in initial_status.stdout + + enable_result = roar_cli("proxy", "enable") + assert enable_result.returncode == 0 + assert "S3 proxy enabled for roar run." in enable_result.stdout + assert str(config_path) in enable_result.stdout + assert "enabled = true" in config_path.read_text() + + status_after_enable = roar_cli("proxy", "status") + assert status_after_enable.returncode == 0 + assert "Proxy enabled: True" in status_after_enable.stdout + + disable_result = roar_cli("proxy", "disable") + assert disable_result.returncode == 0 + assert "S3 proxy disabled." in disable_result.stdout + assert str(config_path) in disable_result.stdout + assert "enabled = true" not in config_path.read_text() + + status_after_disable = roar_cli("proxy") + assert status_after_disable.returncode == 0 + assert "Proxy enabled: False" in status_after_disable.stdout diff --git a/tests/integration/test_put_cli_integration.py b/tests/integration/test_put_cli_integration.py new file mode 100644 index 00000000..872c1834 --- /dev/null +++ b/tests/integration/test_put_cli_integration.py @@ -0,0 +1,147 @@ +"""Product-path coverage for local `roar put` flows.""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from typing import Any + +import pytest + +from .fake_glaas import FakeGlaasServer + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def fake_glaas_publish_server() -> FakeGlaasServer: + with FakeGlaasServer() as server: + yield server + + +def _configure_put_repo(repo: Path, roar_cli, fake_glaas_url: str) -> None: + subprocess.run( + ["git", "remote", "add", "origin", "https://github.com/test/repo.git"], + cwd=repo, + capture_output=True, + check=True, + ) + roar_cli("config", "set", "glaas.url", fake_glaas_url) + roar_cli("config", "set", "glaas.web_url", fake_glaas_url) + + +def _create_repo_with_outputs( + repo: Path, + *, + roar_cli, + git_commit, + python_exe: str, +) -> None: + script = repo / "train.py" + script.write_text( + "from pathlib import Path\n" + "Path('model.pt').write_bytes(b'fake model weights' * 10)\n" + "Path('metrics.json').write_text('{\"accuracy\": 0.95}')\n" + ) + git_commit("Add training script") + + run_result = roar_cli("run", python_exe, "train.py") + assert run_result.returncode == 0 + + git_commit("Add training outputs") + + +def _get_dag(roar_cli) -> dict[str, Any]: + return json.loads(roar_cli("dag", "--json", "--show-artifacts").stdout) + + +def test_put_registers_lineage_with_fake_glaas_and_updates_local_dag( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, + monkeypatch, + fake_glaas_publish_server: FakeGlaasServer, +) -> None: + _configure_put_repo(temp_git_repo, roar_cli, fake_glaas_publish_server.base_url) + _create_repo_with_outputs( + temp_git_repo, + roar_cli=roar_cli, + git_commit=git_commit, + python_exe=python_exe, + ) + monkeypatch.setenv("ROAR_PUT_SKIP_UPLOAD", "1") + + result = roar_cli("put", "model.pt", "s3://test-bucket/models", "-m", "publish model") + + assert result.returncode == 0 + assert "Published 1 file(s) to s3://test-bucket/models" in result.stdout + assert "model.pt -> s3://test-bucket/models/model.pt" in result.stdout + assert "Job created: step 2" in result.stdout + assert f"View: {fake_glaas_publish_server.base_url}/dag/" in result.stdout + + dag = _get_dag(roar_cli) + assert dag["total_steps"] == 2 + nodes_by_step = {node["step_number"]: node for node in dag["nodes"]} + put_node = nodes_by_step[2] + assert 'roar put model.pt -m "publish model"' in put_node["command"] + assert put_node["metrics"]["inputs"] >= 1 + assert put_node["metrics"]["outputs"] == 0 + assert 1 in put_node["dependencies"] + + assert fake_glaas_publish_server.health_checks >= 1 + assert len(fake_glaas_publish_server.session_registrations) == 1 + assert len(fake_glaas_publish_server.job_batches) == 1 + assert len(fake_glaas_publish_server.job_creates) == 1 + assert len(fake_glaas_publish_server.artifact_batches) >= 1 + assert fake_glaas_publish_server.input_links + assert fake_glaas_publish_server.output_links + + batch_jobs = fake_glaas_publish_server.job_batches[0]["jobs"] + assert any(job.get("job_type") == "run" for job in batch_jobs) + assert fake_glaas_publish_server.job_creates[0]["job"]["job_type"] == "put" + + +def test_put_dry_run_does_not_create_local_or_remote_publish_jobs( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, + monkeypatch, + fake_glaas_publish_server: FakeGlaasServer, +) -> None: + _configure_put_repo(temp_git_repo, roar_cli, fake_glaas_publish_server.base_url) + _create_repo_with_outputs( + temp_git_repo, + roar_cli=roar_cli, + git_commit=git_commit, + python_exe=python_exe, + ) + monkeypatch.setenv("ROAR_PUT_SKIP_UPLOAD", "1") + + dag_before = _get_dag(roar_cli) + + result = roar_cli( + "put", + "model.pt", + "metrics.json", + "s3://bucket/test", + "-m", + "test", + "--dry-run", + ) + + assert result.returncode == 0 + assert "Dry run - would upload:" in result.stdout + assert "model.pt" in result.stdout + assert "metrics.json" in result.stdout + assert "Total: 2 file(s)" in result.stdout + + dag_after = _get_dag(roar_cli) + assert dag_after["total_steps"] == dag_before["total_steps"] == 1 + assert len(fake_glaas_publish_server.job_batches) == 0 + assert len(fake_glaas_publish_server.job_creates) == 0 + assert len(fake_glaas_publish_server.artifact_batches) == 0 + assert len(fake_glaas_publish_server.input_links) == 0 + assert len(fake_glaas_publish_server.output_links) == 0 diff --git a/tests/integration/test_register_dry_run_cli.py b/tests/integration/test_register_dry_run_cli.py new file mode 100644 index 00000000..686f62b4 --- /dev/null +++ b/tests/integration/test_register_dry_run_cli.py @@ -0,0 +1,151 @@ +"""Product-path coverage for local `roar register --dry-run` targets.""" + +from __future__ import annotations + +import re +import sqlite3 +import subprocess +from pathlib import Path + +import pytest + +from .fake_glaas import FakeGlaasServer + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def fake_glaas_publish_server() -> FakeGlaasServer: + with FakeGlaasServer() as server: + yield server + + +def _query_rows(repo_path: Path, sql: str, params: tuple[object, ...] = ()) -> list[sqlite3.Row]: + db_path = repo_path / ".roar" / "roar.db" + assert db_path.exists(), ".roar/roar.db not found" + with sqlite3.connect(str(db_path)) as conn: + conn.row_factory = sqlite3.Row + return conn.execute(sql, params).fetchall() + + +def _parse_dry_run_counts(output: str) -> dict[str, int]: + counts: dict[str, int] = {} + for key in ("Jobs", "Artifacts", "Links"): + match = re.search(rf"{key}: (\d+)", output) + assert match is not None, f"Missing {key} count in output: {output}" + counts[key.lower()] = int(match.group(1)) + return counts + + +def _parse_session_hash(output: str) -> str: + match = re.search(r"Session:\s+https://glaas\.ai/dag/([0-9a-f]+)", output) + assert match is not None, f"Missing session URL in output: {output}" + return match.group(1) + + +def _configure_register_repo(repo: Path, roar_cli, fake_glaas_url: str) -> None: + subprocess.run( + ["git", "remote", "add", "origin", "https://github.com/test/repo.git"], + cwd=repo, + capture_output=True, + check=True, + ) + roar_cli("config", "set", "glaas.url", fake_glaas_url) + roar_cli("config", "set", "glaas.web_url", fake_glaas_url) + + +def test_register_dry_run_resolves_artifact_step_and_session_targets( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, +) -> None: + script = temp_git_repo / "generate_report.py" + script.write_text("from pathlib import Path\nPath('report.txt').write_text('register me')\n") + git_commit("Add register fixture") + + run_result = roar_cli("run", python_exe, "generate_report.py") + assert run_result.returncode == 0 + assert (temp_git_repo / "report.txt").read_text() == "register me" + + active_sessions = _query_rows( + temp_git_repo, + "SELECT hash FROM sessions WHERE is_active = 1 ORDER BY id DESC LIMIT 1", + ) + assert active_sessions + session_hash = active_sessions[0]["hash"] + assert isinstance(session_hash, str) and len(session_hash) >= 12 + + artifact_result = roar_cli("register", "--dry-run", "report.txt") + step_result = roar_cli("register", "--dry-run", "@1") + session_result = roar_cli("register", "--dry-run", session_hash) + + for result in (artifact_result, step_result, session_result): + assert result.returncode == 0 + assert "Dry run - would register:" in result.stdout + assert "View on GLaaS:" in result.stdout + + published_session_hashes = { + _parse_session_hash(artifact_result.stdout), + _parse_session_hash(step_result.stdout), + _parse_session_hash(session_result.stdout), + } + assert len(published_session_hashes) == 1 + + artifact_counts = _parse_dry_run_counts(artifact_result.stdout) + step_counts = _parse_dry_run_counts(step_result.stdout) + session_counts = _parse_dry_run_counts(session_result.stdout) + + assert artifact_counts == step_counts == session_counts + assert artifact_counts["jobs"] == 1 + assert artifact_counts["artifacts"] >= 1 + assert artifact_counts["links"] >= 1 + + +def test_register_publishes_local_lineage_with_fake_glaas( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, + fake_glaas_publish_server: FakeGlaasServer, +) -> None: + _configure_register_repo(temp_git_repo, roar_cli, fake_glaas_publish_server.base_url) + + input_path = temp_git_repo / "input.txt" + input_path.write_text("register me\n") + script = temp_git_repo / "generate_report.py" + script.write_text( + "from pathlib import Path\n" + "content = Path('input.txt').read_text()\n" + "Path('report.txt').write_text(content.upper())\n" + ) + git_commit("Add register publish fixture") + + run_result = roar_cli("run", python_exe, "generate_report.py") + assert run_result.returncode == 0 + assert (temp_git_repo / "report.txt").read_text() == "REGISTER ME\n" + + git_commit("Commit tracked report") + + result = roar_cli("register", "report.txt", "--yes") + + assert result.returncode == 0 + assert "Registered lineage for: report.txt" in result.stdout + assert "Jobs: 1" in result.stdout + assert "Artifacts:" in result.stdout + assert "Links:" in result.stdout + assert "To reproduce this artifact:" in result.stdout + assert "roar reproduce " in result.stdout + assert "View on GLaaS:" in result.stdout + + assert fake_glaas_publish_server.health_checks >= 1 + assert len(fake_glaas_publish_server.session_registrations) == 1 + assert len(fake_glaas_publish_server.job_batches) == 1 + assert len(fake_glaas_publish_server.job_creates) == 0 + assert len(fake_glaas_publish_server.artifact_batches) >= 1 + assert fake_glaas_publish_server.input_links + assert fake_glaas_publish_server.output_links + + registered_jobs = fake_glaas_publish_server.job_batches[0]["jobs"] + assert len(registered_jobs) == 1 + assert registered_jobs[0]["job_type"] == "run" diff --git a/tests/integration/test_reproduce_packages.py b/tests/integration/test_reproduce_packages.py index 72cfa691..35f0b96f 100644 --- a/tests/integration/test_reproduce_packages.py +++ b/tests/integration/test_reproduce_packages.py @@ -15,6 +15,8 @@ from roar.core.logging import NullLogger, get_logger from roar.execution.reproduction.environment_setup import EnvironmentSetupService +from .fake_glaas import FakeGlaasServer + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -38,6 +40,11 @@ def _make_pipeline(build_steps=None, run_steps=None): class TestReproducePackageExtraction: """Integration tests for package extraction during reproduction.""" + @pytest.fixture + def fake_glaas_server(self): + with FakeGlaasServer() as server: + yield server + def test_preview_shows_pip_packages(self, temp_git_repo, roar_cli, git_commit, python_exe): """Running roar reproduce should show pip packages in preview.""" # Create a script that imports a package (sys is always available) @@ -67,6 +74,58 @@ def test_preview_shows_pip_packages(self, temp_git_repo, roar_cli, git_commit, p # Output should contain artifact hash info at minimum assert artifact_hash[:12] in preview.stdout or "Artifact" in preview.stdout + def test_preview_can_write_dag_output_via_fake_glaas( + self, + temp_git_repo, + roar_cli, + git_commit, + python_exe, + fake_glaas_server, + ): + """Preview can export DAG JSON locally when a GLaaS endpoint is configured.""" + script = temp_git_repo / "use_pkg.py" + script.write_text( + "import json\nwith open('out.json', 'w') as f:\n json.dump({'v': 1}, f)\n" + ) + git_commit("Add reproduce out fixture") + + result = roar_cli("run", python_exe, "use_pkg.py") + assert result.returncode == 0 + git_commit("After run") + + lineage_result = roar_cli("lineage", "out.json") + lineage = json.loads(lineage_result.stdout) + artifact_hash = lineage["artifact"]["hash"] + dag_path = temp_git_repo / "dag.json" + + fake_glaas_server.set_artifact_dag( + artifact_hash, + { + "artifact": {"hash": artifact_hash}, + "gitRepo": "https://github.com/test/repo.git", + "gitCommit": "abc123def4567890", + "jobs": [], + "external_deps": [], + "is_external": False, + }, + ) + roar_cli("config", "set", "glaas.url", fake_glaas_server.base_url) + + preview = roar_cli( + "reproduce", + artifact_hash[:12], + "--list-requirements", + "--out", + str(dag_path), + check=False, + ) + + assert preview.returncode == 0 + assert f"DAG lineage response written to {dag_path}" in preview.stdout + assert "Pipeline Preview" in preview.stdout + written = json.loads(dag_path.read_text()) + assert written["artifact"]["hash"] == artifact_hash + def test_preview_shows_dpkg_packages_when_present( self, temp_git_repo, roar_cli, git_commit, python_exe ): diff --git a/tests/integration/test_reproduce_run_cli.py b/tests/integration/test_reproduce_run_cli.py new file mode 100644 index 00000000..d83b51f5 --- /dev/null +++ b/tests/integration/test_reproduce_run_cli.py @@ -0,0 +1,75 @@ +"""Product-path coverage for local `roar reproduce --run` flows.""" + +from __future__ import annotations + +import json +import sqlite3 +import subprocess +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + + +def _query_rows(repo_path: Path, sql: str, params: tuple[object, ...] = ()) -> list[sqlite3.Row]: + db_path = repo_path / ".roar" / "roar.db" + assert db_path.exists(), ".roar/roar.db not found" + with sqlite3.connect(str(db_path)) as conn: + conn.row_factory = sqlite3.Row + return conn.execute(sql, params).fetchall() + + +def test_reproduce_run_reuses_matching_local_repo_and_recreates_artifact( + temp_git_repo: Path, + roar_cli, + git_commit, + python_exe: str, +) -> None: + local_remote = f"file://{temp_git_repo}" + subprocess.run( + ["git", "remote", "add", "origin", local_remote], + cwd=temp_git_repo, + capture_output=True, + check=True, + ) + + script = temp_git_repo / "generate_model.py" + script.write_text( + "from pathlib import Path\nPath('model.bin').write_bytes(b'local-reproduction-payload')\n" + ) + git_commit("Add reproduction fixture") + + run_result = roar_cli("run", python_exe, "generate_model.py") + assert run_result.returncode == 0 + + original_output = temp_git_repo / "model.bin" + original_bytes = original_output.read_bytes() + assert original_bytes == b"local-reproduction-payload" + + lineage_result = roar_cli("lineage", "model.bin") + lineage = json.loads(lineage_result.stdout) + artifact_hash = lineage["artifact"]["hash"] + assert len(artifact_hash) >= 12 + + active_sessions = _query_rows( + temp_git_repo, + "SELECT git_repo, git_commit_start FROM sessions WHERE is_active = 1 ORDER BY id DESC LIMIT 1", + ) + assert active_sessions + assert active_sessions[0]["git_repo"] == local_remote + assert active_sessions[0]["git_commit_start"] is not None + + git_commit("Commit generated artifact") + original_output.unlink() + assert not original_output.exists() + + reproduce_result = roar_cli("reproduce", artifact_hash[:12], "--run", "-y") + + assert reproduce_result.returncode == 0 + assert "Current repository matches artifact remote, using existing environment" in ( + reproduce_result.stdout + ) + assert "Reproduction Complete" in reproduce_result.stdout + assert "Steps run: 1/1" in reproduce_result.stdout + assert original_output.read_bytes() == original_bytes diff --git a/tests/integration/test_tracer_cli_integration.py b/tests/integration/test_tracer_cli_integration.py new file mode 100644 index 00000000..f656e958 --- /dev/null +++ b/tests/integration/test_tracer_cli_integration.py @@ -0,0 +1,139 @@ +"""Product-path coverage for the `roar tracer` CLI.""" + +from __future__ import annotations + +import os +import stat +import subprocess +import sys +from pathlib import Path + +import pytest + +import tests.conftest as test_conftest + +pytestmark = pytest.mark.integration + + +def _run_roar_tracer( + *args: str, + cwd: Path, + env_overrides: dict[str, str] | None = None, +) -> subprocess.CompletedProcess[str]: + env = dict(os.environ) + if env_overrides: + env.update(env_overrides) + return subprocess.run( + [sys.executable, "-m", "roar", "tracer", *args], + cwd=cwd, + capture_output=True, + text=True, + env=env, + ) + + +def _write_executable(path: Path, body: str = "#!/bin/sh\nexit 0\n") -> Path: + path.write_text(body) + path.chmod(path.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + return path.resolve() + + +def _repo_local_ptrace_binary() -> str | None: + for candidate in ( + test_conftest.RELEASE_BIN_DIR / "roar-tracer", + test_conftest.PACKAGE_BIN_DIR / "roar-tracer", + ): + if candidate.exists(): + return str(candidate.resolve()) + return None + + +def _expected_ptrace_binary(path_bin_dir: Path | None = None) -> str: + repo_local = _repo_local_ptrace_binary() + if repo_local is not None: + return repo_local + assert path_bin_dir is not None + return str((path_bin_dir / "roar-tracer").resolve()) + + +def test_tracer_status_reports_configured_default_and_repo_local_ptrace_binary( + temp_git_repo: Path, + roar_cli, + tmp_path: Path, +) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + _write_executable(bin_dir / "roar-tracer") + expected_ptrace = _expected_ptrace_binary(bin_dir) + + set_result = roar_cli("tracer", "ptrace") + assert set_result.returncode == 0 + + result = _run_roar_tracer( + "status", + cwd=temp_git_repo, + env_overrides={"PATH": str(bin_dir)}, + ) + + assert result.returncode == 0, result.stderr + assert "Default tracer: ptrace" in result.stdout + assert "Fallback enabled: True" in result.stdout + assert "Proxy enabled: False" in result.stdout + assert f"ptrace: {expected_ptrace}" in result.stdout + assert " ebpf:" in result.stdout + assert " preload:" in result.stdout + assert " roard:" in result.stdout + + +def test_tracer_check_uses_configured_default_backend_and_repo_local_binary( + temp_git_repo: Path, + roar_cli, + tmp_path: Path, +) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + _write_executable(bin_dir / "roar-tracer") + expected_ptrace = _expected_ptrace_binary(bin_dir) + + roar_cli("tracer", "ptrace") + + result = _run_roar_tracer( + "check", + cwd=temp_git_repo, + env_overrides={"PATH": str(bin_dir)}, + ) + + assert result.returncode == 0, result.stderr + assert f"Tracer check passed for 'ptrace': {expected_ptrace}" in result.stdout + + +def test_tracer_check_prefers_repo_local_binary_over_path_override( + temp_git_repo: Path, + roar_cli, + tmp_path: Path, +) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + _write_executable(bin_dir / "roar-tracer") + expected_ptrace = _expected_ptrace_binary(bin_dir) + + roar_cli("tracer", "ptrace") + + result = _run_roar_tracer( + "check", + "--backend", + "ptrace", + cwd=temp_git_repo, + env_overrides={"PATH": str(bin_dir)}, + ) + + assert result.returncode == 0, result.stderr + assert f"Tracer check passed for 'ptrace': {expected_ptrace}" in result.stdout + + +def test_tracer_setup_without_subcommand_shows_help(temp_git_repo: Path) -> None: + result = _run_roar_tracer("setup", cwd=temp_git_repo) + + assert result.returncode == 0, result.stderr + assert "Set up tracer backends." in result.stdout + assert "ebpf" in result.stdout diff --git a/tests/unit/get/test_get_cli.py b/tests/unit/get/test_get_cli.py index e0d6a0b3..1a962c86 100644 --- a/tests/unit/get/test_get_cli.py +++ b/tests/unit/get/test_get_cli.py @@ -5,7 +5,6 @@ from click.testing import CliRunner -from roar.application.get.results import GetDownloadedFile, GetDryRunItem, GetResponse from roar.cli.commands.get import get @@ -37,59 +36,3 @@ def test_cli_surfaces_application_errors(tmp_path: Path) -> None: assert result.exit_code != 0 assert "Unsupported" in result.output - - -def test_cli_prints_dry_run_plan(tmp_path: Path) -> None: - runner = CliRunner() - response = GetResponse( - success=True, - source="s3://bucket/model.pt", - dry_run=True, - would_download=[ - GetDryRunItem( - remote_url="s3://bucket/model.pt", - local_path=str(tmp_path / "model.pt"), - ) - ], - ) - - with patch("roar.cli.commands.get.get_artifacts", return_value=response): - result = runner.invoke( - get, - ["s3://bucket/model.pt", "--dry-run"], - obj=_mock_context(tmp_path), - ) - - assert result.exit_code == 0 - assert "Dry run - would download:" in result.output - assert "s3://bucket/model.pt" in result.output - - -def test_cli_prints_success_tag_and_warnings(tmp_path: Path) -> None: - runner = CliRunner() - response = GetResponse( - success=True, - source="s3://bucket/model.pt", - job_id=7, - downloaded_files=[ - GetDownloadedFile( - remote_url="s3://bucket/model.pt", - local_path=str(tmp_path / "model.pt"), - ) - ], - git_tag="roar/deadbeef", - warnings=["Could not create git tag: warning"], - ) - - with patch("roar.cli.commands.get.get_artifacts", return_value=response): - result = runner.invoke( - get, - ["s3://bucket/model.pt", str(tmp_path / "model.pt"), "--tag"], - obj=_mock_context(tmp_path), - ) - - assert result.exit_code == 0 - assert "Created git tag: roar/deadbeef" in result.output - assert "Downloaded 1 file(s) from s3://bucket/model.pt" in result.output - assert "Job created: step 7" in result.output - assert "Warning: Could not create git tag: warning" in result.stderr diff --git a/tests/unit/test_dataset_identifier.py b/tests/unit/test_dataset_identifier.py index a0ced680..ecd78532 100644 --- a/tests/unit/test_dataset_identifier.py +++ b/tests/unit/test_dataset_identifier.py @@ -138,6 +138,16 @@ def test_record_materializes_local_composite_outputs(tmp_path: Path): assert composite_artifact is not None assert composite_artifact["kind"] == "composite" + current_labels = db_ctx.labels.get_current( + "artifact", + artifact_id=composite_artifact["id"], + ) + assert current_labels is not None + assert current_labels["metadata"]["dataset"]["type"] == "dataset" + assert current_labels["metadata"]["dataset"]["id"] == dataset_dir.resolve().as_uri() + assert current_labels["metadata"]["dataset"]["modality"] == "tabular" + assert current_labels["metadata"]["dataset"]["fingerprint"] + summary = db_ctx.composites.get(composite_artifact["id"]) assert summary is not None assert summary["component_count"] == 2 diff --git a/tests/unit/test_dataset_label.py b/tests/unit/test_dataset_label.py index 447551bd..bd69ff48 100644 --- a/tests/unit/test_dataset_label.py +++ b/tests/unit/test_dataset_label.py @@ -3,6 +3,8 @@ from __future__ import annotations from roar.execution.recording.dataset_metadata import ( + AUTO_DATASET_LABEL_KEYS, + build_dataset_label_metadata, build_dataset_metadata, find_matching_identifier, ) @@ -126,3 +128,54 @@ def test_ignores_unknown_keys(self): result = build_dataset_metadata(identifier) assert "extra_key" not in result assert result["dataset_id"] == "file:///data/raw" + + +# --------------------------------------------------------------------------- +# build_dataset_label_metadata +# --------------------------------------------------------------------------- + + +class TestBuildDatasetLabelMetadata: + def test_includes_dataset_identity_and_modality_labels(self): + identifier = { + "dataset_id": "file:///data/imagenet", + "dataset_fingerprint": "a1b2c3d4e5f67890", + "dataset_fingerprint_algorithm": "blake3", + "split": "train", + "version_hint": "v2", + } + + result = build_dataset_label_metadata( + identifier, + components=[ + { + "relative_path": "train/class_a/image-0001.jpg", + "component_size": 42, + "component_type": "image/jpeg", + } + ], + component_count_total=1, + ) + + assert result == { + "dataset": { + "type": "dataset", + "id": "file:///data/imagenet", + "fingerprint": "a1b2c3d4e5f67890", + "fingerprint_algorithm": "blake3", + "split": "train", + "version_hint": "v2", + "modality": "image", + } + } + + def test_declares_reserved_paths_for_system_managed_dataset_labels(self): + assert { + "dataset.type", + "dataset.id", + "dataset.fingerprint", + "dataset.fingerprint_algorithm", + "dataset.split", + "dataset.version_hint", + "dataset.modality", + } == AUTO_DATASET_LABEL_KEYS diff --git a/tests/unit/test_job_recording.py b/tests/unit/test_job_recording.py index 530a0d88..5592b58e 100644 --- a/tests/unit/test_job_recording.py +++ b/tests/unit/test_job_recording.py @@ -129,3 +129,40 @@ def test_local_job_recorder_records_precomputed_output_artifacts(tmp_path: Path) assert job["job_type"] == "get" assert len(outputs) == 1 assert outputs[0]["path"] == str(local_file) + + +def test_local_job_recorder_creates_active_session_when_missing(tmp_path: Path) -> None: + repo_root = tmp_path / "repo" + repo_root.mkdir() + roar_dir = repo_root / ".roar" + roar_dir.mkdir() + local_file = repo_root / "downloaded.bin" + local_file.write_bytes(b"payload") + + with _create_database_context(roar_dir) as db_ctx: + recorder = LocalJobRecorder() + job_id, _job_uid = recorder.record( + db_ctx, + command="roar get https://example.com/downloaded.bin", + timestamp=1700000000.0, + metadata='{"get":{"source":"https://example.com/downloaded.bin"}}', + execution_backend="local", + execution_role="host", + job_type="get", + output_artifacts=[ + LocalRecordedArtifact( + path=str(local_file), + hashes={"blake3": "abc123"}, + size=local_file.stat().st_size, + ) + ], + exit_code=0, + ) + + job = db_ctx.jobs.get(job_id) + session = db_ctx.sessions.get_active() + + assert job is not None + assert session is not None + assert job["session_id"] == session["id"] + assert session["current_step"] == 1 diff --git a/tests/unit/test_label_service.py b/tests/unit/test_label_service.py new file mode 100644 index 00000000..66403970 --- /dev/null +++ b/tests/unit/test_label_service.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +import pytest + +from roar.application.labels import LabelService + + +def test_reject_reserved_keys_blocks_system_managed_dataset_labels() -> None: + with pytest.raises(ValueError, match="Reserved label keys cannot be set manually"): + LabelService._reject_reserved_keys( + { + "dataset": { + "id": "file:///data/train", + "modality": "tabular", + } + } + ) diff --git a/tests/unit/test_proxy_cli.py b/tests/unit/test_proxy_cli.py index 46f8ad51..a075ff6a 100644 --- a/tests/unit/test_proxy_cli.py +++ b/tests/unit/test_proxy_cli.py @@ -1,4 +1,4 @@ -"""Tests for the proxy CLI commands.""" +"""Tests for proxy CLI branches that are not worth hitting via subprocess.""" import importlib from unittest.mock import MagicMock, patch @@ -15,48 +15,36 @@ class TestProxyCli: def runner(self): return CliRunner() - def test_proxy_no_subcommand_shows_status(self, runner): - mock_svc = MagicMock() - mock_svc.find_proxy.return_value = "/usr/bin/roar-proxy" - mock_svc.get_daemon_status.return_value = None - - with ( - patch.object(proxy_cli_module, "config_get", return_value=False), - patch("roar.integrations.config.get_roar_dir", return_value="/tmp/.roar"), - patch("roar.execution.cluster.proxy.ProxyService", return_value=mock_svc), - ): - result = runner.invoke(proxy_cli_module.proxy) - assert result.exit_code == 0 - assert "Proxy enabled:" in result.output - - def test_proxy_enable(self, runner): - with patch.object(proxy_cli_module, "config_set", return_value=("/tmp/config.toml", True)): + def test_proxy_enable_surfaces_config_errors(self, runner): + with patch.object(proxy_cli_module, "config_set", side_effect=ValueError("bad config")): result = runner.invoke(proxy_cli_module.proxy, ["enable"]) - assert result.exit_code == 0 - assert "enabled" in result.output.lower() - proxy_cli_module.config_set.assert_called_once_with("proxy.enabled", "true") - def test_proxy_disable(self, runner): - with patch.object(proxy_cli_module, "config_set", return_value=("/tmp/config.toml", False)): + assert result.exit_code != 0 + assert "bad config" in result.output + + def test_proxy_disable_surfaces_config_errors(self, runner): + with patch.object(proxy_cli_module, "config_set", side_effect=ValueError("bad config")): result = runner.invoke(proxy_cli_module.proxy, ["disable"]) - assert result.exit_code == 0 - assert "disabled" in result.output.lower() - proxy_cli_module.config_set.assert_called_once_with("proxy.enabled", "false") - def test_proxy_start_calls_start_daemon(self, runner): + assert result.exit_code != 0 + assert "bad config" in result.output + + def test_proxy_status_handles_daemon_lookup_errors(self, runner): mock_svc = MagicMock() - mock_svc.get_daemon_status.return_value = None - mock_svc.start_daemon.return_value = {"pid": 123, "port": 9090, "started_at": 1.0} + mock_svc.find_proxy.return_value = None + mock_svc.get_daemon_status.side_effect = RuntimeError("boom") with ( + patch.object(proxy_cli_module, "config_get", return_value=False), patch("roar.integrations.config.get_roar_dir", return_value="/tmp/.roar"), patch("roar.execution.cluster.proxy.ProxyService", return_value=mock_svc), ): - result = runner.invoke(proxy_cli_module.proxy, ["start"]) - assert result.exit_code == 0 - assert "123" in result.output - assert "9090" in result.output - mock_svc.start_daemon.assert_called_once() + result = runner.invoke(proxy_cli_module.proxy) + + assert result.exit_code == 0 + assert "Proxy enabled: False" in result.output + assert "Binary: not found" in result.output + assert "Daemon: not running" in result.output def test_proxy_start_when_already_running(self, runner): mock_svc = MagicMock() @@ -71,18 +59,6 @@ def test_proxy_start_when_already_running(self, runner): assert "already running" in result.output.lower() mock_svc.start_daemon.assert_not_called() - def test_proxy_stop_calls_stop_daemon(self, runner): - mock_svc = MagicMock() - mock_svc.stop_daemon.return_value = True - - with ( - patch("roar.integrations.config.get_roar_dir", return_value="/tmp/.roar"), - patch("roar.execution.cluster.proxy.ProxyService", return_value=mock_svc), - ): - result = runner.invoke(proxy_cli_module.proxy, ["stop"]) - assert result.exit_code == 0 - assert "stopped" in result.output.lower() - def test_proxy_stop_when_not_running(self, runner): mock_svc = MagicMock() mock_svc.stop_daemon.return_value = False @@ -94,19 +70,3 @@ def test_proxy_stop_when_not_running(self, runner): result = runner.invoke(proxy_cli_module.proxy, ["stop"]) assert result.exit_code == 0 assert "no proxy daemon" in result.output.lower() - - def test_proxy_status_subcommand(self, runner): - mock_svc = MagicMock() - mock_svc.find_proxy.return_value = "/usr/bin/roar-proxy" - mock_svc.get_daemon_status.return_value = {"pid": 123, "port": 9090} - - with ( - patch.object(proxy_cli_module, "config_get", return_value=True), - patch("roar.integrations.config.get_roar_dir", return_value="/tmp/.roar"), - patch("roar.execution.cluster.proxy.ProxyService", return_value=mock_svc), - ): - result = runner.invoke(proxy_cli_module.proxy, ["status"]) - assert result.exit_code == 0 - assert "Proxy enabled: True" in result.output - assert "/usr/bin/roar-proxy" in result.output - assert "running" in result.output.lower() diff --git a/tests/unit/test_register_cli.py b/tests/unit/test_register_cli.py index 6f4fb603..c4f36454 100644 --- a/tests/unit/test_register_cli.py +++ b/tests/unit/test_register_cli.py @@ -45,42 +45,3 @@ def test_register_cli_accepts_s3_uri(tmp_path): mock_register.assert_called_once() request = mock_register.call_args.args[0] assert request.target == "s3://output-bucket/results/run123/final_report.json" - - -def test_register_cli_accepts_step_reference(tmp_path): - runner = CliRunner() - - ctx = MagicMock() - ctx.roar_dir = tmp_path / ".roar" - ctx.roar_dir.mkdir() - ctx.cwd = tmp_path - ctx.is_initialized = True - - with patch("roar.cli.commands.register.register_lineage_target") as mock_register: - mock_register.return_value = _fake_result() - with patch("roar.cli.commands.register.config_get", return_value="https://glaas.local"): - result = runner.invoke(register, ["@4", "--yes"], obj=ctx) - - assert result.exit_code == 0, result.output - mock_register.assert_called_once() - assert mock_register.call_args.args[0].target == "@4" - - -def test_register_cli_accepts_session_hash(tmp_path): - runner = CliRunner() - - ctx = MagicMock() - ctx.roar_dir = tmp_path / ".roar" - ctx.roar_dir.mkdir() - ctx.cwd = tmp_path - ctx.is_initialized = True - - session_hash = "c" * 64 - with patch("roar.cli.commands.register.register_lineage_target") as mock_register: - mock_register.return_value = _fake_result() - with patch("roar.cli.commands.register.config_get", return_value="https://glaas.local"): - result = runner.invoke(register, [session_hash, "--yes"], obj=ctx) - - assert result.exit_code == 0, result.output - mock_register.assert_called_once() - assert mock_register.call_args.args[0].target == session_hash diff --git a/tests/unit/test_reproduce_cli.py b/tests/unit/test_reproduce_cli.py index 4eec85a7..5a755994 100644 --- a/tests/unit/test_reproduce_cli.py +++ b/tests/unit/test_reproduce_cli.py @@ -1,11 +1,10 @@ -"""Unit tests for the thin reproduce CLI wrapper.""" +"""Localized reproduce CLI tests that are not worth exercising via subprocess.""" from pathlib import Path from unittest.mock import MagicMock, patch from click.testing import CliRunner -from roar.application.reproduce.requests import ReproduceRequest from roar.cli.commands.reproduce import reproduce @@ -17,7 +16,7 @@ def _ctx(tmp_path: Path) -> MagicMock: return ctx -def test_reproduce_cli_builds_application_request(tmp_path: Path) -> None: +def test_reproduce_cli_passes_environment_setup_flags(tmp_path: Path) -> None: runner = CliRunner() with patch("roar.cli.commands.reproduce.reproduce_artifact", return_value=None) as mock_service: @@ -26,30 +25,19 @@ def test_reproduce_cli_builds_application_request(tmp_path: Path) -> None: [ "abc123def456", "--run", - "-y", "--dpkg-any-version", "--pip-any-version", "--package-sync", - "--list-requirements", - "--out", - "dag.json", ], obj=_ctx(tmp_path), ) assert result.exit_code == 0 request = mock_service.call_args.args[0] - assert isinstance(request, ReproduceRequest) - assert request.hash_prefix == "abc123def456" - assert request.roar_dir == tmp_path / ".roar" - assert request.cwd == tmp_path assert request.run_pipeline is True - assert request.auto_confirm is True assert request.dpkg_any_version is True assert request.pip_any_version is True assert request.package_sync is True - assert request.list_requirements is True - assert request.out_path == "dag.json" def test_reproduce_cli_surfaces_application_errors(tmp_path: Path) -> None: diff --git a/tests/unit/test_test_harness_tracer.py b/tests/unit/test_test_harness_tracer.py new file mode 100644 index 00000000..fc26f8e1 --- /dev/null +++ b/tests/unit/test_test_harness_tracer.py @@ -0,0 +1,59 @@ +"""Localized tests for the pytest subprocess harness.""" + +from __future__ import annotations + +import os +import subprocess +from pathlib import Path +from unittest.mock import Mock + +import tests.conftest as test_conftest + + +def test_subprocess_env_prepends_repo_local_binary_dirs(monkeypatch) -> None: + monkeypatch.setenv("PATH", "/usr/bin") + monkeypatch.setattr( + test_conftest, + "_repo_local_binary_dirs", + lambda: ["/tmp/release-bin", "/tmp/package-bin"], + ) + + env = test_conftest._subprocess_env() + + assert env["PATH"].split(os.pathsep)[:3] == ["/tmp/release-bin", "/tmp/package-bin", "/usr/bin"] + + +def test_run_command_ensures_repo_local_ptrace_tracer(monkeypatch, tmp_path: Path) -> None: + ensure_tracer = Mock() + monkeypatch.setattr(test_conftest, "_ensure_repo_local_ptrace_tracer", ensure_tracer) + + completed = subprocess.CompletedProcess( + args=["python", "-m", "roar", "run", "python", "script.py"], + returncode=0, + stdout="", + stderr="", + ) + monkeypatch.setattr(test_conftest.subprocess, "run", lambda *args, **kwargs: completed) + + result = test_conftest._run_roar_cmd("run", "python", "script.py", cwd=tmp_path) + + assert result is completed + ensure_tracer.assert_called_once_with() + + +def test_non_run_command_skips_repo_local_ptrace_build(monkeypatch, tmp_path: Path) -> None: + ensure_tracer = Mock() + monkeypatch.setattr(test_conftest, "_ensure_repo_local_ptrace_tracer", ensure_tracer) + + completed = subprocess.CompletedProcess( + args=["python", "-m", "roar", "config", "list"], + returncode=0, + stdout="", + stderr="", + ) + monkeypatch.setattr(test_conftest.subprocess, "run", lambda *args, **kwargs: completed) + + result = test_conftest._run_roar_cmd("config", "list", cwd=tmp_path) + + assert result is completed + ensure_tracer.assert_not_called() diff --git a/tests/unit/test_tracer_cli.py b/tests/unit/test_tracer_cli.py index b2fc3ed3..adf27df1 100644 --- a/tests/unit/test_tracer_cli.py +++ b/tests/unit/test_tracer_cli.py @@ -20,26 +20,6 @@ def test_set_default_writes_tracer_default_key(self): mock_set.assert_called_once_with("tracer.default", "ebpf") assert "Default tracer set to: ebpf" in result.output - def test_check_ptrace_fails_when_binary_missing(self): - runner = CliRunner() - with patch.object( - tracer_cli_module, "_backend_ready", return_value=(False, "ptrace tracer not found") - ): - result = runner.invoke(tracer_cli_module.tracer, ["check", "--backend", "ptrace"]) - - assert result.exit_code == 1 - assert "Tracer check failed for 'ptrace'" in result.output - - def test_check_ptrace_passes_when_binary_present(self): - runner = CliRunner() - with patch.object( - tracer_cli_module, "_backend_ready", return_value=(True, "/usr/bin/roar-tracer") - ): - result = runner.invoke(tracer_cli_module.tracer, ["check", "--backend", "ptrace"]) - - assert result.exit_code == 0 - assert "Tracer check passed for 'ptrace'" in result.output - def test_set_default_preload_via_alias(self): runner = CliRunner() with patch.object( @@ -50,25 +30,3 @@ def test_set_default_preload_via_alias(self): assert result.exit_code == 0 mock_set.assert_called_once_with("tracer.default", "preload") assert "Default tracer set to: preload" in result.output - - def test_status_shows_default_and_fallback(self): - runner = CliRunner() - with ( - patch.object(tracer_cli_module, "config_get") as mock_get, - patch.object(tracer_cli_module, "_find_ptrace_tracer", return_value=None), - patch.object(tracer_cli_module, "_find_ebpf_tracer", return_value=None), - patch.object(tracer_cli_module, "_find_preload_tracer", return_value=None), - patch.object(tracer_cli_module, "_find_roard", return_value=None), - patch.object(tracer_cli_module, "_get_perf_event_paranoid", return_value=2), - ): - mock_get.side_effect = lambda key: { - "tracer.default": "auto", - "tracer.fallback_enabled": True, - "proxy.enabled": False, - }.get(key) - result = runner.invoke(tracer_cli_module.tracer, ["status"]) - - assert result.exit_code == 0 - assert "Default tracer: auto" in result.output - assert "Fallback enabled: True" in result.output - assert "Proxy enabled: False" in result.output diff --git a/tests/unit/test_tracer_data_loader.py b/tests/unit/test_tracer_data_loader.py index bab0dc4a..cf955c1e 100644 --- a/tests/unit/test_tracer_data_loader.py +++ b/tests/unit/test_tracer_data_loader.py @@ -1,5 +1,6 @@ -"""Unit tests for tracer MessagePack loading and normalization.""" +"""Unit tests for tracer report loading and normalization.""" +import json from pathlib import Path import msgpack @@ -12,6 +13,28 @@ def _write_msgpack(path: Path, payload: dict) -> None: class TestDataLoaderService: + def test_loads_json_report_when_tracer_writes_json(self, tmp_path: Path) -> None: + report = { + "version": 1, + "tracer_mode": "ptrace", + "opened_files": ["/tmp/a.txt"], + "read_files": ["/tmp/a.txt"], + "written_files": ["/tmp/b.txt"], + "processes": [{"pid": 1, "parent_pid": None, "command": ["python", "train.py"]}], + "start_time": 1.0, + "end_time": 2.5, + } + report_path = tmp_path / "trace.msgpack" + report_path.write_text(json.dumps(report)) + + data = DataLoaderService().load_tracer_data(str(report_path)) + + assert data.opened_files == ["/tmp/a.txt"] + assert data.read_files == ["/tmp/a.txt"] + assert data.written_files == ["/tmp/b.txt"] + assert data.tracer_mode == "ptrace" + assert data.version == 1 + def test_loads_legacy_ptrace_report(self, tmp_path: Path) -> None: report = { "opened_files": ["/tmp/a.txt", "/tmp/a.txt"],