-
Notifications
You must be signed in to change notification settings - Fork 270
Add dynamic workflow tool #3426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d91a5e8
181521b
70d18ca
3b947f9
41bef42
83a85ee
5222816
c35e9d5
cd0aeb1
d100fd8
ab75043
9597084
bb8d812
2515653
76f4ee5
c8c50ee
3a480eb
aeafbcb
e33dd39
b722988
05b78ee
da12413
fe4a34e
97ec7a2
4949144
a60cdbe
eb267fa
5bc40d2
8a484b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| """Dynamic workflow tool example. | ||
|
|
||
| This example demonstrates the intended workflow shape: | ||
|
|
||
| 1. The parent agent writes a Python workflow script. | ||
| 2. The parent agent calls the workflow tool with that generated script. | ||
| 3. The workflow fans out sub-agents to audit test coverage by project area. | ||
| 4. A reducer sub-agent summarizes the repo-wide coverage risks. | ||
| """ | ||
|
|
||
| import os | ||
| from pathlib import Path | ||
|
|
||
| from openhands.sdk import LLM, Agent, AgentContext, Conversation, Tool | ||
| from openhands.sdk.context import Skill | ||
| from openhands.sdk.subagent import register_agent_if_absent | ||
| from openhands.tools.delegate import DelegationVisualizer | ||
| from openhands.tools.file_editor import FileEditorTool | ||
| from openhands.tools.terminal import TerminalTool | ||
| from openhands.tools.workflow import WorkflowToolSet | ||
|
|
||
|
|
||
# Model settings come from the environment. LLM_MODEL falls back to "gpt-5.5"
# when unset; LLM_API_KEY and LLM_BASE_URL are passed through (None if unset).
llm = LLM(
    usage_id="dynamic-workflow-demo",
    model=os.environ.get("LLM_MODEL", "gpt-5.5"),
    base_url=os.environ.get("LLM_BASE_URL"),
    api_key=os.environ.get("LLM_API_KEY"),
)
|
|
||
|
|
||
# Sub-agent used by the generated workflow.
def create_coverage_auditor(llm: LLM) -> Agent:
    """Build the coverage-auditing sub-agent.

    Args:
        llm: Language model the sub-agent runs on.

    Returns:
        An ``Agent`` equipped with the terminal and file-editor tools, a
        ``coverage_audit`` skill, and a system-message suffix that asks for a
        concise assessment. The skill text instructs the agent not to edit
        files; nothing in this factory enforces that.
    """
    auditor_tools = [
        Tool(name=TerminalTool.name),
        Tool(name=FileEditorTool.name),
    ]

    # Always-on skill (no trigger) describing how to perform the audit.
    audit_instructions = (
        "You audit whether source code has meaningful test "
        "coverage. Use read-only inspection commands and file "
        "views. Compare source modules against the matching "
        "tests under tests/sdk, tests/tools, tests/workspace, "
        "or tests/agent_server. Identify risky untested "
        "behavior, and recommend the "
        "next tests to add. Use at most three tool calls, "
        "avoid broad dumps, and do not edit files."
    )
    audit_skill = Skill(
        name="coverage_audit",
        content=audit_instructions,
        trigger=None,
    )

    # Output-format guidance appended to the sub-agent's system message.
    report_guidance = (
        "Return a concise coverage assessment with evidence, gaps, "
        "and recommended tests. Keep command output under 200 lines "
        "and do not modify the repository."
    )
    auditor_context = AgentContext(
        skills=[audit_skill],
        system_message_suffix=report_guidance,
    )

    return Agent(llm=llm, tools=auditor_tools, agent_context=auditor_context)
|
|
||
|
|
||
# Register the auditor factory under the sub-agent type name that the
# generated workflow refers to ("coverage_auditor").
register_agent_if_absent(
    name="coverage_auditor",
    factory_func=create_coverage_auditor,
    description="Audits test coverage quality for one project area.",
)

