Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions module/pipeline_step_request_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"io"
"net/http"
"net/url"
"strings"

"github.com/CrisisTextLine/modular"
Expand Down Expand Up @@ -155,12 +156,41 @@ func (s *RequestParseStep) Execute(_ context.Context, pc *PipelineContext) (*Ste
output["body"] = body
} else {
req, _ := pc.Metadata["_http_request"].(*http.Request)
if req != nil && req.Body != nil {
bodyBytes, err := io.ReadAll(req.Body)
if err == nil && len(bodyBytes) > 0 {
var bodyData map[string]any
if json.Unmarshal(bodyBytes, &bodyData) == nil {
output["body"] = bodyData
if req != nil {
// Prefer cached raw body (set by a prior step, e.g. step.webhook_verify)
// to avoid consuming req.Body a second time.
var bodyBytes []byte
if cached, ok := pc.Metadata["_raw_body"].([]byte); ok && len(cached) > 0 {
bodyBytes = cached
} else if req.Body != nil {
b, err := io.ReadAll(req.Body)
if err == nil && len(b) > 0 {
bodyBytes = b
pc.Metadata["_raw_body"] = bodyBytes
}
}
if len(bodyBytes) > 0 {
ct := req.Header.Get("Content-Type")
if idx := strings.Index(ct, ";"); idx != -1 {
ct = strings.TrimSpace(ct[:idx])
}
if strings.EqualFold(ct, "application/x-www-form-urlencoded") {
if formValues, parseErr := url.ParseQuery(string(bodyBytes)); parseErr == nil {
bodyData := make(map[string]any)
for k, v := range formValues {
if len(v) == 1 {
bodyData[k] = v[0]
} else {
bodyData[k] = v
}
}
output["body"] = bodyData
}
} else {
var bodyData map[string]any
if json.Unmarshal(bodyBytes, &bodyData) == nil {
output["body"] = bodyData
}
}
}
}
Expand Down
198 changes: 198 additions & 0 deletions module/pipeline_step_request_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,204 @@ func TestRequestParseStep_WildcardPathParam_SingleSegment(t *testing.T) {
}
}

func TestRequestParseStep_ParseBody_FormURLEncoded(t *testing.T) {
factory := NewRequestParseStepFactory()
step, err := factory("parse-form", map[string]any{
"parse_body": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

body := bytes.NewBufferString(`Body=Hello&From=%2B15551234567&To=%2B15559876543&MessageSid=SM1234`)
req, _ := http.NewRequest("POST", "/webhook", body)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

pc := NewPipelineContext(nil, map[string]any{
"_http_request": req,
})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

bodyData, ok := result.Output["body"].(map[string]any)
if !ok {
t.Fatal("expected body in output")
}
if bodyData["Body"] != "Hello" {
t.Errorf("expected Body='Hello', got %v", bodyData["Body"])
}
if bodyData["From"] != "+15551234567" {
t.Errorf("expected From='+15551234567', got %v", bodyData["From"])
}
if bodyData["To"] != "+15559876543" {
t.Errorf("expected To='+15559876543', got %v", bodyData["To"])
}
if bodyData["MessageSid"] != "SM1234" {
t.Errorf("expected MessageSid='SM1234', got %v", bodyData["MessageSid"])
}

// Raw body should be cached in metadata
rawBody, ok := pc.Metadata["_raw_body"].([]byte)
if !ok {
t.Fatal("expected _raw_body in metadata")
}
if string(rawBody) != `Body=Hello&From=%2B15551234567&To=%2B15559876543&MessageSid=SM1234` {
t.Errorf("unexpected _raw_body: %s", rawBody)
}
}
Comment on lines +230 to +238
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s no test covering the common “body was already read and cached” flow: when pc.Metadata["_raw_body"] is set (e.g. by step.webhook_verify) and req.Body is empty/consumed, request_parse should still parse from the cached bytes. Adding a test for that scenario would prevent regressions and validate the intended raw-body-cache interoperability.

Copilot generated this review using guidance from organization custom instructions.

func TestRequestParseStep_ParseBody_FormURLEncoded_MultiValue(t *testing.T) {
factory := NewRequestParseStepFactory()
step, err := factory("parse-form-multi", map[string]any{
"parse_body": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

body := bytes.NewBufferString(`tag=foo&tag=bar&name=test`)
req, _ := http.NewRequest("POST", "/webhook", body)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

pc := NewPipelineContext(nil, map[string]any{
"_http_request": req,
})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

bodyData, ok := result.Output["body"].(map[string]any)
if !ok {
t.Fatal("expected body in output")
}
// Single value should be a string
if bodyData["name"] != "test" {
t.Errorf("expected name='test', got %v", bodyData["name"])
}
// Multiple values should be []string
tags, ok := bodyData["tag"].([]string)
if !ok {
t.Fatalf("expected tag to be []string, got %T", bodyData["tag"])
}
if len(tags) != 2 || tags[0] != "foo" || tags[1] != "bar" {
t.Errorf("expected tag=['foo','bar'], got %v", tags)
}
}

func TestRequestParseStep_ParseBody_FormURLEncoded_ContentTypeWithCharset(t *testing.T) {
factory := NewRequestParseStepFactory()
step, err := factory("parse-form-charset", map[string]any{
"parse_body": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

body := bytes.NewBufferString(`key=value`)
req, _ := http.NewRequest("POST", "/webhook", body)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")

pc := NewPipelineContext(nil, map[string]any{
"_http_request": req,
})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

bodyData, ok := result.Output["body"].(map[string]any)
if !ok {
t.Fatal("expected body in output")
}
if bodyData["key"] != "value" {
t.Errorf("expected key='value', got %v", bodyData["key"])
}
}

func TestRequestParseStep_ParseBody_FormURLEncoded_CachedRawBody(t *testing.T) {
// Simulate scenario where req.Body has already been consumed by a prior step
// (e.g. step.webhook_verify) and the raw bytes are cached in _raw_body.
factory := NewRequestParseStepFactory()
step, err := factory("parse-form-cached", map[string]any{
"parse_body": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

rawBody := `Body=Hello&From=%2B15551234567`
// req.Body is empty/consumed (simulate body already read)
req, _ := http.NewRequest("POST", "/webhook", bytes.NewBufferString(""))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

pc := NewPipelineContext(nil, map[string]any{
"_http_request": req,
"_raw_body": []byte(rawBody),
})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

bodyData, ok := result.Output["body"].(map[string]any)
if !ok {
t.Fatal("expected body in output when reading from _raw_body cache")
}
if bodyData["Body"] != "Hello" {
t.Errorf("expected Body='Hello', got %v", bodyData["Body"])
}
if bodyData["From"] != "+15551234567" {
t.Errorf("expected From='+15551234567', got %v", bodyData["From"])
}
}

func TestRequestParseStep_ParseBody_JSON_CachesRawBody(t *testing.T) {
// Verify that reading a JSON body also caches the raw bytes in _raw_body.
factory := NewRequestParseStepFactory()
step, err := factory("parse-json-cache", map[string]any{
"parse_body": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

bodyStr := `{"name":"test"}`
req, _ := http.NewRequest("POST", "/api/resource", bytes.NewBufferString(bodyStr))
req.Header.Set("Content-Type", "application/json")

pc := NewPipelineContext(nil, map[string]any{
"_http_request": req,
})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

bodyData, ok := result.Output["body"].(map[string]any)
if !ok {
t.Fatal("expected body in output")
}
if bodyData["name"] != "test" {
t.Errorf("expected name='test', got %v", bodyData["name"])
}

rawBody, ok := pc.Metadata["_raw_body"].([]byte)
if !ok {
t.Fatal("expected _raw_body cached in metadata for JSON body")
}
if string(rawBody) != bodyStr {
t.Errorf("unexpected _raw_body: %s", rawBody)
}
}

func TestRequestParseStep_EmptyConfig(t *testing.T) {
factory := NewRequestParseStepFactory()
step, err := factory("parse-empty", map[string]any{}, nil)
Expand Down
Loading