diff --git a/.gitignore b/.gitignore index 39c0ef8..da58962 100644 --- a/.gitignore +++ b/.gitignore @@ -4,10 +4,11 @@ __pycache__/ .venv/ .agentflow/ reference/ +docs/superpowers/ node_modules/ playwright-report/ test-results/ .e2e-flaky dist/ *.egg-info/ -.worktrees/ \ No newline at end of file +.worktrees/ diff --git a/agentflow/cli.py b/agentflow/cli.py index 1cd8c5e..0aec09d 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -5,6 +5,7 @@ import json import subprocess import sys +import time from dataclasses import dataclass, replace from datetime import datetime try: @@ -16,6 +17,7 @@ class StrEnum(str, Enum): pass from pathlib import Path +import httpx import typer from jinja2 import TemplateError from pydantic import ValidationError @@ -60,7 +62,7 @@ class StrEnum(str, Enum): target_uses_login_bash, ) from agentflow.prepared import resolve_local_workdir -from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider +from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, RunRecord, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider from agentflow.tuned_agents import list_tuned_agent_records, resolve_tuned_agent_version, run_evolution_from_payload app = typer.Typer(add_completion=False) @@ -118,6 +120,114 @@ def _build_store(runs_dir: str) -> object: return RunStore(runs_dir) +def _daemon_metadata_path(runs_dir: str) -> Path: + override = os.getenv("AGENTFLOW_DAEMON_METADATA_PATH") + if override: + return Path(override).expanduser().resolve() + return (Path(runs_dir).expanduser().resolve() / "daemon.json") + + +def _resolve_daemon_host() -> str: + return os.getenv("AGENTFLOW_DAEMON_HOST", "127.0.0.1") + + +def _resolve_daemon_port() -> int: + raw = os.getenv("AGENTFLOW_DAEMON_PORT", "8000") + try: + return int(raw) + except ValueError as exc: + raise typer.BadParameter(f"`AGENTFLOW_DAEMON_PORT` must be an integer, got `{raw}`.") from exc + + +def _daemon_base_url(host: str, port: int) -> str: + return f"http://{host}:{port}" + + +def _load_daemon_metadata(metadata_path: Path) -> dict[str, object] | None: + try: + payload = json.loads(metadata_path.read_text(encoding="utf-8")) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + if not isinstance(payload, dict): + return None + return payload + + +def _write_daemon_metadata(metadata_path: Path, *, host: str, port: int, pid: int) -> None: + metadata_path.parent.mkdir(parents=True, exist_ok=True) + payload = {"host": host, "port": port, "pid": pid} + metadata_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +def _daemon_is_healthy(base_url: str) -> bool: + try: + response = httpx.get(f"{base_url}/api/runs", timeout=0.5) + except httpx.RequestError: + return False + return response.status_code == 200 + + +def _start_daemon(*, host: str, port: int, runs_dir: str, max_concurrent_runs: int) -> subprocess.Popen: + command = [sys.executable, "-m", "agentflow.cli", "serve", "--host", host, "--port", str(port)] + env = dict(os.environ) + env["AGENTFLOW_RUNS_DIR"] = runs_dir + env["AGENTFLOW_MAX_CONCURRENT_RUNS"] = str(max_concurrent_runs) + return subprocess.Popen( + command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, + start_new_session=True, + ) + + +def _wait_for_daemon(base_url: str, *, timeout_seconds: float = 5.0) -> None: + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + if _daemon_is_healthy(base_url): + return + time.sleep(0.1) + raise typer.Exit(code=1) + + +def _ensure_daemon( + runs_dir: str, + max_concurrent_runs: int, + *, + host: str, + port: int, + metadata_path: Path, +) -> str: + metadata = _load_daemon_metadata(metadata_path) + metadata_host = metadata.get("host") if isinstance(metadata, dict) else None + metadata_port = metadata.get("port") if isinstance(metadata, dict) else None + if isinstance(metadata_host, str) and isinstance(metadata_port, int) and (metadata_host, metadata_port) == (host, port): + base_url = _daemon_base_url(metadata_host, metadata_port) + if _daemon_is_healthy(base_url): + return base_url + + base_url = _daemon_base_url(host, port) + process = _start_daemon(host=host, port=port, runs_dir=runs_dir, max_concurrent_runs=max_concurrent_runs) + _write_daemon_metadata(metadata_path, host=host, port=port, pid=process.pid) + _wait_for_daemon(base_url) + return base_url + + +def _submit_detached_run(pipeline: object, base_url: str) -> RunRecord: + payload: dict[str, object] = {"pipeline": pipeline.model_dump(mode="json")} + base_dir = getattr(pipeline, "base_dir", None) + if isinstance(base_dir, str) and base_dir: + payload["base_dir"] = base_dir + response = httpx.post(f"{base_url}/api/runs", json=payload, timeout=10.0) + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + detail = exc.response.text.strip() + typer.echo(f"Failed to submit run: {detail}", err=True) + raise typer.Exit(code=1) from exc + return RunRecord.model_validate(response.json()) + + def _render_tuned_agents_summary(records: list[object]) -> str: if not records: return "No tuned agents found." @@ -406,6 +516,315 @@ def _build_run_summary(record: object, run_dir: Path | str | None = None) -> dic return summary +_STATUS_INACTIVE_NODE_STATUSES = {"pending", "queued", "ready"} +_STATUS_ACTIVE_NODE_STATUSES = {"running", "retrying", "cancelling"} +_EVOLUTION_PROGRESS_KEYS = {"agentflow_event", "stage", "attempt", "status", "command", "detail", "node_id"} +_EVOLUTION_PROGRESS_PREVIEW_LIMIT = 5 + + +def _normalize_event_payload(event: object) -> dict[str, object]: + if isinstance(event, dict): + payload = dict(event) + else: + model_dump = getattr(event, "model_dump", None) + if callable(model_dump): + payload = model_dump(mode="json") + else: + payload = { + "timestamp": getattr(event, "timestamp", None), + "run_id": getattr(event, "run_id", None), + "type": getattr(event, "type", None), + "node_id": getattr(event, "node_id", None), + "data": getattr(event, "data", None), + } + if not isinstance(payload, dict): + return {} + data = payload.get("data") + if not isinstance(data, dict): + payload["data"] = {} + return payload + + +def _event_data_summary(data: object) -> str | None: + if not isinstance(data, dict): + return None + pieces: list[str] = [] + if data.get("status") is not None: + pieces.append(f"status={data['status']}") + if data.get("attempt") is not None: + pieces.append(f"attempt={data['attempt']}") + if data.get("round_number") is not None: + pieces.append(f"round={data['round_number']}") + if data.get("total_rounds") is not None: + pieces.append(f"of={data['total_rounds']}") + if data.get("child_run_id") is not None: + pieces.append(f"child={data['child_run_id']}") + if data.get("reason") is not None: + pieces.append(f"reason={data['reason']}") + if data.get("error") is not None: + pieces.append(f"error={data['error']}") + return " ".join(pieces) if pieces else None + + +def _render_status_event(event_payload: dict[str, object]) -> str: + timestamp = event_payload.get("timestamp") + event_type = event_payload.get("type") + node_id = event_payload.get("node_id") + parts: list[str] = [] + if timestamp: + parts.append(str(timestamp)) + if event_type: + parts.append(str(event_type)) + if node_id: + parts.append(f"node={node_id}") + detail = _event_data_summary(event_payload.get("data")) + if detail: + parts.append(detail) + return " ".join(parts) + + +def _parse_evolution_progress_line(line: str) -> dict[str, object] | None: + try: + payload = json.loads(line) + except (TypeError, json.JSONDecodeError): + return None + if not isinstance(payload, dict): + return None + if payload.get("agentflow_event") != "evolution_progress": + return None + stage = payload.get("stage") + attempt = payload.get("attempt") + if not stage or attempt is None: + return None + return {key: payload[key] for key in _EVOLUTION_PROGRESS_KEYS if key in payload} + + +def _build_status_evolution_progress(record: object, events: list[object]) -> list[dict[str, object]]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + parsed_events: list[dict[str, object]] = [] + for node_id, node in nodes.items(): + for line in getattr(node, "stderr_lines", []) or []: + if not isinstance(line, str): + continue + event = _parse_evolution_progress_line(line) + if event: + event["node_id"] = node_id + parsed_events.append(event) + + for event in events: + payload = _normalize_event_payload(event) + if payload.get("type") != "node_trace": + continue + node_id = payload.get("node_id") + if not isinstance(node_id, str) or not node_id: + continue + trace = payload.get("data", {}).get("trace") + if not isinstance(trace, dict): + continue + if trace.get("source") != "stderr": + continue + content = trace.get("content") + if not isinstance(content, str): + continue + parsed = _parse_evolution_progress_line(content) + if parsed is None: + continue + parsed["node_id"] = node_id + parsed_events.append(parsed) + + deduped: list[dict[str, object]] = [] + seen: set[str] = set() + for event in parsed_events: + key = json.dumps(event, sort_keys=True, ensure_ascii=False) + if key in seen: + continue + seen.add(key) + deduped.append(event) + return deduped + + +def _render_evolution_progress(event: dict[str, object]) -> str: + node_id = event.get("node_id") or "-" + stage = event.get("stage") or "-" + status = event.get("status") + label = f"{stage} {status}" if status else str(stage) + pieces: list[str] = [] + attempt = event.get("attempt") + if attempt is not None: + pieces.append(f"attempt {attempt}") + command = event.get("command") + if command: + pieces.append(f"command={command}") + detail = event.get("detail") + if detail: + pieces.append(f"detail={detail}") + if not pieces: + return f"{node_id}: {label}" + return f"{node_id}: {label} ({', '.join(pieces)})" + + +def _build_status_progress(record: object) -> dict[str, object]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + pipeline_nodes = _pipeline_node_map(record) + node_ids = list(pipeline_nodes) if pipeline_nodes else list(nodes) + total_nodes = len(node_ids) + status_counts: dict[str, int] = {} + progressed_nodes = 0 + active_nodes: list[dict[str, object]] = [] + + for node_id in node_ids: + node = nodes.get(node_id) + status = _status_value(getattr(node, "status", "pending")).lower() + status_counts[status] = status_counts.get(status, 0) + 1 + if status not in _STATUS_INACTIVE_NODE_STATUSES: + progressed_nodes += 1 + if status in _STATUS_ACTIVE_NODE_STATUSES: + entry: dict[str, object] = {"id": node_id, "status": status} + attempt = _node_attempt_count(node) if node is not None else 0 + if attempt: + entry["attempt"] = attempt + active_nodes.append(entry) + + progress_percent = 0.0 + if total_nodes: + progress_percent = max(0.0, min(100.0, (progressed_nodes / total_nodes) * 100)) + + return { + "total_nodes": total_nodes, + "progressed_nodes": progressed_nodes, + "active_nodes": active_nodes, + "status_counts": status_counts, + "progress_percent": progress_percent, + } + + +def _build_status_optimization(record: object) -> dict[str, object] | None: + payload: dict[str, object] = {} + parent_run_id = getattr(record, "optimization_parent_run_id", None) + if parent_run_id: + payload["parent_run_id"] = parent_run_id + round_number = getattr(record, "optimization_round", None) + if round_number: + payload["round"] = round_number + session = getattr(record, "optimization_session", None) + if isinstance(session, dict) and session: + payload["session"] = session + return payload or None + + +def _build_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> dict[str, object]: + summary = _build_run_summary(record, run_dir=run_dir) + normalized_events = [_normalize_event_payload(event) for event in events] + summary["events"] = normalized_events + summary["recent_events"] = normalized_events[-5:] + summary["progress"] = _build_status_progress(record) + summary["evolution_progress"] = _build_status_evolution_progress(record, events) + optimization = _build_status_optimization(record) + if optimization is not None: + summary["optimization"] = optimization + return summary + + +def _render_status_optimization(optimization: dict[str, object]) -> str | None: + session = optimization.get("session") + pieces: list[str] = [] + if isinstance(session, dict): + kind = session.get("kind") + if kind: + pieces.append(str(kind)) + optimizer = session.get("optimizer") + if optimizer: + pieces.append(f"optimizer={optimizer}") + current_round = session.get("current_round") + total_rounds = session.get("total_rounds") + if current_round and total_rounds: + pieces.append(f"round {current_round}/{total_rounds}") + elif current_round: + pieces.append(f"round {current_round}") + child_run_ids = session.get("child_run_ids") + if isinstance(child_run_ids, list): + pieces.append(f"child_runs={len(child_run_ids)}") + if "round" in optimization and not any(piece.startswith("round ") for piece in pieces): + pieces.append(f"round {optimization['round']}") + if optimization.get("parent_run_id"): + pieces.append(f"parent={optimization['parent_run_id']}") + if not pieces: + return None + return f"Optimization: {' '.join(pieces)}" + + +def _render_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> str: + summary = _build_status_summary(record, events, run_dir=run_dir) + lines = [f"Run {summary['id']}: {summary['status']}"] + pipeline = summary.get("pipeline") + if isinstance(pipeline, dict) and pipeline.get("name"): + lines.append(f"Pipeline: {pipeline['name']}") + duration = summary.get("duration") + if duration is not None: + lines.append(f"Duration: {duration}") + started_at = summary.get("started_at") + if started_at: + lines.append(f"Started: {started_at}") + run_dir_value = summary.get("run_dir") + if run_dir_value is not None: + lines.append(f"Run dir: {run_dir_value}") + optimization = summary.get("optimization") + if isinstance(optimization, dict): + rendered = _render_status_optimization(optimization) + if rendered: + lines.append(rendered) + + progress = summary.get("progress") + if isinstance(progress, dict): + total_nodes = progress.get("total_nodes", 0) + progressed_nodes = progress.get("progressed_nodes", 0) + active_nodes = progress.get("active_nodes", []) + if total_nodes: + lines.append(f"Progress: {progressed_nodes}/{total_nodes} nodes, active {len(active_nodes)}") + if isinstance(active_nodes, list) and active_nodes: + active_entries: list[str] = [] + for node in active_nodes: + node_id = node.get("id") + status = node.get("status") + if not node_id or not status: + continue + rendered = f"{node_id} ({status}" + attempt = node.get("attempt") + if attempt: + rendered += f", attempt {attempt}" + rendered += ")" + active_entries.append(rendered) + if active_entries: + lines.append(f"Active: {', '.join(active_entries)}") + + evolution_progress = summary.get("evolution_progress") + if isinstance(evolution_progress, list) and evolution_progress: + lines.append("Evolution progress:") + for event in evolution_progress[-_EVOLUTION_PROGRESS_PREVIEW_LIMIT:]: + if not isinstance(event, dict): + continue + lines.append(f"- {_render_evolution_progress(event)}") + + recent_events = summary.get("recent_events") + if isinstance(recent_events, list) and recent_events: + lines.append("Recent events:") + for event_payload in recent_events: + if not isinstance(event_payload, dict): + continue + lines.append(f"- {_render_status_event(event_payload)}") + return "\n".join(lines) + + def _render_run_summary(record: object, run_dir: Path | str | None = None) -> str: summary = _build_run_summary(record, run_dir=run_dir) lines = [f"Run {summary['id']}: {summary['status']}"] @@ -473,6 +892,27 @@ def _echo_run_result(record: object, *, output: RunOutputFormat, run_dir: Path | typer.echo(json.dumps(record.model_dump(mode="json"), indent=2)) +def _echo_status_result( + record: object, + events: list[object], + *, + output: RunOutputFormat, + run_dir: Path | str | None = None, +) -> None: + resolved_output = _resolve_run_output(output) + if resolved_output == RunOutputFormat.SUMMARY: + typer.echo(_render_status_summary(record, events, run_dir=run_dir)) + return + if resolved_output == RunOutputFormat.JSON_SUMMARY: + typer.echo(json.dumps(_build_status_summary(record, events, run_dir=run_dir), indent=2)) + return + model_dump = getattr(record, "model_dump", None) + if callable(model_dump): + typer.echo(json.dumps(model_dump(mode="json"), indent=2)) + return + typer.echo(json.dumps(_build_run_summary(record, run_dir=run_dir), indent=2)) + + def _run_dir_for_record(store: object | None, run_id: str) -> Path | str | None: if store is None: return None @@ -2136,6 +2576,25 @@ def show( _echo_run_result(record, output=output, run_dir=_run_dir_for_record(store, run_id)) +@app.command() +def status( + run_id: str, + runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), + output: RunOutputFormat = typer.Option( + RunOutputFormat.AUTO, + "--output", + help="Result output format. Defaults to `summary` on a terminal and `json` otherwise.", + ), +) -> None: + store = _build_store(runs_dir) + record = _get_run_or_exit(store, run_id, runs_dir=runs_dir) + events = [] + get_events = getattr(store, "get_events", None) + if callable(get_events): + events = get_events(run_id) + _echo_status_result(record, events, output=output, run_dir=_run_dir_for_record(store, run_id)) + + @app.command() def cancel( run_id: str, @@ -2318,6 +2777,12 @@ def run( path: str, runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), max_concurrent_runs: int = typer.Option(2, envvar="AGENTFLOW_MAX_CONCURRENT_RUNS"), + detach: bool = typer.Option( + False, + "--detach", + "-d", + help="Submit the run to the local daemon and exit without waiting for completion.", + ), output: RunOutputFormat = typer.Option( RunOutputFormat.AUTO, "--output", @@ -2341,6 +2806,20 @@ def run( output, show_preflight=show_preflight, ) + if detach: + host = _resolve_daemon_host() + port = _resolve_daemon_port() + metadata_path = _daemon_metadata_path(runs_dir) + base_url = _ensure_daemon( + runs_dir, + max_concurrent_runs, + host=host, + port=port, + metadata_path=metadata_path, + ) + record = _submit_detached_run(pipeline, base_url) + _echo_run_result(record, output=output) + raise typer.Exit(code=0) _run_pipeline(pipeline, runs_dir, max_concurrent_runs, output) diff --git a/agentflow/dsl.py b/agentflow/dsl.py index 7014faf..f338ef6 100644 --- a/agentflow/dsl.py +++ b/agentflow/dsl.py @@ -6,6 +6,7 @@ from contextvars import ContextVar, Token from dataclasses import dataclass, field import json +from pathlib import Path from types import TracebackType from typing import Any @@ -430,11 +431,16 @@ def evolve( "workspace_dir": "{{ pipeline.working_dir }}", } payload_json = json.dumps(payload, ensure_ascii=False) + source_root = Path(__file__).resolve().parents[1] code = ( "import json\n" + "import sys\n" + f"sys.path.insert(0, {json.dumps(str(source_root), ensure_ascii=False)})\n" "from agentflow.tuned_agents import run_evolution_from_payload\n\n" + "def _evolution_progress(event):\n" + " print(json.dumps(event, ensure_ascii=False), file=sys.stderr, flush=True)\n\n" f"payload = json.loads(r'''{payload_json}''')\n" - "result = run_evolution_from_payload(payload)\n" + "result = run_evolution_from_payload(payload, progress=_evolution_progress)\n" "print(json.dumps(result, ensure_ascii=False))\n" ) evolve_task_id = task_id or f"evolve_{profile.replace('-', '_')}" diff --git a/agentflow/graph_optimizer.py b/agentflow/graph_optimizer.py index e2dd31a..f7c90d6 100644 --- a/agentflow/graph_optimizer.py +++ b/agentflow/graph_optimizer.py @@ -2,10 +2,13 @@ import json import shutil +import subprocess +import sys from pathlib import Path from pprint import pformat from typing import Any +from agentflow.loader import load_pipeline_from_data from agentflow.specs import PipelineSpec, RunRecord, normalize_agent_name from agentflow.store import RunStore from agentflow.utils import ensure_dir, json_dumps @@ -204,3 +207,33 @@ def write_validation_result(path: Path, *, ok: bool, error: str | None = None) - if error is not None: payload["error"] = error path.write_text(json_dumps(payload), encoding="utf-8") + + +def load_child_pipeline_from_path(path: Path) -> PipelineSpec: + """Load optimizer-edited pipeline as a child-run shape (`optimizer=None`, `n_run=1`).""" + + path = Path(path) + if path.suffix == ".py": + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, + text=True, + cwd=str(path.parent), + ) + if result.returncode != 0: + raise ValueError(f"pipeline script `{path}` failed:\n{result.stderr.strip()}") + raw_text = result.stdout + else: + raw_text = path.read_text(encoding="utf-8") + + try: + parsed = json.loads(raw_text) + except json.JSONDecodeError as exc: + raise ValueError(f"optimized pipeline `{path}` did not produce JSON: {exc}") from exc + + if not isinstance(parsed, dict): + raise ValueError(f"optimized pipeline `{path}` did not produce an object payload") + + parsed["optimizer"] = None + parsed["n_run"] = 1 + return load_pipeline_from_data(parsed, base_dir=path.parent) diff --git a/agentflow/orchestrator.py b/agentflow/orchestrator.py index 6028de8..1e7b321 100644 --- a/agentflow/orchestrator.py +++ b/agentflow/orchestrator.py @@ -31,12 +31,12 @@ OPTIMIZER_VALIDATION_FILENAME, build_graph_report, copy_run_traces, + load_child_pipeline_from_path, render_graph_optimizer_prompt, write_editable_pipeline_python, write_optimizer_result, write_validation_result, ) -from agentflow.loader import load_pipeline_from_path from agentflow.prepared import ExecutionPaths, PreparedExecution, build_execution_paths from agentflow.runners.registry import RunnerRegistry, default_runner_registry from agentflow.specs import ( @@ -441,7 +441,7 @@ def _optimizer_failure_summary( write_validation_result(round_dir / OPTIMIZER_VALIDATION_FILENAME, ok=False, error=failure_summary) else: try: - loaded_pipeline = load_pipeline_from_path(pipeline_path) + loaded_pipeline = load_child_pipeline_from_path(pipeline_path) except Exception as exc: failure_summary = _optimizer_failure_summary( "Optimized pipeline", diff --git a/agentflow/tuned_agents.py b/agentflow/tuned_agents.py index c6f88a3..b59ddbd 100644 --- a/agentflow/tuned_agents.py +++ b/agentflow/tuned_agents.py @@ -6,7 +6,7 @@ import subprocess from dataclasses import dataclass from pathlib import Path -from typing import Any, Literal +from typing import Any, Callable, Literal from uuid import uuid4 import yaml @@ -600,7 +600,10 @@ def _write_failure_metadata( _write_json(version_dir / "version.json", failed_version.model_dump(mode="json")) -def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: +def run_evolution_from_payload( + payload: dict[str, Any], + progress: Callable[[dict[str, object]], None] | None = None, +) -> dict[str, Any]: request = EvolutionRequest.model_validate(payload) workspace = Path(request.workspace_dir or os.getcwd()).expanduser().resolve() resolved_config = load_tuner_config(workspace, request.profile) @@ -618,6 +621,29 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: if not request.source_nodes: raise ValueError("evolution requires at least one source node") + def _emit_progress( + stage: str, + *, + attempt: int, + status: str | None = None, + command: str | None = None, + detail: str | None = None, + ) -> None: + if progress is None: + return + payload: dict[str, object] = { + "agentflow_event": "evolution_progress", + "stage": stage, + "attempt": attempt, + } + if status is not None: + payload["status"] = status + if command is not None: + payload["command"] = command + if detail is not None: + payload["detail"] = detail + progress(payload) + version_id = uuid4().hex[:12] version_dir = tuned_agent_version_dir(workspace, resolved_config.agent_name, version_id) traces_dir = version_dir / "traces" @@ -639,8 +665,11 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: resolved_executable = _resolved_executable_path(resolved_config.config, repo_workdir) failure_summary: str | None = None + _emit_progress("start", attempt=1) + for attempt_number in range(1, resolved_config.config.max_attempts + 1): attempt_dir = ensure_dir(attempt_root / f"attempt-{attempt_number}") + _emit_progress("attempt", attempt=attempt_number, status="started") prompt = _optimizer_prompt( resolved_config, repo_dir=repo_workdir, @@ -650,6 +679,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: ) _write_text(attempt_dir / "optimizer-prompt.txt", prompt) + _emit_progress("optimizer", attempt=attempt_number, status="started", command="optimizer") optimizer_result = _run_optimizer( optimizer_kind, prompt=prompt, @@ -660,8 +690,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "optimizer", optimizer_result) if optimizer_result.exit_code != 0: failure_summary = _attempt_summary("Optimizer", optimizer_result) + _emit_progress("optimizer", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("optimizer", attempt=attempt_number, status="completed") + _emit_progress( + "build", + attempt=attempt_number, + status="started", + command=resolved_config.config.build_command, + ) build_result = _run_shell_command( resolved_config.config.build_command, repo_dir=repo_workdir, @@ -673,8 +711,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "build", build_result) if build_result.exit_code != 0: failure_summary = _attempt_summary("Build", build_result) + _emit_progress("build", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("build", attempt=attempt_number, status="completed") + _emit_progress( + "test", + attempt=attempt_number, + status="started", + command=resolved_config.config.test_command, + ) test_result = _run_shell_command( resolved_config.config.test_command, repo_dir=repo_workdir, @@ -686,8 +732,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "test", test_result) if test_result.exit_code != 0: failure_summary = _attempt_summary("Test", test_result) + _emit_progress("test", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("test", attempt=attempt_number, status="completed") + _emit_progress( + "smoke", + attempt=attempt_number, + status="started", + command=resolved_config.config.smoke_command, + ) smoke_result = _run_shell_command( resolved_config.config.smoke_command, repo_dir=repo_workdir, @@ -699,14 +753,20 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "smoke", smoke_result) if smoke_result.exit_code != 0: failure_summary = _attempt_summary("Smoke", smoke_result) + _emit_progress("smoke", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("smoke", attempt=attempt_number, status="completed") executable_path = Path(resolved_executable) if not executable_path.exists(): - raise FileNotFoundError( + detail = ( f"successful evolution did not produce executable `{executable_path}`; " "set `executable_path` in the tuner config or make the build produce the default path" ) + _emit_progress("final", attempt=attempt_number, status="failed", detail=detail) + raise FileNotFoundError( + detail + ) version = TunedAgentVersion( id=version_id, @@ -722,6 +782,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: summary=_parse_agent_output(optimizer_kind, f"optimizer_{version_id}", optimizer_result.stdout), ) register_tuned_agent_version(workspace, version) + _emit_progress("final", attempt=attempt_number, status="success") return { "ok": True, "agent_name": version.agent_name, @@ -733,6 +794,12 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: "traces": copied_traces, } + _emit_progress( + "final", + attempt=resolved_config.config.max_attempts, + status="failed", + detail=failure_summary or "evolution failed without diagnostics", + ) _write_failure_metadata( version_dir, agent_name=resolved_config.agent_name, diff --git a/scripts/verify_async_codex.sh b/scripts/verify_async_codex.sh new file mode 100755 index 0000000..80dd7f7 --- /dev/null +++ b/scripts/verify_async_codex.sh @@ -0,0 +1,301 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +verify_async_codex.sh + +Smoke-like validation for the async AgentFlow mainline using Codex only. + +What it verifies: +- Detached submission: `agentflow run ... -d` +- Store-backed process view: `agentflow status ` +- PR11 process visibility: evolution progress rendered in status +- PR12 process visibility: optimization session / round events rendered in status + +Required environment: +- OPENAI_API_KEY +- Optional if you use a custom gateway: + - OPENAI_BASE_URL + - AGENTFLOW_OPENAI_BASE_URL + +Usage: + bash scripts/verify_async_codex.sh + bash scripts/verify_async_codex.sh /tmp/agentflow-async-verify + +Outputs: +- Writes all generated pipelines, run ids, summaries, and json payloads under the chosen workdir. +- Prints the key artifact paths you should use for screenshots in the PR description. +EOF +} + +if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then + usage + exit 0 +fi + +if [[ -z "${OPENAI_API_KEY:-}" ]]; then + echo "OPENAI_API_KEY is required." >&2 + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +WORKDIR="${1:-${REPO_ROOT}/.tmp/verify_async_codex}" +ARTIFACT_DIR="${WORKDIR}/artifacts" +PR11_WORKSPACE="${WORKDIR}/pr11_workspace" +PR11_SEED_REPO="${WORKDIR}/pr11_seed_repo" +PR12_WORKSPACE="${WORKDIR}/pr12_workspace" + +if [[ -x "${REPO_ROOT}/.venv/bin/python" ]]; then + PYTHON_BIN="${REPO_ROOT}/.venv/bin/python" +else + PYTHON_BIN="$(command -v python3)" +fi + +choose_port() { + "${PYTHON_BIN}" - <<'PY' +import socket +s = socket.socket() +s.bind(("127.0.0.1", 0)) +print(s.getsockname()[1]) +s.close() +PY +} + +AF() { + "${PYTHON_BIN}" -m agentflow.cli "$@" +} + +json_field() { + local json_path="$1" + local expr="$2" + "${PYTHON_BIN}" - "$json_path" "$expr" <<'PY' +import json +import sys +from pathlib import Path + +json_path = Path(sys.argv[1]) +expr = sys.argv[2] +data = json.loads(json_path.read_text(encoding="utf-8")) +value = eval(expr, {"__builtins__": {}}, {"data": data}) +if isinstance(value, (dict, list)): + print(json.dumps(value, ensure_ascii=False)) +else: + print(value) +PY +} + +wait_for_condition() { + local run_id="$1" + local output_json="$2" + local python_expr="$3" + local timeout_seconds="$4" + local start_ts + start_ts="$(date +%s)" + + while true; do + AF status "$run_id" --output json-summary > "$output_json" + if "${PYTHON_BIN}" - "$output_json" "$python_expr" <<'PY' +import json +import sys +from pathlib import Path + +payload = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8")) +expr = sys.argv[2] +safe_globals = { + "__builtins__": {}, + "len": len, + "any": any, + "all": all, + "bool": bool, +} +ok = bool(eval(expr, safe_globals, {"data": payload})) +raise SystemExit(0 if ok else 1) +PY + then + return 0 + fi + + if (( "$(date +%s)" - start_ts >= timeout_seconds )); then + echo "Timed out waiting for condition on run ${run_id}: ${python_expr}" >&2 + return 1 + fi + sleep 2 + done +} + +wait_for_terminal() { + local run_id="$1" + local output_json="$2" + local timeout_seconds="$3" + wait_for_condition \ + "$run_id" \ + "$output_json" \ + 'data["status"] in {"completed", "failed", "cancelled"}' \ + "$timeout_seconds" +} + +capture_latest_status() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + AF status "$run_id" --output json-summary > "$json_path" + AF status "$run_id" --output summary > "$summary_path" +} + +best_effort_terminal_snapshot() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + local timeout_seconds="$4" + + if wait_for_terminal "$run_id" "$json_path" "$timeout_seconds"; then + AF status "$run_id" --output summary > "$summary_path" + return 0 + fi + + capture_latest_status "$run_id" "$json_path" "$summary_path" + return 0 +} + +rm -rf "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR11_SEED_REPO" "$PR12_WORKSPACE" "${WORKDIR}/runs" +mkdir -p "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR12_WORKSPACE" + +export AGENTFLOW_RUNS_DIR="${WORKDIR}/runs" +export AGENTFLOW_DAEMON_HOST="${AGENTFLOW_DAEMON_HOST:-127.0.0.1}" +export AGENTFLOW_DAEMON_PORT="${AGENTFLOW_DAEMON_PORT:-$(choose_port)}" +export AGENTFLOW_DAEMON_METADATA_PATH="${AGENTFLOW_RUNS_DIR}/daemon.json" +VERIFY_LATEST_TIMEOUT_SECONDS="${VERIFY_LATEST_TIMEOUT_SECONDS:-15}" + +echo "[verify] repo root: ${REPO_ROOT}" +echo "[verify] python: ${PYTHON_BIN}" +echo "[verify] runs dir: ${AGENTFLOW_RUNS_DIR}" +echo "[verify] daemon: ${AGENTFLOW_DAEMON_HOST}:${AGENTFLOW_DAEMON_PORT}" + +echo "[verify] preparing PR11 local seed repo" +rm -rf "$PR11_SEED_REPO" +mkdir -p "$PR11_SEED_REPO" +git -C "$PR11_SEED_REPO" init -b main >/dev/null +cat > "${PR11_SEED_REPO}/README.md" <<'EOF' +# local-codex-smoke +EOF +git -C "$PR11_SEED_REPO" add README.md +git -C "$PR11_SEED_REPO" commit -m "init local smoke repo" >/dev/null + +mkdir -p "${PR11_WORKSPACE}/agent_tuner" +cat > "${PR11_WORKSPACE}/agent_tuner/local_codex_smoke.yaml" < .venv/bin/codex && chmod +x .venv/bin/codex +test_command: test -f README.md +smoke_command: "{executable} >/dev/null" +executable_path: .venv/bin/codex +max_attempts: 2 +evolution_prompt: | + Make the smallest coherent change you can based on the copied traces. + Keep the repository valid and preserve a working local smoke flow. +EOF + +cat > "${PR11_WORKSPACE}/pr11_evolve_demo.py" < "${PR12_WORKSPACE}/pr12_optimize_demo.py" <> summarize + +print(g.to_json()) +EOF + +echo "[verify] submitting PR11 evolve demo via detached run" +AF run "${PR11_WORKSPACE}/pr11_evolve_demo.py" -d --output json > "${ARTIFACT_DIR}/pr11.run.json" +PR11_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr11.run.json" 'data["id"]')" +echo "[verify] PR11 run id: ${PR11_RUN_ID}" + +wait_for_condition \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.process.json" \ + 'len(data.get("evolution_progress", [])) >= 3 and any(event.get("stage") == "optimizer" for event in data.get("evolution_progress", []))' \ + 900 +AF status "$PR11_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr11.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.latest.json" \ + "${ARTIFACT_DIR}/pr11.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR11_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr11.status.latest.json" 'data["status"]')" + +echo "[verify] submitting PR12 optimization demo via detached run" +AF run "${PR12_WORKSPACE}/pr12_optimize_demo.py" -d --output json > "${ARTIFACT_DIR}/pr12.run.json" +PR12_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr12.run.json" 'data["id"]')" +echo "[verify] PR12 run id: ${PR12_RUN_ID}" + +wait_for_condition \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.process.json" \ + 'bool(data.get("optimization")) and any(event.get("type") == "optimization_optimizer_started" for event in data.get("events", []))' \ + 900 +AF status "$PR12_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr12.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.latest.json" \ + "${ARTIFACT_DIR}/pr12.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR12_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr12.status.latest.json" 'data["status"]')" + +cat > "${ARTIFACT_DIR}/verification_report.txt" < None: + repo_dir.mkdir(parents=True) + (repo_dir / "README.md").write_text("base", encoding="utf-8") + + attempt_state = {"smoke": 0} + + def fake_optimizer(_optimizer: AgentKind, *, prompt: str, repo_dir: Path, runtime_dir: Path, env: dict[str, str]): + runtime_dir.mkdir(parents=True, exist_ok=True) + (repo_dir / "README.md").write_text(prompt, encoding="utf-8") + return CommandExecution( + command="optimizer", + exit_code=0, + stdout='{"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"updated"}]}}', + stderr="", + ) + + def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, traces_dir: Path, executable: str, env: dict[str, str]): + if command_template == "build": + executable_path = Path(executable) + executable_path.parent.mkdir(parents=True, exist_ok=True) + executable_path.write_text("#!/bin/sh\nexit 0\n", encoding="utf-8") + if command_template == "smoke": + attempt_state["smoke"] += 1 + if attempt_state["smoke"] == 1: + return CommandExecution(command="smoke", exit_code=1, stdout="", stderr="ping failed") + return CommandExecution(command=command_template, exit_code=0, stdout="ok", stderr="") + + monkeypatch.setattr("agentflow.tuned_agents._clone_repo", fake_clone) + monkeypatch.setattr("agentflow.tuned_agents._run_optimizer", fake_optimizer) + monkeypatch.setattr("agentflow.tuned_agents._run_shell_command", fake_shell) + + progress: list[dict[str, object]] = [] + + def capture(event: dict[str, object]) -> None: + progress.append(event) + + result = run_evolution_from_payload( + { + "profile": "codex", + "target": "codex", + "optimizer": "codex", + "source_nodes": ["plan"], + "trace_paths": {"plan": str(trace_path)}, + "workspace_dir": str(workspace), + "run_id": "run-progress", + }, + progress=capture, + ) + + assert result["ok"] is True + assert any(event.get("agentflow_event") == "evolution_progress" for event in progress) + stages = [(event.get("stage"), event.get("status"), event.get("attempt")) for event in progress] + assert stages[0][0] == "start" + assert ("attempt", "started", 1) in stages + assert ("optimizer", "started", 1) in stages + assert ("optimizer", "completed", 1) in stages + assert ("build", "started", 1) in stages + assert ("build", "completed", 1) in stages + assert ("test", "started", 1) in stages + assert ("test", "completed", 1) in stages + assert ("smoke", "started", 1) in stages + assert ("smoke", "failed", 1) in stages + assert ("attempt", "started", 2) in stages + assert ("final", "success", 2) in stages + smoke_failure = next(event for event in progress if event.get("stage") == "smoke" and event.get("status") == "failed") + assert "Smoke" in str(smoke_failure.get("detail", "")) + build_start = next(event for event in progress if event.get("stage") == "build" and event.get("status") == "started") + assert build_start.get("command") == "build" + + def test_optimizer_prompt_explicitly_allows_prompt_and_tool_edits(tmp_path): resolved = ResolvedTunerConfig( profile="codex", diff --git a/tests/test_validation_script.py b/tests/test_validation_script.py new file mode 100644 index 0000000..4ddb01f --- /dev/null +++ b/tests/test_validation_script.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path + + +def test_verify_async_codex_script_help() -> None: + repo_root = Path(__file__).resolve().parents[1] + script_path = repo_root / "scripts" / "verify_async_codex.sh" + + assert script_path.exists() + + completed = subprocess.run( + ["bash", str(script_path), "--help"], + capture_output=True, + text=True, + check=False, + ) + + assert completed.returncode == 0 + assert "PR11" in completed.stdout + assert "PR12" in completed.stdout + assert "OPENAI_API_KEY" in completed.stdout