diff --git a/example/go.mod b/example/go.mod index ca153a59..5665b04a 100644 --- a/example/go.mod +++ b/example/go.mod @@ -22,7 +22,7 @@ require ( github.com/BurntSushi/toml v1.6.0 // indirect github.com/CrisisTextLine/modular/modules/auth v0.4.0 // indirect github.com/CrisisTextLine/modular/modules/cache v0.4.0 // indirect - github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0 // indirect + github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0 // indirect github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0 // indirect github.com/CrisisTextLine/modular/modules/reverseproxy/v2 v2.2.0 // indirect github.com/CrisisTextLine/modular/modules/scheduler v0.4.0 // indirect @@ -31,7 +31,7 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect - github.com/IBM/sarama v1.46.3 // indirect + github.com/IBM/sarama v1.47.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/aws/aws-sdk-go-v2 v1.41.2 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect @@ -77,7 +77,6 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/envoyproxy/go-control-plane/envoy v1.36.0 // indirect @@ -95,7 +94,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.3.1 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/golobby/cast v1.3.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.7.0 // indirect @@ -133,7 +131,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect - github.com/klauspost/compress v1.18.3 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -149,7 +147,7 @@ require ( github.com/ncruces/go-strftime v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/prometheus/client_golang v1.19.1 // indirect @@ -184,7 +182,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect - golang.org/x/net v0.50.0 // indirect + golang.org/x/net v0.51.0 // indirect golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/example/go.sum b/example/go.sum index e7af08ac..4b7d528c 100644 --- a/example/go.sum +++ b/example/go.sum @@ -32,8 +32,8 @@ github.com/CrisisTextLine/modular/modules/cache v0.4.0 h1:vlPXAsucSM1M0RsPly9cWy github.com/CrisisTextLine/modular/modules/cache v0.4.0/go.mod h1:4irZOGXxUlgJqAnWlpMyPC3C1tM/f5145/wMThYnAsY= github.com/CrisisTextLine/modular/modules/eventbus v1.7.0 h1:SSeu7rjuECDgFa+iNyndn94YPQxffHxJgfR7U4psz6E= github.com/CrisisTextLine/modular/modules/eventbus v1.7.0/go.mod h1:I1tGf3DmadwyMP2NE2m6XHYl9ebXB9wBc/KZLywTR4c= -github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0 h1:bDNWBparvVzXnhLxjFPJ9MDg7N4NUnNOjfn56G/CwGU= -github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0/go.mod h1:5DmacIYrhhiN18i2OHyAVBiNKbN2jHuEv2UJoRToMg0= +github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0 h1:jCG/5cuCITnGH4ztOrU5vY00+ykP9j+RL0zXy/CX1ak= +github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0/go.mod h1:5DmacIYrhhiN18i2OHyAVBiNKbN2jHuEv2UJoRToMg0= github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0 h1:NIhTrDgjhGwMi2D0ukGsd3n/M1W807u6Rhlqm89Sj8Q= github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0/go.mod h1:TeM3mt/+1X5VmlWF4nZpgp4qCGPmAahQs5jAzuWLbOo= github.com/CrisisTextLine/modular/modules/reverseproxy/v2 v2.2.0 h1:SUJEPA61IbjdUwKdSembQTbX9rKz5v4vmyr/cbvb4tY= @@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.55.0/go.mod h1:vB2GH9GAYYJTO3mEn8oYwzEdhlayZIdQz6zdzgUIRvA= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 h1:0s6TxfCu2KHkkZPnBfsQ2y5qia0jl3MMrmBhu3nCOYk= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= -github.com/IBM/sarama v1.46.3 h1:njRsX6jNlnR+ClJ8XmkO+CM4unbrNr/2vB5KK6UA+IE= -github.com/IBM/sarama v1.46.3/go.mod h1:GTUYiF9DMOZVe3FwyGT+dtSPceGFIgA+sPc5u6CBwko= +github.com/IBM/sarama v1.47.0 h1:GcQFEd12+KzfPYeLgN69Fh7vLCtYRhVIx0rO4TZO318= +github.com/IBM/sarama v1.47.0/go.mod h1:7gLLIU97nznOmA6TX++Qds+DRxH89P2XICY2KAQUzAY= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= @@ -91,8 +91,8 @@ github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0 h1:hggRKpv26DpYMOik3wWo1Ty5MkAN github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0/go.mod h1:pMlGFDpHoLTJOIZHGdJOAWmi+xeIlQXuFTuQxs1epYE= github.com/aws/aws-sdk-go-v2/service/eks v1.80.0 h1:moQGV8cPbVTN7r2Xte1Mybku35QDePSJEd3onYVmBtY= github.com/aws/aws-sdk-go-v2/service/eks v1.80.0/go.mod h1:Qg678m+87sCuJhcsZojenz8mblYG+Tq86V4m3hjVz0s= -github.com/aws/aws-sdk-go-v2/service/iam v1.53.2 h1:62G6btFUwAa5uR5iPlnlNVAM0zJSLbWgDfKOfUC7oW4= -github.com/aws/aws-sdk-go-v2/service/iam v1.53.2/go.mod h1:av9clChrbZbJ5E21msSsiT2oghl2BJHfQGhCkXmhyu8= +github.com/aws/aws-sdk-go-v2/service/iam v1.53.3 h1:boKZv8dNdHznhAA68hb/dqFz5pxoWmRAOJr9LtscVCI= +github.com/aws/aws-sdk-go-v2/service/iam v1.53.3/go.mod h1:E0QHh3aEwxYb7xshjvxYDELiOda7KBYJ77e/TvGhpcM= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.5 h1:CeY9LUdur+Dxoeldqoun6y4WtJ3RQtzk0JMP2gfUay0= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.5/go.mod h1:AZLZf2fMaahW5s/wMRciu1sYbdsikT/UHwbUjOdEVTc= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 h1:Z5EiPIzXKewUQK0QTMkutjiaPVeVYXX7KIqhXu/0fXs= @@ -168,8 +168,6 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= @@ -221,8 +219,6 @@ github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArs github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golobby/cast v1.3.3 h1:s2Lawb9RMz7YyYf8IrfMQY4IFmA1R/lgfmj97Vc6fig= github.com/golobby/cast v1.3.3/go.mod h1:0oDO5IT84HTXcbLDf1YXuk0xtg/cRDrxhbpWKxwtJCY= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -323,8 +319,8 @@ github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 h1:9Nu54bhS/H/ github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12/go.mod h1:TBzl5BIHNXfS9+C35ZyJaklL7mLDbgUkcgXzSLa8Tk0= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= -github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -383,8 +379,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -514,8 +510,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= +golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/module/pipeline_step_http_call.go b/module/pipeline_step_http_call.go index 3a05b539..98fcc86f 100644 --- a/module/pipeline_step_http_call.go +++ b/module/pipeline_step_http_call.go @@ -100,6 +100,7 @@ type HTTPCallStep struct { method string headers map[string]string body map[string]any + bodyFrom string // dot-path into pc.Current or a prior step result via "steps...."; if set, the resolved value is used as the request body (strings/[]byte sent as-is, other types JSON-marshaled) timeout time.Duration tmpl *TemplateEngine auth *oauthConfig @@ -142,6 +143,10 @@ func NewHTTPCallStepFactory() StepFactory { step.body = body } + if bodyFrom, ok := config["body_from"].(string); ok { + step.bodyFrom = bodyFrom + } + if timeout, ok := config["timeout"].(string); ok && timeout != "" { if d, err := time.ParseDuration(timeout); err == nil { step.timeout = d @@ -284,36 +289,56 @@ func (s *HTTPCallStep) getToken(ctx context.Context) (string, error) { } // buildBodyReader constructs the request body reader from the step configuration. -func (s *HTTPCallStep) buildBodyReader(pc *PipelineContext) (io.Reader, error) { +func (s *HTTPCallStep) buildBodyReader(pc *PipelineContext) (io.Reader, bool, error) { + if s.bodyFrom != "" { + val := resolveBodyFrom(s.bodyFrom, pc) + switch v := val.(type) { + case []byte: + return bytes.NewReader(v), true, nil + case string: + return strings.NewReader(v), true, nil + case nil: + return nil, true, nil + default: + data, marshalErr := json.Marshal(v) + if marshalErr != nil { + return nil, false, fmt.Errorf("http_call step %q: failed to marshal body_from value: %w", s.name, marshalErr) + } + return bytes.NewReader(data), false, nil + } + } if s.body != nil { resolvedBody, resolveErr := s.tmpl.ResolveMap(s.body, pc) if resolveErr != nil { - return nil, fmt.Errorf("http_call step %q: failed to resolve body: %w", s.name, resolveErr) + return nil, false, fmt.Errorf("http_call step %q: failed to resolve body: %w", s.name, resolveErr) } data, marshalErr := json.Marshal(resolvedBody) if marshalErr != nil { - return nil, fmt.Errorf("http_call step %q: failed to marshal body: %w", s.name, marshalErr) + return nil, false, fmt.Errorf("http_call step %q: failed to marshal body: %w", s.name, marshalErr) } - return bytes.NewReader(data), nil + return bytes.NewReader(data), false, nil } if s.method != "GET" && s.method != "HEAD" { data, marshalErr := json.Marshal(pc.Current) if marshalErr != nil { - return nil, fmt.Errorf("http_call step %q: failed to marshal current data: %w", s.name, marshalErr) + return nil, false, fmt.Errorf("http_call step %q: failed to marshal current data: %w", s.name, marshalErr) } - return bytes.NewReader(data), nil + return bytes.NewReader(data), false, nil } - return nil, nil + return nil, false, nil } // buildRequest constructs the HTTP request with resolved headers and optional bearer token. -func (s *HTTPCallStep) buildRequest(ctx context.Context, resolvedURL string, bodyReader io.Reader, pc *PipelineContext, bearerToken string) (*http.Request, error) { +// rawBody, when true, indicates that the request body is a raw value (string/[]byte/nil, +// typically provided via body_from) and should not have its Content-Type automatically +// overridden with application/json. +func (s *HTTPCallStep) buildRequest(ctx context.Context, resolvedURL string, bodyReader io.Reader, rawBody bool, pc *PipelineContext, bearerToken string) (*http.Request, error) { req, err := http.NewRequestWithContext(ctx, s.method, resolvedURL, bodyReader) if err != nil { return nil, fmt.Errorf("http_call step %q: failed to create request: %w", s.name, err) } - if bodyReader != nil { + if bodyReader != nil && !rawBody { req.Header.Set("Content-Type", "application/json") } for k, v := range s.headers { @@ -373,7 +398,7 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR return nil, fmt.Errorf("http_call step %q: failed to resolve url: %w", s.name, err) } - bodyReader, err := s.buildBodyReader(pc) + bodyReader, rawBody, err := s.buildBodyReader(pc) if err != nil { return nil, err } @@ -387,7 +412,7 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR } } - req, err := s.buildRequest(ctx, resolvedURL, bodyReader, pc, bearerToken) + req, err := s.buildRequest(ctx, resolvedURL, bodyReader, rawBody, pc, bearerToken) if err != nil { return nil, err } @@ -413,11 +438,11 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR return nil, tokenErr } - retryBody, buildErr := s.buildBodyReader(pc) + retryBody, rawBody2, buildErr := s.buildBodyReader(pc) if buildErr != nil { return nil, buildErr } - retryReq, buildErr := s.buildRequest(ctx, resolvedURL, retryBody, pc, newToken) + retryReq, buildErr := s.buildRequest(ctx, resolvedURL, retryBody, rawBody2, pc, newToken) if buildErr != nil { return nil, buildErr } diff --git a/module/pipeline_step_http_call_test.go b/module/pipeline_step_http_call_test.go index e5c4a29f..36260b8a 100644 --- a/module/pipeline_step_http_call_test.go +++ b/module/pipeline_step_http_call_test.go @@ -1,9 +1,11 @@ package module import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "strings" @@ -501,3 +503,208 @@ func TestHTTPCallStep_OAuth2_ConcurrentFetch(t *testing.T) { t.Errorf("expected exactly 1 token request via singleflight, got %d", n) } } + +// TestHTTPCallStep_BodyFrom_String verifies that body_from with a string value sends raw bytes +// without JSON-encoding and without auto-setting Content-Type: application/json. +func TestHTTPCallStep_BodyFrom_String(t *testing.T) { + type captured struct { + body []byte + contentType string + } + ch := make(chan captured, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("failed to read request body: %v", err) + } + ch <- captured{body: b, contentType: r.Header.Get("Content-Type")} + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true}`)) + })) + defer srv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("body-from-string", map[string]any{ + "url": srv.URL, + "method": "POST", + "body_from": "raw_payload", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = srv.Client() + + pc := NewPipelineContext(nil, nil) + pc.Current["raw_payload"] = `{"hello":"world"}` + + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("execute error: %v", err) + } + + got := <-ch + if string(got.body) != `{"hello":"world"}` { + t.Errorf("expected raw body %q, got %q", `{"hello":"world"}`, string(got.body)) + } + // Content-Type should NOT be auto-set to application/json for raw bodies + if got.contentType == "application/json" { + t.Errorf("expected Content-Type not to be application/json for body_from, got %q", got.contentType) + } +} + +// TestHTTPCallStep_BodyFrom_Bytes verifies that body_from with a []byte value sends raw bytes. +func TestHTTPCallStep_BodyFrom_Bytes(t *testing.T) { + ch := make(chan []byte, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("failed to read request body: %v", err) + } + ch <- b + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("body-from-bytes", map[string]any{ + "url": srv.URL, + "method": "POST", + "body_from": "raw_data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = srv.Client() + + pc := NewPipelineContext(nil, nil) + pc.Current["raw_data"] = []byte("binary\x00data") + + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("execute error: %v", err) + } + + gotBody := <-ch + if !bytes.Equal(gotBody, []byte("binary\x00data")) { + t.Errorf("expected raw bytes, got %q", string(gotBody)) + } +} + +// TestHTTPCallStep_BodyFrom_StepOutput verifies that body_from can resolve from step outputs. +func TestHTTPCallStep_BodyFrom_StepOutput(t *testing.T) { + ch := make(chan []byte, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("failed to read request body: %v", err) + } + ch <- b + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("body-from-step", map[string]any{ + "url": srv.URL, + "method": "POST", + "body_from": "steps.parse.raw_body", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = srv.Client() + + pc := NewPipelineContext(nil, nil) + pc.StepOutputs["parse"] = map[string]any{ + "raw_body": `{"event":"push"}`, + } + + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("execute error: %v", err) + } + + gotBody := <-ch + if string(gotBody) != `{"event":"push"}` { + t.Errorf("expected raw body from step output, got %q", string(gotBody)) + } +} + +// TestHTTPCallStep_BodyFrom_ContentTypeOverride verifies that Content-Type set in headers +// takes effect even with body_from. +func TestHTTPCallStep_BodyFrom_ContentTypeOverride(t *testing.T) { + ch := make(chan string, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ch <- r.Header.Get("Content-Type") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("body-from-ct", map[string]any{ + "url": srv.URL, + "method": "POST", + "body_from": "payload", + "headers": map[string]any{ + "Content-Type": "application/xml", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = srv.Client() + + pc := NewPipelineContext(nil, nil) + pc.Current["payload"] = `1` + + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("execute error: %v", err) + } + + gotCT := <-ch + if gotCT != "application/xml" { + t.Errorf("expected Content-Type application/xml, got %q", gotCT) + } +} + +// TestHTTPCallStep_BodyFrom_NilValue verifies that body_from with a missing path sends no body. +func TestHTTPCallStep_BodyFrom_NilValue(t *testing.T) { + ch := make(chan []byte, 1) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("failed to read request body: %v", err) + } + ch <- b + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer srv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("body-from-nil", map[string]any{ + "url": srv.URL, + "method": "POST", + "body_from": "nonexistent.path", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = srv.Client() + + pc := NewPipelineContext(nil, nil) + + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("execute error: %v", err) + } + + gotBody := <-ch + if len(gotBody) != 0 { + t.Errorf("expected empty body for nil body_from, got %q", string(gotBody)) + } +}