Skip to content

feat(a2a-bridge): multi-turn task lifecycle#314

Open
zeroasterisk wants to merge 4 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/multi-turn-lifecycle
Open

feat(a2a-bridge): multi-turn task lifecycle#314
zeroasterisk wants to merge 4 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/multi-turn-lifecycle

Conversation

@zeroasterisk

Copy link
Copy Markdown
Contributor

Summary

Stop auto-closing A2A tasks on the first content message. Tasks stay alive until the agent's state changes to a terminal state (completed/failed). Enables multi-turn conversations where agents ask clarifying questions, send progress updates, or emit interim artifacts.

Design

Content messages are broadcast to subscribers with state=working, Final=false. Task lifecycle is driven solely by state-change messages. The blocking SendMessage path returns state=working (not completed) so follow-up messages can be sent.

Design doc: .design/a2a-multi-turn-lifecycle.md

Test plan

  • 20 new tests in lifecycle_test.go covering all multi-turn scenarios
  • All existing tests pass

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a multi-turn task lifecycle for the A2A bridge, ensuring that task completion is driven by agent state changes rather than content messages. The review feedback identifies a critical bug where state-change messages are silently dropped during active blocking calls, a potential issue where active tasks could be prematurely reaped by the janitor because their update timestamps are not refreshed on content messages, and an unused, broken helper function in the test suite that should be removed.

