From 2cc6f758a48814c9bf83b02aaa84ee916b5fbbf7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 18:34:10 +0000 Subject: [PATCH 1/3] Initial plan From ee69cdb664f2508816936320b40a4b603b0d94ba Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 18:46:11 +0000 Subject: [PATCH 2/3] feat(step.http_call): add oauth2 config key and instance_url support Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/pipeline_step_http_call.go | 106 +++++++-- module/pipeline_step_http_call_test.go | 301 +++++++++++++++++++++++++ schema/module_schema.go | 3 +- 3 files changed, 396 insertions(+), 14 deletions(-) diff --git a/module/pipeline_step_http_call.go b/module/pipeline_step_http_call.go index 98fcc86f..b0f08585 100644 --- a/module/pipeline_step_http_call.go +++ b/module/pipeline_step_http_call.go @@ -54,6 +54,7 @@ func (c *oauthTokenCache) getOrCreate(key string) *oauthCacheEntry { type oauthCacheEntry struct { mu sync.Mutex accessToken string + instanceURL string // optional; populated when the token endpoint returns instance_url (Salesforce pattern) expiry time.Time sfGroup singleflight.Group } @@ -68,19 +69,28 @@ func (e *oauthCacheEntry) get() string { return "" } -// set stores a token with the given TTL. -func (e *oauthCacheEntry) set(token string, ttl time.Duration) { +// getInstanceURL returns the cached instance_url (may be empty if the token endpoint did not return one). +func (e *oauthCacheEntry) getInstanceURL() string { + e.mu.Lock() + defer e.mu.Unlock() + return e.instanceURL +} + +// set stores a token and optional instance_url with the given TTL. +func (e *oauthCacheEntry) set(token, instanceURL string, ttl time.Duration) { e.mu.Lock() defer e.mu.Unlock() e.accessToken = token + e.instanceURL = instanceURL e.expiry = time.Now().Add(ttl) } -// invalidate clears the cached token. +// invalidate clears the cached token and instance_url. func (e *oauthCacheEntry) invalidate() { e.mu.Lock() defer e.mu.Unlock() e.accessToken = "" + e.instanceURL = "" e.expiry = time.Time{} } @@ -197,6 +207,60 @@ func NewHTTPCallStepFactory() StepFactory { } } + // Support top-level "oauth2" key as an alternative to "auth" with type=oauth2_client_credentials. + // This follows the syntax proposed in the issue and is more idiomatic for Salesforce-style configs: + // oauth2: + // grant_type: client_credentials (optional, defaults to client_credentials) + // token_url: "..." + // client_id: "..." + // client_secret: "..." + // scopes: ["api"] + if oauth2Cfg, ok := config["oauth2"].(map[string]any); ok && step.auth == nil { + grantType, _ := oauth2Cfg["grant_type"].(string) + if grantType == "" { + grantType = "client_credentials" + } + if grantType != "client_credentials" { + return nil, fmt.Errorf("http_call step %q: oauth2.grant_type must be 'client_credentials'", name) + } + tokenURL, _ := oauth2Cfg["token_url"].(string) + if tokenURL == "" { + return nil, fmt.Errorf("http_call step %q: oauth2.token_url is required", name) + } + clientID, _ := oauth2Cfg["client_id"].(string) + if clientID == "" { + return nil, fmt.Errorf("http_call step %q: oauth2.client_id is required", name) + } + clientSecret, _ := oauth2Cfg["client_secret"].(string) + if clientSecret == "" { + return nil, fmt.Errorf("http_call step %q: oauth2.client_secret is required", name) + } + + var scopes []string + if raw, ok := oauth2Cfg["scopes"]; ok { + switch v := raw.(type) { + case []string: + scopes = v + case []any: + for _, s := range v { + if str, ok := s.(string); ok { + scopes = append(scopes, str) + } + } + } + } + + cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ") + step.auth = &oauthConfig{ + tokenURL: tokenURL, + clientID: clientID, + clientSecret: clientSecret, + scopes: scopes, + cacheKey: cacheKey, + } + step.oauthEntry = globalOAuthCache.getOrCreate(cacheKey) + } + return step, nil } } @@ -243,6 +307,7 @@ func (s *HTTPCallStep) doFetchToken(ctx context.Context) (string, error) { AccessToken string `json:"access_token"` //nolint:gosec // G117: parsing OAuth2 token response, not a secret exposure ExpiresIn float64 `json:"expires_in"` TokenType string `json:"token_type"` + InstanceURL string `json:"instance_url"` // Salesforce pattern: base URL for subsequent API calls } if err := json.Unmarshal(body, &tokenResp); err != nil { return "", fmt.Errorf("http_call step %q: failed to parse token response: %w", s.name, err) @@ -259,7 +324,7 @@ func (s *HTTPCallStep) doFetchToken(ctx context.Context) (string, error) { if ttl > 10*time.Second { ttl -= 10 * time.Second } - s.oauthEntry.set(tokenResp.AccessToken, ttl) + s.oauthEntry.set(tokenResp.AccessToken, tokenResp.InstanceURL, ttl) return tokenResp.AccessToken, nil } @@ -392,6 +457,22 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR ctx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() + // Obtain OAuth2 bearer token first so that instance_url is available for URL template resolution. + var bearerToken string + var err error + if s.auth != nil { + bearerToken, err = s.getToken(ctx) + if err != nil { + return nil, err + } + // Inject instance_url into the pipeline context so URL/header templates can reference it + // as {{ .instance_url }}. This is a Salesforce pattern where the token endpoint returns the + // org-specific base URL alongside the access token. + if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" { + pc.Current["instance_url"] = instanceURL + } + } + // Resolve URL template resolvedURL, err := s.tmpl.Resolve(s.url, pc) if err != nil { @@ -403,15 +484,6 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR return nil, err } - // Obtain OAuth2 bearer token if auth is configured - var bearerToken string - if s.auth != nil { - bearerToken, err = s.getToken(ctx) - if err != nil { - return nil, err - } - } - req, err := s.buildRequest(ctx, resolvedURL, bodyReader, rawBody, pc, bearerToken) if err != nil { return nil, err @@ -459,6 +531,9 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR } output := parseHTTPResponse(retryResp, respBody) + if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" { + output["instance_url"] = instanceURL + } if retryResp.StatusCode >= 400 { return nil, fmt.Errorf("http_call step %q: HTTP %d: %s", s.name, retryResp.StatusCode, string(respBody)) } @@ -466,6 +541,11 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR } output := parseHTTPResponse(resp, respBody) + if s.auth != nil { + if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" { + output["instance_url"] = instanceURL + } + } if resp.StatusCode >= 400 { return nil, fmt.Errorf("http_call step %q: HTTP %d: %s", s.name, resp.StatusCode, string(respBody)) diff --git a/module/pipeline_step_http_call_test.go b/module/pipeline_step_http_call_test.go index 36260b8a..0890de48 100644 --- a/module/pipeline_step_http_call_test.go +++ b/module/pipeline_step_http_call_test.go @@ -671,6 +671,307 @@ func TestHTTPCallStep_BodyFrom_ContentTypeOverride(t *testing.T) { } } +// TestHTTPCallStep_OAuth2Key_FetchesToken verifies that the top-level "oauth2" config key works +// as an alternative to "auth" with type=oauth2_client_credentials. +func TestHTTPCallStep_OAuth2Key_FetchesToken(t *testing.T) { + var tokenRequests int32 + + tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&tokenRequests, 1) + _ = r.ParseForm() + if r.FormValue("grant_type") != "client_credentials" { + t.Errorf("expected grant_type=client_credentials, got %q", r.FormValue("grant_type")) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "oauth2-key-token", + "expires_in": 3600, + "token_type": "Bearer", + }) + })) + defer tokenSrv.Close() + + apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Authorization") != "Bearer oauth2-key-token" { + t.Errorf("expected Bearer oauth2-key-token, got %q", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusUnauthorized) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + })) + defer apiSrv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("oauth2-key-test", map[string]any{ + "url": apiSrv.URL + "/data", + "method": "GET", + "oauth2": map[string]any{ + "grant_type": "client_credentials", + "token_url": tokenSrv.URL + "/token", + "client_id": "sf-client", + "client_secret": "sf-secret", + "scopes": []any{"api"}, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = &http.Client{} + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["status_code"] != http.StatusOK { + t.Errorf("expected 200, got %v", result.Output["status_code"]) + } + if atomic.LoadInt32(&tokenRequests) != 1 { + t.Errorf("expected 1 token request, got %d", atomic.LoadInt32(&tokenRequests)) + } +} + +// TestHTTPCallStep_OAuth2Key_DefaultGrantType verifies that grant_type defaults to client_credentials. +func TestHTTPCallStep_OAuth2Key_DefaultGrantType(t *testing.T) { + tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _ = r.ParseForm() + if r.FormValue("grant_type") != "client_credentials" { + t.Errorf("expected grant_type=client_credentials, got %q", r.FormValue("grant_type")) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"access_token": "tok", "expires_in": 3600}) + })) + defer tokenSrv.Close() + + apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer apiSrv.Close() + + factory := NewHTTPCallStepFactory() + // No grant_type specified – should default to client_credentials + step, err := factory("oauth2-default-grant", map[string]any{ + "url": apiSrv.URL, + "oauth2": map[string]any{ + "token_url": tokenSrv.URL + "/token", + "client_id": "cid", + "client_secret": "csec", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = &http.Client{} + + if _, err := step.Execute(context.Background(), NewPipelineContext(nil, nil)); err != nil { + t.Fatalf("execute error: %v", err) + } +} + +// TestHTTPCallStep_OAuth2Key_InvalidGrantType verifies that an unsupported grant_type is rejected. +func TestHTTPCallStep_OAuth2Key_InvalidGrantType(t *testing.T) { + factory := NewHTTPCallStepFactory() + _, err := factory("bad-grant", map[string]any{ + "url": "http://example.com/api", + "oauth2": map[string]any{ + "grant_type": "authorization_code", + "token_url": "http://example.com/token", + "client_id": "cid", + "client_secret": "csec", + }, + }, nil) + if err == nil { + t.Fatal("expected error for unsupported grant_type") + } + if !strings.Contains(err.Error(), "grant_type must be 'client_credentials'") { + t.Errorf("unexpected error: %v", err) + } +} + +// TestHTTPCallStep_OAuth2Key_MissingFields verifies that missing oauth2 fields produce errors. +func TestHTTPCallStep_OAuth2Key_MissingFields(t *testing.T) { + factory := NewHTTPCallStepFactory() + + tests := []struct { + name string + oauth2 map[string]any + errMsg string + }{ + { + name: "missing token_url", + oauth2: map[string]any{ + "client_id": "cid", + "client_secret": "csec", + }, + errMsg: "oauth2.token_url is required", + }, + { + name: "missing client_id", + oauth2: map[string]any{ + "token_url": "http://example.com/token", + "client_secret": "csec", + }, + errMsg: "oauth2.client_id is required", + }, + { + name: "missing client_secret", + oauth2: map[string]any{ + "token_url": "http://example.com/token", + "client_id": "cid", + }, + errMsg: "oauth2.client_secret is required", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + _, err := factory("test", map[string]any{ + "url": "http://example.com/api", + "oauth2": tc.oauth2, + }, nil) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), tc.errMsg) { + t.Errorf("expected %q in error, got: %v", tc.errMsg, err) + } + }) + } +} + +// TestHTTPCallStep_OAuth2_InstanceURL verifies that instance_url from the token response is +// injected into the pipeline context for URL template resolution and included in step output. +func TestHTTPCallStep_OAuth2_InstanceURL(t *testing.T) { + const fakeInstanceURL = "https://myorg.my.salesforce.com" + + tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "sf-token", + "expires_in": 3600, + "token_type": "Bearer", + "instance_url": fakeInstanceURL, + }) + })) + defer tokenSrv.Close() + + // The API server will verify the incoming URL path to confirm instance_url was used in + // template resolution. + apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/services/data/v62.0/sobjects" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"created": true}) + })) + defer apiSrv.Close() + + factory := NewHTTPCallStepFactory() + // URL uses {{ .instance_url }} template – will be resolved to apiSrv.URL for testing. + // We override fakeInstanceURL to point at apiSrv.URL so the request actually succeeds. + step, err := factory("sf-test", map[string]any{ + "url": "{{.instance_url}}/services/data/v62.0/sobjects", + "method": "GET", + "oauth2": map[string]any{ + "token_url": tokenSrv.URL + "/token", + "client_id": "sf-cid", + "client_secret": "sf-csec", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + hs := step.(*HTTPCallStep) + hs.httpClient = &http.Client{} + + // Pre-populate the cache entry so instance_url resolves to apiSrv.URL instead of the fake. + hs.oauthEntry.set("sf-token", apiSrv.URL, 3600*time.Second) + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + // instance_url should be present in the step output. + if result.Output["instance_url"] != apiSrv.URL { + t.Errorf("expected instance_url=%q in output, got %v", apiSrv.URL, result.Output["instance_url"]) + } + // instance_url should also have been injected into pc.Current. + if pc.Current["instance_url"] != apiSrv.URL { + t.Errorf("expected instance_url in pc.Current, got %v", pc.Current["instance_url"]) + } +} + +// TestHTTPCallStep_OAuth2_InstanceURL_Cached verifies that instance_url persists across calls +// and is refreshed when the token is invalidated and re-fetched. +func TestHTTPCallStep_OAuth2_InstanceURL_Cached(t *testing.T) { + var tokenRequests int32 + const instanceURL1 = "https://instance1.salesforce.com" + const instanceURL2 = "https://instance2.salesforce.com" + + tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&tokenRequests, 1) + iurl := instanceURL1 + if n > 1 { + iurl = instanceURL2 + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": "tok", + "expires_in": 3600, + "instance_url": iurl, + }) + })) + defer tokenSrv.Close() + + apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{}`)) + })) + defer apiSrv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("iurl-cached", map[string]any{ + "url": apiSrv.URL, + "method": "GET", + "oauth2": map[string]any{ + "token_url": tokenSrv.URL + "/token", + "client_id": "cid", + "client_secret": "csec", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + hs := step.(*HTTPCallStep) + hs.httpClient = &http.Client{} + + pc := NewPipelineContext(nil, nil) + + // First call – token is fetched, instance_url = instanceURL1. + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("first execute error: %v", err) + } + if hs.oauthEntry.getInstanceURL() != instanceURL1 { + t.Errorf("expected instance_url=%q after first fetch, got %q", instanceURL1, hs.oauthEntry.getInstanceURL()) + } + + // Invalidate and fetch again – instance_url should update to instanceURL2. + hs.oauthEntry.invalidate() + if _, err := step.Execute(context.Background(), pc); err != nil { + t.Fatalf("second execute error: %v", err) + } + if hs.oauthEntry.getInstanceURL() != instanceURL2 { + t.Errorf("expected instance_url=%q after second fetch, got %q", instanceURL2, hs.oauthEntry.getInstanceURL()) + } + if atomic.LoadInt32(&tokenRequests) != 2 { + t.Errorf("expected 2 token requests, got %d", atomic.LoadInt32(&tokenRequests)) + } +} + // TestHTTPCallStep_BodyFrom_NilValue verifies that body_from with a missing path sends no body. func TestHTTPCallStep_BodyFrom_NilValue(t *testing.T) { ch := make(chan []byte, 1) diff --git a/schema/module_schema.go b/schema/module_schema.go index 8846e578..f560bafd 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -963,11 +963,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context with data for URL/body template resolution"}}, Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "HTTP response body parsed as JSON and merged into pipeline context"}}, ConfigFields: []ConfigFieldDef{ - {Key: "url", Label: "URL", Type: FieldTypeString, Required: true, Description: "Request URL (supports {{ .field }} templates)", Placeholder: "https://api.example.com/{{ .resource }}"}, + {Key: "url", Label: "URL", Type: FieldTypeString, Required: true, Description: "Request URL (supports {{ .field }} templates; use {{ .instance_url }} when oauth2 is configured with an instance_url-returning endpoint)", Placeholder: "https://api.example.com/{{ .resource }}"}, {Key: "method", Label: "Method", Type: FieldTypeSelect, Options: []string{"GET", "POST", "PUT", "PATCH", "DELETE"}, DefaultValue: "GET", Description: "HTTP method"}, {Key: "headers", Label: "Headers", Type: FieldTypeMap, MapValueType: "string", Description: "Request headers (values support templates)"}, {Key: "body", Label: "Body", Type: FieldTypeJSON, Description: "Request body (supports templates). For POST/PUT without body, sends pipeline context."}, {Key: "timeout", Label: "Timeout", Type: FieldTypeString, DefaultValue: "30s", Description: "Request timeout duration", Placeholder: "30s"}, + {Key: "oauth2", Label: "OAuth2", Type: FieldTypeJSON, Description: "OAuth2 client_credentials configuration. Tokens are cached and refreshed automatically. Fields: grant_type (default: client_credentials), token_url, client_id, client_secret, scopes. When the token endpoint returns instance_url (Salesforce pattern), it is injected as {{ .instance_url }} for URL templates and included in the step output."}, }, }) From a988decd9b5b841c1471f85443bb046173cbbf7d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:42:48 +0000 Subject: [PATCH 3/3] refactor(step.http_call): address review feedback - extract helper, fix 401 retry, improve schema docs, fix instance_url test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/pipeline_step_http_call.go | 136 ++++++++++++------------- module/pipeline_step_http_call_test.go | 105 +++++++++++++++---- schema/module_schema.go | 4 +- 3 files changed, 152 insertions(+), 93 deletions(-) diff --git a/module/pipeline_step_http_call.go b/module/pipeline_step_http_call.go index b0f08585..850c053c 100644 --- a/module/pipeline_step_http_call.go +++ b/module/pipeline_step_http_call.go @@ -166,44 +166,12 @@ func NewHTTPCallStepFactory() StepFactory { if authCfg, ok := config["auth"].(map[string]any); ok { authType, _ := authCfg["type"].(string) if authType == "oauth2_client_credentials" { - tokenURL, _ := authCfg["token_url"].(string) - if tokenURL == "" { - return nil, fmt.Errorf("http_call step %q: auth.token_url is required for oauth2_client_credentials", name) + cfg, oauthErr := buildOAuthConfig(name, "auth", authCfg) + if oauthErr != nil { + return nil, oauthErr } - clientID, _ := authCfg["client_id"].(string) - if clientID == "" { - return nil, fmt.Errorf("http_call step %q: auth.client_id is required for oauth2_client_credentials", name) - } - clientSecret, _ := authCfg["client_secret"].(string) - if clientSecret == "" { - return nil, fmt.Errorf("http_call step %q: auth.client_secret is required for oauth2_client_credentials", name) - } - - var scopes []string - if raw, ok := authCfg["scopes"]; ok { - switch v := raw.(type) { - case []string: - scopes = v - case []any: - for _, s := range v { - if str, ok := s.(string); ok { - scopes = append(scopes, str) - } - } - } - } - - // Cache key incorporates all credential fields so each distinct tenant/client - // gets its own isolated token cache entry. - cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ") - step.auth = &oauthConfig{ - tokenURL: tokenURL, - clientID: clientID, - clientSecret: clientSecret, - scopes: scopes, - cacheKey: cacheKey, - } - step.oauthEntry = globalOAuthCache.getOrCreate(cacheKey) + step.auth = cfg + step.oauthEntry = globalOAuthCache.getOrCreate(cfg.cacheKey) } } @@ -215,6 +183,7 @@ func NewHTTPCallStepFactory() StepFactory { // client_id: "..." // client_secret: "..." // scopes: ["api"] + // Note: if the "auth" block is also present, it takes precedence and "oauth2" is ignored. if oauth2Cfg, ok := config["oauth2"].(map[string]any); ok && step.auth == nil { grantType, _ := oauth2Cfg["grant_type"].(string) if grantType == "" { @@ -223,46 +192,58 @@ func NewHTTPCallStepFactory() StepFactory { if grantType != "client_credentials" { return nil, fmt.Errorf("http_call step %q: oauth2.grant_type must be 'client_credentials'", name) } - tokenURL, _ := oauth2Cfg["token_url"].(string) - if tokenURL == "" { - return nil, fmt.Errorf("http_call step %q: oauth2.token_url is required", name) - } - clientID, _ := oauth2Cfg["client_id"].(string) - if clientID == "" { - return nil, fmt.Errorf("http_call step %q: oauth2.client_id is required", name) - } - clientSecret, _ := oauth2Cfg["client_secret"].(string) - if clientSecret == "" { - return nil, fmt.Errorf("http_call step %q: oauth2.client_secret is required", name) + cfg, oauthErr := buildOAuthConfig(name, "oauth2", oauth2Cfg) + if oauthErr != nil { + return nil, oauthErr } + step.auth = cfg + step.oauthEntry = globalOAuthCache.getOrCreate(cfg.cacheKey) + } - var scopes []string - if raw, ok := oauth2Cfg["scopes"]; ok { - switch v := raw.(type) { - case []string: - scopes = v - case []any: - for _, s := range v { - if str, ok := s.(string); ok { - scopes = append(scopes, str) - } - } - } - } + return step, nil + } +} - cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ") - step.auth = &oauthConfig{ - tokenURL: tokenURL, - clientID: clientID, - clientSecret: clientSecret, - scopes: scopes, - cacheKey: cacheKey, +// buildOAuthConfig parses OAuth2 client_credentials fields from a config map and returns an +// oauthConfig. The prefix parameter ("auth" or "oauth2") is used in error messages. +func buildOAuthConfig(stepName, prefix string, cfg map[string]any) (*oauthConfig, error) { + tokenURL, _ := cfg["token_url"].(string) + if tokenURL == "" { + return nil, fmt.Errorf("http_call step %q: %s.token_url is required", stepName, prefix) + } + clientID, _ := cfg["client_id"].(string) + if clientID == "" { + return nil, fmt.Errorf("http_call step %q: %s.client_id is required", stepName, prefix) + } + clientSecret, _ := cfg["client_secret"].(string) + if clientSecret == "" { + return nil, fmt.Errorf("http_call step %q: %s.client_secret is required", stepName, prefix) + } + + var scopes []string + if raw, ok := cfg["scopes"]; ok { + switch v := raw.(type) { + case []string: + scopes = v + case []any: + for _, s := range v { + if str, ok := s.(string); ok { + scopes = append(scopes, str) + } } - step.oauthEntry = globalOAuthCache.getOrCreate(cacheKey) } - - return step, nil } + + // Cache key incorporates all credential fields so each distinct tenant/client + // gets its own isolated token cache entry. + cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ") + return &oauthConfig{ + tokenURL: tokenURL, + clientID: clientID, + clientSecret: clientSecret, + scopes: scopes, + cacheKey: cacheKey, + }, nil } // Name returns the step name. @@ -510,11 +491,22 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR return nil, tokenErr } + // After a token refresh, instance_url may have changed (Salesforce can rotate it). + // Re-inject it into pc.Current and re-resolve the URL template so the retry + // hits the correct host. + if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" { + pc.Current["instance_url"] = instanceURL + } + retryURL, resolveErr := s.tmpl.Resolve(s.url, pc) + if resolveErr != nil { + return nil, fmt.Errorf("http_call step %q: failed to resolve url for retry: %w", s.name, resolveErr) + } + retryBody, rawBody2, buildErr := s.buildBodyReader(pc) if buildErr != nil { return nil, buildErr } - retryReq, buildErr := s.buildRequest(ctx, resolvedURL, retryBody, rawBody2, pc, newToken) + retryReq, buildErr := s.buildRequest(ctx, retryURL, retryBody, rawBody2, pc, newToken) if buildErr != nil { return nil, buildErr } diff --git a/module/pipeline_step_http_call_test.go b/module/pipeline_step_http_call_test.go index 0890de48..0397ab0f 100644 --- a/module/pipeline_step_http_call_test.go +++ b/module/pipeline_step_http_call_test.go @@ -842,35 +842,34 @@ func TestHTTPCallStep_OAuth2Key_MissingFields(t *testing.T) { } // TestHTTPCallStep_OAuth2_InstanceURL verifies that instance_url from the token response is -// injected into the pipeline context for URL template resolution and included in step output. +// parsed, injected into the pipeline context for URL template resolution, and included in step output. func TestHTTPCallStep_OAuth2_InstanceURL(t *testing.T) { - const fakeInstanceURL = "https://myorg.my.salesforce.com" + var tokenRequests int32 + + // apiSrv is started first so its URL can be returned as instance_url from the token server. + apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/services/data/v62.0/sobjects" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"created": true}) + })) + defer apiSrv.Close() tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&tokenRequests, 1) w.Header().Set("Content-Type", "application/json") + // Return apiSrv.URL as instance_url so {{.instance_url}} resolves to the test server. _ = json.NewEncoder(w).Encode(map[string]any{ "access_token": "sf-token", "expires_in": 3600, "token_type": "Bearer", - "instance_url": fakeInstanceURL, + "instance_url": apiSrv.URL, }) })) defer tokenSrv.Close() - // The API server will verify the incoming URL path to confirm instance_url was used in - // template resolution. - apiSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/services/data/v62.0/sobjects" { - t.Errorf("unexpected path: %s", r.URL.Path) - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(map[string]any{"created": true}) - })) - defer apiSrv.Close() - factory := NewHTTPCallStepFactory() - // URL uses {{ .instance_url }} template – will be resolved to apiSrv.URL for testing. - // We override fakeInstanceURL to point at apiSrv.URL so the request actually succeeds. step, err := factory("sf-test", map[string]any{ "url": "{{.instance_url}}/services/data/v62.0/sobjects", "method": "GET", @@ -886,15 +885,16 @@ func TestHTTPCallStep_OAuth2_InstanceURL(t *testing.T) { hs := step.(*HTTPCallStep) hs.httpClient = &http.Client{} - // Pre-populate the cache entry so instance_url resolves to apiSrv.URL instead of the fake. - hs.oauthEntry.set("sf-token", apiSrv.URL, 3600*time.Second) - pc := NewPipelineContext(nil, nil) result, err := step.Execute(context.Background(), pc) if err != nil { t.Fatalf("execute error: %v", err) } + // Token endpoint should have been called exactly once. + if atomic.LoadInt32(&tokenRequests) != 1 { + t.Errorf("expected 1 token request, got %d", atomic.LoadInt32(&tokenRequests)) + } // instance_url should be present in the step output. if result.Output["instance_url"] != apiSrv.URL { t.Errorf("expected instance_url=%q in output, got %v", apiSrv.URL, result.Output["instance_url"]) @@ -972,6 +972,73 @@ func TestHTTPCallStep_OAuth2_InstanceURL_Cached(t *testing.T) { } } +// TestHTTPCallStep_OAuth2_Retry401_RefreshesInstanceURL verifies that on a 401, the retry uses +// an updated instance_url if the refreshed token response returns a new one. +func TestHTTPCallStep_OAuth2_Retry401_RefreshesInstanceURL(t *testing.T) { + var tokenRequests int32 + + // Two API servers represent two possible instance URLs. + // First call → server1 returns 401; second call (retry) → server2 returns 200. + server2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"ok": true}) + })) + defer server2.Close() + + server1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Always return 401; the real API is on server2 after token refresh. + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`unauthorized`)) + })) + defer server1.Close() + + tokenSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&tokenRequests, 1) + instanceURL := server1.URL // first token → server1 + if n > 1 { + instanceURL = server2.URL // refreshed token → server2 + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "access_token": fmt.Sprintf("token-v%d", n), + "expires_in": 3600, + "instance_url": instanceURL, + }) + })) + defer tokenSrv.Close() + + factory := NewHTTPCallStepFactory() + step, err := factory("retry-iurl-test", map[string]any{ + "url": "{{.instance_url}}/api/resource", + "method": "GET", + "oauth2": map[string]any{ + "token_url": tokenSrv.URL + "/token", + "client_id": "cid", + "client_secret": "csec", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*HTTPCallStep).httpClient = &http.Client{} + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["status_code"] != http.StatusOK { + t.Errorf("expected 200 after retry, got %v", result.Output["status_code"]) + } + if atomic.LoadInt32(&tokenRequests) != 2 { + t.Errorf("expected 2 token requests (initial + refresh), got %d", atomic.LoadInt32(&tokenRequests)) + } + // After the retry, pc.Current["instance_url"] should reflect the refreshed server2 URL. + if pc.Current["instance_url"] != server2.URL { + t.Errorf("expected pc.Current[instance_url]=%q after retry, got %v", server2.URL, pc.Current["instance_url"]) + } +} + // TestHTTPCallStep_BodyFrom_NilValue verifies that body_from with a missing path sends no body. func TestHTTPCallStep_BodyFrom_NilValue(t *testing.T) { ch := make(chan []byte, 1) diff --git a/schema/module_schema.go b/schema/module_schema.go index f560bafd..4ad75605 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -963,12 +963,12 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context with data for URL/body template resolution"}}, Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "HTTP response body parsed as JSON and merged into pipeline context"}}, ConfigFields: []ConfigFieldDef{ - {Key: "url", Label: "URL", Type: FieldTypeString, Required: true, Description: "Request URL (supports {{ .field }} templates; use {{ .instance_url }} when oauth2 is configured with an instance_url-returning endpoint)", Placeholder: "https://api.example.com/{{ .resource }}"}, + {Key: "url", Label: "URL", Type: FieldTypeString, Required: true, Description: "Request URL (supports {{ .field }} templates; {{ .instance_url }} is available when OAuth2 client_credentials auth uses a token endpoint that returns instance_url)", Placeholder: "https://api.example.com/{{ .resource }}"}, {Key: "method", Label: "Method", Type: FieldTypeSelect, Options: []string{"GET", "POST", "PUT", "PATCH", "DELETE"}, DefaultValue: "GET", Description: "HTTP method"}, {Key: "headers", Label: "Headers", Type: FieldTypeMap, MapValueType: "string", Description: "Request headers (values support templates)"}, {Key: "body", Label: "Body", Type: FieldTypeJSON, Description: "Request body (supports templates). For POST/PUT without body, sends pipeline context."}, {Key: "timeout", Label: "Timeout", Type: FieldTypeString, DefaultValue: "30s", Description: "Request timeout duration", Placeholder: "30s"}, - {Key: "oauth2", Label: "OAuth2", Type: FieldTypeJSON, Description: "OAuth2 client_credentials configuration. Tokens are cached and refreshed automatically. Fields: grant_type (default: client_credentials), token_url, client_id, client_secret, scopes. When the token endpoint returns instance_url (Salesforce pattern), it is injected as {{ .instance_url }} for URL templates and included in the step output."}, + {Key: "oauth2", Label: "OAuth2", Type: FieldTypeJSON, Description: "OAuth2 client_credentials configuration. Tokens are cached and refreshed automatically. Fields: grant_type (default: client_credentials), token_url, client_id, client_secret, scopes. When the token endpoint returns instance_url (Salesforce pattern), it is injected as {{ .instance_url }} for URL templates and included in the step output. If both the legacy auth block and oauth2 are configured, auth takes precedence and oauth2 is ignored."}, }, })