From 67bbf39e4121be2bf60668d6d8f47c3aa5ea7806 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:32:53 +0000 Subject: [PATCH 1/4] Initial plan From 5cd1c29e2ec6a891af1f87fcd263633f4ec6a7ed Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:53:12 +0000 Subject: [PATCH 2/4] fix: add missing step types and dynamic plugin manifest loading to MCP server Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- mcp/server.go | 93 +++++++++++-- mcp/server_test.go | 283 ++++++++++++++++++++++++++++++++++++++++ schema/module_schema.go | 183 ++++++++++++++++++++++++++ schema/schema.go | 109 +++++++++++++++- 4 files changed, 653 insertions(+), 15 deletions(-) diff --git a/mcp/server.go b/mcp/server.go index 917c69bd..ff3afcb8 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -15,11 +15,11 @@ import ( "strings" "github.com/GoCodeAlone/workflow/config" - "golang.org/x/text/cases" - "golang.org/x/text/language" "github.com/GoCodeAlone/workflow/schema" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "golang.org/x/text/cases" + "golang.org/x/text/language" ) // Version is the MCP server version, set at build time. @@ -34,7 +34,8 @@ type Server struct { // NewServer creates a new MCP server with all workflow engine tools and // resources registered. pluginDir is the directory where installed plugins -// reside (e.g., "data/plugins"). +// reside (e.g., "data/plugins"). If set, the server will read plugin manifests +// from this directory and include plugin-provided types in all type listings. func NewServer(pluginDir string) *Server { s := &Server{ pluginDir: pluginDir, @@ -51,6 +52,12 @@ func NewServer(pluginDir string) *Server { "and manage plugins. Resources provide documentation and example configurations."), ) + // Load types from installed plugin manifests so that plugin-provided types + // appear in all type listings (list_module_types, list_step_types, etc.). + if pluginDir != "" { + s.loadInstalledPluginTypes(pluginDir) + } + s.registerTools() s.registerResources() @@ -72,7 +79,7 @@ func (s *Server) registerTools() { // list_module_types s.mcpServer.AddTool( mcp.NewTool("list_module_types", - mcp.WithDescription("List all available workflow module types that can be used in the 'modules' section of a workflow YAML config. Returns both built-in and plugin-provided types."), + mcp.WithDescription("List all available workflow module types that can be used in the 'modules' section of a workflow YAML config. Returns built-in types plus types from installed plugins (loaded from plugin_dir at server startup)."), mcp.WithReadOnlyHintAnnotation(true), ), s.handleListModuleTypes, @@ -81,7 +88,7 @@ func (s *Server) registerTools() { // list_step_types s.mcpServer.AddTool( mcp.NewTool("list_step_types", - mcp.WithDescription("List all available pipeline step types that can be used in pipeline definitions. Steps are the building blocks of workflow pipelines."), + mcp.WithDescription("List all available pipeline step types that can be used in pipeline definitions. Returns built-in steps plus steps from installed plugins (loaded from plugin_dir at server startup)."), mcp.WithReadOnlyHintAnnotation(true), ), s.handleListStepTypes, @@ -90,7 +97,7 @@ func (s *Server) registerTools() { // list_trigger_types s.mcpServer.AddTool( mcp.NewTool("list_trigger_types", - mcp.WithDescription("List all available trigger types (e.g., http, schedule, event, eventbus) that can start workflow execution."), + mcp.WithDescription("List all available trigger types (e.g., http, schedule, event, eventbus) that can start workflow execution. Includes types from installed plugins."), mcp.WithReadOnlyHintAnnotation(true), ), s.handleListTriggerTypes, @@ -99,7 +106,7 @@ func (s *Server) registerTools() { // list_workflow_types s.mcpServer.AddTool( mcp.NewTool("list_workflow_types", - mcp.WithDescription("List all available workflow handler types (e.g., http, messaging, statemachine, scheduler, integration, event) that define how workflows process work."), + mcp.WithDescription("List all available workflow handler types (e.g., http, messaging, statemachine, scheduler, integration, event, pipeline) that define how workflows process work. Includes types from installed plugins."), mcp.WithReadOnlyHintAnnotation(true), ), s.handleListWorkflowTypes, @@ -394,16 +401,32 @@ func (s *Server) handleListPlugins(_ context.Context, req mcp.CallToolRequest) ( } type pluginInfo struct { - Name string `json:"name"` - Version string `json:"version"` + Name string `json:"name"` + Version string `json:"version"` + ModuleTypes []string `json:"module_types,omitempty"` + StepTypes []string `json:"step_types,omitempty"` + TriggerTypes []string `json:"trigger_types,omitempty"` + WorkflowTypes []string `json:"workflow_types,omitempty"` } var plugins []pluginInfo for _, e := range entries { if !e.IsDir() { continue } - ver := readPluginVersion(filepath.Join(dataDir, e.Name())) - plugins = append(plugins, pluginInfo{Name: e.Name(), Version: ver}) + dir := filepath.Join(dataDir, e.Name()) + ver := readPluginVersion(dir) + info := pluginInfo{Name: e.Name(), Version: ver} + // Enrich with type declarations from the plugin manifest. + if data, err := os.ReadFile(filepath.Join(dir, "plugin.json")); err == nil { //nolint:gosec // G304: path is within the trusted plugins directory + var m pluginManifestTypes + if json.Unmarshal(data, &m) == nil { + info.ModuleTypes = m.ModuleTypes + info.StepTypes = m.StepTypes + info.TriggerTypes = m.TriggerTypes + info.WorkflowTypes = m.WorkflowTypes + } + } + plugins = append(plugins, info) } result := map[string]any{ @@ -483,6 +506,54 @@ func marshalToolResult(v any) (*mcp.CallToolResult, error) { return mcp.NewToolResultText(string(data)), nil } +// pluginManifestTypes holds the type declarations from a plugin.json manifest file. +// This is a minimal subset of plugin.PluginManifest used to avoid a package dependency. +type pluginManifestTypes struct { + ModuleTypes []string `json:"moduleTypes"` + StepTypes []string `json:"stepTypes"` + TriggerTypes []string `json:"triggerTypes"` + WorkflowTypes []string `json:"workflowTypes"` +} + +// loadInstalledPluginTypes scans pluginDir for subdirectories containing a +// plugin.json manifest, reads each manifest's type declarations, and registers +// them with the schema package so that they appear in all type listings. +// Unknown or malformed manifests are silently skipped. +func (s *Server) loadInstalledPluginTypes(pluginDir string) { + entries, err := os.ReadDir(pluginDir) + if err != nil { + return + } + for _, e := range entries { + if !e.IsDir() { + continue + } + manifestPath := filepath.Join(pluginDir, e.Name(), "plugin.json") + data, err := os.ReadFile(manifestPath) //nolint:gosec // G304: path is within the trusted plugins directory + if err != nil { + continue + } + var m pluginManifestTypes + if err := json.Unmarshal(data, &m); err != nil { + continue + } + for _, t := range m.ModuleTypes { + schema.RegisterModuleType(t) + } + for _, t := range m.StepTypes { + // Step types are also surfaced as module types in the MCP server view + // (they share the same registry and are identified by the "step." prefix). + schema.RegisterModuleType(t) + } + for _, t := range m.TriggerTypes { + schema.RegisterTriggerType(t) + } + for _, t := range m.WorkflowTypes { + schema.RegisterWorkflowType(t) + } + } +} + func readPluginVersion(dir string) string { data, err := os.ReadFile(filepath.Join(dir, "plugin.json")) if err != nil { diff --git a/mcp/server_test.go b/mcp/server_test.go index 20a5a561..b0b092d3 100644 --- a/mcp/server_test.go +++ b/mcp/server_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/GoCodeAlone/workflow/schema" "github.com/mark3labs/mcp-go/mcp" ) @@ -84,6 +85,288 @@ func TestListStepTypes(t *testing.T) { t.Errorf("step type %q does not start with 'step.'", str) } } + + // Verify that step.foreach (previously missing) is now listed. + stepSet := make(map[string]bool) + for _, s := range steps { + stepSet[s.(string)] = true + } + for _, expected := range []string{ + "step.foreach", + "step.webhook_verify", + "step.cache_get", + "step.cache_set", + "step.cache_delete", + "step.event_publish", + "step.retry_with_backoff", + "step.resilient_circuit_breaker", + } { + if !stepSet[expected] { + t.Errorf("expected step type %q not found in list", expected) + } + } +} + +func TestListStepTypes_PluginDir(t *testing.T) { + // Create a temp plugin directory with a plugin that declares a custom step type. + dir := t.TempDir() + pluginDir := dir + "/custom-plugin" + if err := os.MkdirAll(pluginDir, 0750); err != nil { + t.Fatal(err) + } + manifest := []byte(`{ + "name": "custom-plugin", + "version": "1.0.0", + "author": "test", + "description": "test plugin", + "stepTypes": ["step.authz_check", "step.chimera_custom"] + }`) + if err := os.WriteFile(pluginDir+"/plugin.json", manifest, 0640); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + schema.UnregisterModuleType("step.authz_check") + schema.UnregisterModuleType("step.chimera_custom") + }) + + srv := NewServer(dir) + result, err := srv.handleListStepTypes(context.Background(), mcp.CallToolRequest{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + text := extractText(t, result) + var data map[string]any + if err := json.Unmarshal([]byte(text), &data); err != nil { + t.Fatalf("failed to parse result JSON: %v", err) + } + + steps, ok := data["step_types"].([]any) + if !ok { + t.Fatal("step_types not found in result") + } + stepSet := make(map[string]bool) + for _, s := range steps { + stepSet[s.(string)] = true + } + if !stepSet["step.authz_check"] { + t.Error("expected plugin step type 'step.authz_check' to be listed") + } + if !stepSet["step.chimera_custom"] { + t.Error("expected plugin step type 'step.chimera_custom' to be listed") + } +} + +func TestListModuleTypes_PluginDir(t *testing.T) { + // Create a temp plugin directory with a plugin that declares custom module types. + dir := t.TempDir() + pluginDir := dir + "/auth-plugin" + if err := os.MkdirAll(pluginDir, 0750); err != nil { + t.Fatal(err) + } + manifest := []byte(`{ + "name": "auth-plugin", + "version": "2.0.0", + "author": "test", + "description": "auth plugin", + "moduleTypes": ["auth.m2m", "auth.oauth2"] + }`) + if err := os.WriteFile(pluginDir+"/plugin.json", manifest, 0640); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + schema.UnregisterModuleType("auth.m2m") + schema.UnregisterModuleType("auth.oauth2") + }) + + srv := NewServer(dir) + result, err := srv.handleListModuleTypes(context.Background(), mcp.CallToolRequest{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + text := extractText(t, result) + var data map[string]any + if err := json.Unmarshal([]byte(text), &data); err != nil { + t.Fatalf("failed to parse result JSON: %v", err) + } + + types, ok := data["module_types"].([]any) + if !ok { + t.Fatal("module_types not found in result") + } + typeSet := make(map[string]bool) + for _, tt := range types { + typeSet[tt.(string)] = true + } + if !typeSet["auth.m2m"] { + t.Error("expected plugin module type 'auth.m2m' to be listed") + } + if !typeSet["auth.oauth2"] { + t.Error("expected plugin module type 'auth.oauth2' to be listed") + } +} + +func TestListTriggerTypes_PluginDir(t *testing.T) { + dir := t.TempDir() + pluginDir := dir + "/custom-trigger-plugin" + if err := os.MkdirAll(pluginDir, 0750); err != nil { + t.Fatal(err) + } + manifest := []byte(`{ + "name": "custom-trigger-plugin", + "version": "1.0.0", + "author": "test", + "description": "custom trigger plugin", + "triggerTypes": ["reconciliation", "webhook"] + }`) + if err := os.WriteFile(pluginDir+"/plugin.json", manifest, 0640); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + schema.UnregisterTriggerType("reconciliation") + schema.UnregisterTriggerType("webhook") + }) + + srv := NewServer(dir) + result, err := srv.handleListTriggerTypes(context.Background(), mcp.CallToolRequest{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + text := extractText(t, result) + var data map[string]any + if err := json.Unmarshal([]byte(text), &data); err != nil { + t.Fatalf("failed to parse result JSON: %v", err) + } + + triggers, ok := data["trigger_types"].([]any) + if !ok { + t.Fatal("trigger_types not found in result") + } + triggerSet := make(map[string]bool) + for _, tt := range triggers { + triggerSet[tt.(string)] = true + } + if !triggerSet["reconciliation"] { + t.Error("expected plugin trigger type 'reconciliation' to be listed") + } +} + +func TestListWorkflowTypes_PluginDir(t *testing.T) { + dir := t.TempDir() + pluginDir := dir + "/pipeline-plugin" + if err := os.MkdirAll(pluginDir, 0750); err != nil { + t.Fatal(err) + } + manifest := []byte(`{ + "name": "pipeline-plugin", + "version": "1.0.0", + "author": "test", + "description": "pipeline plugin", + "workflowTypes": ["pipeline", "custom_workflow"] + }`) + if err := os.WriteFile(pluginDir+"/plugin.json", manifest, 0640); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + schema.UnregisterWorkflowType("pipeline") + schema.UnregisterWorkflowType("custom_workflow") + }) + + srv := NewServer(dir) + result, err := srv.handleListWorkflowTypes(context.Background(), mcp.CallToolRequest{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + text := extractText(t, result) + var data map[string]any + if err := json.Unmarshal([]byte(text), &data); err != nil { + t.Fatalf("failed to parse result JSON: %v", err) + } + + workflows, ok := data["workflow_types"].([]any) + if !ok { + t.Fatal("workflow_types not found in result") + } + workflowSet := make(map[string]bool) + for _, wt := range workflows { + workflowSet[wt.(string)] = true + } + if !workflowSet["pipeline"] { + t.Error("expected plugin workflow type 'pipeline' to be listed") + } + if !workflowSet["custom_workflow"] { + t.Error("expected plugin workflow type 'custom_workflow' to be listed") + } +} + +func TestListPlugins_WithTypes(t *testing.T) { + dir := t.TempDir() + pluginDir := dir + "/my-plugin" + if err := os.MkdirAll(pluginDir, 0750); err != nil { + t.Fatal(err) + } + manifest := []byte(`{ + "name": "my-plugin", + "version": "3.0.0", + "author": "test", + "description": "test plugin", + "moduleTypes": ["my.module"], + "stepTypes": ["step.my_step"], + "triggerTypes": ["my_trigger"], + "workflowTypes": ["my_workflow"] + }`) + if err := os.WriteFile(pluginDir+"/plugin.json", manifest, 0640); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + schema.UnregisterModuleType("my.module") + schema.UnregisterModuleType("step.my_step") + schema.UnregisterTriggerType("my_trigger") + schema.UnregisterWorkflowType("my_workflow") + }) + + srv := NewServer(dir) + req := makeCallToolRequest(map[string]any{}) + result, err := srv.handleListPlugins(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + text := extractText(t, result) + var data map[string]any + if err := json.Unmarshal([]byte(text), &data); err != nil { + t.Fatalf("failed to parse result JSON: %v", err) + } + + plugins := data["plugins"].([]any) + if len(plugins) != 1 { + t.Fatalf("expected 1 plugin, got %d", len(plugins)) + } + p := plugins[0].(map[string]any) + if p["version"] != "3.0.0" { + t.Errorf("expected version 3.0.0, got %v", p["version"]) + } + + // Verify type declarations are included in the plugin listing. + moduleTypes, ok := p["module_types"].([]any) + if !ok || len(moduleTypes) == 0 { + t.Error("expected module_types in plugin listing") + } + stepTypes, ok := p["step_types"].([]any) + if !ok || len(stepTypes) == 0 { + t.Error("expected step_types in plugin listing") + } + if stepTypes[0] != "step.my_step" { + t.Errorf("expected step type 'step.my_step', got %v", stepTypes[0]) + } } func TestListTriggerTypes(t *testing.T) { diff --git a/schema/module_schema.go b/schema/module_schema.go index 017bb7fe..db62f30a 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1594,6 +1594,189 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { }, }) + // ---- Additional Pipeline Steps ---- + + r.Register(&ModuleSchema{ + Type: "step.foreach", + Label: "For Each", + Category: "pipeline_steps", + Description: "Iterates over a collection and executes a sub-pipeline step for each item", + ConfigFields: []ConfigFieldDef{ + {Key: "items_from", Label: "Items From", Type: FieldTypeString, Required: true, Description: "Dotted path to the collection to iterate over", Placeholder: "steps.fetch.items"}, + {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to execute for each item"}, + {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the sub-step"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Key to store the collected results in context", DefaultValue: "foreach_results"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.webhook_verify", + Label: "Webhook Verify", + Category: "pipeline_steps", + Description: "Verifies incoming webhook request signatures (supports HMAC-SHA1, HMAC-SHA256)", + ConfigFields: []ConfigFieldDef{ + {Key: "provider", Label: "Provider", Type: FieldTypeSelect, Options: []string{"github", "stripe", "generic"}, Description: "Webhook provider (legacy; prefer scheme)"}, + {Key: "scheme", Label: "Scheme", Type: FieldTypeSelect, Options: []string{"hmac-sha1", "hmac-sha256", "hmac-sha256-hex"}, Description: "HMAC signature scheme to use"}, + {Key: "secret", Label: "Secret", Type: FieldTypeString, Sensitive: true, Description: "Webhook signing secret"}, + {Key: "secret_from", Label: "Secret From", Type: FieldTypeString, Description: "Context key containing the secret at runtime"}, + {Key: "signature_header", Label: "Signature Header", Type: FieldTypeString, Description: "HTTP header containing the signature", Placeholder: "X-Hub-Signature-256"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.cache_get", + Label: "Cache Get", + Category: "pipeline_steps", + Description: "Retrieves a value from the cache by key", + ConfigFields: []ConfigFieldDef{ + {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key (supports template expressions)", Placeholder: "user:{{.user_id}}"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Context key for the retrieved value", DefaultValue: "cache_value"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.cache_set", + Label: "Cache Set", + Category: "pipeline_steps", + Description: "Stores a value in the cache with optional TTL", + ConfigFields: []ConfigFieldDef{ + {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key (supports template expressions)", Placeholder: "user:{{.user_id}}"}, + {Key: "value_from", Label: "Value From", Type: FieldTypeString, Description: "Dotted path to the value to cache"}, + {Key: "ttl", Label: "TTL", Type: FieldTypeDuration, Description: "Cache entry time-to-live", Placeholder: "5m"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.cache_delete", + Label: "Cache Delete", + Category: "pipeline_steps", + Description: "Removes a value from the cache by key", + ConfigFields: []ConfigFieldDef{ + {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key to delete (supports template expressions)", Placeholder: "user:{{.user_id}}"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.event_publish", + Label: "Event Publish", + Category: "pipeline_steps", + Description: "Publishes an event to the event bus", + ConfigFields: []ConfigFieldDef{ + {Key: "event_type", Label: "Event Type", Type: FieldTypeString, Required: true, Description: "Event type identifier to publish", Placeholder: "user.created"}, + {Key: "payload_from", Label: "Payload From", Type: FieldTypeString, Description: "Dotted path to the event payload in the pipeline context"}, + {Key: "payload", Label: "Payload", Type: FieldTypeJSON, Description: "Static event payload (supports template expressions)"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.validate_path_param", + Label: "Validate Path Param", + Category: "pipeline_steps", + Description: "Validates and extracts a URL path parameter", + ConfigFields: []ConfigFieldDef{ + {Key: "param", Label: "Parameter Name", Type: FieldTypeString, Required: true, Description: "Path parameter name to extract", Placeholder: "id"}, + {Key: "required", Label: "Required", Type: FieldTypeBool, DefaultValue: true, Description: "Whether the parameter is required"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Context key for the extracted value"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.validate_pagination", + Label: "Validate Pagination", + Category: "pipeline_steps", + Description: "Validates and normalizes pagination query parameters (page, page_size, limit, offset)", + ConfigFields: []ConfigFieldDef{ + {Key: "default_page_size", Label: "Default Page Size", Type: FieldTypeNumber, DefaultValue: 20, Description: "Default number of items per page"}, + {Key: "max_page_size", Label: "Max Page Size", Type: FieldTypeNumber, DefaultValue: 100, Description: "Maximum allowed page size"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.validate_request_body", + Label: "Validate Request Body", + Category: "pipeline_steps", + Description: "Parses and validates the HTTP request body against a schema", + ConfigFields: []ConfigFieldDef{ + {Key: "schema", Label: "JSON Schema", Type: FieldTypeJSON, Description: "JSON Schema to validate the request body against"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "body", Description: "Context key to store the parsed body"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.dlq_send", + Label: "DLQ Send", + Category: "pipeline_steps", + Description: "Sends a failed message to the dead letter queue for later replay", + ConfigFields: []ConfigFieldDef{ + {Key: "queue", Label: "Queue Name", Type: FieldTypeString, Description: "DLQ queue name (defaults to pipeline name)"}, + {Key: "reason_from", Label: "Reason From", Type: FieldTypeString, Description: "Context key containing the failure reason"}, + {Key: "payload_from", Label: "Payload From", Type: FieldTypeString, Description: "Dotted path to the message payload"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.dlq_replay", + Label: "DLQ Replay", + Category: "pipeline_steps", + Description: "Replays messages from the dead letter queue", + ConfigFields: []ConfigFieldDef{ + {Key: "queue", Label: "Queue Name", Type: FieldTypeString, Description: "DLQ queue name to replay from"}, + {Key: "limit", Label: "Limit", Type: FieldTypeNumber, DefaultValue: 10, Description: "Maximum number of messages to replay"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.retry_with_backoff", + Label: "Retry With Backoff", + Category: "pipeline_steps", + Description: "Wraps a sub-step with automatic retry logic using exponential backoff", + ConfigFields: []ConfigFieldDef{ + {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to retry"}, + {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the wrapped step"}, + {Key: "max_attempts", Label: "Max Attempts", Type: FieldTypeNumber, DefaultValue: 3, Description: "Maximum number of retry attempts"}, + {Key: "initial_delay", Label: "Initial Delay", Type: FieldTypeDuration, DefaultValue: "100ms", Description: "Initial delay before first retry"}, + {Key: "max_delay", Label: "Max Delay", Type: FieldTypeDuration, DefaultValue: "30s", Description: "Maximum delay between retries"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.resilient_circuit_breaker", + Label: "Resilient Circuit Breaker", + Category: "pipeline_steps", + Description: "Wraps a sub-step with circuit breaker pattern to prevent cascading failures", + ConfigFields: []ConfigFieldDef{ + {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to protect"}, + {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the wrapped step"}, + {Key: "threshold", Label: "Failure Threshold", Type: FieldTypeNumber, DefaultValue: 5, Description: "Number of consecutive failures to open the circuit"}, + {Key: "timeout", Label: "Timeout", Type: FieldTypeDuration, DefaultValue: "60s", Description: "Duration to keep the circuit open before trying again"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.ui_scaffold", + Label: "UI Scaffold", + Category: "pipeline_steps", + Description: "Generates UI component scaffolding from a prompt using AI assistance", + ConfigFields: []ConfigFieldDef{ + {Key: "prompt_from", Label: "Prompt From", Type: FieldTypeString, Description: "Context key containing the scaffold prompt"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "scaffold_result", Description: "Context key for the generated scaffold"}, + }, + }) + + r.Register(&ModuleSchema{ + Type: "step.ui_scaffold_analyze", + Label: "UI Scaffold Analyze", + Category: "pipeline_steps", + Description: "Analyzes existing UI components to extract scaffold metadata", + ConfigFields: []ConfigFieldDef{ + {Key: "target_from", Label: "Target From", Type: FieldTypeString, Description: "Context key containing the component to analyze"}, + {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "analysis_result", Description: "Context key for the analysis result"}, + }, + }) + // ---- License ---- r.Register(&ModuleSchema{ diff --git a/schema/schema.go b/schema/schema.go index da0a492e..557728f4 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -14,6 +14,18 @@ var ( dynamicModuleTypes = make(map[string]bool) ) +// dynamicTriggerTypes holds trigger types registered at runtime by plugins. +var ( + dynamicTriggerMu sync.RWMutex + dynamicTriggerTypes = make(map[string]bool) +) + +// dynamicWorkflowTypes holds workflow types registered at runtime by plugins. +var ( + dynamicWorkflowMu sync.RWMutex + dynamicWorkflowTypes = make(map[string]bool) +) + // RegisterModuleType registers a module type so it is recognized by KnownModuleTypes. func RegisterModuleType(moduleType string) { dynamicMu.Lock() @@ -28,6 +40,34 @@ func UnregisterModuleType(moduleType string) { delete(dynamicModuleTypes, moduleType) } +// RegisterTriggerType registers a trigger type so it is recognized by KnownTriggerTypes. +func RegisterTriggerType(triggerType string) { + dynamicTriggerMu.Lock() + defer dynamicTriggerMu.Unlock() + dynamicTriggerTypes[triggerType] = true +} + +// UnregisterTriggerType removes a dynamically registered trigger type. Intended for testing. +func UnregisterTriggerType(triggerType string) { + dynamicTriggerMu.Lock() + defer dynamicTriggerMu.Unlock() + delete(dynamicTriggerTypes, triggerType) +} + +// RegisterWorkflowType registers a workflow type so it is recognized by KnownWorkflowTypes. +func RegisterWorkflowType(workflowType string) { + dynamicWorkflowMu.Lock() + defer dynamicWorkflowMu.Unlock() + dynamicWorkflowTypes[workflowType] = true +} + +// UnregisterWorkflowType removes a dynamically registered workflow type. Intended for testing. +func UnregisterWorkflowType(workflowType string) { + dynamicWorkflowMu.Lock() + defer dynamicWorkflowMu.Unlock() + delete(dynamicWorkflowTypes, workflowType) +} + // Schema represents a JSON Schema document. type Schema struct { Schema string `json:"$schema"` @@ -108,6 +148,9 @@ var coreModuleTypes = []string{ "step.artifact_pull", "step.artifact_push", "step.build_ui", + "step.cache_delete", + "step.cache_get", + "step.cache_set", "step.circuit_breaker", "step.conditional", "step.constraint_check", @@ -115,12 +158,16 @@ var coreModuleTypes = []string{ "step.db_query", "step.delegate", "step.deploy", + "step.dlq_replay", + "step.dlq_send", "step.docker_build", "step.docker_push", "step.docker_run", "step.drift_check", + "step.event_publish", "step.feature_flag", "step.ff_gate", + "step.foreach", "step.gate", "step.http_call", "step.jq", @@ -133,6 +180,8 @@ var coreModuleTypes = []string{ "step.publish", "step.rate_limit", "step.request_parse", + "step.resilient_circuit_breaker", + "step.retry_with_backoff", "step.scan_container", "step.scan_deps", "step.scan_sast", @@ -140,7 +189,13 @@ var coreModuleTypes = []string{ "step.shell_exec", "step.sub_workflow", "step.transform", + "step.ui_scaffold", + "step.ui_scaffold_analyze", "step.validate", + "step.validate_pagination", + "step.validate_path_param", + "step.validate_request_body", + "step.webhook_verify", "step.workflow_call", "storage.gcs", "storage.local", @@ -187,19 +242,43 @@ func KnownModuleTypes() []string { return result } -// KnownTriggerTypes returns all built-in trigger type identifiers. +// KnownTriggerTypes returns all built-in trigger type identifiers plus any types +// registered at runtime by plugins. The result is sorted and deduplicated. func KnownTriggerTypes() []string { - return []string{ + core := []string{ "http", "schedule", "event", "eventbus", } + + dynamicTriggerMu.RLock() + defer dynamicTriggerMu.RUnlock() + + if len(dynamicTriggerTypes) == 0 { + return core + } + + seen := make(map[string]bool, len(core)+len(dynamicTriggerTypes)) + for _, t := range core { + seen[t] = true + } + for t := range dynamicTriggerTypes { + seen[t] = true + } + + result := make([]string, 0, len(seen)) + for t := range seen { + result = append(result, t) + } + sort.Strings(result) + return result } -// KnownWorkflowTypes returns all built-in workflow handler type identifiers. +// KnownWorkflowTypes returns all built-in workflow handler type identifiers plus any types +// registered at runtime by plugins. The result is sorted and deduplicated. func KnownWorkflowTypes() []string { - return []string{ + core := []string{ "event", "http", "messaging", @@ -207,6 +286,28 @@ func KnownWorkflowTypes() []string { "scheduler", "integration", } + + dynamicWorkflowMu.RLock() + defer dynamicWorkflowMu.RUnlock() + + if len(dynamicWorkflowTypes) == 0 { + return core + } + + seen := make(map[string]bool, len(core)+len(dynamicWorkflowTypes)) + for _, t := range core { + seen[t] = true + } + for t := range dynamicWorkflowTypes { + seen[t] = true + } + + result := make([]string, 0, len(seen)) + for t := range seen { + result = append(result, t) + } + sort.Strings(result) + return result } // GenerateWorkflowSchema produces the full JSON Schema describing a valid From 9d196881df211a69c87e295a5f6b590c481af9a5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 07:22:30 +0000 Subject: [PATCH 3/4] fix(schema): correct config field names in new pipeline step schemas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - step.foreach: items_from→collection; add item_var/item_key/index_key; step/steps as maps - step.validate_path_param: param→params (array); add format and source fields - step.validate_request_body: replace schema/output_key with required_fields (actual impl) - step.retry_with_backoff: step_config removed, max_attempts→max_retries, add multiplier; step is map - step.resilient_circuit_breaker: step_config removed, threshold→failure_threshold, timeout→reset_timeout, add fallback map field - step.ui_scaffold: replace prompt_from/output_key with title/theme/auth/filename (actual impl) - step.ui_scaffold_analyze: replace target_from/output_key with title/theme (actual impl) Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- schema/module_schema.go | 53 ++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/schema/module_schema.go b/schema/module_schema.go index db62f30a..01685897 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1602,10 +1602,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Category: "pipeline_steps", Description: "Iterates over a collection and executes a sub-pipeline step for each item", ConfigFields: []ConfigFieldDef{ - {Key: "items_from", Label: "Items From", Type: FieldTypeString, Required: true, Description: "Dotted path to the collection to iterate over", Placeholder: "steps.fetch.items"}, - {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to execute for each item"}, - {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the sub-step"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Key to store the collected results in context", DefaultValue: "foreach_results"}, + {Key: "collection", Label: "Collection", Type: FieldTypeString, Required: true, Description: "Dotted path to the collection to iterate over", Placeholder: "steps.fetch.items"}, + {Key: "item_var", Label: "Item Variable", Type: FieldTypeString, Description: "Context variable name for the current item (defaults to 'item')", DefaultValue: "item"}, + {Key: "item_key", Label: "Item Key (legacy)", Type: FieldTypeString, Description: "Legacy alias for item_var"}, + {Key: "index_key", Label: "Index Key", Type: FieldTypeString, Description: "Context variable name for the current index (defaults to 'index')", DefaultValue: "index"}, + {Key: "step", Label: "Step", Type: FieldTypeMap, Description: "Single step map to execute for each item (mutually exclusive with steps); must include 'type' key"}, + {Key: "steps", Label: "Steps", Type: FieldTypeJSON, Description: "Array of step maps to execute for each item (mutually exclusive with step)"}, }, }) @@ -1675,11 +1677,11 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.validate_path_param", Label: "Validate Path Param", Category: "pipeline_steps", - Description: "Validates and extracts a URL path parameter", + Description: "Validates URL path parameters are present and optionally conform to a format (e.g. UUID)", ConfigFields: []ConfigFieldDef{ - {Key: "param", Label: "Parameter Name", Type: FieldTypeString, Required: true, Description: "Path parameter name to extract", Placeholder: "id"}, - {Key: "required", Label: "Required", Type: FieldTypeBool, DefaultValue: true, Description: "Whether the parameter is required"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Context key for the extracted value"}, + {Key: "params", Label: "Parameter Names", Type: FieldTypeArray, Required: true, ArrayItemType: "string", Description: "List of path parameter names to validate", Placeholder: "id"}, + {Key: "format", Label: "Format", Type: FieldTypeString, Description: "Validation format to apply to each parameter (e.g. 'uuid')"}, + {Key: "source", Label: "Source Path", Type: FieldTypeString, Description: "Dotted path within the context to read path parameters from (e.g. 'steps.parse-request.path_params')"}, }, }) @@ -1698,10 +1700,9 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.validate_request_body", Label: "Validate Request Body", Category: "pipeline_steps", - Description: "Parses and validates the HTTP request body against a schema", + Description: "Parses the HTTP request body and validates required fields are present", ConfigFields: []ConfigFieldDef{ - {Key: "schema", Label: "JSON Schema", Type: FieldTypeJSON, Description: "JSON Schema to validate the request body against"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "body", Description: "Context key to store the parsed body"}, + {Key: "required_fields", Label: "Required Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "List of required top-level field names in the request body"}, }, }) @@ -1734,11 +1735,11 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Category: "pipeline_steps", Description: "Wraps a sub-step with automatic retry logic using exponential backoff", ConfigFields: []ConfigFieldDef{ - {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to retry"}, - {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the wrapped step"}, - {Key: "max_attempts", Label: "Max Attempts", Type: FieldTypeNumber, DefaultValue: 3, Description: "Maximum number of retry attempts"}, - {Key: "initial_delay", Label: "Initial Delay", Type: FieldTypeDuration, DefaultValue: "100ms", Description: "Initial delay before first retry"}, + {Key: "step", Label: "Step", Type: FieldTypeMap, Required: true, Description: "Sub-step map to retry; must include 'type' key with inline step configuration"}, + {Key: "max_retries", Label: "Max Retries", Type: FieldTypeNumber, DefaultValue: 3, Description: "Maximum number of retry attempts"}, + {Key: "initial_delay", Label: "Initial Delay", Type: FieldTypeDuration, DefaultValue: "1s", Description: "Initial delay before first retry"}, {Key: "max_delay", Label: "Max Delay", Type: FieldTypeDuration, DefaultValue: "30s", Description: "Maximum delay between retries"}, + {Key: "multiplier", Label: "Backoff Multiplier", Type: FieldTypeNumber, DefaultValue: 2.0, Description: "Multiplier applied to the delay for each retry (exponential backoff factor)"}, }, }) @@ -1748,10 +1749,10 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Category: "pipeline_steps", Description: "Wraps a sub-step with circuit breaker pattern to prevent cascading failures", ConfigFields: []ConfigFieldDef{ - {Key: "step", Label: "Step Type", Type: FieldTypeString, Required: true, Description: "Step type to protect"}, - {Key: "step_config", Label: "Step Config", Type: FieldTypeJSON, Description: "Configuration for the wrapped step"}, - {Key: "threshold", Label: "Failure Threshold", Type: FieldTypeNumber, DefaultValue: 5, Description: "Number of consecutive failures to open the circuit"}, - {Key: "timeout", Label: "Timeout", Type: FieldTypeDuration, DefaultValue: "60s", Description: "Duration to keep the circuit open before trying again"}, + {Key: "step", Label: "Step", Type: FieldTypeMap, Required: true, Description: "Sub-step map to protect; must include 'type' key with inline step configuration"}, + {Key: "failure_threshold", Label: "Failure Threshold", Type: FieldTypeNumber, DefaultValue: 5, Description: "Number of consecutive failures to open the circuit"}, + {Key: "reset_timeout", Label: "Reset Timeout", Type: FieldTypeDuration, DefaultValue: "60s", Description: "Duration the circuit remains open before attempting a half-open state"}, + {Key: "fallback", Label: "Fallback Step", Type: FieldTypeMap, Description: "Optional fallback step map executed when the circuit is open; must include 'type' key"}, }, }) @@ -1759,10 +1760,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.ui_scaffold", Label: "UI Scaffold", Category: "pipeline_steps", - Description: "Generates UI component scaffolding from a prompt using AI assistance", + Description: "Generates a Vite+React+TypeScript UI scaffold from an OpenAPI spec (read from the request body or context) and returns a ZIP archive", ConfigFields: []ConfigFieldDef{ - {Key: "prompt_from", Label: "Prompt From", Type: FieldTypeString, Description: "Context key containing the scaffold prompt"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "scaffold_result", Description: "Context key for the generated scaffold"}, + {Key: "title", Label: "Title", Type: FieldTypeString, Description: "Title to use for the generated UI"}, + {Key: "theme", Label: "Theme", Type: FieldTypeString, Description: "UI theme or design system to target"}, + {Key: "auth", Label: "Auth", Type: FieldTypeBool, Description: "Whether to generate authentication UI components"}, + {Key: "filename", Label: "Filename", Type: FieldTypeString, DefaultValue: "scaffold.zip", Description: "Filename for the generated ZIP archive"}, }, }) @@ -1770,10 +1773,10 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.ui_scaffold_analyze", Label: "UI Scaffold Analyze", Category: "pipeline_steps", - Description: "Analyzes existing UI components to extract scaffold metadata", + Description: "Analyzes an OpenAPI spec (read from the request body or context) to produce scaffold analysis metadata", ConfigFields: []ConfigFieldDef{ - {Key: "target_from", Label: "Target From", Type: FieldTypeString, Description: "Context key containing the component to analyze"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, DefaultValue: "analysis_result", Description: "Context key for the analysis result"}, + {Key: "title", Label: "Title", Type: FieldTypeString, Description: "Title to use for the generated scaffold analysis"}, + {Key: "theme", Label: "Theme", Type: FieldTypeString, Description: "Visual theme or design system to target when generating scaffold analysis"}, }, }) From 42b6c99c67fb0ac14620824cf877ff13024763df Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 09:40:31 +0000 Subject: [PATCH 4/4] fix(schema): correct remaining step schema fields to match implementations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KnownTriggerTypes/KnownWorkflowTypes: always return sorted slice (even without dynamic types) - step.webhook_verify: add both scheme-based and legacy provider-based fields - step.cache_get: output_key→output (default "value"), cache required, add miss_ok - step.cache_set: value_from→value (required), cache required - step.cache_delete: cache now required - step.event_publish: event_type→topic (required), remove payload_from, add headers/broker - step.validate_pagination: default_page_size→default_page/default_limit, max_page_size→max_limit - step.dlq_send: queue/reason_from/payload_from→topic (required)/original_topic/error/payload/broker - step.dlq_replay: queue/limit→dlq_topic+target_topic (required)/max_messages/broker Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- schema/module_schema.go | 53 ++++++++++++++++++++++++----------------- schema/schema.go | 10 ++++++-- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/schema/module_schema.go b/schema/module_schema.go index 63f0bd2b..ea8d3f48 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1635,11 +1635,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Category: "pipeline_steps", Description: "Verifies incoming webhook request signatures (supports HMAC-SHA1, HMAC-SHA256)", ConfigFields: []ConfigFieldDef{ + {Key: "scheme", Label: "Scheme", Type: FieldTypeSelect, Options: []string{"hmac-sha1", "hmac-sha256", "hmac-sha256-hex"}, Description: "HMAC signature scheme to use (preferred over provider)"}, {Key: "provider", Label: "Provider", Type: FieldTypeSelect, Options: []string{"github", "stripe", "generic"}, Description: "Webhook provider (legacy; prefer scheme)"}, - {Key: "scheme", Label: "Scheme", Type: FieldTypeSelect, Options: []string{"hmac-sha1", "hmac-sha256", "hmac-sha256-hex"}, Description: "HMAC signature scheme to use"}, {Key: "secret", Label: "Secret", Type: FieldTypeString, Sensitive: true, Description: "Webhook signing secret"}, - {Key: "secret_from", Label: "Secret From", Type: FieldTypeString, Description: "Context key containing the secret at runtime"}, - {Key: "signature_header", Label: "Signature Header", Type: FieldTypeString, Description: "HTTP header containing the signature", Placeholder: "X-Hub-Signature-256"}, + {Key: "secret_from", Label: "Secret From", Type: FieldTypeString, Description: "Context key containing the secret at runtime (scheme mode only)"}, + {Key: "signature_header", Label: "Signature Header", Type: FieldTypeString, Description: "HTTP header containing the signature (scheme mode only)", Placeholder: "X-Hub-Signature-256"}, + {Key: "header", Label: "Signature Header (legacy)", Type: FieldTypeString, Description: "HTTP header containing the signature (provider/legacy mode)", Placeholder: "X-Hub-Signature-256"}, }, }) @@ -1650,8 +1651,9 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Description: "Retrieves a value from the cache by key", ConfigFields: []ConfigFieldDef{ {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key (supports template expressions)", Placeholder: "user:{{.user_id}}"}, - {Key: "output_key", Label: "Output Key", Type: FieldTypeString, Description: "Context key for the retrieved value", DefaultValue: "cache_value"}, - {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Required: true, Description: "Name of the cache module to use"}, + {Key: "output", Label: "Output Key", Type: FieldTypeString, Description: "Context key to store the retrieved value", DefaultValue: "value"}, + {Key: "miss_ok", Label: "Allow Cache Miss", Type: FieldTypeBool, Description: "If true, do not fail when the cache key is missing (default: true)"}, }, }) @@ -1662,9 +1664,9 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Description: "Stores a value in the cache with optional TTL", ConfigFields: []ConfigFieldDef{ {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key (supports template expressions)", Placeholder: "user:{{.user_id}}"}, - {Key: "value_from", Label: "Value From", Type: FieldTypeString, Description: "Dotted path to the value to cache"}, + {Key: "value", Label: "Value", Type: FieldTypeString, Required: true, Description: "Value to cache (supports template expressions, e.g. {{.field}})"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Required: true, Description: "Name of the cache module to use"}, {Key: "ttl", Label: "TTL", Type: FieldTypeDuration, Description: "Cache entry time-to-live", Placeholder: "5m"}, - {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, }, }) @@ -1675,7 +1677,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Description: "Removes a value from the cache by key", ConfigFields: []ConfigFieldDef{ {Key: "key", Label: "Key", Type: FieldTypeString, Required: true, Description: "Cache key to delete (supports template expressions)", Placeholder: "user:{{.user_id}}"}, - {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Description: "Name of the cache module to use"}, + {Key: "cache", Label: "Cache Module", Type: FieldTypeString, Required: true, Description: "Name of the cache module to use"}, }, }) @@ -1683,11 +1685,13 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.event_publish", Label: "Event Publish", Category: "pipeline_steps", - Description: "Publishes an event to the event bus", + Description: "Publishes an event to a messaging broker or event bus", ConfigFields: []ConfigFieldDef{ - {Key: "event_type", Label: "Event Type", Type: FieldTypeString, Required: true, Description: "Event type identifier to publish", Placeholder: "user.created"}, - {Key: "payload_from", Label: "Payload From", Type: FieldTypeString, Description: "Dotted path to the event payload in the pipeline context"}, - {Key: "payload", Label: "Payload", Type: FieldTypeJSON, Description: "Static event payload (supports template expressions)"}, + {Key: "topic", Label: "Topic", Type: FieldTypeString, Required: true, Description: "Topic or channel to publish the event to", Placeholder: "user-events"}, + {Key: "payload", Label: "Payload", Type: FieldTypeJSON, Description: "Event payload as a JSON object (supports template expressions); defaults to current pipeline context"}, + {Key: "headers", Label: "Headers", Type: FieldTypeJSON, Description: "Additional headers/metadata to include with the event as a JSON object"}, + {Key: "event_type", Label: "Event Type", Type: FieldTypeString, Description: "Optional event type identifier to include with the message", Placeholder: "user.created"}, + {Key: "broker", Label: "Broker", Type: FieldTypeString, Description: "Name of the messaging broker module to use (falls back to eventbus if not set)"}, }, }) @@ -1707,10 +1711,11 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.validate_pagination", Label: "Validate Pagination", Category: "pipeline_steps", - Description: "Validates and normalizes pagination query parameters (page, page_size, limit, offset)", + Description: "Validates and normalizes pagination query parameters (page, limit, offset)", ConfigFields: []ConfigFieldDef{ - {Key: "default_page_size", Label: "Default Page Size", Type: FieldTypeNumber, DefaultValue: 20, Description: "Default number of items per page"}, - {Key: "max_page_size", Label: "Max Page Size", Type: FieldTypeNumber, DefaultValue: 100, Description: "Maximum allowed page size"}, + {Key: "default_page", Label: "Default Page", Type: FieldTypeNumber, DefaultValue: 1, Description: "Default page number when none is provided"}, + {Key: "default_limit", Label: "Default Limit", Type: FieldTypeNumber, DefaultValue: 20, Description: "Default number of items to return when no limit is provided"}, + {Key: "max_limit", Label: "Max Limit", Type: FieldTypeNumber, DefaultValue: 100, Description: "Maximum allowed number of items to return per request"}, }, }) @@ -1728,11 +1733,13 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.dlq_send", Label: "DLQ Send", Category: "pipeline_steps", - Description: "Sends a failed message to the dead letter queue for later replay", + Description: "Sends a failed message to the dead letter topic for later replay", ConfigFields: []ConfigFieldDef{ - {Key: "queue", Label: "Queue Name", Type: FieldTypeString, Description: "DLQ queue name (defaults to pipeline name)"}, - {Key: "reason_from", Label: "Reason From", Type: FieldTypeString, Description: "Context key containing the failure reason"}, - {Key: "payload_from", Label: "Payload From", Type: FieldTypeString, Description: "Dotted path to the message payload"}, + {Key: "topic", Label: "DLQ Topic", Type: FieldTypeString, Required: true, Description: "Dead letter topic to publish failed messages to"}, + {Key: "original_topic", Label: "Original Topic", Type: FieldTypeString, Description: "Optional name of the original topic the message came from"}, + {Key: "error", Label: "Error", Type: FieldTypeString, Description: "Optional error message or template expression containing the failure reason"}, + {Key: "payload", Label: "Payload", Type: FieldTypeMap, Description: "Message payload to send to the DLQ (defaults to current pipeline context)"}, + {Key: "broker", Label: "Broker", Type: FieldTypeString, Description: "Optional name of the messaging broker module to use (falls back to eventbus if not set)"}, }, }) @@ -1740,10 +1747,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "step.dlq_replay", Label: "DLQ Replay", Category: "pipeline_steps", - Description: "Replays messages from the dead letter queue", + Description: "Replays messages from a dead letter topic back to the original target topic", ConfigFields: []ConfigFieldDef{ - {Key: "queue", Label: "Queue Name", Type: FieldTypeString, Description: "DLQ queue name to replay from"}, - {Key: "limit", Label: "Limit", Type: FieldTypeNumber, DefaultValue: 10, Description: "Maximum number of messages to replay"}, + {Key: "dlq_topic", Label: "DLQ Topic", Type: FieldTypeString, Required: true, Description: "Dead letter topic name to replay messages from"}, + {Key: "target_topic", Label: "Target Topic", Type: FieldTypeString, Required: true, Description: "Target topic to publish replayed messages to"}, + {Key: "max_messages", Label: "Max Messages", Type: FieldTypeNumber, DefaultValue: 100, Description: "Maximum number of messages to replay"}, + {Key: "broker", Label: "Broker", Type: FieldTypeString, Description: "Name of the messaging broker module to use for replay (falls back to eventbus if not set)"}, }, }) diff --git a/schema/schema.go b/schema/schema.go index 297f1120..ba8f2531 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -257,7 +257,10 @@ func KnownTriggerTypes() []string { defer dynamicTriggerMu.RUnlock() if len(dynamicTriggerTypes) == 0 { - return core + out := make([]string, len(core)) + copy(out, core) + sort.Strings(out) + return out } seen := make(map[string]bool, len(core)+len(dynamicTriggerTypes)) @@ -292,7 +295,10 @@ func KnownWorkflowTypes() []string { defer dynamicWorkflowMu.RUnlock() if len(dynamicWorkflowTypes) == 0 { - return core + out := make([]string, len(core)) + copy(out, core) + sort.Strings(out) + return out } seen := make(map[string]bool, len(core)+len(dynamicWorkflowTypes))