diff --git a/engine.go b/engine.go index 506d5b5d..45fed7d5 100644 --- a/engine.go +++ b/engine.go @@ -583,6 +583,13 @@ func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, ac e.logger.Debug(fmt.Sprintf(" Result %s: %v", k, v)) } + // If the caller stored a PipelineResultHolder in the context, populate it + // so HTTP trigger handlers can read response_status/body/headers without + // requiring the WorkflowEngine interface to return a result map. + if holder, ok := ctx.Value(module.PipelineResultContextKey).(*module.PipelineResultHolder); ok && holder != nil { + holder.Set(results) + } + if e.eventEmitter != nil { e.eventEmitter.EmitWorkflowCompleted(ctx, workflowType, action, time.Since(startTime), results) } diff --git a/engine_test.go b/engine_test.go index c9d499ae..fbcafef1 100644 --- a/engine_test.go +++ b/engine_test.go @@ -1268,6 +1268,62 @@ func TestCanHandleTrigger_EventBus(t *testing.T) { } } +// TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder verifies that +// TriggerWorkflow populates a *module.PipelineResultHolder stored in the context +// with the workflow handler's result map after successful execution. +func TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + loadAllPlugins(t, engine) + + handler := &errorMockWorkflowHandler{ + mockWorkflowHandler: mockWorkflowHandler{ + name: "holder-handler", + handlesFor: []string{"holder-wf"}, + }, + // returns a result map with response fields + } + engine.RegisterWorkflowHandler(handler) + + holder := &module.PipelineResultHolder{} + ctx := context.WithValue(context.Background(), module.PipelineResultContextKey, holder) + + err := engine.TriggerWorkflow(ctx, "holder-wf", "run", map[string]any{}) + if err != nil { + t.Fatalf("TriggerWorkflow failed: %v", err) + } + + got := holder.Get() + if got == nil { + t.Fatal("expected PipelineResultHolder to be populated, got nil") + } + if got["status"] != "ok" { + t.Errorf("expected result[status]='ok', got %v", got["status"]) + } +} + +// TestEngine_TriggerWorkflow_HolderAbsent verifies that TriggerWorkflow does not +// panic and succeeds normally when no PipelineResultHolder is in the context. +func TestEngine_TriggerWorkflow_HolderAbsent(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + loadAllPlugins(t, engine) + + handler := &errorMockWorkflowHandler{ + mockWorkflowHandler: mockWorkflowHandler{ + name: "no-holder-handler", + handlesFor: []string{"no-holder-wf"}, + }, + } + engine.RegisterWorkflowHandler(handler) + + // No holder in context — should succeed without panicking. + err := engine.TriggerWorkflow(context.Background(), "no-holder-wf", "run", map[string]any{}) + if err != nil { + t.Fatalf("TriggerWorkflow failed: %v", err) + } +} + // ============================================================================ // Tests for requires.plugins validation (Phase 4 - Engine Decomposition) // ============================================================================ diff --git a/module/http_trigger.go b/module/http_trigger.go index eb207465..480b2e81 100644 --- a/module/http_trigger.go +++ b/module/http_trigger.go @@ -9,6 +9,8 @@ import ( "log" "maps" "net/http" + "strconv" + "strings" "github.com/CrisisTextLine/modular" ) @@ -31,6 +33,129 @@ type httpReqContextKey struct{} // headers, path parameters, and the request body. var HTTPRequestContextKey = httpReqContextKey{} +// pipelineResultKey is the unexported type for the pipeline result context key. +type pipelineResultKey struct{} + +// PipelineResultContextKey is the context key used to capture pipeline execution +// results from TriggerWorkflow. HTTP trigger handlers store a *PipelineResultHolder +// in the context before calling TriggerWorkflow; the engine populates it with the +// pipeline's result.Current map after execution. This lets the trigger apply +// response_status/response_body/response_headers from the pipeline output when no +// step wrote directly to the HTTP response writer. +var PipelineResultContextKey = pipelineResultKey{} + +// PipelineResultHolder is a mutable container used to pass pipeline execution +// results back through the context from the engine to the HTTP trigger handler. +type PipelineResultHolder struct { + result map[string]any +} + +// Set stores the pipeline result in the holder. +func (h *PipelineResultHolder) Set(result map[string]any) { + h.result = result +} + +// Get returns the stored pipeline result, or nil if not set. +func (h *PipelineResultHolder) Get() map[string]any { + return h.result +} + +// coercePipelineStatus coerces common numeric/string types into an HTTP status +// code. Pipeline steps may emit response_status as int, int64, float64 (common +// after generic JSON decoding), json.Number, or a numeric string. +func coercePipelineStatus(v any) (int, bool) { + switch s := v.(type) { + case int: + return s, true + case int64: + status := int(s) + if int64(status) != s { + return 0, false + } + return status, true + case float64: + status := int(s) + if float64(status) != s { + return 0, false + } + return status, true + case json.Number: + i64, err := s.Int64() + if err != nil { + return 0, false + } + status := int(i64) + if int64(status) != i64 { + return 0, false + } + return status, true + case string: + n, err := strconv.Atoi(strings.TrimSpace(s)) + if err != nil { + return 0, false + } + return n, true + default: + return 0, false + } +} + +// applyPipelineHeaders writes response headers from common map/header shapes +// that pipeline steps may emit for response_headers. +func applyPipelineHeaders(w http.ResponseWriter, rawHeaders any) { + switch headers := rawHeaders.(type) { + case map[string]any: + for k, v := range headers { + switch hv := v.(type) { + case string: + w.Header().Set(k, hv) + case []string: + for _, sv := range hv { + w.Header().Add(k, sv) + } + case []any: + for _, sv := range hv { + w.Header().Add(k, fmt.Sprint(sv)) + } + default: + w.Header().Set(k, fmt.Sprint(hv)) + } + } + case map[string]string: + for k, v := range headers { + w.Header().Set(k, v) + } + case http.Header: + for k, vals := range headers { + for _, v := range vals { + w.Header().Add(k, v) + } + } + } +} + +// writePipelineContextResponse checks the result map for response_status and, +// if present, applies response_headers and writes the response. Returns true if +// the response was written from the pipeline context fields. +func writePipelineContextResponse(w http.ResponseWriter, result map[string]any) bool { + rawStatus, ok := result["response_status"] + if !ok { + return false + } + status, ok := coercePipelineStatus(rawStatus) + if !ok { + return false + } + if rawHeaders, ok := result["response_headers"]; ok { + applyPipelineHeaders(w, rawHeaders) + } + w.WriteHeader(status) + if body, ok := result["response_body"].(string); ok { + _, _ = w.Write([]byte(body)) //nolint:gosec // G705: body is pipeline step output explicitly set as response body + } + return true +} + // trackedResponseWriter wraps http.ResponseWriter and tracks whether a response // body has been written, so the HTTP trigger can fall back to the generic // "workflow triggered" response only when the pipeline didn't write one. @@ -267,6 +392,11 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler { // to headers (e.g. Authorization), method, URL, and body. ctx = context.WithValue(ctx, HTTPRequestContextKey, r) + // Inject a result holder so the engine can pass the pipeline's result.Current + // back to this handler without changing the WorkflowEngine interface. + resultHolder := &PipelineResultHolder{} + ctx = context.WithValue(ctx, PipelineResultContextKey, resultHolder) + // Extract data from the request to pass to the workflow. // Include method, path, and parsed body so pipelines have full // access to request context (consistent with CommandHandler). @@ -316,6 +446,14 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler { return } + // If the pipeline set response_status in its output (without writing + // directly to the response writer), use those values to build the response. + if result := resultHolder.Get(); result != nil { + if writePipelineContextResponse(w, result) { + return + } + } + // Fallback: return a generic accepted response when the pipeline doesn't // write its own HTTP response. w.Header().Set("Content-Type", "application/json") diff --git a/module/http_trigger_test.go b/module/http_trigger_test.go index a0ab1908..8745f23a 100644 --- a/module/http_trigger_test.go +++ b/module/http_trigger_test.go @@ -2,6 +2,7 @@ package module import ( "context" + "encoding/json" "net/http" "net/http/httptest" "strings" @@ -313,3 +314,250 @@ func (e *captureContextEngine) TriggerWorkflow(ctx context.Context, _ string, _ *e.capture = ctx return nil } + +// pipelineContextResultEngine is a mock WorkflowEngine that simulates a pipeline +// setting response_status/response_body/response_headers in result.Current +// without writing directly to the HTTP response writer. It populates the +// PipelineResultHolder stored in the context, the way the real engine does. +type pipelineContextResultEngine struct { + result map[string]any +} + +func (e *pipelineContextResultEngine) TriggerWorkflow(ctx context.Context, _ string, _ string, _ map[string]any) error { + if holder, ok := ctx.Value(PipelineResultContextKey).(*PipelineResultHolder); ok && holder != nil { + holder.Set(e.result) + } + return nil +} + +// TestHTTPTrigger_PipelineContextResponse verifies that when a pipeline step +// sets response_status/response_body/response_headers in result.Current without +// writing to the HTTP response writer, the trigger uses those values instead of +// the generic 202 fallback. +func TestHTTPTrigger_PipelineContextResponse(t *testing.T) { + app := NewMockApplication() + router := NewMockHTTPRouter("test-router") + _ = app.RegisterService("httpRouter", router) + + engine := &pipelineContextResultEngine{result: map[string]any{ + "response_status": 403, + "response_body": `{"error":"forbidden"}`, + "response_headers": map[string]any{ + "Content-Type": "application/json", + }, + }} + _ = app.RegisterService("workflowEngine", engine) + + trigger := NewHTTPTrigger() + app.RegisterModule(trigger) + + cfg := map[string]any{ + "routes": []any{ + map[string]any{ + "path": "/api/secure", + "method": "GET", + "workflow": "secure-workflow", + "action": "execute", + }, + }, + } + if err := trigger.Configure(app, cfg); err != nil { + t.Fatalf("Configure: %v", err) + } + if err := trigger.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + + handler := router.routes["GET /api/secure"] + if handler == nil { + t.Fatal("handler not registered") + } + + req := httptest.NewRequest("GET", "/api/secure", nil) + w := httptest.NewRecorder() + handler.Handle(w, req) + + resp := w.Result() + if resp.StatusCode != 403 { + t.Errorf("expected 403 from pipeline context, got %d", resp.StatusCode) + } + if w.Body.String() != `{"error":"forbidden"}` { + t.Errorf("expected pipeline body, got %q", w.Body.String()) + } + if w.Header().Get("Content-Type") != "application/json" { + t.Errorf("expected Content-Type header, got %q", w.Header().Get("Content-Type")) + } +} + +// TestHTTPTrigger_PipelineContextResponse_NoStatus verifies that when +// response_status is absent from the pipeline result, the trigger still +// falls back to the generic 202 accepted response. +func TestHTTPTrigger_PipelineContextResponse_NoStatus(t *testing.T) { + app := NewMockApplication() + router := NewMockHTTPRouter("test-router") + _ = app.RegisterService("httpRouter", router) + + engine := &pipelineContextResultEngine{result: map[string]any{ + "some_internal_data": "secret", + }} + _ = app.RegisterService("workflowEngine", engine) + + trigger := NewHTTPTrigger() + app.RegisterModule(trigger) + + cfg := map[string]any{ + "routes": []any{ + map[string]any{ + "path": "/api/noisy", + "method": "GET", + "workflow": "noisy-workflow", + "action": "execute", + }, + }, + } + if err := trigger.Configure(app, cfg); err != nil { + t.Fatalf("Configure: %v", err) + } + if err := trigger.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + + handler := router.routes["GET /api/noisy"] + req := httptest.NewRequest("GET", "/api/noisy", nil) + w := httptest.NewRecorder() + handler.Handle(w, req) + + resp := w.Result() + if resp.StatusCode != 202 { + t.Errorf("expected fallback 202, got %d", resp.StatusCode) + } + if !strings.Contains(w.Body.String(), "workflow triggered") { + t.Errorf("expected fallback body, got %q", w.Body.String()) + } +} + +// TestCoercePipelineStatus verifies that coercePipelineStatus handles all +// common numeric and string types that pipeline steps may emit. +func TestCoercePipelineStatus(t *testing.T) { + tests := []struct { + name string + input any + want int + wantOK bool + }{ + {"int", 403, 403, true}, + {"int64", int64(201), 201, true}, + {"float64 whole", float64(200), 200, true}, + {"float64 fractional", float64(200.5), 0, false}, + {"json.Number int", json.Number("404"), 404, true}, + {"json.Number float", json.Number("404.5"), 0, false}, + {"string numeric", "500", 500, true}, + {"string with spaces", " 403 ", 403, true}, + {"string non-numeric", "ok", 0, false}, + {"nil", nil, 0, false}, + {"bool", true, 0, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := coercePipelineStatus(tt.input) + if ok != tt.wantOK { + t.Errorf("coercePipelineStatus(%v): ok=%v, want %v", tt.input, ok, tt.wantOK) + } + if ok && got != tt.want { + t.Errorf("coercePipelineStatus(%v): got %d, want %d", tt.input, got, tt.want) + } + }) + } +} + +// TestHTTPTrigger_PipelineContextResponse_Float64Status verifies that a +// response_status emitted as float64 (common after generic JSON decoding) is +// correctly coerced into an HTTP status code. +func TestHTTPTrigger_PipelineContextResponse_Float64Status(t *testing.T) { + app := NewMockApplication() + router := NewMockHTTPRouter("test-router") + _ = app.RegisterService("httpRouter", router) + + engine := &pipelineContextResultEngine{result: map[string]any{ + "response_status": float64(422), + "response_body": `{"error":"unprocessable"}`, + }} + _ = app.RegisterService("workflowEngine", engine) + + trigger := NewHTTPTrigger() + app.RegisterModule(trigger) + + cfg := map[string]any{ + "routes": []any{ + map[string]any{ + "path": "/api/validate", + "method": "POST", + "workflow": "validate-wf", + "action": "execute", + }, + }, + } + if err := trigger.Configure(app, cfg); err != nil { + t.Fatalf("Configure: %v", err) + } + if err := trigger.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + + handler := router.routes["POST /api/validate"] + req := httptest.NewRequest("POST", "/api/validate", nil) + w := httptest.NewRecorder() + handler.Handle(w, req) + + if w.Result().StatusCode != 422 { + t.Errorf("expected 422 from float64 status, got %d", w.Result().StatusCode) + } +} + +// TestHTTPTrigger_PipelineContextResponse_MapStringStringHeaders verifies that +// response_headers emitted as map[string]string are applied correctly. +func TestHTTPTrigger_PipelineContextResponse_MapStringStringHeaders(t *testing.T) { + app := NewMockApplication() + router := NewMockHTTPRouter("test-router") + _ = app.RegisterService("httpRouter", router) + + engine := &pipelineContextResultEngine{result: map[string]any{ + "response_status": 200, + "response_body": `ok`, + "response_headers": map[string]string{"X-Custom": "value"}, + }} + _ = app.RegisterService("workflowEngine", engine) + + trigger := NewHTTPTrigger() + app.RegisterModule(trigger) + + cfg := map[string]any{ + "routes": []any{ + map[string]any{ + "path": "/api/hdr", + "method": "GET", + "workflow": "hdr-wf", + "action": "execute", + }, + }, + } + if err := trigger.Configure(app, cfg); err != nil { + t.Fatalf("Configure: %v", err) + } + if err := trigger.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + + handler := router.routes["GET /api/hdr"] + req := httptest.NewRequest("GET", "/api/hdr", nil) + w := httptest.NewRecorder() + handler.Handle(w, req) + + if w.Result().StatusCode != 200 { + t.Errorf("expected 200, got %d", w.Result().StatusCode) + } + if w.Header().Get("X-Custom") != "value" { + t.Errorf("expected X-Custom header, got %q", w.Header().Get("X-Custom")) + } +} diff --git a/module/openapi.go b/module/openapi.go index 73c8fb81..85de200e 100644 --- a/module/openapi.go +++ b/module/openapi.go @@ -383,6 +383,12 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) { return } + // If the pipeline set response_status in its output (without writing + // directly to the response writer), use those values to build the response. + if writePipelineContextResponse(w, result.Current) { + return + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(result.Current) diff --git a/module/openapi_test.go b/module/openapi_test.go index ec9d9eb3..7e6603c2 100644 --- a/module/openapi_test.go +++ b/module/openapi_test.go @@ -1124,3 +1124,182 @@ func (s *stubPipelineStep) Name() string { return s.name } func (s *stubPipelineStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { return s.exec(ctx, pc) } + +// TestOpenAPIModule_XPipeline_ResponseStatusFromContext verifies that when a +// pipeline step sets response_status/response_body/response_headers in its +// output and no step writes directly to the HTTP response writer, the openapi +// handler uses those values instead of falling through to 200 with all state. +func TestOpenAPIModule_XPipeline_ResponseStatusFromContext(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + // Pipeline step that returns a 403 with a custom body via result.Current, + // without writing to the HTTP response writer directly. + authStep := &stubPipelineStep{ + name: "auth-check", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{ + Output: map[string]any{ + "response_status": 403, + "response_body": `{"error":"forbidden"}`, + "response_headers": map[string]any{ + "Content-Type": "application/json", + }, + }, + Stop: true, + }, nil + }, + } + authPipeline := &Pipeline{ + Name: "greet-pipeline", + Steps: []PipelineStep{authStep}, + } + + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "greet-pipeline" { + return authPipeline, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("GET /api/greet handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet", nil) + h.Handle(w, r) + + if w.Code != http.StatusForbidden { + t.Errorf("expected 403 from pipeline context, got %d: %s", w.Code, w.Body.String()) + } + if w.Body.String() != `{"error":"forbidden"}` { + t.Errorf("expected pipeline body, got %q", w.Body.String()) + } + if w.Header().Get("Content-Type") != "application/json" { + t.Errorf("expected Content-Type header, got %q", w.Header().Get("Content-Type")) + } +} + +// TestOpenAPIModule_XPipeline_NoResponseStatusFallsThrough verifies that when +// response_status is absent from result.Current, the handler still falls through +// to the default 200 JSON encoding of result.Current. +func TestOpenAPIModule_XPipeline_NoResponseStatusFallsThrough(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + dataStep := &stubPipelineStep{ + name: "produce-data", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{Output: map[string]any{"key": "value"}}, nil + }, + } + dataPipeline := &Pipeline{ + Name: "greet-pipeline", + Steps: []PipelineStep{dataStep}, + } + + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "greet-pipeline" { + return dataPipeline, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("GET /api/greet handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet", nil) + h.Handle(w, r) + + if w.Code != http.StatusOK { + t.Errorf("expected 200 fallback, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]any + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("expected JSON fallback body, got error: %v", err) + } + if resp["key"] != "value" { + t.Errorf("expected key=value in fallback body, got %v", resp) + } +} + +// TestOpenAPIModule_XPipeline_ResponseStatus_Float64 verifies that response_status +// emitted as float64 (common after JSON round-trip) is correctly coerced. +func TestOpenAPIModule_XPipeline_ResponseStatus_Float64(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", xPipelineYAML) + + mod := NewOpenAPIModule("pipe-api", OpenAPIConfig{ + SpecFile: specPath, + BasePath: "/api", + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init: %v", err) + } + + step := &stubPipelineStep{ + name: "float-status", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{ + Output: map[string]any{ + "response_status": float64(422), + "response_body": `{"error":"unprocessable"}`, + "response_headers": map[string]string{ + "Content-Type": "application/json", + }, + }, + Stop: true, + }, nil + }, + } + pipe := &Pipeline{Name: "greet-pipeline", Steps: []PipelineStep{step}} + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "greet-pipeline" { + return pipe, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("GET", "/api/greet") + if h == nil { + t.Fatal("handler not found") + } + + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/api/greet", nil) + h.Handle(w, r) + + if w.Code != 422 { + t.Errorf("expected 422 from float64 status, got %d: %s", w.Code, w.Body.String()) + } + if w.Body.String() != `{"error":"unprocessable"}` { + t.Errorf("unexpected body: %q", w.Body.String()) + } +}