Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/GoCodeAlone/workflow/capability"
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/dynamic"
"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/plugin"
"github.com/GoCodeAlone/workflow/schema"
Expand All @@ -32,7 +33,7 @@ type WorkflowHandler interface {
// PipelineAdder is implemented by workflow handlers that can receive named pipelines.
// This allows the engine to add pipelines without importing the handlers package.
type PipelineAdder interface {
AddPipeline(name string, p *module.Pipeline)
AddPipeline(name string, p interfaces.PipelineRunner)
}

// ModuleFactory is a function that creates a module from a name and configuration
Expand Down
53 changes: 30 additions & 23 deletions handlers/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,59 @@ import (
"strings"

"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/module"
"github.com/GoCodeAlone/workflow/interfaces"
)

// PipelineWorkflowHandler manages and executes pipeline-based workflows.
type PipelineWorkflowHandler struct {
pipelines map[string]*module.Pipeline
stepRegistry *module.StepRegistry
pipelines map[string]interfaces.PipelineRunner
stepRegistry interfaces.StepRegistryProvider
logger *slog.Logger
eventRecorder module.EventRecorder
eventRecorder interfaces.EventRecorder
}

// NewPipelineWorkflowHandler creates a new PipelineWorkflowHandler.
func NewPipelineWorkflowHandler() *PipelineWorkflowHandler {
return &PipelineWorkflowHandler{
pipelines: make(map[string]*module.Pipeline),
pipelines: make(map[string]interfaces.PipelineRunner),
}
}

// SetStepRegistry sets the step registry used to create pipeline steps.
func (h *PipelineWorkflowHandler) SetStepRegistry(registry *module.StepRegistry) {
func (h *PipelineWorkflowHandler) SetStepRegistry(registry interfaces.StepRegistryProvider) {
h.stepRegistry = registry
}

// SetLogger sets the logger for pipeline execution.
// SetLogger sets the logger for pipeline execution and propagates it to
// all already-registered pipelines. Pipelines added after this call will
// also have the logger injected in AddPipeline.
func (h *PipelineWorkflowHandler) SetLogger(logger *slog.Logger) {
h.logger = logger
for _, p := range h.pipelines {
p.SetLogger(logger)
}
}

// SetEventRecorder sets the event recorder for pipeline execution events.
// When set, each pipeline execution will record events to this recorder.
func (h *PipelineWorkflowHandler) SetEventRecorder(recorder module.EventRecorder) {
// SetEventRecorder sets the event recorder for pipeline execution events and
// propagates it to all already-registered pipelines. Pipelines added after
// this call will also have the recorder injected in AddPipeline.
func (h *PipelineWorkflowHandler) SetEventRecorder(recorder interfaces.EventRecorder) {
h.eventRecorder = recorder
for _, p := range h.pipelines {
p.SetEventRecorder(recorder)
}
}

// AddPipeline registers a named pipeline with the handler.
func (h *PipelineWorkflowHandler) AddPipeline(name string, p *module.Pipeline) {
// If a logger or event recorder has already been set on the handler,
// they are injected into the pipeline immediately at configuration time.
func (h *PipelineWorkflowHandler) AddPipeline(name string, p interfaces.PipelineRunner) {
if h.logger != nil {
p.SetLogger(h.logger)
}
if h.eventRecorder != nil {
p.SetEventRecorder(h.eventRecorder)
}
h.pipelines[name] = p
}

Expand Down Expand Up @@ -87,20 +104,10 @@ func (h *PipelineWorkflowHandler) ExecuteWorkflow(ctx context.Context, workflowT
return nil, fmt.Errorf("pipeline %q not found", name)
}

// Set logger on pipeline if available
if h.logger != nil && pipeline.Logger == nil {
pipeline.Logger = h.logger
}

// Set event recorder on pipeline if available
if h.eventRecorder != nil && pipeline.EventRecorder == nil {
pipeline.EventRecorder = h.eventRecorder
}

pc, err := pipeline.Execute(ctx, data)
result, err := pipeline.Run(ctx, data)
if err != nil {
return nil, fmt.Errorf("pipeline %q execution failed: %w", name, err)
}

return pc.Current, nil
return result, nil
}
86 changes: 86 additions & 0 deletions handlers/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,36 @@ package handlers

import (
"context"
"log/slog"
"strings"
"testing"

"github.com/GoCodeAlone/workflow/interfaces"
"github.com/GoCodeAlone/workflow/module"
)

// mockPipelineRunner is a minimal PipelineRunner mock used to verify that
// PipelineWorkflowHandler tests do not require concrete module types.
type mockPipelineRunner struct {
runResult map[string]any
runErr error
loggerSet bool
recSet bool
}

func (m *mockPipelineRunner) Run(_ context.Context, _ map[string]any) (map[string]any, error) {
return m.runResult, m.runErr
}
func (m *mockPipelineRunner) SetLogger(_ *slog.Logger) { m.loggerSet = true }
func (m *mockPipelineRunner) SetEventRecorder(_ interfaces.EventRecorder) { m.recSet = true }

// mockEventRecorder is a no-op EventRecorder for tests.
type mockEventRecorder struct{}

func (mockEventRecorder) RecordEvent(_ context.Context, _, _ string, _ map[string]any) error {
return nil
}

func TestPipelineHandler_CanHandle_PrefixFormat(t *testing.T) {
h := NewPipelineWorkflowHandler()

Expand Down Expand Up @@ -233,3 +257,65 @@ func TestPipelineHandler_MultiplePipelines(t *testing.T) {
t.Error("expected CanHandle true for pipeline-b")
}
}

// TestPipelineHandler_MockRunner verifies that PipelineWorkflowHandler works
// with a mock PipelineRunner without depending on concrete module types.
func TestPipelineHandler_MockRunner(t *testing.T) {
h := NewPipelineWorkflowHandler()
h.SetLogger(slog.Default())

mock := &mockPipelineRunner{
runResult: map[string]any{"mocked": true},
}
h.AddPipeline("mock-pipeline", mock)

if !h.CanHandle("pipeline:mock-pipeline") {
t.Fatal("expected CanHandle true for mock-pipeline")
}

result, err := h.ExecuteWorkflow(context.Background(), "pipeline:mock-pipeline", "", map[string]any{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result["mocked"] != true {
t.Errorf("expected mocked=true, got %v", result["mocked"])
}
if !mock.loggerSet {
t.Error("expected SetLogger to be called on mock runner at configuration time")
}
}

// TestPipelineHandler_InjectionAtConfigTime verifies that logger and recorder
// are injected into pipelines at configuration time, not on each execution.
func TestPipelineHandler_InjectionAtConfigTime(t *testing.T) {
h := NewPipelineWorkflowHandler()

// Add pipeline before logger/recorder are set.
m1 := &mockPipelineRunner{runResult: map[string]any{}}
h.AddPipeline("p1", m1)
if m1.loggerSet || m1.recSet {
t.Error("expected no injection before logger/recorder are set")
}

// SetLogger should propagate to the already-registered pipeline.
h.SetLogger(slog.Default())
if !m1.loggerSet {
t.Error("expected SetLogger to propagate to existing pipeline p1")
}

// SetEventRecorder should propagate to the already-registered pipeline.
h.SetEventRecorder(mockEventRecorder{})
if !m1.recSet {
t.Error("expected SetEventRecorder to propagate to existing pipeline p1")
}

// Pipeline added after both are set should receive them in AddPipeline.
m2 := &mockPipelineRunner{runResult: map[string]any{}}
h.AddPipeline("p2", m2)
if !m2.loggerSet {
t.Error("expected logger injected into p2 via AddPipeline")
}
if !m2.recSet {
t.Error("expected recorder injected into p2 via AddPipeline")
}
}
42 changes: 42 additions & 0 deletions interfaces/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Package interfaces defines shared interface types used across the workflow
// engine, handlers, and module packages. Placing these interfaces here breaks
// the direct handler→module concrete-type dependency and enables each package
// to be tested in isolation with mocks.
package interfaces

import (
"context"
"log/slog"
)

// EventRecorder records pipeline execution events for observability.
// *store.EventRecorderAdapter and any compatible recorder satisfy this interface.
type EventRecorder interface {
RecordEvent(ctx context.Context, executionID string, eventType string, data map[string]any) error
}

// PipelineRunner is the interface satisfied by *module.Pipeline.
// It allows workflow handlers to execute pipelines without importing
// the concrete module types, enabling handler unit tests with mocks.
type PipelineRunner interface {
// Run executes the pipeline with the given trigger data and returns the
// merged result map (equivalent to PipelineContext.Current).
Run(ctx context.Context, data map[string]any) (map[string]any, error)

// SetLogger sets the logger used for pipeline execution.
// Implementations should be idempotent: if a logger is already set,
// a subsequent call should be a no-op.
SetLogger(logger *slog.Logger)

// SetEventRecorder sets the recorder used for pipeline execution events.
// Implementations should be idempotent: if a recorder is already set,
// a subsequent call should be a no-op.
SetEventRecorder(recorder EventRecorder)
}

// StepRegistryProvider exposes the step types registered in a step registry.
// *module.StepRegistry satisfies this interface.
type StepRegistryProvider interface {
// Types returns all registered step type names.
Types() []string
}
36 changes: 33 additions & 3 deletions module/pipeline_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"log/slog"
"time"

"github.com/GoCodeAlone/workflow/interfaces"
)

// ErrorStrategy defines how a pipeline handles step errors.
Expand All @@ -19,9 +21,9 @@ const (
// EventRecorder is an optional interface for recording execution events.
// When set on Pipeline, execution events are appended for observability.
// The store.EventStore can satisfy this via an adapter at the wiring layer.
type EventRecorder interface {
RecordEvent(ctx context.Context, executionID string, eventType string, data map[string]any) error
}
// This is a type alias for interfaces.EventRecorder so callers using
// module.EventRecorder or interfaces.EventRecorder interchangeably are unaffected.
type EventRecorder = interfaces.EventRecorder

// Pipeline is an ordered sequence of steps with error handling.
type Pipeline struct {
Expand Down Expand Up @@ -295,3 +297,31 @@ func (p *Pipeline) runCompensation(ctx context.Context, pc *PipelineContext, log

return firstErr
}

// SetLogger sets the logger for pipeline execution if one is not already set.
// This implements part of interfaces.PipelineRunner and allows the handler
// to inject a logger without directly accessing the Logger field.
func (p *Pipeline) SetLogger(logger *slog.Logger) {
if p.Logger == nil {
p.Logger = logger
}
}

// SetEventRecorder sets the event recorder for pipeline execution if one is
// not already set. This implements part of interfaces.PipelineRunner.
func (p *Pipeline) SetEventRecorder(recorder interfaces.EventRecorder) {
if p.EventRecorder == nil {
p.EventRecorder = recorder
}
}

// Run executes the pipeline and returns the merged result data map.
// It implements interfaces.PipelineRunner by wrapping Execute and
// returning PipelineContext.Current so callers need not import PipelineContext.
func (p *Pipeline) Run(ctx context.Context, data map[string]any) (map[string]any, error) {
pc, err := p.Execute(ctx, data)
if err != nil {
return nil, err
}
return pc.Current, nil
}
Loading