diff --git a/admin/config.yaml b/admin/config.yaml index 8da1acd6..3ded6c8b 100644 --- a/admin/config.yaml +++ b/admin/config.yaml @@ -17,6 +17,7 @@ # - Phase B: V1 CRUD routes decomposed into pipeline primitives (db_query/db_exec) # - Phase C: Missing DB tables added, broken routes replaced with working pipelines # - Phase D: All remaining delegate steps given meaningful names +# - Phase D+: Pre-processing pipeline steps (validation, pagination, rate limiting) for domain routes modules: # --- HTTP Server (admin port) --- @@ -266,6 +267,10 @@ workflows: middlewares: [admin-cors, admin-auth-middleware] pipeline: steps: + - name: validate-body + type: step.validate_request_body + config: + required_fields: [config] - name: validate-engine-config type: step.delegate config: @@ -276,6 +281,17 @@ workflows: middlewares: [admin-cors, admin-auth-middleware] pipeline: steps: + - name: rate-limit + type: step.rate_limit + config: + requests_per_minute: 5 + burst_size: 2 + key_from: "global" + - name: log-reload-attempt + type: step.log + config: + level: warn + message: "Engine reload requested via admin API" - name: reload-engine type: step.delegate config: @@ -1226,6 +1242,12 @@ workflows: type: step.request_parse config: path_params: [id] + - name: validate-id + type: step.validate_path_param + config: + params: [id] + format: uuid + source: "steps.parse-request.path_params" - name: get-execution type: step.db_query config: @@ -1261,6 +1283,18 @@ workflows: type: step.request_parse config: path_params: [id] + - name: validate-id + type: step.validate_path_param + config: + params: [id] + format: uuid + source: "steps.parse-request.path_params" + - name: validate-pagination + type: step.validate_pagination + config: + max_limit: 100 + default_limit: 50 + default_page: 1 - name: list-steps type: step.db_query config: @@ -1696,6 +1730,12 @@ workflows: middlewares: [admin-cors, admin-auth-middleware] pipeline: steps: + - name: validate-pagination + type: step.validate_pagination + config: + max_limit: 100 + default_limit: 20 + default_page: 1 - name: list-all-executions type: step.delegate config: @@ -1781,6 +1821,22 @@ workflows: middlewares: [admin-cors, admin-auth-middleware] pipeline: steps: + - name: parse-request + type: step.request_parse + config: + path_params: [id] + - name: validate-id + type: step.validate_path_param + config: + params: [id] + format: uuid + source: "steps.parse-request.path_params" + - name: rate-limit + type: step.rate_limit + config: + requests_per_minute: 30 + burst_size: 5 + key_from: "global" - name: retry-dlq-entry type: step.delegate config: @@ -1791,6 +1847,22 @@ workflows: middlewares: [admin-cors, admin-auth-middleware] pipeline: steps: + - name: parse-request + type: step.request_parse + config: + path_params: [id] + - name: validate-id + type: step.validate_path_param + config: + params: [id] + format: uuid + source: "steps.parse-request.path_params" + - name: rate-limit + type: step.rate_limit + config: + requests_per_minute: 30 + burst_size: 5 + key_from: "global" - name: discard-dlq-entry type: step.delegate config: diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/pipeline_step_validate_pagination.go b/module/pipeline_step_validate_pagination.go new file mode 100644 index 00000000..5b40218e --- /dev/null +++ b/module/pipeline_step_validate_pagination.go @@ -0,0 +1,109 @@ +package module + +import ( + "context" + "fmt" + "net/http" + "strconv" + + "github.com/CrisisTextLine/modular" +) + +// ValidatePaginationStep validates and normalises page/limit query parameters. +// It reads from the HTTP request in pipeline metadata and outputs resolved +// pagination values with defaults applied. +type ValidatePaginationStep struct { + name string + maxLimit int + defaultLimit int + defaultPage int +} + +// NewValidatePaginationStepFactory returns a StepFactory that creates ValidatePaginationStep instances. +func NewValidatePaginationStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + maxLimit := 100 + if v, ok := config["max_limit"]; ok { + switch val := v.(type) { + case int: + maxLimit = val + case float64: + maxLimit = int(val) + } + } + if maxLimit <= 0 { + return nil, fmt.Errorf("validate_pagination step %q: max_limit must be positive", name) + } + + defaultLimit := 20 + if v, ok := config["default_limit"]; ok { + switch val := v.(type) { + case int: + defaultLimit = val + case float64: + defaultLimit = int(val) + } + } + + defaultPage := 1 + if v, ok := config["default_page"]; ok { + switch val := v.(type) { + case int: + defaultPage = val + case float64: + defaultPage = int(val) + } + } + + return &ValidatePaginationStep{ + name: name, + maxLimit: maxLimit, + defaultLimit: defaultLimit, + defaultPage: defaultPage, + }, nil + } +} + +// Name returns the step name. +func (s *ValidatePaginationStep) Name() string { return s.name } + +// Execute reads page and limit query parameters from the HTTP request, +// validates their ranges, applies defaults, and outputs the resolved values. +func (s *ValidatePaginationStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + page := s.defaultPage + limit := s.defaultLimit + + req, _ := pc.Metadata["_http_request"].(*http.Request) + if req != nil { + q := req.URL.Query() + + if pageStr := q.Get("page"); pageStr != "" { + p, err := strconv.Atoi(pageStr) + if err != nil || p < 1 { + return nil, fmt.Errorf("validate_pagination step %q: invalid page parameter %q — must be a positive integer", s.name, pageStr) + } + page = p + } + + if limitStr := q.Get("limit"); limitStr != "" { + l, err := strconv.Atoi(limitStr) + if err != nil || l < 1 { + return nil, fmt.Errorf("validate_pagination step %q: invalid limit parameter %q — must be a positive integer", s.name, limitStr) + } + if l > s.maxLimit { + return nil, fmt.Errorf("validate_pagination step %q: limit %d exceeds maximum %d", s.name, l, s.maxLimit) + } + limit = l + } + } + + offset := (page - 1) * limit + + return &StepResult{ + Output: map[string]any{ + "page": page, + "limit": limit, + "offset": offset, + }, + }, nil +} diff --git a/module/pipeline_step_validate_pagination_test.go b/module/pipeline_step_validate_pagination_test.go new file mode 100644 index 00000000..2f14da1e --- /dev/null +++ b/module/pipeline_step_validate_pagination_test.go @@ -0,0 +1,121 @@ +package module + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +func TestValidatePaginationStepFactory(t *testing.T) { + factory := NewValidatePaginationStepFactory() + + t.Run("defaults", func(t *testing.T) { + step, err := factory("test", map[string]any{}, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if step.Name() != "test" { + t.Fatalf("expected name 'test', got %q", step.Name()) + } + }) + + t.Run("invalid max_limit", func(t *testing.T) { + _, err := factory("test", map[string]any{"max_limit": 0}, nil) + if err == nil { + t.Fatal("expected error for zero max_limit") + } + }) +} + +func TestValidatePaginationStep_Execute(t *testing.T) { + factory := NewValidatePaginationStepFactory() + + t.Run("defaults applied", func(t *testing.T) { + step, _ := factory("test", map[string]any{}, nil) + req := httptest.NewRequest(http.MethodGet, "/test", nil) + pc := &PipelineContext{ + Metadata: map[string]any{"_http_request": req}, + Current: map[string]any{}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Output["page"] != 1 { + t.Fatalf("expected page=1, got %v", result.Output["page"]) + } + if result.Output["limit"] != 20 { + t.Fatalf("expected limit=20, got %v", result.Output["limit"]) + } + if result.Output["offset"] != 0 { + t.Fatalf("expected offset=0, got %v", result.Output["offset"]) + } + }) + + t.Run("custom values", func(t *testing.T) { + step, _ := factory("test", map[string]any{"max_limit": 50}, nil) + req := httptest.NewRequest(http.MethodGet, "/test?page=3&limit=10", nil) + pc := &PipelineContext{ + Metadata: map[string]any{"_http_request": req}, + Current: map[string]any{}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Output["page"] != 3 { + t.Fatalf("expected page=3, got %v", result.Output["page"]) + } + if result.Output["limit"] != 10 { + t.Fatalf("expected limit=10, got %v", result.Output["limit"]) + } + if result.Output["offset"] != 20 { + t.Fatalf("expected offset=20, got %v", result.Output["offset"]) + } + }) + + t.Run("invalid page", func(t *testing.T) { + step, _ := factory("test", map[string]any{}, nil) + req := httptest.NewRequest(http.MethodGet, "/test?page=-1", nil) + pc := &PipelineContext{ + Metadata: map[string]any{"_http_request": req}, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for negative page") + } + }) + + t.Run("limit exceeds max", func(t *testing.T) { + step, _ := factory("test", map[string]any{"max_limit": 50}, nil) + req := httptest.NewRequest(http.MethodGet, "/test?limit=200", nil) + pc := &PipelineContext{ + Metadata: map[string]any{"_http_request": req}, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for limit exceeding max") + } + }) + + t.Run("non-numeric limit", func(t *testing.T) { + step, _ := factory("test", map[string]any{}, nil) + req := httptest.NewRequest(http.MethodGet, "/test?limit=abc", nil) + pc := &PipelineContext{ + Metadata: map[string]any{"_http_request": req}, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for non-numeric limit") + } + }) +} diff --git a/module/pipeline_step_validate_path_param.go b/module/pipeline_step_validate_path_param.go new file mode 100644 index 00000000..2ae9beaa --- /dev/null +++ b/module/pipeline_step_validate_path_param.go @@ -0,0 +1,117 @@ +package module + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/CrisisTextLine/modular" +) + +var uuidPattern = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`) + +// ValidatePathParamStep validates that path parameters extracted by a +// request_parse step are present and optionally conform to a format (e.g. UUID). +type ValidatePathParamStep struct { + name string + params []string + format string // "uuid" or "" (non-empty only) + source string // dotted path to the params map, e.g. "steps.parse-request.path_params" +} + +// NewValidatePathParamStepFactory returns a StepFactory that creates ValidatePathParamStep instances. +func NewValidatePathParamStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + rawParams, _ := config["params"].([]any) + if len(rawParams) == 0 { + return nil, fmt.Errorf("validate_path_param step %q: 'params' list is required", name) + } + params := make([]string, 0, len(rawParams)) + for _, p := range rawParams { + s, ok := p.(string) + if !ok { + return nil, fmt.Errorf("validate_path_param step %q: params entries must be strings", name) + } + params = append(params, s) + } + + format, _ := config["format"].(string) + source, _ := config["source"].(string) + + return &ValidatePathParamStep{ + name: name, + params: params, + format: format, + source: source, + }, nil + } +} + +// Name returns the step name. +func (s *ValidatePathParamStep) Name() string { return s.name } + +// Execute validates that each configured path parameter is present and +// optionally matches the required format. +func (s *ValidatePathParamStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + data := s.resolveSource(pc) + if data == nil { + return nil, fmt.Errorf("validate_path_param step %q: source %q resolved to nil", s.name, s.source) + } + + var errors []string + for _, param := range s.params { + val, exists := data[param] + if !exists { + errors = append(errors, fmt.Sprintf("missing path parameter %q", param)) + continue + } + str, ok := val.(string) + if !ok || str == "" { + errors = append(errors, fmt.Sprintf("path parameter %q must be a non-empty string", param)) + continue + } + if s.format == "uuid" && !uuidPattern.MatchString(str) { + errors = append(errors, fmt.Sprintf("path parameter %q must be a valid UUID, got %q", param, str)) + } + } + + if len(errors) > 0 { + return nil, fmt.Errorf("validate_path_param step %q: %s", s.name, strings.Join(errors, "; ")) + } + + return &StepResult{Output: map[string]any{"valid": true}}, nil +} + +// resolveSource walks a dotted path into the pipeline context to find the +// parameter map to validate. Falls back to pc.Current when source is empty. +func (s *ValidatePathParamStep) resolveSource(pc *PipelineContext) map[string]any { + if s.source == "" { + return pc.Current + } + + data := map[string]any{ + "steps": func() map[string]any { + result := make(map[string]any) + for k, v := range pc.StepOutputs { + result[k] = v + } + return result + }(), + } + + parts := strings.Split(s.source, ".") + current := data + for _, part := range parts { + val, ok := current[part] + if !ok { + return nil + } + nested, ok := val.(map[string]any) + if !ok { + return nil + } + current = nested + } + return current +} diff --git a/module/pipeline_step_validate_path_param_test.go b/module/pipeline_step_validate_path_param_test.go new file mode 100644 index 00000000..04732d56 --- /dev/null +++ b/module/pipeline_step_validate_path_param_test.go @@ -0,0 +1,144 @@ +package module + +import ( + "context" + "testing" +) + +func TestValidatePathParamStepFactory(t *testing.T) { + factory := NewValidatePathParamStepFactory() + + t.Run("missing params config", func(t *testing.T) { + _, err := factory("test", map[string]any{}, nil) + if err == nil { + t.Fatal("expected error for missing params") + } + }) + + t.Run("valid factory config", func(t *testing.T) { + step, err := factory("test", map[string]any{ + "params": []any{"id"}, + "format": "uuid", + "source": "steps.parse-request.path_params", + }, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if step.Name() != "test" { + t.Fatalf("expected name 'test', got %q", step.Name()) + } + }) +} + +func TestValidatePathParamStep_Execute(t *testing.T) { + factory := NewValidatePathParamStepFactory() + + t.Run("valid UUID", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "params": []any{"id"}, + "format": "uuid", + "source": "steps.parse-request.path_params", + }, nil) + + pc := &PipelineContext{ + StepOutputs: map[string]map[string]any{ + "parse-request": { + "path_params": map[string]any{ + "id": "550e8400-e29b-41d4-a716-446655440000", + }, + }, + }, + Current: map[string]any{}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Output["valid"] != true { + t.Fatal("expected valid=true") + } + }) + + t.Run("invalid UUID", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "params": []any{"id"}, + "format": "uuid", + "source": "steps.parse-request.path_params", + }, nil) + + pc := &PipelineContext{ + StepOutputs: map[string]map[string]any{ + "parse-request": { + "path_params": map[string]any{ + "id": "not-a-uuid", + }, + }, + }, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for invalid UUID") + } + }) + + t.Run("missing param", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "params": []any{"id"}, + "source": "steps.parse-request.path_params", + }, nil) + + pc := &PipelineContext{ + StepOutputs: map[string]map[string]any{ + "parse-request": { + "path_params": map[string]any{}, + }, + }, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing param") + } + }) + + t.Run("non-empty only (no format)", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "params": []any{"id"}, + }, nil) + + pc := &PipelineContext{ + Current: map[string]any{ + "id": "some-value", + }, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.Output["valid"] != true { + t.Fatal("expected valid=true") + } + }) + + t.Run("empty string param", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "params": []any{"id"}, + }, nil) + + pc := &PipelineContext{ + Current: map[string]any{ + "id": "", + }, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for empty param") + } + }) +} diff --git a/module/pipeline_step_validate_request_body.go b/module/pipeline_step_validate_request_body.go new file mode 100644 index 00000000..eff3278d --- /dev/null +++ b/module/pipeline_step_validate_request_body.go @@ -0,0 +1,90 @@ +package module + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/CrisisTextLine/modular" +) + +// ValidateRequestBodyStep parses the JSON request body from the HTTP +// request and validates that all required fields are present. +type ValidateRequestBodyStep struct { + name string + requiredFields []string +} + +// NewValidateRequestBodyStepFactory returns a StepFactory that creates +// ValidateRequestBodyStep instances. +func NewValidateRequestBodyStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + var required []string + if raw, ok := config["required_fields"].([]any); ok { + for _, f := range raw { + s, ok := f.(string) + if !ok { + return nil, fmt.Errorf("validate_request_body step %q: required_fields entries must be strings", name) + } + required = append(required, s) + } + } + + return &ValidateRequestBodyStep{ + name: name, + requiredFields: required, + }, nil + } +} + +// Name returns the step name. +func (s *ValidateRequestBodyStep) Name() string { return s.name } + +// Execute parses the JSON body from the HTTP request and validates +// required fields are present. The parsed body is returned as output +// so downstream steps can reference it. +func (s *ValidateRequestBodyStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + // Try trigger data first (command handler may have pre-parsed) + var body map[string]any + if b, ok := pc.TriggerData["body"].(map[string]any); ok { + body = b + } else if b, ok := pc.Current["body"].(map[string]any); ok { + body = b + } else { + req, _ := pc.Metadata["_http_request"].(*http.Request) + if req != nil && req.Body != nil { + bodyBytes, err := io.ReadAll(req.Body) + if err != nil { + return nil, fmt.Errorf("validate_request_body step %q: failed to read body: %w", s.name, err) + } + if len(bodyBytes) > 0 { + if err := json.Unmarshal(bodyBytes, &body); err != nil { + return nil, fmt.Errorf("validate_request_body step %q: invalid JSON body: %w", s.name, err) + } + } + } + } + + if body == nil && len(s.requiredFields) > 0 { + return nil, fmt.Errorf("validate_request_body step %q: request body is required", s.name) + } + + var missing []string + for _, field := range s.requiredFields { + if _, exists := body[field]; !exists { + missing = append(missing, field) + } + } + if len(missing) > 0 { + return nil, fmt.Errorf("validate_request_body step %q: missing required fields: %s", s.name, strings.Join(missing, ", ")) + } + + return &StepResult{ + Output: map[string]any{ + "body": body, + }, + }, nil +} diff --git a/module/pipeline_step_validate_request_body_test.go b/module/pipeline_step_validate_request_body_test.go new file mode 100644 index 00000000..4ffbee87 --- /dev/null +++ b/module/pipeline_step_validate_request_body_test.go @@ -0,0 +1,145 @@ +package module + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestValidateRequestBodyStepFactory(t *testing.T) { + factory := NewValidateRequestBodyStepFactory() + + t.Run("no required fields", func(t *testing.T) { + step, err := factory("test", map[string]any{}, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if step.Name() != "test" { + t.Fatalf("expected name 'test', got %q", step.Name()) + } + }) + + t.Run("with required fields", func(t *testing.T) { + step, err := factory("test", map[string]any{ + "required_fields": []any{"config"}, + }, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if step.Name() != "test" { + t.Fatalf("expected name 'test', got %q", step.Name()) + } + }) +} + +func TestValidateRequestBodyStep_Execute(t *testing.T) { + factory := NewValidateRequestBodyStepFactory() + + t.Run("valid body from trigger data", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "required_fields": []any{"config"}, + }, nil) + + pc := &PipelineContext{ + TriggerData: map[string]any{ + "body": map[string]any{"config": "test-data"}, + }, + Current: map[string]any{}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + body, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + if body["config"] != "test-data" { + t.Fatalf("expected config='test-data', got %v", body["config"]) + } + }) + + t.Run("missing required field", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "required_fields": []any{"config"}, + }, nil) + + pc := &PipelineContext{ + TriggerData: map[string]any{ + "body": map[string]any{"other": "value"}, + }, + Current: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing required field") + } + }) + + t.Run("no body when required", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "required_fields": []any{"config"}, + }, nil) + + pc := &PipelineContext{ + TriggerData: map[string]any{}, + Current: map[string]any{}, + Metadata: map[string]any{}, + } + + _, err := step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing body") + } + }) + + t.Run("body from HTTP request", func(t *testing.T) { + step, _ := factory("test", map[string]any{ + "required_fields": []any{"name"}, + }, nil) + + body := `{"name": "test-config"}` + req := httptest.NewRequest(http.MethodPost, "/test", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + + pc := &PipelineContext{ + TriggerData: map[string]any{}, + Current: map[string]any{}, + Metadata: map[string]any{"_http_request": req}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + parsedBody, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + if parsedBody["name"] != "test-config" { + t.Fatalf("expected name='test-config', got %v", parsedBody["name"]) + } + }) + + t.Run("no required fields allows empty body", func(t *testing.T) { + step, _ := factory("test", map[string]any{}, nil) + + pc := &PipelineContext{ + TriggerData: map[string]any{}, + Current: map[string]any{}, + Metadata: map[string]any{}, + } + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result == nil { + t.Fatal("expected non-nil result") + } + }) +} diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index c3e1fa49..9ef86b44 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -100,12 +100,14 @@ func New() *Plugin { return &Plugin{ // Default constructors wrap the concrete module constructors, adapting // their return types to modular.Module via implicit interface satisfaction. - newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, - newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, - newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, - newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, - newWorkflowRegistry: func(name, storageBackend string) modular.Module { return module.NewWorkflowRegistry(name, storageBackend) }, - newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, + newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, + newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, + newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, + newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, + newWorkflowRegistry: func(name, storageBackend string) modular.Module { + return module.NewWorkflowRegistry(name, storageBackend) + }, + newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, newProcessingStep: func(name string, cfg module.ProcessingStepConfig) modular.Module { return module.NewProcessingStep(name, cfg) }, diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 49b4591e..326c301e 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -1,6 +1,7 @@ // Package pipelinesteps provides a plugin that registers generic pipeline step // types: validate, transform, conditional, set, log, delegate, jq, publish, -// http_call, request_parse, db_query, db_exec, json_response. +// http_call, request_parse, db_query, db_exec, json_response, +// validate_path_param, validate_pagination, validate_request_body. // It also provides the PipelineWorkflowHandler for composable pipelines. package pipelinesteps @@ -38,13 +39,13 @@ func New() *Plugin { BaseNativePlugin: plugin.BaseNativePlugin{ PluginName: "pipeline-steps", PluginVersion: "1.0.0", - PluginDescription: "Generic pipeline step types (validate, transform, conditional, set, log, delegate, jq, etc.)", + PluginDescription: "Generic pipeline step types (validate, transform, conditional, set, log, delegate, jq, validate_path_param, validate_pagination, validate_request_body, etc.)", }, Manifest: plugin.PluginManifest{ Name: "pipeline-steps", Version: "1.0.0", Author: "GoCodeAlone", - Description: "Generic pipeline step types and pipeline workflow handler", + Description: "Generic pipeline step types, pre-processing validators, and pipeline workflow handler", Tier: plugin.TierCore, StepTypes: []string{ "step.validate", @@ -60,6 +61,9 @@ func New() *Plugin { "step.db_query", "step.db_exec", "step.json_response", + "step.validate_path_param", + "step.validate_pagination", + "step.validate_request_body", }, WorkflowTypes: []string{"pipeline"}, Capabilities: []plugin.CapabilityDecl{ @@ -83,19 +87,22 @@ func (p *Plugin) Capabilities() []capability.Contract { // StepFactories returns the step factories provided by this plugin. func (p *Plugin) StepFactories() map[string]plugin.StepFactory { return map[string]plugin.StepFactory{ - "step.validate": wrapStepFactory(module.NewValidateStepFactory()), - "step.transform": wrapStepFactory(module.NewTransformStepFactory()), - "step.conditional": wrapStepFactory(module.NewConditionalStepFactory()), - "step.set": wrapStepFactory(module.NewSetStepFactory()), - "step.log": wrapStepFactory(module.NewLogStepFactory()), - "step.delegate": wrapStepFactory(module.NewDelegateStepFactory()), - "step.jq": wrapStepFactory(module.NewJQStepFactory()), - "step.publish": wrapStepFactory(module.NewPublishStepFactory()), - "step.http_call": wrapStepFactory(module.NewHTTPCallStepFactory()), - "step.request_parse": wrapStepFactory(module.NewRequestParseStepFactory()), - "step.db_query": wrapStepFactory(module.NewDBQueryStepFactory()), - "step.db_exec": wrapStepFactory(module.NewDBExecStepFactory()), - "step.json_response": wrapStepFactory(module.NewJSONResponseStepFactory()), + "step.validate": wrapStepFactory(module.NewValidateStepFactory()), + "step.transform": wrapStepFactory(module.NewTransformStepFactory()), + "step.conditional": wrapStepFactory(module.NewConditionalStepFactory()), + "step.set": wrapStepFactory(module.NewSetStepFactory()), + "step.log": wrapStepFactory(module.NewLogStepFactory()), + "step.delegate": wrapStepFactory(module.NewDelegateStepFactory()), + "step.jq": wrapStepFactory(module.NewJQStepFactory()), + "step.publish": wrapStepFactory(module.NewPublishStepFactory()), + "step.http_call": wrapStepFactory(module.NewHTTPCallStepFactory()), + "step.request_parse": wrapStepFactory(module.NewRequestParseStepFactory()), + "step.db_query": wrapStepFactory(module.NewDBQueryStepFactory()), + "step.db_exec": wrapStepFactory(module.NewDBExecStepFactory()), + "step.json_response": wrapStepFactory(module.NewJSONResponseStepFactory()), + "step.validate_path_param": wrapStepFactory(module.NewValidatePathParamStepFactory()), + "step.validate_pagination": wrapStepFactory(module.NewValidatePaginationStepFactory()), + "step.validate_request_body": wrapStepFactory(module.NewValidateRequestBodyStepFactory()), } } diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index 7cb31f86..338ff99c 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -44,6 +44,9 @@ func TestStepFactories(t *testing.T) { "step.db_query", "step.db_exec", "step.json_response", + "step.validate_path_param", + "step.validate_pagination", + "step.validate_request_body", } for _, stepType := range expectedSteps { @@ -65,7 +68,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 13 { - t.Fatalf("expected 13 step factories after load, got %d", len(steps)) + if len(steps) != 16 { + t.Fatalf("expected 16 step factories after load, got %d", len(steps)) } }