feat(a2a-bridge): add gRPC and REST transport support #363
Replace hand-rolled JSON-RPC server with the official a2a-go SDK (github.com/a2aproject/a2a-go/v2). This gives us spec-compliant protocol handling, built-in streaming, and a foundation for multi-transport support (gRPC, REST).

Key changes:
- New ScionExecutor (executor.go) implements a2asrv.AgentExecutor, bridging SDK events to/from Scion Hub message routing
- server.go simplified: delegates JSON-RPC to SDK handler, keeps multi-project routing, auth middleware, agent cards
- translate.go: added SDK-compatible type translation functions (TranslateA2APartsToScion, TranslateScionToA2AParts, etc.)
- bridge.go: added sdkRequestHandler field for multi-transport use
- main.go: wires SDK executor → handler → JSON-RPC transport

Preserved: Hub routing, broker plugin, agent lookup, context resolution, auto-provisioning, auth, metrics, rate limiting.
Enable optional gRPC and REST transports for the A2A bridge using the SDK's built-in handlers. Both are opt-in via config; existing JSON-RPC deployments are unaffected.

Config:
- bridge.grpc_listen_address: starts gRPC server (a2agrpc.NewHandler)
- bridge.rest_listen_address: starts REST server (a2asrv.NewRESTHandler)

Both transports share the same SDK RequestHandler and ScionExecutor, so routing, auth, and Hub integration are identical across protocols.
Code Review
This pull request migrates the scion-a2a-bridge to the official a2a-go SDK, replacing custom JSON-RPC handling, task lifecycle management, and streaming with spec-compliant SDK implementations while introducing gRPC and REST transports. Feedback on these changes highlights several critical issues: grpcServer.GracefulStop() could block shutdown indefinitely on active streaming connections; gRPC and REST requests will fail because routing metadata is only injected in the HTTP JSON-RPC handler; potential nil pointer dereferences exist in ScionExecutor.Execute and TranslateScionToA2AParts; and disabling WriteTimeout globally on the HTTP server introduces a Slowloris DoS vulnerability.
if grpcServer != nil {
	grpcServer.GracefulStop()
	log.Info("gRPC server stopped")
}
grpcServer.GracefulStop() blocks until all active connections and RPCs are closed. If there are active long-lived streaming connections (which are common in A2A), this call can block indefinitely, preventing the rest of the shutdown sequence (like stopping the REST server and draining background goroutines via b.Shutdown()) from executing within the 30-second shutdown timeout.
Consider wrapping the graceful shutdown with a timeout using a select block.
if grpcServer != nil {
	stopped := make(chan struct{})
	go func() {
		grpcServer.GracefulStop()
		close(stopped)
	}()
	select {
	case <-stopped:
		log.Info("gRPC server stopped gracefully")
	case <-shutdownCtx.Done():
		grpcServer.Stop()
		log.Info("gRPC server stopped forcefully")
	}
}
route, ok := RouteInfoFrom(ctx)
if !ok {
	yield(nil, fmt.Errorf("missing route info in context: %w", a2a.ErrInternalError))
	return
}
RouteInfo is only injected into the context within Server.handleJSONRPC (in server.go), which is specific to the HTTP JSON-RPC transport. For gRPC and REST requests, this handler is bypassed, meaning RouteInfoFrom(ctx) will always return ok = false. This will cause all gRPC and REST requests to fail immediately with missing route info in context.
Consider extracting the project and agent slugs directly from the SDK's ExecutorContext if available, or implement gRPC interceptors and REST middleware to extract and inject RouteInfo into the context for those transports.
taskID := execCtx.TaskID
contextID := execCtx.ContextID
execCtx is a pointer (*a2asrv.ExecutorContext) and could potentially be nil. Accessing execCtx.TaskID or execCtx.ContextID without a nil check can cause a runtime panic.
if execCtx == nil {
	yield(nil, fmt.Errorf("executor context is nil: %w", a2a.ErrInternalError))
	return
}
taskID := execCtx.TaskID
contextID := execCtx.ContextID
func TranslateScionToA2AParts(msg *messages.StructuredMessage) (*a2a.Message, []*a2a.Artifact) {
	var sdkParts []*a2a.Part
	sdkParts = append(sdkParts, &a2a.Part{Content: a2a.Text(msg.Msg), MediaType: "text/plain"})
msg is a pointer (*messages.StructuredMessage) and could be nil. Accessing msg.Msg or msg.Attachments without a nil check can cause a runtime panic.
func TranslateScionToA2AParts(msg *messages.StructuredMessage) (*a2a.Message, []*a2a.Artifact) {
	if msg == nil {
		return nil, nil
	}
	var sdkParts []*a2a.Part
	sdkParts = append(sdkParts, &a2a.Part{Content: a2a.Text(msg.Msg), MediaType: "text/plain"})

  Handler:      srv.Handler(),
  ReadTimeout:  30 * time.Second,
- WriteTimeout: 30 * time.Second,
+ WriteTimeout: 0, // Disabled for SSE connections; SDK handles timeouts.
Setting WriteTimeout: 0 globally on the main HTTP server disables write timeouts for all endpoints (including non-SSE ones like standard JSON-RPC POST, healthz, readyz, metrics, etc.). This exposes the server to Slowloris-style denial of service attacks where clients read responses extremely slowly to exhaust server resources.
If the SDK's SSE handler does not automatically disable write deadlines dynamically, consider keeping a safe global WriteTimeout (e.g., 30 seconds) and using a reverse proxy (like Caddy or Nginx) to handle timeouts and connection limits, or use a middleware to limit concurrent connections.
12-cycle review findings:
1. Cancel() silently failed when RouteInfoFrom(ctx) returned no route info: the interrupt was never sent to the agent and nothing was logged. Fixed: check the ok return, log a warning.
2. Per-agent cards said streaming:false while the registry card said streaming:true. Fixed: consistent streaming:true on both.
3. Executor refactored to use SDK constructors for artifact events instead of manual struct building.
4. Added translate_test.go with 153 lines of type translation tests.
5. Added server test coverage for new SDK handler paths.

306 lines of fixes and tests. All tests pass.
Review fixes pushed. The review fixes from PR #362 are included in this branch (it's stacked on top). 4 bugs fixed, 306 lines of fixes/tests added. All tests pass.
Summary
- gRPC transport via a2agrpc.NewHandler (config: bridge.grpc_listen_address)
- REST transport via a2asrv.NewRESTHandler (config: bridge.rest_listen_address)
- Both transports share the same SDK RequestHandler and ScionExecutor

Depends on #362
Test plan
- Set grpc_listen_address, verify gRPC service responds
- Set rest_listen_address, verify REST endpoints respond