Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/packages/kagent-adk/src/kagent/adk/_a2a.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#! /usr/bin/env python3
import asyncio
import faulthandler
import logging
import os
Expand All @@ -20,7 +21,6 @@
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

from kagent.core.a2a import (
KAgentRequestContextBuilder,
KAgentTaskStore,
Expand Down Expand Up @@ -170,6 +170,7 @@ def create_runner() -> Runner:
# Health check/readiness probe
app.add_route("/health", methods=["GET"], route=health_check)
app.add_route("/thread_dump", methods=["GET"], route=thread_dump)

a2a_app.add_routes_to_app(app)

return app
Expand Down
87 changes: 87 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/_mcp_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,90 @@
from typing import Optional

from google.adk.tools import BaseTool
from google.adk.tools.mcp_tool.mcp_session_manager import MCPSessionManager
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset, ReadonlyContext
from mcp import ClientSession
from mcp.shared.exceptions import McpError

logger = logging.getLogger("kagent_adk." + __name__)

# Short timeouts to fail fast on the request path; avoid adding latency when session is valid.
_PING_TIMEOUT_SECONDS = 1.0
_SESSION_REVALIDATE_TIMEOUT_SECONDS = 2.0
_JSONRPC_METHOD_NOT_FOUND = -32601


def _is_server_alive_error(exc: Exception) -> bool:
"""Return True if the error proves the server is reachable.

Some MCP servers don't implement the optional ``ping`` method and
reply with JSON-RPC "Method not found" (-32601). This still means
the session is valid and the server is responding.
"""
if isinstance(exc, McpError):
return exc.error.code == _JSONRPC_METHOD_NOT_FOUND
return False


def _is_session_invalid_error(exc: Exception) -> bool:
"""Return True if the error indicates the MCP session is no longer valid (e.g. 404)."""
parts = [str(exc)]
if isinstance(exc, McpError) and exc.error.message:
parts.append(exc.error.message)
msg = " ".join(parts).lower()
return "404" in msg or "session terminated" in msg


class KAgentMCPSessionManager(MCPSessionManager):
"""Session manager that validates cached sessions via ping and list_tools before reuse.

The upstream ``MCPSessionManager`` checks ``_read_stream._closed`` /
``_write_stream._closed`` to decide whether a cached session is still
usable. Those are in-memory anyio channels that stay open even when
the remote MCP server restarts, so the check always passes and the
stale ``mcp-session-id`` is sent to the new server, which replies
with HTTP 404 → ``"Session terminated"``.

This subclass: (1) runs ``send_ping()`` after the parent returns a cached
session; (2) then revalidates the session with ``list_tools()`` so that
if the server restarted and the session id is invalid (404), we prune
the session from the cache and create a new one.
"""

async def _close_and_recreate_session(self, headers: dict[str, str] | None, reason: str) -> ClientSession:
"""Close the cached session (best-effort) and create a new one."""
logger.warning("%s", reason)
try:
await self.close()
except Exception as close_exc:
logger.debug("Non-fatal error while closing stale session: %s", close_exc)
return await super().create_session(headers)

async def create_session(self, headers: dict[str, str] | None = None) -> ClientSession:
session = await super().create_session(headers)

try:
await asyncio.wait_for(session.send_ping(), timeout=_PING_TIMEOUT_SECONDS)
except Exception as exc:
if _is_server_alive_error(exc):
pass
else:
return await self._close_and_recreate_session(
headers,
"MCP session failed ping validation, invalidating cached session and creating a fresh one",
)

try:
await asyncio.wait_for(session.list_tools(), timeout=_SESSION_REVALIDATE_TIMEOUT_SECONDS)
return session
except Exception as exc:
if _is_session_invalid_error(exc):
return await self._close_and_recreate_session(
headers,
"MCP session invalid (e.g. 404), pruning from cache and creating a fresh one",
)
raise
Comment on lines +70 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my one last concern is the length of these timeouts and the fact that this is added to a normal request path which may create additional latency in the request. If it fails I think we should fail fast and not wait to surface errors to the user.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_PING_TIMEOUT_SECONDS = 1.0
_SESSION_REVALIDATE_TIMEOUT_SECONDS = 2.0?

now:
same block (log warning → try/except around close() → return await super().create_session(headers)) is duplicated for ping failure and for list_tools session-invalid

so your suggestion that create_session:

  1. Gets a session from the parent
  2. Validates with ping, on failure (and not “method not found”) → return await self._close_and_recreate_session(...).
  3. Validates with list_tools, on session-invalid error → return await self._close_and_recreate_session(...), on other errors → re-raise.
  4. Returns the session when both checks pass
    so there’s one place for prune and recreate”

Did I got you right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the late reply, its more so to just shorten those timeouts so we fail fast if there is an error. I think the above timout limits make sense now.



def _enrich_cancelled_error(error: BaseException) -> asyncio.CancelledError:
message = "Failed to create MCP session: operation cancelled"
Expand All @@ -25,6 +105,13 @@ class KAgentMcpToolset(McpToolset):
implementation may not catch and propagate without enough context.
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._mcp_session_manager = KAgentMCPSessionManager(
connection_params=self._connection_params,
errlog=self._errlog,
)

async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> list[BaseTool]:
try:
return await super().get_tools(readonly_context)
Expand Down
Loading
Loading