Skip to content

Commit 278b0a3

Browse files
committed
refactor: derive the initialize gate from connection data
Connection no longer stores an initialize_accepted flag or exposes lifecycle mutators; initialize_accepted is a read-only property derived from client_params/initialized, and the runner writes those fields directly, so the gate can never disagree with the recorded state.
1 parent c501226 commit 278b0a3

3 files changed

Lines changed: 108 additions & 113 deletions

File tree

src/mcp/server/connection.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ def _notification_params(payload: dict[str, Any] | None, meta: Meta | None) -> d
4242
class Connection(TypedServerRequestMixin):
4343
"""Per-client connection state and standalone-stream `Outbound`.
4444
45-
Constructed by `ServerRunner` once per connection. The peer-info fields are
46-
`None` until `initialize` completes; `initialized` is set then. In
47-
stateless deployments the runner sets `initialized` immediately and
48-
peer-info remains `None` (no handshake reaches a stateless connection).
45+
Constructed by `ServerRunner` once per connection. The peer-info fields
46+
are `None` until `initialize` completes; `initialized` is set later, when
47+
the client's `notifications/initialized` follow-up arrives. In stateless
48+
deployments the runner sets `initialized` immediately and peer-info
49+
remains `None` (no handshake reaches a stateless connection).
4950
"""
5051

