feat(a2a-bridge): multi-turn task lifecycle #314
Code Review
This pull request implements a multi-turn task lifecycle for the A2A bridge, ensuring that task completion is driven by agent state changes rather than content messages. The review feedback identifies a critical bug where state-change messages are silently dropped during active blocking calls, a potential issue where active tasks could be prematurely reaped by the janitor because their update timestamps are not refreshed on content messages, and an unused, broken helper function in the test suite that should be removed.
	handled := b.dispatchToWaiter(taskID, stateMsg)
	if !handled {
		t.Error("dispatchToWaiter should return true for state-change (to suppress further dispatch)")
	}
The assertion that dispatchToWaiter should return true for state-change messages (to "suppress further dispatch") actually codifies a significant bug.
When dispatchToWaiter returns true, the caller, dispatchBrokerMessage, stops processing the message and returns immediately, so the state-change message never falls through to dispatchToActiveTask. As a result, any state-change message (such as WORKING, WAITING_FOR_INPUT, or COMPLETED) that the agent sends while a blocking SendMessage call is active is silently dropped: the task state in the database is never updated, and the change is never broadcast to SSE subscribers or sent as a push notification.
To fix this, dispatchToWaiter should return false for state-change messages so they can fall through to dispatchToActiveTask, while still ensuring they are not sent to the waiter's channel.
Please update dispatchToWaiter in bridge.go to return false for state-change messages, and update this test assertion accordingly.
-	handled := b.dispatchToWaiter(taskID, stateMsg)
-	if !handled {
-		t.Error("dispatchToWaiter should return true for state-change (to suppress further dispatch)")
-	}
+	handled := b.dispatchToWaiter(taskID, stateMsg)
+	if handled {
+		t.Error("dispatchToWaiter should return false for state-change to allow further dispatch to active task")
+	}
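The fall-through behaviour requested above can be sketched as follows. This is a minimal illustration, not the code in bridge.go: the `Message` and `Bridge` types, their fields, and the bodies of all three functions are hypothetical stand-ins, and only the function names come from the review.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a broker message.
type Message struct {
	StateChange bool   // true for WORKING / WAITING_FOR_INPUT / COMPLETED style updates
	Body        string // payload or state name
}

// Bridge is a hypothetical, heavily simplified bridge.
type Bridge struct {
	waiters map[string]chan Message // one blocked SendMessage caller per task
	states  []string                // state-changes that reached the active-task path
}

// dispatchToWaiter hands content messages to a blocked caller. It reports
// false for state-change messages so the caller keeps dispatching them.
func (b *Bridge) dispatchToWaiter(taskID string, m Message) bool {
	ch, ok := b.waiters[taskID]
	if !ok || m.StateChange {
		return false
	}
	select {
	case ch <- m:
	default: // waiter buffer is full; drop instead of blocking the dispatcher
	}
	return true
}

// dispatchToActiveTask records state-changes; the real function would update
// the store and notify subscribers.
func (b *Bridge) dispatchToActiveTask(taskID string, m Message) {
	if m.StateChange {
		b.states = append(b.states, m.Body)
	}
}

// dispatchBrokerMessage stops only when the waiter actually consumed the message.
func (b *Bridge) dispatchBrokerMessage(taskID string, m Message) {
	if b.dispatchToWaiter(taskID, m) {
		return
	}
	b.dispatchToActiveTask(taskID, m)
}

func main() {
	b := &Bridge{waiters: map[string]chan Message{"t1": make(chan Message, 1)}}
	b.dispatchBrokerMessage("t1", Message{StateChange: true, Body: "COMPLETED"})
	b.dispatchBrokerMessage("t1", Message{Body: "hello"})
	fmt.Println(b.states, len(b.waiters["t1"]))
}
```

With this shape the state-change reaches the active-task path even though a waiter is registered, while the content message still goes only to the waiter.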
 	} else {
-		// TODO(multi-turn): MVP limitation — treats any non-state-change message as
-		// a terminal response. Multi-turn agents that emit interim content (e.g.
-		// clarifying questions, progress updates) will have their task closed
-		// prematurely on the first content message. This breaks agents that use
-		// input-required → completed flows. Must be fixed before exposing
-		// non-trivial agent types.
-		b.log.Debug("treating content message as task completion (MVP)", "task_id", taskID)
-		if err := b.store.UpdateTaskState(taskID, TaskStateCompleted); err != nil {
-			b.log.Error("failed to update task state", "error", err, "task_id", taskID)
-		}
+		// Content message — broadcast to subscribers but keep task alive.
+		// Task lifecycle is driven by state-change messages, not content.
For long-running tasks or streaming agents that send multiple content messages (e.g., progress updates) but do not frequently change their state, the task's UpdatedAt timestamp in the database is never updated.
Because the janitor reaps tasks based on task.UpdatedAt being older than maxAge (which defaults to 4 minutes), these active tasks will be prematurely reaped and marked as failed by the janitor, even though they are actively sending content.
To prevent this, we should touch the task in the database (by updating its state to TaskStateWorking) whenever a content message is received. This will update the UpdatedAt timestamp and keep the task alive.
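The interaction between the janitor and the timestamp can be sketched with a small in-memory model. Everything here is an assumption made for illustration: the `Store` and `Task` types, the `reapStale` function, and the injectable clock are hypothetical, and only `UpdateTaskState`, `UpdatedAt`, and the 4-minute `maxAge` default are taken from the review.

```go
package main

import (
	"fmt"
	"time"
)

// Task is a hypothetical task record.
type Task struct {
	State     string
	UpdatedAt time.Time
}

// Store is a hypothetical in-memory store with an injectable clock.
type Store struct {
	tasks map[string]*Task
	now   func() time.Time
}

// UpdateTaskState sets the state and, as a side effect, refreshes UpdatedAt.
func (s *Store) UpdateTaskState(id, state string) error {
	t, ok := s.tasks[id]
	if !ok {
		return fmt.Errorf("task %q not found", id)
	}
	t.State = state
	t.UpdatedAt = s.now()
	return nil
}

// reapStale marks non-terminal tasks as failed when they have not been
// updated within maxAge, and returns the IDs it reaped.
func (s *Store) reapStale(maxAge time.Duration) []string {
	var reaped []string
	for id, t := range s.tasks {
		if t.State == "completed" || t.State == "failed" {
			continue
		}
		if s.now().Sub(t.UpdatedAt) > maxAge {
			t.State = "failed"
			reaped = append(reaped, id)
		}
	}
	return reaped
}

// demoReap runs two tasks for five minutes; only "chatty" is touched on content.
func demoReap() []string {
	clock := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	s := &Store{
		tasks: map[string]*Task{
			"quiet":  &Task{State: "working", UpdatedAt: clock},
			"chatty": &Task{State: "working", UpdatedAt: clock},
		},
		now: func() time.Time { return clock },
	}
	clock = clock.Add(3 * time.Minute)
	_ = s.UpdateTaskState("chatty", "working") // content message arrives: touch the task
	clock = clock.Add(2 * time.Minute)
	return s.reapStale(4 * time.Minute)
}

func main() {
	fmt.Println(demoReap())
}
```

In this model only the task that was never touched exceeds `maxAge`; the task that was touched on each content message survives the sweep.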
	} else {
		// Content message — broadcast to subscribers but keep task alive.
		// Task lifecycle is driven by state-change messages, not content.
		if err := b.store.UpdateTaskState(taskID, TaskStateWorking); err != nil {
			b.log.Error("failed to update task state", "error", err, "task_id", taskID)
		}

func collectEvents(ctx context.Context, b *Bridge, taskID string) ([]StreamEvent, func(), error) {
	ch, cleanup, err := b.streams.Subscribe(taskID)
	if err != nil {
		return nil, nil, err
	}
	var events []StreamEvent
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case ev, ok := <-ch:
				if !ok {
					return
				}
				events = append(events, ev)
			case <-ctx.Done():
				return
			}
		}
	}()
	wait := func() {
		<-done
		cleanup()
	}
	return events, wait, nil // Note: caller reads events after wait()
	// Actually the events slice is shared, we need a different approach.
}
The helper function collectEvents is completely unused in this test file and contains a comment indicating that its implementation is broken (Actually the events slice is shared, we need a different approach.).
To keep the codebase clean and maintainable, we should remove this unused and broken helper function.
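For context on why the helper is broken: it returns `events` by value before the goroutine has appended anything, so the caller holds an empty slice header that later appends never update. If a collector like this is wanted in the future, one possible shape is to hand the slice back from the wait function instead. The sketch below is hypothetical and uses a plain `chan string` rather than the bridge's stream subscription.

```go
package main

import (
	"context"
	"fmt"
)

// collect drains ch in a goroutine until ch is closed or ctx is cancelled.
// The returned wait function blocks until the goroutine has exited and only
// then returns the slice, so the caller never reads a stale copy.
func collect(ctx context.Context, ch <-chan string) (wait func() []string) {
	var events []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case ev, ok := <-ch:
				if !ok {
					return
				}
				events = append(events, ev)
			case <-ctx.Done():
				return
			}
		}
	}()
	return func() []string {
		<-done // receiving after close(done) orders the read after all appends
		return events
	}
}

// demoCollect feeds two events, closes the channel, and returns what was collected.
func demoCollect() []string {
	ch := make(chan string, 2)
	wait := collect(context.Background(), ch)
	ch <- "working"
	ch <- "completed"
	close(ch)
	return wait()
}

func main() {
	fmt.Println(demoCollect())
}
```

Returning the slice from `wait` keeps the read on the far side of the `done` channel, which also avoids a data race on `events`.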
Stop auto-closing tasks on the first content message from an agent. Previously, any non-state-change message immediately marked the task as completed and closed all subscriptions (the MVP single-turn limitation documented in the TODO at bridge.go:633). Now content messages are broadcast to streaming and push subscribers with state=working and Final=false, keeping the task alive.

Task lifecycle is driven solely by agent state-change messages:
- working/thinking/executing → working (non-terminal)
- waiting_for_input → input-required (non-terminal)
- completed → completed (terminal, closes task)
- error/stalled → failed (terminal, closes task)

This enables multi-turn conversations where agents ask clarifying questions, send progress updates, or emit interim artifacts before completing.

Design doc: .design/a2a-multi-turn-lifecycle.md
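The state mapping listed in this commit message can be written as a small lookup. This is a sketch under assumptions: `mapAgentState` is a hypothetical name, `TaskStateInputRequired` and `TaskStateFailed` are assumed constant names (only `TaskStateWorking` and `TaskStateCompleted` appear in the diff), and treating unknown agent states as unmapped is a choice made here, not something the PR states.

```go
package main

import "fmt"

// TaskState is the A2A-side task state.
type TaskState string

const (
	TaskStateWorking       TaskState = "working"
	TaskStateInputRequired TaskState = "input-required" // assumed constant name
	TaskStateCompleted     TaskState = "completed"
	TaskStateFailed        TaskState = "failed" // assumed constant name
)

// mapAgentState translates an agent state-change into a task state.
// terminal reports whether the task should be closed; ok is false for
// agent states that the commit message does not list.
func mapAgentState(agent string) (state TaskState, terminal, ok bool) {
	switch agent {
	case "working", "thinking", "executing":
		return TaskStateWorking, false, true
	case "waiting_for_input":
		return TaskStateInputRequired, false, true
	case "completed":
		return TaskStateCompleted, true, true
	case "error", "stalled":
		return TaskStateFailed, true, true
	default:
		return "", false, false
	}
}

func main() {
	for _, s := range []string{"thinking", "waiting_for_input", "completed", "stalled"} {
		state, terminal, _ := mapAgentState(s)
		fmt.Printf("%s -> %s (terminal=%t)\n", s, state, terminal)
	}
}
```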
20 tests covering the multi-turn task lifecycle:
- Content messages don't complete tasks
- Content broadcasts with state=working, Final=false
- Multiple content messages keep task alive
- State-change to completed/failed closes task properly
- State-change to input-required keeps task alive
- Blocking SendMessage returns working (not completed)
- Blocking timeout/error/cancel cleans up activeTask
- Full multi-turn lifecycle integration test
- Slug-based fallback correlation with content
- Metrics not incremented on content messages
Fixes from code review:

1. Terminal state-changes dropped during blocking calls: dispatchToWaiter skipped state-change messages entirely, even terminal ones. The task's DB state was never updated to completed/failed. Fix: update DB state for terminal state-changes even when a waiter is active.

2. Janitor reaping active multi-turn tasks: content messages didn't refresh the task's UpdatedAt timestamp, so long conversations could be reaped as stale. Fix: call UpdateTaskState(working) on content messages to refresh the timestamp.

Added/updated tests for both scenarios.
…review

Debug/refactor cycle findings:
- Refactored dispatchToActiveTask for clarity
- Added test coverage for edge cases in state-change handling
- All tests pass
b9aca4c to fadcf60
3 review rounds complete, all clean. Summary of all changes:
Summary
Stop auto-closing A2A tasks on the first content message. Tasks stay alive until the agent's state changes to a terminal state (completed/failed). Enables multi-turn conversations where agents ask clarifying questions, send progress updates, or emit interim artifacts.
Design
Content messages are broadcast to subscribers with state=working, Final=false. Task lifecycle is driven solely by state-change messages. The blocking SendMessage path returns state=working (not completed) so follow-up messages can be sent.
Design doc: .design/a2a-multi-turn-lifecycle.md
Test plan
lifecycle_test.go covering all multi-turn scenarios