Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,46 @@ This project provides an MCP (Message Context Protocol) server for [Parseable](h
- Modular MCP tool registration for easy extension
- Supports both HTTP and stdio MCP modes
- Environment variable and flag-based configuration
- The mcp server returns responses in json where the payload is both in text and structured format.


# Testing
To test the server you can use the [mcp-cli](https://github.com/philschmid/mcp-cli)
```shell
mcp-cli call parseable get_roles
```
Returns
```json
{
"content": [
{
"type": "text",
"text": "{\"admins\":[{\"privilege\":\"admin\"}],\"network_role\":[{\"privilege\":\"reader\",\"resource\":{\"stream\":\"network_logstream\"}}],\"otel_gateway\":[{\"privilege\":\"editor\"}]}"
}
],
"structuredContent": {
"admins": [
{
"privilege": "admin"
}
],
"network_role": [
{
"privilege": "reader",
"resource": {
"stream": "network_logstream"
}
}
],
"otel_gateway": [
{
"privilege": "editor"
}
]
}
}

```

# Building

Expand Down
18 changes: 13 additions & 5 deletions cmd/mcp_parseable_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"flag"
"log"
"log/slog"
"os"

"github.com/mark3labs/mcp-go/server"
Expand All @@ -21,6 +21,12 @@ func main() {
versionFlag := flag.Bool("version", false, "print version and exit")
flag.Parse()

// Setup structured logger for stdout
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
slog.SetDefault(logger)

if *versionFlag {
println("mcp-parseable-server " + version)
os.Exit(0)
Expand Down Expand Up @@ -72,16 +78,18 @@ Try not to second guess information - if you don't know something or lack inform
tools.RegisterParseableTools(mcpServer)

if *mode == "stdio" {
log.Printf("MCP server running in stdio mode (Parseable at %s)", tools.ParseableBaseURL)
slog.Info("MCP server running in stdio mode", "parseable_url", tools.ParseableBaseURL)
if err := server.ServeStdio(mcpServer); err != nil {
log.Fatalf("MCP stdio server failed: %v", err)
slog.Error("MCP stdio server failed", "error", err)
os.Exit(1)
}
return
}

httpServer := server.NewStreamableHTTPServer(mcpServer)
log.Printf("MCP server running on %s, Parseable at %s", *listenAddr, tools.ParseableBaseURL)
slog.Info("MCP server running", "address", *listenAddr, "parseable_url", tools.ParseableBaseURL)
if err := httpServer.Start(*listenAddr); err != nil {
log.Fatalf("MCP server failed: %v", err)
slog.Error("MCP server failed", "error", err)
os.Exit(1)
}
}
36 changes: 13 additions & 23 deletions tools/parseable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"crypto/tls"
"encoding/json"
"io"
"log"
"log/slog"
"net/http"
"os"
"strconv"
Expand All @@ -31,7 +31,7 @@ func init() {
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
log.Printf("UNSECURE=true: HTTP client will skip TLS verification")
slog.Info("UNSECURE=true: HTTP client will skip TLS verification")
} else {
HTTPClient = http.DefaultClient
}
Expand Down Expand Up @@ -60,7 +60,11 @@ func doParseableQuery(query string, streamName string, startTime string, endTime
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer func() {
if err := resp.Body.Close(); err != nil {
slog.Error("failed to close response body", "error", err)
}
}()
body, _ := io.ReadAll(resp.Body)

// Try to unmarshal as array of rows
Expand All @@ -84,7 +88,7 @@ func listParseableStreams() ([]string, error) {
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("failed to close response body: %v", err)
slog.Error("failed to close response body", "error", err)
}
}()
var apiResult []struct {
Expand All @@ -100,7 +104,7 @@ func listParseableStreams() ([]string, error) {
return streams, nil
}

func getParseableSchema(stream string) (map[string]string, error) {
func getParseableSchema(stream string) (map[string]interface{}, error) {
url := ParseableBaseURL + "/api/v1/logstream/" + stream + "/schema"
httpReq, err := http.NewRequest("GET", url, nil)
if err != nil {
Expand All @@ -113,28 +117,14 @@ func getParseableSchema(stream string) (map[string]string, error) {
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("failed to close response body: %v", err)
slog.Error("failed to close response body", "error", err)
}
}()
var result struct {
Fields []struct {
Name string `json:"name"`
DataType json.RawMessage `json:"data_type"`
} `json:"fields"`
}
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
schema := make(map[string]string)
for _, field := range result.Fields {
var dtStr string
if err := json.Unmarshal(field.DataType, &dtStr); err == nil {
schema[field.Name] = dtStr
continue
}
schema[field.Name] = string(field.DataType)
}
return schema, nil
return result, nil
}

func getParseableStats(streamName string) (map[string]interface{}, error) {
Expand Down Expand Up @@ -185,7 +175,7 @@ func doSimpleGet(url string) (map[string]interface{}, map[string]interface{}, er
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("failed to close response body: %v", err)
slog.Error("failed to close response body", "error", err)
}
}()
var stats map[string]interface{}
Expand Down
64 changes: 36 additions & 28 deletions tools/parseable_about.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package tools

import (
"context"
"fmt"
"strings"
"log/slog"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
Expand All @@ -12,37 +11,46 @@ import (
func RegisterGetAboutTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"get_about",
mcp.WithDescription(`Get information about the Parseable instance. Calls /api/v1/about.

Returned fields:
- version: version of Parseable
- uiVersion: the UI version of Parseable
- commit: the git commit hash
- deploymentId: the deployment ID of Parseable
- updateAvailable: if updates of Parseable is available
- latestVersion: the latest version of Parseable
- llmActive: if the Parseable is configured with LLM support
- llmProvider: what LLM provider is used
- oidcActive: if Parseable is configured with OpenID Connect support
- license: the license of Parseable
- mode: if Parseable is running as Standalone or Cluster mode
- staging: the staging path
- hotTier: if hot tier is enabled or disabled
- grpcPort: the grpc port of Parseable
- store: the storage type used for Parseable like local or object store
- analytics: if analytics is enabled or disabled
mcp.WithDescription(`Get configuration and version information about the Parseable instance.
Use this to understand the Parseable deployment, available features, and version compatibility.
Calls /api/v1/about.

Returns a JSON object with deployment and configuration details:

Deployment Info:
- version: semantic version of Parseable (e.g., "1.2.0")
- uiVersion: version of the web UI
- commit: git commit hash of the Parseable build
- deploymentId: unique identifier for this Parseable instance
- mode: deployment mode ("Standalone" or "Cluster")

Update Information:
- updateAvailable: boolean indicating if a newer version is available
- latestVersion: the latest available version of Parseable

Configuration & Features:
- llmActive: boolean indicating if LLM support is enabled
- llmProvider: name of the LLM provider if configured (e.g., "openai", "anthropic", or null)
- oidcActive: boolean indicating if OpenID Connect authentication is enabled
- analytics: boolean indicating if analytics collection is enabled
- hotTier: boolean indicating if hot tier (fast storage) is enabled

Storage & Infrastructure:
- store: storage backend type (e.g., "local", "s3", "gcs")
- staging: staging/cache path for data processing
- grpcPort: port number for gRPC API connections

Licensing:
- license: license type or status of Parseable

Use this tool to check Parseable capabilities, version information, and configuration state.
`),
), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
about, err := getParseableAbout()
if err != nil {
slog.Error("failed to get response", "tool", "get_about", "error", err)
return mcp.NewToolResultError(err.Error()), nil
}
var lines []string
for k, v := range about {
lines = append(lines, k+": "+fmt.Sprintf("%v", v))
}
return mcp.NewToolResultText(strings.Join(lines, "\n")), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"info": info}, "Info returned"), nil
return mcp.NewToolResultJSON(about)
})
}
33 changes: 20 additions & 13 deletions tools/parseable_get_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package tools

import (
"context"
"strings"
"log/slog"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
Expand All @@ -11,24 +11,31 @@ import (
func RegisterGetDataStreamSchemaTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"get_data_stream_schema",
mcp.WithDescription("Get the schema for a specific data stream in Parseable. The full content of the stream is typically in the 'body' field as a string."),
mcp.WithString("stream", mcp.Required(), mcp.Description("Data stream name")),
mcp.WithDescription(`Get the complete field schema for a Parseable data stream.
Use this to discover field names, data types, and structure before constructing queries.
Calls /api/v1/logstream/<streamName>/schema.

Returns a JSON object with a 'fields' array containing field definitions for each available field in the stream.
Each field includes:
- name: the field name (string)
- data_type: the data type of the field (e.g., "String", "i64", "f64", "bool", "DateTime")

Use this tool to understand what fields are available for filtering, grouping, or selecting in query_data_stream operations.
`),
mcp.WithString("streamName", mcp.Required(), mcp.Description("Name of the data stream to get the schema for. Example: 'otellogs' or 'monitor_logstream'. Stream must exist in Parseable.")),
), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
stream := mcp.ParseString(req, "stream", "")
stream := mcp.ParseString(req, "streamName", "")
if stream == "" {
return mcp.NewToolResultError("missing stream in context"), nil
slog.Warn("Missing parameter", "tool", "get_data_stream_schema", "parameter", "streamName")
return mcp.NewToolResultError("missing required field: streamName"), nil
}

schema, err := getParseableSchema(stream)
if err != nil {
slog.Error("failed to get response", "tool", "get_data_stream_schema", "streamName", stream, "error", err)
return mcp.NewToolResultError(err.Error()), nil
}
// Default: return as text
var lines []string
for field, typ := range schema {
lines = append(lines, field+": "+typ)
}
return mcp.NewToolResultText(strings.Join(lines, "\n")), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"schema": schema}, "Schema returned"), nil

return mcp.NewToolResultJSON(schema)
})
}
45 changes: 23 additions & 22 deletions tools/parseable_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package tools

import (
"context"
"fmt"
"strings"
"log/slog"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
Expand All @@ -12,35 +11,37 @@ import (
func RegisterGetDataStreamInfoTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"get_data_stream_info",
mcp.WithDescription(`Get info for a Parseable data stream by name. Calls /api/v1/logstream/<streamName>/info.

Returned fields:
- createdAt: when the data stream was created (ISO 8601)
- firstEventAt: timestamp of the first event (ISO 8601)
- latestEventAt: timestamp of the latest event (ISO 8601)
- streamType: type of data stream (e.g. UserDefined)
- logSource: array of log source objects
- log_source_format: format of the log source (e.g. otel-logs)
- fields: list of field names in the log source
- telemetryType: type of telemetry (e.g. logs, metrics, traces)
mcp.WithDescription(`Get comprehensive metadata information for a Parseable data stream.
Use this to understand stream composition, available fields, and data ingestion timeline.
Calls /api/v1/logstream/<streamName>/info.

Returns a JSON object with the following structure:

- createdAt: ISO 8601 timestamp when the data stream was created
- firstEventAt: ISO 8601 timestamp of the first event (null if stream has no events)
- latestEventAt: ISO 8601 timestamp of the most recent event (null if stream has no events)
- streamType: classification of the stream (e.g., "UserDefined", "System")
- logSource: array of log source objects describing data sources
- log_source_format: format of the ingested data (e.g., "otel-logs", "json", "logfmt")
- fields: array of field names available in this data source
- telemetryType: category of telemetry data (e.g., "logs", "metrics", "traces")

Use this tool before querying a stream to understand its fields and structure.
`),
mcp.WithString("streamName", mcp.Required(), mcp.Description("Name of the data stream (e.g. otellogs)")),
mcp.WithString("streamName", mcp.Required(), mcp.Description("Name of the data stream to get info for. Example: 'otellogs' or 'monitor_logstream'. Stream must exist in Parseable.")),
), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
streamName := mcp.ParseString(req, "streamName", "")
if streamName == "" {
slog.Warn("called with missing parameter", "parameter", "streamName", "tool", "get_data_stream_info")
return mcp.NewToolResultError("missing required field: streamName"), nil
}

info, err := getParseableInfo(streamName)
if err != nil {
slog.Error("failed to get response", "streamName", streamName, "error", err, "tool", "get_data_stream_info")
return mcp.NewToolResultError("failed to get info: " + err.Error()), nil
}
// Default: return as text
var lines []string
for k, v := range info {
lines = append(lines, k+": "+fmt.Sprintf("%v", v))
}
return mcp.NewToolResultText(strings.Join(lines, "\n")), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"info": info}, "Info returned"), nil

return mcp.NewToolResultJSON(info)
})
}
Loading
Loading