From 9a04afa83b21b18fec43f98b8aa910a5b0b18e65 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:26:54 +0000 Subject: [PATCH 1/3] Initial plan From 1b96de84949cb166b928909f9f927b630070cdcd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:37:52 +0000 Subject: [PATCH 2/3] fix: handle additionalProperties boolean shorthand in openapi module Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/openapi.go | 109 ++++++++++++--- module/openapi_test.go | 292 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 382 insertions(+), 19 deletions(-) diff --git a/module/openapi.go b/module/openapi.go index 2d541cc9..54bc0f3e 100644 --- a/module/openapi.go +++ b/module/openapi.go @@ -102,22 +102,81 @@ type openAPIResponse struct { Content map[string]openAPIMediaType `yaml:"content" json:"content"` } +// openAPIAdditionalProperties represents the OpenAPI / JSON Schema +// "additionalProperties" keyword, which may be either a boolean shorthand +// (true = allow any extra keys, false = forbid extra keys) or a full Schema +// object that every additional key's value must satisfy. +type openAPIAdditionalProperties struct { + // Bool is set when the YAML/JSON value is a plain boolean. + // When Schema is non-nil, Bool is ignored. + Bool bool + Schema *openAPISchema +} + +// UnmarshalYAML handles both the boolean shorthand and the object form. +func (a *openAPIAdditionalProperties) UnmarshalYAML(value *yaml.Node) error { + // Boolean shorthand: additionalProperties: true / false + if value.Kind == yaml.ScalarNode && value.Tag == "!!bool" { + var b bool + if err := value.Decode(&b); err != nil { + return err + } + a.Bool = b + a.Schema = nil + return nil + } + // Object form: additionalProperties: { type: string, … } + var s openAPISchema + if err := value.Decode(&s); err != nil { + return err + } + a.Schema = &s + return nil +} + +// UnmarshalJSON handles both the boolean shorthand and the object form. +func (a *openAPIAdditionalProperties) UnmarshalJSON(data []byte) error { + if len(data) > 0 && (data[0] == 't' || data[0] == 'f') { + var b bool + if err := json.Unmarshal(data, &b); err != nil { + return err + } + a.Bool = b + a.Schema = nil + return nil + } + var s openAPISchema + if err := json.Unmarshal(data, &s); err != nil { + return err + } + a.Schema = &s + return nil +} + +// MarshalJSON serialises back to the compact boolean or object form. +func (a openAPIAdditionalProperties) MarshalJSON() ([]byte, error) { + if a.Schema == nil { + return json.Marshal(a.Bool) + } + return json.Marshal(a.Schema) +} + // openAPISchema is a minimal JSON Schema subset used for parameter/body validation. type openAPISchema struct { - Type string `yaml:"type" json:"type"` - Required []string `yaml:"required" json:"required"` - Properties map[string]*openAPISchema `yaml:"properties" json:"properties"` - Format string `yaml:"format" json:"format"` - Minimum *float64 `yaml:"minimum" json:"minimum"` - Maximum *float64 `yaml:"maximum" json:"maximum"` - MinLength *int `yaml:"minLength" json:"minLength"` - MaxLength *int `yaml:"maxLength" json:"maxLength"` - Pattern string `yaml:"pattern" json:"pattern"` - Enum []any `yaml:"enum" json:"enum"` - Items *openAPISchema `yaml:"items" json:"items"` - MinItems *int `yaml:"minItems" json:"minItems"` - MaxItems *int `yaml:"maxItems" json:"maxItems"` - AdditionalProperties *openAPISchema `yaml:"additionalProperties" json:"additionalProperties"` + Type string `yaml:"type" json:"type"` + Required []string `yaml:"required" json:"required"` + Properties map[string]*openAPISchema `yaml:"properties" json:"properties"` + Format string `yaml:"format" json:"format"` + Minimum *float64 `yaml:"minimum" json:"minimum"` + Maximum *float64 `yaml:"maximum" json:"maximum"` + MinLength *int `yaml:"minLength" json:"minLength"` + MaxLength *int `yaml:"maxLength" json:"maxLength"` + Pattern string `yaml:"pattern" json:"pattern"` + Enum []any `yaml:"enum" json:"enum"` + Items *openAPISchema `yaml:"items" json:"items"` + MinItems *int `yaml:"minItems" json:"minItems"` + MaxItems *int `yaml:"maxItems" json:"maxItems"` + AdditionalProperties *openAPIAdditionalProperties `yaml:"additionalProperties" json:"additionalProperties"` } // ---- OpenAPIModule ---- @@ -982,14 +1041,26 @@ func validateJSONBody(body any, schema *openAPISchema, bodyLabel string) []strin // Validate additionalProperties: keys not declared in Properties are checked // against the additionalProperties schema when it is specified. if schema.AdditionalProperties != nil { - for key, val := range obj { - if _, defined := schema.Properties[key]; defined { - continue + ap := schema.AdditionalProperties + if ap.Schema == nil && !ap.Bool { + // additionalProperties: false — reject any key not listed in Properties + for key := range obj { + if _, defined := schema.Properties[key]; !defined { + errs = append(errs, fmt.Sprintf("additional property %q is not allowed", key)) + } } - if fieldErrs := validateJSONValue(val, key, schema.AdditionalProperties); len(fieldErrs) > 0 { - errs = append(errs, fieldErrs...) + } else if ap.Schema != nil { + // additionalProperties: — validate each extra key against the schema + for key, val := range obj { + if _, defined := schema.Properties[key]; defined { + continue + } + if fieldErrs := validateJSONValue(val, key, ap.Schema); len(fieldErrs) > 0 { + errs = append(errs, fieldErrs...) + } } } + // additionalProperties: true — any extra key is allowed; nothing to validate } return errs } diff --git a/module/openapi_test.go b/module/openapi_test.go index 276bad30..d8b1bd52 100644 --- a/module/openapi_test.go +++ b/module/openapi_test.go @@ -11,6 +11,8 @@ import ( "path/filepath" "strings" "testing" + + "gopkg.in/yaml.v3" ) // ---- spec fixtures ---- @@ -2394,3 +2396,293 @@ func TestOpenAPIModule_ResponseValidation_DefaultAction_IsWarn(t *testing.T) { t.Errorf("expected 200 with default warn action, got %d: %s", w.Code, w.Body.String()) } } + +// ---- additionalProperties tests ---- + +// additionalPropertiesTrueYAML exercises additionalProperties: true (bool shorthand). +const additionalPropertiesTrueYAML = ` +openapi: "3.0.0" +info: + title: AdditionalProperties True + version: "1.0.0" +paths: + /metadata: + post: + operationId: postMetadata + summary: Post metadata with any extra keys + x-pipeline: metadata-pipeline + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + name: + type: string + additionalProperties: true + responses: + "200": + description: OK +` + +// additionalPropertiesFalseYAML exercises additionalProperties: false (reject extras). +const additionalPropertiesFalseYAML = ` +openapi: "3.0.0" +info: + title: AdditionalProperties False + version: "1.0.0" +paths: + /strict: + post: + operationId: postStrict + summary: Post object with no extra keys allowed + x-pipeline: strict-pipeline + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + name: + type: string + additionalProperties: false + responses: + "200": + description: OK +` + +// additionalPropertiesSchemaYAML exercises additionalProperties: . +const additionalPropertiesSchemaYAML = ` +openapi: "3.0.0" +info: + title: AdditionalProperties Schema + version: "1.0.0" +paths: + /typed-extra: + post: + operationId: postTypedExtra + summary: Post object where extra keys must be strings + x-pipeline: typed-extra-pipeline + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + name: + type: string + additionalProperties: + type: string + responses: + "200": + description: OK +` + +// TestOpenAPIModule_AdditionalProperties_True verifies that a spec with +// "additionalProperties: true" is parsed without error and that arbitrary +// extra keys in the request body are accepted. +func TestOpenAPIModule_AdditionalProperties_True(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", additionalPropertiesTrueYAML) + + mod := NewOpenAPIModule("ap-true", OpenAPIConfig{ + SpecFile: specPath, + Validation: OpenAPIValidationConfig{Request: true}, + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init with additionalProperties:true should succeed, got: %v", err) + } + + step := &stubPipelineStep{ + name: "metadata", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{Output: map[string]any{"response_status": 200}, Stop: true}, nil + }, + } + pipe := &Pipeline{Name: "metadata-pipeline", Steps: []PipelineStep{step}} + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "metadata-pipeline" { + return pipe, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("POST", "/metadata") + if h == nil { + t.Fatal("POST /metadata handler not found") + } + + // Body contains declared key "name" plus extra keys — all should pass + body := `{"name":"test","extra_field":"value","another_key":42}` + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodPost, "/metadata", strings.NewReader(body)) + r.Header.Set("Content-Type", "application/json") + h.Handle(w, r) + + if w.Code == http.StatusBadRequest { + t.Errorf("expected extra keys to pass with additionalProperties:true, got 400: %s", w.Body.String()) + } +} + +// TestOpenAPIModule_AdditionalProperties_False verifies that extra keys are +// rejected when "additionalProperties: false" is set. +func TestOpenAPIModule_AdditionalProperties_False(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", additionalPropertiesFalseYAML) + + mod := NewOpenAPIModule("ap-false", OpenAPIConfig{ + SpecFile: specPath, + Validation: OpenAPIValidationConfig{Request: true}, + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init with additionalProperties:false should succeed, got: %v", err) + } + + step := &stubPipelineStep{ + name: "strict", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{Output: map[string]any{"response_status": 200}, Stop: true}, nil + }, + } + pipe := &Pipeline{Name: "strict-pipeline", Steps: []PipelineStep{step}} + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "strict-pipeline" { + return pipe, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("POST", "/strict") + if h == nil { + t.Fatal("POST /strict handler not found") + } + + // Body contains extra key "unknown" which should be rejected + body := `{"name":"test","unknown":"value"}` + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodPost, "/strict", strings.NewReader(body)) + r.Header.Set("Content-Type", "application/json") + h.Handle(w, r) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for extra key with additionalProperties:false, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestOpenAPIModule_AdditionalProperties_Schema verifies that extra keys are +// validated against the schema when "additionalProperties: {type: string}" is set. +func TestOpenAPIModule_AdditionalProperties_Schema(t *testing.T) { + specPath := writeTempSpec(t, ".yaml", additionalPropertiesSchemaYAML) + + mod := NewOpenAPIModule("ap-schema", OpenAPIConfig{ + SpecFile: specPath, + Validation: OpenAPIValidationConfig{Request: true}, + }) + if err := mod.Init(nil); err != nil { + t.Fatalf("Init with additionalProperties schema should succeed, got: %v", err) + } + + step := &stubPipelineStep{ + name: "typed-extra", + exec: func(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return &StepResult{Output: map[string]any{"response_status": 200}, Stop: true}, nil + }, + } + pipe := &Pipeline{Name: "typed-extra-pipeline", Steps: []PipelineStep{step}} + mod.SetPipelineLookup(func(name string) (*Pipeline, bool) { + if name == "typed-extra-pipeline" { + return pipe, true + } + return nil, false + }) + + router := &testRouter{} + mod.RegisterRoutes(router) + + h := router.findHandler("POST", "/typed-extra") + if h == nil { + t.Fatal("POST /typed-extra handler not found") + } + + // Valid: extra key is a string + body := `{"name":"test","extra":"string-value"}` + w := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodPost, "/typed-extra", strings.NewReader(body)) + r.Header.Set("Content-Type", "application/json") + h.Handle(w, r) + + if w.Code == http.StatusBadRequest { + t.Errorf("expected string extra key to pass additionalProperties:{type:string}, got 400: %s", w.Body.String()) + } +} + +// TestOpenAPIAdditionalProperties_UnmarshalYAML checks that the custom YAML +// unmarshaller handles bool and object forms correctly. +func TestOpenAPIAdditionalProperties_UnmarshalYAML(t *testing.T) { + tests := []struct { + input string + wantBool bool + wantSchema bool // whether Schema should be non-nil + wantType string // expected Schema.Type when wantSchema is true + }{ + {"true", true, false, ""}, + {"false", false, false, ""}, + {"type: string", false, true, "string"}, + {"type: integer", false, true, "integer"}, + } + + for _, tc := range tests { + var ap openAPIAdditionalProperties + if err := yaml.Unmarshal([]byte(tc.input), &ap); err != nil { + t.Errorf("UnmarshalYAML(%q): unexpected error: %v", tc.input, err) + continue + } + if ap.Bool != tc.wantBool { + t.Errorf("UnmarshalYAML(%q): Bool = %v, want %v", tc.input, ap.Bool, tc.wantBool) + } + if (ap.Schema != nil) != tc.wantSchema { + t.Errorf("UnmarshalYAML(%q): Schema non-nil = %v, want %v", tc.input, ap.Schema != nil, tc.wantSchema) + } + if tc.wantSchema && ap.Schema != nil && ap.Schema.Type != tc.wantType { + t.Errorf("UnmarshalYAML(%q): Schema.Type = %q, want %q", tc.input, ap.Schema.Type, tc.wantType) + } + } +} + +// TestOpenAPIAdditionalProperties_UnmarshalJSON checks the JSON unmarshaller. +func TestOpenAPIAdditionalProperties_UnmarshalJSON(t *testing.T) { + tests := []struct { + input string + wantBool bool + wantSchema bool + wantType string + }{ + {`true`, true, false, ""}, + {`false`, false, false, ""}, + {`{"type":"string"}`, false, true, "string"}, + } + + for _, tc := range tests { + var ap openAPIAdditionalProperties + if err := json.Unmarshal([]byte(tc.input), &ap); err != nil { + t.Errorf("UnmarshalJSON(%q): unexpected error: %v", tc.input, err) + continue + } + if ap.Bool != tc.wantBool { + t.Errorf("UnmarshalJSON(%q): Bool = %v, want %v", tc.input, ap.Bool, tc.wantBool) + } + if (ap.Schema != nil) != tc.wantSchema { + t.Errorf("UnmarshalJSON(%q): Schema non-nil = %v, want %v", tc.input, ap.Schema != nil, tc.wantSchema) + } + if tc.wantSchema && ap.Schema != nil && ap.Schema.Type != tc.wantType { + t.Errorf("UnmarshalJSON(%q): Schema.Type = %q, want %q", tc.input, ap.Schema.Type, tc.wantType) + } + } +} From 9bb5c4ff19262d18014d02becfeb057c8fe15868 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:14:45 +0000 Subject: [PATCH 3/3] fix: address review feedback and fix linter failures - Add bodyLabel prefix to additionalProperties:false error messages - Strengthen AdditionalProperties_True assertion to check for 200 (not just non-400) - Add invalid-type failing case to AdditionalProperties_Schema test - Fix gocritic ifElseChain lint issue in pipeline_step_graphql.go (from main) Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/openapi.go | 2 +- module/openapi_test.go | 19 +- module/pipeline_step_graphql.go | 885 ++++++++++++++++++++++++++++++++ 3 files changed, 901 insertions(+), 5 deletions(-) create mode 100644 module/pipeline_step_graphql.go diff --git a/module/openapi.go b/module/openapi.go index 54bc0f3e..50130407 100644 --- a/module/openapi.go +++ b/module/openapi.go @@ -1046,7 +1046,7 @@ func validateJSONBody(body any, schema *openAPISchema, bodyLabel string) []strin // additionalProperties: false — reject any key not listed in Properties for key := range obj { if _, defined := schema.Properties[key]; !defined { - errs = append(errs, fmt.Sprintf("additional property %q is not allowed", key)) + errs = append(errs, fmt.Sprintf("%s: additional property %q is not allowed", bodyLabel, key)) } } } else if ap.Schema != nil { diff --git a/module/openapi_test.go b/module/openapi_test.go index d8b1bd52..2683b88f 100644 --- a/module/openapi_test.go +++ b/module/openapi_test.go @@ -2524,8 +2524,8 @@ func TestOpenAPIModule_AdditionalProperties_True(t *testing.T) { r.Header.Set("Content-Type", "application/json") h.Handle(w, r) - if w.Code == http.StatusBadRequest { - t.Errorf("expected extra keys to pass with additionalProperties:true, got 400: %s", w.Body.String()) + if w.Code != http.StatusOK { + t.Errorf("expected extra keys to pass with additionalProperties:true, got status %d: %s", w.Code, w.Body.String()) } } @@ -2618,8 +2618,19 @@ func TestOpenAPIModule_AdditionalProperties_Schema(t *testing.T) { r.Header.Set("Content-Type", "application/json") h.Handle(w, r) - if w.Code == http.StatusBadRequest { - t.Errorf("expected string extra key to pass additionalProperties:{type:string}, got 400: %s", w.Body.String()) + if w.Code != http.StatusOK { + t.Errorf("expected string extra key to pass additionalProperties:{type:string}, got status %d: %s", w.Code, w.Body.String()) + } + + // Invalid: extra key value is an integer, not a string — should be rejected + bodyInvalid := `{"name":"test","extra":42}` + w2 := httptest.NewRecorder() + r2 := httptest.NewRequest(http.MethodPost, "/typed-extra", strings.NewReader(bodyInvalid)) + r2.Header.Set("Content-Type", "application/json") + h.Handle(w2, r2) + + if w2.Code != http.StatusBadRequest { + t.Errorf("expected integer extra key to fail additionalProperties:{type:string}, got status %d: %s", w2.Code, w2.Body.String()) } } diff --git a/module/pipeline_step_graphql.go b/module/pipeline_step_graphql.go new file mode 100644 index 00000000..a42aa2a1 --- /dev/null +++ b/module/pipeline_step_graphql.go @@ -0,0 +1,885 @@ +package module + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/GoCodeAlone/modular" +) + +// batchQuery holds a single query in a batch request. +type batchQuery struct { + query string + variables map[string]any +} + +// paginationConfig holds cursor or offset pagination settings. +type paginationConfig struct { + strategy string // "cursor" or "offset" + pageInfoPath string + cursorVariable string + hasNextField string + cursorField string + maxPages int + maxPerPage int + offsetVariable string +} + +// GraphQLStep executes GraphQL queries and mutations as a pipeline step. +type GraphQLStep struct { + name string + url string + query string + variables map[string]any + dataPath string + headers map[string]string + fragments []string + failOnGraphQLErrors bool + timeout time.Duration + retryOnNetworkError bool + tmpl *TemplateEngine + httpClient *http.Client + pagination *paginationConfig + batch []batchQuery + apqEnabled bool + apqSHA256 string + introspection bool + + // OAuth2 (reuses globalOAuthCache from pipeline_step_http_call.go) + auth *oauthConfig + oauthEntry *oauthCacheEntry +} + +// NewGraphQLStepFactory returns a StepFactory for GraphQLStep. +func NewGraphQLStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + rawURL, _ := config["url"].(string) + if rawURL == "" { + return nil, fmt.Errorf("graphql step %q: 'url' is required", name) + } + + query, _ := config["query"].(string) + // query can be empty if introspection or batch mode is used + introspectionCfg, _ := config["introspection"].(map[string]any) + batchCfg, _ := config["batch"].(map[string]any) + if query == "" && introspectionCfg == nil && batchCfg == nil { + return nil, fmt.Errorf("graphql step %q: 'query' is required (unless introspection or batch is configured)", name) + } + + step := &GraphQLStep{ + name: name, + url: rawURL, + query: query, + failOnGraphQLErrors: true, + timeout: 30 * time.Second, + tmpl: NewTemplateEngine(), + httpClient: http.DefaultClient, + } + + if vars, ok := config["variables"].(map[string]any); ok { + step.variables = vars + } + + if dp, ok := config["data_path"].(string); ok { + step.dataPath = dp + } + + if headers, ok := config["headers"].(map[string]any); ok { + step.headers = make(map[string]string, len(headers)) + for k, v := range headers { + if s, ok := v.(string); ok { + step.headers[k] = s + } + } + } + + if frags, ok := config["fragments"].([]any); ok { + for _, f := range frags { + if s, ok := f.(string); ok { + step.fragments = append(step.fragments, s) + } + } + } + + if batchCfgParsed, ok := config["batch"].(map[string]any); ok { + if queries, ok := batchCfgParsed["queries"].([]any); ok { + for _, q := range queries { + qMap, ok := q.(map[string]any) + if !ok { + continue + } + bq := batchQuery{} + bq.query, _ = qMap["query"].(string) + if vars, ok := qMap["variables"].(map[string]any); ok { + bq.variables = vars + } + step.batch = append(step.batch, bq) + } + } + } + + if pqCfg, ok := config["persisted_query"].(map[string]any); ok { + if enabled, ok := pqCfg["enabled"].(bool); ok && enabled { + step.apqEnabled = true + if hash, ok := pqCfg["sha256"].(string); ok && hash != "" { + step.apqSHA256 = hash + } + } + } + + if introCfg, ok := config["introspection"].(map[string]any); ok { + if enabled, ok := introCfg["enabled"].(bool); ok && enabled { + step.introspection = true + } + } + + if pagCfg, ok := config["pagination"].(map[string]any); ok { + pc := &paginationConfig{ + strategy: "cursor", + maxPages: 10, + maxPerPage: 100, + offsetVariable: "offset", + } + if s, ok := pagCfg["strategy"].(string); ok { + pc.strategy = s + } + if s, ok := pagCfg["page_info_path"].(string); ok { + pc.pageInfoPath = s + } + if s, ok := pagCfg["cursor_variable"].(string); ok { + pc.cursorVariable = s + } + if s, ok := pagCfg["has_next_field"].(string); ok { + pc.hasNextField = s + } + if s, ok := pagCfg["cursor_field"].(string); ok { + pc.cursorField = s + } + if s, ok := pagCfg["offset_variable"].(string); ok { + pc.offsetVariable = s + } + if v, ok := pagCfg["max_pages"]; ok { + switch val := v.(type) { + case int: + pc.maxPages = val + case float64: + pc.maxPages = int(val) + } + } + if v, ok := pagCfg["max_per_page"]; ok { + switch val := v.(type) { + case int: + pc.maxPerPage = val + case float64: + pc.maxPerPage = int(val) + } + } + step.pagination = pc + } + + if v, ok := config["fail_on_graphql_errors"]; ok { + if b, ok := v.(bool); ok { + step.failOnGraphQLErrors = b + } + } + + if s, ok := config["timeout"].(string); ok && s != "" { + d, err := time.ParseDuration(s) + if err != nil { + return nil, fmt.Errorf("graphql step %q: invalid timeout %q: %w", name, s, err) + } + step.timeout = d + } + + if v, ok := config["retry_on_network_error"].(bool); ok { + step.retryOnNetworkError = v + } + + // OAuth2 auth — reuses step.http_call patterns + if authCfg, ok := config["auth"].(map[string]any); ok { + authType, _ := authCfg["type"].(string) + switch authType { + case "oauth2_client_credentials": + cfg, oauthErr := buildOAuthConfig(name, "auth", authCfg) + if oauthErr != nil { + return nil, oauthErr + } + step.auth = cfg + step.oauthEntry = globalOAuthCache.getOrCreate(cfg.cacheKey) + case "bearer": + // bearer token stored as a simple header, resolved at execution time + if token, ok := authCfg["token"].(string); ok { + if step.headers == nil { + step.headers = make(map[string]string) + } + step.headers["Authorization"] = "Bearer " + token + } + case "api_key": + headerName, _ := authCfg["header"].(string) + if headerName == "" { + headerName = "X-API-Key" + } + apiKey, _ := authCfg["key"].(string) + if step.headers == nil { + step.headers = make(map[string]string) + } + step.headers[headerName] = apiKey + case "basic": + user, _ := authCfg["username"].(string) + pass, _ := authCfg["password"].(string) + if step.headers == nil { + step.headers = make(map[string]string) + } + step.headers["Authorization"] = "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass)) + } + } + + return step, nil + } +} + +// Name returns the step name. +func (s *GraphQLStep) Name() string { return s.name } + +// Execute runs the GraphQL query/mutation and returns the result. +func (s *GraphQLStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.introspection { + return s.executeIntrospection(ctx, pc) + } + + if len(s.batch) > 0 { + return s.executeBatch(ctx, pc) + } + + if s.pagination != nil { + return s.executePaginated(ctx, pc) + } + + if s.apqEnabled { + return s.executeAPQ(ctx, pc) + } + + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + + // Resolve URL template + resolvedURL, err := s.tmpl.Resolve(s.url, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve url: %w", s.name, err) + } + + // Resolve variables + var resolvedVars map[string]any + if s.variables != nil { + resolvedVars, err = s.tmpl.ResolveMap(s.variables, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve variables: %w", s.name, err) + } + } + + // Build query with fragments prepended + fullQuery := s.query + if len(s.fragments) > 0 { + fullQuery = strings.Join(s.fragments, "\n") + "\n" + s.query + } + + // Resolve query template (allows dynamic queries) + fullQuery, err = s.tmpl.Resolve(fullQuery, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve query template: %w", s.name, err) + } + + // Build request body + reqBody := map[string]any{"query": fullQuery} + if resolvedVars != nil { + reqBody["variables"] = resolvedVars + } + + // Get OAuth2 token if configured + bearerToken, err := s.getBearerToken(ctx) + if err != nil { + return nil, err + } + + // Execute request (with optional network error retry) + output, statusCode, err := s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if err != nil { + switch { + case statusCode == http.StatusUnauthorized && s.auth != nil: + // 401 retry with token refresh + s.oauthEntry.invalidate() + bearerToken, err = s.fetchTokenDirect(ctx) + if err != nil { + return nil, err + } + output, statusCode, err = s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if err != nil { + return nil, err + } + case s.retryOnNetworkError && statusCode == 0: + // Network-level error (no HTTP response) — retry once + output, statusCode, err = s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if err != nil { + return nil, err + } + default: + return nil, err + } + } + + _ = statusCode + return &StepResult{Output: output}, nil +} + +// executePaginated handles cursor and offset pagination, collecting all pages. +func (s *GraphQLStep) executePaginated(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + + resolvedURL, err := s.tmpl.Resolve(s.url, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve url: %w", s.name, err) + } + + bearerToken, err := s.getBearerToken(ctx) + if err != nil { + return nil, err + } + + fullQuery := s.query + if len(s.fragments) > 0 { + fullQuery = strings.Join(s.fragments, "\n") + "\n" + s.query + } + fullQuery, err = s.tmpl.Resolve(fullQuery, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve query: %w", s.name, err) + } + + var allData []any + var allErrors []any + pageCount := 0 + var cursor any + offset := 0 + + for page := 0; page < s.pagination.maxPages; page++ { + vars := make(map[string]any) + if s.variables != nil { + resolved, resolveErr := s.tmpl.ResolveMap(s.variables, pc) + if resolveErr != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve variables: %w", s.name, resolveErr) + } + for k, v := range resolved { + vars[k] = v + } + } + + switch s.pagination.strategy { + case "cursor": + if cursor != nil { + vars[s.pagination.cursorVariable] = cursor + } + case "offset": + vars[s.pagination.offsetVariable] = offset + } + + reqBody := map[string]any{"query": fullQuery} + if len(vars) > 0 { + reqBody["variables"] = vars + } + + output, _, reqErr := s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if reqErr != nil { + return nil, reqErr + } + + // Collect errors + if errs, ok := output["errors"].([]any); ok && len(errs) > 0 { + allErrors = append(allErrors, errs...) + } + + pageCount++ + + // Get full response data (before data_path extraction) from "full_data" key + fullData := output["full_data"] + + switch s.pagination.strategy { + case "cursor": + // Extract page items via data_path + pageData := fullData + if s.dataPath != "" { + pageData = extractDataPath(fullData, s.dataPath) + } + if arr, ok := pageData.([]any); ok { + allData = append(allData, arr...) + } + + // Check for next page via pageInfo + pageInfo := extractDataPath(fullData, s.pagination.pageInfoPath) + pageInfoMap, ok := pageInfo.(map[string]any) + if !ok { + goto done + } + hasNext, _ := pageInfoMap[s.pagination.hasNextField].(bool) + if !hasNext { + goto done + } + cursor = pageInfoMap[s.pagination.cursorField] + if cursor == nil { + goto done + } + + case "offset": + // Extract page items via data_path + pageData := fullData + if s.dataPath != "" { + pageData = extractDataPath(fullData, s.dataPath) + } + arr, ok := pageData.([]any) + if !ok || len(arr) == 0 { + goto done + } + allData = append(allData, arr...) + if len(arr) < s.pagination.maxPerPage { + goto done + } + offset += len(arr) + } + } + +done: + result := map[string]any{ + "data": allData, + "errors": allErrors, + "has_errors": len(allErrors) > 0, + "page_count": pageCount, + "total_items": len(allData), + "status_code": 200, + } + if allErrors == nil { + result["errors"] = []any{} + } + + return &StepResult{Output: result}, nil +} + +const introspectionQuery = `query IntrospectionQuery { + __schema { + queryType { name } + mutationType { name } + subscriptionType { name } + types { + kind name description + fields(includeDeprecated: true) { + name description + args { name description type { kind name ofType { kind name ofType { kind name } } } defaultValue } + type { kind name ofType { kind name ofType { kind name ofType { kind name } } } } + isDeprecated deprecationReason + } + inputFields { name description type { kind name ofType { kind name } } defaultValue } + interfaces { kind name } + enumValues(includeDeprecated: true) { name description isDeprecated deprecationReason } + possibleTypes { kind name } + } + directives { + name description locations + args { name description type { kind name ofType { kind name } } defaultValue } + } + } +}` + +// executeIntrospection sends the standard introspection query. +func (s *GraphQLStep) executeIntrospection(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + + resolvedURL, err := s.tmpl.Resolve(s.url, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve url: %w", s.name, err) + } + + bearerToken, err := s.getBearerToken(ctx) + if err != nil { + return nil, err + } + + reqBody := map[string]any{"query": introspectionQuery} + output, _, err := s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if err != nil { + return nil, err + } + + // Extract __schema and types for convenience + if fullData, ok := output["full_data"].(map[string]any); ok { + if schema, ok := fullData["__schema"]; ok { + output["schema"] = schema + if schemaMap, ok := schema.(map[string]any); ok { + output["types"] = schemaMap["types"] + } + } + } + + return &StepResult{Output: output}, nil +} + +// executeAPQ sends an Automatic Persisted Query: first hash-only, then with full query on cache miss. +func (s *GraphQLStep) executeAPQ(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + + resolvedURL, err := s.tmpl.Resolve(s.url, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve url: %w", s.name, err) + } + + fullQuery := s.query + if len(s.fragments) > 0 { + fullQuery = strings.Join(s.fragments, "\n") + "\n" + s.query + } + fullQuery, err = s.tmpl.Resolve(fullQuery, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve query: %w", s.name, err) + } + + var resolvedVars map[string]any + if s.variables != nil { + resolvedVars, err = s.tmpl.ResolveMap(s.variables, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve variables: %w", s.name, err) + } + } + + hash := s.apqSHA256 + if hash == "" { + h := sha256.Sum256([]byte(fullQuery)) + hash = hex.EncodeToString(h[:]) + } + + bearerToken, err := s.getBearerToken(ctx) + if err != nil { + return nil, err + } + + // First attempt: send hash only (no query body) + reqBody := map[string]any{ + "extensions": map[string]any{ + "persistedQuery": map[string]any{ + "version": 1, + "sha256Hash": hash, + }, + }, + } + if resolvedVars != nil { + reqBody["variables"] = resolvedVars + } + + // Temporarily disable fail_on_graphql_errors so we can inspect PersistedQueryNotFound + origFail := s.failOnGraphQLErrors + s.failOnGraphQLErrors = false + output, _, firstErr := s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + s.failOnGraphQLErrors = origFail + + if firstErr == nil && !isPersistedQueryNotFound(output) { + return &StepResult{Output: output}, nil + } + + // Retry with full query body + reqBody["query"] = fullQuery + output, _, err = s.doRequest(ctx, resolvedURL, reqBody, bearerToken) + if err != nil { + return nil, err + } + return &StepResult{Output: output}, nil +} + +// isPersistedQueryNotFound checks if the output contains a PersistedQueryNotFound error. +func isPersistedQueryNotFound(output map[string]any) bool { + errors, ok := output["errors"].([]any) + if !ok || len(errors) == 0 { + return false + } + for _, e := range errors { + if errMap, ok := e.(map[string]any); ok { + if msg, ok := errMap["message"].(string); ok && strings.Contains(msg, "PersistedQueryNotFound") { + return true + } + } + } + return false +} + +// executeBatch sends all batch queries in a single HTTP request. +func (s *GraphQLStep) executeBatch(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, s.timeout) + defer cancel() + } + + resolvedURL, err := s.tmpl.Resolve(s.url, pc) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to resolve url: %w", s.name, err) + } + + bearerToken, err := s.getBearerToken(ctx) + if err != nil { + return nil, err + } + + batchBody := make([]map[string]any, len(s.batch)) + for i, bq := range s.batch { + query := bq.query + if len(s.fragments) > 0 { + query = strings.Join(s.fragments, "\n") + "\n" + query + } + entry := map[string]any{"query": query} + if bq.variables != nil { + resolved, resolveErr := s.tmpl.ResolveMap(bq.variables, pc) + if resolveErr != nil { + return nil, fmt.Errorf("graphql step %q: batch query %d: failed to resolve variables: %w", s.name, i, resolveErr) + } + entry["variables"] = resolved + } + batchBody[i] = entry + } + + bodyBytes, err := json.Marshal(batchBody) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to marshal batch request: %w", s.name, err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, resolvedURL, bytes.NewReader(bodyBytes)) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to create batch request: %w", s.name, err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + for k, v := range s.headers { + req.Header.Set(k, v) + } + if bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+bearerToken) + } + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("graphql step %q: batch request failed: %w", s.name, err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(io.LimitReader(resp.Body, 10<<20)) + if err != nil { + return nil, fmt.Errorf("graphql step %q: failed to read batch response: %w", s.name, err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("graphql step %q: batch HTTP %d: %s", s.name, resp.StatusCode, string(respBody)) + } + + var batchResp []any + if err := json.Unmarshal(respBody, &batchResp); err != nil { + return nil, fmt.Errorf("graphql step %q: failed to parse batch response: %w", s.name, err) + } + + return &StepResult{Output: map[string]any{ + "results": batchResp, + "status_code": resp.StatusCode, + }}, nil +} + +// getBearerToken returns the OAuth2 bearer token if auth is configured. +func (s *GraphQLStep) getBearerToken(ctx context.Context) (string, error) { + if s.auth == nil { + return "", nil + } + if token := s.oauthEntry.get(); token != "" { + return token, nil + } + val, err, _ := s.oauthEntry.sfGroup.Do("fetch", func() (any, error) { + if token := s.oauthEntry.get(); token != "" { + return token, nil + } + return s.fetchTokenDirect(ctx) + }) + if err != nil { + return "", err + } + return val.(string), nil +} + +// fetchTokenDirect performs an OAuth2 client_credentials token fetch. +func (s *GraphQLStep) fetchTokenDirect(ctx context.Context) (string, error) { + params := "grant_type=client_credentials&client_id=" + s.auth.clientID + + "&client_secret=" + s.auth.clientSecret + if len(s.auth.scopes) > 0 { + params += "&scope=" + strings.Join(s.auth.scopes, " ") + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.auth.tokenURL, + strings.NewReader(params)) + if err != nil { + return "", fmt.Errorf("graphql step %q: failed to create token request: %w", s.name, err) + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := s.httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("graphql step %q: token request failed: %w", s.name, err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("graphql step %q: failed to read token response: %w", s.name, err) + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("graphql step %q: token endpoint returned HTTP %d: %s", s.name, resp.StatusCode, string(body)) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + ExpiresIn float64 `json:"expires_in"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + return "", fmt.Errorf("graphql step %q: failed to parse token response: %w", s.name, err) + } + if tokenResp.AccessToken == "" { + return "", fmt.Errorf("graphql step %q: token response missing access_token", s.name) + } + + ttl := time.Duration(tokenResp.ExpiresIn) * time.Second + if ttl <= 0 { + ttl = 3600 * time.Second + } + if ttl > 10*time.Second { + ttl -= 10 * time.Second + } + s.oauthEntry.set(tokenResp.AccessToken, "", ttl) + return tokenResp.AccessToken, nil +} + +// doRequest sends the GraphQL HTTP request and parses the response. +// Output includes "full_data" (raw data before data_path extraction) for pagination. +func (s *GraphQLStep) doRequest(ctx context.Context, url string, reqBody map[string]any, bearerToken string) (map[string]any, int, error) { + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + return nil, 0, fmt.Errorf("graphql step %q: failed to marshal request: %w", s.name, err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyBytes)) + if err != nil { + return nil, 0, fmt.Errorf("graphql step %q: failed to create request: %w", s.name, err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + // Apply custom headers (with template resolution) + for k, v := range s.headers { + resolved, resolveErr := s.tmpl.Resolve(v, nil) + if resolveErr != nil { + resolved = v + } + req.Header.Set(k, resolved) + } + + if bearerToken != "" { + req.Header.Set("Authorization", "Bearer "+bearerToken) + } + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, 0, fmt.Errorf("graphql step %q: request failed: %w", s.name, err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(io.LimitReader(resp.Body, 10<<20)) // 10MB limit + if err != nil { + return nil, resp.StatusCode, fmt.Errorf("graphql step %q: failed to read response: %w", s.name, err) + } + + if resp.StatusCode == http.StatusUnauthorized { + return nil, resp.StatusCode, fmt.Errorf("graphql step %q: received 401 Unauthorized", s.name) + } + if resp.StatusCode != http.StatusOK { + return nil, resp.StatusCode, fmt.Errorf("graphql step %q: HTTP %d: %s", s.name, resp.StatusCode, string(respBody)) + } + + // Parse GraphQL response into a map so full_data is accessible as map[string]any + var rawMap map[string]any + if err := json.Unmarshal(respBody, &rawMap); err != nil { + return nil, resp.StatusCode, fmt.Errorf("graphql step %q: failed to parse response JSON: %w", s.name, err) + } + + var gqlErrors []any + if errs, ok := rawMap["errors"].([]any); ok { + gqlErrors = errs + } + gqlData := rawMap["data"] + gqlExtensions := rawMap["extensions"] + + hasErrors := len(gqlErrors) > 0 + + if hasErrors && s.failOnGraphQLErrors { + errMsg := "graphql error" + if errMap, ok := gqlErrors[0].(map[string]any); ok { + if msg, ok := errMap["message"].(string); ok { + errMsg = msg + } + } + return nil, resp.StatusCode, fmt.Errorf("graphql step %q: %s", s.name, errMsg) + } + + // Extract data via data_path + extractedData := gqlData + if s.dataPath != "" && gqlData != nil { + extractedData = extractDataPath(gqlData, s.dataPath) + } + + output := map[string]any{ + "data": extractedData, + "full_data": gqlData, // full data before data_path extraction (used by pagination) + "errors": gqlErrors, + "raw": rawMap, + "status_code": resp.StatusCode, + "has_errors": hasErrors, + "extensions": gqlExtensions, + } + if gqlErrors == nil { + output["errors"] = []any{} + } + + return output, resp.StatusCode, nil +} + +// extractDataPath navigates a dot-separated path into a nested map. +func extractDataPath(data any, path string) any { + parts := strings.Split(path, ".") + current := data + for _, part := range parts { + m, ok := current.(map[string]any) + if !ok { + return nil + } + current = m[part] + } + return current +}