From 17a1c9b827b66f08ba4ccf9b03a26815db365860 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 03:15:16 -0500 Subject: [PATCH 1/2] refactor: use interfaces.PipelineRunner in RoutePipelineSetter (closes #58) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the concrete *module.Pipeline type in the RoutePipelineSetter interface and the CommandHandler/QueryHandler routePipelines map with interfaces.PipelineRunner. This decouples the engine's route-pipeline wiring contract from the concrete Pipeline implementation, allowing test doubles and future plugin-provided pipelines to satisfy the interface without importing the module package. Changes: - engine.go: RoutePipelineSetter.SetRoutePipeline now takes interfaces.PipelineRunner - module/command_handler.go: routePipelines map and SetRoutePipeline use interfaces.PipelineRunner; ServeHTTP type-asserts to *Pipeline for concrete field access (Metadata, RoutePattern, Execute) with a Run()-based fallback for non-*Pipeline implementations - module/query_handler.go: same as command_handler *module.Pipeline already satisfies PipelineRunner, so engine.go callers pass *module.Pipeline unchanged — no call-site changes required there. Co-Authored-By: Claude Opus 4.6 --- cmd/server/main_test.go | 12 +++--- engine.go | 2 +- module/command_handler.go | 69 ++++++++++++++++++++++------------- module/query_handler.go | 73 +++++++++++++++++++++++-------------- plugins/http/plugin_test.go | 6 +-- 5 files changed, 101 insertions(+), 61 deletions(-) diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index 4024a4a7..e0d915ac 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -792,20 +792,22 @@ func TestImportBundles_MultipleBundles(t *testing.T) { // mockFeatureFlagAdmin implements module.FeatureFlagAdmin for testing. type mockFeatureFlagAdmin struct{} -func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) ListFlags() ([]any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) GetFlag(key string) (any, error) { return nil, nil } +func (m *mockFeatureFlagAdmin) CreateFlag(data json.RawMessage) (any, error) { return nil, nil } func (m *mockFeatureFlagAdmin) UpdateFlag(key string, data json.RawMessage) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil } +func (m *mockFeatureFlagAdmin) DeleteFlag(key string) error { return nil } func (m *mockFeatureFlagAdmin) SetOverrides(key string, data json.RawMessage) (any, error) { return nil, nil } func (m *mockFeatureFlagAdmin) EvaluateFlag(key string, user string, group string) (any, error) { return nil, nil } -func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) } +func (m *mockFeatureFlagAdmin) SSEHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {}) +} // TestFeatureFlagAutoWiring verifies that registerPostStartServices wires a // FeatureFlagAdmin from the service registry into the V1 API handler. diff --git a/engine.go b/engine.go index b5e77c8e..2fb68750 100644 --- a/engine.go +++ b/engine.go @@ -691,7 +691,7 @@ func (e *StdEngine) wrapPipelineTriggerConfig(triggerType, pipelineName string, // buildPipelineSteps creates PipelineStep instances from step configurations. // RoutePipelineSetter is implemented by handlers (QueryHandler, CommandHandler) that support per-route pipelines. type RoutePipelineSetter interface { - SetRoutePipeline(routePath string, pipeline *module.Pipeline) + SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) } // configureRoutePipelines scans HTTP workflow routes for inline pipeline steps diff --git a/module/command_handler.go b/module/command_handler.go index b90ecb7d..a9478514 100644 --- a/module/command_handler.go +++ b/module/command_handler.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/interfaces" ) // CommandFunc is a state-changing command function that returns a result or an error. @@ -25,7 +26,7 @@ type CommandHandler struct { delegateHandler http.Handler app modular.Application commands map[string]CommandFunc - routePipelines map[string]*Pipeline + routePipelines map[string]interfaces.PipelineRunner executionTracker ExecutionTrackerProvider mu sync.RWMutex } @@ -35,12 +36,12 @@ func NewCommandHandler(name string) *CommandHandler { return &CommandHandler{ name: name, commands: make(map[string]CommandFunc), - routePipelines: make(map[string]*Pipeline), + routePipelines: make(map[string]interfaces.PipelineRunner), } } // SetRoutePipeline attaches a pipeline to a specific route path. -func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline *Pipeline) { +func (h *CommandHandler) SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) { h.mu.Lock() defer h.mu.Unlock() h.routePipelines[routePath] = pipeline @@ -165,34 +166,52 @@ func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Restore the body so delegate steps can re-read it r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) } - // Inject HTTP context so delegate steps can forward directly - pipeline.Metadata = map[string]any{ - "_http_request": r, - "_http_response_writer": w, - } - if pipeline.RoutePattern != "" { - pipeline.Metadata["_route_pattern"] = pipeline.RoutePattern - } - var pc *PipelineContext - var err error - if h.executionTracker != nil { - pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), pipeline, triggerData, r) - } else { - pc, err = pipeline.Execute(r.Context(), triggerData) - } - if err != nil { - if pc == nil || pc.Metadata["_response_handled"] != true { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusInternalServerError) - _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + // Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern, + // Execute) and execution tracker integration. All engine-registered pipelines + // are *Pipeline; the interface allows custom implementations in tests/plugins. + if concretePipeline, ok := pipeline.(*Pipeline); ok { + // Inject HTTP context so delegate steps can forward directly + concretePipeline.Metadata = map[string]any{ + "_http_request": r, + "_http_response_writer": w, + } + if concretePipeline.RoutePattern != "" { + concretePipeline.Metadata["_route_pattern"] = concretePipeline.RoutePattern + } + var pc *PipelineContext + var err error + if h.executionTracker != nil { + pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), concretePipeline, triggerData, r) + } else { + pc, err = concretePipeline.Execute(r.Context(), triggerData) + } + if err != nil { + if pc == nil || pc.Metadata["_response_handled"] != true { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + } + return + } + if pc.Metadata["_response_handled"] == true { + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(pc.Current); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) } return } - if pc.Metadata["_response_handled"] == true { + // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. + result, err := pipeline.Run(r.Context(), triggerData) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) return } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(pc.Current); err != nil { + if err := json.NewEncoder(w).Encode(result); err != nil { http.Error(w, "failed to encode response", http.StatusInternalServerError) } return diff --git a/module/query_handler.go b/module/query_handler.go index 150cc520..a98d103e 100644 --- a/module/query_handler.go +++ b/module/query_handler.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/interfaces" ) // QueryFunc is a read-only query function that returns data or an error. @@ -24,7 +25,7 @@ type QueryHandler struct { delegateHandler http.Handler app modular.Application queries map[string]QueryFunc - routePipelines map[string]*Pipeline + routePipelines map[string]interfaces.PipelineRunner executionTracker ExecutionTrackerProvider mu sync.RWMutex } @@ -34,12 +35,12 @@ func NewQueryHandler(name string) *QueryHandler { return &QueryHandler{ name: name, queries: make(map[string]QueryFunc), - routePipelines: make(map[string]*Pipeline), + routePipelines: make(map[string]interfaces.PipelineRunner), } } // SetRoutePipeline attaches a pipeline to a specific route path. -func (h *QueryHandler) SetRoutePipeline(routePath string, pipeline *Pipeline) { +func (h *QueryHandler) SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) { h.mu.Lock() defer h.mu.Unlock() h.routePipelines[routePath] = pipeline @@ -150,36 +151,54 @@ func (h *QueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { "queryName": queryName, "query": r.URL.Query(), } - // Inject HTTP context so delegate steps can forward directly - pipeline.Metadata = map[string]any{ - "_http_request": r, - "_http_response_writer": w, - } - if pipeline.RoutePattern != "" { - pipeline.Metadata["_route_pattern"] = pipeline.RoutePattern - } - var pc *PipelineContext - var err error - if h.executionTracker != nil { - pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), pipeline, triggerData, r) - } else { - pc, err = pipeline.Execute(r.Context(), triggerData) - } - if err != nil { - // Only write error if response wasn't already handled by a delegate step - if pc == nil || pc.Metadata["_response_handled"] != true { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusInternalServerError) - _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + // Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern, + // Execute) and execution tracker integration. All engine-registered pipelines + // are *Pipeline; the interface allows custom implementations in tests/plugins. + if concretePipeline, ok := pipeline.(*Pipeline); ok { + // Inject HTTP context so delegate steps can forward directly + concretePipeline.Metadata = map[string]any{ + "_http_request": r, + "_http_response_writer": w, + } + if concretePipeline.RoutePattern != "" { + concretePipeline.Metadata["_route_pattern"] = concretePipeline.RoutePattern + } + var pc *PipelineContext + var err error + if h.executionTracker != nil { + pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), concretePipeline, triggerData, r) + } else { + pc, err = concretePipeline.Execute(r.Context(), triggerData) + } + if err != nil { + // Only write error if response wasn't already handled by a delegate step + if pc == nil || pc.Metadata["_response_handled"] != true { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + } + return + } + // If response was handled by a delegate step, don't write again + if pc.Metadata["_response_handled"] == true { + return + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(pc.Current); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) } return } - // If response was handled by a delegate step, don't write again - if pc.Metadata["_response_handled"] == true { + // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. + result, err := pipeline.Run(r.Context(), triggerData) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) return } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(pc.Current); err != nil { + if err := json.NewEncoder(w).Encode(result); err != nil { http.Error(w, "failed to encode response", http.StatusInternalServerError) } return diff --git a/plugins/http/plugin_test.go b/plugins/http/plugin_test.go index 3b5d220b..f5fed213 100644 --- a/plugins/http/plugin_test.go +++ b/plugins/http/plugin_test.go @@ -365,9 +365,9 @@ func TestRateLimitMiddlewareFactory_InvalidValues(t *testing.T) { // Zero requestsPerHour must fall through to requestsPerMinute path (not crash). modZeroRPH := factory("rl-zero-rph", map[string]any{ - "requestsPerHour": 0, - "requestsPerMinute": 30, - "burstSize": 5, + "requestsPerHour": 0, + "requestsPerMinute": 30, + "burstSize": 5, }) if modZeroRPH == nil { t.Fatal("factory returned nil for zero requestsPerHour config") From 783f094e390546beda12d831e2bdbfefa7622154 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 04:19:31 -0500 Subject: [PATCH 2/2] fix: typed-nil guard and _response_handled parity for PipelineRunner fallback path (#121) * Initial plan * fix: typed-nil guard, _response_handled fallback, add route pipeline tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/command_handler.go | 34 +++++++----- module/command_handler_test.go | 99 ++++++++++++++++++++++++++++++++++ module/query_handler.go | 34 +++++++----- module/query_handler_test.go | 84 +++++++++++++++++++++++++++++ 4 files changed, 227 insertions(+), 24 deletions(-) diff --git a/module/command_handler.go b/module/command_handler.go index a9478514..37b6753d 100644 --- a/module/command_handler.go +++ b/module/command_handler.go @@ -169,7 +169,11 @@ func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern, // Execute) and execution tracker integration. All engine-registered pipelines // are *Pipeline; the interface allows custom implementations in tests/plugins. - if concretePipeline, ok := pipeline.(*Pipeline); ok { + // concretePipeline != nil: real *Pipeline. + // concretePipeline == nil && isConcrete: typed-nil – fall through to delegate/404. + // !isConcrete: different implementation – use PipelineRunner.Run() fallback. + concretePipeline, isConcrete := pipeline.(*Pipeline) + if isConcrete && concretePipeline != nil { // Inject HTTP context so delegate steps can forward directly concretePipeline.Metadata = map[string]any{ "_http_request": r, @@ -201,20 +205,26 @@ func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "failed to encode response", http.StatusInternalServerError) } return - } - // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. - result, err := pipeline.Run(r.Context(), triggerData) - if err != nil { + } else if !isConcrete { + // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. + result, err := pipeline.Run(r.Context(), triggerData) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + // Allow the runner to signal that it has already written the response. + if result["_response_handled"] == true { + return + } w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusInternalServerError) - _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + if err := json.NewEncoder(w).Encode(result); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) + } return } - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(result); err != nil { - http.Error(w, "failed to encode response", http.StatusInternalServerError) - } - return + // typed-nil *Pipeline: fall through to delegate/404 handling. } if h.delegateHandler != nil { diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 1abee706..7819aad4 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -4,11 +4,26 @@ import ( "context" "encoding/json" "errors" + "log/slog" "net/http" "net/http/httptest" "testing" + + "github.com/GoCodeAlone/workflow/interfaces" ) +// mockPipelineRunner is a minimal PipelineRunner for handler tests. +type mockPipelineRunner struct { + result map[string]any + err error +} + +func (m *mockPipelineRunner) Run(_ context.Context, _ map[string]any) (map[string]any, error) { + return m.result, m.err +} +func (m *mockPipelineRunner) SetLogger(_ *slog.Logger) {} +func (m *mockPipelineRunner) SetEventRecorder(_ interfaces.EventRecorder) {} + func TestCommandHandler_Name(t *testing.T) { h := NewCommandHandler("test-commands") if h.Name() != "test-commands" { @@ -196,3 +211,87 @@ func TestCommandHandler_Handle(t *testing.T) { t.Errorf("expected 200, got %d", rr.Code) } } + +// TestCommandHandler_RoutePipeline_MockRunner verifies that a non-*Pipeline +// PipelineRunner is invoked via Run() and its result is JSON-encoded. +func TestCommandHandler_RoutePipeline_MockRunner(t *testing.T) { + h := NewCommandHandler("test") + mock := &mockPipelineRunner{result: map[string]any{"status": "processed"}} + h.routePipelines["process"] = mock + + req := httptest.NewRequest("POST", "/api/v1/engine/process", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("expected 200, got %d", rr.Code) + } + var got map[string]any + if err := json.NewDecoder(rr.Body).Decode(&got); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if got["status"] != "processed" { + t.Errorf("expected status=processed, got %v", got) + } +} + +// TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled verifies that +// when the PipelineRunner.Run result contains _response_handled=true the +// handler does not write an additional JSON body. +func TestCommandHandler_RoutePipeline_MockRunner_ResponseHandled(t *testing.T) { + h := NewCommandHandler("test") + mock := &mockPipelineRunner{result: map[string]any{"_response_handled": true}} + h.routePipelines["process"] = mock + + req := httptest.NewRequest("POST", "/api/v1/engine/process", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Body.Len() != 0 { + t.Errorf("expected empty body when _response_handled=true, got %q", rr.Body.String()) + } +} + +// TestCommandHandler_RoutePipeline_MockRunner_Error verifies that a Run() error +// returns a 500 with the error message in the JSON body. +func TestCommandHandler_RoutePipeline_MockRunner_Error(t *testing.T) { + h := NewCommandHandler("test") + mock := &mockPipelineRunner{err: errors.New("runner failed")} + h.routePipelines["process"] = mock + + req := httptest.NewRequest("POST", "/api/v1/engine/process", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusInternalServerError { + t.Errorf("expected 500, got %d", rr.Code) + } + var got map[string]string + if err := json.NewDecoder(rr.Body).Decode(&got); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if got["error"] != "runner failed" { + t.Errorf("expected error=runner failed, got %v", got) + } +} + +// TestCommandHandler_RoutePipeline_TypedNil verifies that a typed-nil *Pipeline +// stored as a PipelineRunner does not panic and falls through to 404. +func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { + h := NewCommandHandler("test") + // Store a typed-nil *Pipeline as an interfaces.PipelineRunner. + // pipeline != nil is true (interface has type info), concretePipeline == nil. + var p *Pipeline + h.routePipelines["process"] = p + + req := httptest.NewRequest("POST", "/api/v1/engine/process", nil) + rr := httptest.NewRecorder() + + // Must not panic. + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusNotFound { + t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) + } +} + diff --git a/module/query_handler.go b/module/query_handler.go index a98d103e..5ca04e4c 100644 --- a/module/query_handler.go +++ b/module/query_handler.go @@ -154,7 +154,11 @@ func (h *QueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern, // Execute) and execution tracker integration. All engine-registered pipelines // are *Pipeline; the interface allows custom implementations in tests/plugins. - if concretePipeline, ok := pipeline.(*Pipeline); ok { + // concretePipeline != nil: real *Pipeline. + // concretePipeline == nil && isConcrete: typed-nil – fall through to delegate/404. + // !isConcrete: different implementation – use PipelineRunner.Run() fallback. + concretePipeline, isConcrete := pipeline.(*Pipeline) + if isConcrete && concretePipeline != nil { // Inject HTTP context so delegate steps can forward directly concretePipeline.Metadata = map[string]any{ "_http_request": r, @@ -188,20 +192,26 @@ func (h *QueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "failed to encode response", http.StatusInternalServerError) } return - } - // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. - result, err := pipeline.Run(r.Context(), triggerData) - if err != nil { + } else if !isConcrete { + // Fallback for non-*Pipeline implementations: use the PipelineRunner interface. + result, err := pipeline.Run(r.Context(), triggerData) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + // Allow the runner to signal that it has already written the response. + if result["_response_handled"] == true { + return + } w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusInternalServerError) - _ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + if err := json.NewEncoder(w).Encode(result); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) + } return } - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(result); err != nil { - http.Error(w, "failed to encode response", http.StatusInternalServerError) - } - return + // typed-nil *Pipeline: fall through to delegate/404 handling. } if h.delegateHandler != nil { diff --git a/module/query_handler_test.go b/module/query_handler_test.go index ee6ed36b..a8243538 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -216,3 +216,87 @@ func TestLastPathSegment(t *testing.T) { } } } + +// TestQueryHandler_RoutePipeline_MockRunner verifies that a non-*Pipeline +// PipelineRunner is invoked via Run() and its result is JSON-encoded. +func TestQueryHandler_RoutePipeline_MockRunner(t *testing.T) { + h := NewQueryHandler("test") + mock := &mockPipelineRunner{result: map[string]any{"data": "value"}} + h.routePipelines["report"] = mock + + req := httptest.NewRequest("GET", "/api/v1/engine/report", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Errorf("expected 200, got %d", rr.Code) + } + var got map[string]any + if err := json.NewDecoder(rr.Body).Decode(&got); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if got["data"] != "value" { + t.Errorf("expected data=value, got %v", got) + } +} + +// TestQueryHandler_RoutePipeline_MockRunner_ResponseHandled verifies that +// when the PipelineRunner.Run result contains _response_handled=true the +// handler does not write an additional JSON body. +func TestQueryHandler_RoutePipeline_MockRunner_ResponseHandled(t *testing.T) { + h := NewQueryHandler("test") + mock := &mockPipelineRunner{result: map[string]any{"_response_handled": true}} + h.routePipelines["report"] = mock + + req := httptest.NewRequest("GET", "/api/v1/engine/report", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Body.Len() != 0 { + t.Errorf("expected empty body when _response_handled=true, got %q", rr.Body.String()) + } +} + +// TestQueryHandler_RoutePipeline_MockRunner_Error verifies that a Run() error +// returns a 500 with the error message in the JSON body. +func TestQueryHandler_RoutePipeline_MockRunner_Error(t *testing.T) { + h := NewQueryHandler("test") + mock := &mockPipelineRunner{err: errors.New("runner failed")} + h.routePipelines["report"] = mock + + req := httptest.NewRequest("GET", "/api/v1/engine/report", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusInternalServerError { + t.Errorf("expected 500, got %d", rr.Code) + } + var got map[string]string + if err := json.NewDecoder(rr.Body).Decode(&got); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if got["error"] != "runner failed" { + t.Errorf("expected error=runner failed, got %v", got) + } +} + +// TestQueryHandler_RoutePipeline_TypedNil verifies that a typed-nil *Pipeline +// stored as a PipelineRunner does not panic and falls through to 404. +func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { + h := NewQueryHandler("test") + // Store a typed-nil *Pipeline as an interfaces.PipelineRunner. + // pipeline != nil is true (interface has type info), concretePipeline == nil. + var p *Pipeline + h.routePipelines["report"] = p + + req := httptest.NewRequest("GET", "/api/v1/engine/report", nil) + rr := httptest.NewRecorder() + + // Must not panic. + h.ServeHTTP(rr, req) + + if rr.Code != http.StatusNotFound { + t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) + } +} +