diff --git a/lerobot_validator/__init__.py b/lerobot_validator/__init__.py index a1e09c3..3281f1a 100644 --- a/lerobot_validator/__init__.py +++ b/lerobot_validator/__init__.py @@ -9,6 +9,14 @@ from lerobot_validator.validator import LerobotDatasetValidator from lerobot_validator.gcp_path import compute_gcp_path +from lerobot_validator.v3_metadata_checker import LerobotV3MetadataChecker +from lerobot_validator.v3_checks import Issue, validate_v3_dataset -__all__ = ["LerobotDatasetValidator", "compute_gcp_path"] +__all__ = [ + "LerobotDatasetValidator", + "compute_gcp_path", + "LerobotV3MetadataChecker", + "Issue", + "validate_v3_dataset", +] diff --git a/lerobot_validator/metadata_validator.py b/lerobot_validator/metadata_validator.py index 337b53d..c23f467 100644 --- a/lerobot_validator/metadata_validator.py +++ b/lerobot_validator/metadata_validator.py @@ -139,8 +139,14 @@ def _check_start_timestamp_format(self) -> None: for idx, row in self.df.iterrows(): timestamp = row.get("start_timestamp") - # Skip if missing (will be caught by required columns check) + # Null timestamps are not allowed — every episode must have a + # valid collection start time. if pd.isna(timestamp): + episode_id = row.get("episode_id", f"row_{idx}") + invalid_timestamps.append( + (idx, episode_id, timestamp, + "start_timestamp is missing/null (every episode requires a valid timestamp)") + ) continue # Try to convert to float (epoch time should be numeric) diff --git a/lerobot_validator/v3_checks.py b/lerobot_validator/v3_checks.py new file mode 100644 index 0000000..b4b572e --- /dev/null +++ b/lerobot_validator/v3_checks.py @@ -0,0 +1,602 @@ +""" +P0 validators for LeRobot v3 datasets. + +Each validator function takes a dataset path and returns a list of Issue objects. +Issues have a level ("error" or "warning") and a descriptive message. 
+ +Validators: + V1: validate_tasks_format -- meta/tasks.parquet vs tasks.jsonl + V2: validate_codebase_version -- info.json codebase_version starts with "v3." + V5: validate_feature_shapes -- reject shape=[], image features need 3-element shape + V7: validate_timestamps -- reject absolute Unix epoch timestamps in data parquet + V11: validate_custom_metadata_csv -- required columns, no null/duplicate episode_ids + V12: validate_start_timestamp -- start_timestamp must be plausible Unix epoch floats + V13: validate_video_frame_count -- video frame counts must match data parquet row counts + V14: validate_feature_dtypes -- warn about string-typed features that need special handling +""" + +import json +import logging +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Union + +import numpy as np +import pandas as pd +from cloudpathlib import AnyPath, CloudPath + +from lerobot_validator.schemas import REQUIRED_METADATA_COLUMNS + +logger = logging.getLogger(__name__) + +# Timestamps at or above this value are treated as absolute Unix epoch (year 2000+). +UNIX_EPOCH_THRESHOLD = 946_684_800.0 + +# Upper bound for plausible Unix epoch timestamps (year 2100). +UNIX_EPOCH_MAX = 4_102_444_800.0 + +# Minimum columns required for the converter to function at all. 
# Minimum columns the converter needs in order to function at all.
_MIN_REQUIRED_COLUMNS = ["episode_index", "episode_id"]


@dataclass
class Issue:
    """A single validation finding."""

    level: str  # "error" or "warning"
    validator: str
    message: str

    def __str__(self) -> str:
        return f"[{self.level}] {self.validator}: {self.message}"

    @staticmethod
    def error(validator: str, message: str) -> "Issue":
        """Build an error-level finding."""
        return Issue(level="error", validator=validator, message=message)

    @staticmethod
    def warning(validator: str, message: str) -> "Issue":
        """Build a warning-level finding."""
        return Issue(level="warning", validator=validator, message=message)


def validate_tasks_format(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """Check that meta/tasks.parquet exists; warn if only tasks.jsonl is present."""
    meta = _to_path(dataset_path) / "meta"
    has_parquet = (meta / "tasks.parquet").exists()
    has_jsonl = (meta / "tasks.jsonl").exists()

    if has_parquet:
        return []
    if has_jsonl:
        # Legacy jsonl-only layout is recoverable, so this is only a warning.
        return [
            Issue.warning(
                "validate_tasks_format",
                "meta/tasks.parquet not found but meta/tasks.jsonl is present. "
                "The converter will auto-convert, but you should migrate to "
                "tasks.parquet before uploading.",
            )
        ]
    return [
        Issue.error(
            "validate_tasks_format",
            "meta/tasks.parquet not found. "
            "LeRobot v3 datasets must include a tasks.parquet file.",
        )
    ]


def validate_codebase_version(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """Check that info.json contains codebase_version starting with 'v3.'."""
    info = _load_info(_to_path(dataset_path))
    if info is None:
        return [Issue.error("validate_codebase_version", "meta/info.json not found or not valid JSON.")]

    version = info.get("codebase_version")
    if version is None:
        return [Issue.error("validate_codebase_version", "meta/info.json is missing 'codebase_version' field.")]
    if not str(version).startswith("v3."):
        return [
            Issue.error(
                "validate_codebase_version",
                f"codebase_version is '{version}' but must start with 'v3.'. "
                "Only LeRobot v3 datasets are supported.",
            )
        ]
    return []


def validate_feature_shapes(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """Check feature shapes in info.json.

    - Error: a feature has shape=[] (zero-dimensional).
    - Error: an image/video feature does not have a 3-element shape.
    """
    issues: List[Issue] = []
    info = _load_info(_to_path(dataset_path))
    if info is None:
        return issues

    features = info.get("features", {})
    if not isinstance(features, dict):
        return issues

    for name, defn in features.items():
        if not isinstance(defn, dict):
            continue

        shape = defn.get("shape")
        dtype = defn.get("dtype", "")

        if isinstance(shape, list) and not shape:
            issues.append(
                Issue.error(
                    "validate_feature_shapes",
                    f"Feature '{name}' has an empty shape (shape: []). "
                    "Scalar features should use shape: [1].",
                )
            )
            continue

        if dtype in ("video", "image") and isinstance(shape, list) and len(shape) != 3:
            issues.append(
                Issue.error(
                    "validate_feature_shapes",
                    f"Feature '{name}' (dtype='{dtype}') has shape {shape} "
                    f"but image/video features must have a 3-element shape "
                    f"(e.g. [H, W, C]).",
                )
            )

    return issues
" + "Scalar features should use shape: [1].", + ) + ) + continue + + if dtype in ("video", "image") and isinstance(shape, list) and len(shape) != 3: + issues.append( + Issue.error( + "validate_feature_shapes", + f"Feature '{name}' (dtype='{dtype}') has shape {shape} " + f"but image/video features must have a 3-element shape " + f"(e.g. [H, W, C]).", + ) + ) + + return issues + + +def validate_timestamps(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that data parquet timestamps are relative, not absolute Unix epoch. + + Reads only the first data parquet file. Checks the first timestamp to + determine if values are absolute, then samples the first episode for + monotonicity and starting offset. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + data_dir = root / "data" + if not data_dir.exists(): + return issues + + parquet_files = sorted(data_dir.glob("**/*.parquet")) + if not parquet_files: + return issues + + pf = parquet_files[0] + try: + df = pd.read_parquet(str(pf), columns=["timestamp", "episode_index"]) + except Exception: + try: + df = pd.read_parquet(str(pf), columns=["timestamp"]) + except Exception: + return issues + + if df.empty or "timestamp" not in df.columns: + return issues + + first_ts = float(df["timestamp"].iloc[0]) + if first_ts >= UNIX_EPOCH_THRESHOLD: + issues.append( + Issue.error( + "validate_timestamps", + f"Timestamps appear to be absolute Unix epoch values " + f"(first value: {first_ts}). LeRobot v3 requires " + f"per-episode-relative timestamps starting near 0.0. " + f"Absolute timestamps cause video decode failures.", + ) + ) + return issues + + # Check only the first episode for monotonicity/offset (avoid processing entire file). 
+ if "episode_index" in df.columns: + first_ep = df["episode_index"].iloc[0] + ep_df = df[df["episode_index"] == first_ep] + ts = ep_df["timestamp"].values + + if len(ts) > 0 and ts[0] > 1.0: + issues.append( + Issue.warning( + "validate_timestamps", + f"Episode {first_ep} starts at timestamp {ts[0]:.3f}s (expected near 0.0).", + ) + ) + + if len(ts) > 1: + diffs = ts[1:] - ts[:-1] + if (diffs < 0).any(): + issues.append( + Issue.warning( + "validate_timestamps", + f"Episode {first_ep} has non-monotonically increasing timestamps.", + ) + ) + + return issues + + +def validate_custom_metadata_csv( + dataset_path: Union[str, Path, CloudPath], + _df_cache: Optional[Dict[str, pd.DataFrame]] = None, +) -> List[Issue]: + """Check that meta/custom_metadata.csv exists and has required columns. + + If _df_cache is provided, the loaded DataFrame is stored under "csv" so + downstream validators (e.g. validate_start_timestamp) can reuse it. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + csv_path = root / "meta" / "custom_metadata.csv" + if not csv_path.exists(): + issues.append(Issue.error("validate_custom_metadata_csv", "meta/custom_metadata.csv not found.")) + return issues + + try: + df = pd.read_csv(str(csv_path)) + except Exception as exc: + issues.append( + Issue.error("validate_custom_metadata_csv", f"Failed to read meta/custom_metadata.csv: {exc}") + ) + return issues + + if _df_cache is not None: + _df_cache["csv"] = df + + missing_required = [c for c in _MIN_REQUIRED_COLUMNS if c not in df.columns] + if missing_required: + issues.append( + Issue.error( + "validate_custom_metadata_csv", + f"meta/custom_metadata.csv is missing required columns: {missing_required}", + ) + ) + return issues + + null_ids = df[df["episode_id"].isna()] + if len(null_ids) > 0: + issues.append( + Issue.error( + "validate_custom_metadata_csv", + f"episode_id has null values at rows: {null_ids.index.tolist()}", + ) + ) + + duplicates = 
df[df["episode_id"].duplicated(keep=False)] + if len(duplicates) > 0: + dup_ids = duplicates["episode_id"].unique().tolist() + issues.append( + Issue.error("validate_custom_metadata_csv", f"episode_id has duplicate values: {dup_ids}") + ) + + # Warn about expected columns from the full schema that are missing. + missing_optional = [c for c in REQUIRED_METADATA_COLUMNS if c not in df.columns and c not in _MIN_REQUIRED_COLUMNS] + if missing_optional: + issues.append( + Issue.warning( + "validate_custom_metadata_csv", + f"meta/custom_metadata.csv is missing optional columns: {missing_optional}", + ) + ) + + return issues + + +def validate_start_timestamp( + dataset_path: Union[str, Path, CloudPath], + _df_cache: Optional[Dict[str, pd.DataFrame]] = None, +) -> List[Issue]: + """Check that start_timestamp values are plausible Unix epoch floats. + + Reuses the DataFrame from _df_cache if available (populated by + validate_custom_metadata_csv), avoiding a redundant CSV read. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + + df = _df_cache.get("csv") if _df_cache else None + if df is None: + csv_path = root / "meta" / "custom_metadata.csv" + if not csv_path.exists(): + return issues + try: + df = pd.read_csv(str(csv_path)) + except Exception: + return issues + + if "start_timestamp" not in df.columns: + return issues + + invalid: List[str] = [] + for idx, row in df.iterrows(): + ts = row.get("start_timestamp") + episode_id = row.get("episode_id", f"row_{idx}") + + if pd.isna(ts): + invalid.append(f" Row {idx} (episode '{episode_id}'): start_timestamp is missing/null") + continue + + try: + ts_float = float(ts) + except (ValueError, TypeError): + invalid.append(f" Row {idx} (episode '{episode_id}'): '{ts}' is not a valid float") + continue + + if ts_float < UNIX_EPOCH_THRESHOLD: + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"{ts_float} is below year-2000 threshold ({UNIX_EPOCH_THRESHOLD}); " + f"likely a relative offset, not an absolute 
Unix timestamp" + ) + elif ts_float > UNIX_EPOCH_MAX: + invalid.append( + f" Row {idx} (episode '{episode_id}'): " + f"{ts_float} is above year-2100 threshold ({UNIX_EPOCH_MAX})" + ) + + if invalid: + issues.append( + Issue.error( + "validate_start_timestamp", + "start_timestamp must be a valid Unix epoch float " + f"(range {UNIX_EPOCH_THRESHOLD} to {UNIX_EPOCH_MAX}):\n" + "\n".join(invalid), + ) + ) + + return issues + + +def validate_video_frame_count(dataset_path: Union[str, Path, CloudPath]) -> List[Issue]: + """Check that video files have roughly the expected number of frames. + + Compares the frame count reported by ffprobe against the number of rows + in the data parquet for each episode. Excessive dropped frames (>5% + missing) trigger a warning; >20% missing triggers an error. + """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + return issues + + video_path_tpl = info.get("video_path") + if video_path_tpl is None: + return issues + + video_keys = [ + name + for name, defn in info.get("features", {}).items() + if isinstance(defn, dict) and defn.get("dtype") in ("video", "image") + ] + if not video_keys: + return issues + + data_dir = root / "data" + if not data_dir.exists(): + return issues + + # Count expected frames per episode from data parquet. + episode_frame_counts: Dict[int, int] = {} + for pf in sorted(data_dir.glob("**/*.parquet")): + try: + df = pd.read_parquet(str(pf), columns=["episode_index"]) + except Exception: + continue + for ep_idx, count in df["episode_index"].value_counts().items(): + episode_frame_counts[int(ep_idx)] = episode_frame_counts.get(int(ep_idx), 0) + int(count) + + if not episode_frame_counts: + return issues + + chunks_size = info.get("chunks_size", 1000) + checked = 0 + problems: List[str] = [] + + # Sample up to 5 episodes to avoid checking every video. 
def validate_feature_dtypes(dataset_path: "Union[str, Path, CloudPath]") -> List[Issue]:
    """Warn about feature dtypes that require special handling downstream.

    String-typed features (e.g. ``instruction.text``) cannot be stacked into
    tensors by LeRobot's ``__getitem__``; the featurizer extracts them into
    episode metadata and drops them from the HuggingFace dataset. Surface
    them so partners know this special treatment is applied.
    """
    root = _to_path(dataset_path)
    issues: List[Issue] = []
    info = _load_info(root)
    if info is None:
        return issues

    features = info.get("features", {})
    if not isinstance(features, dict):
        return issues

    string_features = []
    for name, defn in features.items():
        if isinstance(defn, dict) and defn.get("dtype") == "string":
            string_features.append(name)

    if string_features:
        issues.append(
            Issue.warning(
                "validate_feature_dtypes",
                f"String-typed features found: {string_features}. "
                f"These cannot be stacked into tensors and will be extracted "
                f"into episode metadata during featurization. Ensure this is "
                f"intentional.",
            )
        )

    return issues
+ """ + root = _to_path(dataset_path) + issues: List[Issue] = [] + info = _load_info(root) + + if info is None: + return issues + + features = info.get("features", {}) + if not isinstance(features, dict): + return issues + + string_features = [ + name + for name, defn in features.items() + if isinstance(defn, dict) and defn.get("dtype") == "string" + ] + + if string_features: + issues.append( + Issue.warning( + "validate_feature_dtypes", + f"String-typed features found: {string_features}. " + f"These cannot be stacked into tensors and will be extracted " + f"into episode metadata during featurization. Ensure this is " + f"intentional.", + ) + ) + + return issues + + +# --------------------------------------------------------------------------- +# Convenience: run all P0 validators +# --------------------------------------------------------------------------- + +_P0_VALIDATORS = [ + validate_tasks_format, + validate_codebase_version, + validate_feature_shapes, + validate_timestamps, + validate_custom_metadata_csv, + validate_start_timestamp, + validate_video_frame_count, + validate_feature_dtypes, +] + + +def validate_v3_dataset( + dataset_path: Union[str, Path, CloudPath], +) -> List[Issue]: + """Run all P0 validators and return a combined list of issues. + + Args: + dataset_path: Path to the lerobot dataset directory. + + Returns: + A list of Issue objects (errors and warnings). + """ + all_issues: List[Issue] = [] + # Shared cache so V12 reuses the CSV loaded by V11. 
def _to_path(dataset_path: "Union[str, Path, CloudPath]") -> Any:
    """Coerce a string to AnyPath; Path/CloudPath objects pass through unchanged."""
    return AnyPath(dataset_path) if isinstance(dataset_path, str) else dataset_path


