Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.13.3-slim
FROM python:3.12-slim

WORKDIR /app

Expand Down
57 changes: 57 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
FastMCP web deployment entrypoint.

Exposes the ``mcp`` variable expected by FastMCP cloud / streamable-http
hosting (entrypoint ``app.py:mcp``).

Compatible with Prefect Horizon, FastMCP Cloud, and any ASGI-based
MCP hosting platform.

Environment variables (MASSIVE_API_KEY, etc.) are read at import time so the
hosted runtime can inject them via its secrets / env configuration.
"""

import logging
import os

from dotenv import load_dotenv

load_dotenv()

from mcp_massive.server import ( # noqa: E402
mass_mcp,
configure_credentials,
shutdown_http_client,
)

logger = logging.getLogger(__name__)

# Read configuration from environment
_massive_api_key = os.environ.get("MASSIVE_API_KEY", "")
_polygon_api_key = os.environ.get("POLYGON_API_KEY", "")

if not _massive_api_key and _polygon_api_key:
_massive_api_key = _polygon_api_key

_base_url = os.environ.get("MASSIVE_API_BASE_URL", "https://api.massive.com").rstrip("/")
_llms_txt_url = os.environ.get("MASSIVE_LLMS_TXT_URL")
_max_tables = int(os.environ["MASSIVE_MAX_TABLES"]) if os.environ.get("MASSIVE_MAX_TABLES") else None
_max_rows = int(os.environ["MASSIVE_MAX_ROWS"]) if os.environ.get("MASSIVE_MAX_ROWS") else None

configure_credentials(
_massive_api_key,
_base_url,
llms_txt_url=_llms_txt_url,
max_tables=_max_tables,
max_rows=_max_rows,
)

# SECURITY: Remove secret env vars from the process after capturing them,
# without nuking system vars needed by the hosted runtime.
for _secret_key in ("MASSIVE_API_KEY", "POLYGON_API_KEY"):
os.environ.pop(_secret_key, None)

logger.info("Massive MCP server configured for web deployment (streamable-http).")

# Expose as ``mcp`` — the name FastMCP cloud expects (entrypoint: app.py:mcp)
mcp = mass_mcp
11 changes: 9 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "mcp_massive"
version = "0.8.4"
description = "A MCP server project"
version = "1.0.3"
description = "Stocks, options & indices market data via Massive.com financial data API"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
Expand All @@ -15,10 +15,17 @@ dependencies = [
"pydantic>=2.12.0",
"sqlglot>=28.10.0",
]
license = "MIT"
keywords = ["finance", "market-data", "stocks", "forex", "crypto", "options", "mcp"]
[[project.authors]]
name = "Massive"
email = "support@massive.com"

[project.urls]
Homepage = "https://massive.com"
Repository = "https://github.com/massive-com/mcp_massive"
Issues = "https://github.com/massive-com/mcp_massive/issues"

[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"
Expand Down
53 changes: 19 additions & 34 deletions src/mcp_massive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,41 +86,26 @@ def main() -> None:
max_rows=max_rows,
)

# SECURITY: Strip the environment down to only what the OS and Python
# runtime need to function (networking, DNS, SSL, temp files). All
# application values (API key, base URL, etc.) have already been
# captured into module-level variables via configure_credentials().
# SECURITY: Remove secrets from environment variables so they cannot be
# exfiltrated via user-supplied code or SQL. All values the server needs
# (API key, base URL, etc.) have already been captured into module-level
# variables above via configure_credentials().
#
# On Windows, removing SYSTEMROOT breaks Winsock DNS resolution and
# SSL entirely; removing TEMP/TMP breaks tempfile. We keep a small
# whitelist rather than clearing everything.
_keep = {
# Windows networking / OS
"SYSTEMROOT",
"SYSTEMDRIVE",
"WINDIR",
"COMSPEC",
"TEMP",
"TMP",
"USERPROFILE",
"APPDATA",
"LOCALAPPDATA",
"PROGRAMDATA",
# Unix / shared
"HOME",
"TMPDIR",
"LANG",
"LC_ALL",
"LC_CTYPE",
"PATH",
# SSL / cert resolution
"SSL_CERT_FILE",
"SSL_CERT_DIR",
"REQUESTS_CA_BUNDLE",
"CURL_CA_BUNDLE",
# We use a keep-list instead of os.environ.clear() because hosted
# runtimes (FastMCP Cloud, Prefect Horizon, ASGI servers) rely on env
# vars like PATH, HOME, SSL_CERT_FILE, PYTHONPATH, LANG, etc. for basic
# operation. Clearing them breaks SSL connections, subprocess spawning,
# and the HTTP transport itself.
_SECRETS_TO_REMOVE = {
"MASSIVE_API_KEY",
"POLYGON_API_KEY",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
"DATABASE_URL",
"SECRET_KEY",
}
for key in list(os.environ):
if key not in _keep:
del os.environ[key]
for key in _SECRETS_TO_REMOVE:
os.environ.pop(key, None)

run(transport=transport)
3 changes: 1 addition & 2 deletions src/mcp_massive/index.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import os
import re
import logging
import ssl
Expand Down Expand Up @@ -408,7 +407,7 @@ async def _fetch_doc(

async def build_index(llms_txt_url: str | None = None) -> EndpointIndex:
if llms_txt_url is None:
llms_txt_url = os.environ.get("MASSIVE_LLMS_TXT_URL", _DEFAULT_LLMS_TXT_URL)
llms_txt_url = _DEFAULT_LLMS_TXT_URL
logger.info("Building endpoint index from llms.txt...")

ssl_ctx = ssl.create_default_context(cafile=certifi.where())
Expand Down
111 changes: 100 additions & 11 deletions src/mcp_massive/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import atexit
import json
import logging
Expand Down Expand Up @@ -97,15 +98,29 @@ def _get_base_url() -> str:
return _base_url


# Lazily created so no event-loop-affine object exists at import time.
_async_index_lock: asyncio.Lock | None = None


def _get_async_index_lock() -> asyncio.Lock:
    """Return the shared asyncio lock guarding index builds, creating it on first use."""
    global _async_index_lock
    lock = _async_index_lock
    if lock is None:
        lock = asyncio.Lock()
        _async_index_lock = lock
    return lock


async def _get_index() -> EndpointIndex:
global _index
with _init_lock:
# Fast path: index already built (no lock needed)
if _index is not None:
return _index
# Slow path: build index under an asyncio lock so we don't block the
# event loop the way threading.Lock would in an ASGI/web context.
async with _get_async_index_lock():
if _index is not None:
return _index
idx = await build_index(llms_txt_url=_llms_txt_url)
with _init_lock:
if _index is None:
_index = idx
idx = await build_index(llms_txt_url=_llms_txt_url)
_index = idx
return _index


Expand Down Expand Up @@ -134,14 +149,25 @@ def _get_http_client() -> httpx.AsyncClient:
global _http_client
with _init_lock:
if _http_client is None:
ssl_ctx = ssl.create_default_context(cafile=certifi.where())
_http_client = httpx.AsyncClient(timeout=30.0, verify=ssl_ctx)
atexit.register(_close_http_client)
_http_client = httpx.AsyncClient(timeout=30.0)
return _http_client


def _close_http_client() -> None:
"""Close the httpx client at process exit to release connections."""
async def shutdown_http_client() -> None:
    """Gracefully close the shared httpx client.

    Call this from an ASGI lifespan shutdown handler or atexit.
    Prefer calling from an async context (e.g. FastMCP lifespan).
    """
    global _http_client
    existing = _http_client
    if existing is None:
        return
    # Clear the module reference first so concurrent callers don't reuse a
    # client that is being closed.
    _http_client = None
    await existing.aclose()


def _close_http_client_sync() -> None:
"""Synchronous fallback for atexit — best-effort close."""
global _http_client
client = _http_client
if client is not None:
Expand Down Expand Up @@ -482,5 +508,68 @@ def run(transport: Literal["stdio", "sse", "streamable-http"] = "stdio") -> None
``_get_index()``) so the MCP protocol can start responding to
``initialize`` immediately without waiting for all doc pages to be
fetched.

If an asyncio event loop is already running (e.g. inside Prefect,
AWS Lambda, Jupyter, or similar frameworks), we run the server in a
new thread with its own event loop to avoid the
``RuntimeError: Already running asyncio in this thread`` that
``anyio.run()`` would otherwise raise.
"""
mass_mcp.run(transport)
import asyncio

