diff --git a/module/pipeline_step_raw_response.go b/module/pipeline_step_raw_response.go new file mode 100644 index 00000000..9420a014 --- /dev/null +++ b/module/pipeline_step_raw_response.go @@ -0,0 +1,135 @@ +package module + +import ( + "context" + "fmt" + "net/http" + + "github.com/CrisisTextLine/modular" +) + +// RawResponseStep writes a non-JSON HTTP response (e.g. XML, HTML, plain text) +// with a custom status code, content type, and optional headers, then stops the pipeline. +type RawResponseStep struct { + name string + status int + contentType string + headers map[string]string + body string + bodyFrom string + tmpl *TemplateEngine +} + +// NewRawResponseStepFactory returns a StepFactory that creates RawResponseStep instances. +func NewRawResponseStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + contentType, _ := config["content_type"].(string) + if contentType == "" { + return nil, fmt.Errorf("raw_response step %q: 'content_type' is required", name) + } + + status := 200 + if s, ok := config["status"]; ok { + switch v := s.(type) { + case int: + status = v + case float64: + status = int(v) + } + } + + var headers map[string]string + if h, ok := config["headers"].(map[string]any); ok { + headers = make(map[string]string, len(h)) + for k, v := range h { + if s, ok := v.(string); ok { + headers[k] = s + } + } + } + + body, _ := config["body"].(string) + bodyFrom, _ := config["body_from"].(string) + + return &RawResponseStep{ + name: name, + status: status, + contentType: contentType, + headers: headers, + body: body, + bodyFrom: bodyFrom, + tmpl: NewTemplateEngine(), + }, nil + } +} + +func (s *RawResponseStep) Name() string { return s.name } + +func (s *RawResponseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + // Resolve the response body + responseBody := s.resolveBody(pc) + + w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter) + if !ok { + // No response writer — return the body as output without writing HTTP + output := map[string]any{ + "status": s.status, + "content_type": s.contentType, + } + if responseBody != "" { + output["body"] = responseBody + } + return &StepResult{Output: output, Stop: true}, nil + } + + // Set Content-Type header + w.Header().Set("Content-Type", s.contentType) + + // Set additional headers + for k, v := range s.headers { + w.Header().Set(k, v) + } + + // Write status code + w.WriteHeader(s.status) + + // Write body + if responseBody != "" { + if _, err := w.Write([]byte(responseBody)); err != nil { + return nil, fmt.Errorf("raw_response step %q: failed to write response: %w", s.name, err) + } + } + + // Mark response as handled + pc.Metadata["_response_handled"] = true + + return &StepResult{ + Output: map[string]any{ + "status": s.status, + "content_type": s.contentType, + }, + Stop: true, + }, nil +} + +// resolveBody determines the response body string from the step configuration. +func (s *RawResponseStep) resolveBody(pc *PipelineContext) string { + if s.bodyFrom != "" { + val := resolveBodyFrom(s.bodyFrom, pc) + if str, ok := val.(string); ok { + return str + } + if val != nil { + return fmt.Sprintf("%v", val) + } + return "" + } + if s.body != "" { + resolved, err := s.tmpl.Resolve(s.body, pc) + if err != nil { + return s.body // fallback to unresolved + } + return resolved + } + return "" +} diff --git a/module/pipeline_step_raw_response_test.go b/module/pipeline_step_raw_response_test.go new file mode 100644 index 00000000..35b84b5c --- /dev/null +++ b/module/pipeline_step_raw_response_test.go @@ -0,0 +1,276 @@ +package module + +import ( + "context" + "io" + "net/http/httptest" + "testing" +) + +func TestRawResponseStep_BasicXML(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("respond", map[string]any{ + "content_type": "text/xml", + "body": ``, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if !result.Stop { + t.Error("expected Stop=true") + } + + resp := recorder.Result() + if resp.StatusCode != 200 { + t.Errorf("expected status 200, got %d", resp.StatusCode) + } + if ct := resp.Header.Get("Content-Type"); ct != "text/xml" { + t.Errorf("expected Content-Type text/xml, got %q", ct) + } + + body, _ := io.ReadAll(resp.Body) + expected := `` + if string(body) != expected { + t.Errorf("expected body %q, got %q", expected, string(body)) + } + + if pc.Metadata["_response_handled"] != true { + t.Error("expected _response_handled=true") + } +} + +func TestRawResponseStep_CustomStatus(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("health", map[string]any{ + "status": 503, + "content_type": "text/plain", + "body": "Service Unavailable", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if !result.Stop { + t.Error("expected Stop=true") + } + + resp := recorder.Result() + if resp.StatusCode != 503 { + t.Errorf("expected status 503, got %d", resp.StatusCode) + } + + body, _ := io.ReadAll(resp.Body) + if string(body) != "Service Unavailable" { + t.Errorf("expected body 'Service Unavailable', got %q", string(body)) + } +} + +func TestRawResponseStep_CustomHeaders(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("with-headers", map[string]any{ + "content_type": "text/html", + "headers": map[string]any{ + "X-Custom": "test-value", + "Cache-Control": "no-cache", + }, + "body": "OK", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if recorder.Header().Get("Content-Type") != "text/html" { + t.Errorf("expected Content-Type text/html, got %q", recorder.Header().Get("Content-Type")) + } + if recorder.Header().Get("X-Custom") != "test-value" { + t.Errorf("expected X-Custom header, got %q", recorder.Header().Get("X-Custom")) + } + if recorder.Header().Get("Cache-Control") != "no-cache" { + t.Errorf("expected Cache-Control header, got %q", recorder.Header().Get("Cache-Control")) + } +} + +func TestRawResponseStep_TemplateBody(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("templated", map[string]any{ + "content_type": "text/xml", + "body": `{{ .steps.prepare.id }}`, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + pc.MergeStepOutput("prepare", map[string]any{"id": "new-id-123"}) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + body, _ := io.ReadAll(recorder.Body) + expected := `new-id-123` + if string(body) != expected { + t.Errorf("expected body %q, got %q", expected, string(body)) + } +} + +func TestRawResponseStep_BodyFrom(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("from-step", map[string]any{ + "content_type": "text/plain", + "body_from": "steps.generate.content", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + pc.MergeStepOutput("generate", map[string]any{ + "content": "Hello from pipeline context", + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + body, _ := io.ReadAll(recorder.Body) + if string(body) != "Hello from pipeline context" { + t.Errorf("expected body 'Hello from pipeline context', got %q", string(body)) + } +} + +func TestRawResponseStep_NoWriter(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("no-writer", map[string]any{ + "content_type": "text/plain", + "body": "test body", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, map[string]any{}) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if !result.Stop { + t.Error("expected Stop=true even without writer") + } + if result.Output["status"] != 200 { + t.Errorf("expected status=200, got %v", result.Output["status"]) + } + if result.Output["content_type"] != "text/plain" { + t.Errorf("expected content_type=text/plain, got %v", result.Output["content_type"]) + } + if result.Output["body"] != "test body" { + t.Errorf("expected body='test body', got %v", result.Output["body"]) + } +} + +func TestRawResponseStep_DefaultStatus(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("default-status", map[string]any{ + "content_type": "text/plain", + "body": "ok", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + resp := recorder.Result() + if resp.StatusCode != 200 { + t.Errorf("expected default status 200, got %d", resp.StatusCode) + } +} + +func TestRawResponseStep_MissingContentType(t *testing.T) { + factory := NewRawResponseStepFactory() + _, err := factory("bad", map[string]any{ + "body": "test", + }, nil) + if err == nil { + t.Fatal("expected error for missing content_type") + } +} + +func TestRawResponseStep_EmptyBody(t *testing.T) { + factory := NewRawResponseStepFactory() + step, err := factory("empty", map[string]any{ + "status": 204, + "content_type": "text/plain", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + resp := recorder.Result() + if resp.StatusCode != 204 { + t.Errorf("expected status 204, got %d", resp.StatusCode) + } + + body, _ := io.ReadAll(resp.Body) + if len(body) != 0 { + t.Errorf("expected empty body, got %q", string(body)) + } +} diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index fe739edd..6628cf33 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -1,6 +1,6 @@ // Package pipelinesteps provides a plugin that registers generic pipeline step // types: validate, transform, conditional, set, log, delegate, jq, publish, -// http_call, request_parse, db_query, db_exec, json_response, +// http_call, request_parse, db_query, db_exec, json_response, raw_response, // validate_path_param, validate_pagination, validate_request_body, // foreach, webhook_verify, ui_scaffold, ui_scaffold_analyze, // dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping). @@ -65,6 +65,7 @@ func New() *Plugin { "step.db_query", "step.db_exec", "step.json_response", + "step.raw_response", "step.workflow_call", "step.validate_path_param", "step.validate_pagination", @@ -111,12 +112,13 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.delegate": wrapStepFactory(module.NewDelegateStepFactory()), "step.jq": wrapStepFactory(module.NewJQStepFactory()), "step.publish": wrapStepFactory(module.NewPublishStepFactory()), - "step.event_publish": wrapStepFactory(module.NewEventPublishStepFactory()), + "step.event_publish": wrapStepFactory(module.NewEventPublishStepFactory()), "step.http_call": wrapStepFactory(module.NewHTTPCallStepFactory()), "step.request_parse": wrapStepFactory(module.NewRequestParseStepFactory()), "step.db_query": wrapStepFactory(module.NewDBQueryStepFactory()), "step.db_exec": wrapStepFactory(module.NewDBExecStepFactory()), "step.json_response": wrapStepFactory(module.NewJSONResponseStepFactory()), + "step.raw_response": wrapStepFactory(module.NewRawResponseStepFactory()), "step.validate_path_param": wrapStepFactory(module.NewValidatePathParamStepFactory()), "step.validate_pagination": wrapStepFactory(module.NewValidatePaginationStepFactory()), "step.validate_request_body": wrapStepFactory(module.NewValidateRequestBodyStepFactory()), @@ -125,7 +127,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry })), - "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), + "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), "step.cache_get": wrapStepFactory(module.NewCacheGetStepFactory()), "step.cache_set": wrapStepFactory(module.NewCacheSetStepFactory()), "step.cache_delete": wrapStepFactory(module.NewCacheDeleteStepFactory()), diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index df833648..71d4491a 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -45,6 +45,7 @@ func TestStepFactories(t *testing.T) { "step.db_query", "step.db_exec", "step.json_response", + "step.raw_response", "step.validate_path_param", "step.validate_pagination", "step.validate_request_body", @@ -80,7 +81,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 28 { - t.Fatalf("expected 28 step factories after load, got %d", len(steps)) + if len(steps) != 29 { + t.Fatalf("expected 29 step factories after load, got %d", len(steps)) } }