def _probe_frame_count(video_path: str) -> Optional[int]:
    """Use ffprobe to count frames in a video file. Returns None on failure."""
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-count_frames",
        "-show_entries", "stream=nb_read_frames",
        "-of", "csv=p=0",
        video_path,
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        # ffprobe missing or hung — caller treats None as "could not probe".
        return None
    if proc.returncode != 0:
        return None
    try:
        return int(proc.stdout.strip())
    except ValueError:
        # ffprobe can emit "N/A" for streams it cannot count.
        return None
def _load_info(root: Any) -> Optional[Dict[str, Any]]:
    """Load meta/info.json and return the parsed dict, or None on failure."""
    info_file = root / "meta" / "info.json"
    if not info_file.exists():
        return None
    try:
        # Missing/garbled JSON is treated the same as an absent file: the
        # individual validators report their own, more specific errors.
        return json.loads(info_file.read_text())
    except Exception:
        return None
# Columns that must exist in every episodes parquet file.
REQUIRED_EPISODES_COLUMNS: List[str] = [
    "data/chunk_index",
    "data/file_index",
    "tasks",
]

# Standard placeholders expected in info.json path templates.
REQUIRED_DATA_PATH_PLACEHOLDERS: Set[str] = {
    "{episode_chunk",
    "{episode_index",
}
REQUIRED_VIDEO_PATH_PLACEHOLDERS: Set[str] = {
    "{episode_chunk",
    "{episode_index",
    "{video_key",
}

