From 9265359244a31bba60071caebfeb3ba94d231145 Mon Sep 17 00:00:00 2001 From: Chandra Kuchi Date: Wed, 18 Mar 2026 17:57:13 +0000 Subject: [PATCH 1/6] feat: add LeRobot v3 metadata validation checks Add LerobotV3MetadataChecker with 7 pre-ingestion validation checks: 1. tasks.parquet exists (flags tasks.jsonl-only datasets) 2. Episodes parquet has required columns (chunk_index, file_index, tasks) 3. Feature shapes are non-empty ([] should be [1]) 4. File path templates use standard placeholders 5. Video files referenced by episodes exist 6. Timestamp consistency (all relative or all absolute) 7. Episode row contiguity in data parquet Integrated into the main validation orchestrator. Includes 17 new tests and updated existing test fixtures to include v3 metadata. Co-Authored-By: Claude Opus 4.6 (1M context) --- lerobot_validator/__init__.py | 3 +- lerobot_validator/v3_metadata_checker.py | 406 +++++++++++++++++++++++ lerobot_validator/validator.py | 4 + tests/test_integration.py | 33 +- tests/test_is_eval_data_consistency.py | 37 ++- tests/test_v3_metadata_checker.py | 368 ++++++++++++++++++++ 6 files changed, 834 insertions(+), 17 deletions(-) create mode 100644 lerobot_validator/v3_metadata_checker.py create mode 100644 tests/test_v3_metadata_checker.py diff --git a/lerobot_validator/__init__.py b/lerobot_validator/__init__.py index a1e09c3..80f8cdf 100644 --- a/lerobot_validator/__init__.py +++ b/lerobot_validator/__init__.py @@ -9,6 +9,7 @@ from lerobot_validator.validator import LerobotDatasetValidator from lerobot_validator.gcp_path import compute_gcp_path +from lerobot_validator.v3_metadata_checker import LerobotV3MetadataChecker -__all__ = ["LerobotDatasetValidator", "compute_gcp_path"] +__all__ = ["LerobotDatasetValidator", "compute_gcp_path", "LerobotV3MetadataChecker"] diff --git a/lerobot_validator/v3_metadata_checker.py b/lerobot_validator/v3_metadata_checker.py new file mode 100644 index 0000000..76a0a2b --- /dev/null +++ 
logger = logging.getLogger(__name__)

# Columns that must exist in every episodes parquet file.
REQUIRED_EPISODES_COLUMNS: List[str] = [
    "data/chunk_index",
    "data/file_index",
    "tasks",
]

# Standard placeholders expected in info.json path templates.  Each entry is
# the placeholder "stem": the opening brace plus the field name, without any
# format spec, so "{episode_chunk:03d}" matches "{episode_chunk".
REQUIRED_DATA_PATH_PLACEHOLDERS: Set[str] = {
    "{episode_chunk",
    "{episode_index",
}
REQUIRED_VIDEO_PATH_PLACEHOLDERS: Set[str] = {
    "{episode_chunk",
    "{episode_index",
    "{video_key",
}

# Timestamps below this threshold are treated as relative (seconds from
# episode start).  Values above are treated as absolute Unix timestamps.
# No robot episode is longer than ~11.5 days (1e6 seconds).
_ABSOLUTE_TIMESTAMP_THRESHOLD = 1_000_000.0

# Upper bound on the number of missing video paths listed in one error.
_MAX_REPORTED_MISSING_VIDEOS = 10


class LerobotV3MetadataChecker:
    """
    Validates structural metadata for LeRobot v3 datasets.

    Every check appends human-readable messages to ``self.errors`` instead of
    raising, so a single run reports as many problems as possible.

    Usage::

        checker = LerobotV3MetadataChecker(dataset_path)
        passed = checker.validate()
        errors = checker.get_errors()
    """

    def __init__(self, dataset_path: Union[str, Path, "CloudPath"]) -> None:
        """Store the dataset root; strings are converted with ``AnyPath``."""
        if isinstance(dataset_path, str):
            self.dataset_path = AnyPath(dataset_path)
        else:
            self.dataset_path = dataset_path
        self.errors: List[str] = []
        self._info: Optional[Dict[str, Any]] = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate(self) -> bool:
        """Run all v3 metadata checks. Returns True when all pass."""
        self.errors = []
        self._info = None

        if not self.dataset_path.exists():
            self.errors.append(
                f"Dataset directory not found: {self.dataset_path}"
            )
            return False

        self._load_info()

        self._check_tasks_parquet()
        self._check_episodes_parquet_columns()
        self._check_feature_shapes()
        self._check_path_templates()
        self._check_video_files_exist()
        self._check_timestamp_consistency()
        self._check_episode_contiguity()

        return len(self.errors) == 0

    def get_errors(self) -> List[str]:
        """Return accumulated error messages."""
        return self.errors

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _meta_dir(self) -> Any:
        """Return the ``meta`` directory under the dataset root."""
        return self.dataset_path / "meta"

    def _data_dir(self) -> Any:
        """Return the ``data`` directory under the dataset root."""
        return self.dataset_path / "data"

    def _data_parquet_files(self) -> List[Any]:
        """Return all parquet files below ``data/`` in sorted order."""
        data_dir = self._data_dir()
        if not data_dir.exists():
            return []
        return sorted(data_dir.glob("**/*.parquet"))

    def _display_path(self, path: Any) -> str:
        """Return *path* relative to the dataset root for error messages.

        A bare file name is ambiguous when several chunk directories hold
        files with the same name, so the relative path is preferred.
        """
        try:
            return str(path.relative_to(self.dataset_path))
        except ValueError:
            return str(path.name)

    def _load_info(self) -> None:
        """Load ``meta/info.json`` into ``self._info``.

        ``self._info`` stays ``None`` when the file is absent, unreadable,
        not valid JSON, or not a JSON object; the last three cases are
        reported as errors.
        """
        info_file = self._meta_dir() / "info.json"
        if not info_file.exists():
            return  # Already reported by LerobotDatasetChecker
        try:
            with info_file.open("r") as f:
                info = json.load(f)
        except json.JSONDecodeError as exc:
            self.errors.append(f"meta/info.json is not valid JSON: {exc}")
            return
        except Exception as exc:
            self.errors.append(f"Failed to read meta/info.json: {exc}")
            return

        # Every later check calls ``.get`` on the parsed document, so
        # anything but an object would crash instead of being reported.
        if not isinstance(info, dict):
            self.errors.append(
                "meta/info.json must contain a JSON object at the top "
                f"level, got {type(info).__name__}."
            )
            return
        self._info = info

    def _features(self) -> Dict[str, Any]:
        """Return the ``features`` mapping of info.json, or ``{}``."""
        if self._info is None:
            return {}
        features = self._info.get("features", {})
        return features if isinstance(features, dict) else {}

    def _get_video_keys(self) -> List[str]:
        """Return feature keys whose dtype is video or image."""
        return [
            name
            for name, defn in self._features().items()
            if isinstance(defn, dict)
            and defn.get("dtype") in ("video", "image")
        ]

    # ------------------------------------------------------------------
    # Check 1: tasks.parquet
    # ------------------------------------------------------------------

    def _check_tasks_parquet(self) -> None:
        """Require ``meta/tasks.parquet``; flag datasets with only jsonl."""
        meta = self._meta_dir()
        if (meta / "tasks.parquet").exists():
            return

        if (meta / "tasks.jsonl").exists():
            self.errors.append(
                "meta/tasks.parquet not found but meta/tasks.jsonl is present. "
                "LeRobot v3 requires tasks.parquet — convert tasks.jsonl "
                "to parquet before uploading."
            )
        else:
            self.errors.append(
                "meta/tasks.parquet not found. "
                "LeRobot v3 datasets must include a tasks.parquet file."
            )

    # ------------------------------------------------------------------
    # Check 2: episodes parquet columns
    # ------------------------------------------------------------------

    def _check_episodes_parquet_columns(self) -> None:
        """Require ``meta/episodes.parquet`` with all mandatory columns."""
        episodes_file = self._meta_dir() / "episodes.parquet"
        if not episodes_file.exists():
            self.errors.append(
                "meta/episodes.parquet not found. "
                "LeRobot v3 datasets must include an episodes.parquet file."
            )
            return

        try:
            df = pd.read_parquet(str(episodes_file))
        except Exception as exc:
            self.errors.append(
                f"Failed to read meta/episodes.parquet: {exc}"
            )
            return

        missing = [
            col for col in REQUIRED_EPISODES_COLUMNS if col not in df.columns
        ]
        if missing:
            self.errors.append(
                f"meta/episodes.parquet is missing required columns: {missing}. "
                f"Present columns: {sorted(df.columns.tolist())}"
            )

    # ------------------------------------------------------------------
    # Check 3: feature shapes
    # ------------------------------------------------------------------

    def _check_feature_shapes(self) -> None:
        """Reject features declared with an empty ``shape`` list."""
        for name, defn in self._features().items():
            if not isinstance(defn, dict):
                continue
            shape = defn.get("shape")
            if isinstance(shape, list) and len(shape) == 0:
                self.errors.append(
                    f"Feature '{name}' has an empty shape (shape: []). "
                    "Scalar features should use shape: [1]."
                )

    # ------------------------------------------------------------------
    # Check 4: path templates
    # ------------------------------------------------------------------

    def _check_path_templates(self) -> None:
        """Validate the ``data_path`` / ``video_path`` templates, if set."""
        if self._info is None:
            return

        data_path = self._info.get("data_path")
        if data_path is not None:
            self._validate_template(
                str(data_path),
                "data_path",
                REQUIRED_DATA_PATH_PLACEHOLDERS,
            )

        video_path = self._info.get("video_path")
        if video_path is not None:
            self._validate_template(
                str(video_path),
                "video_path",
                REQUIRED_VIDEO_PATH_PLACEHOLDERS,
            )

    def _validate_template(
        self,
        template: str,
        field: str,
        required: Set[str],
    ) -> None:
        """Report placeholders from *required* that *template* lacks.

        The template is parsed with the standard format-string parser, so
        escaped braces (``{{``/``}}``), conversions (``!r``) and format
        specs (``:03d``) are handled the same way ``str.format`` handles
        them.  A template that cannot be parsed at all is reported too.
        """
        import string  # local: only this method needs the parser

        try:
            found_stems = {
                "{" + field_name
                for _, field_name, _, _ in string.Formatter().parse(template)
                if field_name
            }
        except ValueError as exc:
            self.errors.append(
                f"info.json '{field}' template is not a valid format string "
                f"({exc}). Template: '{template}'"
            )
            return

        missing = required - found_stems
        if missing:
            self.errors.append(
                f"info.json '{field}' template is missing required "
                f"placeholders {sorted(missing)}. "
                f"Template: '{template}'"
            )

    # ------------------------------------------------------------------
    # Check 5: video files exist
    # ------------------------------------------------------------------

    def _check_video_files_exist(self) -> None:
        """Verify that every episode's rendered video path exists."""
        if self._info is None:
            return

        video_path_tpl = self._info.get("video_path")
        if video_path_tpl is None:
            return

        video_keys = self._get_video_keys()
        if not video_keys:
            return

        episodes_file = self._meta_dir() / "episodes.parquet"
        if not episodes_file.exists():
            return  # Already flagged in check 2

        try:
            episodes_df = pd.read_parquet(str(episodes_file))
        except Exception:
            return  # Read failure already flagged in check 2

        if "episode_index" not in episodes_df.columns:
            return

        limit = _MAX_REPORTED_MISSING_VIDEOS
        missing = self._find_missing_videos(
            episodes_df, str(video_path_tpl), video_keys, limit
        )
        if not missing:
            return

        # The helper stops one past the limit; that extra entry only signals
        # that more files are missing than are listed.
        truncated = len(missing) > limit
        shown = missing[:limit]
        count = f"at least {len(missing)}" if truncated else str(len(shown))
        lines = [f"  {p}" for p in shown]
        if truncated:
            lines.append("  ... (truncated)")
        self.errors.append(
            f"Missing video files ({count} not found):\n" + "\n".join(lines)
        )

    def _find_missing_videos(
        self,
        episodes_df: pd.DataFrame,
        video_path_tpl: str,
        video_keys: List[str],
        limit: int = _MAX_REPORTED_MISSING_VIDEOS,
    ) -> List[str]:
        """Return rendered video paths that do not exist in the dataset.

        Scanning stops as soon as ``limit + 1`` missing paths are found, so
        a badly broken dataset does not trigger one existence check per
        episode and video key.  An empty list is returned when the template
        cannot be rendered with the standard placeholders.
        """
        chunks_size = (self._info or {}).get("chunks_size", 1000)
        if not isinstance(chunks_size, int) or chunks_size <= 0:
            chunks_size = 1000  # unusable value in info.json; use default

        ep_indices = episodes_df["episode_index"].tolist()
        if "data/chunk_index" in episodes_df.columns:
            chunk_indices = episodes_df["data/chunk_index"].tolist()
        else:
            chunk_indices = [None] * len(ep_indices)

        missing: List[str] = []
        for raw_idx, raw_chunk in zip(ep_indices, chunk_indices):
            if pd.isna(raw_idx):
                continue
            ep_idx = int(raw_idx)
            # Derive the chunk from the episode index only when the episodes
            # table does not provide one.
            if pd.isna(raw_chunk):
                ep_chunk = ep_idx // chunks_size
            else:
                ep_chunk = int(raw_chunk)

            for vkey in video_keys:
                try:
                    rendered = video_path_tpl.format(
                        episode_chunk=ep_chunk,
                        episode_index=ep_idx,
                        video_key=vkey,
                    )
                except (KeyError, IndexError, ValueError):
                    # Non-standard or malformed placeholders: the template
                    # fails identically for every episode, so stop here.
                    return missing

                if not (self.dataset_path / rendered).exists():
                    missing.append(rendered)
                    if len(missing) > limit:
                        return missing
        return missing

    # ------------------------------------------------------------------
    # Check 6: timestamp consistency
    # ------------------------------------------------------------------

    def _check_timestamp_consistency(self) -> None:
        """Require one timestamp mode (relative or absolute) everywhere."""
        overall_mode: Optional[bool] = None  # True=absolute, False=relative
        inconsistent: List[str] = []

        for pf in self._data_parquet_files():
            try:
                df = pd.read_parquet(str(pf), columns=["timestamp"])
            except Exception as exc:
                # Unreadable file or no timestamp column: nothing to compare.
                logger.debug("Skipping timestamp check for %s: %s", pf, exc)
                continue

            # Null / non-numeric entries carry no mode information.
            timestamps = pd.to_numeric(
                df["timestamp"], errors="coerce"
            ).dropna()
            if timestamps.empty:
                continue

            is_absolute = timestamps > _ABSOLUTE_TIMESTAMP_THRESHOLD
            has_abs = bool(is_absolute.any())
            has_rel = not bool(is_absolute.all())
            label = self._display_path(pf)

            if has_abs and has_rel:
                inconsistent.append(
                    f"  {label}: mix of absolute and relative timestamps"
                )
                continue

            file_mode = has_abs
            if overall_mode is None:
                overall_mode = file_mode
            elif overall_mode != file_mode:
                inconsistent.append(
                    f"  {label}: "
                    f"{'absolute' if file_mode else 'relative'} timestamps "
                    f"but earlier chunks use "
                    f"{'absolute' if overall_mode else 'relative'}"
                )

        if inconsistent:
            self.errors.append(
                "Timestamp inconsistency in data parquet files. "
                "All files must use either relative (seconds from episode "
                "start) or absolute (Unix epoch) timestamps:\n"
                + "\n".join(inconsistent)
            )

    # ------------------------------------------------------------------
    # Check 7: episode contiguity
    # ------------------------------------------------------------------

    @staticmethod
    def _first_repeated_episode(episode_ids: pd.Series) -> Optional[int]:
        """Return the first episode index that starts a second run.

        A "run" is a maximal stretch of consecutive equal values.  ``None``
        is returned when every episode index forms exactly one run.
        """
        ids = episode_ids.dropna()
        # A row starts a run when it differs from the row before it; the
        # first row always does because shift() yields NaN there.
        run_starts = ids[ids != ids.shift()]
        repeated = run_starts[run_starts.duplicated()]
        if repeated.empty:
            return None
        return int(repeated.iloc[0])

    def _check_episode_contiguity(self) -> None:
        """Require the rows of each episode to be stored contiguously."""
        non_contiguous: List[str] = []

        for pf in self._data_parquet_files():
            try:
                df = pd.read_parquet(str(pf), columns=["episode_index"])
            except Exception as exc:
                logger.debug("Skipping contiguity check for %s: %s", pf, exc)
                continue
            if df.empty:
                continue

            repeated = self._first_repeated_episode(df["episode_index"])
            if repeated is not None:
                non_contiguous.append(
                    f"  {self._display_path(pf)}: episode_index={repeated} "
                    f"appears non-contiguously"
                )

        if non_contiguous:
            self.errors.append(
                "Non-contiguous episode rows in data parquet files. "
                "All rows for each episode_index must be grouped "
                "together:\n" + "\n".join(non_contiguous)
            )
