Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d91a5e8
Add dynamic workflow tool
openhands-agent May 28, 2026
181521b
Update dynamic workflow example
openhands-agent May 28, 2026
70d18ca
Address dynamic workflow review feedback
openhands-agent May 28, 2026
3b947f9
Improve workflow failure diagnostics
openhands-agent May 28, 2026
41bef42
Tighten workflow script boundaries
openhands-agent May 28, 2026
83a85ee
Merge branch 'main' into redo-dynamic-workflow-mvp
neubig May 29, 2026
5222816
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
c35e9d5
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
cd0aeb1
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
d100fd8
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
ab75043
chore: restore type() to _safe_globals (#3426)
openhands-agent May 29, 2026
9597084
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
bb8d812
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
2515653
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
76f4ee5
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
c8c50ee
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
3a480eb
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
aeafbcb
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
e33dd39
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
b722988
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
05b78ee
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
da12413
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
fe4a34e
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
97ec7a2
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
4949144
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
a60cdbe
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
eb267fa
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
5bc40d2
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
8a484b7
chore: address PR review feedback (#3426)
openhands-agent May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions examples/01_standalone_sdk/52_dynamic_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Dynamic workflow tool example.

This example demonstrates the intended workflow shape:

1. The parent agent writes a Python workflow script.
2. The parent agent calls the workflow tool with that generated script.
3. The workflow fans out sub-agents to audit test coverage by project area.
4. A reducer sub-agent summarizes the repo-wide coverage risks.
"""

import os
from pathlib import Path

from openhands.sdk import LLM, Agent, AgentContext, Conversation, Tool
from openhands.sdk.context import Skill
from openhands.sdk.subagent import register_agent_if_absent
from openhands.tools.delegate import DelegationVisualizer
from openhands.tools.file_editor import FileEditorTool
from openhands.tools.terminal import TerminalTool
from openhands.tools.workflow import WorkflowToolSet


llm = LLM(
model=os.getenv("LLM_MODEL", "gpt-5.5"),
api_key=os.getenv("LLM_API_KEY"),
base_url=os.getenv("LLM_BASE_URL"),
usage_id="dynamic-workflow-demo",
)


# Sub-agent used by the generated workflow.
def create_coverage_auditor(llm: LLM) -> Agent:
return Agent(
llm=llm,
tools=[
Tool(name=TerminalTool.name),
Tool(name=FileEditorTool.name),
],
agent_context=AgentContext(
skills=[
Skill(
name="coverage_audit",
content=(
"You audit whether source code has meaningful test "
"coverage. Use read-only inspection commands and file "
"views. Compare source modules against the matching "
"tests under tests/sdk, tests/tools, tests/workspace, "
"or tests/agent_server. Identify risky untested "
"behavior, and recommend the "
"next tests to add. Use at most three tool calls, "
"avoid broad dumps, and do not edit files."
),
trigger=None,
)
],
system_message_suffix=(
"Return a concise coverage assessment with evidence, gaps, "
"and recommended tests. Keep command output under 200 lines "
"and do not modify the repository."
),
),
)


register_agent_if_absent(
name="coverage_auditor",
factory_func=create_coverage_auditor,
description="Audits test coverage quality for one project area.",
)

# The parent agent has the workflow tool. It is responsible for writing the
# workflow script and then calling the tool with that generated Python code.
parent_agent = Agent(
llm=llm,
tools=[Tool(name=WorkflowToolSet.name)],
agent_context=AgentContext(
skills=[
Skill(
name="workflow_author",
content=(
"When a task benefits from parallel sub-agents, write a "
"Python workflow script with `async def main(wf):` and call "
"the workflow tool. Keep intermediate findings inside the "
"workflow and return only the reducer's final report. "
"Prefer bounded prompts and `max_concurrency=2` for "
"examples that inspect repositories."
),
trigger=None,
)
]
),
)

conversation = Conversation(
agent=parent_agent,
workspace=Path.cwd(),
visualizer=DelegationVisualizer(name="CoverageWorkflow"),
max_iteration_per_run=6, # increase if more turns needed to write the script
)

conversation.send_message(
"Write and run a dynamic workflow that audits whether test coverage is "
"good across this repository. In the workflow code you generate, create "
"one item for each project area: `openhands-sdk/openhands/sdk`, "
"`openhands-tools/openhands/tools`, "
"`openhands-workspace/openhands/workspace`, and "
"`openhands-agent-server/openhands/agent_server`. Use `wf.map_agents` "
"with `max_concurrency=2` to fan out one `coverage_auditor` sub-agent "
"per area. Each sub-agent should inspect source files and matching tests "
"under `tests/sdk`, `tests/tools`, `tests/workspace`, or "
"`tests/agent_server` with at most three read-only commands or file views, "
"avoid running the full test suite, and report coverage strengths, risky "
"gaps, and the "
"next tests to add. Finally use `wf.reduce_agent` with "
"`coverage_auditor` to synthesize a "
"repo-wide coverage report with the highest-priority gaps. Return the "
"final report to me."
)
conversation.run()

cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost
print(f"EXAMPLE_COST: {cost}")
2 changes: 2 additions & 0 deletions openhands-tools/openhands/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from openhands.tools.task import TaskToolSet
from openhands.tools.task_tracker import TaskTrackerTool
from openhands.tools.terminal import TerminalTool
from openhands.tools.workflow import WorkflowToolSet


try:
Expand All @@ -44,6 +45,7 @@
"TaskToolSet",
"TaskTrackerTool",
"TerminalTool",
"WorkflowToolSet",
"get_default_agent",
"get_default_tools",
"register_default_tools",
Expand Down
17 changes: 17 additions & 0 deletions openhands-tools/openhands/tools/task/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,23 @@ def __init__(
# when the parent persists, otherwise a temporary directory.
self._persistence_dir: Path | None = None

def attach_parent(self, conversation: LocalConversation) -> None:
"""Attach the parent conversation used to create sub-agent tasks.

Idempotent: if a parent conversation is already attached, subsequent
Comment thread
neubig marked this conversation as resolved.
calls with the same conversation have no effect. Calls with a different
conversation are also ignored, but log a warning to surface potential
programming errors where two subsystems try to register different parents.
"""
if (
self._parent_conversation is not None
and self._parent_conversation is not conversation
):
logger.warning(
"attach_parent called with a different conversation; ignoring."
)
self._ensure_parent(conversation)
Comment thread
neubig marked this conversation as resolved.

def _ensure_parent(self, conversation: LocalConversation) -> None:
if self._parent_conversation is None:
self._parent_conversation = conversation
Expand Down
24 changes: 24 additions & 0 deletions openhands-tools/openhands/tools/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Dynamic workflow tool for sub-agent orchestration."""

from openhands.tools.workflow.definition import (
WorkflowAction,
WorkflowObservation,
WorkflowTool,
WorkflowToolSet,
)
from openhands.tools.workflow.impl import (
WorkflowContext,
WorkflowExecutor,
WorkflowScriptError,
)


__all__ = [
"WorkflowAction",
"WorkflowContext",
"WorkflowExecutor",
"WorkflowObservation",
"WorkflowScriptError",
"WorkflowTool",
"WorkflowToolSet",
]
175 changes: 175 additions & 0 deletions openhands-tools/openhands/tools/workflow/definition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Dynamic workflow tool definitions."""

from collections.abc import Sequence
from typing import TYPE_CHECKING, Literal

from pydantic import Field

from openhands.sdk.tool import (
Action,
Observation,
ToolAnnotations,
ToolDefinition,
register_tool,
)


if TYPE_CHECKING:
from openhands.sdk.conversation.state import ConversationState
from openhands.tools.workflow.impl import WorkflowExecutor


class WorkflowAction(Action):
"""Schema for running a Python dynamic workflow script."""

name: str = Field(description="A short name for this workflow run.")
script: str = Field(
description=(
"Python workflow script to run. It must define `async def main(wf):` "
"and coordinate work only through the provided `wf` object."
)
)
max_concurrency: int = Field(
Comment thread
neubig marked this conversation as resolved.
Comment thread
neubig marked this conversation as resolved.
default=8,
ge=1,
le=64,
description=(
"Maximum number of sub-agent tasks to run concurrently. "
"Consider 2–4 for LLM-heavy workflows to avoid hitting API rate limits."
),
)


class WorkflowObservation(Observation):
"""Observation from a dynamic workflow run."""

name: str = Field(description="The workflow name that was executed.")
status: Literal["completed", "error"] = Field(
description="The workflow execution status."
)


_WORKFLOW_DESCRIPTION = """Run a dynamic workflow written as Python orchestration code.

Use this tool for large tasks that benefit from parallel sub-agents, such as
codebase-wide audits, independent plan reviews, security sweeps, or discovery
work where intermediate results should stay outside the main conversation.

Provide a Python script that defines exactly this entry point:

```python
async def main(wf):
...
```

The script coordinates sub-agents through the `wf` object. It should not read or
write files, run shell commands, or perform the engineering work directly.
Sub-agents should do that work through their normal OpenHands tools and security
policy. Scripts should use only the documented `wf` methods; private `wf`
attributes are rejected. Large reducer inputs may be truncated before being sent
to the reducer sub-agent.

Available `wf` methods:
- `await wf.run_agent(prompt, subagent_type="general-purpose", description=None)`
- `await wf.map_agents(items, prompt, subagent_type="general-purpose",
max_concurrency=None, description=None)`
- `await wf.reduce_agent(items, prompt, subagent_type="general-purpose",
description=None)`
- `wf.flatten(values)` — flatten one level of nesting (not recursive)

`subagent_type` must be a sub-agent type registered in the parent application.
Use the same type names you registered when building your agent.

Scripts must use only the documented `wf` methods listed above; calling
`wf.close()` or any other undocumented attribute is not supported.
Comment thread
neubig marked this conversation as resolved.

`print()` is available for debugging but writes to the server logs, not to
the workflow observation seen by the LLM; use the return value of `main()` to
surface results.

If one or more `map_agents` items fail, the whole call raises an
`ExceptionGroup`. The name `ExceptionGroup` is not available by name in the
workflow sandbox, so scripts cannot use `except*` for selective group handling.
A plain `except Exception` will still catch the entire group. To handle partial
failures and collect all results, design sub-agent prompts to return an error
sentinel value instead of raising.

`map_agents` accepts either a callable prompt, such as
`lambda item: f"Review this finding: {item}"`, or a string template containing
`{item}`.

Example:
```python
async def main(wf):
strategies = ["minimal fix", "test-first", "security-focused"]
plans = await wf.map_agents(
items=strategies,
subagent_type="general-purpose",
max_concurrency=3,
Comment thread
neubig marked this conversation as resolved.
prompt=lambda strategy: f"Create a plan using this strategy: {strategy}",
)
critiques = await wf.map_agents(
items=plans,
subagent_type="code-reviewer",
prompt=lambda plan: f"Adversarially critique this plan: {plan}",
)
return await wf.reduce_agent(
items={"plans": plans, "critiques": critiques},
prompt="Synthesize the safest and simplest final plan.",
)
```

This MVP executes generated Python in-process after best-effort validation. Treat
running a workflow as approving generated code execution.
"""


class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]):
"""Low-level tool for explicit executor injection.

Prefer ``WorkflowToolSet`` for standard SDK auto-create usage.
Use ``WorkflowTool`` when you need to inject a custom executor
(e.g., in tests or extensions).
"""

@classmethod
def create(
cls,
conv_state: "ConversationState | None" = None, # noqa: ARG003
executor: "WorkflowExecutor | None" = None,
description: str = _WORKFLOW_DESCRIPTION,
) -> Sequence["WorkflowTool"]:
from openhands.tools.workflow.impl import WorkflowExecutor

return [
cls(
action_type=WorkflowAction,
observation_type=WorkflowObservation,
description=description,
annotations=ToolAnnotations(
title="workflow",
readOnlyHint=False,
destructiveHint=True,
idempotentHint=False,
openWorldHint=True,
),
executor=executor if executor is not None else WorkflowExecutor(),
)
]


class WorkflowToolSet(ToolDefinition[WorkflowAction, WorkflowObservation]):
"""Tool set that creates the dynamic workflow tool."""

@classmethod
def create(
cls,
conv_state: "ConversationState", # noqa: ARG003
) -> Sequence[WorkflowTool]:
from openhands.tools.workflow.impl import WorkflowExecutor

return WorkflowTool.create(executor=WorkflowExecutor())
Comment thread
neubig marked this conversation as resolved.


register_tool(WorkflowToolSet.name, WorkflowToolSet)
Comment thread
neubig marked this conversation as resolved.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: add a brief comment above the two register_tool lines (addresses thread 3322692922)

Both names are registered with distinct use cases, but a reader landing at the bottom of the file without reading the class docstrings won't see why. A one-line comment would close the suggestion from that thread:

Suggested change
register_tool(WorkflowToolSet.name, WorkflowToolSet)
# WorkflowToolSet: standard SDK auto-create path; WorkflowTool: for explicit executor injection.
register_tool(WorkflowToolSet.name, WorkflowToolSet)

register_tool(WorkflowTool.name, WorkflowTool)
Comment thread
neubig marked this conversation as resolved.
Loading
Loading