diff --git a/mcp/client.go b/mcp/client.go index 1e0e18a4..fff96859 100644 --- a/mcp/client.go +++ b/mcp/client.go @@ -300,6 +300,28 @@ func (c *Client) Connect(ctx context.Context, t Transport, opts *ClientSessionOp if hc, ok := cs.mcpConn.(clientConnection); ok { hc.sessionUpdated(cs.state) } + subscribeParams := &SubscriptionsListenParams{} + if c.opts.ToolListChangedHandler != nil { + subscribeParams.Notifications.ToolsListChanged = true + } + if c.opts.PromptListChangedHandler != nil { + subscribeParams.Notifications.PromptsListChanged = true + } + if c.opts.ResourceListChangedHandler != nil { + subscribeParams.Notifications.ResourcesListChanged = true + } + if subscribeParams.Notifications.ToolsListChanged || + subscribeParams.Notifications.PromptsListChanged || + subscribeParams.Notifications.ResourcesListChanged { + // The listen blocks until the server cancels it. Run it in + // a goroutine so Connect can return; ClientSession.Close + // cancels its context to send notifications/cancelled. + listenCtx, cancelListen := context.WithCancel(context.Background()) + cs.listenCancel = cancelListen + go func() { + _ = cs.subscriptionsListen(listenCtx, subscribeParams) + }() + } return cs, nil } @@ -416,6 +438,7 @@ type ClientSession struct { conn *jsonrpc2.Connection client *Client keepaliveCancel context.CancelFunc + listenCancel context.CancelFunc mcpConn Connection // No mutex is (currently) required to guard the session state, because it is @@ -498,6 +521,9 @@ func (cs *ClientSession) Close() error { if cs.keepaliveCancel != nil { cs.keepaliveCancel() } + if cs.listenCancel != nil { + cs.listenCancel() + } err := cs.conn.Close() if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) { @@ -1079,6 +1105,7 @@ var clientMethodInfos = map[string]methodInfo{ notificationLoggingMessage: newClientMethodInfo(clientMethod((*Client).callLoggingHandler), notification), notificationProgress: newClientMethodInfo(clientSessionMethod((*ClientSession).callProgressNotificationHandler), notification), notificationElicitationComplete: newClientMethodInfo(clientMethod((*Client).callElicitationCompleteHandler), notification|missingParamsOK), + notificationSubscriptionsAck: newClientMethodInfo(clientMethod((*Client).callSubscriptionsAckHandler), notification|missingParamsOK), } func (cs *ClientSession) sendingMethodInfos() map[string]methodInfo { @@ -1234,6 +1261,21 @@ func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribePar return err } +// SubscriptionsListen opens a SEP-2575 "subscriptions/listen" stream and +// blocks for the lifetime of the subscription. The server's first message on +// the stream is "notifications/subscriptions/acknowledged"; subsequent +// opted-in notifications (e.g. tools/list_changed) are delivered through the +// usual handlers registered in [ClientOptions]. +func (cs *ClientSession) subscriptionsListen(ctx context.Context, params *SubscriptionsListenParams) error { + params = injectRequestMeta(cs, params) + _, err := handleSend[*emptyResult](ctx, methodSubscriptionsListen, newClientRequest(cs, orZero[Params](params))) + return err +} + +func (c *Client) callSubscriptionsAckHandler(context.Context, *ClientRequest[*SubscriptionsAcknowledgedParams]) (Result, error) { + return nil, nil +} + func (c *Client) callToolChangedHandler(ctx context.Context, req *ToolListChangedRequest) (Result, error) { if h := c.opts.ToolListChangedHandler; h != nil { h(ctx, req) diff --git a/mcp/mcp_test.go b/mcp/mcp_test.go index 5c8e7d12..02f05618 100644 --- a/mcp/mcp_test.go +++ b/mcp/mcp_test.go @@ -12,6 +12,8 @@ import ( "fmt" "io" "log/slog" + "net/http" + "net/http/httptest" "net/url" "path/filepath" "runtime" @@ -2449,3 +2451,202 @@ func TestSetErrorPreservesContent(t *testing.T) { } var ctrCmpOpts = []cmp.Option{cmpopts.IgnoreUnexported(CallToolResult{}, GetPromptResult{}, ReadResourceResult{})} + +// runSubscriptionsListenTest exercises the SEP-2575 auto-listen flow end-to-end +// against the supplied transport pair. It captures every notification and the +// acknowledgment the client sees, then asserts: +// +// - the auto-listen issued by Client.Connect is acknowledged with a tagged +// subscription ID; +// - tool and prompt list-changed notifications are delivered to the matching +// handlers, each carrying the same subscription ID as the ack; +// - the subscription persists across multiple unrelated changes; +// - cs.Close() ends the subscription and further changes don't deliver. +func runSubscriptionsListenTest(t *testing.T, client *Client, server *Server, ct Transport, events chan subListenEvent) { + t.Helper() + + ctx, topCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer topCancel() + + cs, err := client.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630}) + if err != nil { + t.Fatalf("client connect: %v", err) + } + + waitFor := func(kind string) subListenEvent { + t.Helper() + select { + case e := <-events: + if e.kind != kind { + t.Fatalf("got event %q (id=%s), want kind %q", e.kind, e.id, kind) + } + return e + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for %q event", kind) + return subListenEvent{} + } + } + expectNoEvent := func(d time.Duration) { + t.Helper() + select { + case e := <-events: + t.Fatalf("unexpected event %q (id=%s)", e.kind, e.id) + case <-time.After(d): + } + } + + ack := waitFor("ack") + if ack.id == "" { + t.Fatalf("acknowledgment missing subscription ID") + } + + server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + if e := waitFor("tool"); e.id != ack.id { + t.Errorf("first tool notif id = %s, want %s", e.id, ack.id) + } + + server.AddPrompt(&Prompt{Name: "p2"}, nil) + if e := waitFor("prompt"); e.id != ack.id { + t.Errorf("first prompt notif id = %s, want %s", e.id, ack.id) + } + + server.AddTool(&Tool{Name: "t3", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + if e := waitFor("tool"); e.id != ack.id { + t.Errorf("second tool notif id = %s, want %s", e.id, ack.id) + } + server.AddPrompt(&Prompt{Name: "p3"}, nil) + if e := waitFor("prompt"); e.id != ack.id { + t.Errorf("second prompt notif id = %s, want %s", e.id, ack.id) + } + expectNoEvent(notificationDelay * 5) + + cs.Close() + time.Sleep(50 * time.Millisecond) + + server.AddTool(&Tool{Name: "t4", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + server.AddPrompt(&Prompt{Name: "p4"}, nil) + expectNoEvent(notificationDelay * 20) +} + +type subListenEvent struct { + kind string // "ack", "tool", "prompt" + id string // subscription ID from _meta, stringified for cross-encoding equality +} + +// newSubListenClient returns a client wired to push every ack and every +// list-changed notification it receives into events, tagged with the kind +// and the subscription ID extracted from _meta. +func newSubListenClient(events chan subListenEvent) *Client { + asEvent := func(kind string, raw any) subListenEvent { + return subListenEvent{kind, fmt.Sprint(raw)} + } + c := NewClient(testImpl, &ClientOptions{ + ToolListChangedHandler: func(_ context.Context, req *ToolListChangedRequest) { + events <- asEvent("tool", req.Params.Meta[MetaKeySubscriptionID]) + }, + PromptListChangedHandler: func(_ context.Context, req *PromptListChangedRequest) { + events <- asEvent("prompt", req.Params.Meta[MetaKeySubscriptionID]) + }, + }) + c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler { + return func(ctx context.Context, method string, req Request) (Result, error) { + if method == notificationSubscriptionsAck { + if cr, ok := req.(*ClientRequest[*SubscriptionsAcknowledgedParams]); ok && cr.Params != nil { + events <- asEvent("ack", cr.Params.Meta[MetaKeySubscriptionID]) + } + } + return next(ctx, method, req) + } + }) + return c +} + +func newSubListenServer() *Server { + s := NewServer(testImpl, nil) + AddTool(s, &Tool{Name: "t1"}, sayHi) + s.AddPrompt(&Prompt{Name: "p1"}, nil) + return s +} + +func enableNewProtocol(t *testing.T) { + t.Helper() + orig := supportedProtocolVersions + supportedProtocolVersions = append([]string{protocolVersion20260630}, slices.Clone(orig)...) + t.Cleanup(func() { supportedProtocolVersions = orig }) +} + +// TestSubscriptionsListen_InMemory exercises the listen flow over the +// session-shared in-memory transport (semantically equivalent to STDIO). +// Cancellation here propagates via notifications/cancelled. +func TestSubscriptionsListen_InMemory(t *testing.T) { + enableNewProtocol(t) + events := make(chan subListenEvent, 64) + server := newSubListenServer() + ct, st := NewInMemoryTransports() + ss, err := server.Connect(context.Background(), st, nil) + if err != nil { + t.Fatalf("server connect: %v", err) + } + defer ss.Close() + runSubscriptionsListenTest(t, newSubListenClient(events), server, ct, events) +} + +// TestSubscriptionsListen_Streamable exercises the listen flow over a +// stateless HTTP server (SEP-2575). Each listen rides its own SSE response +// stream; cs.Close() tears it down. +func TestSubscriptionsListen_Streamable(t *testing.T) { + enableNewProtocol(t) + events := make(chan subListenEvent, 64) + server := newSubListenServer() + handler := NewStreamableHTTPHandler( + func(*http.Request) *Server { return server }, + &StreamableHTTPOptions{Stateless: true}, + ) + httpServer := httptest.NewServer(mustNotPanic(t, handler)) + defer httpServer.Close() + runSubscriptionsListenTest(t, newSubListenClient(events), server, + &StreamableClientTransport{Endpoint: httpServer.URL}, events) +} + +// TestSubscriptionsListen_NoHandlersNoListen verifies that a new-protocol +// client without any list-changed handlers registered does not open an +// auto-listen on connect, and therefore does not receive any acknowledgment +// or downstream notifications. +func TestSubscriptionsListen_NoHandlersNoListen(t *testing.T) { + enableNewProtocol(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + events := make(chan subListenEvent, 8) + server := newSubListenServer() + ct, st := NewInMemoryTransports() + ss, err := server.Connect(ctx, st, nil) + if err != nil { + t.Fatalf("server connect: %v", err) + } + defer ss.Close() + + c := NewClient(testImpl, nil) + c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler { + return func(ctx context.Context, method string, req Request) (Result, error) { + if method == notificationSubscriptionsAck { + events <- subListenEvent{"ack", ""} + } + return next(ctx, method, req) + } + }) + cs, err := c.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630}) + if err != nil { + t.Fatalf("client connect: %v", err) + } + defer cs.Close() + + server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil) + + select { + case e := <-events: + t.Fatalf("unexpected event %q on no-handler client", e.kind) + case <-time.After(notificationDelay * 10): + } +} diff --git a/mcp/protocol.go b/mcp/protocol.go index acc7ec48..c5dd952d 100644 --- a/mcp/protocol.go +++ b/mcp/protocol.go @@ -1883,6 +1883,49 @@ type ResourceUpdatedNotificationParams struct { func (x *ResourceUpdatedNotificationParams) isParams() {} func (x *ResourceUpdatedNotificationParams) isNil() bool { return x == nil } +// NotificationSubscriptions describes the set of server-to-client +// notifications a client wishes to receive on a [SubscriptionsListenParams] +// stream. Each field is an explicit opt-in: a server MUST NOT push +// notifications of a type the client did not request. +type NotificationSubscriptions struct { + // ToolsListChanged opts in to "notifications/tools/list_changed". + ToolsListChanged bool `json:"toolsListChanged,omitempty"` + // PromptsListChanged opts in to "notifications/prompts/list_changed". + PromptsListChanged bool `json:"promptsListChanged,omitempty"` + // ResourcesListChanged opts in to "notifications/resources/list_changed". + ResourcesListChanged bool `json:"resourcesListChanged,omitempty"` + // ResourceSubscriptions enumerates the resource URIs for which the client + // wants "notifications/resources/updated". Replaces the legacy + // resources/subscribe RPC. + ResourceSubscriptions []string `json:"resourceSubscriptions,omitempty"` +} + +// SubscriptionsListenParams are the parameters for the +// "subscriptions/listen" RPC. +type SubscriptionsListenParams struct { + // Meta carries the per-request `_meta` triple. + Meta `json:"_meta,omitempty"` + // Notifications declares which notification types the client wants to + // receive on this stream. + Notifications NotificationSubscriptions `json:"notifications"` +} + +func (x *SubscriptionsListenParams) isParams() {} +func (x *SubscriptionsListenParams) isNil() bool { return x == nil } + +// SubscriptionsAcknowledgedParams are the parameters for the +// "notifications/subscriptions/acknowledged" notification, which the server +// MUST send as the first message on a subscriptions/listen stream. It carries +// the subset of the requested [NotificationSubscriptions] that the server has +// agreed to honor. +type SubscriptionsAcknowledgedParams struct { + Meta `json:"_meta,omitempty"` + Notifications NotificationSubscriptions `json:"notifications"` +} + +func (x *SubscriptionsAcknowledgedParams) isParams() {} +func (x *SubscriptionsAcknowledgedParams) isNil() bool { return x == nil } + // TODO(jba): add CompleteRequest and related types. // A request from the server to elicit additional information from the user via the client. @@ -2086,8 +2129,10 @@ const ( notificationRootsListChanged = "notifications/roots/list_changed" methodSetLevel = "logging/setLevel" methodSubscribe = "resources/subscribe" + methodSubscriptionsListen = "subscriptions/listen" notificationToolListChanged = "notifications/tools/list_changed" methodUnsubscribe = "resources/unsubscribe" + notificationSubscriptionsAck = "notifications/subscriptions/acknowledged" ) // Per-request _meta field names for the >= 2026-06-30 protocol version. @@ -2104,6 +2149,9 @@ const ( MetaKeyClientCapabilities = "io.modelcontextprotocol/clientCapabilities" // MetaKeyLogLevel identifies the desired log level for the request. MetaKeyLogLevel = "io.modelcontextprotocol/logLevel" + // MetaKeySubscriptionID identifies the subscriptions/listen request that an + // out-of-band notification belongs to. + MetaKeySubscriptionID = "io.modelcontextprotocol/subscriptionId" ) // UnsupportedProtocolVersionData is the SEP-2575 payload carried in the diff --git a/mcp/requests.go b/mcp/requests.go index 36368c99..64414caa 100644 --- a/mcp/requests.go +++ b/mcp/requests.go @@ -19,6 +19,7 @@ type ( ReadResourceRequest = ServerRequest[*ReadResourceParams] RootsListChangedRequest = ServerRequest[*RootsListChangedParams] SubscribeRequest = ServerRequest[*SubscribeParams] + SubscriptionsListenRequest = ServerRequest[*SubscriptionsListenParams] UnsubscribeRequest = ServerRequest[*UnsubscribeParams] ) diff --git a/mcp/server.go b/mcp/server.go index 912dea98..0b866a4b 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -643,12 +643,11 @@ func (s *Server) complete(ctx context.Context, req *CompleteRequest) (*CompleteR return s.opts.CompletionHandler(ctx, req) } -// Map from notification name to its corresponding params. The params have no fields, -// so a single struct can be reused. -var changeNotificationParams = map[string]Params{ - notificationToolListChanged: &ToolListChangedParams{}, - notificationPromptListChanged: &PromptListChangedParams{}, - notificationResourceListChanged: &ResourceListChangedParams{}, +// Map from notification name to its corresponding params. +var changeNotificationParams = map[string]func() Params{ + notificationToolListChanged: func() Params { return &ToolListChangedParams{} }, + notificationPromptListChanged: func() Params { return &PromptListChangedParams{} }, + notificationResourceListChanged: func() Params { return &ResourceListChangedParams{} }, } // How long to wait before sending a change notification. @@ -682,12 +681,48 @@ func (s *Server) changeAndNotify(notification string, change func() bool) { // notifySessions sends the notification n to all existing sessions. // It is called asynchronously by changeAndNotify. +// +// Legacy (pre-SEP-2575) sessions receive the notification on the shared +// session channel. Sessions speaking the new protocol receive it only if they +// have an active subscriptions/listen stream that opted in to this +// notification type. func (s *Server) notifySessions(n string) { s.mu.Lock() sessions := slices.Clone(s.sessions) s.pendingNotifications[n] = nil s.mu.Unlock() // Don't hold the lock during notification: it causes deadlock. - notifySessions(sessions, n, changeNotificationParams[n], s.opts.Logger) + + // Only add legacy sessions for the notification, new ones use the new notification mechanism. + var legacySessions []*ServerSession + for _, session := range sessions { + if session.InitializeParams().isNil() || session.InitializeParams().ProtocolVersion < protocolVersion20260630 { + legacySessions = append(legacySessions, session) + } + } + notifySessions(legacySessions, n, changeNotificationParams[n](), s.opts.Logger) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for _, session := range sessions { + for _, reqID := range session.subscribedListenIDs(n) { + params := changeNotificationParams[n]() + injectMetaSubscriptionID(params, reqID) + req := newRequest(session, params) + reqCtx := context.WithValue(ctx, idContextKey{}, reqID) + if err := handleNotify(reqCtx, n, req); err != nil { + s.opts.Logger.Warn(fmt.Sprintf("calling %s: %v", n, err)) + } + } + } +} + +func injectMetaSubscriptionID(params Params, reqID jsonrpc.ID) { + m := params.GetMeta() + if m == nil { + m = map[string]any{} + } + m[MetaKeySubscriptionID] = reqID.Raw() + params.SetMeta(m) } // shouldSendListChangedNotification checks if the server's capabilities allow @@ -1027,6 +1062,92 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp return &emptyResult{}, nil } +func (s *Server) subscriptionsListen(ctx context.Context, req *SubscriptionsListenRequest) (*emptyResult, error) { + requestID, ok := ctx.Value(idContextKey{}).(jsonrpc.ID) + if !ok || !requestID.IsValid() { + return nil, fmt.Errorf("%w: subscriptions/listen requires a request ID", jsonrpc2.ErrInvalidRequest) + } + + allowed := s.allowedSubscriptions(req.Params.Notifications) + req.Session.addSubscription(requestID, allowed) + defer req.Session.removeSubscription(requestID) + + ackParams := &SubscriptionsAcknowledgedParams{ + Notifications: allowed, + Meta: Meta{MetaKeySubscriptionID: requestID.Raw()}, + } + if err := req.Session.NotifySubscriptionAcked(ctx, ackParams); err != nil { + return nil, fmt.Errorf("sending subscriptions/acknowledged: %w", err) + } + + <-ctx.Done() + return &emptyResult{}, nil +} + +func (s *Server) allowedSubscriptions(want NotificationSubscriptions) NotificationSubscriptions { + caps := s.capabilities() + agreed := NotificationSubscriptions{} + if want.ToolsListChanged && caps.Tools != nil && caps.Tools.ListChanged { + agreed.ToolsListChanged = true + } + if want.PromptsListChanged && caps.Prompts != nil && caps.Prompts.ListChanged { + agreed.PromptsListChanged = true + } + if want.ResourcesListChanged && caps.Resources != nil && caps.Resources.ListChanged { + agreed.ResourcesListChanged = true + } + return agreed +} + +// addSubscription registers a subscriptions/listen stream on this session. +func (ss *ServerSession) addSubscription(id jsonrpc.ID, allowed NotificationSubscriptions) { + if !allowed.ToolsListChanged && !allowed.PromptsListChanged && !allowed.ResourcesListChanged { + return + } + ss.mu.Lock() + defer ss.mu.Unlock() + if ss.subscriptions == nil { + ss.subscriptions = make(map[jsonrpc.ID]*listenSubscription) + } + ss.subscriptions[id] = &listenSubscription{ + toolsListChanged: allowed.ToolsListChanged, + promptsListChanged: allowed.PromptsListChanged, + resourcesListChanged: allowed.ResourcesListChanged, + } +} + +func (ss *ServerSession) removeSubscription(id jsonrpc.ID) { + ss.mu.Lock() + defer ss.mu.Unlock() + delete(ss.subscriptions, id) +} + +// subscribedListenIDs returns the listen-request IDs that opted in to the +// given list-changed notification. +func (ss *ServerSession) subscribedListenIDs(notification string) []jsonrpc.ID { + ss.mu.Lock() + defer ss.mu.Unlock() + if len(ss.subscriptions) == 0 { + return nil + } + var ids []jsonrpc.ID + for id, sub := range ss.subscriptions { + var match bool + switch notification { + case notificationToolListChanged: + match = sub.toolsListChanged + case notificationPromptListChanged: + match = sub.promptsListChanged + case notificationResourceListChanged: + match = sub.resourcesListChanged + } + if match { + ids = append(ids, id) + } + } + return ids +} + // Run runs the server over the given transport, which must be persistent. // // Run blocks until the client terminates the connection or the provided @@ -1096,6 +1217,7 @@ func (s *Server) disconnect(cc *ServerSession) { for _, subscribedSessions := range s.resourceSubscriptions { delete(subscribedSessions, cc) } + s.opts.Logger.Info("server session disconnected", "session_id", cc.ID()) } @@ -1196,6 +1318,12 @@ func (ss *ServerSession) NotifyProgress(ctx context.Context, params *ProgressNot return handleNotify(ctx, notificationProgress, newServerRequest(ss, orZero[Params](params))) } +// NotifySubscriptionAcked sends a subscription acknowledged notification from the server to the client +// associated with this session. +func (ss *ServerSession) NotifySubscriptionAcked(ctx context.Context, params *SubscriptionsAcknowledgedParams) error { + return handleNotify(ctx, notificationSubscriptionsAck, newServerRequest(ss, orZero[Params](params))) +} + func newServerRequest[P Params](ss *ServerSession, params P) *ServerRequest[P] { return &ServerRequest[P]{Session: ss, Params: params} } @@ -1226,6 +1354,17 @@ type ServerSession struct { mu sync.Mutex state ServerSessionState + // subscriptions tracks the SEP-2575 subscriptions/listen streams opened by + // this session, keyed by the originating listen request ID. + subscriptions map[jsonrpc.ID]*listenSubscription +} + +// listenSubscription records the notification types a single +// subscriptions/listen stream has opted in to. +type listenSubscription struct { + toolsListChanged bool + promptsListChanged bool + resourcesListChanged bool } func (ss *ServerSession) updateState(mut func(*ServerSessionState)) { @@ -1497,6 +1636,7 @@ var serverMethodInfos = map[string]methodInfo{ methodReadResource: newServerMethodInfo(serverMethod((*Server).readResource), 0), methodSetLevel: newServerMethodInfo(serverSessionMethod((*ServerSession).setLevel), 0), methodSubscribe: newServerMethodInfo(serverMethod((*Server).subscribe), 0), + methodSubscriptionsListen: newServerMethodInfo(serverMethod((*Server).subscriptionsListen), 0), methodUnsubscribe: newServerMethodInfo(serverMethod((*Server).unsubscribe), 0), notificationCancelled: newServerMethodInfo(serverSessionMethod((*ServerSession).cancel), notification|missingParamsOK), notificationInitialized: newServerMethodInfo(serverSessionMethod((*ServerSession).initialized), notification|missingParamsOK), @@ -1573,7 +1713,7 @@ func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any, } switch req.Method { - case methodInitialize, methodPing, notificationInitialized, methodSetLevel: + case methodInitialize, methodPing, notificationInitialized, methodSetLevel, methodSubscribe, methodUnsubscribe: if validatedMeta.usesNewProtocol { ss.server.opts.Logger.Error("method removed in the new protocol", "method", req.Method) return nil, &jsonrpc.Error{ diff --git a/mcp/streamable.go b/mcp/streamable.go index 8ff9cd1f..02c9a4fd 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -1365,6 +1365,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques calls := make(map[jsonrpc.ID]struct{}) tokenInfo := auth.TokenInfoFromContext(req.Context()) isInitialize := false + isSubscriptionsListen := false var initializeProtocolVersion string for _, msg := range incoming { if jreq, ok := msg.(*jsonrpc.Request); ok { @@ -1390,6 +1391,9 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques initializeProtocolVersion = params.ProtocolVersion } } + if jreq.Method == methodSubscriptionsListen { + isSubscriptionsListen = true + } // SEP-2575: requests carrying `_meta.protocolVersion` require the // Mcp-Protocol-Version HTTP header to be present and to match the // per-request `_meta.protocolVersion` value. @@ -1536,13 +1540,20 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques return } + // subscriptions/listen is inherently a long-lived SSE endpoint (SEP-2575): + // it has no synchronous result, the response stream stays open until the + // client cancels, and the server pushes notifications on it as they occur. + // Force SSE mode (bypassing JSONResponse) so the buffered application/json + // path doesn't deadlock waiting for a completion that won't come. + useSSE := !c.jsonResponse || isSubscriptionsListen + // Set response headers. Accept was checked in [StreamableHTTPHandler]. w.Header().Set("Cache-Control", "no-cache, no-transform") - if c.jsonResponse { - w.Header().Set("Content-Type", "application/json") - } else { + if useSSE { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Connection", "keep-alive") + } else { + w.Header().Set("Content-Type", "application/json") } if c.sessionID != "" && isInitialize { w.Header().Set(sessionIDHeader, c.sessionID) @@ -1553,7 +1564,7 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques done := make(chan struct{}) stream.done = done stream.protocolVersion = effectiveVersion - if c.jsonResponse { + if !useSSE { // JSON mode: collect messages in pendingJSONMessages until done. // Set pendingJSONMessages to a non-nil value to signal that this is an // application/json stream.