Concurrent tool-using LlmAgents run via ctx.run_node crash with "No function call event found for function responses ids" (single_turn appends branchless/scopeless user event to shared session.events) #5989

@leoromero

🔴 Required Information

Describe the Bug:

When several tool-using LlmAgents are run concurrently as dynamic-workflow nodes (asyncio.gather of ctx.run_node(agent, ...)), runs intermittently crash with:

ValueError: No function call event found for function responses ids: {'tooluse_...'}

raised from google/adk/flows/llm_flows/contents.py _rearrange_events_for_latest_function_response.

Root cause appears to be in run_llm_agent_as_node + the content builder:

  1. A bare LlmAgent run via ctx.run_node defaults to mode='single_turn', which force-overrides agent.include_contents = 'none' (workflow/_llm_agent_wrapper.py), regardless of the value set on the agent. This routes content building through _get_current_turn_contents.
  2. single_turn then appends the node_input user event to the shared session.events list with no branch and no isolation_scope (_llm_agent_wrapper.prepare_llm_agent_input).
  3. A branchless + scopeless event is considered visible to every branch (contents._is_event_belongs_to_branch returns True when event.branch is falsy) and every scope (isolation_scope=None matches the default None).
  4. Under concurrency, a freshly-started sibling worker appends its branchless user event into the window between an already in-flight worker's function_call event and its function_response event. On that worker's next LLM step, _get_current_turn_contents walks backward, selects the sibling's user event as the "current turn" boundary, and slices the event list so that it contains the worker's function_response but excludes its own function_call → orphaned function responses → ValueError.
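The four steps above can be sketched with a toy model. This is not ADK's code: `Ev`, `visible`, `current_turn` and `orphaned_responses` are hypothetical names, and the visibility rule is simplified to "branchless events are visible to everyone", as described in step 3. It only shows how a sibling's branchless user event can become the turn boundary and cut off the function_call.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Ev:
    kind: str                      # "user", "call", or "response"
    owner: str                     # worker that produced the event (readability only)
    branch: Optional[str] = None   # None models a branchless event
    call_id: Optional[str] = None


def visible(ev: Ev, branch: str) -> bool:
    # Simplified assumption: a branchless event is visible to every branch.
    return not ev.branch or ev.branch == branch


def current_turn(events: list[Ev], branch: str) -> list[Ev]:
    # Walk backward to the latest visible user event and slice from there.
    for i in range(len(events) - 1, -1, -1):
        if events[i].kind == "user" and visible(events[i], branch):
            return [e for e in events[i:] if visible(e, branch)]
    return [e for e in events if visible(e, branch)]


def orphaned_responses(turn: list[Ev]) -> set[str]:
    # Responses whose matching call is missing from the slice.
    calls = {e.call_id for e in turn if e.kind == "call"}
    return {e.call_id for e in turn
            if e.kind == "response" and e.call_id not in calls}


events = [
    Ev("user", "A"),                                   # A's node_input, branchless
    Ev("call", "A", branch="wf.A", call_id="a0"),      # A's function_call
    Ev("user", "B"),                                   # sibling B starts, branchless
    Ev("response", "A", branch="wf.A", call_id="a0"),  # A's function_response
]

turn_a = current_turn(events, "wf.A")
print(orphaned_responses(turn_a))  # {'a0'}
```

In this model, worker A's slice starts at B's user event, so it contains the response `a0` but not the call `a0`, which is the condition the ValueError reports.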

It is a race, so it is intermittent and gets much more likely as the number of concurrent workers and the number of tool rounds per worker increase.

Notably, both of ADK's isolation primitives are broken for this use case:

  • use_sub_branch=True → the crash above.
  • unique override_isolation_scope per worker (use_sub_branch=False) → no crash, but the agents loop forever (LlmCallsLimitExceededError: Max number of llm calls limit of 500 exceeded) because the scoped contents path never re-presents the tool results to the agent.

The only reliable workaround we found is to run each concurrent tool-using agent in its own session (separate Runner + InMemorySessionService).

Steps to Reproduce:

  1. pip install google-adk==2.1.0 litellm==1.85.0
  2. Save the script under Minimal Reproduction Code below as repro.py.
  3. Run python repro.py → crashes with No function call event found for function responses ids.
  4. Run FIX=1 python repro.py → no crash, but infinite tool loop (Max number of llm calls limit of 500 exceeded).
  5. Run FIX=2 python repro.py → COMPLETED WITHOUT ERROR (own-session-per-worker workaround).

(No network / API keys needed — the model client is a canned fake that returns parallel tool calls then a final message.)

Expected Behavior:

Running tool-using LlmAgents concurrently as workflow nodes should isolate each agent's conversation so that one worker's node_input/user event cannot be selected as another worker's turn boundary. Each worker should see its own function_call/function_response pair intact and complete normally.
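To illustrate the expected invariant (not a proposed patch), here is a small self-contained sketch. All names (`visible`, `current_turn`, `pair_intact`, `interleaved`) and the `wf.A` / `wf.B` branch strings are hypothetical. It assumes that stamping each worker's user event with that worker's branch would make it invisible to siblings, and checks that the function_call/function_response pair then stays together.

```python
def visible(event_branch, reader_branch):
    # Simplified assumption: branchless events are visible to every reader.
    return event_branch is None or event_branch == reader_branch


def current_turn(events, reader_branch):
    # Keep only events the reader can see, then slice from the latest user event.
    seen = [e for e in events if visible(e["branch"], reader_branch)]
    for i in range(len(seen) - 1, -1, -1):
        if seen[i]["kind"] == "user":
            return seen[i:]
    return seen


def pair_intact(turn):
    # True when every function_response id has a matching function_call id.
    calls = {e["id"] for e in turn if e["kind"] == "call"}
    responses = {e["id"] for e in turn if e["kind"] == "response"}
    return responses <= calls


def interleaved(stamp_user_events):
    # Worker B's user event lands between worker A's call and response.
    def user(worker):
        branch = f"wf.{worker}" if stamp_user_events else None
        return {"kind": "user", "branch": branch, "id": None}

    return [
        user("A"),
        {"kind": "call", "branch": "wf.A", "id": "a0"},
        user("B"),
        {"kind": "response", "branch": "wf.A", "id": "a0"},
    ]


print(pair_intact(current_turn(interleaved(False), "wf.A")))  # False
print(pair_intact(current_turn(interleaved(True), "wf.A")))   # True
```

With branchless user events the pair is split; with per-worker stamping, worker A's turn starts at its own user event and contains both halves.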

Observed Behavior:

Intermittent crash (scales with concurrency × tool rounds):

File ".../google/adk/flows/llm_flows/contents.py", line 85, in run_async
    llm_request.contents = _get_current_turn_contents(
File ".../google/adk/flows/llm_flows/contents.py", line 709, in _get_current_turn_contents
    return _get_contents(
File ".../google/adk/flows/llm_flows/contents.py", line 637, in _get_contents
    result_events = _rearrange_events_for_latest_function_response(
File ".../google/adk/flows/llm_flows/contents.py", line 224, in _rearrange_events_for_latest_function_response
    raise ValueError(
ValueError: No function call event found for function responses ids: {'tooluse_414dd7_0_1', 'tooluse_414dd7_0_0'}

Environment Details:

  • ADK Library Version (pip show google-adk): 2.1.0
  • Desktop OS: Windows 11 (also reproduces on Linux)
  • Python Version (python -V): 3.14.0 (also reproduces on 3.13/3.12)

Model Information:

  • Are you using LiteLLM: Yes
  • Which model is being used: Bedrock Anthropic Claude (Sonnet 4.x / Haiku 4.5) in production; the minimal repro uses a canned fake LiteLLM client so no real model is called.

🟡 Optional Information

Regression:

Unknown — first observed on 2.1.0; have not bisected against earlier versions.

Logs:

google_adk.google.adk.workflow._node_runner - ERROR - Node execution failed with exception
Traceback (most recent call last):
  ...
  File ".../google/adk/flows/llm_flows/contents.py", line 85, in run_async
    llm_request.contents = _get_current_turn_contents(...)
  File ".../google/adk/flows/llm_flows/contents.py", line 709, in _get_current_turn_contents
    return _get_contents(current_branch, events[i:], ...)
  File ".../google/adk/flows/llm_flows/contents.py", line 637, in _get_contents
    result_events = _rearrange_events_for_latest_function_response(filtered_events)
  File ".../google/adk/flows/llm_flows/contents.py", line 224, in _rearrange_events_for_latest_function_response
    raise ValueError(...)
ValueError: No function call event found for function responses ids: {'tooluse_...'}

Additional Context:

  • The damaging interleave does NOT reproduce if all workers fire their tool calls in one synchronized wave (their user events cluster up front, before any function_call). It requires staggered worker starts and/or multiple tool rounds per worker so a sibling's user event lands between a worker's function_call and function_response. The minimal repro forces this with a slow tool, a fast LLM step, staggered starts, and 2 tool rounds.
  • run_llm_agent_as_node forcing include_contents='none' means an explicit include_contents='default' on the agent has no effect for node-run agents — this may be surprising independent of the crash.
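The timing condition in the first bullet can be checked with an idealized timeline. This sketch makes several assumptions: no scheduling jitter, integer milliseconds, and each worker emitting user, function_call and function_response events at fixed offsets taken from the repro constants (STAGGER, TOOL_DELAY, LLM_DELAY). `vulnerable_workers` is a hypothetical helper that counts workers with a sibling's user event strictly between one of their function_call/function_response pairs.

```python
def vulnerable_workers(n_agents, tool_rounds, stagger_ms, tool_ms, llm_ms):
    # Time at which each worker's user (node_input) event is appended.
    user_times = [i * stagger_ms for i in range(n_agents)]

    count = 0
    for k in range(n_agents):
        t = user_times[k]
        exposed = False
        for _ in range(tool_rounds):
            call_t = t + llm_ms           # LLM step emits the function_call
            resp_t = call_t + tool_ms     # slow tool emits the function_response
            # Does any sibling's user event land inside this open window?
            if any(call_t < u < resp_t
                   for j, u in enumerate(user_times) if j != k):
                exposed = True
            t = resp_t
        count += exposed
    return count


# Repro-like settings: 8 agents, 2 rounds, 50 ms stagger, 300 ms tool, 10 ms LLM.
print(vulnerable_workers(8, 2, 50, 300, 10))  # 7
# One synchronized wave: every user event is appended before any function_call.
print(vulnerable_workers(8, 2, 0, 300, 10))   # 0
```

In this idealized model only the last worker to start is never exposed, because no sibling starts after it. Real runs depend on event-loop scheduling, so the counts are illustrative rather than a prediction.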

Minimal Reproduction Code:

"""Reproduces google-adk 2.1.0 'No function call event found for function responses ids'
for concurrent tool-using LlmAgents run as dynamic-workflow nodes. No network / API keys.

  python repro.py          # PROD: use_sub_branch=True   -> ValueError (crash)
  FIX=1 python repro.py     # isolation_scope             -> infinite tool loop (500 llm calls)
  FIX=2 python repro.py     # own session per worker      -> COMPLETED WITHOUT ERROR
"""
from __future__ import annotations
import asyncio, json, os, uuid
from pydantic import BaseModel
from google.genai import types
from google.adk.models.lite_llm import LiteLlm
from google.adk.agents.llm_agent import LlmAgent
from google.adk.workflow import Workflow, node
from google.adk.runners import Runner
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from litellm import ModelResponse
from litellm.types.utils import Message, Choices, ChatCompletionMessageToolCall, Function

N_AGENTS, TOOL_ROUNDS, STAGGER, TOOL_DELAY, LLM_DELAY = 8, 2, 0.05, 0.30, 0.01
MODEL = "bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0"
FIX = os.getenv("FIX", "")


class Out(BaseModel):
    result: str


def _rounds_done(messages):
    return sum(1 for m in (messages or [])
               if (m.get("role") if isinstance(m, dict) else getattr(m, "role", None)) == "tool")


class FakeClient:
    def __init__(self): self.tag = uuid.uuid4().hex[:6]
    async def acompletion(self, **kwargs):
        await asyncio.sleep(LLM_DELAY)
        r = _rounds_done(kwargs.get("messages"))
        if r >= TOOL_ROUNDS:
            msg = Message(role="assistant", content='{"result":"done"}')
            return ModelResponse(choices=[Choices(finish_reason="stop", index=0, message=msg)], model=MODEL)
        tool_calls = [ChatCompletionMessageToolCall(
            id=f"tooluse_{self.tag}_{r}_{i}", type="function",
            function=Function(name="lookup", arguments=json.dumps({"q": str(i)}))) for i in range(2)]
        msg = Message(role="assistant", content=None, tool_calls=tool_calls)
        return ModelResponse(choices=[Choices(finish_reason="tool_calls", index=0, message=msg)], model=MODEL)


async def lookup(q: str) -> dict:
    await asyncio.sleep(TOOL_DELAY)
    return {"q": q, "rows": 1}


def make_agent():
    model = LiteLlm(model=MODEL)
    model.llm_client = FakeClient()
    return LlmAgent(name="worker", model=model, tools=[lookup],
                    instruction="Use the lookup tool twice.",
                    output_schema=Out, output_key="out", include_contents="default")


async def _own_session(idx):
    runner = Runner(node=make_agent(), session_service=InMemorySessionService(),
                    artifact_service=InMemoryArtifactService(), memory_service=InMemoryMemoryService(),
                    auto_create_session=True)
    msg = types.Content(role="user", parts=[types.Part(text="Source data for this concept.")])
    async for _ in runner.run_async(user_id="u", session_id=f"s{idx}", new_message=msg):
        pass


_FAILURES = []


@node(rerun_on_resume=True)
async def orchestrator(ctx, node_input):
    async def run_one(idx):
        await asyncio.sleep(idx * STAGGER)
        if FIX == "2":
            return await _own_session(idx)
        kw = {"node_input": "Source data for this concept."}
        if FIX == "1":
            kw["use_sub_branch"] = False
            kw["override_isolation_scope"] = f"ds-{idx}-{uuid.uuid4().hex[:6]}"
        else:
            kw["use_sub_branch"] = True
        return await ctx.run_node(make_agent(), **kw)

    results = await asyncio.gather(*[run_one(i) for i in range(N_AGENTS)], return_exceptions=True)
    for r in results:
        if isinstance(r, BaseException):
            _FAILURES.append(repr(r))
    return results


root = Workflow(name="Repro", edges=[("START", orchestrator)])


async def main():
    runner = Runner(node=root, session_service=InMemorySessionService(),
                    artifact_service=InMemoryArtifactService(), memory_service=InMemoryMemoryService(),
                    auto_create_session=True)
    msg = types.Content(role="user", parts=[types.Part(text="start")])
    async for _ in runner.run_async(user_id="u", session_id="s", new_message=msg):
        pass
    print(f"\nFIX={FIX!r}  failures={len(_FAILURES)}/{N_AGENTS}")
    print("COMPLETED WITHOUT ERROR" if not _FAILURES else "FAILED (see traceback above)")


if __name__ == "__main__":
    asyncio.run(main())

How often has this issue occurred?:

  • Intermittently (<50%) in production (it is a race, and its frequency scales with concurrency × tool rounds). The minimal repro above forces the interleave and reproduces the crash on nearly every run (7–8 of 8 workers fail).

Labels

workflow [Component]: This issue is related to ADK workflow
