diff --git a/cmd/server/main.go b/cmd/server/main.go index 6bfe1509..aaf59157 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,7 +28,6 @@ import ( "github.com/GoCodeAlone/workflow/deploy" "github.com/GoCodeAlone/workflow/dynamic" "github.com/GoCodeAlone/workflow/environment" - "github.com/GoCodeAlone/workflow/handlers" "github.com/GoCodeAlone/workflow/interfaces" "github.com/GoCodeAlone/workflow/module" "github.com/GoCodeAlone/workflow/observability" @@ -92,13 +91,12 @@ var ( ) // buildEngine creates the workflow engine with all handlers registered and built from config. -func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, *handlers.PipelineWorkflowHandler, error) { +func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) { app := modular.NewStdApplication(nil, logger) engine := workflow.NewStdEngine(app, logger) // Load all engine plugins — each registers its module factories, step factories, // trigger factories, and workflow handlers via engine.LoadPlugin. - pipelinePlugin := pluginpipeline.New() plugins := []plugin.EnginePlugin{ pluginlicense.New(), pluginhttp.New(), @@ -108,7 +106,7 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std pluginauth.New(), pluginstorage.New(), pluginapi.New(), - pipelinePlugin, + pluginpipeline.New(), plugincicd.New(), pluginff.New(), pluginsecrets.New(), @@ -149,15 +147,6 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std } } - // Wire the PipelineWorkflowHandler (provided by the pipeline plugin) with - // the engine's StepRegistry and logger. The handler was already registered - // by LoadPlugin; we just need to inject its dependencies. - pipelineHandler := pipelinePlugin.PipelineHandler() - if pipelineHandler != nil { - pipelineHandler.SetStepRegistry(engine.GetStepRegistry()) - pipelineHandler.SetLogger(logger) - } - // Set up dynamic component system pool := dynamic.NewInterpreterPool() registry := dynamic.NewComponentRegistry() @@ -179,10 +168,10 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std // Build engine from config if err := engine.BuildFromConfig(cfg); err != nil { - return nil, nil, nil, nil, fmt.Errorf("failed to build workflow: %w", err) + return nil, nil, nil, fmt.Errorf("failed to build workflow: %w", err) } - return engine, loader, registry, pipelineHandler, nil + return engine, loader, registry, nil } // loadConfig loads a workflow configuration from the configured file path, @@ -226,12 +215,6 @@ type closableEventStore interface { Close() error } -// pipelineEventSetter is the subset of *handlers.PipelineWorkflowHandler -// called after the engine starts. -type pipelineEventSetter interface { - SetEventRecorder(r module.EventRecorder) -} - // executionTrackerIface is the minimal interface over *module.ExecutionTracker. type executionTrackerIface interface { module.ExecutionTrackerProvider @@ -285,7 +268,6 @@ type mgmtComponents struct { // instance after an engine reload. type serviceComponents struct { v1Handler http.Handler // V1 API handler (dashboard) - pipelineHandler pipelineEventSetter // pipeline execution handler executionTracker executionTrackerIface // CQRS execution tracking runtimeManager runtimeLifecycle // filesystem-loaded workflow instances reporter observabilityReporter // background observability reporter @@ -330,12 +312,11 @@ func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) return nil, fmt.Errorf("failed to set up admin: %w", err) } - engine, loader, registry, pipelineHandler, err := buildEngine(cfg, logger) + engine, loader, registry, err := buildEngine(cfg, logger) if err != nil { return nil, fmt.Errorf("failed to build engine: %w", err) } app.engine = engine - app.services.pipelineHandler = pipelineHandler // Initialize AI services and dynamic component pool pool := dynamic.NewInterpreterPool() @@ -836,7 +817,7 @@ func (app *serverApp) initStores(logger *slog.Logger) error { // Always create a RuntimeManager (returns empty list when no workflows loaded) runtimeBuilder := func(cfg *config.WorkflowConfig, lg *slog.Logger) (func(context.Context) error, error) { - eng, _, _, _, buildErr := buildEngine(cfg, lg) + eng, _, _, buildErr := buildEngine(cfg, lg) if buildErr != nil { return nil, buildErr } @@ -886,11 +867,19 @@ func (app *serverApp) registerPostStartServices(logger *slog.Logger) error { engine := app.engine // Wire EventRecorder adapter to the pipeline handler so pipeline - // executions emit events to the event store. - if app.stores.eventStore != nil && app.services.pipelineHandler != nil { - recorder := evstore.NewEventRecorderAdapter(app.stores.eventStore) - app.services.pipelineHandler.SetEventRecorder(recorder) - logger.Info("Wired EventRecorder to PipelineWorkflowHandler") + // executions emit events to the event store. The handler is discovered + // via the service registry (registered by the pipelinesteps plugin wiring hook). + if app.stores.eventStore != nil { + type eventRecorderSetter interface { + SetEventRecorder(r module.EventRecorder) + } + if svc, ok := engine.GetApp().SvcRegistry()[pluginpipeline.PipelineHandlerServiceName]; ok { + if ph, ok := svc.(eventRecorderSetter); ok { + recorder := evstore.NewEventRecorderAdapter(app.stores.eventStore) + ph.SetEventRecorder(recorder) + logger.Info("Wired EventRecorder to PipelineWorkflowHandler") + } + } } // Register V1 handler @@ -989,15 +978,14 @@ func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error { } // Build and start a new engine - newEngine, _, _, newPipelineHandler, buildErr := buildEngine(newCfg, logger) + newEngine, _, _, buildErr := buildEngine(newCfg, logger) if buildErr != nil { return fmt.Errorf("failed to rebuild engine: %w", buildErr) } - // Update the serverApp references BEFORE registering services, + // Update the serverApp reference BEFORE registering services, // since registerManagementServices reads app.engine. app.engine = newEngine - app.services.pipelineHandler = newPipelineHandler // Re-register pre-start management services with the new Application registerManagementServices(logger, app) diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index 4024a4a7..4f6a86ac 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -191,7 +191,7 @@ func TestBuildEngine_WithConfig(t *testing.T) { Triggers: map[string]any{}, } - engine, loader, registry, _, err := buildEngine(cfg, logger) + engine, loader, registry, err := buildEngine(cfg, logger) if err != nil { t.Fatalf("buildEngine failed: %v", err) } @@ -210,7 +210,7 @@ func TestBuildEngine_EmptyConfig(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) cfg := config.NewEmptyWorkflowConfig() - engine, loader, registry, _, err := buildEngine(cfg, logger) + engine, loader, registry, err := buildEngine(cfg, logger) if err != nil { t.Fatalf("buildEngine with empty config failed: %v", err) } @@ -545,7 +545,7 @@ func TestBuildEngine_InvalidModuleType(t *testing.T) { Triggers: map[string]any{}, } - _, _, _, _, err := buildEngine(cfg, logger) + _, _, _, err := buildEngine(cfg, logger) if err == nil { t.Fatal("expected error for invalid module type") } @@ -792,20 +792,22 @@ func TestImportBundles_MultipleBundles(t *testing.T) { // mockFeatureFlagAdmin implements module.FeatureFlagAdmin for testing. type mockFeatureFlagAdmin struct{} -func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil } func (m *mockFeatureFlagAdmin) UpdateFlag(key string, data json.RawMessage) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil } +func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil } func (m *mockFeatureFlagAdmin) SetOverrides(key string, data json.RawMessage) (any, error) { return nil, nil } func (m *mockFeatureFlagAdmin) EvaluateFlag(key string, user string, group string) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) } +func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) +} // TestFeatureFlagAutoWiring verifies that registerPostStartServices wires a // FeatureFlagAdmin from the service registry into the V1 API handler. @@ -821,7 +823,7 @@ func TestFeatureFlagAutoWiring_WiredWhenServicePresent(t *testing.T) { t.Cleanup(func() { store.Close() }) logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - engine, _, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger) + engine, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger) if err != nil { t.Fatalf("buildEngine: %v", err) } @@ -869,7 +871,7 @@ func TestFeatureFlagAutoWiring_NotWiredWhenServiceAbsent(t *testing.T) { t.Cleanup(func() { store.Close() }) logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - engine, _, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger) + engine, _, _, err := buildEngine(config.NewEmptyWorkflowConfig(), logger) if err != nil { t.Fatalf("buildEngine: %v", err) } diff --git a/engine.go b/engine.go index b5e77c8e..68af7b40 100644 --- a/engine.go +++ b/engine.go @@ -3,6 +3,7 @@ package workflow import ( "context" "fmt" + "log/slog" "strings" "time" @@ -240,6 +241,24 @@ func (e *StdEngine) LoadPlugin(p plugin.EnginePlugin) error { e.RegisterWorkflowHandler(handler) } } + // Inject step registry and logger into the plugin via optional setter + // interfaces, following the same pattern as SetDynamicRegistry. + type stepRegistrySetter interface { + SetStepRegistry(interfaces.StepRegistryProvider) + } + if setter, ok := p.(stepRegistrySetter); ok { + setter.SetStepRegistry(e.stepRegistry) + } + // Inject *slog.Logger if the engine's logger is backed by one. + // Plugins declare SetLogger(*slog.Logger) to receive a structured logger. + type slogLoggerSetter interface { + SetLogger(logger *slog.Logger) + } + if setter, ok := p.(slogLoggerSetter); ok { + if sl, ok := e.logger.(*slog.Logger); ok { + setter.SetLogger(sl) + } + } e.enginePlugins = append(e.enginePlugins, p) return nil } diff --git a/engine_pipeline_test.go b/engine_pipeline_test.go index 3d05711e..16e457e9 100644 --- a/engine_pipeline_test.go +++ b/engine_pipeline_test.go @@ -21,13 +21,19 @@ func setupPipelineEngine(t *testing.T) (*StdEngine, *handlers.PipelineWorkflowHa // Load pipelinesteps plugin — it registers step factories and the // PipelineWorkflowHandler. We use it directly (no loadAllPlugins) to avoid // registering a second handler that would shadow the plugin's handler. - pp := pluginpipeline.New() - if err := engine.LoadPlugin(pp); err != nil { + if err := engine.LoadPlugin(pluginpipeline.New()); err != nil { t.Fatalf("LoadPlugin(pipelinesteps) failed: %v", err) } - // Retrieve the pipeline handler created by the plugin so tests can inspect it. - pipelineHandler := pp.PipelineHandler() + // Retrieve the pipeline handler registered by the plugin via type assertion + // on the engine's workflow handler list. + var pipelineHandler *handlers.PipelineWorkflowHandler + for _, h := range engine.workflowHandlers { + if ph, ok := h.(*handlers.PipelineWorkflowHandler); ok { + pipelineHandler = ph + break + } + } if pipelineHandler == nil { t.Fatal("pipelinesteps plugin did not create a PipelineWorkflowHandler") } diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go index 3b5d220b..f5fed213 100644 --- a/plugins/http/plugin_test.go +++ b/plugins/http/plugin_test.go @@ -365,9 +365,9 @@ func TestRateLimitMiddlewareFactory_InvalidValues(t *testing.T) { // Zero requestsPerHour must fall through to requestsPerMinute path (not crash). modZeroRPH := factory("rl-zero-rph", map[string]any{ - "requestsPerHour": 0, - "requestsPerMinute": 30, - "burstSize": 5, + "requestsPerHour": 0, + "requestsPerMinute": 30, + "burstSize": 5, }) if modZeroRPH == nil { t.Fatal("factory returned nil for zero requestsPerHour config") diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index ad8fa803..49b4591e 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -5,18 +5,30 @@ package pipelinesteps import ( + "log/slog" + "github.com/CrisisTextLine/modular" "github.com/GoCodeAlone/workflow/capability" + "github.com/GoCodeAlone/workflow/config" "github.com/GoCodeAlone/workflow/handlers" + "github.com/GoCodeAlone/workflow/interfaces" "github.com/GoCodeAlone/workflow/module" "github.com/GoCodeAlone/workflow/plugin" ) +// PipelineHandlerServiceName is the service name under which the +// PipelineWorkflowHandler is registered in the app's service registry. +// External components can look it up to call SetEventRecorder after startup. +const PipelineHandlerServiceName = "pipeline-workflow-handler" + // Plugin registers generic pipeline step factories and the pipeline workflow handler. type Plugin struct { plugin.BaseEnginePlugin // pipelineHandler is retained so the wiring hook can inject dependencies. pipelineHandler *handlers.PipelineWorkflowHandler + // stepRegistry and logger are injected by the engine via optional setter interfaces. + stepRegistry interfaces.StepRegistryProvider + logger *slog.Logger } // New creates a new pipeline-steps plugin. @@ -97,17 +109,50 @@ func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory { } } -// PipelineHandler returns the plugin's pipeline handler instance, if created. -// This is used by the engine's wiring hook to inject StepRegistry and Logger. -func (p *Plugin) PipelineHandler() *handlers.PipelineWorkflowHandler { - return p.pipelineHandler +// SetStepRegistry is called by the engine (via optional-interface detection in LoadPlugin) +// to inject the step registry after all step factories have been registered. +func (p *Plugin) SetStepRegistry(registry interfaces.StepRegistryProvider) { + p.stepRegistry = registry +} + +// SetLogger is called by the engine (via optional-interface detection in LoadPlugin) +// to inject the application logger. +func (p *Plugin) SetLogger(logger *slog.Logger) { + p.logger = logger +} + +// WiringHooks returns a hook that wires the injected step registry and logger into +// the PipelineWorkflowHandler and registers the handler as a named service so that +// other components (e.g. the server) can look it up without reaching into the plugin. +func (p *Plugin) WiringHooks() []plugin.WiringHook { + return []plugin.WiringHook{ + { + Name: "pipeline-handler-wiring", + Priority: 50, + Hook: func(app modular.Application, _ *config.WorkflowConfig) error { + if p.pipelineHandler == nil { + return nil + } + if p.stepRegistry != nil { + p.pipelineHandler.SetStepRegistry(p.stepRegistry) + } + if p.logger != nil { + p.pipelineHandler.SetLogger(p.logger) + } + // Register the handler as a service so callers can discover it + // (e.g. to wire SetEventRecorder post-start) without a plugin-specific getter. + _ = app.RegisterService(PipelineHandlerServiceName, p.pipelineHandler) + return nil + }, + }, + } } // wrapStepFactory converts a module.StepFactory to a plugin.StepFactory, // threading the modular.Application through so steps like db_exec and // db_query can access the service registry. func wrapStepFactory(f module.StepFactory) plugin.StepFactory { - return func(name string, config map[string]any, app modular.Application) (any, error) { - return f(name, config, app) + return func(name string, cfg map[string]any, app modular.Application) (any, error) { + return f(name, cfg, app) } }