From b7251124cfdf33304f607449151154b6b41034dc Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 19 Jan 2026 13:34:25 -0700 Subject: [PATCH 1/9] Initial Cortex XDR Adapter --- containers/conf/all.go | 2 + containers/general/tool.go | 6 + corex_xdr/client.go | 464 +++++++++++++++++++++++++++++++++++++ 3 files changed, 472 insertions(+) create mode 100644 corex_xdr/client.go diff --git a/containers/conf/all.go b/containers/conf/all.go index 55784b9..2ee9d20 100755 --- a/containers/conf/all.go +++ b/containers/conf/all.go @@ -11,6 +11,7 @@ import ( "github.com/refractionPOINT/usp-adapters/cylance" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" + "github.com/refractionPOINT/usp-adapters/corex_xdr" "github.com/refractionPOINT/usp-adapters/entraid" "github.com/refractionPOINT/usp-adapters/evtx" "github.com/refractionPOINT/usp-adapters/falconcloud" @@ -65,6 +66,7 @@ type GeneralConfigs struct { MacUnifiedLogging usp_mac_unified_logging.MacUnifiedLoggingConfig `json:"mac_unified_logging" yaml:"mac_unified_logging"` AzureEventHub usp_azure_event_hub.EventHubConfig `json:"azure_event_hub" yaml:"azure_event_hub"` Duo usp_duo.DuoConfig `json:"duo" yaml:"duo"` + CortexXDR usp_cortex_xdr.CortexXDRConfig `json:"cortex_xdr" yaml:"cortex_xdr"` Gcs usp_gcs.GCSConfig `json:"gcs" yaml:"gcs"` Slack usp_slack.SlackConfig `json:"slack" yaml:"slack"` Sqs usp_sqs.SQSConfig `json:"sqs" yaml:"sqs"` diff --git a/containers/general/tool.go b/containers/general/tool.go index 4c7af0b..381007c 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -22,6 +22,7 @@ import ( "github.com/refractionPOINT/usp-adapters/bitwarden" "github.com/refractionPOINT/usp-adapters/box" "github.com/refractionPOINT/usp-adapters/cato" + usp_cortex_xdr "github.com/refractionPOINT/usp-adapters/corex_xdr" "github.com/refractionPOINT/usp-adapters/cylance" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" @@ -366,6 +367,11 @@ func runAdapter(ctx context.Context, method string, configs Configuration, showC configs.Duo.ClientOptions.Architecture = "usp_adapter" configToShow = configs.Duo client, chRunning, err = usp_duo.NewDuoAdapter(ctx, configs.Duo) + } else if method == "cortex_xdr" { + configs.CortexXDR.ClientOptions = applyLogging(configs.CortexXDR.ClientOptions) + configs.CortexXDR.ClientOptions.Architecture = "usp_adapter" + configToShow = configs.CortexXDR + client, chRunning, err = usp_cortex_xdr.NewCortexXDRAdapter(ctx, configs.CortexXDR) } else if method == "cato" { configs.Cato.ClientOptions = applyLogging(configs.Cato.ClientOptions) configs.Cato.ClientOptions.Architecture = "usp_adapter" diff --git a/corex_xdr/client.go b/corex_xdr/client.go new file mode 100644 index 0000000..e39197d --- /dev/null +++ b/corex_xdr/client.go @@ -0,0 +1,464 @@ +package usp_cortex_xdr + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "sort" + "strconv" + "sync" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/refractionPOINT/go-uspclient/protocol" + "github.com/refractionPOINT/usp-adapters/utils" +) + +const ( + queryInterval = 60 + incidentsEndpoint = "/public_api/v1/incidents/get_incidents/" + alertsEndpoint = "/public_api/v1/alerts/get_alerts_multi_events/" +) + +type CortexXDRConfig struct { + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + FQDN string `json:"fqdn" yaml:"fqdn"` + APIKey string `json:"api_key" yaml:"api_key"` + APIKeyID string `json:"api_key_id" yaml:"api_key_id"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m +} + +type CortexXDRAdapter struct { + conf CortexXDRConfig + uspClient *uspclient.Client + httpClient *http.Client + chStopped chan struct{} + + once sync.Once + ctx context.Context + cancel context.CancelFunc + + incidentsDedupe map[string]int64 + alertsDedupe map[string]int64 +} + +type CortexXDRResponse interface { + GetData() []utils.Dict + GetResultCount() int + GetTotalCount() int +} + +type CortexXDRIncidentsResponse struct { + Reply struct { + ResultCount int `json:"result_count"` + TotalCount int `json:"total_count"` + Incidents []utils.Dict `json:"incidents"` + } `json:"reply"` +} + +func (r *CortexXDRIncidentsResponse) GetData() []utils.Dict { + return r.Reply.Incidents +} + +func (r *CortexXDRIncidentsResponse) GetResultCount() int { + return r.Reply.ResultCount +} + +func (r *CortexXDRIncidentsResponse) GetTotalCount() int { + return r.Reply.TotalCount +} + +type CortexXDRAlertsResponse struct { + Reply struct { + ResultCount int `json:"result_count"` + TotalCount int `json:"total_count"` + Alerts []utils.Dict `json:"alerts"` + } `json:"reply"` +} + +func (r *CortexXDRAlertsResponse) GetData() []utils.Dict { + return r.Reply.Alerts +} + +func (r *CortexXDRAlertsResponse) GetResultCount() int { + return r.Reply.ResultCount +} + +func (r *CortexXDRAlertsResponse) GetTotalCount() int { + return r.Reply.TotalCount +} + +func NewCortexXDRAdapter(ctx context.Context, conf CortexXDRConfig) (*CortexXDRAdapter, chan struct{}, error) { + if err := conf.Validate(); err != nil { + return nil, nil, err + } + a := &CortexXDRAdapter{ + conf: conf, + incidentsDedupe: make(map[string]int64), + alertsDedupe: make(map[string]int64), + } + + rootCtx, cancel := context.WithCancel(ctx) + a.ctx = rootCtx + a.cancel = cancel + + var err error + a.uspClient, err = uspclient.NewClient(rootCtx, conf.ClientOptions) + if err != nil { + return nil, nil, err + } + + a.httpClient = &http.Client{ + Timeout: 60 * time.Second, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 90 * time.Second, + }, + } + + a.chStopped = make(chan struct{}) + + go a.fetchEvents() + + return a, a.chStopped, nil +} + +func (c *CortexXDRConfig) Validate() error { + if err := c.ClientOptions.Validate(); err != nil { + return fmt.Errorf("client_options: %v", err) + } + if c.FQDN == "" { + return errors.New("missing fqdn") + } + if c.APIKey == "" { + return errors.New("missing api_key") + } + if c.APIKeyID == "" { + return errors.New("missing api_key_id") + } + // InitialLookback defaults to zero (current time, no lookback) + return nil +} + +func (a *CortexXDRAdapter) Close() error { + a.conf.ClientOptions.DebugLog("closing") + var err1, err2 error + a.once.Do(func() { + a.cancel() + err1 = a.uspClient.Drain(1 * time.Minute) + _, err2 = a.uspClient.Close() + a.httpClient.CloseIdleConnections() + close(a.chStopped) + }) + if err1 != nil { + return err1 + } + return err2 +} + +type API struct { + Endpoint string + Key string + ResponseType CortexXDRResponse + Dedupe map[string]int64 + timeField string + idField string +} + +func (a *CortexXDRAdapter) fetchEvents() { + since := map[string]time.Time{ + "incidents": time.Now().Add(-1 * a.conf.InitialLookback).UTC(), + "alerts": time.Now().Add(-1 * a.conf.InitialLookback).UTC(), + } + + APIs := []API{ + { + Endpoint: incidentsEndpoint, + Key: "incidents", + ResponseType: &CortexXDRIncidentsResponse{}, + timeField: "creation_time", + idField: "incident_id", + Dedupe: a.incidentsDedupe, + }, + { + Endpoint: alertsEndpoint, + Key: "alerts", + ResponseType: &CortexXDRAlertsResponse{}, + timeField: "server_creation_time", // CRITICAL: Use server_creation_time, not detection_timestamp! + idField: "alert_id", + Dedupe: a.alertsDedupe, + }, + } + + ticker := time.NewTicker(queryInterval * time.Second) + defer ticker.Stop() + + for { + select { + case <-a.ctx.Done(): + a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.FQDN)) + return + case <-ticker.C: + // Capture current time once for all APIs in this cycle + cycleTime := time.Now() + + allItems := []utils.Dict{} + + for _, api := range APIs { + items, err := a.getEvents(since[api.Key], cycleTime, api) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) + } + + since[api.Key] = cycleTime.Add(-queryInterval * time.Second) + + if len(items) > 0 { + allItems = append(allItems, items...) + } + } + + if len(allItems) > 0 { + a.submitEvents(allItems) + } + } + } +} + +func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api API) ([]utils.Dict, error) { + var allItems []utils.Dict + + cutoffTime := cycleTime.Add(-2 * queryInterval * time.Second).Unix() + for k, v := range api.Dedupe { + if v < cutoffTime { + delete(api.Dedupe, k) + } + } + + sinceMs := since.UTC().UnixMilli() + + searchFrom := 0 + pageSize := 100 + + for { + requestBody := map[string]interface{}{ + "request_data": map[string]interface{}{ + "filters": []map[string]interface{}{ + { + "field": api.timeField, + "operator": "gte", + "value": sinceMs, + }, + }, + "search_from": searchFrom, + "search_to": searchFrom + pageSize, + "sort": map[string]string{ + "field": api.timeField, + "keyword": "asc", + }, + }, + } + + response, err := a.doRequest(api.Endpoint, requestBody, api) + if err != nil { + return nil, err + } + + resultCount := response.GetResultCount() + totalCount := response.GetTotalCount() + + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, from: %d)", + api.Key, resultCount, totalCount, searchFrom)) + + for _, event := range response.GetData() { + var dedupeID string + if idValue, exists := event[api.idField]; exists { + dedupeID = fmt.Sprintf("%v", idValue) + } else { + dedupeID = a.generateLogHash(event) + } + + var timeValue time.Time + timeField, exists := event[api.timeField] + if !exists { + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event missing time field '%s'", api.Key, api.timeField)) + continue + } + + switch v := timeField.(type) { + case float64: + // Handle numeric timestamp (milliseconds) + timeValue = time.UnixMilli(int64(v)) + case int64: + // Handle int64 timestamp (milliseconds) + timeValue = time.UnixMilli(v) + case uint64: + // Handle uint64 timestamp (milliseconds) + timeValue = time.UnixMilli(int64(v)) + case int: + // Handle int timestamp (milliseconds) + timeValue = time.UnixMilli(int64(v)) + case string: + // Handle string timestamp - try to parse as milliseconds + if ms, err := strconv.ParseInt(v, 10, 64); err == nil { + timeValue = time.UnixMilli(ms) + } else { + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: could not parse string timestamp '%s'", api.Key, v)) + continue + } + default: + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event time field '%s' has unsupported type %T with value: %v", api.Key, api.timeField, timeField, timeField)) + continue + } + + if _, seen := api.Dedupe[dedupeID]; !seen { + if timeValue.After(since) || timeValue.Equal(since) { + api.Dedupe[dedupeID] = timeValue.Unix() + allItems = append(allItems, event) + } + } + } + + if resultCount == 0 || searchFrom+resultCount >= totalCount { + // No more results to fetch + break + } + + searchFrom += pageSize + } + + return allItems, nil +} + +func (a *CortexXDRAdapter) generateLogHash(logMap map[string]interface{}) string { + // Extract and sort keys + keys := make([]string, 0, len(logMap)) + for k := range logMap { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build deterministic string representation + var buf bytes.Buffer + for _, k := range keys { + fmt.Fprintf(&buf, "%s:%v|", k, logMap[k]) + } + + hash := sha256.Sum256(buf.Bytes()) + return hex.EncodeToString(hash[:]) +} + +func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]interface{}, api API) (CortexXDRResponse, error) { + for { + select { + case <-a.ctx.Done(): + return nil, a.ctx.Err() + default: + } + var respBody []byte + var status int + + err := func() error { + loopCtx, cancel := context.WithTimeout(a.ctx, 30*time.Second) + defer cancel() + + bodyJSON, err := json.Marshal(requestBody) + if err != nil { + return err + } + + url := fmt.Sprintf("https://%s%s", a.conf.FQDN, endpoint) + req, err := http.NewRequestWithContext(loopCtx, "POST", url, bytes.NewBuffer(bodyJSON)) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api request error: %v", api.Key, err)) + return err + } + + // Set authentication headers (Standard mode) + req.Header.Set("Authorization", a.conf.APIKey) + req.Header.Set("x-xdr-auth-id", a.conf.APIKeyID) + req.Header.Set("Content-Type", "application/json") + + resp, err := a.httpClient.Do(req) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api do error: %v", api.Key, err)) + return err + } + + defer resp.Body.Close() + + respBody, err = io.ReadAll(resp.Body) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api read error: %v", api.Key, err)) + return err + } + status = resp.StatusCode + return nil + }() + if err != nil { + return nil, err + } + + if status == http.StatusTooManyRequests { + a.conf.ClientOptions.OnWarning("getEventsRequest got 429, sleeping 60s before retry") + if err := a.sleepContext(60 * time.Second); err != nil { + return nil, err + } + continue + } + if status != http.StatusOK { + return nil, fmt.Errorf("cortex xdr %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) + } + + err = json.Unmarshal(respBody, &api.ResponseType) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api invalid json: %v\nResponse body: %s", api.Key, err, string(respBody))) + return nil, err + } + + return api.ResponseType, nil + } +} + +func (a *CortexXDRAdapter) submitEvents(items []utils.Dict) { + for _, item := range items { + msg := &protocol.DataMessage{ + JsonPayload: item, + TimestampMs: uint64(time.Now().UTC().UnixNano() / int64(time.Millisecond)), + } + if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + a.Close() + return + } + } else { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + } + } + } +} + +func (a *CortexXDRAdapter) sleepContext(d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-timer.C: + return nil + case <-a.ctx.Done(): + return a.ctx.Err() + } +} \ No newline at end of file From 8797a0a26a45fbd540ca728ec05dfa2aca5d57ba Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Thu, 29 Jan 2026 09:57:19 -0700 Subject: [PATCH 2/9] cortex_xdr: Execute initial fetch immediately on startup The adapter previously used a ticker-based loop that only fires after the first 60-second interval. This meant no events were fetched until 60 seconds after the adapter started, causing a delayed initial sync. Refactor fetchEvents() to extract the fetch logic into a helper function and call it immediately before entering the ticker loop. This ensures events are fetched as soon as the adapter starts. Co-Authored-By: Claude Opus 4.5 --- corex_xdr/client.go | 59 +++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/corex_xdr/client.go b/corex_xdr/client.go index e39197d..dd4e282 100644 --- a/corex_xdr/client.go +++ b/corex_xdr/client.go @@ -195,12 +195,39 @@ func (a *CortexXDRAdapter) fetchEvents() { Endpoint: alertsEndpoint, Key: "alerts", ResponseType: &CortexXDRAlertsResponse{}, - timeField: "server_creation_time", // CRITICAL: Use server_creation_time, not detection_timestamp! + timeField: "server_creation_time", idField: "alert_id", Dedupe: a.alertsDedupe, }, } + // Helper function to fetch and process events for all APIs + fetchAllAPIs := func() { + cycleTime := time.Now() + allItems := []utils.Dict{} + + for _, api := range APIs { + items, err := a.getEvents(since[api.Key], cycleTime, api) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) + } + + since[api.Key] = cycleTime.Add(-queryInterval * time.Second) + + if len(items) > 0 { + allItems = append(allItems, items...) + } + } + + if len(allItems) > 0 { + a.submitEvents(allItems) + } + } + + // Execute immediately on startup + a.conf.ClientOptions.DebugLog("performing initial fetch") + fetchAllAPIs() + ticker := time.NewTicker(queryInterval * time.Second) defer ticker.Stop() @@ -210,27 +237,7 @@ func (a *CortexXDRAdapter) fetchEvents() { a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.FQDN)) return case <-ticker.C: - // Capture current time once for all APIs in this cycle - cycleTime := time.Now() - - allItems := []utils.Dict{} - - for _, api := range APIs { - items, err := a.getEvents(since[api.Key], cycleTime, api) - if err != nil { - a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) - } - - since[api.Key] = cycleTime.Add(-queryInterval * time.Second) - - if len(items) > 0 { - allItems = append(allItems, items...) - } - } - - if len(allItems) > 0 { - a.submitEvents(allItems) - } + fetchAllAPIs() } } } @@ -249,7 +256,7 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A searchFrom := 0 pageSize := 100 - + for { requestBody := map[string]interface{}{ "request_data": map[string]interface{}{ @@ -276,8 +283,8 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A resultCount := response.GetResultCount() totalCount := response.GetTotalCount() - - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, from: %d)", + + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, from: %d)", api.Key, resultCount, totalCount, searchFrom)) for _, event := range response.GetData() { @@ -461,4 +468,4 @@ func (a *CortexXDRAdapter) sleepContext(d time.Duration) error { case <-a.ctx.Done(): return a.ctx.Err() } -} \ No newline at end of file +} From 58939f0f8a0f644f543da99d9d55ffb7ac24f000 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Thu, 29 Jan 2026 09:57:33 -0700 Subject: [PATCH 3/9] cortex_xdr: Only update since time on successful API fetch Previously, the since timestamp was updated even when a fetch failed. This could cause data loss: if the API returned an error, the time window would still advance, and events in that window would never be retried on subsequent fetch cycles. Now the since time is only updated when the fetch succeeds. On failure, the loop continues to the next API but preserves the previous since time, ensuring the failed time window will be retried on the next cycle. Co-Authored-By: Claude Opus 4.5 --- corex_xdr/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/corex_xdr/client.go b/corex_xdr/client.go index dd4e282..6c1b21b 100644 --- a/corex_xdr/client.go +++ b/corex_xdr/client.go @@ -210,8 +210,11 @@ func (a *CortexXDRAdapter) fetchEvents() { items, err := a.getEvents(since[api.Key], cycleTime, api) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) + // Don't update since time on failure to avoid data loss + continue } + // Only update since time on successful fetch since[api.Key] = cycleTime.Add(-queryInterval * time.Second) if len(items) > 0 { From 6af12949e6733991730b1994af8e2ecd04523a94 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Thu, 29 Jan 2026 09:57:54 -0700 Subject: [PATCH 4/9] cortex_xdr: Increase dedupe window from 2 minutes to 30 minutes The previous 2-minute dedupe window (2 * queryInterval) was too short. If the Cortex XDR API returned events with slightly older timestamps, or if there were any processing delays, events could be re-ingested after their dedupe entries expired. Increase the window to 30 minutes to match the pattern used by other adapters (e.g., okta/client.go uses a 30-minute overlap period). This provides a safer buffer against duplicate event ingestion while still allowing memory to be reclaimed for old entries. Co-Authored-By: Claude Opus 4.5 --- corex_xdr/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/corex_xdr/client.go b/corex_xdr/client.go index 6c1b21b..04e5812 100644 --- a/corex_xdr/client.go +++ b/corex_xdr/client.go @@ -25,6 +25,7 @@ const ( queryInterval = 60 incidentsEndpoint = "/public_api/v1/incidents/get_incidents/" alertsEndpoint = "/public_api/v1/alerts/get_alerts_multi_events/" + dedupeWindow = 30 * time.Minute ) type CortexXDRConfig struct { @@ -248,7 +249,8 @@ func (a *CortexXDRAdapter) fetchEvents() { func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api API) ([]utils.Dict, error) { var allItems []utils.Dict - cutoffTime := cycleTime.Add(-2 * queryInterval * time.Second).Unix() + // Clean up old dedupe entries (30 minute window) + cutoffTime := cycleTime.Add(-dedupeWindow).Unix() for k, v := range api.Dedupe { if v < cutoffTime { delete(api.Dedupe, k) From 56af1ef9a54af290485c639fba9fb80281be9dda Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Thu, 29 Jan 2026 09:58:33 -0700 Subject: [PATCH 5/9] cortex_xdr: Add retry logic with exponential backoff for transient errors The adapter previously had no retry mechanism for transient failures. Network timeouts, connection errors, and server errors (5xx) would cause immediate failure, potentially missing events during temporary API unavailability. Add retry logic with exponential backoff: - Network errors (connection failures, timeouts): retry up to 3 times with exponential backoff starting at 5 seconds (5s -> 10s -> 20s) - Server errors (5xx status codes): same retry behavior - Rate limiting (429): unchanged, still uses fixed 60-second sleep This improves resilience against temporary API issues while avoiding infinite retry loops through the maxRetries limit. Co-Authored-By: Claude Opus 4.5 --- corex_xdr/client.go | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/corex_xdr/client.go b/corex_xdr/client.go index 04e5812..e547f47 100644 --- a/corex_xdr/client.go +++ b/corex_xdr/client.go @@ -26,6 +26,8 @@ const ( incidentsEndpoint = "/public_api/v1/incidents/get_incidents/" alertsEndpoint = "/public_api/v1/alerts/get_alerts_multi_events/" dedupeWindow = 30 * time.Minute + maxRetries = 3 + initialRetryDelay = 5 * time.Second ) type CortexXDRConfig struct { @@ -371,14 +373,19 @@ func (a *CortexXDRAdapter) generateLogHash(logMap map[string]interface{}) string } func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]interface{}, api API) (CortexXDRResponse, error) { + retryDelay := initialRetryDelay + retries := 0 + for { select { case <-a.ctx.Done(): return nil, a.ctx.Err() default: } + var respBody []byte var status int + var isTransientError bool err := func() error { loopCtx, cancel := context.WithTimeout(a.ctx, 30*time.Second) @@ -403,6 +410,7 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int resp, err := a.httpClient.Do(req) if err != nil { + isTransientError = true a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api do error: %v", api.Key, err)) return err } @@ -411,16 +419,31 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int respBody, err = io.ReadAll(resp.Body) if err != nil { + isTransientError = true a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api read error: %v", api.Key, err)) return err } status = resp.StatusCode return nil }() - if err != nil { + + // Handle transient network errors with retry + if err != nil && isTransientError { + retries++ + if retries > maxRetries { + return nil, fmt.Errorf("cortex xdr %s api failed after %d retries: %w", api.Key, maxRetries, err) + } + a.conf.ClientOptions.OnWarning(fmt.Sprintf("cortex xdr %s api transient error, retry %d/%d in %v", api.Key, retries, maxRetries, retryDelay)) + if err := a.sleepContext(retryDelay); err != nil { + return nil, err + } + retryDelay *= 2 // Exponential backoff + continue + } else if err != nil { return nil, err } + // Handle rate limiting if status == http.StatusTooManyRequests { a.conf.ClientOptions.OnWarning("getEventsRequest got 429, sleeping 60s before retry") if err := a.sleepContext(60 * time.Second); err != nil { @@ -428,6 +451,21 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int } continue } + + // Handle server errors (5xx) with retry + if status >= 500 && status < 600 { + retries++ + if retries > maxRetries { + return nil, fmt.Errorf("cortex xdr %s api server error %d after %d retries\nRESPONSE: %s", api.Key, status, maxRetries, string(respBody)) + } + a.conf.ClientOptions.OnWarning(fmt.Sprintf("cortex xdr %s api server error %d, retry %d/%d in %v", api.Key, status, retries, maxRetries, retryDelay)) + if err := a.sleepContext(retryDelay); err != nil { + return nil, err + } + retryDelay *= 2 // Exponential backoff + continue + } + if status != http.StatusOK { return nil, fmt.Errorf("cortex xdr %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) } From 2b76c4347e399ba1d26d02e066d0cdc78500b446 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Fri, 13 Feb 2026 12:44:25 -0700 Subject: [PATCH 6/9] cortex_xdr: Fix production readiness issues and rename directory - Fix Ship() error handling to stop adapter on any non-buffer-full error, matching the pattern used by all other adapters (prevents silent data loss) - Gate since-cursor advancement on successful shipping, not just fetch - Create fresh response structs per API page to avoid stale data from pointer reuse across pagination - Add context cancellation check in pagination loop for graceful shutdown - Fix pagination offset to advance by result count instead of page size, preventing data loss when API returns fewer items than total_count - Add upper-bound (lte) query filter with 5s buffer to avoid querying events still being indexed by the API - Make 401/403 auth errors fatal with clear diagnostic message - Add rate-limit hit counter with warning every 10 consecutive 429s - Rename directory from corex_xdr to cortex_xdr to match package/product name - Update import paths in containers/conf and containers/general Co-Authored-By: Claude Opus 4.6 --- containers/conf/all.go | 2 +- containers/general/tool.go | 2 +- {corex_xdr => cortex_xdr}/client.go | 137 ++++++++++++++++++---------- 3 files changed, 93 insertions(+), 48 deletions(-) rename {corex_xdr => cortex_xdr}/client.go (77%) diff --git a/containers/conf/all.go b/containers/conf/all.go index 2ee9d20..25abe2b 100755 --- a/containers/conf/all.go +++ b/containers/conf/all.go @@ -11,7 +11,7 @@ import ( "github.com/refractionPOINT/usp-adapters/cylance" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" - "github.com/refractionPOINT/usp-adapters/corex_xdr" + "github.com/refractionPOINT/usp-adapters/cortex_xdr" "github.com/refractionPOINT/usp-adapters/entraid" "github.com/refractionPOINT/usp-adapters/evtx" "github.com/refractionPOINT/usp-adapters/falconcloud" diff --git a/containers/general/tool.go b/containers/general/tool.go index 381007c..e64bae1 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -22,7 +22,7 @@ import ( "github.com/refractionPOINT/usp-adapters/bitwarden" "github.com/refractionPOINT/usp-adapters/box" "github.com/refractionPOINT/usp-adapters/cato" - usp_cortex_xdr "github.com/refractionPOINT/usp-adapters/corex_xdr" + usp_cortex_xdr "github.com/refractionPOINT/usp-adapters/cortex_xdr" "github.com/refractionPOINT/usp-adapters/cylance" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" diff --git a/corex_xdr/client.go b/cortex_xdr/client.go similarity index 77% rename from corex_xdr/client.go rename to cortex_xdr/client.go index e547f47..118edf8 100644 --- a/corex_xdr/client.go +++ b/cortex_xdr/client.go @@ -13,6 +13,7 @@ import ( "net/http" "sort" "strconv" + "strings" "sync" "time" @@ -22,12 +23,21 @@ import ( ) const ( + // queryInterval controls both the polling ticker and the since-cursor overlap. + // Each cycle queries [since, cycleTime - queryBuffer] and then sets since back + // by queryInterval from cycleTime. This creates an overlap between consecutive + // query windows so that events near the boundary are not missed. The dedupe map + // prevents duplicate delivery. Changing queryInterval without updating the + // ticker (or vice versa) will break the overlap guarantee. queryInterval = 60 incidentsEndpoint = "/public_api/v1/incidents/get_incidents/" alertsEndpoint = "/public_api/v1/alerts/get_alerts_multi_events/" dedupeWindow = 30 * time.Minute maxRetries = 3 initialRetryDelay = 5 * time.Second + // queryBuffer excludes recent events from queries to avoid race conditions + // where the API's total_count includes events not yet fully queryable. + queryBuffer = 5 * time.Second ) type CortexXDRConfig struct { @@ -144,6 +154,9 @@ func (c *CortexXDRConfig) Validate() error { if c.FQDN == "" { return errors.New("missing fqdn") } + c.FQDN = strings.TrimPrefix(c.FQDN, "https://") + c.FQDN = strings.TrimPrefix(c.FQDN, "http://") + c.FQDN = strings.TrimRight(c.FQDN, "/") if c.APIKey == "" { return errors.New("missing api_key") } @@ -171,12 +184,12 @@ func (a *CortexXDRAdapter) Close() error { } type API struct { - Endpoint string - Key string - ResponseType CortexXDRResponse - Dedupe map[string]int64 - timeField string - idField string + Endpoint string + Key string + Dedupe map[string]int64 + filterField string // field name used in API filter/sort requests + responseField string // field name in the response data for timestamp + idField string } func (a *CortexXDRAdapter) fetchEvents() { @@ -187,27 +200,26 @@ func (a *CortexXDRAdapter) fetchEvents() { APIs := []API{ { - Endpoint: incidentsEndpoint, - Key: "incidents", - ResponseType: &CortexXDRIncidentsResponse{}, - timeField: "creation_time", - idField: "incident_id", - Dedupe: a.incidentsDedupe, + Endpoint: incidentsEndpoint, + Key: "incidents", + filterField: "creation_time", + responseField: "creation_time", + idField: "incident_id", + Dedupe: a.incidentsDedupe, }, { - Endpoint: alertsEndpoint, - Key: "alerts", - ResponseType: &CortexXDRAlertsResponse{}, - timeField: "server_creation_time", - idField: "alert_id", - Dedupe: a.alertsDedupe, + Endpoint: alertsEndpoint, + Key: "alerts", + filterField: "creation_time", + responseField: "detection_timestamp", + idField: "alert_id", + Dedupe: a.alertsDedupe, }, } // Helper function to fetch and process events for all APIs fetchAllAPIs := func() { cycleTime := time.Now() - allItems := []utils.Dict{} for _, api := range APIs { items, err := a.getEvents(since[api.Key], cycleTime, api) @@ -217,16 +229,15 @@ func (a *CortexXDRAdapter) fetchEvents() { continue } - // Only update since time on successful fetch - since[api.Key] = cycleTime.Add(-queryInterval * time.Second) - if len(items) > 0 { - allItems = append(allItems, items...) + if err := a.submitEvents(items); err != nil { + // submitEvents already called Close(), stop processing + return + } } - } - if len(allItems) > 0 { - a.submitEvents(allItems) + // Only update since time after successful fetch and ship + since[api.Key] = cycleTime.Add(-queryInterval * time.Second) } } @@ -260,24 +271,36 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A } sinceMs := since.UTC().UnixMilli() + untilMs := cycleTime.Add(-queryBuffer).UTC().UnixMilli() searchFrom := 0 pageSize := 100 for { + select { + case <-a.ctx.Done(): + return nil, a.ctx.Err() + default: + } + requestBody := map[string]interface{}{ "request_data": map[string]interface{}{ "filters": []map[string]interface{}{ { - "field": api.timeField, + "field": api.filterField, "operator": "gte", "value": sinceMs, }, + { + "field": api.filterField, + "operator": "lte", + "value": untilMs, + }, }, "search_from": searchFrom, "search_to": searchFrom + pageSize, "sort": map[string]string{ - "field": api.timeField, + "field": api.filterField, "keyword": "asc", }, }, @@ -288,13 +311,14 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A return nil, err } + data := response.GetData() resultCount := response.GetResultCount() totalCount := response.GetTotalCount() - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, from: %d)", - api.Key, resultCount, totalCount, searchFrom)) + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, items_in_response: %d, from: %d)", + api.Key, resultCount, totalCount, len(data), searchFrom)) - for _, event := range response.GetData() { + for _, event := range data { var dedupeID string if idValue, exists := event[api.idField]; exists { dedupeID = fmt.Sprintf("%v", idValue) @@ -303,13 +327,13 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A } var timeValue time.Time - timeField, exists := event[api.timeField] + tsValue, exists := event[api.responseField] if !exists { - a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event missing time field '%s'", api.Key, api.timeField)) + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event missing time field '%s'", api.Key, api.responseField)) continue } - switch v := timeField.(type) { + switch v := tsValue.(type) { case float64: // Handle numeric timestamp (milliseconds) timeValue = time.UnixMilli(int64(v)) @@ -331,7 +355,7 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A continue } default: - a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event time field '%s' has unsupported type %T with value: %v", api.Key, api.timeField, timeField, timeField)) + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event time field '%s' has unsupported type %T with value: %v", api.Key, api.responseField, tsValue, tsValue)) continue } @@ -348,7 +372,7 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A break } - searchFrom += pageSize + searchFrom += resultCount } return allItems, nil @@ -375,6 +399,7 @@ func (a *CortexXDRAdapter) generateLogHash(logMap map[string]interface{}) string func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]interface{}, api API) (CortexXDRResponse, error) { retryDelay := initialRetryDelay retries := 0 + rateLimitHits := 0 for { select { @@ -445,13 +470,25 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int // Handle rate limiting if status == http.StatusTooManyRequests { - a.conf.ClientOptions.OnWarning("getEventsRequest got 429, sleeping 60s before retry") + rateLimitHits++ + if rateLimitHits%10 == 0 { + a.conf.ClientOptions.OnWarning(fmt.Sprintf("cortex xdr %s api has been rate limited %d consecutive times", api.Key, rateLimitHits)) + } + a.conf.ClientOptions.DebugLog(fmt.Sprintf("cortex xdr %s api got 429, sleeping 60s before retry (%d consecutive)", api.Key, rateLimitHits)) if err := a.sleepContext(60 * time.Second); err != nil { return nil, err } continue } + // Handle authentication errors as fatal + if status == http.StatusUnauthorized || status == http.StatusForbidden { + err := fmt.Errorf("cortex xdr %s api authentication failed (%d) - check api_key and api_key_id configuration\nRESPONSE: %s", api.Key, status, string(respBody)) + a.conf.ClientOptions.OnError(err) + a.Close() + return nil, err + } + // Handle server errors (5xx) with retry if status >= 500 && status < 600 { retries++ @@ -470,17 +507,25 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int return nil, fmt.Errorf("cortex xdr %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) } - err = json.Unmarshal(respBody, &api.ResponseType) + var response CortexXDRResponse + switch api.Key { + case "incidents": + response = &CortexXDRIncidentsResponse{} + case "alerts": + response = &CortexXDRAlertsResponse{} + } + + err = json.Unmarshal(respBody, response) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("cortex xdr %s api invalid json: %v\nResponse body: %s", api.Key, err, string(respBody))) return nil, err } - return api.ResponseType, nil + return response, nil } } -func (a *CortexXDRAdapter) submitEvents(items []utils.Dict) { +func (a *CortexXDRAdapter) submitEvents(items []utils.Dict) error { for _, item := range items { msg := &protocol.DataMessage{ JsonPayload: item, @@ -489,16 +534,16 @@ func (a *CortexXDRAdapter) submitEvents(items []utils.Dict) { if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { if err == uspclient.ErrorBufferFull { a.conf.ClientOptions.OnWarning("stream falling behind") - if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { - a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) - a.Close() - return - } - } else { + err = a.uspClient.Ship(msg, 1*time.Hour) + } + if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + a.Close() + return err } } } + return nil } func (a *CortexXDRAdapter) sleepContext(d time.Duration) error { From 4eefb6b12a728d1ba5dbc3add82461439d0df0b9 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Fri, 13 Feb 2026 13:01:46 -0700 Subject: [PATCH 7/9] cortex_xdr: Remove per-request debug logging Co-Authored-By: Claude Opus 4.6 --- cortex_xdr/client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cortex_xdr/client.go b/cortex_xdr/client.go index 118edf8..9001a8f 100644 --- a/cortex_xdr/client.go +++ b/cortex_xdr/client.go @@ -315,9 +315,6 @@ func (a *CortexXDRAdapter) getEvents(since time.Time, cycleTime time.Time, api A resultCount := response.GetResultCount() totalCount := response.GetTotalCount() - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s: fetched %d results (total: %d, items_in_response: %d, from: %d)", - api.Key, resultCount, totalCount, len(data), searchFrom)) - for _, event := range data { var dedupeID string if idValue, exists := event[api.idField]; exists { @@ -474,7 +471,6 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int if rateLimitHits%10 == 0 { a.conf.ClientOptions.OnWarning(fmt.Sprintf("cortex xdr %s api has been rate limited %d consecutive times", api.Key, rateLimitHits)) } - a.conf.ClientOptions.DebugLog(fmt.Sprintf("cortex xdr %s api got 429, sleeping 60s before retry (%d consecutive)", api.Key, rateLimitHits)) if err := a.sleepContext(60 * time.Second); err != nil { return nil, err } From 1399a54c5a06a001aaaa906f546e7366b4c1596a Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 2 Mar 2026 13:17:32 -0700 Subject: [PATCH 8/9] cortex_xdr: Upgrade alerts endpoint to v2 API and use server-side timestamps Switch from the legacy v1 alerts endpoint to v2 and change the filter/cursor fields from creation_time/detection_timestamp to server_creation_time/local_insert_ts. This prevents missed alerts when endpoints reconnect after being offline, as server-side ingestion timestamps are not affected by client clock delays. Co-Authored-By: Claude Opus 4.6 --- cortex_xdr/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cortex_xdr/client.go b/cortex_xdr/client.go index 9001a8f..4cf4b3f 100644 --- a/cortex_xdr/client.go +++ b/cortex_xdr/client.go @@ -31,7 +31,7 @@ const ( // ticker (or vice versa) will break the overlap guarantee. queryInterval = 60 incidentsEndpoint = "/public_api/v1/incidents/get_incidents/" - alertsEndpoint = "/public_api/v1/alerts/get_alerts_multi_events/" + alertsEndpoint = "/public_api/v2/alerts/get_alerts_multi_events/" dedupeWindow = 30 * time.Minute maxRetries = 3 initialRetryDelay = 5 * time.Second @@ -210,8 +210,8 @@ func (a *CortexXDRAdapter) fetchEvents() { { Endpoint: alertsEndpoint, Key: "alerts", - filterField: "creation_time", - responseField: "detection_timestamp", + filterField: "server_creation_time", + responseField: "local_insert_ts", idField: "alert_id", Dedupe: a.alertsDedupe, }, From 4f7cdcf4fc14489003ca8b34dac8f86e50746684 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 2 Mar 2026 16:07:40 -0700 Subject: [PATCH 9/9] cortex_xdr: Add event type to distinguish incidents from alerts Set DataMessage.EventType to "incident" or "alert" so downstream consumers (D&R rules, parsers) can differentiate between the two API sources without inspecting payload fields. Co-Authored-By: Claude Opus 4.6 --- cortex_xdr/client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cortex_xdr/client.go b/cortex_xdr/client.go index 4cf4b3f..2ee3d26 100644 --- a/cortex_xdr/client.go +++ b/cortex_xdr/client.go @@ -186,6 +186,7 @@ func (a *CortexXDRAdapter) Close() error { type API struct { Endpoint string Key string + EventType string // value set on DataMessage.EventType to distinguish event sources Dedupe map[string]int64 filterField string // field name used in API filter/sort requests responseField string // field name in the response data for timestamp @@ -202,6 +203,7 @@ func (a *CortexXDRAdapter) fetchEvents() { { Endpoint: incidentsEndpoint, Key: "incidents", + EventType: "incident", filterField: "creation_time", responseField: "creation_time", idField: "incident_id", @@ -210,6 +212,7 @@ func (a *CortexXDRAdapter) fetchEvents() { { Endpoint: alertsEndpoint, Key: "alerts", + EventType: "alert", filterField: "server_creation_time", responseField: "local_insert_ts", idField: "alert_id", @@ -230,7 +233,7 @@ func (a *CortexXDRAdapter) fetchEvents() { } if len(items) > 0 { - if err := a.submitEvents(items); err != nil { + if err := a.submitEvents(items, api.EventType); err != nil { // submitEvents already called Close(), stop processing return } @@ -521,9 +524,10 @@ func (a *CortexXDRAdapter) doRequest(endpoint string, requestBody map[string]int } } -func (a *CortexXDRAdapter) submitEvents(items []utils.Dict) error { +func (a *CortexXDRAdapter) submitEvents(items []utils.Dict, eventType string) error { for _, item := range items { msg := &protocol.DataMessage{ + EventType: eventType, JsonPayload: item, TimestampMs: uint64(time.Now().UTC().UnixNano() / int64(time.Millisecond)), }