diff --git a/module/pipeline_step_request_parse.go b/module/pipeline_step_request_parse.go index 3c718310..4fdfafd3 100644 --- a/module/pipeline_step_request_parse.go +++ b/module/pipeline_step_request_parse.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "net/http" + "net/url" "strings" "github.com/CrisisTextLine/modular" @@ -155,12 +156,41 @@ func (s *RequestParseStep) Execute(_ context.Context, pc *PipelineContext) (*Ste output["body"] = body } else { req, _ := pc.Metadata["_http_request"].(*http.Request) - if req != nil && req.Body != nil { - bodyBytes, err := io.ReadAll(req.Body) - if err == nil && len(bodyBytes) > 0 { - var bodyData map[string]any - if json.Unmarshal(bodyBytes, &bodyData) == nil { - output["body"] = bodyData + if req != nil { + // Prefer cached raw body (set by a prior step, e.g. step.webhook_verify) + // to avoid consuming req.Body a second time. + var bodyBytes []byte + if cached, ok := pc.Metadata["_raw_body"].([]byte); ok && len(cached) > 0 { + bodyBytes = cached + } else if req.Body != nil { + b, err := io.ReadAll(req.Body) + if err == nil && len(b) > 0 { + bodyBytes = b + pc.Metadata["_raw_body"] = bodyBytes + } + } + if len(bodyBytes) > 0 { + ct := req.Header.Get("Content-Type") + if idx := strings.Index(ct, ";"); idx != -1 { + ct = strings.TrimSpace(ct[:idx]) + } + if strings.EqualFold(ct, "application/x-www-form-urlencoded") { + if formValues, parseErr := url.ParseQuery(string(bodyBytes)); parseErr == nil { + bodyData := make(map[string]any) + for k, v := range formValues { + if len(v) == 1 { + bodyData[k] = v[0] + } else { + bodyData[k] = v + } + } + output["body"] = bodyData + } + } else { + var bodyData map[string]any + if json.Unmarshal(bodyBytes, &bodyData) == nil { + output["body"] = bodyData + } } } } diff --git a/module/pipeline_step_request_parse_test.go b/module/pipeline_step_request_parse_test.go index e89080b6..aa1cdd42 100644 --- a/module/pipeline_step_request_parse_test.go +++ b/module/pipeline_step_request_parse_test.go @@ -188,6 +188,204 @@ func TestRequestParseStep_WildcardPathParam_SingleSegment(t *testing.T) { } } +func TestRequestParseStep_ParseBody_FormURLEncoded(t *testing.T) { + factory := NewRequestParseStepFactory() + step, err := factory("parse-form", map[string]any{ + "parse_body": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := bytes.NewBufferString(`Body=Hello&From=%2B15551234567&To=%2B15559876543&MessageSid=SM1234`) + req, _ := http.NewRequest("POST", "/webhook", body) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + bodyData, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + if bodyData["Body"] != "Hello" { + t.Errorf("expected Body='Hello', got %v", bodyData["Body"]) + } + if bodyData["From"] != "+15551234567" { + t.Errorf("expected From='+15551234567', got %v", bodyData["From"]) + } + if bodyData["To"] != "+15559876543" { + t.Errorf("expected To='+15559876543', got %v", bodyData["To"]) + } + if bodyData["MessageSid"] != "SM1234" { + t.Errorf("expected MessageSid='SM1234', got %v", bodyData["MessageSid"]) + } + + // Raw body should be cached in metadata + rawBody, ok := pc.Metadata["_raw_body"].([]byte) + if !ok { + t.Fatal("expected _raw_body in metadata") + } + if string(rawBody) != `Body=Hello&From=%2B15551234567&To=%2B15559876543&MessageSid=SM1234` { + t.Errorf("unexpected _raw_body: %s", rawBody) + } +} + +func TestRequestParseStep_ParseBody_FormURLEncoded_MultiValue(t *testing.T) { + factory := NewRequestParseStepFactory() + step, err := factory("parse-form-multi", map[string]any{ + "parse_body": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := bytes.NewBufferString(`tag=foo&tag=bar&name=test`) + req, _ := http.NewRequest("POST", "/webhook", body) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + bodyData, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + // Single value should be a string + if bodyData["name"] != "test" { + t.Errorf("expected name='test', got %v", bodyData["name"]) + } + // Multiple values should be []string + tags, ok := bodyData["tag"].([]string) + if !ok { + t.Fatalf("expected tag to be []string, got %T", bodyData["tag"]) + } + if len(tags) != 2 || tags[0] != "foo" || tags[1] != "bar" { + t.Errorf("expected tag=['foo','bar'], got %v", tags) + } +} + +func TestRequestParseStep_ParseBody_FormURLEncoded_ContentTypeWithCharset(t *testing.T) { + factory := NewRequestParseStepFactory() + step, err := factory("parse-form-charset", map[string]any{ + "parse_body": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + body := bytes.NewBufferString(`key=value`) + req, _ := http.NewRequest("POST", "/webhook", body) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + bodyData, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + if bodyData["key"] != "value" { + t.Errorf("expected key='value', got %v", bodyData["key"]) + } +} + +func TestRequestParseStep_ParseBody_FormURLEncoded_CachedRawBody(t *testing.T) { + // Simulate scenario where req.Body has already been consumed by a prior step + // (e.g. step.webhook_verify) and the raw bytes are cached in _raw_body. + factory := NewRequestParseStepFactory() + step, err := factory("parse-form-cached", map[string]any{ + "parse_body": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + rawBody := `Body=Hello&From=%2B15551234567` + // req.Body is empty/consumed (simulate body already read) + req, _ := http.NewRequest("POST", "/webhook", bytes.NewBufferString("")) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + "_raw_body": []byte(rawBody), + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + bodyData, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output when reading from _raw_body cache") + } + if bodyData["Body"] != "Hello" { + t.Errorf("expected Body='Hello', got %v", bodyData["Body"]) + } + if bodyData["From"] != "+15551234567" { + t.Errorf("expected From='+15551234567', got %v", bodyData["From"]) + } +} + +func TestRequestParseStep_ParseBody_JSON_CachesRawBody(t *testing.T) { + // Verify that reading a JSON body also caches the raw bytes in _raw_body. + factory := NewRequestParseStepFactory() + step, err := factory("parse-json-cache", map[string]any{ + "parse_body": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + bodyStr := `{"name":"test"}` + req, _ := http.NewRequest("POST", "/api/resource", bytes.NewBufferString(bodyStr)) + req.Header.Set("Content-Type", "application/json") + + pc := NewPipelineContext(nil, map[string]any{ + "_http_request": req, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + bodyData, ok := result.Output["body"].(map[string]any) + if !ok { + t.Fatal("expected body in output") + } + if bodyData["name"] != "test" { + t.Errorf("expected name='test', got %v", bodyData["name"]) + } + + rawBody, ok := pc.Metadata["_raw_body"].([]byte) + if !ok { + t.Fatal("expected _raw_body cached in metadata for JSON body") + } + if string(rawBody) != bodyStr { + t.Errorf("unexpected _raw_body: %s", rawBody) + } +} + func TestRequestParseStep_EmptyConfig(t *testing.T) { factory := NewRequestParseStepFactory() step, err := factory("parse-empty", map[string]any{}, nil)