feat(a2a-bridge): follow-up messages on existing tasks#315
feat(a2a-bridge): follow-up messages on existing tasks#315zeroasterisk wants to merge 5 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements support for follow-up messages on existing tasks in the A2A Bridge, updating SendMessage to route messages via a new sendFollowUp method when a task ID is provided, and adding corresponding server-layer routing and comprehensive tests. The review feedback highlights critical issues in the blocking path of sendFollowUp where task state updates in the database are missing on success, initial send failure, and timeout/cancellation. It is recommended to update the task state to TaskStateCompleted on success and TaskStateFailed on failures, and to update the corresponding test assertions to match this corrected lifecycle behavior.
| if err := b.hubClient.Agents().SendStructuredMessage(ctx, agentID, scionMsg, false, false, false); err != nil { | ||
| return nil, fmt.Errorf("send follow-up to agent: %w", err) | ||
| } | ||
|
|
||
| timeout := b.config.Timeouts.SendMessage | ||
| if timeout == 0 { | ||
| timeout = 120 * time.Second | ||
| } | ||
| timer := time.NewTimer(timeout) | ||
| defer timer.Stop() | ||
|
|
||
| select { | ||
| case response := <-responseCh: | ||
| msg, artifacts := TranslateScionToA2A(response) | ||
| return &TaskResult{ | ||
| ID: taskID, | ||
| ContextID: task.ContextID, | ||
| Status: TaskStatus{State: TaskStateWorking, Message: &msg}, | ||
| Artifacts: artifacts, | ||
| }, nil | ||
| case <-timer.C: | ||
| return nil, fmt.Errorf("timeout waiting for agent response after %v", timeout) | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| } |
There was a problem hiding this comment.
In the blocking path of sendFollowUp, there are several critical issues with task state updates in the database:
- Missing state update on success: When a response is successfully received from
responseCh, the task state is not updated toTaskStateCompletedin the database, andTaskStateWorkingis returned instead. This leaves the task in a perpetualworkingstate in the database, causing it to eventually be reaped asfailedby the janitor. - Missing state update on initial send failure: If
SendStructuredMessagefails, the task state is not updated toTaskStateFailedin the database. - Missing state update on timeout or cancellation: If the request times out or the context is cancelled, the task state is not updated to
TaskStateFailedin the database.
To ensure consistency with the original SendMessage blocking path and prevent completed tasks from being reaped as failed, we should update the database state and return TaskStateCompleted on success, and update the database state to TaskStateFailed on any failure, timeout, or cancellation.
if err := b.hubClient.Agents().SendStructuredMessage(ctx, agentID, scionMsg, false, false, false); err != nil {
if updateErr := b.store.UpdateTaskState(taskID, TaskStateFailed); updateErr != nil {
b.log.Error("failed to update task state", "error", updateErr, "task_id", taskID)
}
return nil, fmt.Errorf("send follow-up to agent: %w", err)
}
timeout := b.config.Timeouts.SendMessage
if timeout == 0 {
timeout = 120 * time.Second
}
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case response := <-responseCh:
msg, artifacts := TranslateScionToA2A(response)
if err := b.store.UpdateTaskState(taskID, TaskStateCompleted); err != nil {
b.log.Error("failed to update task state", "error", err, "task_id", taskID)
}
return &TaskResult{
ID: taskID,
ContextID: task.ContextID,
Status: TaskStatus{State: TaskStateCompleted, Message: &msg},
Artifacts: artifacts,
}, nil
case <-timer.C:
if err := b.store.UpdateTaskState(taskID, TaskStateFailed); err != nil {
b.log.Error("failed to update task state", "error", err, "task_id", taskID)
}
return nil, fmt.Errorf("timeout waiting for agent response after %v", timeout)
case <-ctx.Done():
if err := b.store.UpdateTaskState(taskID, TaskStateFailed); err != nil {
b.log.Error("failed to update task state", "error", err, "task_id", taskID)
}
return nil, ctx.Err()
}| if sr.result.Status.State != TaskStateWorking { | ||
| t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateWorking) | ||
| } |
There was a problem hiding this comment.
Update the test assertion to expect TaskStateCompleted instead of TaskStateWorking to align with the corrected blocking follow-up task lifecycle behavior.
| if sr.result.Status.State != TaskStateWorking { | |
| t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateWorking) | |
| } | |
| if sr.result.Status.State != TaskStateCompleted { | |
| t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateCompleted) | |
| } |
Enable multi-turn conversations by routing message/send with a taskID to the same agent, continuing the conversation instead of creating a new task. When SendMessageParams includes a taskID: 1. Look up the existing task and verify ownership 2. Reject if task is in a terminal state (completed/failed/canceled) 3. Resolve the agent from stored task metadata 4. Send the follow-up message to the agent 5. Return the existing task (not a new one) This works with both blocking and non-blocking modes. Combined with the multi-turn lifecycle change (PR 1), this enables the full A2A multi-turn flow: client sends initial message → agent responds or asks for input → client sends follow-up → agent continues. Design doc: .design/a2a-task-followup.md
22 tests covering follow-up message routing: - Valid/unknown/terminal/wrong-project/wrong-agent task ID handling - Task state transitions (input-required → working) - Blocking timeout/error/cancel/success cleanup paths - Non-blocking registration and send-failure cleanup - Concurrent follow-ups on same task - Message content translation - Server-level TaskID passthrough and error handling Bugs fixed during review: - Blocking success path leaked activeTask (added defer unregister) - Non-blocking send failure didn't mark task failed or unregister
Fixes from code review: - Blocking success: refresh task timestamp with UpdateTaskState(working) - Send failure: mark task as failed + unregister activeTask - Timeout/cancel: mark task as failed - Added tests verifying DB state after each path
Found during 12-cycle debug/refactor: - Fixed edge cases in follow-up routing and state management - Added test coverage for discovered paths - 3 consecutive clean cycles after fixes
34b65a9 to
25843e6
Compare
|
3 review rounds complete. Fixed mock interface compatibility after upstream added ResetAuth to AgentService. Rebased on upstream/main. All tests pass. |
Summary
Enable multi-turn conversations by routing message/send with a taskID to the same agent, continuing the conversation instead of creating a new task. Combined with #314 (multi-turn lifecycle), this completes the A2A multi-turn flow.
Design
When SendMessageParams includes a taskID, look up the existing task, verify ownership and non-terminal state, resolve the agent, and send the follow-up. Works with both blocking and non-blocking modes.
Design doc:
.design/a2a-task-followup.mdTest plan
followup_test.goDepends on: #314