Skip to content

Commit 9fc5d0b

Browse files
intel352claudeCopilot
authored
refactor: use interfaces.PipelineRunner in RoutePipelineSetter (#116)
* refactor: use interfaces.PipelineRunner in RoutePipelineSetter (closes #58) Replace the concrete *module.Pipeline type in the RoutePipelineSetter interface and the CommandHandler/QueryHandler routePipelines map with interfaces.PipelineRunner. This decouples the engine's route-pipeline wiring contract from the concrete Pipeline implementation, allowing test doubles and future plugin-provided pipelines to satisfy the interface without importing the module package. Changes: - engine.go: RoutePipelineSetter.SetRoutePipeline now takes interfaces.PipelineRunner - module/command_handler.go: routePipelines map and SetRoutePipeline use interfaces.PipelineRunner; ServeHTTP type-asserts to *Pipeline for concrete field access (Metadata, RoutePattern, Execute) with a Run()-based fallback for non-*Pipeline implementations - module/query_handler.go: same as command_handler *module.Pipeline already satisfies PipelineRunner, so engine.go callers pass *module.Pipeline unchanged — no call-site changes required there. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: typed-nil guard and _response_handled parity for PipelineRunner fallback path (#121) * Initial plan * fix: typed-nil guard, _response_handled fallback, add route pipeline tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent e502c24 commit 9fc5d0b

7 files changed

Lines changed: 310 additions & 67 deletions

File tree

cmd/server/main_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -792,20 +792,22 @@ func TestImportBundles_MultipleBundles(t *testing.T) {
792792
// mockFeatureFlagAdmin implements module.FeatureFlagAdmin for testing.
793793
type mockFeatureFlagAdmin struct{}
794794

795-
func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
796-
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
797-
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
795+
func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil }
796+
func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil }
797+
func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil }
798798
func (m *mockFeatureFlagAdmin) UpdateFlag(key string, data json.RawMessage) (any, error) {
799799
return nil, nil
800800
}
801-
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
801+
func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil }
802802
func (m *mockFeatureFlagAdmin) SetOverrides(key string, data json.RawMessage) (any, error) {
803803
return nil, nil
804804
}
805805
func (m *mockFeatureFlagAdmin) EvaluateFlag(key string, user string, group string) (any, error) {
806806
return nil, nil
807807
}
808-
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) }
808+
func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler {
809+
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {})
810+
}
809811

810812
// TestFeatureFlagAutoWiring verifies that registerPostStartServices wires a
811813
// FeatureFlagAdmin from the service registry into the V1 API handler.

engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ func (e *StdEngine) wrapPipelineTriggerConfig(triggerType, pipelineName string,
691691
// buildPipelineSteps creates PipelineStep instances from step configurations.
692692
// RoutePipelineSetter is implemented by handlers (QueryHandler, CommandHandler) that support per-route pipelines.
693693
type RoutePipelineSetter interface {
694-
SetRoutePipeline(routePath string, pipeline *module.Pipeline)
694+
SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner)
695695
}
696696

697697
// configureRoutePipelines scans HTTP workflow routes for inline pipeline steps

module/command_handler.go

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010

1111
"github.com/CrisisTextLine/modular"
12+
"github.com/GoCodeAlone/workflow/interfaces"
1213
)
1314

1415
// CommandFunc is a state-changing command function that returns a result or an error.
@@ -25,7 +26,7 @@ type CommandHandler struct {
2526
delegateHandler http.Handler
2627
app modular.Application
2728
commands map[string]CommandFunc
28-
routePipelines map[string]*Pipeline
29+
routePipelines map[string]interfaces.PipelineRunner
2930
executionTracker ExecutionTrackerProvider
3031
mu sync.RWMutex
3132
}
@@ -35,12 +36,12 @@ func NewCommandHandler(name string) *CommandHandler {
3536
return &CommandHandler{
3637
name: name,
3738
commands: make(map[string]CommandFunc),
38-
routePipelines: make(map[string]*Pipeline),
39+
routePipelines: make(map[string]interfaces.PipelineRunner),
3940
}
4041
}
4142

