Skip to content

Commit ff722d5

Browse files
Copilotintel352
andcommitted
fix: address all PR #274 review comments - precision, deep copy, string sort, context cancellation
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent c6efd5a commit ff722d5

11 files changed

Lines changed: 429 additions & 76 deletions

docs/plans/2026-03-06-fan-out-fan-in.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# Fan-Out / Fan-In / Map-Reduce Implementation Plan
22

3-
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
4-
53
**Goal:** Add parallel execution capabilities (`step.parallel`, concurrent `step.foreach`) and collection template functions (`sum`, `pluck`, `groupBy`, etc.) to the workflow engine.
64

75
**Architecture:** Concurrency is opt-in at the step level — the pipeline executor stays sequential. `step.parallel` spawns goroutines for fixed branches; `step.foreach` gets an optional worker pool. Each goroutine operates on a deep-copied PipelineContext, eliminating shared mutable state. 10 collection template functions are added for inline aggregation.

lsp/document.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,17 @@ const (
7474

7575
// PositionContext describes what context the cursor is in within the document.
7676
type PositionContext struct {
77-
Section SectionKind
78-
ModuleType string // if inside a modules[] item config, the type value
79-
StepType string // if inside a pipeline step config, the step type value
80-
FieldName string // the field name at the cursor
81-
InTemplate bool // cursor is inside {{ }}
82-
DependsOn bool // cursor is in a dependsOn array value
83-
PipelineName string // name of the pipeline containing the cursor (if any)
84-
CurrentStepName string // name of the step containing the cursor (if any)
85-
TemplatePath *TemplateExprPath // parsed template expression at cursor, if InTemplate
86-
Line int
87-
Character int
77+
Section SectionKind
78+
ModuleType string // if inside a modules[] item config, the type value
79+
StepType string // if inside a pipeline step config, the step type value
80+
FieldName string // the field name at the cursor
81+
InTemplate bool // cursor is inside {{ }}
82+
DependsOn bool // cursor is in a dependsOn array value
83+
PipelineName string // name of the pipeline containing the cursor (if any)
84+
CurrentStepName string // name of the step containing the cursor (if any)
85+
TemplatePath *TemplateExprPath // parsed template expression at cursor, if InTemplate
86+
Line int
87+
Character int
8888
}
8989

9090
// ContextAt analyses the document content at the given (zero-based) line and

mcp/tools.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,9 +1236,9 @@ func (s *Server) handleInferPipelineContext(_ context.Context, req mcp.CallToolR
12361236
// Infer outputs for each preceding step.
12371237
stepReg := schema.GetStepSchemaRegistry()
12381238
type stepContext struct {
1239-
Name string `json:"name"`
1240-
Type string `json:"type"`
1241-
Outputs []schema.InferredOutput `json:"outputs"`
1239+
Name string `json:"name"`
1240+
Type string `json:"type"`
1241+
Outputs []schema.InferredOutput `json:"outputs"`
12421242
}
12431243
stepsCtx := make([]stepContext, 0, len(precedingSteps))
12441244
for _, step := range precedingSteps {

module/auth_m2m_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1270,7 +1270,7 @@ func TestM2M_Introspect_AllowOthers_BearerMissingClaim(t *testing.T) {
12701270
m := NewM2MAuthModule("m2m", "this-is-a-valid-secret-32-bytes!", time.Hour, "test-issuer")
12711271
m.RegisterClient(M2MClient{ClientID: "client-a", ClientSecret: "secret-a-long-enough!", Scopes: []string{"read"}}) //nolint:gosec // test credential
12721272
m.RegisterClient(M2MClient{ClientID: "client-b", ClientSecret: "secret-b-long-enough!", Scopes: []string{"read"}}) //nolint:gosec // test credential
1273-
m.SetIntrospectPolicy(true, "", "role", "admin") // role=admin required
1273+
m.SetIntrospectPolicy(true, "", "role", "admin") // role=admin required
12741274

12751275
tokenA := issueTestToken(t, m, "client-a", "secret-a-long-enough!")
12761276
tokenB := issueTestToken(t, m, "client-b", "secret-b-long-enough!")

module/database_partitioned_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,11 @@ func TestIsSupportedPartitionDriver(t *testing.T) {
197197
// testMultiPartitionManager extends testPartitionManager with MultiPartitionManager support.
198198
type testMultiPartitionManager struct {
199199
testPartitionManager
200-
configs []PartitionConfig
201-
ensureForKeyCalledWith []struct{ key, value string }
202-
syncForKeyCalledWith []string
203-
ensureForKeyErr error
204-
syncForKeyErr error
200+
configs []PartitionConfig
201+
ensureForKeyCalledWith []struct{ key, value string }
202+
syncForKeyCalledWith []string
203+
ensureForKeyErr error
204+
syncForKeyErr error
205205
}
206206

207207
func (m *testMultiPartitionManager) PartitionConfigs() []PartitionConfig { return m.configs }
@@ -403,10 +403,10 @@ func TestPartitionedDatabase_PartitionConfigs_DeepCopiesTables(t *testing.T) {
403403
func TestPartitionedDatabase_BackwardCompat_SinglePartition(t *testing.T) {
404404
// Old-style config without Partitions field must behave exactly as before.
405405
cfg := PartitionedDatabaseConfig{
406-
Driver: "pgx",
407-
PartitionKey: "tenant_id",
408-
Tables: []string{"forms", "submissions"},
409-
PartitionType: PartitionTypeList,
406+
Driver: "pgx",
407+
PartitionKey: "tenant_id",
408+
Tables: []string{"forms", "submissions"},
409+
PartitionType: PartitionTypeList,
410410
PartitionNameFormat: "{table}_{tenant}",
411411
}
412412
pd := NewPartitionedDatabase("db", cfg)

module/pipeline_step_foreach.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,25 @@ func (s *ForEachStep) executeConcurrent(ctx context.Context, pc *PipelineContext
197197
var firstErr error
198198
var errOnce sync.Once
199199

200+
outer:
200201
for i, item := range items {
201-
wg.Add(1)
202+
// Stop launching new goroutines if the context is already cancelled
203+
// (e.g., from a previous fail_fast error or external cancellation).
204+
if branchCtx.Err() != nil {
205+
break outer
206+
}
207+
202208
i, item := i, item
203209

204-
sem <- struct{}{} // acquire semaphore slot
210+
// Acquire a semaphore slot, but respect context cancellation so we don't
211+
// block indefinitely when fail_fast has already cancelled the context.
212+
select {
213+
case sem <- struct{}{}: // acquired slot; fall through to launch goroutine
214+
case <-branchCtx.Done():
215+
break outer
216+
}
217+
218+
wg.Add(1)
205219
go func() {
206220
defer wg.Done()
207221
defer func() { <-sem }() // release slot

module/pipeline_step_foreach_test.go

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync/atomic"
78
"testing"
9+
"time"
810

911
"github.com/CrisisTextLine/modular"
1012
)
@@ -19,6 +21,24 @@ func (s *foreachFailStep) Execute(_ context.Context, _ *PipelineContext) (*StepR
1921
return nil, errors.New("step failed")
2022
}
2123

24+
// foreachSlowStep is a PipelineStep that sleeps for a fixed duration then succeeds.
25+
// It is used in timing-based concurrency tests to verify that items actually run
26+
// in parallel rather than sequentially.
27+
type foreachSlowStep struct {
28+
stepName string
29+
delay time.Duration
30+
}
31+
32+
func (s *foreachSlowStep) Name() string { return s.stepName }
33+
func (s *foreachSlowStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
34+
select {
35+
case <-time.After(s.delay):
36+
return &StepResult{Output: map[string]any{"done": true}}, nil
37+
case <-ctx.Done():
38+
return nil, ctx.Err()
39+
}
40+
}
41+
2242
// buildTestForEachStep creates a ForEachStep with a fresh StepRegistry for testing.
2343
// It registers a simple "step.set" factory so sub-steps can be built.
2444
func buildTestForEachStep(t *testing.T, name string, config map[string]any) (PipelineStep, error) {
@@ -545,45 +565,58 @@ func TestForEachStep_AppPassedToSubStep(t *testing.T) {
545565
}
546566

547567
func TestForEachStep_ConcurrentExecution(t *testing.T) {
548-
// 5 items each taking 100ms, concurrency=5 — should complete in ~100ms not 500ms
568+
// 5 items each taking 50ms, concurrency=5 — should complete in ~50ms not 250ms.
569+
// We use a custom slow step that sleeps to ensure actual concurrency is tested.
570+
const itemDelay = 50 * time.Millisecond
571+
const numItems = 5
572+
549573
registry := NewStepRegistry()
550-
registry.Register("step.set", NewSetStepFactory())
574+
registry.Register("step.slow", func(name string, cfg map[string]any, app modular.Application) (PipelineStep, error) {
575+
return &foreachSlowStep{stepName: name, delay: itemDelay}, nil
576+
})
551577

552578
factory := NewForEachStepFactory(func() *StepRegistry { return registry })
553579
step, err := factory("par-foreach", map[string]any{
554580
"collection": "items",
555581
"item_var": "item",
556-
"concurrency": 5,
582+
"concurrency": numItems, // full concurrency — all items run in parallel
557583
"step": map[string]any{
558584
"name": "process",
559-
"type": "step.set",
560-
"values": map[string]any{
561-
"processed": "true",
562-
},
585+
"type": "step.slow",
563586
},
564587
}, nil)
565588
if err != nil {
566589
t.Fatal(err)
567590
}
568591

569-
items := make([]any, 5)
592+
items := make([]any, numItems)
570593
for i := range items {
571594
items[i] = map[string]any{"id": i}
572595
}
573596
pc := NewPipelineContext(map[string]any{"items": items}, nil)
574597

598+
start := time.Now()
575599
result, err := step.Execute(context.Background(), pc)
600+
elapsed := time.Since(start)
576601
if err != nil {
577602
t.Fatal(err)
578603
}
579604

580605
results := result.Output["results"].([]any)
581-
if len(results) != 5 {
582-
t.Fatalf("expected 5 results, got %d", len(results))
606+
if len(results) != numItems {
607+
t.Fatalf("expected %d results, got %d", numItems, len(results))
583608
}
584609
count := result.Output["count"]
585-
if count != 5 {
586-
t.Fatalf("expected count=5, got %v", count)
610+
if count != numItems {
611+
t.Fatalf("expected count=%d, got %v", numItems, count)
612+
}
613+
614+
// With full concurrency the wall-clock time should be roughly one item's delay.
615+
// Allow 3× headroom for slow CI environments; reject if it took as long as sequential.
616+
maxExpected := itemDelay * 3
617+
if elapsed > time.Duration(numItems)*itemDelay {
618+
t.Fatalf("concurrent execution took %v; expected <%v (sequential would be %v)",
619+
elapsed, maxExpected, time.Duration(numItems)*itemDelay)
587620
}
588621
}
589622

@@ -706,8 +739,8 @@ func TestForEachStep_ConcurrencyZeroIsSequential(t *testing.T) {
706739
"item_var": "item",
707740
"concurrency": 0,
708741
"step": map[string]any{
709-
"name": "s",
710-
"type": "step.set",
742+
"name": "s",
743+
"type": "step.set",
711744
"values": map[string]any{"ok": "true"},
712745
},
713746
}, nil)
@@ -755,3 +788,70 @@ func TestForEachStep_ForeachMapNotSetWhenConflict(t *testing.T) {
755788
t.Fatalf("execute error: %v", execErr)
756789
}
757790
}
791+
792+
func TestForEachStep_ConcurrentFailFastStopsEarly(t *testing.T) {
793+
// With fail_fast and slow remaining items, context cancellation should prevent
794+
// the producer from launching unnecessary goroutines after the first error.
795+
const itemDelay = 100 * time.Millisecond
796+
registry := NewStepRegistry()
797+
// first item fails immediately; remaining items sleep
798+
callCount := int32(0)
799+
registry.Register("step.slow_or_fail", func(name string, cfg map[string]any, app modular.Application) (PipelineStep, error) {
800+
return &foreachSlowOrFailStep{stepName: name, delay: itemDelay, callCount: &callCount}, nil
801+
})
802+
803+
factory := NewForEachStepFactory(func() *StepRegistry { return registry })
804+
step, err := factory("ff-early-stop", map[string]any{
805+
"collection": "items",
806+
"item_var": "item",
807+
"concurrency": 1, // 1 worker so cancellation stops the queue
808+
"error_strategy": "fail_fast",
809+
"step": map[string]any{
810+
"name": "work",
811+
"type": "step.slow_or_fail",
812+
},
813+
}, nil)
814+
if err != nil {
815+
t.Fatal(err)
816+
}
817+
818+
// 10 items; with concurrency=1 the producer should stop after the first failure.
819+
items := make([]any, 10)
820+
for i := range items {
821+
items[i] = map[string]any{"id": i}
822+
}
823+
pc := NewPipelineContext(map[string]any{"items": items}, nil)
824+
825+
start := time.Now()
826+
_, err = step.Execute(context.Background(), pc)
827+
elapsed := time.Since(start)
828+
if err == nil {
829+
t.Fatal("expected error with fail_fast")
830+
}
831+
// If context cancellation works, we should not execute all 10 slow items.
832+
// Sequential execution of all 10 would take ~1s; early stop should be much faster.
833+
if elapsed >= time.Duration(len(items))*itemDelay {
834+
t.Fatalf("fail_fast did not stop early: took %v (expected less than %v)", elapsed, time.Duration(len(items))*itemDelay)
835+
}
836+
}
837+
838+
// foreachSlowOrFailStep fails on the first call and sleeps on subsequent calls.
839+
type foreachSlowOrFailStep struct {
840+
stepName string
841+
delay time.Duration
842+
callCount *int32
843+
}
844+
845+
func (s *foreachSlowOrFailStep) Name() string { return s.stepName }
846+
func (s *foreachSlowOrFailStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
847+
n := atomic.AddInt32(s.callCount, 1)
848+
if n == 1 {
849+
return nil, fmt.Errorf("first item fails immediately")
850+
}
851+
select {
852+
case <-time.After(s.delay):
853+
return &StepResult{Output: map[string]any{"done": true}}, nil
854+
case <-ctx.Done():
855+
return nil, ctx.Err()
856+
}
857+
}

module/pipeline_step_parallel.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,29 +171,54 @@ func (s *ParallelStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR
171171
}, nil
172172
}
173173

174-
// buildParallelChildContext creates a deep copy of the PipelineContext for a parallel branch.
175-
// Each branch gets its own isolated copy so goroutines don't share mutable state.
174+
// deepCopyValue recursively copies maps and slices so that goroutines operating
175+
// on different branches cannot mutate each other's data through shared references.
176+
// Primitive values (bool, string, numbers, nil) are returned as-is since they
177+
// are immutable in Go.
178+
func deepCopyValue(v any) any {
179+
switch val := v.(type) {
180+
case map[string]any:
181+
cp := make(map[string]any, len(val))
182+
for k, v2 := range val {
183+
cp[k] = deepCopyValue(v2)
184+
}
185+
return cp
186+
case []any:
187+
cp := make([]any, len(val))
188+
for i, v2 := range val {
189+
cp[i] = deepCopyValue(v2)
190+
}
191+
return cp
192+
default:
193+
// Primitive values are safe to share.
194+
return v
195+
}
196+
}
197+
198+
// buildParallelChildContext creates a true deep copy of the PipelineContext for a
199+
// parallel branch. Each branch gets its own isolated copy so goroutines cannot
200+
// race on nested maps or slices.
176201
func buildParallelChildContext(parent *PipelineContext) *PipelineContext {
177202
childTrigger := make(map[string]any, len(parent.TriggerData))
178203
for k, v := range parent.TriggerData {
179-
childTrigger[k] = v
204+
childTrigger[k] = deepCopyValue(v)
180205
}
181206

182207
childMeta := make(map[string]any, len(parent.Metadata))
183208
for k, v := range parent.Metadata {
184-
childMeta[k] = v
209+
childMeta[k] = deepCopyValue(v)
185210
}
186211

187212
childCurrent := make(map[string]any, len(parent.Current))
188213
for k, v := range parent.Current {
189-
childCurrent[k] = v
214+
childCurrent[k] = deepCopyValue(v)
190215
}
191216

192217
childOutputs := make(map[string]map[string]any, len(parent.StepOutputs))
193218
for k, v := range parent.StepOutputs {
194219
out := make(map[string]any, len(v))
195220
for k2, v2 := range v {
196-
out[k2] = v2
221+
out[k2] = deepCopyValue(v2)
197222
}
198223
childOutputs[k] = out
199224
}

module/pipeline_step_parallel_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,30 @@ func TestParallelStep_DefaultErrorStrategy(t *testing.T) {
325325
t.Fatal("expected error with default fail_fast strategy")
326326
}
327327
}
328+
329+
func TestBuildParallelChildContext_DeepCopy(t *testing.T) {
330+
// Verify that buildParallelChildContext performs a true deep copy so that
331+
// mutations to nested maps/slices in a child context don't affect the parent.
332+
nested := map[string]any{"inner": "original"}
333+
slice := []any{"a", "b"}
334+
parent := &PipelineContext{
335+
TriggerData: map[string]any{"data": nested},
336+
Current: map[string]any{"list": slice},
337+
Metadata: map[string]any{},
338+
StepOutputs: map[string]map[string]any{},
339+
}
340+
341+
child := buildParallelChildContext(parent)
342+
343+
// Mutate the child's nested map — parent should be unaffected.
344+
child.TriggerData["data"].(map[string]any)["inner"] = "mutated"
345+
if parent.TriggerData["data"].(map[string]any)["inner"] != "original" {
346+
t.Fatal("deep copy failed: mutation of child TriggerData nested map affected parent")
347+
}
348+
349+
// Mutate the child's slice — parent should be unaffected.
350+
child.Current["list"].([]any)[0] = "z"
351+
if parent.Current["list"].([]any)[0] != "a" {
352+
t.Fatal("deep copy failed: mutation of child Current slice affected parent")
353+
}
354+
}

0 commit comments

Comments
 (0)