Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ markers = [
"ebpf: Tests requiring eBPF daemon/root privileges",
"cloud: Tests for cloud storage operations",
"happy_path: Happy path tests for core functionality",
"diagnostic: Opt-in diagnostics or aspirational performance budgets outside the default gate",
"large_pipeline: Stress-style pipeline coverage with larger DAG fixtures",
"ray_e2e: Ray end-to-end tests requiring a running Docker cluster",
"ray_contract: User-facing Ray contract tests using `roar run ray job submit ...`",
"ray_diagnostic: Diagnostic Ray tests that intentionally inspect internal runtime details",
]
addopts = "-v --strict-markers -n auto --dist loadfile --ignore=tests/ebpf --ignore=tests/live_glaas --ignore=tests/benchmarks --ignore=tests/integration --ignore=tests/e2e --ignore-glob=tests/backends/*/e2e --ignore-glob=tests/backends/*/live"
addopts = "-v --strict-markers -n auto --dist loadfile --ignore=tests/ebpf --ignore=tests/live_glaas --ignore=tests/benchmarks --ignore=tests/e2e --ignore=tests/integration/test_cli_startup.py --ignore=tests/execution/runtime/test_sitecustomize_perf.py --ignore-glob=tests/backends/*/e2e --ignore-glob=tests/backends/*/live"
timeout = 60
filterwarnings = [
"ignore::DeprecationWarning",
Expand Down
3 changes: 2 additions & 1 deletion roar/application/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
from typing import Any, Protocol

from ..db.context import DatabaseContext
from ..execution.recording.dataset_metadata import AUTO_DATASET_LABEL_KEYS

RESERVED_LABEL_KEYS = {"dataset.type", "dataset.modality"}
RESERVED_LABEL_KEYS = set(AUTO_DATASET_LABEL_KEYS)


@dataclass(frozen=True)
Expand Down
11 changes: 6 additions & 5 deletions roar/cli/commands/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ def auth_register() -> None:
@auth.command("test")
def auth_test() -> None:
"""Test connection to GLaaS server."""
# Get GLaaS server URL from config
glaas_url = config_get("glaas.url")
if not glaas_url:
glaas_url = os.environ.get("GLAAS_URL")
from ...integrations.glaas import get_glaas_url

glaas_url = get_glaas_url()

if not glaas_url:
raise click.ClickException(
Expand Down Expand Up @@ -201,7 +200,9 @@ def auth_test() -> None:
@auth.command("status")
def auth_status() -> None:
"""Show current auth status."""
glaas_url = config_get("glaas.url") or os.environ.get("GLAAS_URL")
from ...integrations.glaas import get_glaas_url

glaas_url = get_glaas_url()
key_info = _find_ssh_pubkey()

click.echo("GLaaS Auth Status")
Expand Down
15 changes: 12 additions & 3 deletions roar/db/repositories/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,11 @@ def update_current_step(self, session_id: int, step_number: int) -> None:
self._session.flush()

def update_git_commits(
self, session_id: int, git_commit: str, update_start: bool = False
self,
session_id: int,
git_commit: str | None,
update_start: bool = False,
git_repo: str | None = None,
) -> None:
"""
Update git commit references for a session.
Expand All @@ -386,14 +390,19 @@ def update_git_commits(
session_id: Session ID
git_commit: Git commit hash
update_start: Whether to update start commit if not set
git_repo: Git repository URL/path to persist on the session
"""
session = self._session.get(Session, session_id)
if not session:
return

if update_start and not session.git_commit_start:
if git_repo and not session.git_repo:
session.git_repo = git_repo

if git_commit and update_start and not session.git_commit_start:
session.git_commit_start = git_commit
session.git_commit_end = git_commit
if git_commit:
session.git_commit_end = git_commit
self._session.flush()

def rename_step(
Expand Down
12 changes: 9 additions & 3 deletions roar/db/services/job_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def record_job(

# Handle session assignment
session_id, step_number = self._assign_to_session(
assign_to_session, step_identity, git_commit
assign_to_session, step_identity, git_commit, git_repo
)

# Create the job record
Expand Down Expand Up @@ -193,6 +193,7 @@ def _assign_to_session(
assign: bool,
step_identity: str,
git_commit: str | None,
git_repo: str | None,
) -> tuple[int | None, int | None]:
"""Handle session assignment and step numbering."""
if not assign:
Expand All @@ -209,8 +210,13 @@ def _assign_to_session(

self._session_repo.update_current_step(session_id, step_number)

if git_commit:
self._session_repo.update_git_commits(session_id, git_commit, update_start=True)
if git_commit or git_repo:
self._session_repo.update_git_commits(
session_id,
git_commit,
update_start=True,
git_repo=git_repo,
)

return session_id, step_number

Expand Down
11 changes: 10 additions & 1 deletion roar/execution/provenance/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,16 @@ def load_tracer_data(self, path: str) -> TracerData:
"""
self.logger.debug("Loading tracer data from: %s", path)
with open(path, "rb") as f:
data = msgpack.unpack(f, raw=False)
raw_data = f.read()

try:
data = msgpack.unpackb(raw_data, raw=False)
except msgpack.ExtraData:
stripped = raw_data.lstrip()
if not stripped.startswith((b"{", b"[")):
raise
self.logger.debug("Tracer report was JSON-encoded; falling back to JSON parsing")
data = json.loads(raw_data.decode("utf-8"))

self.logger.debug("Tracer data parsed successfully: %d keys", len(data))
files = self._normalize_files(data)
Expand Down
58 changes: 57 additions & 1 deletion roar/execution/recording/dataset_metadata.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
"""Helpers for attaching dataset identity labels to composite artifact metadata."""
"""Helpers for attaching dataset identity metadata and labels to composite artifacts."""

from __future__ import annotations

from typing import Any
from urllib.parse import urlparse

from .dataset_profile import build_dataset_profile

# Dotted label paths that the recorder writes automatically for dataset
# artifacts; consumers treat these as system-managed (reserved) label keys.
AUTO_DATASET_LABEL_KEYS = frozenset(
    (
        "dataset.type",
        "dataset.id",
        "dataset.fingerprint",
        "dataset.fingerprint_algorithm",
        "dataset.split",
        "dataset.version_hint",
        "dataset.modality",
    )
)


def find_matching_identifier(
root_path: str, dataset_identifiers: list[dict[str, Any]]
Expand Down Expand Up @@ -50,3 +64,45 @@ def build_dataset_metadata(identifier: dict[str, Any]) -> dict[str, Any]:
if value is not None:
meta[key] = value
return meta


def build_dataset_label_metadata(
    identifier: dict[str, Any],
    *,
    components: list[dict[str, Any]] | None = None,
    component_count_total: int | None = None,
) -> dict[str, Any]:
    """Build the system-managed label document for a detected dataset artifact.

    The label payload is intentionally smaller and more stable than the full
    dataset metadata blob. It captures the artifact's dataset identity and the
    most queryable derived characteristics for local labels and future sync.
    """
    # Identifier keys mapped to their (shorter) label-document keys, in the
    # order they should appear in the emitted document.
    identity_fields = (
        ("dataset_id", "id"),
        ("dataset_fingerprint", "fingerprint"),
        ("dataset_fingerprint_algorithm", "fingerprint_algorithm"),
        ("split", "split"),
        ("version_hint", "version_hint"),
    )

    dataset: dict[str, Any] = {"type": "dataset"}
    for source_key, label_key in identity_fields:
        candidate = identifier.get(source_key)
        if candidate is not None:
            dataset[label_key] = candidate

    # Add a modality hint only when the component profile yields a non-empty one.
    profile = build_dataset_profile(components or [], total_components=component_count_total)
    modality = profile.get("modality_hint") if isinstance(profile, dict) else None
    if isinstance(modality, str) and modality:
        dataset["modality"] = modality

    return {"dataset": dataset}
94 changes: 89 additions & 5 deletions roar/execution/recording/job_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
from ...db.context import optional_repo
from ...db.hashing import hash_files_blake3
from .dataset_identifier import DatasetIdentifierInferer
from .dataset_metadata import build_dataset_metadata, find_matching_identifier
from .dataset_metadata import (
AUTO_DATASET_LABEL_KEYS,
build_dataset_label_metadata,
build_dataset_metadata,
find_matching_identifier,
)

if TYPE_CHECKING:
from ...core.models.run import RunContext
Expand Down Expand Up @@ -215,10 +220,7 @@ def record(
"""Create a job and link precomputed input/output artifacts."""
resolved_session_id = session_id
if resolved_session_id is None:
active_session = db_ctx.sessions.get_active()
if active_session is None:
raise ValueError("No active session")
resolved_session_id = int(active_session["id"])
resolved_session_id = int(db_ctx.sessions.get_or_create_active())

step_number = db_ctx.sessions.get_next_step_number(resolved_session_id)
job_id, recorded_job_uid = db_ctx.jobs.create(
Expand Down Expand Up @@ -352,8 +354,14 @@ def materialize(
}
}
matching = find_matching_identifier(str(root), dataset_identifiers)
dataset_label_metadata: dict[str, Any] = {}
if matching is not None:
meta_dict["dataset"] = build_dataset_metadata(matching)
dataset_label_metadata = build_dataset_label_metadata(
matching,
components=list(composite.payload.get("components") or []),
component_count_total=composite.component_count_total,
)
metadata = json.dumps(meta_dict)
artifact_id, _created = db_ctx.artifacts.register(
hashes={"composite-blake3": composite.digest},
Expand All @@ -362,6 +370,12 @@ def materialize(
source_type=composite.payload.get("source_type"),
metadata=metadata,
)
if dataset_label_metadata:
self._sync_dataset_labels(
db_ctx,
artifact_id=artifact_id,
dataset_label_metadata=dataset_label_metadata,
)
composite_repo.upsert_details(
artifact_id=artifact_id,
components=list(composite.payload.get("components") or []),
Expand Down Expand Up @@ -482,6 +496,76 @@ def _is_path_under_root(path: Path, root: Path) -> bool:
except ValueError:
return False

@staticmethod
def _sync_dataset_labels(
    db_ctx: Any,
    *,
    artifact_id: str,
    dataset_label_metadata: dict[str, Any],
) -> None:
    """Persist system-managed dataset labels for *artifact_id*.

    No-op when the labels repository is unavailable, when there is nothing
    to write, or when the merged label document is unchanged from the
    current version.
    """
    labels_repo = cast(Any, optional_repo(db_ctx, "labels"))
    if labels_repo is None or not dataset_label_metadata:
        return

    existing = labels_repo.get_current("artifact", artifact_id=artifact_id)
    existing_metadata = existing.get("metadata") if isinstance(existing, dict) else {}
    if not isinstance(existing_metadata, dict):
        existing_metadata = {}

    updated = CompositeOutputMaterializer._merge_dataset_labels(
        existing_metadata,
        dataset_label_metadata,
    )
    # Only create a new label version when the merge actually changed something.
    if updated != existing_metadata:
        labels_repo.create_version("artifact", updated, artifact_id=artifact_id)

@staticmethod
def _merge_dataset_labels(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Overlay *patch* onto *current* after clearing every auto-managed dataset key."""
    without_auto_keys = CompositeOutputMaterializer._remove_label_paths(
        current,
        AUTO_DATASET_LABEL_KEYS,
    )
    return CompositeOutputMaterializer._deep_merge(without_auto_keys, patch)

@staticmethod
def _deep_merge(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *current* with *patch* recursively overlaid.

    Keys that hold dicts on both sides are merged key-by-key; any other
    collision takes the patch value. The JSON round-trip detaches the
    result from *current* (and assumes JSON-serializable values, which is
    how label metadata is stored).
    """

    def overlay(base: dict[str, Any], incoming: dict[str, Any]) -> None:
        for key, value in incoming.items():
            if isinstance(base.get(key), dict) and isinstance(value, dict):
                overlay(base[key], value)
            else:
                base[key] = value

    snapshot: dict[str, Any] = json.loads(json.dumps(current))
    overlay(snapshot, patch)
    return snapshot

@staticmethod
def _remove_label_paths(
    metadata: dict[str, Any], reserved_paths: set[str] | frozenset[str]
) -> dict[str, Any]:
    """Return a deep copy of *metadata* with each dotted reserved path removed."""
    pruned: dict[str, Any] = json.loads(json.dumps(metadata))
    for dotted_path in reserved_paths:
        CompositeOutputMaterializer._remove_nested(pruned, dotted_path.split("."))
    return pruned

@staticmethod
def _remove_nested(root: dict[str, Any], path: list[str]) -> None:
    """Delete the entry addressed by *path* from nested dict *root*, in place.

    Descent stops early if any intermediate segment is missing or not a
    dict. After the leaf is removed, intermediate dicts left empty along
    the path are pruned bottom-up; a pre-existing empty dict on the path
    is pruned as well, even when the leaf key itself was absent.
    """
    if not path:
        return
    # Walk down to the leaf's parent, remembering each (container, key) hop.
    ancestry: list[tuple[dict[str, Any], str]] = []
    node = root
    for segment in path[:-1]:
        if segment not in node:
            return
        child = node[segment]
        if not isinstance(child, dict):
            return
        ancestry.append((node, segment))
        node = child
    node.pop(path[-1], None)
    # Prune now-empty containers from the deepest level upward.
    for parent, segment in reversed(ancestry):
        if parent[segment]:
            break
        parent.pop(segment, None)


class ExecutionJobRecorder:
"""Persist a traced execution and return reporting payload pieces."""
Expand Down
6 changes: 3 additions & 3 deletions tests/application/get/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ def test_get_artifacts_skips_active_session_check_on_dry_run(tmp_path: Path) ->
recorder_cls.return_value.record.assert_not_called()


def test_get_artifacts_requires_active_session_for_real_downloads(tmp_path: Path) -> None:
def test_get_artifacts_surfaces_recorder_errors(tmp_path: Path) -> None:
parsed_source = MagicMock(is_prefix=False, scheme="s3")
db_ctx = MagicMock()
db_ctx.__enter__.return_value = db_ctx
db_ctx.__exit__.return_value = None
service = MagicMock()
service.get.return_value = GetTransferResult(success=True, downloaded_files=[])
recorder = MagicMock()
recorder.record.side_effect = ValueError("No active session")
recorder.record.side_effect = ValueError("Recorder failed")

with (
patch("roar.application.get.service.bootstrap"),
Expand All @@ -153,7 +153,7 @@ def test_get_artifacts_requires_active_session_for_real_downloads(tmp_path: Path
patch("roar.application.get.service.LocalJobRecorder", return_value=recorder),
):
resolve_git_state.return_value.commit = "deadbeef"
with pytest.raises(ValueError, match="No active session"):
with pytest.raises(ValueError, match="Recorder failed"):
get_artifacts(_request(tmp_path))


Expand Down
Loading
Loading