Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/wfctl/type_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func KnownStepTypes() map[string]StepTypeInfo {
"step.foreach": {
Type: "step.foreach",
Plugin: "pipelinesteps",
ConfigKeys: []string{"collection", "steps"},
ConfigKeys: []string{"collection", "item_var", "item_key", "step", "steps", "index_key"},
},
"step.webhook_verify": {
Type: "step.webhook_verify",
Expand Down
85 changes: 55 additions & 30 deletions module/pipeline_step_foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ type ForEachStep struct {
// build sub-steps. Passing a function (rather than the registry directly) allows
// the factory to be registered before the registry is fully populated, enabling
// sub-steps to themselves be any registered step type.
func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory {
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
registry := registryFn()

func NewForEachStepFactory(registryFn func() *StepRegistry) StepFactory {
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
collection, _ := config["collection"].(string)
if collection == "" {
return nil, fmt.Errorf("foreach step %q: 'collection' is required", name)
}

itemKey, _ := config["item_key"].(string)
// item_var is the canonical name; item_key is kept for backward compatibility.
itemKey, _ := config["item_var"].(string)
if itemKey == "" {
itemKey, _ = config["item_key"].(string)
}
if itemKey == "" {
itemKey = "item"
}
Expand All @@ -42,38 +44,49 @@ func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Applicat
indexKey = "index"
}

// Build sub-steps from inline config
stepsRaw, _ := config["steps"].([]any)
subSteps := make([]PipelineStep, 0, len(stepsRaw))
for i, raw := range stepsRaw {
stepCfg, ok := raw.(map[string]any)
// Detect presence of each key before type-asserting so we can give clear errors.
_, hasSingleStep := config["step"]
_, hasStepsList := config["steps"]

if hasSingleStep && hasStepsList {
return nil, fmt.Errorf("foreach step %q: 'step' and 'steps' are mutually exclusive", name)
}

// Build sub-steps: support a single "step" key or a "steps" list.
var subSteps []PipelineStep

switch {
case hasSingleStep:
singleRaw, ok := config["step"].(map[string]any)
if !ok {
return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i)
return nil, fmt.Errorf("foreach step %q: 'step' must be a map", name)
}

stepType, _ := stepCfg["type"].(string)
if stepType == "" {
return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i)
step, err := buildSubStep(name, "step", singleRaw, registryFn, app)
if err != nil {
return nil, fmt.Errorf("foreach step %q: %w", name, err)
}
subSteps = []PipelineStep{step}

stepName, _ := stepCfg["name"].(string)
if stepName == "" {
stepName = fmt.Sprintf("%s-sub-%d", name, i)
case hasStepsList:
stepsRaw, ok := config["steps"].([]any)
if !ok {
return nil, fmt.Errorf("foreach step %q: 'steps' must be a list", name)
}

// Build the step config without meta fields
subCfg := make(map[string]any)
for k, v := range stepCfg {
if k != "type" && k != "name" {
subCfg[k] = v
subSteps = make([]PipelineStep, 0, len(stepsRaw))
for i, raw := range stepsRaw {
stepCfg, ok := raw.(map[string]any)
if !ok {
return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i)
}
step, err := buildSubStep(name, fmt.Sprintf("sub-%d", i), stepCfg, registryFn, app)
if err != nil {
return nil, fmt.Errorf("foreach step %q: %w", name, err)
}
subSteps = append(subSteps, step)
}

step, err := registry.Create(stepType, stepName, subCfg, app)
if err != nil {
return nil, fmt.Errorf("foreach step %q: failed to build sub-step %d (%s): %w", name, i, stepType, err)
}
subSteps = append(subSteps, step)
default:
subSteps = []PipelineStep{}
}

return &ForEachStep{
Expand Down Expand Up @@ -177,12 +190,24 @@ func (s *ForEachStep) buildChildContext(parent *PipelineContext, item any, index
childMeta := make(map[string]any)
maps.Copy(childMeta, parent.Metadata)

// Build current: start with parent's current, inject item and index
// Build current: start with parent's current, inject item and index.
childCurrent := make(map[string]any)
maps.Copy(childCurrent, parent.Current)
childCurrent[s.itemKey] = item
childCurrent[s.indexKey] = index

// Inject a "foreach" map so templates can use {{.foreach.index}}.
// Only set it when it won't conflict with the user-chosen item/index keys
// or an existing "foreach" key in the parent context.
if s.itemKey != "foreach" && s.indexKey != "foreach" {
if _, exists := childCurrent["foreach"]; !exists {
childCurrent["foreach"] = map[string]any{
"index": index,
s.itemKey: item,
}
}
}

// Copy step outputs
childOutputs := make(map[string]map[string]any)
for k, v := range parent.StepOutputs {
Expand Down
248 changes: 246 additions & 2 deletions module/pipeline_step_foreach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func buildTestForEachStep(t *testing.T, name string, config map[string]any) (Pip
registry := NewStepRegistry()
registry.Register("step.set", NewSetStepFactory())
registry.Register("step.log", NewLogStepFactory())
factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil)
factory := NewForEachStepFactory(func() *StepRegistry { return registry })
return factory(name, config, nil)
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func TestForEachStep_SubStepErrorStopsIteration(t *testing.T) {
}, nil
})

factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil)
factory := NewForEachStepFactory(func() *StepRegistry { return registry })
step, err := factory("foreach-fail", map[string]any{
"collection": "items",
"steps": []any{
Expand Down Expand Up @@ -319,3 +319,247 @@ func TestForEachStep_CollectionFromStepOutputs(t *testing.T) {
// Compile-time check: ensure the step_fail factory signature matches StepFactory.
// This avoids having an unused import of fmt.
var _ = fmt.Sprintf

func TestForEachStep_ItemVar(t *testing.T) {
// item_var is the canonical config key name from the issue spec.
step, err := buildTestForEachStep(t, "foreach-item-var", map[string]any{
"collection": "rows",
"item_var": "row",
"steps": []any{
map[string]any{
"type": "step.set",
"name": "capture",
"values": map[string]any{
"captured": "{{.row}}",
},
},
},
})
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(map[string]any{
"rows": []any{"alpha", "beta"},
}, nil)

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

if result.Output["count"] != 2 {
t.Errorf("expected count=2, got %v", result.Output["count"])
}

results := result.Output["results"].([]any)
first := results[0].(map[string]any)
if first["captured"] != "alpha" {
t.Errorf("expected captured='alpha', got %v", first["captured"])
}
}

func TestForEachStep_ForeachIndexInContext(t *testing.T) {
// Each iteration should expose {{.foreach.index}} in the child context.
step, err := buildTestForEachStep(t, "foreach-index", map[string]any{
"collection": "items",
"steps": []any{
map[string]any{
"type": "step.set",
"name": "capture",
"values": map[string]any{
"idx": "{{.foreach.index}}",
},
},
},
})
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(map[string]any{
"items": []any{"a", "b", "c"},
}, nil)

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

results := result.Output["results"].([]any)
if len(results) != 3 {
t.Fatalf("expected 3 results, got %d", len(results))
}

// Verify indexes are 0, 1, 2
for i, r := range results {
m := r.(map[string]any)
want := fmt.Sprintf("%d", i)
if m["idx"] != want {
t.Errorf("iteration %d: expected idx=%q, got %v", i, want, m["idx"])
}
}
}

func TestForEachStep_SingleStep(t *testing.T) {
// The "step" (singular) config key should work as an alternative to "steps".
step, err := buildTestForEachStep(t, "foreach-single-step", map[string]any{
"collection": "names",
"item_var": "name",
"step": map[string]any{
"type": "step.set",
"name": "process",
"values": map[string]any{
"processed": "{{.name}}",
},
},
})
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(map[string]any{
"names": []any{"foo", "bar"},
}, nil)

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

if result.Output["count"] != 2 {
t.Errorf("expected count=2, got %v", result.Output["count"])
}

results := result.Output["results"].([]any)
first := results[0].(map[string]any)
if first["processed"] != "foo" {
t.Errorf("expected processed='foo', got %v", first["processed"])
}
second := results[1].(map[string]any)
if second["processed"] != "bar" {
t.Errorf("expected processed='bar', got %v", second["processed"])
}
}

func TestForEachStep_SingleStep_InvalidType(t *testing.T) {
_, err := buildTestForEachStep(t, "foreach-single-bad", map[string]any{
"collection": "items",
"step": map[string]any{
"type": "step.nonexistent",
},
})
if err == nil {
t.Fatal("expected error for unknown step type in 'step' config")
}
}

func TestForEachStep_SingleStep_MissingType(t *testing.T) {
_, err := buildTestForEachStep(t, "foreach-single-notype", map[string]any{
"collection": "items",
"step": map[string]any{
"name": "no-type-here",
},
})
if err == nil {
t.Fatal("expected error when 'step' config has no 'type'")
}
}

func TestForEachStep_StepAndStepsMutuallyExclusive(t *testing.T) {
_, err := buildTestForEachStep(t, "foreach-both", map[string]any{
"collection": "items",
"step": map[string]any{
"type": "step.set",
"name": "s",
"values": map[string]any{"x": "1"},
},
"steps": []any{
map[string]any{"type": "step.set", "name": "s2", "values": map[string]any{"y": "2"}},
},
})
if err == nil {
t.Fatal("expected error when both 'step' and 'steps' are provided")
}
}

func TestForEachStep_StepWrongType(t *testing.T) {
_, err := buildTestForEachStep(t, "foreach-step-wrong-type", map[string]any{
"collection": "items",
"step": "not-a-map",
})
if err == nil {
t.Fatal("expected error when 'step' is not a map")
}
}

func TestForEachStep_StepsWrongType(t *testing.T) {
_, err := buildTestForEachStep(t, "foreach-steps-wrong-type", map[string]any{
"collection": "items",
"steps": "not-a-list",
})
if err == nil {
t.Fatal("expected error when 'steps' is not a list")
}
}

func TestForEachStep_AppPassedToSubStep(t *testing.T) {
// Verifies that the modular.Application passed to the StepFactory is threaded
// through to sub-step factories, not silently dropped.
var capturedApp modular.Application
registry := NewStepRegistry()
registry.Register("step.capture_app", func(name string, _ map[string]any, app modular.Application) (PipelineStep, error) {
capturedApp = app
return &mockStep{
name: name,
execFn: func(_ context.Context, _ *PipelineContext) (*StepResult, error) {
return &StepResult{Output: map[string]any{}}, nil
},
}, nil
})

sentinel := &struct{ modular.Application }{}
factory := NewForEachStepFactory(func() *StepRegistry { return registry })
_, err := factory("foreach-app-test", map[string]any{
"collection": "items",
"step": map[string]any{
"type": "step.capture_app",
},
}, sentinel)
if err != nil {
t.Fatalf("factory error: %v", err)
}
if capturedApp != sentinel {
t.Errorf("expected app to be passed through to sub-step factory; got %v", capturedApp)
}
}

func TestForEachStep_ForeachMapNotSetWhenConflict(t *testing.T) {
// When item_var is "foreach", the "foreach" context key must NOT be overwritten.
step, err := buildTestForEachStep(t, "foreach-conflict", map[string]any{
"collection": "items",
"item_var": "foreach", // would collide with the foreach map
"steps": []any{
map[string]any{
"type": "step.set",
"name": "capture",
"values": map[string]any{
"got": "{{.foreach}}",
},
},
},
})
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(map[string]any{
"items": []any{"val"},
}, nil)

// Should not panic or error; the "foreach" key holds the item value, not the map.
_, execErr := step.Execute(context.Background(), pc)
if execErr != nil {
t.Fatalf("execute error: %v", execErr)
}
}
Loading
Loading