feat: add parallel tool orchestration with bounded concurrency#167
feat: add parallel tool orchestration with bounded concurrency#167bgauryy wants to merge 10 commits intogoogle:mainfrom
Conversation
Parallel Tool Orchestration in ADK-JS
OverviewThis change adds optional parallel tool execution with bounded concurrency to ADK-JS, while keeping sequential mode as the default.
How It WorksEnabling parallel moderunner.runAsync({
runConfig: {
parallelToolExecution: true, // opt-in; default is false
maxConcurrentToolCalls: 4, // optional; omit for no limit
},
});The config flows through Dispatch flow
Result ordering
What ChangedNew
|
| Field | Type | Default | Location |
|---|---|---|---|
parallelToolExecution |
boolean? |
false |
run_config.ts interface + createRunConfig defaults |
maxConcurrentToolCalls |
number? |
undefined (no limit) |
run_config.ts interface only (no default in createRunConfig) |
parallelToolExecutioncontrols whether tools run concurrently or sequentially.maxConcurrentToolCallscaps how many parallel tools fire at once. Whenundefinedor<= 0, all tools dispatch simultaneously. Fractional values are floored viaMath.floor. Only applies whenparallelToolExecutionistrue.
core/src/agents/functions.ts (modified)
- Extracted
executeSingleFunctionCall— canonical single-call pipeline shared by both dispatch modes. - Added
dispatchParallel— runs tools viaPromise.allSettled, optionally batched bymaxConcurrentToolCalls. - Added
dispatchSequential— runs tools one-by-one with per-calltry/catcherror isolation. - Added
executeInBatches— dispatches tool call tasks in fixed-size batches whenmaxConcurrentToolCallsis set. - Added
executeSingleFunctionCallerror recovery — if a tool is not intoolsDict, the pluginonToolErrorCallbackis invoked before throwing. - Added
structuredClonefor args — deep-copiesfunctionCall.argsbefore execution, preventing callback/tool mutations from leaking back to the originalFunctionCall. - Added
detectStateDeltaConflicts— warns when parallel tools write overlappingstateDeltakeys, labeling each as(deep-merged)or(last-write-wins). Uses the same lodashisPlainObjectcheck asdeepMergeStateDeltato ensure warning labels match actual merge behavior. - Added
createErrorResponseEvent— builds error response events for failed tool calls (used by both dispatch modes). - Modified
mergeParallelFunctionResponseEvents— already existed, but now merges actions viamergeEventActionswhich uses the new deep merge forstateDelta.
core/src/events/event_actions.ts (modified)
- Added
deepMergeStateDelta— replaced shallowObject.assignforstateDeltawith a recursive merge using lodashisPlainObjectguard:- Plain objects → recursive deep merge (creates a shallow copy at each level, then recurses)
- Arrays, primitives,
undefined→ last-write-wins via direct assignment
When to Use Each Mode
Use parallel mode when:
- Tools are independent (no shared side effects)
- Latency matters (e.g., multiple API calls)
- You want failure isolation by default
Use sequential mode when:
- Tools depend on side effects from earlier tools
- Callback logic assumes strict execution order
- You need deterministic ordering of state mutations
Caveats
Sequential mode behavior change
Previously, a tool failure in sequential mode would throw and abort the entire turn. Now, failures produce error response events and execution continues to the next tool. Callers that relied on thrown exceptions from handleFunctionCallsAsync should be aware of this change.
structuredClone limitations
structuredClone is used on functionCall.args before execution. If you pass non-cloneable values (functions, Symbols, WeakMaps) as args programmatically, a DataCloneError will be thrown. LLM-generated args are always JSON-serializable and unaffected.
stateDelta conflict warnings
In parallel mode, a warning fires when multiple tools write to the same top-level stateDelta key. Two tools writing different sub-keys of the same parent object (e.g., {user: {name}} and {user: {age}}) will trigger the warning but merge correctly via deep merge. Treat the warning as informational.
Concurrent callbacks
In parallel mode, beforeToolCallback and afterToolCallback fire concurrently across tool calls. Callbacks must not depend on cross-call execution order or shared mutable state.
stateDelta merge semantics
- Nested plain objects are recursively deep-merged
- Arrays are not element-merged — last write wins
- Explicit
undefinedwrites are preserved (clearing a key by setting it toundefinedworks)
Comparison with Python ADK
| Feature | Python ADK | JS ADK |
|---|---|---|
| Default mode | Always parallel (asyncio.gather) |
Sequential; opt-in parallel |
| Error isolation | asyncio.gather propagates first error |
Promise.allSettled isolates per-tool |
| Concurrency cap | Not available | maxConcurrentToolCalls batching |
| Args protection | copy.deepcopy |
structuredClone |
stateDelta merge |
deep_merge_dicts (mutates in place) |
deepMergeStateDelta (immutable copies per level) |
| Conflict detection | None | Warning with resolution labels |
| Sequential fallback | Not available | Full sequential mode with error isolation |
mergeParallelFunctionResponseEvents was calling createEvent without passing invocationId, causing the merged event to default to ''. Single-tool responses were unaffected (returned as-is), but any multi-tool parallel turn produced an event with an empty invocationId, breaking session history, tracing, and downstream invocation filtering. Added a regression test that asserts the merged event carries the correct invocationId across a two-tool parallel execution.
…uarded resumption
core/src/agents/functions.ts
Outdated
| } catch (e) { | ||
| toolContext = new ToolContext({ | ||
| invocationContext, | ||
| functionCallId: functionCall.id || undefined, |
There was a problem hiding this comment.
Can it be just functionCallId: functionCall.id
Why do we need || undefined?
| parts: parts, | ||
| role: functionResponseEvent.content!.role, | ||
| }, | ||
| actions: functionResponseEvent.actions, |
There was a problem hiding this comment.
why do we need to remove this?
There was a problem hiding this comment.
See main comment
Actions on confirmation event
Before: actions: functionResponseEvent.actions copied the full merged actions onto the confirmation event.
Problem: In the streaming path each individual tool-response event is yielded and appended one by one. appendEvent applies stateDelta for every event. Keeping the full actions on the confirmation event double-applied stateDelta.
Why we can't just remove it entirely: requestedToolConfirmations is not a state mutation and is not applied by appendEvent, but it is read by toStructuredEvents to emit the TOOL_CONFIRMATION signal. Without it consumers never receive the confirmation pause signal.
Fix: actions: createEventActions({ requestedToolConfirmations: functionResponseEvent.actions.requestedToolConfirmations }) carries only what consumers need, nothing that mutates state.
Test: "carries only requestedToolConfirmations stateDelta/artifactDelta/transferToAgent are not copied"
Builds a functionResponseEvent with non-empty stateDelta, artifactDelta, and transferToAgent alongside requestedToolConfirmations, then asserts the confirmation event has empty deltas and no transferToAgent.
|
I fixed all issues and run real test on my side Review Fixes PR #1671.
|
|
@kalenkevich |
| /** | ||
| * Optional metadata for framework-level runtime coordination. | ||
| */ | ||
| customMetadata?: {[key: string]: unknown}; |
There was a problem hiding this comment.
Why do we need this field in eventActions?
|
|
||
| if (source.stateDelta) { | ||
| Object.assign(result.stateDelta, source.stateDelta); | ||
| deepMergeStateDelta(result.stateDelta, source.stateDelta); |
There was a problem hiding this comment.
can we use cloneDeep from 'lodash-es' instead?
| if (source.customMetadata) { | ||
| Object.assign(result.customMetadata ?? {}, source.customMetadata); | ||
| } |
There was a problem hiding this comment.
Not sure that we need this at all.
| function deepMergeStateDelta( | ||
| target: Record<string, unknown>, | ||
| source: Record<string, unknown>, | ||
| ): void { | ||
| for (const [key, srcValue] of Object.entries(source)) { | ||
| const targetValue = target[key]; | ||
|
|
||
| if (isPlainObject(targetValue) && isPlainObject(srcValue)) { | ||
| const nestedTarget = {...(targetValue as Record<string, unknown>)}; | ||
| deepMergeStateDelta(nestedTarget, srcValue as Record<string, unknown>); | ||
| target[key] = nestedTarget; | ||
| continue; | ||
| } | ||
|
|
||
| // Preserve explicit undefined writes as last-write-wins for clear semantics. | ||
| target[key] = srcValue; | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
not sure that we need this, see https://github.com/google/adk-js/pull/167/changes#r2875471658
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
If applicable, please follow the issue templates to provide as much detail as
possible.
Problem:
A clear and concise description of what the problem is.
Solution:
A clear and concise description of what you want to happen and why you choose
this solution.
Testing Plan
Please describe the tests that you ran to verify your changes. This is required
for all PRs that are not small documentation or typo fixes.
Unit Tests:
Please include a summary of passed npm test results.
Manual End-to-End (E2E) Tests:
Please provide instructions on how to manually test your changes, including any
necessary setup or configuration. Please provide logs or screenshots to help
reviewers better understand the fix.
Checklist
Additional context
Add any other context or screenshots about the feature request here.