# Timestamps below this threshold are treated as relative (seconds from
# episode start). Values above are treated as absolute Unix timestamps.
# No robot episode is longer than ~11.5 days (1e6 seconds).
_ABSOLUTE_TIMESTAMP_THRESHOLD = 1_000_000.0


class LerobotV3MetadataChecker:
    """
    Validates structural metadata for LeRobot v3 datasets.

    Usage::

        checker = LerobotV3MetadataChecker(dataset_path)
        passed = checker.validate()
        errors = checker.get_errors()
    """

    def __init__(self, dataset_path: "Union[str, Path, CloudPath]") -> None:
        # Strings may point at cloud storage, so route them through AnyPath;
        # Path/CloudPath objects are used as-is.
        if isinstance(dataset_path, str):
            self.dataset_path = AnyPath(dataset_path)
        else:
            self.dataset_path = dataset_path
        self.errors: List[str] = []
        self._info: Optional[Dict[str, Any]] = None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate(self) -> bool:
        """Run all v3 metadata checks. Returns True when all pass."""
        self.errors = []
        self._info = None

        if not self.dataset_path.exists():
            self.errors.append(
                f"Dataset directory not found: {self.dataset_path}"
            )
            return False

        self._load_info()

        self._check_tasks_parquet()
        self._check_episodes_parquet_columns()
        self._check_feature_shapes()
        self._check_path_templates()
        self._check_video_files_exist()
        self._check_timestamp_consistency()
        self._check_episode_contiguity()
        self._check_no_video_columns_in_data_parquet()
        self._check_episode_video_metadata_columns()

        return len(self.errors) == 0

    def get_errors(self) -> List[str]:
        """Return accumulated error messages."""
        return self.errors

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _meta_dir(self) -> Any:
        return self.dataset_path / "meta"

    def _data_dir(self) -> Any:
        return self.dataset_path / "data"

    def _load_info(self) -> None:
        """Parse meta/info.json into self._info; record read/parse errors."""
        info_file = self._meta_dir() / "info.json"
        if not info_file.exists():
            return  # Already reported by LerobotDatasetChecker
        try:
            with info_file.open("r") as f:
                self._info = json.load(f)
        except json.JSONDecodeError as exc:
            self.errors.append(f"meta/info.json is not valid JSON: {exc}")
        except Exception as exc:
            self.errors.append(f"Failed to read meta/info.json: {exc}")

    def _get_video_keys(self) -> List[str]:
        """Return feature keys whose dtype is video or image."""
        if self._info is None:
            return []
        features = self._info.get("features", {})
        return [
            name
            for name, defn in features.items()
            if isinstance(defn, dict)
            and defn.get("dtype") in ("video", "image")
        ]

    # ------------------------------------------------------------------
    # Check 1: tasks.parquet
    # ------------------------------------------------------------------

    def _check_tasks_parquet(self) -> None:
        """tasks.parquet must exist; jsonl-only or fully absent are errors."""
        meta = self._meta_dir()
        has_parquet = (meta / "tasks.parquet").exists()
        has_jsonl = (meta / "tasks.jsonl").exists()

        if has_jsonl and not has_parquet:
            self.errors.append(
                "meta/tasks.parquet not found but meta/tasks.jsonl is present. "
                "LeRobot v3 requires tasks.parquet — convert tasks.jsonl "
                "to parquet before uploading."
            )
        elif not has_parquet and not has_jsonl:
            self.errors.append(
                "meta/tasks.parquet not found. "
                "LeRobot v3 datasets must include a tasks.parquet file."
            )

    # ------------------------------------------------------------------
    # Check 2: episodes parquet columns
    # ------------------------------------------------------------------

    def _check_episodes_parquet_columns(self) -> None:
        """meta/episodes.parquet must exist and carry the required columns."""
        episodes_file = self._meta_dir() / "episodes.parquet"
        if not episodes_file.exists():
            self.errors.append(
                "meta/episodes.parquet not found. "
                "LeRobot v3 datasets must include an episodes.parquet file."
            )
            return

        try:
            df = pd.read_parquet(str(episodes_file))
        except Exception as exc:
            self.errors.append(
                f"Failed to read meta/episodes.parquet: {exc}"
            )
            return

        missing = [
            col for col in REQUIRED_EPISODES_COLUMNS if col not in df.columns
        ]
        if missing:
            self.errors.append(
                f"meta/episodes.parquet is missing required columns: {missing}. "
                f"Present columns: {sorted(df.columns.tolist())}"
            )

    # ------------------------------------------------------------------
    # Check 3: feature shapes
    # ------------------------------------------------------------------

    def _check_feature_shapes(self) -> None:
        """Reject zero-dimensional (shape=[]) features in info.json."""
        if self._info is None:
            return
        features = self._info.get("features", {})
        if not isinstance(features, dict):
            return

        for name, defn in features.items():
            if not isinstance(defn, dict):
                continue
            shape = defn.get("shape")
            if isinstance(shape, list) and len(shape) == 0:
                self.errors.append(
                    f"Feature '{name}' has an empty shape (shape: []). "
                    "Scalar features should use shape: [1]."
                )

    # ------------------------------------------------------------------
    # Check 4: path templates
    # ------------------------------------------------------------------

    def _check_path_templates(self) -> None:
        """data_path/video_path templates must use the standard placeholders."""
        if self._info is None:
            return

        data_path = self._info.get("data_path")
        if data_path is not None:
            self._validate_template(
                str(data_path),
                "data_path",
                REQUIRED_DATA_PATH_PLACEHOLDERS,
            )

        video_path = self._info.get("video_path")
        if video_path is not None:
            self._validate_template(
                str(video_path),
                "video_path",
                REQUIRED_VIDEO_PATH_PLACEHOLDERS,
            )

    def _validate_template(
        self,
        template: str,
        field: str,
        required: Set[str],
    ) -> None:
        """Record an error if *template* lacks any placeholder in *required*.

        Placeholder stems are everything before the first ':' or '}' so that
        format specs like {episode_index:06d} still match.
        """
        found_stems = set()
        for match in re.finditer(r"\{([^}:]+)", template):
            found_stems.add("{" + match.group(1))

        missing = required - found_stems
        if missing:
            self.errors.append(
                f"info.json '{field}' template is missing required "
                f"placeholders {sorted(missing)}. "
                f"Template: '{template}'"
            )

    # ------------------------------------------------------------------
    # Check 5: video files exist
    # ------------------------------------------------------------------

    def _check_video_files_exist(self) -> None:
        """Render video_path for every episode/key and verify the files exist.

        Reporting is capped at 10 missing paths; once the cap is hit the scan
        stops and a single '... (truncated)' marker is appended.
        """
        if self._info is None:
            return

        video_path_tpl = self._info.get("video_path")
        if video_path_tpl is None:
            return

        video_keys = self._get_video_keys()
        if not video_keys:
            return

        episodes_file = self._meta_dir() / "episodes.parquet"
        if not episodes_file.exists():
            return  # Already flagged in check 2

        try:
            episodes_df = pd.read_parquet(str(episodes_file))
        except Exception:
            return

        if "episode_index" not in episodes_df.columns:
            return

        chunks_size = self._info.get("chunks_size", 1000)
        missing_files: List[str] = []
        truncated = False

        for _, row in episodes_df.iterrows():
            ep_idx = int(row["episode_index"])
            ep_chunk = int(
                row.get("data/chunk_index", ep_idx // chunks_size)
            )

            for vkey in video_keys:
                try:
                    rendered = video_path_tpl.format(
                        episode_chunk=ep_chunk,
                        episode_index=ep_idx,
                        video_key=vkey,
                    )
                except KeyError:
                    continue  # Template uses non-standard placeholders

                if not (self.dataset_path / rendered).exists():
                    missing_files.append(rendered)
                    if len(missing_files) > 10:
                        truncated = True
                        break

            # BUGFIX: the original only broke out of the inner video-key
            # loop, so the episode loop kept appending paths (and one
            # "... (truncated)" marker per extra miss) past the cap.
            if truncated:
                break

        if truncated:
            missing_files.append("... (truncated)")

        if missing_files:
            self.errors.append(
                f"Missing video files ({len(missing_files)} not found):\n"
                + "\n".join(f"  {p}" for p in missing_files)
            )

    # ------------------------------------------------------------------
    # Check 6: timestamp consistency
    # ------------------------------------------------------------------

    def _check_timestamp_consistency(self) -> None:
        """All data parquet files must agree on relative vs absolute timestamps."""
        data_dir = self._data_dir()
        if not data_dir.exists():
            return

        parquet_files = sorted(data_dir.glob("**/*.parquet"))
        if not parquet_files:
            return

        overall_mode: Optional[bool] = None  # True=absolute, False=relative
        inconsistent: List[str] = []

        for pf in parquet_files:
            try:
                df = pd.read_parquet(str(pf), columns=["timestamp"])
            except Exception:
                continue
            if df.empty:
                continue

            has_abs = bool(
                (df["timestamp"] > _ABSOLUTE_TIMESTAMP_THRESHOLD).any()
            )
            has_rel = bool(
                (df["timestamp"] <= _ABSOLUTE_TIMESTAMP_THRESHOLD).any()
            )

            if has_abs and has_rel:
                inconsistent.append(
                    f"  {pf.name}: mix of absolute and relative timestamps"
                )
                continue

            file_mode = has_abs
            if overall_mode is None:
                overall_mode = file_mode
            elif overall_mode != file_mode:
                inconsistent.append(
                    f"  {pf.name}: "
                    f"{'absolute' if file_mode else 'relative'} timestamps "
                    f"but earlier chunks use "
                    f"{'absolute' if overall_mode else 'relative'}"
                )

        if inconsistent:
            self.errors.append(
                "Timestamp inconsistency in data parquet files. "
                "All files must use either relative (seconds from episode "
                "start) or absolute (Unix epoch) timestamps:\n"
                + "\n".join(inconsistent)
            )

    # ------------------------------------------------------------------
    # Check 7: episode contiguity
    # ------------------------------------------------------------------

    def _check_episode_contiguity(self) -> None:
        """All rows of each episode_index must be grouped contiguously."""
        data_dir = self._data_dir()
        if not data_dir.exists():
            return

        parquet_files = sorted(data_dir.glob("**/*.parquet"))
        if not parquet_files:
            return

        non_contiguous: List[str] = []

        for pf in parquet_files:
            try:
                df = pd.read_parquet(str(pf), columns=["episode_index"])
            except Exception:
                continue
            if df.empty:
                continue

            # An episode_index re-appearing after a different one means the
            # episode's rows are split.
            seen: Set[int] = set()
            prev: Optional[int] = None
            for ep_idx in df["episode_index"]:
                ep_idx = int(ep_idx)
                if ep_idx != prev:
                    if ep_idx in seen:
                        non_contiguous.append(
                            f"  {pf.name}: episode_index={ep_idx} "
                            f"appears non-contiguously"
                        )
                        break
                    seen.add(ep_idx)
                    prev = ep_idx

        if non_contiguous:
            self.errors.append(
                "Non-contiguous episode rows in data parquet files. "
                "All rows for each episode_index must be grouped "
                "together:\n" + "\n".join(non_contiguous)
            )

    # ------------------------------------------------------------------
    # Check 8: no video struct columns in data parquet
    # ------------------------------------------------------------------

    def _check_no_video_columns_in_data_parquet(self) -> None:
        """Data parquet files must not contain video feature columns.

        Video features (dtype="video" in info.json) are stored as separate
        MP4 files. If the data parquet also contains these columns (typically
        as struct), the LeRobot dataset loader will fail with
        a CastError because the column names don't match the expected schema.
        """
        data_dir = self._data_dir()
        if not data_dir.exists():
            return

        video_keys = set(self._get_video_keys())
        if not video_keys:
            return

        parquet_files = sorted(data_dir.glob("**/*.parquet"))
        if not parquet_files:
            return

        # Only the first file is checked — schema is consistent across chunks.
        # BUGFIX: pandas.read_parquet has never accepted an `nrows` argument,
        # so the old `nrows=0` attempt always raised TypeError and the
        # "older pandas" fallback was the only live path. Read the file once,
        # explicitly.
        try:
            df = pd.read_parquet(str(parquet_files[0]))
        except Exception:
            return

        offending = sorted(video_keys & set(df.columns))
        if offending:
            self.errors.append(
                f"Data parquet files contain video feature columns: "
                f"{offending}. Video features (dtype='video' in info.json) "
                f"must NOT appear as columns in data parquet files — they "
                f"are stored as separate MP4 files. Remove these columns "
                f"from the data parquet."
            )

    # ------------------------------------------------------------------
    # Check 9: episode video metadata columns
    # ------------------------------------------------------------------

    def _check_episode_video_metadata_columns(self) -> None:
        """Episode parquet must include per-video-key metadata columns.

        For each video feature key, the episode parquet should contain
        ``videos/{key}/chunk_index`` and ``videos/{key}/from_timestamp``
        so the dataset loader can resolve the correct video file and
        starting timestamp for each episode.
        """
        video_keys = self._get_video_keys()
        if not video_keys:
            return

        episodes_dir = self._meta_dir() / "episodes"
        episodes_file = self._meta_dir() / "episodes.parquet"

        # v3 datasets may use either a flat episodes.parquet or a chunked
        # episodes/ directory; sample the first file found.
        ep_columns: Optional[List[str]] = None
        if episodes_dir.exists():
            parquet_files = sorted(episodes_dir.glob("**/*.parquet"))
            if parquet_files:
                try:
                    df = pd.read_parquet(str(parquet_files[0]))
                    ep_columns = df.columns.tolist()
                except Exception:
                    return
        elif episodes_file.exists():
            try:
                df = pd.read_parquet(str(episodes_file))
                ep_columns = df.columns.tolist()
            except Exception:
                return
        else:
            return  # Already flagged in check 2

        if ep_columns is None:
            return

        missing: List[str] = []
        for vkey in video_keys:
            for suffix in ("chunk_index", "from_timestamp"):
                col = f"videos/{vkey}/{suffix}"
                if col not in ep_columns:
                    missing.append(col)

        if missing:
            self.errors.append(
                f"Episode parquet is missing video metadata columns: "
                f"{missing}. For each video feature, the episode parquet "
                f"must include 'videos/{{key}}/chunk_index' and "
                f"'videos/{{key}}/from_timestamp' columns so the dataset "
                f"loader can resolve video files and timestamps."
            )
+244,18 @@ def get_errors(self) -> List[str]: """Get all validation errors.""" return self.errors + def get_warnings(self) -> List[str]: + """Get all validation warnings.""" + return self.warnings + def print_results(self) -> None: """Print validation results.""" + if self.warnings: + print(f"Warnings ({len(self.warnings)}):\n") + for i, warning in enumerate(self.warnings, 1): + print(f" {i}. {warning}") + print() + if len(self.errors) == 0: print("✓ All validations passed!") else: diff --git a/tests/test_integration.py b/tests/test_integration.py index 3014467..cc506f7 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -13,21 +13,28 @@ def create_test_dataset(tmpdir): """Create a minimal test dataset structure.""" dataset_path = Path(tmpdir) / "dataset" dataset_path.mkdir() - + # Create meta folder meta_dir = dataset_path / "meta" meta_dir.mkdir() - - # Create data folder (lerobot datasets have parquet files here with task column) + + # Create data folder with a valid data chunk parquet data_dir = dataset_path / "data" - data_dir.mkdir() - # Create a dummy parquet file to satisfy the check - # In a real dataset, this would contain the task column - (data_dir / "chunk-000.parquet").touch() + chunk_dir = data_dir / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame({ + "episode_index": [0, 0, 1, 1], + "timestamp": [0.0, 0.033, 0.0, 0.033], + }).to_parquet(chunk_dir / "episode_000000.parquet", index=False) # Create info.json in meta folder (lerobot stores it there) info = { "fps": 30, + "codebase_version": "v3.0", + "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", + "features": { + "action": {"dtype": "float32", "shape": [7]}, + }, "episodes": { "ep_001": {"duration": 10.0, "num_frames": 300}, "ep_002": {"duration": 5.0, "num_frames": 150}, @@ -36,6 +43,19 @@ def create_test_dataset(tmpdir): with open(meta_dir / "info.json", "w") as f: json.dump(info, f) + # Create tasks.parquet + 
pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + meta_dir / "tasks.parquet", index=False + ) + + # Create episodes.parquet with required v3 columns + pd.DataFrame({ + "episode_index": [0, 1], + "data/chunk_index": [0, 0], + "data/file_index": [0, 1], + "tasks": [["default"], ["default"]], + }).to_parquet(meta_dir / "episodes.parquet", index=False) + return dataset_path diff --git a/tests/test_is_eval_data_consistency.py b/tests/test_is_eval_data_consistency.py index d80cadc..14350a3 100644 --- a/tests/test_is_eval_data_consistency.py +++ b/tests/test_is_eval_data_consistency.py @@ -13,25 +13,45 @@ def create_test_dataset(tmpdir): """Create a minimal test dataset structure.""" dataset_path = Path(tmpdir) / "dataset" dataset_path.mkdir() - + # Create meta folder meta_dir = dataset_path / "meta" meta_dir.mkdir() - - # Create data folder (lerobot datasets have parquet files here with task column) - data_dir = dataset_path / "data" - data_dir.mkdir() - # Create a dummy parquet file to satisfy the check - # In a real dataset, this would contain the task column - (data_dir / "chunk-000.parquet").touch() - # Create info.json in meta folder (lerobot stores it there) + # Create data folder with a valid data chunk parquet + data_dir = dataset_path / "data" + chunk_dir = data_dir / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame({ + "episode_index": [0, 0, 1, 1], + "timestamp": [0.0, 0.033, 0.0, 0.033], + }).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + # Create info.json in meta folder info = { "fps": 30, + "codebase_version": "v3.0", + "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet", + "features": { + "action": {"dtype": "float32", "shape": [7]}, + }, } with open(meta_dir / "info.json", "w") as f: json.dump(info, f) + # Create tasks.parquet + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + meta_dir / "tasks.parquet", index=False + ) + + # Create episodes.parquet with 
required v3 columns + pd.DataFrame({ + "episode_index": [0, 1], + "data/chunk_index": [0, 0], + "data/file_index": [0, 1], + "tasks": [["default"], ["default"]], + }).to_parquet(meta_dir / "episodes.parquet", index=False) + return dataset_path diff --git a/tests/test_metadata_validator.py b/tests/test_metadata_validator.py index 797d448..4856411 100644 --- a/tests/test_metadata_validator.py +++ b/tests/test_metadata_validator.py @@ -227,6 +227,32 @@ def test_invalid_gcs_uri_missing_path(): assert any("must include path after bucket" in err for err in errors) +def test_null_start_timestamp_fails(): + """Test that null/missing start_timestamp is rejected.""" + with tempfile.TemporaryDirectory() as tmpdir: + metadata_path = Path(tmpdir) / "custom_metadata.csv" + + df = pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + "is_eval_episode": [False, False], + "episode_id": ["ep_001", "ep_002"], + "start_timestamp": [1730455200, None], + "checkpoint_path": ["", ""], + "success": [True, False], + "station_id": ["station_1", "station_1"], + "robot_id": ["robot_1", "robot_1"], + } + ) + df.to_csv(metadata_path, index=False) + + validator = MetadataValidator(metadata_path) + assert validator.validate() is False + errors = validator.get_errors() + assert any("missing/null" in e for e in errors) + + def test_valid_gcs_uri_formats(): """Test validation with various valid GCS URI formats.""" with tempfile.TemporaryDirectory() as tmpdir: diff --git a/tests/test_v3_checks.py b/tests/test_v3_checks.py new file mode 100644 index 0000000..dec9228 --- /dev/null +++ b/tests/test_v3_checks.py @@ -0,0 +1,707 @@ +"""Tests for P0 v3 validators (lerobot_validator.v3_checks).""" + +import json +import tempfile +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd + +from lerobot_validator.v3_checks import ( + Issue, + validate_codebase_version, + validate_custom_metadata_csv, + validate_feature_dtypes, + validate_feature_shapes, + 
validate_start_timestamp, + validate_tasks_format, + validate_timestamps, + validate_v3_dataset, + validate_video_frame_count, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_dataset(tmpdir: str) -> Path: + """Create a minimal dataset directory skeleton.""" + root = Path(tmpdir) / "dataset" + root.mkdir() + (root / "meta").mkdir() + (root / "data").mkdir() + return root + + +def _write_info(root: Path, info: Dict[str, Any]) -> None: + with open(root / "meta" / "info.json", "w") as f: + json.dump(info, f) + + +def _minimal_info(**overrides: Any) -> Dict[str, Any]: + info: Dict[str, Any] = { + "fps": 30, + "codebase_version": "v3.0", + "chunks_size": 1000, + "features": { + "observation.images.top": { + "dtype": "video", + "shape": [480, 640, 3], + }, + "action": { + "dtype": "float32", + "shape": [7], + }, + }, + } + info.update(overrides) + return info + + +def _write_tasks_parquet(root: Path) -> None: + pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet( + root / "meta" / "tasks.parquet", index=False + ) + + +def _write_custom_metadata(root: Path, df: pd.DataFrame) -> None: + df.to_csv(root / "meta" / "custom_metadata.csv", index=False) + + +def _valid_metadata_df() -> pd.DataFrame: + return pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + "is_eval_episode": [False, False], + "episode_id": ["ep_001", "ep_002"], + "start_timestamp": [1730455200.0, 1730458800.0], + "checkpoint_path": ["", ""], + "success": [True, False], + "station_id": ["station_1", "station_1"], + "robot_id": ["robot_1", "robot_1"], + } + ) + + +def _errors(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "error"] + + +def _warnings(issues: List[Issue]) -> List[Issue]: + return [i for i in issues if i.level == "warning"] + + +# 
=================================================================== +# V1: validate_tasks_format +# =================================================================== + + +class TestValidateTasksFormat: + def test_parquet_present_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + + def test_neither_file_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_tasks_format(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "tasks.parquet not found" in errors[0].message + + def test_jsonl_only_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(_errors(issues)) == 0 + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "tasks.jsonl" in warnings[0].message + + def test_both_files_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + (root / "meta" / "tasks.jsonl").write_text( + '{"task_index": 0, "task": "pick"}\n' + ) + + issues = validate_tasks_format(root) + assert len(issues) == 0 + + +# =================================================================== +# V2: validate_codebase_version +# =================================================================== + + +class TestValidateCodebaseVersion: + def test_v3_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.0")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + 
def test_v3_minor_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v3.1.2")) + + issues = validate_codebase_version(root) + assert len(issues) == 0 + + def test_v2_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info(codebase_version="v2.1")) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "v3." in errors[0].message + + def test_missing_version_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + del info["codebase_version"] + _write_info(root, info) + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "missing" in errors[0].message.lower() + + def test_no_info_json_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json at all + + issues = validate_codebase_version(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "info.json" in errors[0].message + + +# =================================================================== +# V5: validate_feature_shapes +# =================================================================== + + +class TestValidateFeatureShapes: + def test_valid_shapes_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_empty_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["action"]["shape"] = [] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "empty shape" in errors[0].message + assert "action" in 
errors[0].message + + def test_scalar_shape_1_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["scalar_feat"] = {"dtype": "float32", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_image_feature_2d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["observation.images.top"]["shape"] = [640, 480] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_video_feature_4d_shape_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["observation.images.top"]["shape"] = [1, 480, 640, 3] + _write_info(root, info) + + issues = validate_feature_shapes(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "3-element shape" in errors[0].message + + def test_image_dtype_3d_shape_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["cam"] = {"dtype": "image", "shape": [480, 640, 3]} + _write_info(root, info) + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no info.json + + issues = validate_feature_shapes(root) + assert len(issues) == 0 + + +# =================================================================== +# V7: validate_timestamps +# =================================================================== + + +class TestValidateTimestamps: + def test_relative_timestamps_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, 
_minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.033, 0.066], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_absolute_timestamps_error(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + def test_non_monotonic_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0, 0], + "timestamp": [0.0, 0.066, 0.033], # non-monotonic + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("non-monotonically" in w.message for w in warnings) + + def test_large_starting_offset_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [5.0, 5.033], # starts at 5s, not near 0 + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("starts at timestamp" in w.message for w in 
warnings) + + def test_no_data_dir_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # data dir is empty (no parquet files) + + issues = validate_timestamps(root) + assert len(issues) == 0 + + def test_no_episode_index_column_still_checks_absolute(self): + """Even without episode_index column, absolute timestamps should be caught.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "timestamp": [1_700_000_000.0, 1_700_000_000.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_timestamps(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "absolute Unix epoch" in errors[0].message + + +# =================================================================== +# V11: validate_custom_metadata_csv +# =================================================================== + + +class TestValidateCustomMetadataCsv: + def test_valid_metadata_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_custom_metadata_csv(root) + assert len(_errors(issues)) == 0 + + def test_missing_file_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "not found" in errors[0].message + + def test_missing_episode_index_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_id": ["ep_001", "ep_002"], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_index" 
in errors[0].message + + def test_missing_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0, 1], + "operator_id": ["op1", "op1"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "episode_id" in errors[0].message + + def test_null_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "episode_id"] = None + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert any("null" in e.message for e in errors) + + def test_duplicate_episode_id_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[1, "episode_id"] = "ep_001" # duplicate + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + errors = _errors(issues) + assert any("duplicate" in e.message for e in errors) + + def test_missing_optional_columns_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0, 1], + "episode_id": ["ep_001", "ep_002"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_custom_metadata_csv(root) + warnings = _warnings(issues) + assert len(warnings) >= 1 + assert any("optional columns" in w.message for w in warnings) + + def test_all_columns_present_no_warnings(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_custom_metadata_csv(root) + warnings = _warnings(issues) + assert len(warnings) == 0 + + +# =================================================================== +# V12: validate_start_timestamp +# 
=================================================================== + + +class TestValidateStartTimestamp: + def test_valid_timestamps_pass(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_custom_metadata(root, _valid_metadata_df()) + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + def test_null_timestamp_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "start_timestamp"] = None + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "missing/null" in errors[0].message + + def test_below_threshold_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "start_timestamp"] = 100.0 # relative offset, not epoch + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "below year-2000 threshold" in errors[0].message + + def test_above_max_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df.loc[0, "start_timestamp"] = 5_000_000_000.0 # year ~2128 + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "above year-2100 threshold" in errors[0].message + + def test_non_numeric_errors(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = _valid_metadata_df() + df["start_timestamp"] = df["start_timestamp"].astype(str) + df.loc[0, "start_timestamp"] = "not-a-number" + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + errors = _errors(issues) + assert len(errors) == 1 + assert "not a valid float" in errors[0].message + + def test_missing_csv_returns_empty(self): 
+ with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # no custom_metadata.csv + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + def test_missing_column_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + df = pd.DataFrame( + { + "episode_index": [0], + "episode_id": ["ep_001"], + } + ) + _write_custom_metadata(root, df) + + issues = validate_start_timestamp(root) + assert len(issues) == 0 + + +# =================================================================== +# validate_v3_dataset (combined runner) +# =================================================================== + + +class TestValidateV3Dataset: + def test_fully_valid_dataset_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + _write_tasks_parquet(root) + _write_custom_metadata(root, _valid_metadata_df()) + + # Write relative timestamps + chunk_dir = root / "data" / "chunk-000" + chunk_dir.mkdir(parents=True) + pd.DataFrame( + { + "episode_index": [0, 0], + "timestamp": [0.0, 0.033], + } + ).to_parquet(chunk_dir / "episode_000000.parquet", index=False) + + issues = validate_v3_dataset(root) + errors = _errors(issues) + assert len(errors) == 0 + + def test_multiple_issues_collected(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + # No info.json -> V2 error + # No tasks.parquet -> V1 error + # No custom_metadata.csv -> V11 error + + issues = validate_v3_dataset(root) + errors = _errors(issues) + # Should have at least V1 + V2 + V11 errors + assert len(errors) >= 3 + + def test_issue_str_representation(self): + issue = Issue(level="error", validator="test_validator", message="test message") + assert str(issue) == "[error] test_validator: test message" + + +# =================================================================== +# V13: validate_video_frame_count +# 
=================================================================== + + +class TestValidateVideoFrameCount: + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_video_features_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + # Replace video feature with a non-video one. + info["features"] = {"action": {"dtype": "float32", "shape": [7]}} + _write_info(root, info) + + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_data_dir_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + # Remove data dir. + (root / "data").rmdir() + + issues = validate_video_frame_count(root) + assert len(issues) == 0 + + def test_no_parquet_files_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + # data/ exists but is empty. 
+ issues = validate_video_frame_count(root) + assert len(issues) == 0 + + +# =================================================================== +# V14: validate_feature_dtypes +# =================================================================== + + +class TestValidateFeatureDtypes: + def test_no_string_features_passes(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + _write_info(root, _minimal_info()) + + issues = validate_feature_dtypes(root) + assert len(issues) == 0 + + def test_string_feature_warns(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["instruction.text"] = {"dtype": "string", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_dtypes(root) + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "instruction.text" in warnings[0].message + assert "string" in warnings[0].message.lower() + + def test_multiple_string_features_single_warning(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + info = _minimal_info() + info["features"]["instruction.text"] = {"dtype": "string", "shape": [1]} + info["features"]["observation.meta.tool"] = {"dtype": "string", "shape": [1]} + _write_info(root, info) + + issues = validate_feature_dtypes(root) + warnings = _warnings(issues) + assert len(warnings) == 1 + assert "instruction.text" in warnings[0].message + assert "observation.meta.tool" in warnings[0].message + + def test_no_info_returns_empty(self): + with tempfile.TemporaryDirectory() as tmpdir: + root = _make_dataset(tmpdir) + issues = validate_feature_dtypes(root) + assert len(issues) == 0 diff --git a/tests/test_v3_metadata_checker.py b/tests/test_v3_metadata_checker.py new file mode 100644 index 0000000..317ebf0 --- /dev/null +++ b/tests/test_v3_metadata_checker.py @@ -0,0 +1,484 @@ +"""Tests for LeRobot v3 metadata checker.""" + +import json +import tempfile +from pathlib 
import Path
from typing import Any, Dict

