diff --git a/docs/src/content/docs/enterprise/lifecycle-hooks.md b/docs/src/content/docs/enterprise/lifecycle-hooks.md new file mode 100644 index 000000000..e61efa0f4 --- /dev/null +++ b/docs/src/content/docs/enterprise/lifecycle-hooks.md @@ -0,0 +1,245 @@ +--- +title: "Lifecycle Hooks" +description: "Run custom actions (shell commands, HTTP webhooks) at install, update, and uninstall time." +sidebar: + order: 12 +--- + +APM supports **lifecycle hooks** -- custom actions that fire automatically +at key moments during install, update, and uninstall operations. Hooks are +fire-and-forget: a failing hook never blocks the CLI. + +Hooks are defined in standalone JSON files and discovered from well-known +directories, following the same pattern as GitHub Copilot CLI hooks. + +## Supported events + +| Event | Fires when | +|------------------|--------------------------------------| +| `pre-install` | Before the install pipeline runs | +| `post-install` | After a successful install completes | +| `pre-update` | Before the update pipeline runs | +| `post-update` | After a successful update completes | +| `pre-uninstall` | Before uninstall begins | +| `post-uninstall` | After a successful uninstall | + +## Hook file format + +Hook files are JSON with a versioned schema: + +```json +{ + "version": 1, + "hooks": { + "post-install": [ + { + "type": "command", + "bash": "./scripts/notify.sh", + "timeoutSec": 10 + }, + { + "type": "http", + "url": "https://analytics.corp.net/apm/events", + "headers": { "Authorization": "Bearer $APM_ANALYTICS_TOKEN" }, + "timeoutSec": 5 + } + ] + } +} +``` + +## Hook types + +### Command + +Run a shell command. The event payload is piped via **stdin** as JSON: + +```json +{ + "type": "command", + "bash": "./scripts/notify.sh", + "cwd": "./scripts", + "env": { "LOG_LEVEL": "INFO" }, + "timeoutSec": 10 +} +``` + +Fields: +- `bash` -- command string for bash (use this on Linux/macOS) +- `command` -- fallback command string (cross-platform) +- `cwd` -- working directory (relative paths resolve against project root) +- `env` -- extra environment variables merged into the process env +- `timeoutSec` -- execution timeout (default: 30s) + +If both `bash` and `command` are present, `bash` takes priority. + +### HTTP + +Send a JSON POST to an HTTPS endpoint: + +```json +{ + "type": "http", + "url": "https://analytics.corp.net/apm/events", + "headers": { "Authorization": "Bearer $APM_ANALYTICS_TOKEN" }, + "timeoutSec": 5 +} +``` + +Fields: +- `url` -- HTTPS endpoint (**http:// is rejected**) +- `headers` -- request headers; values support `$ENV_VAR` expansion +- `timeoutSec` -- request timeout (default: 10s) + +Security: +- **HTTPS only** -- `http://` URLs are rejected +- **No redirects** -- redirect following is disabled +- Headers support env-var expansion (`$VAR` or `${VAR}`) + +## Discovery locations + +Hook files are loaded from three directories. All files are **additive** -- +every hook from every file runs. Policy hooks cannot be disabled. + +| Priority | Path | Who controls | +|--------------|-----------------------------|------------------| +| 1 (highest) | `/etc/apm/policy.d/*.json` | Platform/IT team | +| 2 | `~/.apm/hooks/*.json` | Individual user | +| 3 | `.apm/hooks.json` | Project | + +Policy and user sources are directories (all `*.json` files are loaded). +The project source is a single file. + +## Event payload + +Command hooks receive JSON on **stdin**. HTTP hooks receive it as the +POST body. + +```json +{ + "schema_version": 1, + "event": "post-install", + "packages": [ + { "name": "org/repo", "reference": "v1.0.0" } + ], + "scope": "project", + "timestamp": "2026-06-13T14:50:15Z", + "cli_version": "0.14.1", + "working_directory": "/path/to/project" +} +``` + +## Analytics use case + +The canonical use case for lifecycle hooks is installation analytics. +An enterprise platform team can deploy an org-wide webhook via the +policy directory to track which packages are actively used: + +Create `/etc/apm/policy.d/analytics.json`: + +```json +{ + "version": 1, + "hooks": { + "post-install": [ + { + "type": "http", + "url": "https://analytics.internal.company.com/apm/events", + "headers": { "Authorization": "Bearer $APM_ANALYTICS_TOKEN" } + } + ], + "post-uninstall": [ + { + "type": "http", + "url": "https://analytics.internal.company.com/apm/events", + "headers": { "Authorization": "Bearer $APM_ANALYTICS_TOKEN" } + } + ] + } +} +``` + +Set the token in CI: + +```bash +export APM_ANALYTICS_TOKEN="your-bearer-token" +apm install +``` + +The webhook receives a JSON payload for every install and uninstall, +enabling dashboards that show adoption, version drift, and removal +trends -- without any changes to individual project configurations. + +## Security considerations + +- HTTP hook URLs must use `https://`. +- Tokens are never stored in hook files -- use env-var expansion in headers. +- All hooks are fire-and-forget with configurable timeouts (10s for HTTP, + 30s for commands by default). +- Hook failures are logged in verbose mode (`--verbose`) and never + block the CLI. + +## Hook output log + +Hook stdout, stderr, and execution status are appended to a log file at +`~/.apm/logs/hooks.log` (or `$APM_HOME/logs/hooks.log`). This lets +administrators audit hook behaviour without enabling verbose CLI output. + +Each entry includes a UTC timestamp, event name, hook type, target +command or URL, status, exit code (for commands), and any captured output: + +``` +[2026-06-16T08:25:43Z] event=pre-install type=command target=echo 'Check passed' status=ok exit_code=0 + stdout: Check passed + +[2026-06-16T08:25:44Z] event=post-install type=http target=https://analytics.corp.net/events status=ok + stdout: HTTP 200 +``` + +The log file is created automatically on first hook execution. + +## CLI commands + +APM provides three commands to work with lifecycle hooks: + +### ``apm hooks`` -- list discovered hooks + +Run without a sub-command to see all hooks discovered from policy, user, +and project directories: + +```bash +apm hooks +``` + +### ``apm hooks init`` -- scaffold a starter hook file + +Generate a starter JSON hook file at ``.apm/hooks.json``: + +```bash +apm hooks init # creates .apm/hooks.json +apm hooks init --force # overwrite existing file +``` + +### ``apm hooks validate`` -- check hook files for errors + +Validate all discovered hook files across policy, user, and project +directories. Reports schema errors, unknown events, missing fields, and +non-HTTPS URLs: + +```bash +apm hooks validate +``` + +Exits with a non-zero code if any errors are found. + +### ``apm hooks test`` -- dry-run a synthetic event + +Fire a synthetic event through all discovered hooks to verify wiring +without performing a real install/update/uninstall: + +```bash +apm hooks test # fires post-install (default) +apm hooks test pre-uninstall # fires a specific event +``` + +Hook output is written to ``~/.apm/logs/hooks.log`` as usual. diff --git a/docs/src/content/docs/enterprise/security.md b/docs/src/content/docs/enterprise/security.md index 96441c122..79a5c37fc 100644 --- a/docs/src/content/docs/enterprise/security.md +++ b/docs/src/content/docs/enterprise/security.md @@ -29,8 +29,8 @@ APM is a build-time dependency manager for AI agent configuration. It performs f APM has no runtime footprint. Once `apm install` or `apm compile` completes, the process exits. - **No runtime component.** APM generates files then terminates. It does not run alongside your application. -- **No network calls after install.** All network activity (git clone/fetch) occurs during dependency resolution. There are no callbacks, webhooks, or phone-home requests. -- **No arbitrary code execution.** APM does not execute scripts from packages, evaluate expressions in templates, or run downloaded code. (**Canvas exception:** the experimental `canvas` primitive deploys executable `extension.mjs` (Node.js) code to `.github/extensions/` or `~/.copilot/extensions/`; this surface is gated by both the `canvas` experimental flag and `--trust-canvas-extensions` for dependency-provided canvases. See [Canvas extensions](/apm/integrations/canvas/).) +- **No network calls after install (by default).** All network activity (git clone/fetch) occurs during dependency resolution. Opt-in [lifecycle hooks](/apm/enterprise/lifecycle-hooks/) may send HTTPS POST requests if explicitly configured by the user or policy administrator. +- **No arbitrary code execution (by default).** APM does not execute scripts from packages, evaluate expressions in templates, or run downloaded code. Opt-in [lifecycle hooks](/apm/enterprise/lifecycle-hooks/) may execute shell commands if explicitly configured; credential variables are stripped from the hook environment. (**Canvas exception:** the experimental `canvas` primitive deploys executable `extension.mjs` (Node.js) code to `.github/extensions/` or `~/.copilot/extensions/`; this surface is gated by both the `canvas` experimental flag and `--trust-canvas-extensions` for dependency-provided canvases. See [Canvas extensions](/apm/integrations/canvas/).) - **No access to application data.** APM never reads databases, API responses, application state, or user data. - **No persistent background processes.** APM does not install daemons, services, or scheduled tasks. - **No telemetry or data collection.** APM collects no usage data, analytics, or diagnostics. Nothing is transmitted to Microsoft or any third party. diff --git a/packages/apm-guide/.apm/skills/apm-usage/commands.md b/packages/apm-guide/.apm/skills/apm-usage/commands.md index 89f895ef7..798f4e643 100644 --- a/packages/apm-guide/.apm/skills/apm-usage/commands.md +++ b/packages/apm-guide/.apm/skills/apm-usage/commands.md @@ -86,6 +86,17 @@ When `apm install --target copilot` has already deployed instructions to `.githu **External scanners (experimental, behind `apm experimental enable external-scanners`).** `--external NAME` runs a third-party SARIF scanner (e.g. `skillspector`) and merges its findings. `--external-llm/--no-external-llm` toggles LLM-powered analysis (default off; sends scanned content to a third-party API, so APM prints a `[!]` egress banner and forwards `OPENAI_API_KEY`/`NVIDIA_INFERENCE_KEY` only when on). `--external-args TEXT` is a single shlex-split string of extra scanner flags, validated against a per-adapter allowlist -- non-allowlisted flags, secret-looking flags, and out-of-cwd paths are rejected fail-closed. `--external-llm`/`--external-args` without `--external` is a usage error (exit 2). Scanner configuration or infrastructure errors (feature disabled, scanner not found, malformed SARIF) exit **3**. Persist defaults with `apm config set external..llm true` and `apm config set external..args -- "--model gpt-4o"`. Precedence: CLI > config > policy floor. +## Lifecycle hooks + +| Command | Purpose | Key flags | +|---------|---------|-----------| +| `apm hooks` | List all discovered lifecycle hooks across policy, user, and project sources | -- | +| `apm hooks init` | Scaffold a starter `.apm/hooks.json` file in the current project | `--force` (overwrite existing file) | +| `apm hooks test EVENT` | Fire a synthetic event through all discovered hooks (dry-run) | `--verbose` | +| `apm hooks validate` | Check all discovered hook files for schema errors, unknown events, missing fields, and non-HTTPS URLs | -- | + +Lifecycle hooks fire on six events: `pre-install`, `post-install`, `pre-update`, `post-update`, `pre-uninstall`, `post-uninstall`. Hook files are standalone JSON discovered from three sources (additive): policy (`/etc/apm/policy.d/*.json`), user (`~/.apm/hooks/*.json`), project (`.apm/hooks.json`). Two hook types: `command` (shell via subprocess, event JSON on stdin) and `http` (HTTPS POST). Hook output is appended to `~/.apm/logs/hooks.log`. See the [Lifecycle hooks](/apm/enterprise/lifecycle-hooks/) guide for full documentation. + ## Distribution | Command | Purpose | Key flags | diff --git a/src/apm_cli/cli.py b/src/apm_cli/cli.py index 9c7b3b350..0c9358d5b 100644 --- a/src/apm_cli/cli.py +++ b/src/apm_cli/cli.py @@ -27,6 +27,7 @@ from apm_cli.commands.doctor import doctor from apm_cli.commands.experimental import experimental from apm_cli.commands.find import find as find_cmd +from apm_cli.commands.hooks import hooks from apm_cli.commands.init import init from apm_cli.commands.install import install from apm_cli.commands.list_cmd import list as list_cmd @@ -187,6 +188,7 @@ def cli(ctx, verbose: bool) -> None: cli.add_command(policy) cli.add_command(outdated_cmd, name="outdated") cli.add_command(doctor) +cli.add_command(hooks) cli.add_command(marketplace) cli.add_command(find_cmd) cli.add_command(marketplace_search, name="search") diff --git a/src/apm_cli/commands/hooks.py b/src/apm_cli/commands/hooks.py new file mode 100644 index 000000000..ff612fb48 --- /dev/null +++ b/src/apm_cli/commands/hooks.py @@ -0,0 +1,369 @@ +"""``apm hooks`` -- inspect, test, and scaffold lifecycle hooks. + +Sub-commands: + +* ``apm hooks`` -- list discovered hooks +* ``apm hooks test`` -- dry-run a synthetic event through all hooks +* ``apm hooks init`` -- scaffold a starter hook JSON file +* ``apm hooks validate`` -- check all hook files for errors +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from urllib.parse import urlparse + +import click + +from apm_cli.utils.console import ( + STATUS_SYMBOLS, + _rich_echo, + _rich_error, + _rich_info, + _rich_success, + _rich_warning, +) + +# Default scaffold template. +_INIT_TEMPLATE = { + "version": 1, + "hooks": { + "post-install": [ + { + "type": "command", + "bash": "echo 'Installed:' && cat", + "timeoutSec": 10, + } + ], + }, +} + + +@click.group( + invoke_without_command=True, + help="Inspect, test, and scaffold lifecycle hooks.", +) +@click.pass_context +def hooks(ctx: click.Context) -> None: + """List discovered lifecycle hooks when invoked without a sub-command.""" + if ctx.invoked_subcommand is not None: + return + + from apm_cli.core.lifecycle_hooks import discover_hooks + + project_root = str(Path.cwd()) + entries = discover_hooks(project_root=project_root) + + if not entries: + _rich_info("No lifecycle hooks discovered.", symbol="info") + _rich_echo( + " Create one with: apm hooks init", + style="dim", + ) + return + + _rich_echo( + f"{STATUS_SYMBOLS['check']} Discovered {len(entries)} hook(s):\n", + style="green", + ) + + try: + from rich.table import Table + + from apm_cli.utils.console import _get_console + + console = _get_console() + if console is None: + raise ImportError("Rich console unavailable") + + table = Table( + title="Lifecycle Hooks", + show_header=True, + header_style="bold cyan", + ) + table.add_column("Event", style="bold white") + table.add_column("Type", style="cyan") + table.add_column("Target", style="white") + table.add_column("Source", style="dim") + + for entry in entries: + target = entry.url or entry.effective_command or "(none)" + table.add_row(entry.event, entry.hook_type, target, entry.source) + + console.print(table) + except (ImportError, NameError): + for entry in entries: + target = entry.url or entry.effective_command or "(none)" + click.echo(f" {entry.event:20s} {entry.hook_type:10s} {target} ({entry.source})") + + +@hooks.command( + name="test", + help="Dry-run a synthetic lifecycle event through discovered hooks.", +) +@click.argument( + "event", + required=False, + default="post-install", + type=click.Choice( + [ + "pre-install", + "post-install", + "pre-update", + "post-update", + "pre-uninstall", + "post-uninstall", + ], + case_sensitive=False, + ), +) +@click.option("--verbose", "-v", is_flag=True, help="Show detailed output") +def hooks_test(event: str, verbose: bool) -> None: + """Fire a synthetic event through all discovered hooks.""" + from apm_cli.core.lifecycle_hooks import ( + LifecycleEvent, + PackageInfo, + build_runner_from_context, + ) + + project_root = str(Path.cwd()) + runner = build_runner_from_context(project_root=project_root, verbose=verbose) + + matching = runner.hooks_for_event(event) + if not matching: + _rich_warning( + f"No hooks registered for '{event}'. Create one with: apm hooks init", + symbol="warning", + ) + return + + _rich_info( + f"Firing synthetic '{event}' event ({len(matching)} hook(s))...", + symbol="gear", + ) + + synthetic_event = LifecycleEvent.create( + event=event, + packages=[PackageInfo(name="test/synthetic-package", reference="v0.0.0-test")], + scope="project", + working_directory=project_root, + ) + + threads = runner.fire(event, synthetic_event) + + # Drain HTTP daemon threads so log entries are written before exit. + for t in threads: + t.join(timeout=15) + + _rich_success( + f"'{event}' event fired. Check ~/.apm/logs/hooks.log for output.", + symbol="check", + ) + + +@hooks.command( + name="init", + help="Scaffold a starter hook JSON file at .apm/hooks.json.", +) +@click.option("--force", is_flag=True, help="Overwrite if file already exists.") +def hooks_init(force: bool) -> None: + """Create a starter hook JSON file in the project.""" + apm_dir = Path.cwd() / ".apm" + target_file = apm_dir / "hooks.json" + + if target_file.exists() and not force: + _rich_warning( + f"Hook file already exists: {target_file.relative_to(Path.cwd())}", + symbol="warning", + ) + _rich_echo(" Use --force to overwrite.", style="dim") + return + + apm_dir.mkdir(parents=True, exist_ok=True) + + content = json.dumps(_INIT_TEMPLATE, indent=2) + "\n" + target_file.write_text(content, encoding="utf-8") + + _rich_success( + f"Created hook file: {target_file.relative_to(Path.cwd())}", + symbol="check", + ) + _rich_echo("") + + try: + from rich.panel import Panel + + from apm_cli.utils.console import _get_console + + console = _get_console() + if console is None: + raise ImportError("Rich console unavailable") + + console.print( + Panel( + "[bold]Next steps:[/bold]\n\n" + f" 1. Edit [cyan]{target_file.relative_to(Path.cwd())}[/cyan] " + "to add your hooks\n" + " 2. Run [cyan]apm hooks validate[/cyan] to check for errors\n" + " 3. Run [cyan]apm hooks test post-install[/cyan] to dry-run\n", + title="Getting Started", + style="cyan", + ) + ) + except (ImportError, NameError): + click.echo("Next steps:") + click.echo(f" 1. Edit {target_file.relative_to(Path.cwd())} to add your hooks") + click.echo(" 2. Run `apm hooks validate` to check for errors") + click.echo(" 3. Run `apm hooks test post-install` to dry-run") + + +@hooks.command( + name="validate", + help="Validate all discovered hook files for errors.", +) +def hooks_validate() -> None: + """Check all hook JSON files across discovery sources for errors.""" + from apm_cli.core.lifecycle_hooks import ( + _get_policy_hooks_dir, + _get_project_hooks_file, + _get_user_hooks_dir, + ) + + project_root = str(Path.cwd()) + dirs = [ + ("policy", _get_policy_hooks_dir()), + ("user", _get_user_hooks_dir()), + ] + + total_files = 0 + total_errors = 0 + total_hooks = 0 + + def _process_file(json_file: Path, source: str) -> None: + nonlocal total_files, total_errors, total_hooks + total_files += 1 + errors = _validate_hook_file(json_file, source) + if errors: + total_errors += len(errors) + rel = json_file.relative_to(Path.cwd()) if source == "project" else json_file + _rich_error(f"{rel}:", symbol="error") + for err in errors: + _rich_echo(f" {err}", style="red") + else: + try: + data = json.loads(json_file.read_text(encoding="utf-8")) + hook_count = sum( + len(v) for v in data.get("hooks", {}).values() if isinstance(v, list) + ) + total_hooks += hook_count + except Exception: + pass + + # Check directory-based sources (policy, user). + for source, directory in dirs: + if not directory.is_dir(): + continue + for json_file in sorted(directory.glob("*.json")): + if json_file.is_file(): + _process_file(json_file, source) + + # Check project-level single file. + project_file = _get_project_hooks_file(project_root) + if project_file.is_file(): + _process_file(project_file, "project") + + if total_files == 0: + _rich_info("No hook files found.", symbol="info") + _rich_echo(" Create one with: apm hooks init", style="dim") + return + + if total_errors > 0: + _rich_error( + f"{total_errors} error(s) in {total_files} file(s).", + symbol="error", + ) + sys.exit(1) + else: + _rich_success( + f"All {total_files} hook file(s) valid ({total_hooks} hook(s) configured).", + symbol="check", + ) + + +def _validate_hook_file(path: Path, source: str) -> list[str]: + """Validate a single hook JSON file. Returns a list of error messages.""" + from apm_cli.core.lifecycle_hooks import ( + HOOK_FILE_VERSION, + HOOK_TYPES, + LIFECYCLE_EVENTS, + ) + + errors: list[str] = [] + + try: + raw = path.read_text(encoding="utf-8") + except OSError as e: + return [f"Cannot read file: {e}"] + + try: + data = json.loads(raw) + except json.JSONDecodeError as e: + return [f"Invalid JSON: {e}"] + + if not isinstance(data, dict): + return ["Root must be a JSON object"] + + version = data.get("version") + if version is None: + errors.append("Missing 'version' field") + elif version != HOOK_FILE_VERSION: + errors.append(f"Unsupported version: {version} (expected {HOOK_FILE_VERSION})") + + hooks_dict = data.get("hooks") + if hooks_dict is None: + errors.append("Missing 'hooks' field") + return errors + + if not isinstance(hooks_dict, dict): + errors.append("'hooks' must be a JSON object") + return errors + + for event_name, hook_list in hooks_dict.items(): + if event_name not in LIFECYCLE_EVENTS: + errors.append(f"Unknown event: '{event_name}'") + continue + + if not isinstance(hook_list, list): + errors.append(f"'{event_name}' must be an array") + continue + + for i, entry in enumerate(hook_list): + prefix = f"{event_name}[{i}]" + + if not isinstance(entry, dict): + errors.append(f"{prefix}: must be a JSON object") + continue + + hook_type = entry.get("type", "command") + if hook_type not in HOOK_TYPES: + errors.append(f"{prefix}: unknown type '{hook_type}'") + continue + + if hook_type == "command": + if not entry.get("bash") and not entry.get("command"): + errors.append(f"{prefix}: command hook needs 'bash' or 'command' field") + + elif hook_type == "http": + url = entry.get("url") + if not url: + errors.append(f"{prefix}: http hook needs 'url' field") + else: + parsed = urlparse(url) + if parsed.scheme.lower() != "https": + errors.append(f"{prefix}: URL must use https:// scheme") + if parsed.username or parsed.password: + errors.append(f"{prefix}: URL must not contain embedded credentials") + + return errors diff --git a/src/apm_cli/commands/uninstall/cli.py b/src/apm_cli/commands/uninstall/cli.py index cff62d270..2878c7b32 100644 --- a/src/apm_cli/commands/uninstall/cli.py +++ b/src/apm_cli/commands/uninstall/cli.py @@ -83,6 +83,17 @@ def uninstall(ctx, packages, dry_run, verbose, global_): logger.start(f"Uninstalling {len(packages)} package(s)...") + # Fire pre-uninstall lifecycle hooks + _fire_uninstall_hooks( + "pre-uninstall", + packages=packages, + scope=scope, + manifest_path=manifest_path, + logger=logger, + verbose=verbose, + deploy_root=deploy_root, + ) + # Read current apm.yml from ...utils.yaml_io import dump_yaml, load_yaml @@ -283,6 +294,59 @@ def uninstall(ctx, packages, dry_run, verbose, global_): if packages_not_found: logger.warning(f"Note: {len(packages_not_found)} package(s) were not found in apm.yml") + # Fire post-uninstall lifecycle hooks + _fire_uninstall_hooks( + "post-uninstall", + packages=packages_to_remove, + scope=scope, + manifest_path=manifest_path, + logger=logger, + verbose=verbose, + deploy_root=deploy_root, + ) + except Exception as e: logger.error(f"Error uninstalling packages: {e}") sys.exit(1) + + +def _fire_uninstall_hooks( + event_name: str, + *, + packages, + scope, + manifest_path, + logger, + verbose: bool, + deploy_root, +) -> None: + """Build a hook runner and fire an uninstall lifecycle event. + + Best-effort: all exceptions are swallowed so hooks never block + the uninstall flow. + """ + import contextlib + + with contextlib.suppress(Exception): + from apm_cli.core.lifecycle_hooks import ( + LifecycleEvent, + PackageInfo, + build_runner_from_context, + ) + + runner = build_runner_from_context( + logger=logger, + verbose=verbose, + project_root=str(deploy_root), + ) + + pkg_infos = [PackageInfo(name=str(pkg)) for pkg in packages] + scope_name = scope.value if hasattr(scope, "value") else str(scope) + event = LifecycleEvent.create( + event=event_name, + packages=pkg_infos, + scope=scope_name, + working_directory=str(deploy_root), + ) + + runner.fire(event_name, event) diff --git a/src/apm_cli/commands/update.py b/src/apm_cli/commands/update.py index c91373272..2b9ff0eda 100644 --- a/src/apm_cli/commands/update.py +++ b/src/apm_cli/commands/update.py @@ -53,7 +53,7 @@ import sys from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import click from git.exc import GitCommandError @@ -81,6 +81,7 @@ from ._helpers import UnknownPackageError, _find_apm_yml, resolve_requested_packages if TYPE_CHECKING: + from ..core.command_logger import CommandLogger from ..models.dependency.reference import DependencyReference @@ -527,6 +528,15 @@ def _plan_callback(plan: UpdatePlan) -> bool: return _confirm_plan_application() try: + # Fire pre-update lifecycle hooks + _fire_update_hooks( + "pre-update", + apm_package=staged_apm_package, + scope=scope, + logger=logger, + verbose=verbose, + ) + result = _install_apm_dependencies( staged_apm_package, update_refs=True, @@ -593,5 +603,62 @@ def _plan_callback(plan: UpdatePlan) -> bool: else: _rich_success("No dependency changes were applied.") + # Fire post-update lifecycle hooks + _fire_update_hooks( + "post-update", + apm_package=staged_apm_package, + scope=scope, + logger=logger, + verbose=verbose, + ) + + +def _fire_update_hooks( + event_name: str, + *, + apm_package: Any, + scope: Any, + logger: CommandLogger | None, + verbose: bool, +) -> None: + """Build a hook runner and fire an update lifecycle event. + + Best-effort: all exceptions are swallowed so hooks never block + the update flow. + """ + import contextlib + + with contextlib.suppress(Exception): + from apm_cli.core.lifecycle_hooks import ( + LifecycleEvent, + PackageInfo, + build_runner_from_context, + ) + + project_root = None + pkg_path = getattr(apm_package, "package_path", None) + if pkg_path is not None: + project_root = str(pkg_path) + + runner = build_runner_from_context( + logger=logger, + verbose=verbose, + project_root=project_root, + ) + + pkg_infos = [] + for dep in apm_package.get_apm_dependencies(): + pkg_infos.append(PackageInfo(name=dep.repo_url or str(dep), reference=dep.reference)) + + scope_name = scope.value if hasattr(scope, "value") else str(scope) + event = LifecycleEvent.create( + event=event_name, + packages=pkg_infos, + scope=scope_name, + working_directory=project_root, + ) + + runner.fire(event_name, event) + __all__ = ["update"] diff --git a/src/apm_cli/core/hook_executors.py b/src/apm_cli/core/hook_executors.py new file mode 100644 index 000000000..14d244232 --- /dev/null +++ b/src/apm_cli/core/hook_executors.py @@ -0,0 +1,310 @@ +"""Lifecycle hook executors -- one per action type. + +Each executor is fire-and-forget: it catches all exceptions internally +and logs failures in verbose mode only (using ``[i]`` ASCII symbol). + +Two hook types (Copilot CLI aligned): + +- ``command`` -- shell command via subprocess, event JSON on **stdin** +- ``http`` -- HTTPS POST with JSON body, env-var expansion in headers + +Hook output is appended to ``~/.apm/logs/hooks.log`` so administrators +can audit what hooks produce without enabling verbose CLI output. +""" + +from __future__ import annotations + +import logging +import os +import re +import subprocess +import threading +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +if TYPE_CHECKING: + from apm_cli.core.command_logger import CommandLogger + from apm_cli.core.lifecycle_hooks import HookEntry, LifecycleEvent + +_logger = logging.getLogger(__name__) + +# Fallback timeouts when hook entry does not specify one. +_DEFAULT_HTTP_TIMEOUT = 10 +_DEFAULT_COMMAND_TIMEOUT = 30 + +# Pattern for $VAR or ${VAR} expansion in header values. +_ENV_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)") + +# Credential variable denylist -- these must never be expanded into HTTP +# headers or leaked to hook subprocesses. Matches names that END with these +# suffixes (e.g. GITHUB_APM_PAT, API_KEY) but not unrelated names like PATH. +_CREDENTIAL_DENYLIST = re.compile( + r"(?:_|^)(?:TOKEN|SECRET|PAT|KEY|PASSWORD|CREDENTIAL)(?:_|$)", re.IGNORECASE +) + + +def _redact_url_credentials(url: str) -> str: + """Strip ``user:password@`` from a URL before logging.""" + try: + parsed = urlparse(url) + if not parsed.netloc or "@" not in parsed.netloc: + return url + host = parsed.hostname or "" + if parsed.port is not None: + host = f"{host}:{parsed.port}" + return parsed._replace(netloc=host).geturl() + except (ValueError, TypeError): + return url + + +# -- Hook output log ------------------------------------------------------- + + +def _get_hooks_log_path() -> Path: + """Return the path to the hooks output log file.""" + apm_home = os.environ.get("APM_HOME") + base = Path(apm_home) if apm_home else Path.home() / ".apm" + return base / "logs" / "hooks.log" + + +def _append_to_hook_log( + event_name: str, + hook_type: str, + target: str, + *, + stdout: str = "", + stderr: str = "", + status: str = "ok", + exit_code: int | None = None, +) -> None: + """Append a timestamped entry to the hooks log file. + + Creates ``~/.apm/logs/`` on first write. Errors are silently + swallowed -- logging must never break the CLI. + """ + try: + log_path = _get_hooks_log_path() + log_path.parent.mkdir(parents=True, exist_ok=True) + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + lines = [f"[{ts}] event={event_name} type={hook_type} target={target} status={status}"] + if exit_code is not None: + lines[0] += f" exit_code={exit_code}" + if stdout and stdout.strip(): + lines.append(f" stdout: {stdout.strip()}") + if stderr and stderr.strip(): + lines.append(f" stderr: {stderr.strip()}") + lines.append("") # blank line separator + + with open(log_path, "a", encoding="utf-8") as f: + f.write("\n".join(lines) + "\n") + except Exception: + _logger.debug("Failed to write to hooks log", exc_info=True) + + +def execute_hook( + hook: HookEntry, + event: LifecycleEvent, + *, + logger: CommandLogger | None = None, + verbose: bool = False, + project_root: str | None = None, +) -> threading.Thread | None: + """Dispatch to the correct executor based on hook type. + + Returns the daemon thread for HTTP hooks (so callers can optionally + join it), or None for command hooks and no-ops. + """ + if hook.hook_type == "http": + return _execute_http(hook, event, logger=logger, verbose=verbose) + elif hook.hook_type == "command": + _execute_command(hook, event, logger=logger, verbose=verbose, project_root=project_root) + return None + + +# -- HTTP executor ---------------------------------------------------------- + + +def _expand_env_vars(value: str) -> str: + """Expand ``$VAR`` and ``${VAR}`` references in *value*. + + Variables whose names match the credential denylist pattern + (TOKEN, SECRET, PAT, KEY, PASSWORD, CREDENTIAL) are never expanded + -- they resolve to an empty string to prevent accidental exfiltration. + """ + + def _replace(match: re.Match) -> str: + var_name = match.group(1) or match.group(2) + if _CREDENTIAL_DENYLIST.search(var_name): + _logger.debug("Blocked credential variable expansion: %s", var_name) + return "" + return os.environ.get(var_name, "") + + return _ENV_VAR_PATTERN.sub(_replace, value) + + +def _execute_http( + hook: HookEntry, + event: LifecycleEvent, + *, + logger: CommandLogger | None = None, + verbose: bool = False, +) -> threading.Thread | None: + """Send an HTTP POST to the hook URL in a daemon thread. + + Returns the started thread so callers can optionally join it. + + Security hardening: + - HTTPS-only (rejects ``http://``) + - No redirect following + - Configurable timeout (default 10s) + - Header values support ``$ENV_VAR`` expansion (credential vars blocked) + """ + url = hook.url + if not url: + _logger.debug("HTTP hook has no URL, skipping") + return None + + parsed = urlparse(url) + if parsed.scheme != "https": + if verbose and logger: + logger.verbose_detail( + f"[i] HTTP hook rejected: URL must use https (got {parsed.scheme}://)" + ) + _logger.debug("Rejecting non-HTTPS hook URL: %s", url) + return None + + if not parsed.hostname: + _logger.debug("HTTP hook URL has no hostname: %s", url) + return None + + # Build headers with env-var expansion. + request_headers: dict[str, str] = {"Content-Type": "application/json"} + if hook.headers: + for key, val in hook.headers.items(): + request_headers[key] = _expand_env_vars(val) + + payload = event.to_json() + timeout = hook.effective_timeout + hostname = parsed.hostname + + event_name = event.event + safe_url = _redact_url_credentials(url) + + def _send() -> None: + try: + import requests + + resp = requests.post( + url, + data=payload, + headers=request_headers, + timeout=timeout, + allow_redirects=False, + ) + _append_to_hook_log( + event_name, + "http", + safe_url, + stdout=f"HTTP {resp.status_code}", + status="ok" if resp.ok else "error", + ) + except Exception as exc: + _logger.debug("HTTP POST failed for %s", safe_url, exc_info=True) + _append_to_hook_log(event_name, "http", safe_url, stderr=str(exc), status="error") + + thread = threading.Thread(target=_send, daemon=True) + thread.start() + + if verbose and logger: + logger.verbose_detail(f"[i] {event.event} event dispatched to {hostname}") + + return thread + + +# -- Command executor ------------------------------------------------------- + + +def _execute_command( + hook: HookEntry, + event: LifecycleEvent, + *, + logger: CommandLogger | None = None, + verbose: bool = False, + project_root: str | None = None, +) -> None: + """Execute a shell command with the event payload on stdin. + + Command hooks run synchronously (bounded by ``timeout``), unlike HTTP + hooks which fire in a background thread. The timeout default is 30s + per hook -- multiple slow hooks can accumulate, but each is capped. + """ + cmd = hook.effective_command + if not cmd: + _logger.debug("Command hook has no command string, skipping") + return + + env = _build_hook_env(hook) + timeout = hook.effective_timeout + cwd = _resolve_cwd(hook, project_root) + + try: + result = subprocess.run( + cmd, + shell=True, + env=env, + input=event.to_json(), + timeout=timeout, + capture_output=True, + text=True, + cwd=cwd, + ) + _append_to_hook_log( + event.event, + "command", + cmd, + stdout=result.stdout, + stderr=result.stderr, + exit_code=result.returncode, + status="ok" if result.returncode == 0 else "error", + ) + except subprocess.TimeoutExpired: + _logger.debug("Command hook timed out: %s", cmd) + _append_to_hook_log(event.event, "command", cmd, status="timeout") + if verbose and logger: + logger.verbose_detail(f"[i] Lifecycle command hook timed out: {cmd}") + except Exception as exc: + _logger.debug("Command hook failed: %s", cmd, exc_info=True) + _append_to_hook_log(event.event, "command", cmd, stderr=str(exc), status="error") + if verbose and logger: + logger.verbose_detail(f"[i] Lifecycle command hook failed: {cmd}") + + +# -- Helpers ---------------------------------------------------------------- + + +def _build_hook_env(hook: HookEntry) -> dict[str, str]: + """Build the environment dict for command hooks. + + Inherits the current process environment but strips any variables + whose names match the credential denylist (TOKEN, SECRET, PAT, KEY, + PASSWORD, CREDENTIAL) to prevent accidental exfiltration via hooks. + """ + env = {k: v for k, v in os.environ.items() if not _CREDENTIAL_DENYLIST.search(k)} + if hook.env: + env.update(hook.env) + return env + + +def _resolve_cwd(hook: HookEntry, project_root: str | None) -> str | None: + """Resolve the working directory for a command hook.""" + if not hook.cwd: + return project_root + from pathlib import Path + + if Path(hook.cwd).is_absolute(): + return hook.cwd + root = Path(project_root) if project_root else Path.cwd() + return str(root / hook.cwd) diff --git a/src/apm_cli/core/lifecycle_hooks.py b/src/apm_cli/core/lifecycle_hooks.py new file mode 100644 index 000000000..74b5dc950 --- /dev/null +++ b/src/apm_cli/core/lifecycle_hooks.py @@ -0,0 +1,367 @@ +"""Lifecycle hook models, runner, and discovery. + +APM supports lifecycle hooks that fire at key moments during install, +update, and uninstall operations. Hooks are configured via standalone +JSON files discovered from three directories (Copilot CLI pattern): + +1. **Policy** -- ``/etc/apm/policy.d/*.json`` (admin-owned, cannot be disabled) +2. **User** -- ``~/.apm/hooks/*.json`` +3. **Project** -- ``.apm/hooks.json`` (single file) + +Each file uses ``{ "version": 1, "hooks": { "": [...] } }``. + +Two hook types are supported: + +- ``command`` -- shell command (``bash`` / ``command`` fields) +- ``http`` -- HTTPS POST to a URL with optional headers + +All hooks are **fire-and-forget**: failures are logged in verbose mode +but never block the CLI. +""" + +from __future__ import annotations + +import json +import logging +import os +import platform +import threading +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from apm_cli.core.command_logger import CommandLogger + +_logger = logging.getLogger(__name__) + +# Supported lifecycle event names. +LIFECYCLE_EVENTS = ( + "pre-install", + "post-install", + "pre-update", + "post-update", + "pre-uninstall", + "post-uninstall", +) + +# Supported hook action types (Copilot CLI aligned). +HOOK_TYPES = ("command", "http") + +# Current hook-file schema version. +HOOK_FILE_VERSION = 1 + + +# -- Event model ----------------------------------------------------------- + + +@dataclass +class PackageInfo: + """Minimal package metadata carried in lifecycle events.""" + + name: str + reference: str | None = None + + +@dataclass +class LifecycleEvent: + """Data payload passed to every lifecycle hook. + + HTTP hooks receive this as a JSON POST body. Command hooks + receive it via **stdin** (JSON-encoded). + """ + + schema_version: int = 1 + event: str = "" + packages: list[PackageInfo] = field(default_factory=list) + scope: str = "project" + timestamp: str = "" + cli_version: str = "" + working_directory: str = "" + + def to_json(self) -> str: + """Serialise the event to a compact JSON string.""" + return json.dumps(asdict(self), separators=(",", ":")) + + @staticmethod + def create( + event: str, + packages: list[PackageInfo] | None = None, + scope: str = "project", + working_directory: str | None = None, + ) -> LifecycleEvent: + """Factory that auto-fills timestamp and CLI version.""" + from apm_cli.version import get_version + + return LifecycleEvent( + event=event, + packages=packages or [], + scope=scope, + timestamp=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + cli_version=get_version(), + working_directory=working_directory or str(Path.cwd()), + ) + + +# -- Hook entry (one action inside a hook file) ---------------------------- + + +@dataclass +class HookEntry: + """One configured lifecycle hook action (Copilot CLI schema). + + Attributes: + hook_type: ``command`` or ``http``. + event: Lifecycle event name (e.g. ``post-install``). + bash: Shell command for Unix (``command`` type). + command: Cross-platform fallback command string. + url: HTTP endpoint URL (``http`` type). + headers: HTTP headers dict; values support ``$ENV_VAR`` expansion. + timeout_sec: Timeout in seconds (default 30 for commands, 10 for http). + cwd: Working directory for the command (relative or absolute). + env: Extra environment variables for the command. + source: Where this hook was defined: ``policy``, ``user``, + or ``project``. + source_file: Path of the JSON file that declared this hook. + """ + + hook_type: str + event: str + bash: str | None = None + command: str | None = None + url: str | None = None + headers: dict[str, str] | None = None + timeout_sec: int | None = None + cwd: str | None = None + env: dict[str, str] | None = None + source: str = "project" + source_file: str | None = None + + @property + def effective_command(self) -> str | None: + """Resolve the command to run on the current platform. + + On Windows, prefer ``command`` (cross-platform) over ``bash`` + because bash may not be available. On Unix, prefer ``bash``. + """ + if platform.system() == "Windows": + return self.command or self.bash + return self.bash or self.command + + @property + def effective_timeout(self) -> int: + """Return timeout_sec with sensible defaults per type.""" + if self.timeout_sec is not None: + return self.timeout_sec + return 10 if self.hook_type == "http" else 30 + + +# -- Hook file parsing ----------------------------------------------------- + + +def parse_hook_file(path: Path, source: str = "project") -> list[HookEntry]: + """Parse a single JSON hook file into a list of :class:`HookEntry`. + + Returns an empty list if the file is malformed or uses an + unsupported version. + """ + try: + with open(path, encoding="utf-8") as f: + data = json.load(f) + except (OSError, json.JSONDecodeError) as e: + _logger.debug("Failed to load hook file %s: %s", path, e) + return [] + + if not isinstance(data, dict): + return [] + + version = data.get("version") + if version != HOOK_FILE_VERSION: + _logger.debug("Skipping hook file %s: unsupported version %s", path, version) + return [] + + hooks_dict = data.get("hooks") + if not isinstance(hooks_dict, dict): + return [] + + entries: list[HookEntry] = [] + for event_name, hook_list in hooks_dict.items(): + if event_name not in LIFECYCLE_EVENTS: + _logger.debug("Ignoring unknown lifecycle event %s in %s", event_name, path) + continue + if not isinstance(hook_list, list): + continue + for raw in hook_list: + if not isinstance(raw, dict): + continue + hook_type = raw.get("type", "command") + if hook_type not in HOOK_TYPES: + _logger.debug("Ignoring unknown hook type %s in %s", hook_type, path) + continue + entries.append( + HookEntry( + hook_type=hook_type, + event=event_name, + bash=raw.get("bash"), + command=raw.get("command"), + url=raw.get("url"), + headers=raw.get("headers"), + timeout_sec=raw.get("timeoutSec") or raw.get("timeout"), + cwd=raw.get("cwd"), + env=raw.get("env"), + source=source, + source_file=str(path), + ) + ) + return entries + + +# -- Hook discovery --------------------------------------------------------- + + +def _get_policy_hooks_dir() -> Path: + """Return the platform-specific policy hooks directory.""" + system = platform.system() + if system == "Windows": + return Path(r"C:\ProgramData\APM\policy.d") + return Path("/etc/apm/policy.d") + + +def _get_user_hooks_dir() -> Path: + """Return the user-level hooks directory (~/.apm/hooks/).""" + apm_home = os.environ.get("APM_HOME") + if apm_home: + return Path(apm_home) / "hooks" + return Path.home() / ".apm" / "hooks" + + +def _get_project_hooks_file(project_root: str | None = None) -> Path: + """Return the project-level hooks file (``.apm/hooks.json``).""" + root = Path(project_root) if project_root else Path.cwd() + return root / ".apm" / "hooks.json" + + +def _load_hooks_from_dir( + directory: Path, + source: str, +) -> list[HookEntry]: + """Load all ``*.json`` hook files from *directory*, sorted by name.""" + if not directory.is_dir(): + return [] + entries: list[HookEntry] = [] + for json_file in sorted(directory.glob("*.json")): + if json_file.is_file(): + entries.extend(parse_hook_file(json_file, source=source)) + return entries + + +def discover_hooks( + project_root: str | None = None, +) -> list[HookEntry]: + """Discover and merge hooks from all three sources. + + Load order (all additive, policy first): + 1. Policy -- ``/etc/apm/policy.d/*.json`` (directory) + 2. User -- ``~/.apm/hooks/*.json`` (directory) + 3. Project -- ``.apm/hooks.json`` (single file) + """ + hooks: list[HookEntry] = [] + hooks.extend(_load_hooks_from_dir(_get_policy_hooks_dir(), source="policy")) + hooks.extend(_load_hooks_from_dir(_get_user_hooks_dir(), source="user")) + + project_file = _get_project_hooks_file(project_root) + if project_file.is_file(): + hooks.extend(parse_hook_file(project_file, source="project")) + + return hooks + + +# -- Hook runner ------------------------------------------------------------ + + +class LifecycleHookRunner: + """Collects hooks and fires them for lifecycle events. + + All execution is fire-and-forget with error isolation. + """ + + def __init__( + self, + hooks: list[HookEntry] | None = None, + logger: CommandLogger | None = None, + verbose: bool = False, + project_root: str | None = None, + ) -> None: + self._hooks = hooks or [] + self._logger = logger + self._verbose = verbose + self._project_root = project_root + + def fire(self, event_name: str, event: LifecycleEvent) -> list[threading.Thread]: + """Execute all hooks registered for *event_name*. + + Each hook runs in isolation -- a failure in one hook does not + prevent subsequent hooks from running. + + Returns a list of daemon threads started by HTTP hooks so + callers can optionally join them (e.g. for test/dry-run). + """ + from apm_cli.core.hook_executors import execute_hook + + matching = [h for h in self._hooks if h.event == event_name] + if not matching: + return [] + + threads: list[threading.Thread] = [] + for hook in matching: + try: + thread = execute_hook( + hook, + event, + logger=self._logger, + verbose=self._verbose, + project_root=self._project_root, + ) + if thread is not None: + threads.append(thread) + except Exception: + # Fire-and-forget: swallow all errors. + _logger.debug( + "Lifecycle hook failed (type=%s, event=%s)", + hook.hook_type, + event_name, + exc_info=True, + ) + if self._verbose and self._logger: + self._logger.verbose_detail( + f"[i] Lifecycle hook failed: {hook.hook_type} for {event_name}" + ) + return threads + + def hooks_for_event(self, event_name: str) -> list[HookEntry]: + """Return hooks registered for *event_name* (public API).""" + return [h for h in self._hooks if h.event == event_name] + + +# -- Convenience: build runner from file-based discovery ------------------- + + +def build_runner_from_context( + *, + logger: Any = None, + verbose: bool = False, + project_root: str | None = None, +) -> LifecycleHookRunner: + """Create a :class:`LifecycleHookRunner` via file-based discovery. + + Scans policy, user, and project hook directories for JSON files. + """ + hooks = discover_hooks(project_root=project_root) + + return LifecycleHookRunner( + hooks=hooks, + logger=logger, + verbose=verbose, + project_root=project_root, + ) diff --git a/src/apm_cli/install/service.py b/src/apm_cli/install/service.py index 6b8155499..93ae90bb5 100644 --- a/src/apm_cli/install/service.py +++ b/src/apm_cli/install/service.py @@ -27,6 +27,7 @@ from apm_cli.install.request import InstallRequest if TYPE_CHECKING: + from apm_cli.core.lifecycle_hooks import LifecycleEvent, LifecycleHookRunner from apm_cli.models.results import InstallResult @@ -46,6 +47,9 @@ class InstallService: def run(self, request: InstallRequest) -> InstallResult: """Execute the install pipeline and return the structured result. + Fires ``pre-install`` / ``post-install`` lifecycle hooks around + the pipeline when hooks are configured. + Raises: InstallNotAvailableError: if the dependency subsystem failed to import (e.g. missing optional extras). Adapters are @@ -69,7 +73,11 @@ def run(self, request: InstallRequest) -> InstallResult: except ImportError as e: # pragma: no cover -- defensive raise InstallNotAvailableError(f"APM dependency system not available: {e}") from e - return run_install_pipeline( + runner = self._build_hook_runner(request) + event = self._build_event("pre-install", request) + runner.fire("pre-install", event) + + result = run_install_pipeline( request.apm_package, update_refs=request.update_refs, verbose=request.verbose, @@ -96,6 +104,61 @@ def run(self, request: InstallRequest) -> InstallResult: trust_canvas=request.trust_canvas, ) + post_event = self._build_event("post-install", request) + runner.fire("post-install", post_event) + + return result + + # -- Lifecycle hook helpers --------------------------------------------- + + @staticmethod + def _build_hook_runner(request: InstallRequest) -> LifecycleHookRunner: + """Build a :class:`LifecycleHookRunner` from the request context.""" + from apm_cli.core.lifecycle_hooks import build_runner_from_context + + project_root = None + pkg_path = getattr(request.apm_package, "package_path", None) + if pkg_path is not None: + project_root = str(pkg_path) + + return build_runner_from_context( + logger=request.logger, + verbose=request.verbose, + project_root=project_root, + ) + + @staticmethod + def _build_event(event_name: str, request: InstallRequest) -> LifecycleEvent: + """Build a :class:`LifecycleEvent` from the request.""" + from apm_cli.core.lifecycle_hooks import LifecycleEvent, PackageInfo + + packages = [] + for dep in request.apm_package.get_apm_dependencies(): + packages.append( + PackageInfo( + name=dep.repo_url or str(dep), + reference=dep.reference, + ) + ) + + scope_name = "project" + if request.scope is not None: + scope_name = ( + request.scope.value if hasattr(request.scope, "value") else str(request.scope) + ) + + project_root = None + pkg_path = getattr(request.apm_package, "package_path", None) + if pkg_path is not None: + project_root = str(pkg_path) + + return LifecycleEvent.create( + event=event_name, + packages=packages, + scope=scope_name, + working_directory=project_root, + ) + @staticmethod def _enforce_frozen(request: InstallRequest) -> None: """Raise :class:`FrozenInstallError` if lockfile is absent or stale. diff --git a/tests/unit/commands/test_hooks.py b/tests/unit/commands/test_hooks.py new file mode 100644 index 000000000..cc16c45c4 --- /dev/null +++ b/tests/unit/commands/test_hooks.py @@ -0,0 +1,304 @@ +"""Unit tests for ``apm hooks`` CLI commands.""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import click.testing +import pytest + +from apm_cli.commands.hooks import _validate_hook_file, hooks + + +@pytest.fixture() +def cli_runner(): + return click.testing.CliRunner() + + +def _write_hook_file(path: Path, data: dict) -> Path: + """Write a hook JSON file and return the path.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data), encoding="utf-8") + return path + + +# -- apm hooks (list) ------------------------------------------------------- + + +class TestHooksList: + def test_no_hooks_shows_info(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, []) + assert result.exit_code == 0 + assert "No lifecycle hooks" in result.output + + def test_shows_discovered_hooks(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": { + "post-install": [{"type": "command", "bash": "echo hi"}], + }, + }, + ) + result = cli_runner.invoke(hooks, []) + assert result.exit_code == 0 + assert "1 hook" in result.output + + +# -- apm hooks test ---------------------------------------------------------- + + +class TestHooksTest: + def test_no_hooks_warns(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, ["test", "post-install"]) + assert result.exit_code == 0 + assert "No hooks registered" in result.output + + def test_fires_synthetic_event(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + monkeypatch.setenv("APM_HOME", str(tmp_path / "home")) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": { + "post-install": [{"type": "command", "bash": "echo test-ok"}], + }, + }, + ) + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="test-ok", stderr="", returncode=0) + result = cli_runner.invoke(hooks, ["test", "post-install"]) + assert result.exit_code == 0 + assert "fired" in result.output.lower() or "Fired" in result.output + + def test_accepts_all_event_names(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + events = [ + "pre-install", + "post-install", + "pre-update", + "post-update", + "pre-uninstall", + "post-uninstall", + ] + for event in events: + result = cli_runner.invoke(hooks, ["test", event]) + assert result.exit_code == 0 + + def test_rejects_invalid_event(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, ["test", "invalid-event"]) + assert result.exit_code != 0 + + def test_default_event_is_post_install(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, ["test"]) + assert result.exit_code == 0 + # Either shows "No hooks" or fires -- both are valid for post-install default + + +# -- apm hooks init ---------------------------------------------------------- + + +class TestHooksInit: + def test_creates_hook_file(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, ["init"]) + assert result.exit_code == 0 + assert "Created hook file" in result.output + hook_file = tmp_path / ".apm" / "hooks.json" + assert hook_file.exists() + data = json.loads(hook_file.read_text()) + assert data["version"] == 1 + assert "post-install" in data["hooks"] + + def test_refuses_overwrite_without_force(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + {"version": 1, "hooks": {}}, + ) + result = cli_runner.invoke(hooks, ["init"]) + assert result.exit_code == 0 + assert "already exists" in result.output + + def test_force_overwrites(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + {"version": 1, "hooks": {}}, + ) + result = cli_runner.invoke(hooks, ["init", "--force"]) + assert result.exit_code == 0 + assert "Created hook file" in result.output + data = json.loads((tmp_path / ".apm" / "hooks.json").read_text()) + assert "post-install" in data["hooks"] + + +# -- apm hooks validate ------------------------------------------------------ + + +class TestHooksValidate: + def test_no_files_shows_info(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code == 0 + assert "No hook files found" in result.output + + def test_valid_file_passes(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": { + "post-install": [{"type": "command", "bash": "echo ok"}], + }, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code == 0 + assert "valid" in result.output.lower() + + def test_invalid_json_fails(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + apm_dir = tmp_path / ".apm" + apm_dir.mkdir(parents=True) + (apm_dir / "hooks.json").write_text("{not valid json", encoding="utf-8") + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "Invalid JSON" in result.output + + def test_wrong_version_fails(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + {"version": 99, "hooks": {}}, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "Unsupported version" in result.output + + def test_unknown_event_fails(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": {"on-banana": [{"type": "command", "bash": "echo"}]}, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "Unknown event" in result.output + + def test_command_without_bash_or_command_fails(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": {"post-install": [{"type": "command"}]}, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "bash" in result.output or "command" in result.output + + def test_http_without_url_fails(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": {"post-install": [{"type": "http"}]}, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "url" in result.output.lower() + + def test_http_url_rejects_plain_http(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": {"post-install": [{"type": "http", "url": "http://insecure.com/hook"}]}, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + assert "https" in result.output.lower() + + def test_multiple_errors_reported(self, cli_runner, tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + _write_hook_file( + tmp_path / ".apm" / "hooks.json", + { + "version": 1, + "hooks": { + "post-install": [{"type": "command"}, {"type": "http"}], + }, + }, + ) + result = cli_runner.invoke(hooks, ["validate"]) + assert result.exit_code != 0 + + +# -- _validate_hook_file (unit) --------------------------------------------- + + +class TestValidateHookFile: + def test_missing_version(self, tmp_path): + f = _write_hook_file(tmp_path / "t.json", {"hooks": {}}) + errors = _validate_hook_file(f, "project") + assert any("version" in e.lower() for e in errors) + + def test_missing_hooks_field(self, tmp_path): + f = _write_hook_file(tmp_path / "t.json", {"version": 1}) + errors = _validate_hook_file(f, "project") + assert any("hooks" in e.lower() for e in errors) + + def test_non_dict_root(self, tmp_path): + path = tmp_path / "t.json" + path.write_text("[]", encoding="utf-8") + errors = _validate_hook_file(path, "project") + assert any("object" in e.lower() for e in errors) + + def test_unreadable_file(self, tmp_path): + path = tmp_path / "missing.json" + errors = _validate_hook_file(path, "project") + assert any("read" in e.lower() for e in errors) + + def test_valid_file_returns_empty(self, tmp_path): + f = _write_hook_file( + tmp_path / "t.json", + { + "version": 1, + "hooks": { + "post-install": [{"type": "command", "bash": "echo"}], + "pre-uninstall": [{"type": "http", "url": "https://example.com/hook"}], + }, + }, + ) + errors = _validate_hook_file(f, "project") + assert errors == [] + + def test_unknown_hook_type(self, tmp_path): + f = _write_hook_file( + tmp_path / "t.json", + { + "version": 1, + "hooks": {"post-install": [{"type": "magic"}]}, + }, + ) + errors = _validate_hook_file(f, "project") + assert any("unknown type" in e.lower() for e in errors) diff --git a/tests/unit/core/test_hook_executors.py b/tests/unit/core/test_hook_executors.py new file mode 100644 index 000000000..67b6cd509 --- /dev/null +++ b/tests/unit/core/test_hook_executors.py @@ -0,0 +1,412 @@ +"""Unit tests for lifecycle hook executors (Copilot CLI aligned).""" + +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch +from urllib.parse import urlparse + +import pytest + +from apm_cli.core.hook_executors import ( + _append_to_hook_log, + _build_hook_env, + _execute_command, + _execute_http, + _expand_env_vars, + _get_hooks_log_path, + _redact_url_credentials, + _resolve_cwd, + execute_hook, +) +from apm_cli.core.lifecycle_hooks import HookEntry, LifecycleEvent, PackageInfo + + +def _make_event(event_name: str = "post-install") -> LifecycleEvent: + return LifecycleEvent( + event=event_name, + packages=[PackageInfo(name="org/repo", reference="v1")], + scope="project", + timestamp="2026-01-01T00:00:00Z", + cli_version="0.0.0", + working_directory="/tmp/test", + ) + + +# -- execute_hook dispatcher ------------------------------------------------ + + +class TestExecuteHook: + def test_dispatches_to_http(self) -> None: + hook = HookEntry(hook_type="http", event="post-install", url="https://example.com") + with patch("apm_cli.core.hook_executors._execute_http") as mock: + execute_hook(hook, _make_event()) + mock.assert_called_once() + + def test_dispatches_to_command(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo hi") + with patch("apm_cli.core.hook_executors._execute_command") as mock: + execute_hook(hook, _make_event()) + mock.assert_called_once() + + +# -- HTTP executor ---------------------------------------------------------- + + +class TestHttpExecutor: + def test_rejects_http_url(self) -> None: + hook = HookEntry(hook_type="http", event="post-install", url="http://insecure.com/hook") + logger = MagicMock() + with patch("apm_cli.core.hook_executors.threading") as mock_threading: + _execute_http(hook, _make_event(), logger=logger, verbose=True) + mock_threading.Thread.assert_not_called() + + def test_rejects_missing_url(self) -> None: + hook = HookEntry(hook_type="http", event="post-install", url=None) + with patch("apm_cli.core.hook_executors.threading") as mock_threading: + _execute_http(hook, _make_event()) + mock_threading.Thread.assert_not_called() + + def test_starts_daemon_thread_for_https(self) -> None: + hook = HookEntry( + hook_type="http", + event="post-install", + url="https://analytics.example.com/events", + ) + with patch("apm_cli.core.hook_executors.threading") as mock_threading: + mock_thread = MagicMock() + mock_threading.Thread.return_value = mock_thread + _execute_http(hook, _make_event()) + mock_threading.Thread.assert_called_once() + call_kwargs = mock_threading.Thread.call_args + assert call_kwargs.kwargs.get("daemon") is True + mock_thread.start.assert_called_once() + + def test_verbose_logs_hostname(self) -> None: + hook = HookEntry( + hook_type="http", + event="post-install", + url="https://analytics.corp.net/apm", + ) + logger = MagicMock() + with patch("apm_cli.core.hook_executors.threading"): + _execute_http(hook, _make_event(), logger=logger, verbose=True) + logger.verbose_detail.assert_called_once() + log_msg = logger.verbose_detail.call_args[0][0] + parsed = urlparse(hook.url) + assert parsed.hostname is not None + assert parsed.hostname in log_msg + + +# -- Command executor ------------------------------------------------------- + + +class TestCommandExecutor: + def test_runs_command_with_stdin_payload(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo done") + event = _make_event() + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, event) + mock_run.assert_called_once() + call_kwargs = mock_run.call_args + input_data = call_kwargs.kwargs.get("input") + assert input_data is not None + payload = json.loads(input_data) + assert payload["event"] == "post-install" + + def test_uses_shell_true(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo") + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event()) + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs.get("shell") is True + + def test_timeout_from_hook(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="sleep", timeout_sec=5) + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event()) + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs.get("timeout") == 5 + + def test_default_timeout(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo") + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event()) + call_kwargs = mock_run.call_args + assert call_kwargs.kwargs.get("timeout") == 30 + + def test_swallows_timeout_error(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="sleep") + with patch( + "apm_cli.core.hook_executors.subprocess.run", + side_effect=subprocess.TimeoutExpired("sleep", 30), + ): + _execute_command(hook, _make_event()) + + def test_swallows_generic_error(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="bad") + with patch( + "apm_cli.core.hook_executors.subprocess.run", + side_effect=OSError("not found"), + ): + _execute_command(hook, _make_event()) + + def test_skips_when_no_command(self) -> None: + hook = HookEntry(hook_type="command", event="post-install") + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event()) + mock_run.assert_not_called() + + def test_verbose_logs_on_timeout(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="slow") + logger = MagicMock() + with patch( + "apm_cli.core.hook_executors.subprocess.run", + side_effect=subprocess.TimeoutExpired("slow", 30), + ): + _execute_command(hook, _make_event(), logger=logger, verbose=True) + logger.verbose_detail.assert_called_once() + + def test_merges_hook_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("EXISTING_VAR", "original") + hook = HookEntry( + hook_type="command", event="post-install", bash="echo", env={"EXTRA": "added"} + ) + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event()) + env = mock_run.call_args.kwargs.get("env", {}) + assert env.get("EXISTING_VAR") == "original" + assert env.get("EXTRA") == "added" + + def test_uses_project_root_as_cwd(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo") + with patch("apm_cli.core.hook_executors.subprocess.run") as mock_run: + _execute_command(hook, _make_event(), project_root="/my/project") + assert mock_run.call_args.kwargs.get("cwd") == "/my/project" + + +# -- _expand_env_vars ------------------------------------------------------- + + +class TestExpandEnvVars: + def test_expands_dollar_var(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MY_HOST", "example.com") + assert _expand_env_vars("Host: $MY_HOST") == "Host: example.com" + + def test_expands_braced_var(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MY_HOST", "example.com") + assert _expand_env_vars("Host: ${MY_HOST}") == "Host: example.com" + + def test_missing_var_becomes_empty(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("NONEXISTENT", raising=False) + assert _expand_env_vars("Bearer $NONEXISTENT") == "Bearer " + + def test_no_vars_unchanged(self) -> None: + assert _expand_env_vars("plain text") == "plain text" + + def test_blocks_github_apm_pat(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("GITHUB_APM_PAT", "ghp_secret123") + assert _expand_env_vars("Bearer ${GITHUB_APM_PAT}") == "Bearer " + + def test_blocks_ado_apm_pat(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("ADO_APM_PAT", "ado_secret") + assert _expand_env_vars("$ADO_APM_PAT") == "" + + def test_blocks_token_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("GITHUB_TOKEN", "ghp_tok") + assert _expand_env_vars("${GITHUB_TOKEN}") == "" + + def test_blocks_secret_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MY_SECRET", "s3cr3t") + assert _expand_env_vars("$MY_SECRET") == "" + + def test_blocks_password_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DB_PASSWORD", "pass123") + assert _expand_env_vars("${DB_PASSWORD}") == "" + + def test_blocks_key_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("API_KEY", "key123") + assert _expand_env_vars("${API_KEY}") == "" + + def test_allows_safe_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MY_HEADER_VALUE", "safe-value") + assert _expand_env_vars("${MY_HEADER_VALUE}") == "safe-value" + + +# -- _build_hook_env -------------------------------------------------------- + + +class TestBuildHookEnv: + def test_inherits_safe_environment(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MY_CUSTOM_VAR", "hello") + hook = HookEntry(hook_type="command", event="post-install") + env = _build_hook_env(hook) + assert env.get("MY_CUSTOM_VAR") == "hello" + + def test_merges_hook_env(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", env={"FOO": "bar"}) + env = _build_hook_env(hook) + assert env.get("FOO") == "bar" + + def test_strips_credential_vars(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("GITHUB_APM_PAT", "ghp_secret") + monkeypatch.setenv("ADO_APM_PAT", "ado_secret") + monkeypatch.setenv("GITHUB_TOKEN", "ghp_tok") + monkeypatch.setenv("MY_SECRET", "s3cr3t") + monkeypatch.setenv("DB_PASSWORD", "pass") + monkeypatch.setenv("API_KEY", "key") + monkeypatch.setenv("SAFE_VAR", "kept") + hook = HookEntry(hook_type="command", event="post-install") + env = _build_hook_env(hook) + assert "GITHUB_APM_PAT" not in env + assert "ADO_APM_PAT" not in env + assert "GITHUB_TOKEN" not in env + assert "MY_SECRET" not in env + assert "DB_PASSWORD" not in env + assert "API_KEY" not in env + assert env.get("SAFE_VAR") == "kept" + + def test_preserves_path(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PATH", "/usr/bin:/bin") + hook = HookEntry(hook_type="command", event="post-install") + env = _build_hook_env(hook) + assert env.get("PATH") == "/usr/bin:/bin" + + +# -- _resolve_cwd ----------------------------------------------------------- + + +class TestResolveCwd: + def test_returns_project_root_when_no_cwd(self) -> None: + hook = HookEntry(hook_type="command", event="post-install") + assert _resolve_cwd(hook, "/my/project") == "/my/project" + + def test_absolute_cwd_used_directly(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", cwd="/absolute/path") + assert _resolve_cwd(hook, "/my/project") == "/absolute/path" + + def test_relative_cwd_resolved_against_project_root(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", cwd="scripts") + result = _resolve_cwd(hook, "/my/project") + assert result == "/my/project/scripts" + + +# -- Hook output log ------------------------------------------------------- + + +class TestGetHooksLogPath: + def test_default_path(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("APM_HOME", raising=False) + path = _get_hooks_log_path() + assert path.name == "hooks.log" + assert "logs" in path.parts + + def test_respects_apm_home(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", "/custom/apm") + path = _get_hooks_log_path() + assert str(path) == "/custom/apm/logs/hooks.log" + + +class TestAppendToHookLog: + def test_creates_log_file(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + _append_to_hook_log("post-install", "command", "echo hi", stdout="hello world") + log = tmp_path / "logs" / "hooks.log" + assert log.exists() + content = log.read_text() + assert "post-install" in content + assert "command" in content + assert "echo hi" in content + assert "hello world" in content + + def test_includes_exit_code(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + _append_to_hook_log("pre-install", "command", "false", exit_code=1, status="error") + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "exit_code=1" in content + assert "status=error" in content + + def test_includes_stderr(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + _append_to_hook_log("post-install", "command", "bad", stderr="not found") + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "stderr: not found" in content + + def test_appends_multiple_entries( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + _append_to_hook_log("pre-install", "command", "echo 1") + _append_to_hook_log("post-install", "command", "echo 2") + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "pre-install" in content + assert "post-install" in content + + def test_swallows_write_errors(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", "/nonexistent/readonly/path") + # Should not raise + _append_to_hook_log("post-install", "command", "echo", stdout="hi") + + +class TestCommandExecutorLogging: + def test_logs_successful_command_output( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + hook = HookEntry(hook_type="command", event="post-install", bash="echo done") + mock_result = MagicMock() + mock_result.stdout = "hook output line" + mock_result.stderr = "" + mock_result.returncode = 0 + with patch("apm_cli.core.hook_executors.subprocess.run", return_value=mock_result): + _execute_command(hook, _make_event()) + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "hook output line" in content + assert "exit_code=0" in content + assert "status=ok" in content + + def test_logs_failed_command(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + hook = HookEntry(hook_type="command", event="post-install", bash="false") + mock_result = MagicMock() + mock_result.stdout = "" + mock_result.stderr = "something broke" + mock_result.returncode = 1 + with patch("apm_cli.core.hook_executors.subprocess.run", return_value=mock_result): + _execute_command(hook, _make_event()) + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "something broke" in content + assert "exit_code=1" in content + assert "status=error" in content + + def test_logs_timeout(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("APM_HOME", str(tmp_path)) + hook = HookEntry(hook_type="command", event="post-install", bash="sleep 999") + with patch( + "apm_cli.core.hook_executors.subprocess.run", + side_effect=subprocess.TimeoutExpired("sleep", 30), + ): + _execute_command(hook, _make_event()) + content = (tmp_path / "logs" / "hooks.log").read_text() + assert "status=timeout" in content + + +# -- URL redaction ----------------------------------------------------------- + + +class TestRedactUrlCredentials: + def test_plain_url_unchanged(self) -> None: + assert _redact_url_credentials("https://example.com/hook") == "https://example.com/hook" + + def test_strips_user_password(self) -> None: + result = _redact_url_credentials("https://user:secret@example.com/hook") + assert "user" not in result + assert "secret" not in result + assert "example.com/hook" in result + + def test_strips_user_only(self) -> None: + result = _redact_url_credentials("https://user@example.com/hook") + assert "user" not in result + assert "example.com/hook" in result diff --git a/tests/unit/core/test_lifecycle_hooks.py b/tests/unit/core/test_lifecycle_hooks.py new file mode 100644 index 000000000..f11a8d22a --- /dev/null +++ b/tests/unit/core/test_lifecycle_hooks.py @@ -0,0 +1,388 @@ +"""Unit tests for lifecycle hook models, runner, and file-based discovery.""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +from apm_cli.core.lifecycle_hooks import ( + HOOK_TYPES, + LIFECYCLE_EVENTS, + HookEntry, + LifecycleEvent, + LifecycleHookRunner, + PackageInfo, + discover_hooks, + parse_hook_file, +) + +# -- PackageInfo ----------------------------------------------------------- + + +class TestPackageInfo: + def test_defaults(self) -> None: + pkg = PackageInfo(name="org/repo") + assert pkg.name == "org/repo" + assert pkg.reference is None + + def test_with_reference(self) -> None: + pkg = PackageInfo(name="org/repo", reference="v1.0.0") + assert pkg.reference == "v1.0.0" + + +# -- LifecycleEvent -------------------------------------------------------- + + +class TestLifecycleEvent: + def test_to_json_round_trips(self) -> None: + event = LifecycleEvent( + event="post-install", + packages=[PackageInfo(name="org/repo", reference="v1")], + scope="project", + timestamp="2026-01-01T00:00:00Z", + cli_version="0.14.0", + working_directory="/tmp/project", + ) + payload = json.loads(event.to_json()) + assert payload["schema_version"] == 1 + assert payload["event"] == "post-install" + assert payload["packages"] == [{"name": "org/repo", "reference": "v1"}] + assert payload["scope"] == "project" + assert payload["working_directory"] == "/tmp/project" + + @patch("apm_cli.version.get_version", return_value="0.14.1") + def test_create_factory_fills_version_and_timestamp(self, _mock_ver: MagicMock) -> None: + event = LifecycleEvent.create( + event="pre-install", + packages=[PackageInfo(name="a/b")], + scope="user", + ) + assert event.event == "pre-install" + assert event.cli_version == "0.14.1" + assert event.timestamp # non-empty ISO string + assert event.scope == "user" + + @patch("apm_cli.version.get_version", return_value="0.0.0") + def test_create_defaults_to_project_scope(self, _m: MagicMock) -> None: + event = LifecycleEvent.create(event="post-install") + assert event.scope == "project" + assert event.packages == [] + + @patch("apm_cli.version.get_version", return_value="0.0.0") + def test_create_with_working_directory(self, _m: MagicMock) -> None: + event = LifecycleEvent.create(event="post-install", working_directory="/my/project") + assert event.working_directory == "/my/project" + + +# -- HookEntry ------------------------------------------------------------- + + +class TestHookEntry: + def test_command_hook_effective_command_bash(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="./notify.sh") + assert hook.effective_command == "./notify.sh" + + def test_command_hook_effective_command_fallback(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", command="echo done") + assert hook.effective_command == "echo done" + + def test_command_hook_bash_takes_priority(self) -> None: + hook = HookEntry( + hook_type="command", event="post-install", bash="./bash.sh", command="echo fallback" + ) + assert hook.effective_command == "./bash.sh" + + def test_http_hook_no_command(self) -> None: + hook = HookEntry(hook_type="http", event="post-install", url="https://x.com") + assert hook.effective_command is None + + def test_effective_timeout_http(self) -> None: + hook = HookEntry(hook_type="http", event="post-install") + assert hook.effective_timeout == 10 + + def test_effective_timeout_command(self) -> None: + hook = HookEntry(hook_type="command", event="post-install") + assert hook.effective_timeout == 30 + + def test_effective_timeout_custom(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", timeout_sec=5) + assert hook.effective_timeout == 5 + + +# -- parse_hook_file ------------------------------------------------------- + + +class TestParseHookFile: + def test_parses_valid_file(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps( + { + "version": 1, + "hooks": { + "post-install": [ + {"type": "command", "bash": "echo done"}, + {"type": "http", "url": "https://x.com/hook"}, + ], + }, + } + ) + ) + hooks = parse_hook_file(hook_file) + assert len(hooks) == 2 + assert hooks[0].hook_type == "command" + assert hooks[0].bash == "echo done" + assert hooks[1].hook_type == "http" + assert hooks[1].url == "https://x.com/hook" + + def test_ignores_unknown_event(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps( + {"version": 1, "hooks": {"unknown-event": [{"type": "command", "bash": "x"}]}} + ) + ) + assert parse_hook_file(hook_file) == [] + + def test_ignores_unknown_type(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps({"version": 1, "hooks": {"post-install": [{"type": "script", "path": "x"}]}}) + ) + assert parse_hook_file(hook_file) == [] + + def test_rejects_wrong_version(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps( + {"version": 99, "hooks": {"post-install": [{"type": "command", "bash": "x"}]}} + ) + ) + assert parse_hook_file(hook_file) == [] + + def test_rejects_invalid_json(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text("not json") + assert parse_hook_file(hook_file) == [] + + def test_rejects_non_dict(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text(json.dumps([1, 2, 3])) + assert parse_hook_file(hook_file) == [] + + def test_preserves_optional_fields(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps( + { + "version": 1, + "hooks": { + "post-install": [ + { + "type": "command", + "bash": "echo", + "cwd": "./scripts", + "env": {"FOO": "bar"}, + "timeoutSec": 15, + } + ], + }, + } + ) + ) + hooks = parse_hook_file(hook_file) + assert len(hooks) == 1 + assert hooks[0].cwd == "./scripts" + assert hooks[0].env == {"FOO": "bar"} + assert hooks[0].timeout_sec == 15 + + def test_parses_http_headers(self, tmp_path: Path) -> None: + hook_file = tmp_path / "hooks.json" + hook_file.write_text( + json.dumps( + { + "version": 1, + "hooks": { + "post-install": [ + { + "type": "http", + "url": "https://example.com", + "headers": {"Authorization": "Bearer $TOKEN"}, + } + ], + }, + } + ) + ) + hooks = parse_hook_file(hook_file) + assert hooks[0].headers == {"Authorization": "Bearer $TOKEN"} + + +# -- discover_hooks --------------------------------------------------------- + + +class TestDiscoverHooks: + def test_discovers_from_project_file(self, tmp_path: Path) -> None: + apm_dir = tmp_path / ".apm" + apm_dir.mkdir(parents=True) + (apm_dir / "hooks.json").write_text( + json.dumps( + { + "version": 1, + "hooks": {"post-install": [{"type": "command", "bash": "echo project"}]}, + } + ) + ) + hooks = discover_hooks(project_root=str(tmp_path)) + assert len(hooks) >= 1 + assert any(h.bash == "echo project" for h in hooks) + + def test_discovers_from_user_dir(self, tmp_path: Path) -> None: + user_hooks = tmp_path / "user_hooks" + user_hooks.mkdir() + (user_hooks / "global.json").write_text( + json.dumps( + { + "version": 1, + "hooks": {"post-install": [{"type": "command", "bash": "echo user"}]}, + } + ) + ) + with patch("apm_cli.core.lifecycle_hooks._get_user_hooks_dir", return_value=user_hooks): + hooks = discover_hooks() + assert any(h.bash == "echo user" for h in hooks) + + def test_additive_across_sources(self, tmp_path: Path) -> None: + user_hooks = tmp_path / "user" + user_hooks.mkdir() + (user_hooks / "a.json").write_text( + json.dumps( + { + "version": 1, + "hooks": {"post-install": [{"type": "command", "bash": "echo user"}]}, + } + ) + ) + project_dir = tmp_path / "project" + project_apm = project_dir / ".apm" + project_apm.mkdir(parents=True) + (project_apm / "hooks.json").write_text( + json.dumps( + { + "version": 1, + "hooks": {"post-install": [{"type": "command", "bash": "echo project"}]}, + } + ) + ) + with patch("apm_cli.core.lifecycle_hooks._get_user_hooks_dir", return_value=user_hooks): + hooks = discover_hooks(project_root=str(project_dir)) + assert len(hooks) == 2 + + def test_missing_project_file_returns_empty(self, tmp_path: Path) -> None: + hooks = discover_hooks(project_root=str(tmp_path)) + # No policy or user hooks either in a clean tmp_path + assert hooks == [] + + def test_no_dirs_returns_empty(self) -> None: + with ( + patch( + "apm_cli.core.lifecycle_hooks._get_policy_hooks_dir", + return_value=Path("/nonexistent"), + ), + patch( + "apm_cli.core.lifecycle_hooks._get_user_hooks_dir", + return_value=Path("/nonexistent2"), + ), + ): + hooks = discover_hooks(project_root="/nonexistent3") + assert hooks == [] + + +# -- LifecycleHookRunner --------------------------------------------------- + + +class TestLifecycleHookRunner: + def _make_event(self, event_name: str = "post-install") -> LifecycleEvent: + return LifecycleEvent( + event=event_name, + packages=[PackageInfo(name="org/repo")], + scope="project", + timestamp="2026-01-01T00:00:00Z", + cli_version="0.0.0", + working_directory="/tmp/test", + ) + + def test_fire_calls_matching_hooks(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="echo hi") + runner = LifecycleHookRunner(hooks=[hook]) + with patch("apm_cli.core.hook_executors.execute_hook") as mock_exec: + runner.fire("post-install", self._make_event()) + mock_exec.assert_called_once() + + def test_fire_skips_non_matching_events(self) -> None: + hook = HookEntry(hook_type="command", event="pre-install", bash="echo") + runner = LifecycleHookRunner(hooks=[hook]) + with patch("apm_cli.core.hook_executors.execute_hook") as mock_exec: + runner.fire("post-install", self._make_event()) + mock_exec.assert_not_called() + + def test_error_isolation_one_failing_hook_does_not_block_others(self) -> None: + hook1 = HookEntry(hook_type="command", event="post-install", bash="fail") + hook2 = HookEntry(hook_type="command", event="post-install", bash="ok") + runner = LifecycleHookRunner(hooks=[hook1, hook2]) + call_count = 0 + + def _side_effect(hook, event, **kw): + nonlocal call_count + call_count += 1 + if hook.bash == "fail": + raise RuntimeError("boom") + + with patch("apm_cli.core.hook_executors.execute_hook", side_effect=_side_effect): + runner.fire("post-install", self._make_event()) + assert call_count == 2 # both hooks were attempted + + def test_fire_with_no_hooks_is_noop(self) -> None: + runner = LifecycleHookRunner(hooks=[]) + # Should not raise. + runner.fire("post-install", self._make_event()) + + def test_verbose_logs_on_failure(self) -> None: + hook = HookEntry(hook_type="command", event="post-install", bash="bad") + logger = MagicMock() + runner = LifecycleHookRunner(hooks=[hook], logger=logger, verbose=True) + with patch("apm_cli.core.hook_executors.execute_hook", side_effect=RuntimeError("boom")): + runner.fire("post-install", self._make_event()) + logger.verbose_detail.assert_called_once() + + +# -- Constants -------------------------------------------------------------- + + +class TestConstants: + def test_lifecycle_events_tuple(self) -> None: + assert "pre-install" in LIFECYCLE_EVENTS + assert "post-install" in LIFECYCLE_EVENTS + assert "pre-update" in LIFECYCLE_EVENTS + assert "post-update" in LIFECYCLE_EVENTS + assert "pre-uninstall" in LIFECYCLE_EVENTS + assert "post-uninstall" in LIFECYCLE_EVENTS + + def test_hook_types_tuple(self) -> None: + assert set(HOOK_TYPES) == {"command", "http"} + + +class TestHooksForEvent: + def test_returns_matching_hooks(self) -> None: + h1 = HookEntry(hook_type="command", event="post-install", bash="echo a") + h2 = HookEntry(hook_type="command", event="pre-install", bash="echo b") + h3 = HookEntry(hook_type="http", event="post-install", url="https://x.com") + runner = LifecycleHookRunner(hooks=[h1, h2, h3]) + result = runner.hooks_for_event("post-install") + assert result == [h1, h3] + + def test_returns_empty_for_unknown_event(self) -> None: + h = HookEntry(hook_type="command", event="post-install", bash="echo") + runner = LifecycleHookRunner(hooks=[h]) + assert runner.hooks_for_event("pre-uninstall") == []