feat(a2a-bridge): adopt a2a-go SDK for protocol handling#362
feat(a2a-bridge): adopt a2a-go SDK for protocol handling#362zeroasterisk wants to merge 1 commit into
Conversation
Replace hand-rolled JSON-RPC server with the official a2a-go SDK (github.com/a2aproject/a2a-go/v2). This gives us spec-compliant protocol handling, built-in streaming, and a foundation for multi-transport support (gRPC, REST). Key changes: - New ScionExecutor (executor.go) implements a2asrv.AgentExecutor, bridging SDK events to/from Scion Hub message routing - server.go simplified: delegates JSON-RPC to SDK handler, keeps multi-project routing, auth middleware, agent cards - translate.go: added SDK-compatible type translation functions (TranslateA2APartsToScion, TranslateScionToA2AParts, etc.) - bridge.go: added sdkRequestHandler field for multi-transport use - main.go: wires SDK executor → handler → JSON-RPC transport Preserved: Hub routing, broker plugin, agent lookup, context resolution, auto-provisioning, auth, metrics, rate limiting.
There was a problem hiding this comment.
Code Review
This pull request migrates the scion-a2a-bridge to the official a2a-go SDK, replacing custom JSON-RPC and task management with the SDK's spec-compliant implementations. Feedback on the changes focuses on robustness and security: it is recommended to add defensive checks for closed or nil channels in executor.go and nil message pointers in translate.go, validate context routing info during task cancellation, and avoid disabling the global HTTP WriteTimeout to protect against Slowloris attacks.
| case response := <-responseCh: | ||
| agentMsg, artifacts := TranslateScionToA2AParts(response) |
There was a problem hiding this comment.
Reading from responseCh without checking the ok status or checking if response is nil can lead to a nil pointer dereference and panic if the channel is closed or receives a nil message. Adding defensive checks ensures robustness.
case response, ok := <-responseCh:
if !ok {
failMsg := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("Response channel closed unexpectedly"))
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateFailed, failMsg), nil)
return
}
if response == nil {
failMsg := a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("Received empty response from agent"))
yield(a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateFailed, failMsg), nil)
return
}
agentMsg, artifacts := TranslateScionToA2AParts(response)| func TranslateScionToA2AParts(msg *messages.StructuredMessage) (*a2a.Message, []*a2a.Artifact) { | ||
| var sdkParts []*a2a.Part | ||
| sdkParts = append(sdkParts, &a2a.Part{Content: a2a.Text(msg.Msg), MediaType: "text/plain"}) |
There was a problem hiding this comment.
In accordance with defensive programming practices, msg should be checked for nil before accessing its fields (msg.Msg, msg.Attachments, msg.Type) to prevent potential nil pointer dereferences.
func TranslateScionToA2AParts(msg *messages.StructuredMessage) (*a2a.Message, []*a2a.Artifact) {
if msg == nil {
return nil, nil
}
var sdkParts []*a2a.Part
sdkParts = append(sdkParts, &a2a.Part{Content: a2a.Text(msg.Msg), MediaType: "text/plain"})| if execCtx.StoredTask != nil && e.bridge.hubClient != nil { | ||
| route, _ := RouteInfoFrom(ctx) | ||
| if agent := e.bridge.lookupAgent(ctx, route.ProjectSlug, route.AgentSlug); agent != nil { | ||
| interruptMsg := &messages.StructuredMessage{ | ||
| Version: 1, | ||
| Timestamp: time.Now().UTC().Format(time.RFC3339), | ||
| Sender: fmt.Sprintf("user:%s", e.bridge.config.Hub.User), | ||
| Recipient: fmt.Sprintf("agent:%s", route.AgentSlug), | ||
| Msg: "Task cancelled by A2A client.", | ||
| Type: messages.TypeInstruction, | ||
| Metadata: map[string]string{"a2aTaskId": string(taskID)}, | ||
| } | ||
| if err := e.bridge.hubClient.Agents().SendStructuredMessage(ctx, agent.ID, interruptMsg, true, false, false); err != nil { | ||
| e.log.Error("failed to send cancel interrupt", "error", err, "task_id", taskID) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The check execCtx.StoredTask != nil is unnecessary because StoredTask is not used within the block. Additionally, RouteInfoFrom(ctx) should be checked for success (ok) to avoid calling lookupAgent with empty strings if the routing info is missing.
| if execCtx.StoredTask != nil && e.bridge.hubClient != nil { | |
| route, _ := RouteInfoFrom(ctx) | |
| if agent := e.bridge.lookupAgent(ctx, route.ProjectSlug, route.AgentSlug); agent != nil { | |
| interruptMsg := &messages.StructuredMessage{ | |
| Version: 1, | |
| Timestamp: time.Now().UTC().Format(time.RFC3339), | |
| Sender: fmt.Sprintf("user:%s", e.bridge.config.Hub.User), | |
| Recipient: fmt.Sprintf("agent:%s", route.AgentSlug), | |
| Msg: "Task cancelled by A2A client.", | |
| Type: messages.TypeInstruction, | |
| Metadata: map[string]string{"a2aTaskId": string(taskID)}, | |
| } | |
| if err := e.bridge.hubClient.Agents().SendStructuredMessage(ctx, agent.ID, interruptMsg, true, false, false); err != nil { | |
| e.log.Error("failed to send cancel interrupt", "error", err, "task_id", taskID) | |
| } | |
| } | |
| } | |
| if e.bridge.hubClient != nil { | |
| if route, ok := RouteInfoFrom(ctx); ok { | |
| if agent := e.bridge.lookupAgent(ctx, route.ProjectSlug, route.AgentSlug); agent != nil { | |
| interruptMsg := &messages.StructuredMessage{ | |
| Version: 1, | |
| Timestamp: time.Now().UTC().Format(time.RFC3339), | |
| Sender: fmt.Sprintf("user:%s", e.bridge.config.Hub.User), | |
| Recipient: fmt.Sprintf("agent:%s", route.AgentSlug), | |
| Msg: "Task cancelled by A2A client.", | |
| Type: messages.TypeInstruction, | |
| Metadata: map[string]string{"a2aTaskId": string(taskID)}, | |
| } | |
| if err := e.bridge.hubClient.Agents().SendStructuredMessage(ctx, agent.ID, interruptMsg, true, false, false); err != nil { | |
| e.log.Error("failed to send cancel interrupt", "error", err, "task_id", taskID) | |
| } | |
| } | |
| } | |
| } |
| Handler: srv.Handler(), | ||
| ReadTimeout: 30 * time.Second, | ||
| WriteTimeout: 30 * time.Second, | ||
| WriteTimeout: 0, // Disabled for SSE connections; SDK handles timeouts. |
There was a problem hiding this comment.
Disabling WriteTimeout globally on the HTTP server exposes it to Slowloris attacks and resource exhaustion. It is safer to keep a reasonable global WriteTimeout (e.g., 30 seconds) and disable or extend it per-request for SSE connections using http.ResponseController.
| WriteTimeout: 0, // Disabled for SSE connections; SDK handles timeouts. | |
| WriteTimeout: 30 * time.Second, // SSE connections should disable write deadlines individually via ResponseController. |
|
12-cycle review completed. 4 bugs found and fixed:
306 lines of fixes and tests. All tests pass. |
Summary
a2a-goSDK (github.com/a2aproject/a2a-go/v2)ScionExecutorimplementsa2asrv.AgentExecutorinterface, bridging SDK events to/from Scion Hub routinga2asrv.NewJSONRPCHandler— gets spec-compliant protocol handling, SSE streaming, and task lifecycle managementTest plan
go test ./...inextras/scion-a2a-bridge/)go build ./...)go vetclean/groves/path backward compatibility