import pandas as pd

from lerobot_validator.v3_metadata_checker import LerobotV3MetadataChecker


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_dataset(tmpdir: str) -> Path:
    # Standard v3 layout skeleton: meta/ and data/ under a dataset root.
    root = Path(tmpdir) / "dataset"
    root.mkdir()
    (root / "meta").mkdir()
    (root / "data").mkdir()
    return root


def _write_info(root: Path, info: Dict[str, Any]) -> None:
    # Serialize the given mapping as meta/info.json.
    with open(root / "meta" / "info.json", "w") as f:
        json.dump(info, f)


def _minimal_info() -> Dict[str, Any]:
    # Smallest info.json the checker accepts: one video feature, one
    # vector feature, and v3-style chunked path templates.
    return {
        "fps": 30,
        "chunks_size": 1000,
        "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquet",
        "video_path": "videos/{video_key}/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.mp4",
        "features": {
            "observation.images.top": {
                "dtype": "video",
                "shape": [480, 640, 3],
            },
            "action": {
                "dtype": "float32",
                "shape": [7],
            },
        },
    }


def _write_tasks_parquet(root: Path) -> None:
    # Single-task meta/tasks.parquet (the v3 replacement for tasks.jsonl).
    pd.DataFrame({"task_index": [0], "task": ["default"]}).to_parquet(
        root / "meta" / "tasks.parquet", index=False
    )


