diff --git a/lerobot_validator/__init__.py b/lerobot_validator/__init__.py index a1e09c3..7182c60 100644 --- a/lerobot_validator/__init__.py +++ b/lerobot_validator/__init__.py @@ -9,6 +9,7 @@ from lerobot_validator.validator import LerobotDatasetValidator from lerobot_validator.gcp_path import compute_gcp_path +from lerobot_validator.v3_checks import Issue, validate_v3_dataset -__all__ = ["LerobotDatasetValidator", "compute_gcp_path"] +__all__ = ["LerobotDatasetValidator", "compute_gcp_path", "Issue", "validate_v3_dataset"] diff --git a/lerobot_validator/v3_checks.py b/lerobot_validator/v3_checks.py new file mode 100644 index 0000000..7b9185c --- /dev/null +++ b/lerobot_validator/v3_checks.py @@ -0,0 +1,548 @@ +""" +P0 validators for LeRobot v3 datasets. + +Each validator function takes a dataset path and returns a list of Issue objects. +Issues have a level ("error" or "warning") and a descriptive message. + +Validators: + V1: validate_tasks_format -- meta/tasks.parquet vs tasks.jsonl + V2: validate_codebase_version -- info.json codebase_version starts with "v3." + V5: validate_feature_shapes -- reject shape=[], image features need 3-element shape + V7: validate_timestamps -- reject absolute Unix epoch timestamps in data parquet + V11: validate_custom_metadata_csv -- required columns, no null/duplicate episode_ids + V12: validate_start_timestamp -- start_timestamp must be plausible Unix epoch floats +""" + +import json +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import pandas as pd +from cloudpathlib import AnyPath, CloudPath + +logger = logging.getLogger(__name__) + +# Timestamps at or above this value are treated as absolute Unix epoch (year 2000+). +_UNIX_EPOCH_THRESHOLD = 946_684_800.0 + +# Upper bound for plausible Unix epoch timestamps (year 2100). +_UNIX_EPOCH_MAX = 4_102_444_800.0 + +# Required columns in custom_metadata.csv (minimum set for rejection). 
# ---------------------------------------------------------------------------
# Module-level constants.
# NOTE(review): logger and the epoch thresholds mirror the definitions in the
# module header so this region is self-contained; keep the values in sync.
# ---------------------------------------------------------------------------

logger = logging.getLogger(__name__)

# Timestamps at or above this value (2000-01-01 UTC) are treated as absolute
# Unix epoch rather than per-episode-relative seconds.
_UNIX_EPOCH_THRESHOLD = 946_684_800.0

# Upper bound for plausible Unix epoch timestamps (2100-01-01 UTC).
_UNIX_EPOCH_MAX = 4_102_444_800.0

# Minimum set of columns custom_metadata.csv must contain for a dataset to be
# accepted at all; other expected columns only produce warnings (see V11).
_REQUIRED_METADATA_COLUMNS = ["episode_index", "episode_id"]


@dataclass
class Issue:
    """A single validation finding.

    Attributes:
        level: Severity -- "error" (rejects the dataset) or "warning".
        validator: Name of the validator function that produced the finding,
            e.g. "validate_tasks_format".
        message: Human-readable description of the problem.
    """

    level: str
    validator: str
    message: str

    def __str__(self) -> str:
        return f"[{self.level}] {self.validator}: {self.message}"


# ---------------------------------------------------------------------------
# V1: validate_tasks_format
# ---------------------------------------------------------------------------