5152
def __init__(self, outbound: Outbound, *, has_standalone_channel: bool, session_id: str | None = None) -> None:
@@ -57,20 +58,24 @@ def __init__(self, outbound: Outbound, *, has_standalone_channel: bool, session_
5758
"""The full `initialize` request params; `None` before initialization."""
5859
self.protocol_version: str | None = None
5960
self.initialized: anyio.Event = anyio.Event()
61+
"""Set when `notifications/initialized` arrives (matches TS `oninitialized`);
62+
the point from which the spec permits server-initiated requests beyond
63+
ping/logging. Pre-set on stateless connections."""
6064

6165
self.state: dict[str, Any] = {}
62-
"""Per-connection scratch state. Handlers and middleware may read and
63-
write freely; persists across requests on this connection."""
66+
"""Per-connection scratch state; persists across requests on this connection."""
6467

6568
self.exit_stack: AsyncExitStack = AsyncExitStack()
66-
"""Cleanup stack unwound by `ServerRunner` when the connection closes.
67-
68-
Push context managers (`await exit_stack.enter_async_context(...)`)
69-
or callbacks (`exit_stack.push_async_callback(...)`) from handlers or
70-
middleware to register per-connection teardown. Unwound LIFO after
71-
`dispatcher.run()` returns, shielded from cancellation. Exceptions
72-
raised by callbacks are logged and swallowed; they never propagate
73-
out of `ServerRunner.run()`."""
69+
"""Per-connection teardown, unwound LIFO (shielded) when the connection
70+
closes. Push cleanup from handlers or middleware; exceptions are logged
71+
and swallowed."""
72+
73+
@property
74+
def initialize_accepted(self) -> bool:
75+
"""True once the inbound request gate is open: `initialize` recorded the
76+
peer info, or the handshake completed outright (stateless birth, or a
77+
bare `notifications/initialized`). Derived, never stored."""
78+
return self.client_params is not None or self.initialized.is_set()
7479

7580
async def send_raw_request(
7681
self,

src/mcp/server/runner.py

Lines changed: 42 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,13 @@
6565
_INIT_EXEMPT: frozenset[str] = frozenset({"ping"})
6666

6767
_EXIT_STACK_CLOSE_TIMEOUT: float = 5
68-
"""Grace period in seconds for `connection.exit_stack` teardown in `run()`.
69-
70-
The unwind is shielded from outer cancellation so per-connection cleanup runs
71-
even when the server is being torn down; the shield must be bounded or a
72-
misbehaving cleanup callback would hang shutdown indefinitely."""
68+
"""Bound for the shielded exit-stack unwind in `run()`; a hung cleanup
69+
callback must not wedge shutdown."""
7370

7471

7572
def _extract_meta(params: Mapping[str, Any] | None) -> RequestParamsMeta | None:
76-
"""Lift `_meta` from raw params with the same key-aliasing pydantic applies.
77-
78-
`RequestParams` only declares `meta` (alias `_meta`) and `MCPModel` does
79-
not forbid extras, so this validate ignores everything else and never
80-
rejects on the caller's other fields. Returns `None` for absent or
81-
malformed `_meta` so context construction is independent of params
82-
validity (which `_on_request` checks separately).
83-
"""
73+
"""Lift `_meta` from raw params; `None` when absent or malformed, so
74+
context construction is independent of params validity."""
8475
if not params or "_meta" not in params:
8576
return None
8677
try:
@@ -149,11 +140,8 @@ def _dump_result(result: Any) -> dict[str, Any]:
149140
if result is None:
150141
return {}
151142
if isinstance(result, ErrorData):
152-
# The existing `BaseSession._send_response` treats a returned
153-
# `ErrorData` as a JSON-RPC error, not a success result. Handler
154-
# returns are converted inside `_on_request`'s `_inner` (so
155-
# `Server.middleware` observes the raise); this branch is the boundary
156-
# check for middleware that itself returns `ErrorData`.
143+
# ErrorData is a JSON-RPC error, not a success result. Handler returns
144+
# already raise in `_inner`; this catches middleware returning one.
157145
raise MCPError.from_error_data(result)
158146
if isinstance(result, BaseModel):
159147
return result.model_dump(by_alias=True, mode="json", exclude_none=True)
@@ -179,19 +167,15 @@ class ServerRunner(Generic[LifespanT]):
179167
connection: Connection = field(init=False)
180168
session: ServerSession = field(init=False)
181169
"""Connection-scoped: the same instance reaches every request as `ctx.session`."""
182-
_initialized: bool = field(init=False)
183170

184171
def __post_init__(self) -> None:
185-
self._initialized = self.stateless
186172
if self.init_options is None:
187173
self.init_options = self.server.create_initialization_options()
188174
self.connection = Connection(
189175
self.dispatcher, has_standalone_channel=self.has_standalone_channel, session_id=self.session_id
190176
)
191177
if self.stateless:
192-
# Keep the public event in lockstep with the gate flag so a handler
193-
# awaiting `connection.initialized` does not hang on a stateless
194-
# connection (where no `initialize` exchange ever arrives).
178+
# No handshake ever arrives on a stateless connection; born ready.
195179
self.connection.initialized.set()
196180
self.session = ServerSession(self.dispatcher, self.connection, stateless=self.stateless)
197181

@@ -214,11 +198,8 @@ async def run(self, *, task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STAT
214198
try:
215199
await self.connection.exit_stack.aclose()
216200
except Exception:
217-
# Top-level boundary: a cleanup callback raising must not
218-
# escape `run()` - it would crash stdio servers on a normal
219-
# disconnect and, via raise-in-finally, mask the original
220-
# exception from `dispatcher.run()` (including the
221-
# CancelledError that SHTTP idle-timeout teardown checks).
201+
# Raising here would mask dispatcher.run()'s exception and
202+
# crash stdio servers on normal disconnect.
222203
logger.exception("connection exit_stack cleanup raised")
223204
if scope.cancelled_caught:
224205
logger.warning(
@@ -245,66 +226,45 @@ async def _on_request(
245226
ctx = self._make_context(dctx, _extract_meta(params))
246227

247228
async def _inner() -> HandlerResult:
248-
# TODO(maxisbey): pinned compat. `BaseSession._receive_loop`
249-
# validates every inbound request against the spec `ClientRequest`
250-
# discriminated union *before* handler lookup, so a spec method
251-
# with malformed params surfaces as INVALID_PARAMS via the
252-
# dispatcher's ValidationError boundary even when no handler is
253-
# registered. v2 wanted to decouple the runner from the spec union;
254-
# revisit once the suite's divergence entry is resolved. Gated on
255-
# spec methods so custom methods registered via
256-
# `add_request_handler` still route (the existing server rejects
257-
# those too, but nothing pins that and routing is strictly better).
229+
# TODO(maxisbey): pinned compat: spec methods are validated against
230+
# the ClientRequest union before lookup, so malformed params are
231+
# INVALID_PARAMS even with no handler registered.
258232
if method in _SPEC_CLIENT_METHODS:
259233
payload: dict[str, Any] = {"method": method}
260234
if params is not None:
261235
payload["params"] = dict(params)
262236
client_request_adapter.validate_python(payload, by_name=False)
263-
# TODO(maxisbey): rework initialization into a pure incoming
264-
# middleware, and add an outgoing-middleware seam whose default
265-
# chain blocks server-to-client requests when the negotiated
266-
# protocol version requires initialization and the connection is
267-
# neither initialized nor stateless. Until then, note that
268-
# `initialize` is handled inline (the dispatcher's read loop is
269-
# parked until this whole call - middleware included - returns),
270-
# so awaiting a peer response anywhere on this path deadlocks the
271-
# connection.
237+
# TODO(maxisbey): the 2026-07-28 spec drops the handshake; this branch and
238+
# the gate become a per-version legacy path then. Initialize runs inline
239+
# (read loop parked), so awaiting the peer anywhere on this path deadlocks.
272240
if method == "initialize":
273241
return self._handle_initialize(params)
274-
if not self._initialized and method not in _INIT_EXEMPT:
275-
# TODO(maxisbey): pinned compat. The existing server has no
276-
# dedicated pre-init check; the request dies in ClientRequest
277-
# validation, so the client sees the generic invalid-params
278-
# shape.
242+
if not self.connection.initialize_accepted and method not in _INIT_EXEMPT:
243+
# Pinned compat: the same error shape the union validation produced.
279244
raise MCPError(code=INVALID_PARAMS, message="Invalid request parameters", data="")
280245
entry = self.server.get_request_handler(method)
281246
if entry is None:
282247
raise MCPError(code=METHOD_NOT_FOUND, message="Method not found")
283-
# ValidationError propagates; the dispatcher's exception boundary
284-
# maps it to INVALID_PARAMS. Absent wire params reach the handler
285-
# as None (matches the existing `Server._handle_request`, where
286-
# `req.params` is None for optional-params requests like
287-
# tools/list); the empty-dict validate is a required-field check
288-
# so a required-params model still surfaces as INVALID_PARAMS
289-
# rather than reaching the handler as None.
248+
# Absent params reach the handler as None; the empty-dict validate
249+
# still enforces required fields (pinned compat).
290250
if params is None:
291251
entry.params_type.model_validate({}, by_name=False)
292252
typed_params = None
293253
else:
294254
typed_params = entry.params_type.model_validate(params, by_name=False)
295255
result = await entry.handler(ctx, typed_params)
296256
if isinstance(result, ErrorData):
297-
# A handler-returned `ErrorData` is a JSON-RPC error, not a
298-
# success result (matches `BaseSession._send_response`). Raise
299-
# here, inside the middleware chain, so `Server.middleware`
300-
# observes the failure as a raised `MCPError` out of
301-
# `call_next()` per the `ServerMiddleware` contract instead of
302-
# a successful-looking `ErrorData` return.
257+
# Raise inside the chain so middleware observes the failure.
303258
raise MCPError.from_error_data(result)
304259
return result
305260

306261
call = self._compose_server_middleware(ctx, method, params, _inner)
307-
return _dump_result(await call())
262+
result = _dump_result(await call())
263+
if method == "initialize":
264+
# Commit only on chain success, so a middleware veto leaves no state.
265+
# Race-free: the read loop is parked until this call returns.
266+
self.connection.client_params, self.connection.protocol_version = self._negotiate_initialize(params)
267+
return result
308268

309269
async def _on_notify(
310270
self,
@@ -316,35 +276,24 @@ async def _on_notify(
316276

317277
async def _inner() -> None:
318278
if method == "notifications/initialized":
319-
# Validate against the spec params model *before* flipping the
320-
# init state, so a malformed initialized notification drops
321-
# like any other malformed notification and leaves the
322-
# connection uninitialized (the existing server validated the
323-
# full `ClientNotification` union before dispatch). On
324-
# success, fall through to the registry so a handler
325-
# registered for this method observes an initialized
326-
# connection.
279+
# Validate before committing so a malformed notification leaves
280+
# state untouched; then fall through so a registered handler
281+
# observes an initialized connection.
327282
if params is not None:
328283
try:
329284
NotificationParams.model_validate(params, by_name=False)
330285
except ValidationError:
331286
logger.warning("dropped %r: malformed params", method)
332287
return
333-
self._initialized = True
334288
self.connection.initialized.set()
335-
elif not self._initialized:
289+
elif not self.connection.initialize_accepted:
336290
logger.debug("dropped %s: received before initialization", method)
337291
return
338292
entry = self.server.get_notification_handler(method)
339293
if entry is None:
340294
logger.debug("no handler for notification %s", method)
341295
return
342-
# Absent wire params reach the handler as None, not an empty model
343-
# (matches the existing `Server._handle_notification`). The
344-
# empty-dict validate is a required-field check: a required-params
345-
# model (e.g. ProgressNotificationParams) takes the
346-
# malformed-params drop path instead of reaching a non-Optional
347-
# handler as None.
296+
# Same absent-params contract as requests.
348297
try:
349298
if params is None:
350299
entry.params_type.model_validate({}, by_name=False)
@@ -360,11 +309,8 @@ async def _inner() -> None:
360309
try:
361310
await call()
362311
except Exception:
363-
# Top-level boundary: a notification handler (or middleware)
364-
# crashing must not tear down the connection (it runs as a bare
365-
# task in the dispatcher's task group; an uncaught exception would
366-
# cancel every sibling, including the read loop and in-flight
367-
# requests). Middleware sees the raise out of `call_next()` first.
312+
# A crashing handler must not cancel the dispatcher's task group;
313+
# middleware saw the raise out of call_next() first.
368314
logger.exception("notification handler for %r raised", method)
369315

370316
def _compose_server_middleware(
@@ -407,17 +353,20 @@ def _make_context(
407353
close_standalone_sse_stream=close_standalone_sse_stream,
408354
)
409355

410-
def _handle_initialize(self, params: Mapping[str, Any] | None) -> InitializeResult:
356+
@staticmethod
357+
def _negotiate_initialize(params: Mapping[str, Any] | None) -> tuple[InitializeRequestParams, str]:
358+
"""Validate `initialize` params and pick the protocol version."""
411359
init = InitializeRequestParams.model_validate(params or {}, by_name=False)
412-
self.connection.client_params = init
413360
requested = init.protocol_version
414361
negotiated = requested if requested in SUPPORTED_PROTOCOL_VERSIONS else LATEST_PROTOCOL_VERSION
415-
self.connection.protocol_version = negotiated
416-
self._initialized = True
417-
self.connection.initialized.set()
362+
return init, negotiated
363+
364+
def _handle_initialize(self, params: Mapping[str, Any] | None) -> InitializeResult:
365+
"""Build the `initialize` result; state commits later in `_on_request`."""
366+
_, negotiated = self._negotiate_initialize(params)
418367
assert self.init_options is not None
419368
opts = self.init_options
420-
result = InitializeResult(
369+
return InitializeResult(
421370
protocol_version=negotiated,
422371
capabilities=opts.capabilities,
423372
server_info=Implementation(
@@ -430,4 +379,3 @@ def _handle_initialize(self, params: Mapping[str, Any] | None) -> InitializeResu
430379
),
431380
instructions=opts.instructions,
432381
)
433-
return result

0 commit comments

Comments
 (0)