Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ azlin = "azlin.rust_bridge:entry"

[tool.hatch.build.targets.wheel]
packages = ["src/azlin"]

[tool.pytest.ini_options]
pythonpath = ["."]

[dependency-groups]
dev = [
"click",
"pytest>=9.0.2",
"pyyaml",
]
Empty file added scripts/__init__.py
Empty file.
71 changes: 42 additions & 29 deletions scripts/cli_documentation/example_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
"""

import re
import sys
from pathlib import Path

import yaml

from .models import CommandExample
from .models import CommandExample, DocumentationError

# Pre-compiled pattern for command name validation (compiled once at import time)
_VALID_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]+$")


class ExampleManager:
Expand Down Expand Up @@ -55,7 +57,7 @@ def _sanitize_command_name(self, command_name: str) -> str:
ValueError: Invalid command name: ../etc/passwd
"""
# Only allow alphanumeric, dash, and underscore
if not re.match(r"^[a-zA-Z0-9_-]+$", command_name):
if not _VALID_NAME_RE.match(command_name):
raise ValueError(f"Invalid command name: {command_name}")
return command_name

Expand All @@ -74,12 +76,9 @@ def load_examples(self, command_name: str) -> list[CommandExample]:
>>> for ex in examples:
... print(ex.title)
"""
# Sanitize command name to prevent path traversal
try:
safe_command_name = self._sanitize_command_name(command_name)
except ValueError as e:
print(f"Warning: {e}", file=sys.stderr)
return []
# Sanitize command name to prevent path traversal.
# ValueError propagates to the caller — silent swallowing is #878.
safe_command_name = self._sanitize_command_name(command_name)

# Try to find YAML file for this command
yaml_file = self.examples_dir / f"{safe_command_name}.yaml"
Expand Down Expand Up @@ -140,31 +139,37 @@ def _load_from_file(self, yaml_file: Path) -> list[CommandExample]:
Expected output here
"""
try:
with open(yaml_file) as f:
with open(yaml_file, encoding="utf-8") as f:
data = yaml.safe_load(f)

if not data or "examples" not in data:
return []

examples = []
for ex_data in data["examples"]:
command = ex_data.get("command")
if not command:
raise DocumentationError(
f"Example entry in '{yaml_file}' is missing required field 'command'"
)
example = CommandExample(
title=ex_data.get("title", ""),
description=ex_data.get("description", ""),
command=ex_data.get("command", ""),
command=command,
output=ex_data.get("output"),
)
examples.append(example)

return examples

except Exception as e:
# Log error but fail gracefully
print(
f"Warning: Failed to load examples from '{yaml_file}': {e}",
file=sys.stderr,
)
except FileNotFoundError:
return []
except DocumentationError:
raise
except Exception as e:
raise DocumentationError(
f"Failed to load examples for command '{yaml_file.stem}'"
) from e

def save_examples(self, command_name: str, examples: list[CommandExample]) -> bool:
"""Save examples to a YAML file.
Expand All @@ -174,7 +179,11 @@ def save_examples(self, command_name: str, examples: list[CommandExample]) -> bo
examples: List of examples to save

Returns:
True if save succeeded, False otherwise
True if save succeeded.

Raises:
DocumentationError: If the file cannot be written due to I/O or
other unexpected errors.

Example:
>>> manager = ExampleManager("scripts/examples/")
Expand All @@ -186,10 +195,11 @@ def save_examples(self, command_name: str, examples: list[CommandExample]) -> bo
>>> manager.save_examples("mount", examples)
True
"""
try:
# Sanitize command name to prevent path traversal
safe_command_name = self._sanitize_command_name(command_name)
# Sanitize command name to prevent path traversal.
# ValueError propagates to the caller — consistent with load_examples (#878).
safe_command_name = self._sanitize_command_name(command_name)

try:
yaml_file = self.examples_dir / f"{safe_command_name}.yaml"
yaml_file.parent.mkdir(parents=True, exist_ok=True)

Expand All @@ -206,18 +216,21 @@ def save_examples(self, command_name: str, examples: list[CommandExample]) -> bo
],
}

with open(yaml_file, "w") as f:
yaml.dump(data, f, default_flow_style=False, sort_keys=False)
with open(yaml_file, "w", encoding="utf-8") as f:
yaml.dump(
data,
f,
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
)

return True

except Exception as e:
# Log error but fail gracefully
print(
f"Warning: Failed to save examples for command '{command_name}': {e}",
file=sys.stderr,
)
return False
raise DocumentationError(
f"Failed to save examples for '{safe_command_name}'"
) from e


__all__ = ["ExampleManager"]
51 changes: 21 additions & 30 deletions scripts/cli_documentation/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import click

from .models import CLIArgument, CLIMetadata, CLIOption
from .models import CLIArgument, CLIMetadata, CLIOption, DocumentationError


class CLIExtractor:
Expand All @@ -42,7 +42,10 @@ def extract_command(
command_name: Name of the Click command to extract

Returns:
CLIMetadata object or None if command not found
CLIMetadata object or None if command not found in the module.

Raises:
DocumentationError: If the module import or metadata extraction fails.

Example:
>>> extractor = CLIExtractor()
Expand Down Expand Up @@ -71,12 +74,9 @@ def extract_command(
return self._extract_from_click_command(command)

except Exception as e:
# Log error but fail gracefully
print(
f"Warning: Failed to extract command '{command_name}': {e}",
file=sys.stderr,
)
return None
raise DocumentationError(
f"Failed to extract command '{command_name}'"
) from e

def extract_all_commands(self, module_path: str) -> list[CLIMetadata]:
"""Extract metadata from all commands in a module.
Expand All @@ -85,7 +85,10 @@ def extract_all_commands(self, module_path: str) -> list[CLIMetadata]:
module_path: Python module path (e.g., "azlin.cli")

Returns:
List of CLIMetadata objects for all found commands
List of CLIMetadata objects for all found commands.

Raises:
DocumentationError: If the module import or metadata extraction fails.

Example:
>>> extractor = CLIExtractor()
Expand All @@ -112,23 +115,18 @@ def extract_all_commands(self, module_path: str) -> list[CLIMetadata]:

# Check if it's a Click command or group
if isinstance(attr, (click.Command, click.Group)):
metadata = self._extract_from_click_command(attr)
if metadata:
commands.append(metadata)
commands.append(self._extract_from_click_command(attr))

return commands

except Exception as e:
# Log error but fail gracefully
print(
f"Warning: Failed to extract commands from module '{module_path}': {e}",
file=sys.stderr,
)
return []
raise DocumentationError(
f"Failed to extract commands from module '{module_path}'"
) from e

def _extract_from_click_command(
self, command: click.Command, parent_path: str = ""
) -> CLIMetadata | None:
) -> CLIMetadata:
"""Extract metadata from a Click command object.

Args:
Expand Down Expand Up @@ -159,11 +157,9 @@ def _extract_from_click_command(
for subcmd_name in command.list_commands(None):
subcmd = command.get_command(None, subcmd_name)
if subcmd:
sub_metadata = self._extract_from_click_command(
subcmd, full_path
subcommands.append(
self._extract_from_click_command(subcmd, full_path)
)
if sub_metadata:
subcommands.append(sub_metadata)

return CLIMetadata(
name=name,
Expand All @@ -176,13 +172,8 @@ def _extract_from_click_command(
)

except Exception as e:
# Log error but fail gracefully
command_name = getattr(command, "name", "unknown")
print(
f"Warning: Failed to extract metadata from command '{command_name}': {e}",
file=sys.stderr,
)
return None
name = getattr(command, "name", "unknown")
raise DocumentationError(f"Failed to extract subcommand '{name}'") from e

def _get_docstring(self, command: click.Command) -> str:
"""Extract docstring from command callback function."""
Expand Down
39 changes: 23 additions & 16 deletions scripts/cli_documentation/hasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@

import hashlib
import json
import sys
from pathlib import Path

from .models import ChangeSet, CLIMetadata
from .models import ChangeSet, CLIMetadata, DocumentationError


class CLIHasher:
Expand Down Expand Up @@ -125,7 +124,11 @@ def save_hashes(self) -> bool:
"""Save all hashes to disk.

Returns:
True if save succeeded, False otherwise
True if save succeeded.

Raises:
DocumentationError: If the file cannot be written due to I/O or
other unexpected errors.

Example:
>>> hasher = CLIHasher()
Expand All @@ -137,17 +140,13 @@ def save_hashes(self) -> bool:
try:
self.hash_file.parent.mkdir(parents=True, exist_ok=True)

with open(self.hash_file, "w") as f:
with open(self.hash_file, "w", encoding="utf-8") as f:
json.dump(self._hashes, f, indent=2, sort_keys=True)

return True

except Exception as e:
print(
f"Warning: Failed to save hashes to '{self.hash_file}': {e}",
file=sys.stderr,
)
return False
raise DocumentationError("Failed to save hashes to hash file") from e

def compare_hashes(self, current_commands: dict[str, CLIMetadata]) -> ChangeSet:
"""Compare current commands with stored hashes.
Expand Down Expand Up @@ -184,20 +183,28 @@ def compare_hashes(self, current_commands: dict[str, CLIMetadata]) -> ChangeSet:
return ChangeSet(changed=changed, added=added, removed=removed)

def _load_hashes(self) -> None:
"""Load hashes from disk."""
"""Load hashes from disk.

Raises:
DocumentationError: If the file exists but cannot be read or parsed.
"""
if not self.hash_file.exists():
self._hashes = {}
return

try:
with open(self.hash_file) as f:
with open(self.hash_file, encoding="utf-8") as f:
self._hashes = json.load(f)
except Exception as e:
print(
f"Warning: Failed to load hashes from '{self.hash_file}': {e}",
file=sys.stderr,
)
except FileNotFoundError:
self._hashes = {}
except json.JSONDecodeError as e:
raise DocumentationError(
f"Hash file {self.hash_file} is corrupt: {e}"
) from e
except OSError as e:
raise DocumentationError(
f"Cannot read hash file {self.hash_file}: {e}"
) from e

def clear_hashes(self) -> None:
"""Clear all stored hashes (force full regeneration)."""
Expand Down
10 changes: 10 additions & 0 deletions scripts/cli_documentation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,22 @@ def has_changes(self) -> bool:
return bool(self.changed or self.added or self.removed)


class DocumentationError(Exception):
"""Raised when a documentation file cannot be read or written unexpectedly.

This exception is raised when an I/O or parse error occurs that is not
simply a missing file. FileNotFoundError is handled gracefully (return
empty/default); all other failures propagate as DocumentationError.
"""


__all__ = [
"CLIArgument",
"CLIMetadata",
"CLIOption",
"ChangeSet",
"CommandExample",
"DocumentationError",
"SyncResult",
"ValidationResult",
]
Loading
Loading