diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index b2e8defb18..202af876d9 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -810,6 +810,18 @@ def workflow_run( "--json", help="Emit the run outcome as a single JSON object instead of formatted text.", ), + dry_run: bool = typer.Option( + False, + "--dry-run", + help=( + "Preview the workflow without dispatching any AI or shell " + "commands for built-in command, prompt, and gate steps. " + "Those steps emit a synthetic preview message; the run is " + "persisted so it can be inspected but not resumed to a " + "real run. Other step types (e.g. init, shell) may still " + "perform their normal work during dry-run." + ), + ), ): """Run a workflow from an installed ID or local YAML path.""" from .workflows import load_custom_steps @@ -857,20 +869,52 @@ def workflow_run( # Parse inputs inputs = _parse_input_values(input_values) + if dry_run and not json_output: + console.print( + "\n[bold yellow]DRY RUN:[/bold yellow] previewing built-in " + "command, prompt, and gate steps without dispatching. " + "Other step types (e.g. shell, init) may still execute." + ) + if not json_output: console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") console.print(f"[dim]Version: {definition.version}[/dim]\n") try: with _stdout_to_stderr_when(json_output): - state = engine.execute(definition, inputs) + state = engine.execute(definition, inputs, dry_run=dry_run) except ValueError as exc: + if dry_run and not json_output: + _print_dry_run_previews(getattr(exc, "partial_state", None)) + if json_output: + partial = getattr(exc, "partial_state", None) + if partial: + _emit_workflow_json(_workflow_run_payload(partial)) + raise typer.Exit( + _run_outcome_exit_code( + partial.status.value if partial else "failed" + ) + ) console.print(f"[red]Error:[/red] {exc}") raise typer.Exit(1) except Exception as exc: + if dry_run and not json_output: + _print_dry_run_previews(getattr(exc, "partial_state", None)) + if json_output: + partial = getattr(exc, "partial_state", None) + if partial: + _emit_workflow_json(_workflow_run_payload(partial)) + raise typer.Exit( + _run_outcome_exit_code( + partial.status.value if partial else "failed" + ) + ) console.print(f"[red]Workflow failed:[/red] {exc}") raise typer.Exit(1) + if dry_run and not json_output: + _print_dry_run_previews(state) + if json_output: _emit_workflow_json(_workflow_run_payload(state)) raise typer.Exit(_run_outcome_exit_code(state.status.value)) @@ -891,6 +935,44 @@ def workflow_run( raise typer.Exit(_run_outcome_exit_code(state.status.value)) +def _print_dry_run_previews(state: Any) -> None: + """Print the dry-run preview message emitted by each step. + + Called by ``workflow run`` after a successful dry-run and from + exception handlers so a mid-run failure still surfaces the + previews resolved by earlier steps. Skipped silently when + ``state`` is ``None`` (e.g. the engine raised before any step + ran) or when the run did not include a dry-run step. + """ + if state is None: + return + step_results = getattr(state, "step_results", None) or {} + if not step_results: + return + console.print("\n[bold yellow]DRY RUN previews:[/bold yellow]") + for step_id, result in step_results.items(): + if not isinstance(result, dict): + continue + output = result.get("output") or {} + if not output.get("dry_run"): + continue + step_id_display = _escape_markup(str(step_id)) + preview = output.get("dry_run_message") or output.get("message") or "" + preview_escaped = _escape_markup(preview) + console.print(f" [cyan][{step_id_display}][/cyan] {preview_escaped}") + + +from rich.markup import escape as _rich_escape_markup + + +def _escape_markup(text: str) -> str: + """Escape Rich markup characters so user-controlled text can be + printed safely. Delegates to ``rich.markup.escape`` for canonical + handling of ``[``, ``]``, ``{``, ``}``, and other special chars. + """ + return _rich_escape_markup(text) + + @workflow_app.command("resume") def workflow_resume( run_id: str = typer.Argument(..., help="Run ID to resume"), @@ -922,12 +1004,19 @@ def workflow_resume( console.print(f"[red]Error:[/red] Run not found: {run_id}") raise typer.Exit(1) except ValueError as exc: + if getattr(state, "dry_run", False) and not json_output: + _print_dry_run_previews(getattr(exc, "partial_state", None)) console.print(f"[red]Error:[/red] {exc}") raise typer.Exit(1) except Exception as exc: + if getattr(state, "dry_run", False) and not json_output: + _print_dry_run_previews(getattr(exc, "partial_state", None)) console.print(f"[red]Resume failed:[/red] {exc}") raise typer.Exit(1) + if getattr(state, "dry_run", False) and not json_output: + _print_dry_run_previews(state) + if json_output: _emit_workflow_json(_workflow_run_payload(state)) raise typer.Exit(_run_outcome_exit_code(state.status.value)) diff --git a/src/specify_cli/workflows/base.py b/src/specify_cli/workflows/base.py index b61fdb1a08..0fdc4948a9 100644 --- a/src/specify_cli/workflows/base.py +++ b/src/specify_cli/workflows/base.py @@ -74,6 +74,21 @@ class StepContext: #: Current run ID. run_id: str | None = None + #: When ``True``, the built-in step implementations + #: (``command`` / ``prompt`` / ``gate``) short-circuit and return a + #: synthetic ``StepResult`` carrying a preview of what would have + #: been dispatched — no subprocess, no CLI call, no network I/O for + #: those step types. Custom steps and built-in steps that have not + #: been updated to honor ``dry_run`` may still perform their normal + #: side effects; the flag is opt-in per step. Step implementations + #: publish the preview on ``output["dry_run_message"]`` (consumed + #: by the CLI's preview loop). ``output["message"]`` preserves the + #: step's original value (e.g. the gate prompt or command name) so + #: ``{{ steps..output.message }}`` remains stable across dry-run + #: and real execution. Downstream templates that need the preview + #: text should reference ``output.dry_run_message``. + dry_run: bool = False + @dataclass class StepResult: diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index f463bc66c1..c9eec174e4 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -334,6 +334,11 @@ def __init__( self.created_at = datetime.now(timezone.utc).isoformat() self.updated_at = self.created_at self.log_entries: list[dict[str, Any]] = [] + #: Whether the run was started in dry-run mode. Persisted via + #: :meth:`save` so :meth:`load` (and the resumed run's + #: ``StepContext``) can keep the run in preview mode across + #: process restarts. + self.dry_run: bool = False @property def runs_dir(self) -> Path: @@ -352,6 +357,7 @@ def save(self) -> None: "current_step_index": self.current_step_index, "current_step_id": self.current_step_id, "step_results": self.step_results, + "dry_run": self.dry_run, "created_at": self.created_at, "updated_at": self.updated_at, } @@ -398,6 +404,7 @@ def load(cls, run_id: str, project_root: Path) -> RunState: state.step_results = state_data.get("step_results", {}) state.created_at = state_data.get("created_at", "") state.updated_at = state_data.get("updated_at", "") + state.dry_run = state_data.get("dry_run", False) inputs_path = runs_dir / "inputs.json" if inputs_path.exists(): @@ -478,6 +485,7 @@ def execute( definition: WorkflowDefinition, inputs: dict[str, Any] | None = None, run_id: str | None = None, + dry_run: bool = False, ) -> RunState: """Execute a workflow definition. @@ -489,6 +497,21 @@ def execute( User-provided input values. run_id: Optional run ID (uses SPECKIT_WORKFLOW_RUN_ID when set, otherwise auto-generated). + dry_run: + Preview-only mode. When ``True``, the built-in ``command``, + ``prompt`` and ``gate`` step implementations skip + side-effecting work (AI invocations, interactive prompts, + subprocess dispatches) and emit a synthetic + ``dry_run_message`` instead. Other built-in steps (``init``, + ``shell``, custom user-registered steps) currently still + execute their normal logic during a dry run; the flag is + opt-in per step. ``dry_run`` propagates into each step's + ``StepContext`` and is persisted on the resulting + ``RunState`` so ``resume()`` keeps the run in preview mode + across restarts. Step ``output`` shape is unchanged; + downstream ``switch``/``do-while`` gates coerce any + dry-run-only fields (e.g. ``output.choice``) so the preview + branch is deterministic. Returns ------- @@ -507,6 +530,7 @@ def execute( workflow_id=definition.id, project_root=self.project_root, ) + state.dry_run = dry_run # Persist a copy of the workflow definition so resume can # reload it even if the original source is no longer available @@ -531,6 +555,7 @@ def execute( default_options=definition.default_options, project_root=str(self.project_root), run_id=state.run_id, + dry_run=dry_run, ) # Execute steps @@ -545,6 +570,10 @@ def execute( state.status = RunStatus.FAILED state.append_log({"event": "workflow_failed", "error": str(exc)}) state.save() + # Attach the partially-populated state so the CLI can render + # any dry-run previews resolved by earlier steps when the + # engine raises mid-run (e.g. template resolution failure). + exc.partial_state = state # type: ignore[attr-defined] raise if state.status == RunStatus.RUNNING: @@ -587,7 +616,8 @@ def resume( merged = {**state.inputs, **inputs} state.inputs = self._resolve_inputs(definition, merged) - # Restore context + # Restore context — including the persisted ``dry_run`` flag so an + # interrupted dry-run stays a dry-run after a process restart. context = StepContext( inputs=state.inputs, steps=state.step_results, @@ -596,6 +626,7 @@ def resume( default_options=definition.default_options, project_root=str(self.project_root), run_id=state.run_id, + dry_run=state.dry_run, ) from . import STEP_REGISTRY @@ -622,6 +653,10 @@ def resume( state.status = RunStatus.FAILED state.append_log({"event": "resume_failed", "error": str(exc)}) state.save() + # Same preview surface as ``execute()`` — when the engine + # raises mid-resume the CLI wants the partially-resolved + # dry-run previews for debugging. + exc.partial_state = state # type: ignore[attr-defined] raise if state.status == RunStatus.RUNNING: diff --git a/src/specify_cli/workflows/steps/command/__init__.py b/src/specify_cli/workflows/steps/command/__init__.py index 891b9da4e7..4b7f31ff2f 100644 --- a/src/specify_cli/workflows/steps/command/__init__.py +++ b/src/specify_cli/workflows/steps/command/__init__.py @@ -55,9 +55,6 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: # Attempt CLI dispatch args_str = str(resolved_input.get("args", "")) - dispatch_result = self._try_dispatch( - command, integration, model, args_str, context - ) output: dict[str, Any] = { "command": command, @@ -67,11 +64,64 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: "input": resolved_input, } + # Dry-run short-circuit — surface a synthetic preview of what a + # real run would have dispatched, without invoking the CLI. + if context.dry_run: + preview_invocation: str | None = None + if integration: + try: + from specify_cli.integrations import get_integration + + impl = get_integration(integration) + except (ImportError, AttributeError, TypeError): + impl = None + if impl is not None: + try: + preview_invocation = impl.build_command_invocation( + command, args_str + ) + except (ImportError, AttributeError, TypeError): + # ImportError: integrations module not importable in + # minimal environments or test sandboxes. + # AttributeError: integration class is missing + # ``build_command_invocation`` (older integration + # API). + # TypeError: integration returned a non-string value + # from ``build_command_invocation``. + # Anything else (e.g. a real bug inside the + # integration) bubbles up so it's not silently + # masked by the dry-run preview path. + preview_invocation = None + if preview_invocation: + preview = f"DRY RUN: would invoke {preview_invocation!r}" + else: + preview = ( + f"DRY RUN: would invoke command {command!r} " + f"(integration {integration!r}, args {args_str!r})" + ) + output["exit_code"] = 0 + output["dispatched"] = False + output["executed"] = False + output["dry_run"] = True + output["dry_run_message"] = preview + # Preserve the original command/integration/input so + # ``{{ steps..output.message }}`` keeps resolving to the + # original command description for downstream templates. + output["message"] = command + output["invoke_command"] = preview_invocation or command + return StepResult(status=StepStatus.COMPLETED, output=output) + + dispatch_result = self._try_dispatch( + command, integration, model, args_str, context + ) + if dispatch_result is not None: output["exit_code"] = dispatch_result["exit_code"] output["stdout"] = dispatch_result["stdout"] output["stderr"] = dispatch_result["stderr"] output["dispatched"] = True + output["executed"] = True + output["dry_run"] = False if dispatch_result["exit_code"] != 0: return StepResult( status=StepStatus.FAILED, @@ -85,6 +135,8 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: else: output["exit_code"] = 1 output["dispatched"] = False + output["executed"] = False + output["dry_run"] = False return StepResult( status=StepStatus.FAILED, output=output, diff --git a/src/specify_cli/workflows/steps/gate/__init__.py b/src/specify_cli/workflows/steps/gate/__init__.py index a2e473244e..a1c519b018 100644 --- a/src/specify_cli/workflows/steps/gate/__init__.py +++ b/src/specify_cli/workflows/steps/gate/__init__.py @@ -40,7 +40,7 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: if isinstance(message, str) and "{{" in message: message = evaluate_expression(message, context) - options = config.get("options", ["approve", "reject"]) + options = self._coerce_options(config.get("options", ["approve", "reject"])) on_reject = config.get("on_reject", "abort") show_file = config.get("show_file") @@ -61,6 +61,22 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: "choice": None, } + # Dry-run short-circuit — pick a deterministic first-non-sentinel + # choice so downstream ``switch``/``do-while`` branches resolve + # without an interactive prompt. We never want a dry-run to + # route into the ``reject``/``abort`` paths, so explicit + # ``reject`` / ``abort`` options are skipped over. + if context.dry_run: + preview_choice = self._first_non_sentinel(options) + preview = f"DRY RUN: gate skipped (would choose {preview_choice!r})" + output["choice"] = preview_choice + output["dry_run"] = True + output["dry_run_message"] = preview + # Preserve the original message so + # ``{{ steps..output.message }}`` keeps resolving to the + # gate prompt for downstream template references. + return StepResult(status=StepStatus.COMPLETED, output=output) + # Non-interactive: pause for later resume (the file is not read here) if not sys.stdin.isatty(): return StepResult(status=StepStatus.PAUSED, output=output) @@ -89,6 +105,42 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: return StepResult(status=StepStatus.COMPLETED, output=output) + @staticmethod + def _coerce_options(options: Any) -> list[str]: + """Normalize gate ``options`` into a list of strings. + + A YAML literal can pass through as ``None``, a bare string, a + dict, or a non-string sequence. Without coercion any of these + would crash ``_prompt`` (which iterates and indexes). Returns + the default ``["approve", "reject"]`` when no usable options + are present so the gate always has a valid choice surface. + """ + if options is None: + return ["approve", "reject"] + if isinstance(options, str): + return [options] + if isinstance(options, dict): + return [str(k) for k in options.keys()] + try: + coerced = [str(o) for o in options] + except TypeError: + return ["approve", "reject"] + return coerced or ["approve", "reject"] + + @staticmethod + def _first_non_sentinel(options: list[str]) -> str: + """Return the first option that is not a reject/abort sentinel. + + Dry-run picks the first non-sentinel so the preview branch + never routes into a FAILED / ABORTED downstream outcome; if + every option is a sentinel, fall back to the first one so the + gate still resolves. + """ + for opt in options: + if opt.lower() not in ("reject", "abort"): + return opt + return options[0] + @classmethod def _compose_prompt(cls, message: object, show_file: str | None) -> str: """Build the gate's display text. @@ -180,9 +232,9 @@ def validate(self, config: dict[str, Any]) -> list[str]: f"Gate step {config.get('id', '?')!r} is missing 'message' field." ) options = config.get("options", ["approve", "reject"]) - if not isinstance(options, list) or not options: + if not isinstance(options, (list, tuple)) or not options: errors.append( - f"Gate step {config.get('id', '?')!r}: 'options' must be a non-empty list." + f"Gate step {config.get('id', '?')!r}: 'options' must be a non-empty list or tuple." ) elif not all(isinstance(o, str) for o in options): errors.append( diff --git a/src/specify_cli/workflows/steps/prompt/__init__.py b/src/specify_cli/workflows/steps/prompt/__init__.py index 5ec99b794d..602b4f85e2 100644 --- a/src/specify_cli/workflows/steps/prompt/__init__.py +++ b/src/specify_cli/workflows/steps/prompt/__init__.py @@ -52,22 +52,39 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: if model and isinstance(model, str) and "{{" in model: model = evaluate_expression(model, context) - # Attempt CLI dispatch - dispatch_result = self._try_dispatch( - prompt, integration, model, context - ) - output: dict[str, Any] = { "prompt": prompt, "integration": integration, "model": model, } + # Dry-run short-circuit — emit a synthetic preview without + # dispatching to the integration CLI. + if context.dry_run: + preview = f"DRY RUN: would prompt integration {integration!r}: {prompt!r}" + output["exit_code"] = 0 + output["dispatched"] = False + output["executed"] = False + output["dry_run"] = True + output["dry_run_message"] = preview + # Preserve the original prompt so + # ``{{ steps..output.message }}`` keeps resolving to the + # original prompt text for downstream templates. + output["message"] = prompt + return StepResult(status=StepStatus.COMPLETED, output=output) + + # Attempt CLI dispatch + dispatch_result = self._try_dispatch( + prompt, integration, model, context + ) + if dispatch_result is not None: output["exit_code"] = dispatch_result["exit_code"] output["stdout"] = dispatch_result["stdout"] output["stderr"] = dispatch_result["stderr"] output["dispatched"] = True + output["executed"] = True + output["dry_run"] = False if dispatch_result["exit_code"] != 0: return StepResult( status=StepStatus.FAILED, @@ -84,6 +101,8 @@ def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: else: output["exit_code"] = 1 output["dispatched"] = False + output["executed"] = False + output["dry_run"] = False return StepResult( status=StepStatus.FAILED, output=output, diff --git a/tests/test_dry_run.py b/tests/test_dry_run.py new file mode 100644 index 0000000000..18461f3c83 --- /dev/null +++ b/tests/test_dry_run.py @@ -0,0 +1,501 @@ +"""Dry-run tests for ``specify workflow run --dry-run``. + +Scoped to the engine-only changes in this branch: +- ``StepContext.dry_run`` flag +- ``RunState.dry_run`` field (persisted via save/load) +- ``WorkflowEngine.execute(..., dry_run=...)`` + resume() restores it +- ``CommandStep`` / ``PromptStep`` / ``GateStep`` short-circuit on + ``context.dry_run`` +- ``specify workflow run --dry-run`` flag + preview rendering + +Intentionally NOT covered here: +- ``init`` step registration (pre-existing test, not dry-run related) +- from_json filter / Win test stability (separate concerns) +- preset extraction (already on ``main``, not in this PR) +""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch + +import pytest + + +# -- Helpers --------------------------------------------------------------- + + +@pytest.fixture +def project_dir(tmp_path: Path) -> Path: + """Create a minimal spec-kit project for workflow runs.""" + specify_dir = tmp_path / ".specify" + specify_dir.mkdir() + (specify_dir / "workflows").mkdir() + return tmp_path + + +def _write_wf(project_dir: Path, text: str, name: str) -> Path: + path = project_dir / f"{name}.yml" + path.write_text(text, encoding="utf-8") + return path + + +def _invoke(project_dir: Path, args: list[str]): + from typer.testing import CliRunner + from specify_cli import app + + runner = CliRunner() + with patch.object(Path, "cwd", return_value=project_dir): + return runner.invoke(app, args, catch_exceptions=False) + + +# -- Step-level: CommandStep ---------------------------------------------- + + +class TestCommandStepDryRun: + def test_returns_completed_without_dispatch(self, tmp_path: Path) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.command import CommandStep + + step = CommandStep() + ctx = StepContext( + inputs={"name": "login"}, + default_integration="claude", + project_root=str(tmp_path), + dry_run=True, + ) + config = { + "id": "test", + "command": "speckit.specify", + "input": {"args": "{{ inputs.name }}"}, + } + result = step.execute(config, ctx) + + assert result.status == StepStatus.COMPLETED + assert result.output["dry_run"] is True + assert result.output["dispatched"] is False + assert result.output["executed"] is False + assert result.output["command"] == "speckit.specify" + assert result.output["input"]["args"] == "login" + assert "DRY RUN" in result.output["dry_run_message"] + assert result.output["message"] == "speckit.specify" + assert result.output["dry_run_message"] != result.output["message"] + + def test_falls_back_when_no_integration(self, tmp_path: Path) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.command import CommandStep + + step = CommandStep() + ctx = StepContext( + inputs={"name": "login"}, + default_integration=None, + project_root=str(tmp_path), + dry_run=True, + ) + config = { + "id": "test", + "command": "speckit.specify", + "input": {"args": "{{ inputs.name }}"}, + } + result = step.execute(config, ctx) + + assert result.status == StepStatus.COMPLETED + assert "speckit.specify" in result.output["message"] + assert "DRY RUN" in result.output["dry_run_message"] + + +# -- Step-level: PromptStep ----------------------------------------------- + + +class TestPromptStepDryRun: + def test_prompt_short_circuits(self, tmp_path: Path) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.prompt import PromptStep + + step = PromptStep() + ctx = StepContext( + inputs={"file": "auth.py"}, + default_integration="claude", + project_root=str(tmp_path), + dry_run=True, + ) + config = { + "id": "review", + "type": "prompt", + "prompt": "Review {{ inputs.file }} for security issues", + } + result = step.execute(config, ctx) + + assert result.status == StepStatus.COMPLETED + assert result.output["dry_run"] is True + assert result.output["executed"] is False + assert result.output["dispatched"] is False + assert result.output["exit_code"] == 0 + assert "DRY RUN" in result.output["dry_run_message"] + assert result.output["message"] == "Review auth.py for security issues" + assert result.output["dry_run_message"] != result.output["message"] + + +# -- Step-level: GateStep ------------------------------------------------- + + +class TestGateStepDryRun: + def test_skips_interactive_prompt(self) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + ctx = StepContext(dry_run=True) + config = { + "id": "review", + "message": "Review the spec.", + "options": ["approve", "reject"], + "on_reject": "abort", + } + result = step.execute(config, ctx) + + assert result.status == StepStatus.COMPLETED + assert result.output["dry_run"] is True + # Original ``message`` preserved so downstream + # ``{{ steps..output.message }}`` references still resolve. + assert result.output["message"] == "Review the spec." + assert "DRY RUN" in result.output["dry_run_message"] + assert result.output["dry_run_message"] != result.output["message"] + # First non-sentinel option is the preview choice. + assert result.output["choice"] == "approve" + + def test_skips_reject_sentinels_for_choice(self) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + ctx = StepContext(dry_run=True) + config = { + "id": "review", + "message": "Review the spec.", + "options": ["reject", "approve", "skip"], + } + result = step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + assert result.output["choice"] == "approve" + + def test_accepts_tuple_options(self) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + ctx = StepContext(dry_run=True) + config = { + "id": "review", + "message": "Review.", + "options": ("approve", "reject"), + } + result = step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + assert result.output["options"] == ["approve", "reject"] + assert result.output["choice"] == "approve" + + def test_normalizes_non_list_options(self) -> None: + from specify_cli.workflows.base import StepContext, StepStatus + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + ctx = StepContext(dry_run=True) + for bad in (None, "approve,reject", {"approve": True}, 42): + config = { + "id": "review", + "message": "Review.", + "options": bad, + } + result = step.execute(config, ctx) + assert result.status == StepStatus.COMPLETED + + +# -- Engine-level --------------------------------------------------------- + + +class TestWorkflowEngineDryRun: + def test_execute_dry_run_short_circuits_command_steps( + self, project_dir: Path + ) -> None: + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "dryrun-test" + name: "Dry Run Test" + version: "1.0.0" +inputs: + spec: + type: string + default: "test spec" +steps: + - id: specify + command: speckit.specify + input: + args: "{{ inputs.spec }}" + - id: plan + command: speckit.plan + input: + args: "{{ inputs.spec }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + + with patch( + "specify_cli.workflows.steps.command.shutil.which", + return_value="/usr/local/bin/claude", + ), patch("subprocess.run") as mock_run: + state = engine.execute( + definition, {"spec": "login feature"}, dry_run=True + ) + + assert state.status == RunStatus.COMPLETED + assert state.step_results["specify"]["output"]["dry_run"] is True + assert state.step_results["specify"]["output"]["dispatched"] is False + assert state.step_results["plan"]["output"]["dry_run"] is True + # Crucial: subprocess.run was never invoked in dry-run mode. + assert mock_run.call_count == 0 + + def test_dry_run_persisted_in_run_state(self, project_dir: Path) -> None: + from specify_cli.workflows.engine import ( + RunState, + WorkflowEngine, + WorkflowDefinition, + ) + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "dryrun-persist" + name: "Dry Run Persist" + version: "1.0.0" +steps: + - id: only + type: gate + message: "Continue?" + options: ["yes", "no"] +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition, dry_run=True, run_id="dr-persist") + + assert state.dry_run is True + + reloaded = RunState.load("dr-persist", project_dir) + assert reloaded.dry_run is True + + def test_resume_restores_dry_run( + self, project_dir: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import ( + RunState, + WorkflowEngine, + WorkflowDefinition, + ) + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "dryrun-resume" + name: "Dry Run Resume" + version: "1.0.0" +steps: + - id: only + type: gate + message: "Continue?" + options: ["yes", "no"] +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + + # Persist a paused dry-run with a known run_id, including a copy + # of the workflow YAML so resume() can reload the definition. + run_id = "dr-resume" + state = RunState( + run_id=run_id, + workflow_id=definition.id, + project_root=project_dir, + ) + state.status = RunStatus.PAUSED + state.dry_run = True + state.save() + run_dir = project_dir / ".specify" / "workflows" / "runs" / run_id + run_dir.mkdir(parents=True, exist_ok=True) + (run_dir / "workflow.yml").write_text(yaml_str, encoding="utf-8") + + captured: dict[str, bool] = {} + + def spy_execute_steps(steps, context, state, registry, *, step_offset=0): + captured["dry_run"] = context.dry_run + return None + + monkeypatch.setattr(engine, "_execute_steps", spy_execute_steps) + engine.resume(run_id) + + assert captured.get("dry_run") is True + + def test_execute_attaches_partial_state_on_exception( + self, project_dir: Path + ) -> None: + """When the engine raises mid-run, the partially-resolved step + results are attached to the exception as ``partial_state`` so + the CLI can render dry-run previews for the steps that did + resolve.""" + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "exc-dry" + name: "Exception Dry" + version: "1.0.0" +steps: + - id: boom + type: shell + run: "echo should-not-run" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + + def _raise(_steps, _ctx, _state, _registry, *, step_offset=0): + raise RuntimeError("synthetic engine failure") + + with patch.object(engine, "_execute_steps", _raise): + with pytest.raises(RuntimeError) as excinfo: + engine.execute(definition, dry_run=True) + + assert getattr(excinfo.value, "partial_state", None) is not None + + +# -- CLI-level ------------------------------------------------------------ + + +_WF_GATED = """ +schema_version: "1.0" +workflow: + id: "dry-wf" + name: "Dry WF" + version: "1.0.0" +steps: + - id: ask + type: gate + message: "Review" + options: [approve, reject] + - id: after + type: shell + run: "echo done" +""" + + +class TestWorkflowRunDryRunFlag: + def test_emits_banner_and_previews(self, project_dir: Path) -> None: + import json as _json + + wf = _write_wf(project_dir, _WF_GATED, "dry") + result = _invoke(project_dir, ["workflow", "run", str(wf), "--dry-run"]) + + assert result.exit_code == 0 + # Banner is printed to stdout in default (non-JSON) mode. + assert "DRY RUN" in result.stdout + # Preview loop printed the dry-run message for the gated step. + assert "gate skipped" in result.stdout + assert "would choose" in result.stdout + # Status is reported as completed (gate short-circuits in dry-run). + assert "Status: completed" in result.stdout + # "Resume with:" hint only appears for paused runs. + assert "Resume with:" not in result.stdout + # No shell step ran (after step was a shell, never executed). + _ = _json # keep import in case future assertions need it + + def test_with_json_suppresses_banner_and_previews( + self, project_dir: Path + ) -> None: + import json as _json + + wf = _write_wf(project_dir, _WF_GATED, "dry-json") + result = _invoke( + project_dir, ["workflow", "run", str(wf), "--dry-run", "--json"] + ) + + assert result.exit_code == 0 + # Stdout is exactly a JSON object — no DRY-RUN banner or preview lines. + assert "DRY RUN" not in result.stdout + assert "[DRY RUN]" not in result.stdout + payload = _json.loads(result.stdout) + assert payload["status"] == "completed" + + def test_prints_previews_on_engine_exception(self, project_dir: Path) -> None: + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import RunState + + wf = _write_wf(project_dir, _WF_GATED, "dry-fail") + + def _raise_with_partial(self, *_args, **_kwargs): + state = RunState(run_id="dr-fail-partial", workflow_id="dry-wf") + state.step_results["ask"] = { + "output": { + "dry_run": True, + "dry_run_message": "[DRY RUN] Gate: Review", + "message": "Review", + } + } + state.status = RunStatus.FAILED + err = RuntimeError("synthetic engine failure mid-dry-run") + err.partial_state = state # type: ignore[attr-defined] + raise err + + with patch( + "specify_cli.workflows.engine.WorkflowEngine.execute", + _raise_with_partial, + ): + result = _invoke( + project_dir, ["workflow", "run", str(wf), "--dry-run"] + ) + + # CLI exits non-zero on the engine exception. + assert result.exit_code != 0 + assert "Workflow failed" in result.stdout + # The previously-resolved dry-run previews are still printed. + assert "DRY RUN" in result.stdout + assert "Gate: Review" in result.stdout + + def test_no_previews_when_json_and_engine_fails(self, project_dir: Path) -> None: + import json as _json + + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import RunState + + wf = _write_wf(project_dir, _WF_GATED, "dry-fail-json") + + def _raise_with_partial(self, *_args, **_kwargs): + state = RunState(run_id="dr-fail-json", workflow_id="dry-wf") + state.step_results["ask"] = { + "output": { + "dry_run": True, + "dry_run_message": "[DRY RUN] Gate: Review", + "message": "Review", + } + } + state.status = RunStatus.FAILED + err = RuntimeError("synthetic engine failure mid-dry-run") + err.partial_state = state # type: ignore[attr-defined] + raise err + + with patch( + "specify_cli.workflows.engine.WorkflowEngine.execute", + _raise_with_partial, + ): + result = _invoke( + project_dir, + ["workflow", "run", str(wf), "--dry-run", "--json"], + ) + + # Stdout stays a JSON object — no preview leak in failure path. + assert "DRY RUN" not in result.stdout + assert "[DRY RUN]" not in result.stdout + payload = _json.loads(result.stdout) + assert payload["status"] == "failed" \ No newline at end of file