diff --git a/cmd/server/main.go b/cmd/server/main.go index 785b1229..e0cfda48 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -379,12 +379,10 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) ( return nil, fmt.Errorf("failed to set up admin: %w", err) } - // Build the engine using the application config so BuildFromApplicationConfig - // is called (which handles the pipeline registry for step.workflow_call). - // We re-use buildEngineFromAppConfig but pass the merged+admin config separately - // as the admin modules are now part of combined. Since BuildFromApplicationConfig - // calls MergeApplicationConfig internally, we bypass that by calling - // BuildFromConfig on the already-merged config via buildEngine. + // Build the engine from the already-merged application config (including the + // admin overlay). The merged config is passed directly to buildEngine, which + // internally uses BuildFromConfig and ensures features like the pipeline + // registry for step.workflow_call are configured correctly. engine, loader, registry, err := buildEngine(combined, logger) if err != nil { return nil, fmt.Errorf("failed to build engine: %w", err) diff --git a/config/config.go b/config/config.go index 3ae240e8..7a261306 100644 --- a/config/config.go +++ b/config/config.go @@ -171,7 +171,10 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) } combined := NewEmptyWorkflowConfig() + combined.ConfigDir = appCfg.ConfigDir seenModules := make(map[string]string) + seenTriggers := make(map[string]string) + seenPipelines := make(map[string]string) for _, ref := range appCfg.Application.Workflows { if ref.File == "" { @@ -203,6 +206,21 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) seenModules[modCfg.Name] = wfName } + for k := range wfCfg.Triggers { + if existing, conflict := seenTriggers[k]; conflict { + return nil, fmt.Errorf("application %q: trigger name conflict: trigger %q is defined in both %q and %q", + appCfg.Application.Name, k, existing, wfName) + } + seenTriggers[k] = wfName + } + for k := range wfCfg.Pipelines { + if existing, conflict := seenPipelines[k]; conflict { + return nil, fmt.Errorf("application %q: pipeline name conflict: pipeline %q is defined in both %q and %q", + appCfg.Application.Name, k, existing, wfName) + } + seenPipelines[k] = wfName + } + combined.Modules = append(combined.Modules, wfCfg.Modules...) for k, v := range wfCfg.Workflows { combined.Workflows[k] = v @@ -213,6 +231,8 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) for k, v := range wfCfg.Pipelines { combined.Pipelines[k] = v } + // Fall back to first workflow file's directory if application config + // directory was not set. if combined.ConfigDir == "" { combined.ConfigDir = wfCfg.ConfigDir } diff --git a/engine_multi_config_test.go b/engine_multi_config_test.go index 9e1a3354..0d08e2fa 100644 --- a/engine_multi_config_test.go +++ b/engine_multi_config_test.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "path/filepath" + "strings" "testing" "github.com/CrisisTextLine/modular" @@ -212,6 +213,99 @@ triggers: {} } } +func TestBuildFromApplicationConfig_TriggerNameConflict(t *testing.T) { + dir := t.TempDir() + + wfA := ` +modules: [] +workflows: {} +triggers: + my-trigger: + type: http +pipelines: {} +` + wfB := ` +modules: [] +workflows: {} +triggers: + my-trigger: + type: schedule +pipelines: {} +` + writeTempYAML(t, dir, "a.yaml", wfA) + writeTempYAML(t, dir, "b.yaml", wfB) + + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "trigger-conflict-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "a.yaml")}, + {File: filepath.Join(dir, "b.yaml")}, + }, + }, + ConfigDir: dir, + }) + if err == nil { + t.Fatal("expected error for trigger name conflict") + } + if !strings.Contains(err.Error(), "trigger name conflict") { + t.Fatalf("expected 'trigger name conflict' in error, got: %v", err) + } +} + +func TestBuildFromApplicationConfig_PipelineNameConflict(t *testing.T) { + dir := t.TempDir() + + wfA := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + shared-pipeline: + steps: + - name: step-a + type: step.set + config: + values: + msg: "from a" +` + wfB := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + shared-pipeline: + steps: + - name: step-b + type: step.set + config: + values: + msg: "from b" +` + writeTempYAML(t, dir, "a.yaml", wfA) + writeTempYAML(t, dir, "b.yaml", wfB) + + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "pipeline-conflict-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "a.yaml")}, + {File: filepath.Join(dir, "b.yaml")}, + }, + }, + ConfigDir: dir, + }) + if err == nil { + t.Fatal("expected error for pipeline name conflict") + } + if !strings.Contains(err.Error(), "pipeline name conflict") { + t.Fatalf("expected 'pipeline name conflict' in error, got: %v", err) + } +} + + func TestBuildFromApplicationConfig_MultipleWorkflows_MergesPipelines(t *testing.T) { dir := t.TempDir() diff --git a/module/pipeline_step_workflow_call.go b/module/pipeline_step_workflow_call.go index 373b6efc..e0c4d3fe 100644 --- a/module/pipeline_step_workflow_call.go +++ b/module/pipeline_step_workflow_call.go @@ -123,12 +123,13 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S } if s.mode == WorkflowCallModeAsync { - // Fire-and-forget: run in background goroutine with its own timeout - go func() { - asyncCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + // Fire-and-forget: run in background goroutine derived from parent context + // so cancellation signals propagate, bounded by the configured timeout. + go func(parentCtx context.Context, data map[string]any) { + asyncCtx, cancel := context.WithTimeout(parentCtx, s.timeout) defer cancel() - _, _ = target.Execute(asyncCtx, triggerData) //nolint:errcheck - }() + _, _ = target.Execute(asyncCtx, data) //nolint:errcheck + }(ctx, triggerData) return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil }