diff --git a/packages/overture-schema-cli/pyproject.toml b/packages/overture-schema-cli/pyproject.toml index 62ed4c3e3..c62e214a5 100644 --- a/packages/overture-schema-cli/pyproject.toml +++ b/packages/overture-schema-cli/pyproject.toml @@ -2,10 +2,8 @@ dependencies = [ "overture-schema-core", "pydantic>=2.0", - "pyyaml>=6.0.2", "click>=8.0", "rich>=13.0", - "yamlcore>=0.0.4", ] description = "Command-line interface for Overture Maps schema validation and JSON Schema generation" dynamic = ["version"] diff --git a/packages/overture-schema-cli/src/overture/schema/cli/__init__.py b/packages/overture-schema-cli/src/overture/schema/cli/__init__.py index 85045f0c0..6e8250bff 100644 --- a/packages/overture-schema-cli/src/overture/schema/cli/__init__.py +++ b/packages/overture-schema-cli/src/overture/schema/cli/__init__.py @@ -1,31 +1,5 @@ """CLI subpackage for overture-schema.""" -from .commands import ( - cli, - create_union_type_from_models, - handle_generic_error, - handle_validation_error, - load_input, - perform_validation, - resolve_types, -) -from .types import ( - ErrorLocation, - ModelDict, - UnionType, - ValidationErrorDict, -) +from .commands import cli -__all__ = [ - "cli", - "create_union_type_from_models", - "handle_generic_error", - "handle_validation_error", - "load_input", - "perform_validation", - "resolve_types", - "ErrorLocation", - "ModelDict", - "UnionType", - "ValidationErrorDict", -] +__all__ = ["cli"] diff --git a/packages/overture-schema-cli/src/overture/schema/cli/commands.py b/packages/overture-schema-cli/src/overture/schema/cli/commands.py index 8fdd8bdf4..d3f6e4b7a 100644 --- a/packages/overture-schema-cli/src/overture/schema/cli/commands.py +++ b/packages/overture-schema-cli/src/overture/schema/cli/commands.py @@ -1,266 +1,24 @@ """Click-based CLI for overture-schema package.""" -import builtins import json import sys -from collections import Counter, defaultdict -from functools import reduce -from operator import or_ -from pathlib import Path -from typing import Annotated, Any, Literal, cast, get_args, get_origin +from collections import defaultdict +from importlib.metadata import entry_points import click -import yaml -from pydantic import BaseModel, Field, Tag, TypeAdapter, ValidationError +from pydantic import BaseModel from rich.console import Console -from yamlcore import CoreLoader # type: ignore -from overture.schema.core import OvertureFeature -from overture.schema.core.discovery import ModelKey, discover_models -from overture.schema.system.feature import Feature +from overture.schema.core.discovery import ModelKey, discover_models, resolve_types from overture.schema.system.json_schema import json_schema from .docstrings import get_model_docstring, get_theme_module_docstring -from .error_formatting import ( - format_validation_error, - format_validation_errors_verbose, - group_errors_by_discriminator, - select_most_likely_errors, -) from .output import rewrap -from .type_analysis import StructuralTuple, get_item_index, introspect_union -from .types import ErrorLocation, ModelDict, UnionType -# Console instances for rich output stdout = Console(highlight=False) stderr = Console(highlight=False, file=sys.stderr) -def _is_geojson_feature(data: dict) -> bool: - """Check if data is in GeoJSON Feature format.""" - return data.get("type") == "Feature" and "properties" in data - - -def _can_discriminate(model_class: object) -> bool: - """Check if a model can participate in a discriminated union. - - Returns True if the model is an OvertureFeature with a single literal 'type' value. - """ - if not (isinstance(model_class, type) and issubclass(model_class, OvertureFeature)): - return False - - return _type_literal(cast(type[OvertureFeature], model_class)) is not None - - -def _type_literal(feature_class: type[OvertureFeature]) -> str | None: - """Extract the literal value from an OvertureFeature's 'type' field. - - Returns the literal type value, or None if not a single literal. - """ - if "type" not in feature_class.model_fields: - return None - - type_annotation = feature_class.model_fields["type"].annotation - - # Unwrap Annotated if present - while get_origin(type_annotation) is Annotated: - type_annotation = get_args(type_annotation)[0] - - # Check if it's a Literal with a single value - if get_origin(type_annotation) is Literal: - args = get_args(type_annotation) - if len(args) == 1 and isinstance(args[0], str): - return args[0] - - return None - - -def _discriminated_union(feature_classes: tuple[type[OvertureFeature], ...]) -> Any: # noqa: ANN401 - """Create a discriminated union of Overture features on the 'type' field.""" - if not feature_classes: - return None - elif len(feature_classes) == 1: - # Single model doesn't need a discriminated union - return feature_classes[0] - - return Annotated[ - reduce( - or_, - (Annotated[f, Tag(cast(str, _type_literal(f)))] for f in feature_classes), - ), - Field(discriminator=Feature.field_discriminator("type", *feature_classes)), - ] - - -def create_union_type_from_models( - models: ModelDict, -) -> UnionType: - """Create a union type from a dict of models. - - Uses discriminated unions for OvertureFeatures when possible for better performance. - - Args - ---- - models: Dict mapping ModelKey to Pydantic model classes - - Returns - ------- - Union type suitable for TypeAdapter - """ - if not models: - raise ValueError("No models provided") - - model_list = list(models.values()) - - # Separate models that can be discriminated from those that cannot - discriminated_models = tuple( - cast(type[OvertureFeature], m) for m in model_list if _can_discriminate(m) - ) - discriminated_union = _discriminated_union(discriminated_models) - - non_discriminated_models = [m for m in model_list if not _can_discriminate(m)] - # Use None only if list is empty, otherwise build union - non_discriminated_union = ( - reduce(or_, non_discriminated_models) if non_discriminated_models else None - ) - - # Combine discriminated and non-discriminated unions - if discriminated_union and non_discriminated_union: - return discriminated_union | non_discriminated_union - elif discriminated_union: - return discriminated_union - elif non_discriminated_union: - return non_discriminated_union - else: - raise RuntimeError("No valid models found") - - -def validate_feature(data: dict, model_type: UnionType) -> BaseModel: - """Validate a single feature against the model type. - - Args - ---- - data: Feature data to validate (GeoJSON or flat format) - model_type: Union type for validation - - Returns - ------- - Validated model instance - - Raises - ------ - ValidationError: If validation fails - """ - adapter = TypeAdapter(model_type) - if isinstance(data, dict) and _is_geojson_feature(data): - # Use validate_json to trigger the model's GeoJSON handling - return cast(BaseModel, adapter.validate_json(json.dumps(data))) - return cast(BaseModel, adapter.validate_python(data)) - - -def validate_features(data: list, model_type: UnionType) -> list[BaseModel]: - """Validate a list of features against the model type. - - Args - ---- - data: List of feature data to validate (GeoJSON or flat format) - model_type: Union type for validation - - Returns - ------- - List of validated model instances - - Raises - ------ - ValidationError: If validation fails - """ - # Check if any items are GeoJSON features - has_geojson = any( - isinstance(item, dict) and _is_geojson_feature(item) for item in data - ) - - list_type = list[model_type] # type: ignore[misc,valid-type] - adapter = TypeAdapter(list_type) - - if has_geojson: - # Use validate_json to trigger the model's GeoJSON handling - return cast(list[BaseModel], adapter.validate_json(json.dumps(data))) - return cast(list[BaseModel], adapter.validate_python(data)) - - -def resolve_types( - use_overture_types: bool, - namespace: str | None, - theme_names: tuple[str, ...], - type_names: tuple[str, ...], -) -> UnionType: - """Resolve CLI options into a model type suitable for parse_feature. - - Args - ---- - use_overture_types: Boolean from --overture-types flag - namespace: Namespace to filter by (e.g., "overture", "annex") - theme_names: List of theme names from --theme option - type_names: List of type names from --type option - - Returns - ------- - Model type suitable for passing to parse_feature - """ - # Determine effective namespace - effective_namespace = "overture" if use_overture_types else namespace - - # Discover models once with the appropriate namespace - all_models = discover_models(namespace=effective_namespace) - - # Filter models based on CLI options - filtered_models: ModelDict = {} - - if use_overture_types: - filtered_models = all_models - - elif theme_names and not type_names: - # Theme-only mode: all types in specified themes - for key, model_class in all_models.items(): - if key.theme in theme_names: - filtered_models[key] = model_class - - elif type_names and not theme_names: - # Type-only mode: find matching types across all themes - for key, model_class in all_models.items(): - if key.type in type_names: - filtered_models[key] = model_class - - elif type_names and theme_names: - # Both specified: find matching types within specified themes - for key, model_class in all_models.items(): - if key.theme in theme_names and key.type in type_names: - filtered_models[key] = model_class - - else: - # No filters specified - use all models - filtered_models = all_models - - if not filtered_models: - raise ValueError("No models found matching the specified criteria") - - return create_union_type_from_models(filtered_models) - - -def get_source_name(filename: Path) -> str: - """Get display name for input source. - - Args - ---- - filename: Path to input file or "-" for stdin - - Returns - ------- - Display name: "" for stdin input, otherwise the filename - """ - return "" if str(filename) == "-" else str(filename) - - @click.group() @click.version_option(package_name="overture-schema") def cli() -> None: @@ -288,428 +46,22 @@ def cli() -> None: pass -def load_input(filename: Path) -> tuple[dict | list, str]: - """Load and parse input from file or stdin. - - Args - ---- - filename: Path to input file, or "-" for stdin - - Returns - ------- - Tuple of (parsed_data, source_name) - - Raises - ------ - yaml.YAMLError: If input is invalid YAML/JSON - SystemExit: If filename doesn't exist or isn't a file - """ - if str(filename) == "-": - # Read all stdin content - content = sys.stdin.read() - - # Try to detect JSONL format (newline-delimited JSON) - # JSONL has multiple non-empty lines, each containing a complete JSON object - lines = [line.strip() for line in content.strip().split("\n") if line.strip()] - - if len(lines) > 1: - # Attempt to parse as JSONL - try: - parsed_lines = [json.loads(line) for line in lines] - return parsed_lines, "" - except json.JSONDecodeError: - # Not valid JSONL, fall through to YAML parser - pass - - # Parse as single YAML/JSON document - import io - - data = yaml.load(io.StringIO(content), Loader=CoreLoader) - return data, "" - - if not filename.is_file(): - raise click.UsageError(f"'{filename}' is not a file.") - - # Warn about unexpected file extensions - if filename.suffix not in {".json", ".yaml", ".yml", ".geojson"}: - click.echo( - f"Warning: File '{filename}' has unexpected extension. " - f"Expecting .json, .yaml, .yml, or .geojson", - err=True, - ) - - # Use YAML-1.2-compliant loader (YAML-1.2 dropped support for yes/no boolean values) - with filename.open("r", encoding="utf-8") as f: - data = yaml.load(f, Loader=CoreLoader) - - return data, str(filename) - - -def perform_validation(data: dict | list, model_type: UnionType) -> None: - """Validate data based on its structure. - - Automatically detects and handles three input formats: - - Single feature (dict) - - List of features (list) - - GeoJSON FeatureCollection (dict with type="FeatureCollection") - - Args - ---- - data : dict | list - Parsed data to validate - model_type : UnionType - Union type for validation - - Raises - ------ - ValidationError - If validation fails - """ - if isinstance(data, list): - # List of features - validate_features(data, model_type) - elif isinstance(data, dict) and data.get("type") == "FeatureCollection": - # GeoJSON FeatureCollection - validate_features(data["features"], model_type) - else: - # Single feature - validate_feature(data, model_type) - - -def compute_collection_statistics( - item_types: dict[int, builtins.type[BaseModel] | None], - filtered_errors: list, -) -> tuple[ - int, - Counter[builtins.type[BaseModel] | None], - dict[builtins.type[BaseModel], set[int]], -]: - """Compute validation statistics for heterogeneous collections. - - Args - ---- - item_types : dict[int, type[BaseModel] | None] - Mapping from item index to detected model type - filtered_errors : list - List of filtered validation errors - - Returns - ------- - tuple - Tuple of (items_without_errors, type_counts, items_with_errors_by_type) - """ - # Compute statistics: group items by type - type_counts: Counter[builtins.type[BaseModel] | None] = Counter(item_types.values()) - - # Determine total number of items (max index + 1, or count from data) - max_index = max(item_types.keys()) if item_types else -1 - total_items = max_index + 1 - - # Count items with errors per type - items_with_errors_by_type: dict[builtins.type[BaseModel], set[int]] = {} - for err in filtered_errors: - idx = get_item_index(err["loc"]) - if idx is not None and idx in item_types: - model_type_cls = item_types[idx] - if model_type_cls is not None: - if model_type_cls not in items_with_errors_by_type: - items_with_errors_by_type[model_type_cls] = set() - items_with_errors_by_type[model_type_cls].add(idx) - - # Count items without any errors - items_without_errors = total_items - len( - { - idx - for idx in item_types.keys() - if any(get_item_index(err["loc"]) == idx for err in filtered_errors) - } - ) - - return items_without_errors, type_counts, items_with_errors_by_type - - -def print_collection_statistics( - items_without_errors: int, - type_counts: Counter[builtins.type[BaseModel] | None], - items_with_errors_by_type: dict[builtins.type[BaseModel], set[int]], - stderr: Console, -) -> None: - """Print validation statistics for heterogeneous collections. +def load_plugins(group: click.Group) -> None: + """Load plugin subcommands from entry points. - Args - ---- - items_without_errors : int - Count of items with no validation errors - type_counts : Counter[type[BaseModel] | None] - Counter of items by model type - items_with_errors_by_type : dict[type[BaseModel], set[int]] - Mapping from model type to set of item indices with errors - stderr : Console - Console for stderr output + Iterates the ``overture.schema.cli`` entry point group. Each entry point + should resolve to a ``click.Command`` or ``click.Group``. Broken plugins + emit a warning to stderr and are skipped. Names that collide with + already-registered commands are skipped. """ - stderr.print(" [dim]Collection statistics:[/dim]") - - # Show items without errors first - # TODO: Once we switch to parse_features (instead of validate_features), - # we can include type information for items without errors by parsing - # the input and tracking which items validated successfully and their types. - # This would allow output like: "Building: 2 confirmed (no errors)" - if items_without_errors > 0: - stderr.print( - f" • {items_without_errors} item{'s' if items_without_errors != 1 else ''} with no errors", - style="dim", - ) - - # Show per-type statistics - for model_type_cls, count in type_counts.most_common(): - if model_type_cls is not None: - items_with_errors = len( - items_with_errors_by_type.get(model_type_cls, set()) - ) - valid_count = count - items_with_errors - - if valid_count > 0: - stderr.print( - f" • {model_type_cls.__name__}: {valid_count} confirmed, {items_with_errors} with errors", - style="dim", - ) - else: - stderr.print( - f" • {model_type_cls.__name__} (probable): {items_with_errors} item{'s' if items_with_errors != 1 else ''} with errors", - style="dim", - ) - stderr.print() - - -def handle_validation_error( - e: ValidationError, - model_type: UnionType, - stderr: Console, - original_data: dict | list | None = None, - show_fields: list[str] | None = None, -) -> None: - """Handle and format validation errors with rich contextual information. - - Groups errors by discriminator, selects most likely error groups, and provides - helpful diagnostics for heterogeneous collections and ambiguous types. - - Args - ---- - e : ValidationError - ValidationError from pydantic - model_type : UnionType - Union type used for validation - stderr : Console - Console for stderr output - original_data : dict | list | None - Original input data for error display - show_fields : list[str] | None - List of field names to display alongside errors - """ - # Compute metadata once upfront - metadata = introspect_union(model_type) - - # Create cache for structural tuple computation (optimizes systematic errors) - structural_cache: dict[ErrorLocation, StructuralTuple] = {} - - # Group errors by discriminator path and select most likely group(s) - error_groups = group_errors_by_discriminator(e.errors(), metadata, structural_cache) - filtered_errors, is_tied, is_heterogeneous, item_types = select_most_likely_errors( - error_groups, - metadata=metadata, - all_errors=e.errors(), - structural_cache=structural_cache, - ) - - # Show heterogeneity warning if collection has mixed types - if is_heterogeneous: - stderr.print( - " ⚠ Heterogeneous collection: Data contains multiple feature types.", - style="yellow", - ) - stderr.print( - " • Consider validating each type separately with --theme or --type", - style="dim", - ) - stderr.print() - - # Compute and display statistics if there are errors to report - if filtered_errors: - items_without_errors, type_counts, items_with_errors_by_type = ( - compute_collection_statistics(item_types, filtered_errors) - ) - print_collection_statistics( - items_without_errors, type_counts, items_with_errors_by_type, stderr - ) - - # Show tie indicator if multiple groups had same error count - elif is_tied: - stderr.print( - " ⚠ Ambiguous: Data matches multiple types equally. Consider:", - style="yellow", - ) - stderr.print( - " • Specifying --theme or --type to narrow validation", style="dim" - ) - stderr.print(" • Adding discriminator fields to clarify intent", style="dim") - stderr.print() - - # Group errors by item - - errors_by_item: dict[int | None, list] = defaultdict(list) - for error in filtered_errors: - item_idx = get_item_index(error["loc"]) - errors_by_item[item_idx].append(error) - - # Display errors grouped by item - - for item_idx, item_errors in errors_by_item.items(): - # Determine item type - error_item_type = None - if item_idx is not None and item_idx in item_types: - error_item_type = item_types.get(item_idx) - - # Try verbose display first - displayed = format_validation_errors_verbose( - item_errors, - stderr, - metadata=metadata, - item_type=error_item_type, - structural_cache=structural_cache, - original_data=original_data, - item_index=item_idx, - show_fields=show_fields, - ) - - # Fall back to non-verbose format if verbose couldn't display - if not displayed: - for i, error in enumerate(item_errors): - format_validation_error( - error, - stderr, - metadata=metadata, - show_model_hint=(i == 0), - item_type=error_item_type, - show_item_type=is_heterogeneous, - structural_cache=structural_cache, - original_data=original_data, - show_feature_data=False, - ) - - -def handle_generic_error(e: Exception, filename: Path, error_type: str) -> None: - """Handle generic errors during validation. - - Args - ---- - e : Exception - Exception that occurred - filename : Path - Input filename or "-" for stdin - error_type : str - Type of error for user-friendly message - - Raises - ------ - click.UsageError - Always, with formatted error message - """ - source_name = get_source_name(filename) - - if error_type == "yaml": - raise click.UsageError(f"'{source_name}' contains invalid input: {e}") - elif error_type == "value": - raise click.UsageError(str(e)) - elif error_type == "key": - raise click.UsageError(f"Invalid data structure - missing key: {e}") - else: - raise click.UsageError(f"Error processing {source_name}: {e}") - - -@cli.command() -@click.argument("filename", type=click.Path(path_type=Path), required=True) -@click.option( - "--overture-types", - is_flag=True, - help="Validate against all official Overture types (excludes extensions)", -) -@click.option( - "--namespace", - help="Namespace to filter by (e.g., overture, annex)", -) -@click.option( - "--theme", - multiple=True, - help="Theme to validate against (shorthand for all types in theme)", -) -@click.option( - "--type", - "types", - multiple=True, - help="Specific type to validate against (e.g., building, segment)", -) -@click.option( - "--show-field", - "show_fields", - multiple=True, - help="Field to display alongside errors (e.g., id, version). Can be repeated.", -) -def validate( - filename: Path, - overture_types: bool, - namespace: str | None, - theme: tuple[str, ...], - types: tuple[str, ...], - show_fields: tuple[str, ...], -) -> None: - r"""Validate Overture Maps data against schemas. - - Read from FILENAME or stdin if FILENAME is '-'. - Supports JSON, YAML, and GeoJSON formats. - - \b - Examples: - # Validate a file - $ overture-schema validate data.json - \b - # Validate from stdin - $ overture-schema validate - < data.json - \b - # Validate only buildings - $ overture-schema validate --theme buildings data.json - \b - # Validate specific type - $ overture-schema validate --type building data.json - \b - # Official Overture types only - $ overture-schema validate --overture-types data.json - """ - # Resolve model type first (errors here are ValueErrors, not ValidationErrors) - try: - model_type = resolve_types(overture_types, namespace, theme, types) - except ValueError as e: - handle_generic_error(e, filename, "value") - return - - # Load input (errors here are YAMLErrors or ValueErrors, not ValidationErrors) - try: - data, source_name = load_input(filename) - except yaml.YAMLError as e: - handle_generic_error(e, filename, "yaml") - return - except KeyError as e: - handle_generic_error(e, filename, "key") - return - - # Perform validation (now model_type and data are guaranteed to be defined) - try: - perform_validation(data, model_type) - stdout.print(f"✓ Successfully validated {source_name}") - except ValidationError as e: - handle_validation_error( - e, model_type, stderr, original_data=data, show_fields=list(show_fields) - ) - sys.exit(1) + for ep in entry_points(group="overture.schema.cli"): + if ep.name in group.commands: + continue + try: + cmd = ep.load() + group.add_command(cmd, ep.name) + except Exception as e: + click.echo(f"Warning: failed to load plugin '{ep.name}': {e}", err=True) @cli.command("json-schema") @@ -759,7 +111,8 @@ def json_schema_command( $ overture-schema json-schema --overture-types """ try: - model_type = resolve_types(overture_types, namespace, theme, types) + effective_namespace = "overture" if overture_types else namespace + model_type = resolve_types(effective_namespace, theme, types) schema = json_schema(model_type) # Use plain print for JSON output to avoid Rich formatting print(json.dumps(schema, indent=2, sort_keys=True)) @@ -794,7 +147,6 @@ def dump_namespace( stdout.print() - # Add types to the tree sorted_types = sorted(theme_types[theme], key=lambda x: x[0].type) for key, model_class in sorted_types: stdout.print( @@ -820,40 +172,37 @@ def list_types() -> None: # List all types $ overture-schema list-types """ - try: - models = discover_models() - - # Group models by namespace and theme - namespaces: dict[ - str, dict[str | None, list[tuple[ModelKey, type[BaseModel]]]] - ] = {} - for key, model_class in models.items(): - if key.namespace not in namespaces: - namespaces[key.namespace] = {} - if key.theme not in namespaces[key.namespace]: - namespaces[key.namespace][key.theme] = [] - - namespaces[key.namespace][key.theme].append((key, model_class)) - - # display Overture themes first - if "overture" in namespaces: - stdout.print("[bold red]OVERTURE THEMES[/bold red]", justify="center") - stdout.print() + models = discover_models() - dump_namespace(namespaces["overture"]) + # Group models by namespace and theme + namespaces: dict[str, dict[str | None, list[tuple[ModelKey, type[BaseModel]]]]] = ( + defaultdict(lambda: defaultdict(list)) + ) + for key, model_class in models.items(): + namespaces[key.namespace][key.theme].append((key, model_class)) - stdout.print("[bold red]ADDITIONAL TYPES[/bold red]", justify="center") - stdout.print() + # Display Overture themes first + if "overture" in namespaces: + stdout.print("[bold red]OVERTURE THEMES[/bold red]", justify="center") + stdout.print() + + dump_namespace(namespaces["overture"]) + + stdout.print("[bold red]ADDITIONAL TYPES[/bold red]", justify="center") + stdout.print() + + for namespace in sorted(namespaces.keys()): + if namespace == "overture": + continue - for namespace in sorted(namespaces.keys()): - if namespace == "overture": - continue + stdout.print(f"[bold blue]{namespace.upper()}[/bold blue]") + dump_namespace(namespaces[namespace]) - stdout.print(f"[bold blue]{namespace.upper()}[/bold blue]") - dump_namespace(namespaces[namespace]) - except Exception as e: - click.echo(f"Error listing types: {e}", err=True) +# Load plugin subcommands from entry points. +# Built-in commands are already registered via @cli.command() decorators above, +# so load_plugins skips names that collide with built-ins. +load_plugins(cli) if __name__ == "__main__": diff --git a/packages/overture-schema-cli/src/overture/schema/cli/types.py b/packages/overture-schema-cli/src/overture/schema/cli/types.py deleted file mode 100644 index 1b5d4e44d..000000000 --- a/packages/overture-schema-cli/src/overture/schema/cli/types.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Type aliases for CLI module.""" - -from typing import Any, TypeAlias - -from pydantic import BaseModel -from pydantic_core import ErrorDetails - -from overture.schema.core.discovery import ModelKey - -# Type alias for union types created from Pydantic models -# This represents either a single model or a discriminated union of models -UnionType: TypeAlias = type[BaseModel] | Any - -# Dictionary mapping ModelKey to Pydantic model classes -ModelDict: TypeAlias = dict[ModelKey, type[BaseModel]] - -# Pydantic validation error dictionary structure -# In Pydantic v2, ValidationError.errors() returns list[ErrorDetails] -ValidationErrorDict: TypeAlias = ErrorDetails - -# Error location tuple (mix of field names and list indices) -ErrorLocation: TypeAlias = tuple[str | int, ...] diff --git a/packages/overture-schema-cli/tests/conftest.py b/packages/overture-schema-cli/tests/conftest.py index 6702574c2..35430a70d 100644 --- a/packages/overture-schema-cli/tests/conftest.py +++ b/packages/overture-schema-cli/tests/conftest.py @@ -1,13 +1,9 @@ """Shared test fixtures for CLI tests.""" from collections.abc import Generator -from io import StringIO -from typing import Any -from unittest.mock import patch import pytest from click.testing import CliRunner -from rich.console import Console @pytest.fixture @@ -16,129 +12,3 @@ def cli_runner() -> Generator[CliRunner, None, None]: runner = CliRunner() with runner.isolated_filesystem(): yield runner - - -@pytest.fixture -def stderr_buffer() -> Generator[StringIO, None, None]: - """Provide a patched stderr buffer for capturing CLI error output.""" - buffer = StringIO() - captured_console = Console(file=buffer, force_terminal=False) - - with patch("overture.schema.cli.commands.stderr", captured_console): - yield buffer - - -@pytest.fixture -def building_feature_yaml_content() -> str: - """Return YAML content for a valid building feature.""" - return """ -id: test -type: Feature -geometry: - type: Polygon - coordinates: [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] -properties: - theme: buildings - type: building - version: 0 -""" - - -@pytest.fixture -def building_feature_yaml( - cli_runner: CliRunner, building_feature_yaml_content: str -) -> str: - """Create a test.yaml file with valid building feature in isolated filesystem.""" - filename = "test.yaml" - with open(filename, "w") as f: - f.write(building_feature_yaml_content) - return filename - - -@pytest.fixture -def missing_id_yaml_content() -> str: - """Return YAML content with missing required field.""" - return """ -type: Feature -geometry: - type: Polygon - coordinates: [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] -properties: - theme: buildings - type: building - version: 0 -""" - - -def build_feature( - id: str | None = "test", - theme: str | None = "buildings", - type: str = "building", - geometry_type: str = "Polygon", - coordinates: list | None = None, - version: int | None = 0, - geojson_format: bool = True, - **properties: Any, -) -> dict[str, Any]: - """Build a feature dictionary with the specified parameters. - - Args: - id: Feature ID (None to omit) - theme: Theme name (None to omit) - type: Feature type - geometry_type: Geometry type (Point, Polygon, etc.) - coordinates: Custom coordinates (None for sensible defaults) - version: Feature version (None to omit) - geojson_format: If True, use GeoJSON format; if False, use flat format - **properties: Additional properties to include - - Returns: - Feature dictionary in the requested format - """ - # Default coordinates based on geometry type - if coordinates is None: - if geometry_type == "Point": - coordinates = [0.0, 0.0] - elif geometry_type == "Polygon": - coordinates = [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] - elif geometry_type == "LineString": - coordinates = [[0, 0], [1, 1]] - elif geometry_type == "MultiPolygon": - coordinates = [[[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]] - else: - coordinates = [] - - geometry = {"type": geometry_type, "coordinates": coordinates} - - if geojson_format: - # GeoJSON format: properties nested under "properties" key - props: dict[str, Any] = { - "type": type, - **properties, - } - if theme is not None: - props["theme"] = theme - if version is not None: - props["version"] = version - - feature: dict[str, Any] = { - "type": "Feature", - "geometry": geometry, - "properties": props, - } - if id is not None: - feature["id"] = id - else: - # Flat format: properties at top level - # Build in the expected order: geometry, theme, type, version, id, properties - feature = {"geometry": geometry} - if theme is not None: - feature["theme"] = theme - feature["type"] = type - if version is not None: - feature["version"] = version - feature.update(properties) - if id is not None: - feature["id"] = id - - return feature diff --git a/packages/overture-schema-cli/tests/test_cli_commands.py b/packages/overture-schema-cli/tests/test_cli_commands.py index 7b6f3b42f..1202a2517 100644 --- a/packages/overture-schema-cli/tests/test_cli_commands.py +++ b/packages/overture-schema-cli/tests/test_cli_commands.py @@ -1,11 +1,8 @@ -"""Tests for CLI commands (validate, list-types, json-schema).""" +"""Tests for CLI commands (list-types, json-schema).""" import json -from io import StringIO -import pytest from click.testing import CliRunner -from conftest import build_feature from overture.schema.cli.commands import cli @@ -39,330 +36,3 @@ def test_json_schema_generates_valid_output(self, cli_runner: CliRunner) -> None # Should be valid JSON schema = json.loads(result.output) assert isinstance(schema, dict) - - -class TestValidateCommand: - """Tests for the validate command.""" - - def test_validate_success_message_from_file( - self, cli_runner: CliRunner, building_feature_yaml: str - ) -> None: - """Test that validation shows success message for valid file input.""" - result = cli_runner.invoke(cli, ["validate", building_feature_yaml]) - assert result.exit_code == 0 - assert "Successfully validated" in result.output - - def test_validate_flat_format_input(self, cli_runner: CliRunner) -> None: - """Test that validation works with flat (non-GeoJSON) format.""" - flat_feature = build_feature(geojson_format=False) - flat_json = json.dumps(flat_feature) - result = cli_runner.invoke( - cli, ["validate", "--theme", "buildings", "-"], input=flat_json - ) - assert result.exit_code == 0 - assert "Successfully validated " in result.output - - def test_validate_error_message_format( - self, - cli_runner: CliRunner, - missing_id_yaml_content: str, - stderr_buffer: StringIO, - ) -> None: - """Test that validation errors are formatted correctly.""" - result = cli_runner.invoke( - cli, ["validate", "-"], input=missing_id_yaml_content - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - assert "Validation Failed" in stderr_output - # Should show the field path - assert "id" in stderr_output.lower() - - def test_validate_error_filters_tagged_union_from_path( - self, - cli_runner: CliRunner, - missing_id_yaml_content: str, - stderr_buffer: StringIO, - ) -> None: - """Test that validation error paths don't show internal tagged-union markers.""" - result = cli_runner.invoke( - cli, ["validate", "-"], input=missing_id_yaml_content - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - - # Should NOT show Pydantic's internal union markers in the path - assert "tagged-union" not in stderr_output.lower() - assert "union[" not in stderr_output.lower() - - # Should show the actual field name - assert "id" in stderr_output.lower() - - def test_validate_error_with_invalid_type_value( - self, cli_runner: CliRunner - ) -> None: - """Test validation error for invalid type value.""" - invalid_feature = build_feature(type="invalid_type") - invalid_type_json = json.dumps(invalid_feature) - result = cli_runner.invoke(cli, ["validate", "-"], input=invalid_type_json) - assert result.exit_code == 1 - - def test_validate_error_with_nested_field(self, cli_runner: CliRunner) -> None: - """Test validation error message includes nested field path.""" - feature = build_feature( - names={ - "common": [ - {"value": "Test Building", "language": "invalid_language_code"} - ] - } - ) - nested_field_json = json.dumps(feature) - result = cli_runner.invoke(cli, ["validate", "-"], input=nested_field_json) - assert result.exit_code == 1 - - def test_validate_stdin_requires_dash_argument( - self, - cli_runner: CliRunner, - building_feature_yaml_content: str, - ) -> None: - """Test validating from stdin requires explicit '-' argument.""" - # With dash argument - should work - result = cli_runner.invoke( - cli, ["validate", "-"], input=building_feature_yaml_content - ) - assert result.exit_code == 0 - assert "Successfully validated " in result.output - - # Without dash argument - should show help/usage - result = cli_runner.invoke( - cli, ["validate"], input=building_feature_yaml_content - ) - assert result.exit_code == 2 # Usage error - assert "Missing argument" in result.output or "Usage:" in result.output - - @pytest.mark.parametrize( - "has_error,expected_exit_code,check_index", - [ - pytest.param(False, 0, False, id="success"), - pytest.param(True, 1, True, id="with_error"), - ], - ) - def test_validate_feature_list( - self, - cli_runner: CliRunner, - stderr_buffer: StringIO, - has_error: bool, - expected_exit_code: int, - check_index: bool, - ) -> None: - """Test validation of a list of features (success and error cases).""" - feature1 = build_feature(id="test1") - feature2_id = None if has_error else "test2" - feature2 = build_feature( - id=feature2_id, coordinates=[[[2, 2], [3, 2], [3, 3], [2, 3], [2, 2]]] - ) - feature_list_json = json.dumps([feature1, feature2]) - result = cli_runner.invoke(cli, ["validate", "-"], input=feature_list_json) - assert result.exit_code == expected_exit_code - - if check_index: - stderr_output = stderr_buffer.getvalue() - # Should show list index for the second feature - assert "[1]" in stderr_output or "1" in stderr_output - else: - assert "Successfully validated " in result.output - - @pytest.mark.parametrize( - "first_feature_valid,second_feature_valid,expected_exit_code", - [ - pytest.param(True, True, 0, id="both_valid"), - pytest.param(True, False, 1, id="second_invalid"), - pytest.param(False, False, 1, id="both_invalid"), - ], - ) - def test_validate_feature_collection( - self, - cli_runner: CliRunner, - stderr_buffer: StringIO, - first_feature_valid: bool, - second_feature_valid: bool, - expected_exit_code: int, - ) -> None: - """Test validation of a GeoJSON FeatureCollection with various validity states.""" - feature1 = build_feature(id="test1" if first_feature_valid else None) - feature2 = build_feature( - id="test2" if second_feature_valid else None, - coordinates=[[[2, 2], [3, 2], [3, 3], [2, 3], [2, 2]]], - ) - feature_collection = { - "type": "FeatureCollection", - "features": [feature1, feature2], - } - result = cli_runner.invoke( - cli, ["validate", "-"], input=json.dumps(feature_collection) - ) - assert result.exit_code == expected_exit_code - - if expected_exit_code == 0: - assert "Successfully validated " in result.output - else: - stderr_output = stderr_buffer.getvalue() - # Should show errors for list items - if not first_feature_valid or not second_feature_valid: - assert "[0]" in stderr_output or "[1]" in stderr_output - - def test_validate_with_nonexistent_filters_raises_error( - self, - cli_runner: CliRunner, - building_feature_yaml_content: str, - ) -> None: - """Test that validation with filters matching no models raises a clear error.""" - # Try to validate with a nonexistent theme - result = cli_runner.invoke( - cli, - ["validate", "--theme", "nonexistent_theme", "-"], - input=building_feature_yaml_content, - ) - # UsageError exits with code 2 - assert result.exit_code == 2 - assert "No models found matching the specified criteria" in result.output - - def test_validate_with_nonexistent_type_raises_error( - self, - cli_runner: CliRunner, - building_feature_yaml_content: str, - ) -> None: - """Test that validation with nonexistent type raises a clear error.""" - # Try to validate with a nonexistent type - result = cli_runner.invoke( - cli, - ["validate", "--type", "nonexistent_type", "-"], - input=building_feature_yaml_content, - ) - # UsageError exits with code 2 - assert result.exit_code == 2 - assert "No models found matching the specified criteria" in result.output - - def test_validate_with_valid_theme_invalid_type_raises_error( - self, - cli_runner: CliRunner, - building_feature_yaml_content: str, - ) -> None: - """Test that validation with valid theme but invalid type raises an error.""" - # Try to validate buildings theme with a type that doesn't exist in that theme - result = cli_runner.invoke( - cli, - ["validate", "--theme", "buildings", "--type", "segment", "-"], - input=building_feature_yaml_content, - ) - # UsageError exits with code 2 - assert result.exit_code == 2 - assert "No models found matching the specified criteria" in result.output - - -class TestShowFieldOption: - """Tests for the --show-field option in validate command.""" - - def test_show_field_displays_in_header_on_error( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that --show-field displays field value in error header.""" - # Create invalid feature with missing required field, but with id - feature = build_feature(id="abc123", version=None) - result = cli_runner.invoke( - cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Header should include id value - assert "id=abc123" in stderr_output - - def test_show_field_displays_in_context( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that --show-field pins field in context display.""" - # Create invalid feature with error far from id field - feature = build_feature(id="test123", version=None) - result = cli_runner.invoke( - cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Should show id field in context even if error is elsewhere - assert "id" in stderr_output - assert "test123" in stderr_output - - def test_show_multiple_fields( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that multiple --show-field options work together.""" - feature = build_feature(id="xyz789", version=1, theme=None) - result = cli_runner.invoke( - cli, - ["validate", "--show-field", "id", "--show-field", "version", "-"], - input=json.dumps(feature), - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Header should include both field values - assert "id=xyz789" in stderr_output - assert "version=1" in stderr_output - - def test_show_field_with_missing_field( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that --show-field shows for non-existent fields.""" - feature = build_feature(version=None) - # Don't include 'custom_field' in the feature - result = cli_runner.invoke( - cli, - ["validate", "--show-field", "custom_field", "-"], - input=json.dumps(feature), - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Should show for the non-existent field - assert "custom_field" in stderr_output - assert "" in stderr_output - - def test_show_field_truncates_long_values( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that long field values are truncated in header.""" - long_id = "x" * 100 # Very long ID - feature = build_feature(id=long_id, version=None) - result = cli_runner.invoke( - cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Should show truncated value in header (with ellipsis) - assert "id=" in stderr_output - assert "..." in stderr_output - # Should not show the full 100 character string - assert long_id not in stderr_output - - def test_show_field_in_collection( - self, cli_runner: CliRunner, stderr_buffer: StringIO - ) -> None: - """Test that --show-field works with feature collections.""" - feature1 = build_feature(id="first", version=None) - feature2 = build_feature(id="second", theme=None) - result = cli_runner.invoke( - cli, - ["validate", "--show-field", "id", "-"], - input=json.dumps([feature1, feature2]), - ) - assert result.exit_code == 1 - - stderr_output = stderr_buffer.getvalue() - # Should show id for both features - assert "id=first" in stderr_output or "first" in stderr_output - assert "id=second" in stderr_output or "second" in stderr_output diff --git a/packages/overture-schema-cli/tests/test_plugin_loading.py b/packages/overture-schema-cli/tests/test_plugin_loading.py new file mode 100644 index 000000000..3e7fe85df --- /dev/null +++ b/packages/overture-schema-cli/tests/test_plugin_loading.py @@ -0,0 +1,110 @@ +"""Tests for CLI plugin loading via entry points.""" + +from unittest.mock import MagicMock, patch + +import click +import pytest +from click.testing import CliRunner +from overture.schema.cli.commands import cli, load_plugins + + +class TestLoadPlugins: + """Tests for the load_plugins function.""" + + def test_discovers_and_registers_entry_points(self) -> None: + """Plugins from entry points are registered as subcommands.""" + test_cmd = click.Command("test-plugin", callback=lambda: None) + mock_ep = MagicMock() + mock_ep.name = "test-plugin" + mock_ep.load.return_value = test_cmd + + group = click.Group() + with patch("overture.schema.cli.commands.entry_points", return_value=[mock_ep]): + load_plugins(group) + + assert "test-plugin" in group.commands + + def test_broken_plugin_warns_and_skips(self, capsys: pytest.CaptureFixture) -> None: + """Broken entry points emit a warning and don't prevent CLI startup.""" + mock_ep = MagicMock() + mock_ep.name = "broken-plugin" + mock_ep.load.side_effect = ImportError("missing dependency") + + group = click.Group() + with patch("overture.schema.cli.commands.entry_points", return_value=[mock_ep]): + load_plugins(group) + + assert "broken-plugin" not in group.commands + captured = capsys.readouterr() + assert "broken-plugin" in captured.err + assert "missing dependency" in captured.err + + def test_plugin_does_not_clobber_builtin(self) -> None: + """Plugin names that collide with built-ins don't replace the built-in.""" + builtin_cmd = click.Command( + "list-types", callback=lambda: click.echo("builtin") + ) + plugin_cmd = click.Command("list-types", callback=lambda: click.echo("plugin")) + + group = click.Group() + group.add_command(builtin_cmd, "list-types") + + mock_ep = MagicMock() + mock_ep.name = "list-types" + mock_ep.load.return_value = plugin_cmd + + with patch("overture.schema.cli.commands.entry_points", return_value=[mock_ep]): + load_plugins(group) + + assert group.commands["list-types"] is builtin_cmd + + +class TestPluginLoadingIntegration: + """Integration tests for plugin loading with the actual CLI group.""" + + def test_builtin_commands_work_without_plugins(self) -> None: + """list-types and json-schema work when no plugins are installed.""" + runner = CliRunner() + result = runner.invoke(cli, ["list-types"]) + assert result.exit_code == 0 + + def test_help_lists_builtin_commands(self) -> None: + """--help shows built-in commands.""" + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert result.exit_code == 0 + assert "list-types" in result.output + assert "json-schema" in result.output + + +class TestPluginLoadingWithValidation: + """Integration tests with overture-schema-validation installed.""" + + def test_validate_appears_in_help(self) -> None: + """validate subcommand appears in --help when validation package is installed.""" + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert result.exit_code == 0 + assert "validate" in result.output + + def test_validate_works_end_to_end(self) -> None: + """overture-schema validate works end-to-end via plugin loading.""" + import json + + feature = { + "id": "test", + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]], + }, + "properties": {"theme": "buildings", "type": "building", "version": 0}, + } + + runner = CliRunner() + with runner.isolated_filesystem(): + with open("test.json", "w") as f: + json.dump(feature, f) + result = runner.invoke(cli, ["validate", "test.json"]) + assert result.exit_code == 0 + assert "Successfully validated" in result.output diff --git a/packages/overture-schema-cli/tests/test_resolve_types.py b/packages/overture-schema-cli/tests/test_resolve_types.py deleted file mode 100644 index 2fa226802..000000000 --- a/packages/overture-schema-cli/tests/test_resolve_types.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Parametrized tests for resolve_types function.""" - -import pytest -from overture.schema.cli.commands import resolve_types - - -class TestResolveTypes: - """Tests for the resolve_types function with various filter combinations.""" - - @pytest.mark.parametrize( - "overture_types,namespace,theme_names,type_names,should_succeed", - [ - # Test --overture-types flag - pytest.param(True, None, (), (), True, id="overture_types_only"), - pytest.param(False, "overture", (), (), True, id="overture_namespace"), - # Test theme filtering - pytest.param(False, None, ("buildings",), (), True, id="theme_buildings"), - pytest.param( - False, None, ("transportation",), (), True, id="theme_transportation" - ), - pytest.param( - False, None, ("buildings", "places"), (), True, id="multiple_themes" - ), - pytest.param(False, None, ("nonexistent",), (), False, id="invalid_theme"), - # Test type filtering - pytest.param(False, None, (), ("building",), True, id="type_building"), - pytest.param(False, None, (), ("segment",), True, id="type_segment"), - pytest.param( - False, None, (), ("building", "place"), True, id="multiple_types" - ), - pytest.param(False, None, (), ("nonexistent",), False, id="invalid_type"), - # Test combined theme + type filtering - pytest.param( - False, - None, - ("buildings",), - ("building",), - True, - id="theme_and_type_match", - ), - pytest.param( - False, - None, - ("buildings",), - ("segment",), - False, - id="theme_and_type_mismatch", - ), - pytest.param( - False, - None, - ("transportation",), - ("segment", "connector"), - True, - id="theme_with_multiple_types", - ), - # Test namespace combined with theme/type - pytest.param( - False, - "overture", - ("buildings",), - (), - True, - id="namespace_with_theme", - ), - pytest.param( - False, - "overture", - (), - ("building",), - True, - id="namespace_with_type", - ), - pytest.param( - False, - "overture", - ("buildings",), - ("building",), - True, - id="namespace_with_theme_and_type", - ), - # Test no filters (all models) - pytest.param(False, None, (), (), True, id="no_filters_all_models"), - ], - ) - def test_resolve_types_combinations( - self, - overture_types: bool, - namespace: str | None, - theme_names: tuple[str, ...], - type_names: tuple[str, ...], - should_succeed: bool, - ) -> None: - """Test resolve_types with various filter combinations.""" - if should_succeed: - model_type = resolve_types( - overture_types, namespace, theme_names, type_names - ) - assert model_type is not None - else: - with pytest.raises(ValueError, match="No models found"): - resolve_types(overture_types, namespace, theme_names, type_names) - - @pytest.mark.parametrize( - "namespace,expected_themes", - [ - pytest.param( - "overture", - { - "buildings", - "places", - "transportation", - "base", - "divisions", - "addresses", - }, - id="overture_namespace", - ), - ], - ) - def test_resolve_types_returns_expected_themes( - self, - namespace: str, - expected_themes: set[str], - ) -> None: - """Test that resolve_types returns models from expected themes.""" - from overture.schema.core.discovery import discover_models - - models = discover_models(namespace=namespace) - actual_themes = {key.theme for key in models.keys()} - - # Check that we have at least the expected themes (may have more) - assert expected_themes.issubset(actual_themes), ( - f"Missing expected themes. Expected {expected_themes}, got {actual_themes}" - ) - - -class TestResolveTypesEdgeCases: - """Tests for edge cases in resolve_types.""" - - def test_resolve_types_case_sensitive(self) -> None: - """Test that theme and type names are case-sensitive.""" - # Lowercase should work - model_type = resolve_types(False, None, ("buildings",), ()) - assert model_type is not None - - # Uppercase should fail (themes are lowercase in registry) - with pytest.raises(ValueError, match="No models found"): - resolve_types(False, None, ("BUILDINGS",), ()) - - def test_resolve_types_empty_result_error_message(self) -> None: - """Test that a helpful error message is shown when no models match.""" - with pytest.raises(ValueError) as exc_info: - resolve_types(False, None, ("nonexistent",), ("also_fake",)) - - assert "No models found" in str(exc_info.value) - - def test_resolve_types_namespace_isolation(self) -> None: - """Test that namespace filtering properly isolates models.""" - # Get all models (no namespace filter) - all_models_type = resolve_types(False, None, (), ()) - assert all_models_type is not None - - # Get only overture namespace - overture_type = resolve_types(False, "overture", (), ()) - assert overture_type is not None - - # Both should work, but they represent different sets of models - # (This test primarily ensures no exceptions are raised) diff --git a/packages/overture-schema-core/src/overture/schema/core/__init__.py b/packages/overture-schema-core/src/overture/schema/core/__init__.py index 735c794c7..7f64ca884 100644 --- a/packages/overture-schema-core/src/overture/schema/core/__init__.py +++ b/packages/overture-schema-core/src/overture/schema/core/__init__.py @@ -1,15 +1,30 @@ from . import cartography, names, scoping, sources +from .discovery import ( + ModelDict, + ModelKey, + discover_models, + filter_models, + resolve_types, +) from .models import OvertureFeature, ThemeT, TypeT from .scoping import Scope, scoped +from .union import UnionType, create_union_type_from_models __all__ = [ "cartography", + "create_union_type_from_models", + "discover_models", + "filter_models", + "ModelDict", + "ModelKey", "names", "OvertureFeature", + "resolve_types", "Scope", "scoped", "scoping", "sources", "ThemeT", "TypeT", + "UnionType", ] diff --git a/packages/overture-schema-core/src/overture/schema/core/discovery.py b/packages/overture-schema-core/src/overture/schema/core/discovery.py index 15da3abc4..d49aa1f36 100644 --- a/packages/overture-schema-core/src/overture/schema/core/discovery.py +++ b/packages/overture-schema-core/src/overture/schema/core/discovery.py @@ -1,8 +1,14 @@ """Model discovery system for Overture schema registry.""" +from __future__ import annotations + import importlib.metadata import logging from dataclasses import dataclass +from typing import TYPE_CHECKING, TypeAlias + +if TYPE_CHECKING: + from .union import UnionType from pydantic import BaseModel @@ -32,9 +38,12 @@ class ModelKey: class_name: str +ModelDict: TypeAlias = dict[ModelKey, type[BaseModel]] + + def discover_models( namespace: str | None = None, -) -> dict[ModelKey, type[BaseModel]]: +) -> ModelDict: """Discover all registered Overture models via entry points. Parameters @@ -99,36 +108,83 @@ def discover_models( return models -def get_registered_model( - namespace: str, feature_type: str, theme: str | None = None -) -> type[BaseModel] | None: - """Get the Pydantic model for a namespace/theme/type combination. +def filter_models( + namespace: str | None = None, + theme_names: tuple[str, ...] = (), + type_names: tuple[str, ...] = (), +) -> ModelDict: + """Filter discovered models by namespace, theme, and type. + + Parameters + ---------- + namespace : str | None + Namespace to filter by (e.g., "overture", "annex"). + theme_names : tuple[str, ...] + Theme names to filter by. + type_names : tuple[str, ...] + Type names to filter by. + + Returns + ------- + ModelDict + Filtered models matching the criteria. + + Raises + ------ + ValueError + If no models match the specified criteria. - This uses setuptools entry points for registration. + """ + all_models = discover_models(namespace=namespace) + + filtered = { + k: v + for k, v in all_models.items() + if (not theme_names or k.theme in theme_names) + and (not type_names or k.type in type_names) + } + + if not filtered: + parts = [] + if namespace: + parts.append(f"namespace={namespace!r}") + if theme_names: + parts.append(f"themes={theme_names!r}") + if type_names: + parts.append(f"types={type_names!r}") + criteria = ", ".join(parts) if parts else "no filters" + raise ValueError(f"No models found matching {criteria}") + + return filtered + + +def resolve_types( + namespace: str | None = None, + theme_names: tuple[str, ...] = (), + type_names: tuple[str, ...] = (), +) -> UnionType: + """Filter models and build a union type for validation or schema generation. + + Convenience function combining filter_models() and + create_union_type_from_models(). Parameters ---------- - namespace : str - The namespace (e.g., "overture", "annex") - feature_type : str - The type name - theme : str | None, optional - The theme name (optional) + namespace : str | None + Namespace to filter by (e.g., "overture", "annex"). + theme_names : tuple[str, ...] + Theme names to filter by. + type_names : tuple[str, ...] + Type names to filter by. Returns ------- - type[BaseModel] | None - The model class if found, None otherwise. + UnionType + Union type suitable for TypeAdapter """ - # Check all discovered models for a match - models = discover_models(namespace=namespace) - # Need to find by namespace/theme/type, not exact key match - for key, model_class in models.items(): - if ( - key.namespace == namespace - and key.theme == theme - and key.type == feature_type - ): - return model_class - return None + # Deferred to keep discovery independent of union at import time + from .union import create_union_type_from_models + + filtered = filter_models(namespace, theme_names, type_names) + return create_union_type_from_models(filtered) diff --git a/packages/overture-schema-core/src/overture/schema/core/union.py b/packages/overture-schema-core/src/overture/schema/core/union.py new file mode 100644 index 000000000..bc5c519ac --- /dev/null +++ b/packages/overture-schema-core/src/overture/schema/core/union.py @@ -0,0 +1,124 @@ +"""Union type construction for model validation.""" + +from functools import reduce +from operator import or_ +from typing import Annotated, Any, Literal, TypeAlias, cast, get_args, get_origin + +from pydantic import BaseModel, Field, Tag + +from overture.schema.system.feature import Feature + +from .discovery import ModelDict +from .models import OvertureFeature + +UnionType: TypeAlias = type[BaseModel] | Any + + +def _can_discriminate(model_class: object) -> bool: + """Check if a model can participate in a discriminated union. + + Returns True if the model is an OvertureFeature with a single literal 'type' value. + """ + if not (isinstance(model_class, type) and issubclass(model_class, OvertureFeature)): + return False + + return _type_literal(cast(type[OvertureFeature], model_class)) is not None + + +def _type_literal(feature_class: type[OvertureFeature]) -> str | None: + """Extract the literal value from an OvertureFeature's 'type' field. + + Returns the literal type value, or None if not a single literal. + """ + if "type" not in feature_class.model_fields: + return None + + type_annotation = feature_class.model_fields["type"].annotation + + # Unwrap Annotated if present + while get_origin(type_annotation) is Annotated: + type_annotation = get_args(type_annotation)[0] + + # Check if it's a Literal with a single value + if get_origin(type_annotation) is Literal: + args = get_args(type_annotation) + if len(args) == 1 and isinstance(args[0], str): + return args[0] + + return None + + +def _discriminated_union(feature_classes: tuple[type[OvertureFeature], ...]) -> Any: # noqa: ANN401 + """Create a discriminated union of Overture features on the 'type' field.""" + if not feature_classes: + return None + elif len(feature_classes) == 1: + # Single model doesn't need a discriminated union + return feature_classes[0] + + def _tag(f: type[OvertureFeature]) -> str: + literal = _type_literal(f) + assert literal is not None, ( + f"{f.__name__} passed _can_discriminate but has no type literal" + ) + return literal + + return Annotated[ + reduce( + or_, + (Annotated[f, Tag(_tag(f))] for f in feature_classes), + ), + Field(discriminator=Feature.field_discriminator("type", *feature_classes)), + ] + + +def create_union_type_from_models( + models: ModelDict, +) -> UnionType: + """Create a union type from a dict of models. + + Uses discriminated unions for OvertureFeatures when possible for better performance. + + Parameters + ---------- + models : ModelDict + Dict mapping ModelKey to Pydantic model classes + + Returns + ------- + UnionType + Union type suitable for TypeAdapter + + Raises + ------ + ValueError + If no models are provided + + """ + if not models: + raise ValueError("No models provided") + + # Partition models in a single pass + discriminated_list: list[type[OvertureFeature]] = [] + non_discriminated_list: list[type[BaseModel]] = [] + for m in models.values(): + if _can_discriminate(m): + discriminated_list.append(cast(type[OvertureFeature], m)) + else: + non_discriminated_list.append(m) + + discriminated_union = _discriminated_union(tuple(discriminated_list)) + non_discriminated_union = ( + reduce(or_, non_discriminated_list) if non_discriminated_list else None + ) + + # Combine discriminated and non-discriminated unions. + # At least one is non-None because models is verified non-empty above + # and every model lands in exactly one partition. + assert discriminated_union is not None or non_discriminated_union is not None + if discriminated_union is not None and non_discriminated_union is not None: + return discriminated_union | non_discriminated_union + elif discriminated_union is not None: + return discriminated_union + else: + return non_discriminated_union # type: ignore[return-value] diff --git a/packages/overture-schema-core/tests/test_filter_models.py b/packages/overture-schema-core/tests/test_filter_models.py new file mode 100644 index 000000000..9457291e9 --- /dev/null +++ b/packages/overture-schema-core/tests/test_filter_models.py @@ -0,0 +1,66 @@ +"""Tests for model filtering.""" + +import pytest +from overture.schema.core.discovery import ModelKey, filter_models + + +class TestFilterModels: + """Tests for filter_models function.""" + + @pytest.mark.parametrize( + "namespace,theme_names,type_names,should_succeed", + [ + pytest.param("overture", (), (), True, id="overture_namespace"), + pytest.param(None, ("buildings",), (), True, id="theme_buildings"), + pytest.param( + None, ("transportation",), (), True, id="theme_transportation" + ), + pytest.param(None, ("buildings", "places"), (), True, id="multiple_themes"), + pytest.param(None, ("nonexistent",), (), False, id="invalid_theme"), + pytest.param(None, (), ("building",), True, id="type_building"), + pytest.param(None, (), ("segment",), True, id="type_segment"), + pytest.param(None, (), ("building", "place"), True, id="multiple_types"), + pytest.param(None, (), ("nonexistent",), False, id="invalid_type"), + pytest.param( + None, + ("buildings",), + ("building",), + True, + id="theme_and_type_match", + ), + pytest.param( + None, + ("buildings",), + ("segment",), + False, + id="theme_and_type_mismatch", + ), + pytest.param(None, (), (), True, id="no_filters_all_models"), + ], + ) + def test_filter_models_combinations( + self, + namespace: str | None, + theme_names: tuple[str, ...], + type_names: tuple[str, ...], + should_succeed: bool, + ) -> None: + if should_succeed: + result = filter_models(namespace, theme_names, type_names) + assert len(result) > 0 + assert all(isinstance(k, ModelKey) for k in result) + else: + with pytest.raises(ValueError, match="No models found"): + filter_models(namespace, theme_names, type_names) + + def test_case_sensitive(self) -> None: + result = filter_models(theme_names=("buildings",)) + assert len(result) > 0 + + with pytest.raises(ValueError, match="No models found"): + filter_models(theme_names=("BUILDINGS",)) + + def test_namespace_isolation(self) -> None: + all_models = filter_models() + overture_models = filter_models(namespace="overture") + assert len(all_models) >= len(overture_models) diff --git a/packages/overture-schema-core/tests/test_union.py b/packages/overture-schema-core/tests/test_union.py new file mode 100644 index 000000000..401b34034 --- /dev/null +++ b/packages/overture-schema-core/tests/test_union.py @@ -0,0 +1,29 @@ +"""Tests for union type construction.""" + +import pytest +from overture.schema.core.discovery import filter_models +from overture.schema.core.union import create_union_type_from_models +from pydantic import BaseModel + + +class TestCreateUnionType: + """Tests for create_union_type_from_models.""" + + def test_creates_type_from_single_theme(self) -> None: + models = filter_models(theme_names=("buildings",)) + result = create_union_type_from_models(models) + assert result is not None + + def test_creates_type_from_multiple_themes(self) -> None: + models = filter_models(theme_names=("buildings", "places")) + result = create_union_type_from_models(models) + assert result is not None + + def test_raises_on_empty_models(self) -> None: + with pytest.raises(ValueError, match="No models provided"): + create_union_type_from_models({}) + + def test_single_model_returns_class_directly(self) -> None: + models = filter_models(theme_names=("buildings",), type_names=("building",)) + result = create_union_type_from_models(models) + assert isinstance(result, type) and issubclass(result, BaseModel) diff --git a/packages/overture-schema-validation/README.md b/packages/overture-schema-validation/README.md new file mode 100644 index 000000000..e660868a8 --- /dev/null +++ b/packages/overture-schema-validation/README.md @@ -0,0 +1,21 @@ +# overture-schema-validation + +Validation command for the Overture Maps schema CLI. + +This package provides the `validate` subcommand for the `overture-schema` CLI tool. + +## Installation + +This package is automatically loaded by `overture-schema-cli` via entry points. + +```bash +pip install overture-schema-cli overture-schema-validation +``` + +## Usage + +```bash +overture-schema validate +``` + +See the main [overture-schema](https://github.com/OvertureMaps/schema) documentation for more details. diff --git a/packages/overture-schema-validation/pyproject.toml b/packages/overture-schema-validation/pyproject.toml new file mode 100644 index 000000000..ddc7cdba4 --- /dev/null +++ b/packages/overture-schema-validation/pyproject.toml @@ -0,0 +1,38 @@ +[project] +dependencies = [ + "overture-schema-core", + "pydantic>=2.0", + "pyyaml>=6.0.2", + "click>=8.0", + "rich>=13.0", + "yamlcore>=0.0.4", +] +description = "Validation command for Overture Maps schema CLI" +dynamic = ["version"] +license = "MIT" +name = "overture-schema-validation" +readme = "README.md" +requires-python = ">=3.10" + +[tool.uv.sources] +overture-schema-core = { workspace = true } + +[build-system] +build-backend = "hatchling.build" +requires = ["hatchling"] + +[dependency-groups] +dev = [ + "pytest>=7.0", + "ruff", + "mypy", +] + +[tool.hatch.version] +path = "src/overture/schema/validation/__about__.py" + +[tool.hatch.build.targets.wheel] +packages = ["src/overture"] + +[project.entry-points."overture.schema.cli"] +validate = "overture.schema.validation.commands:validate" diff --git a/packages/overture-schema-validation/src/overture/__init__.py b/packages/overture-schema-validation/src/overture/__init__.py new file mode 100644 index 000000000..8db66d3d0 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/__init__.py @@ -0,0 +1 @@ +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/packages/overture-schema-validation/src/overture/schema/__init__.py b/packages/overture-schema-validation/src/overture/schema/__init__.py new file mode 100644 index 000000000..8db66d3d0 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/__init__.py @@ -0,0 +1 @@ +__path__ = __import__("pkgutil").extend_path(__path__, __name__) diff --git a/packages/overture-schema-validation/src/overture/schema/validation/__about__.py b/packages/overture-schema-validation/src/overture/schema/validation/__about__.py new file mode 100644 index 000000000..3dc1f76bc --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/validation/__about__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/packages/overture-schema-validation/src/overture/schema/validation/__init__.py b/packages/overture-schema-validation/src/overture/schema/validation/__init__.py new file mode 100644 index 000000000..f37c15290 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/validation/__init__.py @@ -0,0 +1 @@ +"""Validation subcommand for the Overture Schema CLI.""" diff --git a/packages/overture-schema-validation/src/overture/schema/validation/commands.py b/packages/overture-schema-validation/src/overture/schema/validation/commands.py new file mode 100644 index 000000000..84c5ab1e6 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/validation/commands.py @@ -0,0 +1,517 @@ +"""Validation command for overture-schema CLI.""" + +import io +import json +import sys +from collections import Counter, defaultdict +from pathlib import Path +from typing import NoReturn, cast + +import click +import yaml +from pydantic import BaseModel, TypeAdapter, ValidationError +from rich.console import Console +from yamlcore import CoreLoader # type: ignore + +from overture.schema.core.discovery import resolve_types +from overture.schema.core.union import UnionType + +from .error_formatting import ( + format_validation_error, + format_validation_errors_verbose, + group_errors_by_discriminator, + select_most_likely_errors, +) +from .type_analysis import StructuralTuple, get_item_index, introspect_union +from .types import ErrorLocation + +stdout = Console(highlight=False) +stderr = Console(highlight=False, file=sys.stderr) + + +def _is_geojson_feature(data: dict) -> bool: + """Check if data is in GeoJSON Feature format.""" + return data.get("type") == "Feature" and "properties" in data + + +def validate_feature(data: dict, model_type: UnionType) -> BaseModel: + """Validate a single feature against the model type. + + Args + ---- + data: Feature data to validate (GeoJSON or flat format) + model_type: Union type for validation + + Returns + ------- + Validated model instance + + Raises + ------ + ValidationError: If validation fails + """ + adapter = TypeAdapter(model_type) + if isinstance(data, dict) and _is_geojson_feature(data): + # Use validate_json to trigger the model's GeoJSON handling + return cast(BaseModel, adapter.validate_json(json.dumps(data))) + return cast(BaseModel, adapter.validate_python(data)) + + +def validate_features(data: list, model_type: UnionType) -> list[BaseModel]: + """Validate a list of features against the model type. + + Args + ---- + data: List of feature data to validate (GeoJSON or flat format) + model_type: Union type for validation + + Returns + ------- + List of validated model instances + + Raises + ------ + ValidationError: If validation fails + """ + # Check if any items are GeoJSON features + has_geojson = any( + isinstance(item, dict) and _is_geojson_feature(item) for item in data + ) + + list_type = list[model_type] # type: ignore[misc,valid-type] + adapter = TypeAdapter(list_type) + + if has_geojson: + # Use validate_json to trigger the model's GeoJSON handling + return cast(list[BaseModel], adapter.validate_json(json.dumps(data))) + return cast(list[BaseModel], adapter.validate_python(data)) + + +def get_source_name(filename: Path) -> str: + """Get display name for input source. + + Args + ---- + filename: Path to input file or "-" for stdin + + Returns + ------- + Display name: "" for stdin input, otherwise the filename + """ + return "" if str(filename) == "-" else str(filename) + + +def load_input(filename: Path) -> tuple[dict | list, str]: + """Load and parse input from file or stdin. + + Args + ---- + filename: Path to input file, or "-" for stdin + + Returns + ------- + Tuple of (parsed_data, source_name) + + Raises + ------ + yaml.YAMLError: If input is invalid YAML/JSON + SystemExit: If filename doesn't exist or isn't a file + """ + if str(filename) == "-": + # Read all stdin content + content = sys.stdin.read() + + # Try to detect JSONL format (newline-delimited JSON) + # JSONL has multiple non-empty lines, each containing a complete JSON object + lines = [line.strip() for line in content.strip().split("\n") if line.strip()] + + if len(lines) > 1: + # Attempt to parse as JSONL + try: + parsed_lines = [json.loads(line) for line in lines] + return parsed_lines, "" + except json.JSONDecodeError: + # Not valid JSONL, fall through to YAML parser + pass + + # Parse as single YAML/JSON document + data = yaml.load(io.StringIO(content), Loader=CoreLoader) + return data, "" + + if not filename.is_file(): + raise click.UsageError(f"'{filename}' is not a file.") + + # Warn about unexpected file extensions + if filename.suffix not in {".json", ".yaml", ".yml", ".geojson"}: + click.echo( + f"Warning: File '{filename}' has unexpected extension. " + f"Expecting .json, .yaml, .yml, or .geojson", + err=True, + ) + + # Use YAML-1.2-compliant loader (YAML-1.2 dropped support for yes/no boolean values) + with filename.open("r", encoding="utf-8") as f: + data = yaml.load(f, Loader=CoreLoader) + + return data, str(filename) + + +def perform_validation(data: dict | list, model_type: UnionType) -> None: + """Validate data based on its structure. + + Automatically detects and handles three input formats: + - Single feature (dict) + - List of features (list) + - GeoJSON FeatureCollection (dict with type="FeatureCollection") + + Args + ---- + data : dict | list + Parsed data to validate + model_type : UnionType + Union type for validation + + Raises + ------ + ValidationError + If validation fails + """ + if isinstance(data, list): + # List of features + validate_features(data, model_type) + elif isinstance(data, dict) and data.get("type") == "FeatureCollection": + # GeoJSON FeatureCollection + validate_features(data["features"], model_type) + else: + # Single feature + validate_feature(data, model_type) + + +def compute_collection_statistics( + item_types: dict[int, type[BaseModel] | None], + filtered_errors: list, +) -> tuple[ + int, + Counter[type[BaseModel] | None], + dict[type[BaseModel], set[int]], +]: + """Compute validation statistics for heterogeneous collections. + + Args + ---- + item_types : dict[int, type[BaseModel] | None] + Mapping from item index to detected model type + filtered_errors : list + List of filtered validation errors + + Returns + ------- + tuple + Tuple of (items_without_errors, type_counts, items_with_errors_by_type) + """ + # Compute statistics: group items by type + type_counts: Counter[type[BaseModel] | None] = Counter(item_types.values()) + + # Determine total number of items (max index + 1, or count from data) + max_index = max(item_types.keys()) if item_types else -1 + total_items = max_index + 1 + + # Count items with errors per type + items_with_errors_by_type: dict[type[BaseModel], set[int]] = defaultdict(set) + for err in filtered_errors: + idx = get_item_index(err["loc"]) + if idx is not None and idx in item_types: + model_type_cls = item_types[idx] + if model_type_cls is not None: + items_with_errors_by_type[model_type_cls].add(idx) + + # Count items without any errors + items_without_errors = total_items - len( + { + idx + for idx in item_types.keys() + if any(get_item_index(err["loc"]) == idx for err in filtered_errors) + } + ) + + return items_without_errors, type_counts, items_with_errors_by_type + + +def print_collection_statistics( + items_without_errors: int, + type_counts: Counter[type[BaseModel] | None], + items_with_errors_by_type: dict[type[BaseModel], set[int]], + stderr: Console, +) -> None: + """Print validation statistics for heterogeneous collections. + + Args + ---- + items_without_errors : int + Count of items with no validation errors + type_counts : Counter[type[BaseModel] | None] + Counter of items by model type + items_with_errors_by_type : dict[type[BaseModel], set[int]] + Mapping from model type to set of item indices with errors + stderr : Console + Console for stderr output + """ + stderr.print(" [dim]Collection statistics:[/dim]") + + # Show items without errors first + # TODO: Once we switch to parse_features (instead of validate_features), + # we can include type information for items without errors by parsing + # the input and tracking which items validated successfully and their types. + # This would allow output like: "Building: 2 confirmed (no errors)" + if items_without_errors > 0: + stderr.print( + f" • {items_without_errors} item{'s' if items_without_errors != 1 else ''} with no errors", + style="dim", + ) + + # Show per-type statistics + for model_type_cls, count in type_counts.most_common(): + if model_type_cls is not None: + items_with_errors = len( + items_with_errors_by_type.get(model_type_cls, set()) + ) + valid_count = count - items_with_errors + + if valid_count > 0: + stderr.print( + f" • {model_type_cls.__name__}: {valid_count} confirmed, {items_with_errors} with errors", + style="dim", + ) + else: + stderr.print( + f" • {model_type_cls.__name__} (probable): {items_with_errors} item{'s' if items_with_errors != 1 else ''} with errors", + style="dim", + ) + stderr.print() + + +def handle_validation_error( + e: ValidationError, + model_type: UnionType, + stderr: Console, + original_data: dict | list | None = None, + show_fields: list[str] | None = None, +) -> None: + """Handle and format validation errors with rich contextual information. + + Groups errors by discriminator, selects most likely error groups, and provides + helpful diagnostics for heterogeneous collections and ambiguous types. + + Args + ---- + e : ValidationError + ValidationError from pydantic + model_type : UnionType + Union type used for validation + stderr : Console + Console for stderr output + original_data : dict | list | None + Original input data for error display + show_fields : list[str] | None + List of field names to display alongside errors + """ + # Compute metadata once upfront + metadata = introspect_union(model_type) + + # Create cache for structural tuple computation (optimizes systematic errors) + structural_cache: dict[ErrorLocation, StructuralTuple] = {} + + # Group errors by discriminator path and select most likely group(s) + error_groups = group_errors_by_discriminator(e.errors(), metadata, structural_cache) + filtered_errors, is_tied, is_heterogeneous, item_types = select_most_likely_errors( + error_groups, + metadata=metadata, + all_errors=e.errors(), + structural_cache=structural_cache, + ) + + # Show heterogeneity warning if collection has mixed types + if is_heterogeneous: + stderr.print( + " ⚠ Heterogeneous collection: Data contains multiple feature types.", + style="yellow", + ) + stderr.print( + " • Consider validating each type separately with --theme or --type", + style="dim", + ) + stderr.print() + + # Compute and display statistics if there are errors to report + if filtered_errors: + items_without_errors, type_counts, items_with_errors_by_type = ( + compute_collection_statistics(item_types, filtered_errors) + ) + print_collection_statistics( + items_without_errors, type_counts, items_with_errors_by_type, stderr + ) + + # Show tie indicator if multiple groups had same error count + elif is_tied: + stderr.print( + " ⚠ Ambiguous: Data matches multiple types equally. Consider:", + style="yellow", + ) + stderr.print( + " • Specifying --theme or --type to narrow validation", style="dim" + ) + stderr.print(" • Adding discriminator fields to clarify intent", style="dim") + stderr.print() + + # Group errors by item + + errors_by_item: dict[int | None, list] = defaultdict(list) + for error in filtered_errors: + item_idx = get_item_index(error["loc"]) + errors_by_item[item_idx].append(error) + + # Display errors grouped by item + + for item_idx, item_errors in errors_by_item.items(): + # Determine item type + error_item_type = None + if item_idx is not None and item_idx in item_types: + error_item_type = item_types.get(item_idx) + + # Try verbose display first + displayed = format_validation_errors_verbose( + item_errors, + stderr, + metadata=metadata, + item_type=error_item_type, + structural_cache=structural_cache, + original_data=original_data, + item_index=item_idx, + show_fields=show_fields, + ) + + # Fall back to non-verbose format if verbose couldn't display + if not displayed: + for i, error in enumerate(item_errors): + format_validation_error( + error, + stderr, + metadata=metadata, + show_model_hint=(i == 0), + item_type=error_item_type, + show_item_type=is_heterogeneous, + structural_cache=structural_cache, + ) + + +def handle_generic_error(e: Exception, filename: Path, error_type: str) -> NoReturn: + """Handle generic errors during validation. + + Args + ---- + e : Exception + Exception that occurred + filename : Path + Input filename or "-" for stdin + error_type : str + Type of error for user-friendly message + + Raises + ------ + click.UsageError + Always, with formatted error message + """ + source_name = get_source_name(filename) + + if error_type == "yaml": + raise click.UsageError(f"'{source_name}' contains invalid input: {e}") + elif error_type == "value": + raise click.UsageError(str(e)) + elif error_type == "key": + raise click.UsageError(f"Invalid data structure - missing key: {e}") + else: + raise click.UsageError(f"Error processing {source_name}: {e}") + + +@click.command() +@click.argument("filename", type=click.Path(path_type=Path), required=True) +@click.option( + "--overture-types", + is_flag=True, + help="Validate against all official Overture types (excludes extensions)", +) +@click.option( + "--namespace", + help="Namespace to filter by (e.g., overture, annex)", +) +@click.option( + "--theme", + multiple=True, + help="Theme to validate against (shorthand for all types in theme)", +) +@click.option( + "--type", + "types", + multiple=True, + help="Specific type to validate against (e.g., building, segment)", +) +@click.option( + "--show-field", + "show_fields", + multiple=True, + help="Field to display alongside errors (e.g., id, version). Can be repeated.", +) +def validate( + filename: Path, + overture_types: bool, + namespace: str | None, + theme: tuple[str, ...], + types: tuple[str, ...], + show_fields: tuple[str, ...], +) -> None: + r"""Validate Overture Maps data against schemas. + + Read from FILENAME or stdin if FILENAME is '-'. + Supports JSON, YAML, and GeoJSON formats. + + \b + Examples: + # Validate a file + $ overture-schema validate data.json + \b + # Validate from stdin + $ overture-schema validate - < data.json + \b + # Validate only buildings + $ overture-schema validate --theme buildings data.json + \b + # Validate specific type + $ overture-schema validate --type building data.json + \b + # Official Overture types only + $ overture-schema validate --overture-types data.json + """ + # Resolve model type first (errors here are ValueErrors, not ValidationErrors) + try: + effective_namespace = "overture" if overture_types else namespace + model_type = resolve_types(effective_namespace, theme, types) + except ValueError as e: + handle_generic_error(e, filename, "value") + + # Load input (errors here are YAMLErrors or ValueErrors, not ValidationErrors) + try: + data, source_name = load_input(filename) + except yaml.YAMLError as e: + handle_generic_error(e, filename, "yaml") + except KeyError as e: + handle_generic_error(e, filename, "key") + + # Perform validation (now model_type and data are guaranteed to be defined) + try: + perform_validation(data, model_type) + stdout.print(f"✓ Successfully validated {source_name}") + except ValidationError as e: + handle_validation_error( + e, model_type, stderr, original_data=data, show_fields=list(show_fields) + ) + sys.exit(1) diff --git a/packages/overture-schema-cli/src/overture/schema/cli/data_display.py b/packages/overture-schema-validation/src/overture/schema/validation/data_display.py similarity index 93% rename from packages/overture-schema-cli/src/overture/schema/cli/data_display.py rename to packages/overture-schema-validation/src/overture/schema/validation/data_display.py index 5dc2f28a7..d58072520 100644 --- a/packages/overture-schema-cli/src/overture/schema/cli/data_display.py +++ b/packages/overture-schema-validation/src/overture/schema/validation/data_display.py @@ -1,7 +1,9 @@ """Data display utilities for verbose error output.""" +from collections import defaultdict from typing import Any +from rich import box from rich.panel import Panel from rich.table import Table @@ -10,7 +12,7 @@ DEFAULT_CONTEXT_SIZE = 1 -def _format_nested_path(error_path: list[str | int]) -> str: +def format_path(error_path: list[str | int]) -> str: """Format error path as displayable nested key. Converts paths like ["sources", 0, "confidence"] into "sources[0].confidence". @@ -26,15 +28,10 @@ def _format_nested_path(error_path: list[str | int]) -> str: parts: list[str] = [] for element in error_path: if isinstance(element, str): - # Add dot separator before field names (except first, and not right after opening) - if parts and not parts[-1].endswith("]"): - parts.append(".") - elif parts and parts[-1].endswith("]"): - # Add dot after array index before field name + if parts: parts.append(".") parts.append(element) elif isinstance(element, int): - # Add array index in brackets parts.append(f"[{element}]") return "".join(parts) @@ -141,14 +138,6 @@ def _select_context_for_array_index_error( ------- Dict of selected fields showing context around the array """ - # Find the path up to the last string field (the array field itself) - array_field_path: list[str | int] = [] - for element in error_path: - array_field_path.append(element) - if isinstance(element, str): - # Keep going until we hit an int, but remember the last string position - pass - # Find the last string element's index last_string_idx = -1 for i, element in enumerate(error_path): @@ -182,7 +171,7 @@ def _select_context_for_array_index_error( selected: dict[str, Any] = {} # Format the full path including the array index - full_path_str = _format_nested_path(error_path) + full_path_str = format_path(error_path) if isinstance(current, dict): # Get array value and the specific item @@ -207,7 +196,7 @@ def _select_context_for_array_index_error( if len(array_path) > 1: # Navigate to parent to get sibling fields parent_path = array_path[:-1] - prefix = _format_nested_path(parent_path) + prefix = format_path(parent_path) parent_fields = list(current.keys()) if array_field_name in parent_fields: @@ -389,7 +378,7 @@ def select_context_fields( nested_end = min(len(parent_fields), nested_target_index + context_size + 1) # Build path prefix once for reuse (without trailing dot) - prefix_str = _format_nested_path(parent_path) + prefix_str = format_path(parent_path) # Add nested elision marker at start if needed if nested_start > 0 and context_size > 0: @@ -454,7 +443,7 @@ def select_context_fields( selected[field_name] = current else: # Build nested key with array indices: ["sources", 0, "confidence"] -> "sources[0].confidence" - nested_key = _format_nested_path(error_path) + nested_key = format_path(error_path) selected[nested_key] = current else: selected[field_name] = feature[field_name] @@ -611,7 +600,7 @@ def create_feature_display( table.add_column("Error") # Build mapping from field display name to error messages - error_map: dict[str, list[str]] = {} + error_map: dict[str, list[str]] = defaultdict(list) for error_path, error_msg in errors: # Determine which field has the error (use first string in path) error_field: str | None = None @@ -620,24 +609,15 @@ def create_feature_display( error_field = element break - # Skip if no field name found (shouldn't happen in practice) if error_field is None: continue - # Format nested path for display - if len(error_path) > 1: - # Special case: geometry is opaque, don't show nested path - if error_field == "geometry": - error_field_display = "geometry" - else: - # Handle nested paths like ["sources", 0, "confidence"] -> "sources[0].confidence" - error_field_display = _format_nested_path(error_path) + # Format nested path for display (geometry is opaque -- don't expand) + if len(error_path) > 1 and error_field != "geometry": + error_field_display = format_path(error_path) else: error_field_display = error_field - # Add to error map - if error_field_display not in error_map: - error_map[error_field_display] = [] error_map[error_field_display].append(error_msg) # Add rows for each field @@ -669,9 +649,6 @@ def create_feature_display( value_styled = f"[dim]{formatted_value}[/dim]" table.add_row(field_name_styled, value_styled, "", "") - # Wrap table in a Panel with rounded borders - from rich import box - # Add title: "Validation Failed" for single features, or item info for collections if item_index is not None: if item_type: diff --git a/packages/overture-schema-cli/src/overture/schema/cli/error_formatting.py b/packages/overture-schema-validation/src/overture/schema/validation/error_formatting.py similarity index 68% rename from packages/overture-schema-cli/src/overture/schema/cli/error_formatting.py rename to packages/overture-schema-validation/src/overture/schema/validation/error_formatting.py index b8de95d67..2fab7d226 100644 --- a/packages/overture-schema-cli/src/overture/schema/cli/error_formatting.py +++ b/packages/overture-schema-validation/src/overture/schema/validation/error_formatting.py @@ -1,13 +1,16 @@ """Error formatting and grouping for validation errors.""" +from collections import defaultdict from typing import Any from pydantic import BaseModel from rich.console import Console from .data_display import ( + DEFAULT_CONTEXT_SIZE, create_feature_display, extract_feature_data, + format_path, select_context_fields, ) from .type_analysis import ( @@ -48,7 +51,7 @@ def group_errors_by_discriminator( >>> from typing import Annotated, Union >>> from pydantic import Field >>> from overture.schema.buildings import Building, BuildingPart - >>> from overture.schema.cli.type_analysis import introspect_union + >>> from overture.schema.validation.type_analysis import introspect_union >>> BuildingUnion = Annotated[ ... Union[Building, BuildingPart], ... Field(discriminator='type') @@ -79,24 +82,20 @@ def group_errors_by_discriminator( >>> ('building_part',) in groups True """ - groups: dict[ErrorLocation, list[ValidationErrorDict]] = {} + groups: dict[ErrorLocation, list[ValidationErrorDict]] = defaultdict(list) for error in errors: loc = error["loc"] try: structural = get_or_create_structural_tuple(loc, metadata, structural_cache) disc_path = extract_discriminator_path(loc, structural) - if disc_path not in groups: - groups[disc_path] = [] groups[disc_path].append(error) except (KeyError, TypeError, IndexError): # Structural analysis can fail for unexpected error path formats # (e.g., new Pydantic union markers). Group under empty path as fallback. - if () not in groups: - groups[()] = [] groups[()].append(error) - return groups + return dict(groups) def analyze_collection_heterogeneity( @@ -119,11 +118,9 @@ def analyze_collection_heterogeneity( - is_heterogeneous: True if collection contains multiple model types """ # Group errors by item index - item_errors: dict[int | None, list[ValidationErrorDict]] = {} + item_errors: dict[int | None, list[ValidationErrorDict]] = defaultdict(list) for error in errors: item_idx = get_item_index(error["loc"]) - if item_idx not in item_errors: - item_errors[item_idx] = [] item_errors[item_idx].append(error) # Infer the most likely type for each item @@ -134,12 +131,12 @@ def analyze_collection_heterogeneity( continue # Group this item's errors by inferred type - errors_by_type: dict[type[BaseModel], list[ValidationErrorDict]] = {} + errors_by_type: dict[type[BaseModel], list[ValidationErrorDict]] = defaultdict( + list + ) for error in item_error_list: inferred_type = infer_model_from_error(error, metadata, structural_cache) if inferred_type is not None: - if inferred_type not in errors_by_type: - errors_by_type[inferred_type] = [] errors_by_type[inferred_type].append(error) if errors_by_type: @@ -194,9 +191,9 @@ def select_most_likely_errors( # Check for heterogeneous collections is_heterogeneous = False - _item_types: dict[int, type[BaseModel] | None] = {} + item_types: dict[int, type[BaseModel] | None] = {} if metadata is not None and all_errors is not None: - _item_types, is_heterogeneous = analyze_collection_heterogeneity( + item_types, is_heterogeneous = analyze_collection_heterogeneity( all_errors, metadata, structural_cache ) @@ -205,19 +202,19 @@ def select_most_likely_errors( filtered_errors = [] for error in all_errors: item_idx = get_item_index(error["loc"]) - if item_idx is not None and item_idx in _item_types: + if item_idx is not None and item_idx in item_types: # Only include this error if it matches the inferred type for this item if metadata is not None: error_type = infer_model_from_error( error, metadata, structural_cache ) - if error_type == _item_types[item_idx]: + if error_type == item_types[item_idx]: filtered_errors.append(error) else: # Non-list errors or items without inferred type - include them filtered_errors.append(error) - return filtered_errors, False, True, _item_types + return filtered_errors, False, True, item_types # Find the minimum error count min_error_count = min(len(errors) for errors in error_groups.values()) @@ -233,33 +230,42 @@ def select_most_likely_errors( # Indicate if there was a tie is_tied = len(tied_groups) > 1 - return flattened_errors, is_tied, is_heterogeneous, _item_types + return flattened_errors, is_tied, is_heterogeneous, item_types -def format_path(filtered_loc: list[str | int]) -> str: - """Convert filtered location path to dot-separated string. +def get_effective_message(error: ValidationErrorDict) -> str: + """Return the effective error message, preferring ctx["error"] over msg.""" + msg = error["msg"] + ctx = error.get("ctx", {}) + if "error" in ctx: + return str(ctx["error"]) + return msg - Args - ---- - filtered_loc: List of path components (strings and integers) - Returns - ------- - Formatted path string (e.g., "properties.name" or "items[0].value") - """ - path_str = "" - for i, part in enumerate(filtered_loc): - if isinstance(part, str): - if i > 0: - path_str += "." - path_str += part - else: - path_str += f"[{part}]" +_DISPLAYABLE_STRUCT_TYPES = ("list_index", "field") - if not path_str: - path_str = "(root)" - return path_str +def _filter_union_markers( + loc: ErrorLocation, + metadata: UnionMetadata | None, + structural_cache: dict[ErrorLocation, StructuralTuple] | None, +) -> list[str | int]: + """Filter Pydantic union markers from an error location path. + + Keeps only list indices and field names, stripping out internal + discriminator and model markers that are noise in user-facing output. + """ + if metadata is None: + return list(loc) + try: + structural = get_or_create_structural_tuple(loc, metadata, structural_cache) + return [ + element + for element, struct_type in zip(loc, structural, strict=False) + if struct_type in _DISPLAYABLE_STRUCT_TYPES + ] + except (KeyError, TypeError, IndexError): + return list(loc) def format_validation_errors_verbose( @@ -305,32 +311,11 @@ def format_validation_errors_verbose( error_tuples: list[tuple[list[str | int], str]] = [] for error in errors: loc = error["loc"] - msg = error["msg"] - - # Extract actual error message from context if available - ctx = error.get("ctx", {}) - if "error" in ctx: - msg = ctx["error"] - - # Filter loc to remove union markers - if metadata is not None: - try: - structural = get_or_create_structural_tuple( - loc, metadata, structural_cache - ) - filtered_loc = [ - element - for element, struct_type in zip(loc, structural, strict=False) - if struct_type in ("list_index", "field") - ] - except (KeyError, TypeError, IndexError): - # Fall back to unfiltered path on unexpected error formats - filtered_loc = list(loc) - else: - filtered_loc = list(loc) + msg = get_effective_message(error) + + error_path = _filter_union_markers(loc, metadata, structural_cache) # Strip out the list index since we've already extracted that feature - error_path = list(filtered_loc) if error_path and isinstance(error_path[0], int): error_path = error_path[1:] @@ -338,7 +323,7 @@ def format_validation_errors_verbose( # Select context fields for all errors # Merge context from all error paths - context_size = 1 + context_size = DEFAULT_CONTEXT_SIZE selected_fields: dict[str, Any] = {} for error_path, _ in error_tuples: @@ -347,25 +332,22 @@ def format_validation_errors_verbose( ) selected_fields.update(context) - if selected_fields: - # Create and display panel with all errors - # Get type name from item_type if available - type_name = item_type.__name__ if item_type else None - panel = create_feature_display( - selected_fields, - error_tuples, - item_index=item_index, - item_type=type_name, - show_fields=show_fields, - feature=feature, - ) - console.print(panel) - console.print() - return True - else: - # No fields to display (e.g., root-level discriminator errors) + if not selected_fields: return False + type_name = item_type.__name__ if item_type else None + panel = create_feature_display( + selected_fields, + error_tuples, + item_index=item_index, + item_type=type_name, + show_fields=show_fields, + feature=feature, + ) + console.print(panel) + console.print() + return True + def format_validation_error( error: ValidationErrorDict, @@ -375,8 +357,6 @@ def format_validation_error( item_type: type[BaseModel] | None = None, show_item_type: bool = False, structural_cache: dict[ErrorLocation, StructuralTuple] | None = None, - original_data: dict[str, Any] | list[Any] | None = None, - show_feature_data: bool = False, ) -> None: """Format and print a single validation error. @@ -389,8 +369,6 @@ def format_validation_error( item_type: The inferred type for this item (always provided if available) show_item_type: Whether to display the item type in the path (True for heterogeneous collections) structural_cache: Optional cache for structural tuple computation - original_data: Original input data for extracting feature details (optional) - show_feature_data: Whether to display feature data with error (verbose mode) """ loc = error["loc"] @@ -412,25 +390,10 @@ def format_validation_error( # Structural analysis can fail for unexpected error path formats pass - # Filter out union markers from the path using structural analysis - if metadata is not None: - try: - structural = get_or_create_structural_tuple(loc, metadata, structural_cache) - # Filter out 'union', 'model', and 'discriminator' markers - # Keep only 'list_index' and 'field' elements for display - filtered_loc = [ - element - for element, struct_type in zip(loc, structural, strict=False) - if struct_type in ("list_index", "field") - ] - except (KeyError, TypeError, IndexError): - # Fall back to original loc if structural analysis fails - filtered_loc = list(loc) - else: - filtered_loc = list(loc) + filtered_loc = _filter_union_markers(loc, metadata, structural_cache) # Convert to dot-separated path - path_str = format_path(filtered_loc) + path_str = format_path(filtered_loc) or "(root)" # Add item type annotation if requested (for heterogeneous collections) if show_item_type and item_type is not None: @@ -443,56 +406,25 @@ def format_validation_error( console.print() # Format the error message - msg = error["msg"] + msg = get_effective_message(error) input_value = error.get("input") - ctx = error.get("ctx", {}) - if "error" in ctx: - msg = ctx["error"] + # Suppress input value when the message came from a nested error context, + # since the raw input is misleading in that case. + if "error" in error.get("ctx", {}): input_value = None - # Skip error summary lines in verbose mode - if not show_feature_data: - console.print(f" {path_str}", style="cyan") - console.print(f" → {msg}", style="yellow") - - # Show input value if present and not too large - if input_value is not None: - value_str = ( - repr(input_value) - if not isinstance(input_value, str) - else f"'{input_value}'" - ) - prefix = " → Got: " - if len(value_str) <= console.width - len(prefix): - console.print(f"{prefix}{value_str}", style="dim") - console.print() + console.print(f" {path_str}", style="cyan") + console.print(f" → {msg}", style="yellow") - # Show feature data in verbose mode - if show_feature_data and original_data is not None: - # Extract item index from error location - item_index = get_item_index(loc) - - # Extract flattened feature data - feature = extract_feature_data(original_data, item_index) - - if feature: - # Convert filtered_loc to error path format (list of str/int) - # Strip out the list index since we've already extracted that feature - error_path = list(filtered_loc) - if error_path and isinstance(error_path[0], int): - error_path = error_path[1:] # Remove list index - - # Select context fields - selected_fields = select_context_fields(feature, error_path, context_size=1) - - if selected_fields: - # Create and display panel (with item index for title) - panel = create_feature_display( - selected_fields, - [(error_path, msg)], - item_index=item_index, - ) - # Print panel directly (it has its own borders, no extra indentation needed) - console.print(panel) - console.print() + # Show input value if present and not too large + if input_value is not None: + value_str = ( + repr(input_value) + if not isinstance(input_value, str) + else f"'{input_value}'" + ) + prefix = " → Got: " + if len(value_str) <= console.width - len(prefix): + console.print(f"{prefix}{value_str}", style="dim") + console.print() diff --git a/packages/overture-schema-validation/src/overture/schema/validation/output.py b/packages/overture-schema-validation/src/overture/schema/validation/output.py new file mode 100644 index 000000000..0e22181d8 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/validation/output.py @@ -0,0 +1,29 @@ +"""Rich console output utilities.""" + +from rich.console import Console +from rich.text import Text + + +def rewrap(text: str, console: Console, indent: int = 0, padding_right: int = 0) -> str: + """Unwrap and re-wrap text at console width with indentation. + + Args + ---- + text : str + The text to rewrap + console : Console + Rich Console instance for width and wrapping + indent : int + Number of spaces to indent (default: 0) + padding_right : int + Right padding to subtract from width (default: 0) + + Returns + ------- + str + Re-wrapped and indented text + """ + unwrapped = " ".join(text.split()) + text_obj = Text(unwrapped) + wrapped_lines = text_obj.wrap(console, console.width - indent - padding_right) + return "\n".join(f"{' ' * indent}{line}" for line in wrapped_lines) diff --git a/packages/overture-schema-validation/src/overture/schema/validation/py.typed b/packages/overture-schema-validation/src/overture/schema/validation/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/packages/overture-schema-cli/src/overture/schema/cli/type_analysis.py b/packages/overture-schema-validation/src/overture/schema/validation/type_analysis.py similarity index 94% rename from packages/overture-schema-cli/src/overture/schema/cli/type_analysis.py rename to packages/overture-schema-validation/src/overture/schema/validation/type_analysis.py index 9316c80c0..2d7889081 100644 --- a/packages/overture-schema-cli/src/overture/schema/cli/type_analysis.py +++ b/packages/overture-schema-validation/src/overture/schema/validation/type_analysis.py @@ -1,15 +1,17 @@ """Type introspection and structural analysis for union types.""" import inspect +import types from dataclasses import dataclass -from typing import Annotated as AnnotatedType -from typing import Any, Literal, get_args, get_origin +from typing import Annotated, Any, Literal, Union, get_args, get_origin from pydantic import BaseModel from pydantic.fields import FieldInfo from .types import ErrorLocation, ValidationErrorDict +KNOWN_DISCRIMINATOR_FIELDS = ("type", "theme", "subtype") + # Type aliases for structural tuple elements StructuralElement = Literal["list_index", "union", "model", "discriminator", "field"] StructuralTuple = tuple[StructuralElement, ...] @@ -47,18 +49,21 @@ def _process_union_member( member_origin = get_origin(member) # Case 1: Annotated type (might contain nested union or Tag) - if member_origin is AnnotatedType: + if member_origin is Annotated: member_args = get_args(member) if not member_args: return # Check for discriminator in annotations has_discriminator = any( - isinstance(metadata, FieldInfo) and hasattr(metadata, "discriminator") + isinstance(metadata, FieldInfo) + and getattr(metadata, "discriminator", None) is not None for metadata in member_args[1:] ) - if has_discriminator or get_origin(member_args[0]) is not None: + inner_origin = get_origin(member_args[0]) + is_nested_union = inner_origin is types.UnionType or inner_origin is Union + if has_discriminator or is_nested_union: # Nested union (with or without discriminator) nested_metadata = introspect_union(member) nested_unions[str(member)] = nested_metadata @@ -77,10 +82,11 @@ def _process_union_member( model_name_to_model[member.__name__] = member # Extract discriminator values from known discriminator fields only - # Restrict to known discriminator names to avoid false positives from other Literal fields - discriminator_fields = ("type", "theme", "subtype") + # (avoids false positives from other Literal fields). + # A model may have multiple discriminator fields (e.g., both `type` + # and `theme`), so register all of them. for field_name, field_info in member.model_fields.items(): - if field_name not in discriminator_fields: + if field_name not in KNOWN_DISCRIMINATOR_FIELDS: continue annotation = field_info.annotation if get_origin(annotation) is Literal: @@ -152,7 +158,7 @@ def introspect_union(union_type: Any) -> UnionMetadata: # noqa: ANN401 actual_union = union_type # Unwrap Annotated ONLY if the top level is Annotated - if origin is AnnotatedType: + if origin is Annotated: # This is Annotated[Union[...], ...] args = get_args(union_type) if args: @@ -160,10 +166,8 @@ def introspect_union(union_type: Any) -> UnionMetadata: # noqa: ANN401 actual_union = args[0] # Look for Field with discriminator in metadata for metadata in args[1:]: - if isinstance(metadata, FieldInfo) and hasattr( - metadata, "discriminator" - ): - disc = metadata.discriminator + if isinstance(metadata, FieldInfo): + disc = getattr(metadata, "discriminator", None) # discriminator can be a string or Discriminator object discriminator_field = str(disc) if disc is not None else None break diff --git a/packages/overture-schema-validation/src/overture/schema/validation/types.py b/packages/overture-schema-validation/src/overture/schema/validation/types.py new file mode 100644 index 000000000..b8ac54df7 --- /dev/null +++ b/packages/overture-schema-validation/src/overture/schema/validation/types.py @@ -0,0 +1,12 @@ +"""Type aliases for CLI module.""" + +from typing import TypeAlias + +from pydantic_core import ErrorDetails + +# Pydantic validation error dictionary structure +# In Pydantic v2, ValidationError.errors() returns list[ErrorDetails] +ValidationErrorDict: TypeAlias = ErrorDetails + +# Error location tuple (mix of field names and list indices) +ErrorLocation: TypeAlias = tuple[str | int, ...] diff --git a/packages/overture-schema-validation/tests/conftest.py b/packages/overture-schema-validation/tests/conftest.py new file mode 100644 index 000000000..9d7ab4346 --- /dev/null +++ b/packages/overture-schema-validation/tests/conftest.py @@ -0,0 +1,144 @@ +"""Shared test fixtures for validation tests.""" + +from collections.abc import Generator +from io import StringIO +from typing import Any +from unittest.mock import patch + +import pytest +from click.testing import CliRunner +from rich.console import Console + + +@pytest.fixture +def cli_runner() -> Generator[CliRunner, None, None]: + """Provide a CliRunner within an isolated filesystem.""" + runner = CliRunner() + with runner.isolated_filesystem(): + yield runner + + +@pytest.fixture +def stderr_buffer() -> Generator[StringIO, None, None]: + """Provide a patched stderr buffer for capturing validation error output.""" + buffer = StringIO() + captured_console = Console(file=buffer, force_terminal=False) + + with patch("overture.schema.validation.commands.stderr", captured_console): + yield buffer + + +@pytest.fixture +def building_feature_yaml_content() -> str: + """Return YAML content for a valid building feature.""" + return """ +id: test +type: Feature +geometry: + type: Polygon + coordinates: [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] +properties: + theme: buildings + type: building + version: 0 +""" + + +@pytest.fixture +def building_feature_yaml( + cli_runner: CliRunner, building_feature_yaml_content: str +) -> str: + """Create a test.yaml file with valid building feature in isolated filesystem.""" + filename = "test.yaml" + with open(filename, "w") as f: + f.write(building_feature_yaml_content) + return filename + + +@pytest.fixture +def missing_id_yaml_content() -> str: + """Return YAML content with missing required field.""" + return """ +type: Feature +geometry: + type: Polygon + coordinates: [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] +properties: + theme: buildings + type: building + version: 0 +""" + + +def build_feature( + id: str | None = "test", + theme: str | None = "buildings", + type: str = "building", + geometry_type: str = "Polygon", + coordinates: list | None = None, + version: int | None = 0, + geojson_format: bool = True, + **properties: Any, +) -> dict[str, Any]: + """Build a feature dictionary with the specified parameters. + + Args: + id: Feature ID (None to omit) + theme: Theme name (None to omit) + type: Feature type + geometry_type: Geometry type (Point, Polygon, etc.) + coordinates: Custom coordinates (None for sensible defaults) + version: Feature version (None to omit) + geojson_format: If True, use GeoJSON format; if False, use flat format + **properties: Additional properties to include + + Returns: + Feature dictionary in the requested format + """ + # Default coordinates based on geometry type + if coordinates is None: + if geometry_type == "Point": + coordinates = [0.0, 0.0] + elif geometry_type == "Polygon": + coordinates = [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]] + elif geometry_type == "LineString": + coordinates = [[0, 0], [1, 1]] + elif geometry_type == "MultiPolygon": + coordinates = [[[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]] + else: + coordinates = [] + + geometry = {"type": geometry_type, "coordinates": coordinates} + + if geojson_format: + # GeoJSON format: properties nested under "properties" key + props: dict[str, Any] = { + "type": type, + **properties, + } + if theme is not None: + props["theme"] = theme + if version is not None: + props["version"] = version + + feature: dict[str, Any] = { + "type": "Feature", + "geometry": geometry, + "properties": props, + } + if id is not None: + feature["id"] = id + else: + # Flat format: properties at top level + # Build in the expected order: geometry, theme, type, version, id, properties + feature = {"geometry": geometry} + if theme is not None: + feature["theme"] = theme + feature["type"] = type + if version is not None: + feature["version"] = version + feature.update(properties) + if id is not None: + feature["id"] = id + + return feature diff --git a/packages/overture-schema-cli/tests/test_data_display.py b/packages/overture-schema-validation/tests/test_data_display.py similarity index 99% rename from packages/overture-schema-cli/tests/test_data_display.py rename to packages/overture-schema-validation/tests/test_data_display.py index dd0bfd562..f2d86dd6f 100644 --- a/packages/overture-schema-cli/tests/test_data_display.py +++ b/packages/overture-schema-validation/tests/test_data_display.py @@ -3,7 +3,7 @@ from io import StringIO from conftest import build_feature -from overture.schema.cli.data_display import ( +from overture.schema.validation.data_display import ( create_feature_display, extract_feature_data, format_field_value, diff --git a/packages/overture-schema-cli/tests/test_error_formatting.py b/packages/overture-schema-validation/tests/test_error_formatting.py similarity index 95% rename from packages/overture-schema-cli/tests/test_error_formatting.py rename to packages/overture-schema-validation/tests/test_error_formatting.py index fa8ef1a5d..c94e677a6 100644 --- a/packages/overture-schema-cli/tests/test_error_formatting.py +++ b/packages/overture-schema-validation/tests/test_error_formatting.py @@ -7,12 +7,12 @@ import pytest from click.testing import CliRunner from overture.schema.cli.commands import cli -from overture.schema.cli.error_formatting import ( - format_path, +from overture.schema.validation.data_display import format_path +from overture.schema.validation.error_formatting import ( group_errors_by_discriminator, select_most_likely_errors, ) -from overture.schema.cli.type_analysis import introspect_union +from overture.schema.validation.type_analysis import introspect_union from pydantic import BaseModel, Field, TypeAdapter, ValidationError from rich.console import Console @@ -134,7 +134,7 @@ def test_clear_winner_selected(self, cli_runner: CliRunner) -> None: buffer = StringIO() captured_console = Console(file=buffer, force_terminal=False) - with patch("overture.schema.cli.commands.stderr", captured_console): + with patch("overture.schema.validation.commands.stderr", captured_console): result = cli_runner.invoke(cli, ["validate", filename]) assert result.exit_code == 1 @@ -181,7 +181,7 @@ def test_list_indices_do_not_cause_false_ambiguity( buffer = StringIO() captured_console = Console(file=buffer, force_terminal=False) - with patch("overture.schema.cli.commands.stderr", captured_console): + with patch("overture.schema.validation.commands.stderr", captured_console): result = cli_runner.invoke(cli, ["validate", filename]) assert result.exit_code == 1 @@ -199,7 +199,7 @@ class TestFormatPath: @pytest.mark.parametrize( "filtered_loc,expected_output", [ - pytest.param([], "(root)", id="empty_path"), + pytest.param([], "", id="empty_path"), pytest.param(["id"], "id", id="single_field"), pytest.param(["properties", "name"], "properties.name", id="nested_field"), pytest.param([0], "[0]", id="list_index"), diff --git a/packages/overture-schema-cli/tests/test_heterogeneous_collections.py b/packages/overture-schema-validation/tests/test_heterogeneous_collections.py similarity index 100% rename from packages/overture-schema-cli/tests/test_heterogeneous_collections.py rename to packages/overture-schema-validation/tests/test_heterogeneous_collections.py diff --git a/packages/overture-schema-cli/tests/test_type_analysis.py b/packages/overture-schema-validation/tests/test_type_analysis.py similarity index 99% rename from packages/overture-schema-cli/tests/test_type_analysis.py rename to packages/overture-schema-validation/tests/test_type_analysis.py index 12fb10e75..09cc745c2 100644 --- a/packages/overture-schema-cli/tests/test_type_analysis.py +++ b/packages/overture-schema-validation/tests/test_type_analysis.py @@ -3,7 +3,7 @@ from typing import Annotated, Literal import pytest -from overture.schema.cli.type_analysis import ( +from overture.schema.validation.type_analysis import ( StructuralTuple, create_structural_tuple, extract_discriminator_path, diff --git a/packages/overture-schema-validation/tests/test_validate_command.py b/packages/overture-schema-validation/tests/test_validate_command.py new file mode 100644 index 000000000..ce0bbab30 --- /dev/null +++ b/packages/overture-schema-validation/tests/test_validate_command.py @@ -0,0 +1,336 @@ +"""Tests for the validate command.""" + +import json +from io import StringIO + +import pytest +from click.testing import CliRunner +from conftest import build_feature +from overture.schema.cli.commands import cli + + +class TestValidateCommand: + """Tests for the validate command.""" + + def test_validate_success_message_from_file( + self, cli_runner: CliRunner, building_feature_yaml: str + ) -> None: + """Test that validation shows success message for valid file input.""" + result = cli_runner.invoke(cli, ["validate", building_feature_yaml]) + assert result.exit_code == 0 + assert "Successfully validated" in result.output + + def test_validate_flat_format_input(self, cli_runner: CliRunner) -> None: + """Test that validation works with flat (non-GeoJSON) format.""" + flat_feature = build_feature(geojson_format=False) + flat_json = json.dumps(flat_feature) + result = cli_runner.invoke( + cli, ["validate", "--theme", "buildings", "-"], input=flat_json + ) + assert result.exit_code == 0 + assert "Successfully validated " in result.output + + def test_validate_error_message_format( + self, + cli_runner: CliRunner, + missing_id_yaml_content: str, + stderr_buffer: StringIO, + ) -> None: + """Test that validation errors are formatted correctly.""" + result = cli_runner.invoke( + cli, ["validate", "-"], input=missing_id_yaml_content + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + assert "Validation Failed" in stderr_output + # Should show the field path + assert "id" in stderr_output.lower() + + def test_validate_error_filters_tagged_union_from_path( + self, + cli_runner: CliRunner, + missing_id_yaml_content: str, + stderr_buffer: StringIO, + ) -> None: + """Test that validation error paths don't show internal tagged-union markers.""" + result = cli_runner.invoke( + cli, ["validate", "-"], input=missing_id_yaml_content + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + + # Should NOT show Pydantic's internal union markers in the path + assert "tagged-union" not in stderr_output.lower() + assert "union[" not in stderr_output.lower() + + # Should show the actual field name + assert "id" in stderr_output.lower() + + def test_validate_error_with_invalid_type_value( + self, cli_runner: CliRunner + ) -> None: + """Test validation error for invalid type value.""" + invalid_feature = build_feature(type="invalid_type") + invalid_type_json = json.dumps(invalid_feature) + result = cli_runner.invoke(cli, ["validate", "-"], input=invalid_type_json) + assert result.exit_code == 1 + + def test_validate_error_with_nested_field(self, cli_runner: CliRunner) -> None: + """Test validation error message includes nested field path.""" + feature = build_feature( + names={ + "common": [ + {"value": "Test Building", "language": "invalid_language_code"} + ] + } + ) + nested_field_json = json.dumps(feature) + result = cli_runner.invoke(cli, ["validate", "-"], input=nested_field_json) + assert result.exit_code == 1 + + def test_validate_stdin_requires_dash_argument( + self, + cli_runner: CliRunner, + building_feature_yaml_content: str, + ) -> None: + """Test validating from stdin requires explicit '-' argument.""" + # With dash argument - should work + result = cli_runner.invoke( + cli, ["validate", "-"], input=building_feature_yaml_content + ) + assert result.exit_code == 0 + assert "Successfully validated " in result.output + + # Without dash argument - should show help/usage + result = cli_runner.invoke( + cli, ["validate"], input=building_feature_yaml_content + ) + assert result.exit_code == 2 # Usage error + assert "Missing argument" in result.output or "Usage:" in result.output + + @pytest.mark.parametrize( + "has_error,expected_exit_code,check_index", + [ + pytest.param(False, 0, False, id="success"), + pytest.param(True, 1, True, id="with_error"), + ], + ) + def test_validate_feature_list( + self, + cli_runner: CliRunner, + stderr_buffer: StringIO, + has_error: bool, + expected_exit_code: int, + check_index: bool, + ) -> None: + """Test validation of a list of features (success and error cases).""" + feature1 = build_feature(id="test1") + feature2_id = None if has_error else "test2" + feature2 = build_feature( + id=feature2_id, coordinates=[[[2, 2], [3, 2], [3, 3], [2, 3], [2, 2]]] + ) + feature_list_json = json.dumps([feature1, feature2]) + result = cli_runner.invoke(cli, ["validate", "-"], input=feature_list_json) + assert result.exit_code == expected_exit_code + + if check_index: + stderr_output = stderr_buffer.getvalue() + # Should show list index for the second feature + assert "[1]" in stderr_output or "1" in stderr_output + else: + assert "Successfully validated " in result.output + + @pytest.mark.parametrize( + "first_feature_valid,second_feature_valid,expected_exit_code", + [ + pytest.param(True, True, 0, id="both_valid"), + pytest.param(True, False, 1, id="second_invalid"), + pytest.param(False, False, 1, id="both_invalid"), + ], + ) + def test_validate_feature_collection( + self, + cli_runner: CliRunner, + stderr_buffer: StringIO, + first_feature_valid: bool, + second_feature_valid: bool, + expected_exit_code: int, + ) -> None: + """Test validation of a GeoJSON FeatureCollection with various validity states.""" + feature1 = build_feature(id="test1" if first_feature_valid else None) + feature2 = build_feature( + id="test2" if second_feature_valid else None, + coordinates=[[[2, 2], [3, 2], [3, 3], [2, 3], [2, 2]]], + ) + feature_collection = { + "type": "FeatureCollection", + "features": [feature1, feature2], + } + result = cli_runner.invoke( + cli, ["validate", "-"], input=json.dumps(feature_collection) + ) + assert result.exit_code == expected_exit_code + + if expected_exit_code == 0: + assert "Successfully validated " in result.output + else: + stderr_output = stderr_buffer.getvalue() + # Should show errors for list items + if not first_feature_valid or not second_feature_valid: + assert "[0]" in stderr_output or "[1]" in stderr_output + + def test_validate_with_nonexistent_filters_raises_error( + self, + cli_runner: CliRunner, + building_feature_yaml_content: str, + ) -> None: + """Test that validation with filters matching no models raises a clear error.""" + # Try to validate with a nonexistent theme + result = cli_runner.invoke( + cli, + ["validate", "--theme", "nonexistent_theme", "-"], + input=building_feature_yaml_content, + ) + # UsageError exits with code 2 + assert result.exit_code == 2 + assert "No models found matching" in result.output + + def test_validate_with_nonexistent_type_raises_error( + self, + cli_runner: CliRunner, + building_feature_yaml_content: str, + ) -> None: + """Test that validation with nonexistent type raises a clear error.""" + # Try to validate with a nonexistent type + result = cli_runner.invoke( + cli, + ["validate", "--type", "nonexistent_type", "-"], + input=building_feature_yaml_content, + ) + # UsageError exits with code 2 + assert result.exit_code == 2 + assert "No models found matching" in result.output + + def test_validate_with_valid_theme_invalid_type_raises_error( + self, + cli_runner: CliRunner, + building_feature_yaml_content: str, + ) -> None: + """Test that validation with valid theme but invalid type raises an error.""" + # Try to validate buildings theme with a type that doesn't exist in that theme + result = cli_runner.invoke( + cli, + ["validate", "--theme", "buildings", "--type", "segment", "-"], + input=building_feature_yaml_content, + ) + # UsageError exits with code 2 + assert result.exit_code == 2 + assert "No models found matching" in result.output + + +class TestShowFieldOption: + """Tests for the --show-field option in validate command.""" + + def test_show_field_displays_in_header_on_error( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that --show-field displays field value in error header.""" + # Create invalid feature with missing required field, but with id + feature = build_feature(id="abc123", version=None) + result = cli_runner.invoke( + cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Header should include id value + assert "id=abc123" in stderr_output + + def test_show_field_displays_in_context( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that --show-field pins field in context display.""" + # Create invalid feature with error far from id field + feature = build_feature(id="test123", version=None) + result = cli_runner.invoke( + cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Should show id field in context even if error is elsewhere + assert "id" in stderr_output + assert "test123" in stderr_output + + def test_show_multiple_fields( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that multiple --show-field options work together.""" + feature = build_feature(id="xyz789", version=1, theme=None) + result = cli_runner.invoke( + cli, + ["validate", "--show-field", "id", "--show-field", "version", "-"], + input=json.dumps(feature), + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Header should include both field values + assert "id=xyz789" in stderr_output + assert "version=1" in stderr_output + + def test_show_field_with_missing_field( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that --show-field shows for non-existent fields.""" + feature = build_feature(version=None) + # Don't include 'custom_field' in the feature + result = cli_runner.invoke( + cli, + ["validate", "--show-field", "custom_field", "-"], + input=json.dumps(feature), + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Should show for the non-existent field + assert "custom_field" in stderr_output + assert "" in stderr_output + + def test_show_field_truncates_long_values( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that long field values are truncated in header.""" + long_id = "x" * 100 # Very long ID + feature = build_feature(id=long_id, version=None) + result = cli_runner.invoke( + cli, ["validate", "--show-field", "id", "-"], input=json.dumps(feature) + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Should show truncated value in header (with ellipsis) + assert "id=" in stderr_output + assert "..." in stderr_output + # Should not show the full 100 character string + assert long_id not in stderr_output + + def test_show_field_in_collection( + self, cli_runner: CliRunner, stderr_buffer: StringIO + ) -> None: + """Test that --show-field works with feature collections.""" + feature1 = build_feature(id="first", version=None) + feature2 = build_feature(id="second", theme=None) + result = cli_runner.invoke( + cli, + ["validate", "--show-field", "id", "-"], + input=json.dumps([feature1, feature2]), + ) + assert result.exit_code == 1 + + stderr_output = stderr_buffer.getvalue() + # Should show id for both features + assert "id=first" in stderr_output or "first" in stderr_output + assert "id=second" in stderr_output or "second" in stderr_output diff --git a/packages/overture-schema-cli/tests/test_cli_functions.py b/packages/overture-schema-validation/tests/test_validate_functions.py similarity index 91% rename from packages/overture-schema-cli/tests/test_cli_functions.py rename to packages/overture-schema-validation/tests/test_validate_functions.py index e3de0fdbd..0289ad06b 100644 --- a/packages/overture-schema-cli/tests/test_cli_functions.py +++ b/packages/overture-schema-validation/tests/test_validate_functions.py @@ -1,4 +1,4 @@ -"""Tests for CLI helper functions (load_input, perform_validation).""" +"""Tests for validation helper functions (load_input, perform_validation).""" import json from pathlib import Path @@ -7,7 +7,8 @@ import yaml from click.exceptions import UsageError from conftest import build_feature -from overture.schema.cli.commands import load_input, perform_validation, resolve_types +from overture.schema.core.discovery import resolve_types +from overture.schema.validation.commands import load_input, perform_validation from pydantic import ValidationError @@ -15,7 +16,7 @@ class TestLoadInput: """Tests for load_input function. Note: Happy-path file and stdin loading are covered by integration tests - in test_cli_commands.py. These tests focus on error cases and edge cases. + in test_validate_command.py. These tests focus on error cases and edge cases. """ def test_load_input_file_not_found(self) -> None: @@ -197,14 +198,14 @@ class TestPerformValidation: """Tests for perform_validation function. Note: Happy-path validation (single features, lists, FeatureCollections, flat format) - is covered by integration tests in test_cli_commands.py. These tests focus on edge + is covered by integration tests in test_validate_command.py. These tests focus on edge cases and validation logic specific to the function. """ def test_perform_validation_raises_for_invalid_single_feature(self) -> None: """Test that perform_validation raises ValidationError for single invalid feature.""" data = build_feature(id=None) # Missing required 'id' - model_type = resolve_types(False, None, ("buildings",), ()) + model_type = resolve_types(theme_names=("buildings",)) with pytest.raises(ValidationError) as exc_info: perform_validation(data, model_type) @@ -219,7 +220,7 @@ def test_perform_validation_raises_for_invalid_list_item(self) -> None: id=None, coordinates=[[[2, 2], [3, 2], [3, 3], [2, 3], [2, 2]]] ) data = [feature1, feature2] - model_type = resolve_types(False, None, ("buildings",), ()) + model_type = resolve_types(theme_names=("buildings",)) with pytest.raises(ValidationError) as exc_info: perform_validation(data, model_type) @@ -231,7 +232,7 @@ def test_perform_validation_raises_for_invalid_list_item(self) -> None: def test_perform_validation_empty_list(self) -> None: """Test validating an empty list (edge case).""" data: list[dict[str, object]] = [] - model_type = resolve_types(False, None, ("buildings",), ()) + model_type = resolve_types(theme_names=("buildings",)) # Should not raise perform_validation(data, model_type) @@ -239,7 +240,7 @@ def test_perform_validation_empty_list(self) -> None: def test_perform_validation_empty_feature_collection(self) -> None: """Test validating an empty FeatureCollection (edge case).""" data = {"type": "FeatureCollection", "features": []} - model_type = resolve_types(False, None, ("buildings",), ()) + model_type = resolve_types(theme_names=("buildings",)) # Should not raise perform_validation(data, model_type) @@ -249,10 +250,10 @@ def test_perform_validation_with_different_themes(self) -> None: data = build_feature(theme="buildings", type="building") # Should work with buildings theme - buildings_type = resolve_types(False, None, ("buildings",), ()) + buildings_type = resolve_types(theme_names=("buildings",)) perform_validation(data, buildings_type) # Should fail with wrong theme - places_type = resolve_types(False, None, ("places",), ()) + places_type = resolve_types(theme_names=("places",)) with pytest.raises(ValidationError): perform_validation(data, places_type) diff --git a/pyproject.toml b/pyproject.toml index 92201b713..04c6f4c2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,10 +61,12 @@ pythonpath = [ "packages/overture-schema-annex/tests", "packages/overture-schema-base-theme/tests", "packages/overture-schema-buildings-theme/tests", + "packages/overture-schema-cli/tests", "packages/overture-schema-core/tests", "packages/overture-schema-divisions-theme/tests", "packages/overture-schema-places-theme/tests", "packages/overture-schema-system/tests", "packages/overture-schema-transportation-theme/tests", + "packages/overture-schema-validation/tests", "packages/overture-schema/tests", ] diff --git a/uv.lock b/uv.lock index 8ee6263f7..8de7192c9 100644 --- a/uv.lock +++ b/uv.lock @@ -19,6 +19,7 @@ members = [ "overture-schema-places-theme", "overture-schema-system", "overture-schema-transportation-theme", + "overture-schema-validation", "overture-schema-workspace", ] @@ -678,9 +679,7 @@ dependencies = [ { name = "click" }, { name = "overture-schema-core" }, { name = "pydantic" }, - { name = "pyyaml" }, { name = "rich" }, - { name = "yamlcore" }, ] [package.dev-dependencies] @@ -695,9 +694,7 @@ requires-dist = [ { name = "click", specifier = ">=8.0" }, { name = "overture-schema-core", editable = "packages/overture-schema-core" }, { name = "pydantic", specifier = ">=2.0" }, - { name = "pyyaml", specifier = ">=6.0.2" }, { name = "rich", specifier = ">=13.0" }, - { name = "yamlcore", specifier = ">=0.0.4" }, ] [package.metadata.requires-dev] @@ -812,6 +809,42 @@ requires-dist = [ { name = "pydantic", specifier = ">=2.0" }, ] +[[package]] +name = "overture-schema-validation" +source = { editable = "packages/overture-schema-validation" } +dependencies = [ + { name = "click" }, + { name = "overture-schema-core" }, + { name = "pydantic" }, + { name = "pyyaml" }, + { name = "rich" }, + { name = "yamlcore" }, +] + +[package.dev-dependencies] +dev = [ + { name = "mypy" }, + { name = "pytest" }, + { name = "ruff" }, +] + +[package.metadata] +requires-dist = [ + { name = "click", specifier = ">=8.0" }, + { name = "overture-schema-core", editable = "packages/overture-schema-core" }, + { name = "pydantic", specifier = ">=2.0" }, + { name = "pyyaml", specifier = ">=6.0.2" }, + { name = "rich", specifier = ">=13.0" }, + { name = "yamlcore", specifier = ">=0.0.4" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "mypy" }, + { name = "pytest", specifier = ">=7.0" }, + { name = "ruff" }, +] + [[package]] name = "overture-schema-workspace" version = "0.0.0"