def _write_episodes_parquet(root: Path) -> None:
    # Two-episode meta/episodes.parquet with the required data/* columns
    # (deliberately without any videos/* metadata columns).
    pd.DataFrame(
        {
            "episode_index": [0, 1],
            "data/chunk_index": [0, 0],
            "data/file_index": [0, 1],
            "tasks": [["default"], ["default"]],
        }
    ).to_parquet(root / "meta" / "episodes.parquet", index=False)


# ---------------------------------------------------------------------------
# Check 1: tasks.parquet
# ---------------------------------------------------------------------------


def test_tasks_parquet_present_passes():
    # A dataset shipping meta/tasks.parquet raises no tasks.parquet error.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        _write_tasks_parquet(root)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any("tasks.parquet" in e for e in checker.get_errors())
def test_tasks_jsonl_only_fails():
    # Legacy v2-style tasks.jsonl without tasks.parquet must be rejected,
    # and the error message must point at the offending tasks.jsonl.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        (root / "meta" / "tasks.jsonl").write_text(
            '{"task_index": 0, "task": "pick"}\n'
        )

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any("tasks.jsonl" in e for e in checker.get_errors())


def test_no_tasks_file_fails():
    # Neither tasks file present -> the error names the missing tasks.parquet.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any("tasks.parquet" in e for e in checker.get_errors())


