Skip to content

Commit 3d2eb47

Browse files
intel352claudeCopilot
authored
feat: fan-out/fan-in/map-reduce pipeline capabilities (#274)
* docs: fan-out/fan-in/map-reduce design document * docs: fan-out/fan-in implementation plan * chore: add .worktrees to gitignore for worktree isolation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: add concurrent execution to step.foreach via concurrency config Adds optional worker-pool concurrency to ForEachStep via two new config keys: - concurrency: int (0 = sequential, default; N = N-worker pool) - error_strategy: "fail_fast" (default) or "collect_errors" When concurrency > 0, executeConcurrent() spawns goroutines bounded by a channel semaphore. Results are pre-allocated by index to preserve order. Each goroutine deep-copies the PipelineContext via buildChildContext() for isolation. Backward compatible: concurrency=0 or omitted uses existing sequential path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat: add step.parallel for concurrent fixed-branch fan-out * feat: register step.parallel in plugin, add schemas and inference * feat: add collection template functions (sum, pluck, flatten, unique, groupBy, sortBy, first, last, min, max) Adds 10 collection functions to the pipeline template engine for inline aggregation of slice data. All functions accept []any (or any slice type via reflect) and are composable via Go template pipelines. Helper functions added before templateFuncMap(): - toAnySlice: reflect-based conversion for any slice type - extractField: pulls a named key from map elements (for key-based variants) Functions with optional KEY parameter (sum, min, max, unique) operate on scalar slices or map-field extraction. sortBy and groupBy operate on slices of maps. first/last return single elements (nil renders as <no value>). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs: add collection functions to LSP template completions Registers the 10 new collection template functions (sum, pluck, flatten, unique, groupBy, sortBy, first, last, min, max) in lsp/registry.go so they appear in template expression completions. DOCUMENTATION.md and step table entries were already updated by Task 4. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs: add step.parallel, concurrent foreach, and collection functions to docs and LSP * docs: add parallel fan-out example config Demonstrates all three new capabilities in a single runnable example: 1. step.parallel with collect_errors — parallel API aggregation fan-out 2. step.foreach with concurrency: 5 and error_strategy: collect_errors — concurrent batch processing 3. Collection template functions (sum, max, min, unique, groupBy, pluck) — map/reduce stats endpoint Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: replace list/dict template calls with inline YAML in parallel example * fix: correct step.foreach schema output key from iterations to count * fix: correct example config structure (workflows map, handler refs) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: convert if-else chain to switch statement (gocritic) Rewrite the error handling loop in pipeline_step_foreach.go to use a switch statement instead of if-else-if-else chain, as recommended by gocritic linter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address Copilot review comments on PR #274 - Fix DefaultValue type: "0" (string) → 0 (numeric) for concurrency field - Add step.parallel to coreModuleTypes for JSON Schema validation - Add step.parallel module schema for UI registry - Fix plugin identifier: "pipeline-steps" → "pipelinesteps" for consistency - Validate error_strategy in foreach factory (reject invalid values) - Fix example: unique returns maps, use pluck first for scalar list Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address fan-out/fan-in review feedback — precision, deep copy, string sort, context cancellation, linting (#276) * Initial plan * fix: address all PR #274 review comments - precision, deep copy, string sort, context cancellation Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: resolve all 61 gosec linter issues via documented exclusions in .golangci.yml Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 34f06fc commit 3d2eb47

21 files changed

Lines changed: 3996 additions & 40 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ module/ui_dist/*
5050
!module/ui_dist/.gitkeep
5151
*.db-shm
5252
*.db-wal
53+
.worktrees

.golangci.yml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ linters:
2727
- G104 # Duplicates errcheck
2828
- G304 # File inclusion from variable - expected for config/dynamic loading
2929
- G704 # SSRF via taint analysis - URLs from admin config are intentional
30-
- G118 # Goroutine context: background goroutines are intentional for long-lived tasks
31-
- G120 # Form data size limit: internal admin APIs with trusted callers
32-
- G122 # Walk/WalkDir race: application-controlled paths, not attacker-influenced
33-
- G701 # SQL injection taint: all dynamic queries use parameterized args
34-
- G703 # Path traversal taint: paths are application-constructed, not raw user input
35-
- G705 # XSS taint: response bodies are intentionally written from resolved templates
36-
- G706 # Log injection taint: log messages are from trusted application context
30+
- G118 # Context cancel/goroutine patterns - cancel is returned to caller or used in signal handler; background goroutines are intentional
31+
- G120 # Form data size - ParseMultipartForm already sets explicit limits; FormValue after that is already bounded
32+
- G122 # WalkDir/Walk TOCTOU - OS-provided walk paths; acceptable for CI/CD pipeline tooling
33+
- G701 # SQL injection via taint - queries are admin-configured workflow step definitions, not user input
34+
- G703 # Path traversal via taint - paths come from admin config or are internal CI/CD pipeline paths, not user input
35+
- G705 # XSS via taint - raw-response/static-file steps write admin-configured content by design
36+
- G706 # Log injection via taint - step.log and publish steps are designed to log arbitrary admin-configured content
3737
gocritic:
3838
enabled-tags:
3939
- diagnostic

DOCUMENTATION.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ flowchart TD
132132
| `step.validate_path_param` | Validates a URL path parameter against a set of rules |
133133
| `step.validate_pagination` | Validates and normalizes pagination query params |
134134
| `step.validate_request_body` | Validates request body against a JSON schema |
135-
| `step.foreach` | Iterates over a slice and runs a sub-pipeline per element |
135+
| `step.foreach` | Iterates over a slice and runs sub-steps per element. Optional `concurrency: N` for parallel processing |
136+
| `step.parallel` | Executes named sub-steps concurrently and collects results. O(max(branch)) time |
136137
| `step.webhook_verify` | Verifies an inbound webhook signature |
137138
| `step.base64_decode` | Decodes a base64-encoded field |
138139
| `step.cache_get` | Reads a value from the cache module |
@@ -221,6 +222,21 @@ Pipeline steps support Go template syntax with these built-in functions:
221222
| `length` | `length VALUE` | Length of string, slice, array, or map |
222223
| `coalesce` | `coalesce VAL1 VAL2 ...` | First non-nil, non-empty value |
223224

225+
#### Collection Functions
226+
227+
| Function | Signature | Complexity | Description |
228+
|----------|-----------|-----------|-------------|
229+
| `sum` | `sum SLICE [KEY]` | O(n) | Sum numeric values. Optional KEY for maps |
230+
| `pluck` | `pluck SLICE KEY` | O(n) | Extract one field from each map |
231+
| `flatten` | `flatten SLICE` | O(n×m) | Flatten one level of nested slices |
232+
| `unique` | `unique SLICE [KEY]` | O(n) | Deduplicate, preserving insertion order |
233+
| `groupBy` | `groupBy SLICE KEY` | O(n) | Group maps by key → `map[string][]any` |
234+
| `sortBy` | `sortBy SLICE KEY` | O(n log n) | Stable sort ascending by key |
235+
| `first` | `first SLICE` | O(1) | First element, nil if empty |
236+
| `last` | `last SLICE` | O(1) | Last element, nil if empty |
237+
| `min` | `min SLICE [KEY]` | O(n) | Minimum numeric value |
238+
| `max` | `max SLICE [KEY]` | O(n) | Maximum numeric value |
239+
224240
#### Context (added per-pipeline by the engine)
225241

226242
| Function | Signature | Description |
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
# Fan-Out / Fan-In / Map-Reduce Design
2+
3+
## Problem
4+
5+
The workflow engine executes pipeline steps strictly sequentially. There is no way to:
6+
7+
- Run independent API calls or DB queries concurrently (e.g., fetch user + orders + inventory in parallel)
8+
- Process large collections with bounded concurrency (e.g., send 1000 notifications with 10 workers)
9+
- Aggregate collection data inline (sum, group, deduplicate) without writing a full `step.jq` expression
10+
11+
## Decision
12+
13+
**Approach A — step.parallel + enhanced step.foreach + collection template functions.**
14+
15+
Concurrency is opt-in at the step level. The pipeline executor stays sequential. Individual steps internally spawn goroutines using deep-copied PipelineContext instances, eliminating shared mutable state.
16+
17+
This is a non-breaking, additive change. Existing pipelines behave identically.
18+
19+
## Components
20+
21+
### 1. `step.parallel` — Fixed-Branch Fan-Out
22+
23+
Execute N named sub-steps concurrently, collect all results.
24+
25+
**Config:**
26+
27+
```yaml
28+
- type: step.parallel
29+
name: fetch-all
30+
config:
31+
error_strategy: fail_fast # fail_fast | collect_errors (default: fail_fast)
32+
steps:
33+
- name: users
34+
type: step.http_call
35+
config: { url: "https://api/users/{{ .id }}", method: GET }
36+
- name: orders
37+
type: step.db_query
38+
config: { query: "SELECT * FROM orders WHERE user_id = $1", args: ["{{ .id }}"] }
39+
- name: inventory
40+
type: step.http_call
41+
config: { url: "https://api/inventory", method: GET }
42+
```
43+
44+
**Output:**
45+
46+
```json
47+
{
48+
"results": {
49+
"users": { "body": {...}, "status": 200 },
50+
"orders": { "rows": [...], "count": 5 },
51+
"inventory": { "body": {...}, "status": 200 }
52+
},
53+
"errors": {},
54+
"completed": 3,
55+
"failed": 0
56+
}
57+
```
58+
59+
**Access pattern:**
60+
61+
```yaml
62+
{{ index .steps "fetch-all" "results" "users" "body" }}
63+
{{ index .steps "fetch-all" "results" "orders" "rows" }}
64+
```
65+
66+
**Error strategies:**
67+
68+
- `fail_fast` (default): Cancel derived context on first error. `errors` map contains the first failure. Step returns error.
69+
- `collect_errors`: Let all branches finish. Failed branches go into `errors` map, successful into `results`. Step returns error only if ALL branches fail.
70+
71+
**Implementation:**
72+
73+
- Each branch gets a deep copy of PipelineContext (same pattern as `ForEachStep.buildChildContext`)
74+
- Goroutines write to pre-allocated result slots indexed by branch name — no shared mutable state
75+
- Uses `sync.WaitGroup` for coordination, `context.WithCancel` for fail-fast
76+
- Parent step merges all branch results into its output after WaitGroup completes
77+
- Reuses `buildSubStep()` from `pipeline_step_resilience.go` for sub-step construction
78+
- Uses lazy registry function pattern (same as foreach, retry, circuit breaker)
79+
80+
**Complexity:**
81+
82+
| Metric | Complexity |
83+
|--------|-----------|
84+
| Time | O(max(branch_duration)) — wall clock bounded by slowest branch |
85+
| Space | O(branches × context_size) — deep copy of PipelineContext per branch |
86+
87+
### 2. Enhanced `step.foreach` — Concurrent Collection Processing
88+
89+
Add optional `concurrency` and `error_strategy` fields. When `concurrency` is set, items are processed by a bounded worker pool.
90+
91+
**Config:**
92+
93+
```yaml
94+
- type: step.foreach
95+
name: send-notifications
96+
config:
97+
collection: users
98+
item_var: user
99+
concurrency: 10 # 0 or absent = sequential (backward compatible)
100+
error_strategy: fail_fast # fail_fast | collect_errors (default: fail_fast)
101+
step:
102+
type: step.http_call
103+
config:
104+
url: "https://notify.example.com/send"
105+
method: POST
106+
body: '{"email": "{{ .user.email }}"}'
107+
```
108+
109+
**Behavior:**
110+
111+
- `concurrency: 0` or absent → existing sequential behavior (100% backward compatible)
112+
- `concurrency: N` → semaphore-based worker pool with N goroutines
113+
- Each worker gets a child PipelineContext copy (existing `buildChildContext` pattern)
114+
- Results collected in original order (slot-indexed, not arrival order) to maintain determinism
115+
- `error_strategy: fail_fast` → cancel context on first error
116+
- `error_strategy: collect_errors` → continue, mark failed items with `_error` key
117+
118+
**Output (unchanged format, new optional field):**
119+
120+
```json
121+
{
122+
"results": [
123+
{ "status": 200 },
124+
{ "status": 200 },
125+
{ "_error": "timeout", "_index": 2 }
126+
],
127+
"count": 3,
128+
"error_count": 1
129+
}
130+
```
131+
132+
**Implementation:**
133+
134+
- Semaphore channel (`make(chan struct{}, concurrency)`) controls worker count
135+
- Pre-allocated `results []any` slice indexed by item position preserves order
136+
- `sync.WaitGroup` for completion, `context.WithCancel` for fail-fast
137+
- `error_count` field added to output when `error_strategy: collect_errors`
138+
139+
**Complexity:**
140+
141+
| Metric | Complexity |
142+
|--------|-----------|
143+
| Time (sequential) | O(n × per_item) |
144+
| Time (concurrent) | O(⌈n/c⌉ × per_item) where c = concurrency |
145+
| Space (sequential) | O(context_size) — reuses single child context |
146+
| Space (concurrent) | O(c × context_size) — one deep copy per active worker |
147+
148+
### 3. Collection Template Functions
149+
150+
New template functions for inline aggregation and transformation of slices:
151+
152+
| Function | Signature | Complexity | Description |
153+
|----------|-----------|-----------|-------------|
154+
| `sum` | `sum SLICE [KEY]` | O(n) | Sum numeric values. Optional KEY for maps. |
155+
| `pluck` | `pluck SLICE KEY` | O(n) | Extract one field from each map in slice. |
156+
| `flatten` | `flatten SLICE` | O(n×m) | Flatten one level of nested slices. n=outer, m=avg inner. |
157+
| `unique` | `unique SLICE [KEY]` | O(n) | Deduplicate. Hash-map based, preserves insertion order. |
158+
| `groupBy` | `groupBy SLICE KEY` | O(n) | Group maps by key value → `map[string][]any`. |
159+
| `sortBy` | `sortBy SLICE KEY` | O(n log n) | Stable sort ascending by key. Uses `sort.SliceStable`. |
160+
| `first` | `first SLICE` | O(1) | First element, nil if empty. |
161+
| `last` | `last SLICE` | O(1) | Last element, nil if empty. |
162+
| `min` | `min SLICE [KEY]` | O(n) | Minimum numeric value. |
163+
| `max` | `max SLICE [KEY]` | O(n) | Maximum numeric value. |
164+
165+
All functions accept `[]any` and `[]map[string]any`. The optional `KEY` parameter extracts a map field for numeric operations. For simple scalar slices (e.g., `[1,2,3]`), `sum`/`min`/`max` work without a key.
166+
167+
**Examples:**
168+
169+
```yaml
170+
# Sum all amounts
171+
total: "{{ sum .steps.fetch-sales.rows \"amount\" }}"
172+
173+
# Group by region
174+
by_region: "{{ json (groupBy .steps.fetch-sales.rows \"region\") }}"
175+
176+
# Get unique tags
177+
tags: "{{ json (unique .steps.fetch-items.results \"category\") }}"
178+
179+
# Extract names
180+
names: "{{ json (pluck .steps.fetch-users.results \"name\") }}"
181+
182+
# Top sale amount
183+
top: "{{ max .steps.fetch-sales.rows \"amount\" }}"
184+
```
185+
186+
## Scenarios
187+
188+
### Scenario 1 — API Gateway Aggregation
189+
190+
Fetch user profile from 3 microservices in parallel, merge into single response.
191+
192+
```yaml
193+
steps:
194+
- type: step.request_parse
195+
name: parse
196+
config: { path_params: [id] }
197+
- type: step.parallel
198+
name: aggregate
199+
config:
200+
error_strategy: collect_errors
201+
steps:
202+
- name: profile
203+
type: step.http_call
204+
config: { url: "https://users/{{ .path_params.id }}" }
205+
- name: orders
206+
type: step.http_call
207+
config: { url: "https://orders?user={{ .path_params.id }}" }
208+
- name: recommendations
209+
type: step.http_call
210+
config: { url: "https://recs/{{ .path_params.id }}" }
211+
- type: step.json_response
212+
name: respond
213+
config:
214+
status_code: 200
215+
body: '{{ json .steps.aggregate.results }}'
216+
```
217+
218+
### Scenario 2 — Batch Webhook Processing
219+
220+
Process incoming webhook with array of events using 20 concurrent workers.
221+
222+
```yaml
223+
steps:
224+
- type: step.request_parse
225+
name: parse
226+
config: { parse_body: true }
227+
- type: step.foreach
228+
name: process-events
229+
config:
230+
collection: body.events
231+
item_var: event
232+
concurrency: 20
233+
error_strategy: collect_errors
234+
step:
235+
type: step.http_call
236+
config:
237+
url: "https://internal/process"
238+
method: POST
239+
body: '{{ json .event }}'
240+
- type: step.set
241+
name: summary
242+
config:
243+
values:
244+
total: "{{ .steps.process-events.count }}"
245+
errors: "{{ .steps.process-events.error_count }}"
246+
```
247+
248+
### Scenario 3 — Map/Reduce Sales Report
249+
250+
Query sales data, aggregate with template functions.
251+
252+
```yaml
253+
steps:
254+
- type: step.db_query
255+
name: fetch-sales
256+
config:
257+
query: "SELECT region, amount FROM sales WHERE date >= $1"
258+
args: ["{{ .start_date }}"]
259+
mode: list
260+
- type: step.set
261+
name: report
262+
config:
263+
values:
264+
total: '{{ sum .steps.fetch-sales.rows "amount" }}'
265+
by_region: '{{ json (groupBy .steps.fetch-sales.rows "region") }}'
266+
top_sale: '{{ max .steps.fetch-sales.rows "amount" }}'
267+
regions: '{{ json (unique .steps.fetch-sales.rows "region") }}'
268+
```
269+
270+
### Scenario 4 — Scatter/Gather Validation
271+
272+
Run fraud, inventory, and credit checks in parallel; route based on results.
273+
274+
```yaml
275+
steps:
276+
- type: step.request_parse
277+
name: parse
278+
config: { parse_body: true, path_params: [id] }
279+
- type: step.parallel
280+
name: checks
281+
config:
282+
error_strategy: fail_fast
283+
steps:
284+
- name: inventory
285+
type: step.http_call
286+
config: { url: "https://inventory/check/{{ .body.product_id }}" }
287+
- name: fraud
288+
type: step.http_call
289+
config: { url: "https://fraud/score/{{ .body.user_id }}" }
290+
- name: credit
291+
type: step.http_call
292+
config: { url: "https://credit/verify/{{ .body.user_id }}" }
293+
- type: step.conditional
294+
name: route
295+
config:
296+
field: steps.checks.results.fraud.risk_level
297+
routes:
298+
high: reject-order
299+
low: fulfill-order
300+
default: manual-review
301+
```
302+
303+
## Non-Goals
304+
305+
- DAG executor / `depends_on` — may be added in a future version if use cases demand it
306+
- Nested parallelism limits (step.parallel inside step.parallel) — allowed but users should use judgment
307+
- Distributed fan-out across nodes — out of scope, single-process concurrency only
308+
309+
## Files Changed
310+
311+
| Action | File |
312+
|--------|------|
313+
| Create | `module/pipeline_step_parallel.go` |
314+
| Create | `module/pipeline_step_parallel_test.go` |
315+
| Modify | `module/pipeline_step_foreach.go` |
316+
| Modify | `module/pipeline_step_foreach_test.go` |
317+
| Modify | `module/pipeline_template.go` |
318+
| Modify | `module/pipeline_template_test.go` |
319+
| Modify | `plugins/pipelinesteps/plugin.go` |
320+
| Modify | `schema/step_schema.go` |
321+
| Modify | `schema/step_inference.go` |
322+
| Modify | `DOCUMENTATION.md` |

0 commit comments

Comments
 (0)