Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ def streamable_http_app(
stateless_http: bool = False,
event_store: EventStore | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
transport_security: TransportSecuritySettings | None = None,
host: str = "127.0.0.1",
auth: AuthSettings | None = None,
Expand All @@ -461,6 +462,7 @@ def streamable_http_app(
app=self,
event_store=event_store,
retry_interval=retry_interval,
session_idle_timeout=session_idle_timeout,
json_response=json_response,
stateless=stateless_http,
security_settings=transport_security,
Expand Down
2 changes: 2 additions & 0 deletions src/mcp/server/mcpserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,7 @@ def streamable_http_app(
stateless_http: bool = False,
event_store: EventStore | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
transport_security: TransportSecuritySettings | None = None,
host: str = "127.0.0.1",
) -> Starlette:
Expand All @@ -1056,6 +1057,7 @@ def streamable_http_app(
stateless_http=stateless_http,
event_store=event_store,
retry_interval=retry_interval,
session_idle_timeout=session_idle_timeout,
transport_security=transport_security,
host=host,
auth=self.settings.auth,
Expand Down
19 changes: 19 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import logging
import math
import re
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Awaitable, Callable
Expand Down Expand Up @@ -171,6 +172,7 @@ def __init__(
] = {}
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
self._terminated = False
self._active_request_count = 0
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None

Expand All @@ -179,6 +181,23 @@ def is_terminated(self) -> bool:
"""Check if this transport has been explicitly terminated."""
return self._terminated

def mark_request_started(self) -> None:
"""Suspend idle reaping while at least one HTTP request is in flight."""
self._active_request_count += 1
if self.idle_scope is not None:
self.idle_scope.deadline = math.inf

def mark_request_finished(self, idle_timeout_seconds: float | None) -> None:
"""Resume idle reaping once the last in-flight request completes."""
self._active_request_count = max(0, self._active_request_count - 1)
if (
idle_timeout_seconds is not None
and self.idle_scope is not None
and self._active_request_count == 0
and not self._terminated
):
self.idle_scope.deadline = anyio.current_time() + idle_timeout_seconds

def close_sse_stream(self, request_id: RequestId) -> None:
"""Close SSE connection for a specific request without terminating the stream.

Expand Down
23 changes: 17 additions & 6 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,14 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
await response(scope, receive, send)
return
logger.debug("Session already exists, handling request directly")
# Push back idle deadline on activity
if transport.idle_scope is not None and self.session_idle_timeout is not None:
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover
await transport.handle_request(scope, receive, send)
# Suspend idle reaping for the duration of the request so an in-flight
# request is never counted as an idle session; the deadline is pushed
# forward when the last concurrent request completes.
transport.mark_request_started()
try:
await transport.handle_request(scope, receive, send)
finally:
transport.mark_request_finished(self.session_idle_timeout)
return

if request_mcp_session_id is None:
Expand All @@ -251,7 +255,6 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORED) -> None:
async with http_transport.connect() as streams:
read_stream, write_stream = streams
task_status.started()
try:
# Use a cancel scope for idle timeout — when the
# deadline passes the scope cancels app.run() and
Expand All @@ -262,6 +265,10 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
idle_scope.deadline = anyio.current_time() + self.session_idle_timeout
http_transport.idle_scope = idle_scope

# Signal readiness only after idle_scope is attached so the
# first request (below) can suspend reaping without a race.
task_status.started()

with idle_scope:
await self.app.run(
read_stream,
Expand Down Expand Up @@ -297,7 +304,11 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
await self._task_group.start(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
http_transport.mark_request_started()
try:
await http_transport.handle_request(scope, receive, send)
finally:
http_transport.mark_request_finished(self.session_idle_timeout)
else:
# Unknown or expired session ID - return 404 per MCP spec
# TODO: Align error code once spec clarifies
Expand Down
72 changes: 72 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,75 @@ async def test_anonymous_session_accepts_anonymous_requests(
session_id = await _open_session(manager, None)

assert await _request_session(manager, session_id, None) != 404


# ---------------------------------------------------------------------------
# session_idle_timeout: API exposure + in-flight request protection (#2455)
# ---------------------------------------------------------------------------


def test_streamable_http_app_forwards_session_idle_timeout():
"""The high-level streamable_http_app() API exposes session_idle_timeout.

Previously the parameter only existed on StreamableHTTPSessionManager, forcing
users to drop down to manual session-manager wiring to configure idle reaping.
"""
app = Server("test-idle-expose")
app.streamable_http_app(session_idle_timeout=12.5)
assert app.session_manager.session_idle_timeout == 12.5


def test_streamable_http_app_session_idle_timeout_defaults_to_none():
app = Server("test-idle-default")
app.streamable_http_app()
assert app.session_manager.session_idle_timeout is None


@pytest.mark.anyio
async def test_mark_request_suspends_and_resumes_idle_reaping():
"""An in-flight request pushes the idle deadline to infinity until it completes.

Regression for #2455: a request actively being processed must not be counted as
an idle session. The deadline is restored only when the last concurrent request
finishes, and never moved if the session has no configured timeout.
"""
transport = StreamableHTTPServerTransport(mcp_session_id=None)
scope = anyio.CancelScope()
scope.deadline = 100.0
transport.idle_scope = scope

# Two overlapping requests: deadline stays suspended until both finish.
transport.mark_request_started()
assert scope.deadline == float("inf")
transport.mark_request_started()
assert scope.deadline == float("inf")

transport.mark_request_finished(idle_timeout_seconds=30.0)
# Still one request in flight -> still suspended.
assert scope.deadline == float("inf")

transport.mark_request_finished(idle_timeout_seconds=30.0)
# Last request done -> deadline re-armed into the future.
assert scope.deadline != float("inf")
assert scope.deadline > anyio.current_time()


@pytest.mark.anyio
async def test_mark_request_finished_is_noop_without_idle_timeout():
"""When no idle timeout is configured the deadline is left untouched."""
transport = StreamableHTTPServerTransport(mcp_session_id=None)
scope = anyio.CancelScope() # default deadline is +inf
transport.idle_scope = scope

transport.mark_request_started()
transport.mark_request_finished(idle_timeout_seconds=None)
assert scope.deadline == float("inf")


@pytest.mark.anyio
async def test_mark_request_finished_does_not_underflow():
"""Unbalanced finishes never drive the active-request counter negative."""
transport = StreamableHTTPServerTransport(mcp_session_id=None)
transport.mark_request_finished(idle_timeout_seconds=30.0)
transport.mark_request_finished(idle_timeout_seconds=30.0)
assert transport._active_request_count == 0
Loading