4243
// SetRoutePipeline attaches a pipeline to a specific route path.
43-
func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline *Pipeline) {
44+
func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) {
4445
h.mu.Lock()
4546
defer h.mu.Unlock()
4647
h.routePipelines[routePath] = pipeline
@@ -165,37 +166,65 @@ func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
165166
// Restore the body so delegate steps can re-read it
166167
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
167168
}
168-
// Inject HTTP context so delegate steps can forward directly
169-
pipeline.Metadata = map[string]any{
170-
"_http_request": r,
171-
"_http_response_writer": w,
172-
}
173-
if pipeline.RoutePattern != "" {
174-
pipeline.Metadata["_route_pattern"] = pipeline.RoutePattern
175-
}
176-
var pc *PipelineContext
177-
var err error
178-
if h.executionTracker != nil {
179-
pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), pipeline, triggerData, r)
180-
} else {
181-
pc, err = pipeline.Execute(r.Context(), triggerData)
182-
}
183-
if err != nil {
184-
if pc == nil || pc.Metadata["_response_handled"] != true {
169+
// Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern,
170+
// Execute) and execution tracker integration. All engine-registered pipelines
171+
// are *Pipeline; the interface allows custom implementations in tests/plugins.
172+
// concretePipeline != nil: real *Pipeline.
173+
// concretePipeline == nil && isConcrete: typed-nil – fall through to delegate/404.
174+
// !isConcrete: different implementation – use PipelineRunner.Run() fallback.
175+
concretePipeline, isConcrete := pipeline.(*Pipeline)
176+
if isConcrete && concretePipeline != nil {
177+
// Inject HTTP context so delegate steps can forward directly
178+
concretePipeline.Metadata = map[string]any{
179+
"_http_request": r,
180+
"_http_response_writer": w,
181+
}
182+
if concretePipeline.RoutePattern != "" {
183+
concretePipeline.Metadata["_route_pattern"] = concretePipeline.RoutePattern
184+
}
185+
var pc *PipelineContext
186+
var err error
187+
if h.executionTracker != nil {
188+
pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), concretePipeline, triggerData, r)
189+
} else {
190+
pc, err = concretePipeline.Execute(r.Context(), triggerData)
191+
}
192+
if err != nil {
193+
if pc == nil || pc.Metadata["_response_handled"] != true {
194+
w.Header().Set("Content-Type", "application/json")
195+
w.WriteHeader(http.StatusInternalServerError)
196+
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
197+
}
198+
return
199+
}
200+
if pc.Metadata["_response_handled"] == true {
201+
return
202+
}
203+
w.Header().Set("Content-Type", "application/json")
204+
if err := json.NewEncoder(w).Encode(pc.Current); err != nil {
205+
http.Error(w, "failed to encode response", http.StatusInternalServerError)
206+
}
207+
return
208+
} else if !isConcrete {
209+
// Fallback for non-*Pipeline implementations: use the PipelineRunner interface.
210+
result, err := pipeline.Run(r.Context(), triggerData)
211+
if err != nil {
185212
w.Header().Set("Content-Type", "application/json")
186213
w.WriteHeader(http.StatusInternalServerError)
187214
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
215+
return
216+
}
217+
// Allow the runner to signal that it has already written the response.
218+
if result["_response_handled"] == true {
219+
return
220+
}
221+
w.Header().Set("Content-Type", "application/json")
222+
if err := json.NewEncoder(w).Encode(result); err != nil {
223+
http.Error(w, "failed to encode response", http.StatusInternalServerError)
188224
}
189225
return
190226
}
191-
if pc.Metadata["_response_handled"] == true {
192-
return
193-
}
194-
w.Header().Set("Content-Type", "application/json")
195-
if err := json.NewEncoder(w).Encode(pc.Current); err != nil {
196-
http.Error(w, "failed to encode response", http.StatusInternalServerError)
197-
}
198-
return
227+
// typed-nil *Pipeline: fall through to delegate/404 handling.
199228
}
200229

201230
if h.delegateHandler != nil {

module/command_handler_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,26 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"log/slog"
78
"net/http"
89
"net/http/httptest"
910
"testing"
11+
12+
"github.com/GoCodeAlone/workflow/interfaces"
1013
)
1114

