Skip to content
42 changes: 42 additions & 0 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,28 @@ func (c *Client) Connect(ctx context.Context, t Transport, opts *ClientSessionOp
if hc, ok := cs.mcpConn.(clientConnection); ok {
hc.sessionUpdated(cs.state)
}
subscribeParams := &SubscriptionsListenParams{}
if c.opts.ToolListChangedHandler != nil {
subscribeParams.Notifications.ToolsListChanged = true
}
if c.opts.PromptListChangedHandler != nil {
subscribeParams.Notifications.PromptsListChanged = true
}
if c.opts.ResourceListChangedHandler != nil {
subscribeParams.Notifications.ResourcesListChanged = true
}
if subscribeParams.Notifications.ToolsListChanged ||
subscribeParams.Notifications.PromptsListChanged ||
subscribeParams.Notifications.ResourcesListChanged {
// The listen blocks until the server cancels it. Run it in
// a goroutine so Connect can return; ClientSession.Close
// cancels its context to send notifications/cancelled.
listenCtx, cancelListen := context.WithCancel(context.Background())
cs.listenCancel = cancelListen
go func() {
_ = cs.subscriptionsListen(listenCtx, subscribeParams)
}()
}
return cs, nil
}

Expand Down Expand Up @@ -416,6 +438,7 @@ type ClientSession struct {
conn *jsonrpc2.Connection
client *Client
keepaliveCancel context.CancelFunc
listenCancel context.CancelFunc
mcpConn Connection

// No mutex is (currently) required to guard the session state, because it is
Expand Down Expand Up @@ -498,6 +521,9 @@ func (cs *ClientSession) Close() error {
if cs.keepaliveCancel != nil {
cs.keepaliveCancel()
}
if cs.listenCancel != nil {
cs.listenCancel()
}
err := cs.conn.Close()

if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) {
Expand Down Expand Up @@ -1079,6 +1105,7 @@ var clientMethodInfos = map[string]methodInfo{
notificationLoggingMessage: newClientMethodInfo(clientMethod((*Client).callLoggingHandler), notification),
notificationProgress: newClientMethodInfo(clientSessionMethod((*ClientSession).callProgressNotificationHandler), notification),
notificationElicitationComplete: newClientMethodInfo(clientMethod((*Client).callElicitationCompleteHandler), notification|missingParamsOK),
notificationSubscriptionsAck: newClientMethodInfo(clientMethod((*Client).callSubscriptionsAckHandler), notification|missingParamsOK),
}

func (cs *ClientSession) sendingMethodInfos() map[string]methodInfo {
Expand Down Expand Up @@ -1234,6 +1261,21 @@ func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribePar
return err
}

// SubscriptionsListen opens a SEP-2575 "subscriptions/listen" stream and
// blocks for the lifetime of the subscription. The server's first message on
// the stream is "notifications/subscriptions/acknowledged"; subsequent
// opted-in notifications (e.g. tools/list_changed) are delivered through the
// usual handlers registered in [ClientOptions].
func (cs *ClientSession) subscriptionsListen(ctx context.Context, params *SubscriptionsListenParams) error {
params = injectRequestMeta(cs, params)
_, err := handleSend[*emptyResult](ctx, methodSubscriptionsListen, newClientRequest(cs, orZero[Params](params)))
return err
}

func (c *Client) callSubscriptionsAckHandler(context.Context, *ClientRequest[*SubscriptionsAcknowledgedParams]) (Result, error) {
return nil, nil
}

func (c *Client) callToolChangedHandler(ctx context.Context, req *ToolListChangedRequest) (Result, error) {
if h := c.opts.ToolListChangedHandler; h != nil {
h(ctx, req)
Expand Down
201 changes: 201 additions & 0 deletions mcp/mcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -2449,3 +2451,202 @@ func TestSetErrorPreservesContent(t *testing.T) {
}

var ctrCmpOpts = []cmp.Option{cmpopts.IgnoreUnexported(CallToolResult{}, GetPromptResult{}, ReadResourceResult{})}

// runSubscriptionsListenTest exercises the SEP-2575 auto-listen flow end-to-end
// against the supplied transport pair. It captures every notification and the
// acknowledgment the client sees, then asserts:
//
// - the auto-listen issued by Client.Connect is acknowledged with a tagged
// subscription ID;
// - tool and prompt list-changed notifications are delivered to the matching
// handlers, each carrying the same subscription ID as the ack;
// - the subscription persists across multiple unrelated changes;
// - cs.Close() ends the subscription and further changes don't deliver.
func runSubscriptionsListenTest(t *testing.T, client *Client, server *Server, ct Transport, events chan subListenEvent) {
t.Helper()

ctx, topCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer topCancel()

cs, err := client.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630})
if err != nil {
t.Fatalf("client connect: %v", err)
}

waitFor := func(kind string) subListenEvent {
t.Helper()
select {
case e := <-events:
if e.kind != kind {
t.Fatalf("got event %q (id=%s), want kind %q", e.kind, e.id, kind)
}
return e
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for %q event", kind)
return subListenEvent{}
}
}
expectNoEvent := func(d time.Duration) {
t.Helper()
select {
case e := <-events:
t.Fatalf("unexpected event %q (id=%s)", e.kind, e.id)
case <-time.After(d):
}
}

ack := waitFor("ack")
if ack.id == "" {
t.Fatalf("acknowledgment missing subscription ID")
}

server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil)
if e := waitFor("tool"); e.id != ack.id {
t.Errorf("first tool notif id = %s, want %s", e.id, ack.id)
}

server.AddPrompt(&Prompt{Name: "p2"}, nil)
if e := waitFor("prompt"); e.id != ack.id {
t.Errorf("first prompt notif id = %s, want %s", e.id, ack.id)
}

server.AddTool(&Tool{Name: "t3", InputSchema: &jsonschema.Schema{Type: "object"}}, nil)
if e := waitFor("tool"); e.id != ack.id {
t.Errorf("second tool notif id = %s, want %s", e.id, ack.id)
}
server.AddPrompt(&Prompt{Name: "p3"}, nil)
if e := waitFor("prompt"); e.id != ack.id {
t.Errorf("second prompt notif id = %s, want %s", e.id, ack.id)
}
expectNoEvent(notificationDelay * 5)

cs.Close()
time.Sleep(50 * time.Millisecond)

server.AddTool(&Tool{Name: "t4", InputSchema: &jsonschema.Schema{Type: "object"}}, nil)
server.AddPrompt(&Prompt{Name: "p4"}, nil)
expectNoEvent(notificationDelay * 20)
}

type subListenEvent struct {
kind string // "ack", "tool", "prompt"
id string // subscription ID from _meta, stringified for cross-encoding equality
}

// newSubListenClient returns a client wired to push every ack and every
// list-changed notification it receives into events, tagged with the kind
// and the subscription ID extracted from _meta.
func newSubListenClient(events chan subListenEvent) *Client {
asEvent := func(kind string, raw any) subListenEvent {
return subListenEvent{kind, fmt.Sprint(raw)}
}
c := NewClient(testImpl, &ClientOptions{
ToolListChangedHandler: func(_ context.Context, req *ToolListChangedRequest) {
events <- asEvent("tool", req.Params.Meta[MetaKeySubscriptionID])
},
PromptListChangedHandler: func(_ context.Context, req *PromptListChangedRequest) {
events <- asEvent("prompt", req.Params.Meta[MetaKeySubscriptionID])
},
})
c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler {
return func(ctx context.Context, method string, req Request) (Result, error) {
if method == notificationSubscriptionsAck {
if cr, ok := req.(*ClientRequest[*SubscriptionsAcknowledgedParams]); ok && cr.Params != nil {
events <- asEvent("ack", cr.Params.Meta[MetaKeySubscriptionID])
}
}
return next(ctx, method, req)
}
})
return c
}

