Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions trpc_agent_sdk/teams/_team_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event,
iteration += 1
last_event: Optional[Event] = None

# Collect ALL non-partial events from leader for delegation signal extraction
all_leader_events: List[Event] = []

# Update activity tracking
current_activity = "leader planning"

Expand Down Expand Up @@ -467,6 +470,7 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event,
# LongRunningEvent is partial, so will not enter below logic.
if not event.partial:
last_event = event
all_leader_events.append(event)
# Collect text from leader's response
event_text = self._extract_text_from_event(event)
if event_text:
Expand All @@ -486,8 +490,13 @@ async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event,
logger.debug("TeamAgent: Leader requested transfer to: %s", transfer_to_agent)
return

# Try to extract delegation signals from last event
signals = self._extract_delegation_signals(last_event)
# Try to extract delegation signals from ALL leader events
# (not just last_event, because when tools execute sequentially,
# each tool response is a separate event and only the last one
# would be captured by last_event)
signals: List[DelegationSignal] = []
for leader_event in all_leader_events:
signals.extend(self._extract_delegation_signals(leader_event))

if signals:
# Delegation detected, execute member agent(s)
Expand Down Expand Up @@ -786,9 +795,12 @@ async def _execute_delegations_parallel(
is_member_mode: bool = False,
context_lock: Optional[asyncio.Lock] = None,
) -> AsyncGenerator[Event, None]:
"""Execute multiple delegations in parallel.
"""Execute multiple delegations in parallel with streaming output.

Uses an asyncio.Queue so that events from whichever member finishes
first are yielded immediately, instead of waiting for all members to
complete before yielding any events.

Used when parallel_execution=True and there are multiple delegation signals.
Each _execute_delegation call handles its own interaction recording.

Args:
Expand All @@ -800,25 +812,53 @@ async def _execute_delegations_parallel(
context_lock: Lock for thread-safe access to team_run_context.

Yields:
Events from all member executions.
Events from member executions, streamed as soon as each member
produces them.
"""
# Use a queue to stream events from concurrent tasks as they arrive
queue: asyncio.Queue = asyncio.Queue()
# Sentinel object to signal that a producer task has finished
_DONE = object()

async def run_delegation(signal: DelegationSignal) -> List[Event]:
"""Run a single delegation and collect events."""
events: List[Event] = []
async for event in self._execute_delegation(ctx, signal, team_run_context, message_builder, is_member_mode,
context_lock):
events.append(event)
return events
async def run_delegation(signal: DelegationSignal) -> None:
"""Run a single delegation and push events to the queue."""
try:
async for event in self._execute_delegation(
ctx,
signal,
team_run_context,
message_builder,
is_member_mode,
context_lock,
):
await queue.put(event)
except Exception as exc:
# Push the exception so the consumer can re-raise it
await queue.put(exc)
finally:
await queue.put(_DONE)

# Run all delegations in parallel
logger.debug("TeamAgent: Executing %s delegations in parallel", len(signals))
results = await asyncio.gather(*[run_delegation(signal) for signal in signals])
logger.debug("TeamAgent: Executing %s delegations in parallel (streaming)", len(signals))
tasks = [asyncio.create_task(run_delegation(signal)) for signal in signals]
finished_count = 0
total = len(tasks)

# Consume events from the queue as they arrive
while finished_count < total:
item = await queue.get()
if item is _DONE:
finished_count += 1
elif isinstance(item, Exception):
# Cancel remaining tasks and re-raise the first exception
for task in tasks:
task.cancel()
raise item
else:
yield item

# Yield all events (interaction records already added by _execute_delegation)
for events in results:
for event in events:
yield event
# Ensure all tasks are cleaned up (handles edge cases)
await asyncio.gather(*tasks, return_exceptions=True)

def _find_member_by_name(self, name: str) -> Optional[BaseAgent]:
"""Find a member agent by name.
Expand Down
Loading