# ---------------------------------------------------------------------------
# Check 2: episodes parquet columns
# ---------------------------------------------------------------------------


def test_episodes_parquet_missing_fails():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any("episodes.parquet" in e for e in checker.get_errors())


def test_episodes_parquet_missing_column_fails():
    # Dropping a required column ("tasks") must surface a "missing" error
    # that names the absent column.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        pd.DataFrame(
            {
                "episode_index": [0],
                "data/chunk_index": [0],
                "data/file_index": [0],
                # missing "tasks" column
            }
        ).to_parquet(root / "meta" / "episodes.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "tasks" in e and "missing" in e.lower()
            for e in checker.get_errors()
        )


def test_episodes_parquet_all_columns_passes():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        _write_tasks_parquet(root)
        _write_episodes_parquet(root)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not
any("episodes.parquet" in e for e in checker.get_errors())


# ---------------------------------------------------------------------------
# Check 3: feature shapes
# ---------------------------------------------------------------------------


def test_empty_feature_shape_fails():
    # shape=[] is ambiguous in v3 metadata and must be reported.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        info = _minimal_info()
        info["features"]["action"]["shape"] = []
        _write_info(root, info)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any("empty shape" in e for e in checker.get_errors())


def test_valid_scalar_shape_passes():
    # A one-element shape is the canonical way to declare a scalar feature.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        info = _minimal_info()
        info["features"]["action"]["shape"] = [1]
        _write_info(root, info)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any("empty shape" in e for e in checker.get_errors())