func newSubListenServer() *Server {
s := NewServer(testImpl, nil)
AddTool(s, &Tool{Name: "t1"}, sayHi)
s.AddPrompt(&Prompt{Name: "p1"}, nil)
return s
}

func enableNewProtocol(t *testing.T) {
t.Helper()
orig := supportedProtocolVersions
supportedProtocolVersions = append([]string{protocolVersion20260630}, slices.Clone(orig)...)
t.Cleanup(func() { supportedProtocolVersions = orig })
}

// TestSubscriptionsListen_InMemory exercises the listen flow over the
// session-shared in-memory transport (semantically equivalent to STDIO).
// Cancellation here propagates via notifications/cancelled.
func TestSubscriptionsListen_InMemory(t *testing.T) {
enableNewProtocol(t)
events := make(chan subListenEvent, 64)
server := newSubListenServer()
ct, st := NewInMemoryTransports()
ss, err := server.Connect(context.Background(), st, nil)
if err != nil {
t.Fatalf("server connect: %v", err)
}
defer ss.Close()
runSubscriptionsListenTest(t, newSubListenClient(events), server, ct, events)
}

// TestSubscriptionsListen_Streamable exercises the listen flow over a
// stateless HTTP server (SEP-2575). Each listen rides its own SSE response
// stream; cs.Close() tears it down.
func TestSubscriptionsListen_Streamable(t *testing.T) {
enableNewProtocol(t)
events := make(chan subListenEvent, 64)
server := newSubListenServer()
handler := NewStreamableHTTPHandler(
func(*http.Request) *Server { return server },
&StreamableHTTPOptions{Stateless: true},
)
httpServer := httptest.NewServer(mustNotPanic(t, handler))
defer httpServer.Close()
runSubscriptionsListenTest(t, newSubListenClient(events), server,
&StreamableClientTransport{Endpoint: httpServer.URL}, events)
}

// TestSubscriptionsListen_NoHandlersNoListen verifies that a new-protocol
// client without any list-changed handlers registered does not open an
// auto-listen on connect, and therefore does not receive any acknowledgment
// or downstream notifications.
func TestSubscriptionsListen_NoHandlersNoListen(t *testing.T) {
enableNewProtocol(t)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

events := make(chan subListenEvent, 8)
server := newSubListenServer()
ct, st := NewInMemoryTransports()
ss, err := server.Connect(ctx, st, nil)
if err != nil {
t.Fatalf("server connect: %v", err)
}
defer ss.Close()

c := NewClient(testImpl, nil)
c.AddReceivingMiddleware(func(next MethodHandler) MethodHandler {
return func(ctx context.Context, method string, req Request) (Result, error) {
if method == notificationSubscriptionsAck {
events <- subListenEvent{"ack", ""}
}
return next(ctx, method, req)
}
})
cs, err := c.Connect(ctx, ct, &ClientSessionOptions{protocolVersion: protocolVersion20260630})
if err != nil {
t.Fatalf("client connect: %v", err)
}
defer cs.Close()

server.AddTool(&Tool{Name: "t2", InputSchema: &jsonschema.Schema{Type: "object"}}, nil)

select {
case e := <-events:
t.Fatalf("unexpected event %q on no-handler client", e.kind)
case <-time.After(notificationDelay * 10):
}
}
48 changes: 48 additions & 0 deletions mcp/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,49 @@ type ResourceUpdatedNotificationParams struct {
func (x *ResourceUpdatedNotificationParams) isParams() {}
func (x *ResourceUpdatedNotificationParams) isNil() bool { return x == nil }

// NotificationSubscriptions describes the set of server-to-client
// notifications a client wishes to receive on a [SubscriptionsListenParams]
// stream. Each field is an explicit opt-in: a server MUST NOT push
// notifications of a type the client did not request.
type NotificationSubscriptions struct {
// ToolsListChanged opts in to "notifications/tools/list_changed".
ToolsListChanged bool `json:"toolsListChanged,omitempty"`
// PromptsListChanged opts in to "notifications/prompts/list_changed".
PromptsListChanged bool `json:"promptsListChanged,omitempty"`
// ResourcesListChanged opts in to "notifications/resources/list_changed".
ResourcesListChanged bool `json:"resourcesListChanged,omitempty"`
// ResourceSubscriptions enumerates the resource URIs for which the client
// wants "notifications/resources/updated". Replaces the legacy
// resources/subscribe RPC.
ResourceSubscriptions []string `json:"resourceSubscriptions,omitempty"`
}

// SubscriptionsListenParams are the parameters for the
// "subscriptions/listen" RPC.
type SubscriptionsListenParams struct {
// Meta carries the per-request `_meta` triple.
Meta `json:"_meta,omitempty"`
// Notifications declares which notification types the client wants to
// receive on this stream.
Notifications NotificationSubscriptions `json:"notifications"`
}

func (x *SubscriptionsListenParams) isParams() {}
func (x *SubscriptionsListenParams) isNil() bool { return x == nil }

// SubscriptionsAcknowledgedParams are the parameters for the
// "notifications/subscriptions/acknowledged" notification, which the server
// MUST send as the first message on a subscriptions/listen stream. It carries
// the subset of the requested [NotificationSubscriptions] that the server has
// agreed to honor.
type SubscriptionsAcknowledgedParams struct {
Meta `json:"_meta,omitempty"`
Notifications NotificationSubscriptions `json:"notifications"`
}

func (x *SubscriptionsAcknowledgedParams) isParams() {}
func (x *SubscriptionsAcknowledgedParams) isNil() bool { return x == nil }

// TODO(jba): add CompleteRequest and related types.

// A request from the server to elicit additional information from the user via the client.
Expand Down Expand Up @@ -2086,8 +2129,10 @@ const (
notificationRootsListChanged = "notifications/roots/list_changed"
methodSetLevel = "logging/setLevel"
methodSubscribe = "resources/subscribe"
methodSubscriptionsListen = "subscriptions/listen"
notificationToolListChanged = "notifications/tools/list_changed"
methodUnsubscribe = "resources/unsubscribe"
notificationSubscriptionsAck = "notifications/subscriptions/acknowledged"
)

// Per-request _meta field names for the >= 2026-06-30 protocol version.
Expand All @@ -2104,6 +2149,9 @@ const (
MetaKeyClientCapabilities = "io.modelcontextprotocol/clientCapabilities"
// MetaKeyLogLevel identifies the desired log level for the request.
MetaKeyLogLevel = "io.modelcontextprotocol/logLevel"
// MetaKeySubscriptionID identifies the subscriptions/listen request that an
// out-of-band notification belongs to.
MetaKeySubscriptionID = "io.modelcontextprotocol/subscriptionId"
)

// UnsupportedProtocolVersionData is the SEP-2575 payload carried in the
Expand Down
1 change: 1 addition & 0 deletions mcp/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type (
ReadResourceRequest = ServerRequest[*ReadResourceParams]
RootsListChangedRequest = ServerRequest[*RootsListChangedParams]
SubscribeRequest = ServerRequest[*SubscribeParams]
SubscriptionsListenRequest = ServerRequest[*SubscriptionsListenParams]
UnsubscribeRequest = ServerRequest[*UnsubscribeParams]
)

Expand Down
Loading
Loading