(lerobot datasets have parquet files here with task column) + + # Create data folder with a valid data chunk parquet data_dir = dataset_path / "data" - data_dir.mkdir() - # Create a dummy parquet file to satisfy the check - # In a real dataset, this would contain the task column - (data_dir / "chunk-000.parquet").touch() + chunk_dir = data_dir / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame({ + "episode_index": [0, 0, 1, 1], + "timestamp": [0.0, 0.033, 0.0, 0.033], + }).to_parquet(chunk_dir / "episode_000000.parquet", index=False) # Create info.json in meta folder (lerobot stores it there) info = { "fps": 30, + "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", + "features": { + "action": {"dtype": "float32", "shape": [7]}, + }, "episodes": { "ep_001": {"duration": 10.0, "num_frames": 300}, "ep_002": {"duration": 5.0, "num_frames": 150}, @@ -36,6 +42,19 @@ def create_test_dataset(tmpdir): with open(meta_dir / "info.json", "w") as f: json.dump(info, f) + # Create tasks.parquet + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + meta_dir / "tasks.parquet", index=False + ) + + # Create episodes.parquet with required v3 columns + pd.DataFrame({ + "episode_index": [0, 1], + "data/chunk_index": [0, 0], + "data/file_index": [0, 1], + "tasks": [["default"], ["default"]], + }).to_parquet(meta_dir / "episodes.parquet", index=False) + return dataset_path diff --git a/tests/test_is_eval_data_consistency.py b/tests/test_is_eval_data_consistency.py index d80cadc..4ef99b3 100644 --- a/tests/test_is_eval_data_consistency.py +++ b/tests/test_is_eval_data_consistency.py @@ -13,25 +13,44 @@ def create_test_dataset(tmpdir): """Create a minimal test dataset structure.""" dataset_path = Path(tmpdir) / "dataset" dataset_path.mkdir() - + # Create meta folder meta_dir = dataset_path / "meta" meta_dir.mkdir() - - # Create data folder (lerobot datasets have parquet files here with task column) - data_dir = dataset_path / "data" 
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_dataset(tmpdir: str) -> Path:
    """Create an empty dataset skeleton (``meta/`` and ``data/``)."""
    dataset = Path(tmpdir) / "dataset"
    dataset.mkdir()
    for sub in ("meta", "data"):
        (dataset / sub).mkdir()
    return dataset


def _write_info(root: Path, info: Dict[str, Any]) -> None:
    """Serialize *info* to ``meta/info.json``."""
    with open(root / "meta" / "info.json", "w") as f:
        json.dump(info, f)


def _minimal_info() -> Dict[str, Any]:
    """Return an info.json payload with one video and one vector feature."""
    features = {
        "observation.images.top": {
            "dtype": "video",
            "shape": [480, 640, 3],
        },
        "action": {
            "dtype": "float32",
            "shape": [7],
        },
    }
    return {
        "fps": 30,
        "chunks_size": 1000,
        "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet",
        "video_path": "videos/{video_key}/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.mp4",
        "features": features,
    }


def _write_tasks_parquet(root: Path) -> None:
    """Write a single-task ``meta/tasks.parquet``."""
    tasks = pd.DataFrame({"task_index": [0], "task": ["default"]})
    tasks.to_parquet(root / "meta" / "tasks.parquet", index=False)


def _write_episodes_parquet(root: Path) -> None:
    """Write a two-episode ``meta/episodes.parquet`` with all v3 columns."""
    episodes = pd.DataFrame(
        {
            "episode_index": [0, 1],
            "data/chunk_index": [0, 0],
            "data/file_index": [0, 1],
            "tasks": [["default"], ["default"]],
        }
    )
    episodes.to_parquet(root / "meta" / "episodes.parquet", index=False)


def _collect_errors(root: Path) -> list:
    """Run the checker on *root* and return the reported errors."""
    checker = LerobotV3MetadataChecker(root)
    checker.validate()
    return checker.get_errors()


def _write_data_chunk(root: Path, filename: str, columns: Dict[str, Any]) -> None:
    """Write *columns* as ``data/chunk-000/<filename>``."""
    chunk_dir = root / "data" / "chunk-000"
    chunk_dir.mkdir(parents=True, exist_ok=True)
    pd.DataFrame(columns).to_parquet(chunk_dir / filename, index=False)


# ---------------------------------------------------------------------------
# Check 1: tasks.parquet
# ---------------------------------------------------------------------------


def test_tasks_parquet_present_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_tasks_parquet(dataset)

        reported = _collect_errors(dataset)
        assert all("tasks.parquet" not in msg for msg in reported)


def test_tasks_jsonl_only_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        jsonl = dataset / "meta" / "tasks.jsonl"
        jsonl.write_text('{"task_index": 0, "task": "pick"}\n')

        reported = _collect_errors(dataset)
        assert any("tasks.jsonl" in msg for msg in reported)


def test_no_tasks_file_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())

        reported = _collect_errors(dataset)
        assert any("tasks.parquet" in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 2: episodes parquet columns
# ---------------------------------------------------------------------------


def test_episodes_parquet_missing_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())

        reported = _collect_errors(dataset)
        assert any("episodes.parquet" in msg for msg in reported)


def test_episodes_parquet_missing_column_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        # Deliberately omit the "tasks" column.
        incomplete = pd.DataFrame(
            {
                "episode_index": [0],
                "data/chunk_index": [0],
                "data/file_index": [0],
            }
        )
        incomplete.to_parquet(
            dataset / "meta" / "episodes.parquet", index=False
        )

        reported = _collect_errors(dataset)
        assert any(
            "tasks" in msg and "missing" in msg.lower() for msg in reported
        )


def test_episodes_parquet_all_columns_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_tasks_parquet(dataset)
        _write_episodes_parquet(dataset)

        reported = _collect_errors(dataset)
        assert all("episodes.parquet" not in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 3: feature shapes
# ---------------------------------------------------------------------------


def test_empty_feature_shape_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        payload = _minimal_info()
        payload["features"]["action"]["shape"] = []
        _write_info(dataset, payload)

        reported = _collect_errors(dataset)
        assert any("empty shape" in msg for msg in reported)


def test_valid_scalar_shape_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        payload = _minimal_info()
        payload["features"]["action"]["shape"] = [1]
        _write_info(dataset, payload)

        reported = _collect_errors(dataset)
        assert all("empty shape" not in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 4: path templates
# ---------------------------------------------------------------------------


def test_data_path_missing_placeholder_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        payload = _minimal_info()
        # The {episode_chunk} placeholder is hard-coded away here.
        payload["data_path"] = "data/chunk-000/episode_{episode_index:06d}.parquet"
        _write_info(dataset, payload)

        reported = _collect_errors(dataset)
        assert any(
            "episode_chunk" in msg and "data_path" in msg for msg in reported
        )


def test_video_path_missing_video_key_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        payload = _minimal_info()
        payload["video_path"] = (
            "videos/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.mp4"
        )
        _write_info(dataset, payload)

        reported = _collect_errors(dataset)
        assert any(
            "video_key" in msg and "video_path" in msg for msg in reported
        )


def test_valid_path_templates_pass():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())

        reported = _collect_errors(dataset)
        assert all("template" not in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 5: video files exist
# ---------------------------------------------------------------------------


def test_missing_video_files_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_tasks_parquet(dataset)
        _write_episodes_parquet(dataset)
        # No video files are created on purpose.

        reported = _collect_errors(dataset)
        assert any("Missing video files" in msg for msg in reported)


def test_present_video_files_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_tasks_parquet(dataset)
        _write_episodes_parquet(dataset)

        video_dir = (
            dataset / "videos" / "observation.images.top" / "chunk-000"
        )
        video_dir.mkdir(parents=True, exist_ok=True)
        for episode in (0, 1):
            (video_dir / f"episode_{episode:06d}.mp4").write_bytes(b"\x00")

        reported = _collect_errors(dataset)
        assert all("Missing video files" not in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 6: timestamp consistency
# ---------------------------------------------------------------------------


def test_relative_timestamps_consistent_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_data_chunk(
            dataset,
            "episode_000000.parquet",
            {
                "episode_index": [0, 0, 0],
                "timestamp": [0.0, 0.033, 0.066],
            },
        )

        reported = _collect_errors(dataset)
        assert all("Timestamp inconsistency" not in msg for msg in reported)


def test_mixed_timestamp_modes_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        # First file: relative timestamps.
        _write_data_chunk(
            dataset,
            "episode_000000.parquet",
            {
                "episode_index": [0, 0],
                "timestamp": [0.0, 0.033],
            },
        )
        # Second file: absolute (Unix epoch) timestamps.
        _write_data_chunk(
            dataset,
            "episode_000001.parquet",
            {
                "episode_index": [1, 1],
                "timestamp": [1_700_000_000.0, 1_700_000_000.033],
            },
        )

        reported = _collect_errors(dataset)
        assert any("Timestamp inconsistency" in msg for msg in reported)


# ---------------------------------------------------------------------------
# Check 7: episode contiguity
# ---------------------------------------------------------------------------


def test_contiguous_episodes_passes():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_data_chunk(
            dataset,
            "episode_000000.parquet",
            {
                "episode_index": [0, 0, 0, 1, 1, 1],
                "timestamp": [0.0, 0.033, 0.066, 0.0, 0.033, 0.066],
            },
        )

        reported = _collect_errors(dataset)
        assert all("Non-contiguous" not in msg for msg in reported)


def test_non_contiguous_episodes_fails():
    with tempfile.TemporaryDirectory() as workdir:
        dataset = _make_dataset(workdir)
        _write_info(dataset, _minimal_info())
        _write_data_chunk(
            dataset,
            "episode_000000.parquet",
            {
                "episode_index": [0, 1, 0],  # episode 0 is split in two
                "timestamp": [0.0, 0.0, 0.033],
            },
        )

        reported = _collect_errors(dataset)
        assert any("Non-contiguous" in msg for msg in reported)
start_timestamp in metadata validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Null/missing start_timestamp values are now flagged as errors instead of being silently skipped. Downstream converters require a valid collection timestamp for every episode — catching this at validation time prevents runtime failures. Co-Authored-By: Claude Opus 4.6 (1M context) --- lerobot_validator/metadata_validator.py | 8 +++++++- tests/test_metadata_validator.py | 26 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/lerobot_validator/metadata_validator.py b/lerobot_validator/metadata_validator.py index 337b53d..c23f467 100644 --- a/lerobot_validator/metadata_validator.py +++ b/lerobot_validator/metadata_validator.py @@ -139,8 +139,14 @@ def _check_start_timestamp_format(self) -> None: for idx, row in self.df.iterrows(): timestamp = row.get("start_timestamp") - # Skip if missing (will be caught by required columns check) + # Null timestamps are not allowed — every episode must have a + # valid collection start time. 
def test_null_start_timestamp_fails():
    """A null/missing start_timestamp must be rejected by validation."""
    rows = {
        "episode_index": [0, 1],
        "operator_id": ["op1", "op1"],
        "is_eval_episode": [False, False],
        "episode_id": ["ep_001", "ep_002"],
        # Second episode has no collection timestamp.
        "start_timestamp": [1730455200, None],
        "checkpoint_path": ["", ""],
        "success": [True, False],
        "station_id": ["station_1", "station_1"],
        "robot_id": ["robot_1", "robot_1"],
    }
    with tempfile.TemporaryDirectory() as workdir:
        csv_path = Path(workdir) / "custom_metadata.csv"
        pd.DataFrame(rows).to_csv(csv_path, index=False)

        validator = MetadataValidator(csv_path)
        assert validator.validate() is False
        assert any("missing/null" in msg for msg in validator.get_errors())
def _check_no_video_columns_in_data_parquet(self) -> None:
    """Data parquet files must not contain video feature columns.

    Video features (dtype="video" in info.json) are stored as separate
    MP4 files.  If the data parquet also contains these columns (typically
    as struct), the LeRobot dataset loader will fail with a CastError
    because the column names don't match the expected schema.

    Only features with dtype "video" are considered.  Features with dtype
    "image" are deliberately excluded so that they are never reported as
    offending video columns.
    """
    if not isinstance(self._info, dict):
        return
    features = self._info.get("features", {})
    if not isinstance(features, dict):
        return

    video_keys = {
        name
        for name, defn in features.items()
        if isinstance(defn, dict) and defn.get("dtype") == "video"
    }
    if not video_keys:
        return

    data_dir = self._data_dir()
    if not data_dir.exists():
        return

    parquet_files = sorted(data_dir.glob("**/*.parquet"))
    if not parquet_files:
        return

    # Only the first file is inspected; this assumes every chunk shares the
    # same schema.  NOTE(review): confirm this holds for all producers.
    pf = parquet_files[0]
    try:
        # pandas.read_parquet has no row-limit argument (unknown keywords
        # are forwarded to the engine and rejected), so the file is read
        # once, in full, to obtain its column names.
        df = pd.read_parquet(str(pf))
    except Exception as exc:
        # Best effort: an unreadable data file is not this check's concern.
        logger.debug("Skipping video-column check for %s: %s", pf, exc)
        return

    offending = sorted(video_keys & set(df.columns))
    if offending:
        self.errors.append(
            f"Data parquet files contain video feature columns: "
            f"{offending}. Video features (dtype='video' in info.json) "
            f"must NOT appear as columns in data parquet files — they "
            f"are stored as separate MP4 files. Remove these columns "
            f"from the data parquet."
        )
+ """ + video_keys = self._get_video_keys() + if not video_keys: + return + + episodes_dir = self._meta_dir() / "episodes" + episodes_file = self._meta_dir() / "episodes.parquet" + + # v3 datasets may use either a flat episodes.parquet or a chunked + # episodes/ directory. + ep_columns: Optional[List[str]] = None + if episodes_dir.exists(): + parquet_files = sorted(episodes_dir.glob("**/*.parquet")) + if parquet_files: + try: + df = pd.read_parquet(str(parquet_files[0])) + ep_columns = df.columns.tolist() + except Exception: + return + elif episodes_file.exists(): + try: + df = pd.read_parquet(str(episodes_file)) + ep_columns = df.columns.tolist() + except Exception: + return + else: + return # Already flagged in check 2 + + if ep_columns is None: + return + + missing: List[str] = [] + for vkey in video_keys: + for suffix in ("chunk_index", "from_timestamp"): + col = f"videos/{vkey}/{suffix}" + if col not in ep_columns: + missing.append(col) + + if missing: + self.errors.append( + f"Episode parquet is missing video metadata columns: " + f"{missing}. For each video feature, the episode parquet " + f"must include 'videos/{{key}}/chunk_index' and " + f"'videos/{{key}}/from_timestamp' columns so the dataset " + f"loader can resolve video files and timestamps." 
+ ) diff --git a/tests/test_v3_metadata_checker.py b/tests/test_v3_metadata_checker.py index 0d9404d..317ebf0 100644 --- a/tests/test_v3_metadata_checker.py +++ b/tests/test_v3_metadata_checker.py @@ -366,3 +366,119 @@ def test_non_contiguous_episodes_fails(): assert any( "Non-contiguous" in e for e in checker.get_errors() ) + + +# --------------------------------------------------------------------------- +# Check 8: no video struct columns in data parquet +# --------------------------------------------------------------------------- + + +def test_data_parquet_without_video_columns_passes(): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "action": [[0.1] * 7], + "episode_index": [0], + "timestamp": [0.0], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + checker = LerobotV3MetadataChecker(root) + checker.validate() + assert not any( + "video feature columns" in e for e in checker.get_errors() + ) + + +def test_data_parquet_with_video_struct_columns_fails(): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + # Simulate a data parquet that erroneously includes the video + # feature key as a struct column (path + timestamp). 
+ pd.DataFrame( + { + "action": [[0.1] * 7], + "observation.images.top": [ + {"path": "videos/top/chunk-000/ep_000000.mp4", "timestamp": 0.0} + ], + "episode_index": [0], + "timestamp": [0.0], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + checker = LerobotV3MetadataChecker(root) + checker.validate() + assert any( + "video feature columns" in e for e in checker.get_errors() + ) + + +# --------------------------------------------------------------------------- +# Check 9: episode video metadata columns +# --------------------------------------------------------------------------- + + +def test_episode_parquet_with_video_metadata_passes(): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + pd.DataFrame( + { + "episode_index": [0, 1], + "data/chunk_index": [0, 0], + "data/file_index": [0, 1], + "tasks": [["default"], ["default"]], + "videos/observation.images.top/chunk_index": [0, 0], + "videos/observation.images.top/from_timestamp": [0.0, 0.0], + } + ).to_parquet(root / "meta" / "episodes.parquet", index=False) + + checker = LerobotV3MetadataChecker(root) + checker.validate() + assert not any( + "video metadata columns" in e for e in checker.get_errors() + ) + + +def test_episode_parquet_missing_video_metadata_fails(): + """Episode parquet without videos/{key}/chunk_index and from_timestamp.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_episodes_parquet(root) # Has data/* cols but no videos/* cols + + checker = LerobotV3MetadataChecker(root) + checker.validate() + assert any( + "video metadata columns" in e for e in checker.get_errors() + ) + + +def test_episode_parquet_chunked_dir_missing_video_metadata_fails(): + """Same check works for chunked episodes/ directory layout.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + 
ep_dir = root / "meta" / "episodes" / "chunk-000" + ep_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0], + "data/chunk_index": [0], + "data/file_index": [0], + "tasks": [["default"]], + } + ).to_parquet(ep_dir / "file-000.parquet", index=False) + + checker = LerobotV3MetadataChecker(root) + checker.validate() + assert any( + "video metadata columns" in e for e in checker.get_errors() + ) From 30796e572091abd33d28a380c50c83d75ca31fb1 Mon Sep 17 00:00:00 2001 From: Chandra Kuchi Date: Mon, 23 Mar 2026 05:12:20 +0000 Subject: [PATCH 4/6] feat: add P0 v3 validators for LeRobot dataset pre-ingestion checks Add 6 P0 validators as lerobot_validator/v3_checks.py to catch the most common data quality issues before partner upload: - V1 validate_tasks_format: error if no tasks file, warn if only jsonl - V2 validate_codebase_version: require codebase_version starts with v3. - V5 validate_feature_shapes: reject shape=[], require 3-element image shapes - V7 validate_timestamps: reject absolute Unix epoch in data parquets - V11 validate_custom_metadata_csv: require episode_index/episode_id, reject null/duplicate episode_ids - V12 validate_start_timestamp: require plausible Unix epoch floats Wire validate_v3_dataset() into the LerobotDatasetValidator orchestrator so errors surface automatically, and add get_warnings() support. Update existing test fixtures to include codebase_version so integration tests pass with the new checks. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- lerobot_validator/__init__.py | 9 +- lerobot_validator/v3_checks.py | 548 ++++++++++++++++++++++ lerobot_validator/validator.py | 21 + tests/test_integration.py | 1 + tests/test_is_eval_data_consistency.py | 1 + tests/test_v3_checks.py | 615 +++++++++++++++++++++++++ 6 files changed, 1194 insertions(+), 1 deletion(-) create mode 100644 lerobot_validator/v3_checks.py create mode 100644 tests/test_v3_checks.py diff --git a/lerobot_validator/__init__.py b/lerobot_validator/__init__.py index 80f8cdf..3281f1a 100644 --- a/lerobot_validator/__init__.py +++ b/lerobot_validator/__init__.py @@ -10,6 +10,13 @@ from lerobot_validator.validator import LerobotDatasetValidator from lerobot_validator.gcp_path import compute_gcp_path from lerobot_validator.v3_metadata_checker import LerobotV3MetadataChecker +from lerobot_validator.v3_checks import Issue, validate_v3_dataset -__all__ = ["LerobotDatasetValidator", "compute_gcp_path", "LerobotV3MetadataChecker"] +__all__ = [ + "LerobotDatasetValidator", + "compute_gcp_path", + "LerobotV3MetadataChecker", + "Issue", + "validate_v3_dataset", +] diff --git a/lerobot_validator/v3_checks.py b/lerobot_validator/v3_checks.py new file mode 100644 index 0000000..7b9185c --- /dev/null +++ b/lerobot_validator/v3_checks.py @@ -0,0 +1,548 @@ +""" +P0 validators for LeRobot v3 datasets. + +Each validator function takes a dataset path and returns a list of Issue objects. +Issues have a level ("error" or "warning") and a descriptive message. + +Validators: + V1: validate_tasks_format -- meta/tasks.parquet vs tasks.jsonl + V2: validate_codebase_version -- info.json codebase_version starts with "v3." 
+ V5: validate_feature_shapes -- reject shape=[], image features need 3-element shape + V7: validate_timestamps -- reject absolute Unix epoch timestamps in data parquet + V11: validate_custom_metadata_csv -- required columns, no null/duplicate episode_ids + V12: validate_start_timestamp -- start_timestamp must be plausible Unix epoch floats +""" + +import json +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import pandas as pd +from cloudpathlib import AnyPath, CloudPath + +logger = logging.getLogger(__name__) + +# Timestamps at or above this value are treated as absolute Unix epoch (year 2000+). +_UNIX_EPOCH_THRESHOLD = 946_684_800.0 + +# Upper bound for plausible Unix epoch timestamps (year 2100). +_UNIX_EPOCH_MAX = 4_102_444_800.0 + +# Required columns in custom_metadata.csv (minimum set for rejection). +_REQUIRED_METADATA_COLUMNS = ["episode_index", "episode_id"] + + +@dataclass +class Issue: + """A single validation finding.""" + + level: str # "error" or "warning" + validator: str # e.g. "validate_tasks_format" + message: str + + def __str__(self) -> str: + return f"[{self.level}] {self.validator}: {self.message}" + + +# --------------------------------------------------------------------------- +# V1: validate_tasks_format +# --------------------------------------------------------------------------- + + +def validate_tasks_format(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that meta/tasks.parquet exists; warn if only tasks.jsonl is present. + + - Error: neither tasks.parquet nor tasks.jsonl exists. + - Warning: tasks.jsonl exists but tasks.parquet does not (old format). + - Pass: tasks.parquet exists. 
+ """ + root = _to_path(dataset_path) + meta = root / "meta" + issues: List[Issue] = [] + + has_parquet = (meta / "tasks.parquet").exists() + has_jsonl = (meta / "tasks.jsonl").exists() + + if not has_parquet and not has_jsonl: + issues.append( + Issue( + level="error", + validator="validate_tasks_format", + message=( + "meta/tasks.parquet not found. " + "LeRobot v3 datasets must include a tasks.parquet file." + ), + ) + ) + elif has_jsonl and not has_parquet: + issues.append( + Issue( + level="warning", + validator="validate_tasks_format", + message=( + "meta/tasks.parquet not found but meta/tasks.jsonl is present. " + "The converter will auto-convert, but you should migrate to " + "tasks.parquet before uploading." + ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# V2: validate_codebase_version +# --------------------------------------------------------------------------- + + +def validate_codebase_version(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that info.json contains codebase_version starting with 'v3.'. + + - Error: codebase_version is missing or does not start with 'v3.'. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + issues.append( + Issue( + level="error", + validator="validate_codebase_version", + message="meta/info.json not found or not valid JSON.", + ) + ) + return issues + + version = info.get("codebase_version") + if version is None: + issues.append( + Issue( + level="error", + validator="validate_codebase_version", + message="meta/info.json is missing 'codebase_version' field.", + ) + ) + elif not str(version).startswith("v3."): + issues.append( + Issue( + level="error", + validator="validate_codebase_version", + message=( + f"codebase_version is '{version}' but must start with 'v3.'. " + "Only LeRobot v3 datasets are supported." 
+ ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# V5: validate_feature_shapes +# --------------------------------------------------------------------------- + + +def validate_feature_shapes(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check feature shapes in info.json. + + - Error: a feature has shape=[] (zero-dimensional). + - Error: an image/video feature does not have a 3-element shape. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + return issues + + features = info.get("features", {}) + if not isinstance(features, dict): + return issues + + for name, defn in features.items(): + if not isinstance(defn, dict): + continue + + shape = defn.get("shape") + dtype = defn.get("dtype", "") + + # Reject 0-D shapes + if isinstance(shape, list) and len(shape) == 0: + issues.append( + Issue( + level="error", + validator="validate_feature_shapes", + message=( + f"Feature '{name}' has an empty shape (shape: []). " + "Scalar features should use shape: [1]." + ), + ) + ) + continue + + # Image/video features must have exactly 3 dimensions (H, W, C) or (C, H, W) + if dtype in ("video", "image") and isinstance(shape, list) and len(shape) != 3: + issues.append( + Issue( + level="error", + validator="validate_feature_shapes", + message=( + f"Feature '{name}' (dtype='{dtype}') has shape {shape} " + f"but image/video features must have a 3-element shape " + f"(e.g. [H, W, C])." + ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# V7: validate_timestamps +# --------------------------------------------------------------------------- + + +def validate_timestamps(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that data parquet timestamps are relative, not absolute Unix epoch. + + Reads the first data parquet file and checks the first timestamp value. 
+ + - Error: timestamps are absolute Unix epoch (>= 946684800.0). + - Warning: timestamps are not monotonically increasing within an episode. + - Warning: non-zero starting offset within an episode (> 1 second). + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + data_dir = root / "data" + if not data_dir.exists(): + return issues + + parquet_files = sorted(data_dir.glob("**/*.parquet")) + if not parquet_files: + return issues + + # Read the first data parquet file + pf = parquet_files[0] + try: + df = pd.read_parquet(str(pf), columns=["timestamp", "episode_index"]) + except Exception: + try: + df = pd.read_parquet(str(pf), columns=["timestamp"]) + except Exception: + return issues + + if df.empty or "timestamp" not in df.columns: + return issues + + # Check for absolute Unix epoch timestamps + first_ts = float(df["timestamp"].iloc[0]) + if first_ts >= _UNIX_EPOCH_THRESHOLD: + issues.append( + Issue( + level="error", + validator="validate_timestamps", + message=( + f"Timestamps appear to be absolute Unix epoch values " + f"(first value: {first_ts}). LeRobot v3 requires " + f"per-episode-relative timestamps starting near 0.0. " + f"Absolute timestamps cause video decode failures." + ), + ) + ) + return issues # No point checking monotonicity if timestamps are wrong type + + # Check per-episode properties if episode_index is available + if "episode_index" in df.columns: + for ep_idx, ep_df in df.groupby("episode_index"): + ts = ep_df["timestamp"].values + + # Warn if starting offset > 1 second + if len(ts) > 0 and ts[0] > 1.0: + issues.append( + Issue( + level="warning", + validator="validate_timestamps", + message=( + f"Episode {ep_idx} starts at timestamp {ts[0]:.3f}s " + f"(expected near 0.0)." 
+ ), + ) + ) + + # Warn if not monotonically increasing + if len(ts) > 1: + diffs = ts[1:] - ts[:-1] + if (diffs < 0).any(): + issues.append( + Issue( + level="warning", + validator="validate_timestamps", + message=( + f"Episode {ep_idx} has non-monotonically " + f"increasing timestamps." + ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# V11: validate_custom_metadata_csv +# --------------------------------------------------------------------------- + + +def validate_custom_metadata_csv(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that meta/custom_metadata.csv exists and has required columns. + + - Error: file missing. + - Error: required columns (episode_index, episode_id) absent. + - Error: null episode_id values. + - Error: duplicate episode_id values. + - Warning: other expected columns missing. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + csv_path = root / "meta" / "custom_metadata.csv" + if not csv_path.exists(): + issues.append( + Issue( + level="error", + validator="validate_custom_metadata_csv", + message="meta/custom_metadata.csv not found.", + ) + ) + return issues + + try: + df = pd.read_csv(str(csv_path)) + except Exception as exc: + issues.append( + Issue( + level="error", + validator="validate_custom_metadata_csv", + message=f"Failed to read meta/custom_metadata.csv: {exc}", + ) + ) + return issues + + # Check required columns + missing_required = [c for c in _REQUIRED_METADATA_COLUMNS if c not in df.columns] + if missing_required: + issues.append( + Issue( + level="error", + validator="validate_custom_metadata_csv", + message=( + f"meta/custom_metadata.csv is missing required columns: " + f"{missing_required}" + ), + ) + ) + return issues # Cannot do further checks without required columns + + # Check for null episode_id values + null_ids = df[df["episode_id"].isna()] + if len(null_ids) > 0: + issues.append( + Issue( + level="error", + 
validator="validate_custom_metadata_csv", + message=( + f"episode_id has null values at rows: " + f"{null_ids.index.tolist()}" + ), + ) + ) + + # Check for duplicate episode_id values + duplicates = df[df["episode_id"].duplicated(keep=False)] + if len(duplicates) > 0: + dup_ids = duplicates["episode_id"].unique().tolist() + issues.append( + Issue( + level="error", + validator="validate_custom_metadata_csv", + message=( + f"episode_id has duplicate values: {dup_ids}" + ), + ) + ) + + # Warn about other expected columns that are missing + all_expected = [ + "episode_index", + "operator_id", + "is_eval_episode", + "episode_id", + "start_timestamp", + "checkpoint_path", + "success", + "station_id", + "robot_id", + ] + missing_optional = [ + c for c in all_expected if c not in df.columns and c not in _REQUIRED_METADATA_COLUMNS + ] + if missing_optional: + issues.append( + Issue( + level="warning", + validator="validate_custom_metadata_csv", + message=( + f"meta/custom_metadata.csv is missing optional columns: " + f"{missing_optional}" + ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# V12: validate_start_timestamp +# --------------------------------------------------------------------------- + + +def validate_start_timestamp(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that start_timestamp values are plausible Unix epoch floats. + + - Error: value is not a valid float. + - Error: value is below year-2000 threshold (likely relative, not absolute). + - Error: value is above year-2100 threshold. + - Error: value is null/missing. 
+ """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + csv_path = root / "meta" / "custom_metadata.csv" + if not csv_path.exists(): + return issues # V11 already reports this + + try: + df = pd.read_csv(str(csv_path)) + except Exception: + return issues # V11 already reports this + + if "start_timestamp" not in df.columns: + return issues # V11 warns about missing columns + + invalid: List[str] = [] + for idx, row in df.iterrows(): + ts = row.get("start_timestamp") + episode_id = row.get("episode_id", f"row_{idx}") + + if pd.isna(ts): + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"start_timestamp is missing/null" + ) + continue + + try: + ts_float = float(ts) + except (ValueError, TypeError): + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"'{ts}' is not a valid float" + ) + continue + + if ts_float < _UNIX_EPOCH_THRESHOLD: + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"{ts_float} is below year-2000 threshold ({_UNIX_EPOCH_THRESHOLD}); " + f"likely a relative offset, not an absolute Unix timestamp" + ) + elif ts_float > _UNIX_EPOCH_MAX: + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"{ts_float} is above year-2100 threshold ({_UNIX_EPOCH_MAX})" + ) + + if invalid: + issues.append( + Issue( + level="error", + validator="validate_start_timestamp", + message=( + "start_timestamp must be a valid Unix epoch float " + f"(range {_UNIX_EPOCH_THRESHOLD} to {_UNIX_EPOCH_MAX}):\n" + + "\n".join(invalid) + ), + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# Convenience: run all P0 validators +# --------------------------------------------------------------------------- + +_P0_VALIDATORS = [ + validate_tasks_format, + validate_codebase_version, + validate_feature_shapes, + validate_timestamps, + validate_custom_metadata_csv, + validate_start_timestamp, +] + + +def validate_v3_dataset( + dataset_path: Union[str, Path, CloudPath], + 
thorough: bool = False, +) -> List[Issue]: + """Run all P0 validators and return a combined list of issues. + + Args: + dataset_path: Path to the lerobot dataset directory. + thorough: Reserved for future P2 checks that require video probing. + + Returns: + A list of Issue objects (errors and warnings). + """ + all_issues: List[Issue] = [] + for validator_fn in _P0_VALIDATORS: + try: + all_issues.extend(validator_fn(dataset_path)) + except Exception as exc: + logger.warning("Validator %s raised: %s", validator_fn.__name__, exc) + all_issues.append( + Issue( + level="error", + validator=validator_fn.__name__, + message=f"Validator raised an unexpected exception: {exc}", + ) + ) + return all_issues + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _to_path(dataset_path: Union[str, Path, CloudPath]) -> Any: + """Convert a string or Path to an AnyPath.""" + if isinstance(dataset_path, str): + return AnyPath(dataset_path) + return dataset_path + + +def _load_info(root: Any) -> Optional[Dict[str, Any]]: + """Load meta/info.json and return the parsed dict, or None on failure.""" + info_file = root / "meta" / "info.json" + if not info_file.exists(): + return None + try: + with info_file.open("r") as f: + return json.load(f) + except Exception: + return None diff --git a/lerobot_validator/validator.py b/lerobot_validator/validator.py index 67f6dc4..0574a1f 100644 --- a/lerobot_validator/validator.py +++ b/lerobot_validator/validator.py @@ -11,6 +11,7 @@ from lerobot_validator.annotation_validator import AnnotationValidator from lerobot_validator.lerobot_checks import LerobotDatasetChecker from lerobot_validator.v3_metadata_checker import LerobotV3MetadataChecker +from lerobot_validator.v3_checks import validate_v3_dataset class LerobotDatasetValidator: @@ -53,6 +54,7 @@ def __init__( self.v3_checker = 
LerobotV3MetadataChecker(self.dataset_path) self.errors: List[str] = [] + self.warnings: List[str] = [] def validate(self) -> bool: """ @@ -62,6 +64,7 @@ def validate(self) -> bool: True if all validations pass, False otherwise """ self.errors = [] + self.warnings = [] # Run individual validators metadata_valid = self.metadata_validator.validate() @@ -75,6 +78,14 @@ def validate(self) -> bool: self.errors.extend(self.lerobot_checker.get_errors()) self.errors.extend(self.v3_checker.get_errors()) + # Run P0 v3 validators + v3_issues = validate_v3_dataset(self.dataset_path) + for issue in v3_issues: + if issue.level == "error": + self.errors.append(f"[{issue.validator}] {issue.message}") + else: + self.warnings.append(f"[{issue.validator}] {issue.message}") + # If basic validations pass and annotations exist, run cross-validation if metadata_valid and annotation_valid and self.annotation_validator.get_annotations(): self._cross_validate() @@ -233,8 +244,18 @@ def get_errors(self) -> List[str]: """Get all validation errors.""" return self.errors + def get_warnings(self) -> List[str]: + """Get all validation warnings.""" + return self.warnings + def print_results(self) -> None: """Print validation results.""" + if self.warnings: + print(f"Warnings ({len(self.warnings)}):\n") + for i, warning in enumerate(self.warnings, 1): + print(f" {i}. 
{warning}") + print() + if len(self.errors) == 0: print("✓ All validations passed!") else: diff --git a/tests/test_integration.py b/tests/test_integration.py index 0a4b41e..cc506f7 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -30,6 +30,7 @@ def create_test_dataset(tmpdir): # Create info.json in meta folder (lerobot stores it there) info = { "fps": 30, + "codebase_version": "v3.0", "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", "features": { "action": {"dtype": "float32", "shape": [7]}, diff --git a/tests/test_is_eval_data_consistency.py b/tests/test_is_eval_data_consistency.py index 4ef99b3..14350a3 100644 --- a/tests/test_is_eval_data_consistency.py +++ b/tests/test_is_eval_data_consistency.py @@ -30,6 +30,7 @@ def create_test_dataset(tmpdir): # Create info.json in meta folder info = { "fps": 30, + "codebase_version": "v3.0", "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", "features": { "action": {"dtype": "float32", "shape": [7]}, diff --git a/tests/test_v3_checks.py b/tests/test_v3_checks.py new file mode 100644 index 0000000..eea8b8b --- /dev/null +++ b/tests/test_v3_checks.py @@ -0,0 +1,615 @@ +"""Tests for P0 v3 validators (lerobot_validator.v3_checks).""" + +import json +import tempfile +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd + +from lerobot_validator.v3_checks import ( + Issue, + validate_codebase_version, + validate_custom_metadata_csv, + validate_feature_shapes, + validate_start_timestamp, + validate_tasks_format, + validate_timestamps, + validate_v3_dataset, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_dataset(tmpdir: str) -> Path: + """Create a minimal dataset directory skeleton.""" + root = Path(tmpdir) / "dataset" + root.mkdir() + (root / "meta").mkdir() + (root / 
"data").mkdir() + return root + + +def _write_info(root: Path, info: Dict[str, Any]) -> None: + with open(root / "meta" / "info.json", "w") as f: + json.dump(info, f) + + +def _minimal_info(**overrides: Any) -> Dict[str, Any]: + info: Dict[str, Any] = { + "fps": 30, + "codebase_version": "v3.0", + "chunks_size": 1000, + "features": { + "observation.images.top": { + "dtype": "video", + "shape": [480, 640, 3], + }, + "action": { + "dtype": "float32", + "shape": [7], + }, + }, + } + info.update(overrides) + return info + + +def _write_tasks_parquet(root: Path) -> None: + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + root / "meta" / "tasks.parquet", index=False + ) + + +def _write_custom_metadata(root: Path, df: pd.DataFrame) -> None: + df.to_csv(root / "meta" / "custom_metadata.csv", index=False) + + +def _valid_metadata_df() -> pd.DataFrame: + return pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + "is_eval_episode": [False, False], + "episode_id": ["ep_001", "ep_002"], + "start_timestamp": [1730455200.0, 1730458800.0], + "checkpoint_path": ["", ""], + "success": [True, False], + "station_id": ["station_1", "station_1"], + "robot_id": ["robot_1", "robot_1"], + } + ) + + +def _errors(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "error"] + + +def _warnings(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "warning"] + + +# =================================================================== +# V1: validate_tasks_format +# =================================================================== + + +class TestValidateTasksFormat: + def test_parquet_present_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + + def test_neither_file_errors(self): + with 
tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_tasks_format(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "tasks.parquet not found" in errors[0].message + + def test_jsonl_only_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "tasks.jsonl" in warnings[0].message + + def test_both_files_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(issues) == 0 + + +# =================================================================== +# V2: validate_codebase_version +# =================================================================== + + +class TestValidateCodebaseVersion: + def test_v3_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.0")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + def test_v3_minor_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.1.2")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + def test_v2_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v2.1")) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + 
assert "v3." in errors[0].message + + def test_missing_version_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + del info["codebase_version"] + _write_info(root, info) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "missing" in errors[0].message.lower() + + def test_no_info_json_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json at all + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "info.json" in errors[0].message + + +# =================================================================== +# V5: validate_feature_shapes +# =================================================================== + + +class TestValidateFeatureShapes: + def test_valid_shapes_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_empty_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["action"]["shape"] = [] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "empty shape" in errors[0].message + assert "action" in errors[0].message + + def test_scalar_shape_1_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["scalar_feat"] = {"dtype": "float32", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_image_feature_2d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + 
info["features"]["observation.images.top"]["shape"] = [640, 480] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_video_feature_4d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["observation.images.top"]["shape"] = [1, 480, 640, 3] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_image_dtype_3d_shape_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["cam"] = {"dtype": "image", "shape": [480, 640, 3]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + +# =================================================================== +# V7: validate_timestamps +# =================================================================== + + +class TestValidateTimestamps: + def test_relative_timestamps_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.033, 0.066], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_absolute_timestamps_error(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root 
/ "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + def test_non_monotonic_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.066, 0.033], # non-monotonic + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("non-monotonically" in w.message for w in warnings) + + def test_large_starting_offset_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [5.0, 5.033], # starts at 5s, not near 0 + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("starts at timestamp" in w.message for w in warnings) + + def test_no_data_dir_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # data dir is empty (no parquet files) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_no_episode_index_column_still_checks_absolute(self): + """Even without episode_index column, absolute timestamps should be caught.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, 
_minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + +# =================================================================== +# V11: validate_custom_metadata_csv +# =================================================================== + + +class TestValidateCustomMetadataCsv: + def test_valid_metadata_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_custom_metadata_csv(root) + assert len(_errors(issues)) == 0 + + def test_missing_file_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "not found" in errors[0].message + + def test_missing_episode_index_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_id": ["ep_001", "ep_002"], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_index" in errors[0].message + + def test_missing_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_id" in errors[0].message + + def test_null_episode_id_errors(self): + with 
tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "episode_id"] = None + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert any("null" in e.message for e in errors) + + def test_duplicate_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[1, "episode_id"] = "ep_001" # duplicate + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert any("duplicate" in e.message for e in errors) + + def test_missing_optional_columns_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0, 1], + "episode_id": ["ep_001", "ep_002"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("optional columns" in w.message for w in warnings) + + def test_all_columns_present_no_warnings(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_custom_metadata_csv(root) + warnings = _warnings(issues) + assert len(warnings) == 0 + + +# =================================================================== +# V12: validate_start_timestamp +# =================================================================== + + +class TestValidateStartTimestamp: + def test_valid_timestamps_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + def test_null_timestamp_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + 
df.loc[0, "start_timestamp"] = None + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "missing/null" in errors[0].message + + def test_below_threshold_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "start_timestamp"] = 100.0 # relative offset, not epoch + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "below year-2000 threshold" in errors[0].message + + def test_above_max_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "start_timestamp"] = 5_000_000_000.0 # year ~2128 + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "above year-2100 threshold" in errors[0].message + + def test_non_numeric_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df["start_timestamp"] = df["start_timestamp"].astype(str) + df.loc[0, "start_timestamp"] = "not-a-number" + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "not a valid float" in errors[0].message + + def test_missing_csv_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no custom_metadata.csv + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + def test_missing_column_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0], + "episode_id": ["ep_001"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + +# 
=================================================================== +# validate_v3_dataset (combined runner) +# =================================================================== + + +class TestValidateV3Dataset: + def test_fully_valid_dataset_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + _write_custom_metadata(root, _valid_metadata_df()) + + # Write relative timestamps + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [0.0, 0.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_v3_dataset(root) + errors = _errors(issues) + assert len(errors) == 0 + + def test_multiple_issues_collected(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # No info.json -> V2 error + # No tasks.parquet -> V1 error + # No custom_metadata.csv -> V11 error + + issues = validate_v3_dataset(root) + errors = _errors(issues) + # Should have at least V1 + V2 + V11 errors + assert len(errors) >= 3 + + def test_issue_str_representation(self): + issue = Issue(level="error", validator="test_validator", message="test message") + assert str(issue) == "[error] test_validator: test message" From afaf753456140bbb95f9bb580066c39c6f7238f1 Mon Sep 17 00:00:00 2001 From: Chandra Kuchi Date: Mon, 23 Mar 2026 05:16:25 +0000 Subject: [PATCH 5/6] =?UTF-8?q?fix:=20simplify=20v3=5Fchecks=20=E2=80=94?= =?UTF-8?q?=20use=20schemas.py=20constants,=20add=20Issue=20factories,=20s?= =?UTF-8?q?hare=20CSV=20reads?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Import REQUIRED_METADATA_COLUMNS from schemas.py instead of hardcoding - Add Issue.error() / Issue.warning() factory methods to reduce boilerplate - Share CSV DataFrame between V11 and V12 via _df_cache to avoid redundant reads - 
validate_timestamps checks only first episode for monotonicity (not all) - Remove unused `thorough` parameter from validate_v3_dataset - Make timestamp constants public (UNIX_EPOCH_THRESHOLD, UNIX_EPOCH_MAX) Co-Authored-By: Claude Opus 4.6 (1M context) --- lerobot_validator/v3_checks.py | 378 +++++++++++---------------------- 1 file changed, 129 insertions(+), 249 deletions(-) diff --git a/lerobot_validator/v3_checks.py b/lerobot_validator/v3_checks.py index 7b9185c..f41e15e 100644 --- a/lerobot_validator/v3_checks.py +++ b/lerobot_validator/v3_checks.py @@ -22,16 +22,18 @@ import pandas as pd from cloudpathlib import AnyPath, CloudPath +from lerobot_validator.schemas import REQUIRED_METADATA_COLUMNS + logger = logging.getLogger(__name__) # Timestamps at or above this value are treated as absolute Unix epoch (year 2000+). -_UNIX_EPOCH_THRESHOLD = 946_684_800.0 +UNIX_EPOCH_THRESHOLD = 946_684_800.0 # Upper bound for plausible Unix epoch timestamps (year 2100). -_UNIX_EPOCH_MAX = 4_102_444_800.0 +UNIX_EPOCH_MAX = 4_102_444_800.0 -# Required columns in custom_metadata.csv (minimum set for rejection). -_REQUIRED_METADATA_COLUMNS = ["episode_index", "episode_id"] +# Minimum columns required for the converter to function at all. +_MIN_REQUIRED_COLUMNS = ["episode_index", "episode_id"] @dataclass @@ -39,25 +41,23 @@ class Issue: """A single validation finding.""" level: str # "error" or "warning" - validator: str # e.g. 
"validate_tasks_format" + validator: str message: str def __str__(self) -> str: return f"[{self.level}] {self.validator}: {self.message}" + @staticmethod + def error(validator: str, message: str) -> "Issue": + return Issue(level="error", validator=validator, message=message) -# --------------------------------------------------------------------------- -# V1: validate_tasks_format -# --------------------------------------------------------------------------- + @staticmethod + def warning(validator: str, message: str) -> "Issue": + return Issue(level="warning", validator=validator, message=message) def validate_tasks_format(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: - """Check that meta/tasks.parquet exists; warn if only tasks.jsonl is present. - - - Error: neither tasks.parquet nor tasks.jsonl exists. - - Warning: tasks.jsonl exists but tasks.parquet does not (old format). - - Pass: tasks.parquet exists. - """ + """Check that meta/tasks.parquet exists; warn if only tasks.jsonl is present.""" root = _to_path(dataset_path) meta = root / "meta" issues: List[Issue] = [] @@ -67,84 +67,50 @@ def validate_tasks_format(dataset_path: Union[str, Path, CloudPath]) -> List[Iss if not has_parquet and not has_jsonl: issues.append( - Issue( - level="error", - validator="validate_tasks_format", - message=( - "meta/tasks.parquet not found. " - "LeRobot v3 datasets must include a tasks.parquet file." - ), + Issue.error( + "validate_tasks_format", + "meta/tasks.parquet not found. " + "LeRobot v3 datasets must include a tasks.parquet file.", ) ) elif has_jsonl and not has_parquet: issues.append( - Issue( - level="warning", - validator="validate_tasks_format", - message=( - "meta/tasks.parquet not found but meta/tasks.jsonl is present. " - "The converter will auto-convert, but you should migrate to " - "tasks.parquet before uploading." - ), + Issue.warning( + "validate_tasks_format", + "meta/tasks.parquet not found but meta/tasks.jsonl is present. 
" + "The converter will auto-convert, but you should migrate to " + "tasks.parquet before uploading.", ) ) return issues -# --------------------------------------------------------------------------- -# V2: validate_codebase_version -# --------------------------------------------------------------------------- - - def validate_codebase_version(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: - """Check that info.json contains codebase_version starting with 'v3.'. - - - Error: codebase_version is missing or does not start with 'v3.'. - """ + """Check that info.json contains codebase_version starting with 'v3.'.""" root = _to_path(dataset_path) issues: List[Issue] = [] info = _load_info(root) if info is None: - issues.append( - Issue( - level="error", - validator="validate_codebase_version", - message="meta/info.json not found or not valid JSON.", - ) - ) + issues.append(Issue.error("validate_codebase_version", "meta/info.json not found or not valid JSON.")) return issues version = info.get("codebase_version") if version is None: - issues.append( - Issue( - level="error", - validator="validate_codebase_version", - message="meta/info.json is missing 'codebase_version' field.", - ) - ) + issues.append(Issue.error("validate_codebase_version", "meta/info.json is missing 'codebase_version' field.")) elif not str(version).startswith("v3."): issues.append( - Issue( - level="error", - validator="validate_codebase_version", - message=( - f"codebase_version is '{version}' but must start with 'v3.'. " - "Only LeRobot v3 datasets are supported." - ), + Issue.error( + "validate_codebase_version", + f"codebase_version is '{version}' but must start with 'v3.'. 
" + "Only LeRobot v3 datasets are supported.", ) ) return issues -# --------------------------------------------------------------------------- -# V5: validate_feature_shapes -# --------------------------------------------------------------------------- - - def validate_feature_shapes(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: """Check feature shapes in info.json. @@ -169,50 +135,35 @@ def validate_feature_shapes(dataset_path: Union[str, Path, CloudPath]) -> List[I shape = defn.get("shape") dtype = defn.get("dtype", "") - # Reject 0-D shapes if isinstance(shape, list) and len(shape) == 0: issues.append( - Issue( - level="error", - validator="validate_feature_shapes", - message=( - f"Feature '{name}' has an empty shape (shape: []). " - "Scalar features should use shape: [1]." - ), + Issue.error( + "validate_feature_shapes", + f"Feature '{name}' has an empty shape (shape: []). " + "Scalar features should use shape: [1].", ) ) continue - # Image/video features must have exactly 3 dimensions (H, W, C) or (C, H, W) if dtype in ("video", "image") and isinstance(shape, list) and len(shape) != 3: issues.append( - Issue( - level="error", - validator="validate_feature_shapes", - message=( - f"Feature '{name}' (dtype='{dtype}') has shape {shape} " - f"but image/video features must have a 3-element shape " - f"(e.g. [H, W, C])." - ), + Issue.error( + "validate_feature_shapes", + f"Feature '{name}' (dtype='{dtype}') has shape {shape} " + f"but image/video features must have a 3-element shape " + f"(e.g. [H, W, C]).", ) ) return issues -# --------------------------------------------------------------------------- -# V7: validate_timestamps -# --------------------------------------------------------------------------- - - def validate_timestamps(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: """Check that data parquet timestamps are relative, not absolute Unix epoch. - Reads the first data parquet file and checks the first timestamp value. 
- - - Error: timestamps are absolute Unix epoch (>= 946684800.0). - - Warning: timestamps are not monotonically increasing within an episode. - - Warning: non-zero starting offset within an episode (> 1 second). + Reads only the first data parquet file. Checks the first timestamp to + determine if values are absolute, then samples the first episode for + monotonicity and starting offset. """ root = _to_path(dataset_path) issues: List[Issue] = [] @@ -225,7 +176,6 @@ def validate_timestamps(dataset_path: Union[str, Path, CloudPath]) -> List[Issue if not parquet_files: return issues - # Read the first data parquet file pf = parquet_files[0] try: df = pd.read_parquet(str(pf), columns=["timestamp", "episode_index"]) @@ -238,199 +188,137 @@ def validate_timestamps(dataset_path: Union[str, Path, CloudPath]) -> List[Issue if df.empty or "timestamp" not in df.columns: return issues - # Check for absolute Unix epoch timestamps first_ts = float(df["timestamp"].iloc[0]) - if first_ts >= _UNIX_EPOCH_THRESHOLD: + if first_ts >= UNIX_EPOCH_THRESHOLD: issues.append( - Issue( - level="error", - validator="validate_timestamps", - message=( - f"Timestamps appear to be absolute Unix epoch values " - f"(first value: {first_ts}). LeRobot v3 requires " - f"per-episode-relative timestamps starting near 0.0. " - f"Absolute timestamps cause video decode failures." - ), + Issue.error( + "validate_timestamps", + f"Timestamps appear to be absolute Unix epoch values " + f"(first value: {first_ts}). LeRobot v3 requires " + f"per-episode-relative timestamps starting near 0.0. " + f"Absolute timestamps cause video decode failures.", ) ) - return issues # No point checking monotonicity if timestamps are wrong type + return issues - # Check per-episode properties if episode_index is available + # Check only the first episode for monotonicity/offset (avoid processing entire file). 
if "episode_index" in df.columns: - for ep_idx, ep_df in df.groupby("episode_index"): - ts = ep_df["timestamp"].values + first_ep = df["episode_index"].iloc[0] + ep_df = df[df["episode_index"] == first_ep] + ts = ep_df["timestamp"].values - # Warn if starting offset > 1 second - if len(ts) > 0 and ts[0] > 1.0: - issues.append( - Issue( - level="warning", - validator="validate_timestamps", - message=( - f"Episode {ep_idx} starts at timestamp {ts[0]:.3f}s " - f"(expected near 0.0)." - ), - ) + if len(ts) > 0 and ts[0] > 1.0: + issues.append( + Issue.warning( + "validate_timestamps", + f"Episode {first_ep} starts at timestamp {ts[0]:.3f}s (expected near 0.0).", ) + ) - # Warn if not monotonically increasing - if len(ts) > 1: - diffs = ts[1:] - ts[:-1] - if (diffs < 0).any(): - issues.append( - Issue( - level="warning", - validator="validate_timestamps", - message=( - f"Episode {ep_idx} has non-monotonically " - f"increasing timestamps." - ), - ) + if len(ts) > 1: + diffs = ts[1:] - ts[:-1] + if (diffs < 0).any(): + issues.append( + Issue.warning( + "validate_timestamps", + f"Episode {first_ep} has non-monotonically increasing timestamps.", ) + ) return issues -# --------------------------------------------------------------------------- -# V11: validate_custom_metadata_csv -# --------------------------------------------------------------------------- - - -def validate_custom_metadata_csv(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: +def validate_custom_metadata_csv( + dataset_path: Union[str, Path, CloudPath], + _df_cache: Optional[Dict[str, pd.DataFrame]] = None, +) -> List[Issue]: """Check that meta/custom_metadata.csv exists and has required columns. - - Error: file missing. - - Error: required columns (episode_index, episode_id) absent. - - Error: null episode_id values. - - Error: duplicate episode_id values. - - Warning: other expected columns missing. 
+ If _df_cache is provided, the loaded DataFrame is stored under "csv" so + downstream validators (e.g. validate_start_timestamp) can reuse it. """ root = _to_path(dataset_path) issues: List[Issue] = [] csv_path = root / "meta" / "custom_metadata.csv" if not csv_path.exists(): - issues.append( - Issue( - level="error", - validator="validate_custom_metadata_csv", - message="meta/custom_metadata.csv not found.", - ) - ) + issues.append(Issue.error("validate_custom_metadata_csv", "meta/custom_metadata.csv not found.")) return issues try: df = pd.read_csv(str(csv_path)) except Exception as exc: issues.append( - Issue( - level="error", - validator="validate_custom_metadata_csv", - message=f"Failed to read meta/custom_metadata.csv: {exc}", - ) + Issue.error("validate_custom_metadata_csv", f"Failed to read meta/custom_metadata.csv: {exc}") ) return issues - # Check required columns - missing_required = [c for c in _REQUIRED_METADATA_COLUMNS if c not in df.columns] + if _df_cache is not None: + _df_cache["csv"] = df + + missing_required = [c for c in _MIN_REQUIRED_COLUMNS if c not in df.columns] if missing_required: issues.append( - Issue( - level="error", - validator="validate_custom_metadata_csv", - message=( - f"meta/custom_metadata.csv is missing required columns: " - f"{missing_required}" - ), + Issue.error( + "validate_custom_metadata_csv", + f"meta/custom_metadata.csv is missing required columns: {missing_required}", ) ) - return issues # Cannot do further checks without required columns + return issues - # Check for null episode_id values null_ids = df[df["episode_id"].isna()] if len(null_ids) > 0: issues.append( - Issue( - level="error", - validator="validate_custom_metadata_csv", - message=( - f"episode_id has null values at rows: " - f"{null_ids.index.tolist()}" - ), + Issue.error( + "validate_custom_metadata_csv", + f"episode_id has null values at rows: {null_ids.index.tolist()}", ) ) - # Check for duplicate episode_id values duplicates = 
df[df["episode_id"].duplicated(keep=False)] if len(duplicates) > 0: dup_ids = duplicates["episode_id"].unique().tolist() issues.append( - Issue( - level="error", - validator="validate_custom_metadata_csv", - message=( - f"episode_id has duplicate values: {dup_ids}" - ), - ) + Issue.error("validate_custom_metadata_csv", f"episode_id has duplicate values: {dup_ids}") ) - # Warn about other expected columns that are missing - all_expected = [ - "episode_index", - "operator_id", - "is_eval_episode", - "episode_id", - "start_timestamp", - "checkpoint_path", - "success", - "station_id", - "robot_id", - ] - missing_optional = [ - c for c in all_expected if c not in df.columns and c not in _REQUIRED_METADATA_COLUMNS - ] + # Warn about expected columns from the full schema that are missing. + missing_optional = [c for c in REQUIRED_METADATA_COLUMNS if c not in df.columns and c not in _MIN_REQUIRED_COLUMNS] if missing_optional: issues.append( - Issue( - level="warning", - validator="validate_custom_metadata_csv", - message=( - f"meta/custom_metadata.csv is missing optional columns: " - f"{missing_optional}" - ), + Issue.warning( + "validate_custom_metadata_csv", + f"meta/custom_metadata.csv is missing optional columns: {missing_optional}", ) ) return issues -# --------------------------------------------------------------------------- -# V12: validate_start_timestamp -# --------------------------------------------------------------------------- - - -def validate_start_timestamp(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: +def validate_start_timestamp( + dataset_path: Union[str, Path, CloudPath], + _df_cache: Optional[Dict[str, pd.DataFrame]] = None, +) -> List[Issue]: """Check that start_timestamp values are plausible Unix epoch floats. - - Error: value is not a valid float. - - Error: value is below year-2000 threshold (likely relative, not absolute). - - Error: value is above year-2100 threshold. - - Error: value is null/missing. 
+ Reuses the DataFrame from _df_cache if available (populated by + validate_custom_metadata_csv), avoiding a redundant CSV read. """ root = _to_path(dataset_path) issues: List[Issue] = [] - csv_path = root / "meta" / "custom_metadata.csv" - if not csv_path.exists(): - return issues # V11 already reports this - - try: - df = pd.read_csv(str(csv_path)) - except Exception: - return issues # V11 already reports this + df = _df_cache.get("csv") if _df_cache else None + if df is None: + csv_path = root / "meta" / "custom_metadata.csv" + if not csv_path.exists(): + return issues + try: + df = pd.read_csv(str(csv_path)) + except Exception: + return issues if "start_timestamp" not in df.columns: - return issues # V11 warns about missing columns + return issues invalid: List[str] = [] for idx, row in df.iterrows(): @@ -438,43 +326,33 @@ def validate_start_timestamp(dataset_path: Union[str, Path, CloudPath]) -> List[ episode_id = row.get("episode_id", f"row_{idx}") if pd.isna(ts): - invalid.append( - f" Row {idx} (episode '{episode_id}'): " - f"start_timestamp is missing/null" - ) + invalid.append(f" Row {idx} (episode '{episode_id}'): start_timestamp is missing/null") continue try: ts_float = float(ts) except (ValueError, TypeError): - invalid.append( - f" Row {idx} (episode '{episode_id}'): " - f"'{ts}' is not a valid float" - ) + invalid.append(f" Row {idx} (episode '{episode_id}'): '{ts}' is not a valid float") continue - if ts_float < _UNIX_EPOCH_THRESHOLD: + if ts_float < UNIX_EPOCH_THRESHOLD: invalid.append( f" Row {idx} (episode '{episode_id}'): " - f"{ts_float} is below year-2000 threshold ({_UNIX_EPOCH_THRESHOLD}); " + f"{ts_float} is below year-2000 threshold ({UNIX_EPOCH_THRESHOLD}); " f"likely a relative offset, not an absolute Unix timestamp" ) - elif ts_float > _UNIX_EPOCH_MAX: + elif ts_float > UNIX_EPOCH_MAX: invalid.append( f" Row {idx} (episode '{episode_id}'): " - f"{ts_float} is above year-2100 threshold ({_UNIX_EPOCH_MAX})" + f"{ts_float} is above 
year-2100 threshold ({UNIX_EPOCH_MAX})" ) if invalid: issues.append( - Issue( - level="error", - validator="validate_start_timestamp", - message=( - "start_timestamp must be a valid Unix epoch float " - f"(range {_UNIX_EPOCH_THRESHOLD} to {_UNIX_EPOCH_MAX}):\n" - + "\n".join(invalid) - ), + Issue.error( + "validate_start_timestamp", + "start_timestamp must be a valid Unix epoch float " + f"(range {UNIX_EPOCH_THRESHOLD} to {UNIX_EPOCH_MAX}):\n" + "\n".join(invalid), ) ) @@ -497,29 +375,31 @@ def validate_start_timestamp(dataset_path: Union[str, Path, CloudPath]) -> List[ def validate_v3_dataset( dataset_path: Union[str, Path, CloudPath], - thorough: bool = False, ) -> List[Issue]: """Run all P0 validators and return a combined list of issues. Args: dataset_path: Path to the lerobot dataset directory. - thorough: Reserved for future P2 checks that require video probing. Returns: A list of Issue objects (errors and warnings). """ all_issues: List[Issue] = [] + # Shared cache so V12 reuses the CSV loaded by V11. 
+ df_cache: Dict[str, pd.DataFrame] = {} for validator_fn in _P0_VALIDATORS: try: - all_issues.extend(validator_fn(dataset_path)) + import inspect + + sig = inspect.signature(validator_fn) + if "_df_cache" in sig.parameters: + all_issues.extend(validator_fn(dataset_path, _df_cache=df_cache)) + else: + all_issues.extend(validator_fn(dataset_path)) except Exception as exc: logger.warning("Validator %s raised: %s", validator_fn.__name__, exc) all_issues.append( - Issue( - level="error", - validator=validator_fn.__name__, - message=f"Validator raised an unexpected exception: {exc}", - ) + Issue.error(validator_fn.__name__, f"Validator raised an unexpected exception: {exc}") ) return all_issues From 9a31100a7e49e28e0c4af01740bd241d559c82f1 Mon Sep 17 00:00:00 2001 From: Chandra Kuchi Date: Mon, 23 Mar 2026 22:33:30 +0000 Subject: [PATCH 6/6] feat: add video frame count and feature dtype validators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit V13: validate_video_frame_count — samples up to 5 episodes and compares ffprobe frame counts against data parquet row counts. >5% dropped frames triggers a warning, >20% triggers an error. This catches the root cause of the tolerance_s hack in the monopi featurizer. V14: validate_feature_dtypes — warns about string-typed features (e.g. instruction.text) that cannot be tensor-stacked and require special handling during featurization. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- lerobot_validator/v3_checks.py | 174 +++++++++++++++++++++++++++++++++ tests/test_v3_checks.py | 92 +++++++++++++++++ 2 files changed, 266 insertions(+) diff --git a/lerobot_validator/v3_checks.py b/lerobot_validator/v3_checks.py index f41e15e..b4b572e 100644 --- a/lerobot_validator/v3_checks.py +++ b/lerobot_validator/v3_checks.py @@ -11,14 +11,18 @@ V7: validate_timestamps -- reject absolute Unix epoch timestamps in data parquet V11: validate_custom_metadata_csv -- required columns, no null/duplicate episode_ids V12: validate_start_timestamp -- start_timestamp must be plausible Unix epoch floats + V13: validate_video_frame_count -- video frame counts must match data parquet row counts + V14: validate_feature_dtypes -- warn about string-typed features that need special handling """ import json import logging +import subprocess from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Union +import numpy as np import pandas as pd from cloudpathlib import AnyPath, CloudPath @@ -359,6 +363,150 @@ def validate_start_timestamp( return issues +def validate_video_frame_count(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that video files have roughly the expected number of frames. + + Compares the frame count reported by ffprobe against the number of rows + in the data parquet for each episode. Excessive dropped frames (>5% + missing) trigger a warning; >20% missing triggers an error. 
+ """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + return issues + + video_path_tpl = info.get("video_path") + if video_path_tpl is None: + return issues + + video_keys = [ + name + for name, defn in info.get("features", {}).items() + if isinstance(defn, dict) and defn.get("dtype") in ("video", "image") + ] + if not video_keys: + return issues + + data_dir = root / "data" + if not data_dir.exists(): + return issues + + # Count expected frames per episode from data parquet. + episode_frame_counts: Dict[int, int] = {} + for pf in sorted(data_dir.glob("**/*.parquet")): + try: + df = pd.read_parquet(str(pf), columns=["episode_index"]) + except Exception: + continue + for ep_idx, count in df["episode_index"].value_counts().items(): + episode_frame_counts[int(ep_idx)] = episode_frame_counts.get(int(ep_idx), 0) + int(count) + + if not episode_frame_counts: + return issues + + chunks_size = info.get("chunks_size", 1000) + checked = 0 + problems: List[str] = [] + + # Sample up to 5 episodes to avoid checking every video. + sample_episodes = sorted(episode_frame_counts.keys())[:5] + for ep_idx in sample_episodes: + expected_frames = episode_frame_counts[ep_idx] + ep_chunk = ep_idx // chunks_size + + for vkey in video_keys[:1]: # Check first video key only. 
+ try: + rendered = video_path_tpl.format( + episode_chunk=ep_chunk, + episode_index=ep_idx, + video_key=vkey, + ) + except KeyError: + continue + + video_file = root / rendered + if not video_file.exists(): + continue + + actual_frames = _probe_frame_count(str(video_file)) + if actual_frames is None: + continue + + checked += 1 + if actual_frames == 0: + problems.append( + f" Episode {ep_idx} ({vkey}): video has 0 frames (expected {expected_frames})" + ) + continue + + drop_rate = 1.0 - (actual_frames / expected_frames) + if drop_rate > 0.20: + problems.append( + f" Episode {ep_idx} ({vkey}): {actual_frames}/{expected_frames} frames " + f"({drop_rate:.0%} dropped)" + ) + elif drop_rate > 0.05: + issues.append( + Issue.warning( + "validate_video_frame_count", + f"Episode {ep_idx} ({vkey}): {actual_frames}/{expected_frames} frames " + f"({drop_rate:.0%} dropped). Minor frame drops may cause seek errors.", + ) + ) + + if problems: + issues.append( + Issue.error( + "validate_video_frame_count", + f"Video files have excessive dropped frames (>20% missing) " + f"in {len(problems)} of {checked} checked episodes:\n" + "\n".join(problems), + ) + ) + + return issues + + +def validate_feature_dtypes(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Warn about feature dtypes that require special handling downstream. + + String-typed features (e.g. ``instruction.text``) cannot be stacked into + tensors by LeRobot's ``__getitem__``. The featurizer extracts them into + episode metadata and drops them from the HuggingFace dataset, but partners + should be aware these features receive special treatment. 
+ """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + return issues + + features = info.get("features", {}) + if not isinstance(features, dict): + return issues + + string_features = [ + name + for name, defn in features.items() + if isinstance(defn, dict) and defn.get("dtype") == "string" + ] + + if string_features: + issues.append( + Issue.warning( + "validate_feature_dtypes", + f"String-typed features found: {string_features}. " + f"These cannot be stacked into tensors and will be extracted " + f"into episode metadata during featurization. Ensure this is " + f"intentional.", + ) + ) + + return issues + + # --------------------------------------------------------------------------- # Convenience: run all P0 validators # --------------------------------------------------------------------------- @@ -370,6 +518,8 @@ def validate_start_timestamp( validate_timestamps, validate_custom_metadata_csv, validate_start_timestamp, + validate_video_frame_count, + validate_feature_dtypes, ] @@ -416,6 +566,30 @@ def _to_path(dataset_path: Union[str, Path, CloudPath]) -> Any: return dataset_path +def _probe_frame_count(video_path: str) -> Optional[int]: + """Use ffprobe to count frames in a video file. 
Returns None on failure.""" + try: + result = subprocess.run( + [ + "ffprobe", + "-v", "error", + "-select_streams", "v:0", + "-count_frames", + "-show_entries", "stream=nb_read_frames", + "-of", "csv=p=0", + video_path, + ], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + return None + return int(result.stdout.strip()) + except (subprocess.TimeoutExpired, ValueError, FileNotFoundError): + return None + + def _load_info(root: Any) -> Optional[Dict[str, Any]]: """Load meta/info.json and return the parsed dict, or None on failure.""" info_file = root / "meta" / "info.json" diff --git a/tests/test_v3_checks.py b/tests/test_v3_checks.py index eea8b8b..dec9228 100644 --- a/tests/test_v3_checks.py +++ b/tests/test_v3_checks.py @@ -11,11 +11,13 @@ Issue, validate_codebase_version, validate_custom_metadata_csv, + validate_feature_dtypes, validate_feature_shapes, validate_start_timestamp, validate_tasks_format, validate_timestamps, validate_v3_dataset, + validate_video_frame_count, ) @@ -613,3 +615,93 @@ def test_multiple_issues_collected(self): def test_issue_str_representation(self): issue = Issue(level="error", validator="test_validator", message="test message") assert str(issue) == "[error] test_validator: test message" + + +# =================================================================== +# V13: validate_video_frame_count +# =================================================================== + + +class TestValidateVideoFrameCount: + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_video_features_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + # Replace video feature with a non-video one. 
+ info["features"] = {"action": {"dtype": "float32", "shape": [7]}} + _write_info(root, info) + + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_data_dir_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + # Remove data dir. + (root / "data").rmdir() + + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_parquet_files_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + # data/ exists but is empty. + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + +# =================================================================== +# V14: validate_feature_dtypes +# =================================================================== + + +class TestValidateFeatureDtypes: + def test_no_string_features_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_feature_dtypes(root) + assert len(issues) == 0 + + def test_string_feature_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["instruction.text"] = {"dtype": "string", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_dtypes(root) + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "instruction.text" in warnings[0].message + assert "string" in warnings[0].message.lower() + + def test_multiple_string_features_single_warning(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["instruction.text"] = {"dtype": "string", "shape": [1]} + info["features"]["observation.meta.tool"] = {"dtype": "string", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_dtypes(root) + 
warnings = _warnings(issues) + assert len(warnings) == 1 + assert "instruction.text" in warnings[0].message + assert "observation.meta.tool" in warnings[0].message + + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + issues = validate_feature_dtypes(root) + assert len(issues) == 0