Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ This mode is used for CLI or agent-to-agent workflows.
You can configure the Parseable connection using environment variables or flags:

- `PARSEABLE_URL` or `--parseable-url`- url to the parseable instance (default: http://localhost:8000)
- `PARSEABLE_USER` or `--parseable-user` (default: admin)
- `PARSEABLE_PASS` or `--parseable-pass` (default: admin)
- `PARSEABLE_USERNAME` or `--parseable-user` (default: admin)
- `PARSEABLE_PASSWORD` or `--parseable-pass` (default: admin)
- `LISTEN_ADDR` or `--listen` - the address when running the mcp server in http mode (default: :9034)

- `INSECURE` - set to `true` to skip TLS verification (default: false)`
Example:
```sh
PARSEABLE_URL="http://your-parseable-host:8000" PARSEABLE_USER="admin" PARSEABLE_PASS="admin" ./mcp-parseable-server
Expand Down
4 changes: 2 additions & 2 deletions cmd/mcp_parseable_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func main() {
tools.ParseableBaseURL = "http://localhost:8000"
}
}
tools.ParseableUser = os.Getenv("PARSEABLE_USER")
tools.ParseableUser = os.Getenv("PARSEABLE_USERNAME")
if tools.ParseableUser == "" {
if *parseableUserFlag != "" {
tools.ParseableUser = *parseableUserFlag
} else {
tools.ParseableUser = "admin"
}
}
tools.ParseablePass = os.Getenv("PARSEABLE_PASS")
tools.ParseablePass = os.Getenv("PARSEABLE_PASSWORD")
if tools.ParseablePass == "" {
if *parseablePassFlag != "" {
tools.ParseablePass = *parseablePassFlag
Expand Down
116 changes: 91 additions & 25 deletions tools/parseable.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package tools

import (
"bytes"
"crypto/tls"
"encoding/json"
"io"
"log"
"net/http"
"os"
"strconv"
)

// These variables must be set by main.go before calling RegisterParseableTools
Expand All @@ -13,18 +18,67 @@ var (
ParseablePass string
)

// package-level HTTP client; initialized in init() to respect UNSECURE env var
var HTTPClient *http.Client

func init() {
// UNSECURE environment variable controls whether TLS verification is skipped.
// Accepts the same values as strconv.ParseBool (true/1/t etc.).
unsecureEnv := os.Getenv("UNSECURE")
if ok, _ := strconv.ParseBool(unsecureEnv); ok {
HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
log.Printf("UNSECURE=true: HTTP client will skip TLS verification")
} else {
HTTPClient = http.DefaultClient
}
}

func addBasicAuth(req *http.Request) {
req.SetBasicAuth(ParseableUser, ParseablePass)
}

func doParseableQuery(query string, streamName string, startTime string, endTime string) ([]map[string]interface{}, error) {
payload := map[string]string{
"query": query,
"streamName": streamName,
"startTime": startTime,
"endTime": endTime,
}
jsonPayload, _ := json.Marshal(payload)
url := ParseableBaseURL + parseableSQLPath
httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
addBasicAuth(httpReq)
resp, err := HTTPClient.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)

// Try to unmarshal as array of rows
var arrResult []map[string]interface{}
if err := json.Unmarshal(body, &arrResult); err != nil {
return nil, err
}
return arrResult, nil
}

func listParseableStreams() ([]string, error) {
url := ParseableBaseURL + "/api/v1/logstream"
httpReq, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
addBasicAuth(httpReq)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := HTTPClient.Do(httpReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -53,7 +107,7 @@ func getParseableSchema(stream string) (map[string]string, error) {
return nil, err
}
addBasicAuth(httpReq)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := HTTPClient.Do(httpReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -85,46 +139,58 @@ func getParseableSchema(stream string) (map[string]string, error) {

func getParseableStats(streamName string) (map[string]interface{}, error) {
url := ParseableBaseURL + "/api/v1/logstream/" + streamName + "/stats"
httpReq, err := http.NewRequest("GET", url, nil)
stats, m, err := doSimpleGet(url)
if err != nil {
return nil, err
}
addBasicAuth(httpReq)
resp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("failed to close response body: %v", err)
}
}()
var stats map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, err
return m, err
}
return stats, nil
}

func getParseableInfo(streamName string) (map[string]interface{}, error) {
url := ParseableBaseURL + "/api/v1/logstream/" + streamName + "/info"
info, m, err := doSimpleGet(url)
if err != nil {
return m, err
}
return info, nil
}

func getParseableAbout() (map[string]interface{}, error) {
url := ParseableBaseURL + "/api/v1/about"
about, m, err := doSimpleGet(url)
if err != nil {
return m, err
}
return about, nil
}

func getParseableRoles() (map[string]interface{}, error) {
url := ParseableBaseURL + "/api/v1/roles"
roles, m, err := doSimpleGet(url)
if err != nil {
return m, err
}
return roles, nil
}

func doSimpleGet(url string) (map[string]interface{}, map[string]interface{}, error) {
httpReq, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
return nil, nil, err
}
addBasicAuth(httpReq)
resp, err := http.DefaultClient.Do(httpReq)
resp, err := HTTPClient.Do(httpReq)
if err != nil {
return nil, err
return nil, nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("failed to close response body: %v", err)
}
}()
var info map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return nil, err
var stats map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, nil, err
}
return info, nil
return stats, nil, nil
}
48 changes: 48 additions & 0 deletions tools/parseable_about.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package tools

import (
"context"
"fmt"
"strings"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)

func RegisterGetAboutTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"get_about",
mcp.WithDescription(`Get information about the Parseable instance. Calls /api/v1/about.

Returned fields:
- version: version of Parseable
- uiVersion: the UI version of Parseable
- commit: the git commit hash
- deploymentId: the deployment ID of Parseable
- updateAvailable: if updates of Parseable is available
- latestVersion: the latest version of Parseable
- llmActive: if the Parseable is configured with LLM support
- llmProvider: what LLM provider is used
- oidcActive: if Parseable is configured with OpenID Connect support
- license: the license of Parseable
- mode: if Parseable is running as Standalone or Cluster mode
- staging: the staging path
- hotTier: if hot tier is enabled or disabled
- grpcPort: the grpc port of Parseable
- store: the storage type used for Parseable like local or object store
- analytics: if analytics is enabled or disabled
`),
), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
about, err := getParseableAbout()
if err != nil {
return mcp.NewToolResultError(err.Error()), nil
}
var lines []string
for k, v := range about {
lines = append(lines, k+": "+fmt.Sprintf("%v", v))
}
return mcp.NewToolResultText(strings.Join(lines, "\n")), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"info": info}, "Info returned"), nil
})
}
41 changes: 15 additions & 26 deletions tools/parseable_query.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package tools

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
Expand All @@ -16,7 +13,7 @@ func RegisterQueryDataStreamTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"query_data_stream",
mcp.WithDescription("Execute a SQL query against a data stream in Parseable. All fields are required. All times must be in ISO 8601 format."),
mcp.WithString("query", mcp.Required(), mcp.Description("SQL query to run")),
mcp.WithString("query", mcp.Required(), mcp.Description("SQL query to run, but the FROM must always be set to streamName")),
mcp.WithString("streamName", mcp.Required(), mcp.Description("Name of the data stream (table)")),
mcp.WithString("startTime", mcp.Required(), mcp.Description("Query start time in ISO 8601 (format: yyyy-MM-ddTHH:mm:ss+hh:mm)")),
mcp.WithString("endTime", mcp.Required(), mcp.Description("Query end time in ISO 8601 (format: yyyy-MM-ddTHH:mm:ss+hh:mm)")),
Expand All @@ -28,32 +25,24 @@ func RegisterQueryDataStreamTool(mcpServer *server.MCPServer) {
if query == "" || streamName == "" || startTime == "" || endTime == "" {
return mcp.NewToolResultError("missing required fields: query, streamName, startTime, and endTime are required"), nil
}
payload := map[string]string{
"query": query,
"streamName": streamName,
"startTime": startTime,
"endTime": endTime,
}
jsonPayload, _ := json.Marshal(payload)
url := ParseableBaseURL + parseableSQLPath
httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload))
if err != nil {
return mcp.NewToolResultError("failed to create request"), nil
}
httpReq.Header.Set("Content-Type", "application/json")
addBasicAuth(httpReq)
resp, err := http.DefaultClient.Do(httpReq)
queryResult, err := doParseableQuery(query, streamName, startTime, endTime)
if err != nil {
return mcp.NewToolResultError("query failed: " + err.Error()), nil
}
defer resp.Body.Close()
var result interface{}
body, _ := io.ReadAll(resp.Body)
if err := json.Unmarshal(body, &result); err != nil {
return mcp.NewToolResultError("failed to parse parseable response"), nil

b, err := json.MarshalIndent(queryResult, "", " ")
if err != nil {
return nil, err
}
// Default: return as text
return mcp.NewToolResultText(fmt.Sprintf("%v", result)), nil

// Optional: a one-liner summary that sets expectations.
text := fmt.Sprintf(
"Returned %d rows as JSON (array of objects). Use keys exactly as shown.\n```json\n%s\n```",
len(queryResult),
string(b),
)

return mcp.NewToolResultText(text), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"result": result}, "Query successful"), nil
})
Expand Down
40 changes: 40 additions & 0 deletions tools/parseable_roles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package tools

import (
"context"
"fmt"
"strings"

"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)

func RegisterGetRolesTool(mcpServer *server.MCPServer) {
mcpServer.AddTool(mcp.NewTool(
"get_roles",
mcp.WithDescription(`Get information about the Parseable roles. Roles is used for handling RBAC permissions/privilege and define access to datasets. Calls /api/v1/roles.

Data is returned as a dictionary with the role name and a list of privilege. The privilege can be of the following:
- admin - have all privileges
- editor - have limited privileges like cluster features
- reader - allow read from datasets
- writer - allow read and write from datasets
- ingestor - allow write from datasets

For reader, writer and ingestor role there is always at least one resource connected to the role. This resources is typical a dataset.
For full description of roles and RBAC use https://www.parseable.com/docs/user-guide/rbac
`),
), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
about, err := getParseableRoles()
if err != nil {
return mcp.NewToolResultError(err.Error()), nil
}
var lines []string
for k, v := range about {
lines = append(lines, k+": "+fmt.Sprintf("%v", v))
}
return mcp.NewToolResultText(strings.Join(lines, "\n")), nil
// Optionally, for structured output:
// return mcp.NewToolResultStructured(map[string]interface{}{"info": info}, "Info returned"), nil
})
}
2 changes: 2 additions & 0 deletions tools/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ func RegisterParseableTools(mcpServer *server.MCPServer) {
RegisterGetDataStreamSchemaTool(mcpServer)
RegisterGetDataStreamStatsTool(mcpServer)
RegisterGetDataStreamInfoTool(mcpServer)
RegisterGetAboutTool(mcpServer)
RegisterGetRolesTool(mcpServer)
}
Loading