Skip to content

Commit 1bc3e29

Browse files
intel352claudeCopilot
authored
feat: add multi-workflow application config and cross-workflow invocation (#133)
* feat: add multi-workflow application config and cross-workflow invocation (#80) - Add ApplicationConfig format for multi-workflow applications referencing separate workflow YAML files with shared module registry - Add step.workflow_call pipeline step for cross-workflow invocation with sync (call-and-wait) and async (fire-and-forget) modes - Add input/output mapping with template expansion for data passing - Add pipelineRegistry to StdEngine for runtime pipeline lookup - Add BuildFromApplicationConfig() with module name conflict detection - Add auto-detection of application configs in cmd/server - Add step.workflow_call schema to module schemas - Add example multi-workflow app (chat platform with 3 workflows) - Add comprehensive tests for workflow call step and multi-config loading Closes #80 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: conflict detection for triggers/pipelines in MergeApplicationConfig, async context propagation, comment accuracy (#145) * Initial plan * fix: add conflict detection for triggers/pipelines, fix async context, clarify comment Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent f563be7 commit 1bc3e29

14 files changed

Lines changed: 1663 additions & 18 deletions

File tree

cmd/server/main.go

Lines changed: 136 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,16 +196,33 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std
196196

197197
// loadConfig loads a workflow configuration from the configured file path,
198198
// or returns an empty config if no path is set.
199-
func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, error) {
199+
// If the config file contains an application-level config (multi-workflow),
200+
// the returned WorkflowConfig will be nil and the ApplicationConfig will be set.
201+
func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, *config.ApplicationConfig, error) {
200202
if *configFile != "" {
203+
// Peek at the file to detect whether it is an application config.
204+
data, err := os.ReadFile(*configFile)
205+
if err != nil {
206+
return nil, nil, fmt.Errorf("failed to read configuration file: %w", err)
207+
}
208+
209+
if config.IsApplicationConfig(data) {
210+
logger.Info("Detected multi-workflow application config", "file", *configFile)
211+
appCfg, err := config.LoadApplicationConfig(*configFile)
212+
if err != nil {
213+
return nil, nil, fmt.Errorf("failed to load application configuration: %w", err)
214+
}
215+
return nil, appCfg, nil
216+
}
217+
201218
cfg, err := config.LoadFromFile(*configFile)
202219
if err != nil {
203-
return nil, fmt.Errorf("failed to load configuration: %w", err)
220+
return nil, nil, fmt.Errorf("failed to load configuration: %w", err)
204221
}
205-
return cfg, nil
222+
return cfg, nil, nil
206223
}
207224
logger.Info("No config file specified, using empty workflow config")
208-
return config.NewEmptyWorkflowConfig(), nil
225+
return config.NewEmptyWorkflowConfig(), nil, nil
209226
}
210227

211228
// ---------------------------------------------------------------------------
@@ -360,6 +377,111 @@ func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error)
360377
return app, nil
361378
}
362379

380+
// setupFromAppConfig initializes all server components from a multi-workflow
381+
// application config. It merges all workflow files into a combined WorkflowConfig,
382+
// applies the admin config overlay, then builds the engine using
383+
// BuildFromApplicationConfig so cross-workflow pipeline calls are wired up.
384+
func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (*serverApp, error) {
385+
// Merge all workflow files into a combined config so the admin overlay
386+
// can be applied consistently (module names, route configs, etc.).
387+
combined, err := config.MergeApplicationConfig(appCfg)
388+
if err != nil {
389+
return nil, fmt.Errorf("failed to merge application config: %w", err)
390+
}
391+
392+
// Apply admin config overlay (admin UI, management routes, etc.).
393+
if err := mergeAdminConfig(logger, combined); err != nil {
394+
return nil, fmt.Errorf("failed to set up admin: %w", err)
395+
}
396+
397+
// Build the engine from the already-merged application config (including the
398+
// admin overlay). The merged config is passed directly to buildEngine, which
399+
// internally uses BuildFromConfig and ensures features like the pipeline
400+
// registry for step.workflow_call are configured correctly.
401+
engine, loader, registry, err := buildEngine(combined, logger)
402+
if err != nil {
403+
return nil, fmt.Errorf("failed to build engine: %w", err)
404+
}
405+
406+
sApp := &serverApp{
407+
engine: engine,
408+
logger: logger,
409+
}
410+
411+
pool := dynamic.NewInterpreterPool()
412+
aiSvc, deploySvc := initAIService(logger, registry, pool)
413+
initManagementHandlers(logger, engine, combined, sApp, aiSvc, deploySvc, loader, registry)
414+
registerManagementServices(logger, sApp)
415+
416+
sApp.postStartFuncs = append(sApp.postStartFuncs, func() error {
417+
if err := sApp.initStores(logger); err != nil {
418+
return err
419+
}
420+
return sApp.registerPostStartServices(logger)
421+
}, func() error {
422+
return sApp.importBundles(logger)
423+
})
424+
425+
sApp.mgmt.auditLogger = audit.NewLogger(os.Stdout)
426+
sApp.mgmt.auditLogger.LogConfigChange(context.Background(), "system", "server",
427+
"server started with application config: "+appCfg.Application.Name)
428+
429+
return sApp, nil
430+
}
431+
432+
// mergeAdminConfig loads the embedded admin config and merges admin
433+
// modules/routes into the primary config. If --admin-ui-dir (or ADMIN_UI_DIR
434+
// env var) is set the static.fileserver root is updated to that path,
435+
// allowing the admin UI to be deployed and updated independently of the binary.
436+
// If the config already contains admin modules (e.g., the user passed the
437+
// admin config directly), the merge is skipped to avoid duplicates — but
438+
// the UI root is still injected so the static fileserver works.
439+
func mergeAdminConfig(logger *slog.Logger, cfg *config.WorkflowConfig) error {
440+
// Resolve the UI root: flag > ADMIN_UI_DIR env > leave as configured in config.yaml
441+
uiDir := *adminUIDir
442+
443+
// Check if the config already contains admin modules
444+
for _, m := range cfg.Modules {
445+
if m.Name == "admin-server" {
446+
logger.Info("Config already contains admin modules, skipping merge")
447+
if uiDir != "" {
448+
injectUIRoot(cfg, uiDir)
449+
logger.Info("Admin UI root overridden", "uiDir", uiDir)
450+
}
451+
return nil
452+
}
453+
}
454+
455+
adminCfg, err := admin.LoadConfig()
456+
if err != nil {
457+
return err
458+
}
459+
460+
if uiDir != "" {
461+
injectUIRoot(adminCfg, uiDir)
462+
logger.Info("Admin UI root overridden", "uiDir", uiDir)
463+
}
464+
465+
// Merge admin modules and routes into primary config
466+
admin.MergeInto(cfg, adminCfg)
467+
468+
logger.Info("Admin UI enabled")
469+
return nil
470+
}
471+
472+
// injectUIRoot updates every static.fileserver module config in cfg to serve
473+
// from the given root directory.
474+
func injectUIRoot(cfg *config.WorkflowConfig, uiRoot string) {
475+
for i := range cfg.Modules {
476+
if cfg.Modules[i].Type == "static.fileserver" {
477+
if cfg.Modules[i].Config == nil {
478+
cfg.Modules[i].Config = make(map[string]any)
479+
}
480+
cfg.Modules[i].Config["root"] = uiRoot
481+
}
482+
}
483+
}
484+
363485
// initManagementHandlers creates all management service handlers and stores
364486
// them on the serverApp struct. These handlers are created once and persist
365487
// across engine reloads. Only the service registrations need to be refreshed.
@@ -1389,20 +1511,20 @@ func main() {
13891511
pgStore.Close()
13901512
}()
13911513

1392-
// Block until a termination signal is received, then let deferred cleanup run.
1393-
sigCh := make(chan os.Signal, 1)
1394-
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
1395-
fmt.Printf("Multi-workflow API on %s\n", *multiWorkflowAddr)
1396-
<-sigCh
1397-
fmt.Println("Shutting down multi-workflow mode...")
1398-
return
1399-
}
1400-
cfg, err := loadConfig(logger)
1514+
// Load configuration — supports both single-workflow and multi-workflow application configs.
1515+
cfg, appCfg, err := loadConfig(logger)
14011516
if err != nil {
14021517
log.Fatalf("Configuration error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
14031518
}
14041519

1405-
app, err := setup(logger, cfg)
1520+
var app *serverApp
1521+
if appCfg != nil {
1522+
// Multi-workflow application config: build engine from application config
1523+
app, err = setupFromAppConfig(logger, appCfg)
1524+
} else {
1525+
// Single-workflow config (backward-compatible)
1526+
app, err = setup(logger, cfg)
1527+
}
14061528
if err != nil {
14071529
log.Fatalf("Setup error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
14081530
}

cmd/server/main_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,16 @@ func TestLoadConfig_NoFile(t *testing.T) {
329329
*configFile = ""
330330
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
331331

332-
cfg, err := loadConfig(logger)
332+
cfg, appCfg, err := loadConfig(logger)
333333
if err != nil {
334334
t.Fatalf("loadConfig failed: %v", err)
335335
}
336336
if cfg == nil {
337337
t.Fatal("expected non-nil config")
338338
}
339+
if appCfg != nil {
340+
t.Fatal("expected nil application config for empty configFile")
341+
}
339342
if len(cfg.Modules) != 0 {
340343
t.Errorf("expected empty modules, got %d", len(cfg.Modules))
341344
}
@@ -346,7 +349,7 @@ func TestLoadConfig_InvalidFile(t *testing.T) {
346349
defer func() { *configFile = "" }()
347350
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
348351

349-
_, err := loadConfig(logger)
352+
_, _, err := loadConfig(logger)
350353
if err == nil {
351354
t.Fatal("expected error for nonexistent config file")
352355
}
@@ -377,10 +380,13 @@ triggers: {}
377380
defer func() { *configFile = "" }()
378381
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
379382

380-
cfg, err := loadConfig(logger)
383+
cfg, appCfg, err := loadConfig(logger)
381384
if err != nil {
382385
t.Fatalf("loadConfig failed: %v", err)
383386
}
387+
if appCfg != nil {
388+
t.Fatal("expected nil application config for single-workflow YAML")
389+
}
384390
if len(cfg.Modules) != 1 {
385391
t.Fatalf("expected 1 module, got %d", len(cfg.Modules))
386392
}

config/config.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,67 @@ import (
88
"gopkg.in/yaml.v3"
99
)
1010

11+
// WorkflowRef is a reference to a workflow config file within an application config.
12+
type WorkflowRef struct {
13+
// File is the path to the workflow YAML config file (relative to the application config).
14+
File string `json:"file" yaml:"file"`
15+
// Name is an optional override for the workflow's name within the application namespace.
16+
// If empty, the filename stem (without extension) is used.
17+
Name string `json:"name,omitempty" yaml:"name,omitempty"`
18+
}
19+
20+
// ApplicationInfo holds top-level metadata about a multi-workflow application.
21+
type ApplicationInfo struct {
22+
// Name is the application name.
23+
Name string `json:"name" yaml:"name"`
24+
// Workflows lists the workflow config files that make up this application.
25+
Workflows []WorkflowRef `json:"workflows" yaml:"workflows"`
26+
}
27+
28+
// ApplicationConfig is the top-level config for a multi-workflow application.
29+
// It references multiple workflow config files that share a module registry.
30+
type ApplicationConfig struct {
31+
// Application holds the application-level metadata and workflow references.
32+
Application ApplicationInfo `json:"application" yaml:"application"`
33+
// ConfigDir is the directory of the application config file, used for resolving relative paths.
34+
ConfigDir string `json:"-" yaml:"-"`
35+
}
36+
37+
// LoadApplicationConfig loads an application config from a YAML file.
38+
func LoadApplicationConfig(filepath string) (*ApplicationConfig, error) {
39+
data, err := os.ReadFile(filepath)
40+
if err != nil {
41+
return nil, fmt.Errorf("failed to read application config file: %w", err)
42+
}
43+
44+
var cfg ApplicationConfig
45+
if err := yaml.Unmarshal(data, &cfg); err != nil {
46+
return nil, fmt.Errorf("failed to parse application config file: %w", err)
47+
}
48+
49+
// Store the config file's directory for relative path resolution
50+
absPath, err := pathpkg.Abs(filepath)
51+
if err == nil {
52+
cfg.ConfigDir = pathpkg.Dir(absPath)
53+
}
54+
55+
return &cfg, nil
56+
}
57+
58+
// IsApplicationConfig returns true if the YAML data contains an application-level config
59+
// (i.e., has an "application" key with a "workflows" section).
60+
func IsApplicationConfig(data []byte) bool {
61+
var probe struct {
62+
Application *struct {
63+
Workflows []any `yaml:"workflows"`
64+
} `yaml:"application"`
65+
}
66+
if err := yaml.Unmarshal(data, &probe); err != nil {
67+
return false
68+
}
69+
return probe.Application != nil && len(probe.Application.Workflows) > 0
70+
}
71+
1172
// ModuleConfig represents a single module configuration
1273
type ModuleConfig struct {
1374
Name string `json:"name" yaml:"name"`
@@ -98,6 +159,88 @@ func ResolvePathInConfig(cfg map[string]any, path string) string {
98159
return path
99160
}
100161

162+
// MergeApplicationConfig loads all workflow config files referenced by an
163+
// ApplicationConfig and merges them into a single WorkflowConfig. This is
164+
// useful for callers that need a single combined config (e.g., the server's
165+
// admin merge step) before passing it to the engine.
166+
//
167+
// Module name conflicts across files are reported as errors.
168+
func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) {
169+
if appCfg == nil {
170+
return nil, fmt.Errorf("application config is nil")
171+
}
172+
173+
combined := NewEmptyWorkflowConfig()
174+
combined.ConfigDir = appCfg.ConfigDir
175+
seenModules := make(map[string]string)
176+
seenTriggers := make(map[string]string)
177+
seenPipelines := make(map[string]string)
178+
179+
for _, ref := range appCfg.Application.Workflows {
180+
if ref.File == "" {
181+
return nil, fmt.Errorf("application %q: workflow reference has no 'file' field", appCfg.Application.Name)
182+
}
183+
184+
filePath := ref.File
185+
if !pathpkg.IsAbs(filePath) && appCfg.ConfigDir != "" {
186+
filePath = pathpkg.Join(appCfg.ConfigDir, filePath)
187+
}
188+
189+
wfCfg, err := LoadFromFile(filePath)
190+
if err != nil {
191+
return nil, fmt.Errorf("application %q: failed to load workflow file %q: %w", appCfg.Application.Name, ref.File, err)
192+
}
193+
194+
// Derive a name for error messages
195+
wfName := ref.Name
196+
if wfName == "" {
197+
base := pathpkg.Base(filePath)
198+
wfName = base[:len(base)-len(pathpkg.Ext(base))]
199+
}
200+
201+
for _, modCfg := range wfCfg.Modules {
202+
if existing, conflict := seenModules[modCfg.Name]; conflict {
203+
return nil, fmt.Errorf("application %q: module name conflict: module %q is defined in both %q and %q",
204+
appCfg.Application.Name, modCfg.Name, existing, wfName)
205+
}
206+
seenModules[modCfg.Name] = wfName
207+
}
208+
209+
for k := range wfCfg.Triggers {
210+
if existing, conflict := seenTriggers[k]; conflict {
211+
return nil, fmt.Errorf("application %q: trigger name conflict: trigger %q is defined in both %q and %q",
212+
appCfg.Application.Name, k, existing, wfName)
213+
}
214+
seenTriggers[k] = wfName
215+
}
216+
for k := range wfCfg.Pipelines {
217+
if existing, conflict := seenPipelines[k]; conflict {
218+
return nil, fmt.Errorf("application %q: pipeline name conflict: pipeline %q is defined in both %q and %q",
219+
appCfg.Application.Name, k, existing, wfName)
220+
}
221+
seenPipelines[k] = wfName
222+
}
223+
224+
combined.Modules = append(combined.Modules, wfCfg.Modules...)
225+
for k, v := range wfCfg.Workflows {
226+
combined.Workflows[k] = v
227+
}
228+
for k, v := range wfCfg.Triggers {
229+
combined.Triggers[k] = v
230+
}
231+
for k, v := range wfCfg.Pipelines {
232+
combined.Pipelines[k] = v
233+
}
234+
// Fall back to first workflow file's directory if application config
235+
// directory was not set.
236+
if combined.ConfigDir == "" {
237+
combined.ConfigDir = wfCfg.ConfigDir
238+
}
239+
}
240+
241+
return combined, nil
242+
}
243+
101244
// NewEmptyWorkflowConfig creates a new empty workflow configuration
102245
func NewEmptyWorkflowConfig() *WorkflowConfig {
103246
return &WorkflowConfig{

0 commit comments

Comments
 (0)