diff --git a/Makefile b/Makefile index 648442b..7dff04c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ CODEX_INSTALL_PLUGINS := ox,oxgh CODEX_DEV_PLUGINS := ox,oxgh,oxgl +.PHONY: setup dev dev-codex format check codex install-codex link-codex bump bump-check + setup: @bash scripts/banner.sh uv sync --frozen @@ -41,6 +43,8 @@ link-codex: bump: uv run python scripts/bump.py $(filter-out $@,$(MAKECMDGOALS)) + # Version bumps update plugin manifests that are copied into Codex plugin packages. + uv run python scripts/generate_codex.py bump-check: uv run python scripts/bump.py --check diff --git a/README.md b/README.md index 8f13383..c67f7ca 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,8 @@ Those user-level installed skills are available from any repo as `$oxgh:open-pr` The generated Codex plugin packages under `codex/plugins/` are for Codex plugin marketplace workflows. The repo-local marketplace at `.agents/plugins/marketplace.json` points at those packages when working in this repository. +The generated `ox` Codex plugin also includes PostToolUse and Stop hooks. Configure checks with the same `.claude/ox-hooks.json` file shown below: Codex runs `fast` checks after edits and `slow` checks before it finishes a turn. + ### Claude Code #### 1. Add the marketplace to your project diff --git a/codex/plugins/ox/.codex-plugin/plugin.json b/codex/plugins/ox/.codex-plugin/plugin.json index d7480d7..2f71c1d 100644 --- a/codex/plugins/ox/.codex-plugin/plugin.json +++ b/codex/plugins/ox/.codex-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "ox", - "version": "0.0.13", + "version": "0.0.15", "description": "Base plugin — commit command, code quality hooks, auto-format and check hooks for all projects", "author": { "name": "Oxidian" @@ -11,5 +11,6 @@ "shortDescription": "Base plugin — commit command, code quality hooks, auto-format and check hooks for all projects", "developerName": "Oxidian", "category": "Productivity" - } + }, + "hooks": "./hooks.json" } diff --git a/codex/plugins/ox/hooks.json b/codex/plugins/ox/hooks.json new file mode 100644 index 0000000..0101602 --- /dev/null +++ b/codex/plugins/ox/hooks.json @@ -0,0 +1,29 @@ +{ + "hooks": { + "PostToolUse": [ + { + "matcher": "apply_patch|Edit|Write", + "hooks": [ + { + "type": "command", + "command": "sh -c 'root=\"$(git rev-parse --show-toplevel 2>/dev/null || pwd)\"; bootstrap_dir=\"${CODEX_PLUGINS_BOOTSTRAP_DIR:-.codex/cc-plugins}\"; case \"$bootstrap_dir\" in /*) bootstrap_root=\"$bootstrap_dir\" ;; *) bootstrap_root=\"$root/$bootstrap_dir\" ;; esac; bootstrap_runner=\"$bootstrap_root/codex/plugins/ox/scripts/run_if_changed.py\"; repo_runner=\"$root/codex/plugins/ox/scripts/run_if_changed.py\"; if [ -f \"$bootstrap_runner\" ]; then runner=\"$bootstrap_runner\"; elif [ -f \"$repo_runner\" ]; then runner=\"$repo_runner\"; else echo \"ox hook runner not found; checked $bootstrap_runner and $repo_runner\" >&2; exit 2; fi; exec python3 \"$runner\" --runtime codex --action fast'", + "timeout": 30, + "statusMessage": "Running fast checks" + } + ] + } + ], + "Stop": [ + { + "hooks": [ + { + "type": "command", + "command": "sh -c 'root=\"$(git rev-parse --show-toplevel 2>/dev/null || pwd)\"; bootstrap_dir=\"${CODEX_PLUGINS_BOOTSTRAP_DIR:-.codex/cc-plugins}\"; case \"$bootstrap_dir\" in /*) bootstrap_root=\"$bootstrap_dir\" ;; *) bootstrap_root=\"$root/$bootstrap_dir\" ;; esac; bootstrap_runner=\"$bootstrap_root/codex/plugins/ox/scripts/run_if_changed.py\"; repo_runner=\"$root/codex/plugins/ox/scripts/run_if_changed.py\"; if [ -f \"$bootstrap_runner\" ]; then runner=\"$bootstrap_runner\"; elif [ -f \"$repo_runner\" ]; then runner=\"$repo_runner\"; else echo \"ox hook runner not found; checked $bootstrap_runner and $repo_runner\" >&2; exit 2; fi; exec python3 \"$runner\" --runtime codex --action slow'", + "timeout": 120, + "statusMessage": "Running final checks" + } + ] + } + ] + } +} diff --git a/codex/plugins/ox/scripts/ban_complex_bash.py b/codex/plugins/ox/scripts/ban_complex_bash.py new file mode 100644 index 0000000..bae74e9 --- /dev/null +++ b/codex/plugins/ox/scripts/ban_complex_bash.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +"""PermissionRequest hook: auto-deny complex Bash commands. + +In practice, when Claude attempts complex bash commands there's already a simpler +way to do what it wants. These commands block the agentic loop waiting for user +input, which we don't want. + +When Claude Code shows a permission dialog for a Bash command and the +``permission_suggestions`` field is missing or empty (meaning no "Always Allow" +option is available), the command is considered too complex and is denied +automatically. + +Plain ``cd`` commands are an exception — they never have permission_suggestions +but are simple navigation commands, so they are allowed through to the normal +permission dialog. Chained ``cd`` commands (e.g. ``cd /path && make build``) are +denied with guidance to run the ``cd`` separately. +""" + +import json +import re +import sys + +DENY_MESSAGE = ( + "BLOCKED: Bash command too complex. " + "Check CLAUDE.md for available dev commands or use a simpler command with fewer pipes." +) + +CD_DENY_MESSAGE = ( + "BLOCKED: You MUST run `{cd_part}` as a separate Bash tool call first," + " THEN run your actual command in a second Bash tool call." + " Do NOT drop the cd or combine them." +) + +_SHELL_CHAIN = re.compile(r"[;&|`]|\$\(") + + +def _is_cd_only(command: str) -> bool: + """Return True if the command is just ``cd`` with no chaining.""" + stripped = command.strip() + if stripped != "cd" and not stripped.startswith(("cd ", "cd\t")): + return False + return not _SHELL_CHAIN.search(stripped) + + +def _extract_cd(command: str) -> str: + """Extract the cd portion from a chained command like ``cd /path && ...``.""" + m = re.match(r"(cd\s+\S+)", command.strip()) + return m.group(1) if m else "cd" + + +def should_deny(input_data: dict) -> str | None: + """Return a deny message, or None to allow.""" + if input_data.get("tool_name") != "Bash": + return None + suggestions = input_data.get("permission_suggestions") + if suggestions: + return None + command = input_data.get("tool_input", {}).get("command", "") + if _is_cd_only(command): + return None + if command.strip().startswith(("cd ", "cd\t")): + return CD_DENY_MESSAGE.format(cd_part=_extract_cd(command)) + return DENY_MESSAGE + + +def main() -> None: + try: + input_data = json.load(sys.stdin) + except json.JSONDecodeError as e: + print(f"Error: Invalid JSON input: {e}", file=sys.stderr) + sys.exit(1) + + message = should_deny(input_data) + if message: + result = { + "hookSpecificOutput": { + "hookEventName": "PermissionRequest", + "decision": { + "behavior": "deny", + "message": message, + }, + } + } + json.dump(result, sys.stdout) + + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/codex/plugins/ox/scripts/ban_custom_debug.py b/codex/plugins/ox/scripts/ban_custom_debug.py new file mode 100644 index 0000000..8b1ea99 --- /dev/null +++ b/codex/plugins/ox/scripts/ban_custom_debug.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +import json +import re +import sys + +# Define validation rules as a list of (regex pattern, message) tuples +VALIDATION_RULES_BASH = [ + ( + r"python3?\s+-c\b", + "Avoid 'python3 -c' commands for debugging. Run the existing tests with 'uv run pytest $TEST_FILE --vv --log-cli-level=INFO' and add `logger.info` logs to the code if you need to debug.", + ), + ( + r"debug.*<< 'EOF'", + "Avoid creating custom debug files. Run the existing tests with 'uv run pytest $TEST_FILE --vv --log-cli-level=INFO' and add `logger.info` logs to the code if you need to debug.", + ), +] + +VALIDATION_RULES_WRITE = [ + ( + r".*debug.*", + "Avoid creating custom debug files. Run the existing tests with 'uv run pytest $TEST_FILE --vv --log-cli-level=INFO' and add `logger.info` logs to the code if you need to debug.", + ), +] + + +def validate_command(command: str) -> list[str]: + if not command: + print("ERROR: no command provided to validate_command", file=sys.stderr) + sys.exit(1) + + issues = [] + for pattern, message in VALIDATION_RULES_BASH: + if re.search(pattern, command): + issues.append(message) + return issues + + +def validate_write(tool_input: dict) -> list[str]: + if not tool_input: + print("ERROR: no tool_input provided to validate_write", file=sys.stderr) + sys.exit(1) + + issues = [] + for pattern, message in VALIDATION_RULES_WRITE: + if re.search(pattern, tool_input["file_path"]): + issues.append(message) + return issues + + +try: + input_data = json.load(sys.stdin) +except json.JSONDecodeError as e: + print(f"Error: Invalid JSON input: {e}", file=sys.stderr) + sys.exit(1) + +tool_name = input_data.get("tool_name", "") +tool_input = input_data.get("tool_input", {}) +command = tool_input.get("command", "") + +if tool_name == "Bash": + issues = validate_command(command) +elif tool_name == "Write": + issues = validate_write(tool_input) +else: + print(f"ERROR: unknown tool_name '{tool_name}', exiting", file=sys.stderr) + sys.exit(1) + +if issues: + for message in issues: + print(f"\u2022 {message}", file=sys.stderr) + # Exit code 2 blocks tool call and shows stderr to Claude + sys.exit(2) diff --git a/codex/plugins/ox/scripts/ban_lint_suppressions.py b/codex/plugins/ox/scripts/ban_lint_suppressions.py new file mode 100644 index 0000000..4a01cbc --- /dev/null +++ b/codex/plugins/ox/scripts/ban_lint_suppressions.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import json +import sys + + +def check_for_suppressions(content: str) -> list[str]: + """Check if content contains lint/type checker suppressions.""" + issues = [] + + banned_comments = ["# type: ignore", "# noqa", "# pyright: ignore"] + + for banned in banned_comments: + if banned in content: + issues.append(f"BLOCKED: Code contains '{banned}' comment") + + return issues + + +def validate_edit(tool_input: dict) -> list[str]: + """Validate Edit tool for suppressions in new_string.""" + new_string = tool_input.get("new_string", "") + return check_for_suppressions(new_string) + + +def validate_write(tool_input: dict) -> list[str]: + """Validate Write tool for suppressions in content.""" + content = tool_input.get("content", "") + return check_for_suppressions(content) + + +try: + input_data = json.load(sys.stdin) +except json.JSONDecodeError as e: + print(f"Error: Invalid JSON input: {e}", file=sys.stderr) + sys.exit(1) + +tool_name = input_data.get("tool_name", "") +tool_input = input_data.get("tool_input", {}) + +issues = [] +if tool_name == "Edit": + issues = validate_edit(tool_input) +elif tool_name == "Write": + issues = validate_write(tool_input) + +if issues: + print("\n".join(issues), file=sys.stderr) + print( + "\nStop and explain to the user why you think this lint/type checker suppression is necessary.", + file=sys.stderr, + ) + # Exit code 2 blocks tool call and shows stderr to Claude + sys.exit(2) + +# If we reach here, no suppressions found +sys.exit(0) diff --git a/codex/plugins/ox/scripts/ban_redundant_cd.py b/codex/plugins/ox/scripts/ban_redundant_cd.py new file mode 100644 index 0000000..130cf2a --- /dev/null +++ b/codex/plugins/ox/scripts/ban_redundant_cd.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +import json +import re +import sys + + +def validate_bash_command(command: str, cwd: str) -> str: + """Check if bash command has redundant cd to backend or frontend.""" + if not command: + return "" + + # Check for 'cd backend' pattern + if re.match(r"^\s*cd\s+backend\b", command) and cwd.endswith("/backend"): + # Strip the redundant cd part + clean_command = re.sub(r"^\s*cd\s+backend\s*&&\s*", "", command) + clean_command = re.sub(r"^\s*cd\s+backend\s*$", "", clean_command) + return f""" +BLOCKED: You are already in the backend directory ({cwd}). +Remove the 'cd backend' prefix. + +Command should be: {clean_command} +""" + + # Check for 'cd frontend' pattern + if re.match(r"^\s*cd\s+frontend\b", command) and cwd.endswith("/frontend"): + # Strip the redundant cd part + clean_command = re.sub(r"^\s*cd\s+frontend\s*&&\s*", "", command) + clean_command = re.sub(r"^\s*cd\s+frontend\s*$", "", clean_command) + return f""" +BLOCKED: You are already in the frontend directory ({cwd}). +Remove the 'cd frontend' prefix. + +Command should be: {clean_command} +""" + + return "" + + +try: + input_data = json.load(sys.stdin) +except json.JSONDecodeError as e: + print(f"Error: Invalid JSON input: {e}", file=sys.stderr) + sys.exit(1) + +tool_name = input_data.get("tool_name", "") +tool_input = input_data.get("tool_input", {}) +cwd = input_data.get("cwd", "") +command = tool_input.get("command", "") + +if tool_name != "Bash": + # Only applies to Bash commands + sys.exit(0) + +error_message = validate_bash_command(command, cwd) + +if error_message: + print(error_message, file=sys.stderr) + # Exit code 2 blocks tool call and shows stderr to Claude + sys.exit(2) + +# If we reach here, the command is allowed +sys.exit(0) diff --git a/codex/plugins/ox/scripts/run_if_changed.py b/codex/plugins/ox/scripts/run_if_changed.py new file mode 100644 index 0000000..f700f67 --- /dev/null +++ b/codex/plugins/ox/scripts/run_if_changed.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python3 +"""Run fast or slow checks when files have changed. + +Reads project configuration from .claude/ox-hooks.json to determine which +checks to run. Each check config defines fast/slow commands and an +optional directory scope. + +Usage: + python run_if_changed.py --project-dir $CLAUDE_PROJECT_DIR --action fast + python run_if_changed.py --project-dir $CLAUDE_PROJECT_DIR --action slow + python run_if_changed.py --runtime codex --action slow +""" + +import argparse +import contextlib +import json +import os +import subprocess +import sys +from typing import TextIO + +# https://docs.anthropic.com/en/docs/claude-code/hooks#simple%3A-exit-code +BLOCKING_ERROR_CODE = 2 +SUCCESS_CODE = 0 +RUNTIME_CLAUDE = "claude" +RUNTIME_CODEX = "codex" +MAX_CODEX_FEEDBACK_CHARS = 20000 + +CONFIG_PATH = ".claude/ox-hooks.json" +DEFAULT_FAST_EVERY = 5 + + +def _emit(runtime: str, message: str, *, file: TextIO = sys.stdout) -> None: + """Print hook output only for runtimes that accept plain text logs.""" + if runtime == RUNTIME_CLAUDE: + print(message, file=file) + + +def _git_root_or_cwd(cwd: str) -> str: + """Return the Git root for cwd, falling back to cwd outside Git repos.""" + try: + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + cwd=cwd, + ) + except Exception: + return cwd + + if result.returncode == 0: + root = result.stdout.strip() + if root: + return root + return cwd + + +def _codex_project_dir(hook_input: dict | None) -> str: + """Derive the Codex project directory from hook input.""" + if hook_input: + cwd = hook_input.get("cwd") + if isinstance(cwd, str) and cwd: + return _git_root_or_cwd(cwd) + return _git_root_or_cwd(os.getcwd()) + + +def _codex_failure_feedback(action: str, failure_outputs: list[str]) -> str: + """Build the continuation prompt Codex receives when checks fail.""" + check_name = "Final checks" if action == "slow" else "Fast checks" + body = "\n\n".join(output.strip() for output in failure_outputs if output.strip()) + if len(body) > MAX_CODEX_FEEDBACK_CHARS: + body = body[-MAX_CODEX_FEEDBACK_CHARS:] + if body: + return f"{check_name} failed. Fix these issues before finishing.\n\n{body}\n" + return f"{check_name} failed. Re-run the configured checks and fix the failures before finishing.\n" + + +def _get_state_file_path(session_id: str) -> str: + """Return the path to the throttle state file for this session.""" + return f"/tmp/ox-hooks-{session_id}.json" + + +def _load_edit_count(state_file: str) -> int: + """Read the edit counter from the state file. Returns 0 on missing/corrupt.""" + try: + with open(state_file) as f: + data = json.load(f) + return int(data.get("edit_count", 0)) + except (FileNotFoundError, json.JSONDecodeError, OSError, ValueError, TypeError): + return 0 + + +def _save_edit_count(state_file: str, count: int) -> None: + """Write the edit counter to the state file. Silently catches errors.""" + try: + with open(state_file, "w") as f: + json.dump({"edit_count": count}, f) + except OSError: + pass + + +def should_skip_throttled(edit_count: int, fast_every: int) -> bool: + """Decide whether to skip the fast check based on edit count. + + Runs every ``fast_every``-th edit (5, 10, 15, … by default). + Returns True when the check should be *skipped*. + """ + if fast_every <= 1: + return False + return edit_count % fast_every != 0 + + +def _is_python_import_only(old_string: str, new_string: str) -> bool: + """Check if edit only adds/modifies Python import statements.""" + + def get_non_import_lines(text: str) -> list[str]: + lines = [] + in_paren_import = False + backslash_continuation = False + + for line in text.split("\n"): + stripped = line.strip() + + # Handle backslash continuation from previous line + if backslash_continuation: + backslash_continuation = stripped.endswith("\\") + continue + + # Handle parenthesized import block + if in_paren_import: + if ")" in stripped: + in_paren_import = False + continue + + # Check if this is an import statement + if stripped.startswith(("import ", "from ")): + # Multi-line parenthesized import + if " import " in stripped and "(" in stripped and ")" not in stripped: + in_paren_import = True + # Backslash continuation + elif stripped.endswith("\\"): + backslash_continuation = True + continue + + # Non-import line + if stripped: + lines.append(line) + + return lines + + return get_non_import_lines(old_string) == get_non_import_lines(new_string) + + +def _is_js_import_only(old_string: str, new_string: str) -> bool: + """Check if edit only adds/modifies JS/TS import statements.""" + + def get_non_import_lines(text: str) -> list[str]: + lines = [] + in_import_block = False + in_export_block = False + + for line in text.split("\n"): + stripped = line.strip() + + # Handle multi-line import block + if in_import_block: + if "from" in stripped or stripped.endswith(";"): + in_import_block = False + continue + + # Handle multi-line export block + if in_export_block: + if stripped.startswith("}") or stripped.endswith("};"): + in_export_block = False + continue + + # Check if this starts an import + if stripped.startswith("import "): + if "{" in stripped and "}" not in stripped: + in_import_block = True + continue + + # Only treat re-export blocks as export-only (export { ... } or export type { ... }) + if stripped.startswith(("export {", "export type {")): + if "}" not in stripped: + in_export_block = True + continue + + # Non-import/export line + if stripped: + lines.append(line) + + return lines + + return get_non_import_lines(old_string) == get_non_import_lines(new_string) + + +def is_import_only_edit(hook_input: dict) -> bool: + """Check if this edit only modifies import statements. + + Routes to language-specific logic based on file extension. + """ + tool_input = hook_input.get("tool_input", {}) + file_path = tool_input.get("file_path", "") + old_string = tool_input.get("old_string", "") + new_string = tool_input.get("new_string", "") + + if file_path.endswith(".py"): + return _is_python_import_only(old_string, new_string) + + if any(file_path.endswith(ext) for ext in (".ts", ".tsx", ".js", ".jsx")): + return _is_js_import_only(old_string, new_string) + + return False + + +def _is_team_lead_session(session_id: str) -> bool: + """Check if this session is leading an active agent team. + + Agent teams (https://docs.anthropic.com/en/docs/claude-code/agent-teams) + store config in ~/.claude/teams//config.json, which includes + leadSessionId. When the lead session runs stop checks, it blocks the + orchestrator from coordinating members. The members' own sessions still + run their stop hooks independently, so skipping here is safe. + """ + teams_dir = os.path.expanduser("~/.claude/teams") + if not os.path.isdir(teams_dir): + return False + for entry in os.listdir(teams_dir): + config_path = os.path.join(teams_dir, entry, "config.json") + if not os.path.isfile(config_path): + continue + try: + with open(config_path) as f: + team = json.load(f) + if team.get("leadSessionId") == session_id: + return True + except (json.JSONDecodeError, OSError): + continue + return False + + +def get_changed_files(project_dir: str) -> set[str]: + """Return the set of changed file paths from git status --porcelain.""" + try: + result = subprocess.run( + "git status --porcelain", + shell=True, + capture_output=True, + text=True, + cwd=project_dir, + ) + except Exception as e: + print(f"Error getting git status: {e}", file=sys.stderr) + sys.exit(BLOCKING_ERROR_CODE) + + if result.returncode != 0: + print(f"Error getting git status: {result.stderr}", file=sys.stderr) + sys.exit(BLOCKING_ERROR_CODE) + + files = set() + for line in result.stdout.split("\n"): + if line: + files.add(line[3:]) # Skip XY status codes and space + return files + + +def directory_has_changes(changed_files: set[str], directory: str) -> bool: + """Check if any changed file is under the given directory.""" + prefix = f"{directory}/" + return any(f.startswith(prefix) for f in changed_files) + + +def run_check(command: str, cwd: str, action: str, runtime: str) -> tuple[bool, str]: + """Run a command in the given directory. Returns True on success.""" + _emit(runtime, f"Running `{command}` in {cwd}") + + process = subprocess.Popen( + command, + shell=True, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + universal_newlines=True, + ) + output_lines = [] + if process.stdout: + for line in process.stdout: + output_lines.append(line) + process.wait() + output = "".join(output_lines) + + if process.returncode == 0: + if output and runtime == RUNTIME_CLAUDE: + print(output, end="") + if action == "slow": + _emit(runtime, "Checks passed.") + else: + _emit(runtime, "Fast check completed successfully.") + return True, "" + else: + if output and runtime == RUNTIME_CLAUDE: + print(output, end="", file=sys.stderr) + if action == "slow": + _emit(runtime, "Checks failed. You must fix them.", file=sys.stderr) + else: + _emit(runtime, "Fast check failed. You must fix the issues.", file=sys.stderr) + + failure = f"Running `{command}` in {cwd}" + if output: + failure = f"{failure}\n{output.rstrip()}" + return False, failure + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run fast/slow checks when files change") + parser.add_argument("--project-dir", help="Project root directory") + parser.add_argument( + "--action", + required=True, + choices=["fast", "slow"], + help="Action to run", + ) + parser.add_argument( + "--runtime", + default=RUNTIME_CLAUDE, + choices=[RUNTIME_CLAUDE, RUNTIME_CODEX], + help="Hook runtime output semantics", + ) + args = parser.parse_args() + + # Read hook input from stdin before resolving Codex's project directory. + hook_input = None + session_id = "" + try: + stdin_data = sys.stdin.read() + if stdin_data: + parsed_input = json.loads(stdin_data) + if isinstance(parsed_input, dict): + hook_input = parsed_input + session_id = hook_input.get("session_id", "") + except (json.JSONDecodeError, Exception): + pass + + project_dir = args.project_dir + if not project_dir and args.runtime == RUNTIME_CODEX: + project_dir = _codex_project_dir(hook_input) + if not project_dir: + parser.error("--project-dir is required unless --runtime codex can derive cwd") + + # Read config + config_file = os.path.join(project_dir, CONFIG_PATH) + if not os.path.exists(config_file): + _emit(args.runtime, f"No {CONFIG_PATH} found, skipping") + sys.exit(SUCCESS_CODE) + + with open(config_file) as f: + config = json.load(f) + + checks = config.get("checks", []) + if not checks: + _emit(args.runtime, f"No checks configured in {CONFIG_PATH}, skipping") + sys.exit(SUCCESS_CODE) + + if hook_input and hook_input.get("permission_mode") == "plan": + _emit(args.runtime, "Plan mode active, skipping") + sys.exit(SUCCESS_CODE) + if hook_input and args.action == "slow" and _is_team_lead_session(session_id): + _emit(args.runtime, "Agent team lead session, skipping stop checks") + sys.exit(SUCCESS_CODE) + + # Skip fast checks for import-only edits + if args.action == "fast" and hook_input and is_import_only_edit(hook_input): + _emit(args.runtime, "Import-only edit detected, skipping fast check") + sys.exit(SUCCESS_CODE) + + # Throttle fast checks — only run every Nth edit + if args.action == "fast" and session_id: + fast_every = config.get("fast_every", DEFAULT_FAST_EVERY) + state_file = _get_state_file_path(session_id) + edit_count = _load_edit_count(state_file) + 1 + _save_edit_count(state_file, edit_count) + if should_skip_throttled(edit_count, fast_every): + _emit(args.runtime, f"Throttled: edit {edit_count} (runs every {fast_every}), skipping fast check") + sys.exit(SUCCESS_CODE) + + changed_files = get_changed_files(project_dir) + if not changed_files: + _emit(args.runtime, "No files changed, skipping") + sys.exit(SUCCESS_CODE) + + any_failed = False + failure_outputs = [] + + for check in checks: + command = check.get(args.action) + if not command: + continue + + directory = check.get("directory") + + if directory: + if not directory_has_changes(changed_files, directory): + _emit(args.runtime, f"No {directory}/ files modified, skipping") + continue + cwd = os.path.join(project_dir, directory) + else: + # Whole-project check — run at project root on any change + cwd = project_dir + + passed, failure_output = run_check(command, cwd, args.action, args.runtime) + if not passed: + any_failed = True + failure_outputs.append(failure_output) + + if any_failed: + if args.runtime == RUNTIME_CODEX: + print(_codex_failure_feedback(args.action, failure_outputs), end="", file=sys.stderr) + sys.exit(BLOCKING_ERROR_CODE) + + if args.action == "slow" and not any_failed: + _emit(args.runtime, "All checks passed. Stop working.") + + # Clean up throttle state file after slow checks + if args.action == "slow" and session_id: + with contextlib.suppress(OSError): + os.remove(_get_state_file_path(session_id)) + + sys.exit(SUCCESS_CODE) + + +if __name__ == "__main__": + main() diff --git a/plugins/ox/.claude-plugin/plugin.json b/plugins/ox/.claude-plugin/plugin.json index e4895c8..da18a30 100644 --- a/plugins/ox/.claude-plugin/plugin.json +++ b/plugins/ox/.claude-plugin/plugin.json @@ -1,7 +1,7 @@ { "name": "ox", "description": "Base plugin — commit command, code quality hooks, auto-format and check hooks for all projects", - "version": "0.0.13", + "version": "0.0.15", "author": { "name": "Oxidian" } diff --git a/plugins/ox/codex/hooks.json b/plugins/ox/codex/hooks.json new file mode 100644 index 0000000..0101602 --- /dev/null +++ b/plugins/ox/codex/hooks.json @@ -0,0 +1,29 @@ +{ + "hooks": { + "PostToolUse": [ + { + "matcher": "apply_patch|Edit|Write", + "hooks": [ + { + "type": "command", + "command": "sh -c 'root=\"$(git rev-parse --show-toplevel 2>/dev/null || pwd)\"; bootstrap_dir=\"${CODEX_PLUGINS_BOOTSTRAP_DIR:-.codex/cc-plugins}\"; case \"$bootstrap_dir\" in /*) bootstrap_root=\"$bootstrap_dir\" ;; *) bootstrap_root=\"$root/$bootstrap_dir\" ;; esac; bootstrap_runner=\"$bootstrap_root/codex/plugins/ox/scripts/run_if_changed.py\"; repo_runner=\"$root/codex/plugins/ox/scripts/run_if_changed.py\"; if [ -f \"$bootstrap_runner\" ]; then runner=\"$bootstrap_runner\"; elif [ -f \"$repo_runner\" ]; then runner=\"$repo_runner\"; else echo \"ox hook runner not found; checked $bootstrap_runner and $repo_runner\" >&2; exit 2; fi; exec python3 \"$runner\" --runtime codex --action fast'", + "timeout": 30, + "statusMessage": "Running fast checks" + } + ] + } + ], + "Stop": [ + { + "hooks": [ + { + "type": "command", + "command": "sh -c 'root=\"$(git rev-parse --show-toplevel 2>/dev/null || pwd)\"; bootstrap_dir=\"${CODEX_PLUGINS_BOOTSTRAP_DIR:-.codex/cc-plugins}\"; case \"$bootstrap_dir\" in /*) bootstrap_root=\"$bootstrap_dir\" ;; *) bootstrap_root=\"$root/$bootstrap_dir\" ;; esac; bootstrap_runner=\"$bootstrap_root/codex/plugins/ox/scripts/run_if_changed.py\"; repo_runner=\"$root/codex/plugins/ox/scripts/run_if_changed.py\"; if [ -f \"$bootstrap_runner\" ]; then runner=\"$bootstrap_runner\"; elif [ -f \"$repo_runner\" ]; then runner=\"$repo_runner\"; else echo \"ox hook runner not found; checked $bootstrap_runner and $repo_runner\" >&2; exit 2; fi; exec python3 \"$runner\" --runtime codex --action slow'", + "timeout": 120, + "statusMessage": "Running final checks" + } + ] + } + ] + } +} diff --git a/plugins/ox/scripts/run_if_changed.py b/plugins/ox/scripts/run_if_changed.py index 8133ff4..f700f67 100644 --- a/plugins/ox/scripts/run_if_changed.py +++ b/plugins/ox/scripts/run_if_changed.py @@ -8,6 +8,7 @@ Usage: python run_if_changed.py --project-dir $CLAUDE_PROJECT_DIR --action fast python run_if_changed.py --project-dir $CLAUDE_PROJECT_DIR --action slow + python run_if_changed.py --runtime codex --action slow """ import argparse @@ -16,15 +17,64 @@ import os import subprocess import sys +from typing import TextIO # https://docs.anthropic.com/en/docs/claude-code/hooks#simple%3A-exit-code BLOCKING_ERROR_CODE = 2 SUCCESS_CODE = 0 +RUNTIME_CLAUDE = "claude" +RUNTIME_CODEX = "codex" +MAX_CODEX_FEEDBACK_CHARS = 20000 CONFIG_PATH = ".claude/ox-hooks.json" DEFAULT_FAST_EVERY = 5 +def _emit(runtime: str, message: str, *, file: TextIO = sys.stdout) -> None: + """Print hook output only for runtimes that accept plain text logs.""" + if runtime == RUNTIME_CLAUDE: + print(message, file=file) + + +def _git_root_or_cwd(cwd: str) -> str: + """Return the Git root for cwd, falling back to cwd outside Git repos.""" + try: + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + cwd=cwd, + ) + except Exception: + return cwd + + if result.returncode == 0: + root = result.stdout.strip() + if root: + return root + return cwd + + +def _codex_project_dir(hook_input: dict | None) -> str: + """Derive the Codex project directory from hook input.""" + if hook_input: + cwd = hook_input.get("cwd") + if isinstance(cwd, str) and cwd: + return _git_root_or_cwd(cwd) + return _git_root_or_cwd(os.getcwd()) + + +def _codex_failure_feedback(action: str, failure_outputs: list[str]) -> str: + """Build the continuation prompt Codex receives when checks fail.""" + check_name = "Final checks" if action == "slow" else "Fast checks" + body = "\n\n".join(output.strip() for output in failure_outputs if output.strip()) + if len(body) > MAX_CODEX_FEEDBACK_CHARS: + body = body[-MAX_CODEX_FEEDBACK_CHARS:] + if body: + return f"{check_name} failed. Fix these issues before finishing.\n\n{body}\n" + return f"{check_name} failed. Re-run the configured checks and fix the failures before finishing.\n" + + def _get_state_file_path(session_id: str) -> str: """Return the path to the throttle state file for this session.""" return f"/tmp/ox-hooks-{session_id}.json" @@ -221,9 +271,9 @@ def directory_has_changes(changed_files: set[str], directory: str) -> bool: return any(f.startswith(prefix) for f in changed_files) -def run_check(command: str, cwd: str, action: str) -> bool: +def run_check(command: str, cwd: str, action: str, runtime: str) -> tuple[bool, str]: """Run a command in the given directory. Returns True on success.""" - print(f"Running `{command}` in {cwd}") + _emit(runtime, f"Running `{command}` in {cwd}") process = subprocess.Popen( command, @@ -235,43 +285,75 @@ def run_check(command: str, cwd: str, action: str) -> bool: bufsize=1, universal_newlines=True, ) + output_lines = [] + if process.stdout: + for line in process.stdout: + output_lines.append(line) process.wait() + output = "".join(output_lines) if process.returncode == 0: - if process.stdout: - for line in process.stdout: - print(line, end="") + if output and runtime == RUNTIME_CLAUDE: + print(output, end="") if action == "slow": - print("Checks passed.") + _emit(runtime, "Checks passed.") else: - print("Fast check completed successfully.") - return True + _emit(runtime, "Fast check completed successfully.") + return True, "" else: - if process.stdout: - for line in process.stdout: - print(line, end="", file=sys.stderr) + if output and runtime == RUNTIME_CLAUDE: + print(output, end="", file=sys.stderr) if action == "slow": - print("Checks failed. You must fix them.", file=sys.stderr) + _emit(runtime, "Checks failed. You must fix them.", file=sys.stderr) else: - print("Fast check failed. You must fix the issues.", file=sys.stderr) - return False + _emit(runtime, "Fast check failed. You must fix the issues.", file=sys.stderr) + + failure = f"Running `{command}` in {cwd}" + if output: + failure = f"{failure}\n{output.rstrip()}" + return False, failure def main() -> None: parser = argparse.ArgumentParser(description="Run fast/slow checks when files change") - parser.add_argument("--project-dir", required=True, help="Project root directory") + parser.add_argument("--project-dir", help="Project root directory") parser.add_argument( "--action", required=True, choices=["fast", "slow"], help="Action to run", ) + parser.add_argument( + "--runtime", + default=RUNTIME_CLAUDE, + choices=[RUNTIME_CLAUDE, RUNTIME_CODEX], + help="Hook runtime output semantics", + ) args = parser.parse_args() + # Read hook input from stdin before resolving Codex's project directory. + hook_input = None + session_id = "" + try: + stdin_data = sys.stdin.read() + if stdin_data: + parsed_input = json.loads(stdin_data) + if isinstance(parsed_input, dict): + hook_input = parsed_input + session_id = hook_input.get("session_id", "") + except (json.JSONDecodeError, Exception): + pass + + project_dir = args.project_dir + if not project_dir and args.runtime == RUNTIME_CODEX: + project_dir = _codex_project_dir(hook_input) + if not project_dir: + parser.error("--project-dir is required unless --runtime codex can derive cwd") + # Read config - config_file = os.path.join(args.project_dir, CONFIG_PATH) + config_file = os.path.join(project_dir, CONFIG_PATH) if not os.path.exists(config_file): - print(f"No {CONFIG_PATH} found, skipping") + _emit(args.runtime, f"No {CONFIG_PATH} found, skipping") sys.exit(SUCCESS_CODE) with open(config_file) as f: @@ -279,29 +361,19 @@ def main() -> None: checks = config.get("checks", []) if not checks: - print(f"No checks configured in {CONFIG_PATH}, skipping") + _emit(args.runtime, f"No checks configured in {CONFIG_PATH}, skipping") sys.exit(SUCCESS_CODE) - # Read hook input from stdin - hook_input = None - session_id = "" - try: - stdin_data = sys.stdin.read() - if stdin_data: - hook_input = json.loads(stdin_data) - session_id = hook_input.get("session_id", "") - if hook_input.get("permission_mode") == "plan": - print("Plan mode active, skipping") - sys.exit(SUCCESS_CODE) - if args.action == "slow" and _is_team_lead_session(session_id): - print("Agent team lead session, skipping stop checks") - sys.exit(SUCCESS_CODE) - except (json.JSONDecodeError, Exception): - pass + if hook_input and hook_input.get("permission_mode") == "plan": + _emit(args.runtime, "Plan mode active, skipping") + sys.exit(SUCCESS_CODE) + if hook_input and args.action == "slow" and _is_team_lead_session(session_id): + _emit(args.runtime, "Agent team lead session, skipping stop checks") + sys.exit(SUCCESS_CODE) # Skip fast checks for import-only edits if args.action == "fast" and hook_input and is_import_only_edit(hook_input): - print("Import-only edit detected, skipping fast check") + _emit(args.runtime, "Import-only edit detected, skipping fast check") sys.exit(SUCCESS_CODE) # Throttle fast checks — only run every Nth edit @@ -311,15 +383,16 @@ def main() -> None: edit_count = _load_edit_count(state_file) + 1 _save_edit_count(state_file, edit_count) if should_skip_throttled(edit_count, fast_every): - print(f"Throttled: edit {edit_count} (runs every {fast_every}), skipping fast check") + _emit(args.runtime, f"Throttled: edit {edit_count} (runs every {fast_every}), skipping fast check") sys.exit(SUCCESS_CODE) - changed_files = get_changed_files(args.project_dir) + changed_files = get_changed_files(project_dir) if not changed_files: - print("No files changed, skipping") + _emit(args.runtime, "No files changed, skipping") sys.exit(SUCCESS_CODE) any_failed = False + failure_outputs = [] for check in checks: command = check.get(args.action) @@ -330,21 +403,25 @@ def main() -> None: if directory: if not directory_has_changes(changed_files, directory): - print(f"No {directory}/ files modified, skipping") + _emit(args.runtime, f"No {directory}/ files modified, skipping") continue - cwd = os.path.join(args.project_dir, directory) + cwd = os.path.join(project_dir, directory) else: # Whole-project check — run at project root on any change - cwd = args.project_dir + cwd = project_dir - if not run_check(command, cwd, args.action): + passed, failure_output = run_check(command, cwd, args.action, args.runtime) + if not passed: any_failed = True + failure_outputs.append(failure_output) if any_failed: + if args.runtime == RUNTIME_CODEX: + print(_codex_failure_feedback(args.action, failure_outputs), end="", file=sys.stderr) sys.exit(BLOCKING_ERROR_CODE) if args.action == "slow" and not any_failed: - print("All checks passed. Stop working.") + _emit(args.runtime, "All checks passed. Stop working.") # Clean up throttle state file after slow checks if args.action == "slow" and session_id: diff --git a/scripts/generate_codex.py b/scripts/generate_codex.py index f592daa..fc97a5a 100644 --- a/scripts/generate_codex.py +++ b/scripts/generate_codex.py @@ -214,6 +214,7 @@ def codex_plugin_manifest(plugin_name: str) -> dict: author = source.get("author", {"name": "Oxidian"}) author_name = author.get("name", "Oxidian") if isinstance(author, dict) else str(author) description = source.get("description", f"{plugin_name} Codex skills") + hooks_path = PLUGINS_DIR / plugin_name / "codex" / "hooks.json" manifest = { "name": source.get("name", plugin_name), @@ -228,6 +229,8 @@ def codex_plugin_manifest(plugin_name: str) -> dict: "category": "Productivity", }, } + if hooks_path.exists(): + manifest["hooks"] = "./hooks.json" return manifest @@ -265,6 +268,19 @@ def generate_plugin_package(plugin_name: str, output_dir: Path = CODEX_PLUGINS_D include_plugin_dir=False, ) + hooks_path = PLUGINS_DIR / plugin_name / "codex" / "hooks.json" + if hooks_path.exists(): + plugin_out.mkdir(parents=True, exist_ok=True) + shutil.copy2(hooks_path, plugin_out / "hooks.json") + + scripts_dir = PLUGINS_DIR / plugin_name / "scripts" + if scripts_dir.exists(): + shutil.copytree( + scripts_dir, + plugin_out / "scripts", + ignore=shutil.ignore_patterns("__pycache__", "*.pyc"), + ) + write_json(plugin_out / ".codex-plugin" / "plugin.json", codex_plugin_manifest(plugin_name)) diff --git a/tests/ox/test_run_if_changed.py b/tests/ox/test_run_if_changed.py index 03ca400..e552428 100644 --- a/tests/ox/test_run_if_changed.py +++ b/tests/ox/test_run_if_changed.py @@ -1,6 +1,9 @@ """Tests for run_if_changed.py import detection logic.""" import importlib.util +import json +import shlex +import subprocess import sys from pathlib import Path from types import ModuleType @@ -23,6 +26,39 @@ _save_edit_count = run_if_changed._save_edit_count +def _command_for_script(script: Path) -> str: + return f"{shlex.quote(sys.executable)} {shlex.quote(str(script))}" + + +def _init_changed_repo(tmp_path: Path, command: str) -> Path: + subprocess.run(["git", "init"], cwd=tmp_path, check=True, capture_output=True, text=True) + (tmp_path / ".claude").mkdir() + (tmp_path / ".claude" / "ox-hooks.json").write_text( + json.dumps({"checks": [{"fast": command, "slow": command}], "fast_every": 1}) + "\n" + ) + (tmp_path / "changed.txt").write_text("changed\n") + subdir = tmp_path / "subdir" + subdir.mkdir() + return subdir + + +def _run_codex_hook(cwd: Path, action: str) -> subprocess.CompletedProcess[str]: + payload = { + "session_id": f"test-{action}", + "cwd": str(cwd), + "hook_event_name": "Stop" if action == "slow" else "PostToolUse", + "permission_mode": "default", + } + return subprocess.run( + [sys.executable, str(_script_path), "--runtime", "codex", "--action", action], + input=json.dumps(payload), + capture_output=True, + text=True, + cwd=cwd, + check=False, + ) + + class TestPythonImportOnly: """Tests for _is_python_import_only().""" @@ -186,3 +222,30 @@ def test_path_includes_session_id(self) -> None: path = _get_state_file_path("abc-123") assert "abc-123" in path assert path.startswith("/tmp/") + + +class TestCodexRuntime: + """Tests for Codex hook output semantics.""" + + def test_success_has_no_output_and_derives_project_dir_from_cwd(self, tmp_path: Path) -> None: + check_script = tmp_path / "check.py" + check_script.write_text("print('ok')\n") + subdir = _init_changed_repo(tmp_path, _command_for_script(check_script)) + + result = _run_codex_hook(subdir, "slow") + + assert result.returncode == 0 + assert result.stdout == "" + assert result.stderr == "" + + def test_failure_exits_two_with_feedback_on_stderr(self, tmp_path: Path) -> None: + check_script = tmp_path / "check.py" + check_script.write_text("import sys\nprint('bad check output')\nsys.exit(1)\n") + subdir = _init_changed_repo(tmp_path, _command_for_script(check_script)) + + result = _run_codex_hook(subdir, "slow") + + assert result.returncode == 2 + assert result.stdout == "" + assert "Final checks failed. Fix these issues before finishing." in result.stderr + assert "bad check output" in result.stderr diff --git a/tests/test_generate_codex.py b/tests/test_generate_codex.py index 9ae777f..d306761 100644 --- a/tests/test_generate_codex.py +++ b/tests/test_generate_codex.py @@ -1,6 +1,9 @@ """Tests for generate_codex.py.""" import importlib.util +import json +import os +import subprocess import sys import textwrap from pathlib import Path @@ -337,9 +340,38 @@ def test_generates_manifest_and_local_skills(self, tmp_path: Path, monkeypatch: skill = (output_dir / "oxgh" / "skills" / "open-pr" / "SKILL.md").read_text() assert '"name": "oxgh"' in manifest assert '"skills": "./skills/"' in manifest + assert '"hooks"' not in manifest assert "name: open-pr" in skill assert "allowed-tools" not in skill + def test_generates_plugin_hooks_and_scripts(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + plugins_dir = tmp_path / "plugins" + plugin_dir = plugins_dir / "ox" + (plugin_dir / ".claude-plugin").mkdir(parents=True) + (plugin_dir / ".claude-plugin" / "plugin.json").write_text( + "{\n" + ' "name": "ox",\n' + ' "description": "Base plugin",\n' + ' "version": "0.1.0",\n' + ' "author": {"name": "Oxidian"}\n' + "}\n" + ) + (plugin_dir / "codex").mkdir() + (plugin_dir / "codex" / "hooks.json").write_text('{"hooks": {"Stop": []}}\n') + (plugin_dir / "scripts" / "__pycache__").mkdir(parents=True) + (plugin_dir / "scripts" / "run_if_changed.py").write_text("# runner\n") + (plugin_dir / "scripts" / "__pycache__" / "run_if_changed.pyc").write_text("cache\n") + monkeypatch.setattr(generate_codex, "PLUGINS_DIR", plugins_dir) + + output_dir = tmp_path / "codex" / "plugins" + generate_plugin_package("ox", output_dir) + + manifest = (output_dir / "ox" / ".codex-plugin" / "plugin.json").read_text() + assert '"hooks": "./hooks.json"' in manifest + assert (output_dir / "ox" / "hooks.json").read_text() == '{"hooks": {"Stop": []}}\n' + assert (output_dir / "ox" / "scripts" / "run_if_changed.py").read_text() == "# runner\n" + assert not (output_dir / "ox" / "scripts" / "__pycache__").exists() + def test_writes_marketplace(self, tmp_path: Path) -> None: marketplace = tmp_path / ".agents" / "plugins" / "marketplace.json" @@ -356,6 +388,118 @@ def test_writes_marketplace(self, tmp_path: Path) -> None: class TestEndToEnd: """Test processing actual SKILL.md files from the repo.""" + @staticmethod + def _ox_fast_hook_command() -> str: + hooks_path = PLUGINS_DIR.parent / "codex" / "plugins" / "ox" / "hooks.json" + hooks = json.loads(hooks_path.read_text()) + return hooks["hooks"]["PostToolUse"][0]["hooks"][0]["command"] + + @staticmethod + def _write_runner(path: Path, label: str) -> None: + path.parent.mkdir(parents=True) + path.write_text( + "import os\n" + "import sys\n" + "from pathlib import Path\n" + f"Path(os.environ['OX_HOOK_MARKER']).write_text('{label} ' + ' '.join(sys.argv[1:]))\n" + f"print('{label} runner')\n" + ) + + def test_ox_codex_hooks_use_installed_runner_path(self) -> None: + hooks_path = PLUGINS_DIR.parent / "codex" / "plugins" / "ox" / "hooks.json" + hooks_text = hooks_path.read_text() + hooks = json.loads(hooks_text) + + fast_command = hooks["hooks"]["PostToolUse"][0]["hooks"][0]["command"] + slow_command = hooks["hooks"]["Stop"][0]["hooks"][0]["command"] + bootstrap_default = 'bootstrap_dir="${CODEX_PLUGINS_BOOTSTRAP_DIR:-.codex/cc-plugins}"' + bootstrap_relative_resolution = ( + 'case "$bootstrap_dir" in /*) bootstrap_root="$bootstrap_dir" ;; ' + '*) bootstrap_root="$root/$bootstrap_dir" ;; esac' + ) + bootstrap_runner_path = "$bootstrap_root/codex/plugins/ox/scripts/run_if_changed.py" + repo_runner_path = "$root/codex/plugins/ox/scripts/run_if_changed.py" + + assert "./scripts/run_if_changed.py" not in hooks_text + assert "~/.codex/plugins/cache" not in hooks_text + assert fast_command.startswith("sh -c ") + assert slow_command.startswith("sh -c ") + assert bootstrap_default in fast_command + assert bootstrap_default in slow_command + assert bootstrap_relative_resolution in fast_command + assert bootstrap_relative_resolution in slow_command + assert bootstrap_runner_path in fast_command + assert repo_runner_path in fast_command + assert bootstrap_runner_path in slow_command + assert repo_runner_path in slow_command + assert "exit 2" in fast_command + assert "exit 2" in slow_command + assert "--runtime codex --action fast" in fast_command + assert "--runtime codex --action slow" in slow_command + + @pytest.mark.parametrize("bootstrap_kind", ["relative", "absolute"]) + def test_ox_codex_hook_uses_custom_bootstrap_runner(self, tmp_path: Path, bootstrap_kind: str) -> None: + fast_command = self._ox_fast_hook_command() + + repo = tmp_path / "repo" + workdir = repo / "nested" / "dir" + marker = tmp_path / "marker.txt" + workdir.mkdir(parents=True) + subprocess.run(["git", "init", "--quiet"], cwd=repo, check=True) + + if bootstrap_kind == "relative": + bootstrap_env = "custom-bootstrap" + bootstrap_root = repo / bootstrap_env + else: + bootstrap_root = tmp_path / "custom-bootstrap" + bootstrap_env = str(bootstrap_root) + + runner = bootstrap_root / "codex" / "plugins" / "ox" / "scripts" / "run_if_changed.py" + self._write_runner(runner, bootstrap_kind) + + result = subprocess.run( + fast_command, + cwd=workdir, + env={ + **os.environ, + "CODEX_PLUGINS_BOOTSTRAP_DIR": bootstrap_env, + "OX_HOOK_MARKER": str(marker), + }, + shell=True, + check=False, + capture_output=True, + text=True, + ) + + assert result.returncode == 0, result.stderr + assert result.stdout == f"{bootstrap_kind} runner\n" + assert marker.read_text() == f"{bootstrap_kind} --runtime codex --action fast" + + def test_ox_codex_hook_falls_back_to_repo_local_runner(self, tmp_path: Path) -> None: + fast_command = self._ox_fast_hook_command() + + repo = tmp_path / "repo" + runner = repo / "codex" / "plugins" / "ox" / "scripts" / "run_if_changed.py" + workdir = repo / "nested" / "dir" + marker = tmp_path / "marker.txt" + workdir.mkdir(parents=True) + self._write_runner(runner, "repo-local") + subprocess.run(["git", "init", "--quiet"], cwd=repo, check=True) + + result = subprocess.run( + fast_command, + cwd=workdir, + env={**os.environ, "OX_HOOK_MARKER": str(marker)}, + shell=True, + check=False, + capture_output=True, + text=True, + ) + + assert result.returncode == 0, result.stderr + assert result.stdout == "repo-local runner\n" + assert marker.read_text() == "repo-local --runtime codex --action fast" + def test_commit_skill(self, tmp_path: Path) -> None: skill_dir = PLUGINS_DIR / "ox" / "skills" / "commit" process_skill("ox", skill_dir, tmp_path)