# ---------------------------------------------------------------------------
# Check 4: path templates
# ---------------------------------------------------------------------------


def test_data_path_missing_placeholder_fails():
    # data_path without {episode_chunk} must be flagged, and the error
    # should name both the placeholder and the offending key.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        info = _minimal_info()
        # missing {episode_chunk}
        info["data_path"] = "data/chunk-000/episode_{episode_index:06d}.parquet"
        _write_info(root, info)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "episode_chunk" in e and "data_path" in e
            for e in checker.get_errors()
        )


def test_video_path_missing_video_key_fails():
    # video_path without {video_key} cannot address per-camera videos.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        info = _minimal_info()
        info["video_path"] = (
            "videos/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.mp4"
        )
        _write_info(root, info)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "video_key" in e and "video_path" in e
            for e in
checker.get_errors()
        )


def test_valid_path_templates_pass():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any("template" in e for e in checker.get_errors())


# ---------------------------------------------------------------------------
# Check 5: video files exist
# ---------------------------------------------------------------------------


def test_missing_video_files_fails():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        _write_tasks_parquet(root)
        _write_episodes_parquet(root)
        # no actual video files

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any("Missing video files" in e for e in checker.get_errors())


def test_present_video_files_passes():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        _write_tasks_parquet(root)
        _write_episodes_parquet(root)

        # One placeholder file per episode satisfies the existence check;
        # the check apparently does not decode video content here.
        for ep_idx in [0, 1]:
            vdir = (
                root
                / "videos"
                / "observation.images.top"
                / "chunk-000"
            )
            vdir.mkdir(parents=True, exist_ok=True)
            (vdir / f"episode_{ep_idx:06d}.mp4").write_bytes(b"\x00")

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any(
            "Missing video files" in e for e in checker.get_errors()
        )


