diff --git a/beeline/scripts/nlsql/README.md b/beeline/scripts/nlsql/README.md new file mode 100644 index 000000000000..b7592b631dcf --- /dev/null +++ b/beeline/scripts/nlsql/README.md @@ -0,0 +1,112 @@ +# Natural Language to SQL (nlsql) for Apache Hive Beeline + +Converts natural language queries to HiveQL using an LLM, then executes them automatically. + +## Prerequisites + +- Python 3.10+ +- An Anthropic API key or compatible LLM gateway +- A running Hive Metastore (Docker Compose or standalone) + +## Installation + +Install the nlsql Python dependencies: + +```bash +pip install -r $HIVE_HOME/scripts/nlsql/requirements.txt +``` + +For the stdio fallback (non-Docker), also install the MCP server dependencies — see +[Local (stdio)](#local-stdio--fallback) below. + +## Configuration + +Set the following environment variables before starting Beeline: + +```bash +export ANTHROPIC_BASE_URL="https://api.anthropic.com" # or your gateway URL +export ANTHROPIC_AUTH_TOKEN="your-token" # or use ANTHROPIC_API_KEY +export ANTHROPIC_MODEL="claude-sonnet-4-20250514" # optional, this is the default +``` + +| Variable | Default | Description | +|----------|---------|-------------| +| `ANTHROPIC_BASE_URL` | `https://api.anthropic.com` | API base URL (use this for proxies or gateways) | +| `ANTHROPIC_MODEL` | `claude-sonnet-4-20250514` | Model to use for SQL generation | +| `ANTHROPIC_AUTH_TOKEN` | _(none)_ | Auth token for the LLM API | +| `ANTHROPIC_API_KEY` | _(none)_ | Fallback if `ANTHROPIC_AUTH_TOKEN` is not set | + +## MCP Server Connection + +The nlsql agent connects to the Metastore MCP Server to discover schema. There are two modes: + +### Docker (SSE) — recommended + +When running Hive via Docker Compose, the MCP server runs as a sidecar process inside +the metastore container, exposed on port 3000. 
Set `MCP_SERVER_URL` to connect: + +```bash +export MCP_SERVER_URL="http://localhost:3000/sse" +``` + +No additional setup is needed — the Docker image includes the MCP server and its +Python dependencies. + +### Local (stdio) — fallback + +If `MCP_SERVER_URL` is not set, the agent falls back to spawning the MCP server as +a local subprocess via stdio. This requires the MCP server script available locally +and a running Metastore with the Iceberg REST Catalog endpoint enabled. + +The agent searches for `metastore_mcp_server.py` in the source tree at +`standalone-metastore/metastore-tools/mcp-server/` (relative to the repo root). + +Install the MCP server Python dependencies: +```bash +pip install -r path/to/mcp-server/requirements.txt +``` + +Configure the metastore REST catalog URL: +```bash +export METASTORE_REST_URL="http://localhost:9001/iceberg" +``` + +| Variable | Default | Description | +|----------|---------|-------------| +| `MCP_SERVER_URL` | _(none)_ | MCP server SSE endpoint (e.g. `http://localhost:3000/sse`) | +| `METASTORE_REST_URL` | `http://localhost:9001/iceberg` | HMS REST Catalog URL (only used in stdio fallback) | + +## Usage + +Connect to HiveServer2 via Beeline, then use the `!nlsql` command: + +``` +$ beeline -u jdbc:hive2://localhost:10000 + +beeline> !nlsql show me the top 10 orders by amount +beeline> !nlsql count all rows in the customers table +beeline> !nlsql which tables have more than 1 million rows +``` + +The agent will: +1. Discover the schema of the current database via the Metastore MCP Server +2. Send the schema and your natural language query to the LLM +3. Display the generated SQL +4. Execute it against HiveServer2 + +## How It Works + +``` +!nlsql + | + v +Beeline (Java) -- spawns Python subprocess + | + v +nlsql_agent.py + |-- MCP_SERVER_URL set? --> connects via SSE (http://host:3000/sse) + |-- not set? 
--> spawns metastore_mcp_server.py via stdio + | + v +metastore_mcp_server.py -- queries HMS Iceberg REST Catalog API +``` diff --git a/beeline/scripts/nlsql/nlsql_agent.py b/beeline/scripts/nlsql/nlsql_agent.py new file mode 100644 index 000000000000..586613f2685a --- /dev/null +++ b/beeline/scripts/nlsql/nlsql_agent.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Natural Language to SQL agent for Apache Hive Beeline. + +Connects to the Metastore MCP Server (via SSE or stdio) to discover schema, +then uses LangChain + Claude to generate HiveQL. +Prints only the generated SQL to stdout. 
+""" + +import argparse +import asyncio +import os +import sys +import re + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from mcp.client.sse import sse_client +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import HumanMessage, SystemMessage + + +async def _call_schema_tool(session, database): + """Call get_table_schema_sql on an initialized MCP session.""" + result = await session.call_tool( + 'get_table_schema_sql', + arguments={'database': database} + ) + schema_text = '' + for content in result.content: + if hasattr(content, 'text'): + schema_text += content.text + return schema_text + + +async def get_schema_via_sse(mcp_server_url, database): + """Connect to a running MCP server via SSE and get schema.""" + async with sse_client(mcp_server_url) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + return await _call_schema_tool(session, database) + + +async def get_schema_via_stdio(mcp_server_script, database): + """Spawn the Metastore MCP server as a subprocess and get schema.""" + metastore_url = os.environ.get('METASTORE_REST_URL', 'http://localhost:9001/iceberg') + + server_params = StdioServerParameters( + command='python3', + args=[mcp_server_script, '--transport', 'stdio'], + env={**os.environ, 'METASTORE_REST_URL': metastore_url}, + ) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + return await _call_schema_tool(session, database) + + +def generate_sql(schema_info, nl_query, database): + """Use LangChain + Claude to convert natural language to HiveQL.""" + base_url = os.environ.get('ANTHROPIC_BASE_URL', 'https://api.anthropic.com') + api_key = os.environ.get('ANTHROPIC_AUTH_TOKEN', + os.environ.get('ANTHROPIC_API_KEY', '')) + model = os.environ.get('ANTHROPIC_MODEL', 'claude-sonnet-4-20250514') + + llm = ChatAnthropic( + 
model=model, + anthropic_api_url=base_url, + anthropic_api_key=api_key, + max_tokens=1024, + temperature=0, + ) + + system_prompt = f"""You are a HiveQL expert. Convert the user's natural language request into a valid HiveQL query. + +RULES: +- Output ONLY the SQL query, nothing else. No markdown, no explanation, no code fences. +- Use HiveQL syntax (not MySQL or PostgreSQL). +- The current database is `{database}`. +- Use ONLY the tables and columns listed in the schema below. Do NOT reference tables that don't exist. +- If the request cannot be answered with the available schema, write the closest possible query using the actual tables. +- Always include a LIMIT clause for SELECT queries unless the user explicitly asks for all rows. + +SCHEMA: +{schema_info}""" + + messages = [ + SystemMessage(content=system_prompt), + HumanMessage(content=nl_query), + ] + + response = llm.invoke(messages) + sql = response.content.strip() + # Strip markdown code fences if the model wraps the output + sql = re.sub(r'^```(?:sql)?\s*', '', sql) + sql = re.sub(r'\s*```$', '', sql) + return sql.strip() + + +async def async_main(args): + mcp_server_url = os.environ.get('MCP_SERVER_URL', '') + + if mcp_server_url: + # Connect to remote MCP server via SSE + try: + schema_info = await get_schema_via_sse(mcp_server_url, args.database) + except Exception as e: + print(f'Warning: MCP SSE connection failed: {e}', file=sys.stderr) + schema_info = '(Schema not available)' + else: + # Fall back to spawning MCP server as subprocess + # In the source tree: beeline/scripts/nlsql/ -> standalone-metastore/metastore-tools/mcp-server/ + script_dir = os.path.dirname(os.path.abspath(__file__)) + source_root = os.path.join(script_dir, '..', '..', '..') + candidates = [ + os.path.join(source_root, 'standalone-metastore', 'metastore-tools', + 'mcp-server', 'metastore_mcp_server.py'), + ] + mcp_server_script = None + for candidate in candidates: + candidate = os.path.normpath(candidate) + if 
os.path.exists(candidate): + mcp_server_script = candidate + break + + if mcp_server_script is None: + print('Warning: MCP server not found in any known location', + file=sys.stderr) + schema_info = '(Schema not available - MCP server not found)' + else: + try: + schema_info = await get_schema_via_stdio(mcp_server_script, args.database) + except Exception as e: + print(f'Warning: MCP schema discovery failed: {e}', file=sys.stderr) + schema_info = '(Schema not available)' + + # Generate SQL + try: + sql = generate_sql(schema_info, args.query, args.database) + except Exception as e: + print(f'Error generating SQL: {e}', file=sys.stderr) + sys.exit(1) + + # Print ONLY the SQL to stdout + print(sql) + + +def main(): + parser = argparse.ArgumentParser(description='Natural Language to HiveQL') + parser.add_argument('--query', required=True, + help='Natural language query') + parser.add_argument('--database', default='default', + help='Current database name') + parser.add_argument('--metastore-url', + default=os.environ.get('METASTORE_REST_URL', + 'http://localhost:9001/iceberg'), + help='Metastore Iceberg REST Catalog URL (stdio fallback only)') + args = parser.parse_args() + + # Set env var so the MCP server picks it up (stdio fallback) + os.environ['METASTORE_REST_URL'] = args.metastore_url.rstrip('/') + + asyncio.run(async_main(args)) + + +if __name__ == '__main__': + main() diff --git a/beeline/scripts/nlsql/requirements.txt b/beeline/scripts/nlsql/requirements.txt new file mode 100644 index 000000000000..bae9f0405b0e --- /dev/null +++ b/beeline/scripts/nlsql/requirements.txt @@ -0,0 +1,3 @@ +langchain>=0.2.0 +langchain-anthropic>=0.1.0 +mcp>=1.0.0 diff --git a/beeline/src/java/org/apache/hive/beeline/BeeLine.java b/beeline/src/java/org/apache/hive/beeline/BeeLine.java index bef953409657..8489b7edc51c 100644 --- a/beeline/src/java/org/apache/hive/beeline/BeeLine.java +++ b/beeline/src/java/org/apache/hive/beeline/BeeLine.java @@ -304,6 +304,8 @@ public class BeeLine 
implements Closeable { new ReflectiveCommandHandler(this, new String[]{"addlocaldrivername"}, null), new ReflectiveCommandHandler(this, new String[]{"delimiter"}, + null), + new ReflectiveCommandHandler(this, new String[]{"nlsql"}, null) }; diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index e1364d1848de..64a23ca8aa79 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -1183,6 +1183,141 @@ public boolean sh(String line) { } } + public boolean nlsql(String line) { + if (line == null || line.length() == 0) { + return false; + } + + if (!line.startsWith("nlsql")) { + return false; + } + + String nlQuery = line.substring("nlsql".length()).trim(); + if (nlQuery.isEmpty()) { + return beeLine.error("Usage: !nlsql <natural language query>"); + } + + // Must be connected + if (beeLine.getDatabaseConnection() == null || beeLine.getDatabaseConnection().getUrl() == null) { + return beeLine.error("Not connected. Use !connect first."); + } + + // Locate the Python script + String hiveHome = System.getenv("HIVE_HOME"); + String scriptPath; + if (hiveHome != null) { + scriptPath = hiveHome + File.separator + "scripts" + File.separator + + "nlsql" + File.separator + "nlsql_agent.py"; + } else { + scriptPath = "scripts" + File.separator + "nlsql" + File.separator + + "nlsql_agent.py"; + } + + if (!new File(scriptPath).exists()) { + return beeLine.error("nlsql script not found at: " + scriptPath + + ". 
Set HIVE_HOME or ensure the script exists."); + } + + // ANSI colors + String RED = "\u001B[31m"; + String CYAN = "\u001B[36m"; + String BOLD = "\u001B[1m"; + String DIM = "\u001B[2m"; + String RESET = "\u001B[0m"; + + beeLine.output(DIM + "Generating SQL for: " + RESET + CYAN + nlQuery + RESET); + + try { + // Get current database name + String database = "default"; + try { + database = beeLine.getDatabaseConnection().getConnection().getSchema(); + if (database == null || database.isEmpty()) { + database = "default"; + } + } catch (Exception e) { + // ignore, use default + } + + // HMS REST Catalog URL (configurable via METASTORE_REST_URL env var) + String metastoreUrl = System.getenv("METASTORE_REST_URL"); + if (metastoreUrl == null || metastoreUrl.isEmpty()) { + metastoreUrl = "http://localhost:9001/iceberg"; + } + + ProcessBuilder pb = new ProcessBuilder( + "python3", scriptPath, + "--query", nlQuery, + "--database", database, + "--metastore-url", metastoreUrl + ); + // Pass through environment variables for LLM configuration + Map<String, String> env = pb.environment(); + String[] envKeys = {"ANTHROPIC_BASE_URL", "ANTHROPIC_AUTH_TOKEN", + "ANTHROPIC_API_KEY", "ANTHROPIC_MODEL", "METASTORE_REST_URL", + "MCP_SERVER_URL"}; + for (String key : envKeys) { + String val = System.getenv(key); + if (val != null) { + env.put(key, val); + } + } + + pb.redirectErrorStream(false); + Process process = pb.start(); + process.getOutputStream().close(); + + // Read stdout (the generated SQL) + StringBuilder sqlBuilder = new StringBuilder(); + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(process.getInputStream()))) { + String outputLine; + while ((outputLine = reader.readLine()) != null) { + if (sqlBuilder.length() > 0) { + sqlBuilder.append("\n"); + } + sqlBuilder.append(outputLine); + } + } + + // Read stderr (errors/warnings) + StringBuilder errBuilder = new StringBuilder(); + try (BufferedReader reader = new BufferedReader( + new 
InputStreamReader(process.getErrorStream()))) { + String errLine; + while ((errLine = reader.readLine()) != null) { + errBuilder.append(errLine).append("\n"); + } + } + + int exitCode = process.waitFor(); + if (exitCode != 0) { + beeLine.error("nlsql script failed (exit code " + exitCode + ")"); + if (errBuilder.length() > 0) { + beeLine.error(errBuilder.toString().trim()); + } + return false; + } + + String generatedSql = sqlBuilder.toString().trim(); + if (generatedSql.isEmpty()) { + return beeLine.error("nlsql script produced no output"); + } + + // Display the generated SQL + beeLine.output(""); + beeLine.output(BOLD + RED + generatedSql + RESET); + beeLine.output(""); + + // Execute the generated SQL through the normal Beeline path + return executeInternal(generatedSql, false); + + } catch (Exception e) { + beeLine.error("Exception running nlsql: " + e.getMessage()); + return false; + } + } + public boolean call(String line) { return execute(line, true, false); } diff --git a/beeline/src/main/resources/BeeLine.properties b/beeline/src/main/resources/BeeLine.properties index f58cf100f3ef..4e49ce2e473f 100644 --- a/beeline/src/main/resources/BeeLine.properties +++ b/beeline/src/main/resources/BeeLine.properties @@ -58,6 +58,7 @@ help-unalias: Unset a command alias help-scan: Scan for installed JDBC drivers help-sql: Execute a SQL command help-sh: Execute a shell command +help-nlsql: Convert natural language to SQL and execute it help-history: Display the command history help-record: Record all output to the specified file help-indexes: List all the indexes for the specified table diff --git a/packaging/src/docker/Dockerfile b/packaging/src/docker/Dockerfile index 17f7c96eade8..767b209fb234 100644 --- a/packaging/src/docker/Dockerfile +++ b/packaging/src/docker/Dockerfile @@ -79,7 +79,7 @@ ARG TEZ_VERSION # Install dependencies RUN set -ex; \ microdnf update -y; \ - microdnf -y install procps gettext; \ + microdnf -y install procps gettext python3.11 
python3.11-pip; \ microdnf clean all; \ useradd --no-create-home -s /sbin/nologin -c "" --uid $UID hive @@ -97,8 +97,10 @@ COPY --from=env --chown=hive /opt/apache-tez-$TEZ_VERSION-bin $TEZ_HOME COPY --chown=hive entrypoint.sh / COPY --chown=hive conf $HIVE_HOME/conf +COPY --chown=hive mcp-server $HIVE_HOME/scripts/metastore/mcp-server -RUN chmod +x /entrypoint.sh && \ +RUN pip3.11 install --no-cache-dir -r $HIVE_HOME/scripts/metastore/mcp-server/requirements.txt && \ + chmod +x /entrypoint.sh && \ mkdir -p $HIVE_HOME/data/warehouse && \ chown hive $HIVE_HOME/data/warehouse && \ mkdir -p $HIVE_HOME/scratch && \ @@ -108,6 +110,6 @@ RUN chmod +x /entrypoint.sh && \ USER hive WORKDIR $HIVE_HOME -EXPOSE 10000 10002 9083 +EXPOSE 10000 10002 9083 3000 ENTRYPOINT ["sh", "-c", "/entrypoint.sh"] diff --git a/packaging/src/docker/build.sh b/packaging/src/docker/build.sh index 9d8258320f5f..090c697ad1e6 100755 --- a/packaging/src/docker/build.sh +++ b/packaging/src/docker/build.sh @@ -123,6 +123,7 @@ cp "$CACHE_DIR/apache-tez-$TEZ_VERSION-bin.tar.gz" "$WORK_DIR/" cp -R "$SOURCE_DIR/packaging/src/docker/conf" "$WORK_DIR/" cp -R "$SOURCE_DIR/packaging/src/docker/entrypoint.sh" "$WORK_DIR/" cp "$SOURCE_DIR/packaging/src/docker/Dockerfile" "$WORK_DIR/" +cp -R "$SOURCE_DIR/standalone-metastore/metastore-tools/mcp-server" "$WORK_DIR/" docker build \ "$WORK_DIR" \ -f "$WORK_DIR/Dockerfile" \ diff --git a/packaging/src/docker/conf/hive-site.xml.template b/packaging/src/docker/conf/hive-site.xml.template index 616ecc23a605..c3864d1459f6 100644 --- a/packaging/src/docker/conf/hive-site.xml.template +++ b/packaging/src/docker/conf/hive-site.xml.template @@ -76,4 +76,12 @@ <name>hive.query.results.cache.directory</name> <value>${HIVE_QUERY_RESULTS_CACHE_DIRECTORY}</value> </property> +<property> + <name>metastore.catalog.servlet.port</name> + <value>9001</value> +</property> +<property> + <name>metastore.catalog.servlet.auth</name> + <value>none</value> +</property> </configuration> diff --git a/packaging/src/docker/docker-compose.yml b/packaging/src/docker/docker-compose.yml index 3827cbf94f04..466e50419d73 100644 --- 
a/packaging/src/docker/docker-compose.yml +++ b/packaging/src/docker/docker-compose.yml @@ -53,11 +53,16 @@ services: -Djavax.jdo.option.ConnectionUserName=hive -Djavax.jdo.option.ConnectionPassword=password + MCP_SERVER_ENABLED: 'true' + MCP_SERVER_PORT: '3000' + S3_ENDPOINT_URL: "${S3_ENDPOINT_URL}" AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}" AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}" ports: + - '9001:9001' - '9083:9083' + - '3000:3000' volumes: - warehouse:/opt/hive/data/warehouse - type: bind diff --git a/packaging/src/docker/entrypoint.sh b/packaging/src/docker/entrypoint.sh index b6e71c2e7ee5..49fbb91c11bc 100644 --- a/packaging/src/docker/entrypoint.sh +++ b/packaging/src/docker/entrypoint.sh @@ -135,6 +135,16 @@ if [[ "${SKIP_SCHEMA_INIT}" == "false" && ( "${SERVICE_NAME}" == "hiveserver2" | initialize_hive fi +# Start Metastore MCP Server as a sidecar process if enabled +if [ "${MCP_SERVER_ENABLED:-false}" == "true" ] && [ "${SERVICE_NAME}" == "metastore" ]; then + MCP_PORT="${MCP_SERVER_PORT:-3000}" + MCP_METASTORE_URL="${METASTORE_REST_URL:-http://localhost:9001/iceberg}" + echo "Starting Metastore MCP Server on port ${MCP_PORT}..." 
+ python3.11 "$HIVE_HOME/scripts/metastore/mcp-server/metastore_mcp_server.py" \ + --transport sse --port "${MCP_PORT}" \ + --metastore-url "${MCP_METASTORE_URL}" & +fi + if [ "${SERVICE_NAME}" == "hiveserver2" ]; then export HADOOP_CLASSPATH="$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH" exec "$HIVE_HOME/bin/hive" --skiphadoopversion --skiphbasecp --service "$SERVICE_NAME" diff --git a/packaging/src/main/assembly/bin.xml b/packaging/src/main/assembly/bin.xml index a5af4478bc53..b8c0378f22b0 100644 --- a/packaging/src/main/assembly/bin.xml +++ b/packaging/src/main/assembly/bin.xml @@ -338,6 +338,16 @@ <outputDirectory>lib/py/queryplan</outputDirectory> </fileSet> + +<fileSet> + <directory>${project.parent.basedir}/beeline/scripts/nlsql</directory> + <fileMode>755</fileMode> + <includes> + <include>**/*</include> + </includes> + <outputDirectory>scripts/nlsql</outputDirectory> +</fileSet> <fileSet> <directory>${project.parent.basedir}/hcatalog/bin</directory> diff --git a/standalone-metastore/metastore-tools/mcp-server/README.md b/standalone-metastore/metastore-tools/mcp-server/README.md new file mode 100644 index 000000000000..11897b4d8361 --- /dev/null +++ b/standalone-metastore/metastore-tools/mcp-server/README.md @@ -0,0 +1,126 @@ +# Metastore MCP Server + +A [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) server that exposes +Hive Metastore metadata via the Iceberg REST Catalog API. + +LLM agents can use the provided tools to discover databases, tables, columns, +partitions, and table properties. + +## Prerequisites + +- Python 3.10+ +- A running Hive Metastore with the Iceberg REST Catalog endpoint enabled + +### Metastore Configuration + +Enable the REST Catalog endpoint in `metastore-site.xml`: + +```xml +<property> + <name>metastore.catalog.servlet.port</name> + <value>9001</value> +</property> +<property> + <name>metastore.catalog.servlet.auth</name> + <value>none</value> +</property> +``` + +## Installation + +```bash +pip install -r requirements.txt +``` + +## Usage + +### SSE transport (default) — for Docker / network access + +```bash +python3 metastore_mcp_server.py --port 3000 --metastore-url http://localhost:9001/iceberg +``` + +The server listens on `http://0.0.0.0:3000/sse` and accepts MCP client connections over SSE. 
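For orientation, the `get_table_schema_sql` tool (listed under Available Tools below) returns one compact, `CREATE TABLE`-like text block per table. The following standalone sketch reproduces that rendering with hypothetical column metadata in place of a live Metastore; the helper name `render_table` is illustrative and not part of the server:

```python
# Sketch of the schema-summary format produced by get_table_schema_sql.
# The column/partition data passed in below is hypothetical; the real
# server derives it from Iceberg REST Catalog table metadata.

def render_table(database, table, columns, partition_names):
    """Render one table as a CREATE-TABLE-like schema block."""
    col_defs = [f"  {c['name']} {c['type']}" for c in columns]
    part_info = ""
    if partition_names:
        part_info = f"\n  -- PARTITIONED BY: {', '.join(partition_names)}"
    return f"TABLE {database}.{table} (\n" + ",\n".join(col_defs) + part_info + "\n)"

print(render_table(
    "default", "orders",
    [{"name": "order_id", "type": "long"}, {"name": "amount", "type": "double"}],
    ["order_date"],
))
```

A summary in this shape is what the nlsql agent embeds in the LLM prompt as schema context.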
+ +### stdio transport — for local subprocess usage + +```bash +python3 metastore_mcp_server.py --transport stdio --metastore-url http://localhost:9001/iceberg +``` + +Communicates over stdin/stdout. Used when an MCP client spawns the server as a subprocess. + +### Docker + +When running Hive via Docker Compose, the MCP server runs automatically as a sidecar +process inside the metastore container. Set `MCP_SERVER_ENABLED=true` in the metastore +service environment (already configured in the provided `docker-compose.yml`). + +The server is accessible at `http://localhost:3000/sse` from the host. + +### Integration with Claude Desktop / Claude Code + +If the MCP server is running via Docker (SSE), add to your MCP configuration: + +```json +{ + "mcpServers": { + "metastore": { + "url": "http://localhost:3000/sse" + } + } +} +``` + +Alternatively, use stdio transport (spawns the server as a subprocess): + +```json +{ + "mcpServers": { + "metastore": { + "command": "python3", + "args": ["/path/to/metastore_mcp_server.py", "--transport", "stdio"], + "env": { + "METASTORE_REST_URL": "http://localhost:9001/iceberg" + } + } + } +} +``` + +### Python MCP Client Example + +```python +import asyncio + +from mcp import ClientSession +from mcp.client.sse import sse_client + +async def main(): + async with sse_client("http://localhost:3000/sse") as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + + # List available tools + tools = await session.list_tools() + + # Get schema for all tables in a database + result = await session.call_tool( + 'get_table_schema_sql', + arguments={'database': 'default'} + ) + +asyncio.run(main()) +``` + +## Available Tools + +| Tool | Description | +|------|-------------| +| `list_databases` | List all databases (namespaces) in the metastore | +| `list_tables` | List all tables in a database | +| `describe_table` | Get detailed schema, partitions, and properties for a table | +| `get_table_schema_sql` | Get a SQL-friendly schema summary of all tables in a 
database | + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `METASTORE_REST_URL` | `http://localhost:9001/iceberg` | Metastore Iceberg REST Catalog URL | +| `MCP_SERVER_PORT` | `3000` | Port for SSE transport | +| `MCP_SERVER_ENABLED` | `false` | Set to `true` in Docker to start the sidecar | diff --git a/standalone-metastore/metastore-tools/mcp-server/metastore_mcp_server.py b/standalone-metastore/metastore-tools/mcp-server/metastore_mcp_server.py new file mode 100644 index 000000000000..93f5d550d7e3 --- /dev/null +++ b/standalone-metastore/metastore-tools/mcp-server/metastore_mcp_server.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Metastore MCP Server — exposes Hive Metastore metadata via Model Context Protocol. + +Backed by the Metastore Iceberg REST Catalog API. Provides tools for LLM agents +to discover databases, tables, columns, partitions, and table properties. 
+ +Usage: + python3 metastore_mcp_server.py [--metastore-url http://localhost:9001/iceberg] + + # Or via env var: + METASTORE_REST_URL=http://localhost:9001/iceberg python3 metastore_mcp_server.py +""" + +import argparse +import os +import json + +import requests +from mcp.server.fastmcp import FastMCP + +# Create MCP server +mcp = FastMCP("metastore-mcp", instructions=""" +You have access to a Hive Metastore via the Iceberg REST Catalog API. +Use the tools below to discover databases, tables, and their schemas +before writing HiveQL queries. Always check available tables first. +""") + +METASTORE_URL = os.environ.get("METASTORE_REST_URL", "http://localhost:9001/iceberg").rstrip("/") + + +def _load_table_metadata(database, table_name): + """Fetch table metadata from the REST Catalog API.""" + resp = requests.get( + f"{METASTORE_URL}/v1/namespaces/{database}/tables/{table_name}", timeout=10) + resp.raise_for_status() + return resp.json().get("metadata", {}) + + +def _get_current_schema(metadata): + """Extract the current schema from Iceberg table metadata.""" + current_schema_id = metadata.get("current-schema-id", 0) + schemas = metadata.get("schemas", []) + return next((s for s in schemas if s.get("schema-id") == current_schema_id), + schemas[0] if schemas else {}) + + +def _parse_columns(schema): + """Parse column definitions from an Iceberg schema.""" + columns = [] + for field in schema.get("fields", []): + col_type = field.get("type", "unknown") + if isinstance(col_type, dict): + col_type = col_type.get("type", "complex") + columns.append({ + "name": field.get("name"), + "type": col_type, + "required": field.get("required", False), + }) + return columns + + +def _get_partition_fields(metadata): + """Extract partition fields from the default partition spec.""" + partition_specs = metadata.get("partition-specs", []) + if not partition_specs: + return [] + default_spec_id = metadata.get("default-spec-id", 0) + spec = next((s for s in partition_specs if 
s.get("spec-id") == default_spec_id), None) + return spec.get("fields", []) if spec else [] + + +@mcp.tool() +def list_databases() -> str: + """List all databases (namespaces) in the Hive Metastore.""" + resp = requests.get(f"{METASTORE_URL}/v1/namespaces", timeout=10) + resp.raise_for_status() + data = resp.json() + namespaces = [ns[0] if isinstance(ns, list) else ns + for ns in data.get("namespaces", [])] + return json.dumps(namespaces, indent=2) + + +@mcp.tool() +def list_tables(database: str = "default") -> str: + """List all tables in a database. + + Args: + database: The database/namespace name (default: "default") + """ + resp = requests.get(f"{METASTORE_URL}/v1/namespaces/{database}/tables", timeout=10) + resp.raise_for_status() + data = resp.json() + tables = [ident["name"] for ident in data.get("identifiers", [])] + return json.dumps(tables, indent=2) + + +@mcp.tool() +def describe_table(table: str, database: str = "default") -> str: + """Get detailed schema information for a table including columns, + types, partition spec, and key properties. + + Args: + table: The table name + database: The database/namespace name (default: "default") + """ + metadata = _load_table_metadata(database, table) + schema = _get_current_schema(metadata) + + props = metadata.get("properties", {}) + interesting = ["format-version", "write.format.default", "comment", + "engine.hive.enabled", "numRows", "totalSize"] + + result = { + "table": f"{database}.{table}", + "columns": _parse_columns(schema), + "partition_fields": _get_partition_fields(metadata), + "properties": {k: v for k, v in props.items() if k in interesting}, + "location": metadata.get("location", ""), + "format_version": metadata.get("format-version", ""), + } + return json.dumps(result, indent=2) + + +@mcp.tool() +def get_table_schema_sql(database: str = "default") -> str: + """Get a SQL-friendly schema summary of ALL tables in a database. + Useful as context before generating SQL queries. 
+ + Args: + database: The database/namespace name (default: "default") + """ + # List tables + resp = requests.get(f"{METASTORE_URL}/v1/namespaces/{database}/tables", timeout=10) + resp.raise_for_status() + tables = [ident["name"] for ident in resp.json().get("identifiers", [])] + + schema_parts = [] + for table_name in tables: + try: + metadata = _load_table_metadata(database, table_name) + schema = _get_current_schema(metadata) + columns = _parse_columns(schema) + + col_defs = [f" {col['name']} {col['type']}" for col in columns] + + part_info = "" + part_fields = _get_partition_fields(metadata) + if part_fields: + pnames = [f.get("name", "") for f in part_fields] + part_info = f"\n -- PARTITIONED BY: {', '.join(pnames)}" + + schema_parts.append( + f"TABLE {database}.{table_name} (\n" + + ",\n".join(col_defs) + + part_info + + "\n)") + except Exception: + pass + + return "\n\n".join(schema_parts) if schema_parts else "(No tables found)" + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Metastore MCP Server") + parser.add_argument("--metastore-url", + default=os.environ.get("METASTORE_REST_URL", + "http://localhost:9001/iceberg"), + help="Metastore REST Catalog URL") + parser.add_argument("--transport", default="sse", + choices=["sse", "stdio"], + help="MCP transport (default: sse)") + parser.add_argument("--port", type=int, + default=int(os.environ.get("MCP_SERVER_PORT", "3000")), + help="Port for SSE transport (default: 3000)") + args = parser.parse_args() + METASTORE_URL = args.metastore_url.rstrip("/") + + if args.transport == "sse": + mcp.settings.host = "0.0.0.0" + mcp.settings.port = args.port + mcp.run(transport=args.transport) diff --git a/standalone-metastore/metastore-tools/mcp-server/requirements.txt b/standalone-metastore/metastore-tools/mcp-server/requirements.txt new file mode 100644 index 000000000000..ee1a75468177 --- /dev/null +++ b/standalone-metastore/metastore-tools/mcp-server/requirements.txt @@ -0,0 +1,2 @@ 
+mcp[cli]>=1.0.0 +requests>=2.28.0 diff --git a/standalone-metastore/packaging/src/docker/Dockerfile b/standalone-metastore/packaging/src/docker/Dockerfile index 12e17a2d0d9d..92f955b21ecd 100644 --- a/standalone-metastore/packaging/src/docker/Dockerfile +++ b/standalone-metastore/packaging/src/docker/Dockerfile @@ -80,7 +80,7 @@ ARG HIVE_VERSION # Install dependencies RUN set -ex; \ microdnf update -y; \ - microdnf -y install procps gettext; \ + microdnf -y install procps gettext python3.11 python3.11-pip; \ microdnf clean all; \ useradd --no-create-home -s /sbin/nologin -c "" --uid $UID hive @@ -96,13 +96,15 @@ COPY --from=env --chown=hive /opt/apache-hive-metastore-$HIVE_VERSION-bin $HIVE_ COPY --chown=hive entrypoint.sh / COPY --chown=hive conf $HIVE_HOME/conf +COPY --chown=hive mcp-server $HIVE_HOME/scripts/metastore/mcp-server -RUN chmod +x /entrypoint.sh && \ +RUN pip3.11 install --no-cache-dir -r $HIVE_HOME/scripts/metastore/mcp-server/requirements.txt && \ + chmod +x /entrypoint.sh && \ mkdir -p $HIVE_HOME/data/warehouse && \ chown hive $HIVE_HOME/data/warehouse USER hive WORKDIR $HIVE_HOME -EXPOSE 9001 9083 +EXPOSE 9001 9083 3000 ENTRYPOINT ["sh", "-c", "/entrypoint.sh"] diff --git a/standalone-metastore/packaging/src/docker/build.sh b/standalone-metastore/packaging/src/docker/build.sh index 490ca284b619..4bcaf929d7fa 100755 --- a/standalone-metastore/packaging/src/docker/build.sh +++ b/standalone-metastore/packaging/src/docker/build.sh @@ -103,6 +103,7 @@ cp "$CACHE_DIR/hadoop-$HADOOP_VERSION.tar.gz" "$WORK_DIR/" cp -R "$SOURCE_DIR/packaging/src/docker/conf" "$WORK_DIR/" cp -R "$SOURCE_DIR/packaging/src/docker/entrypoint.sh" "$WORK_DIR/" cp "$SOURCE_DIR/packaging/src/docker/Dockerfile" "$WORK_DIR/" +cp -R "$SOURCE_DIR/metastore-tools/mcp-server" "$WORK_DIR/" docker build \ "$WORK_DIR" \ -f "$WORK_DIR/Dockerfile" \ diff --git a/standalone-metastore/packaging/src/docker/docker-compose.yml b/standalone-metastore/packaging/src/docker/docker-compose.yml index 
852953c0295a..df2c2e83e982 100644 --- a/standalone-metastore/packaging/src/docker/docker-compose.yml +++ b/standalone-metastore/packaging/src/docker/docker-compose.yml @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -version: '3.9' services: postgres: image: postgres @@ -52,12 +51,16 @@ services: -Djavax.jdo.option.ConnectionUserName=hive -Djavax.jdo.option.ConnectionPassword=password + MCP_SERVER_ENABLED: 'true' + MCP_SERVER_PORT: '3000' + S3_ENDPOINT_URL: "${S3_ENDPOINT_URL}" AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}" AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}" ports: - '9001:9001' - '9083:9083' + - '3000:3000' volumes: - warehouse:/opt/hive/data/warehouse - type: bind diff --git a/standalone-metastore/packaging/src/docker/entrypoint.sh b/standalone-metastore/packaging/src/docker/entrypoint.sh index ee72357f9e45..0aa27a685622 100644 --- a/standalone-metastore/packaging/src/docker/entrypoint.sh +++ b/standalone-metastore/packaging/src/docker/entrypoint.sh @@ -76,5 +76,15 @@ if [[ "${SKIP_SCHEMA_INIT}" == "false" ]]; then initialize_hive fi +# Start Metastore MCP Server as a sidecar process if enabled +if [ "${MCP_SERVER_ENABLED:-false}" == "true" ]; then + MCP_PORT="${MCP_SERVER_PORT:-3000}" + MCP_METASTORE_URL="${METASTORE_REST_URL:-http://localhost:9001/iceberg}" + echo "Starting Metastore MCP Server on port ${MCP_PORT}..." + python3.11 "$HIVE_HOME/scripts/metastore/mcp-server/metastore_mcp_server.py" \ + --transport sse --port "${MCP_PORT}" \ + --metastore-url "${MCP_METASTORE_URL}" & +fi + export METASTORE_PORT=${METASTORE_PORT:-9083} exec "$HIVE_HOME/bin/start-metastore"
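A closing note on the agent side of this change: `generate_sql()` in `nlsql_agent.py` strips markdown fences from the model's reply defensively, even though the prompt forbids them. That cleanup is easy to exercise in isolation; a minimal sketch (the wrapper function `strip_fences` is illustrative, mirroring the two regex substitutions in the agent):

```python
import re

def strip_fences(sql: str) -> str:
    """Remove a leading ```sql fence and trailing ```, as generate_sql() does."""
    sql = sql.strip()
    sql = re.sub(r'^```(?:sql)?\s*', '', sql)  # opening fence, optional "sql" tag
    sql = re.sub(r'\s*```$', '', sql)          # closing fence
    return sql.strip()

print(strip_fences("```sql\nSELECT * FROM orders LIMIT 10\n```"))
# → SELECT * FROM orders LIMIT 10
```

Unfenced output passes through unchanged, so the cleanup is safe to apply unconditionally.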