Skip to content

Commit bd13eeb

Browse files
intel352claude
andauthored
feat: add step.foreach and step.webhook_verify pipeline steps (#158)
step.foreach iterates over a collection (resolved from pipeline context by key or dot-path like "steps.fetch.rows"), executes inline sub-steps for each item, and returns {results, count}. Sub-steps share a child context with item_key (default "item") and index_key (default "index") injected. An error in any sub-step stops iteration immediately. step.webhook_verify validates HMAC-SHA256 signatures for incoming webhooks before the pipeline continues. Supports: - GitHub: X-Hub-Signature-256 (sha256=<hex>) - Stripe: Stripe-Signature (t=<ts>,v1=<hex>) with 5-min timestamp window - Generic: configurable header (default X-Signature, raw hex) On failure writes HTTP 401 and returns StepResult{Stop: true}. On success returns {verified: true}. The plugin stores the concrete *module.StepRegistry so step.foreach can create sub-steps of any registered type at pipeline-configuration time. Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e948204 commit bd13eeb

6 files changed

Lines changed: 1297 additions & 7 deletions

File tree

module/pipeline_step_foreach.go

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
package module
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"maps"
7+
8+
"github.com/CrisisTextLine/modular"
9+
)
10+
11+
// ForEachStep iterates over a collection and executes sub-steps for each item.
12+
type ForEachStep struct {
13+
name string
14+
collection string
15+
itemKey string
16+
indexKey string
17+
subSteps []PipelineStep
18+
tmpl *TemplateEngine
19+
}
20+
21+
// NewForEachStepFactory returns a StepFactory that creates ForEachStep instances.
22+
// registryFn is called at step-creation time to obtain the step registry used to
23+
// build sub-steps. Passing a function (rather than the registry directly) allows
24+
// the factory to be registered before the registry is fully populated, enabling
25+
// sub-steps to themselves be any registered step type.
26+
func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory {
27+
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
28+
registry := registryFn()
29+
30+
collection, _ := config["collection"].(string)
31+
if collection == "" {
32+
return nil, fmt.Errorf("foreach step %q: 'collection' is required", name)
33+
}
34+
35+
itemKey, _ := config["item_key"].(string)
36+
if itemKey == "" {
37+
itemKey = "item"
38+
}
39+
40+
indexKey, _ := config["index_key"].(string)
41+
if indexKey == "" {
42+
indexKey = "index"
43+
}
44+
45+
// Build sub-steps from inline config
46+
stepsRaw, _ := config["steps"].([]any)
47+
subSteps := make([]PipelineStep, 0, len(stepsRaw))
48+
for i, raw := range stepsRaw {
49+
stepCfg, ok := raw.(map[string]any)
50+
if !ok {
51+
return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i)
52+
}
53+
54+
stepType, _ := stepCfg["type"].(string)
55+
if stepType == "" {
56+
return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i)
57+
}
58+
59+
stepName, _ := stepCfg["name"].(string)
60+
if stepName == "" {
61+
stepName = fmt.Sprintf("%s-sub-%d", name, i)
62+
}
63+
64+
// Build the step config without meta fields
65+
subCfg := make(map[string]any)
66+
for k, v := range stepCfg {
67+
if k != "type" && k != "name" {
68+
subCfg[k] = v
69+
}
70+
}
71+
72+
step, err := registry.Create(stepType, stepName, subCfg, app)
73+
if err != nil {
74+
return nil, fmt.Errorf("foreach step %q: failed to build sub-step %d (%s): %w", name, i, stepType, err)
75+
}
76+
subSteps = append(subSteps, step)
77+
}
78+
79+
return &ForEachStep{
80+
name: name,
81+
collection: collection,
82+
itemKey: itemKey,
83+
indexKey: indexKey,
84+
subSteps: subSteps,
85+
tmpl: NewTemplateEngine(),
86+
}, nil
87+
}
88+
}
89+
90+
// Name returns the step name.
91+
func (s *ForEachStep) Name() string { return s.name }
92+
93+
// Execute iterates over the collection and runs sub-steps for each item.
94+
func (s *ForEachStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
95+
// Resolve the collection from the pipeline context
96+
items, err := s.resolveCollection(pc)
97+
if err != nil {
98+
return nil, fmt.Errorf("foreach step %q: %w", s.name, err)
99+
}
100+
101+
// Handle empty collections gracefully
102+
if len(items) == 0 {
103+
return &StepResult{
104+
Output: map[string]any{
105+
"results": []any{},
106+
"count": 0,
107+
},
108+
}, nil
109+
}
110+
111+
collected := make([]any, 0, len(items))
112+
113+
for i, item := range items {
114+
// Create a child context with item and index injected
115+
childPC := s.buildChildContext(pc, item, i)
116+
117+
// Execute each sub-step sequentially for this item
118+
iterResult := make(map[string]any)
119+
for _, step := range s.subSteps {
120+
result, execErr := step.Execute(ctx, childPC)
121+
if execErr != nil {
122+
return nil, fmt.Errorf("foreach step %q: iteration %d, sub-step %q failed: %w",
123+
s.name, i, step.Name(), execErr)
124+
}
125+
if result != nil && result.Output != nil {
126+
childPC.MergeStepOutput(step.Name(), result.Output)
127+
maps.Copy(iterResult, result.Output)
128+
}
129+
if result != nil && result.Stop {
130+
break
131+
}
132+
}
133+
collected = append(collected, iterResult)
134+
}
135+
136+
return &StepResult{
137+
Output: map[string]any{
138+
"results": collected,
139+
"count": len(collected),
140+
},
141+
}, nil
142+
}
143+
144+
// resolveCollection resolves the collection field to a []any.
145+
func (s *ForEachStep) resolveCollection(pc *PipelineContext) ([]any, error) {
146+
// Look up the field path directly in Current (handles simple keys)
147+
if val, ok := pc.Current[s.collection]; ok {
148+
return foreachToSlice(val)
149+
}
150+
151+
// Try trigger data
152+
if val, ok := pc.TriggerData[s.collection]; ok {
153+
return foreachToSlice(val)
154+
}
155+
156+
// Try dot-separated path through the full template data
157+
// (e.g., "steps.fetch.rows" or nested keys)
158+
data := make(map[string]any)
159+
maps.Copy(data, pc.Current)
160+
data["steps"] = pc.StepOutputs
161+
data["trigger"] = pc.TriggerData
162+
163+
if val, found := foreachWalkPath(data, s.collection); found {
164+
return foreachToSlice(val)
165+
}
166+
167+
return nil, fmt.Errorf("collection %q not found in context", s.collection)
168+
}
169+
170+
// buildChildContext creates a child PipelineContext with item and index injected.
171+
func (s *ForEachStep) buildChildContext(parent *PipelineContext, item any, index int) *PipelineContext {
172+
// Copy trigger data
173+
childTrigger := make(map[string]any)
174+
maps.Copy(childTrigger, parent.TriggerData)
175+
176+
// Copy metadata
177+
childMeta := make(map[string]any)
178+
maps.Copy(childMeta, parent.Metadata)
179+
180+
// Build current: start with parent's current, inject item and index
181+
childCurrent := make(map[string]any)
182+
maps.Copy(childCurrent, parent.Current)
183+
childCurrent[s.itemKey] = item
184+
childCurrent[s.indexKey] = index
185+
186+
// Copy step outputs
187+
childOutputs := make(map[string]map[string]any)
188+
for k, v := range parent.StepOutputs {
189+
out := make(map[string]any)
190+
maps.Copy(out, v)
191+
childOutputs[k] = out
192+
}
193+
194+
return &PipelineContext{
195+
TriggerData: childTrigger,
196+
StepOutputs: childOutputs,
197+
Current: childCurrent,
198+
Metadata: childMeta,
199+
}
200+
}
201+
202+
// foreachWalkPath traverses a dot-separated path through nested maps.
203+
// Returns the found value and true if found, or nil and false if not.
204+
// It handles both map[string]any and map[string]map[string]any (step outputs).
205+
func foreachWalkPath(data map[string]any, path string) (any, bool) {
206+
// Try the full path as a key first
207+
if val, ok := data[path]; ok {
208+
return val, true
209+
}
210+
211+
// Walk dot-separated segments
212+
current := any(data)
213+
segments := foreachSplitPath(path)
214+
for _, seg := range segments {
215+
switch m := current.(type) {
216+
case map[string]any:
217+
val, ok := m[seg]
218+
if !ok {
219+
return nil, false
220+
}
221+
current = val
222+
case map[string]map[string]any:
223+
// Step outputs are stored as map[string]map[string]any
224+
val, ok := m[seg]
225+
if !ok {
226+
return nil, false
227+
}
228+
current = val
229+
default:
230+
return nil, false
231+
}
232+
}
233+
return current, true
234+
}
235+
236+
// foreachSplitPath splits a dot-separated path into segments.
237+
func foreachSplitPath(path string) []string {
238+
var segs []string
239+
start := 0
240+
for i := 0; i < len(path); i++ {
241+
if path[i] == '.' {
242+
segs = append(segs, path[start:i])
243+
start = i + 1
244+
}
245+
}
246+
segs = append(segs, path[start:])
247+
return segs
248+
}
249+
250+
// foreachToSlice converts a value to []any if possible.
251+
func foreachToSlice(val any) ([]any, error) {
252+
switch v := val.(type) {
253+
case []any:
254+
return v, nil
255+
case []map[string]any:
256+
result := make([]any, len(v))
257+
for i, item := range v {
258+
result[i] = item
259+
}
260+
return result, nil
261+
case []string:
262+
result := make([]any, len(v))
263+
for i, item := range v {
264+
result[i] = item
265+
}
266+
return result, nil
267+
case []int:
268+
result := make([]any, len(v))
269+
for i, item := range v {
270+
result[i] = item
271+
}
272+
return result, nil
273+
case nil:
274+
return []any{}, nil
275+
default:
276+
return nil, fmt.Errorf("expected a slice, got %T", val)
277+
}
278+
}

0 commit comments

Comments
 (0)