Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cmd/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,20 +792,22 @@ func TestImportBundles_MultipleBundles(t *testing.T) {
// mockFeatureFlagAdmin implements module.FeatureFlagAdmin for testing.
type mockFeatureFlagAdmin struct{}

func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
func (m *mockFeatureFlagAdmin) UpdateFlag(key string, data json.RawMessage) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
func (m *mockFeatureFlagAdmin) SetOverrides(key string, data json.RawMessage) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) EvaluateFlag(key string, user string, group string) (any, error) {
return nil, nil
}
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) }
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {})
}

// TestFeatureFlagAutoWiring verifies that registerPostStartServices wires a
// FeatureFlagAdmin from the service registry into the V1 API handler.
Expand Down
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (e *StdEngine) wrapPipelineTriggerConfig(triggerType, pipelineName string,
// buildPipelineSteps creates PipelineStep instances from step configurations.
// RoutePipelineSetter is implemented by handlers (QueryHandler, CommandHandler) that support per-route pipelines.
type RoutePipelineSetter interface {
SetRoutePipeline(routePath string, pipeline *module.Pipeline)
SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner)
}

// configureRoutePipelines scans HTTP workflow routes for inline pipeline steps
Expand Down
85 changes: 57 additions & 28 deletions module/command_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/interfaces"
)

// CommandFunc is a state-changing command function that returns a result or an error.
Expand All @@ -25,7 +26,7 @@ type CommandHandler struct {
delegateHandler http.Handler
app modular.Application
commands map[string]CommandFunc
routePipelines map[string]*Pipeline
routePipelines map[string]interfaces.PipelineRunner
executionTracker ExecutionTrackerProvider
mu sync.RWMutex
}
Expand All @@ -35,12 +36,12 @@ func NewCommandHandler(name string) *CommandHandler {
return &CommandHandler{
name: name,
commands: make(map[string]CommandFunc),
routePipelines: make(map[string]*Pipeline),
routePipelines: make(map[string]interfaces.PipelineRunner),
}
}

