Support out-of-band progress callbacks for streaming tools in live mode #5979

@kazunori279

Description

Related: #5947 (streaming tool yields cause re-invocation loop)

Issue #5947 addresses the immediate bug where streaming tool yields interrupt the model via turn_complete=True. The turn_complete=False fix (in progress) solves the re-invocation loop, but intermediate yields still land in the model's context window — even when they're purely presentational status updates the model doesn't need to reason about.

This feature request proposes a complementary mechanism: an out-of-band progress callback that lets streaming tools report status directly to the application layer, bypassing the model's context entirely.

Motivation

Many streaming tool use cases involve two distinct types of intermediate output:

| Type | Example | Model needs it? |
| --- | --- | --- |
| Context-relevant | "Found 3 matching documents" | Yes: the model should narrate or reason about it |
| Presentational | "Step 2/5: processing...", spinner updates | No: only the UI needs it |

After #5947's turn_complete=False fix, context-relevant yields work correctly — the model receives them without interruption. But presentational yields still consume context window space unnecessarily and can accumulate noise over long-running tools.

An out-of-band callback gives tool authors explicit control: yield for model-facing updates, progress_callback() for UI-only status.

Proposed Design

1. Parameter injection (following ToolContext pattern)

FunctionTool already detects tool_context in the function signature via find_context_parameter() and injects it at call time (function_tool.py:88). A progress_callback would follow the same pattern:

import asyncio
from typing import Any, Awaitable, Callable

# Import path assumes the google-adk package layout
from google.adk.tools.tool_context import ToolContext

async def long_running_task(
    query: str,
    tool_context: ToolContext,
    progress_callback: Callable[[Any], Awaitable[None]] | None = None,
):
    """A tool that takes time and reports progress."""
    # OOB: goes directly to the app layer, not to the model
    if progress_callback:
        await progress_callback({"status": "running", "message": "Step 1: fetching data..."})

    await asyncio.sleep(5)

    if progress_callback:
        await progress_callback({"status": "running", "message": "Step 2: processing..."})

    await asyncio.sleep(5)

    # Model-facing: returned as the tool result for the model to narrate
    return {"status": "completed", "result": "Done!"}

For async generator (streaming) tools, both mechanisms coexist:

async def long_running_task(
    query: str,
    progress_callback: Callable[[Any], Awaitable[None]] | None = None,
):
    # search() and analyze() below stand in for the tool's real work.
    # OOB progress: UI only
    if progress_callback:
        await progress_callback({"step": 1, "message": "Searching..."})
    
    results = await search(query)
    
    # Model-facing yield — model can narrate "I found 3 results"
    yield {"status": "found", "count": len(results)}
    
    if progress_callback:
        await progress_callback({"step": 2, "message": "Analyzing..."})
    
    analysis = await analyze(results)
    yield {"status": "completed", "analysis": analysis}
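
To make the split between the two channels concrete, here is a minimal, self-contained sketch that runs without the framework. The names `search_and_analyze` and `drive_tool` are hypothetical: `drive_tool` is only a stand-in for the framework's call path, and the search results are faked. It assumes the callback is injected by keyword only when the tool declares a `progress_callback` parameter, as proposed above.

```python
import asyncio
import inspect
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

ProgressCallback = Callable[[Any], Awaitable[None]]


async def search_and_analyze(
    query: str,
    progress_callback: Optional[ProgressCallback] = None,
) -> AsyncGenerator[dict, None]:
    """Stand-in streaming tool: progress goes out-of-band, yields go to the model."""
    if progress_callback:
        await progress_callback({"step": 1, "message": "Searching..."})
    results = [f"{query}-{i}" for i in range(3)]  # placeholder for a real search
    yield {"status": "found", "count": len(results)}

    if progress_callback:
        await progress_callback({"step": 2, "message": "Analyzing..."})
    yield {"status": "completed", "analysis": f"{len(results)} results analyzed"}


async def drive_tool(func, args: dict) -> tuple:
    """Hypothetical driver: injects the callback only if the tool declares it."""
    model_facing: list = []  # what would be forwarded to the model
    ui_only: list = []       # what the application layer would receive

    async def progress_callback(data: Any) -> None:
        ui_only.append(data)  # a real app would push this to its UI transport

    call_args = dict(args)
    if "progress_callback" in inspect.signature(func).parameters:
        call_args["progress_callback"] = progress_callback

    async for item in func(**call_args):
        model_facing.append(item)
    return model_facing, ui_only


if __name__ == "__main__":
    yields, progress = asyncio.run(drive_tool(search_and_analyze, {"query": "docs"}))
    print("model-facing:", yields)
    print("UI-only:", progress)
```

In this sketch the two lists never mix: the generator's yields are the only items that reach the model-facing list, regardless of how many progress updates the tool emits.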

2. Injection point

The injection would go in FunctionTool._call_live() (function_tool.py:320-321), where tool_context is already injected for async generator tools:

async def _call_live(self, *, args, tool_context, invocation_context) -> Any:
    args_to_call = args.copy()
    signature = inspect.signature(self.func)
    
    if 'tool_context' in signature.parameters:
        args_to_call['tool_context'] = tool_context
    
    # New: inject progress_callback if the tool declares it
    if 'progress_callback' in signature.parameters:
        args_to_call['progress_callback'] = self._make_progress_callback(
            invocation_context, tool_context
        )
    
    async with Aclosing(self.func(**args_to_call)) as agen:
        async for item in agen:
            yield item

The same injection would apply in run_async() for non-generator tools.
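
One way to keep the generator and non-generator paths consistent is a small shared helper. The sketch below is illustrative only: `inject_optional_params` and `run_tool` are hypothetical names, not existing FunctionTool methods, and `tool_context` is passed as `None` purely to keep the example standalone. It also shows that a tool which does not declare `progress_callback` is called exactly as before.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional

ProgressCallback = Callable[[Any], Awaitable[None]]


def inject_optional_params(func, args: dict, *, tool_context, progress_callback) -> dict:
    """Hypothetical helper: add framework-provided params only if the tool declares them."""
    params = inspect.signature(func).parameters
    call_args = dict(args)
    if "tool_context" in params:
        call_args["tool_context"] = tool_context
    if "progress_callback" in params:
        call_args["progress_callback"] = progress_callback
    return call_args


async def with_progress(
    query: str, progress_callback: Optional[ProgressCallback] = None
) -> dict:
    """Non-generator tool that opts in to progress reporting."""
    if progress_callback:
        await progress_callback({"message": f"working on {query}"})
    return {"status": "completed"}


async def without_progress(query: str) -> dict:
    """Existing tool with no progress parameter: its call is unchanged."""
    return {"status": "completed"}


async def run_tool(func, args: dict, seen: list) -> dict:
    """Stand-in for the non-generator call path."""
    async def progress_callback(data: Any) -> None:
        seen.append(data)

    call_args = inject_optional_params(
        func, args, tool_context=None, progress_callback=progress_callback
    )
    return await func(**call_args)
```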

3. Callback routing

The bound callback routes progress to a handler registered on the runner or invocation context — never entering live_request_queue:

def _make_progress_callback(self, invocation_context, tool_context):
    async def callback(data: Any) -> None:
        if invocation_context.progress_handler:
            await invocation_context.progress_handler(
                tool_name=self.name,
                function_call_id=tool_context.function_call_id,
                data=data,
            )
    return callback
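
The routing behavior can be exercised in isolation. The following sketch uses hypothetical stand-ins (`FakeInvocationContext`, `FakeToolContext`, and a free function `make_progress_callback` in place of the proposed method) and assumes that a missing handler should make the callback a silent no-op, which is what the snippet above implies.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class FakeInvocationContext:
    """Stand-in carrying only the proposed progress_handler field."""
    progress_handler: Optional[Callable[..., Awaitable[None]]] = None


@dataclass
class FakeToolContext:
    """Stand-in carrying only the function_call_id used for routing."""
    function_call_id: str


def make_progress_callback(tool_name: str, invocation_context, tool_context):
    """Bind tool identity into a callback; do nothing if no handler is registered."""
    async def callback(data: Any) -> None:
        handler = invocation_context.progress_handler
        if handler is None:
            return
        await handler(
            tool_name=tool_name,
            function_call_id=tool_context.function_call_id,
            data=data,
        )
    return callback


async def demo() -> list:
    received = []

    async def handler(tool_name: str, function_call_id: str, data: Any) -> None:
        received.append((tool_name, function_call_id, data))

    routed = make_progress_callback(
        "long_running_task", FakeInvocationContext(handler), FakeToolContext("call-1")
    )
    dropped = make_progress_callback(
        "long_running_task", FakeInvocationContext(), FakeToolContext("call-2")
    )
    await routed({"step": 1})   # reaches the handler
    await dropped({"step": 1})  # no handler registered, so nothing happens
    return received
```

Reading the handler at call time rather than at bind time means a handler registered after the tool starts would still receive later updates; whether that is desirable is a design choice this proposal leaves open.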

4. Runner-level handler registration

# Application-defined handler for OOB progress
async def my_progress_handler(tool_name: str, function_call_id: str, data: Any):
    # Route to WebSocket, SSE, logging, etc.
    await websocket.send_json({
        "type": "tool_progress",
        "tool": tool_name,
        "data": data,
    })

runner = Runner(
    agent=agent,
    app_name="my_app",
    session_service=session_service,
)

# Application registers the handler (it must be defined before this assignment)
runner.on_tool_progress = my_progress_handler
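
A handler does not have to hold a transport object directly. As one possible shape, the sketch below builds a handler that puts messages on an `asyncio.Queue`, which a WebSocket or SSE writer could drain separately. `make_queue_handler` is a hypothetical helper; only the keyword arguments `tool_name`, `function_call_id`, and `data` come from the proposal.

```python
import asyncio
from typing import Any


def make_queue_handler(queue: asyncio.Queue):
    """Hypothetical factory: returns a progress handler that enqueues messages."""
    async def handler(tool_name: str, function_call_id: str, data: Any) -> None:
        await queue.put({
            "type": "tool_progress",
            "tool": tool_name,
            "function_call_id": function_call_id,
            "data": data,
        })
    return handler


async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    handler = make_queue_handler(queue)
    # The framework would make this call from the injected progress callback.
    await handler(
        tool_name="long_running_task",
        function_call_id="call-1",
        data={"step": 1, "message": "Searching..."},
    )
    # A transport task would normally consume the queue; here we read one item.
    return await queue.get()
```

Including `function_call_id` in the message lets a UI tell apart concurrent invocations of the same tool.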

Surface area

| Component | Change |
| --- | --- |
| InvocationContext | Add progress_handler: Optional[Callable] field |
| FunctionTool._call_live() | Detect and inject progress_callback |
| FunctionTool.run_async() | Same injection for non-generator tools |
| FunctionTool._ignore_params | Add 'progress_callback' to exclude it from the function declaration |
| Runner | Add on_tool_progress registration, thread handler into InvocationContext |
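
The `_ignore_params` change matters because the model should never see `progress_callback` as an argument it can fill in. The sketch below illustrates the idea with plain `inspect`; `IGNORED_PARAMS` and `model_visible_params` are hypothetical names, and the real declaration builder may work differently.

```python
import inspect
from typing import Any, Awaitable, Callable, Optional

# Assumed ignore list for illustration: framework-injected parameter names.
IGNORED_PARAMS = {"tool_context", "progress_callback"}


def model_visible_params(func) -> list:
    """Return the parameter names that would appear in the function declaration."""
    return [
        name
        for name in inspect.signature(func).parameters
        if name not in IGNORED_PARAMS
    ]


async def long_running_task(
    query: str,
    tool_context: Any = None,
    progress_callback: Optional[Callable[[Any], Awaitable[None]]] = None,
) -> dict:
    """Example tool whose only model-facing argument is query."""
    return {"status": "completed", "query": query}
```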

Non-goals

Environment