# Register sync atexit fallback for CLI / stdio usage.
# For web deployments (streamable-http), prefer the async
# shutdown_http_client() called from an ASGI lifespan handler.
atexit.register(_close_http_client_sync)

try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

def _run_in_new_thread() -> None:
"""Run the async server in a dedicated thread with its own event loop."""
if transport == "stdio":
coro_fn = mass_mcp.run_stdio_async
elif transport == "sse":
coro_fn = mass_mcp.run_sse_async
elif transport == "streamable-http":
coro_fn = mass_mcp.run_streamable_http_async
else:
raise ValueError(f"Unknown transport: {transport}")

exception: BaseException | None = None

def _target() -> None:
nonlocal exception
try:
asyncio.run(coro_fn())
except BaseException as exc:
exception = exc

t = threading.Thread(target=_target, daemon=True)
t.start()
t.join()
if exception is not None:
raise exception

if loop is not None and loop.is_running():
# Already inside an event loop – run the async server in a
# dedicated thread with its own event loop.
_run_in_new_thread()
else:
try:
mass_mcp.run(transport)
except RuntimeError as exc:
if "running" in str(exc).lower() and "asyncio" in str(exc).lower():
# asyncio.get_running_loop() missed the loop (e.g. AWS Lambda,
# Prefect, or other frameworks where the loop exists but isn't
# detected from a synchronous call site). Fall back to the
# threaded approach.
logger.debug(
"Detected nested event loop after mass_mcp.run() failed; "
"retrying in a dedicated thread."
)
_run_in_new_thread()
else:
raise
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.