def validate_tasks_format(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V1: check that meta/tasks.parquet exists; warn if only tasks.jsonl is present.

    - Error: neither tasks.parquet nor tasks.jsonl exists.
    - Warning: tasks.jsonl exists but tasks.parquet does not (old format).
    - Pass: tasks.parquet exists (a coexisting tasks.jsonl is ignored).
    """
    meta = _to_path(dataset_path) / "meta"
    issues: List[Issue] = []

    has_parquet = (meta / "tasks.parquet").exists()
    has_jsonl = (meta / "tasks.jsonl").exists()

    if not has_parquet and not has_jsonl:
        issues.append(
            Issue(
                level="error",
                validator="validate_tasks_format",
                message=(
                    "meta/tasks.parquet not found. "
                    "LeRobot v3 datasets must include a tasks.parquet file."
                ),
            )
        )
    elif has_jsonl and not has_parquet:
        issues.append(
            Issue(
                level="warning",
                validator="validate_tasks_format",
                message=(
                    "meta/tasks.parquet not found but meta/tasks.jsonl is present. "
                    "The converter will auto-convert, but you should migrate to "
                    "tasks.parquet before uploading."
                ),
            )
        )

    return issues


# ---------------------------------------------------------------------------
# V2: validate_codebase_version
# ---------------------------------------------------------------------------


def validate_codebase_version(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V2: check that info.json declares a codebase_version starting with 'v3.'.

    - Error: meta/info.json is missing or unparseable.
    - Error: codebase_version is missing or does not start with 'v3.'.
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []
    info = _load_info(root)

    if info is None:
        issues.append(
            Issue(
                level="error",
                validator="validate_codebase_version",
                message="meta/info.json not found or not valid JSON.",
            )
        )
        return issues

    version = info.get("codebase_version")
    if version is None:
        issues.append(
            Issue(
                level="error",
                validator="validate_codebase_version",
                message="meta/info.json is missing 'codebase_version' field.",
            )
        )
    elif not str(version).startswith("v3."):
        issues.append(
            Issue(
                level="error",
                validator="validate_codebase_version",
                message=(
                    f"codebase_version is '{version}' but must start with 'v3.'. "
                    "Only LeRobot v3 datasets are supported."
                ),
            )
        )

    return issues


# ---------------------------------------------------------------------------
# V5: validate_feature_shapes
# ---------------------------------------------------------------------------


def validate_feature_shapes(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V5: check feature shapes declared in info.json.

    - Error: a feature has shape=[] (zero-dimensional).
    - Error: an image/video feature does not have a 3-element list shape
      (a missing or non-list shape on an image/video feature is also an error).
    - Silently returns [] when info.json is missing/unparseable (V2 reports that).
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []
    info = _load_info(root)

    if info is None:
        return issues

    features = info.get("features", {})
    if not isinstance(features, dict):
        return issues

    for name, defn in features.items():
        if not isinstance(defn, dict):
            continue

        shape = defn.get("shape")
        dtype = defn.get("dtype", "")

        # Reject 0-D shapes: downstream tensor handling assumes rank >= 1.
        if isinstance(shape, list) and len(shape) == 0:
            issues.append(
                Issue(
                    level="error",
                    validator="validate_feature_shapes",
                    message=(
                        f"Feature '{name}' has an empty shape (shape: []). "
                        "Scalar features should use shape: [1]."
                    ),
                )
            )
            continue

        # Image/video features must declare exactly 3 dimensions, e.g. (H, W, C)
        # or (C, H, W). A missing/non-list shape previously passed silently;
        # it is now rejected as well.
        if dtype in ("video", "image") and (not isinstance(shape, list) or len(shape) != 3):
            issues.append(
                Issue(
                    level="error",
                    validator="validate_feature_shapes",
                    message=(
                        f"Feature '{name}' (dtype='{dtype}') has shape {shape} "
                        f"but image/video features must have a 3-element shape "
                        f"(e.g. [H, W, C])."
                    ),
                )
            )

    return issues


# ---------------------------------------------------------------------------
# V7: validate_timestamps
# ---------------------------------------------------------------------------


def _read_timestamp_frame(parquet_file: Any) -> Optional[pd.DataFrame]:
    """Best-effort read of the timestamp (and episode_index) columns.

    Tries ["timestamp", "episode_index"] first, then ["timestamp"] alone for
    files lacking an episode_index column. Returns None when neither column
    set can be read; the caller then skips the check entirely.
    """
    for columns in (["timestamp", "episode_index"], ["timestamp"]):
        try:
            return pd.read_parquet(str(parquet_file), columns=columns)
        except Exception:
            # Any read failure (missing column, corrupt file) falls through
            # to the next, smaller column set.
            continue
    return None


def validate_timestamps(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V7: check that data parquet timestamps are relative, not absolute epoch.

    Samples the first data parquet file only; an absolute-epoch mistake
    affects every file uniformly, so one sample is sufficient for rejection.

    - Error: timestamps are absolute Unix epoch (>= 946684800.0).
    - Warning: timestamps are not monotonically increasing within an episode.
    - Warning: non-zero starting offset within an episode (> 1 second).
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []

    data_dir = root / "data"
    if not data_dir.exists():
        return issues

    parquet_files = sorted(data_dir.glob("**/*.parquet"))
    if not parquet_files:
        return issues

    df = _read_timestamp_frame(parquet_files[0])
    if df is None or df.empty or "timestamp" not in df.columns:
        return issues

    # Check for absolute Unix epoch timestamps.
    first_ts = float(df["timestamp"].iloc[0])
    if first_ts >= _UNIX_EPOCH_THRESHOLD:
        issues.append(
            Issue(
                level="error",
                validator="validate_timestamps",
                message=(
                    f"Timestamps appear to be absolute Unix epoch values "
                    f"(first value: {first_ts}). LeRobot v3 requires "
                    f"per-episode-relative timestamps starting near 0.0. "
                    f"Absolute timestamps cause video decode failures."
                ),
            )
        )
        # Monotonicity/offset checks are meaningless for epoch-valued data.
        return issues

    # Per-episode checks are only possible when episode_index was readable.
    if "episode_index" in df.columns:
        for ep_idx, ep_df in df.groupby("episode_index"):
            ts = ep_df["timestamp"].values

            # Warn if the episode does not start near 0 (tolerance: 1 second).
            if len(ts) > 0 and ts[0] > 1.0:
                issues.append(
                    Issue(
                        level="warning",
                        validator="validate_timestamps",
                        message=(
                            f"Episode {ep_idx} starts at timestamp {ts[0]:.3f}s "
                            f"(expected near 0.0)."
                        ),
                    )
                )

            # Warn on any backwards step within the episode.
            if len(ts) > 1:
                diffs = ts[1:] - ts[:-1]
                if (diffs < 0).any():
                    issues.append(
                        Issue(
                            level="warning",
                            validator="validate_timestamps",
                            message=(
                                f"Episode {ep_idx} has non-monotonically "
                                f"increasing timestamps."
                            ),
                        )
                    )

    return issues


# ---------------------------------------------------------------------------
# V11: validate_custom_metadata_csv
# ---------------------------------------------------------------------------


def validate_custom_metadata_csv(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V11: check that meta/custom_metadata.csv exists and has required columns.

    - Error: file missing or unreadable.
    - Error: required columns (episode_index, episode_id) absent.
    - Error: null episode_id values.
    - Error: duplicate (non-null) episode_id values.
    - Warning: other expected columns missing.
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []

    csv_path = root / "meta" / "custom_metadata.csv"
    if not csv_path.exists():
        issues.append(
            Issue(
                level="error",
                validator="validate_custom_metadata_csv",
                message="meta/custom_metadata.csv not found.",
            )
        )
        return issues

    try:
        df = pd.read_csv(str(csv_path))
    except Exception as exc:
        issues.append(
            Issue(
                level="error",
                validator="validate_custom_metadata_csv",
                message=f"Failed to read meta/custom_metadata.csv: {exc}",
            )
        )
        return issues

    # Required columns gate all subsequent checks.
    missing_required = [c for c in _REQUIRED_METADATA_COLUMNS if c not in df.columns]
    if missing_required:
        issues.append(
            Issue(
                level="error",
                validator="validate_custom_metadata_csv",
                message=(
                    f"meta/custom_metadata.csv is missing required columns: "
                    f"{missing_required}"
                ),
            )
        )
        return issues

    ep_ids = df["episode_id"]

    # Null episode_id values are reported once, here.
    null_rows = df.index[ep_ids.isna()].tolist()
    if null_rows:
        issues.append(
            Issue(
                level="error",
                validator="validate_custom_metadata_csv",
                message=(
                    f"episode_id has null values at rows: "
                    f"{null_rows}"
                ),
            )
        )

    # Duplicate check excludes nulls: pandas treats NaN == NaN as a duplicate,
    # which would double-report rows already flagged above.
    dup_mask = ep_ids.notna() & ep_ids.duplicated(keep=False)
    if dup_mask.any():
        dup_ids = ep_ids[dup_mask].unique().tolist()
        issues.append(
            Issue(
                level="error",
                validator="validate_custom_metadata_csv",
                message=(
                    f"episode_id has duplicate values: {dup_ids}"
                ),
            )
        )

    # Remaining expected columns only warn when absent.
    all_expected = [
        "episode_index",
        "operator_id",
        "is_eval_episode",
        "episode_id",
        "start_timestamp",
        "checkpoint_path",
        "success",
        "station_id",
        "robot_id",
    ]
    missing_optional = [
        c for c in all_expected if c not in df.columns and c not in _REQUIRED_METADATA_COLUMNS
    ]
    if missing_optional:
        issues.append(
            Issue(
                level="warning",
                validator="validate_custom_metadata_csv",
                message=(
                    f"meta/custom_metadata.csv is missing optional columns: "
                    f"{missing_optional}"
                ),
            )
        )

    return issues


# ---------------------------------------------------------------------------
# V12: validate_start_timestamp
# ---------------------------------------------------------------------------


def validate_start_timestamp(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """V12: check that start_timestamp values are plausible Unix epoch floats.

    All offending rows are collected into a single aggregated error Issue.

    - Error: value is null/missing.
    - Error: value is not a valid float.
    - Error: value is below the year-2000 threshold (likely relative, not absolute).
    - Error: value is above the year-2100 threshold.

    A missing file or missing column is silently skipped -- V11 reports those.
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []

    csv_path = root / "meta" / "custom_metadata.csv"
    if not csv_path.exists():
        return issues  # V11 already reports this

    try:
        df = pd.read_csv(str(csv_path))
    except Exception:
        return issues  # V11 already reports this

    if "start_timestamp" not in df.columns:
        return issues  # V11 warns about missing columns

    invalid: List[str] = []
    for idx, row in df.iterrows():
        ts = row.get("start_timestamp")
        episode_id = row.get("episode_id", f"row_{idx}")

        if pd.isna(ts):
            invalid.append(
                f"  Row {idx} (episode '{episode_id}'): "
                f"start_timestamp is missing/null"
            )
            continue

        try:
            ts_float = float(ts)
        except (ValueError, TypeError):
            invalid.append(
                f"  Row {idx} (episode '{episode_id}'): "
                f"'{ts}' is not a valid float"
            )
            continue

        if ts_float < _UNIX_EPOCH_THRESHOLD:
            invalid.append(
                f"  Row {idx} (episode '{episode_id}'): "
                f"{ts_float} is below year-2000 threshold ({_UNIX_EPOCH_THRESHOLD}); "
                f"likely a relative offset, not an absolute Unix timestamp"
            )
        elif ts_float > _UNIX_EPOCH_MAX:
            invalid.append(
                f"  Row {idx} (episode '{episode_id}'): "
                f"{ts_float} is above year-2100 threshold ({_UNIX_EPOCH_MAX})"
            )

    if invalid:
        issues.append(
            Issue(
                level="error",
                validator="validate_start_timestamp",
                message=(
                    "start_timestamp must be a valid Unix epoch float "
                    f"(range {_UNIX_EPOCH_THRESHOLD} to {_UNIX_EPOCH_MAX}):\n"
                    + "\n".join(invalid)
                ),
            )
        )

    return issues


# ---------------------------------------------------------------------------
# Convenience: run all P0 validators
# ---------------------------------------------------------------------------

_P0_VALIDATORS = [
    validate_tasks_format,
    validate_codebase_version,
    validate_feature_shapes,
    validate_timestamps,
    validate_custom_metadata_csv,
    validate_start_timestamp,
]


def validate_v3_dataset(
    dataset_path: "Union[str, Path, CloudPath]",
    thorough: bool = False,
) -> List[Issue]:
    """Run all P0 validators and return a combined list of issues.

    A validator that raises is converted into an error Issue rather than
    aborting the run, so one broken check cannot mask the others.

    Args:
        dataset_path: Path to the lerobot dataset directory.
        thorough: Reserved for future P2 checks that require video probing.

    Returns:
        A list of Issue objects (errors and warnings).
    """
    all_issues: List[Issue] = []
    for validator_fn in _P0_VALIDATORS:
        try:
            all_issues.extend(validator_fn(dataset_path))
        except Exception as exc:
            logger.warning("Validator %s raised: %s", validator_fn.__name__, exc)
            all_issues.append(
                Issue(
                    level="error",
                    validator=validator_fn.__name__,
                    message=f"Validator raised an unexpected exception: {exc}",
                )
            )
    return all_issues


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _to_path(dataset_path: "Union[str, Path, CloudPath]") -> Any:
    """Normalize to a path-like object.

    Strings may name cloud URIs, so they go through cloudpathlib's AnyPath;
    the import is deferred so purely-local Path workflows do not require
    cloudpathlib at call time. Non-string inputs are returned unchanged.
    """
    if isinstance(dataset_path, str):
        from cloudpathlib import AnyPath

        return AnyPath(dataset_path)
    return dataset_path


def _load_info(root: Any) -> Optional[Dict[str, Any]]:
    """Load meta/info.json and return the parsed dict, or None on failure.

    Returns None both when the file is absent and when it is unparseable;
    callers that care report the distinction themselves (see V2).
    """
    info_file = root / "meta" / "info.json"
    if not info_file.exists():
        return None
    try:
        with info_file.open("r") as f:
            return json.load(f)
    except Exception:
        return None
self.metadata_validator.validate() @@ -71,6 +74,14 @@ def validate(self) -> bool: self.errors.extend(self.annotation_validator.get_errors()) self.errors.extend(self.lerobot_checker.get_errors()) + # Run P0 v3 validators + v3_issues = validate_v3_dataset(self.dataset_path) + for issue in v3_issues: + if issue.level == "error": + self.errors.append(f"[{issue.validator}] {issue.message}") + else: + self.warnings.append(f"[{issue.validator}] {issue.message}") + # If basic validations pass and annotations exist, run cross-validation if metadata_valid and annotation_valid and self.annotation_validator.get_annotations(): self._cross_validate() @@ -229,8 +240,18 @@ def get_errors(self) -> List[str]: """Get all validation errors.""" return self.errors + def get_warnings(self) -> List[str]: + """Get all validation warnings.""" + return self.warnings + def print_results(self) -> None: """Print validation results.""" + if self.warnings: + print(f"Warnings ({len(self.warnings)}):\n") + for i, warning in enumerate(self.warnings, 1): + print(f" {i}. 
{warning}") + print() + if len(self.errors) == 0: print("✓ All validations passed!") else: diff --git a/tests/test_integration.py b/tests/test_integration.py index 3014467..033d7e2 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -13,11 +13,11 @@ def create_test_dataset(tmpdir): """Create a minimal test dataset structure.""" dataset_path = Path(tmpdir) / "dataset" dataset_path.mkdir() - + # Create meta folder meta_dir = dataset_path / "meta" meta_dir.mkdir() - + # Create data folder (lerobot datasets have parquet files here with task column) data_dir = dataset_path / "data" data_dir.mkdir() @@ -28,6 +28,8 @@ def create_test_dataset(tmpdir): # Create info.json in meta folder (lerobot stores it there) info = { "fps": 30, + "codebase_version": "v3.0", + "features": {}, "episodes": { "ep_001": {"duration": 10.0, "num_frames": 300}, "ep_002": {"duration": 5.0, "num_frames": 150}, @@ -36,6 +38,11 @@ def create_test_dataset(tmpdir): with open(meta_dir / "info.json", "w") as f: json.dump(info, f) + # Create tasks.parquet (required by v3 validators) + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + meta_dir / "tasks.parquet", index=False + ) + return dataset_path diff --git a/tests/test_is_eval_data_consistency.py b/tests/test_is_eval_data_consistency.py index d80cadc..3bca560 100644 --- a/tests/test_is_eval_data_consistency.py +++ b/tests/test_is_eval_data_consistency.py @@ -13,11 +13,11 @@ def create_test_dataset(tmpdir): """Create a minimal test dataset structure.""" dataset_path = Path(tmpdir) / "dataset" dataset_path.mkdir() - + # Create meta folder meta_dir = dataset_path / "meta" meta_dir.mkdir() - + # Create data folder (lerobot datasets have parquet files here with task column) data_dir = dataset_path / "data" data_dir.mkdir() @@ -28,10 +28,17 @@ def create_test_dataset(tmpdir): # Create info.json in meta folder (lerobot stores it there) info = { "fps": 30, + "codebase_version": "v3.0", + "features": {}, } with 
open(meta_dir / "info.json", "w") as f: json.dump(info, f) + # Create tasks.parquet (required by v3 validators) + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + meta_dir / "tasks.parquet", index=False + ) + return dataset_path diff --git a/tests/test_v3_checks.py b/tests/test_v3_checks.py new file mode 100644 index 0000000..eea8b8b --- /dev/null +++ b/tests/test_v3_checks.py @@ -0,0 +1,615 @@ +"""Tests for P0 v3 validators (lerobot_validator.v3_checks).""" + +import json +import tempfile +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd + +from lerobot_validator.v3_checks import ( + Issue, + validate_codebase_version, + validate_custom_metadata_csv, + validate_feature_shapes, + validate_start_timestamp, + validate_tasks_format, + validate_timestamps, + validate_v3_dataset, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_dataset(tmpdir: str) -> Path: + """Create a minimal dataset directory skeleton.""" + root = Path(tmpdir) / "dataset" + root.mkdir() + (root / "meta").mkdir() + (root / "data").mkdir() + return root + + +def _write_info(root: Path, info: Dict[str, Any]) -> None: + with open(root / "meta" / "info.json", "w") as f: + json.dump(info, f) + + +def _minimal_info(**overrides: Any) -> Dict[str, Any]: + info: Dict[str, Any] = { + "fps": 30, + "codebase_version": "v3.0", + "chunks_size": 1000, + "features": { + "observation.images.top": { + "dtype": "video", + "shape": [480, 640, 3], + }, + "action": { + "dtype": "float32", + "shape": [7], + }, + }, + } + info.update(overrides) + return info + + +def _write_tasks_parquet(root: Path) -> None: + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + root / "meta" / "tasks.parquet", index=False + ) + + +def _write_custom_metadata(root: Path, df: pd.DataFrame) -> None: + df.to_csv(root / "meta" / 
"custom_metadata.csv", index=False) + + +def _valid_metadata_df() -> pd.DataFrame: + return pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + "is_eval_episode": [False, False], + "episode_id": ["ep_001", "ep_002"], + "start_timestamp": [1730455200.0, 1730458800.0], + "checkpoint_path": ["", ""], + "success": [True, False], + "station_id": ["station_1", "station_1"], + "robot_id": ["robot_1", "robot_1"], + } + ) + + +def _errors(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "error"] + + +def _warnings(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "warning"] + + +# =================================================================== +# V1: validate_tasks_format +# =================================================================== + + +class TestValidateTasksFormat: + def test_parquet_present_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + + def test_neither_file_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_tasks_format(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "tasks.parquet not found" in errors[0].message + + def test_jsonl_only_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "tasks.jsonl" in warnings[0].message + + def test_both_files_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, 
_minimal_info()) + _write_tasks_parquet(root) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(issues) == 0 + + +# =================================================================== +# V2: validate_codebase_version +# =================================================================== + + +class TestValidateCodebaseVersion: + def test_v3_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.0")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + def test_v3_minor_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.1.2")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + def test_v2_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v2.1")) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "v3." 
in errors[0].message + + def test_missing_version_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + del info["codebase_version"] + _write_info(root, info) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "missing" in errors[0].message.lower() + + def test_no_info_json_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json at all + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "info.json" in errors[0].message + + +# =================================================================== +# V5: validate_feature_shapes +# =================================================================== + + +class TestValidateFeatureShapes: + def test_valid_shapes_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_empty_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["action"]["shape"] = [] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "empty shape" in errors[0].message + assert "action" in errors[0].message + + def test_scalar_shape_1_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["scalar_feat"] = {"dtype": "float32", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_image_feature_2d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + 
info["features"]["observation.images.top"]["shape"] = [640, 480] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_video_feature_4d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["observation.images.top"]["shape"] = [1, 480, 640, 3] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_image_dtype_3d_shape_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["cam"] = {"dtype": "image", "shape": [480, 640, 3]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + +# =================================================================== +# V7: validate_timestamps +# =================================================================== + + +class TestValidateTimestamps: + def test_relative_timestamps_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.033, 0.066], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_absolute_timestamps_error(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root 
/ "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + def test_non_monotonic_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.066, 0.033], # non-monotonic + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("non-monotonically" in w.message for w in warnings) + + def test_large_starting_offset_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [5.0, 5.033], # starts at 5s, not near 0 + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("starts at timestamp" in w.message for w in warnings) + + def test_no_data_dir_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # data dir is empty (no parquet files) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_no_episode_index_column_still_checks_absolute(self): + """Even without episode_index column, absolute timestamps should be caught.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, 
_minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + +# =================================================================== +# V11: validate_custom_metadata_csv +# =================================================================== + + +class TestValidateCustomMetadataCsv: + def test_valid_metadata_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_custom_metadata_csv(root) + assert len(_errors(issues)) == 0 + + def test_missing_file_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "not found" in errors[0].message + + def test_missing_episode_index_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_id": ["ep_001", "ep_002"], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_index" in errors[0].message + + def test_missing_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_id" in errors[0].message + + def test_null_episode_id_errors(self): + with 
tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            df.loc[0, "episode_id"] = None
            _write_custom_metadata(root, df)

            issues = validate_custom_metadata_csv(root)
            errors = _errors(issues)
            assert any("null" in e.message for e in errors)

    def test_duplicate_episode_id_errors(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            df.loc[1, "episode_id"] = "ep_001"  # duplicate
            _write_custom_metadata(root, df)

            issues = validate_custom_metadata_csv(root)
            errors = _errors(issues)
            assert any("duplicate" in e.message for e in errors)

    def test_missing_optional_columns_warns(self):
        """Only the required columns present -> warning naming optional ones."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = pd.DataFrame(
                {
                    "episode_index": [0, 1],
                    "episode_id": ["ep_001", "ep_002"],
                }
            )
            _write_custom_metadata(root, df)

            issues = validate_custom_metadata_csv(root)
            warnings = _warnings(issues)
            assert len(warnings) >= 1
            assert any("optional columns" in w.message for w in warnings)

    def test_all_columns_present_no_warnings(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            _write_custom_metadata(root, _valid_metadata_df())

            issues = validate_custom_metadata_csv(root)
            warnings = _warnings(issues)
            assert len(warnings) == 0


# ===================================================================
# V12: validate_start_timestamp
# ===================================================================


class TestValidateStartTimestamp:
    """Tests for V12: start_timestamp must be a plausible Unix epoch float."""

    def test_valid_timestamps_pass(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            _write_custom_metadata(root, _valid_metadata_df())

            issues = validate_start_timestamp(root)
            assert len(issues) == 0

    def test_null_timestamp_errors(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            df.loc[0, "start_timestamp"] = None
            _write_custom_metadata(root, df)

            issues = validate_start_timestamp(root)
            errors = _errors(issues)
            assert len(errors) == 1
            assert "missing/null" in errors[0].message

    def test_below_threshold_errors(self):
        """Small values look like relative offsets, not epoch seconds."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            df.loc[0, "start_timestamp"] = 100.0  # relative offset, not epoch
            _write_custom_metadata(root, df)

            issues = validate_start_timestamp(root)
            errors = _errors(issues)
            assert len(errors) == 1
            assert "below year-2000 threshold" in errors[0].message

    def test_above_max_errors(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            df.loc[0, "start_timestamp"] = 5_000_000_000.0  # year ~2128
            _write_custom_metadata(root, df)

            issues = validate_start_timestamp(root)
            errors = _errors(issues)
            assert len(errors) == 1
            assert "above year-2100 threshold" in errors[0].message

    def test_non_numeric_errors(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = _valid_metadata_df()
            # Force a string column first so the non-numeric value can be
            # assigned without a pandas dtype error.
            df["start_timestamp"] = df["start_timestamp"].astype(str)
            df.loc[0, "start_timestamp"] = "not-a-number"
            _write_custom_metadata(root, df)

            issues = validate_start_timestamp(root)
            errors = _errors(issues)
            assert len(errors) == 1
            assert "not a valid float" in errors[0].message

    def test_missing_csv_returns_empty(self):
        """Missing custom_metadata.csv is reported by V11, so V12 stays silent."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            # no custom_metadata.csv

            issues = validate_start_timestamp(root)
            assert len(issues) == 0

    def test_missing_column_returns_empty(self):
        """Missing start_timestamp column is V11's concern; V12 reports nothing."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = _make_dataset(tmpdir)
            df = pd.DataFrame(
                {
                    "episode_index": [0],
                    "episode_id": ["ep_001"],
                }
            )
            _write_custom_metadata(root, df)

            issues = validate_start_timestamp(root)
            assert len(issues) == 0


# ===================================================================
# validate_v3_dataset (combined runner)
# ===================================================================


class TestValidateV3Dataset:
    """End-to-end checks for the combined validate_v3_dataset runner."""

    def test_fully_valid_dataset_passes(self):
        """A dataset satisfying every P0 validator yields zero errors."""
        with tempfile.TemporaryDirectory() as td:
            dataset_root = _make_dataset(td)
            _write_info(dataset_root, _minimal_info())
            _write_tasks_parquet(dataset_root)
            _write_custom_metadata(dataset_root, _valid_metadata_df())

            # Episode data with relative (near-zero) timestamps.
            chunk = dataset_root / "data" / "chunk-000"
            chunk.mkdir(parents=True)
            frame = pd.DataFrame(
                {
                    "episode_index": [0, 0],
                    "timestamp": [0.0, 0.033],
                }
            )
            frame.to_parquet(chunk / "episode_000000.parquet", index=False)

            found = validate_v3_dataset(dataset_root)
            assert len(_errors(found)) == 0

    def test_multiple_issues_collected(self):
        """An empty dataset accumulates errors from several validators."""
        with tempfile.TemporaryDirectory() as td:
            dataset_root = _make_dataset(td)
            # Missing info.json (V2), tasks.parquet (V1) and
            # custom_metadata.csv (V11) should all be reported together.
            found = validate_v3_dataset(dataset_root)
            assert len(_errors(found)) >= 3

    def test_issue_str_representation(self):
        """Issue.__str__ renders as '[level] validator: message'."""
        sample = Issue(level="error", validator="test_validator", message="test message")
        assert str(sample) == "[error] test_validator: test message"