# ---------------------------------------------------------------------------
# Check 6: timestamp consistency
# ---------------------------------------------------------------------------


def test_relative_timestamps_consistent_passes():
    # Small relative timestamps (seconds from episode start) are valid.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        pd.DataFrame(
            {
                "episode_index": [0, 0, 0],
                "timestamp": [0.0, 0.033, 0.066],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any(
            "Timestamp inconsistency" in e for e in checker.get_errors()
        )


def test_mixed_timestamp_modes_fails():
    # A dataset must not mix relative timestamps with absolute Unix-epoch
    # ones across episodes; the checker flags the inconsistency.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        # relative
        pd.DataFrame(
            {
                "episode_index": [0, 0],
                "timestamp": [0.0, 0.033],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)
        # absolute (Unix epoch)
        pd.DataFrame(
            {
                "episode_index": [1, 1],
                "timestamp": [1_700_000_000.0, 1_700_000_000.033],
            }
        ).to_parquet(chunk_dir / "episode_000001.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "Timestamp inconsistency" in e for e in checker.get_errors()
        )


# ---------------------------------------------------------------------------
# Check 7: episode contiguity
# ---------------------------------------------------------------------------


def test_contiguous_episodes_passes():
    # Rows of each episode form one contiguous run within the parquet.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        pd.DataFrame(
            {
                "episode_index": [0, 0, 0, 1, 1, 1],
                "timestamp": [0.0, 0.033, 0.066, 0.0, 0.033, 0.066],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any(
            "Non-contiguous" in e for e in checker.get_errors()
        )


def test_non_contiguous_episodes_fails():
    # Episode 0 rows are interrupted by an episode 1 row below.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        pd.DataFrame(
            {
                "episode_index": [0, 1, 0],  # 
non-contiguous
                "timestamp": [0.0, 0.0, 0.033],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "Non-contiguous" in e for e in checker.get_errors()
        )


# ---------------------------------------------------------------------------
# Check 8: no video struct columns in data parquet
# ---------------------------------------------------------------------------


def test_data_parquet_without_video_columns_passes():
    # A clean data parquet holds only feature vectors + bookkeeping
    # columns; video frames live in the videos/ tree, not here.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        pd.DataFrame(
            {
                "action": [[0.1] * 7],
                "episode_index": [0],
                "timestamp": [0.0],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any(
            "video feature columns" in e for e in checker.get_errors()
        )


def test_data_parquet_with_video_struct_columns_fails():
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        chunk_dir = root / "data" / "chunk-000"
        chunk_dir.mkdir(parents=True)
        # Simulate a data parquet that erroneously includes the video
        # feature key as a struct column (path + timestamp).
        pd.DataFrame(
            {
                "action": [[0.1] * 7],
                "observation.images.top": [
                    {"path": "videos/top/chunk-000/ep_000000.mp4", "timestamp": 0.0}
                ],
                "episode_index": [0],
                "timestamp": [0.0],
            }
        ).to_parquet(chunk_dir / "episode_000000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "video feature columns" in e for e in checker.get_errors()
        )


# ---------------------------------------------------------------------------
# Check 9: episode video metadata columns
# ---------------------------------------------------------------------------


def test_episode_parquet_with_video_metadata_passes():
    # episodes.parquet carrying videos/{key}/chunk_index and
    # videos/{key}/from_timestamp columns satisfies the check.
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        pd.DataFrame(
            {
                "episode_index": [0, 1],
                "data/chunk_index": [0, 0],
                "data/file_index": [0, 1],
                "tasks": [["default"], ["default"]],
                "videos/observation.images.top/chunk_index": [0, 0],
                "videos/observation.images.top/from_timestamp": [0.0, 0.0],
            }
        ).to_parquet(root / "meta" / "episodes.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert not any(
            "video metadata columns" in e for e in checker.get_errors()
        )


def test_episode_parquet_missing_video_metadata_fails():
    """Episode parquet without videos/{key}/chunk_index and from_timestamp."""
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        _write_episodes_parquet(root)  # Has data/* cols but no videos/* cols

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "video metadata columns" in e for e in checker.get_errors()
        )


def test_episode_parquet_chunked_dir_missing_video_metadata_fails():
    """Same check works for chunked episodes/ directory layout."""
    with tempfile.TemporaryDirectory() as tmpdir:
        root = _make_dataset(tmpdir)
        _write_info(root, _minimal_info())
        # Chunked layout: meta/episodes/chunk-XXX/file-YYY.parquet instead
        # of a single meta/episodes.parquet.
        ep_dir = root / "meta" / "episodes" / "chunk-000"
        ep_dir.mkdir(parents=True)
        pd.DataFrame(
            {
                "episode_index": [0],
                "data/chunk_index": [0],
                "data/file_index": [0],
                "tasks": [["default"]],
            }
        ).to_parquet(ep_dir / "file-000.parquet", index=False)

        checker = LerobotV3MetadataChecker(root)
        checker.validate()
        assert any(
            "video metadata columns" in e for e in checker.get_errors()
        )