# The parent agent carries only the workflow tool. It is responsible for
# writing the workflow script and then calling the tool with that generated
# Python code.
workflow_tools = [Tool(name=WorkflowToolSet.name)]
workflow_author_skill = Skill(
    name="workflow_author",
    content=(
        "When a task benefits from parallel sub-agents, write a "
        "Python workflow script with `async def main(wf):` and call "
        "the workflow tool. Keep intermediate findings inside the "
        "workflow and return only the reducer's final report. "
        "Prefer bounded prompts and `max_concurrency=2` for "
        "examples that inspect repositories."
    ),
    trigger=None,
)
parent_agent = Agent(
    llm=llm,
    tools=workflow_tools,
    agent_context=AgentContext(skills=[workflow_author_skill]),
)
|
|
||
# Run the parent agent against the current working directory.
conversation = Conversation(
    agent=parent_agent,
    workspace=Path.cwd(),
    visualizer=DelegationVisualizer(name="CoverageWorkflow"),
    max_iteration_per_run=6,  # increase if more turns needed to write the script
)

# Task for the parent agent: author a fan-out/reduce workflow over the four
# project areas and hand back only the synthesized report.
task_prompt = (
    "Write and run a dynamic workflow that audits whether test coverage is "
    "good across this repository. In the workflow code you generate, create "
    "one item for each project area: `openhands-sdk/openhands/sdk`, "
    "`openhands-tools/openhands/tools`, "
    "`openhands-workspace/openhands/workspace`, and "
    "`openhands-agent-server/openhands/agent_server`. Use `wf.map_agents` "
    "with `max_concurrency=2` to fan out one `coverage_auditor` sub-agent "
    "per area. Each sub-agent should inspect source files and matching tests "
    "under `tests/sdk`, `tests/tools`, `tests/workspace`, or "
    "`tests/agent_server` with at most three read-only commands or file views, "
    "avoid running the full test suite, and report coverage strengths, risky "
    "gaps, and the "
    "next tests to add. Finally use `wf.reduce_agent` with "
    "`coverage_auditor` to synthesize a "
    "repo-wide coverage report with the highest-priority gaps. Return the "
    "final report to me."
)
conversation.send_message(task_prompt)
conversation.run()

# Report the accumulated cost across all metrics tracked by the conversation.
combined_metrics = conversation.conversation_stats.get_combined_metrics()
cost = combined_metrics.accumulated_cost
print(f"EXAMPLE_COST: {cost}")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| """Dynamic workflow tool for sub-agent orchestration.""" | ||
|
|
||
| from openhands.tools.workflow.definition import ( | ||
| WorkflowAction, | ||
| WorkflowObservation, | ||
| WorkflowTool, | ||
| WorkflowToolSet, | ||
| ) | ||
| from openhands.tools.workflow.impl import ( | ||
| WorkflowContext, | ||
| WorkflowExecutor, | ||
| WorkflowScriptError, | ||
| ) | ||
|
|
||
|
|
||
# Public API of the workflow tool package: the action/observation schemas and
# tool definitions from `definition`, plus the context, executor, and
# script-error types from `impl`.
__all__ = [
    "WorkflowAction",
    "WorkflowContext",
    "WorkflowExecutor",
    "WorkflowObservation",
    "WorkflowScriptError",
    "WorkflowTool",
    "WorkflowToolSet",
]
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,175 @@ | ||||||||
| """Dynamic workflow tool definitions.""" | ||||||||
|
|
||||||||
| from collections.abc import Sequence | ||||||||
| from typing import TYPE_CHECKING, Literal | ||||||||
|
|
||||||||
| from pydantic import Field | ||||||||
|
|
||||||||
| from openhands.sdk.tool import ( | ||||||||
| Action, | ||||||||
| Observation, | ||||||||
| ToolAnnotations, | ||||||||
| ToolDefinition, | ||||||||
| register_tool, | ||||||||
| ) | ||||||||
|
|
||||||||
|
|
||||||||
| if TYPE_CHECKING: | ||||||||
| from openhands.sdk.conversation.state import ConversationState | ||||||||
| from openhands.tools.workflow.impl import WorkflowExecutor | ||||||||
|
|
||||||||
|
|
||||||||
class WorkflowAction(Action):
    """Schema for running a Python dynamic workflow script."""

    # Short label for the run.
    name: str = Field(description="A short name for this workflow run.")
    # Source text of the workflow; per the description it must define
    # `async def main(wf):` and work only through `wf`.
    script: str = Field(
        description=(
            "Python workflow script to run. It must define `async def main(wf):` "
            "and coordinate work only through the provided `wf` object."
        )
    )
    # Concurrency cap for sub-agent tasks: defaults to 8 and is constrained
    # to the inclusive range 1..64 by the `ge`/`le` bounds.
    max_concurrency: int = Field(
        default=8,
        ge=1,
        le=64,
        description=(
            "Maximum number of sub-agent tasks to run concurrently. "
            "Consider 2–4 for LLM-heavy workflows to avoid hitting API rate limits."
        ),
    )
|
|
||||||||
|
|
||||||||
class WorkflowObservation(Observation):
    """Observation from a dynamic workflow run."""

    # Name of the workflow run this observation reports on.
    name: str = Field(description="The workflow name that was executed.")
    # Outcome of the run; restricted to exactly "completed" or "error".
    status: Literal["completed", "error"] = Field(
        description="The workflow execution status."
    )
|
|
||||||||
|
|
||||||||
| _WORKFLOW_DESCRIPTION = """Run a dynamic workflow written as Python orchestration code. | ||||||||
|
|
||||||||
| Use this tool for large tasks that benefit from parallel sub-agents, such as | ||||||||
| codebase-wide audits, independent plan reviews, security sweeps, or discovery | ||||||||
| work where intermediate results should stay outside the main conversation. | ||||||||
|
|
||||||||
| Provide a Python script that defines exactly this entry point: | ||||||||
|
|
||||||||
| ```python | ||||||||
| async def main(wf): | ||||||||
| ... | ||||||||
| ``` | ||||||||
|
|
||||||||
| The script coordinates sub-agents through the `wf` object. It should not read or | ||||||||
| write files, run shell commands, or perform the engineering work directly. | ||||||||
| Sub-agents should do that work through their normal OpenHands tools and security | ||||||||
| policy. Scripts should use only the documented `wf` methods; private `wf` | ||||||||
| attributes are rejected. Large reducer inputs may be truncated before being sent | ||||||||
| to the reducer sub-agent. | ||||||||
|
|
||||||||
| Available `wf` methods: | ||||||||
| - `await wf.run_agent(prompt, subagent_type="general-purpose", description=None)` | ||||||||
| - `await wf.map_agents(items, prompt, subagent_type="general-purpose", | ||||||||
| max_concurrency=None, description=None)` | ||||||||
| - `await wf.reduce_agent(items, prompt, subagent_type="general-purpose", | ||||||||
| description=None)` | ||||||||
| - `wf.flatten(values)` — flatten one level of nesting (not recursive) | ||||||||
|
|
||||||||
| `subagent_type` must be a sub-agent type registered in the parent application. | ||||||||
| Use the same type names you registered when building your agent. | ||||||||
|
|
||||||||
| Scripts must use only the documented `wf` methods listed above; calling | ||||||||
| `wf.close()` or any other undocumented attribute is not supported. | ||||||||
|
neubig marked this conversation as resolved.
|
||||||||
|
|
||||||||
| `print()` is available for debugging but writes to the server logs, not to | ||||||||
| the workflow observation seen by the LLM; use the return value of `main()` to | ||||||||
| surface results. | ||||||||
|
|
||||||||
| If one or more `map_agents` items fail, the whole call raises an | ||||||||
| `ExceptionGroup`. The name `ExceptionGroup` is not available by name in the | ||||||||
| workflow sandbox, so scripts cannot use `except*` for selective group handling. | ||||||||
| A plain `except Exception` will still catch the entire group. To handle partial | ||||||||
| failures and collect all results, design sub-agent prompts to return an error | ||||||||
| sentinel value instead of raising. | ||||||||
|
|
||||||||
| `map_agents` accepts either a callable prompt, such as | ||||||||
| `lambda item: f"Review this finding: {item}"`, or a string template containing | ||||||||
| `{item}`. | ||||||||
|
|
||||||||
| Example: | ||||||||
| ```python | ||||||||
| async def main(wf): | ||||||||
| strategies = ["minimal fix", "test-first", "security-focused"] | ||||||||
| plans = await wf.map_agents( | ||||||||
| items=strategies, | ||||||||
| subagent_type="general-purpose", | ||||||||
| max_concurrency=3, | ||||||||
|
neubig marked this conversation as resolved.
|
||||||||
| prompt=lambda strategy: f"Create a plan using this strategy: {strategy}", | ||||||||
| ) | ||||||||
| critiques = await wf.map_agents( | ||||||||
| items=plans, | ||||||||
| subagent_type="code-reviewer", | ||||||||
| prompt=lambda plan: f"Adversarially critique this plan: {plan}", | ||||||||
| ) | ||||||||
| return await wf.reduce_agent( | ||||||||
| items={"plans": plans, "critiques": critiques}, | ||||||||
| prompt="Synthesize the safest and simplest final plan.", | ||||||||
| ) | ||||||||
| ``` | ||||||||
|
|
||||||||
| This MVP executes generated Python in-process after best-effort validation. Treat | ||||||||
| running a workflow as approving generated code execution. | ||||||||
| """ | ||||||||
|
|
||||||||
|
|
||||||||
class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]):
    """Low-level tool for explicit executor injection.

    Prefer ``WorkflowToolSet`` for standard SDK auto-create usage.
    Use ``WorkflowTool`` when you need to inject a custom executor
    (e.g., in tests or extensions).
    """

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState | None" = None,  # noqa: ARG003
        executor: "WorkflowExecutor | None" = None,
        description: str = _WORKFLOW_DESCRIPTION,
    ) -> Sequence["WorkflowTool"]:
        """Build a one-element sequence containing the workflow tool.

        Args:
            conv_state: Accepted but not used by this factory.
            executor: Executor to attach to the tool. A new
                ``WorkflowExecutor`` is created when this is ``None``.
            description: Description text for the tool; defaults to the
                module-level workflow description.

        Returns:
            A list holding the single configured ``WorkflowTool``.
        """
        # Imported here because the module-level import is TYPE_CHECKING-only
        # (presumably to avoid an import cycle with the impl module).
        from openhands.tools.workflow.impl import WorkflowExecutor

        # The tool is flagged as non-read-only, destructive, non-idempotent,
        # and open-world.
        tool_annotations = ToolAnnotations(
            title="workflow",
            readOnlyHint=False,
            destructiveHint=True,
            idempotentHint=False,
            openWorldHint=True,
        )

        if executor is None:
            executor = WorkflowExecutor()

        tool = cls(
            action_type=WorkflowAction,
            observation_type=WorkflowObservation,
            description=description,
            annotations=tool_annotations,
            executor=executor,
        )
        return [tool]
|
|
||||||||
|
|
||||||||
class WorkflowToolSet(ToolDefinition[WorkflowAction, WorkflowObservation]):
    """Tool set that creates the dynamic workflow tool."""

    @classmethod
    def create(
        cls,
        conv_state: "ConversationState",  # noqa: ARG003
    ) -> Sequence[WorkflowTool]:
        """Create the workflow tool backed by a newly built executor.

        Args:
            conv_state: Conversation state supplied by the caller; not used
                by this factory.

        Returns:
            The sequence returned by ``WorkflowTool.create`` for a new
            ``WorkflowExecutor``.
        """
        # Imported here because the module-level import is TYPE_CHECKING-only.
        from openhands.tools.workflow.impl import WorkflowExecutor

        fresh_executor = WorkflowExecutor()
        return WorkflowTool.create(executor=fresh_executor)
|
neubig marked this conversation as resolved.
|
||||||||
|
|
||||||||
|
|
||||||||
# Register the tool set under its own name; per its role in this module it is
# the preferred entry point for standard SDK auto-create usage.
register_tool(WorkflowToolSet.name, WorkflowToolSet)
|
neubig marked this conversation as resolved.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Suggestion: add a brief comment above the two `register_tool` calls. Both names are registered with distinct use cases, but a reader landing at the bottom of the file without reading the class docstrings won't see why. A one-line comment would close the suggestion from that thread:
Suggested change
|
||||||||
# Also register the low-level tool under its own name; it exists for callers
# that need to inject a custom executor (e.g., tests or extensions).
register_tool(WorkflowTool.name, WorkflowTool)
|
neubig marked this conversation as resolved.
|
||||||||
Uh oh!
There was an error while loading. Please reload this page.