Comment on lines +789 to +792
handled := b.dispatchToWaiter(taskID, stateMsg)
if !handled {
t.Error("dispatchToWaiter should return true for state-change (to suppress further dispatch)")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The assertion that dispatchToWaiter should return true for state-change messages (to "suppress further dispatch") actually codifies a significant bug.

When dispatchToWaiter returns true, the caller dispatchBrokerMessage immediately stops processing the message and returns. This completely prevents the state-change message from falling through to dispatchToActiveTask. As a result, any state-change messages (such as WORKING, WAITING_FOR_INPUT, or COMPLETED) sent by the agent while a blocking SendMessage call is active will be silently dropped. They will never update the task state in the database, nor will they be broadcast to SSE subscribers or push notifications.

To fix this, dispatchToWaiter should return false for state-change messages so they can fall through to dispatchToActiveTask, while still ensuring they are not sent to the waiter's channel.

Please update dispatchToWaiter in bridge.go to return false for state-change messages, and update this test assertion accordingly.

Suggested change
handled := b.dispatchToWaiter(taskID, stateMsg)
if !handled {
t.Error("dispatchToWaiter should return true for state-change (to suppress further dispatch)")
}
handled := b.dispatchToWaiter(taskID, stateMsg)
if handled {
t.Error("dispatchToWaiter should return false for state-change to allow further dispatch to active task")
}

Comment on lines +633 to +635
} else {
// TODO(multi-turn): MVP limitation — treats any non-state-change message as
// a terminal response. Multi-turn agents that emit interim content (e.g.
// clarifying questions, progress updates) will have their task closed
// prematurely on the first content message. This breaks agents that use
// input-required → completed flows. Must be fixed before exposing
// non-trivial agent types.
b.log.Debug("treating content message as task completion (MVP)", "task_id", taskID)
if err := b.store.UpdateTaskState(taskID, TaskStateCompleted); err != nil {
b.log.Error("failed to update task state", "error", err, "task_id", taskID)
}

// Content message — broadcast to subscribers but keep task alive.
// Task lifecycle is driven by state-change messages, not content.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For long-running tasks or streaming agents that send multiple content messages (e.g., progress updates) but do not frequently change their state, the task's UpdatedAt timestamp in the database is never updated.

Because the janitor reaps tasks based on task.UpdatedAt being older than maxAge (which defaults to 4 minutes), these active tasks will be prematurely reaped and marked as failed by the janitor, even though they are actively sending content.

To prevent this, we should touch the task in the database (by updating its state to TaskStateWorking) whenever a content message is received. This will update the UpdatedAt timestamp and keep the task alive.

	} else {
		// Content message — broadcast to subscribers but keep task alive.
		// Task lifecycle is driven by state-change messages, not content.
		if err := b.store.UpdateTaskState(taskID, TaskStateWorking); err != nil {
			b.log.Error("failed to update task state", "error", err, "task_id", taskID)
		}

Comment on lines +76 to +103
func collectEvents(ctx context.Context, b *Bridge, taskID string) ([]StreamEvent, func(), error) {
ch, cleanup, err := b.streams.Subscribe(taskID)
if err != nil {
return nil, nil, err
}
var events []StreamEvent
done := make(chan struct{})
go func() {
defer close(done)
for {
select {
case ev, ok := <-ch:
if !ok {
return
}
events = append(events, ev)
case <-ctx.Done():
return
}
}
}()
wait := func() {
<-done
cleanup()
}
return events, wait, nil // Note: caller reads events after wait()
// Actually the events slice is shared, we need a different approach.
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The helper function collectEvents is completely unused in this test file and contains a comment indicating that its implementation is broken (Actually the events slice is shared, we need a different approach.).

To keep the codebase clean and maintainable, we should remove this unused and broken helper function.

Stop auto-closing tasks on the first content message from an agent.
Previously, any non-state-change message immediately marked the task
as completed and closed all subscriptions (the MVP single-turn
limitation documented in the TODO at bridge.go:633).

Now content messages are broadcast to streaming and push subscribers
with state=working and Final=false, keeping the task alive. Task
lifecycle is driven solely by agent state-change messages:
- working/thinking/executing → working (non-terminal)
- waiting_for_input → input-required (non-terminal)
- completed → completed (terminal, closes task)
- error/stalled → failed (terminal, closes task)

This enables multi-turn conversations where agents ask clarifying
questions, send progress updates, or emit interim artifacts before
completing.

Design doc: .design/a2a-multi-turn-lifecycle.md
20 tests covering the multi-turn task lifecycle:
- Content messages don't complete tasks
- Content broadcasts with state=working, Final=false
- Multiple content messages keep task alive
- State-change to completed/failed closes task properly
- State-change to input-required keeps task alive
- Blocking SendMessage returns working (not completed)
- Blocking timeout/error/cancel cleans up activeTask
- Full multi-turn lifecycle integration test
- Slug-based fallback correlation with content
- Metrics not incremented on content messages
Fixes from code review:

1. Terminal state-changes dropped during blocking calls: dispatchToWaiter
   skipped state-change messages entirely, even terminal ones. The task's
   DB state was never updated to completed/failed. Fix: update DB state
   for terminal state-changes even when a waiter is active.

2. Janitor reaping active multi-turn tasks: content messages didn't
   refresh the task's UpdatedAt timestamp, so long conversations could
   be reaped as stale. Fix: call UpdateTaskState(working) on content
   messages to refresh the timestamp.

Added/updated tests for both scenarios.
…review

Debug/refactor cycle findings:
- Refactored dispatchToActiveTask for clarity
- Added test coverage for edge cases in state-change handling
- All tests pass
@zeroasterisk zeroasterisk force-pushed the a2a/multi-turn-lifecycle branch from b9aca4c to fadcf60 Compare June 7, 2026 22:56
@zeroasterisk

Copy link
Copy Markdown
Contributor Author

3 review rounds complete, all clean. Summary of all changes:

  • Fixed: terminal state-changes silently dropped during blocking calls (DB state never updated)
  • Fixed: janitor reaping active multi-turn tasks (content messages now refresh timestamp)
  • Fixed: blocking SendMessage returned state=completed instead of working
  • Fixed: activeTask leaked on blocking return (removed defer, explicit cleanup on error only)
  • 20+ lifecycle tests covering all multi-turn scenarios
  • Rebased on upstream/main, all tests pass

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant