diff --git a/module/pipeline_step_foreach.go b/module/pipeline_step_foreach.go new file mode 100644 index 00000000..90e74b3f --- /dev/null +++ b/module/pipeline_step_foreach.go @@ -0,0 +1,278 @@ +package module + +import ( + "context" + "fmt" + "maps" + + "github.com/CrisisTextLine/modular" +) + +// ForEachStep iterates over a collection and executes sub-steps for each item. +type ForEachStep struct { + name string + collection string + itemKey string + indexKey string + subSteps []PipelineStep + tmpl *TemplateEngine +} + +// NewForEachStepFactory returns a StepFactory that creates ForEachStep instances. +// registryFn is called at step-creation time to obtain the step registry used to +// build sub-steps. Passing a function (rather than the registry directly) allows +// the factory to be registered before the registry is fully populated, enabling +// sub-steps to themselves be any registered step type. +func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + registry := registryFn() + + collection, _ := config["collection"].(string) + if collection == "" { + return nil, fmt.Errorf("foreach step %q: 'collection' is required", name) + } + + itemKey, _ := config["item_key"].(string) + if itemKey == "" { + itemKey = "item" + } + + indexKey, _ := config["index_key"].(string) + if indexKey == "" { + indexKey = "index" + } + + // Build sub-steps from inline config + stepsRaw, _ := config["steps"].([]any) + subSteps := make([]PipelineStep, 0, len(stepsRaw)) + for i, raw := range stepsRaw { + stepCfg, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i) + } + + stepType, _ := stepCfg["type"].(string) + if stepType == "" { + return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i) + } + + stepName, _ := stepCfg["name"].(string) + if stepName == "" { + stepName = fmt.Sprintf("%s-sub-%d", name, i) + } + + // Build the step config without meta fields + subCfg := make(map[string]any) + for k, v := range stepCfg { + if k != "type" && k != "name" { + subCfg[k] = v + } + } + + step, err := registry.Create(stepType, stepName, subCfg, app) + if err != nil { + return nil, fmt.Errorf("foreach step %q: failed to build sub-step %d (%s): %w", name, i, stepType, err) + } + subSteps = append(subSteps, step) + } + + return &ForEachStep{ + name: name, + collection: collection, + itemKey: itemKey, + indexKey: indexKey, + subSteps: subSteps, + tmpl: NewTemplateEngine(), + }, nil + } +} + +// Name returns the step name. +func (s *ForEachStep) Name() string { return s.name } + +// Execute iterates over the collection and runs sub-steps for each item. +func (s *ForEachStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + // Resolve the collection from the pipeline context + items, err := s.resolveCollection(pc) + if err != nil { + return nil, fmt.Errorf("foreach step %q: %w", s.name, err) + } + + // Handle empty collections gracefully + if len(items) == 0 { + return &StepResult{ + Output: map[string]any{ + "results": []any{}, + "count": 0, + }, + }, nil + } + + collected := make([]any, 0, len(items)) + + for i, item := range items { + // Create a child context with item and index injected + childPC := s.buildChildContext(pc, item, i) + + // Execute each sub-step sequentially for this item + iterResult := make(map[string]any) + for _, step := range s.subSteps { + result, execErr := step.Execute(ctx, childPC) + if execErr != nil { + return nil, fmt.Errorf("foreach step %q: iteration %d, sub-step %q failed: %w", + s.name, i, step.Name(), execErr) + } + if result != nil && result.Output != nil { + childPC.MergeStepOutput(step.Name(), result.Output) + maps.Copy(iterResult, result.Output) + } + if result != nil && result.Stop { + break + } + } + collected = append(collected, iterResult) + } + + return &StepResult{ + Output: map[string]any{ + "results": collected, + "count": len(collected), + }, + }, nil +} + +// resolveCollection resolves the collection field to a []any. +func (s *ForEachStep) resolveCollection(pc *PipelineContext) ([]any, error) { + // Look up the field path directly in Current (handles simple keys) + if val, ok := pc.Current[s.collection]; ok { + return foreachToSlice(val) + } + + // Try trigger data + if val, ok := pc.TriggerData[s.collection]; ok { + return foreachToSlice(val) + } + + // Try dot-separated path through the full template data + // (e.g., "steps.fetch.rows" or nested keys) + data := make(map[string]any) + maps.Copy(data, pc.Current) + data["steps"] = pc.StepOutputs + data["trigger"] = pc.TriggerData + + if val, found := foreachWalkPath(data, s.collection); found { + return foreachToSlice(val) + } + + return nil, fmt.Errorf("collection %q not found in context", s.collection) +} + +// buildChildContext creates a child PipelineContext with item and index injected. +func (s *ForEachStep) buildChildContext(parent *PipelineContext, item any, index int) *PipelineContext { + // Copy trigger data + childTrigger := make(map[string]any) + maps.Copy(childTrigger, parent.TriggerData) + + // Copy metadata + childMeta := make(map[string]any) + maps.Copy(childMeta, parent.Metadata) + + // Build current: start with parent's current, inject item and index + childCurrent := make(map[string]any) + maps.Copy(childCurrent, parent.Current) + childCurrent[s.itemKey] = item + childCurrent[s.indexKey] = index + + // Copy step outputs + childOutputs := make(map[string]map[string]any) + for k, v := range parent.StepOutputs { + out := make(map[string]any) + maps.Copy(out, v) + childOutputs[k] = out + } + + return &PipelineContext{ + TriggerData: childTrigger, + StepOutputs: childOutputs, + Current: childCurrent, + Metadata: childMeta, + } +} + +// foreachWalkPath traverses a dot-separated path through nested maps. +// Returns the found value and true if found, or nil and false if not. +// It handles both map[string]any and map[string]map[string]any (step outputs). +func foreachWalkPath(data map[string]any, path string) (any, bool) { + // Try the full path as a key first + if val, ok := data[path]; ok { + return val, true + } + + // Walk dot-separated segments + current := any(data) + segments := foreachSplitPath(path) + for _, seg := range segments { + switch m := current.(type) { + case map[string]any: + val, ok := m[seg] + if !ok { + return nil, false + } + current = val + case map[string]map[string]any: + // Step outputs are stored as map[string]map[string]any + val, ok := m[seg] + if !ok { + return nil, false + } + current = val + default: + return nil, false + } + } + return current, true +} + +// foreachSplitPath splits a dot-separated path into segments. +func foreachSplitPath(path string) []string { + var segs []string + start := 0 + for i := 0; i < len(path); i++ { + if path[i] == '.' { + segs = append(segs, path[start:i]) + start = i + 1 + } + } + segs = append(segs, path[start:]) + return segs +} + +// foreachToSlice converts a value to []any if possible. +func foreachToSlice(val any) ([]any, error) { + switch v := val.(type) { + case []any: + return v, nil + case []map[string]any: + result := make([]any, len(v)) + for i, item := range v { + result[i] = item + } + return result, nil + case []string: + result := make([]any, len(v)) + for i, item := range v { + result[i] = item + } + return result, nil + case []int: + result := make([]any, len(v)) + for i, item := range v { + result[i] = item + } + return result, nil + case nil: + return []any{}, nil + default: + return nil, fmt.Errorf("expected a slice, got %T", val) + } +} diff --git a/module/pipeline_step_foreach_test.go b/module/pipeline_step_foreach_test.go new file mode 100644 index 00000000..6fcd2cd6 --- /dev/null +++ b/module/pipeline_step_foreach_test.go @@ -0,0 +1,321 @@ +package module + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/CrisisTextLine/modular" +) + +// buildTestForEachStep creates a ForEachStep with a fresh StepRegistry for testing. +// It registers a simple "step.set" factory so sub-steps can be built. +func buildTestForEachStep(t *testing.T, name string, config map[string]any) (PipelineStep, error) { + t.Helper() + registry := NewStepRegistry() + registry.Register("step.set", NewSetStepFactory()) + registry.Register("step.log", NewLogStepFactory()) + factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil) + return factory(name, config, nil) +} + +func TestForEachStep_IteratesOverSliceOfMaps(t *testing.T) { + step, err := buildTestForEachStep(t, "foreach-test", map[string]any{ + "collection": "items", + "item_key": "item", + "index_key": "index", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "capture", + "values": map[string]any{ + "captured_name": "{{.item.name}}", + "captured_idx": "{{.index}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "items": []any{ + map[string]any{"name": "alice", "age": 30}, + map[string]any{"name": "bob", "age": 25}, + }, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Errorf("expected count=2, got %v", result.Output["count"]) + } + + results, ok := result.Output["results"].([]any) + if !ok { + t.Fatalf("expected results to be []any, got %T", result.Output["results"]) + } + if len(results) != 2 { + t.Fatalf("expected 2 results, got %d", len(results)) + } + + first, ok := results[0].(map[string]any) + if !ok { + t.Fatalf("expected first result to be map[string]any, got %T", results[0]) + } + if first["captured_name"] != "alice" { + t.Errorf("expected captured_name='alice', got %v", first["captured_name"]) + } + if first["captured_idx"] != "0" { + t.Errorf("expected captured_idx='0', got %v", first["captured_idx"]) + } + + second, ok := results[1].(map[string]any) + if !ok { + t.Fatalf("expected second result to be map[string]any, got %T", results[1]) + } + if second["captured_name"] != "bob" { + t.Errorf("expected captured_name='bob', got %v", second["captured_name"]) + } +} + +func TestForEachStep_EmptyCollection(t *testing.T) { + step, err := buildTestForEachStep(t, "foreach-empty", map[string]any{ + "collection": "items", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "set-item", + "values": map[string]any{ + "processed": "true", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "items": []any{}, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 0 { + t.Errorf("expected count=0, got %v", result.Output["count"]) + } + + results, ok := result.Output["results"].([]any) + if !ok { + t.Fatalf("expected results to be []any, got %T", result.Output["results"]) + } + if len(results) != 0 { + t.Errorf("expected 0 results, got %d", len(results)) + } +} + +func TestForEachStep_DefaultItemAndIndexKeys(t *testing.T) { + step, err := buildTestForEachStep(t, "foreach-defaults", map[string]any{ + "collection": "things", + // no item_key or index_key — should default to "item" and "index" + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "set-val", + "values": map[string]any{ + "got_item": "{{.item}}", + "got_index": "{{.index}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "things": []any{"x", "y"}, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Errorf("expected count=2, got %v", result.Output["count"]) + } +} + +func TestForEachStep_SubStepErrorStopsIteration(t *testing.T) { + // Register a failing step type for this test + registry := NewStepRegistry() + registry.Register("step.set", NewSetStepFactory()) + + callCount := 0 + registry.Register("step.fail", func(name string, _ map[string]any, _ modular.Application) (PipelineStep, error) { + return &mockStep{ + name: name, + execFn: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + callCount++ + if callCount >= 1 { + return nil, errors.New("deliberate sub-step failure") + } + return &StepResult{Output: map[string]any{}}, nil + }, + }, nil + }) + + factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil) + step, err := factory("foreach-fail", map[string]any{ + "collection": "items", + "steps": []any{ + map[string]any{ + "type": "step.fail", + "name": "fail-step", + }, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "items": []any{"a", "b", "c"}, + }, nil) + + _, execErr := step.Execute(context.Background(), pc) + if execErr == nil { + t.Fatal("expected error from failing sub-step") + } + if callCount > 1 { + t.Errorf("expected iteration to stop after first failure, but sub-step was called %d times", callCount) + } +} + +func TestForEachStep_FactoryRejectsMissingCollection(t *testing.T) { + _, err := buildTestForEachStep(t, "bad-foreach", map[string]any{ + "steps": []any{}, + }) + if err == nil { + t.Fatal("expected error for missing 'collection'") + } +} + +func TestForEachStep_FactoryRejectsInvalidSubStepType(t *testing.T) { + _, err := buildTestForEachStep(t, "bad-substep", map[string]any{ + "collection": "items", + "steps": []any{ + map[string]any{ + "type": "step.nonexistent", + "name": "bad", + }, + }, + }) + if err == nil { + t.Fatal("expected error for unknown sub-step type") + } +} + +func TestForEachStep_IteratesWithStepOutputAccess(t *testing.T) { + // Test that the item is accessible and sub-steps can build on each other within an iteration + step, err := buildTestForEachStep(t, "foreach-chained", map[string]any{ + "collection": "users", + "item_key": "user", + "index_key": "i", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "extract", + "values": map[string]any{ + "user_id": "{{.user.id}}", + }, + }, + map[string]any{ + "type": "step.set", + "name": "annotate", + "values": map[string]any{ + "label": "user-{{.user_id}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "users": []any{ + map[string]any{"id": "u1"}, + map[string]any{"id": "u2"}, + }, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Fatalf("expected count=2, got %v", result.Output["count"]) + } + + results := result.Output["results"].([]any) + first := results[0].(map[string]any) + if first["label"] != "user-u1" { + t.Errorf("expected label='user-u1', got %v", first["label"]) + } + + second := results[1].(map[string]any) + if second["label"] != "user-u2" { + t.Errorf("expected label='user-u2', got %v", second["label"]) + } +} + +func TestForEachStep_CollectionFromStepOutputs(t *testing.T) { + step, err := buildTestForEachStep(t, "foreach-from-step", map[string]any{ + "collection": "steps.fetch.rows", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "tag", + "values": map[string]any{ + "tagged": "yes", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "rows": []any{ + map[string]any{"id": 1}, + map[string]any{"id": 2}, + }, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Errorf("expected count=2, got %v", result.Output["count"]) + } +} + +// Compile-time check: ensure the step_fail factory signature matches StepFactory. +// This avoids having an unused import of fmt. +var _ = fmt.Sprintf diff --git a/module/pipeline_step_webhook_verify.go b/module/pipeline_step_webhook_verify.go new file mode 100644 index 00000000..9e658357 --- /dev/null +++ b/module/pipeline_step_webhook_verify.go @@ -0,0 +1,269 @@ +package module + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "crypto/subtle" + "encoding/hex" + "fmt" + "io" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/CrisisTextLine/modular" +) + +const ( + webhookVerifyProviderGitHub = "github" + webhookVerifyProviderStripe = "stripe" + webhookVerifyProviderGeneric = "generic" + + // stripeTimestampTolerance is the maximum allowed age of a Stripe timestamp. + stripeTimestampTolerance = 5 * time.Minute +) + +// WebhookVerifyStep verifies HMAC signatures for incoming webhook requests. +type WebhookVerifyStep struct { + name string + provider string + secret string + header string +} + +// NewWebhookVerifyStepFactory returns a StepFactory that creates WebhookVerifyStep instances. +func NewWebhookVerifyStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + provider, _ := config["provider"].(string) + if provider == "" { + return nil, fmt.Errorf("webhook_verify step %q: 'provider' is required (github, stripe, or generic)", name) + } + + switch provider { + case webhookVerifyProviderGitHub, webhookVerifyProviderStripe, webhookVerifyProviderGeneric: + // valid + default: + return nil, fmt.Errorf("webhook_verify step %q: unknown provider %q (must be github, stripe, or generic)", name, provider) + } + + secret, _ := config["secret"].(string) + if secret == "" { + return nil, fmt.Errorf("webhook_verify step %q: 'secret' is required", name) + } + + // Expand environment variable references (e.g., "$MY_SECRET" or "${MY_SECRET}") + secret = expandEnvSecret(secret) + + header, _ := config["header"].(string) + + return &WebhookVerifyStep{ + name: name, + provider: provider, + secret: secret, + header: header, + }, nil + } +} + +// Name returns the step name. +func (s *WebhookVerifyStep) Name() string { return s.name } + +// Execute verifies the webhook signature from the HTTP request in pipeline context metadata. +func (s *WebhookVerifyStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + req, _ := pc.Metadata["_http_request"].(*http.Request) + if req == nil { + return s.unauthorized(pc, "no HTTP request in pipeline context") + } + + // Read the request body. Body may have been read already; use raw body from metadata if present. + body, err := s.readBody(req, pc) + if err != nil { + return s.unauthorized(pc, fmt.Sprintf("failed to read request body: %v", err)) + } + + switch s.provider { + case webhookVerifyProviderGitHub: + return s.verifyGitHub(req, body, pc) + case webhookVerifyProviderStripe: + return s.verifyStripe(req, body, pc) + case webhookVerifyProviderGeneric: + return s.verifyGeneric(req, body, pc) + default: + return s.unauthorized(pc, fmt.Sprintf("unknown provider: %s", s.provider)) + } +} + +// verifyGitHub checks the X-Hub-Signature-256 header (format: sha256=). +func (s *WebhookVerifyStep) verifyGitHub(req *http.Request, body []byte, pc *PipelineContext) (*StepResult, error) { + sig := req.Header.Get("X-Hub-Signature-256") + if sig == "" { + return s.unauthorized(pc, "missing X-Hub-Signature-256 header") + } + + if !strings.HasPrefix(sig, "sha256=") { + return s.unauthorized(pc, "X-Hub-Signature-256 must have format sha256=") + } + + sigHex := strings.TrimPrefix(sig, "sha256=") + sigBytes, err := hex.DecodeString(sigHex) + if err != nil { + return s.unauthorized(pc, "invalid hex in X-Hub-Signature-256") + } + + expected := computeHMACSHA256([]byte(s.secret), body) + if subtle.ConstantTimeCompare(expected, sigBytes) != 1 { + return s.unauthorized(pc, "signature mismatch") + } + + return &StepResult{ + Output: map[string]any{"verified": true}, + }, nil +} + +// verifyStripe checks the Stripe-Signature header (format: t=,v1=). +func (s *WebhookVerifyStep) verifyStripe(req *http.Request, body []byte, pc *PipelineContext) (*StepResult, error) { + sig := req.Header.Get("Stripe-Signature") + if sig == "" { + return s.unauthorized(pc, "missing Stripe-Signature header") + } + + timestamp, v1Sigs, err := parseStripeSignature(sig) + if err != nil { + return s.unauthorized(pc, fmt.Sprintf("invalid Stripe-Signature: %v", err)) + } + + // Validate timestamp is within tolerance + ts := time.Unix(timestamp, 0) + age := time.Since(ts) + if age < 0 { + age = -age + } + if age > stripeTimestampTolerance { + return s.unauthorized(pc, fmt.Sprintf("Stripe timestamp is too old or too far in the future (%v)", age)) + } + + // Stripe signed payload: "." + signedPayload := fmt.Sprintf("%d.%s", timestamp, string(body)) + expected := computeHMACSHA256([]byte(s.secret), []byte(signedPayload)) + expectedHex := hex.EncodeToString(expected) + + // Check any of the v1 signatures + for _, candidate := range v1Sigs { + if subtle.ConstantTimeCompare([]byte(expectedHex), []byte(candidate)) == 1 { + return &StepResult{ + Output: map[string]any{"verified": true, "timestamp": timestamp}, + }, nil + } + } + + return s.unauthorized(pc, "signature mismatch") +} + +// verifyGeneric checks a configurable header (default: X-Signature) with raw hex HMAC-SHA256. +func (s *WebhookVerifyStep) verifyGeneric(req *http.Request, body []byte, pc *PipelineContext) (*StepResult, error) { + headerName := s.header + if headerName == "" { + headerName = "X-Signature" + } + + sig := req.Header.Get(headerName) + if sig == "" { + return s.unauthorized(pc, fmt.Sprintf("missing %s header", headerName)) + } + + sigBytes, err := hex.DecodeString(sig) + if err != nil { + return s.unauthorized(pc, fmt.Sprintf("invalid hex in %s", headerName)) + } + + expected := computeHMACSHA256([]byte(s.secret), body) + if subtle.ConstantTimeCompare(expected, sigBytes) != 1 { + return s.unauthorized(pc, "signature mismatch") + } + + return &StepResult{ + Output: map[string]any{"verified": true}, + }, nil +} + +// unauthorized writes a 401 response if a response writer is available, and returns Stop: true. +func (s *WebhookVerifyStep) unauthorized(pc *PipelineContext, reason string) (*StepResult, error) { + if w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter); ok { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`)) + } + return &StepResult{ + Stop: true, + Output: map[string]any{"verified": false, "reason": reason}, + }, nil +} + +// readBody reads the request body, preferring a cached copy in pipeline metadata. +func (s *WebhookVerifyStep) readBody(req *http.Request, pc *PipelineContext) ([]byte, error) { + // Check if raw body is already cached in metadata + if raw, ok := pc.Metadata["_raw_body"].([]byte); ok { + return raw, nil + } + + if req.Body == nil { + return []byte{}, nil + } + + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + + // Cache it for other steps that may need the raw body + pc.Metadata["_raw_body"] = body + + return body, nil +} + +// computeHMACSHA256 returns the HMAC-SHA256 of data using key. +func computeHMACSHA256(key, data []byte) []byte { + mac := hmac.New(sha256.New, key) + mac.Write(data) + return mac.Sum(nil) +} + +// parseStripeSignature parses the Stripe-Signature header. +// Format: t=,v1=[,v1=]... +func parseStripeSignature(sig string) (int64, []string, error) { + var timestamp int64 + var v1Sigs []string + + parts := strings.Split(sig, ",") + for _, part := range parts { + part = strings.TrimSpace(part) + if strings.HasPrefix(part, "t=") { + tsStr := strings.TrimPrefix(part, "t=") + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + return 0, nil, fmt.Errorf("invalid timestamp: %w", err) + } + timestamp = ts + } else if strings.HasPrefix(part, "v1=") { + v1Sigs = append(v1Sigs, strings.TrimPrefix(part, "v1=")) + } + } + + if timestamp == 0 { + return 0, nil, fmt.Errorf("missing timestamp (t=) in Stripe-Signature") + } + if len(v1Sigs) == 0 { + return 0, nil, fmt.Errorf("missing v1 signature in Stripe-Signature") + } + + return timestamp, v1Sigs, nil +} + +// expandEnvSecret expands environment variable references in the secret string. +// Supports $VAR_NAME and ${VAR_NAME} formats. +func expandEnvSecret(secret string) string { + return os.ExpandEnv(secret) +} diff --git a/module/pipeline_step_webhook_verify_test.go b/module/pipeline_step_webhook_verify_test.go new file mode 100644 index 00000000..e2896a15 --- /dev/null +++ b/module/pipeline_step_webhook_verify_test.go @@ -0,0 +1,403 @@ +package module + +import ( + "bytes" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// computeTestHMAC is a test helper to compute HMAC-SHA256. +func computeTestHMAC(secret, data string) string { + mac := hmac.New(sha256.New, []byte(secret)) + mac.Write([]byte(data)) + return hex.EncodeToString(mac.Sum(nil)) +} + +func TestWebhookVerifyStep_ValidGitHub(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-gh", map[string]any{ + "provider": "github", + "secret": "my-secret", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"action":"opened","number":1}`) + sig := "sha256=" + computeTestHMAC("my-secret", string(body)) + + req := httptest.NewRequest(http.MethodPost, "/webhook", bytes.NewReader(body)) + req.Header.Set("X-Hub-Signature-256", sig) + req.Header.Set("Content-Type", "application/json") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Stop { + t.Errorf("expected Stop=false on valid signature, got true (reason: %v)", result.Output["reason"]) + } + if result.Output["verified"] != true { + t.Errorf("expected verified=true, got %v", result.Output["verified"]) + } +} + +func TestWebhookVerifyStep_InvalidGitHub(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-gh-bad", map[string]any{ + "provider": "github", + "secret": "my-secret", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"action":"opened"}`) + badSig := "sha256=" + computeTestHMAC("wrong-secret", string(body)) + + req := httptest.NewRequest(http.MethodPost, "/webhook", bytes.NewReader(body)) + req.Header.Set("X-Hub-Signature-256", badSig) + + w := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + "_http_response_writer": w, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true on invalid signature") + } + if result.Output["verified"] != false { + t.Errorf("expected verified=false, got %v", result.Output["verified"]) + } + if w.Code != http.StatusUnauthorized { + t.Errorf("expected HTTP 401, got %d", w.Code) + } +} + +func TestWebhookVerifyStep_MissingGitHubHeader(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-gh-missing", map[string]any{ + "provider": "github", + "secret": "my-secret", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/webhook", strings.NewReader(`{}`)) + // No X-Hub-Signature-256 header + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true on missing signature header") + } +} + +func TestWebhookVerifyStep_ValidStripe(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-stripe", map[string]any{ + "provider": "stripe", + "secret": "whsec_test", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"type":"payment_intent.succeeded"}`) + timestamp := time.Now().Unix() + signedPayload := fmt.Sprintf("%d.%s", timestamp, string(body)) + sig := computeTestHMAC("whsec_test", signedPayload) + stripeHeader := fmt.Sprintf("t=%d,v1=%s", timestamp, sig) + + req := httptest.NewRequest(http.MethodPost, "/webhook/stripe", bytes.NewReader(body)) + req.Header.Set("Stripe-Signature", stripeHeader) + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Stop { + t.Errorf("expected Stop=false on valid Stripe signature, reason: %v", result.Output["reason"]) + } + if result.Output["verified"] != true { + t.Errorf("expected verified=true, got %v", result.Output["verified"]) + } +} + +func TestWebhookVerifyStep_StripeExpiredTimestamp(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-stripe-expired", map[string]any{ + "provider": "stripe", + "secret": "whsec_test", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"type":"payment_intent.succeeded"}`) + // Timestamp 10 minutes in the past — beyond the 5-minute tolerance + timestamp := time.Now().Add(-10 * time.Minute).Unix() + signedPayload := fmt.Sprintf("%d.%s", timestamp, string(body)) + sig := computeTestHMAC("whsec_test", signedPayload) + stripeHeader := fmt.Sprintf("t=%d,v1=%s", timestamp, sig) + + req := httptest.NewRequest(http.MethodPost, "/webhook/stripe", bytes.NewReader(body)) + req.Header.Set("Stripe-Signature", stripeHeader) + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true for expired Stripe timestamp") + } + reason, _ := result.Output["reason"].(string) + if !strings.Contains(reason, "too old") { + t.Errorf("expected 'too old' in reason, got: %q", reason) + } +} + +func TestWebhookVerifyStep_InvalidStripeSignature(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-stripe-bad", map[string]any{ + "provider": "stripe", + "secret": "whsec_test", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"type":"payment_intent.succeeded"}`) + timestamp := time.Now().Unix() + // Use wrong secret to generate signature + sig := computeTestHMAC("wrong-secret", fmt.Sprintf("%d.%s", timestamp, string(body))) + stripeHeader := fmt.Sprintf("t=%d,v1=%s", timestamp, sig) + + req := httptest.NewRequest(http.MethodPost, "/webhook/stripe", bytes.NewReader(body)) + req.Header.Set("Stripe-Signature", stripeHeader) + + w := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + "_http_response_writer": w, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true on invalid Stripe signature") + } + if w.Code != http.StatusUnauthorized { + t.Errorf("expected HTTP 401, got %d", w.Code) + } +} + +func TestWebhookVerifyStep_ValidGeneric(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-generic", map[string]any{ + "provider": "generic", + "secret": "generic-secret", + "header": "X-My-Signature", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"event":"test"}`) + sig := computeTestHMAC("generic-secret", string(body)) + + req := httptest.NewRequest(http.MethodPost, "/webhook/custom", bytes.NewReader(body)) + req.Header.Set("X-My-Signature", sig) + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Stop { + t.Errorf("expected Stop=false on valid generic signature, reason: %v", result.Output["reason"]) + } + if result.Output["verified"] != true { + t.Errorf("expected verified=true, got %v", result.Output["verified"]) + } +} + +func TestWebhookVerifyStep_GenericDefaultHeader(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-generic-default", map[string]any{ + "provider": "generic", + "secret": "generic-secret", + // no "header" field — should default to X-Signature + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"event":"test"}`) + sig := computeTestHMAC("generic-secret", string(body)) + + req := httptest.NewRequest(http.MethodPost, "/webhook/custom", bytes.NewReader(body)) + req.Header.Set("X-Signature", sig) + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Stop { + t.Errorf("expected Stop=false on valid generic signature (default header), reason: %v", result.Output["reason"]) + } +} + +func TestWebhookVerifyStep_MissingGenericHeader(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-generic-missing", map[string]any{ + "provider": "generic", + "secret": "generic-secret", + "header": "X-My-Sig", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/webhook/custom", strings.NewReader(`{}`)) + // No X-My-Sig header + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true when signature header is missing") + } +} + +func TestWebhookVerifyStep_NoHTTPRequest(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-no-req", map[string]any{ + "provider": "github", + "secret": "my-secret", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + // No _http_request in metadata + pc := NewPipelineContext(nil, nil) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !result.Stop { + t.Error("expected Stop=true when no HTTP request is in context") + } +} + +func TestWebhookVerifyStep_FactoryRejectsMissingProvider(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + _, err := factory("bad-verify", map[string]any{ + "secret": "my-secret", + }, nil) + if err == nil { + t.Fatal("expected error for missing 'provider'") + } +} + +func TestWebhookVerifyStep_FactoryRejectsUnknownProvider(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + _, err := factory("bad-verify", map[string]any{ + "provider": "unknown-provider", + "secret": "my-secret", + }, nil) + if err == nil { + t.Fatal("expected error for unknown provider") + } +} + +func TestWebhookVerifyStep_FactoryRejectsMissingSecret(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + _, err := factory("bad-verify", map[string]any{ + "provider": "github", + }, nil) + if err == nil { + t.Fatal("expected error for missing 'secret'") + } +} + +func TestWebhookVerifyStep_RawBodyCachedInMetadata(t *testing.T) { + factory := NewWebhookVerifyStepFactory() + step, err := factory("verify-cached-body", map[string]any{ + "provider": "github", + "secret": "cached-secret", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := []byte(`{"cached":"body"}`) + sig := "sha256=" + computeTestHMAC("cached-secret", string(body)) + + // Provide the body as raw bytes in metadata (simulating pre-read body) + req := httptest.NewRequest(http.MethodPost, "/webhook", http.NoBody) + req.Header.Set("X-Hub-Signature-256", sig) + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + "_raw_body": body, + }) + + result, err := step.Execute(t.Context(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Stop { + t.Errorf("expected Stop=false when using cached body, reason: %v", result.Output["reason"]) + } + if result.Output["verified"] != true { + t.Errorf("expected verified=true, got %v", result.Output["verified"]) + } +} diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 4ccb12a4..c02f74f9 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -1,7 +1,8 @@ // Package pipelinesteps provides a plugin that registers generic pipeline step // types: validate, transform, conditional, set, log, delegate, jq, publish, // http_call, request_parse, db_query, db_exec, json_response, -// validate_path_param, validate_pagination, validate_request_body. +// validate_path_param, validate_pagination, validate_request_body, +// foreach, webhook_verify. // It also provides the PipelineWorkflowHandler for composable pipelines. package pipelinesteps @@ -28,8 +29,9 @@ type Plugin struct { // pipelineHandler is retained so the wiring hook can inject dependencies. pipelineHandler *handlers.PipelineWorkflowHandler // stepRegistry and logger are injected by the engine via optional setter interfaces. - stepRegistry interfaces.StepRegistryProvider - logger *slog.Logger + stepRegistry interfaces.StepRegistryProvider + concreteStepRegistry *module.StepRegistry + logger *slog.Logger } // New creates a new pipeline-steps plugin. @@ -39,7 +41,7 @@ func New() *Plugin { BaseNativePlugin: plugin.BaseNativePlugin{ PluginName: "pipeline-steps", PluginVersion: "1.0.0", - PluginDescription: "Generic pipeline step types (validate, transform, conditional, set, log, delegate, jq, validate_path_param, validate_pagination, validate_request_body, etc.)", + PluginDescription: "Generic pipeline step types (validate, transform, conditional, set, log, delegate, jq, validate_path_param, validate_pagination, validate_request_body, foreach, webhook_verify, etc.)", }, Manifest: plugin.PluginManifest{ Name: "pipeline-steps", @@ -65,6 +67,8 @@ func New() *Plugin { "step.validate_path_param", "step.validate_pagination", "step.validate_request_body", + "step.foreach", + "step.webhook_verify", }, WorkflowTypes: []string{"pipeline"}, Capabilities: []plugin.CapabilityDecl{ @@ -80,7 +84,7 @@ func (p *Plugin) Capabilities() []capability.Contract { return []capability.Contract{ { Name: "pipeline-steps", - Description: "Generic pipeline step operations: validate, transform, conditional, set, log, delegate, jq, etc.", + Description: "Generic pipeline step operations: validate, transform, conditional, set, log, delegate, jq, foreach, webhook_verify, etc.", }, } } @@ -104,6 +108,12 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.validate_path_param": wrapStepFactory(module.NewValidatePathParamStepFactory()), "step.validate_pagination": wrapStepFactory(module.NewValidatePaginationStepFactory()), "step.validate_request_body": wrapStepFactory(module.NewValidateRequestBodyStepFactory()), + // step.foreach uses a lazy registry getter so it can reference any registered step type, + // including types registered by other plugins loaded after this one. + "step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry { + return p.concreteStepRegistry + }, nil)), + "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), } } @@ -119,8 +129,15 @@ func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory { // SetStepRegistry is called by the engine (via optional-interface detection in LoadPlugin) // to inject the step registry after all step factories have been registered. +// It also stores the concrete *module.StepRegistry so that step.foreach can build +// sub-steps using the full registry at step-creation time. func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider) { p.stepRegistry = registry + // Type-assert to the concrete registry so step.foreach can call Create(). + // The engine always passes *module.StepRegistry; this is safe. + if concrete, ok := registry.(*module.StepRegistry); ok { + p.concreteStepRegistry = concrete + } } // SetLogger is called by the engine (via optional-interface detection in LoadPlugin) diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index 338ff99c..8314fc0e 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -47,6 +47,8 @@ func TestStepFactories(t *testing.T) { "step.validate_path_param", "step.validate_pagination", "step.validate_request_body", + "step.foreach", + "step.webhook_verify", } for _, stepType := range expectedSteps { @@ -68,7 +70,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 16 { - t.Fatalf("expected 16 step factories after load, got %d", len(steps)) + if len(steps) != 18 { + t.Fatalf("expected 18 step factories after load, got %d", len(steps)) } }