Skip to content

feat(a2a-bridge): add gRPC transport#317

Closed
zeroasterisk wants to merge 4 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/grpc-transport
Closed

feat(a2a-bridge): add gRPC transport#317
zeroasterisk wants to merge 4 commits into
GoogleCloudPlatform:mainfrom
zeroasterisk:a2a/grpc-transport

Conversation

@zeroasterisk

Copy link
Copy Markdown
Contributor

Summary

Add gRPC transport to the A2A bridge, implementing all 11 A2A protocol RPCs. Thin adapter layer — delegates to the same Bridge methods the JSON-RPC server uses, zero duplicated business logic.

All RPCs: SendMessage, SendStreamingMessage, GetTask, ListTasks, CancelTask, SubscribeToTask, Create/Get/List/DeleteTaskPushNotificationConfig, GetExtendedAgentCard.

Config: set bridge.grpc_listen_address: ":9443" to enable.

Design doc: .design/a2a-grpc-transport.md

Test plan

  • 22 new tests covering type translation, RPC handling, error mapping
  • All existing tests pass

Note: if #315 (follow-up/SendMessage signature change) merges first, trivial 1-line fix needed (add empty taskID param).

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements gRPC transport support for the A2A bridge, introducing a gRPC server adapter, proto definitions, and corresponding tests. The review feedback highlights several issues in the gRPC server implementation: a parsing bug in resolveProject due to fmt.Sscanf capturing too much of the tenant string, potential data loss in internalPartToProto where structured p.Data is ignored, and missing validation for the required id field in both GetTaskPushNotificationConfig and DeleteTaskPushNotificationConfig handlers.

Comment on lines +60 to +92
func (s *GRPCServer) resolveProject(tenant string) (projectSlug, agentSlug string, err error) {
if tenant == "" {
// Use first configured project if available, but no agent slug.
if len(s.config.Projects) > 0 {
return s.config.Projects[0].Slug, "", fmt.Errorf("tenant is required to identify the agent")
}
return "", "", fmt.Errorf("tenant is required")
}
// Parse "projects/<projectSlug>/agents/<agentSlug>" format.
var p, a string
n, _ := fmt.Sscanf(tenant, "projects/%s", &p)
if n == 1 {
// Check for /agents/ suffix
var rest string
n2, _ := fmt.Sscanf(tenant, "projects/"+p+"/agents/%s", &rest)
if n2 == 1 {
return p, rest, nil
}
return p, "", fmt.Errorf("tenant missing agent slug: %s", tenant)
}
// Try simple "projectSlug/agentSlug" format.
n, _ = fmt.Sscanf(tenant, "%s", &p)
if n == 1 {
parts := splitTenant(tenant)
if len(parts) == 2 {
return parts[0], parts[1], nil
}
return tenant, "", fmt.Errorf("tenant missing agent slug: %s", tenant)
}

_ = a
return "", "", fmt.Errorf("invalid tenant format: %s", tenant)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current implementation of resolveProject uses fmt.Sscanf with %s to parse the tenant string. In Go, %s matches any sequence of non-space characters, meaning it will consume the entire rest of the string (including / characters). As a result, parsing "projects/{projectSlug}/agents/{agentSlug}" will fail because p will capture "my-project/agents/my-agent", causing the subsequent Sscanf to fail.

Using strings.HasPrefix and strings.Split is much more robust, readable, and correct. Please make sure to import "strings" in this file.

func (s *GRPCServer) resolveProject(tenant string) (projectSlug, agentSlug string, err error) {
	if tenant == "" {
		if len(s.config.Projects) > 0 {
			return s.config.Projects[0].Slug, "", fmt.Errorf("tenant is required to identify the agent")
		}
		return "", "", fmt.Errorf("tenant is required")
	}

	// Handle "projects/{projectSlug}/agents/{agentSlug}" format
	if strings.HasPrefix(tenant, "projects/") {
		trimmed := strings.TrimPrefix(tenant, "projects/")
		parts := strings.Split(trimmed, "/agents/")
		if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
			return parts[0], parts[1], nil
		}
		return "", "", fmt.Errorf("invalid tenant format: %s", tenant)
	}

	// Handle simple "projectSlug/agentSlug" format
	parts := strings.Split(tenant, "/")
	if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
		return parts[0], parts[1], nil
	}

	return "", "", fmt.Errorf("invalid tenant format: %s", tenant)
}