// SetRoutePipeline attaches a pipeline to a specific route path.
func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline *Pipeline) {
func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) {
h.mu.Lock()
defer h.mu.Unlock()
h.routePipelines[routePath] = pipeline
Expand Down Expand Up @@ -165,37 +166,65 @@ func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Restore the body so delegate steps can re-read it
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
// Inject HTTP context so delegate steps can forward directly
pipeline.Metadata = map[string]any{
"_http_request": r,
"_http_response_writer": w,
}
if pipeline.RoutePattern != "" {
pipeline.Metadata["_route_pattern"] = pipeline.RoutePattern
}
var pc *PipelineContext
var err error
if h.executionTracker != nil {
pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), pipeline, triggerData, r)
} else {
pc, err = pipeline.Execute(r.Context(), triggerData)
}
if err != nil {
if pc == nil || pc.Metadata["_response_handled"] != true {
// Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern,
// Execute) and execution tracker integration. All engine-registered pipelines
// are *Pipeline; the interface allows custom implementations in tests/plugins.
// concretePipeline != nil: real *Pipeline.
// concretePipeline == nil && isConcrete: typed-nil – fall through to delegate/404.
// !isConcrete: different implementation – use PipelineRunner.Run() fallback.
concretePipeline, isConcrete := pipeline.(*Pipeline)
if isConcrete && concretePipeline != nil {
// Inject HTTP context so delegate steps can forward directly
concretePipeline.Metadata = map[string]any{
"_http_request": r,
"_http_response_writer": w,
}
if concretePipeline.RoutePattern != "" {
concretePipeline.Metadata["_route_pattern"] = concretePipeline.RoutePattern
}
var pc *PipelineContext
var err error
if h.executionTracker != nil {
pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), concretePipeline, triggerData, r)
} else {
pc, err = concretePipeline.Execute(r.Context(), triggerData)
}
if err != nil {
if pc == nil || pc.Metadata["_response_handled"] != true {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
}
return
}
if pc.Metadata["_response_handled"] == true {
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(pc.Current); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
} else if !isConcrete {
// Fallback for non-*Pipeline implementations: use the PipelineRunner interface.
result, err := pipeline.Run(r.Context(), triggerData)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Allow the runner to signal that it has already written the response.
if result["_response_handled"] == true {
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
}
if pc.Metadata["_response_handled"] == true {
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(pc.Current); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
// typed-nil *Pipeline: fall through to delegate/404 handling.
}

if h.delegateHandler != nil {
Expand Down
99 changes: 99 additions & 0 deletions module/command_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,26 @@ import (
"context"
"encoding/json"
"errors"
"log/slog"
"net/http"
"net/http/httptest"
"testing"

"github.com/GoCodeAlone/workflow/interfaces"
)

// mockPipelineRunner is a minimal PipelineRunner for handler tests.
type mockPipelineRunner struct {
result map[string]any
err error
}

func (m *mockPipelineRunner) Run(_ context.Context, _ map[string]any) (map[string]any, error) {
return m.result, m.err
}
func (m *mockPipelineRunner) SetLogger(_ *slog.Logger) {}
func (m *mockPipelineRunner) SetEventRecorder(_ interfaces.EventRecorder) {}

func TestCommandHandler_Name(t *testing.T) {
h := NewCommandHandler("test-commands")
if h.Name() != "test-commands" {
Expand Down Expand Up @@ -196,3 +211,87 @@ func TestCommandHandler_Handle(t *testing.T) {
t.Errorf("expected 200, got %d", rr.Code)
}
}

// TestCommandHandler_RoutePipeline_MockRunner verifies that a non-*Pipeline
// PipelineRunner is invoked via Run() and its result is JSON-encoded.
func TestCommandHandler_RoutePipeline_MockRunner(t *testing.T) {
h := NewCommandHandler("test")
mock := &mockPipelineRunner{result: map[string]any{"status": "processed"}}
h.routePipelines["process"] = mock

req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
rr := httptest.NewRecorder()
h.ServeHTTP(rr, req)

if rr.Code != http.StatusOK {
t.Errorf("expected 200, got %d", rr.Code)
}
var got map[string]any
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if got["status"] != "processed" {
t.Errorf("expected status=processed, got %v", got)
}
}

// TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled verifies that
// when the PipelineRunner.Run result contains _response_handled=true the
// handler does not write an additional JSON body.
func TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled(t *testing.T) {
h := NewCommandHandler("test")
mock := &mockPipelineRunner{result: map[string]any{"_response_handled": true}}
h.routePipelines["process"] = mock

req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
rr := httptest.NewRecorder()
h.ServeHTTP(rr, req)

if rr.Body.Len() != 0 {
t.Errorf("expected empty body when _response_handled=true, got %q", rr.Body.String())
}
}

// TestCommandHandler_RoutePipeline_MockRunner_Error verifies that a Run() error
// returns a 500 with the error message in the JSON body.
func TestCommandHandler_RoutePipeline_MockRunner_Error(t *testing.T) {
h := NewCommandHandler("test")
mock := &mockPipelineRunner{err: errors.New("runner failed")}
h.routePipelines["process"] = mock

req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
rr := httptest.NewRecorder()
h.ServeHTTP(rr, req)

if rr.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d", rr.Code)
}
var got map[string]string
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("failed to decode error response: %v", err)
}
if got["error"] != "runner failed" {
t.Errorf("expected error=runner failed, got %v", got)
}
}

// TestCommandHandler_RoutePipeline_TypedNil verifies that a typed-nil *Pipeline
// stored as a PipelineRunner does not panic and falls through to 404.
func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) {
h := NewCommandHandler("test")
// Store a typed-nil *Pipeline as an interfaces.PipelineRunner.
// pipeline != nil is true (interface has type info), concretePipeline == nil.
var p *Pipeline
h.routePipelines["process"] = p

req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
rr := httptest.NewRecorder()

// Must not panic.
h.ServeHTTP(rr, req)

if rr.Code != http.StatusNotFound {
t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code)
}
}

Loading
Loading