Skip to content

feat(a2a-bridge): follow-up messages on existing tasks#315

Open
zeroasterisk wants to merge 5 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/task-followup
Open

feat(a2a-bridge): follow-up messages on existing tasks#315
zeroasterisk wants to merge 5 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/task-followup

Conversation

@zeroasterisk

Copy link
Copy Markdown
Contributor

Summary

Enable multi-turn conversations by routing message/send with a taskID to the same agent, continuing the conversation instead of creating a new task. Combined with #314 (multi-turn lifecycle), this completes the A2A multi-turn flow.

Design

When SendMessageParams includes a taskID, look up the existing task, verify ownership and non-terminal state, resolve the agent, and send the follow-up. Works with both blocking and non-blocking modes.

Design doc: .design/a2a-task-followup.md

Test plan

  • 22 new tests in followup_test.go
  • All existing tests pass

Depends on: #314

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements support for follow-up messages on existing tasks in the A2A Bridge, updating SendMessage to route messages via a new sendFollowUp method when a task ID is provided, and adding corresponding server-layer routing and comprehensive tests. The review feedback highlights critical issues in the blocking path of sendFollowUp where task state updates in the database are missing on success, initial send failure, and timeout/cancellation. It is recommended to update the task state to TaskStateCompleted on success and TaskStateFailed on failures, and to update the corresponding test assertions to match this corrected lifecycle behavior.

Comment on lines +418 to +442
if err := b.hubClient.Agents().SendStructuredMessage(ctx, agentID, scionMsg, false, false, false); err != nil {
return nil, fmt.Errorf("send follow-up to agent: %w", err)
}

timeout := b.config.Timeouts.SendMessage
if timeout == 0 {
timeout = 120 * time.Second
}
timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case response := <-responseCh:
msg, artifacts := TranslateScionToA2A(response)
return &TaskResult{
ID: taskID,
ContextID: task.ContextID,
Status: TaskStatus{State: TaskStateWorking, Message: &msg},
Artifacts: artifacts,
}, nil
case <-timer.C:
return nil, fmt.Errorf("timeout waiting for agent response after %v", timeout)
case <-ctx.Done():
return nil, ctx.Err()
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the blocking path of sendFollowUp, there are several critical issues with task state updates in the database:

  1. Missing state update on success: When a response is successfully received from responseCh, the task state is not updated to TaskStateCompleted in the database, and TaskStateWorking is returned instead. This leaves the task in a perpetual working state in the database, causing it to eventually be reaped as failed by the janitor.
  2. Missing state update on initial send failure: If SendStructuredMessage fails, the task state is not updated to TaskStateFailed in the database.
  3. Missing state update on timeout or cancellation: If the request times out or the context is cancelled, the task state is not updated to TaskStateFailed in the database.

To ensure consistency with the original SendMessage blocking path and prevent completed tasks from being reaped as failed, we should update the database state and return TaskStateCompleted on success, and update the database state to TaskStateFailed on any failure, timeout, or cancellation.

if err := b.hubClient.Agents().SendStructuredMessage(ctx, agentID, scionMsg, false, false, false); err != nil {
			if updateErr := b.store.UpdateTaskState(taskID, TaskStateFailed); updateErr != nil {
				b.log.Error("failed to update task state", "error", updateErr, "task_id", taskID)
			}
			return nil, fmt.Errorf("send follow-up to agent: %w", err)
		}

		timeout := b.config.Timeouts.SendMessage
		if timeout == 0 {
			timeout = 120 * time.Second
		}
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case response := <-responseCh:
			msg, artifacts := TranslateScionToA2A(response)
			if err := b.store.UpdateTaskState(taskID, TaskStateCompleted); err != nil {
				b.log.Error("failed to update task state", "error", err, "task_id", taskID)
			}
			return &TaskResult{
				ID:        taskID,
				ContextID: task.ContextID,
				Status:    TaskStatus{State: TaskStateCompleted, Message: &msg},
				Artifacts: artifacts,
			}, nil
		case <-timer.C:
			if err := b.store.UpdateTaskState(taskID, TaskStateFailed); err != nil {
				b.log.Error("failed to update task state", "error", err, "task_id", taskID)
			}
			return nil, fmt.Errorf("timeout waiting for agent response after %v", timeout)
		case <-ctx.Done():
			if err := b.store.UpdateTaskState(taskID, TaskStateFailed); err != nil {
				b.log.Error("failed to update task state", "error", err, "task_id", taskID)
			}
			return nil, ctx.Err()
		}

Comment on lines +588 to +590
if sr.result.Status.State != TaskStateWorking {
t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateWorking)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update the test assertion to expect TaskStateCompleted instead of TaskStateWorking to align with the corrected blocking follow-up task lifecycle behavior.

Suggested change
if sr.result.Status.State != TaskStateWorking {
t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateWorking)
}
if sr.result.Status.State != TaskStateCompleted {
t.Errorf("result.Status.State = %q, want %q", sr.result.Status.State, TaskStateCompleted)
}

Enable multi-turn conversations by routing message/send with a taskID
to the same agent, continuing the conversation instead of creating a
new task.

When SendMessageParams includes a taskID:
1. Look up the existing task and verify ownership
2. Reject if task is in a terminal state (completed/failed/canceled)
3. Resolve the agent from stored task metadata
4. Send the follow-up message to the agent
5. Return the existing task (not a new one)

This works with both blocking and non-blocking modes. Combined with
the multi-turn lifecycle change (PR 1), this enables the full A2A
multi-turn flow: client sends initial message → agent responds or
asks for input → client sends follow-up → agent continues.

Design doc: .design/a2a-task-followup.md
22 tests covering follow-up message routing:
- Valid/unknown/terminal/wrong-project/wrong-agent task ID handling
- Task state transitions (input-required → working)
- Blocking timeout/error/cancel/success cleanup paths
- Non-blocking registration and send-failure cleanup
- Concurrent follow-ups on same task
- Message content translation
- Server-level TaskID passthrough and error handling

Bugs fixed during review:
- Blocking success path leaked activeTask (added defer unregister)
- Non-blocking send failure didn't mark task failed or unregister
Fixes from code review:
- Blocking success: refresh task timestamp with UpdateTaskState(working)
- Send failure: mark task as failed + unregister activeTask
- Timeout/cancel: mark task as failed
- Added tests verifying DB state after each path
Found during 12-cycle debug/refactor:
- Fixed edge cases in follow-up routing and state management
- Added test coverage for discovered paths
- 3 consecutive clean cycles after fixes
@zeroasterisk

Copy link
Copy Markdown
Contributor Author

3 review rounds complete. Fixed mock interface compatibility after upstream added ResetAuth to AgentService. Rebased on upstream/main. All tests pass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant