Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,10 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (
return nil, fmt.Errorf("failed to set up admin: %w", err)
}

// Build the engine using the application config so BuildFromApplicationConfig
// is called (which handles the pipeline registry for step.workflow_call).
// We re-use buildEngineFromAppConfig but pass the merged+admin config separately
// as the admin modules are now part of combined. Since BuildFromApplicationConfig
// calls MergeApplicationConfig internally, we bypass that by calling
// BuildFromConfig on the already-merged config via buildEngine.
// Build the engine from the already-merged application config (including the
// admin overlay). The merged config is passed directly to buildEngine, which
// internally uses BuildFromConfig and ensures features like the pipeline
// registry for step.workflow_call are configured correctly.
engine, loader, registry, err := buildEngine(combined, logger)
if err != nil {
return nil, fmt.Errorf("failed to build engine: %w", err)
Expand Down
20 changes: 20 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error)
}

combined := NewEmptyWorkflowConfig()
combined.ConfigDir = appCfg.ConfigDir
seenModules := make(map[string]string)
seenTriggers := make(map[string]string)
seenPipelines := make(map[string]string)

for _, ref := range appCfg.Application.Workflows {
if ref.File == "" {
Expand Down Expand Up @@ -203,6 +206,21 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error)
seenModules[modCfg.Name] = wfName
}

for k := range wfCfg.Triggers {
if existing, conflict := seenTriggers[k]; conflict {
return nil, fmt.Errorf("application %q: trigger name conflict: trigger %q is defined in both %q and %q",
appCfg.Application.Name, k, existing, wfName)
}
seenTriggers[k] = wfName
}
for k := range wfCfg.Pipelines {
if existing, conflict := seenPipelines[k]; conflict {
return nil, fmt.Errorf("application %q: pipeline name conflict: pipeline %q is defined in both %q and %q",
appCfg.Application.Name, k, existing, wfName)
}
seenPipelines[k] = wfName
}

combined.Modules = append(combined.Modules, wfCfg.Modules...)
for k, v := range wfCfg.Workflows {
combined.Workflows[k] = v
Expand All @@ -213,6 +231,8 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error)
for k, v := range wfCfg.Pipelines {
combined.Pipelines[k] = v
}
// Fall back to first workflow file's directory if application config
// directory was not set.
if combined.ConfigDir == "" {
combined.ConfigDir = wfCfg.ConfigDir
}
Expand Down
94 changes: 94 additions & 0 deletions engine_multi_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"path/filepath"
"strings"
"testing"

"github.com/CrisisTextLine/modular"
Expand Down Expand Up @@ -212,6 +213,99 @@ triggers: {}
}
}

func TestBuildFromApplicationConfig_TriggerNameConflict(t *testing.T) {
dir := t.TempDir()

wfA := `
modules: []
workflows: {}
triggers:
my-trigger:
type: http
pipelines: {}
`
wfB := `
modules: []
workflows: {}
triggers:
my-trigger:
type: schedule
pipelines: {}
`
writeTempYAML(t, dir, "a.yaml", wfA)
writeTempYAML(t, dir, "b.yaml", wfB)

engine := newTestEngine(t)
err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{
Application: config.ApplicationInfo{
Name: "trigger-conflict-app",
Workflows: []config.WorkflowRef{
{File: filepath.Join(dir, "a.yaml")},
{File: filepath.Join(dir, "b.yaml")},
},
},
ConfigDir: dir,
})
if err == nil {
t.Fatal("expected error for trigger name conflict")
}
if !strings.Contains(err.Error(), "trigger name conflict") {
t.Fatalf("expected 'trigger name conflict' in error, got: %v", err)
}
}

func TestBuildFromApplicationConfig_PipelineNameConflict(t *testing.T) {
dir := t.TempDir()

wfA := `
modules: []
workflows: {}
triggers: {}
pipelines:
shared-pipeline:
steps:
- name: step-a
type: step.set
config:
values:
msg: "from a"
`
wfB := `
modules: []
workflows: {}
triggers: {}
pipelines:
shared-pipeline:
steps:
- name: step-b
type: step.set
config:
values:
msg: "from b"
`
writeTempYAML(t, dir, "a.yaml", wfA)
writeTempYAML(t, dir, "b.yaml", wfB)

engine := newTestEngine(t)
err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{
Application: config.ApplicationInfo{
Name: "pipeline-conflict-app",
Workflows: []config.WorkflowRef{
{File: filepath.Join(dir, "a.yaml")},
{File: filepath.Join(dir, "b.yaml")},
},
},
ConfigDir: dir,
})
if err == nil {
t.Fatal("expected error for pipeline name conflict")
}
if !strings.Contains(err.Error(), "pipeline name conflict") {
t.Fatalf("expected 'pipeline name conflict' in error, got: %v", err)
}
}


func TestBuildFromApplicationConfig_MultipleWorkflows_MergesPipelines(t *testing.T) {
dir := t.TempDir()

Expand Down
11 changes: 6 additions & 5 deletions module/pipeline_step_workflow_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
}

if s.mode == WorkflowCallModeAsync {
// Fire-and-forget: run in background goroutine with its own timeout
go func() {
asyncCtx, cancel := context.WithTimeout(context.Background(), s.timeout)
// Fire-and-forget: run in background goroutine derived from parent context
// so cancellation signals propagate, bounded by the configured timeout.
go func(parentCtx context.Context, data map[string]any) {
asyncCtx, cancel := context.WithTimeout(parentCtx, s.timeout)
defer cancel()
_, _ = target.Execute(asyncCtx, triggerData) //nolint:errcheck
}()
_, _ = target.Execute(asyncCtx, data) //nolint:errcheck
}(ctx, triggerData)
return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil
}

Expand Down