Skip to content

Commit ac5ab57

Browse files
committed
feat: Server.run drives JSONRPCDispatcher + ServerRunner; ServerSession is a dispatcher proxy
The swap. Production server traffic now flows through the dispatcher/runner path; BaseSession is no longer reached on the server side. ServerRequestContext: standalone dataclass (drops RequestContext base, inlines session/request_id/meta). ServerMiddleware retyped to take it; _MwLifespanT no longer contravariant while the ctx is the invariant mutable dataclass (TODO marked). Connection: client_params holds the full InitializeRequestParams; client_info / client_capabilities are read-through properties. ServerSession: rewritten as a connection-scoped proxy over JSONRPCDispatcher + Connection. send_request / send_notification model-dump and forward to dispatcher.send_raw_request / notify, threading related_request_id so SHTTP routing is unchanged. The typed helpers (create_message, elicit_*, send_log_message, send_*_list_changed, list_roots, send_ping, send_progress_notification, send_elicit_complete, check_client_capability) are kept verbatim. Deleted: the BaseSession-derived receive loop, _received_*, incoming_messages, InitializationState, ServerRequestResponder, send_message, the tasks-only _build_* helpers. ServerRunner: dispatcher is JSONRPCDispatcher concretely (the ServerSession shim needs its _related_request_id kwarg). __post_init__ builds the connection-scoped session. _make_context builds ServerRequestContext from dctx.request_id and dctx.message_metadata (the same isinstance(ServerMessageMetadata) narrow the previous Server._handle_request did). _handle_initialize sets connection.client_params. Both cast(Any, entry.handler) and the getattr(typed_params, 'meta', ...) are gone (meta read via isinstance(typed_params, RequestParams)). Server import is under TYPE_CHECKING to break the cycle with lowlevel/server. Server.run(): builds JSONRPCDispatcher(read, write, raise_handler_exceptions=...) and ServerRunner(..., dispatch_middleware=[otel_middleware]) inside the lifespan, then awaits runner.run(). _handle_message / _handle_request / _handle_notification deleted.
1 parent 94b3ce9 commit ac5ab57

5 files changed

Lines changed: 173 additions & 569 deletions

File tree

src/mcp/server/connection.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from mcp.shared.dispatcher import CallOptions, Outbound
2525
from mcp.shared.exceptions import NoBackChannelError
2626
from mcp.shared.peer import Meta, dump_params
27-
from mcp.types import ClientCapabilities, Implementation, LoggingLevel
27+
from mcp.types import ClientCapabilities, Implementation, InitializeRequestParams, LoggingLevel
2828

2929
__all__ = ["Connection"]
3030

@@ -51,8 +51,8 @@ def __init__(self, outbound: Outbound, *, has_standalone_channel: bool, session_
5151
self.has_standalone_channel = has_standalone_channel
5252
self.session_id: str | None = session_id
5353

54-
self.client_info: Implementation | None = None
55-
self.client_capabilities: ClientCapabilities | None = None
54+
self.client_params: InitializeRequestParams | None = None
55+
"""The full `initialize` request params; `None` before initialization."""
5656
self.protocol_version: str | None = None
5757
self.initialized: anyio.Event = anyio.Event()
5858

@@ -68,6 +68,16 @@ def __init__(self, outbound: Outbound, *, has_standalone_channel: bool, session_
6868
middleware to register per-connection teardown. Unwound LIFO after
6969
`dispatcher.run()` returns, shielded from cancellation."""
7070

71+
@property
72+
def client_info(self) -> Implementation | None:
73+
"""The client's `Implementation` from `initialize`; `None` before initialization."""
74+
return self.client_params.client_info if self.client_params is not None else None
75+
76+
@property
77+
def client_capabilities(self) -> ClientCapabilities | None:
78+
"""The client's `ClientCapabilities` from `initialize`; `None` before initialization."""
79+
return self.client_params.capabilities if self.client_params is not None else None
80+
7181
async def send_raw_request(
7282
self,
7383
method: str,

src/mcp/server/context.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,32 @@
99

1010
from mcp.server._typed_request import TypedServerRequestMixin
1111
from mcp.server.connection import Connection
12-
from mcp.server.experimental.request_context import Experimental
1312
from mcp.server.session import ServerSession
14-
from mcp.shared._context import RequestContext
1513
from mcp.shared.context import BaseContext
1614
from mcp.shared.dispatcher import DispatchContext
1715
from mcp.shared.message import CloseSSEStreamCallback
1816
from mcp.shared.peer import Meta, PeerMixin
1917
from mcp.shared.transport_context import TransportContext
20-
from mcp.types import LoggingLevel, RequestParamsMeta
18+
from mcp.types import LoggingLevel, RequestId, RequestParamsMeta
2119

2220
LifespanContextT = TypeVar("LifespanContextT", default=dict[str, Any])
2321
RequestT = TypeVar("RequestT", default=Any)
2422

2523

2624
@dataclass(kw_only=True)
27-
class ServerRequestContext(RequestContext[ServerSession], Generic[LifespanContextT, RequestT]):
25+
class ServerRequestContext(Generic[LifespanContextT, RequestT]):
26+
"""Per-request context handed to lowlevel request and notification handlers.
27+
28+
Built by `ServerRunner._make_context` for each inbound message. Carries the
29+
connection-scoped `ServerSession` (server-to-client requests and
30+
notifications), per-request metadata, and any per-message data the
31+
transport attached (the HTTP request, SSE stream-close callbacks).
32+
"""
33+
34+
session: ServerSession
2835
lifespan_context: LifespanContextT
29-
experimental: Experimental
36+
request_id: RequestId | None = None
37+
meta: RequestParamsMeta | None = None
3038
request: RequestT | None = None
3139
close_sse_stream: CloseSSEStreamCallback | None = None
3240
close_standalone_sse_stream: CloseSSEStreamCallback | None = None
@@ -107,26 +115,32 @@ async def log(self, level: LoggingLevel, data: Any, logger: str | None = None, *
107115

108116
CallNext = Callable[[], Awaitable[HandlerResult]]
109117

110-
_MwLifespanT = TypeVar("_MwLifespanT", contravariant=True)
118+
_MwLifespanT = TypeVar("_MwLifespanT")
111119

112120

113121
class ServerMiddleware(Protocol[_MwLifespanT]):
114122
"""Context-tier middleware: `(ctx, method, typed_params, call_next) -> result`.
115123
116124
Runs *inside* `ServerRunner._on_request` after params validation and
117-
`Context` construction. Wraps registered handlers (including `ping`) but
125+
context construction. Wraps registered handlers (including `ping`) but
118126
not `initialize`, `METHOD_NOT_FOUND`, or validation failures. Listed
119127
outermost-first on `Server.middleware`.
120128
121129
`Server[L].middleware` holds `ServerMiddleware[L]`, so an app-specific
122-
middleware sees `ctx.lifespan: L`. A reusable middleware can be typed
123-
`ServerMiddleware[object]` - `Context` is covariant in `LifespanT`, so it
124-
registers on any `Server[L]`.
130+
middleware sees `ctx.lifespan_context: L`. While the context is the
131+
mutable `ServerRequestContext` dataclass it is invariant in `L`, so a
132+
reusable middleware should be typed `ServerMiddleware[Any]` to register on
133+
any `Server[L]`.
125134
"""
126135

136+
# TODO(maxisbey): once `_make_context` returns the (covariant) `Context[L]`
137+
# again, restore `_MwLifespanT` to `contravariant=True` and retype `ctx`
138+
# below to `Context[_MwLifespanT]` so reusable middleware can be
139+
# `ServerMiddleware[object]` instead of `ServerMiddleware[Any]`.
140+
127141
async def __call__(
128142
self,
129-
ctx: Context[_MwLifespanT],
143+
ctx: ServerRequestContext[_MwLifespanT, Any],
130144
method: str,
131145
params: BaseModel,
132146
call_next: CallNext,

src/mcp/server/lowlevel/server.py

Lines changed: 22 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,13 @@ async def main():
3636

3737
from __future__ import annotations
3838

39-
import contextvars
4039
import logging
41-
import warnings
4240
from collections.abc import AsyncIterator, Awaitable, Callable
43-
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
41+
from contextlib import AbstractAsyncContextManager, asynccontextmanager
4442
from dataclasses import dataclass
4543
from importlib.metadata import version as importlib_version
46-
from typing import Any, Generic, cast
44+
from typing import Any, Generic
4745

48-
import anyio
49-
from opentelemetry.trace import SpanKind, StatusCode
5046
from pydantic import BaseModel
5147
from starlette.applications import Starlette
5248
from starlette.middleware import Middleware
@@ -61,18 +57,16 @@ async def main():
6157
from mcp.server.auth.routes import build_resource_metadata_url, create_auth_routes, create_protected_resource_routes
6258
from mcp.server.auth.settings import AuthSettings
6359
from mcp.server.context import HandlerResult, ServerMiddleware, ServerRequestContext
64-
from mcp.server.experimental.request_context import Experimental
6560
from mcp.server.lowlevel.experimental import ExperimentalHandlers
6661
from mcp.server.models import InitializationOptions
67-
from mcp.server.session import ServerSession
62+
from mcp.server.runner import ServerRunner, otel_middleware
6863
from mcp.server.streamable_http import EventStore
6964
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
7065
from mcp.server.transport_security import TransportSecuritySettings
71-
from mcp.shared._otel import extract_trace_context, otel_span
7266
from mcp.shared._stream_protocols import ReadStream, WriteStream
73-
from mcp.shared.exceptions import MCPError
74-
from mcp.shared.message import ServerMessageMetadata, SessionMessage
75-
from mcp.shared.session import RequestResponder
67+
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher
68+
from mcp.shared.message import SessionMessage
69+
from mcp.shared.transport_context import TransportContext
7670

7771
logger = logging.getLogger(__name__)
7872

@@ -432,196 +426,23 @@ async def run(
432426
# the initialization lifecycle, but can do so with any available node
433427
# rather than requiring initialization for each connection.
434428
stateless: bool = False,
435-
):
436-
async with AsyncExitStack() as stack:
437-
lifespan_context = await stack.enter_async_context(self.lifespan(self))
438-
session = await stack.enter_async_context(
439-
ServerSession(
440-
read_stream,
441-
write_stream,
442-
initialization_options,
443-
stateless=stateless,
444-
)
445-
)
446-
447-
# Configure task support for this session if enabled
448-
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
449-
if task_support is not None:
450-
task_support.configure_session(session)
451-
await stack.enter_async_context(task_support.run())
452-
453-
async with anyio.create_task_group() as tg:
454-
try:
455-
async for message in session.incoming_messages:
456-
logger.debug("Received message: %s", message)
457-
458-
if isinstance(message, RequestResponder) and message.context is not None:
459-
context = message.context
460-
else:
461-
context = contextvars.copy_context()
462-
463-
context.run(
464-
tg.start_soon,
465-
self._handle_message,
466-
message,
467-
session,
468-
lifespan_context,
469-
raise_exceptions,
470-
)
471-
finally:
472-
# Transport closed: cancel in-flight handlers. Without this the
473-
# TG join waits for them, and when they eventually try to
474-
# respond they hit a closed write stream (the session's
475-
# _receive_loop closed it when the read stream ended).
476-
tg.cancel_scope.cancel()
477-
478-
async def _handle_message(
479-
self,
480-
message: RequestResponder[types.ClientRequest, types.ServerResult] | types.ClientNotification | Exception,
481-
session: ServerSession,
482-
lifespan_context: LifespanResultT,
483-
raise_exceptions: bool = False,
484-
):
485-
with warnings.catch_warnings(record=True) as w:
486-
match message:
487-
case RequestResponder() as responder:
488-
with responder:
489-
await self._handle_request(
490-
message, responder.request, session, lifespan_context, raise_exceptions
491-
)
492-
case Exception():
493-
logger.error(f"Received exception from stream: {message}")
494-
if raise_exceptions:
495-
raise message
496-
case _:
497-
await self._handle_notification(message, session, lifespan_context)
498-
499-
for warning in w: # pragma: lax no cover
500-
logger.info("Warning: %s: %s", warning.category.__name__, warning.message)
501-
502-
async def _handle_request(
503-
self,
504-
message: RequestResponder[types.ClientRequest, types.ServerResult],
505-
req: types.ClientRequest,
506-
session: ServerSession,
507-
lifespan_context: LifespanResultT,
508-
raise_exceptions: bool,
509-
):
510-
logger.info("Processing request of type %s", type(req).__name__)
511-
512-
target = getattr(req.params, "name", None) if req.params else None
513-
span_name = f"MCP handle {req.method} {target}" if target else f"MCP handle {req.method}"
514-
515-
# Extract W3C trace context from _meta (SEP-414).
516-
meta = cast(dict[str, Any] | None, getattr(req.params, "meta", None)) if req.params else None
517-
parent_context = extract_trace_context(meta) if meta is not None else None
518-
519-
with otel_span(
520-
span_name,
521-
kind=SpanKind.SERVER,
522-
attributes={"mcp.method.name": req.method, "jsonrpc.request.id": message.request_id},
523-
context=parent_context,
524-
) as span:
525-
if entry := self._request_handlers.get(req.method):
526-
handler = entry.handler
527-
logger.debug("Dispatching request of type %s", type(req).__name__)
528-
529-
try:
530-
# Extract request context and close_sse_stream from message metadata
531-
request_data = None
532-
close_sse_stream_cb = None
533-
close_standalone_sse_stream_cb = None
534-
if message.message_metadata is not None and isinstance(
535-
message.message_metadata, ServerMessageMetadata
536-
):
537-
request_data = message.message_metadata.request_context
538-
close_sse_stream_cb = message.message_metadata.close_sse_stream
539-
close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream
540-
541-
client_capabilities = session.client_params.capabilities if session.client_params else None
542-
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
543-
# Get task metadata from request params if present
544-
task_metadata = None
545-
if hasattr(req, "params") and req.params is not None: # pragma: no branch
546-
task_metadata = getattr(req.params, "task", None)
547-
ctx = ServerRequestContext(
548-
request_id=message.request_id,
549-
meta=message.request_meta,
550-
session=session,
551-
lifespan_context=lifespan_context,
552-
experimental=Experimental(
553-
task_metadata=task_metadata,
554-
_client_capabilities=client_capabilities,
555-
_session=session,
556-
_task_support=task_support,
557-
),
558-
request=request_data,
559-
close_sse_stream=close_sse_stream_cb,
560-
close_standalone_sse_stream=close_standalone_sse_stream_cb,
561-
)
562-
response = await handler(ctx, req.params)
563-
except MCPError as err:
564-
response = err.error
565-
except anyio.get_cancelled_exc_class():
566-
if message.cancelled:
567-
# Client sent CancelledNotification; responder.cancel() already
568-
# sent an error response, so skip the duplicate.
569-
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
570-
return
571-
# Transport-close cancellation from the TG in run(); re-raise so the
572-
# TG swallows its own cancellation.
573-
raise
574-
except Exception as err:
575-
if raise_exceptions: # pragma: no cover
576-
raise err
577-
response = types.ErrorData(code=0, message=str(err))
578-
else:
579-
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")
580-
581-
if isinstance(response, types.ErrorData) and span is not None:
582-
span.set_status(StatusCode.ERROR, response.message)
583-
584-
try:
585-
# TODO: cast goes away when `_handle_request` is deleted.
586-
await message.respond(cast(types.ServerResult | types.ErrorData, response))
587-
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
588-
# Transport closed between handler unblocking and respond. Happens
589-
# when _receive_loop's finally wakes a handler blocked on
590-
# send_request: the handler runs to respond() before run()'s TG
591-
# cancel fires, but after the write stream closed. Closed if our
592-
# end closed (_receive_loop's async-with exit); Broken if the peer
593-
# end closed first (streamable_http terminate()).
594-
logger.debug("Response for %s dropped - transport closed", message.request_id)
595-
return
596-
597-
logger.debug("Response sent")
598-
599-
async def _handle_notification(
600-
self,
601-
notify: types.ClientNotification,
602-
session: ServerSession,
603-
lifespan_context: LifespanResultT,
604429
) -> None:
605-
if entry := self._notification_handlers.get(notify.method):
606-
handler = entry.handler
607-
logger.debug("Dispatching notification of type %s", type(notify).__name__)
608-
609-
try:
610-
client_capabilities = session.client_params.capabilities if session.client_params else None
611-
task_support = self._experimental_handlers.task_support if self._experimental_handlers else None
612-
ctx = ServerRequestContext(
613-
session=session,
614-
lifespan_context=lifespan_context,
615-
experimental=Experimental(
616-
task_metadata=None,
617-
_client_capabilities=client_capabilities,
618-
_session=session,
619-
_task_support=task_support,
620-
),
621-
)
622-
await handler(ctx, notify.params)
623-
except Exception: # pragma: no cover
624-
logger.exception("Uncaught exception in notification handler")
430+
async with self.lifespan(self) as lifespan_context:
431+
dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
432+
read_stream,
433+
write_stream,
434+
raise_handler_exceptions=raise_exceptions,
435+
)
436+
runner = ServerRunner(
437+
server=self,
438+
dispatcher=dispatcher,
439+
lifespan_state=lifespan_context,
440+
init_options=initialization_options,
441+
has_standalone_channel=True,
442+
stateless=stateless,
443+
dispatch_middleware=[otel_middleware],
444+
)
445+
await runner.run()
625446

626447
def streamable_http_app(
627448
self,

0 commit comments

Comments
 (0)