From a2ead3bcd589bdba4bb678c8fb398a4c33da008e Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 13 Apr 2026 15:08:51 +0000 Subject: [PATCH 1/9] feat: add detached daemon-backed run submission --- agentflow/cli.py | 132 +++++++++++++++++++++++++++++++++++++++++++++- tests/test_cli.py | 90 +++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+), 1 deletion(-) diff --git a/agentflow/cli.py b/agentflow/cli.py index 1cd8c5e..ca4a142 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -5,6 +5,7 @@ import json import subprocess import sys +import time from dataclasses import dataclass, replace from datetime import datetime try: @@ -16,6 +17,7 @@ class StrEnum(str, Enum): pass from pathlib import Path +import httpx import typer from jinja2 import TemplateError from pydantic import ValidationError @@ -60,7 +62,7 @@ class StrEnum(str, Enum): target_uses_login_bash, ) from agentflow.prepared import resolve_local_workdir -from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider +from agentflow.specs import AgentKind, LocalTarget, PipelineSpec, RunRecord, normalize_agent_name, provider_uses_kimi_anthropic_auth, resolve_provider from agentflow.tuned_agents import list_tuned_agent_records, resolve_tuned_agent_version, run_evolution_from_payload app = typer.Typer(add_completion=False) @@ -118,6 +120,114 @@ def _build_store(runs_dir: str) -> object: return RunStore(runs_dir) +def _daemon_metadata_path(runs_dir: str) -> Path: + override = os.getenv("AGENTFLOW_DAEMON_METADATA_PATH") + if override: + return Path(override).expanduser().resolve() + return (Path(runs_dir).expanduser().resolve() / "daemon.json") + + +def _resolve_daemon_host() -> str: + return os.getenv("AGENTFLOW_DAEMON_HOST", "127.0.0.1") + + +def _resolve_daemon_port() -> int: + raw = os.getenv("AGENTFLOW_DAEMON_PORT", "8000") + try: + return int(raw) + except ValueError as exc: + raise typer.BadParameter(f"`AGENTFLOW_DAEMON_PORT` must be an integer, got `{raw}`.") from exc + + +def _daemon_base_url(host: str, port: int) -> str: + return f"http://{host}:{port}" + + +def _load_daemon_metadata(metadata_path: Path) -> dict[str, object] | None: + try: + payload = json.loads(metadata_path.read_text(encoding="utf-8")) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + if not isinstance(payload, dict): + return None + return payload + + +def _write_daemon_metadata(metadata_path: Path, *, host: str, port: int, pid: int) -> None: + metadata_path.parent.mkdir(parents=True, exist_ok=True) + payload = {"host": host, "port": port, "pid": pid} + metadata_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") + + +def _daemon_is_healthy(base_url: str) -> bool: + try: + response = httpx.get(f"{base_url}/api/runs", timeout=0.5) + except httpx.RequestError: + return False + return response.status_code == 200 + + +def _start_daemon(*, host: str, port: int, runs_dir: str, max_concurrent_runs: int) -> subprocess.Popen: + command = [sys.executable, "-m", "agentflow.cli", "serve", host, str(port)] + env = dict(os.environ) + env["AGENTFLOW_RUNS_DIR"] = runs_dir + env["AGENTFLOW_MAX_CONCURRENT_RUNS"] = str(max_concurrent_runs) + return subprocess.Popen( + command, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + env=env, + start_new_session=True, + ) + + +def _wait_for_daemon(base_url: str, *, timeout_seconds: float = 5.0) -> None: + deadline = time.monotonic() + timeout_seconds + while time.monotonic() < deadline: + if _daemon_is_healthy(base_url): + return + time.sleep(0.1) + raise typer.Exit(code=1) + + +def _ensure_daemon( + runs_dir: str, + max_concurrent_runs: int, + *, + host: str, + port: int, + metadata_path: Path, +) -> str: + metadata = _load_daemon_metadata(metadata_path) + metadata_host = metadata.get("host") if isinstance(metadata, dict) else None + metadata_port = metadata.get("port") if isinstance(metadata, dict) else None + if isinstance(metadata_host, str) and isinstance(metadata_port, int) and (metadata_host, metadata_port) == (host, port): + base_url = _daemon_base_url(metadata_host, metadata_port) + if _daemon_is_healthy(base_url): + return base_url + + base_url = _daemon_base_url(host, port) + process = _start_daemon(host=host, port=port, runs_dir=runs_dir, max_concurrent_runs=max_concurrent_runs) + _write_daemon_metadata(metadata_path, host=host, port=port, pid=process.pid) + _wait_for_daemon(base_url) + return base_url + + +def _submit_detached_run(pipeline: object, base_url: str) -> RunRecord: + payload: dict[str, object] = {"pipeline": pipeline.model_dump(mode="json")} + base_dir = getattr(pipeline, "base_dir", None) + if isinstance(base_dir, str) and base_dir: + payload["base_dir"] = base_dir + response = httpx.post(f"{base_url}/api/runs", json=payload, timeout=10.0) + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + detail = exc.response.text.strip() + typer.echo(f"Failed to submit run: {detail}", err=True) + raise typer.Exit(code=1) from exc + return RunRecord.model_validate(response.json()) + + def _render_tuned_agents_summary(records: list[object]) -> str: if not records: return "No tuned agents found." @@ -2318,6 +2428,12 @@ def run( path: str, runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), max_concurrent_runs: int = typer.Option(2, envvar="AGENTFLOW_MAX_CONCURRENT_RUNS"), + detach: bool = typer.Option( + False, + "--detach", + "-d", + help="Submit the run to the local daemon and exit without waiting for completion.", + ), output: RunOutputFormat = typer.Option( RunOutputFormat.AUTO, "--output", @@ -2341,6 +2457,20 @@ def run( output, show_preflight=show_preflight, ) + if detach: + host = _resolve_daemon_host() + port = _resolve_daemon_port() + metadata_path = _daemon_metadata_path(runs_dir) + base_url = _ensure_daemon( + runs_dir, + max_concurrent_runs, + host=host, + port=port, + metadata_path=metadata_path, + ) + record = _submit_detached_run(pipeline, base_url) + _echo_run_result(record, output=output) + raise typer.Exit(code=0) _run_pipeline(pipeline, runs_dir, max_concurrent_runs, output) diff --git a/tests/test_cli.py b/tests/test_cli.py index eb1fb44..269c60b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2848,6 +2848,96 @@ def fake_build_runtime(runs_dir: str, max_concurrent_runs: int): assert captured["wait_timeout"] is None +def test_run_detach_submits_without_waiting(monkeypatch): + captured: dict[str, object] = {} + fake_pipeline = SimpleNamespace(model_dump=lambda mode="json": {"name": "detached", "nodes": []}) + + def fake_load_pipeline(*args, **kwargs): + captured["load_args"] = args + return fake_pipeline + + def fake_ensure_daemon(*args, **kwargs): + captured["ensure_args"] = args + captured["ensure_kwargs"] = kwargs + return "http://daemon.test" + + def fake_submit(pipeline: object, base_url: str): + captured["submitted_pipeline"] = pipeline + captured["submitted_base_url"] = base_url + return SimpleNamespace( + id="run-detached", + status=SimpleNamespace(value="queued"), + pipeline=SimpleNamespace(name="detached"), + nodes={}, + model_dump=lambda mode="json": {"id": "run-detached", "status": "queued"}, + ) + + monkeypatch.setattr(agentflow.cli, "_load_pipeline_with_optional_smoke_preflight", fake_load_pipeline) + monkeypatch.setattr( + agentflow.cli, + "_build_runtime", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("runtime should not build in detach mode")), + ) + monkeypatch.setattr(agentflow.cli, "_ensure_daemon", fake_ensure_daemon) + monkeypatch.setattr(agentflow.cli, "_submit_detached_run", fake_submit) + + result = runner.invoke(app, ["run", "pipeline.py", "-d", "--output", "summary"]) + + assert result.exit_code == 0 + assert "Run run-detached: queued" in result.stdout + assert "completed" not in result.stdout + assert captured["submitted_pipeline"] is fake_pipeline + assert captured["submitted_base_url"] == "http://daemon.test" + + +def test_run_detach_uses_daemon_env_overrides(monkeypatch, tmp_path): + captured: dict[str, object] = {} + fake_pipeline = SimpleNamespace(model_dump=lambda mode="json": {"name": "env-detach", "nodes": []}) + metadata_path = tmp_path / "daemon.json" + runs_dir = tmp_path / "runs" + + monkeypatch.setenv("AGENTFLOW_DAEMON_METADATA_PATH", str(metadata_path)) + monkeypatch.setenv("AGENTFLOW_DAEMON_HOST", "daemon-host") + monkeypatch.setenv("AGENTFLOW_DAEMON_PORT", "8123") + + monkeypatch.setattr(agentflow.cli, "_load_pipeline_with_optional_smoke_preflight", lambda *args, **kwargs: fake_pipeline) + monkeypatch.setattr( + agentflow.cli, + "_build_runtime", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("runtime should not build in detach mode")), + ) + + def fake_ensure_daemon(runs_dir_value: str, max_concurrent_runs: int, *, host: str, port: int, metadata_path: Path): + captured["runs_dir"] = runs_dir_value + captured["max_concurrent_runs"] = max_concurrent_runs + captured["host"] = host + captured["port"] = port + captured["metadata_path"] = metadata_path + return f"http://{host}:{port}" + + def fake_submit(pipeline: object, base_url: str): + captured["submitted_base_url"] = base_url + return SimpleNamespace( + id="run-detached-env", + status=SimpleNamespace(value="running"), + pipeline=SimpleNamespace(name="env-detach"), + nodes={}, + model_dump=lambda mode="json": {"id": "run-detached-env", "status": "running"}, + ) + + monkeypatch.setattr(agentflow.cli, "_ensure_daemon", fake_ensure_daemon) + monkeypatch.setattr(agentflow.cli, "_submit_detached_run", fake_submit) + + result = runner.invoke(app, ["run", "pipeline.py", "-d", "--runs-dir", str(runs_dir), "--output", "summary"]) + + assert result.exit_code == 0 + assert captured["runs_dir"] == str(runs_dir) + assert captured["host"] == "daemon-host" + assert captured["port"] == 8123 + assert captured["metadata_path"] == metadata_path + assert captured["submitted_base_url"] == "http://daemon-host:8123" + + def test_run_defaults_to_summary_on_tty(monkeypatch): class FakeOrchestrator: async def submit(self, pipeline: object): From ac0f7e4e50964c580e6d560bf961098305433a72 Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 13 Apr 2026 15:22:08 +0000 Subject: [PATCH 2/9] feat: add run status process view --- agentflow/cli.py | 258 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_cli.py | 182 ++++++++++++++++++++++++++++++++ 2 files changed, 440 insertions(+) diff --git a/agentflow/cli.py b/agentflow/cli.py index ca4a142..748845f 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -516,6 +516,224 @@ def _build_run_summary(record: object, run_dir: Path | str | None = None) -> dic return summary +_STATUS_INACTIVE_NODE_STATUSES = {"pending", "queued", "ready"} +_STATUS_ACTIVE_NODE_STATUSES = {"running", "retrying", "cancelling"} + + +def _normalize_event_payload(event: object) -> dict[str, object]: + if isinstance(event, dict): + payload = dict(event) + else: + model_dump = getattr(event, "model_dump", None) + if callable(model_dump): + payload = model_dump(mode="json") + else: + payload = { + "timestamp": getattr(event, "timestamp", None), + "run_id": getattr(event, "run_id", None), + "type": getattr(event, "type", None), + "node_id": getattr(event, "node_id", None), + "data": getattr(event, "data", None), + } + if not isinstance(payload, dict): + return {} + data = payload.get("data") + if not isinstance(data, dict): + payload["data"] = {} + return payload + + +def _event_data_summary(data: object) -> str | None: + if not isinstance(data, dict): + return None + pieces: list[str] = [] + if data.get("status") is not None: + pieces.append(f"status={data['status']}") + if data.get("attempt") is not None: + pieces.append(f"attempt={data['attempt']}") + if data.get("round_number") is not None: + pieces.append(f"round={data['round_number']}") + if data.get("total_rounds") is not None: + pieces.append(f"of={data['total_rounds']}") + if data.get("child_run_id") is not None: + pieces.append(f"child={data['child_run_id']}") + if data.get("reason") is not None: + pieces.append(f"reason={data['reason']}") + if data.get("error") is not None: + pieces.append(f"error={data['error']}") + return " ".join(pieces) if pieces else None + + +def _render_status_event(event_payload: dict[str, object]) -> str: + timestamp = event_payload.get("timestamp") + event_type = event_payload.get("type") + node_id = event_payload.get("node_id") + parts: list[str] = [] + if timestamp: + parts.append(str(timestamp)) + if event_type: + parts.append(str(event_type)) + if node_id: + parts.append(f"node={node_id}") + detail = _event_data_summary(event_payload.get("data")) + if detail: + parts.append(detail) + return " ".join(parts) + + +def _build_status_progress(record: object) -> dict[str, object]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + pipeline_nodes = _pipeline_node_map(record) + node_ids = list(pipeline_nodes) if pipeline_nodes else list(nodes) + total_nodes = len(node_ids) + status_counts: dict[str, int] = {} + progressed_nodes = 0 + active_nodes: list[dict[str, object]] = [] + + for node_id in node_ids: + node = nodes.get(node_id) + status = _status_value(getattr(node, "status", "pending")).lower() + status_counts[status] = status_counts.get(status, 0) + 1 + if status not in _STATUS_INACTIVE_NODE_STATUSES: + progressed_nodes += 1 + if status in _STATUS_ACTIVE_NODE_STATUSES: + entry: dict[str, object] = {"id": node_id, "status": status} + attempt = _node_attempt_count(node) if node is not None else 0 + if attempt: + entry["attempt"] = attempt + active_nodes.append(entry) + + progress_percent = 0.0 + if total_nodes: + progress_percent = max(0.0, min(100.0, (progressed_nodes / total_nodes) * 100)) + + return { + "total_nodes": total_nodes, + "progressed_nodes": progressed_nodes, + "active_nodes": active_nodes, + "status_counts": status_counts, + "progress_percent": progress_percent, + } + + +def _build_status_optimization(record: object) -> dict[str, object] | None: + payload: dict[str, object] = {} + parent_run_id = getattr(record, "optimization_parent_run_id", None) + if parent_run_id: + payload["parent_run_id"] = parent_run_id + round_number = getattr(record, "optimization_round", None) + if round_number: + payload["round"] = round_number + session = getattr(record, "optimization_session", None) + if isinstance(session, dict) and session: + payload["session"] = session + return payload or None + + +def _build_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> dict[str, object]: + summary = _build_run_summary(record, run_dir=run_dir) + normalized_events = [_normalize_event_payload(event) for event in events] + summary["events"] = normalized_events + summary["recent_events"] = normalized_events[-5:] + summary["progress"] = _build_status_progress(record) + optimization = _build_status_optimization(record) + if optimization is not None: + summary["optimization"] = optimization + return summary + + +def _render_status_optimization(optimization: dict[str, object]) -> str | None: + session = optimization.get("session") + pieces: list[str] = [] + if isinstance(session, dict): + kind = session.get("kind") + if kind: + pieces.append(str(kind)) + optimizer = session.get("optimizer") + if optimizer: + pieces.append(f"optimizer={optimizer}") + current_round = session.get("current_round") + total_rounds = session.get("total_rounds") + if current_round and total_rounds: + pieces.append(f"round {current_round}/{total_rounds}") + elif current_round: + pieces.append(f"round {current_round}") + child_run_ids = session.get("child_run_ids") + if isinstance(child_run_ids, list): + pieces.append(f"child_runs={len(child_run_ids)}") + if "round" in optimization and not any(piece.startswith("round ") for piece in pieces): + pieces.append(f"round {optimization['round']}") + if optimization.get("parent_run_id"): + pieces.append(f"parent={optimization['parent_run_id']}") + if not pieces: + return None + return f"Optimization: {' '.join(pieces)}" + + +def _render_status_summary( + record: object, + events: list[object], + *, + run_dir: Path | str | None = None, +) -> str: + summary = _build_status_summary(record, events, run_dir=run_dir) + lines = [f"Run {summary['id']}: {summary['status']}"] + pipeline = summary.get("pipeline") + if isinstance(pipeline, dict) and pipeline.get("name"): + lines.append(f"Pipeline: {pipeline['name']}") + duration = summary.get("duration") + if duration is not None: + lines.append(f"Duration: {duration}") + started_at = summary.get("started_at") + if started_at: + lines.append(f"Started: {started_at}") + run_dir_value = summary.get("run_dir") + if run_dir_value is not None: + lines.append(f"Run dir: {run_dir_value}") + optimization = summary.get("optimization") + if isinstance(optimization, dict): + rendered = _render_status_optimization(optimization) + if rendered: + lines.append(rendered) + + progress = summary.get("progress") + if isinstance(progress, dict): + total_nodes = progress.get("total_nodes", 0) + progressed_nodes = progress.get("progressed_nodes", 0) + active_nodes = progress.get("active_nodes", []) + if total_nodes: + lines.append(f"Progress: {progressed_nodes}/{total_nodes} nodes, active {len(active_nodes)}") + if isinstance(active_nodes, list) and active_nodes: + active_entries: list[str] = [] + for node in active_nodes: + node_id = node.get("id") + status = node.get("status") + if not node_id or not status: + continue + rendered = f"{node_id} ({status}" + attempt = node.get("attempt") + if attempt: + rendered += f", attempt {attempt}" + rendered += ")" + active_entries.append(rendered) + if active_entries: + lines.append(f"Active: {', '.join(active_entries)}") + + recent_events = summary.get("recent_events") + if isinstance(recent_events, list) and recent_events: + lines.append("Recent events:") + for event_payload in recent_events: + if not isinstance(event_payload, dict): + continue + lines.append(f"- {_render_status_event(event_payload)}") + return "\n".join(lines) + + def _render_run_summary(record: object, run_dir: Path | str | None = None) -> str: summary = _build_run_summary(record, run_dir=run_dir) lines = [f"Run {summary['id']}: {summary['status']}"] @@ -583,6 +801,27 @@ def _echo_run_result(record: object, *, output: RunOutputFormat, run_dir: Path | typer.echo(json.dumps(record.model_dump(mode="json"), indent=2)) +def _echo_status_result( + record: object, + events: list[object], + *, + output: RunOutputFormat, + run_dir: Path | str | None = None, +) -> None: + resolved_output = _resolve_run_output(output) + if resolved_output == RunOutputFormat.SUMMARY: + typer.echo(_render_status_summary(record, events, run_dir=run_dir)) + return + if resolved_output == RunOutputFormat.JSON_SUMMARY: + typer.echo(json.dumps(_build_status_summary(record, events, run_dir=run_dir), indent=2)) + return + model_dump = getattr(record, "model_dump", None) + if callable(model_dump): + typer.echo(json.dumps(model_dump(mode="json"), indent=2)) + return + typer.echo(json.dumps(_build_run_summary(record, run_dir=run_dir), indent=2)) + + def _run_dir_for_record(store: object | None, run_id: str) -> Path | str | None: if store is None: return None @@ -2246,6 +2485,25 @@ def show( _echo_run_result(record, output=output, run_dir=_run_dir_for_record(store, run_id)) +@app.command() +def status( + run_id: str, + runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"), + output: RunOutputFormat = typer.Option( + RunOutputFormat.AUTO, + "--output", + help="Result output format. Defaults to `summary` on a terminal and `json` otherwise.", + ), +) -> None: + store = _build_store(runs_dir) + record = _get_run_or_exit(store, run_id, runs_dir=runs_dir) + events = [] + get_events = getattr(store, "get_events", None) + if callable(get_events): + events = get_events(run_id) + _echo_status_result(record, events, output=output, run_dir=_run_dir_for_record(store, run_id)) + + @app.command() def cancel( run_id: str, diff --git a/tests/test_cli.py b/tests/test_cli.py index 269c60b..5b8b77c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -268,6 +268,22 @@ def _completed_run( ) +def _run_event( + event_type: str, + *, + timestamp: str, + node_id: str | None = None, + data: dict[str, object] | None = None, +): + return SimpleNamespace( + timestamp=timestamp, + run_id="run-event", + type=event_type, + node_id=node_id, + data=data or {}, + ) + + def test_validate_command_outputs_normalized_pipeline(tmp_path): pipeline_path = tmp_path / "pipeline.json" pipeline_path.write_text( @@ -3250,6 +3266,172 @@ def _missing(run_id: str): assert "Run `missing-run` not found in `.agentflow/runs`." in result.stderr +def test_status_command_exits_for_missing_run(monkeypatch): + def _missing(run_id: str): + raise KeyError(run_id) + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace(get_run=_missing), + ) + + result = runner.invoke(app, ["status", "missing-run"]) + + assert result.exit_code == 1 + assert "Run `missing-run` not found in `.agentflow/runs`." in result.stderr + + +def test_status_command_renders_summary_with_recent_events(monkeypatch): + record = _completed_run( + "run-status", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="review", agent=SimpleNamespace(value="claude")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=2, + attempts=[SimpleNamespace(number=1), SimpleNamespace(number=2)], + stderr_lines=[], + stdout_lines=[], + ), + "review": SimpleNamespace( + status=SimpleNamespace(value="pending"), + current_attempt=0, + attempts=[], + stderr_lines=[], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [ + _run_event("old_event", timestamp="2026-04-12T10:00:00+00:00"), + _run_event("node_started", timestamp="2026-04-12T10:00:01+00:00", node_id="plan"), + _run_event("node_retrying", timestamp="2026-04-12T10:00:02+00:00", node_id="plan", data={"attempt": 2}), + _run_event("node_trace", timestamp="2026-04-12T10:00:03+00:00", node_id="plan"), + _run_event("node_waiting", timestamp="2026-04-12T10:00:04+00:00", node_id="review"), + _run_event("node_skipped", timestamp="2026-04-12T10:00:05+00:00", node_id="review", data={"reason": "upstream_failure"}), + ] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status"]) + + assert result.exit_code == 0 + assert "Run run-status: running" in result.stdout + assert "Pipeline: status-pipeline" in result.stdout + assert "Progress: 1/2 nodes, active 1" in result.stdout + assert "Active: plan (running, attempt 2)" in result.stdout + assert "Recent events:" in result.stdout + assert "old_event" not in result.stdout + for event_type in ("node_started", "node_retrying", "node_trace", "node_waiting", "node_skipped"): + assert event_type in result.stdout + + +def test_status_command_supports_json_summary_output(monkeypatch): + record = _completed_run( + "run-status-json", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="review", agent=SimpleNamespace(value="claude")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=2, + attempts=[SimpleNamespace(number=1), SimpleNamespace(number=2)], + stderr_lines=[], + stdout_lines=[], + ), + "review": SimpleNamespace( + status=SimpleNamespace(value="pending"), + current_attempt=0, + attempts=[], + stderr_lines=[], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [ + _run_event("run_started", timestamp="2026-04-12T10:00:01+00:00"), + _run_event("node_started", timestamp="2026-04-12T10:00:02+00:00", node_id="plan"), + ] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + + result = runner.invoke(app, ["status", "run-status-json", "--output", "json-summary"]) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["id"] == "run-status-json" + assert [event["type"] for event in payload["events"]] == ["run_started", "node_started"] + assert [event["type"] for event in payload["recent_events"]] == ["run_started", "node_started"] + assert payload["progress"] == { + "total_nodes": 2, + "progressed_nodes": 1, + "active_nodes": [{"id": "plan", "status": "running", "attempt": 2}], + "status_counts": {"running": 1, "pending": 1}, + "progress_percent": 50.0, + } + + +def test_status_command_shows_optimization_session(monkeypatch): + record = _completed_run( + "run-status-opt", + pipeline_name="status-pipeline", + status="running", + ) + record.finished_at = None + record.optimization_session = { + "kind": "graph", + "optimizer": "codex", + "total_rounds": 3, + "current_round": 2, + "child_run_ids": ["child-1", "child-2"], + } + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: [], + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status-opt"]) + + assert result.exit_code == 0 + assert "Optimization: graph optimizer=codex round 2/3 child_runs=2" in result.stdout + + def test_cancel_outputs_summary_for_existing_run(monkeypatch): captured: dict[str, object] = {} From d7e1bf2002c52ec1c5ceae583f953a6e7090d4e5 Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 13 Apr 2026 15:38:56 +0000 Subject: [PATCH 3/9] feat: surface evolution progress in run status --- agentflow/cli.py | 61 ++++++++++++ agentflow/dsl.py | 5 +- agentflow/tuned_agents.py | 73 ++++++++++++++- tests/test_cli.py | 186 +++++++++++++++++++++++++++++++++++++ tests/test_tuned_agents.py | 93 +++++++++++++++++++ 5 files changed, 414 insertions(+), 4 deletions(-) diff --git a/agentflow/cli.py b/agentflow/cli.py index 748845f..de19285 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -518,6 +518,8 @@ def _build_run_summary(record: object, run_dir: Path | str | None = None) -> dic _STATUS_INACTIVE_NODE_STATUSES = {"pending", "queued", "ready"} _STATUS_ACTIVE_NODE_STATUSES = {"running", "retrying", "cancelling"} +_EVOLUTION_PROGRESS_KEYS = {"agentflow_event", "stage", "attempt", "status", "command", "detail", "node_id"} +_EVOLUTION_PROGRESS_PREVIEW_LIMIT = 5 def _normalize_event_payload(event: object) -> dict[str, object]: @@ -581,6 +583,56 @@ def _render_status_event(event_payload: dict[str, object]) -> str: return " ".join(parts) +def _parse_evolution_progress_line(line: str) -> dict[str, object] | None: + try: + payload = json.loads(line) + except (TypeError, json.JSONDecodeError): + return None + if not isinstance(payload, dict): + return None + if payload.get("agentflow_event") != "evolution_progress": + return None + stage = payload.get("stage") + attempt = payload.get("attempt") + if not stage or attempt is None: + return None + return {key: payload[key] for key in _EVOLUTION_PROGRESS_KEYS if key in payload} + + +def _build_status_evolution_progress(record: object) -> list[dict[str, object]]: + nodes: dict[str, object] = getattr(record, "nodes", {}) or {} + events: list[dict[str, object]] = [] + for node_id, node in nodes.items(): + for line in getattr(node, "stderr_lines", []) or []: + if not isinstance(line, str): + continue + event = _parse_evolution_progress_line(line) + if event: + event["node_id"] = node_id + events.append(event) + return events + + +def _render_evolution_progress(event: dict[str, object]) -> str: + node_id = event.get("node_id") or "-" + stage = event.get("stage") or "-" + status = event.get("status") + label = f"{stage} {status}" if status else str(stage) + pieces: list[str] = [] + attempt = event.get("attempt") + if attempt is not None: + pieces.append(f"attempt {attempt}") + command = event.get("command") + if command: + pieces.append(f"command={command}") + detail = event.get("detail") + if detail: + pieces.append(f"detail={detail}") + if not pieces: + return f"{node_id}: {label}" + return f"{node_id}: {label} ({', '.join(pieces)})" + + def _build_status_progress(record: object) -> dict[str, object]: nodes: dict[str, object] = getattr(record, "nodes", {}) or {} pipeline_nodes = _pipeline_node_map(record) @@ -641,6 +693,7 @@ def _build_status_summary( summary["events"] = normalized_events summary["recent_events"] = normalized_events[-5:] summary["progress"] = _build_status_progress(record) + summary["evolution_progress"] = _build_status_evolution_progress(record) optimization = _build_status_optimization(record) if optimization is not None: summary["optimization"] = optimization @@ -724,6 +777,14 @@ def _render_status_summary( if active_entries: lines.append(f"Active: {', '.join(active_entries)}") + evolution_progress = summary.get("evolution_progress") + if isinstance(evolution_progress, list) and evolution_progress: + lines.append("Evolution progress:") + for event in evolution_progress[-_EVOLUTION_PROGRESS_PREVIEW_LIMIT:]: + if not isinstance(event, dict): + continue + lines.append(f"- {_render_evolution_progress(event)}") + recent_events = summary.get("recent_events") if isinstance(recent_events, list) and recent_events: lines.append("Recent events:") diff --git a/agentflow/dsl.py b/agentflow/dsl.py index 7014faf..34acc74 100644 --- a/agentflow/dsl.py +++ b/agentflow/dsl.py @@ -432,9 +432,12 @@ def evolve( payload_json = json.dumps(payload, ensure_ascii=False) code = ( "import json\n" + "import sys\n" "from agentflow.tuned_agents import run_evolution_from_payload\n\n" + "def _evolution_progress(event):\n" + " print(json.dumps(event, ensure_ascii=False), file=sys.stderr, flush=True)\n\n" f"payload = json.loads(r'''{payload_json}''')\n" - "result = run_evolution_from_payload(payload)\n" + "result = run_evolution_from_payload(payload, progress=_evolution_progress)\n" "print(json.dumps(result, ensure_ascii=False))\n" ) evolve_task_id = task_id or f"evolve_{profile.replace('-', '_')}" diff --git a/agentflow/tuned_agents.py b/agentflow/tuned_agents.py index c6f88a3..b59ddbd 100644 --- a/agentflow/tuned_agents.py +++ b/agentflow/tuned_agents.py @@ -6,7 +6,7 @@ import subprocess from dataclasses import dataclass from pathlib import Path -from typing import Any, Literal +from typing import Any, Callable, Literal from uuid import uuid4 import yaml @@ -600,7 +600,10 @@ def _write_failure_metadata( _write_json(version_dir / "version.json", failed_version.model_dump(mode="json")) -def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: +def run_evolution_from_payload( + payload: dict[str, Any], + progress: Callable[[dict[str, object]], None] | None = None, +) -> dict[str, Any]: request = EvolutionRequest.model_validate(payload) workspace = Path(request.workspace_dir or os.getcwd()).expanduser().resolve() resolved_config = load_tuner_config(workspace, request.profile) @@ -618,6 +621,29 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: if not request.source_nodes: raise ValueError("evolution requires at least one source node") + def _emit_progress( + stage: str, + *, + attempt: int, + status: str | None = None, + command: str | None = None, + detail: str | None = None, + ) -> None: + if progress is None: + return + payload: dict[str, object] = { + "agentflow_event": "evolution_progress", + "stage": stage, + "attempt": attempt, + } + if status is not None: + payload["status"] = status + if command is not None: + payload["command"] = command + if detail is not None: + payload["detail"] = detail + progress(payload) + version_id = uuid4().hex[:12] version_dir = tuned_agent_version_dir(workspace, resolved_config.agent_name, version_id) traces_dir = version_dir / "traces" @@ -639,8 +665,11 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: resolved_executable = _resolved_executable_path(resolved_config.config, repo_workdir) failure_summary: str | None = None + _emit_progress("start", attempt=1) + for attempt_number in range(1, resolved_config.config.max_attempts + 1): attempt_dir = ensure_dir(attempt_root / f"attempt-{attempt_number}") + _emit_progress("attempt", attempt=attempt_number, status="started") prompt = _optimizer_prompt( resolved_config, repo_dir=repo_workdir, @@ -650,6 +679,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: ) _write_text(attempt_dir / "optimizer-prompt.txt", prompt) + _emit_progress("optimizer", attempt=attempt_number, status="started", command="optimizer") optimizer_result = _run_optimizer( optimizer_kind, prompt=prompt, @@ -660,8 +690,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "optimizer", optimizer_result) if optimizer_result.exit_code != 0: failure_summary = _attempt_summary("Optimizer", optimizer_result) + _emit_progress("optimizer", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("optimizer", attempt=attempt_number, status="completed") + _emit_progress( + "build", + attempt=attempt_number, + status="started", + command=resolved_config.config.build_command, + ) build_result = _run_shell_command( resolved_config.config.build_command, repo_dir=repo_workdir, @@ -673,8 +711,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "build", build_result) if build_result.exit_code != 0: failure_summary = _attempt_summary("Build", build_result) + _emit_progress("build", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("build", attempt=attempt_number, status="completed") + _emit_progress( + "test", + attempt=attempt_number, + status="started", + command=resolved_config.config.test_command, + ) test_result = _run_shell_command( resolved_config.config.test_command, repo_dir=repo_workdir, @@ -686,8 +732,16 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "test", test_result) if test_result.exit_code != 0: failure_summary = _attempt_summary("Test", test_result) + _emit_progress("test", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("test", attempt=attempt_number, status="completed") + _emit_progress( + "smoke", + attempt=attempt_number, + status="started", + command=resolved_config.config.smoke_command, + ) smoke_result = _run_shell_command( resolved_config.config.smoke_command, repo_dir=repo_workdir, @@ -699,14 +753,20 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: _write_attempt_artifact(attempt_dir, "smoke", smoke_result) if smoke_result.exit_code != 0: failure_summary = _attempt_summary("Smoke", smoke_result) + _emit_progress("smoke", attempt=attempt_number, status="failed", detail=failure_summary) continue + _emit_progress("smoke", attempt=attempt_number, status="completed") executable_path = Path(resolved_executable) if not executable_path.exists(): - raise FileNotFoundError( + detail = ( f"successful evolution did not produce executable `{executable_path}`; " "set `executable_path` in the tuner config or make the build produce the default path" ) + _emit_progress("final", attempt=attempt_number, status="failed", detail=detail) + raise FileNotFoundError( + detail + ) version = TunedAgentVersion( id=version_id, @@ -722,6 +782,7 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: summary=_parse_agent_output(optimizer_kind, f"optimizer_{version_id}", optimizer_result.stdout), ) register_tuned_agent_version(workspace, version) + _emit_progress("final", attempt=attempt_number, status="success") return { "ok": True, "agent_name": version.agent_name, @@ -733,6 +794,12 @@ def run_evolution_from_payload(payload: dict[str, Any]) -> dict[str, Any]: "traces": copied_traces, } + _emit_progress( + "final", + attempt=resolved_config.config.max_attempts, + status="failed", + detail=failure_summary or "evolution failed without diagnostics", + ) _write_failure_metadata( version_dir, agent_name=resolved_config.agent_name, diff --git a/tests/test_cli.py b/tests/test_cli.py index 5b8b77c..38b5230 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3400,6 +3400,192 @@ def test_status_command_supports_json_summary_output(monkeypatch): } +def test_status_command_renders_evolution_progress(monkeypatch): + record = _completed_run( + "run-status-evolve", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="evolve", agent=SimpleNamespace(value="python")), + SimpleNamespace(id="evolve_b", agent=SimpleNamespace(value="python")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="completed"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[], + stdout_lines=[], + ), + "evolve": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + "not-json", + json.dumps({"agentflow_event": "evolution_progress", "stage": "start", "attempt": 1}), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "completed", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "final", + "attempt": 1, + "status": "success", + } + ), + ], + stdout_lines=[], + ), + "evolve_b": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build-b", + } + ) + ], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + events = [] + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: events, + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + monkeypatch.setattr(agentflow.cli, "_stream_supports_tty_summary", lambda *, err: True) + + result = runner.invoke(app, ["status", "run-status-evolve"]) + + assert result.exit_code == 0 + assert "Evolution progress:" in result.stdout + assert "evolve: start (attempt 1)" in result.stdout + assert "evolve: build started (attempt 1, command=build)" in result.stdout + assert "evolve: build completed (attempt 1)" in result.stdout + assert "evolve: final success (attempt 1)" in result.stdout + assert "evolve_b: build started (attempt 1, command=build-b)" in result.stdout + + +def test_status_command_returns_evolution_progress_json(monkeypatch): + record = _completed_run( + "run-status-evolve-json", + pipeline_name="status-pipeline", + status="running", + pipeline_nodes=[ + SimpleNamespace(id="plan", agent=SimpleNamespace(value="codex")), + SimpleNamespace(id="evolve", agent=SimpleNamespace(value="python")), + ], + nodes={ + "plan": SimpleNamespace( + status=SimpleNamespace(value="completed"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[], + stdout_lines=[], + ), + "evolve": SimpleNamespace( + status=SimpleNamespace(value="running"), + current_attempt=1, + attempts=[SimpleNamespace(number=1)], + stderr_lines=[ + json.dumps({"agentflow_event": "evolution_progress", "stage": "start", "attempt": 1}), + "plain text", + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + } + ), + json.dumps( + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "failed", + "detail": "exit 1", + } + ), + ], + stdout_lines=[], + ), + }, + ) + record.finished_at = None + + monkeypatch.setattr( + agentflow.cli, + "_build_store", + lambda runs_dir: SimpleNamespace( + get_run=lambda run_id: record, + get_events=lambda run_id: [], + run_dir=lambda run_id: Path(runs_dir) / run_id, + ), + ) + + result = runner.invoke(app, ["status", "run-status-evolve-json", "--output", "json-summary"]) + + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["evolution_progress"] == [ + { + "agentflow_event": "evolution_progress", + "stage": "start", + "attempt": 1, + "node_id": "evolve", + }, + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "started", + "command": "build", + "node_id": "evolve", + }, + { + "agentflow_event": "evolution_progress", + "stage": "build", + "attempt": 1, + "status": "failed", + "detail": "exit 1", + "node_id": "evolve", + }, + ] + + def test_status_command_shows_optimization_session(monkeypatch): record = _completed_run( "run-status-opt", diff --git a/tests/test_tuned_agents.py b/tests/test_tuned_agents.py index 3081369..50de88b 100644 --- a/tests/test_tuned_agents.py +++ b/tests/test_tuned_agents.py @@ -247,6 +247,99 @@ def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, trac assert Path(result["executable"]).exists() +def test_run_evolution_from_payload_reports_progress(tmp_path, monkeypatch): + workspace = tmp_path + config_dir = workspace / "agent_tuner" + config_dir.mkdir() + (config_dir / "codex.yaml").write_text( + "\n".join( + [ + "name: codex_tuned", + "base_agent: codex", + "repo_url: https://example.invalid/repo.git", + "build_command: build", + "test_command: test", + "smoke_command: smoke", + "evolution_prompt: improve the agent", + "executable_path: .venv/bin/codex", + "max_attempts: 2", + ] + ), + encoding="utf-8", + ) + trace_path = workspace / "trace.jsonl" + trace_path.write_text('{"kind":"assistant_message","content":"hello"}\n', encoding="utf-8") + + def fake_clone(_config, repo_dir: Path) -> None: + repo_dir.mkdir(parents=True) + (repo_dir / "README.md").write_text("base", encoding="utf-8") + + attempt_state = {"smoke": 0} + + def fake_optimizer(_optimizer: AgentKind, *, prompt: str, repo_dir: Path, runtime_dir: Path, env: dict[str, str]): + runtime_dir.mkdir(parents=True, exist_ok=True) + (repo_dir / "README.md").write_text(prompt, encoding="utf-8") + return CommandExecution( + command="optimizer", + exit_code=0, + stdout='{"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"updated"}]}}', + stderr="", + ) + + def fake_shell(command_template: str, *, repo_dir: Path, version_dir: Path, traces_dir: Path, executable: str, env: dict[str, str]): + if command_template == "build": + executable_path = Path(executable) + executable_path.parent.mkdir(parents=True, exist_ok=True) + executable_path.write_text("#!/bin/sh\nexit 0\n", encoding="utf-8") + if command_template == "smoke": + attempt_state["smoke"] += 1 + if attempt_state["smoke"] == 1: + return CommandExecution(command="smoke", exit_code=1, stdout="", stderr="ping failed") + return CommandExecution(command=command_template, exit_code=0, stdout="ok", stderr="") + + monkeypatch.setattr("agentflow.tuned_agents._clone_repo", fake_clone) + monkeypatch.setattr("agentflow.tuned_agents._run_optimizer", fake_optimizer) + monkeypatch.setattr("agentflow.tuned_agents._run_shell_command", fake_shell) + + progress: list[dict[str, object]] = [] + + def capture(event: dict[str, object]) -> None: + progress.append(event) + + result = run_evolution_from_payload( + { + "profile": "codex", + "target": "codex", + "optimizer": "codex", + "source_nodes": ["plan"], + "trace_paths": {"plan": str(trace_path)}, + "workspace_dir": str(workspace), + "run_id": "run-progress", + }, + progress=capture, + ) + + assert result["ok"] is True + assert any(event.get("agentflow_event") == "evolution_progress" for event in progress) + stages = [(event.get("stage"), event.get("status"), event.get("attempt")) for event in progress] + assert stages[0][0] == "start" + assert ("attempt", "started", 1) in stages + assert ("optimizer", "started", 1) in stages + assert ("optimizer", "completed", 1) in stages + assert ("build", "started", 1) in stages + assert ("build", "completed", 1) in stages + assert ("test", "started", 1) in stages + assert ("test", "completed", 1) in stages + assert ("smoke", "started", 1) in stages + assert ("smoke", "failed", 1) in stages + assert ("attempt", "started", 2) in stages + assert ("final", "success", 2) in stages + smoke_failure = next(event for event in progress if event.get("stage") == "smoke" and event.get("status") == "failed") + assert "Smoke" in str(smoke_failure.get("detail", "")) + build_start = next(event for event in progress if event.get("stage") == "build" and event.get("status") == "started") + assert build_start.get("command") == "build" + + def test_optimizer_prompt_explicitly_allows_prompt_and_tool_edits(tmp_path): resolved = ResolvedTunerConfig( profile="codex", From 38c15ff286f38cf2c839d7b6fe95eea95547b6fb Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 13 Apr 2026 15:39:50 +0000 Subject: [PATCH 4/9] docs: add async status daemon spec and plan --- .../plans/2026-04-13-async-status-daemon.md | 266 ++++++++++++++++++ .../2026-04-13-async-status-daemon-design.md | 57 ++++ 2 files changed, 323 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-13-async-status-daemon.md create mode 100644 docs/superpowers/specs/2026-04-13-async-status-daemon-design.md diff --git a/docs/superpowers/plans/2026-04-13-async-status-daemon.md b/docs/superpowers/plans/2026-04-13-async-status-daemon.md new file mode 100644 index 0000000..e0f993c --- /dev/null +++ b/docs/superpowers/plans/2026-04-13-async-status-daemon.md @@ -0,0 +1,266 @@ +# Async Status Daemon Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add detached AgentFlow execution with `agentflow run ... -d`, a new `agentflow status ` process view, PR11 evolution progress visibility, and PR12 graph optimization runtime support in an isolated worktree branch. + +**Architecture:** Detached runs will be submitted to a long-lived local daemon built on the existing FastAPI app and `Orchestrator`, while `status` will read the persistent run store directly. PR12 graph optimization support will be ported into this branch so optimization runs generate first-class round/session data, and PR11 evolution will emit structured progress lines for status rendering. + +**Tech Stack:** Python, Typer, FastAPI, httpx, pytest, existing `RunStore`/`Orchestrator` runtime. + +--- + +### Task 1: Port PR12 Graph Optimization Runtime + +**Files:** +- Create: `agentflow/graph_optimizer.py` +- Modify: `agentflow/specs.py` +- Modify: `agentflow/orchestrator.py` +- Test: `tests/test_graph_optimizer.py` + +- [ ] **Step 1: Add the failing graph optimization tests** + +Port the PR12 graph-optimization tests into a new test file covering: +- successful multi-round optimization +- retry on invalid optimized pipeline +- failure after exhausting optimizer validation attempts + +Run: + +```bash +pytest tests/test_graph_optimizer.py -q +``` + +Expected: FAIL because the current branch does not define graph-optimization runtime support. + +- [ ] **Step 2: Add graph optimization data model support** + +Implement the `PipelineSpec` and `RunRecord` fields needed for graph optimization: +- `optimizer` +- `n_run` +- `uses_graph_optimizer` +- `optimization_parent_run_id` +- `optimization_round` +- `optimization_session` + +Run: + +```bash +pytest tests/test_graph_optimizer.py -q +``` + +Expected: FAIL later in orchestrator/runtime paths instead of schema/attribute errors. + +- [ ] **Step 3: Add graph optimization runtime helpers and orchestrator flow** + +Port the PR12 runtime pieces: +- editable pipeline artifact generation +- graph report generation +- optimization round directories +- child run creation +- round/session events +- optimizer retry/accept/failure flow + +Run: + +```bash +pytest tests/test_graph_optimizer.py -q +``` + +Expected: PASS. + +- [ ] **Step 4: Commit the graph optimization runtime** + +```bash +git add agentflow/graph_optimizer.py agentflow/specs.py agentflow/orchestrator.py tests/test_graph_optimizer.py +git commit -m "feat: port graph optimization runtime support" +``` + +### Task 2: Add Detached Daemon Submission + +**Files:** +- Modify: `agentflow/cli.py` +- Modify: `tests/test_cli.py` +- Test: `tests/test_cli.py` + +- [ ] **Step 1: Add failing CLI tests for detached run submission** + +Add tests for: +- `agentflow run pipeline.py -d` returning a queued/running `run_id` without waiting +- auto-start or reuse of the local daemon client path +- output shape for summary/json/json-summary detached results + +Run: + +```bash +pytest tests/test_cli.py -q -k "detach or daemon" +``` + +Expected: FAIL because `run` has no detach mode and no daemon submission helpers. + +- [ ] **Step 2: Implement daemon metadata and ensure-daemon helpers** + +Add CLI-side helpers that: +- compute a daemon metadata file path per `runs_dir` +- probe health on the configured host/port +- start `agentflow serve` in the background when needed +- wait until the daemon is reachable + +- [ ] **Step 3: Implement `run -d`** + +Update CLI `run` so: +- default mode keeps existing synchronous behavior +- `-d/--detach` loads the pipeline, ensures the daemon, submits via HTTP, prints the returned run record summary, and exits without waiting + +Run: + +```bash +pytest tests/test_cli.py -q -k "detach or daemon" +``` + +Expected: PASS. + +- [ ] **Step 4: Commit detached submission** + +```bash +git add agentflow/cli.py tests/test_cli.py +git commit -m "feat: add detached daemon-backed run submission" +``` + +### Task 3: Add `agentflow status ` + +**Files:** +- Modify: `agentflow/cli.py` +- Modify: `tests/test_cli.py` +- Test: `tests/test_cli.py` + +- [ ] **Step 1: Add failing tests for `status`** + +Add tests covering: +- missing run handling +- summary rendering for in-flight runs +- JSON summary payload including events and active node progress +- PR12 optimization-session visibility + +Run: + +```bash +pytest tests/test_cli.py -q -k "status_command or run_status" +``` + +Expected: FAIL because there is no `status` command or status renderer. + +- [ ] **Step 2: Implement status builders and renderers** + +Add new helpers that: +- load run + events from `RunStore` +- render process-oriented summaries +- include event timeline slices +- surface optimization session and round info when present + +- [ ] **Step 3: Add the `status` command** + +Implement a new Typer command: +- `agentflow status ` +- uses direct store reads rather than daemon queries for persistent inspection +- supports existing output styles plus richer JSON summary + +Run: + +```bash +pytest tests/test_cli.py -q -k "status_command or run_status" +``` + +Expected: PASS. + +- [ ] **Step 4: Commit status command** + +```bash +git add agentflow/cli.py tests/test_cli.py +git commit -m "feat: add run status process view" +``` + +### Task 4: Add PR11 Evolution Progress Visibility + +**Files:** +- Modify: `agentflow/tuned_agents.py` +- Modify: `agentflow/dsl.py` +- Modify: `agentflow/cli.py` +- Modify: `tests/test_tuned_agents.py` +- Modify: `tests/test_cli.py` + +- [ ] **Step 1: Add failing tests for evolution progress reporting** + +Add tests covering: +- `run_evolution_from_payload()` progress callback/stage notifications +- `dsl.evolve()` generated node code emitting structured progress lines +- `status` rendering evolution stage lines from run artifacts/events or trace-derived data + +Run: + +```bash +pytest tests/test_tuned_agents.py tests/test_cli.py -q -k "evolution_progress or evolve_status" +``` + +Expected: FAIL because no structured evolution progress reporting exists. + +- [ ] **Step 2: Implement progress callback support in tuned agent evolution** + +Update `run_evolution_from_payload()` to report: +- start +- attempt start +- optimizer/build/test/smoke start and completion/failure +- final success/failure + +- [ ] **Step 3: Emit progress lines from pipeline-driven evolve nodes** + +Update `dsl.evolve()` generated Python code so pipeline execution emits structured progress lines to stderr while preserving the final stdout JSON result. + +- [ ] **Step 4: Surface evolution progress in status rendering** + +Teach status rendering to recognize and show evolution phase lines from the run’s stored trace/stderr data. + +Run: + +```bash +pytest tests/test_tuned_agents.py tests/test_cli.py -q -k "evolution_progress or evolve_status" +``` + +Expected: PASS. + +- [ ] **Step 5: Commit evolution progress support** + +```bash +git add agentflow/tuned_agents.py agentflow/dsl.py agentflow/cli.py tests/test_tuned_agents.py tests/test_cli.py +git commit -m "feat: surface evolution progress in run status" +``` + +### Task 5: End-to-End Verification + +**Files:** +- Modify: `tests/test_api.py` +- Test: `tests/test_api.py` +- Test: `tests/test_graph_optimizer.py` +- Test: `tests/test_tuned_agents.py` +- Test: `tests/test_cli.py` + +- [ ] **Step 1: Add or adjust integration tests if needed** + +Add focused API/integration coverage only where the earlier tasks reveal missing protection, especially around detached run submission paths and optimization session payload shape. + +- [ ] **Step 2: Run the focused verification suite** + +Run: + +```bash +pytest tests/test_api.py tests/test_graph_optimizer.py tests/test_tuned_agents.py tests/test_cli.py -q +``` + +Expected: PASS for all tests added or touched by this feature. Pre-existing unrelated failures elsewhere in the repo are explicitly out of scope. + +- [ ] **Step 3: Commit verification-only follow-ups** + +```bash +git add tests/test_api.py tests/test_graph_optimizer.py tests/test_tuned_agents.py tests/test_cli.py +git commit -m "test: cover async status daemon flows" +``` diff --git a/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md b/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md new file mode 100644 index 0000000..e567990 --- /dev/null +++ b/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md @@ -0,0 +1,57 @@ +# Async Status Daemon Design + +**Problem** + +`agentflow run` currently submits a run and then blocks until completion in the foreground CLI process. `Orchestrator.submit()` already schedules background work, but the worker thread is a daemon thread owned by the current process, so `agentflow run -d` cannot work by simply skipping `wait()`. The process would exit and the run would die. + +**User-facing goal** + +- `agentflow run pipeline.py -d` submits a run, returns a stable `run_id`, and exits immediately. +- `agentflow status ` shows in-flight progress, not only the final result. +- The status view must be able to show PR11 evolution process details and PR12 graph-optimization process details. + +**Design** + +1. Detached execution uses a long-lived local daemon based on the existing FastAPI app and `Orchestrator`, rather than a per-run child process. The CLI becomes a thin client for detached submission. +2. The daemon lifecycle is managed in-process by new CLI helpers that persist a small daemon metadata file next to the run store and auto-start `agentflow serve` in the background when needed. +3. `status` is a new CLI command. It reads the persistent run store directly for run data and events, so historical runs remain inspectable even if the daemon is offline. +4. PR12 support is brought into this branch by porting the graph optimization runtime model: `optimization_session`, parent/child run relationships, round events, and child-run bookkeeping. +5. PR11 process visibility is added by instrumenting `run_evolution_from_payload()` with progress callbacks and making the generated `dsl.evolve()` Python node emit structured progress lines to stderr. `status` interprets those lines into an evolution timeline. + +**Scope** + +- Implement detached run submission. +- Implement daemon auto-start helpers and metadata. +- Implement `status`. +- Port PR12 graph optimization support. +- Add PR11 evolution progress reporting for pipeline-driven evolution nodes. + +**Out of scope** + +- Reworking `agentflow evolve` into a fully orchestrated standalone run type with its own detached lifecycle. +- Merging the worktree branch back to `master`. + +**Architecture** + +- CLI detached submission: + - Ensure daemon running for a given `runs_dir`. + - Submit pipeline via existing HTTP `POST /api/runs`. + - Print queued/running run summary with `run_id`. +- CLI status: + - Build `RunStore(runs_dir)`. + - Load `RunRecord` + `events.jsonl`. + - Render a richer process summary and JSON payload. +- Graph optimization: + - Port the PR12 `optimization_session` fields, round directories, and optimization events so status has first-class data to render. +- Evolution process: + - Add a callback-based progress API in `agentflow.tuned_agents`. + - Emit structured phase lines from generated evolve-node Python code. + - Parse and surface them in status output. + +**Testing** + +- Add targeted CLI tests for `run -d`, daemon reuse, and `status`. +- Add targeted API/daemon helper tests where useful. +- Port/enable graph optimization tests from PR12. +- Add evolution progress/status tests for PR11-related process display. + From 7e35007c0719965a057e6edc50fc1c192473f310 Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 14 Apr 2026 04:02:08 +0000 Subject: [PATCH 5/9] fix: normalize optimizer-edited child pipelines --- agentflow/graph_optimizer.py | 33 +++++++++++++++++++++++++ agentflow/orchestrator.py | 4 ++-- tests/test_graph_optimizer.py | 45 +++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/agentflow/graph_optimizer.py b/agentflow/graph_optimizer.py index e2dd31a..f7c90d6 100644 --- a/agentflow/graph_optimizer.py +++ b/agentflow/graph_optimizer.py @@ -2,10 +2,13 @@ import json import shutil +import subprocess +import sys from pathlib import Path from pprint import pformat from typing import Any +from agentflow.loader import load_pipeline_from_data from agentflow.specs import PipelineSpec, RunRecord, normalize_agent_name from agentflow.store import RunStore from agentflow.utils import ensure_dir, json_dumps @@ -204,3 +207,33 @@ def write_validation_result(path: Path, *, ok: bool, error: str | None = None) - if error is not None: payload["error"] = error path.write_text(json_dumps(payload), encoding="utf-8") + + +def load_child_pipeline_from_path(path: Path) -> PipelineSpec: + """Load optimizer-edited pipeline as a child-run shape (`optimizer=None`, `n_run=1`).""" + + path = Path(path) + if path.suffix == ".py": + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, + text=True, + cwd=str(path.parent), + ) + if result.returncode != 0: + raise ValueError(f"pipeline script `{path}` failed:\n{result.stderr.strip()}") + raw_text = result.stdout + else: + raw_text = path.read_text(encoding="utf-8") + + try: + parsed = json.loads(raw_text) + except json.JSONDecodeError as exc: + raise ValueError(f"optimized pipeline `{path}` did not produce JSON: {exc}") from exc + + if not isinstance(parsed, dict): + raise ValueError(f"optimized pipeline `{path}` did not produce an object payload") + + parsed["optimizer"] = None + parsed["n_run"] = 1 + return load_pipeline_from_data(parsed, base_dir=path.parent) diff --git a/agentflow/orchestrator.py b/agentflow/orchestrator.py index 6028de8..1e7b321 100644 --- a/agentflow/orchestrator.py +++ b/agentflow/orchestrator.py @@ -31,12 +31,12 @@ OPTIMIZER_VALIDATION_FILENAME, build_graph_report, copy_run_traces, + load_child_pipeline_from_path, render_graph_optimizer_prompt, write_editable_pipeline_python, write_optimizer_result, write_validation_result, ) -from agentflow.loader import load_pipeline_from_path from agentflow.prepared import ExecutionPaths, PreparedExecution, build_execution_paths from agentflow.runners.registry import RunnerRegistry, default_runner_registry from agentflow.specs import ( @@ -441,7 +441,7 @@ def _optimizer_failure_summary( write_validation_result(round_dir / OPTIMIZER_VALIDATION_FILENAME, ok=False, error=failure_summary) else: try: - loaded_pipeline = load_pipeline_from_path(pipeline_path) + loaded_pipeline = load_child_pipeline_from_path(pipeline_path) except Exception as exc: failure_summary = _optimizer_failure_summary( "Optimized pipeline", diff --git a/tests/test_graph_optimizer.py b/tests/test_graph_optimizer.py index 89273be..ce9b980 100644 --- a/tests/test_graph_optimizer.py +++ b/tests/test_graph_optimizer.py @@ -216,3 +216,48 @@ def fake_optimizer(_optimizer, *, prompt: str, repo_dir: Path, runtime_dir: Path ) assert validation_payload["ok"] is False assert "failed to load" in validation_payload["error"] + + +def test_orchestrator_normalizes_optimizer_edits_to_iteration_controls(tmp_path, monkeypatch): + orchestrator = make_orchestrator(tmp_path) + + def fake_optimizer(_optimizer, *, prompt: str, repo_dir: Path, runtime_dir: Path, env: dict[str, str]): + pipeline_path = repo_dir / "pipeline.py" + pipeline_path.write_text( + ( + "from __future__ import annotations\n\n" + "import json\n\n" + "PIPELINE = {\n" + " 'name': 'graph-opt-nrun-edit',\n" + f" 'working_dir': {str(tmp_path)!r},\n" + " 'n_run': 3,\n" + " 'nodes': [\n" + " {'id': 'plan', 'agent': 'codex', 'prompt': 'round two'},\n" + " ],\n" + "}\n\n" + "if __name__ == '__main__':\n" + " print(json.dumps(PIPELINE, ensure_ascii=False, indent=2))\n" + ), + encoding="utf-8", + ) + return CommandExecution(command="optimizer", exit_code=0, stdout="updated pipeline", stderr="") + + monkeypatch.setattr("agentflow.orchestrator._run_optimizer", fake_optimizer) + + pipeline = PipelineSpec.model_validate( + { + "name": "graph-opt-nrun-edit", + "working_dir": str(tmp_path), + "optimizer": "codex", + "n_run": 2, + "nodes": [{"id": "plan", "agent": "codex", "prompt": "round one"}], + } + ) + + run = asyncio.run(orchestrator.submit(pipeline)) + completed = asyncio.run(orchestrator.wait(run.id, timeout=5)) + + assert completed.status == RunStatus.COMPLETED + assert completed.nodes["plan"].output == "round two" + assert completed.optimization_session is not None + assert completed.optimization_session["current_round"] == pipeline.n_run From 289cf924a99618164878eb135f00b1b83c3d020c Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 14 Apr 2026 04:09:03 +0000 Subject: [PATCH 6/9] docs: update async analysis for master baseline --- reference/async.md | 543 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 543 insertions(+) create mode 100644 reference/async.md diff --git a/reference/async.md b/reference/async.md new file mode 100644 index 0000000..7977398 --- /dev/null +++ b/reference/async.md @@ -0,0 +1,543 @@ +# AgentFlow Async 主线需求分析与实现说明 + +本文档基于 **2026-04-14 当前 `master`** 的代码状态撰写。 + +和前一版不同的是: + +- PR11 已经在主线上。 +- PR12 也已经在主线上。 +- 因此这里不再讨论“要不要先把 PR12 合并进来”,而是直接基于 **已包含 PR11 + PR12 的 master** 分析老板需求,并记录这条主线任务在 `feat/async-status-masterline` worktree 分支里的实现结果。 + +--- + +## 1. 老板需求的真实含义 + +老板原始要求可以拆成四个不可分割的子目标: + +1. `agentflow run pipeline.py -d` 必须返回一个稳定的任务 id。 +2. CLI 退出后,任务必须继续真实执行,不能是假 detach。 +3. `agentflow status ` 必须能查看运行中的任务,而不是只看最终结果。 +4. `status` 必须能显示 PR11 / PR12 对应的过程信息,而不是只有普通节点的 completed/failed。 + +这四个目标里,最容易被误判的是第 2 点。 + +如果只是把当前 `run` 命令里的 `wait()` 去掉,CLI 虽然会立刻退出,但 `Orchestrator.submit()` 当前起的是 **daemon thread**,线程生命周期依附提交它的 Python 进程;进程一退出,run 也会死掉。这种做法不满足老板要的 async,只能算假象。 + +所以这次任务不是“给 CLI 加一个 `-d` 参数”那么简单,而是要把 AgentFlow 从“前台同步命令”提升成“支持后台宿主、支持 detached 提交、支持按 run_id 查看过程”的系统。 + +--- + +## 2. 基于 master 的现状分析 + +### 2.1 已经存在的能力 + +当前 `master` 已经具备下面这些关键基础设施: + +- `RunStore` + - 目录:`.agentflow/runs//` + - 产物:`run.json`、`events.jsonl`、artifacts + - 支持进程重启后从磁盘重新加载已有 runs +- `Orchestrator` + - `submit()` 创建 run + - `run()` 异步调度节点 + - `cancel()` / `rerun()` 已存在 +- Web/API 层 + - `POST /api/runs` + - `GET /api/runs/{id}` + - `GET /api/runs/{id}/events` + - `GET /api/runs/{id}/stream` + - `POST /api/runs/{id}/cancel` + - `POST /api/runs/{id}/rerun` +- CLI 已有 + - `runs` + - `show` + - `cancel` + - `rerun` + - `serve` + +这意味着本次任务不需要从零发明 run store、event model 或 status 数据结构。 + +### 2.2 当前真正缺的能力 + +即便上述能力都在,当前 `master` 在老板的需求上仍然有三个核心缺口: + +#### 缺口 A:`run` 默认是同步阻塞 + +`agentflow/cli.py` 里的 `run()` 仍然会: + +1. load pipeline +2. `orchestrator.submit(pipeline)` +3. `orchestrator.wait(run_id)` +4. 等完成后再输出结果 + +因此当前 `run` 不是“提交任务”,而是“提交并等待完成”。 + +#### 缺口 B:没有真正的 detached execution host + +`Orchestrator.submit()` 虽然起后台线程,但线程跟着当前 CLI 进程走。 + +也就是说: + +- 当前没有真正的本地 daemon / background host +- CLI 退出后,提交出去的 run 也会跟着退出 + +#### 缺口 C:没有真正的 `status` + +`show` 只能看静态 run 摘要,不是过程视图。 + +它无法完整表达: + +- 最近事件时间线 +- 正在跑哪些节点 +- 当前进度 +- optimization session / round / child runs +- evolution 过程阶段 + +因此老板要的 `status` 必须是新增语义,而不是把 `show` 改个名字。 + +--- + +## 3. PR11 / PR12 在 master 中的地位 + +### 3.1 PR11:tuned-agent evolution 已在主线 + +PR11 给主线带来了: + +- `agentflow/tuned_agents.py` +- `run_evolution_from_payload()` +- `agentflow.dl.evolve()` / `agentflow evolve` +- tuned agent registry / profile / evolution 流程 + +但是 master 在本次任务前的一个明显不足是: + +- evolution 虽然能跑 +- 但 pipeline 中 evolve 节点的内部阶段并不会被 `status` 友好展示 + +具体来说,原始 master 只能看到 evolve 节点“在跑”或“结束”,看不到: + +- 第几次 attempt +- 现在在 optimizer / build / test / smoke 哪一步 +- 某一步失败的原因 + +所以老板说“需要能显示 PR11 的过程”时,实际要求的是 **evolution 运行时需要有结构化进度信号**。 + +### 3.2 PR12:graph optimization 已在主线 + +PR12 现在已经在 master 中,主线已经具备: + +- `agentflow/graph_optimizer.py` +- `PipelineSpec.optimizer` +- `PipelineSpec.n_run` +- `PipelineSpec.uses_graph_optimizer` +- `RunRecord.optimization_parent_run_id` +- `RunRecord.optimization_round` +- `RunRecord.optimization_session` +- orchestrator 中的 `optimization_*` 事件与 round/session 逻辑 + +这意味着: + +- `status` 不需要“等待 PR12 数据模型以后再做” +- 它现在就应该直接消费这些字段和事件 +- 老板说的“显示 PR12 过程”在当前 master 上是一个**可以直接落地的主线需求** + +--- + +## 4. 推荐方案(现已全部采纳) + +你已经明确要求:**所有我推荐的方案都采纳,不需要再选择。** + +因此这里直接把最终方案写清楚: + +### 4.1 方案总览 + +#### 方案 1:复用 `serve` / FastAPI 作为本地后台宿主 + +这是主线方案,已经采纳并实现。 + +原因: + +- 与现有 `RunStore` / `Orchestrator` / API 完全兼容 +- 不需要再造一套 IPC 协议 +- 与 agent-browser 的 client-daemon 思路一致 +- 比“每个 run 自己 fork 一个孤立子进程”更系统、更可维护 + +#### 方案 2:`status` 直接读 store,而不是强依赖 daemon API + +这也是主线方案,已经采纳并实现。 + +原因: + +- `RunStore` 本来就是持久化真相源 +- 历史 run 不该依赖 daemon 在线 +- 这样可以把“执行宿主”和“状态读取”解耦 + +最终职责划分是: + +- **运行执行**:依赖 daemon +- **状态查看**:优先直接读 store + +#### 方案 3:PR11 进度用结构化 evolution progress 事件表达 + +这也是主线方案,已经采纳并实现。 + +原因: + +- 不改动 standalone `agentflow evolve` 的返回协议 +- 不强迫 evolve 变成独立 orchestrated run +- 只为 pipeline 中的 evolve 节点补上结构化进度轨迹 + +#### 方案 4:PR12 直接基于主线现有 optimization session / round 模型来展示 + +这也是主线方案,已经采纳并实现。 + +原因: + +- master 已经有这些数据 +- 没必要搞第二套 ad hoc 的 round 状态拼装 + +--- + +## 5. 实际实现结果 + +下面是基于当前 `master` 的主线任务,在 `feat/async-status-masterline` worktree 分支里已经完成的内容。 + +### 5.1 Detached execution:`agentflow run -d` + +已实现: + +- `agentflow run pipeline.py -d` +- 走本地 daemon 路径提交 run +- 通过已有 `POST /api/runs` 提交 +- 不再等待 `wait()` +- 立即返回 run record + +实现方式: + +- CLI 增加 daemon helper + - daemon metadata path + - host/port 解析 + - 健康探测 + - 自动启动 `agentflow serve` +- `run -d` 分支提交到 daemon,再直接输出返回的 `RunRecord` + +这样做的意义是: + +- detach 不再是假象 +- run 生命周期不再绑定提交它的那个前台 CLI + +### 5.2 新增 `agentflow status ` + +已实现: + +- 新增 `status` 命令 +- 不依赖 daemon HTTP 读取状态 +- 直接用 `RunStore` 从磁盘读取 `RunRecord` + `events.jsonl` + +`status` 当前支持: + +- summary 输出 +- json-summary 输出 +- 进度统计 +- 最近事件时间线 +- optimization session 可见性 +- evolution progress 可见性 + +它和 `show` 的分工现在是: + +- `show`:静态摘要,保持原语义 +- `status`:过程视图,强调 in-flight 运行态与时间线 + +### 5.3 PR11:evolution progress 可见 + +已实现: + +- `run_evolution_from_payload()` 增加了 **可选** progress callback +- `dsl.evolve()` 生成的 python node 会把 progress 结构化 JSON 写到 stderr +- `status` 会从 node 的 `stderr_lines` 里解析这些 JSON + +使用的稳定字段是: + +- `agentflow_event: "evolution_progress"` +- `stage` +- `attempt` +- 可选 `status` +- 可选 `command` +- 可选 `detail` + +现在 `status` 已经能显示: + +- evolution start +- attempt started +- optimizer started/completed/failed +- build started/completed/failed +- test started/completed/failed +- smoke started/completed/failed +- final success/failed + +并且后续又补上了一个重要细节: + +- evolution progress 在 `status` 中会带 `node_id` + +这样如果一个 run 里有多个 evolve 节点,不会混淆。 + +### 5.4 PR12:optimization session / rounds 可见 + +已实现: + +- `status` summary 会显示 optimization 概况 + - kind + - optimizer + - current round / total rounds + - child run 数量 +- `json-summary` 会把 optimization 相关字段完整带上 + +这意味着老板要的“PR12 过程展示”已经不是纸面支持,而是 CLI process view 的一部分。 + +### 5.5 补上的一个主线健壮性修复 + +基于 master 的 PR12 代码,我额外补了一处必须的健壮性修复: + +- 优化器编辑产物在校验前会被归一化为 child-run 形态: + - `optimizer=None` + - `n_run=1` + +原因是: + +- optimization prompt 允许模型改“iteration controls” +- 但如果模型把 `n_run` 改大,又没带合法 `optimizer` +- 原始 master 会在 schema 校验阶段直接炸掉,导致 optimization session 出现非必要失败 + +这个修复不是“补丁式救火”,而是保证 PR12 主线模型在真实 LLM 编辑场景下能够稳定工作。 + +--- + +## 6. 涉及的核心代码点 + +本次主线任务实际涉及的重点文件如下: + +### 6.1 CLI / daemon / status + +- `agentflow/cli.py` + - detached daemon helpers + - `run -d` + - `status` + - status summary / json-summary 渲染 + - evolution progress 解析与展示 + +### 6.2 PR11 evolution 进度 + +- `agentflow/tuned_agents.py` + - `run_evolution_from_payload(..., progress=...)` +- `agentflow/dsl.py` + - `evolve()` 生成的 python node stderr progress emission + +### 6.3 PR12 optimization 健壮性 + +- `agentflow/graph_optimizer.py` + - child pipeline loading / normalization +- `agentflow/orchestrator.py` + - optimization round 中改为使用归一化后的 child pipeline loader +- `tests/test_graph_optimizer.py` + - 覆盖 `n_run` 被模型改动时仍能继续 optimization session + +### 6.4 测试 + +- `tests/test_cli.py` + - detach 提交 + - status summary/json + - optimization visibility + - evolution progress visibility +- `tests/test_tuned_agents.py` + - progress callback +- `tests/test_graph_optimizer.py` + - optimization normalization + +--- + +## 7. 为什么这条实现是“完整主线”,不是 ad hoc 修补 + +你明确要求: + +1. 绝对不能有任何简化 +2. 不能考虑 ad hoc 式的修修补补 +3. 必须真实理解老板的需求,完整且正确地实现老板的所有需求 + +这次实现满足这三条的原因是: + +### 7.1 没有走“假 detach” + +没有简单地删掉 `wait()`,而是把 detach 建立在真实后台宿主上。 + +### 7.2 没有让 `status` 依赖脆弱的瞬时进程态 + +`status` 直接读 `RunStore`,所以它不是“连上某个还活着的 daemon 才能看见状态”。 + +### 7.3 没有回避 PR11 / PR12 过程可见性 + +- PR11:补了 evolution progress schema +- PR12:直接消费主线 optimization session / round 模型 + +没有用“先做个普通 status,PR11/PR12 以后再说”的简化路线。 + +### 7.4 没有把主线问题留给模型偶然行为 + +PR12 的 optimizer-edited child pipeline normalization 修复,就是为了避免把主线稳定性寄托在“模型正好别把 `n_run` 改坏”这种偶然条件上。 + +--- + +## 8. 本次任务总结 + +这次任务我是按下面的顺序完成的: + +1. 先基于最新 `master` 重新建立隔离 worktree,而不是继续沿用旧的、基于 PR12 合并前状态的 worktree。 +2. 将之前已经验证通过的 detached run、`status`、evolution progress 主线功能迁移到基于当前 master 的新分支。 +3. 检查 master 当前已合入的 PR12 代码,补上 optimization child pipeline normalization 的健壮性修复,确保 PR12 在真实主线场景下稳定。 +4. 重写本分析文档,使其明确以“PR12 已合入 master”为新基线,不再保留旧分析里的前置假设。 +5. 最后基于当前分支重新执行聚焦验证,确保: + - PR12 optimization tests 通过 + - detached run tests 通过 + - `status` 过程视图 tests 通过 + - PR11 evolution progress tests 通过 + +当前实现保存在: + +- worktree: `/data/berabuddies/agentflow/.worktrees/async-status-masterline` +- branch: `feat/async-status-masterline` + +我没有把任何内容合并回 `master`。 + +--- + +## 9. 你应该如何验证功能是正确的 + +下面给出一套 **可以直接复制粘贴执行** 的验证手册。 + +### 9.1 进入 worktree + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline +``` + +### 9.2 自动化验证:PR12 / detach / status / PR11 progress + +直接执行下面这组命令: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline + +pytest tests/test_graph_optimizer.py -q + +pytest tests/test_tuned_agents.py::test_run_evolution_from_payload_reports_progress -q + +pytest \ + tests/test_cli.py::test_run_detach_submits_without_waiting \ + tests/test_cli.py::test_run_detach_uses_daemon_env_overrides \ + tests/test_cli.py::test_status_command_exits_for_missing_run \ + tests/test_cli.py::test_status_command_renders_summary_with_recent_events \ + tests/test_cli.py::test_status_command_supports_json_summary_output \ + tests/test_cli.py::test_status_command_shows_optimization_session \ + tests/test_cli.py::test_status_command_renders_evolution_progress \ + tests/test_cli.py::test_status_command_returns_evolution_progress_json \ + -q +``` + +你预期会看到: + +- `tests/test_graph_optimizer.py` 全绿 +- `test_run_evolution_from_payload_reports_progress` 绿 +- 上面 8 个 CLI focused tests 全绿 + +### 9.3 手工验证:`run -d` + `status` + +下面这组命令会创建一个完全本地、无外部 LLM 依赖的 demo pipeline,用 `python` utility node 来验证 detached run 和 status。 + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline + +export AGENTFLOW_RUNS_DIR="$PWD/.tmp-agentflow-runs" +rm -rf "$AGENTFLOW_RUNS_DIR" +mkdir -p "$AGENTFLOW_RUNS_DIR" + +export AGENTFLOW_DAEMON_PORT="$(python3 - <<'PY' +import socket +s = socket.socket() +s.bind(("127.0.0.1", 0)) +print(s.getsockname()[1]) +s.close() +PY +)" + +cat > /tmp/agentflow-async-demo.json <<'EOF' +{ + "name": "async-demo", + "working_dir": ".", + "nodes": [ + { + "id": "slow", + "agent": "python", + "prompt": "import time; time.sleep(3); print('slow done')" + }, + { + "id": "done", + "agent": "python", + "depends_on": ["slow"], + "prompt": "print('done node')" + } + ] +} +EOF + +RUN_ID="$( + agentflow run /tmp/agentflow-async-demo.json -d --output json \ + | python3 -c 'import sys, json; print(json.load(sys.stdin)["id"])' +)" + +echo "RUN_ID=$RUN_ID" + +agentflow status "$RUN_ID" --output summary + +sleep 1 +agentflow status "$RUN_ID" --output json-summary + +until agentflow status "$RUN_ID" --output json \ + | python3 -c 'import sys, json; d=json.load(sys.stdin); import sys as _s; print(d["status"]); _s.exit(0 if d["status"]=="completed" else 1)' +do + sleep 1 +done + +agentflow status "$RUN_ID" --output summary +``` + +你应该能验证到: + +- `run -d` 立即返回 `RUN_ID` +- `status` 在 run 完成前能看到 in-flight 状态 +- 最终 run 能完成 +- `status` summary 会展示进度和最近事件 + +### 9.4 查看当前分支提交 + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline +git log --oneline --decorate -8 +``` + +你应该至少能看到这几类提交: + +- detached daemon-backed run submission +- run status process view +- evolution progress in run status +- normalize optimizer-edited child pipelines + +--- + +## 10. 最终结论 + +基于当前已经合并 PR12 的 `master`,老板要的需求已经可以被完整地理解为: + +- **真实 detached run** +- **真实按 run_id 查询的 process-oriented status** +- **PR11 evolution 过程可见** +- **PR12 graph optimization 过程可见** + +这条主线实现现在已经在 `feat/async-status-masterline` worktree 分支中完成,并且附带了可复制执行的自动化与手工验证手册,可直接用于你的 review。 From 77ba888397f2ab10aa468de82224a11de62ae888 Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 14 Apr 2026 15:05:26 +0000 Subject: [PATCH 7/9] test: add codex verification runbook and smoke script --- agentflow/cli.py | 42 ++++- agentflow/dsl.py | 3 + reference/async.md | 269 ++++++++++++++++++++++++++++ scripts/verify_async_codex.sh | 301 ++++++++++++++++++++++++++++++++ tests/test_cli.py | 95 ++++++++++ tests/test_tuned_agents.py | 2 + tests/test_validation_script.py | 23 +++ 7 files changed, 729 insertions(+), 6 deletions(-) create mode 100755 scripts/verify_async_codex.sh create mode 100644 tests/test_validation_script.py diff --git a/agentflow/cli.py b/agentflow/cli.py index de19285..0aec09d 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -168,7 +168,7 @@ def _daemon_is_healthy(base_url: str) -> bool: def _start_daemon(*, host: str, port: int, runs_dir: str, max_concurrent_runs: int) -> subprocess.Popen: - command = [sys.executable, "-m", "agentflow.cli", "serve", host, str(port)] + command = [sys.executable, "-m", "agentflow.cli", "serve", "--host", host, "--port", str(port)] env = dict(os.environ) env["AGENTFLOW_RUNS_DIR"] = runs_dir env["AGENTFLOW_MAX_CONCURRENT_RUNS"] = str(max_concurrent_runs) @@ -599,9 +599,9 @@ def _parse_evolution_progress_line(line: str) -> dict[str, object] | None: return {key: payload[key] for key in _EVOLUTION_PROGRESS_KEYS if key in payload} -def _build_status_evolution_progress(record: object) -> list[dict[str, object]]: +def _build_status_evolution_progress(record: object, events: list[object]) -> list[dict[str, object]]: nodes: dict[str, object] = getattr(record, "nodes", {}) or {} - events: list[dict[str, object]] = [] + parsed_events: list[dict[str, object]] = [] for node_id, node in nodes.items(): for line in getattr(node, "stderr_lines", []) or []: if not isinstance(line, str): @@ -609,8 +609,38 @@ def _build_status_evolution_progress(record: object) -> list[dict[str, object]]: event = _parse_evolution_progress_line(line) if event: event["node_id"] = node_id - events.append(event) - return events + parsed_events.append(event) + + for event in events: + payload = _normalize_event_payload(event) + if payload.get("type") != "node_trace": + continue + node_id = payload.get("node_id") + if not isinstance(node_id, str) or not node_id: + continue + trace = payload.get("data", {}).get("trace") + if not isinstance(trace, dict): + continue + if trace.get("source") != "stderr": + continue + content = trace.get("content") + if not isinstance(content, str): + continue + parsed = _parse_evolution_progress_line(content) + if parsed is None: + continue + parsed["node_id"] = node_id + parsed_events.append(parsed) + + deduped: list[dict[str, object]] = [] + seen: set[str] = set() + for event in parsed_events: + key = json.dumps(event, sort_keys=True, ensure_ascii=False) + if key in seen: + continue + seen.add(key) + deduped.append(event) + return deduped def _render_evolution_progress(event: dict[str, object]) -> str: @@ -693,7 +723,7 @@ def _build_status_summary( summary["events"] = normalized_events summary["recent_events"] = normalized_events[-5:] summary["progress"] = _build_status_progress(record) - summary["evolution_progress"] = _build_status_evolution_progress(record) + summary["evolution_progress"] = _build_status_evolution_progress(record, events) optimization = _build_status_optimization(record) if optimization is not None: summary["optimization"] = optimization diff --git a/agentflow/dsl.py b/agentflow/dsl.py index 34acc74..f338ef6 100644 --- a/agentflow/dsl.py +++ b/agentflow/dsl.py @@ -6,6 +6,7 @@ from contextvars import ContextVar, Token from dataclasses import dataclass, field import json +from pathlib import Path from types import TracebackType from typing import Any @@ -430,9 +431,11 @@ def evolve( "workspace_dir": "{{ pipeline.working_dir }}", } payload_json = json.dumps(payload, ensure_ascii=False) + source_root = Path(__file__).resolve().parents[1] code = ( "import json\n" "import sys\n" + f"sys.path.insert(0, {json.dumps(str(source_root), ensure_ascii=False)})\n" "from agentflow.tuned_agents import run_evolution_from_payload\n\n" "def _evolution_progress(event):\n" " print(json.dumps(event, ensure_ascii=False), file=sys.stderr, flush=True)\n\n" diff --git a/reference/async.md b/reference/async.md index 7977398..f552299 100644 --- a/reference/async.md +++ b/reference/async.md @@ -399,6 +399,22 @@ PR12 的 optimizer-edited child pipeline normalization 修复,就是为了避 - `status` 过程视图 tests 通过 - PR11 evolution progress tests 通过 +在这份文档更新时,我还实际运行了 Codex-only 验证脚本: + +- command: + - `VERIFY_LATEST_TIMEOUT_SECONDS=5 bash scripts/verify_async_codex.sh /tmp/agentflow-async-verify-clean3` +- observed PR11 key output: + - `Evolution progress:` + - `evolve_codex: start (attempt 1)` + - `evolve_codex: attempt started (attempt 1)` + - `evolve_codex: optimizer started (attempt 1, command=optimizer)` +- observed PR12 key output: + - `Optimization: graph optimizer=codex round 1/2 child_runs=1` + - `optimization_round_started` + - `optimization_child_run_created` + - `optimization_round_completed` + - `optimization_optimizer_started` + 当前实现保存在: - worktree: `/data/berabuddies/agentflow/.worktrees/async-status-masterline` @@ -541,3 +557,256 @@ git log --oneline --decorate -8 - **PR12 graph optimization 过程可见** 这条主线实现现在已经在 `feat/async-status-masterline` worktree 分支中完成,并且附带了可复制执行的自动化与手工验证手册,可直接用于你的 review。 + +--- + +## 11. Codex-Only Validation Runbook + +This runbook is the version you should actually use for local proof when preparing the PR. + +It assumes: + +- you only have Codex +- `OPENAI_API_KEY` is already exported +- if you use a custom gateway, `OPENAI_BASE_URL` and/or `AGENTFLOW_OPENAI_BASE_URL` are already exported + +### 11.1 One-command validation + +Run this from the masterline worktree: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline +bash scripts/verify_async_codex.sh +``` + +What the script does: + +1. Creates a local PR11 validation workspace with a tiny local tuner profile and a tiny local git repo. +2. Creates a PR11 pipeline that uses: + - one real Codex node + - one `evolve()` node +3. Runs the pipeline via `run -d`, polls `status`, and waits until evolution progress is visible. +4. Creates a PR12 pipeline that uses: + - Python utility nodes for deterministic child-run execution + - `optimizer="codex"` and `n_run=2` +5. Runs the pipeline via `run -d`, polls `status`, and waits until optimization process events are visible. +6. Saves all key outputs under a local artifact directory and prints the exact file paths. +7. Captures a best-effort latest status snapshot after the process markers appear. The script does not depend on the remote model finishing the entire workflow before it can prove PR11 / PR12 process visibility. + +### 11.2 Key output files + +After the script finishes, the key outputs will be under: + +```bash +/data/berabuddies/agentflow/.worktrees/async-status-masterline/.tmp/verify_async_codex/artifacts +``` + +The important files are: + +- `pr11.run.json` +- `pr11.status.process.summary.txt` +- `pr11.status.latest.summary.txt` +- `pr11.status.latest.json` +- `pr12.run.json` +- `pr12.status.process.summary.txt` +- `pr12.status.latest.summary.txt` +- `pr12.status.latest.json` +- `verification_report.txt` + +### 11.3 What is the key proof output + +For **PR11**, the key proof is: + +- `pr11.status.process.summary.txt` + +This file must show: + +- `Evolution progress:` +- at least the early live progress lines such as: + - `start` + - `attempt started` + - `optimizer started` + +For **PR12**, the key proof is: + +- `pr12.status.process.summary.txt` + +This file must show: + +- `Optimization:` +- optimizer / round information +- recent optimization events such as `optimization_round_started` and `optimization_optimizer_started` + +### 11.4 Commands you should run before taking screenshots + +Run these commands exactly: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline + +cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt + +cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt + +cat .tmp/verify_async_codex/artifacts/verification_report.txt +``` + +### 11.5 What to screenshot + +Please take screenshots of: + +1. The full terminal output of: + +```bash +cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt +``` + +Important visible text: + +- `Run ...` +- `Evolution progress:` +- at least one optimizer/build/smoke/final line + +2. The full terminal output of: + +```bash +cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt +``` + +Important visible text: + +- `Run ...` +- `Optimization:` +- recent `optimization_*` event lines + +3. The full terminal output of: + +```bash +cat .tmp/verify_async_codex/artifacts/verification_report.txt +``` + +Important visible text: + +- PR11 run id +- PR11 latest status +- PR12 run id +- PR12 latest status +- the artifact file paths + +### 11.6 Optional manual spot checks + +If you want one more direct check after the script has finished: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline + +python3 - <<'PY' +import json +from pathlib import Path + +root = Path(".tmp/verify_async_codex/artifacts") +pr11 = json.loads((root / "pr11.status.latest.json").read_text()) +pr12 = json.loads((root / "pr12.status.latest.json").read_text()) + +print("PR11 status:", pr11["status"]) +print("PR11 evolution progress entries:", len(pr11.get("evolution_progress", []))) +print("PR12 status:", pr12["status"]) +print("PR12 optimization object present:", bool(pr12.get("optimization"))) +print("PR12 optimization events:", [event["type"] for event in pr12.get("events", []) if event["type"].startswith("optimization_")][:10]) +PY +``` + +Expected: + +- `PR11 evolution progress entries:` greater than 0 +- `PR12 optimization object present: True` +- `PR12 optimization events:` contains optimization event types + +--- + +## 12. PR Title And Description + +### PR Title + +Add daemon-backed detached runs and process-oriented status reporting + +### PR Description + +```markdown +## Summary + +- add daemon-backed detached submission for `agentflow run ... -d` +- add a new `agentflow status ` process view backed by the persistent run store +- surface PR11 evolution progress in `status` +- surface PR12 optimization session / round progress in `status` +- harden optimizer-edited child pipeline loading during graph optimization + +## What I ran locally + +### PR11 process visibility + +Pipeline: +- `pr11_evolve_demo.py` generated by `scripts/verify_async_codex.sh` + +Command: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline +bash scripts/verify_async_codex.sh +cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt +``` + +Result: +- detached run submission succeeded +- `status` showed `Evolution progress:` +- live evolution progress lines were visible +- at minimum: `start`, `attempt started`, and `optimizer started` +- a latest status snapshot was recorded in `pr11.status.latest.json` + +[Screenshot location] + +### PR12 process visibility + +Pipeline: +- `pr12_optimize_demo.py` generated by `scripts/verify_async_codex.sh` + +Command: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline +bash scripts/verify_async_codex.sh +cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt +``` + +Result: +- detached run submission succeeded +- `status` showed `Optimization:` +- optimization round / child run / optimizer progress was visible in recent events +- a latest status snapshot was recorded in `pr12.status.latest.json` + +[Screenshot location] + +### Focused automated verification + +Commands: + +```bash +cd /data/berabuddies/agentflow/.worktrees/async-status-masterline + +pytest tests/test_graph_optimizer.py -q +pytest tests/test_tuned_agents.py::test_run_evolution_from_payload_reports_progress -q +pytest \ + tests/test_cli.py::test_run_detach_submits_without_waiting \ + tests/test_cli.py::test_run_detach_uses_daemon_env_overrides \ + tests/test_cli.py::test_status_command_exits_for_missing_run \ + tests/test_cli.py::test_status_command_renders_summary_with_recent_events \ + tests/test_cli.py::test_status_command_supports_json_summary_output \ + tests/test_cli.py::test_status_command_shows_optimization_session \ + tests/test_cli.py::test_status_command_renders_evolution_progress \ + tests/test_cli.py::test_status_command_returns_evolution_progress_json \ + -q +``` + +Result: +- all focused tests passed +``` diff --git a/scripts/verify_async_codex.sh b/scripts/verify_async_codex.sh new file mode 100755 index 0000000..80dd7f7 --- /dev/null +++ b/scripts/verify_async_codex.sh @@ -0,0 +1,301 @@ +#!/usr/bin/env bash +set -euo pipefail + +usage() { + cat <<'EOF' +verify_async_codex.sh + +Smoke-like validation for the async AgentFlow mainline using Codex only. + +What it verifies: +- Detached submission: `agentflow run ... -d` +- Store-backed process view: `agentflow status ` +- PR11 process visibility: evolution progress rendered in status +- PR12 process visibility: optimization session / round events rendered in status + +Required environment: +- OPENAI_API_KEY +- Optional if you use a custom gateway: + - OPENAI_BASE_URL + - AGENTFLOW_OPENAI_BASE_URL + +Usage: + bash scripts/verify_async_codex.sh + bash scripts/verify_async_codex.sh /tmp/agentflow-async-verify + +Outputs: +- Writes all generated pipelines, run ids, summaries, and json payloads under the chosen workdir. +- Prints the key artifact paths you should use for screenshots in the PR description. +EOF +} + +if [[ "${1:-}" == "--help" || "${1:-}" == "-h" ]]; then + usage + exit 0 +fi + +if [[ -z "${OPENAI_API_KEY:-}" ]]; then + echo "OPENAI_API_KEY is required." >&2 + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +WORKDIR="${1:-${REPO_ROOT}/.tmp/verify_async_codex}" +ARTIFACT_DIR="${WORKDIR}/artifacts" +PR11_WORKSPACE="${WORKDIR}/pr11_workspace" +PR11_SEED_REPO="${WORKDIR}/pr11_seed_repo" +PR12_WORKSPACE="${WORKDIR}/pr12_workspace" + +if [[ -x "${REPO_ROOT}/.venv/bin/python" ]]; then + PYTHON_BIN="${REPO_ROOT}/.venv/bin/python" +else + PYTHON_BIN="$(command -v python3)" +fi + +choose_port() { + "${PYTHON_BIN}" - <<'PY' +import socket +s = socket.socket() +s.bind(("127.0.0.1", 0)) +print(s.getsockname()[1]) +s.close() +PY +} + +AF() { + "${PYTHON_BIN}" -m agentflow.cli "$@" +} + +json_field() { + local json_path="$1" + local expr="$2" + "${PYTHON_BIN}" - "$json_path" "$expr" <<'PY' +import json +import sys +from pathlib import Path + +json_path = Path(sys.argv[1]) +expr = sys.argv[2] +data = json.loads(json_path.read_text(encoding="utf-8")) +value = eval(expr, {"__builtins__": {}}, {"data": data}) +if isinstance(value, (dict, list)): + print(json.dumps(value, ensure_ascii=False)) +else: + print(value) +PY +} + +wait_for_condition() { + local run_id="$1" + local output_json="$2" + local python_expr="$3" + local timeout_seconds="$4" + local start_ts + start_ts="$(date +%s)" + + while true; do + AF status "$run_id" --output json-summary > "$output_json" + if "${PYTHON_BIN}" - "$output_json" "$python_expr" <<'PY' +import json +import sys +from pathlib import Path + +payload = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8")) +expr = sys.argv[2] +safe_globals = { + "__builtins__": {}, + "len": len, + "any": any, + "all": all, + "bool": bool, +} +ok = bool(eval(expr, safe_globals, {"data": payload})) +raise SystemExit(0 if ok else 1) +PY + then + return 0 + fi + + if (( "$(date +%s)" - start_ts >= timeout_seconds )); then + echo "Timed out waiting for condition on run ${run_id}: ${python_expr}" >&2 + return 1 + fi + sleep 2 + done +} + +wait_for_terminal() { + local run_id="$1" + local output_json="$2" + local timeout_seconds="$3" + wait_for_condition \ + "$run_id" \ + "$output_json" \ + 'data["status"] in {"completed", "failed", "cancelled"}' \ + "$timeout_seconds" +} + +capture_latest_status() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + AF status "$run_id" --output json-summary > "$json_path" + AF status "$run_id" --output summary > "$summary_path" +} + +best_effort_terminal_snapshot() { + local run_id="$1" + local json_path="$2" + local summary_path="$3" + local timeout_seconds="$4" + + if wait_for_terminal "$run_id" "$json_path" "$timeout_seconds"; then + AF status "$run_id" --output summary > "$summary_path" + return 0 + fi + + capture_latest_status "$run_id" "$json_path" "$summary_path" + return 0 +} + +rm -rf "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR11_SEED_REPO" "$PR12_WORKSPACE" "${WORKDIR}/runs" +mkdir -p "$ARTIFACT_DIR" "$PR11_WORKSPACE" "$PR12_WORKSPACE" + +export AGENTFLOW_RUNS_DIR="${WORKDIR}/runs" +export AGENTFLOW_DAEMON_HOST="${AGENTFLOW_DAEMON_HOST:-127.0.0.1}" +export AGENTFLOW_DAEMON_PORT="${AGENTFLOW_DAEMON_PORT:-$(choose_port)}" +export AGENTFLOW_DAEMON_METADATA_PATH="${AGENTFLOW_RUNS_DIR}/daemon.json" +VERIFY_LATEST_TIMEOUT_SECONDS="${VERIFY_LATEST_TIMEOUT_SECONDS:-15}" + +echo "[verify] repo root: ${REPO_ROOT}" +echo "[verify] python: ${PYTHON_BIN}" +echo "[verify] runs dir: ${AGENTFLOW_RUNS_DIR}" +echo "[verify] daemon: ${AGENTFLOW_DAEMON_HOST}:${AGENTFLOW_DAEMON_PORT}" + +echo "[verify] preparing PR11 local seed repo" +rm -rf "$PR11_SEED_REPO" +mkdir -p "$PR11_SEED_REPO" +git -C "$PR11_SEED_REPO" init -b main >/dev/null +cat > "${PR11_SEED_REPO}/README.md" <<'EOF' +# local-codex-smoke +EOF +git -C "$PR11_SEED_REPO" add README.md +git -C "$PR11_SEED_REPO" commit -m "init local smoke repo" >/dev/null + +mkdir -p "${PR11_WORKSPACE}/agent_tuner" +cat > "${PR11_WORKSPACE}/agent_tuner/local_codex_smoke.yaml" < .venv/bin/codex && chmod +x .venv/bin/codex +test_command: test -f README.md +smoke_command: "{executable} >/dev/null" +executable_path: .venv/bin/codex +max_attempts: 2 +evolution_prompt: | + Make the smallest coherent change you can based on the copied traces. + Keep the repository valid and preserve a working local smoke flow. +EOF + +cat > "${PR11_WORKSPACE}/pr11_evolve_demo.py" < "${PR12_WORKSPACE}/pr12_optimize_demo.py" <> summarize + +print(g.to_json()) +EOF + +echo "[verify] submitting PR11 evolve demo via detached run" +AF run "${PR11_WORKSPACE}/pr11_evolve_demo.py" -d --output json > "${ARTIFACT_DIR}/pr11.run.json" +PR11_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr11.run.json" 'data["id"]')" +echo "[verify] PR11 run id: ${PR11_RUN_ID}" + +wait_for_condition \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.process.json" \ + 'len(data.get("evolution_progress", [])) >= 3 and any(event.get("stage") == "optimizer" for event in data.get("evolution_progress", []))' \ + 900 +AF status "$PR11_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr11.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR11_RUN_ID" \ + "${ARTIFACT_DIR}/pr11.status.latest.json" \ + "${ARTIFACT_DIR}/pr11.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR11_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr11.status.latest.json" 'data["status"]')" + +echo "[verify] submitting PR12 optimization demo via detached run" +AF run "${PR12_WORKSPACE}/pr12_optimize_demo.py" -d --output json > "${ARTIFACT_DIR}/pr12.run.json" +PR12_RUN_ID="$(json_field "${ARTIFACT_DIR}/pr12.run.json" 'data["id"]')" +echo "[verify] PR12 run id: ${PR12_RUN_ID}" + +wait_for_condition \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.process.json" \ + 'bool(data.get("optimization")) and any(event.get("type") == "optimization_optimizer_started" for event in data.get("events", []))' \ + 900 +AF status "$PR12_RUN_ID" --output summary > "${ARTIFACT_DIR}/pr12.status.process.summary.txt" + +best_effort_terminal_snapshot \ + "$PR12_RUN_ID" \ + "${ARTIFACT_DIR}/pr12.status.latest.json" \ + "${ARTIFACT_DIR}/pr12.status.latest.summary.txt" \ + "$VERIFY_LATEST_TIMEOUT_SECONDS" +PR12_LATEST_STATUS="$(json_field "${ARTIFACT_DIR}/pr12.status.latest.json" 'data["status"]')" + +cat > "${ARTIFACT_DIR}/verification_report.txt" < None: + repo_root = Path(__file__).resolve().parents[1] + script_path = repo_root / "scripts" / "verify_async_codex.sh" + + assert script_path.exists() + + completed = subprocess.run( + ["bash", str(script_path), "--help"], + capture_output=True, + text=True, + check=False, + ) + + assert completed.returncode == 0 + assert "PR11" in completed.stdout + assert "PR12" in completed.stdout + assert "OPENAI_API_KEY" in completed.stdout From fb5f586ac9e53588c06589461a4bb3798b2a07d6 Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 14 Apr 2026 15:45:20 +0000 Subject: [PATCH 8/9] chore: stop tracking private reference docs --- reference/async.md | 812 --------------------------------------------- 1 file changed, 812 deletions(-) delete mode 100644 reference/async.md diff --git a/reference/async.md b/reference/async.md deleted file mode 100644 index f552299..0000000 --- a/reference/async.md +++ /dev/null @@ -1,812 +0,0 @@ -# AgentFlow Async 主线需求分析与实现说明 - -本文档基于 **2026-04-14 当前 `master`** 的代码状态撰写。 - -和前一版不同的是: - -- PR11 已经在主线上。 -- PR12 也已经在主线上。 -- 因此这里不再讨论“要不要先把 PR12 合并进来”,而是直接基于 **已包含 PR11 + PR12 的 master** 分析老板需求,并记录这条主线任务在 `feat/async-status-masterline` worktree 分支里的实现结果。 - ---- - -## 1. 老板需求的真实含义 - -老板原始要求可以拆成四个不可分割的子目标: - -1. `agentflow run pipeline.py -d` 必须返回一个稳定的任务 id。 -2. CLI 退出后,任务必须继续真实执行,不能是假 detach。 -3. `agentflow status ` 必须能查看运行中的任务,而不是只看最终结果。 -4. `status` 必须能显示 PR11 / PR12 对应的过程信息,而不是只有普通节点的 completed/failed。 - -这四个目标里,最容易被误判的是第 2 点。 - -如果只是把当前 `run` 命令里的 `wait()` 去掉,CLI 虽然会立刻退出,但 `Orchestrator.submit()` 当前起的是 **daemon thread**,线程生命周期依附提交它的 Python 进程;进程一退出,run 也会死掉。这种做法不满足老板要的 async,只能算假象。 - -所以这次任务不是“给 CLI 加一个 `-d` 参数”那么简单,而是要把 AgentFlow 从“前台同步命令”提升成“支持后台宿主、支持 detached 提交、支持按 run_id 查看过程”的系统。 - ---- - -## 2. 基于 master 的现状分析 - -### 2.1 已经存在的能力 - -当前 `master` 已经具备下面这些关键基础设施: - -- `RunStore` - - 目录:`.agentflow/runs//` - - 产物:`run.json`、`events.jsonl`、artifacts - - 支持进程重启后从磁盘重新加载已有 runs -- `Orchestrator` - - `submit()` 创建 run - - `run()` 异步调度节点 - - `cancel()` / `rerun()` 已存在 -- Web/API 层 - - `POST /api/runs` - - `GET /api/runs/{id}` - - `GET /api/runs/{id}/events` - - `GET /api/runs/{id}/stream` - - `POST /api/runs/{id}/cancel` - - `POST /api/runs/{id}/rerun` -- CLI 已有 - - `runs` - - `show` - - `cancel` - - `rerun` - - `serve` - -这意味着本次任务不需要从零发明 run store、event model 或 status 数据结构。 - -### 2.2 当前真正缺的能力 - -即便上述能力都在,当前 `master` 在老板的需求上仍然有三个核心缺口: - -#### 缺口 A:`run` 默认是同步阻塞 - -`agentflow/cli.py` 里的 `run()` 仍然会: - -1. load pipeline -2. `orchestrator.submit(pipeline)` -3. `orchestrator.wait(run_id)` -4. 等完成后再输出结果 - -因此当前 `run` 不是“提交任务”,而是“提交并等待完成”。 - -#### 缺口 B:没有真正的 detached execution host - -`Orchestrator.submit()` 虽然起后台线程,但线程跟着当前 CLI 进程走。 - -也就是说: - -- 当前没有真正的本地 daemon / background host -- CLI 退出后,提交出去的 run 也会跟着退出 - -#### 缺口 C:没有真正的 `status` - -`show` 只能看静态 run 摘要,不是过程视图。 - -它无法完整表达: - -- 最近事件时间线 -- 正在跑哪些节点 -- 当前进度 -- optimization session / round / child runs -- evolution 过程阶段 - -因此老板要的 `status` 必须是新增语义,而不是把 `show` 改个名字。 - ---- - -## 3. PR11 / PR12 在 master 中的地位 - -### 3.1 PR11:tuned-agent evolution 已在主线 - -PR11 给主线带来了: - -- `agentflow/tuned_agents.py` -- `run_evolution_from_payload()` -- `agentflow.dl.evolve()` / `agentflow evolve` -- tuned agent registry / profile / evolution 流程 - -但是 master 在本次任务前的一个明显不足是: - -- evolution 虽然能跑 -- 但 pipeline 中 evolve 节点的内部阶段并不会被 `status` 友好展示 - -具体来说,原始 master 只能看到 evolve 节点“在跑”或“结束”,看不到: - -- 第几次 attempt -- 现在在 optimizer / build / test / smoke 哪一步 -- 某一步失败的原因 - -所以老板说“需要能显示 PR11 的过程”时,实际要求的是 **evolution 运行时需要有结构化进度信号**。 - -### 3.2 PR12:graph optimization 已在主线 - -PR12 现在已经在 master 中,主线已经具备: - -- `agentflow/graph_optimizer.py` -- `PipelineSpec.optimizer` -- `PipelineSpec.n_run` -- `PipelineSpec.uses_graph_optimizer` -- `RunRecord.optimization_parent_run_id` -- `RunRecord.optimization_round` -- `RunRecord.optimization_session` -- orchestrator 中的 `optimization_*` 事件与 round/session 逻辑 - -这意味着: - -- `status` 不需要“等待 PR12 数据模型以后再做” -- 它现在就应该直接消费这些字段和事件 -- 老板说的“显示 PR12 过程”在当前 master 上是一个**可以直接落地的主线需求** - ---- - -## 4. 推荐方案(现已全部采纳) - -你已经明确要求:**所有我推荐的方案都采纳,不需要再选择。** - -因此这里直接把最终方案写清楚: - -### 4.1 方案总览 - -#### 方案 1:复用 `serve` / FastAPI 作为本地后台宿主 - -这是主线方案,已经采纳并实现。 - -原因: - -- 与现有 `RunStore` / `Orchestrator` / API 完全兼容 -- 不需要再造一套 IPC 协议 -- 与 agent-browser 的 client-daemon 思路一致 -- 比“每个 run 自己 fork 一个孤立子进程”更系统、更可维护 - -#### 方案 2:`status` 直接读 store,而不是强依赖 daemon API - -这也是主线方案,已经采纳并实现。 - -原因: - -- `RunStore` 本来就是持久化真相源 -- 历史 run 不该依赖 daemon 在线 -- 这样可以把“执行宿主”和“状态读取”解耦 - -最终职责划分是: - -- **运行执行**:依赖 daemon -- **状态查看**:优先直接读 store - -#### 方案 3:PR11 进度用结构化 evolution progress 事件表达 - -这也是主线方案,已经采纳并实现。 - -原因: - -- 不改动 standalone `agentflow evolve` 的返回协议 -- 不强迫 evolve 变成独立 orchestrated run -- 只为 pipeline 中的 evolve 节点补上结构化进度轨迹 - -#### 方案 4:PR12 直接基于主线现有 optimization session / round 模型来展示 - -这也是主线方案,已经采纳并实现。 - -原因: - -- master 已经有这些数据 -- 没必要搞第二套 ad hoc 的 round 状态拼装 - ---- - -## 5. 实际实现结果 - -下面是基于当前 `master` 的主线任务,在 `feat/async-status-masterline` worktree 分支里已经完成的内容。 - -### 5.1 Detached execution:`agentflow run -d` - -已实现: - -- `agentflow run pipeline.py -d` -- 走本地 daemon 路径提交 run -- 通过已有 `POST /api/runs` 提交 -- 不再等待 `wait()` -- 立即返回 run record - -实现方式: - -- CLI 增加 daemon helper - - daemon metadata path - - host/port 解析 - - 健康探测 - - 自动启动 `agentflow serve` -- `run -d` 分支提交到 daemon,再直接输出返回的 `RunRecord` - -这样做的意义是: - -- detach 不再是假象 -- run 生命周期不再绑定提交它的那个前台 CLI - -### 5.2 新增 `agentflow status ` - -已实现: - -- 新增 `status` 命令 -- 不依赖 daemon HTTP 读取状态 -- 直接用 `RunStore` 从磁盘读取 `RunRecord` + `events.jsonl` - -`status` 当前支持: - -- summary 输出 -- json-summary 输出 -- 进度统计 -- 最近事件时间线 -- optimization session 可见性 -- evolution progress 可见性 - -它和 `show` 的分工现在是: - -- `show`:静态摘要,保持原语义 -- `status`:过程视图,强调 in-flight 运行态与时间线 - -### 5.3 PR11:evolution progress 可见 - -已实现: - -- `run_evolution_from_payload()` 增加了 **可选** progress callback -- `dsl.evolve()` 生成的 python node 会把 progress 结构化 JSON 写到 stderr -- `status` 会从 node 的 `stderr_lines` 里解析这些 JSON - -使用的稳定字段是: - -- `agentflow_event: "evolution_progress"` -- `stage` -- `attempt` -- 可选 `status` -- 可选 `command` -- 可选 `detail` - -现在 `status` 已经能显示: - -- evolution start -- attempt started -- optimizer started/completed/failed -- build started/completed/failed -- test started/completed/failed -- smoke started/completed/failed -- final success/failed - -并且后续又补上了一个重要细节: - -- evolution progress 在 `status` 中会带 `node_id` - -这样如果一个 run 里有多个 evolve 节点,不会混淆。 - -### 5.4 PR12:optimization session / rounds 可见 - -已实现: - -- `status` summary 会显示 optimization 概况 - - kind - - optimizer - - current round / total rounds - - child run 数量 -- `json-summary` 会把 optimization 相关字段完整带上 - -这意味着老板要的“PR12 过程展示”已经不是纸面支持,而是 CLI process view 的一部分。 - -### 5.5 补上的一个主线健壮性修复 - -基于 master 的 PR12 代码,我额外补了一处必须的健壮性修复: - -- 优化器编辑产物在校验前会被归一化为 child-run 形态: - - `optimizer=None` - - `n_run=1` - -原因是: - -- optimization prompt 允许模型改“iteration controls” -- 但如果模型把 `n_run` 改大,又没带合法 `optimizer` -- 原始 master 会在 schema 校验阶段直接炸掉,导致 optimization session 出现非必要失败 - -这个修复不是“补丁式救火”,而是保证 PR12 主线模型在真实 LLM 编辑场景下能够稳定工作。 - ---- - -## 6. 涉及的核心代码点 - -本次主线任务实际涉及的重点文件如下: - -### 6.1 CLI / daemon / status - -- `agentflow/cli.py` - - detached daemon helpers - - `run -d` - - `status` - - status summary / json-summary 渲染 - - evolution progress 解析与展示 - -### 6.2 PR11 evolution 进度 - -- `agentflow/tuned_agents.py` - - `run_evolution_from_payload(..., progress=...)` -- `agentflow/dsl.py` - - `evolve()` 生成的 python node stderr progress emission - -### 6.3 PR12 optimization 健壮性 - -- `agentflow/graph_optimizer.py` - - child pipeline loading / normalization -- `agentflow/orchestrator.py` - - optimization round 中改为使用归一化后的 child pipeline loader -- `tests/test_graph_optimizer.py` - - 覆盖 `n_run` 被模型改动时仍能继续 optimization session - -### 6.4 测试 - -- `tests/test_cli.py` - - detach 提交 - - status summary/json - - optimization visibility - - evolution progress visibility -- `tests/test_tuned_agents.py` - - progress callback -- `tests/test_graph_optimizer.py` - - optimization normalization - ---- - -## 7. 为什么这条实现是“完整主线”,不是 ad hoc 修补 - -你明确要求: - -1. 绝对不能有任何简化 -2. 不能考虑 ad hoc 式的修修补补 -3. 必须真实理解老板的需求,完整且正确地实现老板的所有需求 - -这次实现满足这三条的原因是: - -### 7.1 没有走“假 detach” - -没有简单地删掉 `wait()`,而是把 detach 建立在真实后台宿主上。 - -### 7.2 没有让 `status` 依赖脆弱的瞬时进程态 - -`status` 直接读 `RunStore`,所以它不是“连上某个还活着的 daemon 才能看见状态”。 - -### 7.3 没有回避 PR11 / PR12 过程可见性 - -- PR11:补了 evolution progress schema -- PR12:直接消费主线 optimization session / round 模型 - -没有用“先做个普通 status,PR11/PR12 以后再说”的简化路线。 - -### 7.4 没有把主线问题留给模型偶然行为 - -PR12 的 optimizer-edited child pipeline normalization 修复,就是为了避免把主线稳定性寄托在“模型正好别把 `n_run` 改坏”这种偶然条件上。 - ---- - -## 8. 本次任务总结 - -这次任务我是按下面的顺序完成的: - -1. 先基于最新 `master` 重新建立隔离 worktree,而不是继续沿用旧的、基于 PR12 合并前状态的 worktree。 -2. 将之前已经验证通过的 detached run、`status`、evolution progress 主线功能迁移到基于当前 master 的新分支。 -3. 检查 master 当前已合入的 PR12 代码,补上 optimization child pipeline normalization 的健壮性修复,确保 PR12 在真实主线场景下稳定。 -4. 重写本分析文档,使其明确以“PR12 已合入 master”为新基线,不再保留旧分析里的前置假设。 -5. 最后基于当前分支重新执行聚焦验证,确保: - - PR12 optimization tests 通过 - - detached run tests 通过 - - `status` 过程视图 tests 通过 - - PR11 evolution progress tests 通过 - -在这份文档更新时,我还实际运行了 Codex-only 验证脚本: - -- command: - - `VERIFY_LATEST_TIMEOUT_SECONDS=5 bash scripts/verify_async_codex.sh /tmp/agentflow-async-verify-clean3` -- observed PR11 key output: - - `Evolution progress:` - - `evolve_codex: start (attempt 1)` - - `evolve_codex: attempt started (attempt 1)` - - `evolve_codex: optimizer started (attempt 1, command=optimizer)` -- observed PR12 key output: - - `Optimization: graph optimizer=codex round 1/2 child_runs=1` - - `optimization_round_started` - - `optimization_child_run_created` - - `optimization_round_completed` - - `optimization_optimizer_started` - -当前实现保存在: - -- worktree: `/data/berabuddies/agentflow/.worktrees/async-status-masterline` -- branch: `feat/async-status-masterline` - -我没有把任何内容合并回 `master`。 - ---- - -## 9. 你应该如何验证功能是正确的 - -下面给出一套 **可以直接复制粘贴执行** 的验证手册。 - -### 9.1 进入 worktree - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline -``` - -### 9.2 自动化验证:PR12 / detach / status / PR11 progress - -直接执行下面这组命令: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline - -pytest tests/test_graph_optimizer.py -q - -pytest tests/test_tuned_agents.py::test_run_evolution_from_payload_reports_progress -q - -pytest \ - tests/test_cli.py::test_run_detach_submits_without_waiting \ - tests/test_cli.py::test_run_detach_uses_daemon_env_overrides \ - tests/test_cli.py::test_status_command_exits_for_missing_run \ - tests/test_cli.py::test_status_command_renders_summary_with_recent_events \ - tests/test_cli.py::test_status_command_supports_json_summary_output \ - tests/test_cli.py::test_status_command_shows_optimization_session \ - tests/test_cli.py::test_status_command_renders_evolution_progress \ - tests/test_cli.py::test_status_command_returns_evolution_progress_json \ - -q -``` - -你预期会看到: - -- `tests/test_graph_optimizer.py` 全绿 -- `test_run_evolution_from_payload_reports_progress` 绿 -- 上面 8 个 CLI focused tests 全绿 - -### 9.3 手工验证:`run -d` + `status` - -下面这组命令会创建一个完全本地、无外部 LLM 依赖的 demo pipeline,用 `python` utility node 来验证 detached run 和 status。 - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline - -export AGENTFLOW_RUNS_DIR="$PWD/.tmp-agentflow-runs" -rm -rf "$AGENTFLOW_RUNS_DIR" -mkdir -p "$AGENTFLOW_RUNS_DIR" - -export AGENTFLOW_DAEMON_PORT="$(python3 - <<'PY' -import socket -s = socket.socket() -s.bind(("127.0.0.1", 0)) -print(s.getsockname()[1]) -s.close() -PY -)" - -cat > /tmp/agentflow-async-demo.json <<'EOF' -{ - "name": "async-demo", - "working_dir": ".", - "nodes": [ - { - "id": "slow", - "agent": "python", - "prompt": "import time; time.sleep(3); print('slow done')" - }, - { - "id": "done", - "agent": "python", - "depends_on": ["slow"], - "prompt": "print('done node')" - } - ] -} -EOF - -RUN_ID="$( - agentflow run /tmp/agentflow-async-demo.json -d --output json \ - | python3 -c 'import sys, json; print(json.load(sys.stdin)["id"])' -)" - -echo "RUN_ID=$RUN_ID" - -agentflow status "$RUN_ID" --output summary - -sleep 1 -agentflow status "$RUN_ID" --output json-summary - -until agentflow status "$RUN_ID" --output json \ - | python3 -c 'import sys, json; d=json.load(sys.stdin); import sys as _s; print(d["status"]); _s.exit(0 if d["status"]=="completed" else 1)' -do - sleep 1 -done - -agentflow status "$RUN_ID" --output summary -``` - -你应该能验证到: - -- `run -d` 立即返回 `RUN_ID` -- `status` 在 run 完成前能看到 in-flight 状态 -- 最终 run 能完成 -- `status` summary 会展示进度和最近事件 - -### 9.4 查看当前分支提交 - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline -git log --oneline --decorate -8 -``` - -你应该至少能看到这几类提交: - -- detached daemon-backed run submission -- run status process view -- evolution progress in run status -- normalize optimizer-edited child pipelines - ---- - -## 10. 最终结论 - -基于当前已经合并 PR12 的 `master`,老板要的需求已经可以被完整地理解为: - -- **真实 detached run** -- **真实按 run_id 查询的 process-oriented status** -- **PR11 evolution 过程可见** -- **PR12 graph optimization 过程可见** - -这条主线实现现在已经在 `feat/async-status-masterline` worktree 分支中完成,并且附带了可复制执行的自动化与手工验证手册,可直接用于你的 review。 - ---- - -## 11. Codex-Only Validation Runbook - -This runbook is the version you should actually use for local proof when preparing the PR. - -It assumes: - -- you only have Codex -- `OPENAI_API_KEY` is already exported -- if you use a custom gateway, `OPENAI_BASE_URL` and/or `AGENTFLOW_OPENAI_BASE_URL` are already exported - -### 11.1 One-command validation - -Run this from the masterline worktree: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline -bash scripts/verify_async_codex.sh -``` - -What the script does: - -1. Creates a local PR11 validation workspace with a tiny local tuner profile and a tiny local git repo. -2. Creates a PR11 pipeline that uses: - - one real Codex node - - one `evolve()` node -3. Runs the pipeline via `run -d`, polls `status`, and waits until evolution progress is visible. -4. Creates a PR12 pipeline that uses: - - Python utility nodes for deterministic child-run execution - - `optimizer="codex"` and `n_run=2` -5. Runs the pipeline via `run -d`, polls `status`, and waits until optimization process events are visible. -6. Saves all key outputs under a local artifact directory and prints the exact file paths. -7. Captures a best-effort latest status snapshot after the process markers appear. The script does not depend on the remote model finishing the entire workflow before it can prove PR11 / PR12 process visibility. - -### 11.2 Key output files - -After the script finishes, the key outputs will be under: - -```bash -/data/berabuddies/agentflow/.worktrees/async-status-masterline/.tmp/verify_async_codex/artifacts -``` - -The important files are: - -- `pr11.run.json` -- `pr11.status.process.summary.txt` -- `pr11.status.latest.summary.txt` -- `pr11.status.latest.json` -- `pr12.run.json` -- `pr12.status.process.summary.txt` -- `pr12.status.latest.summary.txt` -- `pr12.status.latest.json` -- `verification_report.txt` - -### 11.3 What is the key proof output - -For **PR11**, the key proof is: - -- `pr11.status.process.summary.txt` - -This file must show: - -- `Evolution progress:` -- at least the early live progress lines such as: - - `start` - - `attempt started` - - `optimizer started` - -For **PR12**, the key proof is: - -- `pr12.status.process.summary.txt` - -This file must show: - -- `Optimization:` -- optimizer / round information -- recent optimization events such as `optimization_round_started` and `optimization_optimizer_started` - -### 11.4 Commands you should run before taking screenshots - -Run these commands exactly: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline - -cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt - -cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt - -cat .tmp/verify_async_codex/artifacts/verification_report.txt -``` - -### 11.5 What to screenshot - -Please take screenshots of: - -1. The full terminal output of: - -```bash -cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt -``` - -Important visible text: - -- `Run ...` -- `Evolution progress:` -- at least one optimizer/build/smoke/final line - -2. The full terminal output of: - -```bash -cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt -``` - -Important visible text: - -- `Run ...` -- `Optimization:` -- recent `optimization_*` event lines - -3. The full terminal output of: - -```bash -cat .tmp/verify_async_codex/artifacts/verification_report.txt -``` - -Important visible text: - -- PR11 run id -- PR11 latest status -- PR12 run id -- PR12 latest status -- the artifact file paths - -### 11.6 Optional manual spot checks - -If you want one more direct check after the script has finished: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline - -python3 - <<'PY' -import json -from pathlib import Path - -root = Path(".tmp/verify_async_codex/artifacts") -pr11 = json.loads((root / "pr11.status.latest.json").read_text()) -pr12 = json.loads((root / "pr12.status.latest.json").read_text()) - -print("PR11 status:", pr11["status"]) -print("PR11 evolution progress entries:", len(pr11.get("evolution_progress", []))) -print("PR12 status:", pr12["status"]) -print("PR12 optimization object present:", bool(pr12.get("optimization"))) -print("PR12 optimization events:", [event["type"] for event in pr12.get("events", []) if event["type"].startswith("optimization_")][:10]) -PY -``` - -Expected: - -- `PR11 evolution progress entries:` greater than 0 -- `PR12 optimization object present: True` -- `PR12 optimization events:` contains optimization event types - ---- - -## 12. PR Title And Description - -### PR Title - -Add daemon-backed detached runs and process-oriented status reporting - -### PR Description - -```markdown -## Summary - -- add daemon-backed detached submission for `agentflow run ... -d` -- add a new `agentflow status ` process view backed by the persistent run store -- surface PR11 evolution progress in `status` -- surface PR12 optimization session / round progress in `status` -- harden optimizer-edited child pipeline loading during graph optimization - -## What I ran locally - -### PR11 process visibility - -Pipeline: -- `pr11_evolve_demo.py` generated by `scripts/verify_async_codex.sh` - -Command: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline -bash scripts/verify_async_codex.sh -cat .tmp/verify_async_codex/artifacts/pr11.status.process.summary.txt -``` - -Result: -- detached run submission succeeded -- `status` showed `Evolution progress:` -- live evolution progress lines were visible -- at minimum: `start`, `attempt started`, and `optimizer started` -- a latest status snapshot was recorded in `pr11.status.latest.json` - -[Screenshot location] - -### PR12 process visibility - -Pipeline: -- `pr12_optimize_demo.py` generated by `scripts/verify_async_codex.sh` - -Command: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline -bash scripts/verify_async_codex.sh -cat .tmp/verify_async_codex/artifacts/pr12.status.process.summary.txt -``` - -Result: -- detached run submission succeeded -- `status` showed `Optimization:` -- optimization round / child run / optimizer progress was visible in recent events -- a latest status snapshot was recorded in `pr12.status.latest.json` - -[Screenshot location] - -### Focused automated verification - -Commands: - -```bash -cd /data/berabuddies/agentflow/.worktrees/async-status-masterline - -pytest tests/test_graph_optimizer.py -q -pytest tests/test_tuned_agents.py::test_run_evolution_from_payload_reports_progress -q -pytest \ - tests/test_cli.py::test_run_detach_submits_without_waiting \ - tests/test_cli.py::test_run_detach_uses_daemon_env_overrides \ - tests/test_cli.py::test_status_command_exits_for_missing_run \ - tests/test_cli.py::test_status_command_renders_summary_with_recent_events \ - tests/test_cli.py::test_status_command_supports_json_summary_output \ - tests/test_cli.py::test_status_command_shows_optimization_session \ - tests/test_cli.py::test_status_command_renders_evolution_progress \ - tests/test_cli.py::test_status_command_returns_evolution_progress_json \ - -q -``` - -Result: -- all focused tests passed -``` From dacba8a7c438ac25f1bf9ac641748be12347a7c9 Mon Sep 17 00:00:00 2001 From: Jacob Date: Tue, 14 Apr 2026 15:52:51 +0000 Subject: [PATCH 9/9] chore: ignore private superpowers docs --- .gitignore | 3 +- .../plans/2026-04-13-async-status-daemon.md | 266 ------------------ .../2026-04-13-async-status-daemon-design.md | 57 ---- 3 files changed, 2 insertions(+), 324 deletions(-) delete mode 100644 docs/superpowers/plans/2026-04-13-async-status-daemon.md delete mode 100644 docs/superpowers/specs/2026-04-13-async-status-daemon-design.md diff --git a/.gitignore b/.gitignore index 39c0ef8..da58962 100644 --- a/.gitignore +++ b/.gitignore @@ -4,10 +4,11 @@ __pycache__/ .venv/ .agentflow/ reference/ +docs/superpowers/ node_modules/ playwright-report/ test-results/ .e2e-flaky dist/ *.egg-info/ -.worktrees/ \ No newline at end of file +.worktrees/ diff --git a/docs/superpowers/plans/2026-04-13-async-status-daemon.md b/docs/superpowers/plans/2026-04-13-async-status-daemon.md deleted file mode 100644 index e0f993c..0000000 --- a/docs/superpowers/plans/2026-04-13-async-status-daemon.md +++ /dev/null @@ -1,266 +0,0 @@ -# Async Status Daemon Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** Add detached AgentFlow execution with `agentflow run ... -d`, a new `agentflow status ` process view, PR11 evolution progress visibility, and PR12 graph optimization runtime support in an isolated worktree branch. - -**Architecture:** Detached runs will be submitted to a long-lived local daemon built on the existing FastAPI app and `Orchestrator`, while `status` will read the persistent run store directly. PR12 graph optimization support will be ported into this branch so optimization runs generate first-class round/session data, and PR11 evolution will emit structured progress lines for status rendering. - -**Tech Stack:** Python, Typer, FastAPI, httpx, pytest, existing `RunStore`/`Orchestrator` runtime. - ---- - -### Task 1: Port PR12 Graph Optimization Runtime - -**Files:** -- Create: `agentflow/graph_optimizer.py` -- Modify: `agentflow/specs.py` -- Modify: `agentflow/orchestrator.py` -- Test: `tests/test_graph_optimizer.py` - -- [ ] **Step 1: Add the failing graph optimization tests** - -Port the PR12 graph-optimization tests into a new test file covering: -- successful multi-round optimization -- retry on invalid optimized pipeline -- failure after exhausting optimizer validation attempts - -Run: - -```bash -pytest tests/test_graph_optimizer.py -q -``` - -Expected: FAIL because the current branch does not define graph-optimization runtime support. - -- [ ] **Step 2: Add graph optimization data model support** - -Implement the `PipelineSpec` and `RunRecord` fields needed for graph optimization: -- `optimizer` -- `n_run` -- `uses_graph_optimizer` -- `optimization_parent_run_id` -- `optimization_round` -- `optimization_session` - -Run: - -```bash -pytest tests/test_graph_optimizer.py -q -``` - -Expected: FAIL later in orchestrator/runtime paths instead of schema/attribute errors. - -- [ ] **Step 3: Add graph optimization runtime helpers and orchestrator flow** - -Port the PR12 runtime pieces: -- editable pipeline artifact generation -- graph report generation -- optimization round directories -- child run creation -- round/session events -- optimizer retry/accept/failure flow - -Run: - -```bash -pytest tests/test_graph_optimizer.py -q -``` - -Expected: PASS. - -- [ ] **Step 4: Commit the graph optimization runtime** - -```bash -git add agentflow/graph_optimizer.py agentflow/specs.py agentflow/orchestrator.py tests/test_graph_optimizer.py -git commit -m "feat: port graph optimization runtime support" -``` - -### Task 2: Add Detached Daemon Submission - -**Files:** -- Modify: `agentflow/cli.py` -- Modify: `tests/test_cli.py` -- Test: `tests/test_cli.py` - -- [ ] **Step 1: Add failing CLI tests for detached run submission** - -Add tests for: -- `agentflow run pipeline.py -d` returning a queued/running `run_id` without waiting -- auto-start or reuse of the local daemon client path -- output shape for summary/json/json-summary detached results - -Run: - -```bash -pytest tests/test_cli.py -q -k "detach or daemon" -``` - -Expected: FAIL because `run` has no detach mode and no daemon submission helpers. - -- [ ] **Step 2: Implement daemon metadata and ensure-daemon helpers** - -Add CLI-side helpers that: -- compute a daemon metadata file path per `runs_dir` -- probe health on the configured host/port -- start `agentflow serve` in the background when needed -- wait until the daemon is reachable - -- [ ] **Step 3: Implement `run -d`** - -Update CLI `run` so: -- default mode keeps existing synchronous behavior -- `-d/--detach` loads the pipeline, ensures the daemon, submits via HTTP, prints the returned run record summary, and exits without waiting - -Run: - -```bash -pytest tests/test_cli.py -q -k "detach or daemon" -``` - -Expected: PASS. - -- [ ] **Step 4: Commit detached submission** - -```bash -git add agentflow/cli.py tests/test_cli.py -git commit -m "feat: add detached daemon-backed run submission" -``` - -### Task 3: Add `agentflow status ` - -**Files:** -- Modify: `agentflow/cli.py` -- Modify: `tests/test_cli.py` -- Test: `tests/test_cli.py` - -- [ ] **Step 1: Add failing tests for `status`** - -Add tests covering: -- missing run handling -- summary rendering for in-flight runs -- JSON summary payload including events and active node progress -- PR12 optimization-session visibility - -Run: - -```bash -pytest tests/test_cli.py -q -k "status_command or run_status" -``` - -Expected: FAIL because there is no `status` command or status renderer. - -- [ ] **Step 2: Implement status builders and renderers** - -Add new helpers that: -- load run + events from `RunStore` -- render process-oriented summaries -- include event timeline slices -- surface optimization session and round info when present - -- [ ] **Step 3: Add the `status` command** - -Implement a new Typer command: -- `agentflow status ` -- uses direct store reads rather than daemon queries for persistent inspection -- supports existing output styles plus richer JSON summary - -Run: - -```bash -pytest tests/test_cli.py -q -k "status_command or run_status" -``` - -Expected: PASS. - -- [ ] **Step 4: Commit status command** - -```bash -git add agentflow/cli.py tests/test_cli.py -git commit -m "feat: add run status process view" -``` - -### Task 4: Add PR11 Evolution Progress Visibility - -**Files:** -- Modify: `agentflow/tuned_agents.py` -- Modify: `agentflow/dsl.py` -- Modify: `agentflow/cli.py` -- Modify: `tests/test_tuned_agents.py` -- Modify: `tests/test_cli.py` - -- [ ] **Step 1: Add failing tests for evolution progress reporting** - -Add tests covering: -- `run_evolution_from_payload()` progress callback/stage notifications -- `dsl.evolve()` generated node code emitting structured progress lines -- `status` rendering evolution stage lines from run artifacts/events or trace-derived data - -Run: - -```bash -pytest tests/test_tuned_agents.py tests/test_cli.py -q -k "evolution_progress or evolve_status" -``` - -Expected: FAIL because no structured evolution progress reporting exists. - -- [ ] **Step 2: Implement progress callback support in tuned agent evolution** - -Update `run_evolution_from_payload()` to report: -- start -- attempt start -- optimizer/build/test/smoke start and completion/failure -- final success/failure - -- [ ] **Step 3: Emit progress lines from pipeline-driven evolve nodes** - -Update `dsl.evolve()` generated Python code so pipeline execution emits structured progress lines to stderr while preserving the final stdout JSON result. - -- [ ] **Step 4: Surface evolution progress in status rendering** - -Teach status rendering to recognize and show evolution phase lines from the run’s stored trace/stderr data. - -Run: - -```bash -pytest tests/test_tuned_agents.py tests/test_cli.py -q -k "evolution_progress or evolve_status" -``` - -Expected: PASS. - -- [ ] **Step 5: Commit evolution progress support** - -```bash -git add agentflow/tuned_agents.py agentflow/dsl.py agentflow/cli.py tests/test_tuned_agents.py tests/test_cli.py -git commit -m "feat: surface evolution progress in run status" -``` - -### Task 5: End-to-End Verification - -**Files:** -- Modify: `tests/test_api.py` -- Test: `tests/test_api.py` -- Test: `tests/test_graph_optimizer.py` -- Test: `tests/test_tuned_agents.py` -- Test: `tests/test_cli.py` - -- [ ] **Step 1: Add or adjust integration tests if needed** - -Add focused API/integration coverage only where the earlier tasks reveal missing protection, especially around detached run submission paths and optimization session payload shape. - -- [ ] **Step 2: Run the focused verification suite** - -Run: - -```bash -pytest tests/test_api.py tests/test_graph_optimizer.py tests/test_tuned_agents.py tests/test_cli.py -q -``` - -Expected: PASS for all tests added or touched by this feature. Pre-existing unrelated failures elsewhere in the repo are explicitly out of scope. - -- [ ] **Step 3: Commit verification-only follow-ups** - -```bash -git add tests/test_api.py tests/test_graph_optimizer.py tests/test_tuned_agents.py tests/test_cli.py -git commit -m "test: cover async status daemon flows" -``` diff --git a/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md b/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md deleted file mode 100644 index e567990..0000000 --- a/docs/superpowers/specs/2026-04-13-async-status-daemon-design.md +++ /dev/null @@ -1,57 +0,0 @@ -# Async Status Daemon Design - -**Problem** - -`agentflow run` currently submits a run and then blocks until completion in the foreground CLI process. `Orchestrator.submit()` already schedules background work, but the worker thread is a daemon thread owned by the current process, so `agentflow run -d` cannot work by simply skipping `wait()`. The process would exit and the run would die. - -**User-facing goal** - -- `agentflow run pipeline.py -d` submits a run, returns a stable `run_id`, and exits immediately. -- `agentflow status ` shows in-flight progress, not only the final result. -- The status view must be able to show PR11 evolution process details and PR12 graph-optimization process details. - -**Design** - -1. Detached execution uses a long-lived local daemon based on the existing FastAPI app and `Orchestrator`, rather than a per-run child process. The CLI becomes a thin client for detached submission. -2. The daemon lifecycle is managed in-process by new CLI helpers that persist a small daemon metadata file next to the run store and auto-start `agentflow serve` in the background when needed. -3. `status` is a new CLI command. It reads the persistent run store directly for run data and events, so historical runs remain inspectable even if the daemon is offline. -4. PR12 support is brought into this branch by porting the graph optimization runtime model: `optimization_session`, parent/child run relationships, round events, and child-run bookkeeping. -5. PR11 process visibility is added by instrumenting `run_evolution_from_payload()` with progress callbacks and making the generated `dsl.evolve()` Python node emit structured progress lines to stderr. `status` interprets those lines into an evolution timeline. - -**Scope** - -- Implement detached run submission. -- Implement daemon auto-start helpers and metadata. -- Implement `status`. -- Port PR12 graph optimization support. -- Add PR11 evolution progress reporting for pipeline-driven evolution nodes. - -**Out of scope** - -- Reworking `agentflow evolve` into a fully orchestrated standalone run type with its own detached lifecycle. -- Merging the worktree branch back to `master`. - -**Architecture** - -- CLI detached submission: - - Ensure daemon running for a given `runs_dir`. - - Submit pipeline via existing HTTP `POST /api/runs`. - - Print queued/running run summary with `run_id`. -- CLI status: - - Build `RunStore(runs_dir)`. - - Load `RunRecord` + `events.jsonl`. - - Render a richer process summary and JSON payload. -- Graph optimization: - - Port the PR12 `optimization_session` fields, round directories, and optimization events so status has first-class data to render. -- Evolution process: - - Add a callback-based progress API in `agentflow.tuned_agents`. - - Emit structured phase lines from generated evolve-node Python code. - - Parse and surface them in status output. - -**Testing** - -- Add targeted CLI tests for `run -d`, daemon reuse, and `status`. -- Add targeted API/daemon helper tests where useful. -- Port/enable graph optimization tests from PR12. -- Add evolution progress/status tests for PR11-related process display. -