From 1c76d953f200c77a4e5ef91f5c16ba0dc4c9eb9e Mon Sep 17 00:00:00 2001 From: jasinluo <1127097451@qq.com> Date: Tue, 21 Apr 2026 21:21:10 +0800 Subject: [PATCH] fix(teams): fix parallel delegation signal loss and enable streaming output --- trpc_agent_sdk/teams/_team_agent.py | 76 ++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 18 deletions(-) diff --git a/trpc_agent_sdk/teams/_team_agent.py b/trpc_agent_sdk/teams/_team_agent.py index 501aaa1..4b612cc 100644 --- a/trpc_agent_sdk/teams/_team_agent.py +++ b/trpc_agent_sdk/teams/_team_agent.py @@ -414,6 +414,9 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, iteration += 1 last_event: Optional[Event] = None + # Collect ALL non-partial events from leader for delegation signal extraction + all_leader_events: List[Event] = [] + # Update activity tracking current_activity = "leader planning" @@ -467,6 +470,7 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, # LongRunningEvent is partial, so will not enter below logic. if not event.partial: last_event = event + all_leader_events.append(event) # Collect text from leader's response event_text = self._extract_text_from_event(event) if event_text: @@ -486,8 +490,13 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, logger.debug("TeamAgent: Leader requested transfer to: %s", transfer_to_agent) return - # Try to extract delegation signals from last event - signals = self._extract_delegation_signals(last_event) + # Try to extract delegation signals from ALL leader events + # (not just last_event, because when tools execute sequentially, + # each tool response is a separate event and only the last one + # would be captured by last_event) + signals: List[DelegationSignal] = [] + for leader_event in all_leader_events: + signals.extend(self._extract_delegation_signals(leader_event)) if signals: # Delegation detected, execute member agent(s) @@ -786,9 +795,12 @@ async def _execute_delegations_parallel( is_member_mode: bool = False, context_lock: Optional[asyncio.Lock] = None, ) -> AsyncGenerator[Event, None]: - """Execute multiple delegations in parallel. + """Execute multiple delegations in parallel with streaming output. + + Uses an asyncio.Queue so that events from whichever member finishes + first are yielded immediately, instead of waiting for all members to + complete before yielding any events. - Used when parallel_execution=True and there are multiple delegation signals. Each _execute_delegation call handles its own interaction recording. Args: @@ -800,25 +812,53 @@ async def _execute_delegations_parallel( context_lock: Lock for thread-safe access to team_run_context. Yields: - Events from all member executions. + Events from member executions, streamed as soon as each member + produces them. """ + # Use a queue to stream events from concurrent tasks as they arrive + queue: asyncio.Queue = asyncio.Queue() + # Sentinel object to signal that a producer task has finished + _DONE = object() - async def run_delegation(signal: DelegationSignal) -> List[Event]: - """Run a single delegation and collect events.""" - events: List[Event] = [] - async for event in self._execute_delegation(ctx, signal, team_run_context, message_builder, is_member_mode, - context_lock): - events.append(event) - return events + async def run_delegation(signal: DelegationSignal) -> None: + """Run a single delegation and push events to the queue.""" + try: + async for event in self._execute_delegation( + ctx, + signal, + team_run_context, + message_builder, + is_member_mode, + context_lock, + ): + await queue.put(event) + except Exception as exc: + # Push the exception so the consumer can re-raise it + await queue.put(exc) + finally: + await queue.put(_DONE) # Run all delegations in parallel - logger.debug("TeamAgent: Executing %s delegations in parallel", len(signals)) - results = await asyncio.gather(*[run_delegation(signal) for signal in signals]) + logger.debug("TeamAgent: Executing %s delegations in parallel (streaming)", len(signals)) + tasks = [asyncio.create_task(run_delegation(signal)) for signal in signals] + finished_count = 0 + total = len(tasks) + + # Consume events from the queue as they arrive + while finished_count < total: + item = await queue.get() + if item is _DONE: + finished_count += 1 + elif isinstance(item, Exception): + # Cancel remaining tasks and re-raise the first exception + for task in tasks: + task.cancel() + raise item + else: + yield item - # Yield all events (interaction records already added by _execute_delegation) - for events in results: - for event in events: - yield event + # Ensure all tasks are cleaned up (handles edge cases) + await asyncio.gather(*tasks, return_exceptions=True) def _find_member_by_name(self, name: str) -> Optional[BaseAgent]: """Find a member agent by name.