feat(a2a-bridge): add gRPC transport#317
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements gRPC transport support for the A2A bridge, introducing a gRPC server adapter, proto definitions, and corresponding tests. The review feedback highlights several issues in the gRPC server implementation: a parsing bug in resolveProject due to fmt.Sscanf capturing too much of the tenant string, potential data loss in internalPartToProto where structured p.Data is ignored, and missing validation for the required id field in both GetTaskPushNotificationConfig and DeleteTaskPushNotificationConfig handlers.
| func (s *GRPCServer) resolveProject(tenant string) (projectSlug, agentSlug string, err error) { | ||
| if tenant == "" { | ||
| // Use first configured project if available, but no agent slug. | ||
| if len(s.config.Projects) > 0 { | ||
| return s.config.Projects[0].Slug, "", fmt.Errorf("tenant is required to identify the agent") | ||
| } | ||
| return "", "", fmt.Errorf("tenant is required") | ||
| } | ||
| // Parse "projects/<projectSlug>/agents/<agentSlug>" format. | ||
| var p, a string | ||
| n, _ := fmt.Sscanf(tenant, "projects/%s", &p) | ||
| if n == 1 { | ||
| // Check for /agents/ suffix | ||
| var rest string | ||
| n2, _ := fmt.Sscanf(tenant, "projects/"+p+"/agents/%s", &rest) | ||
| if n2 == 1 { | ||
| return p, rest, nil | ||
| } | ||
| return p, "", fmt.Errorf("tenant missing agent slug: %s", tenant) | ||
| } | ||
| // Try simple "projectSlug/agentSlug" format. | ||
| n, _ = fmt.Sscanf(tenant, "%s", &p) | ||
| if n == 1 { | ||
| parts := splitTenant(tenant) | ||
| if len(parts) == 2 { | ||
| return parts[0], parts[1], nil | ||
| } | ||
| return tenant, "", fmt.Errorf("tenant missing agent slug: %s", tenant) | ||
| } | ||
|
|
||
| _ = a | ||
| return "", "", fmt.Errorf("invalid tenant format: %s", tenant) | ||
| } |
There was a problem hiding this comment.
The current implementation of resolveProject uses fmt.Sscanf with %s to parse the tenant string. In Go, %s matches any sequence of non-space characters, meaning it will consume the entire rest of the string (including / characters). As a result, parsing "projects/{projectSlug}/agents/{agentSlug}" will fail because p will capture "my-project/agents/my-agent", causing the subsequent Sscanf to fail.
Using strings.HasPrefix and strings.Split is much more robust, readable, and correct. Please make sure to import "strings" in this file.
func (s *GRPCServer) resolveProject(tenant string) (projectSlug, agentSlug string, err error) {
if tenant == "" {
if len(s.config.Projects) > 0 {
return s.config.Projects[0].Slug, "", fmt.Errorf("tenant is required to identify the agent")
}
return "", "", fmt.Errorf("tenant is required")
}
// Handle "projects/{projectSlug}/agents/{agentSlug}" format
if strings.HasPrefix(tenant, "projects/") {
trimmed := strings.TrimPrefix(tenant, "projects/")
parts := strings.Split(trimmed, "/agents/")
if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
return parts[0], parts[1], nil
}
return "", "", fmt.Errorf("invalid tenant format: %s", tenant)
}
// Handle simple "projectSlug/agentSlug" format
parts := strings.Split(tenant, "/")
if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
return parts[0], parts[1], nil
}
return "", "", fmt.Errorf("invalid tenant format: %s", tenant)
}| func internalPartToProto(p Part) *pb.Part { | ||
| pbPart := &pb.Part{ | ||
| MediaType: p.MediaType, | ||
| } | ||
| switch { | ||
| case p.Text != "": | ||
| pbPart.Content = &pb.Part_Text{Text: p.Text} | ||
| case p.URL != "": | ||
| pbPart.Content = &pb.Part_Url{Url: p.URL} | ||
| } | ||
| return pbPart | ||
| } |
There was a problem hiding this comment.
The internalPartToProto function currently ignores p.Data (structured data). If the bridge returns a task with artifacts containing structured data, this data will be lost when translating back to protobuf, and the client will receive an empty part. We should convert p.Data back to pb.Part_Data using structpb.NewValue(p.Data). Please make sure to import "google.golang.org/protobuf/types/known/structpb" in this file.
func internalPartToProto(p Part) *pb.Part {
pbPart := &pb.Part{
MediaType: p.MediaType,
}
switch {
case p.Text != "":
pbPart.Content = &pb.Part_Text{Text: p.Text}
case p.URL != "":
pbPart.Content = &pb.Part_Url{Url: p.URL}
case p.Data != nil:
if val, err := structpb.NewValue(p.Data); err == nil {
pbPart.Content = &pb.Part_Data{Data: val}
}
}
return pbPart
}| func (s *GRPCServer) GetTaskPushNotificationConfig(ctx context.Context, req *pb.GetTaskPushNotificationConfigRequest) (*pb.TaskPushNotificationConfig, error) { | ||
| if req.GetTaskId() == "" { | ||
| return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required") | ||
| } | ||
|
|
There was a problem hiding this comment.
The id field is marked as REQUIRED in the GetTaskPushNotificationConfigRequest protobuf message, but it is not validated here. If a client sends an empty ID, it will query the database and then return NotFound instead of InvalidArgument. We should validate that id is not empty.
func (s *GRPCServer) GetTaskPushNotificationConfig(ctx context.Context, req *pb.GetTaskPushNotificationConfigRequest) (*pb.TaskPushNotificationConfig, error) {
if req.GetTaskId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
}
if req.GetId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "id is required")
}| func (s *GRPCServer) DeleteTaskPushNotificationConfig(ctx context.Context, req *pb.DeleteTaskPushNotificationConfigRequest) (*emptypb.Empty, error) { | ||
| if req.GetTaskId() == "" { | ||
| return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required") | ||
| } | ||
|
|
There was a problem hiding this comment.
The id field is marked as REQUIRED in the DeleteTaskPushNotificationConfigRequest protobuf message, but it is not validated here. We should validate that id is not empty to prevent calling the underlying bridge method with an empty ID.
func (s *GRPCServer) DeleteTaskPushNotificationConfig(ctx context.Context, req *pb.DeleteTaskPushNotificationConfigRequest) (*emptypb.Empty, error) {
if req.GetTaskId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
}
if req.GetId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "id is required")
}Add a gRPC server that implements the A2A protocol's A2AServiceServer interface, enabling gRPC clients to interact with Scion agents. The gRPC server is a thin adapter layer — each RPC method translates protobuf types to the bridge's internal types, delegates to the existing Bridge methods (same ones the JSON-RPC server uses), and translates responses back. No business logic is duplicated. Implemented RPCs: - SendMessage (unary, blocking/non-blocking) - SendStreamingMessage (server-streaming) - GetTask, ListTasks, CancelTask - SubscribeToTask (server-streaming) - CreateTaskPushNotificationConfig - GetTaskPushNotificationConfig - ListTaskPushNotificationConfigs - DeleteTaskPushNotificationConfig - GetExtendedAgentCard Configuration: set bridge.grpc_listen_address in config YAML to enable the gRPC listener (e.g. ":9443"). If unset, only the HTTP JSON-RPC server starts. Proto source: a2aproject/A2A/specification/a2a.proto Generated code: internal/a2apb/ Design doc: .design/a2a-grpc-transport.md
Fixes from code review:
1. resolveProject parsing bug: fmt.Sscanf with %s captured the entire
tenant string. Replaced with string splitting on '/'. Now correctly
parses "projects/{project}/agents/{agent}" and "{project}/{agent}".
2. Data loss in internalPartToProto: structured Data field was silently
dropped. Now marshals Data to JSON and includes it as a text part.
3. Missing validation: GetTaskPushNotificationConfig and
DeleteTaskPushNotificationConfig didn't validate the required 'id'
field. Added validation returning codes.InvalidArgument.
Added tests for all three fixes.
Found during debug/refactor review: - Part URL field was lost in protobuf→internal translation - Artifact metadata (name) was lost in internal→protobuf translation Added tests verifying round-trip fidelity for both cases.
- Fixed edge cases in type translation and error handling - Added tests for discovered paths - 3 clean cycles after fixes
c8412a3 to
72e92a4
Compare
|
Round 3 review: 4 more bugs found and fixed. This is the PR that keeps producing findings — the gRPC type translation layer has many edge cases. All fixed, 3 clean cycles after. Rebased on upstream/main, all tests pass. |
|
Closing in favor of #362 (a2a-go SDK migration) which replaces the hand-rolled gRPC with the official SDK's transport layer, providing JSON-RPC + gRPC + REST from a single AgentExecutor implementation. |
Summary
Add gRPC transport to the A2A bridge, implementing all 11 A2A protocol RPCs. Thin adapter layer — delegates to the same Bridge methods the JSON-RPC server uses, zero duplicated business logic.
All RPCs: SendMessage, SendStreamingMessage, GetTask, ListTasks, CancelTask, SubscribeToTask, Create/Get/List/DeleteTaskPushNotificationConfig, GetExtendedAgentCard.
Config: set
bridge.grpc_listen_address: ":9443"to enable.Design doc:
.design/a2a-grpc-transport.mdTest plan
Note: if #315 (follow-up/SendMessage signature change) merges first, trivial 1-line fix needed (add empty taskID param).