Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/mcp/server/fastmcp/utilities/func_metadata.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import inspect
import json
from collections.abc import Awaitable, Callable, Sequence
from functools import partial
from itertools import chain
from types import GenericAlias
from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints

import anyio
import anyio.to_thread
import pydantic_core
from pydantic import (
BaseModel,
Expand Down Expand Up @@ -92,7 +95,7 @@ async def call_fn_with_arg_validation(
if fn_is_async:
return await fn(**arguments_parsed_dict)
else:
return fn(**arguments_parsed_dict)
return await anyio.to_thread.run_sync(partial(fn, **arguments_parsed_dict))

def convert_result(self, result: Any) -> Any:
"""Convert the result of a function call to the appropriate format for
Expand Down
80 changes: 80 additions & 0 deletions tests/server/fastmcp/test_func_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
# pyright: reportMissingParameterType=false
# pyright: reportUnknownArgumentType=false
# pyright: reportUnknownLambdaType=false
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import Annotated, Any, Final, NamedTuple, TypedDict

import annotated_types
import anyio
import pytest
from dirty_equals import IsPartialDict
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -1189,3 +1192,80 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc

assert meta.output_schema is not None
assert meta.output_schema["properties"]["result"] == {"exclusiveMinimum": 1, "title": "Result", "type": "integer"}


@pytest.mark.anyio
async def test_sync_tool_does_not_block_event_loop() -> None:
"""Regression test: sync tools must not run inline on the event loop.

If sync tools run inline, this test will fail because `fast_probe`
won't get scheduled until after `time.sleep`.
"""

def slow_sync(x: int) -> int:
time.sleep(0.30) # intentionally blocks if run on event loop
return x + 1

md = func_metadata(slow_sync)

start = anyio.current_time()
fast_probe_elapsed: float | None = None
slow_result: int | None = None

async def run_slow() -> None:
nonlocal slow_result
# call_fn_with_arg_validation is the execution path used for tools
slow_result = await md.call_fn_with_arg_validation(
fn=slow_sync,
fn_is_async=False,
arguments_to_validate={"x": 1},
arguments_to_pass_directly=None,
)

async def fast_probe() -> None:
nonlocal fast_probe_elapsed
# If event loop is not blocked, this should run "immediately"
await anyio.sleep(0)
fast_probe_elapsed = anyio.current_time() - start

# Keep the whole test bounded even if something regresses badly
with anyio.fail_after(2):
async with anyio.create_task_group() as tg:
tg.start_soon(run_slow)
tg.start_soon(fast_probe)

assert slow_result == 2

assert fast_probe_elapsed is not None
# If slow_sync blocks the loop, this will be ~0.30s and fail.
# If slow_sync is offloaded, this should typically be a few ms.
assert fast_probe_elapsed < 0.10


@pytest.mark.anyio
async def test_sync_function_runs_in_worker_thread():
"""Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync,
instead of blocking the event loop thread.
"""

def blocking_sync(delay: float) -> int: # pragma: no cover
# Sleep to simulate a blocking sync tool
time.sleep(delay)
# Return the thread ID we are running on
return threading.get_ident()

meta = func_metadata(blocking_sync)

# This is the event loop thread ID (where the test itself is running)
loop_thread_id = threading.get_ident()

# Call the sync function through call_fn_with_arg_validation
result_thread_id = await meta.call_fn_with_arg_validation(
blocking_sync,
fn_is_async=False,
arguments_to_validate={"delay": 0.01},
arguments_to_pass_directly=None,
)

# The tool should have executed in a different worker thread
assert result_thread_id != loop_thread_id