diff --git a/engine.go b/engine.go index 0de86a70..b5e77c8e 100644 --- a/engine.go +++ b/engine.go @@ -10,6 +10,7 @@ import ( "github.com/GoCodeAlone/workflow/capability" "github.com/GoCodeAlone/workflow/config" "github.com/GoCodeAlone/workflow/dynamic" + "github.com/GoCodeAlone/workflow/interfaces" "github.com/GoCodeAlone/workflow/module" "github.com/GoCodeAlone/workflow/plugin" "github.com/GoCodeAlone/workflow/schema" @@ -32,7 +33,7 @@ type WorkflowHandler interface { // PipelineAdder is implemented by workflow handlers that can receive named pipelines. // This allows the engine to add pipelines without importing the handlers package. type PipelineAdder interface { - AddPipeline(name string, p *module.Pipeline) + AddPipeline(name string, p interfaces.PipelineRunner) } // ModuleFactory is a function that creates a module from a name and configuration diff --git a/handlers/pipeline.go b/handlers/pipeline.go index 1309cff4..8cefd252 100644 --- a/handlers/pipeline.go +++ b/handlers/pipeline.go @@ -7,42 +7,59 @@ import ( "strings" "github.com/CrisisTextLine/modular" - "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/interfaces" ) // PipelineWorkflowHandler manages and executes pipeline-based workflows. type PipelineWorkflowHandler struct { - pipelines map[string]*module.Pipeline - stepRegistry *module.StepRegistry + pipelines map[string]interfaces.PipelineRunner + stepRegistry interfaces.StepRegistryProvider logger *slog.Logger - eventRecorder module.EventRecorder + eventRecorder interfaces.EventRecorder } // NewPipelineWorkflowHandler creates a new PipelineWorkflowHandler. func NewPipelineWorkflowHandler() *PipelineWorkflowHandler { return &PipelineWorkflowHandler{ - pipelines: make(map[string]*module.Pipeline), + pipelines: make(map[string]interfaces.PipelineRunner), } } // SetStepRegistry sets the step registry used to create pipeline steps. -func (h *PipelineWorkflowHandler) SetStepRegistry(registry *module.StepRegistry) { +func (h *PipelineWorkflowHandler) SetStepRegistry(registry interfaces.StepRegistryProvider) { h.stepRegistry = registry } -// SetLogger sets the logger for pipeline execution. +// SetLogger sets the logger for pipeline execution and propagates it to +// all already-registered pipelines. Pipelines added after this call will +// also have the logger injected in AddPipeline. func (h *PipelineWorkflowHandler) SetLogger(logger *slog.Logger) { h.logger = logger + for _, p := range h.pipelines { + p.SetLogger(logger) + } } -// SetEventRecorder sets the event recorder for pipeline execution events. -// When set, each pipeline execution will record events to this recorder. -func (h *PipelineWorkflowHandler) SetEventRecorder(recorder module.EventRecorder) { +// SetEventRecorder sets the event recorder for pipeline execution events and +// propagates it to all already-registered pipelines. Pipelines added after +// this call will also have the recorder injected in AddPipeline. +func (h *PipelineWorkflowHandler) SetEventRecorder(recorder interfaces.EventRecorder) { h.eventRecorder = recorder + for _, p := range h.pipelines { + p.SetEventRecorder(recorder) + } } // AddPipeline registers a named pipeline with the handler. -func (h *PipelineWorkflowHandler) AddPipeline(name string, p *module.Pipeline) { +// If a logger or event recorder has already been set on the handler, +// they are injected into the pipeline immediately at configuration time. +func (h *PipelineWorkflowHandler) AddPipeline(name string, p interfaces.PipelineRunner) { + if h.logger != nil { + p.SetLogger(h.logger) + } + if h.eventRecorder != nil { + p.SetEventRecorder(h.eventRecorder) + } h.pipelines[name] = p } @@ -87,20 +104,10 @@ func (h *PipelineWorkflowHandler) ExecuteWorkflow(ctx context.Context, workflowT return nil, fmt.Errorf("pipeline %q not found", name) } - // Set logger on pipeline if available - if h.logger != nil && pipeline.Logger == nil { - pipeline.Logger = h.logger - } - - // Set event recorder on pipeline if available - if h.eventRecorder != nil && pipeline.EventRecorder == nil { - pipeline.EventRecorder = h.eventRecorder - } - - pc, err := pipeline.Execute(ctx, data) + result, err := pipeline.Run(ctx, data) if err != nil { return nil, fmt.Errorf("pipeline %q execution failed: %w", name, err) } - return pc.Current, nil + return result, nil } diff --git a/handlers/pipeline_test.go b/handlers/pipeline_test.go index d12b1bd5..a7f7bf9c 100644 --- a/handlers/pipeline_test.go +++ b/handlers/pipeline_test.go @@ -2,12 +2,36 @@ package handlers import ( "context" + "log/slog" "strings" "testing" + "github.com/GoCodeAlone/workflow/interfaces" "github.com/GoCodeAlone/workflow/module" ) +// mockPipelineRunner is a minimal PipelineRunner mock used to verify that +// PipelineWorkflowHandler tests do not require concrete module types. +type mockPipelineRunner struct { + runResult map[string]any + runErr error + loggerSet bool + recSet bool +} + +func (m *mockPipelineRunner) Run(_ context.Context, _ map[string]any) (map[string]any, error) { + return m.runResult, m.runErr +} +func (m *mockPipelineRunner) SetLogger(_ *slog.Logger) { m.loggerSet = true } +func (m *mockPipelineRunner) SetEventRecorder(_ interfaces.EventRecorder) { m.recSet = true } + +// mockEventRecorder is a no-op EventRecorder for tests. +type mockEventRecorder struct{} + +func (mockEventRecorder) RecordEvent(_ context.Context, _, _ string, _ map[string]any) error { + return nil +} + func TestPipelineHandler_CanHandle_PrefixFormat(t *testing.T) { h := NewPipelineWorkflowHandler() @@ -233,3 +257,65 @@ func TestPipelineHandler_MultiplePipelines(t *testing.T) { t.Error("expected CanHandle true for pipeline-b") } } + +// TestPipelineHandler_MockRunner verifies that PipelineWorkflowHandler works +// with a mock PipelineRunner without depending on concrete module types. +func TestPipelineHandler_MockRunner(t *testing.T) { + h := NewPipelineWorkflowHandler() + h.SetLogger(slog.Default()) + + mock := &mockPipelineRunner{ + runResult: map[string]any{"mocked": true}, + } + h.AddPipeline("mock-pipeline", mock) + + if !h.CanHandle("pipeline:mock-pipeline") { + t.Fatal("expected CanHandle true for mock-pipeline") + } + + result, err := h.ExecuteWorkflow(context.Background(), "pipeline:mock-pipeline", "", map[string]any{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result["mocked"] != true { + t.Errorf("expected mocked=true, got %v", result["mocked"]) + } + if !mock.loggerSet { + t.Error("expected SetLogger to be called on mock runner at configuration time") + } +} + +// TestPipelineHandler_InjectionAtConfigTime verifies that logger and recorder +// are injected into pipelines at configuration time, not on each execution. +func TestPipelineHandler_InjectionAtConfigTime(t *testing.T) { + h := NewPipelineWorkflowHandler() + + // Add pipeline before logger/recorder are set. + m1 := &mockPipelineRunner{runResult: map[string]any{}} + h.AddPipeline("p1", m1) + if m1.loggerSet || m1.recSet { + t.Error("expected no injection before logger/recorder are set") + } + + // SetLogger should propagate to the already-registered pipeline. + h.SetLogger(slog.Default()) + if !m1.loggerSet { + t.Error("expected SetLogger to propagate to existing pipeline p1") + } + + // SetEventRecorder should propagate to the already-registered pipeline. + h.SetEventRecorder(mockEventRecorder{}) + if !m1.recSet { + t.Error("expected SetEventRecorder to propagate to existing pipeline p1") + } + + // Pipeline added after both are set should receive them in AddPipeline. + m2 := &mockPipelineRunner{runResult: map[string]any{}} + h.AddPipeline("p2", m2) + if !m2.loggerSet { + t.Error("expected logger injected into p2 via AddPipeline") + } + if !m2.recSet { + t.Error("expected recorder injected into p2 via AddPipeline") + } +} diff --git a/interfaces/pipeline.go b/interfaces/pipeline.go new file mode 100644 index 00000000..6a236019 --- /dev/null +++ b/interfaces/pipeline.go @@ -0,0 +1,42 @@ +// Package interfaces defines shared interface types used across the workflow +// engine, handlers, and module packages. Placing these interfaces here breaks +// the direct handler→module concrete-type dependency and enables each package +// to be tested in isolation with mocks. +package interfaces + +import ( + "context" + "log/slog" +) + +// EventRecorder records pipeline execution events for observability. +// *store.EventRecorderAdapter and any compatible recorder satisfy this interface. +type EventRecorder interface { + RecordEvent(ctx context.Context, executionID string, eventType string, data map[string]any) error +} + +// PipelineRunner is the interface satisfied by *module.Pipeline. +// It allows workflow handlers to execute pipelines without importing +// the concrete module types, enabling handler unit tests with mocks. +type PipelineRunner interface { + // Run executes the pipeline with the given trigger data and returns the + // merged result map (equivalent to PipelineContext.Current). + Run(ctx context.Context, data map[string]any) (map[string]any, error) + + // SetLogger sets the logger used for pipeline execution. + // Implementations should be idempotent: if a logger is already set, + // a subsequent call should be a no-op. + SetLogger(logger *slog.Logger) + + // SetEventRecorder sets the recorder used for pipeline execution events. + // Implementations should be idempotent: if a recorder is already set, + // a subsequent call should be a no-op. + SetEventRecorder(recorder EventRecorder) +} + +// StepRegistryProvider exposes the step types registered in a step registry. +// *module.StepRegistry satisfies this interface. +type StepRegistryProvider interface { + // Types returns all registered step type names. + Types() []string +} diff --git a/module/pipeline_executor.go b/module/pipeline_executor.go index d1256c65..dd8f4c01 100644 --- a/module/pipeline_executor.go +++ b/module/pipeline_executor.go @@ -5,6 +5,8 @@ import ( "fmt" "log/slog" "time" + + "github.com/GoCodeAlone/workflow/interfaces" ) // ErrorStrategy defines how a pipeline handles step errors. @@ -19,9 +21,9 @@ const ( // EventRecorder is an optional interface for recording execution events. // When set on Pipeline, execution events are appended for observability. // The store.EventStore can satisfy this via an adapter at the wiring layer. -type EventRecorder interface { - RecordEvent(ctx context.Context, executionID string, eventType string, data map[string]any) error -} +// This is a type alias for interfaces.EventRecorder so callers using +// module.EventRecorder or interfaces.EventRecorder interchangeably are unaffected. +type EventRecorder = interfaces.EventRecorder // Pipeline is an ordered sequence of steps with error handling. type Pipeline struct { @@ -295,3 +297,31 @@ func (p *Pipeline) runCompensation(ctx context.Context, pc *PipelineContext, log return firstErr } + +// SetLogger sets the logger for pipeline execution if one is not already set. +// This implements part of interfaces.PipelineRunner and allows the handler +// to inject a logger without directly accessing the Logger field. +func (p *Pipeline) SetLogger(logger *slog.Logger) { + if p.Logger == nil { + p.Logger = logger + } +} + +// SetEventRecorder sets the event recorder for pipeline execution if one is +// not already set. This implements part of interfaces.PipelineRunner. +func (p *Pipeline) SetEventRecorder(recorder interfaces.EventRecorder) { + if p.EventRecorder == nil { + p.EventRecorder = recorder + } +} + +// Run executes the pipeline and returns the merged result data map. +// It implements interfaces.PipelineRunner by wrapping Execute and +// returning PipelineContext.Current so callers need not import PipelineContext. +func (p *Pipeline) Run(ctx context.Context, data map[string]any) (map[string]any, error) { + pc, err := p.Execute(ctx, data) + if err != nil { + return nil, err + } + return pc.Current, nil +}