diff --git a/README.md b/README.md index 9a762c3..5900426 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # AgentFlow -Orchestrate codex, claude, and kimi agents in dependency graphs with parallel fanout, iterative cycles, and remote execution on SSH/EC2/ECS. +Orchestrate codex, claude, kimi, and gemini agents in dependency graphs with parallel fanout, iterative cycles, and remote execution on SSH/EC2/ECS. ![AgentFlow Graph](docs/graph.png) *94-node pipeline: plan → 64 workers → 8 batch merges → 16 reviews → 4 review merges → synthesis* diff --git a/agentflow/__init__.py b/agentflow/__init__.py index 13bca4a..a067acb 100644 --- a/agentflow/__init__.py +++ b/agentflow/__init__.py @@ -6,6 +6,7 @@ claude, codex, fanout, + gemini, kimi, merge, python_node, @@ -26,6 +27,7 @@ def create_app(*args, **kwargs): "claude", "codex", "fanout", + "gemini", "kimi", "merge", "python_node", diff --git a/agentflow/agents/gemini.py b/agentflow/agents/gemini.py new file mode 100644 index 0000000..ae8b68f --- /dev/null +++ b/agentflow/agents/gemini.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import os +from pathlib import Path + +from agentflow.agents.base import AgentAdapter +from agentflow.env import merge_env_layers +from agentflow.prepared import ExecutionPaths, PreparedExecution +from agentflow.specs import NodeSpec, RepoInstructionsMode, ToolAccess + + +class GeminiAdapter(AgentAdapter): + def prepare(self, node: NodeSpec, prompt: str, paths: ExecutionPaths) -> PreparedExecution: + provider = self.provider_config(node.provider, node.agent) + executable = node.executable or "gemini" + + # Use -p for non-interactive (headless) mode; positional prompt + # launches interactive mode which hangs in automation. + command = [ + executable, + "-p", + prompt, + "--output-format", + "stream-json", + ] + + # Permission flags: map tools access to Gemini's approval model. + # --approval-mode plan = read-only (no writes allowed). + # --yolo = auto-approve all tool calls (required for non-interactive write). + if node.tools == ToolAccess.READ_ONLY: + command.extend(["--sandbox", "--approval-mode", "plan"]) + else: + command.extend(["--yolo"]) + + if node.model: + command.extend(["--model", node.model]) + + runtime_files: dict[str, str] = {} + + repo_instructions_ignored = node.repo_instructions_mode == RepoInstructionsMode.IGNORE + if repo_instructions_ignored: + # Gemini reads GEMINI.md for repo instructions; running from runtime dir avoids it + pass + + command.extend(node.extra_args) + + env = merge_env_layers(getattr(provider, "env", None), node.env) + if provider: + if provider.api_key_env: + if provider.api_key_env in env: + api_key = env[provider.api_key_env] + else: + api_key = os.getenv(provider.api_key_env) + if api_key is not None: + env.setdefault("GEMINI_API_KEY", api_key) + + cwd = paths.target_workdir + if repo_instructions_ignored: + cwd = str(Path(paths.target_runtime_dir)) + + return PreparedExecution( + command=command, + env=env, + cwd=cwd, + trace_kind="gemini", + runtime_files=runtime_files, + ) diff --git a/agentflow/agents/registry.py b/agentflow/agents/registry.py index f640431..ae9cf68 100644 --- a/agentflow/agents/registry.py +++ b/agentflow/agents/registry.py @@ -3,6 +3,7 @@ from agentflow.agents.base import AgentAdapter from agentflow.agents.claude import ClaudeAdapter from agentflow.agents.codex import CodexAdapter +from agentflow.agents.gemini import GeminiAdapter from agentflow.agents.kimi import KimiAdapter from agentflow.agents.util import PythonAdapter, ShellAdapter, SyncAdapter from agentflow.specs import AgentKind @@ -14,6 +15,7 @@ def __init__(self) -> None: AgentKind.CODEX: CodexAdapter(), AgentKind.CLAUDE: ClaudeAdapter(), AgentKind.KIMI: KimiAdapter(), + AgentKind.GEMINI: GeminiAdapter(), AgentKind.PYTHON: PythonAdapter(), AgentKind.SHELL: ShellAdapter(), AgentKind.SYNC: SyncAdapter(), diff --git a/agentflow/cli.py b/agentflow/cli.py index 5f5e2e3..cd2d48e 100644 --- a/agentflow/cli.py +++ b/agentflow/cli.py @@ -262,6 +262,8 @@ def _provider_error_subject(pipeline_node: object | None) -> str: return "Claude" if agent_name == "kimi": return "Kimi" + if agent_name == "gemini": + return "Gemini" return "The agent" diff --git a/agentflow/cloud/installer.py b/agentflow/cloud/installer.py index c97fe89..7d24a7e 100644 --- a/agentflow/cloud/installer.py +++ b/agentflow/cloud/installer.py @@ -8,7 +8,7 @@ def agent_install_script(agents: list[str]) -> str: """Return a bash script that installs the requested agent CLIs. - Supported agents: codex, claude, kimi. + Supported agents: codex, claude, kimi, gemini. """ lines = ["#!/bin/bash", "set -euo pipefail", "export DEBIAN_FRONTEND=noninteractive", ""] @@ -36,6 +36,11 @@ def agent_install_script(agents: list[str]) -> str: lines.append("if ! command -v kimi &>/dev/null; then") lines.append(" pip3 install kimi-cli || pip install kimi-cli") lines.append("fi") + elif agent == "gemini": + lines.append("# Install Gemini CLI") + lines.append("if ! command -v gemini &>/dev/null; then") + lines.append(" npm install -g @google/gemini-cli") + lines.append("fi") lines.append("") lines.append("echo 'Agent installation complete'") @@ -58,6 +63,8 @@ def agent_dockerfile(agents: list[str], base_image: str = "ubuntu:24.04") -> str lines.append("RUN npm install -g @anthropic-ai/claude-code") elif agent == "kimi": lines.append("RUN pip3 install kimi-cli") + elif agent == "gemini": + lines.append("RUN npm install -g @google/gemini-cli") lines.append("") lines.append("WORKDIR /workspace") @@ -109,5 +116,9 @@ def agent_auth_setup(agent: str, env: dict[str, str]) -> str: if api_key: parts.append(f"export KIMI_API_KEY={shlex.quote(api_key)}") parts.append(f"export MOONSHOT_API_KEY={shlex.quote(api_key)}") + elif agent == "gemini": + api_key = env.get("GEMINI_API_KEY", "") or env.get("GOOGLE_API_KEY", "") + if api_key: + parts.append(f"export GEMINI_API_KEY={shlex.quote(api_key)}") return " && ".join(parts) if parts else "" diff --git a/agentflow/doctor.py b/agentflow/doctor.py index 592098a..3d6349a 100644 --- a/agentflow/doctor.py +++ b/agentflow/doctor.py @@ -395,6 +395,8 @@ class LocalToolchainReport: codex_version: str | None = None claude_path: str | None = None claude_version: str | None = None + gemini_path: str | None = None + gemini_version: str | None = None detail: str | None = None def as_dict(self) -> dict[str, object]: @@ -422,6 +424,10 @@ def as_dict(self) -> dict[str, object]: payload["claude_path"] = self.claude_path if self.claude_version is not None: payload["claude_version"] = self.claude_version + if self.gemini_path is not None: + payload["gemini_path"] = self.gemini_path + if self.gemini_version is not None: + payload["gemini_version"] = self.gemini_version if self.detail is not None: payload["detail"] = self.detail return payload @@ -572,6 +578,20 @@ def _local_kimi_ready_ok_check_detail(node_id: str, probe_command: str, executio return detail + "." +def _local_gemini_ready_check_detail(node_id: str, executable: str) -> str: + return ( + f"Node `{node_id}` (gemini) cannot launch local Gemini CLI after the node shell bootstrap; " + f"`{executable} --version` fails in the prepared local shell." + ) + + +def _local_gemini_ready_ok_check_detail(node_id: str, executable: str) -> str: + return ( + f"Node `{node_id}` (gemini) can launch local Gemini CLI after the node shell bootstrap; " + f"`{executable} --version` succeeds in the prepared local shell." + ) + + def _local_probe_timeout_detail(node_id: str, agent: str, command_text: str, timeout_seconds: float) -> str: return ( f"Node `{node_id}` ({agent}) cannot finish the local preflight probe after the node shell bootstrap; " @@ -1078,6 +1098,120 @@ def build_pipeline_local_kimi_readiness_info_checks(pipeline: object) -> list[Do return checks +def _prepared_gemini_readiness_execution( + node: object, + pipeline: object | None = None, +) -> tuple[PreparedExecution, object, str] | None: + agent = _status_value(_object_value(node, "agent")).lower() + if agent != AgentKind.GEMINI.value: + return None + + target = _coerce_local_target(_object_value(node, "target")) + if target is None: + return None + + pipeline_workdir = _node_pipeline_workdir(node, pipeline) + paths = build_execution_paths( + base_dir=Path.cwd() / ".agentflow" / "doctor", + pipeline_workdir=pipeline_workdir, + run_id="doctor", + node_id=str(_object_value(node, "id", "gemini")), + node_target=target, + create_runtime_dir=False, + ) + env = merge_env_layers(_object_value(None, "env"), _object_value(node, "env")) + executable = str(_object_value(node, "executable") or "gemini") + prepared = PreparedExecution( + command=[executable, "--version"], + env=env, + cwd=str(paths.host_workdir), + trace_kind="final", + ) + return prepared, paths, executable + + +def _can_launch_local_gemini(node: object, pipeline: object | None = None) -> tuple[bool, str | None, str | None]: + prepared_with_paths = _prepared_gemini_readiness_execution(node, pipeline) + if prepared_with_paths is None: + return True, None, None + + prepared, paths, executable = prepared_with_paths + + try: + launch_plan = LocalRunner().plan_execution( + SimpleNamespace(target=_coerce_local_target(_object_value(node, "target"))), + prepared, + paths, + ) + except (AttributeError, TypeError, ValidationError, ValueError): + return False, executable, None + + env = os.environ.copy() + env.update(launch_plan.env) + try: + result = _run_doctor_subprocess( + launch_plan.command, + check=False, + capture_output=True, + cwd=launch_plan.cwd, + env=env, + text=True, + ) + except OSError: + return False, executable, None + except _DoctorSubprocessTimeout as exc: + return False, executable, _local_probe_timeout_detail( + str(_object_value(node, "id", "gemini")), + AgentKind.GEMINI.value, + exc.command_text, + exc.timeout_seconds, + ) + return result.returncode == 0, executable, None + + +def build_pipeline_local_gemini_readiness_checks(pipeline: object) -> list[DoctorCheck]: + checks: list[DoctorCheck] = [] + for node in _object_value(pipeline, "nodes", []) or []: + agent = _status_value(_object_value(node, "agent")).lower() + if agent != AgentKind.GEMINI.value: + continue + + ready, executable, failure_detail = _can_launch_local_gemini(node, pipeline) + if ready: + continue + + node_id = str(_object_value(node, "id", "gemini")) + checks.append( + DoctorCheck( + name="gemini_ready", + status="failed", + detail=failure_detail or _local_gemini_ready_check_detail(node_id, executable or "gemini"), + ) + ) + return checks + + +def build_pipeline_local_gemini_readiness_info_checks(pipeline: object) -> list[DoctorCheck]: + checks: list[DoctorCheck] = [] + for node in _object_value(pipeline, "nodes", []) or []: + if _prepared_gemini_readiness_execution(node, pipeline) is None: + continue + + ready, executable, failure_detail = _can_launch_local_gemini(node, pipeline) + if not ready: + continue + + node_id = str(_object_value(node, "id", "gemini")) + checks.append( + DoctorCheck( + name="gemini_ready", + status="ok", + detail=failure_detail or _local_gemini_ready_ok_check_detail(node_id, executable or "gemini"), + ) + ) + return checks + + def build_pipeline_local_codex_readiness_checks(pipeline: object) -> list[DoctorCheck]: checks: list[DoctorCheck] = [] for node in _object_value(pipeline, "nodes", []) or []: diff --git a/agentflow/dsl.py b/agentflow/dsl.py index ff0e6dc..71b45d1 100644 --- a/agentflow/dsl.py +++ b/agentflow/dsl.py @@ -359,6 +359,10 @@ def kimi(*, task_id: str, prompt: str, **kwargs: Any) -> NodeBuilder: return _node(AgentKind.KIMI, task_id=task_id, prompt=prompt, **kwargs) +def gemini(*, task_id: str, prompt: str, **kwargs: Any) -> NodeBuilder: + return _node(AgentKind.GEMINI, task_id=task_id, prompt=prompt, **kwargs) + + def python_node(*, task_id: str, code: str, **kwargs: Any) -> NodeBuilder: """Run Python code directly. The ``code`` is executed as ``python3 -c ``.""" return _node(AgentKind.PYTHON, task_id=task_id, prompt=code, **kwargs) diff --git a/agentflow/specs.py b/agentflow/specs.py index 29053e7..b5a1e87 100644 --- a/agentflow/specs.py +++ b/agentflow/specs.py @@ -31,6 +31,7 @@ class AgentKind(StrEnum): CODEX = "codex" CLAUDE = "claude" KIMI = "kimi" + GEMINI = "gemini" PYTHON = "python" SHELL = "shell" SYNC = "sync" @@ -221,6 +222,11 @@ def resolve_provider(value: str | ProviderConfig | None, agent: AgentKind) -> Pr base_url="https://api.anthropic.com", api_key_env="ANTHROPIC_API_KEY", ) + if alias in {"google", "gemini"} and agent == AgentKind.GEMINI: + return ProviderConfig( + name="google", + api_key_env="GEMINI_API_KEY", + ) if alias in {"kimi", "moonshot", "moonshot-ai"}: if agent == AgentKind.CLAUDE: return ProviderConfig( diff --git a/agentflow/traces.py b/agentflow/traces.py index 85f1557..bfaa3e3 100644 --- a/agentflow/traces.py +++ b/agentflow/traces.py @@ -257,6 +257,72 @@ def feed(self, line: str) -> list[NormalizedTraceEvent]: return events +@dataclass(slots=True) +class GeminiTraceParser(BaseTraceParser): + """Parse Gemini CLI ``--output-format stream-json`` NDJSON output. + + The Gemini CLI writes hook execution messages to stdout after the + final ``result`` JSON event. Once we see a result, we stop + accumulating text into ``final_chunks`` so hook noise does not + pollute the extracted output. + """ + + _seen_result: bool = False + + def supports_raw_stdout_fallback(self) -> bool: + return False + + def feed(self, line: str) -> list[NormalizedTraceEvent]: + payload = _json(line) + if payload is None: + text = line.rstrip() + # After the result event, remaining stdout is hook/cleanup noise. + if not self._seen_result: + self.remember(text) + return [self.emit("stdout", "stdout", text, line)] if text else [] + + event_type = payload.get("type") or "gemini" + events: list[NormalizedTraceEvent] = [] + + if event_type == "message": + role = payload.get("role", "") + is_delta = payload.get("delta", False) + text = _stringify(payload.get("content") or payload.get("message") or payload) + if role == "model" or role == "assistant": + self.remember(text) + kind = "assistant_delta" if is_delta else "assistant_message" + events.append(self.emit(kind, "Assistant delta" if is_delta else "Assistant message", text, payload)) + else: + events.append(self.emit("event", str(role or "gemini"), text, payload)) + elif event_type == "result": + self._seen_result = True + text = _stringify(payload.get("content") or payload.get("message") or payload) + if text and text != self.last_message: + self.remember(text) + events.append(self.emit("result", "Result", text, payload)) + elif event_type == "init": + events.append(self.emit("event", "Session init", _stringify(payload), payload)) + elif event_type == "tool_use": + name = payload.get("tool_name") or payload.get("name") or "tool" + events.append(self.emit("tool_call", f"Tool call: {name}", _stringify(payload.get("parameters") or payload.get("input") or payload.get("arguments")), payload)) + elif event_type == "tool_result": + events.append(self.emit("tool_result", "Tool result", _stringify(payload.get("output") or payload.get("content")), payload)) + elif event_type == "error": + events.append(self.emit("error", "Error", _stringify(payload.get("message") or payload.get("error") or payload), payload)) + else: + text = _stringify(payload) + if not self._seen_result and text: + self.remember(text) + events.append(self.emit("event", str(event_type), text, payload)) + return events + + def start_attempt(self, attempt: int) -> None: + self.attempt = attempt + self.final_chunks.clear() + self.last_message = None + self._seen_result = False + + @dataclass(slots=True) class GenericTraceParser(BaseTraceParser): def feed(self, line: str) -> list[NormalizedTraceEvent]: @@ -273,4 +339,6 @@ def create_trace_parser(agent: AgentKind, node_id: str) -> BaseTraceParser: return ClaudeTraceParser(node_id=node_id, agent=agent) case AgentKind.KIMI: return KimiTraceParser(node_id=node_id, agent=agent) + case AgentKind.GEMINI: + return GeminiTraceParser(node_id=node_id, agent=agent) return GenericTraceParser(node_id=node_id, agent=agent) diff --git a/skills/agentflow/SKILL.md b/skills/agentflow/SKILL.md index af02fd8..96fe17e 100644 --- a/skills/agentflow/SKILL.md +++ b/skills/agentflow/SKILL.md @@ -1,11 +1,11 @@ --- name: agentflow -description: Build and run multi-agent pipelines using AgentFlow. Use when the user wants to orchestrate codex, claude, or kimi agents in parallel, in sequence, or in iterative loops. Trigger when the user mentions multi-agent workflows, fan-out tasks, code review pipelines, iterative implementation loops, running agents on EC2/ECS, or any task that needs multiple AI agents coordinated together. Also trigger for "agentflow", "pipeline", "graph of agents", "fanout", "shard", or "run codex on remote". +description: Build and run multi-agent pipelines using AgentFlow. Use when the user wants to orchestrate codex, claude, kimi, or gemini agents in parallel, in sequence, or in iterative loops. Trigger when the user mentions multi-agent workflows, fan-out tasks, code review pipelines, iterative implementation loops, running agents on EC2/ECS, or any task that needs multiple AI agents coordinated together. Also trigger for "agentflow", "pipeline", "graph of agents", "fanout", "shard", or "run codex on remote". --- # AgentFlow -Build multi-agent pipelines where codex, claude, and kimi work together in dependency graphs with parallel fanout, iterative cycles, and remote execution. +Build multi-agent pipelines where codex, claude, kimi, and gemini work together in dependency graphs with parallel fanout, iterative cycles, and remote execution. ## Quick Start @@ -26,14 +26,14 @@ Run: `agentflow run pipeline.py` ## Imports ```python -from agentflow import Graph, codex, claude, kimi # agents +from agentflow import Graph, codex, claude, kimi, gemini # agents from agentflow import fanout, merge # parallel shards from agentflow import shell, python_node, sync # utility nodes ``` ## Nodes -Create agent nodes with `codex()`, `claude()`, or `kimi()`. Required: `task_id`, `prompt`. +Create agent nodes with `codex()`, `claude()`, `kimi()`, or `gemini()`. Required: `task_id`, `prompt`. ```python codex(