15+
// mockPipelineRunner is a minimal PipelineRunner for handler tests.
16+
type mockPipelineRunner struct {
17+
result map[string]any
18+
err error
19+
}
20+
21+
func (m *mockPipelineRunner) Run(_ context.Context, _ map[string]any) (map[string]any, error) {
22+
return m.result, m.err
23+
}
24+
func (m *mockPipelineRunner) SetLogger(_ *slog.Logger) {}
25+
func (m *mockPipelineRunner) SetEventRecorder(_ interfaces.EventRecorder) {}
26+
1227
func TestCommandHandler_Name(t *testing.T) {
1328
h := NewCommandHandler("test-commands")
1429
if h.Name() != "test-commands" {
@@ -196,3 +211,87 @@ func TestCommandHandler_Handle(t *testing.T) {
196211
t.Errorf("expected 200, got %d", rr.Code)
197212
}
198213
}
214+
215+
// TestCommandHandler_RoutePipeline_MockRunner verifies that a non-*Pipeline
216+
// PipelineRunner is invoked via Run() and its result is JSON-encoded.
217+
func TestCommandHandler_RoutePipeline_MockRunner(t *testing.T) {
218+
h := NewCommandHandler("test")
219+
mock := &mockPipelineRunner{result: map[string]any{"status": "processed"}}
220+
h.routePipelines["process"] = mock
221+
222+
req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
223+
rr := httptest.NewRecorder()
224+
h.ServeHTTP(rr, req)
225+
226+
if rr.Code != http.StatusOK {
227+
t.Errorf("expected 200, got %d", rr.Code)
228+
}
229+
var got map[string]any
230+
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
231+
t.Fatalf("failed to decode response: %v", err)
232+
}
233+
if got["status"] != "processed" {
234+
t.Errorf("expected status=processed, got %v", got)
235+
}
236+
}
237+
238+
// TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled verifies that
239+
// when the PipelineRunner.Run result contains _response_handled=true the
240+
// handler does not write an additional JSON body.
241+
func TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled(t *testing.T) {
242+
h := NewCommandHandler("test")
243+
mock := &mockPipelineRunner{result: map[string]any{"_response_handled": true}}
244+
h.routePipelines["process"] = mock
245+
246+
req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
247+
rr := httptest.NewRecorder()
248+
h.ServeHTTP(rr, req)
249+
250+
if rr.Body.Len() != 0 {
251+
t.Errorf("expected empty body when _response_handled=true, got %q", rr.Body.String())
252+
}
253+
}
254+
255+
// TestCommandHandler_RoutePipeline_MockRunner_Error verifies that a Run() error
256+
// returns a 500 with the error message in the JSON body.
257+
func TestCommandHandler_RoutePipeline_MockRunner_Error(t *testing.T) {
258+
h := NewCommandHandler("test")
259+
mock := &mockPipelineRunner{err: errors.New("runner failed")}
260+
h.routePipelines["process"] = mock
261+
262+
req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
263+
rr := httptest.NewRecorder()
264+
h.ServeHTTP(rr, req)
265+
266+
if rr.Code != http.StatusInternalServerError {
267+
t.Errorf("expected 500, got %d", rr.Code)
268+
}
269+
var got map[string]string
270+
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
271+
t.Fatalf("failed to decode error response: %v", err)
272+
}
273+
if got["error"] != "runner failed" {
274+
t.Errorf("expected error=runner failed, got %v", got)
275+
}
276+
}
277+
278+
// TestCommandHandler_RoutePipeline_TypedNil verifies that a typed-nil *Pipeline
279+
// stored as a PipelineRunner does not panic and falls through to 404.
280+
func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) {
281+
h := NewCommandHandler("test")
282+
// Store a typed-nil *Pipeline as an interfaces.PipelineRunner.
283+
// pipeline != nil is true (interface has type info), concretePipeline == nil.
284+
var p *Pipeline
285+
h.routePipelines["process"] = p
286+
287+
req := httptest.NewRequest("POST", "/api/v1/engine/process", nil)
288+
rr := httptest.NewRecorder()
289+
290+
// Must not panic.
291+
h.ServeHTTP(rr, req)
292+
293+
if rr.Code != http.StatusNotFound {
294+
t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code)
295+
}
296+
}
297+

0 commit comments

Comments
 (0)