Skip to content
Merged
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var (
anthropicModel = flag.String("anthropic-model", "", "Anthropic model name")

// Multi-workflow mode flags
multiWorkflowAddr = flag.String("multi-workflow-addr", ":8080", "Listen address for the multi-workflow API server")
databaseDSN = flag.String("database-dsn", "", "PostgreSQL connection string for multi-workflow mode")
jwtSecret = flag.String("jwt-secret", "", "JWT signing secret for API authentication")
adminEmail = flag.String("admin-email", "", "Initial admin user email (first-run bootstrap)")
Expand Down
66 changes: 28 additions & 38 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ type StdEngine struct {
moduleFactories map[string]ModuleFactory
logger modular.Logger
modules []modular.Module
triggers []module.Trigger
triggerRegistry *module.TriggerRegistry
triggers []interfaces.Trigger
triggerRegistry interfaces.TriggerRegistrar
dynamicRegistry *dynamic.ComponentRegistry
dynamicLoader *dynamic.Loader
eventEmitter *module.WorkflowEventEmitter
eventEmitter interfaces.EventEmitter
secretsResolver *secrets.MultiResolver
stepRegistry *module.StepRegistry
pluginInstaller *plugin.PluginInstaller
configDir string // directory of the config file, for resolving relative paths
// stepRegistry is a concrete *module.StepRegistry because StepFactory and
// PipelineStep are module-level types not yet abstracted in interfaces.
// TODO(phase5): move StepFactory/PipelineStep to interfaces and change this
// field to interfaces.StepRegistrar.
stepRegistry *module.StepRegistry
pluginInstaller *plugin.PluginInstaller
configDir string // directory of the config file, for resolving relative paths

// triggerTypeMap maps trigger config type keys (e.g., "http", "schedule")
// to trigger names (e.g., "trigger.http", "trigger.schedule"). Populated
Expand Down Expand Up @@ -129,10 +133,10 @@ func NewStdEngine(app modular.Application, logger modular.Logger) *StdEngine {
moduleFactories: make(map[string]ModuleFactory),
logger: logger,
modules: make([]modular.Module, 0),
triggers: make([]module.Trigger, 0),
triggerRegistry: module.NewTriggerRegistry(),
triggers: make([]interfaces.Trigger, 0),
triggerRegistry: newTriggerRegistrar(), // bridge: returns *module.TriggerRegistry
secretsResolver: secrets.NewMultiResolver(),
stepRegistry: module.NewStepRegistry(),
stepRegistry: newStepRegistry(), // bridge: returns *module.StepRegistry
triggerTypeMap: make(map[string]string),
triggerConfigWrappers: make(map[string]plugin.TriggerConfigWrapperFunc),
pipelineRegistry: make(map[string]*module.Pipeline),
Expand All @@ -159,7 +163,7 @@ func (e *StdEngine) RegisterWorkflowHandler(handler WorkflowHandler) {
}

// RegisterTrigger registers a trigger with the engine
func (e *StdEngine) RegisterTrigger(trigger module.Trigger) {
func (e *StdEngine) RegisterTrigger(trigger interfaces.Trigger) {
e.triggers = append(e.triggers, trigger)
e.triggerRegistry.RegisterTrigger(trigger)
}
Expand Down Expand Up @@ -220,26 +224,17 @@ func (e *StdEngine) LoadPlugin(p plugin.EnginePlugin) error {
schema.RegisterModuleType(typeName)
}
for typeName, factory := range p.StepFactories() {
stepFactory := factory
capturedType := typeName
e.stepRegistry.Register(typeName, func(name string, cfg map[string]any, app modular.Application) (module.PipelineStep, error) {
result, err := stepFactory(name, cfg, app)
if err != nil {
return nil, err
}
if step, ok := result.(module.PipelineStep); ok {
return step, nil
}
return nil, fmt.Errorf("step factory for %q returned non-PipelineStep type", capturedType)
})
// Delegate to the bridge helper which type-asserts to module.PipelineStep
// so that engine.go need not reference that concrete type directly.
e.registerPluginSteps(typeName, factory)
}
// Register triggers from plugin. The factory map key is the trigger
// config type (e.g., "http", "schedule") used in YAML configs.
for triggerType, factory := range p.TriggerFactories() {
result := factory()
if trigger, ok := result.(module.Trigger); ok {
e.triggerTypeMap[triggerType] = trigger.Name()
e.RegisterTrigger(trigger)
// Delegate to the bridge helper; triggers are interfaces.Trigger values
// (module.Trigger is a type alias for interfaces.Trigger).
if err := e.registerPluginTrigger(triggerType, factory); err != nil {
return fmt.Errorf("load plugin: %w", err)
}
}

Expand Down Expand Up @@ -404,8 +399,8 @@ func (e *StdEngine) BuildFromConfig(cfg *config.WorkflowConfig) error {
e.logger.Debug("Loaded service: " + name)
}

// Initialize the workflow event emitter
e.eventEmitter = module.NewWorkflowEventEmitter(e.app)
// Initialize the workflow event emitter via bridge (avoids direct module dep).
e.eventEmitter = newEventEmitter(e.app)

// Register config section for workflow
e.app.RegisterConfigSection("workflow", modular.NewStdConfigProvider(cfg))
Expand Down Expand Up @@ -590,14 +585,9 @@ func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, ac
return fmt.Errorf("no handler found for workflow type: %s", workflowType)
}

// recordWorkflowMetrics records workflow execution metrics if the metrics collector is available.
func (e *StdEngine) recordWorkflowMetrics(workflowType, action, status string, duration time.Duration) {
var mc *module.MetricsCollector
if err := e.app.GetService("metrics.collector", &mc); err == nil && mc != nil {
mc.RecordWorkflowExecution(workflowType, action, status)
mc.RecordWorkflowDuration(workflowType, action, duration)
}
}
// recordWorkflowMetrics is defined in engine_module_bridge.go.
// It records execution metrics via interfaces.MetricsRecorder so that engine.go
// need not reference the concrete *module.MetricsCollector type.

// configureTriggers sets up all triggers from configuration
func (e *StdEngine) configureTriggers(triggerConfigs map[string]any) error {
Expand Down Expand Up @@ -638,7 +628,7 @@ func (e *StdEngine) configureTriggers(triggerConfigs map[string]any) error {
// by looking up the trigger type in the engine's registry. Falls back to
// matching the trigger name directly (e.g., trigger type "mock" matches
// trigger name "mock.trigger" via "trigger.<type>" convention).
func (e *StdEngine) canHandleTrigger(trigger module.Trigger, triggerType string) bool {
func (e *StdEngine) canHandleTrigger(trigger interfaces.Trigger, triggerType string) bool {
// Check the trigger type registry first (populated by LoadPlugin and RegisterTriggerType)
if expectedName, ok := e.triggerTypeMap[triggerType]; ok {
return trigger.Name() == expectedName
Expand Down Expand Up @@ -927,7 +917,7 @@ func (e *StdEngine) LoadedPlugins() []plugin.EnginePlugin {

type Engine interface {
RegisterWorkflowHandler(handler WorkflowHandler)
RegisterTrigger(trigger module.Trigger)
RegisterTrigger(trigger interfaces.Trigger)
AddModuleType(moduleType string, factory ModuleFactory)
BuildFromConfig(cfg *config.WorkflowConfig) error
Start(ctx context.Context) error
Expand Down
100 changes: 100 additions & 0 deletions engine_module_bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package workflow

// engine_module_bridge.go bridges the engine core (engine.go) with concrete
// module implementations that cannot yet be abstracted away without a larger
// refactor. This file intentionally imports the module package so that
// engine.go itself need not import it for these specific operations.
//
// Remaining blockers preventing full engine.go ↔ module decoupling:
//
// 1. module.StepRegistry / module.StepFactory / module.PipelineStep — the
// StepFactory signature returns (PipelineStep, error), and PipelineStep.Execute
// takes *PipelineContext, both of which are concrete types in the module package.
// Moving them to interfaces would require updating 70+ files in module/. Deferred.
//
// 2. module.Pipeline struct construction (configurePipelines,
// configureRoutePipelines, buildPipelineSteps) — depends on module.PipelineStep
// slices and module.ErrorStrategy constants. These functions live in engine.go
// and are candidates for moving here once StepFactory is abstracted.
//
// What HAS been cleaned up in this phase:
// - module.Trigger → interfaces.Trigger (type alias in module; engine uses interfaces)
// - module.WorkflowEventEmitter → interfaces.EventEmitter (engine field + bridge ctor)
// - module.TriggerRegistry → interfaces.TriggerRegistrar (engine field + bridge ctor)
// - recordWorkflowMetrics → uses interfaces.MetricsRecorder (no concrete *MetricsCollector)
// - Plugin step/trigger wiring → bridge helpers (registerPluginSteps, registerPluginTrigger)

import (
"fmt"
"time"

"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
)

// newTriggerRegistrar creates the default concrete trigger registry.
// Called from NewStdEngine so that engine.go need not import module.
func newTriggerRegistrar() interfaces.TriggerRegistrar {
return module.NewTriggerRegistry()
}

// newStepRegistry creates the default concrete step registry.
// *module.StepRegistry satisfies interfaces.StepRegistryProvider and is used
// as a concrete type in engine.go until StepFactory is fully abstracted.
func newStepRegistry() *module.StepRegistry {
return module.NewStepRegistry()
}

// newEventEmitter creates a WorkflowEventEmitter from the application service
// registry. Called from BuildFromConfig after app.Init().
func newEventEmitter(app modular.Application) interfaces.EventEmitter {
return module.NewWorkflowEventEmitter(app)
}

// recordWorkflowMetrics records workflow execution metrics if the metrics
// collector service is available. Uses interfaces.MetricsRecorder so that
// engine.go need not hold a concrete *module.MetricsCollector pointer.
func (e *StdEngine) recordWorkflowMetrics(workflowType, action, status string, duration time.Duration) {
var mr interfaces.MetricsRecorder
if err := e.app.GetService("metrics.collector", &mr); err == nil && mr != nil {
mr.RecordWorkflowExecution(workflowType, action, status)
mr.RecordWorkflowDuration(workflowType, action, duration)
}
}

// registerPluginSteps wires step factories from a plugin into the engine's
// step registry. Lives here (instead of LoadPlugin in engine.go) because it
// type-asserts the factory result to module.PipelineStep.
func (e *StdEngine) registerPluginSteps(typeName string, stepFactory func(name string, cfg map[string]any, app modular.Application) (any, error)) {
capturedType := typeName
e.stepRegistry.Register(typeName, func(name string, cfg map[string]any, app modular.Application) (module.PipelineStep, error) {
result, err := stepFactory(name, cfg, app)
if err != nil {
return nil, err
}
if step, ok := result.(module.PipelineStep); ok {
return step, nil
}
return nil, fmt.Errorf("step factory for %q returned non-PipelineStep type", capturedType)
})
}

// registerPluginTrigger wires a trigger from a plugin into the engine.
// Lives here to avoid a direct module.Trigger type assertion in engine.go.
// Since module.Trigger is now an alias for interfaces.Trigger, the assertion
// uses the canonical interface type.
// Returns an error when the factory returns a value that does not satisfy
// interfaces.Trigger, so LoadPlugin can fail deterministically instead of
// silently skipping the trigger and surfacing a confusing "no handler found"
// error later at runtime.
func (e *StdEngine) registerPluginTrigger(triggerType string, factory func() any) error {
result := factory()
trigger, ok := result.(interfaces.Trigger)
if !ok {
return fmt.Errorf("workflow: plugin trigger factory for %q returned non-Trigger type %T", triggerType, result)
}
e.triggerTypeMap[triggerType] = trigger.Name()
e.RegisterTrigger(trigger)
return nil
}
Loading
Loading