feat: add step.foreach and step.webhook_verify pipeline steps#158
feat: add step.foreach and step.webhook_verify pipeline steps#158
Conversation
step.foreach iterates over a collection (resolved from pipeline context
by key or dot-path like "steps.fetch.rows"), executes inline sub-steps
for each item, and returns {results, count}. Sub-steps share a child
context with item_key (default "item") and index_key (default "index")
injected. An error in any sub-step stops iteration immediately.
step.webhook_verify validates HMAC-SHA256 signatures for incoming
webhooks before the pipeline continues. Supports:
- GitHub: X-Hub-Signature-256 (sha256=<hex>)
- Stripe: Stripe-Signature (t=<ts>,v1=<hex>) with 5-min timestamp window
- Generic: configurable header (default X-Signature, raw hex)
On failure writes HTTP 401 and returns StepResult{Stop: true}.
On success returns {verified: true}.
The plugin stores the concrete *module.StepRegistry so step.foreach can
create sub-steps of any registered type at pipeline-configuration time.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds two new generic pipeline steps to the workflow engine’s pipeline-steps plugin: an iterator (step.foreach) for running inline sub-steps over a collection, and an HMAC-based request authenticator (step.webhook_verify) for validating incoming webhook signatures before continuing a pipeline.
Changes:
- Register new step types
step.foreachandstep.webhook_verifyin thepipeline-stepsplugin and update plugin tests accordingly. - Implement
step.foreachwith collection resolution (simple key, trigger data, and dot-path throughsteps.*outputs) and inline sub-step construction. - Implement
step.webhook_verifyfor GitHub/Stripe/generic signature verification with tests.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| plugins/pipelinesteps/plugin.go | Registers the two new step factories and stores a concrete step registry for foreach. |
| plugins/pipelinesteps/plugin_test.go | Updates expected step types and factory count assertions. |
| module/pipeline_step_foreach.go | Implements step.foreach factory + execution and collection/path helpers. |
| module/pipeline_step_foreach_test.go | Adds unit tests for step.foreach. |
| module/pipeline_step_webhook_verify.go | Implements step.webhook_verify factory + verification logic and request-body caching. |
| module/pipeline_step_webhook_verify_test.go | Adds unit tests for GitHub/Stripe/generic verification behavior. |
| for _, step := range s.subSteps { | ||
| result, execErr := step.Execute(ctx, childPC) | ||
| if execErr != nil { | ||
| return nil, fmt.Errorf("foreach step %q: iteration %d, sub-step %q failed: %w", | ||
| s.name, i, step.Name(), execErr) | ||
| } | ||
| if result != nil && result.Output != nil { | ||
| childPC.MergeStepOutput(step.Name(), result.Output) | ||
| maps.Copy(iterResult, result.Output) | ||
| } | ||
| if result != nil && result.Stop { | ||
| break | ||
| } | ||
| } | ||
| collected = append(collected, iterResult) |
There was a problem hiding this comment.
If any sub-step returns Stop: true (e.g. step.json_response/step.delegate), step.foreach currently only breaks the inner sub-step loop and then continues iterating remaining items, ultimately returning Stop: false. This prevents sub-steps from stopping the pipeline as intended. Propagate the stop signal by returning Stop: true from step.foreach (and likely stop iterating) when a sub-step sets it.
| // including types registered by other plugins loaded after this one. | ||
| "step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry { | ||
| return p.concreteStepRegistry | ||
| }, nil)), |
There was a problem hiding this comment.
step.foreach is registered by calling module.NewForEachStepFactory(..., nil), so sub-steps will be built with a nil modular.Application and steps that depend on the app service registry (db_exec/db_query/delegate/etc.) will break. After adjusting NewForEachStepFactory to use the app provided to the returned StepFactory, update this registration to stop passing nil (and consider returning a clear error if p.concreteStepRegistry is still nil when the factory is invoked).
| }, nil)), | |
| })), |
| if w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter); ok { | ||
| w.Header().Set("Content-Type", "application/json") | ||
| w.WriteHeader(http.StatusUnauthorized) | ||
| _, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`)) |
There was a problem hiding this comment.
When unauthorized writes a 401 response to _http_response_writer, it should also set pc.Metadata["_response_handled"] = true. The HTTP handlers use this flag to decide whether to write a fallback JSON response; without it, they can append an additional response body after the 401, potentially leaking data.
| _, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`)) | |
| _, _ = w.Write([]byte(`{"error":"unauthorized","reason":"webhook signature verification failed"}`)) | |
| pc.Metadata["_response_handled"] = true |
| body, err := io.ReadAll(req.Body) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Cache it for other steps that may need the raw body | ||
| pc.Metadata["_raw_body"] = body | ||
|
|
||
| return body, nil |
There was a problem hiding this comment.
readBody consumes req.Body via io.ReadAll but never restores it. Downstream steps like step.request_parse and step.validate_request_body read from req.Body directly, so after step.webhook_verify runs they will see an empty body. After reading, reset req.Body to a fresh reader (e.g., io.NopCloser(bytes.NewReader(body))) in addition to caching _raw_body.
| // Compile-time check: ensure the step_fail factory signature matches StepFactory. | ||
| // This avoids having an unused import of fmt. | ||
| var _ = fmt.Sprintf |
There was a problem hiding this comment.
The fmt import is only kept alive via var _ = fmt.Sprintf. It would be cleaner to remove the unused import and this dummy assignment (or use fmt for real assertions/logging if needed).
| func NewForEachStepFactory(registryFn func() *StepRegistry, app modular.Application) StepFactory { | ||
| return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { | ||
| registry := registryFn() | ||
|
|
||
| collection, _ := config["collection"].(string) | ||
| if collection == "" { | ||
| return nil, fmt.Errorf("foreach step %q: 'collection' is required", name) | ||
| } | ||
|
|
||
| itemKey, _ := config["item_key"].(string) | ||
| if itemKey == "" { | ||
| itemKey = "item" | ||
| } | ||
|
|
||
| indexKey, _ := config["index_key"].(string) | ||
| if indexKey == "" { | ||
| indexKey = "index" | ||
| } | ||
|
|
||
| // Build sub-steps from inline config | ||
| stepsRaw, _ := config["steps"].([]any) | ||
| subSteps := make([]PipelineStep, 0, len(stepsRaw)) | ||
| for i, raw := range stepsRaw { | ||
| stepCfg, ok := raw.(map[string]any) | ||
| if !ok { | ||
| return nil, fmt.Errorf("foreach step %q: steps[%d] must be a map", name, i) | ||
| } | ||
|
|
||
| stepType, _ := stepCfg["type"].(string) | ||
| if stepType == "" { | ||
| return nil, fmt.Errorf("foreach step %q: steps[%d] missing 'type'", name, i) | ||
| } | ||
|
|
||
| stepName, _ := stepCfg["name"].(string) | ||
| if stepName == "" { | ||
| stepName = fmt.Sprintf("%s-sub-%d", name, i) | ||
| } | ||
|
|
||
| // Build the step config without meta fields | ||
| subCfg := make(map[string]any) | ||
| for k, v := range stepCfg { | ||
| if k != "type" && k != "name" { | ||
| subCfg[k] = v | ||
| } | ||
| } | ||
|
|
||
| step, err := registry.Create(stepType, stepName, subCfg, app) | ||
| if err != nil { |
There was a problem hiding this comment.
NewForEachStepFactory captures a modular.Application argument but the returned StepFactory ignores its own app parameter (it uses the captured app in registry.Create). In engine usage the app is only available at step-factory invocation time, so sub-steps that require app (e.g. delegate/db_*) will be created with a nil app and later fail. Use the app parameter passed to the returned StepFactory (and drop the captured app argument) when calling registry.Create.
Summary
step.foreach: Iterates over a collection (resolved from pipeline context by key or dot-path likesteps.fetch.rows), executes inline sub-steps for each item, and returns{results: [...], count: N}. Sub-steps share a child context with configurableitem_key(default"item") andindex_key(default"index"). An error in any sub-step stops iteration immediately.step.webhook_verify: Validates HMAC-SHA256 signatures on incoming webhook requests before the pipeline continues. Supports three providers:github:X-Hub-Signature-256header (sha256=<hex>)stripe:Stripe-Signatureheader (t=<timestamp>,v1=<hex>) with 5-minute timestamp window enforcementgeneric: Configurable header (defaultX-Signature, raw hex)StepResult{Stop: true}$MY_SECRET/${MY_SECRET})_raw_bodymetadata to avoid double-readsImplementation notes
step.foreachuses a lazy registry getter (func() *StepRegistry) so its factory is registered before all step types are loaded, allowing sub-steps to reference any step type registered by any plugin*module.StepRegistry(via type assertion inSetStepRegistry) sostep.foreachcan callCreate()at pipeline-configuration timeforeachto avoid collision with existingwalkPath/splitDotPathinpipeline_step_sub_workflow.goTest plan
TestForEachStep_IteratesOverSliceOfMaps— iterates, passes item/index to templatesTestForEachStep_EmptyCollection— returns{results: [], count: 0}TestForEachStep_DefaultItemAndIndexKeys— defaults to"item"/"index"TestForEachStep_SubStepErrorStopsIteration— stops after first sub-step errorTestForEachStep_FactoryRejectsMissingCollection— validation errorTestForEachStep_FactoryRejectsInvalidSubStepType— unknown type errorTestForEachStep_IteratesWithStepOutputAccess— chained sub-steps share contextTestForEachStep_CollectionFromStepOutputs— dot-path"steps.fetch.rows"resolutionTestWebhookVerifyStep_ValidGitHub— HMAC matchesTestWebhookVerifyStep_InvalidGitHub— wrong signature → Stop + 401TestWebhookVerifyStep_MissingGitHubHeader— missing header → StopTestWebhookVerifyStep_ValidStripe— current timestamp passesTestWebhookVerifyStep_StripeExpiredTimestamp— 10 min old → StopTestWebhookVerifyStep_InvalidStripeSignature— wrong secret → Stop + 401TestWebhookVerifyStep_ValidGeneric— custom headerTestWebhookVerifyStep_GenericDefaultHeader— defaults toX-SignatureTestWebhookVerifyStep_MissingGenericHeader— missing → StopTestWebhookVerifyStep_NoHTTPRequest— no request in context → StopTestWebhookVerifyStep_FactoryRejects*— validation errorsTestWebhookVerifyStep_RawBodyCachedInMetadata— uses pre-read body from metadatago test ./...suite passes🤖 Generated with Claude Code