Skip to content

Commit e934d78

Browse files
Copilotintel352
andauthored
step.http_call: add oauth2 config key and Salesforce instance_url support (#244)
* Initial plan * feat(step.http_call): add oauth2 config key and instance_url support Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * refactor(step.http_call): address review feedback - extract helper, fix 401 retry, improve schema docs, fix instance_url test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> Co-authored-by: Jonathan Langevin <codingsloth@pm.me>
1 parent 08478c6 commit e934d78

3 files changed

Lines changed: 492 additions & 51 deletions

File tree

module/pipeline_step_http_call.go

Lines changed: 122 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (c *oauthTokenCache) getOrCreate(key string) *oauthCacheEntry {
5454
type oauthCacheEntry struct {
5555
mu sync.Mutex
5656
accessToken string
57+
instanceURL string // optional; populated when the token endpoint returns instance_url (Salesforce pattern)
5758
expiry time.Time
5859
sfGroup singleflight.Group
5960
}
@@ -68,19 +69,28 @@ func (e *oauthCacheEntry) get() string {
6869
return ""
6970
}
7071

71-
// set stores a token with the given TTL.
72-
func (e *oauthCacheEntry) set(token string, ttl time.Duration) {
72+
// getInstanceURL returns the cached instance_url (may be empty if the token endpoint did not return one).
73+
func (e *oauthCacheEntry) getInstanceURL() string {
74+
e.mu.Lock()
75+
defer e.mu.Unlock()
76+
return e.instanceURL
77+
}
78+
79+
// set stores a token and optional instance_url with the given TTL.
80+
func (e *oauthCacheEntry) set(token, instanceURL string, ttl time.Duration) {
7381
e.mu.Lock()
7482
defer e.mu.Unlock()
7583
e.accessToken = token
84+
e.instanceURL = instanceURL
7685
e.expiry = time.Now().Add(ttl)
7786
}
7887

79-
// invalidate clears the cached token.
88+
// invalidate clears the cached token and instance_url.
8089
func (e *oauthCacheEntry) invalidate() {
8190
e.mu.Lock()
8291
defer e.mu.Unlock()
8392
e.accessToken = ""
93+
e.instanceURL = ""
8494
e.expiry = time.Time{}
8595
}
8696

@@ -156,51 +166,86 @@ func NewHTTPCallStepFactory() StepFactory {
156166
if authCfg, ok := config["auth"].(map[string]any); ok {
157167
authType, _ := authCfg["type"].(string)
158168
if authType == "oauth2_client_credentials" {
159-
tokenURL, _ := authCfg["token_url"].(string)
160-
if tokenURL == "" {
161-
return nil, fmt.Errorf("http_call step %q: auth.token_url is required for oauth2_client_credentials", name)
162-
}
163-
clientID, _ := authCfg["client_id"].(string)
164-
if clientID == "" {
165-
return nil, fmt.Errorf("http_call step %q: auth.client_id is required for oauth2_client_credentials", name)
166-
}
167-
clientSecret, _ := authCfg["client_secret"].(string)
168-
if clientSecret == "" {
169-
return nil, fmt.Errorf("http_call step %q: auth.client_secret is required for oauth2_client_credentials", name)
170-
}
171-
172-
var scopes []string
173-
if raw, ok := authCfg["scopes"]; ok {
174-
switch v := raw.(type) {
175-
case []string:
176-
scopes = v
177-
case []any:
178-
for _, s := range v {
179-
if str, ok := s.(string); ok {
180-
scopes = append(scopes, str)
181-
}
182-
}
183-
}
169+
cfg, oauthErr := buildOAuthConfig(name, "auth", authCfg)
170+
if oauthErr != nil {
171+
return nil, oauthErr
184172
}
173+
step.auth = cfg
174+
step.oauthEntry = globalOAuthCache.getOrCreate(cfg.cacheKey)
175+
}
176+
}
185177

186-
// Cache key incorporates all credential fields so each distinct tenant/client
187-
// gets its own isolated token cache entry.
188-
cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ")
189-
step.auth = &oauthConfig{
190-
tokenURL: tokenURL,
191-
clientID: clientID,
192-
clientSecret: clientSecret,
193-
scopes: scopes,
194-
cacheKey: cacheKey,
195-
}
196-
step.oauthEntry = globalOAuthCache.getOrCreate(cacheKey)
178+
// Support top-level "oauth2" key as an alternative to "auth" with type=oauth2_client_credentials.
179+
// This follows the syntax proposed in the issue and is more idiomatic for Salesforce-style configs:
180+
// oauth2:
181+
// grant_type: client_credentials (optional, defaults to client_credentials)
182+
// token_url: "..."
183+
// client_id: "..."
184+
// client_secret: "..."
185+
// scopes: ["api"]
186+
// Note: if the "auth" block is also present, it takes precedence and "oauth2" is ignored.
187+
if oauth2Cfg, ok := config["oauth2"].(map[string]any); ok && step.auth == nil {
188+
grantType, _ := oauth2Cfg["grant_type"].(string)
189+
if grantType == "" {
190+
grantType = "client_credentials"
191+
}
192+
if grantType != "client_credentials" {
193+
return nil, fmt.Errorf("http_call step %q: oauth2.grant_type must be 'client_credentials'", name)
197194
}
195+
cfg, oauthErr := buildOAuthConfig(name, "oauth2", oauth2Cfg)
196+
if oauthErr != nil {
197+
return nil, oauthErr
198+
}
199+
step.auth = cfg
200+
step.oauthEntry = globalOAuthCache.getOrCreate(cfg.cacheKey)
198201
}
199202

200203
return step, nil
201204
}
202205
}
203206

207+
// buildOAuthConfig parses OAuth2 client_credentials fields from a config map and returns an
208+
// oauthConfig. The prefix parameter ("auth" or "oauth2") is used in error messages.
209+
func buildOAuthConfig(stepName, prefix string, cfg map[string]any) (*oauthConfig, error) {
210+
tokenURL, _ := cfg["token_url"].(string)
211+
if tokenURL == "" {
212+
return nil, fmt.Errorf("http_call step %q: %s.token_url is required", stepName, prefix)
213+
}
214+
clientID, _ := cfg["client_id"].(string)
215+
if clientID == "" {
216+
return nil, fmt.Errorf("http_call step %q: %s.client_id is required", stepName, prefix)
217+
}
218+
clientSecret, _ := cfg["client_secret"].(string)
219+
if clientSecret == "" {
220+
return nil, fmt.Errorf("http_call step %q: %s.client_secret is required", stepName, prefix)
221+
}
222+
223+
var scopes []string
224+
if raw, ok := cfg["scopes"]; ok {
225+
switch v := raw.(type) {
226+
case []string:
227+
scopes = v
228+
case []any:
229+
for _, s := range v {
230+
if str, ok := s.(string); ok {
231+
scopes = append(scopes, str)
232+
}
233+
}
234+
}
235+
}
236+
237+
// Cache key incorporates all credential fields so each distinct tenant/client
238+
// gets its own isolated token cache entry.
239+
cacheKey := tokenURL + "\x00" + clientID + "\x00" + clientSecret + "\x00" + strings.Join(scopes, " ")
240+
return &oauthConfig{
241+
tokenURL: tokenURL,
242+
clientID: clientID,
243+
clientSecret: clientSecret,
244+
scopes: scopes,
245+
cacheKey: cacheKey,
246+
}, nil
247+
}
248+
204249
// Name returns the step name.
205250
func (s *HTTPCallStep) Name() string { return s.name }
206251

@@ -243,6 +288,7 @@ func (s *HTTPCallStep) doFetchToken(ctx context.Context) (string, error) {
243288
AccessToken string `json:"access_token"` //nolint:gosec // G117: parsing OAuth2 token response, not a secret exposure
244289
ExpiresIn float64 `json:"expires_in"`
245290
TokenType string `json:"token_type"`
291+
InstanceURL string `json:"instance_url"` // Salesforce pattern: base URL for subsequent API calls
246292
}
247293
if err := json.Unmarshal(body, &tokenResp); err != nil {
248294
return "", fmt.Errorf("http_call step %q: failed to parse token response: %w", s.name, err)
@@ -259,7 +305,7 @@ func (s *HTTPCallStep) doFetchToken(ctx context.Context) (string, error) {
259305
if ttl > 10*time.Second {
260306
ttl -= 10 * time.Second
261307
}
262-
s.oauthEntry.set(tokenResp.AccessToken, ttl)
308+
s.oauthEntry.set(tokenResp.AccessToken, tokenResp.InstanceURL, ttl)
263309

264310
return tokenResp.AccessToken, nil
265311
}
@@ -392,6 +438,22 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR
392438
ctx, cancel := context.WithTimeout(ctx, s.timeout)
393439
defer cancel()
394440

441+
// Obtain OAuth2 bearer token first so that instance_url is available for URL template resolution.
442+
var bearerToken string
443+
var err error
444+
if s.auth != nil {
445+
bearerToken, err = s.getToken(ctx)
446+
if err != nil {
447+
return nil, err
448+
}
449+
// Inject instance_url into the pipeline context so URL/header templates can reference it
450+
// as {{ .instance_url }}. This is a Salesforce pattern where the token endpoint returns the
451+
// org-specific base URL alongside the access token.
452+
if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" {
453+
pc.Current["instance_url"] = instanceURL
454+
}
455+
}
456+
395457
// Resolve URL template
396458
resolvedURL, err := s.tmpl.Resolve(s.url, pc)
397459
if err != nil {
@@ -403,15 +465,6 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR
403465
return nil, err
404466
}
405467

406-
// Obtain OAuth2 bearer token if auth is configured
407-
var bearerToken string
408-
if s.auth != nil {
409-
bearerToken, err = s.getToken(ctx)
410-
if err != nil {
411-
return nil, err
412-
}
413-
}
414-
415468
req, err := s.buildRequest(ctx, resolvedURL, bodyReader, rawBody, pc, bearerToken)
416469
if err != nil {
417470
return nil, err
@@ -438,11 +491,22 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR
438491
return nil, tokenErr
439492
}
440493

494+
// After a token refresh, instance_url may have changed (Salesforce can rotate it).
495+
// Re-inject it into pc.Current and re-resolve the URL template so the retry
496+
// hits the correct host.
497+
if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" {
498+
pc.Current["instance_url"] = instanceURL
499+
}
500+
retryURL, resolveErr := s.tmpl.Resolve(s.url, pc)
501+
if resolveErr != nil {
502+
return nil, fmt.Errorf("http_call step %q: failed to resolve url for retry: %w", s.name, resolveErr)
503+
}
504+
441505
retryBody, rawBody2, buildErr := s.buildBodyReader(pc)
442506
if buildErr != nil {
443507
return nil, buildErr
444508
}
445-
retryReq, buildErr := s.buildRequest(ctx, resolvedURL, retryBody, rawBody2, pc, newToken)
509+
retryReq, buildErr := s.buildRequest(ctx, retryURL, retryBody, rawBody2, pc, newToken)
446510
if buildErr != nil {
447511
return nil, buildErr
448512
}
@@ -459,13 +523,21 @@ func (s *HTTPCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR
459523
}
460524

461525
output := parseHTTPResponse(retryResp, respBody)
526+
if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" {
527+
output["instance_url"] = instanceURL
528+
}
462529
if retryResp.StatusCode >= 400 {
463530
return nil, fmt.Errorf("http_call step %q: HTTP %d: %s", s.name, retryResp.StatusCode, string(respBody))
464531
}
465532
return &StepResult{Output: output}, nil
466533
}
467534

468535
output := parseHTTPResponse(resp, respBody)
536+
if s.auth != nil {
537+
if instanceURL := s.oauthEntry.getInstanceURL(); instanceURL != "" {
538+
output["instance_url"] = instanceURL
539+
}
540+
}
469541

470542
if resp.StatusCode >= 400 {
471543
return nil, fmt.Errorf("http_call step %q: HTTP %d: %s", s.name, resp.StatusCode, string(respBody))

0 commit comments

Comments
 (0)