From 87d14a256425ba3ed9752d864a192b38e29520dc Mon Sep 17 00:00:00 2001 From: Eric Simmerman Date: Wed, 25 Mar 2026 21:04:00 -0400 Subject: [PATCH 1/3] Replace local MCP tool implementations with FastMCP proxy to remote server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CLI's MCP server previously maintained 35 tool implementations that duplicated the Qualytics API. Since every deployment exposes its own full MCP server at {base_url}/api/mcp, the CLI now proxies that remote server over stdio transport — combining the platform's always-current native tool surface with easy local connectivity for desktop LLM clients (Claude Code, Cursor, etc.). Changes: - mcp/server.py: stripped from 828 → 62 lines; only auth_status() remains as a local tool (reads ~/.qualytics/config.yaml, not the API) - cli/mcp_cmd.py: builds a FastMCPProxy via create_proxy() backed by a StreamableHttpTransport pointed at {config_url}/mcp with bearer token auth and configurable SSL verification; auth_status injected via proxy.add_tool() - cli/auth.py: added full URL field to auth status output (was hostname-only) - pyproject.toml + uv.lock: added httpx>=0.27.0 as explicit dependency - tests/test_mcp.py: removed 34 now-remote tool tests; added proxy URL/auth construction test and unauthenticated exit path test Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 1 + qualytics/cli/auth.py | 1 + qualytics/cli/mcp_cmd.py | 36 +- qualytics/mcp/server.py | 764 +-------------------------------------- tests/test_mcp.py | 306 ++-------------- uv.lock | 2 + 6 files changed, 77 insertions(+), 1033 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 78e9b7e..c81113e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "urllib3>=2.0.0", "openpyxl>=3.1.0", "fastmcp>=3.0.2", + "httpx>=0.27.0", ] [project.urls] diff --git a/qualytics/cli/auth.py b/qualytics/cli/auth.py index c53eae6..5b95943 100644 --- a/qualytics/cli/auth.py +++ b/qualytics/cli/auth.py @@ -184,6 +184,7 @@ def auth_status(): ssl_label = "[green]enabled[/green]" if ssl_verify else "[yellow]disabled[/yellow]" print(f"[bold]{host}[/bold]") + print(f" URL: {url}") print(f" Status: {status_icon}") print(f" Token: {masked_token}") if expiry_line: diff --git a/qualytics/cli/mcp_cmd.py b/qualytics/cli/mcp_cmd.py index 6d7afd0..1ab2a52 100644 --- a/qualytics/cli/mcp_cmd.py +++ b/qualytics/cli/mcp_cmd.py @@ -1,8 +1,14 @@ """CLI command to start the Qualytics MCP server.""" +import httpx import typer +from fastmcp.client import Client +from fastmcp.client.transports import StreamableHttpTransport +from fastmcp.server import create_proxy from . import add_suggestion_callback +from ..config import load_config, CONFIG_PATH +from ..mcp.server import auth_status mcp_app = typer.Typer(name="mcp", help="Start an MCP server for LLM integrations") add_suggestion_callback(mcp_app, "mcp") @@ -30,6 +36,10 @@ def mcp_serve( ): """Start the Qualytics MCP server for Claude Code, Cursor, and other LLM tools. + Proxies the full Qualytics MCP tool set from the remote deployment over a + local stdio transport, combining the platform's native tool surface with + easy connectivity for desktop LLM clients. + STDIO transport (default) is used by Claude Code and Cursor. Streamable-HTTP transport is available for network-accessible deployments. @@ -44,9 +54,29 @@ def mcp_serve( } } """ - from ..mcp.server import mcp + config = load_config() + if config is None: + raise typer.BadParameter( + f"Not authenticated. Run 'qualytics auth login' or " + f"'qualytics auth init'. Config expected at {CONFIG_PATH}", + param_hint="authentication", + ) + + url = config.get("url", "").rstrip("/") + token = config.get("token", "") + ssl_verify = config.get("ssl_verify", True) + + remote_transport = StreamableHttpTransport( + url=f"{url}/mcp", + auth=token, + httpx_client_factory=lambda **kwargs: httpx.AsyncClient( + verify=ssl_verify, **kwargs + ), + ) + proxy = create_proxy(Client(remote_transport), name="Qualytics") + proxy.add_tool(auth_status) if transport in ("streamable-http", "http", "sse"): - mcp.run(transport="streamable-http", host=host, port=port) + proxy.run(transport="streamable-http", host=host, port=port) else: - mcp.run(transport="stdio") + proxy.run(transport="stdio") diff --git a/qualytics/mcp/server.py b/qualytics/mcp/server.py index 0f86633..c7c9f07 100644 --- a/qualytics/mcp/server.py +++ b/qualytics/mcp/server.py @@ -1,4 +1,4 @@ -"""Qualytics MCP server — exposes CLI operations as structured tools for LLMs.""" +"""Qualytics MCP — local tools exposed alongside the remote proxy.""" from __future__ import annotations @@ -6,49 +6,11 @@ from urllib.parse import urlparse import jwt -from fastmcp import FastMCP from fastmcp.exceptions import ToolError -from ..api.client import get_client, QualyticsAPIError from ..config import load_config, CONFIG_PATH -mcp = FastMCP( - name="Qualytics", - instructions=( - "Qualytics is a data quality platform. Use these tools to manage " - "connections, datastores, containers, quality checks, anomalies, " - "and operations. Always call auth_status first to verify " - "the CLI is configured before calling other tools." - ), -) - -# ── helpers ────────────────────────────────────────────────────────────── - - -def _client(): - """Get an authenticated QualyticsClient, raising ToolError on failure.""" - try: - return get_client() - except SystemExit: - raise ToolError( - "Not authenticated. Run 'qualytics auth login --url ' " - "or 'qualytics auth init --url --token ' first." - ) - - -def _api_call(fn, *args, **kwargs): - """Call an API function, converting QualyticsAPIError to ToolError.""" - try: - return fn(*args, **kwargs) - except QualyticsAPIError as e: - raise ToolError(f"API error {e.status_code}: {e.message}") - - -# ── auth ───────────────────────────────────────────────────────────────── - - -@mcp.tool def auth_status() -> dict: """Show current Qualytics CLI authentication status. @@ -101,727 +63,3 @@ def auth_status() -> dict: result["token_decode_error"] = True return result - - -# ── connections ────────────────────────────────────────────────────────── - - -@mcp.tool -def list_connections( - name: str | None = None, - type: str | None = None, -) -> list[dict]: - """List all connections, optionally filtered by name or type.""" - from ..api.connections import list_all_connections - - client = _client() - type_list = [t.strip() for t in type.split(",")] if type else None - return _api_call(list_all_connections, client, name=name, connection_type=type_list) - - -@mcp.tool -def get_connection( - id: int | None = None, - name: str | None = None, -) -> dict: - """Get a connection by ID or name. Provide exactly one.""" - from ..services.connections import get_connection_by - - if id is None and name is None: - raise ToolError("Provide either 'id' or 'name'.") - client = _client() - result = _api_call( - get_connection_by, client, connection_id=id, connection_name=name - ) - if result is None: - raise ToolError(f"Connection not found: id={id}, name={name}") - return result - - -@mcp.tool -def create_connection( - type: str, - name: str, - host: str | None = None, - port: int | None = None, - username: str | None = None, - password: str | None = None, - uri: str | None = None, - database: str | None = None, - parameters: dict | None = None, -) -> dict: - """Create a new connection. Type examples: postgresql, snowflake, bigquery, mysql, etc.""" - from ..api.connections import create_connection as api_create - from ..services.connections import build_create_connection_payload - - payload = build_create_connection_payload( - type, - name=name, - host=host, - port=port, - username=username, - password=password, - uri=uri, - parameters=parameters, - ) - client = _client() - return _api_call(api_create, client, payload) - - -@mcp.tool -def delete_connection(id: int) -> dict: - """Delete a connection by ID.""" - from ..api.connections import delete_connection as api_delete - - client = _client() - return _api_call(api_delete, client, id) - - -@mcp.tool -def test_connection(id: int) -> dict: - """Test connectivity for an existing connection.""" - from ..api.connections import test_connection as api_test - - client = _client() - return _api_call(api_test, client, id) - - -# ── datastores ─────────────────────────────────────────────────────────── - - -@mcp.tool -def list_datastores( - name: str | None = None, - type: str | None = None, - tag: str | None = None, -) -> list[dict]: - """List all datastores, optionally filtered by name, type, or tag.""" - from ..api.datastores import list_all_datastores - - client = _client() - type_list = [t.strip() for t in type.split(",")] if type else None - return _api_call( - list_all_datastores, - client, - name=name, - datastore_type=type_list, - tag=tag, - ) - - -@mcp.tool -def get_datastore( - id: int | None = None, - name: str | None = None, -) -> dict: - """Get a datastore by ID or name. Provide exactly one.""" - from ..services.datastores import get_datastore_by - - if id is None and name is None: - raise ToolError("Provide either 'id' or 'name'.") - client = _client() - result = _api_call(get_datastore_by, client, datastore_id=id, datastore_name=name) - if result is None: - raise ToolError(f"Datastore not found: id={id}, name={name}") - return result - - -@mcp.tool -def create_datastore( - name: str, - connection_id: int, - database: str, - schema: str, - tags: list[str] | None = None, - teams: list[str] | None = None, - trigger_sync: bool = True, -) -> dict: - """Create a new datastore linked to an existing connection.""" - from ..api.datastores import create_datastore as api_create - from ..services.datastores import build_create_datastore_payload - - payload = build_create_datastore_payload( - name=name, - connection_id=connection_id, - database=database, - schema=schema, - tags=tags, - teams=teams, - trigger_catalog=trigger_sync, - ) - client = _client() - return _api_call(api_create, client, payload) - - -@mcp.tool -def delete_datastore(id: int) -> dict: - """Delete a datastore by ID.""" - from ..api.datastores import delete_datastore as api_delete - - client = _client() - return _api_call(api_delete, client, id) - - -@mcp.tool -def verify_datastore_connection(datastore_id: int) -> dict: - """Verify the database connection for a datastore. Returns {connected, message}.""" - from ..api.datastores import verify_connection - - client = _client() - return _api_call(verify_connection, client, datastore_id) - - -@mcp.tool -def link_enrichment(datastore_id: int, enrichment_id: int) -> dict: - """Link an enrichment datastore to a source datastore.""" - from ..api.datastores import connect_enrichment - - client = _client() - return _api_call(connect_enrichment, client, datastore_id, enrichment_id) - - -@mcp.tool -def unlink_enrichment(datastore_id: int) -> dict: - """Unlink the enrichment datastore from a source datastore.""" - from ..api.datastores import disconnect_enrichment - - client = _client() - return _api_call(disconnect_enrichment, client, datastore_id) - - -# ── containers ─────────────────────────────────────────────────────────── - - -@mcp.tool -def list_containers( - datastore_id: int, - type: str | None = None, - name: str | None = None, - tag: str | None = None, - search: str | None = None, -) -> list[dict]: - """List containers for a datastore. Filter by type (table, view, computed_table, etc.), name, tag, or search string.""" - from ..api.containers import list_all_containers - - client = _client() - type_list = [t.strip() for t in type.split(",")] if type else None - tag_list = [tag] if tag else None - return _api_call( - list_all_containers, - client, - datastore=[datastore_id], - container_type=type_list, - name=name, - tag=tag_list, - search=search, - ) - - -@mcp.tool -def get_container(id: int) -> dict: - """Get a container by ID.""" - from ..api.containers import get_container as api_get - - client = _client() - return _api_call(api_get, client, id) - - -@mcp.tool -def get_field_profiles(container_id: int) -> dict: - """Get field profiles (column metadata) for a container.""" - from ..api.containers import get_field_profiles as api_get_profiles - - client = _client() - return _api_call(api_get_profiles, client, container_id) - - -@mcp.tool -def create_container( - container_type: str, - name: str, - datastore_id: int | None = None, - query: str | None = None, - source_container_id: int | None = None, - select_clause: str | None = None, - where_clause: str | None = None, - group_by_clause: str | None = None, - left_container_id: int | None = None, - right_container_id: int | None = None, - left_key_field: str | None = None, - right_key_field: str | None = None, - join_type: str | None = None, - description: str | None = None, - tags: list[str] | None = None, -) -> dict: - """Create a computed container. - - container_type must be one of: computed_table, computed_file, computed_join. - - computed_table: requires datastore_id, name, query - - computed_file: requires datastore_id, name, source_container_id, select_clause - - computed_join: requires name, left_container_id, right_container_id, left_key_field, right_key_field, select_clause - """ - from ..api.containers import create_container as api_create - from ..services.containers import build_create_container_payload - - try: - payload = build_create_container_payload( - container_type, - datastore_id=datastore_id, - name=name, - query=query, - source_container_id=source_container_id, - select_clause=select_clause, - where_clause=where_clause, - group_by_clause=group_by_clause, - left_container_id=left_container_id, - right_container_id=right_container_id, - left_key_field=left_key_field, - right_key_field=right_key_field, - join_type=join_type, - description=description, - tags=tags, - ) - except ValueError as e: - raise ToolError(str(e)) - - client = _client() - return _api_call(api_create, client, payload) - - -@mcp.tool -def delete_container(id: int) -> dict: - """Delete a container by ID. Cascades to fields, checks, and anomalies.""" - from ..api.containers import delete_container as api_delete - - client = _client() - return _api_call(api_delete, client, id) - - -@mcp.tool -def validate_container( - container_type: str, - name: str = "validation_test", - datastore_id: int | None = None, - query: str | None = None, - source_container_id: int | None = None, - select_clause: str | None = None, - left_container_id: int | None = None, - right_container_id: int | None = None, - left_key_field: str | None = None, - right_key_field: str | None = None, - join_type: str | None = None, -) -> dict: - """Validate a computed container definition without creating it (dry-run).""" - from ..api.containers import validate_container as api_validate - from ..services.containers import build_create_container_payload - - try: - payload = build_create_container_payload( - container_type, - datastore_id=datastore_id, - name=name, - query=query, - source_container_id=source_container_id, - select_clause=select_clause, - left_container_id=left_container_id, - right_container_id=right_container_id, - left_key_field=left_key_field, - right_key_field=right_key_field, - join_type=join_type, - ) - except ValueError as e: - raise ToolError(str(e)) - - client = _client() - return _api_call(api_validate, client, payload) - - -# ── quality checks ─────────────────────────────────────────────────────── - - -@mcp.tool -def list_checks( - datastore_id: int, - container_id: int | None = None, - tag: str | None = None, - status: str | None = None, -) -> list[dict]: - """List quality checks for a datastore. Filter by container, tag, or status.""" - from ..api.quality_checks import list_all_quality_checks - - client = _client() - containers = [container_id] if container_id else None - tags = [tag] if tag else None - return _api_call( - list_all_quality_checks, - client, - datastore_id, - containers=containers, - tags=tags, - status=status, - ) - - -@mcp.tool -def get_check(check_id: int) -> dict: - """Get a quality check by ID.""" - from ..api.quality_checks import get_quality_check - - client = _client() - return _api_call(get_quality_check, client, check_id) - - -@mcp.tool -def create_check(payload: dict) -> dict: - """Create a quality check from a payload dict. - - Required fields: container_id, rule (rule type), fields (list of field names). - Optional: description, coverage, filter, properties, tags, status. - """ - from ..api.quality_checks import create_quality_check - - client = _client() - return _api_call(create_quality_check, client, payload) - - -@mcp.tool -def update_check(check_id: int, payload: dict) -> dict: - """Update a quality check. Provide the full updated check payload.""" - from ..api.quality_checks import update_quality_check - - client = _client() - return _api_call(update_quality_check, client, check_id, payload) - - -@mcp.tool -def delete_check(check_id: int) -> None: - """Delete (archive) a quality check by ID.""" - from ..api.quality_checks import delete_quality_check - - client = _client() - _api_call(delete_quality_check, client, check_id) - - -@mcp.tool -def export_checks(datastore_id: int, output_dir: str) -> dict: - """Export quality checks to a directory (one YAML per check, organized by container). - - Returns {exported: count, containers: count}. - """ - from ..api.quality_checks import list_all_quality_checks - from ..services.quality_checks import export_checks_to_directory - - client = _client() - checks = _api_call(list_all_quality_checks, client, datastore_id) - return export_checks_to_directory(checks, output_dir) - - -@mcp.tool -def import_checks( - datastore_id: int, - input_dir: str, - dry_run: bool = False, -) -> dict: - """Import quality checks from a directory with upsert (create or update). - - Returns {created, updated, failed, errors}. - """ - from ..services.quality_checks import ( - import_checks_to_datastore, - load_checks_from_directory, - ) - - client = _client() - checks = load_checks_from_directory(input_dir) - return _api_call( - import_checks_to_datastore, client, datastore_id, checks, dry_run=dry_run - ) - - -# ── anomalies ──────────────────────────────────────────────────────────── - - -@mcp.tool -def list_anomalies( - datastore_id: int | None = None, - container_id: int | None = None, - check_id: int | None = None, - status: str | None = None, - type: str | None = None, - tag: str | None = None, - start_date: str | None = None, - end_date: str | None = None, -) -> list[dict]: - """List anomalies with optional filters. - - Status: Active, Acknowledged, Resolved, Invalid, Duplicate, Discarded. - Type: shape, record (anomaly_type). - """ - from ..api.anomalies import list_all_anomalies - - client = _client() - tag_list = [tag] if tag else None - return _api_call( - list_all_anomalies, - client, - datastore=datastore_id, - container=container_id, - quality_check=check_id, - status=status, - anomaly_type=type, - tag=tag_list, - start_date=start_date, - end_date=end_date, - ) - - -@mcp.tool -def get_anomaly(id: int) -> dict: - """Get a single anomaly by ID.""" - from ..api.anomalies import get_anomaly as api_get - - client = _client() - return _api_call(api_get, client, id) - - -@mcp.tool -def update_anomaly( - id: int, - status: str, -) -> dict: - """Update anomaly status. Status must be Active or Acknowledged.""" - from ..api.anomalies import update_anomaly as api_update - - if status not in ("Active", "Acknowledged"): - raise ToolError( - f"Status must be 'Active' or 'Acknowledged', got '{status}'. " - "Use archive_anomaly for Resolved/Invalid/Duplicate/Discarded." - ) - client = _client() - return _api_call(api_update, client, id, {"status": status}) - - -@mcp.tool -def archive_anomaly( - id: int, - status: str = "Resolved", -) -> None: - """Archive (soft-delete) an anomaly. Status: Resolved, Invalid, Duplicate, Discarded.""" - from ..api.anomalies import delete_anomaly as api_delete - - valid = {"Resolved", "Invalid", "Duplicate", "Discarded"} - if status not in valid: - raise ToolError(f"Status must be one of {valid}, got '{status}'.") - client = _client() - _api_call(api_delete, client, id, archive=True, status=status) - - -@mcp.tool -def delete_anomaly(id: int) -> None: - """Permanently delete an anomaly (hard delete, cannot be undone).""" - from ..api.anomalies import delete_anomaly as api_delete - - client = _client() - _api_call(api_delete, client, id, archive=False) - - -# ── operations ─────────────────────────────────────────────────────────── - - -@mcp.tool -def run_sync( - datastore_ids: list[int], - prune: bool = False, - recreate: bool = False, -) -> list[dict]: - """Trigger a sync operation to discover containers in datastores. - - Returns the operation details for each datastore. Use get_operation to check progress. - """ - from ..api.operations import run_operation - - client = _client() - results = [] - for ds_id in datastore_ids: - payload = { - "type": "sync", - "datastore_id": ds_id, - "prune": prune, - "recreate": recreate, - } - results.append(_api_call(run_operation, client, payload)) - return results - - -@mcp.tool -def run_profile( - datastore_ids: list[int], - container_names: list[str] | None = None, - container_tags: list[str] | None = None, - max_records_analyzed_per_partition: int | None = None, -) -> list[dict]: - """Trigger a profile operation to infer quality checks. - - Returns the operation details for each datastore. Use get_operation to check progress. - """ - from ..api.operations import run_operation - - client = _client() - results = [] - for ds_id in datastore_ids: - payload: dict = { - "type": "profile", - "datastore_id": ds_id, - } - if container_names: - payload["container_names"] = container_names - if container_tags: - payload["container_tags"] = container_tags - if max_records_analyzed_per_partition is not None: - payload["max_records_analyzed_per_partition"] = ( - max_records_analyzed_per_partition - ) - results.append(_api_call(run_operation, client, payload)) - return results - - -@mcp.tool -def run_scan( - datastore_ids: list[int], - container_names: list[str] | None = None, - container_tags: list[str] | None = None, - incremental: bool | None = None, - max_records_analyzed_per_partition: int | None = None, -) -> list[dict]: - """Trigger a scan operation to detect anomalies. - - Returns the operation details for each datastore. Use get_operation to check progress. - """ - from ..api.operations import run_operation - - client = _client() - results = [] - for ds_id in datastore_ids: - payload: dict = { - "type": "scan", - "datastore_id": ds_id, - } - if container_names: - payload["container_names"] = container_names - if container_tags: - payload["container_tags"] = container_tags - if incremental is not None: - payload["incremental"] = incremental - if max_records_analyzed_per_partition is not None: - payload["max_records_analyzed_per_partition"] = ( - max_records_analyzed_per_partition - ) - results.append(_api_call(run_operation, client, payload)) - return results - - -@mcp.tool -def run_materialize( - datastore_ids: list[int], - container_names: list[str] | None = None, - container_tags: list[str] | None = None, -) -> list[dict]: - """Trigger a materialize operation for computed containers. - - Returns the operation details for each datastore. Use get_operation to check progress. - """ - from ..api.operations import run_operation - - client = _client() - results = [] - for ds_id in datastore_ids: - payload: dict = { - "type": "materialize", - "datastore_id": ds_id, - } - if container_names: - payload["container_names"] = container_names - if container_tags: - payload["container_tags"] = container_tags - results.append(_api_call(run_operation, client, payload)) - return results - - -@mcp.tool -def get_operation(operation_id: int) -> dict: - """Get operation details including progress counters.""" - from ..api.operations import get_operation as api_get - - client = _client() - return _api_call(api_get, client, operation_id) - - -@mcp.tool -def list_operations( - datastore_id: int | None = None, - type: str | None = None, - status: str | None = None, -) -> list[dict]: - """List operations, optionally filtered by datastore, type, or result status.""" - from ..api.operations import list_all_operations - - client = _client() - ds_list = [datastore_id] if datastore_id else None - result_list = [status] if status else None - return _api_call( - list_all_operations, - client, - datastore=ds_list, - operation_type=type, - result=result_list, - ) - - -@mcp.tool -def abort_operation(operation_id: int) -> dict: - """Abort a running operation (best-effort).""" - from ..api.operations import abort_operation as api_abort - - client = _client() - return _api_call(api_abort, client, operation_id) - - -# ── config export/import ───────────────────────────────────────────────── - - -@mcp.tool -def export_config( - datastore_ids: list[int], - output_dir: str = "qualytics-export", - include: list[str] | None = None, -) -> dict: - """Export Qualytics configuration as hierarchical YAML for git tracking. - - Exports connections, datastores, containers, and checks. - Use 'include' to limit: ["connections", "datastores", "containers", "checks"]. - """ - from ..services.export_import import export_config as svc_export - - client = _client() - include_set = set(include) if include else None - return _api_call(svc_export, client, datastore_ids, output_dir, include=include_set) - - -@mcp.tool -def import_config( - input_dir: str, - dry_run: bool = False, - include: list[str] | None = None, -) -> dict: - """Import Qualytics configuration from a YAML directory with upsert. - - Follows dependency order: connections → datastores → containers → checks. - Use dry_run=True to preview without making changes. - """ - from ..services.export_import import import_config as svc_import - - client = _client() - include_set = set(include) if include else None - return _api_call( - svc_import, client, input_dir, dry_run=dry_run, include=include_set - ) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 2961041..2047bb6 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -1,4 +1,4 @@ -"""Tests for the Qualytics MCP server tools.""" +"""Tests for the Qualytics MCP server.""" import time from unittest.mock import patch, MagicMock @@ -7,81 +7,18 @@ import pytest from fastmcp.exceptions import ToolError -from qualytics.mcp.server import ( - mcp, - _client, - _api_call, - auth_status, - list_connections, - get_connection, - list_datastores, - get_datastore, - list_containers, - get_container, - list_checks, - get_check, - list_anomalies, - get_anomaly, - update_anomaly, - archive_anomaly, - get_operation, - list_operations, -) -from qualytics.api.client import QualyticsAPIError +from qualytics.mcp.server import auth_status -# ── server instance tests ──────────────────────────────────────────────── +# ── server module tests ────────────────────────────────────────────────── class TestMCPServer: - """Tests for the MCP server instance.""" + """Tests for the MCP server module.""" - def test_server_exists(self): - """Test that the MCP server is created.""" - assert mcp is not None - assert mcp.name == "Qualytics" - - def test_server_has_tools(self): - """Test that tools are registered on the server.""" - # The server should have tool functions registered - assert mcp is not None - # Verify at least one known tool function is importable + def test_auth_status_is_callable(self): + """Test that auth_status is exported as a callable.""" assert callable(auth_status) - assert callable(list_datastores) - - -# ── helper tests ───────────────────────────────────────────────────────── - - -class TestHelpers: - """Tests for _client and _api_call helpers.""" - - @patch("qualytics.mcp.server.get_client") - def test_client_returns_client(self, mock_get): - """Test that _client returns the client when configured.""" - mock_get.return_value = MagicMock() - result = _client() - assert result is not None - - @patch("qualytics.mcp.server.get_client", side_effect=SystemExit(1)) - def test_client_raises_tool_error_on_system_exit(self, mock_get): - """Test that _client converts SystemExit to ToolError.""" - with pytest.raises(ToolError, match="Not authenticated"): - _client() - - def test_api_call_success(self): - """Test that _api_call passes through successful results.""" - result = _api_call(lambda: {"id": 1}) - assert result == {"id": 1} - - def test_api_call_converts_api_error(self): - """Test that _api_call converts QualyticsAPIError to ToolError.""" - - def failing_fn(): - raise QualyticsAPIError(404, "Not found", "http://example.com") - - with pytest.raises(ToolError, match="API error 404"): - _api_call(failing_fn) # ── auth tool tests ────────────────────────────────────────────────────── @@ -134,204 +71,7 @@ def test_expired_token_detected(self): assert result["token_expired"] is True -# ── connection tool tests ──────────────────────────────────────────────── - - -class TestConnectionTools: - """Tests for connection MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_list_connections(self, mock_get_client): - """Test list_connections tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "name": "pg"}] - - with patch( - "qualytics.api.connections.list_all_connections", return_value=expected - ): - result = list_connections() - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_get_connection_requires_id_or_name(self, mock_get_client): - """Test that get_connection requires either id or name.""" - with pytest.raises(ToolError, match="Provide either"): - get_connection() - - @patch("qualytics.mcp.server.get_client") - def test_get_connection_not_found(self, mock_get_client): - """Test that get_connection raises error when not found.""" - mock_get_client.return_value = MagicMock() - with patch( - "qualytics.services.connections.get_connection_by", return_value=None - ): - with pytest.raises(ToolError, match="Connection not found"): - get_connection(name="nonexistent") - - -# ── datastore tool tests ──────────────────────────────────────────────── - - -class TestDatastoreTools: - """Tests for datastore MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_list_datastores(self, mock_get_client): - """Test list_datastores tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "name": "warehouse"}] - - with patch( - "qualytics.api.datastores.list_all_datastores", return_value=expected - ): - result = list_datastores() - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_get_datastore_requires_id_or_name(self, mock_get_client): - """Test that get_datastore requires either id or name.""" - with pytest.raises(ToolError, match="Provide either"): - get_datastore() - - -# ── container tool tests ───────────────────────────────────────────────── - - -class TestContainerTools: - """Tests for container MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_list_containers(self, mock_get_client): - """Test list_containers tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "name": "orders", "container_type": "table"}] - - with patch( - "qualytics.api.containers.list_all_containers", return_value=expected - ): - result = list_containers(datastore_id=1) - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_get_container(self, mock_get_client): - """Test get_container tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = {"id": 42, "name": "orders"} - - with patch("qualytics.api.containers.get_container", return_value=expected): - result = get_container(id=42) - assert result == expected - - -# ── quality check tool tests ───────────────────────────────────────────── - - -class TestCheckTools: - """Tests for quality check MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_list_checks(self, mock_get_client): - """Test list_checks tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "rule": "notNull"}] - - with patch( - "qualytics.api.quality_checks.list_all_quality_checks", - return_value=expected, - ): - result = list_checks(datastore_id=1) - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_get_check(self, mock_get_client): - """Test get_check tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = {"id": 10, "rule": "between"} - - with patch( - "qualytics.api.quality_checks.get_quality_check", return_value=expected - ): - result = get_check(check_id=10) - assert result == expected - - -# ── anomaly tool tests ─────────────────────────────────────────────────── - - -class TestAnomalyTools: - """Tests for anomaly MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_list_anomalies(self, mock_get_client): - """Test list_anomalies tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "status": "Active"}] - - with patch("qualytics.api.anomalies.list_all_anomalies", return_value=expected): - result = list_anomalies(datastore_id=1) - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_get_anomaly(self, mock_get_client): - """Test get_anomaly tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = {"id": 5, "status": "Active"} - - with patch("qualytics.api.anomalies.get_anomaly", return_value=expected): - result = get_anomaly(id=5) - assert result == expected - - def test_update_anomaly_rejects_archive_status(self): - """Test that update_anomaly rejects archive statuses.""" - with pytest.raises(ToolError, match="Active.*Acknowledged"): - update_anomaly(id=1, status="Resolved") - - def test_archive_anomaly_rejects_open_status(self): - """Test that archive_anomaly rejects open statuses.""" - with pytest.raises(ToolError, match="must be one of"): - archive_anomaly(id=1, status="Active") - - -# ── operation tool tests ───────────────────────────────────────────────── - - -class TestOperationTools: - """Tests for operation MCP tools.""" - - @patch("qualytics.mcp.server.get_client") - def test_get_operation(self, mock_get_client): - """Test get_operation tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = {"id": 99, "type": "scan", "result": "success"} - - with patch("qualytics.api.operations.get_operation", return_value=expected): - result = get_operation(operation_id=99) - assert result == expected - - @patch("qualytics.mcp.server.get_client") - def test_list_operations(self, mock_get_client): - """Test list_operations tool.""" - mock_client = MagicMock() - mock_get_client.return_value = mock_client - expected = [{"id": 1, "type": "sync"}] - - with patch( - "qualytics.api.operations.list_all_operations", return_value=expected - ): - result = list_operations() - assert result == expected - - -# ── CLI command test ───────────────────────────────────────────────────── +# ── CLI command tests ───────────────────────────────────────────────────── class TestMCPCommand: @@ -353,3 +93,35 @@ def test_mcp_serve_help(self, cli_runner): assert result.exit_code == 0 assert "stdio" in result.output.lower() assert "claude" in result.output.lower() + + def test_mcp_serve_exits_when_not_authenticated(self, cli_runner): + """Test that mcp serve fails gracefully when no config exists.""" + from qualytics.qualytics import app + + with patch("qualytics.cli.mcp_cmd.load_config", return_value=None): + result = cli_runner.invoke(app, ["mcp", "serve"]) + assert result.exit_code != 0 + + def test_mcp_serve_builds_proxy_with_correct_url(self, cli_runner): + """Test that mcp serve constructs the proxy URL from config.""" + import jwt as _jwt + + token = _jwt.encode({"sub": "u"}, key="", algorithm="HS256") + config = {"url": "https://demo.qualytics.io/api", "token": token, "ssl_verify": True} + + with patch("qualytics.cli.mcp_cmd.load_config", return_value=config): + with patch("qualytics.cli.mcp_cmd.create_proxy") as mock_proxy: + with patch("qualytics.cli.mcp_cmd.Client"): + with patch("qualytics.cli.mcp_cmd.StreamableHttpTransport") as mock_transport: + mock_server = MagicMock() + mock_proxy.return_value = mock_server + + cli_runner.invoke( + __import__("qualytics.qualytics", fromlist=["app"]).app, + ["mcp", "serve"], + ) + + mock_transport.assert_called_once() + call_kwargs = mock_transport.call_args + assert call_kwargs.kwargs["url"] == "https://demo.qualytics.io/api/mcp" # config url already includes /api + assert call_kwargs.kwargs["auth"] == token diff --git a/uv.lock b/uv.lock index 66a73ab..503b0a7 100644 --- a/uv.lock +++ b/uv.lock @@ -1394,6 +1394,7 @@ dependencies = [ { name = "click" }, { name = "croniter" }, { name = "fastmcp" }, + { name = "httpx" }, { name = "openpyxl" }, { name = "pyjwt" }, { name = "python-dotenv" }, @@ -1419,6 +1420,7 @@ requires-dist = [ { name = "click", specifier = ">=8.0.0" }, { name = "croniter", specifier = ">=1.3.0" }, { name = "fastmcp", specifier = ">=3.0.2" }, + { name = "httpx", specifier = ">=0.27.0" }, { name = "openpyxl", specifier = ">=3.1.0" }, { name = "pyjwt", specifier = ">=2.6.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, From 57ce522ab1df985929b8f8368b7ee268e558c747 Mon Sep 17 00:00:00 2001 From: Eric Simmerman Date: Wed, 25 Mar 2026 21:11:49 -0400 Subject: [PATCH 2/3] Fix ruff formatting in tests/test_mcp.py Co-Authored-By: Claude Sonnet 4.6 --- tests/test_mcp.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 2047bb6..6742ebd 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -107,12 +107,18 @@ def test_mcp_serve_builds_proxy_with_correct_url(self, cli_runner): import jwt as _jwt token = _jwt.encode({"sub": "u"}, key="", algorithm="HS256") - config = {"url": "https://demo.qualytics.io/api", "token": token, "ssl_verify": True} + config = { + "url": "https://demo.qualytics.io/api", + "token": token, + "ssl_verify": True, + } with patch("qualytics.cli.mcp_cmd.load_config", return_value=config): with patch("qualytics.cli.mcp_cmd.create_proxy") as mock_proxy: with patch("qualytics.cli.mcp_cmd.Client"): - with patch("qualytics.cli.mcp_cmd.StreamableHttpTransport") as mock_transport: + with patch( + "qualytics.cli.mcp_cmd.StreamableHttpTransport" + ) as mock_transport: mock_server = MagicMock() mock_proxy.return_value = mock_server @@ -123,5 +129,8 @@ def test_mcp_serve_builds_proxy_with_correct_url(self, cli_runner): mock_transport.assert_called_once() call_kwargs = mock_transport.call_args - assert call_kwargs.kwargs["url"] == "https://demo.qualytics.io/api/mcp" # config url already includes /api + assert ( + call_kwargs.kwargs["url"] + == "https://demo.qualytics.io/api/mcp" + ) # config url already includes /api assert call_kwargs.kwargs["auth"] == token From b9b0014ed5a50773132819f0714482ae3e0f96ed Mon Sep 17 00:00:00 2001 From: josecsotomorales Date: Thu, 26 Mar 2026 13:43:45 -0400 Subject: [PATCH 3/3] Address pr review --- qualytics/cli/mcp_cmd.py | 7 +++++ tests/test_mcp.py | 57 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/qualytics/cli/mcp_cmd.py b/qualytics/cli/mcp_cmd.py index 1ab2a52..1259a30 100644 --- a/qualytics/cli/mcp_cmd.py +++ b/qualytics/cli/mcp_cmd.py @@ -66,6 +66,13 @@ def mcp_serve( token = config.get("token", "") ssl_verify = config.get("ssl_verify", True) + if not url or not token: + raise typer.BadParameter( + f"Config at {CONFIG_PATH} is missing 'url' or 'token'. " + f"Run 'qualytics auth login' to re-authenticate.", + param_hint="authentication", + ) + remote_transport = StreamableHttpTransport( url=f"{url}/mcp", auth=token, diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 6742ebd..74591e0 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -102,6 +102,28 @@ def test_mcp_serve_exits_when_not_authenticated(self, cli_runner): result = cli_runner.invoke(app, ["mcp", "serve"]) assert result.exit_code != 0 + def test_mcp_serve_exits_when_url_empty(self, cli_runner): + """Test that mcp serve fails when config url is empty.""" + from qualytics.qualytics import app + + config = {"url": "", "token": "some-token", "ssl_verify": True} + with patch("qualytics.cli.mcp_cmd.load_config", return_value=config): + result = cli_runner.invoke(app, ["mcp", "serve"]) + assert result.exit_code != 0 + + def test_mcp_serve_exits_when_token_empty(self, cli_runner): + """Test that mcp serve fails when config token is empty.""" + from qualytics.qualytics import app + + config = { + "url": "https://demo.qualytics.io/api", + "token": "", + "ssl_verify": True, + } + with patch("qualytics.cli.mcp_cmd.load_config", return_value=config): + result = cli_runner.invoke(app, ["mcp", "serve"]) + assert result.exit_code != 0 + def test_mcp_serve_builds_proxy_with_correct_url(self, cli_runner): """Test that mcp serve constructs the proxy URL from config.""" import jwt as _jwt @@ -134,3 +156,38 @@ def test_mcp_serve_builds_proxy_with_correct_url(self, cli_runner): == "https://demo.qualytics.io/api/mcp" ) # config url already includes /api assert call_kwargs.kwargs["auth"] == token + + def test_mcp_serve_forwards_ssl_verify_false(self, cli_runner): + """Test that ssl_verify=False is wired into the httpx client factory.""" + import jwt as _jwt + + token = _jwt.encode({"sub": "u"}, key="", algorithm="HS256") + config = { + "url": "https://demo.qualytics.io/api", + "token": token, + "ssl_verify": False, + } + + with patch("qualytics.cli.mcp_cmd.load_config", return_value=config): + with patch("qualytics.cli.mcp_cmd.create_proxy") as mock_proxy: + with patch("qualytics.cli.mcp_cmd.Client"): + with patch( + "qualytics.cli.mcp_cmd.StreamableHttpTransport" + ) as mock_transport: + mock_server = MagicMock() + mock_proxy.return_value = mock_server + + cli_runner.invoke( + __import__("qualytics.qualytics", fromlist=["app"]).app, + ["mcp", "serve"], + ) + + mock_transport.assert_called_once() + factory = mock_transport.call_args.kwargs[ + "httpx_client_factory" + ] + with patch( + "qualytics.cli.mcp_cmd.httpx.AsyncClient" + ) as mock_async: + factory() + mock_async.assert_called_once_with(verify=False)