Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 2 additions & 55 deletions go/core/internal/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"net/http"
"reflect"
"slices"
"strings"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/core/internal/controller/provider"
agent_translator "github.com/kagent-dev/kagent/go/core/internal/controller/translator/agent"
mcputil "github.com/kagent-dev/kagent/go/core/internal/mcp"
"github.com/kagent-dev/kagent/go/core/internal/utils"
"github.com/kagent-dev/kagent/go/core/internal/version"
"github.com/modelcontextprotocol/go-sdk/mcp"
Expand Down Expand Up @@ -784,7 +784,7 @@ func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Contex
return nil, fmt.Errorf("failed to store toolServer %s: %w", toolServer.Name, err)
}

tsp, err := a.createMcpTransport(ctx, remoteMcpServer)
tsp, err := mcputil.CreateMCPTransport(ctx, a.kube, remoteMcpServer)
if err != nil {
return nil, fmt.Errorf("failed to create client for toolServer %s: %w", toolServer.Name, err)
}
Expand All @@ -809,59 +809,6 @@ func (a *kagentReconciler) isNamespaceWatched(namespace string) bool {
return slices.Contains(a.watchedNamespaces, namespace)
}

func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServer) (mcp.Transport, error) {
headers, err := s.ResolveHeaders(ctx, a.kube)
if err != nil {
return nil, err
}

httpClient := newHTTPClient(headers)

switch s.Spec.Protocol {
case v1alpha2.RemoteMCPServerProtocolSse:
return &mcp.SSEClientTransport{
Endpoint: s.Spec.URL,
HTTPClient: httpClient,
}, nil
default:
return &mcp.StreamableClientTransport{
Endpoint: s.Spec.URL,
HTTPClient: httpClient,
}, nil
}
}

// go-sdk does not have a WithHeaders option when initializing transport
// so we need to create a custom HTTP client that adds headers to all requests.
func newHTTPClient(headers map[string]string) *http.Client {
if len(headers) == 0 {
return http.DefaultClient
}
return &http.Client{
Transport: &headerTransport{
headers: headers,
base: http.DefaultTransport,
},
}
}

// headerTransport is an http.RoundTripper that adds custom headers to requests.
type headerTransport struct {
headers map[string]string
base http.RoundTripper
}

func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req = req.Clone(req.Context())
for k, v := range t.headers {
req.Header.Set(k, v)
}
if t.base == nil {
t.base = http.DefaultTransport
}
return t.base.RoundTrip(req)
}

func (a *kagentReconciler) listTools(ctx context.Context, tsp mcp.Transport, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) {
impl := &mcp.Implementation{
Name: "kagent-controller",
Expand Down
43 changes: 40 additions & 3 deletions go/core/internal/mcp/mcp_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
)

// MCPHandler handles MCP requests and bridges them to A2A endpoints
// and tool server discovery/invocation.
type MCPHandler struct {
kubeClient client.Client
a2aBaseURL string
authenticator auth.AuthProvider
httpHandler *mcpsdk.StreamableHTTPHandler
server *mcpsdk.Server
a2aClients sync.Map
sessions sync.Map // cached MCP client sessions keyed by "Kind/namespace/name"
}

// Input types for MCP tools
Expand Down Expand Up @@ -92,6 +94,36 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au
handler.handleInvokeAgent,
)

// Add list_tool_servers tool
mcpsdk.AddTool[ListToolServersInput, ListToolServersOutput](
server,
&mcpsdk.Tool{
Name: "list_tool_servers",
Description: "List all MCP tool servers in the cluster (RemoteMCPServer, Service, MCPServer)",
},
handler.handleListToolServers,
)

// Add list_tools tool
mcpsdk.AddTool[ListToolsInput, ListToolsOutput](
server,
&mcpsdk.Tool{
Name: "list_tools",
Description: "Connect to a tool server and list its available tools",
},
handler.handleListTools,
)

// Add call_tool tool
mcpsdk.AddTool[CallToolInput, CallToolOutput](
server,
&mcpsdk.Tool{
Name: "call_tool",
Description: "Invoke a specific tool on a specific tool server",
},
handler.handleCallTool,
)

// Create HTTP handler
handler.httpHandler = mcpsdk.NewStreamableHTTPHandler(
func(*http.Request) *mcpsdk.Server {
Expand Down Expand Up @@ -309,9 +341,14 @@ func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.httpHandler.ServeHTTP(w, r)
}

// Shutdown gracefully shuts down the MCP handler
// Shutdown gracefully shuts down the MCP handler and closes cached sessions.
func (h *MCPHandler) Shutdown(ctx context.Context) error {
// The new SDK doesn't have an explicit Shutdown method on StreamableHTTPHandler
// The server will be shut down when the context is cancelled
h.sessions.Range(func(key, value any) bool {
if session, ok := value.(*mcpsdk.ClientSession); ok {
session.Close()
}
h.sessions.Delete(key)
return true
})
return nil
}
Loading