From d91a5e8f82b012f8c6521e98ad3dacef2600ab2f Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 22:04:23 +0000 Subject: [PATCH 01/28] Add dynamic workflow tool Co-authored-by: openhands --- .../01_standalone_sdk/52_dynamic_workflow.py | 98 +++++ openhands-tools/openhands/tools/__init__.py | 2 + .../openhands/tools/workflow/__init__.py | 28 ++ .../openhands/tools/workflow/definition.py | 145 ++++++++ .../openhands/tools/workflow/impl.py | 338 ++++++++++++++++++ tests/tools/workflow/test_workflow_tool.py | 154 ++++++++ 6 files changed, 765 insertions(+) create mode 100644 examples/01_standalone_sdk/52_dynamic_workflow.py create mode 100644 openhands-tools/openhands/tools/workflow/__init__.py create mode 100644 openhands-tools/openhands/tools/workflow/definition.py create mode 100644 openhands-tools/openhands/tools/workflow/impl.py create mode 100644 tests/tools/workflow/test_workflow_tool.py diff --git a/examples/01_standalone_sdk/52_dynamic_workflow.py b/examples/01_standalone_sdk/52_dynamic_workflow.py new file mode 100644 index 0000000000..18b36ab8df --- /dev/null +++ b/examples/01_standalone_sdk/52_dynamic_workflow.py @@ -0,0 +1,98 @@ +"""Dynamic workflow tool example. + +This example demonstrates the minimal Python workflow shape: + +1. A workflow script fans out several sub-agent tasks in parallel. +2. The workflow script keeps intermediate results in Python variables. +3. A reducer sub-agent summarizes the fan-out results into one final answer. + +In normal agent usage, an agent would generate the workflow script and call the +workflow tool. This example calls the tool directly so the workflow itself is +stable and easy to inspect. +""" + +import os + +from openhands.sdk import LLM, Agent, AgentContext, Conversation +from openhands.sdk.context import Skill +from openhands.sdk.subagent import register_agent_if_absent +from openhands.tools.workflow import WorkflowAction, WorkflowToolSet + + +llm = LLM( + model=os.getenv("LLM_MODEL", "gpt-5.5"), + api_key=os.getenv("LLM_API_KEY"), + base_url=os.getenv("LLM_BASE_URL"), + usage_id="dynamic-workflow-demo", +) + + +# Sub-agent used by the workflow. +def create_animal_expert(llm: LLM) -> Agent: + return Agent( + llm=llm, + tools=[], + agent_context=AgentContext( + skills=[ + Skill( + name="animal_expertise", + content=( + "You are a concise zoologist. Answer in one or two " + "sentences, and avoid markdown tables." + ), + trigger=None, + ) + ], + system_message_suffix="Keep responses concise and factual.", + ), + ) + + +register_agent_if_absent( + name="animal_expert", + factory_func=create_animal_expert, + description="Concise zoologist for animal facts and summaries.", +) + +# Parent conversation supplies the LLM/workspace inherited by sub-agents. +parent_agent = Agent(llm=llm, tools=[]) +conversation = Conversation(agent=parent_agent, workspace=os.getcwd()) + +workflow_script = r""" +async def main(wf): + animals = ["octopus", "honeybee", "snow leopard"] + facts = await wf.map_agents( + items=animals, + subagent_type="animal_expert", + max_concurrency=3, + prompt=lambda animal: ( + f"Give one surprising but accurate fact about the {animal}." + ), + description=lambda animal: f"Fact about {animal}", + ) + return await wf.reduce_agent( + items=facts, + subagent_type="animal_expert", + description="Animal fact summary", + prompt=( + "Combine these animal facts into a short, engaging paragraph. " + "Mention each animal exactly once." + ), + ) +""" + +workflow_tool = WorkflowToolSet.create(conv_state=conversation.state)[0].as_executable() +observation = workflow_tool( + WorkflowAction( + name="animal-facts", + script=workflow_script, + max_concurrency=3, + ), + conversation, +) + +print("Dynamic workflow result:") +print(observation.text) + +cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost +print(f"EXAMPLE_COST: {cost}") diff --git a/openhands-tools/openhands/tools/__init__.py b/openhands-tools/openhands/tools/__init__.py index 17d0eaeaa3..62f77a5ee7 100644 --- a/openhands-tools/openhands/tools/__init__.py +++ b/openhands-tools/openhands/tools/__init__.py @@ -29,6 +29,7 @@ from openhands.tools.task import TaskToolSet from openhands.tools.task_tracker import TaskTrackerTool from openhands.tools.terminal import TerminalTool +from openhands.tools.workflow import WorkflowToolSet try: @@ -44,6 +45,7 @@ "TaskToolSet", "TaskTrackerTool", "TerminalTool", + "WorkflowToolSet", "get_default_agent", "get_default_tools", "register_default_tools", diff --git a/openhands-tools/openhands/tools/workflow/__init__.py b/openhands-tools/openhands/tools/workflow/__init__.py new file mode 100644 index 0000000000..6c392b6f5d --- /dev/null +++ b/openhands-tools/openhands/tools/workflow/__init__.py @@ -0,0 +1,28 @@ +"""Dynamic workflow tool for sub-agent orchestration.""" + +from openhands.tools.workflow.definition import ( + WorkflowAction, + WorkflowObservation, + WorkflowTool, + WorkflowToolSet, +) +from openhands.tools.workflow.impl import ( + WorkflowContext, + WorkflowExecutor, + WorkflowScriptError, + execute_workflow_script, + validate_workflow_script, +) + + +__all__ = [ + "WorkflowAction", + "WorkflowContext", + "WorkflowExecutor", + "WorkflowObservation", + "WorkflowScriptError", + "WorkflowTool", + "WorkflowToolSet", + "execute_workflow_script", + "validate_workflow_script", +] diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py new file mode 100644 index 0000000000..e861e99d74 --- /dev/null +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -0,0 +1,145 @@ +"""Dynamic workflow tool definitions.""" + +from collections.abc import Sequence +from typing import TYPE_CHECKING, Literal + +from pydantic import Field + +from openhands.sdk.tool import ( + Action, + Observation, + ToolAnnotations, + ToolDefinition, + register_tool, +) + + +if TYPE_CHECKING: + from openhands.sdk.conversation.state import ConversationState + + +class WorkflowAction(Action): + """Schema for running a Python dynamic workflow script.""" + + name: str = Field(description="A short name for this workflow run.") + script: str = Field( + description=( + "Python workflow script to run. It must define `async def main(wf):` " + "and coordinate work only through the provided `wf` object." + ) + ) + max_concurrency: int = Field( + default=8, + ge=1, + le=64, + description="Maximum number of sub-agent tasks to run concurrently.", + ) + + +class WorkflowObservation(Observation): + """Observation from a dynamic workflow run.""" + + name: str = Field(description="The workflow name that was executed.") + status: Literal["completed", "error"] = Field( + description="The workflow execution status." + ) + + +_WORKFLOW_DESCRIPTION = """Run a dynamic workflow written as Python orchestration code. + +Use this tool for large tasks that benefit from parallel sub-agents, such as +codebase-wide audits, independent plan reviews, security sweeps, or discovery +work where intermediate results should stay outside the main conversation. + +Provide a Python script that defines exactly this entry point: + +```python +async def main(wf): + ... +``` + +The script coordinates sub-agents through the `wf` object. It should not read or +write files, run shell commands, or perform the engineering work directly. +Sub-agents should do that work through their normal OpenHands tools and security +policy. + +Available `wf` methods: +- `await wf.run_agent(...)` +- `await wf.map_agents(...)` +- `await wf.reduce_agent(...)` +- `wf.flatten(values)` + +`map_agents` accepts either a callable prompt, such as +`lambda item: f"Review this finding: {item}"`, or a string template containing +`{item}`. + +Example: +```python +async def main(wf): + strategies = ["minimal fix", "test-first", "security-focused"] + plans = await wf.map_agents( + items=strategies, + subagent_type="general-purpose", + max_concurrency=3, + prompt=lambda strategy: f"Create a plan using this strategy: {strategy}", + ) + critiques = await wf.map_agents( + items=plans, + subagent_type="code-reviewer", + prompt=lambda plan: f"Adversarially critique this plan: {plan}", + ) + return await wf.reduce_agent( + items={"plans": plans, "critiques": critiques}, + prompt="Synthesize the safest and simplest final plan.", + ) +``` + +This MVP executes generated Python in-process after best-effort validation. Treat +running a workflow as approving generated code execution. +""" + + +class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]): + """Tool for running a dynamic Python workflow.""" + + @classmethod + def create( + cls, + executor: "WorkflowExecutor", + description: str = _WORKFLOW_DESCRIPTION, + ) -> Sequence["WorkflowTool"]: + return [ + cls( + action_type=WorkflowAction, + observation_type=WorkflowObservation, + description=description, + annotations=ToolAnnotations( + title="workflow", + readOnlyHint=False, + destructiveHint=True, + idempotentHint=False, + openWorldHint=True, + ), + executor=executor, + ) + ] + + +class WorkflowToolSet(ToolDefinition[WorkflowAction, WorkflowObservation]): + """Tool set that creates the dynamic workflow tool.""" + + @classmethod + def create( + cls, + conv_state: "ConversationState", # noqa: ARG003 + ) -> Sequence[ToolDefinition]: + from openhands.tools.workflow.impl import WorkflowExecutor + + return WorkflowTool.create(executor=WorkflowExecutor()) + + +register_tool(WorkflowToolSet.name, WorkflowToolSet) +register_tool(WorkflowTool.name, WorkflowTool) + +if TYPE_CHECKING: + from openhands.tools.workflow.impl import WorkflowExecutor diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py new file mode 100644 index 0000000000..f3067ed7b1 --- /dev/null +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -0,0 +1,338 @@ +"""Implementation of the dynamic workflow tool.""" + +from __future__ import annotations + +import ast +import asyncio +import inspect +import json as jsonlib +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any, Protocol + +from openhands.sdk.logger import get_logger +from openhands.sdk.tool import ToolExecutor +from openhands.tools.task.manager import TaskManager +from openhands.tools.workflow.definition import WorkflowObservation + + +if TYPE_CHECKING: + from openhands.sdk.conversation.impl.local_conversation import LocalConversation + from openhands.tools.workflow.definition import WorkflowAction + +logger = get_logger(__name__) + +_MAX_SCRIPT_CHARS = 20_000 +_UNSAFE_CALLS = frozenset( + { + "breakpoint", + "compile", + "delattr", + "dir", + "eval", + "exec", + "getattr", + "globals", + "input", + "locals", + "open", + "setattr", + "vars", + "__import__", + } +) +_UNSAFE_ATTRIBUTE_ROOTS = frozenset({"os", "subprocess"}) + + +class WorkflowScriptError(ValueError): + """Raised when a workflow script is invalid or unsafe.""" + + +class _TaskLike(Protocol): + result: str | None + error: str | None + + +class _TaskStarter(Protocol): + def start_task( + self, + prompt: str, + subagent_type: str = "default", + resume: str | None = None, + description: str | None = None, + conversation: LocalConversation | None = None, + ) -> _TaskLike: ... + + def close(self) -> None: ... + + +class WorkflowContext: + """Small capability object exposed to generated workflow scripts.""" + + def __init__( + self, + parent_conversation: LocalConversation, + max_concurrency: int, + manager: _TaskStarter | None = None, + ) -> None: + if max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + self._parent_conversation = parent_conversation + self._max_concurrency = max_concurrency + if manager is None: + task_manager = TaskManager() + task_manager._ensure_parent(parent_conversation) + self._manager = task_manager + else: + self._manager = manager + self._semaphore: asyncio.Semaphore | None = None + + @property + def _default_semaphore(self) -> asyncio.Semaphore: + if self._semaphore is None: + self._semaphore = asyncio.Semaphore(self._max_concurrency) + return self._semaphore + + async def run_agent( + self, + prompt: str, + subagent_type: str = "general-purpose", + description: str | None = None, + ) -> str: + """Run a single sub-agent task and return its final result.""" + async with self._default_semaphore: + return await self._run_agent_task( + prompt=prompt, + subagent_type=subagent_type, + description=description, + ) + + async def _run_agent_task( + self, + prompt: str, + subagent_type: str, + description: str | None, + ) -> str: + task = await asyncio.to_thread( + self._manager.start_task, + prompt=prompt, + subagent_type=subagent_type, + description=description, + conversation=self._parent_conversation, + ) + if task.error: + raise RuntimeError(task.error) + return task.result or "" + + async def map_agents( + self, + items: Sequence[Any], + prompt: Callable[[Any], str] | str, + subagent_type: str = "general-purpose", + max_concurrency: int | None = None, + description: Callable[[Any], str] | str | None = None, + ) -> list[str]: + """Run one sub-agent task per item and return results in item order.""" + if max_concurrency is not None and max_concurrency < 1: + raise ValueError("max_concurrency must be at least 1") + semaphore = ( + asyncio.Semaphore(max_concurrency) + if max_concurrency is not None + else self._default_semaphore + ) + + async def run_one(item: Any) -> str: + rendered_prompt = _render_template(prompt, item) + assert rendered_prompt is not None + rendered_description = _render_template(description, item) + async with semaphore: + return await self._run_agent_task( + prompt=rendered_prompt, + subagent_type=subagent_type, + description=rendered_description, + ) + + return list(await asyncio.gather(*(run_one(item) for item in items))) + + async def reduce_agent( + self, + items: Any, + prompt: str, + subagent_type: str = "general-purpose", + description: str | None = None, + ) -> str: + """Run a single reducer sub-agent with serialized intermediate results.""" + return await self.run_agent( + prompt=f"{prompt}\n\nInput:\n{_format_value(items)}", + subagent_type=subagent_type, + description=description, + ) + + def flatten(self, values: Sequence[Any]) -> list[Any]: + """Flatten one list level.""" + flattened: list[Any] = [] + for value in values: + if isinstance(value, list): + flattened.extend(value) + else: + flattened.append(value) + return flattened + + def close(self) -> None: + self._manager.close() + + +def _render_template( + template: Callable[[Any], str] | str | None, item: Any +) -> str | None: + if template is None: + return None + if callable(template): + return str(template(item)) + return template.format(item=item) + + +def _format_value(value: Any) -> str: + if isinstance(value, str): + return value + return jsonlib.dumps(value, indent=2, default=str) + + +def validate_workflow_script(script: str) -> None: + """Perform best-effort validation for generated workflow scripts.""" + if len(script) > _MAX_SCRIPT_CHARS: + raise WorkflowScriptError( + f"Workflow script is too large: {len(script)} > {_MAX_SCRIPT_CHARS}" + ) + + try: + tree = ast.parse(script) + except SyntaxError as e: + raise WorkflowScriptError(f"Workflow script has invalid syntax: {e}") from e + + main_defs = [ + node + for node in tree.body + if isinstance(node, ast.AsyncFunctionDef) and node.name == "main" + ] + if len(main_defs) != 1: + raise WorkflowScriptError( + "Workflow script must define exactly one async main(wf)" + ) + + main_def = main_defs[0] + if len(main_def.args.args) != 1 or main_def.args.args[0].arg != "wf": + raise WorkflowScriptError("Workflow entry point must be `async def main(wf):`") + + for node in ast.walk(tree): + if isinstance(node, (ast.Import, ast.ImportFrom)): + raise WorkflowScriptError("Workflow scripts may not import modules") + if isinstance(node, ast.Name) and node.id.startswith("__"): + raise WorkflowScriptError("Workflow scripts may not access dunder names") + if isinstance(node, ast.Attribute) and node.attr.startswith("__"): + raise WorkflowScriptError( + "Workflow scripts may not access dunder attributes" + ) + if ( + isinstance(node, ast.Attribute) + and _attribute_root_name(node) in _UNSAFE_ATTRIBUTE_ROOTS + ): + raise WorkflowScriptError("Workflow scripts may not access unsafe modules") + if isinstance(node, ast.Call): + call_name = _call_name(node.func) + if call_name in _UNSAFE_CALLS: + raise WorkflowScriptError( + f"Workflow scripts may not call `{call_name}`" + ) + + +def _call_name(func: ast.expr) -> str | None: + if isinstance(func, ast.Name): + return func.id + if isinstance(func, ast.Attribute): + return func.attr + return None + + +def _attribute_root_name(node: ast.Attribute) -> str | None: + value = node.value + while isinstance(value, ast.Attribute): + value = value.value + if isinstance(value, ast.Name): + return value.id + return None + + +def execute_workflow_script(script: str, context: WorkflowContext) -> Any: + """Validate and execute a workflow script against a workflow context.""" + validate_workflow_script(script) + namespace: dict[str, Any] = {} + exec(compile(script, "", "exec"), _safe_globals(), namespace) + main = namespace.get("main") + if not inspect.iscoroutinefunction(main): + raise WorkflowScriptError("Workflow entry point must be async") + return asyncio.run(main(context)) + + +def _safe_globals() -> dict[str, Any]: + safe_builtins = { + "abs": abs, + "all": all, + "any": any, + "bool": bool, + "dict": dict, + "enumerate": enumerate, + "float": float, + "int": int, + "len": len, + "list": list, + "max": max, + "min": min, + "range": range, + "round": round, + "set": set, + "sorted": sorted, + "str": str, + "sum": sum, + "tuple": tuple, + "zip": zip, + } + return {"__builtins__": safe_builtins} + + +class WorkflowExecutor(ToolExecutor["WorkflowAction", WorkflowObservation]): + """Executor for the dynamic workflow tool.""" + + def __call__( + self, + action: WorkflowAction, + conversation: LocalConversation | None = None, + ) -> WorkflowObservation: + if conversation is None: + return WorkflowObservation.from_text( + text="Workflow tool requires a local conversation context.", + name=action.name, + status="error", + is_error=True, + ) + + context = WorkflowContext( + parent_conversation=conversation, + max_concurrency=action.max_concurrency, + ) + try: + result = execute_workflow_script(action.script, context) + return WorkflowObservation.from_text( + text=str(result), + name=action.name, + status="completed", + ) + except Exception as e: + logger.warning("Workflow '%s' failed: %s", action.name, e, exc_info=True) + return WorkflowObservation.from_text( + text=str(e), + name=action.name, + status="error", + is_error=True, + ) + finally: + context.close() diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py new file mode 100644 index 0000000000..1a0ed9d56e --- /dev/null +++ b/tests/tools/workflow/test_workflow_tool.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import cast + +import pytest + +from openhands.sdk.conversation.impl.local_conversation import LocalConversation +from openhands.tools.workflow import ( + WorkflowAction, + WorkflowContext, + WorkflowExecutor, + WorkflowScriptError, + execute_workflow_script, + validate_workflow_script, +) + + +@dataclass +class _FakeTask: + result: str | None = None + error: str | None = None + + +class _FakeTaskManager: + def __init__(self) -> None: + self.prompts: list[str] = [] + self.descriptions: list[str | None] = [] + self.closed = False + + def start_task( + self, + prompt: str, + subagent_type: str = "default", + resume: str | None = None, + description: str | None = None, + conversation: LocalConversation | None = None, + ) -> _FakeTask: + self.prompts.append(f"{subagent_type}: {prompt}") + self.descriptions.append(description) + return _FakeTask(result=f"result:{prompt}") + + def close(self) -> None: + self.closed = True + + +def _context(manager: _FakeTaskManager) -> WorkflowContext: + return WorkflowContext( + parent_conversation=cast(LocalConversation, object()), + max_concurrency=4, + manager=manager, + ) + + +def test_execute_workflow_script_runs_map_and_reduce() -> None: + manager = _FakeTaskManager() + script = """ +async def main(wf): + results = await wf.map_agents( + items=["alpha", "beta"], + subagent_type="researcher", + max_concurrency=2, + prompt=lambda item: f"inspect {item}", + description=lambda item: f"job {item}", + ) + return await wf.reduce_agent( + items=results, + subagent_type="writer", + prompt="summarize the results", + description="final summary", + ) +""" + + result = execute_workflow_script(script, _context(manager)) + + expected_reduce_prompt = ( + 'writer: summarize the results\n\nInput:\n[\n "result:inspect alpha",\n' + ' "result:inspect beta"\n]' + ) + assert result.startswith("result:summarize the results") + assert manager.prompts == [ + "researcher: inspect alpha", + "researcher: inspect beta", + expected_reduce_prompt, + ] + assert manager.descriptions == ["job alpha", "job beta", "final summary"] + + +def test_map_agents_uses_default_concurrency_without_deadlock() -> None: + manager = _FakeTaskManager() + script = """ +async def main(wf): + return await wf.map_agents( + items=["one", "two"], + prompt="inspect {item}", + subagent_type="researcher", + ) +""" + + assert execute_workflow_script(script, _context(manager)) == [ + "result:inspect one", + "result:inspect two", + ] + + +def test_validate_workflow_script_rejects_missing_async_main() -> None: + with pytest.raises(WorkflowScriptError, match="async main"): + validate_workflow_script("def main(wf):\n return 'nope'\n") + + +def test_validate_workflow_script_rejects_unsafe_calls() -> None: + script = """ +async def main(wf): + return open('secrets.txt').read() +""" + + with pytest.raises(WorkflowScriptError, match="open"): + validate_workflow_script(script) + + +def test_validate_workflow_script_rejects_unsafe_module_access() -> None: + script = """ +async def main(wf): + os.system('echo nope') +""" + + with pytest.raises(WorkflowScriptError, match="unsafe modules"): + validate_workflow_script(script) + + +def test_validate_workflow_script_rejects_imports() -> None: + script = """ +import os + +async def main(wf): + return 'nope' +""" + + with pytest.raises(WorkflowScriptError, match="import"): + validate_workflow_script(script) + + +def test_workflow_executor_returns_error_observation_without_conversation() -> None: + observation = WorkflowExecutor()(WorkflowAction(name="demo", script="")) + + assert observation.is_error + assert observation.status == "error" + assert "requires a local conversation" in observation.text + + +def test_workflow_context_helper_flattens_one_level() -> None: + context = _context(_FakeTaskManager()) + + assert context.flatten([[1, 2], 3, [4]]) == [1, 2, 3, 4] From 181521b3a694e847453c1eaa3e2820f792811df8 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 22:32:50 +0000 Subject: [PATCH 02/28] Update dynamic workflow example Co-authored-by: openhands --- .../01_standalone_sdk/52_dynamic_workflow.py | 136 ++++++++++-------- 1 file changed, 80 insertions(+), 56 deletions(-) diff --git a/examples/01_standalone_sdk/52_dynamic_workflow.py b/examples/01_standalone_sdk/52_dynamic_workflow.py index 18b36ab8df..08379cf2e0 100644 --- a/examples/01_standalone_sdk/52_dynamic_workflow.py +++ b/examples/01_standalone_sdk/52_dynamic_workflow.py @@ -1,22 +1,23 @@ """Dynamic workflow tool example. -This example demonstrates the minimal Python workflow shape: +This example demonstrates the intended workflow shape: -1. A workflow script fans out several sub-agent tasks in parallel. -2. The workflow script keeps intermediate results in Python variables. -3. A reducer sub-agent summarizes the fan-out results into one final answer. - -In normal agent usage, an agent would generate the workflow script and call the -workflow tool. This example calls the tool directly so the workflow itself is -stable and easy to inspect. +1. The parent agent writes a Python workflow script. +2. The parent agent calls the workflow tool with that generated script. +3. The workflow fans out sub-agents to audit test coverage by project area. +4. A reducer sub-agent summarizes the repo-wide coverage risks. """ import os +from pathlib import Path -from openhands.sdk import LLM, Agent, AgentContext, Conversation +from openhands.sdk import LLM, Agent, AgentContext, Conversation, Tool from openhands.sdk.context import Skill from openhands.sdk.subagent import register_agent_if_absent -from openhands.tools.workflow import WorkflowAction, WorkflowToolSet +from openhands.tools.delegate import DelegationVisualizer +from openhands.tools.file_editor import FileEditorTool +from openhands.tools.terminal import TerminalTool +from openhands.tools.workflow import WorkflowToolSet llm = LLM( @@ -27,72 +28,95 @@ ) -# Sub-agent used by the workflow. -def create_animal_expert(llm: LLM) -> Agent: +# Sub-agent used by the generated workflow. +def create_coverage_auditor(llm: LLM) -> Agent: return Agent( llm=llm, - tools=[], + tools=[ + Tool(name=TerminalTool.name), + Tool(name=FileEditorTool.name), + ], agent_context=AgentContext( skills=[ Skill( - name="animal_expertise", + name="coverage_audit", content=( - "You are a concise zoologist. Answer in one or two " - "sentences, and avoid markdown tables." + "You audit whether source code has meaningful test " + "coverage. Use read-only inspection commands and file " + "views. Compare source modules against the matching " + "tests under tests/sdk, tests/tools, tests/workspace, " + "or tests/agent_server. Identify risky untested " + "behavior, and recommend the " + "next tests to add. Use at most three tool calls, " + "avoid broad dumps, and do not edit files." ), trigger=None, ) ], - system_message_suffix="Keep responses concise and factual.", + system_message_suffix=( + "Return a concise coverage assessment with evidence, gaps, " + "and recommended tests. Keep command output under 200 lines " + "and do not modify the repository." + ), ), ) register_agent_if_absent( - name="animal_expert", - factory_func=create_animal_expert, - description="Concise zoologist for animal facts and summaries.", + name="coverage_auditor", + factory_func=create_coverage_auditor, + description="Audits test coverage quality for one project area.", ) -# Parent conversation supplies the LLM/workspace inherited by sub-agents. -parent_agent = Agent(llm=llm, tools=[]) -conversation = Conversation(agent=parent_agent, workspace=os.getcwd()) - -workflow_script = r""" -async def main(wf): - animals = ["octopus", "honeybee", "snow leopard"] - facts = await wf.map_agents( - items=animals, - subagent_type="animal_expert", - max_concurrency=3, - prompt=lambda animal: ( - f"Give one surprising but accurate fact about the {animal}." - ), - description=lambda animal: f"Fact about {animal}", - ) - return await wf.reduce_agent( - items=facts, - subagent_type="animal_expert", - description="Animal fact summary", - prompt=( - "Combine these animal facts into a short, engaging paragraph. " - "Mention each animal exactly once." - ), - ) -""" - -workflow_tool = WorkflowToolSet.create(conv_state=conversation.state)[0].as_executable() -observation = workflow_tool( - WorkflowAction( - name="animal-facts", - script=workflow_script, - max_concurrency=3, +# The parent agent has the workflow tool. It is responsible for writing the +# workflow script and then calling the tool with that generated Python code. +parent_agent = Agent( + llm=llm, + tools=[Tool(name=WorkflowToolSet.name)], + agent_context=AgentContext( + skills=[ + Skill( + name="workflow_author", + content=( + "When a task benefits from parallel sub-agents, write a " + "Python workflow script with `async def main(wf):` and call " + "the workflow tool. Keep intermediate findings inside the " + "workflow and return only the reducer's final report. " + "Prefer bounded prompts and `max_concurrency=2` for " + "examples that inspect repositories." + ), + trigger=None, + ) + ] ), - conversation, ) -print("Dynamic workflow result:") -print(observation.text) +conversation = Conversation( + agent=parent_agent, + workspace=Path.cwd(), + visualizer=DelegationVisualizer(name="CoverageWorkflow"), + max_iteration_per_run=6, +) + +conversation.send_message( + "Write and run a dynamic workflow that audits whether test coverage is " + "good across this repository. In the workflow code you generate, create " + "one item for each project area: `openhands-sdk/openhands/sdk`, " + "`openhands-tools/openhands/tools`, " + "`openhands-workspace/openhands/workspace`, and " + "`openhands-agent-server/openhands/agent_server`. Use `wf.map_agents` " + "with `max_concurrency=2` to fan out one `coverage_auditor` sub-agent " + "per area. Each sub-agent should inspect source files and matching tests " + "under `tests/sdk`, `tests/tools`, `tests/workspace`, or " + "`tests/agent_server` with at most three read-only commands or file views, " + "avoid running the full test suite, and report coverage strengths, risky " + "gaps, and the " + "next tests to add. Finally use `wf.reduce_agent` with " + "`coverage_auditor` to synthesize a " + "repo-wide coverage report with the highest-priority gaps. Return the " + "final report to me." +) +conversation.run() cost = conversation.conversation_stats.get_combined_metrics().accumulated_cost print(f"EXAMPLE_COST: {cost}") From 70d18ca12ef08f52c8de2457acf337f27eb9a59c Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 23:10:49 +0000 Subject: [PATCH 03/28] Address dynamic workflow review feedback Co-authored-by: openhands --- .../openhands/tools/task/manager.py | 4 ++ .../openhands/tools/workflow/definition.py | 4 +- .../openhands/tools/workflow/impl.py | 33 +++++++++++---- tests/tools/workflow/test_workflow_tool.py | 40 +++++++++++++++++++ 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py index e483aa8e47..0d7e4f9999 100644 --- a/openhands-tools/openhands/tools/task/manager.py +++ b/openhands-tools/openhands/tools/task/manager.py @@ -106,6 +106,10 @@ def __init__( # when the parent persists, otherwise a temporary directory. self._persistence_dir: Path | None = None + def attach_parent(self, conversation: LocalConversation) -> None: + """Attach the parent conversation used to create sub-agent tasks.""" + self._ensure_parent(conversation) + def _ensure_parent(self, conversation: LocalConversation) -> None: if self._parent_conversation is None: self._parent_conversation = conversation diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index e861e99d74..e62d4f2ee8 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -16,6 +16,7 @@ if TYPE_CHECKING: from openhands.sdk.conversation.state import ConversationState + from openhands.tools.workflow.impl import WorkflowExecutor class WorkflowAction(Action): @@ -140,6 +141,3 @@ def create( register_tool(WorkflowToolSet.name, WorkflowToolSet) register_tool(WorkflowTool.name, WorkflowTool) - -if TYPE_CHECKING: - from openhands.tools.workflow.impl import WorkflowExecutor diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index f3067ed7b1..c46bc91d79 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -80,7 +80,7 @@ def __init__( self._max_concurrency = max_concurrency if manager is None: task_manager = TaskManager() - task_manager._ensure_parent(parent_conversation) + task_manager.attach_parent(parent_conversation) self._manager = task_manager else: self._manager = manager @@ -141,8 +141,7 @@ async def map_agents( ) async def run_one(item: Any) -> str: - rendered_prompt = _render_template(prompt, item) - assert rendered_prompt is not None + rendered_prompt = _render_required_template(prompt, item) rendered_description = _render_template(description, item) async with semaphore: return await self._run_agent_task( @@ -151,7 +150,23 @@ async def run_one(item: Any) -> str: description=rendered_description, ) - return list(await asyncio.gather(*(run_one(item) for item in items))) + results = await asyncio.gather( + *(run_one(item) for item in items), + return_exceptions=True, + ) + failures = [result for result in results if isinstance(result, BaseException)] + if failures: + exceptions = [ + failure + if isinstance(failure, Exception) + else RuntimeError(str(failure)) + for failure in failures + ] + raise ExceptionGroup( + "map_agents: one or more sub-agents failed", + exceptions, + ) + return [str(result) for result in results] async def reduce_agent( self, @@ -181,14 +196,18 @@ def close(self) -> None: self._manager.close() +def _render_required_template(template: Callable[[Any], str] | str, item: Any) -> str: + if callable(template): + return str(template(item)) + return template.format(item=item) + + def _render_template( template: Callable[[Any], str] | str | None, item: Any ) -> str | None: if template is None: return None - if callable(template): - return str(template(item)) - return template.format(item=item) + return _render_required_template(template, item) def _format_value(value: Any) -> str: diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 1a0ed9d56e..a6b4c4ac98 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -103,6 +103,46 @@ async def main(wf): ] +def test_map_agents_reports_all_sub_agent_failures() -> None: + class FailingTaskManager(_FakeTaskManager): + def start_task( + self, + prompt: str, + subagent_type: str = "default", + resume: str | None = None, + description: str | None = None, + conversation: LocalConversation | None = None, + ) -> _FakeTask: + self.prompts.append(f"{subagent_type}: {prompt}") + if prompt in {"inspect bad", "inspect worse"}: + return _FakeTask(error=f"failed {prompt}") + return _FakeTask(result=f"result:{prompt}") + + script = """ +async def main(wf): + return await wf.map_agents( + items=["good", "bad", "worse"], + prompt="inspect {item}", + subagent_type="researcher", + ) +""" + manager = FailingTaskManager() + + with pytest.raises(ExceptionGroup) as exc_info: + execute_workflow_script(script, _context(manager)) + + assert "map_agents" in str(exc_info.value) + assert [str(exc) for exc in exc_info.value.exceptions] == [ + "failed inspect bad", + "failed inspect worse", + ] + assert set(manager.prompts) == { + "researcher: inspect good", + "researcher: inspect bad", + "researcher: inspect worse", + } + + def test_validate_workflow_script_rejects_missing_async_main() -> None: with pytest.raises(WorkflowScriptError, match="async main"): validate_workflow_script("def main(wf):\n return 'nope'\n") From 3b947f929566a87f346e92882e4475259c9affb0 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 23:39:28 +0000 Subject: [PATCH 04/28] Improve workflow failure diagnostics Co-authored-by: openhands --- .../openhands/tools/workflow/impl.py | 30 +++++++++++++++++-- tests/tools/workflow/test_workflow_tool.py | 16 +++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index c46bc91d79..76e232b7b4 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -131,7 +131,11 @@ async def map_agents( max_concurrency: int | None = None, description: Callable[[Any], str] | str | None = None, ) -> list[str]: - """Run one sub-agent task per item and return results in item order.""" + """Run one sub-agent task per item and return results in item order. + + A per-call ``max_concurrency`` overrides the context default for this map + operation only; it is not a nested subset of the context-wide limit. + """ if max_concurrency is not None and max_concurrency < 1: raise ValueError("max_concurrency must be at least 1") semaphore = ( @@ -282,7 +286,16 @@ def _attribute_root_name(node: ast.Attribute) -> str | None: def execute_workflow_script(script: str, context: WorkflowContext) -> Any: - """Validate and execute a workflow script against a workflow context.""" + """Validate and execute a workflow script from a synchronous context.""" + try: + asyncio.get_running_loop() + except RuntimeError: + pass + else: + raise WorkflowScriptError( + "Workflow scripts must be executed from a synchronous context" + ) + validate_workflow_script(script) namespace: dict[str, Any] = {} exec(compile(script, "", "exec"), _safe_globals(), namespace) @@ -292,6 +305,16 @@ def execute_workflow_script(script: str, context: WorkflowContext) -> Any: return asyncio.run(main(context)) +def _format_exception(error: Exception) -> str: + if isinstance(error, ExceptionGroup): + details = "\n".join( + f" [{index}] {exception}" + for index, exception in enumerate(error.exceptions, start=1) + ) + return f"{error.args[0]}:\n{details}" + return str(error) + + def _safe_globals() -> dict[str, Any]: safe_builtins = { "abs": abs, @@ -346,9 +369,10 @@ def __call__( status="completed", ) except Exception as e: + error_text = _format_exception(e) logger.warning("Workflow '%s' failed: %s", action.name, e, exc_info=True) return WorkflowObservation.from_text( - text=str(e), + text=error_text, name=action.name, status="error", is_error=True, diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index a6b4c4ac98..1fe863e89f 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -14,6 +14,7 @@ execute_workflow_script, validate_workflow_script, ) +from openhands.tools.workflow.impl import _format_exception @dataclass @@ -86,7 +87,7 @@ async def main(wf): assert manager.descriptions == ["job alpha", "job beta", "final summary"] -def test_map_agents_uses_default_concurrency_without_deadlock() -> None: +def test_map_agents_uses_context_default_concurrency_when_none_given() -> None: manager = _FakeTaskManager() script = """ async def main(wf): @@ -143,6 +144,19 @@ async def main(wf): } +def test_format_exception_includes_exception_group_details() -> None: + error = ExceptionGroup( + "map_agents: one or more sub-agents failed", + [RuntimeError("first failure"), RuntimeError("second failure")], + ) + + assert _format_exception(error) == ( + "map_agents: one or more sub-agents failed:\n" + " [1] first failure\n" + " [2] second failure" + ) + + def test_validate_workflow_script_rejects_missing_async_main() -> None: with pytest.raises(WorkflowScriptError, match="async main"): validate_workflow_script("def main(wf):\n return 'nope'\n") From 41bef42afd1e3d68c0650354d9c4746cccfb8fff Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 23:41:15 +0000 Subject: [PATCH 05/28] Tighten workflow script boundaries Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 4 ++- .../openhands/tools/workflow/impl.py | 26 +++++++++++++-- tests/tools/workflow/test_workflow_tool.py | 33 ++++++++++++++++++- 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index e62d4f2ee8..1837a4f493 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -62,7 +62,9 @@ async def main(wf): The script coordinates sub-agents through the `wf` object. It should not read or write files, run shell commands, or perform the engineering work directly. Sub-agents should do that work through their normal OpenHands tools and security -policy. +policy. Scripts should use only the documented `wf` methods; private `wf` +attributes are rejected. Large reducer inputs may be truncated before being sent +to the reducer sub-agent. Available `wf` methods: - `await wf.run_agent(...)` diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 76e232b7b4..af3257a277 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -22,6 +22,7 @@ logger = get_logger(__name__) _MAX_SCRIPT_CHARS = 20_000 +_MAX_REDUCE_INPUT_CHARS = 12_000 _UNSAFE_CALLS = frozenset( { "breakpoint", @@ -216,8 +217,15 @@ def _render_template( def _format_value(value: Any) -> str: if isinstance(value, str): - return value - return jsonlib.dumps(value, indent=2, default=str) + text = value + else: + text = jsonlib.dumps(value, indent=2, default=str) + if len(text) <= _MAX_REDUCE_INPUT_CHARS: + return text + return ( + text[:_MAX_REDUCE_INPUT_CHARS] + + "\n... [truncated workflow intermediate results]" + ) def validate_workflow_script(script: str) -> None: @@ -255,6 +263,14 @@ def validate_workflow_script(script: str) -> None: raise WorkflowScriptError( "Workflow scripts may not access dunder attributes" ) + if ( + isinstance(node, ast.Attribute) + and _attribute_root_name(node) == "wf" + and node.attr.startswith("_") + ): + raise WorkflowScriptError( + "Workflow scripts may not access private wf attributes" + ) if ( isinstance(node, ast.Attribute) and _attribute_root_name(node) in _UNSAFE_ATTRIBUTE_ROOTS @@ -323,8 +339,11 @@ def _safe_globals() -> dict[str, Any]: "bool": bool, "dict": dict, "enumerate": enumerate, + "Exception": Exception, "float": float, + "IndexError": IndexError, "int": int, + "KeyError": KeyError, "len": len, "list": list, "max": max, @@ -333,9 +352,12 @@ def _safe_globals() -> dict[str, Any]: "round": round, "set": set, "sorted": sorted, + "RuntimeError": RuntimeError, "str": str, "sum": sum, "tuple": tuple, + "TypeError": TypeError, + "ValueError": ValueError, "zip": zip, } return {"__builtins__": safe_builtins} diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 1fe863e89f..a36cf2835f 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -14,7 +14,7 @@ execute_workflow_script, validate_workflow_script, ) -from openhands.tools.workflow.impl import _format_exception +from openhands.tools.workflow.impl import _format_exception, _format_value @dataclass @@ -144,6 +144,27 @@ async def main(wf): } +def test_workflow_script_can_catch_common_exceptions() -> None: + script = """ +async def main(wf): + try: + raise ValueError("recoverable") + except ValueError as exc: + return str(exc) +""" + + assert ( + execute_workflow_script(script, _context(_FakeTaskManager())) == "recoverable" + ) + + +def test_format_value_truncates_large_intermediate_results() -> None: + value = _format_value("x" * 12_050) + + assert len(value) < 12_100 + assert value.endswith("[truncated workflow intermediate results]") + + def test_format_exception_includes_exception_group_details() -> None: error = ExceptionGroup( "map_agents: one or more sub-agents failed", @@ -172,6 +193,16 @@ async def main(wf): validate_workflow_script(script) +def test_validate_workflow_script_rejects_private_wf_access() -> None: + script = """ +async def main(wf): + return wf._parent_conversation +""" + + with pytest.raises(WorkflowScriptError, match="private wf attributes"): + validate_workflow_script(script) + + def test_validate_workflow_script_rejects_unsafe_module_access() -> None: script = """ async def main(wf): From 52228169d4c4dc6e659e6c272f1582aa3fb11729 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 04:55:35 +0000 Subject: [PATCH 06/28] chore: address PR review feedback (#3426) - add isinstance, type, print, repr to _safe_globals so generated scripts can use them - remove execute_workflow_script/validate_workflow_script from __all__ (internal helpers) - update test imports to use openhands.tools.workflow.impl for internal helpers - add test_run_agent_returns_task_result to cover semaphore path and subagent_type Co-authored-by: openhands --- .../openhands/tools/workflow/__init__.py | 4 ---- openhands-tools/openhands/tools/workflow/impl.py | 6 +++++- tests/tools/workflow/test_workflow_tool.py | 16 +++++++++++++++- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/__init__.py b/openhands-tools/openhands/tools/workflow/__init__.py index 6c392b6f5d..f55cc80b03 100644 --- a/openhands-tools/openhands/tools/workflow/__init__.py +++ b/openhands-tools/openhands/tools/workflow/__init__.py @@ -10,8 +10,6 @@ WorkflowContext, WorkflowExecutor, WorkflowScriptError, - execute_workflow_script, - validate_workflow_script, ) @@ -23,6 +21,4 @@ "WorkflowScriptError", "WorkflowTool", "WorkflowToolSet", - "execute_workflow_script", - "validate_workflow_script", ] diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index af3257a277..8a222d98c5 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -343,19 +343,23 @@ def _safe_globals() -> dict[str, Any]: "float": float, "IndexError": IndexError, "int": int, + "isinstance": isinstance, "KeyError": KeyError, "len": len, "list": list, "max": max, "min": min, + "print": print, "range": range, + "repr": repr, "round": round, + "RuntimeError": RuntimeError, "set": set, "sorted": sorted, - "RuntimeError": RuntimeError, "str": str, "sum": sum, "tuple": tuple, + "type": type, "TypeError": TypeError, "ValueError": ValueError, "zip": zip, diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index a36cf2835f..f692e6e6bb 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -11,10 +11,13 @@ WorkflowContext, WorkflowExecutor, WorkflowScriptError, +) +from openhands.tools.workflow.impl import ( + _format_exception, + _format_value, execute_workflow_script, validate_workflow_script, ) -from openhands.tools.workflow.impl import _format_exception, _format_value @dataclass @@ -87,6 +90,17 @@ async def main(wf): assert manager.descriptions == ["job alpha", "job beta", "final summary"] +def test_run_agent_returns_task_result() -> None: + manager = _FakeTaskManager() + script = """ +async def main(wf): + return await wf.run_agent("do the thing", subagent_type="analyst") +""" + result = execute_workflow_script(script, _context(manager)) + assert result == "result:do the thing" + assert manager.prompts == ["analyst: do the thing"] + + def test_map_agents_uses_context_default_concurrency_when_none_given() -> None: manager = _FakeTaskManager() script = """ From c35e9d520a99ea07463f33a53d71cc379a266cb0 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 05:09:12 +0000 Subject: [PATCH 07/28] chore: address PR review feedback (#3426) - fix string template format bypass: use plain replace() instead of .format() to prevent format mini-language attribute traversal bypassing AST guard - document wf.flatten() as one-level-only in tool description - add test_workflow_executor_success_path covering context.close() and WorkflowObservation returned on the happy path Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 2 +- .../openhands/tools/workflow/impl.py | 4 +++- tests/tools/workflow/test_workflow_tool.py | 22 +++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index 1837a4f493..c56f4b85b1 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -70,7 +70,7 @@ async def main(wf): - `await wf.run_agent(...)` - `await wf.map_agents(...)` - `await wf.reduce_agent(...)` -- `wf.flatten(values)` +- `wf.flatten(values)` — flatten one level of nesting (not recursive) `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 8a222d98c5..5b0fb556b2 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -204,7 +204,9 @@ def close(self) -> None: def _render_required_template(template: Callable[[Any], str] | str, item: Any) -> str: if callable(template): return str(template(item)) - return template.format(item=item) + # Plain replace avoids Python's format mini-language attribute traversal + # (e.g. "{item._manager}"), which would bypass the AST private-attribute guard. + return template.replace("{item}", str(item)) def _render_template( diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index f692e6e6bb..31d29d4d71 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -251,3 +251,25 @@ def test_workflow_context_helper_flattens_one_level() -> None: context = _context(_FakeTaskManager()) assert context.flatten([[1, 2], 3, [4]]) == [1, 2, 3, 4] + + +def test_workflow_executor_success_path() -> None: + @dataclass + class _FakeState: + persistence_dir: str | None = None + + @dataclass + class _FakeConv: + state: _FakeState + + conv = cast(LocalConversation, _FakeConv(state=_FakeState())) + action = WorkflowAction( + name="trivial", + script="async def main(wf):\n return 'done'", + ) + + obs = WorkflowExecutor()(action, conversation=conv) + + assert not obs.is_error + assert obs.status == "completed" + assert obs.text == "done" From cd0aeb19977677fc14e318198b630155b11aec53 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 05:18:52 +0000 Subject: [PATCH 08/28] chore: address PR review feedback (#3426) - restrict _UNSAFE_CALLS check to ast.Name only: method calls like result_obj.open() or df.eval() are no longer false positives - remove _call_name helper (now unused) - remove type() from _safe_globals (isinstance covers workflow needs) - add 1-hour timeout to execute_workflow_script via asyncio.timeout; TimeoutError is surfaced as WorkflowScriptError Co-authored-by: openhands --- .../openhands/tools/workflow/impl.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 5b0fb556b2..dcbdeffd15 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -23,6 +23,7 @@ _MAX_SCRIPT_CHARS = 20_000 _MAX_REDUCE_INPUT_CHARS = 12_000 +_WORKFLOW_TIMEOUT_SECONDS = 3600.0 # 1 hour; prevents indefinitely hung workflows _UNSAFE_CALLS = frozenset( { "breakpoint", @@ -278,22 +279,13 @@ def validate_workflow_script(script: str) -> None: and _attribute_root_name(node) in _UNSAFE_ATTRIBUTE_ROOTS ): raise WorkflowScriptError("Workflow scripts may not access unsafe modules") - if isinstance(node, ast.Call): - call_name = _call_name(node.func) - if call_name in _UNSAFE_CALLS: + if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): + if node.func.id in _UNSAFE_CALLS: raise WorkflowScriptError( - f"Workflow scripts may not call `{call_name}`" + f"Workflow scripts may not call `{node.func.id}`" ) -def _call_name(func: ast.expr) -> str | None: - if isinstance(func, ast.Name): - return func.id - if isinstance(func, ast.Attribute): - return func.attr - return None - - def _attribute_root_name(node: ast.Attribute) -> str | None: value = node.value while isinstance(value, ast.Attribute): @@ -320,7 +312,17 @@ def execute_workflow_script(script: str, context: WorkflowContext) -> Any: main = namespace.get("main") if not inspect.iscoroutinefunction(main): raise WorkflowScriptError("Workflow entry point must be async") - return asyncio.run(main(context)) + + async def _run_with_timeout() -> Any: + async with asyncio.timeout(_WORKFLOW_TIMEOUT_SECONDS): + return await main(context) + + try: + return asyncio.run(_run_with_timeout()) + except TimeoutError: + raise WorkflowScriptError( + f"Workflow timed out after {_WORKFLOW_TIMEOUT_SECONDS:.0f} seconds" + ) from None def _format_exception(error: Exception) -> str: @@ -361,7 +363,6 @@ def _safe_globals() -> dict[str, Any]: "str": str, "sum": sum, "tuple": tuple, - "type": type, "TypeError": TypeError, "ValueError": ValueError, "zip": zip, From d100fd84679db680ab8b2d53b045eb5ec6e9595a Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 05:31:55 +0000 Subject: [PATCH 09/28] chore: address PR review feedback (#3426) - WorkflowTool.create() now accepts optional conv_state and executor; executor defaults to WorkflowExecutor() so auto-create path works - flatten() type hint narrowed from Sequence[Any] to list[Any] to match actual behavior (only list elements are spread; other types appended) Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/definition.py | 7 +++++-- openhands-tools/openhands/tools/workflow/impl.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index c56f4b85b1..c42f15a2dc 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -108,9 +108,12 @@ class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]): @classmethod def create( cls, - executor: "WorkflowExecutor", + conv_state: "ConversationState | None" = None, # noqa: ARG003 + executor: "WorkflowExecutor | None" = None, description: str = _WORKFLOW_DESCRIPTION, ) -> Sequence["WorkflowTool"]: + from openhands.tools.workflow.impl import WorkflowExecutor + return [ cls( action_type=WorkflowAction, @@ -123,7 +126,7 @@ def create( idempotentHint=False, openWorldHint=True, ), - executor=executor, + executor=executor if executor is not None else WorkflowExecutor(), ) ] diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index dcbdeffd15..1f61ba39a7 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -188,7 +188,7 @@ async def reduce_agent( description=description, ) - def flatten(self, values: Sequence[Any]) -> list[Any]: + def flatten(self, values: list[Any]) -> list[Any]: """Flatten one list level.""" flattened: list[Any] = [] for value in values: From ab75043cd718a8021a0ddef9b9472addb934cfa2 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 05:40:55 +0000 Subject: [PATCH 10/28] chore: restore type() to _safe_globals (#3426) type() was removed based on earlier review feedback about 3-arg dynamic class creation. A subsequent review identified this as a correctness gap since 1-arg type(x) is commonly used in workflow scripts. Re-adding it alongside isinstance; blast radius of 3-arg form remains limited given no-import constraint. Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 1f61ba39a7..6b57cadeab 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -348,6 +348,7 @@ def _safe_globals() -> dict[str, Any]: "IndexError": IndexError, "int": int, "isinstance": isinstance, + "type": type, "KeyError": KeyError, "len": len, "list": list, From 9597084f436e491b124c2516c367fd5d5958a7ea Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 05:56:40 +0000 Subject: [PATCH 11/28] chore: address PR review feedback (#3426) - map_agents wraps failures with [item N] prefix for identity tracking - add comment explaining _UNSAFE_ATTRIBUTE_ROOTS intentional narrowness - update _WORKFLOW_DESCRIPTION to show optional params for all wf methods - add reduce_agent docstring noting semaphore sharing with run_agent - update test assertion for new [item N] error format Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 8 ++++-- .../openhands/tools/workflow/impl.py | 28 +++++++++++++------ tests/tools/workflow/test_workflow_tool.py | 4 +-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index c42f15a2dc..fa5e73ffb4 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -67,9 +67,11 @@ async def main(wf): to the reducer sub-agent. Available `wf` methods: -- `await wf.run_agent(...)` -- `await wf.map_agents(...)` -- `await wf.reduce_agent(...)` +- `await wf.run_agent(prompt, subagent_type="general-purpose", description=None)` +- `await wf.map_agents(items, prompt, subagent_type="general-purpose", + max_concurrency=None, description=None)` +- `await wf.reduce_agent(items, prompt, subagent_type="general-purpose", + description=None)` - `wf.flatten(values)` — flatten one level of nesting (not recursive) `map_agents` accepts either a callable prompt, such as diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 6b57cadeab..008dcb5a2d 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -42,6 +42,10 @@ "__import__", } ) +# Attribute-root deny-list is intentionally narrow: scripts cannot import +# modules, so only names that are pre-injected via _safe_globals() need to +# be listed here. os and subprocess are the two that would be most harmful +# if they were ever inadvertently exposed. _UNSAFE_ATTRIBUTE_ROOTS = frozenset({"os", "subprocess"}) @@ -146,18 +150,21 @@ async def map_agents( else self._default_semaphore ) - async def run_one(item: Any) -> str: + async def run_one(index: int, item: Any) -> str: rendered_prompt = _render_required_template(prompt, item) rendered_description = _render_template(description, item) async with semaphore: - return await self._run_agent_task( - prompt=rendered_prompt, - subagent_type=subagent_type, - description=rendered_description, - ) + try: + return await self._run_agent_task( + prompt=rendered_prompt, + subagent_type=subagent_type, + description=rendered_description, + ) + except Exception as exc: + raise RuntimeError(f"[item {index}] {exc}") from exc results = await asyncio.gather( - *(run_one(item) for item in items), + *(run_one(i, item) for i, item in enumerate(items)), return_exceptions=True, ) failures = [result for result in results if isinstance(result, BaseException)] @@ -181,7 +188,12 @@ async def reduce_agent( subagent_type: str = "general-purpose", description: str | None = None, ) -> str: - """Run a single reducer sub-agent with serialized intermediate results.""" + """Run a single reducer sub-agent with serialized intermediate results. + + Delegates to ``run_agent``, which acquires ``_default_semaphore``. + Counts against the shared concurrency limit if called while a + ``map_agents`` operation is in progress. + """ return await self.run_agent( prompt=f"{prompt}\n\nInput:\n{_format_value(items)}", subagent_type=subagent_type, diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 31d29d4d71..9276508691 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -148,8 +148,8 @@ async def main(wf): assert "map_agents" in str(exc_info.value) assert [str(exc) for exc in exc_info.value.exceptions] == [ - "failed inspect bad", - "failed inspect worse", + "[item 1] failed inspect bad", + "[item 2] failed inspect worse", ] assert set(manager.prompts) == { "researcher: inspect good", From bb8d81273ab288f0900f1a7815d682fc230bb0fe Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 06:08:12 +0000 Subject: [PATCH 12/28] chore: address PR review feedback (#3426) - switch [item N] labels to 1-based indexing for user-facing consistency with _format_exception's 1-based numbering - add idempotency note to TaskManager.attach_parent docstring - update test assertion to match 1-based item indices Co-authored-by: openhands --- openhands-tools/openhands/tools/task/manager.py | 6 +++++- openhands-tools/openhands/tools/workflow/impl.py | 2 +- tests/tools/workflow/test_workflow_tool.py | 4 ++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py index 0d7e4f9999..cb38bdf918 100644 --- a/openhands-tools/openhands/tools/task/manager.py +++ b/openhands-tools/openhands/tools/task/manager.py @@ -107,7 +107,11 @@ def __init__( self._persistence_dir: Path | None = None def attach_parent(self, conversation: LocalConversation) -> None: - """Attach the parent conversation used to create sub-agent tasks.""" + """Attach the parent conversation used to create sub-agent tasks. + + Idempotent: if a parent conversation is already attached, subsequent + calls with the same or a different conversation have no effect. + """ self._ensure_parent(conversation) def _ensure_parent(self, conversation: LocalConversation) -> None: diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 008dcb5a2d..19afe67245 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -161,7 +161,7 @@ async def run_one(index: int, item: Any) -> str: description=rendered_description, ) except Exception as exc: - raise RuntimeError(f"[item {index}] {exc}") from exc + raise RuntimeError(f"[item {index + 1}] {exc}") from exc results = await asyncio.gather( *(run_one(i, item) for i, item in enumerate(items)), diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 9276508691..805ce9b28a 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -148,8 +148,8 @@ async def main(wf): assert "map_agents" in str(exc_info.value) assert [str(exc) for exc in exc_info.value.exceptions] == [ - "[item 1] failed inspect bad", - "[item 2] failed inspect worse", + "[item 2] failed inspect bad", + "[item 3] failed inspect worse", ] assert set(manager.prompts) == { "researcher: inspect good", From 2515653517ec6d3dbe9eccaf35857de986b3856b Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 06:20:48 +0000 Subject: [PATCH 13/28] chore: address PR review feedback (#3426) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - validate_workflow_script now rejects kwonlyargs, vararg, kwarg in main() signature to surface clean WorkflowScriptError instead of confusing runtime TypeError - WorkflowContext.close() is now idempotent — guards against double-close and against scripts calling wf.close() mid-execution - _WORKFLOW_DESCRIPTION adds note that subagent_type must be a registered sub-agent type in the parent application Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 3 +++ openhands-tools/openhands/tools/workflow/impl.py | 12 +++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index fa5e73ffb4..8b41c4d2c3 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -74,6 +74,9 @@ async def main(wf): description=None)` - `wf.flatten(values)` — flatten one level of nesting (not recursive) +`subagent_type` must be a sub-agent type registered in the parent application. +Use the same type names you registered when building your agent. + `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing `{item}`. diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 19afe67245..f5be5cb7d6 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -91,6 +91,7 @@ def __init__( else: self._manager = manager self._semaphore: asyncio.Semaphore | None = None + self._closed = False @property def _default_semaphore(self) -> asyncio.Semaphore: @@ -211,6 +212,9 @@ def flatten(self, values: list[Any]) -> list[Any]: return flattened def close(self) -> None: + if self._closed: + return + self._closed = True self._manager.close() @@ -266,7 +270,13 @@ def validate_workflow_script(script: str) -> None: ) main_def = main_defs[0] - if len(main_def.args.args) != 1 or main_def.args.args[0].arg != "wf": + if ( + len(main_def.args.args) != 1 + or main_def.args.args[0].arg != "wf" + or main_def.args.kwonlyargs + or main_def.args.vararg + or main_def.args.kwarg + ): raise WorkflowScriptError("Workflow entry point must be `async def main(wf):`") for node in ast.walk(tree): From 76f4ee584ff5fadfb89d177de0a147612aa36ba7 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 06:34:39 +0000 Subject: [PATCH 14/28] chore: address PR review feedback (#3426) - _run_agent_task now guards against closed-context calls with a clean WorkflowScriptError instead of an opaque manager error Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index f5be5cb7d6..405227071d 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -119,6 +119,8 @@ async def _run_agent_task( subagent_type: str, description: str | None, ) -> str: + if self._closed: + raise WorkflowScriptError("WorkflowContext is already closed") task = await asyncio.to_thread( self._manager.start_task, prompt=prompt, From c8c50ee1246531268ccde5a8cf17738dc2be1bae Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 06:45:36 +0000 Subject: [PATCH 15/28] chore: address PR review feedback (#3426) - cap per-call map_agents max_concurrency at context limit with min() to prevent scripts from bypassing WorkflowAction.max_concurrency cap - add note to _WORKFLOW_DESCRIPTION that wf.close() is not part of the scripting API and must not be called from generated scripts - add tests for close() propagation to manager and idempotency Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 3 +++ .../openhands/tools/workflow/impl.py | 6 +++--- tests/tools/workflow/test_workflow_tool.py | 18 ++++++++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index 8b41c4d2c3..a79a545d02 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -77,6 +77,9 @@ async def main(wf): `subagent_type` must be a sub-agent type registered in the parent application. Use the same type names you registered when building your agent. +Scripts must use only the documented `wf` methods listed above; calling +`wf.close()` or any other undocumented attribute is not supported. + `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing `{item}`. diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 405227071d..d5988edf6b 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -142,13 +142,13 @@ async def map_agents( ) -> list[str]: """Run one sub-agent task per item and return results in item order. - A per-call ``max_concurrency`` overrides the context default for this map - operation only; it is not a nested subset of the context-wide limit. + A per-call ``max_concurrency`` caps concurrency for this map operation + only; it is silently capped at the context's ``max_concurrency`` limit. """ if max_concurrency is not None and max_concurrency < 1: raise ValueError("max_concurrency must be at least 1") semaphore = ( - asyncio.Semaphore(max_concurrency) + asyncio.Semaphore(min(max_concurrency, self._max_concurrency)) if max_concurrency is not None else self._default_semaphore ) diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 805ce9b28a..7f0bef7a78 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -273,3 +273,21 @@ class _FakeConv: assert not obs.is_error assert obs.status == "completed" assert obs.text == "done" + + +def test_workflow_context_close_propagates_to_manager() -> None: + manager = _FakeTaskManager() + context = _context(manager) + + assert not manager.closed + context.close() + assert manager.closed + + +def test_workflow_context_close_is_idempotent() -> None: + manager = _FakeTaskManager() + context = _context(manager) + + context.close() + context.close() # second call must not raise + assert manager.closed From 3a480eba1f393d69d4cc51b08080bfe1c7a0b2a7 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 06:56:34 +0000 Subject: [PATCH 16/28] chore: address PR review feedback (#3426) - also check args.defaults and args.posonlyargs in main() signature validation to close gap between documented contract and actual check - add comment to max_iteration_per_run in example noting it can be increased if the agent needs more turns to generate the script Co-authored-by: openhands --- examples/01_standalone_sdk/52_dynamic_workflow.py | 2 +- openhands-tools/openhands/tools/workflow/impl.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/01_standalone_sdk/52_dynamic_workflow.py b/examples/01_standalone_sdk/52_dynamic_workflow.py index 08379cf2e0..d3ccbd4e56 100644 --- a/examples/01_standalone_sdk/52_dynamic_workflow.py +++ b/examples/01_standalone_sdk/52_dynamic_workflow.py @@ -95,7 +95,7 @@ def create_coverage_auditor(llm: LLM) -> Agent: agent=parent_agent, workspace=Path.cwd(), visualizer=DelegationVisualizer(name="CoverageWorkflow"), - max_iteration_per_run=6, + max_iteration_per_run=6, # increase if more turns needed to write the script ) conversation.send_message( diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index d5988edf6b..949a34ea18 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -278,6 +278,8 @@ def validate_workflow_script(script: str) -> None: or main_def.args.kwonlyargs or main_def.args.vararg or main_def.args.kwarg + or main_def.args.defaults + or main_def.args.posonlyargs ): raise WorkflowScriptError("Workflow entry point must be `async def main(wf):`") From aeafbcb99609e71b647e29c39a0be42f800066a2 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 07:08:14 +0000 Subject: [PATCH 17/28] chore: address PR review feedback (#3426) - add test_run_agent_raises_after_close() to verify closed-context guard - add test_map_agents_respects_context_concurrency_cap() to cover the min(max_concurrency, self._max_concurrency) regression path - update WorkflowTool docstring to clarify distinction from WorkflowToolSet and note that WorkflowToolSet is the standard SDK entrypoint Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 7 ++++- tests/tools/workflow/test_workflow_tool.py | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index a79a545d02..338860f1e9 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -111,7 +111,12 @@ async def main(wf): class WorkflowTool(ToolDefinition[WorkflowAction, WorkflowObservation]): - """Tool for running a dynamic Python workflow.""" + """Low-level tool for explicit executor injection. + + Prefer ``WorkflowToolSet`` for standard SDK auto-create usage. + Use ``WorkflowTool`` when you need to inject a custom executor + (e.g., in tests or extensions). + """ @classmethod def create( diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 7f0bef7a78..bb30b75d2b 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from dataclasses import dataclass from typing import cast @@ -291,3 +292,29 @@ def test_workflow_context_close_is_idempotent() -> None: context.close() context.close() # second call must not raise assert manager.closed + + +def test_run_agent_raises_after_close() -> None: + manager = _FakeTaskManager() + context = _context(manager) + context.close() + + with pytest.raises(WorkflowScriptError, match="already closed"): + asyncio.run(context.run_agent("any prompt")) + + +def test_map_agents_respects_context_concurrency_cap() -> None: + """Per-call max_concurrency must be silently capped at context max.""" + script = """ +async def main(wf): + results = await wf.map_agents( + items=["a", "b"], + prompt="ping {item}", + subagent_type="researcher", + max_concurrency=1000, + ) + return results +""" + manager = _FakeTaskManager() + context = _context(manager) + execute_workflow_script(script, context) From e33dd3992be73968dba2bb8cda9911833e37bb49 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 07:21:03 +0000 Subject: [PATCH 18/28] chore: address PR review feedback (#3426) - remove 'type' from _safe_globals to close three-argument dynamic class creation footgun; isinstance covers all workflow use cases - add peak concurrency tracking to test_map_agents_respects_context_ concurrency_cap to verify the min() cap is actually enforced at runtime Co-authored-by: openhands --- .../openhands/tools/workflow/impl.py | 1 - tests/tools/workflow/test_workflow_tool.py | 51 +++++++++++++++---- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 949a34ea18..fd05458a7f 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -374,7 +374,6 @@ def _safe_globals() -> dict[str, Any]: "IndexError": IndexError, "int": int, "isinstance": isinstance, - "type": type, "KeyError": KeyError, "len": len, "list": list, diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index bb30b75d2b..2564ec2bb5 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import threading from dataclasses import dataclass from typing import cast @@ -49,10 +50,10 @@ def close(self) -> None: self.closed = True -def _context(manager: _FakeTaskManager) -> WorkflowContext: +def _context(manager: _FakeTaskManager, max_concurrency: int = 4) -> WorkflowContext: return WorkflowContext( parent_conversation=cast(LocalConversation, object()), - max_concurrency=4, + max_concurrency=max_concurrency, manager=manager, ) @@ -305,16 +306,48 @@ def test_run_agent_raises_after_close() -> None: def test_map_agents_respects_context_concurrency_cap() -> None: """Per-call max_concurrency must be silently capped at context max.""" + + class _PeakTrackingManager(_FakeTaskManager): + def __init__(self) -> None: + super().__init__() + self._active = 0 + self.peak_active = 0 + self._lock = threading.Lock() + + def start_task( + self, + prompt: str, + subagent_type: str = "default", + resume: str | None = None, + description: str | None = None, + conversation: LocalConversation | None = None, + ) -> _FakeTask: + with self._lock: + self._active += 1 + self.peak_active = max(self.peak_active, self._active) + try: + return super().start_task( + prompt, + subagent_type=subagent_type, + resume=resume, + description=description, + conversation=conversation, + ) + finally: + with self._lock: + self._active -= 1 + + # Context capped at 3; per-call max_concurrency=1000 should be min'd to 3 + context_cap = 3 + manager = _PeakTrackingManager() + context = _context(manager, max_concurrency=context_cap) script = """ async def main(wf): - results = await wf.map_agents( - items=["a", "b"], - prompt="ping {item}", - subagent_type="researcher", + return await wf.map_agents( + items=list(range(10)), + prompt="task {item}", max_concurrency=1000, ) - return results """ - manager = _FakeTaskManager() - context = _context(manager) execute_workflow_script(script, context) + assert manager.peak_active <= context_cap From b7229885d1b6c710e540d309312cb2d6862b5a61 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 07:31:19 +0000 Subject: [PATCH 19/28] chore: address PR review feedback (#3426) - add comment in _format_value noting character-boundary truncation limitation and that element-boundary truncation is a post-MVP follow-up - add note to _WORKFLOW_DESCRIPTION clarifying print() goes to server logs rather than the workflow observation seen by the LLM Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/definition.py | 4 ++++ openhands-tools/openhands/tools/workflow/impl.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index 338860f1e9..c7bcd32219 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -80,6 +80,10 @@ async def main(wf): Scripts must use only the documented `wf` methods listed above; calling `wf.close()` or any other undocumented attribute is not supported. +`print()` is available for debugging but writes to the server logs, not to +the workflow observation seen by the LLM; use the return value of `main()` to +surface results. + `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing `{item}`. diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index fd05458a7f..e220478c25 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -243,6 +243,8 @@ def _format_value(value: Any) -> str: text = jsonlib.dumps(value, indent=2, default=str) if len(text) <= _MAX_REDUCE_INPUT_CHARS: return text + # Character-boundary truncation can split mid-token in JSON; element-boundary + # truncation for list/dict inputs would be cleaner but is deferred post-MVP. return ( text[:_MAX_REDUCE_INPUT_CHARS] + "\n... [truncated workflow intermediate results]" From 05b78eed169d969c94351f8a9e36e219d12da49e Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 07:44:20 +0000 Subject: [PATCH 20/28] chore: address PR review feedback (#3426) - add note to _WORKFLOW_DESCRIPTION that ExceptionGroup is not in _safe_globals so partial map_agents failures propagate to the parent agent; handle per-item failures via sentinel return values instead - update reduce_agent docstring to accurately state the invariant: since workflow scripts always await sequentially, the semaphore is always fully available when reduce_agent is called Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/definition.py | 6 ++++++ openhands-tools/openhands/tools/workflow/impl.py | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index c7bcd32219..248299e73a 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -84,6 +84,12 @@ async def main(wf): the workflow observation seen by the LLM; use the return value of `main()` to surface results. +If one or more `map_agents` items fail, the whole call raises an +`ExceptionGroup`. Workflow scripts cannot catch `ExceptionGroup` because it is +not included in `_safe_globals`; the error propagates to the parent agent as an +error observation. To handle partial failures, design sub-agent prompts to +return an error sentinel value instead of raising. + `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing `{item}`. diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index e220478c25..db0ca472de 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -194,8 +194,8 @@ async def reduce_agent( """Run a single reducer sub-agent with serialized intermediate results. Delegates to ``run_agent``, which acquires ``_default_semaphore``. - Counts against the shared concurrency limit if called while a - ``map_agents`` operation is in progress. + Workflow scripts always await operations sequentially, so the semaphore + is always fully available when ``reduce_agent`` is called. """ return await self.run_agent( prompt=f"{prompt}\n\nInput:\n{_format_value(items)}", From da124131585a75e277227ce85a4b9b2fe415712b Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 08:05:53 +0000 Subject: [PATCH 21/28] chore: address PR review feedback (#3426) - correct ExceptionGroup documentation: it IS catchable via 'except Exception', it is the 'except*' selective-group syntax that requires naming ExceptionGroup (not in _safe_globals); clarify both points - narrow WorkflowToolSet.create return type from Sequence[ToolDefinition] to Sequence[WorkflowTool] for type-checker precision Co-authored-by: openhands --- .../openhands/tools/workflow/definition.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index 248299e73a..c13d1675e6 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -85,10 +85,11 @@ async def main(wf): surface results. If one or more `map_agents` items fail, the whole call raises an -`ExceptionGroup`. Workflow scripts cannot catch `ExceptionGroup` because it is -not included in `_safe_globals`; the error propagates to the parent agent as an -error observation. To handle partial failures, design sub-agent prompts to -return an error sentinel value instead of raising. +`ExceptionGroup`. The name `ExceptionGroup` is not available by name in the +workflow sandbox, so scripts cannot use `except*` for selective group handling. +A plain `except Exception` will still catch the entire group. To handle partial +failures and collect all results, design sub-agent prompts to return an error +sentinel value instead of raising. `map_agents` accepts either a callable prompt, such as `lambda item: f"Review this finding: {item}"`, or a string template containing @@ -161,7 +162,7 @@ class WorkflowToolSet(ToolDefinition[WorkflowAction, WorkflowObservation]): def create( cls, conv_state: "ConversationState", # noqa: ARG003 - ) -> Sequence[ToolDefinition]: + ) -> Sequence[WorkflowTool]: from openhands.tools.workflow.impl import WorkflowExecutor return WorkflowTool.create(executor=WorkflowExecutor()) From fe4a34e86922deb393045744ccfd8292b9dee113 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 08:26:07 +0000 Subject: [PATCH 22/28] chore: address PR review feedback (#3426) - extend wf attribute guard to also block wf.close() by name, making AST enforcement match the tool description's 'unsupported' warning - add test_validate_workflow_script_rejects_wf_close() to verify Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 3 ++- tests/tools/workflow/test_workflow_tool.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index db0ca472de..36d70032ae 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -297,10 +297,11 @@ def validate_workflow_script(script: str) -> None: if ( isinstance(node, ast.Attribute) and _attribute_root_name(node) == "wf" - and node.attr.startswith("_") + and (node.attr.startswith("_") or node.attr == "close") ): raise WorkflowScriptError( "Workflow scripts may not access private wf attributes" + " or call wf.close()" ) if ( isinstance(node, ast.Attribute) diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index 2564ec2bb5..bf9fe2152a 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -219,6 +219,16 @@ async def main(wf): validate_workflow_script(script) +def test_validate_workflow_script_rejects_wf_close() -> None: + script = """ +async def main(wf): + wf.close() +""" + + with pytest.raises(WorkflowScriptError, match="wf.close"): + validate_workflow_script(script) + + def test_validate_workflow_script_rejects_unsafe_module_access() -> None: script = """ async def main(wf): From 97ec7a29e0312e533897306a4916ddd732c9a0a6 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 08:42:05 +0000 Subject: [PATCH 23/28] chore: address PR review feedback (#3426) - log debug warning when map_agents string template has no '{item}' placeholder (silent no-op would send identical prompt to every sub-agent) - add comment to _run_agent_task explaining resume is intentionally not forwarded through WorkflowContext (MVP simplification) Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 36d70032ae..af9e903e98 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -119,6 +119,9 @@ async def _run_agent_task( subagent_type: str, description: str | None, ) -> str: + # Note: `_TaskStarter.start_task` accepts a `resume` parameter, but + # workflow sub-agents are always fresh tasks; resumption is intentionally + # not exposed through WorkflowContext in the MVP. if self._closed: raise WorkflowScriptError("WorkflowContext is already closed") task = await asyncio.to_thread( @@ -225,6 +228,11 @@ def _render_required_template(template: Callable[[Any], str] | str, item: Any) - return str(template(item)) # Plain replace avoids Python's format mini-language attribute traversal # (e.g. "{item._manager}"), which would bypass the AST private-attribute guard. + if "{item}" not in template: + logger.debug( + "map_agents string template does not contain '{item}'; " + "all sub-agents will receive the same prompt." + ) return template.replace("{item}", str(item)) From 494914489ff17469ccac5b89f7857c1bc4314cf8 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 08:57:08 +0000 Subject: [PATCH 24/28] chore: address PR review feedback (#3426) - add 'format' to _safe_globals so f-strings with format specifiers (e.g. f'{value:.2f}') work without NameError in workflow scripts - use json.dumps for non-str items in _render_required_template so dict/list items are serialised consistently with reduce_agent's path Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index af9e903e98..540e4bb252 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -233,7 +233,10 @@ def _render_required_template(template: Callable[[Any], str] | str, item: Any) - "map_agents string template does not contain '{item}'; " "all sub-agents will receive the same prompt." ) - return template.replace("{item}", str(item)) + # Use json.dumps for non-str items so dicts/lists are consistently serialised + # with double-quoted JSON syntax, matching the reduce_agent serialisation path. + serialised = item if isinstance(item, str) else jsonlib.dumps(item, default=str) + return template.replace("{item}", serialised) def _render_template( @@ -403,6 +406,7 @@ def _safe_globals() -> dict[str, Any]: "TypeError": TypeError, "ValueError": ValueError, "zip": zip, + "format": format, } return {"__builtins__": safe_builtins} From a60cdbe8d589a22212da2d01bc5832a771bb1d90 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 09:05:55 +0000 Subject: [PATCH 25/28] chore: address PR review feedback (#3426) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - extend json.dumps comment to mention scalars (booleans → true/false, None → null) explicitly, matching the reviewer suggestion Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 540e4bb252..9faf51a9c8 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -233,8 +233,8 @@ def _render_required_template(template: Callable[[Any], str] | str, item: Any) - "map_agents string template does not contain '{item}'; " "all sub-agents will receive the same prompt." ) - # Use json.dumps for non-str items so dicts/lists are consistently serialised - # with double-quoted JSON syntax, matching the reduce_agent serialisation path. + # Use json.dumps for non-str items so dicts/lists and scalars are consistently + # serialised as JSON (booleans → true/false, None → null), matching reduce_agent. serialised = item if isinstance(item, str) else jsonlib.dumps(item, default=str) return template.replace("{item}", serialised) From eb267fa83e0194e7592e20729d263ace4761ac2a Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 09:20:18 +0000 Subject: [PATCH 26/28] chore: address PR review feedback (#3426) - re-add 'type' to _safe_globals with comment explaining 3-arg safety (method bodies execute in restricted globals; 1-arg introspection is a valid use case that previously caused confusing NameErrors) - add API rate-limit note to max_concurrency field description (consider 2-4 for LLM-heavy workflows) Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/definition.py | 5 ++++- openhands-tools/openhands/tools/workflow/impl.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/openhands-tools/openhands/tools/workflow/definition.py b/openhands-tools/openhands/tools/workflow/definition.py index c13d1675e6..e51c4add88 100644 --- a/openhands-tools/openhands/tools/workflow/definition.py +++ b/openhands-tools/openhands/tools/workflow/definition.py @@ -33,7 +33,10 @@ class WorkflowAction(Action): default=8, ge=1, le=64, - description="Maximum number of sub-agent tasks to run concurrently.", + description=( + "Maximum number of sub-agent tasks to run concurrently. " + "Consider 2–4 for LLM-heavy workflows to avoid hitting API rate limits." + ), ) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 9faf51a9c8..50ce67631d 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -403,6 +403,10 @@ def _safe_globals() -> dict[str, Any]: "str": str, "sum": sum, "tuple": tuple, + # type() is included for 1-arg introspection (e.g. type(x).__name__); the + # 3-arg class-creation form remains safe because all method bodies execute in + # the same restricted globals and cannot access unsafe builtins or modules. + "type": type, "TypeError": TypeError, "ValueError": ValueError, "zip": zip, From 5bc40d287dbe704cafc1e7b501e28535e86f08b8 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 09:35:39 +0000 Subject: [PATCH 27/28] chore: address PR review feedback (#3426) - use set assertion for map phase prompts in test_run_map_and_reduce (asyncio.to_thread dispatch order is non-deterministic) - expand 'type' safety comment to mention both safeguards: restricted globals AND AST validator blocking __dunder__ attribute access Co-authored-by: openhands --- openhands-tools/openhands/tools/workflow/impl.py | 5 +++-- tests/tools/workflow/test_workflow_tool.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 50ce67631d..1b966c1c78 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -404,8 +404,9 @@ def _safe_globals() -> dict[str, Any]: "sum": sum, "tuple": tuple, # type() is included for 1-arg introspection (e.g. type(x).__name__); the - # 3-arg class-creation form remains safe because all method bodies execute in - # the same restricted globals and cannot access unsafe builtins or modules. + # 3-arg class-creation form is safe because method bodies execute in the same + # restricted globals AND the AST validator blocks __dunder__ attribute access, + # which closes the classic __subclasses__()-based sandbox escape path. "type": type, "TypeError": TypeError, "ValueError": ValueError, diff --git a/tests/tools/workflow/test_workflow_tool.py b/tests/tools/workflow/test_workflow_tool.py index bf9fe2152a..5eec11b83e 100644 --- a/tests/tools/workflow/test_workflow_tool.py +++ b/tests/tools/workflow/test_workflow_tool.py @@ -84,12 +84,16 @@ async def main(wf): ' "result:inspect beta"\n]' ) assert result.startswith("result:summarize the results") - assert manager.prompts == [ + # map_agents uses asyncio.to_thread; thread scheduling is non-deterministic so the + # first two prompts may arrive in any order. gather() preserves result ordering but + # not dispatch order — use set comparison for the map phase. + assert set(manager.prompts[:2]) == { "researcher: inspect alpha", "researcher: inspect beta", - expected_reduce_prompt, - ] - assert manager.descriptions == ["job alpha", "job beta", "final summary"] + } + assert manager.prompts[2] == expected_reduce_prompt + assert set(manager.descriptions[:2]) == {"job alpha", "job beta"} + assert manager.descriptions[2] == "final summary" def test_run_agent_returns_task_result() -> None: From 8a484b75764fbc47af7577b36c68cd77e8c92498 Mon Sep 17 00:00:00 2001 From: openhands Date: Fri, 29 May 2026 09:46:46 +0000 Subject: [PATCH 28/28] chore: address PR review feedback (#3426) - add docstring note to validate_workflow_script acknowledging the wf-alias bypass limitation (documentation gap, not security gap) - update type() comment to accurately note methods injected via wf are not re-sandboxed, though they only expose public wf API - add warning log in attach_parent when called with a different conversation (surfaces potential programming errors) - thread 2 (run_one re-wrap): kept as-is; the [item N] prefix is valuable context for debugging ExceptionGroup failures Co-authored-by: openhands --- openhands-tools/openhands/tools/task/manager.py | 11 ++++++++++- .../openhands/tools/workflow/impl.py | 17 ++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/openhands-tools/openhands/tools/task/manager.py b/openhands-tools/openhands/tools/task/manager.py index cb38bdf918..fc6685ab98 100644 --- a/openhands-tools/openhands/tools/task/manager.py +++ b/openhands-tools/openhands/tools/task/manager.py @@ -110,8 +110,17 @@ def attach_parent(self, conversation: LocalConversation) -> None: """Attach the parent conversation used to create sub-agent tasks. Idempotent: if a parent conversation is already attached, subsequent - calls with the same or a different conversation have no effect. + calls with the same conversation have no effect. Calls with a different + conversation are also ignored, but log a warning to surface potential + programming errors where two subsystems try to register different parents. """ + if ( + self._parent_conversation is not None + and self._parent_conversation is not conversation + ): + logger.warning( + "attach_parent called with a different conversation; ignoring." + ) self._ensure_parent(conversation) def _ensure_parent(self, conversation: LocalConversation) -> None: diff --git a/openhands-tools/openhands/tools/workflow/impl.py b/openhands-tools/openhands/tools/workflow/impl.py index 1b966c1c78..5b272e2de6 100644 --- a/openhands-tools/openhands/tools/workflow/impl.py +++ b/openhands-tools/openhands/tools/workflow/impl.py @@ -263,7 +263,13 @@ def _format_value(value: Any) -> str: def validate_workflow_script(script: str) -> None: - """Perform best-effort validation for generated workflow scripts.""" + """Perform best-effort validation for generated workflow scripts. + + Note: The private-attribute guard checks the literal name ``wf``, so aliasing + (e.g. ``x = wf; x._attr``) can bypass the check. The attributes accessible + through ``WorkflowContext`` do not expose dangerous capabilities, so this is + a documentation gap rather than a security gap. + """ if len(script) > _MAX_SCRIPT_CHARS: raise WorkflowScriptError( f"Workflow script is too large: {len(script)} > {_MAX_SCRIPT_CHARS}" @@ -403,10 +409,11 @@ def _safe_globals() -> dict[str, Any]: "str": str, "sum": sum, "tuple": tuple, - # type() is included for 1-arg introspection (e.g. type(x).__name__); the - # 3-arg class-creation form is safe because method bodies execute in the same - # restricted globals AND the AST validator blocks __dunder__ attribute access, - # which closes the classic __subclasses__()-based sandbox escape path. + # type() is included for 1-arg introspection (e.g. type(x).__name__). + # 3-arg class creation is permitted; methods DEFINED IN THE SCRIPT execute in + # restricted globals, and the AST validator blocks __dunder__ attribute access + # (closing __subclasses__()-based escapes). Calls to pre-existing injected + # objects such as wf are not re-sandboxed, but those expose only public wf API. "type": type, "TypeError": TypeError, "ValueError": ValueError,