Comment on lines +386 to +397
func internalPartToProto(p Part) *pb.Part {
pbPart := &pb.Part{
MediaType: p.MediaType,
}
switch {
case p.Text != "":
pbPart.Content = &pb.Part_Text{Text: p.Text}
case p.URL != "":
pbPart.Content = &pb.Part_Url{Url: p.URL}
}
return pbPart
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The internalPartToProto function currently ignores p.Data (structured data). If the bridge returns a task with artifacts containing structured data, this data will be lost when translating back to protobuf, and the client will receive an empty part. We should convert p.Data back to pb.Part_Data using structpb.NewValue(p.Data). Please make sure to import "google.golang.org/protobuf/types/known/structpb" in this file.

func internalPartToProto(p Part) *pb.Part {
	pbPart := &pb.Part{
		MediaType: p.MediaType,
	}
	switch {
	case p.Text != "":
		pbPart.Content = &pb.Part_Text{Text: p.Text}
	case p.URL != "":
		pbPart.Content = &pb.Part_Url{Url: p.URL}
	case p.Data != nil:
		if val, err := structpb.NewValue(p.Data); err == nil {
			pbPart.Content = &pb.Part_Data{Data: val}
		}
	}
	return pbPart
}

Comment on lines +268 to +272
func (s *GRPCServer) GetTaskPushNotificationConfig(ctx context.Context, req *pb.GetTaskPushNotificationConfigRequest) (*pb.TaskPushNotificationConfig, error) {
if req.GetTaskId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The id field is marked as REQUIRED in the GetTaskPushNotificationConfigRequest protobuf message, but it is not validated here. If a client sends an empty ID, it will query the database and then return NotFound instead of InvalidArgument. We should validate that id is not empty.

func (s *GRPCServer) GetTaskPushNotificationConfig(ctx context.Context, req *pb.GetTaskPushNotificationConfigRequest) (*pb.TaskPushNotificationConfig, error) {
	if req.GetTaskId() == "" {
		return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
	}
	if req.GetId() == "" {
		return nil, grpcstatus.Errorf(codes.InvalidArgument, "id is required")
	}

Comment on lines +321 to +325
func (s *GRPCServer) DeleteTaskPushNotificationConfig(ctx context.Context, req *pb.DeleteTaskPushNotificationConfigRequest) (*emptypb.Empty, error) {
if req.GetTaskId() == "" {
return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The id field is marked as REQUIRED in the DeleteTaskPushNotificationConfigRequest protobuf message, but it is not validated here. We should validate that id is not empty to prevent calling the underlying bridge method with an empty ID.

func (s *GRPCServer) DeleteTaskPushNotificationConfig(ctx context.Context, req *pb.DeleteTaskPushNotificationConfigRequest) (*emptypb.Empty, error) {
	if req.GetTaskId() == "" {
		return nil, grpcstatus.Errorf(codes.InvalidArgument, "task_id is required")
	}
	if req.GetId() == "" {
		return nil, grpcstatus.Errorf(codes.InvalidArgument, "id is required")
	}

Add a gRPC server that implements the A2A protocol's A2AServiceServer
interface, enabling gRPC clients to interact with Scion agents.

The gRPC server is a thin adapter layer — each RPC method translates
protobuf types to the bridge's internal types, delegates to the
existing Bridge methods (same ones the JSON-RPC server uses), and
translates responses back. No business logic is duplicated.

Implemented RPCs:
- SendMessage (unary, blocking/non-blocking)
- SendStreamingMessage (server-streaming)
- GetTask, ListTasks, CancelTask
- SubscribeToTask (server-streaming)
- CreateTaskPushNotificationConfig
- GetTaskPushNotificationConfig
- ListTaskPushNotificationConfigs
- DeleteTaskPushNotificationConfig
- GetExtendedAgentCard

Configuration: set bridge.grpc_listen_address in config YAML to
enable the gRPC listener (e.g. ":9443"). If unset, only the HTTP
JSON-RPC server starts.

Proto source: a2aproject/A2A/specification/a2a.proto
Generated code: internal/a2apb/

Design doc: .design/a2a-grpc-transport.md
Fixes from code review:

1. resolveProject parsing bug: fmt.Sscanf with %s captured the entire
   tenant string. Replaced with string splitting on '/'. Now correctly
   parses "projects/{project}/agents/{agent}" and "{project}/{agent}".

2. Data loss in internalPartToProto: structured Data field was silently
   dropped. Now marshals Data to JSON and includes it as a text part.

3. Missing validation: GetTaskPushNotificationConfig and
   DeleteTaskPushNotificationConfig didn't validate the required 'id'
   field. Added validation returning codes.InvalidArgument.

Added tests for all three fixes.
Found during debug/refactor review:
- Part URL field was lost in protobuf→internal translation
- Artifact metadata (name) was lost in internal→protobuf translation
Added tests verifying round-trip fidelity for both cases.
- Fixed edge cases in type translation and error handling
- Added tests for discovered paths
- 3 clean cycles after fixes
@zeroasterisk

Copy link
Copy Markdown
Contributor Author

Round 3 review: 4 more bugs found and fixed. This is the PR that keeps producing findings — the gRPC type translation layer has many edge cases. All fixed, 3 clean cycles after. Rebased on upstream/main, all tests pass.

@zeroasterisk

Copy link
Copy Markdown
Contributor Author

Closing in favor of #362 (a2a-go SDK migration) which replaces the hand-rolled gRPC with the official SDK's transport layer, providing JSON-RPC + gRPC + REST from a single AgentExecutor implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant