Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 21 additions & 33 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/GoCodeAlone/workflow/deploy"
"github.com/GoCodeAlone/workflow/dynamic"
"github.com/GoCodeAlone/workflow/environment"
"github.com/GoCodeAlone/workflow/handlers"
"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/observability"
Expand Down Expand Up @@ -92,13 +91,12 @@ var (
)

// buildEngine creates the workflow engine with all handlers registered and built from config.
func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, *handlers.PipelineWorkflowHandler, error) {
func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) {
app := modular.NewStdApplication(nil, logger)
engine := workflow.NewStdEngine(app, logger)

// Load all engine plugins — each registers its module factories, step factories,
// trigger factories, and workflow handlers via engine.LoadPlugin.
pipelinePlugin := pluginpipeline.New()
plugins := []plugin.EnginePlugin{
pluginlicense.New(),
pluginhttp.New(),
Expand All @@ -108,7 +106,7 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std
pluginauth.New(),
pluginstorage.New(),
pluginapi.New(),
pipelinePlugin,
pluginpipeline.New(),
plugincicd.New(),
pluginff.New(),
pluginsecrets.New(),
Expand Down Expand Up @@ -149,15 +147,6 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std
}
}

// Wire the PipelineWorkflowHandler (provided by the pipeline plugin) with
// the engine's StepRegistry and logger. The handler was already registered
// by LoadPlugin; we just need to inject its dependencies.
pipelineHandler := pipelinePlugin.PipelineHandler()
if pipelineHandler != nil {
pipelineHandler.SetStepRegistry(engine.GetStepRegistry())
pipelineHandler.SetLogger(logger)
}

// Set up dynamic component system
pool := dynamic.NewInterpreterPool()
registry := dynamic.NewComponentRegistry()
Expand All @@ -179,10 +168,10 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std

// Build engine from config
if err := engine.BuildFromConfig(cfg); err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to build workflow: %w", err)
return nil, nil, nil, fmt.Errorf("failed to build workflow: %w", err)
}

return engine, loader, registry, pipelineHandler, nil
return engine, loader, registry, nil
}

// loadConfig loads a workflow configuration from the configured file path,
Expand Down Expand Up @@ -226,12 +215,6 @@ type closableEventStore interface {
Close() error
}

// pipelineEventSetter is the subset of *handlers.PipelineWorkflowHandler
// called after the engine starts.
type pipelineEventSetter interface {
SetEventRecorder(r module.EventRecorder)
}

// executionTrackerIface is the minimal interface over *module.ExecutionTracker.
type executionTrackerIface interface {
module.ExecutionTrackerProvider
Expand Down Expand Up @@ -285,7 +268,6 @@ type mgmtComponents struct {
// instance after an engine reload.
type serviceComponents struct {
v1Handler http.Handler // V1 API handler (dashboard)
pipelineHandler pipelineEventSetter // pipeline execution handler
executionTracker executionTrackerIface // CQRS execution tracking
runtimeManager runtimeLifecycle // filesystem-loaded workflow instances
reporter observabilityReporter // background observability reporter
Expand Down Expand Up @@ -330,12 +312,11 @@ func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error)
return nil, fmt.Errorf("failed to set up admin: %w", err)
}

engine, loader, registry, pipelineHandler, err := buildEngine(cfg, logger)
engine, loader, registry, err := buildEngine(cfg, logger)
if err != nil {
return nil, fmt.Errorf("failed to build engine: %w", err)
}
app.engine = engine
app.services.pipelineHandler = pipelineHandler

// Initialize AI services and dynamic component pool
pool := dynamic.NewInterpreterPool()
Expand Down Expand Up @@ -836,7 +817,7 @@ func (app *serverApp) initStores(logger *slog.Logger) error {

// Always create a RuntimeManager (returns empty list when no workflows loaded)
runtimeBuilder := func(cfg *config.WorkflowConfig, lg *slog.Logger) (func(context.Context) error, error) {
eng, _, _, _, buildErr := buildEngine(cfg, lg)
eng, _, _, buildErr := buildEngine(cfg, lg)
if buildErr != nil {
return nil, buildErr
}
Expand Down Expand Up @@ -886,11 +867,19 @@ func (app *serverApp) registerPostStartServices(logger *slog.Logger) error {
engine := app.engine

// Wire EventRecorder adapter to the pipeline handler so pipeline
// executions emit events to the event store.
if app.stores.eventStore != nil && app.services.pipelineHandler != nil {
recorder := evstore.NewEventRecorderAdapter(app.stores.eventStore)
app.services.pipelineHandler.SetEventRecorder(recorder)
logger.Info("Wired EventRecorder to PipelineWorkflowHandler")
// executions emit events to the event store. The handler is discovered
// via the service registry (registered by the pipelinesteps plugin wiring hook).
if app.stores.eventStore != nil {
type eventRecorderSetter interface {
SetEventRecorder(r module.EventRecorder)
}
if svc, ok := engine.GetApp().SvcRegistry()[pluginpipeline.PipelineHandlerServiceName]; ok {
if ph, ok := svc.(eventRecorderSetter); ok {
recorder := evstore.NewEventRecorderAdapter(app.stores.eventStore)
ph.SetEventRecorder(recorder)
logger.Info("Wired EventRecorder to PipelineWorkflowHandler")
}
}
}

// Register V1 handler
Expand Down Expand Up @@ -989,15 +978,14 @@ func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error {
}

// Build and start a new engine
newEngine, _, _, newPipelineHandler, buildErr := buildEngine(newCfg, logger)
newEngine, _, _, buildErr := buildEngine(newCfg, logger)
if buildErr != nil {
return fmt.Errorf("failed to rebuild engine: %w", buildErr)
}

// Update the serverApp references BEFORE registering services,
// Update the serverApp reference BEFORE registering services,
// since registerManagementServices reads app.engine.
app.engine = newEngine
app.services.pipelineHandler = newPipelineHandler

// Re-register pre-start management services with the new Application
registerManagementServices(logger, app)
Expand Down
22 changes: 12 additions & 10 deletions cmd/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestBuildEngine_WithConfig(t *testing.T) {
Triggers: map[string]any{},
}

engine, loader, registry, _, err := buildEngine(cfg, logger)
engine, loader, registry, err := buildEngine(cfg, logger)
if err != nil {
t.Fatalf("buildEngine failed: %v", err)
}
Expand All @@ -210,7 +210,7 @@ func TestBuildEngine_EmptyConfig(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
cfg := config.NewEmptyWorkflowConfig()

engine, loader, registry, _, err := buildEngine(cfg, logger)
engine, loader, registry, err := buildEngine(cfg, logger)
if err != nil {
t.Fatalf("buildEngine with empty config failed: %v", err)
}
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestBuildEngine_InvalidModuleType(t *testing.T) {
Triggers: map[string]any{},
}

_, _, _, _, err := buildEngine(cfg, logger)
_, _, _, err := buildEngine(cfg, logger)
if err == nil {
t.Fatal("expected error for invalid module type")
}
Expand Down Expand Up @@ -792,20 +792,22 @@ func TestImportBundles_MultipleBundles(t *testing.T) {
// mockFeatureFlagAdmin implements module.FeatureFlagAdmin for testing.
type mockFeatureFlagAdmin struct{}

func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) UpdateFlag(key string, data json.RawMessage) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
func (m *mockFeatureFlagAdmin) SetOverrides(key string, data json.RawMessage) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) EvaluateFlag(key string, user string, group string) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) }
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {})
}

// TestFeatureFlagAutoWiring verifies that registerPostStartServices wires a
// FeatureFlagAdmin from the service registry into the V1 API handler.
Expand All @@ -821,7 +823,7 @@ func TestFeatureFlagAutoWiring_WiredWhenServicePresent(t *testing.T) {
t.Cleanup(func() { store.Close() })

logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
engine, _, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger)
engine, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger)
if err != nil {
t.Fatalf("buildEngine: %v", err)
}
Expand Down Expand Up @@ -869,7 +871,7 @@ func TestFeatureFlagAutoWiring_NotWiredWhenServiceAbsent(t *testing.T) {
t.Cleanup(func() { store.Close() })

logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
engine, _, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger)
engine, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger)
if err != nil {
t.Fatalf("buildEngine: %v", err)
}
Expand Down
19 changes: 19 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflow
import (
"context"
"fmt"
"log/slog"
"strings"
"time"

Expand Down Expand Up @@ -240,6 +241,24 @@ func (e *StdEngine) LoadPlugin(p plugin.EnginePlugin) error {
e.RegisterWorkflowHandler(handler)
}
}
// Inject step registry and logger into the plugin via optional setter
// interfaces, following the same pattern as SetDynamicRegistry.
type stepRegistrySetter interface {
SetStepRegistry(interfaces.StepRegistryProvider)
}
if setter, ok := p.(stepRegistrySetter); ok {
setter.SetStepRegistry(e.stepRegistry)
}
// Inject *slog.Logger if the engine's logger is backed by one.
// Plugins declare SetLogger(*slog.Logger) to receive a structured logger.
type slogLoggerSetter interface {
SetLogger(logger *slog.Logger)
}
if setter, ok := p.(slogLoggerSetter); ok {
if sl, ok := e.logger.(*slog.Logger); ok {
setter.SetLogger(sl)
}
}
e.enginePlugins = append(e.enginePlugins, p)
return nil
}
Expand Down
14 changes: 10 additions & 4 deletions engine_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ func setupPipelineEngine(t *testing.T) (*StdEngine, *handlers.PipelineWorkflowHa
// Load pipelinesteps plugin — it registers step factories and the
// PipelineWorkflowHandler. We use it directly (no loadAllPlugins) to avoid
// registering a second handler that would shadow the plugin's handler.
pp := pluginpipeline.New()
if err := engine.LoadPlugin(pp); err != nil {
if err := engine.LoadPlugin(pluginpipeline.New()); err != nil {
t.Fatalf("LoadPlugin(pipelinesteps) failed: %v", err)
}

// Retrieve the pipeline handler created by the plugin so tests can inspect it.
pipelineHandler := pp.PipelineHandler()
// Retrieve the pipeline handler registered by the plugin via type assertion
// on the engine's workflow handler list.
var pipelineHandler *handlers.PipelineWorkflowHandler
for _, h := range engine.workflowHandlers {
if ph, ok := h.(*handlers.PipelineWorkflowHandler); ok {
pipelineHandler = ph
break
}
}
if pipelineHandler == nil {
t.Fatal("pipelinesteps plugin did not create a PipelineWorkflowHandler")
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/http/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,9 @@ func TestRateLimitMiddlewareFactory_InvalidValues(t *testing.T) {

// Zero requestsPerHour must fall through to requestsPerMinute path (not crash).
modZeroRPH := factory("rl-zero-rph", map[string]any{
"requestsPerHour": 0,
"requestsPerMinute": 30,
"burstSize": 5,
"requestsPerHour": 0,
"requestsPerMinute": 30,
"burstSize": 5,
})
if modZeroRPH == nil {
t.Fatal("factory returned nil for zero requestsPerHour config")
Expand Down
57 changes: 51 additions & 6 deletions plugins/pipelinesteps/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,30 @@
package pipelinesteps

import (
"log/slog"

"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/capability"
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/handlers"
"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/plugin"
)

// PipelineHandlerServiceName is the service name under which the
// PipelineWorkflowHandler is registered in the app's service registry.
// External components can look it up to call SetEventRecorder after startup.
const PipelineHandlerServiceName = "pipeline-workflow-handler"

// Plugin registers generic pipeline step factories and the pipeline workflow handler.
type Plugin struct {
plugin.BaseEnginePlugin
// pipelineHandler is retained so the wiring hook can inject dependencies.
pipelineHandler *handlers.PipelineWorkflowHandler
// stepRegistry and logger are injected by the engine via optional setter interfaces.
stepRegistry interfaces.StepRegistryProvider
logger *slog.Logger
}

// New creates a new pipeline-steps plugin.
Expand Down Expand Up @@ -97,17 +109,50 @@ func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory {
}
}

// PipelineHandler returns the plugin's pipeline handler instance, if created.
// This is used by the engine's wiring hook to inject StepRegistry and Logger.
func (p *Plugin) PipelineHandler() *handlers.PipelineWorkflowHandler {
return p.pipelineHandler
// SetStepRegistry is called by the engine (via optional-interface detection in LoadPlugin)
// to inject the step registry after all step factories have been registered.
func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider) {
p.stepRegistry = registry
}

// SetLogger is called by the engine (via optional-interface detection in LoadPlugin)
// to inject the application logger.
func (p *Plugin) SetLogger(logger *slog.Logger) {
p.logger = logger
}

// WiringHooks returns a hook that wires the injected step registry and logger into
// the PipelineWorkflowHandler and registers the handler as a named service so that
// other components (e.g. the server) can look it up without reaching into the plugin.
func (p *Plugin) WiringHooks() []plugin.WiringHook {
return []plugin.WiringHook{
{
Name: "pipeline-handler-wiring",
Priority: 50,
Hook: func(app modular.Application, _ *config.WorkflowConfig) error {
if p.pipelineHandler == nil {
return nil
}
if p.stepRegistry != nil {
p.pipelineHandler.SetStepRegistry(p.stepRegistry)
}
if p.logger != nil {
p.pipelineHandler.SetLogger(p.logger)
}
// Register the handler as a service so callers can discover it
// (e.g. to wire SetEventRecorder post-start) without a plugin-specific getter.
_ = app.RegisterService(PipelineHandlerServiceName, p.pipelineHandler)
return nil
},
},
}
}

// wrapStepFactory converts a module.StepFactory to a plugin.StepFactory,
// threading the modular.Application through so steps like db_exec and
// db_query can access the service registry.
func wrapStepFactory(f module.StepFactory) plugin.StepFactory {
return func(name string, config map[string]any, app modular.Application) (any, error) {
return f(name, config, app)
return func(name string, cfg map[string]any, app modular.Application) (any, error) {
return f(name, cfg, app)
}
}
Loading