diff --git a/cmd/wfctl/type_registry.go b/cmd/wfctl/type_registry.go index d9bb8f12..2ac3f2a2 100644 --- a/cmd/wfctl/type_registry.go +++ b/cmd/wfctl/type_registry.go @@ -593,7 +593,7 @@ func KnownStepTypes() map[string]StepTypeInfo { "step.foreach": { Type: "step.foreach", Plugin: "pipelinesteps", - ConfigKeys: []string{"collection", "steps"}, + ConfigKeys: []string{"collection", "item_var", "item_key", "step", "steps", "index_key"}, }, "step.webhook_verify": { Type: "step.webhook_verify", diff --git a/module/pipeline_step_foreach.go b/module/pipeline_step_foreach.go index 90e74b3f..dbd5d52c 100644 --- a/module/pipeline_step_foreach.go +++ b/module/pipeline_step_foreach.go @@ -23,16 +23,18 @@ type ForEachStep struct { // build sub-steps. Passing a function (rather than the registry directly) allows // the factory to be registered before the registry is fully populated, enabling // sub-steps to themselves be any registered step type. -func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory { - return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { - registry := registryFn() - +func NewForEachStepFactory(registryFn func() *StepRegistry) StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { collection, _ := config["collection"].(string) if collection == "" { return nil, fmt.Errorf("foreach step %q: 'collection' is required", name) } - itemKey, _ := config["item_key"].(string) + // item_var is the canonical name; item_key is kept for backward compatibility. + itemKey, _ := config["item_var"].(string) + if itemKey == "" { + itemKey, _ = config["item_key"].(string) + } if itemKey == "" { itemKey = "item" } @@ -42,38 +44,49 @@ func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Applicat indexKey = "index" } - // Build sub-steps from inline config - stepsRaw, _ := config["steps"].([]any) - subSteps := make([]PipelineStep, 0, len(stepsRaw)) - for i, raw := range stepsRaw { - stepCfg, ok := raw.(map[string]any) + // Detect presence of each key before type-asserting so we can give clear errors. + _, hasSingleStep := config["step"] + _, hasStepsList := config["steps"] + + if hasSingleStep && hasStepsList { + return nil, fmt.Errorf("foreach step %q: 'step' and 'steps' are mutually exclusive", name) + } + + // Build sub-steps: support a single "step" key or a "steps" list. + var subSteps []PipelineStep + + switch { + case hasSingleStep: + singleRaw, ok := config["step"].(map[string]any) if !ok { - return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i) + return nil, fmt.Errorf("foreach step %q: 'step' must be a map", name) } - - stepType, _ := stepCfg["type"].(string) - if stepType == "" { - return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i) + step, err := buildSubStep(name, "step", singleRaw, registryFn, app) + if err != nil { + return nil, fmt.Errorf("foreach step %q: %w", name, err) } + subSteps = []PipelineStep{step} - stepName, _ := stepCfg["name"].(string) - if stepName == "" { - stepName = fmt.Sprintf("%s-sub-%d", name, i) + case hasStepsList: + stepsRaw, ok := config["steps"].([]any) + if !ok { + return nil, fmt.Errorf("foreach step %q: 'steps' must be a list", name) } - - // Build the step config without meta fields - subCfg := make(map[string]any) - for k, v := range stepCfg { - if k != "type" && k != "name" { - subCfg[k] = v + subSteps = make([]PipelineStep, 0, len(stepsRaw)) + for i, raw := range stepsRaw { + stepCfg, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i) + } + step, err := buildSubStep(name, fmt.Sprintf("sub-%d", i), stepCfg, registryFn, app) + if err != nil { + return nil, fmt.Errorf("foreach step %q: %w", name, err) } + subSteps = append(subSteps, step) } - step, err := registry.Create(stepType, stepName, subCfg, app) - if err != nil { - return nil, fmt.Errorf("foreach step %q: failed to build sub-step %d (%s): %w", name, i, stepType, err) - } - subSteps = append(subSteps, step) + default: + subSteps = []PipelineStep{} } return &ForEachStep{ @@ -177,12 +190,24 @@ func (s *ForEachStep) buildChildContext(parent *PipelineContext, item any, index childMeta := make(map[string]any) maps.Copy(childMeta, parent.Metadata) - // Build current: start with parent's current, inject item and index + // Build current: start with parent's current, inject item and index. childCurrent := make(map[string]any) maps.Copy(childCurrent, parent.Current) childCurrent[s.itemKey] = item childCurrent[s.indexKey] = index + // Inject a "foreach" map so templates can use {{.foreach.index}}. + // Only set it when it won't conflict with the user-chosen item/index keys + // or an existing "foreach" key in the parent context. + if s.itemKey != "foreach" && s.indexKey != "foreach" { + if _, exists := childCurrent["foreach"]; !exists { + childCurrent["foreach"] = map[string]any{ + "index": index, + s.itemKey: item, + } + } + } + // Copy step outputs childOutputs := make(map[string]map[string]any) for k, v := range parent.StepOutputs { diff --git a/module/pipeline_step_foreach_test.go b/module/pipeline_step_foreach_test.go index 6fcd2cd6..fa4ba18e 100644 --- a/module/pipeline_step_foreach_test.go +++ b/module/pipeline_step_foreach_test.go @@ -16,7 +16,7 @@ func buildTestForEachStep(t *testing.T, name string, config map[string]any) (Pip registry := NewStepRegistry() registry.Register("step.set", NewSetStepFactory()) registry.Register("step.log", NewLogStepFactory()) - factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil) + factory := NewForEachStepFactory(func() *StepRegistry { return registry }) return factory(name, config, nil) } @@ -175,7 +175,7 @@ func TestForEachStep_SubStepErrorStopsIteration(t *testing.T) { }, nil }) - factory := NewForEachStepFactory(func() *StepRegistry { return registry }, nil) + factory := NewForEachStepFactory(func() *StepRegistry { return registry }) step, err := factory("foreach-fail", map[string]any{ "collection": "items", "steps": []any{ @@ -319,3 +319,247 @@ func TestForEachStep_CollectionFromStepOutputs(t *testing.T) { // Compile-time check: ensure the step_fail factory signature matches StepFactory. // This avoids having an unused import of fmt. var _ = fmt.Sprintf + +func TestForEachStep_ItemVar(t *testing.T) { + // item_var is the canonical config key name from the issue spec. + step, err := buildTestForEachStep(t, "foreach-item-var", map[string]any{ + "collection": "rows", + "item_var": "row", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "capture", + "values": map[string]any{ + "captured": "{{.row}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "rows": []any{"alpha", "beta"}, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Errorf("expected count=2, got %v", result.Output["count"]) + } + + results := result.Output["results"].([]any) + first := results[0].(map[string]any) + if first["captured"] != "alpha" { + t.Errorf("expected captured='alpha', got %v", first["captured"]) + } +} + +func TestForEachStep_ForeachIndexInContext(t *testing.T) { + // Each iteration should expose {{.foreach.index}} in the child context. + step, err := buildTestForEachStep(t, "foreach-index", map[string]any{ + "collection": "items", + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "capture", + "values": map[string]any{ + "idx": "{{.foreach.index}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "items": []any{"a", "b", "c"}, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + results := result.Output["results"].([]any) + if len(results) != 3 { + t.Fatalf("expected 3 results, got %d", len(results)) + } + + // Verify indexes are 0, 1, 2 + for i, r := range results { + m := r.(map[string]any) + want := fmt.Sprintf("%d", i) + if m["idx"] != want { + t.Errorf("iteration %d: expected idx=%q, got %v", i, want, m["idx"]) + } + } +} + +func TestForEachStep_SingleStep(t *testing.T) { + // The "step" (singular) config key should work as an alternative to "steps". + step, err := buildTestForEachStep(t, "foreach-single-step", map[string]any{ + "collection": "names", + "item_var": "name", + "step": map[string]any{ + "type": "step.set", + "name": "process", + "values": map[string]any{ + "processed": "{{.name}}", + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "names": []any{"foo", "bar"}, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["count"] != 2 { + t.Errorf("expected count=2, got %v", result.Output["count"]) + } + + results := result.Output["results"].([]any) + first := results[0].(map[string]any) + if first["processed"] != "foo" { + t.Errorf("expected processed='foo', got %v", first["processed"]) + } + second := results[1].(map[string]any) + if second["processed"] != "bar" { + t.Errorf("expected processed='bar', got %v", second["processed"]) + } +} + +func TestForEachStep_SingleStep_InvalidType(t *testing.T) { + _, err := buildTestForEachStep(t, "foreach-single-bad", map[string]any{ + "collection": "items", + "step": map[string]any{ + "type": "step.nonexistent", + }, + }) + if err == nil { + t.Fatal("expected error for unknown step type in 'step' config") + } +} + +func TestForEachStep_SingleStep_MissingType(t *testing.T) { + _, err := buildTestForEachStep(t, "foreach-single-notype", map[string]any{ + "collection": "items", + "step": map[string]any{ + "name": "no-type-here", + }, + }) + if err == nil { + t.Fatal("expected error when 'step' config has no 'type'") + } +} + +func TestForEachStep_StepAndStepsMutuallyExclusive(t *testing.T) { + _, err := buildTestForEachStep(t, "foreach-both", map[string]any{ + "collection": "items", + "step": map[string]any{ + "type": "step.set", + "name": "s", + "values": map[string]any{"x": "1"}, + }, + "steps": []any{ + map[string]any{"type": "step.set", "name": "s2", "values": map[string]any{"y": "2"}}, + }, + }) + if err == nil { + t.Fatal("expected error when both 'step' and 'steps' are provided") + } +} + +func TestForEachStep_StepWrongType(t *testing.T) { + _, err := buildTestForEachStep(t, "foreach-step-wrong-type", map[string]any{ + "collection": "items", + "step": "not-a-map", + }) + if err == nil { + t.Fatal("expected error when 'step' is not a map") + } +} + +func TestForEachStep_StepsWrongType(t *testing.T) { + _, err := buildTestForEachStep(t, "foreach-steps-wrong-type", map[string]any{ + "collection": "items", + "steps": "not-a-list", + }) + if err == nil { + t.Fatal("expected error when 'steps' is not a list") + } +} + +func TestForEachStep_AppPassedToSubStep(t *testing.T) { + // Verifies that the modular.Application passed to the StepFactory is threaded + // through to sub-step factories, not silently dropped. + var capturedApp modular.Application + registry := NewStepRegistry() + registry.Register("step.capture_app", func(name string, _ map[string]any, app modular.Application) (PipelineStep, error) { + capturedApp = app + return &mockStep{ + name: name, + execFn: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{Output: map[string]any{}}, nil + }, + }, nil + }) + + sentinel := &struct{ modular.Application }{} + factory := NewForEachStepFactory(func() *StepRegistry { return registry }) + _, err := factory("foreach-app-test", map[string]any{ + "collection": "items", + "step": map[string]any{ + "type": "step.capture_app", + }, + }, sentinel) + if err != nil { + t.Fatalf("factory error: %v", err) + } + if capturedApp != sentinel { + t.Errorf("expected app to be passed through to sub-step factory; got %v", capturedApp) + } +} + +func TestForEachStep_ForeachMapNotSetWhenConflict(t *testing.T) { + // When item_var is "foreach", the "foreach" context key must NOT be overwritten. + step, err := buildTestForEachStep(t, "foreach-conflict", map[string]any{ + "collection": "items", + "item_var": "foreach", // would collide with the foreach map + "steps": []any{ + map[string]any{ + "type": "step.set", + "name": "capture", + "values": map[string]any{ + "got": "{{.foreach}}", + }, + }, + }, + }) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "items": []any{"val"}, + }, nil) + + // Should not panic or error; the "foreach" key holds the item value, not the map. + _, execErr := step.Execute(context.Background(), pc) + if execErr != nil { + t.Fatalf("execute error: %v", execErr) + } +} diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index b8ae1409..fe739edd 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -124,7 +124,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { // including types registered by other plugins loaded after this one. "step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry - }, nil)), + })), "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), "step.cache_get": wrapStepFactory(module.NewCacheGetStepFactory()), "step.cache_set": wrapStepFactory(module.NewCacheSetStepFactory()),