Skip to content
This repository was archived by the owner on Oct 28, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ classifiers = [
dependencies = [
"mcp>=1.12.0",
"semgrep==1.131.0",
"opentelemetry-api>=1.25.0",
"opentelemetry-sdk>=1.25.0",
]

[project.license]
Expand Down
12 changes: 9 additions & 3 deletions src/semgrep_mcp/semgrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, ErrorData
from opentelemetry import trace

from semgrep_mcp.models import CodeFile
from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput
Expand Down Expand Up @@ -140,9 +141,11 @@ class SemgrepContext:
process: asyncio.subprocess.Process
stdin: asyncio.StreamWriter
stdout: asyncio.StreamReader
top_level_span: trace.Span

def __init__(self, process: asyncio.subprocess.Process) -> None:
def __init__(self, process: asyncio.subprocess.Process, top_level_span: trace.Span) -> None:
self.process = process
self.top_level_span = top_level_span

if process.stdin is not None and process.stdout is not None:
self.stdin = process.stdin
Expand Down Expand Up @@ -203,14 +206,17 @@ async def run_semgrep(args: list[str]) -> asyncio.subprocess.Process:
return process


async def run_semgrep_daemon() -> SemgrepContext | None:
async def run_semgrep_daemon(top_level_span: trace.Span) -> SemgrepContext | None:
"""
Runs the semgrep daemon (`semgrep mcp`) if the user has the Pro Engine installed.

Returns None if the user doesn't have the Pro Engine installed.
"""
resp = await run_semgrep(["--pro", "--version"])

# wait for the command to exit so the exit code is set
await resp.communicate()

# The user doesn't seem to have the Pro Engine installed.
# That's fine, let's just run the free engine, without the
# `semgrep mcp` backend.
Expand All @@ -222,7 +228,7 @@ async def run_semgrep_daemon() -> SemgrepContext | None:
return None
else:
process = await run_semgrep(["mcp", "--pro"])
return SemgrepContext(process=process)
return SemgrepContext(process=process, top_level_span=top_level_span)


async def run_semgrep_output(args: list[str]) -> str:
Expand Down
19 changes: 11 additions & 8 deletions src/semgrep_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
set_semgrep_executable,
)
from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput
from utilities.tracing import start_tracing, with_span

# ---------------------------------------------------------------------------------
# Constants
Expand Down Expand Up @@ -289,15 +290,16 @@ def remove_temp_dir_from_results(results: SemgrepScanResult, temp_dir: str) -> N
@asynccontextmanager
async def server_lifespan(_server: FastMCP) -> AsyncIterator[SemgrepContext | None]:
"""Manage server startup and shutdown lifecycle."""
# Initialize resources on startup
# Initialize resources on startup with tracing
# MCP requires Pro Engine
context = await run_semgrep_daemon()
with start_tracing("mcp-python-server") as span:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we want to call this here. Similarly to the other PR, if this start_tracing is called here, we will have a single span used for every request sent to the server. The server_lifespan is called only once, and we really want to call start_as_current_span when a new request is received.

That being said, since FastMCP is based on FastAPI, I wonder if it'd be enough to follow these docs 🤔

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Flavio, could you confirm your concern with this behavior? My impression (which I have confirmed via local testing) is that the server_lifespan manager is invoked once per session registered to the MCP. Meaning, if we run one server, we could service several clients who want to use the MCP, for instance if we had numerous editors open, or if we were in mcp.semgrep.ai and many users wanted to use the MCP.

In which case, my understanding is this code will cause one top-level span to be registered for each user's session, and each request of semgrep_scan_rpc therein will also produce a span.

I personally believe this to be the desirable behavior--do you disagree?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmh, that may not be what we want. Thikning outloud here.

The way tracing of HTTP requests is normally done is that we would have a trace per request and multiple spans as part of that trace. That way, it is possible to analyze traces independently.

If we create a span per user session, that means all subsequent calls (usages of MCP/tools/etc) for that session will be under the same trace. This may make things like comparing multiple calls to security_check, as they will all belong to the same trace.

By creating a trace per request (tool call, etc), we can forward that trace.id to semgrep CLI and have semgrep's create spans under the same trace.

TL;DR: What I'm proposing is that we should not have a root span per user session, but rather per request.

Does it make sense? Happy to discuss this further

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense. but i think that having a top-level span gives us more context when analyzing the calls. with the newly added semgrep mcp command, we pull and cache rules per each session. so all the calls in the same session will be using the same set of cached rules. we might also add more features in the future where the context of the session will matter for each call. so, this case might be slightly different from HTTP requests in the sense that each request is not necessarily independent.

i also think that we won't necessarily lose anything (in fact, i think we gain info) by nesting the calls under a parent span. but happy to know what you think about this!

context = await run_semgrep_daemon(top_level_span=span)

try:
yield context
finally:
if context is not None:
context.shutdown()
try:
yield context
finally:
if context is not None:
context.shutdown()


# Create a fast MCP server
Expand Down Expand Up @@ -688,7 +690,8 @@ async def semgrep_scan_rpc(
temp_dir = None
try:
# TODO: perhaps should return more interpretable results?
cli_output = await run_semgrep_via_rpc(context, code_files)
with with_span(context.top_level_span, "semgrep_scan_rpc"):
cli_output = await run_semgrep_via_rpc(context, code_files)
return cli_output
except McpError as e:
raise e
Expand Down
80 changes: 80 additions & 0 deletions src/utilities/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python3

import logging
import os
from collections.abc import Generator
from contextlib import contextmanager

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# coupling: these need to be kept in sync with semgrep-proprietary/tracing.py
DEFAULT_TRACE_ENDPOINT = "https://telemetry.semgrep.dev/v1/traces"
DEFAULT_DEV_ENDPOINT = "https://telemetry.dev2.semgrep.dev/v1/traces"
DEFAULT_LOCAL_ENDPOINT = "http://localhost:4318/v1/traces"

MCP_SERVICE_NAME = "mcp"


def get_trace_endpoint() -> tuple[str, str]:
"""Get the appropriate trace endpoint based on environment."""
env = os.environ.get("ENVIRONMENT", "dev").lower()

if env == "prod":
return (DEFAULT_TRACE_ENDPOINT, "prod")
elif env == "local":
return (DEFAULT_LOCAL_ENDPOINT, "local")
else:
return (DEFAULT_DEV_ENDPOINT, "dev")
Comment thread
liukatkat marked this conversation as resolved.


@contextmanager
def start_tracing(name: str) -> Generator[trace.Span, None, None]:
"""Initialize OpenTelemetry tracing."""
(endpoint, env) = get_trace_endpoint()

# Create resource with basic attributes
resource = Resource.create(
{
SERVICE_NAME: MCP_SERVICE_NAME,
DEPLOYMENT_ENVIRONMENT: env,
}
)
# Create tracer provider
provider = TracerProvider(resource=resource)

# Create OTLP exporter
exporter = OTLPSpanExporter(endpoint=endpoint)

# Create span processor
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)

# Set the global tracer provider
trace.set_tracer_provider(provider)

# Get tracer instance
tracer = trace.get_tracer(MCP_SERVICE_NAME)

with tracer.start_as_current_span(name) as span:
trace_id = trace.format_trace_id(span.get_span_context().trace_id)

logging.info("Tracing initialized")
logging.info(f"Tracing initialized with trace ID: {trace_id}")

yield span


@contextmanager
def with_span(
parent_span: trace.Span,
name: str,
) -> Generator[trace.Span, None, None]:
tracer = trace.get_tracer(MCP_SERVICE_NAME)

context = trace.set_span_in_context(parent_span)
with tracer.start_span(name, context=context) as span:
yield span
4 changes: 4 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading