diff --git a/packages/toolbox-core/pyproject.toml b/packages/toolbox-core/pyproject.toml index 68c490483..595652ed8 100644 --- a/packages/toolbox-core/pyproject.toml +++ b/packages/toolbox-core/pyproject.toml @@ -43,6 +43,11 @@ Repository = "https://github.com/googleapis/mcp-toolbox-sdk-python.git" Changelog = "https://github.com/googleapis/mcp-toolbox-sdk-python/blob/main/packages/toolbox-core/CHANGELOG.md" [project.optional-dependencies] +telemetry = [ + "opentelemetry-api>=1.20.0,<2.0.0", + "opentelemetry-sdk>=1.20.0,<2.0.0", + "opentelemetry-exporter-otlp>=1.20.0,<2.0.0" +] test = [ "black[jupyter]==26.1.0", "isort==8.0.0", diff --git a/packages/toolbox-core/src/toolbox_core/client.py b/packages/toolbox-core/src/toolbox_core/client.py index 79e598e64..65aecf1ae 100644 --- a/packages/toolbox-core/src/toolbox_core/client.py +++ b/packages/toolbox-core/src/toolbox_core/client.py @@ -28,6 +28,8 @@ McpHttpTransportV20250618, McpHttpTransportV20251125, ) +from . import version +from .mcp_transport.telemetry import TELEMETRY_AVAILABLE, setup_telemetry from .protocol import Protocol, ToolSchema from .tool import ToolboxTool from .utils import identify_auth_requirements, resolve_value, warn_if_http_and_headers @@ -54,6 +56,7 @@ def __init__( protocol: Protocol = Protocol.MCP, client_name: Optional[str] = None, client_version: Optional[str] = None, + telemetry_url: Optional[str] = None, ): """ Initializes the ToolboxClient. @@ -67,29 +70,70 @@ def __init__( client_headers: Headers to include in each request sent through this client. protocol: The communication protocol to use. + client_name: Optional client name for identification. + client_version: Optional client version for identification. + telemetry_url: Optional OTLP endpoint URL for sending telemetry + (e.g., "http://localhost:4318"). If provided, sets up an OTLP + tracer provider to export traces to this endpoint. """ + + # Setup OpenTelemetry (metrics and traces) if endpoint is provided + if telemetry_url: + try: + setup_telemetry( + service_name="toolbox", + service_version=version.__version__, + use_console_exporter=False, # Using OTLP endpoint instead + otlp_endpoint=telemetry_url, + ) + except Exception as e: + logging.warning(f"Failed to setup telemetry: {e}") + if protocol != Protocol.MCP_LATEST: logging.warning( f"A newer version of MCP ({Protocol.MCP_LATEST.value}) is available. " "Please use Protocol.MCP_LATEST to use the latest features." ) + # Telemetry is only enabled if URL is provided AND OpenTelemetry is available + telemetry_enabled = bool(telemetry_url) and TELEMETRY_AVAILABLE + match protocol: case Protocol.MCP_v20251125: self.__transport = McpHttpTransportV20251125( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20250618: self.__transport = McpHttpTransportV20250618( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20250326: self.__transport = McpHttpTransportV20250326( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20241105: self.__transport = McpHttpTransportV20241105( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case _: raise ValueError(f"Unsupported MCP protocol version: {protocol}") diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py new file mode 100644 index 000000000..a07b1e014 --- /dev/null +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py @@ -0,0 +1,533 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenTelemetry telemetry utilities for MCP protocol. + +This module implements telemetry following the MCP Semantic Conventions: +https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + +Note: OpenTelemetry is an optional dependency. Install with: + pip install toolbox-core[telemetry] +""" + +from typing import Optional +from urllib.parse import urlparse + +# Try to import OpenTelemetry - it's an optional dependency +try: + from opentelemetry import metrics, trace + from opentelemetry.metrics import Histogram, Meter + from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics._internal.aggregation import ( + ExplicitBucketHistogramAggregation, + ) + from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, + ) + from opentelemetry.sdk.metrics.view import View + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + from opentelemetry.trace import SpanKind, Status, StatusCode, Tracer + from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, + ) + + TELEMETRY_AVAILABLE = True +except ImportError: + TELEMETRY_AVAILABLE = False + Tracer = None # type: ignore + Meter = None # type: ignore + Histogram = None # type: ignore + +# Attribute names following MCP semantic conventions +ATTR_MCP_METHOD_NAME = "mcp.method.name" +ATTR_MCP_PROTOCOL_VERSION = "mcp.protocol.version" +ATTR_MCP_SESSION_ID = "mcp.session.id" +ATTR_ERROR_TYPE = "error.type" +ATTR_GEN_AI_TOOL_NAME = "gen_ai.tool.name" +ATTR_GEN_AI_OPERATION_NAME = "gen_ai.operation.name" +ATTR_GEN_AI_PROMPT_NAME = "gen_ai.prompt.name" +ATTR_SERVER_ADDRESS = "server.address" +ATTR_SERVER_PORT = "server.port" +ATTR_NETWORK_TRANSPORT = "network.transport" +ATTR_NETWORK_PROTOCOL_NAME = "network.protocol.name" + +# Metric names following MCP semantic conventions +METRIC_CLIENT_OPERATION_DURATION = "mcp.client.operation.duration" +METRIC_CLIENT_SESSION_DURATION = "mcp.client.session.duration" + +# Histogram bucket boundaries for MCP metrics (in seconds) +# As specified in: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#metrics +MCP_DURATION_BUCKETS = [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60, 120, 300] + + +def setup_telemetry( + service_name: str = "toolbox", + service_version: Optional[str] = None, + use_console_exporter: bool = True, + otlp_endpoint: Optional[str] = None, +) -> None: + """Initialize OpenTelemetry with MeterProvider and TracerProvider. + + This function must be called before using any telemetry features to properly + configure metric and trace exporters. + + Args: + service_name: Name of the service for telemetry + service_version: Version of the service + use_console_exporter: If True, exports metrics/traces to console (for debugging) + otlp_endpoint: Optional OTLP endpoint URL (e.g., "http://localhost:4318") + + Raises: + RuntimeError: If OpenTelemetry is not installed + """ + if not TELEMETRY_AVAILABLE: + raise RuntimeError( + "Telemetry support requires OpenTelemetry. Install with: " + "pip install toolbox-core[telemetry]" + ) + # Create resource with service information + resource = Resource.create( + { + "service.name": service_name, + "service.version": service_version or "unknown", + } + ) + + # Set up MeterProvider for metrics + metric_readers = [] + + if use_console_exporter: + # Console exporter for debugging - prints metrics to stdout + console_reader = PeriodicExportingMetricReader( + ConsoleMetricExporter(), export_interval_millis=5000 + ) + metric_readers.append(console_reader) + + if otlp_endpoint: + # OTLP exporter for production use + otlp_metric_exporter = OTLPMetricExporter( + endpoint=f"{otlp_endpoint}/v1/metrics" + ) + otlp_reader = PeriodicExportingMetricReader( + otlp_metric_exporter, export_interval_millis=5000 + ) + metric_readers.append(otlp_reader) + + # Configure Views with MCP histogram bucket boundaries + # These Views ensure that the MCP duration metrics use the bucket boundaries + # specified in the MCP semantic conventions + mcp_histogram_aggregation = ExplicitBucketHistogramAggregation( + boundaries=MCP_DURATION_BUCKETS + ) + + views = [ + View( + instrument_name=METRIC_CLIENT_OPERATION_DURATION, + aggregation=mcp_histogram_aggregation, + ), + View( + instrument_name=METRIC_CLIENT_SESSION_DURATION, + aggregation=mcp_histogram_aggregation, + ), + ] + + meter_provider = MeterProvider( + resource=resource, + metric_readers=metric_readers, + views=views, + ) + metrics.set_meter_provider(meter_provider) + + # Set up TracerProvider for traces + tracer_provider = TracerProvider(resource=resource) + + if use_console_exporter: + # Console exporter for debugging - prints traces to stdout + console_span_processor = BatchSpanProcessor(ConsoleSpanExporter()) + tracer_provider.add_span_processor(console_span_processor) + + if otlp_endpoint: + # OTLP exporter for production use + otlp_span_exporter = OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces") + otlp_span_processor = BatchSpanProcessor(otlp_span_exporter) + tracer_provider.add_span_processor(otlp_span_processor) + + trace.set_tracer_provider(tracer_provider) + + +def get_tracer( + name: str = "toolbox", version: Optional[str] = None +) -> Optional[Tracer]: + """Get or create a tracer for MCP operations. + + Args: + name: The tracer name + version: The tracer version + + Returns: + An OpenTelemetry Tracer instance, or None if telemetry is not available + + Raises: + RuntimeError: If OpenTelemetry is not installed + """ + if not TELEMETRY_AVAILABLE: + raise RuntimeError( + "Telemetry support requires OpenTelemetry. Install with: " + "pip install toolbox-core[telemetry]" + ) + return trace.get_tracer(name, version) + + +def get_meter(name: str = "toolbox", version: Optional[str] = None) -> Optional[Meter]: + """Get or create a meter for MCP metrics. + + Args: + name: The meter name + version: The meter version + + Returns: + An OpenTelemetry Meter instance, or None if telemetry is not available + + Raises: + RuntimeError: If OpenTelemetry is not installed + """ + if not TELEMETRY_AVAILABLE: + raise RuntimeError( + "Telemetry support requires OpenTelemetry. Install with: " + "pip install toolbox-core[telemetry]" + ) + return metrics.get_meter(name, version) + + +def create_operation_duration_histogram(meter: Meter) -> Optional[Histogram]: + """Create histogram for MCP client operation duration. + + Bucket boundaries are configured via Views in setup_telemetry() to match + the MCP semantic conventions. + + Args: + meter: The OpenTelemetry meter + + Returns: + Histogram instance or None if creation failed + """ + try: + return meter.create_histogram( + name=METRIC_CLIENT_OPERATION_DURATION, + unit="s", + description="Duration of MCP client operations (requests/notifications)", + ) + except Exception: + return None + + +def create_session_duration_histogram(meter: Meter) -> Optional[Histogram]: + """Create histogram for MCP client session duration. + + Bucket boundaries are configured via Views in setup_telemetry() to match + the MCP semantic conventions. + + Args: + meter: The OpenTelemetry meter + + Returns: + Histogram instance or None if creation failed + """ + try: + return meter.create_histogram( + name=METRIC_CLIENT_SESSION_DURATION, + unit="s", + description="Total duration of MCP client sessions", + ) + except Exception: + return None + + +def extract_server_info(url: str) -> tuple[str, Optional[int], str]: + """Extract server address, port, and protocol from URL. + + Args: + url: The server URL + + Returns: + Tuple of (server_address, server_port, protocol_name) + """ + parsed = urlparse(url) + protocol_name = parsed.scheme if parsed.scheme else "http" + return parsed.hostname or parsed.netloc, parsed.port, protocol_name + + +def create_traceparent_from_context() -> str: + """Create W3C traceparent header from current trace context. + + Returns: + W3C traceparent header string in format: + version-trace_id-parent_id-trace_flags + Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 + """ + propagator = TraceContextTextMapPropagator() + carrier: dict[str, str] = {} + propagator.inject(carrier) + return carrier.get("traceparent", "") + + +def create_tracestate_from_context() -> str: + """Create W3C tracestate header from current trace context. + + Returns: + W3C tracestate header string + """ + propagator = TraceContextTextMapPropagator() + carrier: dict[str, str] = {} + propagator.inject(carrier) + return carrier.get("tracestate", "") + + +def start_span( + tracer: Tracer, + method_name: str, + protocol_version: str, + server_url: str, + tool_name: Optional[str] = None, + network_transport: Optional[str] = None, +) -> Optional[trace.Span]: + """Start a telemetry span for MCP operations. Returns None if telemetry fails. + + Args: + tracer: The OpenTelemetry tracer + method_name: The MCP method name (e.g., "tools/call", "tools/list") + protocol_version: The MCP protocol version + server_url: The MCP server URL + tool_name: Optional tool name for tools/call operations + network_transport: Optional network transport type ("tcp" for HTTP/HTTPS, "pipe" for stdio) + + Returns: + The started span, or None if telemetry failed + """ + try: + span_name = f"{method_name} {tool_name}" if tool_name else method_name + span = tracer.start_span(span_name, kind=SpanKind.CLIENT) + + # Required: MCP method name + span.set_attribute(ATTR_MCP_METHOD_NAME, method_name) + span.set_attribute(ATTR_MCP_PROTOCOL_VERSION, protocol_version) + + # Extract server info and network protocol from URL + server_address, server_port, protocol_name = extract_server_info(server_url) + span.set_attribute(ATTR_SERVER_ADDRESS, server_address) + span.set_attribute(ATTR_NETWORK_PROTOCOL_NAME, protocol_name) + if server_port: + span.set_attribute(ATTR_SERVER_PORT, server_port) + + # Network transport ("tcp" for HTTP/HTTPS, "pipe" for stdio) + if network_transport: + span.set_attribute(ATTR_NETWORK_TRANSPORT, network_transport) + + if tool_name: + span.set_attribute(ATTR_GEN_AI_TOOL_NAME, tool_name) + if method_name == "tools/call": + span.set_attribute(ATTR_GEN_AI_OPERATION_NAME, "execute_tool") + + return span + except Exception: + # Telemetry failed - continue without it + return None + + +def end_span(span: Optional[trace.Span], error: Optional[Exception] = None) -> None: + """End a telemetry span. Safe to call with None span. + + Args: + span: The span to end (can be None if telemetry failed) + error: Optional exception if operation failed + """ + if span is None: + return + try: + if error: + span.set_status(Status(StatusCode.ERROR, str(error))) + span.set_attribute(ATTR_ERROR_TYPE, type(error).__name__) + span.end() + except Exception: + # Ignore telemetry errors + pass + + +def record_error_from_jsonrpc( + span: trace.Span, error_code: int, error_message: str +) -> None: + """Record error information from JSON-RPC error response. + + Args: + span: The span to record the error on + error_code: The JSON-RPC error code + error_message: The JSON-RPC error message + """ + span.set_status(Status(StatusCode.ERROR, error_message)) + span.set_attribute(ATTR_ERROR_TYPE, f"jsonrpc.error.{error_code}") + + +def record_operation_duration( + histogram: Optional[Histogram], + duration_seconds: float, + method_name: str, + protocol_version: str, + server_url: str, + tool_name: Optional[str] = None, + network_transport: Optional[str] = None, + error: Optional[Exception] = None, +) -> None: + """Record MCP client operation duration metric. + + Args: + histogram: The operation duration histogram (can be None if metrics failed) + duration_seconds: Duration of the operation in seconds + method_name: The MCP method name (required attribute) + protocol_version: The MCP protocol version (recommended attribute) + server_url: The MCP server URL (for extracting server address/port) + tool_name: Optional tool name for tools/call operations + network_transport: Optional network transport type ("tcp" for HTTP/HTTPS) + error: Optional exception if operation failed (for error.type attribute) + """ + if histogram is None: + return + + try: + # Build attributes dict following MCP semantic conventions + attributes = { + ATTR_MCP_METHOD_NAME: method_name, + ATTR_MCP_PROTOCOL_VERSION: protocol_version, + } + + # Extract and add server info + server_address, server_port, protocol_name = extract_server_info(server_url) + attributes[ATTR_SERVER_ADDRESS] = server_address + attributes[ATTR_NETWORK_PROTOCOL_NAME] = protocol_name + if server_port: + attributes[ATTR_SERVER_PORT] = server_port + + # Add optional network transport + if network_transport: + attributes[ATTR_NETWORK_TRANSPORT] = network_transport + + # Add tool-related attributes for tools/call operations + if tool_name: + attributes[ATTR_GEN_AI_TOOL_NAME] = tool_name + if method_name == "tools/call": + attributes[ATTR_GEN_AI_OPERATION_NAME] = "execute_tool" + + # Add error type if operation failed + if error: + attributes[ATTR_ERROR_TYPE] = type(error).__name__ + + histogram.record(duration_seconds, attributes) + except Exception: + # Ignore metrics recording errors + pass + + +def record_session_duration( + histogram: Optional[Histogram], + duration_seconds: float, + protocol_version: str, + server_url: str, + network_transport: Optional[str] = None, + error: Optional[Exception] = None, +) -> None: + """Record MCP client session duration metric. + + Args: + histogram: The session duration histogram (can be None if metrics failed) + duration_seconds: Duration of the session in seconds + protocol_version: The MCP protocol version (recommended attribute) + server_url: The MCP server URL (for extracting server address/port) + network_transport: Optional network transport type ("tcp" for HTTP/HTTPS) + error: Optional exception if session ended with error + """ + if histogram is None: + return + + try: + # Build attributes dict following MCP semantic conventions + attributes = { + ATTR_MCP_PROTOCOL_VERSION: protocol_version, + } + + # Extract and add server info + server_address, server_port, protocol_name = extract_server_info(server_url) + attributes[ATTR_SERVER_ADDRESS] = server_address + attributes[ATTR_NETWORK_PROTOCOL_NAME] = protocol_name + if server_port: + attributes[ATTR_SERVER_PORT] = server_port + + # Add optional network transport + if network_transport: + attributes[ATTR_NETWORK_TRANSPORT] = network_transport + + # Add error type if session ended with error + if error: + attributes[ATTR_ERROR_TYPE] = type(error).__name__ + + histogram.record(duration_seconds, attributes) + except Exception: + # Ignore metrics recording errors + pass + + +def setup_otlp_tracer_provider( + otlp_endpoint: str, service_name: str = "toolbox" +) -> None: + """Setup OTLP tracer provider to send telemetry to an OTLP endpoint. + + Args: + otlp_endpoint: The OTLP endpoint URL (e.g., "http://localhost:4318") + service_name: The service name for the resource + + Raises: + RuntimeError: If OpenTelemetry is not installed + """ + if not TELEMETRY_AVAILABLE: + raise RuntimeError( + "Telemetry support requires OpenTelemetry. Install with: " + "pip install toolbox-core[telemetry]" + ) + # Create resource with service name + resource = Resource.create({"service.name": service_name}) + + # Create OTLP exporter with the provided endpoint + # The endpoint should include the full path for HTTP: http://host:port/v1/traces + # If only host:port is provided, append the traces path + if not otlp_endpoint.endswith("/v1/traces"): + if otlp_endpoint.endswith("/"): + otlp_endpoint = otlp_endpoint + "v1/traces" + else: + otlp_endpoint = otlp_endpoint + "/v1/traces" + + otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint) + + # Create tracer provider with resource + tracer_provider = TracerProvider(resource=resource) + + # Add batch span processor with OTLP exporter + tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter)) + + # Set as global tracer provider + trace.set_tracer_provider(tracer_provider) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/transport_base.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/transport_base.py index 5a7b3d0e8..bd01075e6 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/transport_base.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/transport_base.py @@ -38,6 +38,7 @@ def __init__( protocol: Protocol = Protocol.MCP, client_name: Optional[str] = None, client_version: Optional[str] = None, + telemetry_enabled: bool = False, ): self._mcp_base_url = f"{base_url}/mcp/" self._protocol_version = protocol.value @@ -45,6 +46,7 @@ def __init__( self._client_name = client_name self._client_version = client_version + self._telemetry_enabled = telemetry_enabled self._manage_session = session is None self._session = session or ClientSession() diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py index cffa8b244..84685d52d 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . import types @@ -27,6 +29,27 @@ class McpHttpTransportV20241105(_McpHttpTransportBase): """Transport for the MCP v2024-11-05 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -85,6 +108,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -92,32 +143,56 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) + + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) + + self._server_version = result.serverInfo.version + + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + f"MCP version mismatch: client does not support server version {result.protocolVersion}" + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - f"MCP version mismatch: client does not support server version {result.protocolVersion}" + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") - - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -128,20 +203,73 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema(serverVersion=self._server_version, tools=tools_map) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema(serverVersion=self._server_version, tools=tools_map) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -157,22 +285,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." + ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py index 14023a2a9..d9045a631 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . import types @@ -31,6 +33,24 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._session_id: Optional[str] = None + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -101,6 +121,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -108,45 +156,67 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) + + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + self._server_version = result.serverInfo.version - self._server_version = result.serverInfo.version + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + # Extract session ID from extra fields (v2025-03-26 specific) + # Session ID is captured from headers in _send_request - # Extract session ID from extra fields (v2025-03-26 specific) - # Session ID is captured from headers in _send_request + if not self._session_id: + if self._manage_session: + await self.close() + raise RuntimeError( + "Server did not return a Mcp-Session-Id during initialization." + ) - if not self._session_id: - if self._manage_session: - await self.close() - raise RuntimeError( - "Server did not return a Mcp-Session-Id during initialization." + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, ) - - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -157,23 +227,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -189,23 +312,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." + ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py index 81e0bc183..12c507c7c 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . import types @@ -27,6 +29,27 @@ class McpHttpTransportV20250618(_McpHttpTransportBase): """Transport for the MCP v2025-06-18 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -92,6 +115,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -99,35 +150,57 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) - self._server_version = result.serverInfo.version + self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -138,23 +211,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -170,23 +296,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." + ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py index 81ab05cbb..87080c2bd 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . import types @@ -27,6 +29,27 @@ class McpHttpTransportV20251125(_McpHttpTransportBase): """Transport for the MCP v2025-11-25 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -38,7 +61,7 @@ async def _send_request( req_headers["MCP-Protocol-Version"] = self._protocol_version params = ( - request.params.model_dump(mode="json", exclude_none=True) + request.params.model_dump(mode="json", exclude_none=True, by_alias=True) if isinstance(request.params, BaseModel) else request.params ) @@ -92,6 +115,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -99,35 +150,57 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) - self._server_version = result.serverInfo.version + self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -138,23 +211,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -170,23 +296,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." + ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py index 4cbcfa992..443f5a474 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/sync_client.py b/packages/toolbox-core/src/toolbox_core/sync_client.py index 606c27234..8e4d8af6a 100644 --- a/packages/toolbox-core/src/toolbox_core/sync_client.py +++ b/packages/toolbox-core/src/toolbox_core/sync_client.py @@ -45,6 +45,7 @@ def __init__( protocol: Protocol = Protocol.MCP, client_name: Optional[str] = None, client_version: Optional[str] = None, + telemetry_url: Optional[str] = None, ): """ Initializes the ToolboxSyncClient. @@ -52,6 +53,12 @@ def __init__( Args: url: The base URL for the Toolbox service API (e.g., "http://localhost:5000"). client_headers: Headers to include in each request sent through this client. + protocol: The communication protocol to use. + client_name: Optional client name for identification. + client_version: Optional client version for identification. + telemetry_url: Optional OTLP endpoint URL for sending telemetry + (e.g., "http://localhost:4318"). If provided, sets up an OTLP + tracer provider to export traces to this endpoint. """ # Running a loop in a background thread allows us to support async # methods from non-async environments. @@ -71,6 +78,7 @@ async def create_client(): protocol=protocol, client_name=client_name, client_version=client_version, + telemetry_url=telemetry_url, ) self.__async_client = run_coroutine_threadsafe(