From b3fb8f410517fcb7c25976900b68c9f03a6a758a Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 8 Dec 2025 17:21:11 -0700 Subject: [PATCH 1/5] Darktrace Functioning Initial --- containers/conf/all.go | 2 + containers/general/tool.go | 6 + darktrace/client.go | 416 +++++++++++++++++++++++++++++++++++++ 3 files changed, 424 insertions(+) create mode 100644 darktrace/client.go diff --git a/containers/conf/all.go b/containers/conf/all.go index 55784b9..4d2e92a 100755 --- a/containers/conf/all.go +++ b/containers/conf/all.go @@ -9,6 +9,7 @@ import ( "github.com/refractionPOINT/usp-adapters/box" "github.com/refractionPOINT/usp-adapters/cato" "github.com/refractionPOINT/usp-adapters/cylance" + "github.com/refractionPOINT/usp-adapters/darktrace" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" "github.com/refractionPOINT/usp-adapters/entraid" @@ -56,6 +57,7 @@ type GeneralConfigs struct { ITGlue usp_itglue.ITGlueConfig `json:"itglue" yaml:"itglue"` Sophos usp_sophos.SophosConfig `json:"sophos" yaml:"sophos"` EntraID usp_entraid.EntraIDConfig `json:"entraid" yaml:"entraid"` + Darktrace usp_darktrace.DarktraceConfig `json:"darktrace" yaml:"darktrace"` Defender usp_defender.DefenderConfig `json:"defender" yaml:"defender"` Cato usp_cato.CatoConfig `json:"cato" yaml:"cato"` Cylance usp_cylance.CylanceConfig `json:"cylance" yaml:"cylance"` diff --git a/containers/general/tool.go b/containers/general/tool.go index 4c7af0b..deae432 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -23,6 +23,7 @@ import ( "github.com/refractionPOINT/usp-adapters/box" "github.com/refractionPOINT/usp-adapters/cato" "github.com/refractionPOINT/usp-adapters/cylance" + "github.com/refractionPOINT/usp-adapters/darktrace" "github.com/refractionPOINT/usp-adapters/defender" "github.com/refractionPOINT/usp-adapters/duo" "github.com/refractionPOINT/usp-adapters/entraid" @@ -381,6 +382,11 @@ func runAdapter(ctx context.Context, method string, configs Configuration, showC configs.EntraID.ClientOptions.Architecture = "usp_adapter" configToShow = configs.EntraID client, chRunning, err = usp_entraid.NewEntraIDAdapter(ctx, configs.EntraID) + } else if method == "darktrace" { + configs.Darktrace.ClientOptions = applyLogging(configs.Darktrace.ClientOptions) + configs.Darktrace.ClientOptions.Architecture = "usp_adapter" + configToShow = configs.Darktrace + client, chRunning, err = usp_darktrace.NewDarkTraceAdapter(ctx, configs.Darktrace) } else if method == "defender" { configs.Defender.ClientOptions = applyLogging(configs.Defender.ClientOptions) configs.Defender.ClientOptions.Architecture = "usp_adapter" diff --git a/darktrace/client.go b/darktrace/client.go new file mode 100644 index 0000000..67f8424 --- /dev/null +++ b/darktrace/client.go @@ -0,0 +1,416 @@ +package usp_darktrace + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha1" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "sort" + "sync" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/refractionPOINT/go-uspclient/protocol" + "github.com/refractionPOINT/usp-adapters/utils" +) + +const ( + queryInterval = 60 + aiAnalystAlerts = "/aianalyst/incidentevents?includeacknowledged=true&includeincidenteventurl=true" + modelBreachAlerts = "/modelbreaches?expandenums=true&historicmodelonly=true&includeacknowledged=true&includebreachurl=true" +) + +type DarktraceConfig struct { + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + Url string `json:"url" yaml:"url"` + PublicToken string `json:"public_token" yaml:"public_token"` + PrivateToken string `json:"private_token" yaml:"private_token"` +} + +type DarkTraceAdapter struct { + conf DarktraceConfig + uspClient *uspclient.Client + httpClient *http.Client + chStopped chan struct{} + + once sync.Once + ctx context.Context + cancel context.CancelFunc + + aiAnalystDedupe map[string]int64 + modelBreachDedupe map[string]int64 +} + +type DarktraceResponse interface { + GetDict() []utils.Dict +} + +type DarktraceEventsResponse []utils.Dict + +func (r DarktraceEventsResponse) GetDict() []utils.Dict { + return []utils.Dict(r) +} + +func NewDarkTraceAdapter(ctx context.Context, conf DarktraceConfig) (*DarkTraceAdapter, chan struct{}, error) { + if err := conf.Validate(); err != nil { + return nil, nil, err + } + a := &DarkTraceAdapter{ + conf: conf, + aiAnalystDedupe: make(map[string]int64), + modelBreachDedupe: make(map[string]int64), + } + + rootCtx, cancel := context.WithCancel(ctx) + a.ctx = rootCtx + a.cancel = cancel + + var err error + a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + if err != nil { + return nil, nil, err + } + + a.httpClient = &http.Client{ + Timeout: 60 * time.Second, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 90 * time.Second, + }, + } + + a.chStopped = make(chan struct{}) + + go a.fetchEvents() + + return a, a.chStopped, nil +} + +func (c *DarktraceConfig) Validate() error { + if err := c.ClientOptions.Validate(); err != nil { + return fmt.Errorf("client_options: %v", err) + } + if c.Url == "" { + return errors.New("missing url") + } + if c.PublicToken == "" { + return errors.New("missing public token") + } + if c.PrivateToken == "" { + return errors.New("missing private token") + } + return nil +} + +func (a *DarkTraceAdapter) Close() error { + a.conf.ClientOptions.DebugLog("closing") + var err1, err2 error + a.once.Do(func() { + a.cancel() + err1 = a.uspClient.Drain(1 * time.Minute) + _, err2 = a.uspClient.Close() + a.httpClient.CloseIdleConnections() + close(a.chStopped) + }) + if err1 != nil { + return err1 + } + return err2 +} + +type API struct { + Endpoint string + Key string + ResponseType DarktraceResponse + Dedupe map[string]int64 + timeField string + timeFormat string +} + +func (a *DarkTraceAdapter) fetchEvents() { + + since := map[string]int64{ + "aiAnalyst": time.Now().Add(-24*7*time.Hour).UTC().UnixMilli(), + "modelBreaches": time.Now().Add(-24*time.Hour).UTC().UnixMilli(), + } + + APIs := []API{ + { + Endpoint: aiAnalystAlerts, + Key: "aiAnalyst", + ResponseType: &DarktraceEventsResponse{}, + timeFormat: "20060102T150405", + timeField: "createdAt", + Dedupe: a.aiAnalystDedupe, + }, + { + Endpoint: modelBreachAlerts, + Key: "modelBreaches", + ResponseType: &DarktraceEventsResponse{}, + timeFormat: "20060102T150405", + timeField: "creationTime", + Dedupe: a.modelBreachDedupe, + }, + } + + ticker := time.NewTicker(queryInterval * time.Second) + defer ticker.Stop() + + for { + select { + case <-a.ctx.Done(): + a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.Url)) + return + case <-ticker.C: + + allItems := []utils.Dict{} + + for _, api := range APIs { + pageURL := fmt.Sprintf("%s%s", a.conf.Url, api.Endpoint) + items, newSince, err := a.getEvents(pageURL, since[api.Key], api) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) + continue + } + since[api.Key] = newSince + a.conf.ClientOptions.DebugLog(fmt.Sprintf("Received: %d", len(items))) + allItems = append(allItems, items...) + } + + if len(allItems) > 0 { + a.submitEvents(allItems) + } + } + } +} + +func (a *DarkTraceAdapter) getEvents(pageUrl string, since int64, api API) ([]utils.Dict, int64, error) { + var allItems []utils.Dict + lastDetectionTime := since + + defer func() { + for k, v := range api.Dedupe { + if v < time.UnixMilli(since).Add(-1*time.Minute).UnixMilli() { + delete(api.Dedupe, k) + } + } + }() + + urlWithTimes := fmt.Sprintf("%s&starttime=%d&endtime=%d", pageUrl, since, time.Now().UTC().UnixMilli()) + + a.conf.ClientOptions.DebugLog(fmt.Sprintf("Fetching %s from: %s", api.Key, urlWithTimes)) + + response, err := a.doWithRetry(urlWithTimes, api) + if err != nil { + return nil, 0, err + } + + // Debug: Log response structure + if response != nil { + events := response.GetDict() + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s response: received %d events", api.Key, len(events))) + if len(events) > 0 { + // Log first event structure for debugging + firstEventJSON, _ := json.MarshalIndent(events[0], "", " ") + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s first event structure:\n%s", api.Key, string(firstEventJSON))) + } + } + + for _, event := range response.GetDict() { + // Always generate hash-based ID since Darktrace logs don't have ID fields + dedupeID := a.generateLogHash(event) + + // Get timestamp - handle both string and numeric formats + var timeString time.Time + var err error + + timeValue := event[api.timeField] + switch v := timeValue.(type) { + case string: + // Parse string timestamp + timeString, err = time.Parse(api.timeFormat, v) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api invalid string timestamp: %v\n%v", api.Key, err, event)) + continue + } + case float64: + // Handle numeric timestamp (milliseconds) + timeString = time.UnixMilli(int64(v)) + case int64: + // Handle int64 timestamp (milliseconds) + timeString = time.UnixMilli(v) + case uint64: + // Handle uint64 timestamp (milliseconds) + timeString = time.UnixMilli(int64(v)) + case int: + // Handle int timestamp (milliseconds) + timeString = time.UnixMilli(int64(v)) + default: + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event time field '%s' has unsupported type %T with value: %v", api.Key, api.timeField, timeValue, timeValue)) + continue + } + + // Check for duplicates using hash-based ID + if _, seen := api.Dedupe[dedupeID]; !seen { + if timeString.After(time.UnixMilli(since)) { + api.Dedupe[dedupeID] = time.Now().UTC().UnixMilli() + allItems = append(allItems, event) + if timeString.After(time.UnixMilli(lastDetectionTime)) { + lastDetectionTime = timeString.UnixMilli() + } + } + } + } + return allItems, lastDetectionTime, nil +} + +func (a *DarkTraceAdapter) generateLogHash(logMap map[string]interface{}) string { + // Extract and sort keys + keys := make([]string, 0, len(logMap)) + for k := range logMap { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build deterministic string representation + var buf bytes.Buffer + for _, k := range keys { + fmt.Fprintf(&buf, "%s:%v|", k, logMap[k]) + } + + hash := sha256.Sum256(buf.Bytes()) + return hex.EncodeToString(hash[:]) +} + +func (a *DarkTraceAdapter) generateSignature(timeString string, fullURL string) (string, error) { + u, err := url.Parse(fullURL) + if err != nil { + return "", err + } + mac := hmac.New(sha1.New, []byte(a.conf.PrivateToken)) + payload := fmt.Sprintf("%s\n%s\n%s", u.RequestURI(), a.conf.PublicToken, timeString) + mac.Write([]byte(payload)) + return hex.EncodeToString(mac.Sum(nil)), nil +} + +func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, error) { + for { + var respBody []byte + var status int + + err := func() error { + loopCtx, cancel := context.WithTimeout(a.ctx, 30*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(loopCtx, "GET", url, nil) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api request error: %v", api.Key, err)) + return err + } + + nowTime := time.Now().UTC().Format(api.timeFormat) + + signature, err := a.generateSignature(nowTime, url) + if err != nil { + return err + } + + req.Header.Set("DTAPI-Token", a.conf.PublicToken) + req.Header.Set("DTAPI-Date", nowTime) + req.Header.Set("DTAPI-Signature", signature) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + resp, err := a.httpClient.Do(req) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api do error: %v", api.Key, err)) + return err + } + + defer resp.Body.Close() + + respBody, err = io.ReadAll(resp.Body) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api read error: %v", api.Key, err)) + return err + } + status = resp.StatusCode + return nil + }() + if err != nil { + return nil, err + } + + if status == http.StatusTooManyRequests { + a.conf.ClientOptions.OnWarning("getEventsRequest got 429, sleeping 60s before retry") + if err := a.sleepContext(60 * time.Second); err != nil { + return nil, err + } + continue + } + if status != http.StatusOK { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody))) + return nil, fmt.Errorf("darktrace %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) + } + + // Debug: Log raw response body preview + respPreview := string(respBody) + if len(respPreview) > 500 { + respPreview = respPreview[:500] + "... (truncated)" + } + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s raw response preview: %s", api.Key, respPreview)) + + err = json.Unmarshal(respBody, &api.ResponseType) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api invalid json: %v\nResponse body: %s", api.Key, err, string(respBody))) + return nil, err + } + + // Debug: Log parsed response type and structure + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s parsed response type: %T", api.Key, api.ResponseType)) + + return api.ResponseType, nil + } +} + +func (a *DarkTraceAdapter) submitEvents(items []utils.Dict) { + for _, item := range items { + msg := &protocol.DataMessage{ + JsonPayload: item, + TimestampMs: uint64(time.Now().UTC().UnixNano() / int64(time.Millisecond)), + } + if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + a.Close() + return + } + } + } + } +} + +func (a *DarkTraceAdapter) sleepContext(d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-timer.C: + return nil + case <-a.ctx.Done(): + return a.ctx.Err() + } +} \ No newline at end of file From 7ef1f7d6946b571890a9f5645f4633f377b5c596 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Wed, 10 Dec 2025 11:46:10 -0700 Subject: [PATCH 2/5] Debug statements removal and touch up --- darktrace/client.go | 99 +++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 53 deletions(-) diff --git a/darktrace/client.go b/darktrace/client.go index 67f8424..25b0830 100644 --- a/darktrace/client.go +++ b/darktrace/client.go @@ -30,10 +30,11 @@ const ( ) type DarktraceConfig struct { - ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` - Url string `json:"url" yaml:"url"` - PublicToken string `json:"public_token" yaml:"public_token"` - PrivateToken string `json:"private_token" yaml:"private_token"` + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + Url string `json:"url" yaml:"url"` + PublicToken string `json:"public_token" yaml:"public_token"` + PrivateToken string `json:"private_token" yaml:"private_token"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m } type DarkTraceAdapter struct { @@ -142,9 +143,9 @@ type API struct { func (a *DarkTraceAdapter) fetchEvents() { - since := map[string]int64{ - "aiAnalyst": time.Now().Add(-24*7*time.Hour).UTC().UnixMilli(), - "modelBreaches": time.Now().Add(-24*time.Hour).UTC().UnixMilli(), + since := map[string]time.Time{ + "aiAnalyst": time.Now().Add(-1*a.conf.InitialLookback).UTC(), + "modelBreaches": time.Now().Add(-1*a.conf.InitialLookback).UTC(), } APIs := []API{ @@ -175,19 +176,23 @@ func (a *DarkTraceAdapter) fetchEvents() { a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.Url)) return case <-ticker.C: + // Capture current time once for all APIs in this cycle + cycleTime := time.Now() allItems := []utils.Dict{} for _, api := range APIs { pageURL := fmt.Sprintf("%s%s", a.conf.Url, api.Endpoint) - items, newSince, err := a.getEvents(pageURL, since[api.Key], api) + items, err := a.getEvents(pageURL, since[api.Key], cycleTime, api) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) continue } - since[api.Key] = newSince - a.conf.ClientOptions.DebugLog(fmt.Sprintf("Received: %d", len(items))) - allItems = append(allItems, items...) + + if len(items) > 0 { + since[api.Key] = cycleTime.Add(-queryInterval * time.Second) + allItems = append(allItems, items...) + } } if len(allItems) > 0 { @@ -197,36 +202,27 @@ func (a *DarkTraceAdapter) fetchEvents() { } } -func (a *DarkTraceAdapter) getEvents(pageUrl string, since int64, api API) ([]utils.Dict, int64, error) { +func (a *DarkTraceAdapter) getEvents(pageUrl string, since time.Time, cycleTime time.Time, api API) ([]utils.Dict, error) { var allItems []utils.Dict - lastDetectionTime := since - - defer func() { - for k, v := range api.Dedupe { - if v < time.UnixMilli(since).Add(-1*time.Minute).UnixMilli() { - delete(api.Dedupe, k) - } + + // Cull old dedupe entries - keep entries from the last lookback period + // to handle duplicates during the overlap window + cutoffTime := cycleTime.Add(-2 * queryInterval * time.Second).Unix() + for k, v := range api.Dedupe { + if v < cutoffTime { + delete(api.Dedupe, k) } - }() + } - urlWithTimes := fmt.Sprintf("%s&starttime=%d&endtime=%d", pageUrl, since, time.Now().UTC().UnixMilli()) + // Build URL with time range including overlap + sinceMs := since.UTC().UnixMilli() + endMs := cycleTime.UTC().UnixMilli() + + urlWithTimes := fmt.Sprintf("%s&starttime=%d&endtime=%d", pageUrl, sinceMs, endMs) - a.conf.ClientOptions.DebugLog(fmt.Sprintf("Fetching %s from: %s", api.Key, urlWithTimes)) - response, err := a.doWithRetry(urlWithTimes, api) if err != nil { - return nil, 0, err - } - - // Debug: Log response structure - if response != nil { - events := response.GetDict() - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s response: received %d events", api.Key, len(events))) - if len(events) > 0 { - // Log first event structure for debugging - firstEventJSON, _ := json.MarshalIndent(events[0], "", " ") - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s first event structure:\n%s", api.Key, string(firstEventJSON))) - } + return nil, err } for _, event := range response.GetDict() { @@ -235,9 +231,12 @@ func (a *DarkTraceAdapter) getEvents(pageUrl string, since int64, api API) ([]ut // Get timestamp - handle both string and numeric formats var timeString time.Time - var err error - timeValue := event[api.timeField] + timeValue, exists := event[api.timeField] + if !exists { + a.conf.ClientOptions.OnWarning(fmt.Sprintf("%s: event missing time field '%s'", api.Key, api.timeField)) + continue + } switch v := timeValue.(type) { case string: // Parse string timestamp @@ -265,16 +264,15 @@ func (a *DarkTraceAdapter) getEvents(pageUrl string, since int64, api API) ([]ut // Check for duplicates using hash-based ID if _, seen := api.Dedupe[dedupeID]; !seen { - if timeString.After(time.UnixMilli(since)) { - api.Dedupe[dedupeID] = time.Now().UTC().UnixMilli() + if timeString.After(since) || timeString.Equal(since) { + // Store the event timestamp for dedupe cleanup + api.Dedupe[dedupeID] = timeString.Unix() allItems = append(allItems, event) - if timeString.After(time.UnixMilli(lastDetectionTime)) { - lastDetectionTime = timeString.UnixMilli() - } } } } - return allItems, lastDetectionTime, nil + + return allItems, nil } func (a *DarkTraceAdapter) generateLogHash(logMap map[string]interface{}) string { @@ -308,6 +306,11 @@ func (a *DarkTraceAdapter) generateSignature(timeString string, fullURL string) func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, error) { for { + select { + case <-a.ctx.Done(): + return nil, a.ctx.Err() + default: + } var respBody []byte var status int @@ -364,22 +367,12 @@ func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, return nil, fmt.Errorf("darktrace %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) } - // Debug: Log raw response body preview - respPreview := string(respBody) - if len(respPreview) > 500 { - respPreview = respPreview[:500] + "... (truncated)" - } - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s raw response preview: %s", api.Key, respPreview)) - err = json.Unmarshal(respBody, &api.ResponseType) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api invalid json: %v\nResponse body: %s", api.Key, err, string(respBody))) return nil, err } - // Debug: Log parsed response type and structure - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s parsed response type: %T", api.Key, api.ResponseType)) - return api.ResponseType, nil } } From 7c203fc5f58bd0b49e6552122b96a6c99a08e304 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Wed, 10 Dec 2025 16:40:19 -0800 Subject: [PATCH 3/5] Fix issues in Darktrace adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix silent error loss in submitEvents when Ship() returns non-BufferFull error - Remove duplicate error logging in doRequest for non-200 responses - Use correct context (rootCtx) for USP client initialization - Rename DarkTraceAdapter to DarktraceAdapter for consistent naming - Rename doWithRetry to doRequest (only retries on 429) - Remove trailing whitespace - Add newline at end of file 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- containers/general/tool.go | 2 +- darktrace/client.go | 33 +++++++++++++++++---------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/containers/general/tool.go b/containers/general/tool.go index deae432..f8ae90b 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -386,7 +386,7 @@ func runAdapter(ctx context.Context, method string, configs Configuration, showC configs.Darktrace.ClientOptions = applyLogging(configs.Darktrace.ClientOptions) configs.Darktrace.ClientOptions.Architecture = "usp_adapter" configToShow = configs.Darktrace - client, chRunning, err = usp_darktrace.NewDarkTraceAdapter(ctx, configs.Darktrace) + client, chRunning, err = usp_darktrace.NewDarktraceAdapter(ctx, configs.Darktrace) } else if method == "defender" { configs.Defender.ClientOptions = applyLogging(configs.Defender.ClientOptions) configs.Defender.ClientOptions.Architecture = "usp_adapter" diff --git a/darktrace/client.go b/darktrace/client.go index 25b0830..3b57759 100644 --- a/darktrace/client.go +++ b/darktrace/client.go @@ -37,7 +37,7 @@ type DarktraceConfig struct { InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m } -type DarkTraceAdapter struct { +type DarktraceAdapter struct { conf DarktraceConfig uspClient *uspclient.Client httpClient *http.Client @@ -61,11 +61,11 @@ func (r DarktraceEventsResponse) GetDict() []utils.Dict { return []utils.Dict(r) } -func NewDarkTraceAdapter(ctx context.Context, conf DarktraceConfig) (*DarkTraceAdapter, chan struct{}, error) { +func NewDarktraceAdapter(ctx context.Context, conf DarktraceConfig) (*DarktraceAdapter, chan struct{}, error) { if err := conf.Validate(); err != nil { return nil, nil, err } - a := &DarkTraceAdapter{ + a := &DarktraceAdapter{ conf: conf, aiAnalystDedupe: make(map[string]int64), modelBreachDedupe: make(map[string]int64), @@ -76,7 +76,7 @@ func NewDarkTraceAdapter(ctx context.Context, conf DarktraceConfig) (*DarkTraceA a.cancel = cancel var err error - a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + a.uspClient, err = uspclient.NewClient(rootCtx, conf.ClientOptions) if err != nil { return nil, nil, err } @@ -116,7 +116,7 @@ func (c *DarktraceConfig) Validate() error { return nil } -func (a *DarkTraceAdapter) Close() error { +func (a *DarktraceAdapter) Close() error { a.conf.ClientOptions.DebugLog("closing") var err1, err2 error a.once.Do(func() { @@ -141,7 +141,7 @@ type API struct { timeFormat string } -func (a *DarkTraceAdapter) fetchEvents() { +func (a *DarktraceAdapter) fetchEvents() { since := map[string]time.Time{ "aiAnalyst": time.Now().Add(-1*a.conf.InitialLookback).UTC(), @@ -189,7 +189,7 @@ func (a *DarkTraceAdapter) fetchEvents() { continue } - if len(items) > 0 { + if len(items) > 0 { since[api.Key] = cycleTime.Add(-queryInterval * time.Second) allItems = append(allItems, items...) } @@ -202,7 +202,7 @@ func (a *DarkTraceAdapter) fetchEvents() { } } -func (a *DarkTraceAdapter) getEvents(pageUrl string, since time.Time, cycleTime time.Time, api API) ([]utils.Dict, error) { +func (a *DarktraceAdapter) getEvents(pageUrl string, since time.Time, cycleTime time.Time, api API) ([]utils.Dict, error) { var allItems []utils.Dict // Cull old dedupe entries - keep entries from the last lookback period @@ -220,7 +220,7 @@ func (a *DarkTraceAdapter) getEvents(pageUrl string, since time.Time, cycleTime urlWithTimes := fmt.Sprintf("%s&starttime=%d&endtime=%d", pageUrl, sinceMs, endMs) - response, err := a.doWithRetry(urlWithTimes, api) + response, err := a.doRequest(urlWithTimes, api) if err != nil { return nil, err } @@ -275,7 +275,7 @@ func (a *DarkTraceAdapter) getEvents(pageUrl string, since time.Time, cycleTime return allItems, nil } -func (a *DarkTraceAdapter) generateLogHash(logMap map[string]interface{}) string { +func (a *DarktraceAdapter) generateLogHash(logMap map[string]interface{}) string { // Extract and sort keys keys := make([]string, 0, len(logMap)) for k := range logMap { @@ -293,7 +293,7 @@ func (a *DarkTraceAdapter) generateLogHash(logMap map[string]interface{}) string return hex.EncodeToString(hash[:]) } -func (a *DarkTraceAdapter) generateSignature(timeString string, fullURL string) (string, error) { +func (a *DarktraceAdapter) generateSignature(timeString string, fullURL string) (string, error) { u, err := url.Parse(fullURL) if err != nil { return "", err @@ -304,7 +304,7 @@ func (a *DarkTraceAdapter) generateSignature(timeString string, fullURL string) return hex.EncodeToString(mac.Sum(nil)), nil } -func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, error) { +func (a *DarktraceAdapter) doRequest(url string, api API) (DarktraceResponse, error) { for { select { case <-a.ctx.Done(): @@ -363,7 +363,6 @@ func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, continue } if status != http.StatusOK { - a.conf.ClientOptions.OnError(fmt.Errorf("darktrace %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody))) return nil, fmt.Errorf("darktrace %s api non-200: %d\nRESPONSE %s", api.Key, status, string(respBody)) } @@ -377,7 +376,7 @@ func (a *DarkTraceAdapter) doWithRetry(url string, api API) (DarktraceResponse, } } -func (a *DarkTraceAdapter) submitEvents(items []utils.Dict) { +func (a *DarktraceAdapter) submitEvents(items []utils.Dict) { for _, item := range items { msg := &protocol.DataMessage{ JsonPayload: item, @@ -391,12 +390,14 @@ func (a *DarkTraceAdapter) submitEvents(items []utils.Dict) { a.Close() return } + } else { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) } } } } -func (a *DarkTraceAdapter) sleepContext(d time.Duration) error { +func (a *DarktraceAdapter) sleepContext(d time.Duration) error { timer := time.NewTimer(d) defer timer.Stop() @@ -406,4 +407,4 @@ func (a *DarkTraceAdapter) sleepContext(d time.Duration) error { case <-a.ctx.Done(): return a.ctx.Err() } -} \ No newline at end of file +} From 953e1b5843c42f66231c1b6e00867317f248c2e9 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Wed, 10 Dec 2025 19:20:22 -0800 Subject: [PATCH 4/5] Document that InitialLookback defaults to zero (current time) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The InitialLookback field intentionally defaults to zero, meaning the adapter starts fetching events from the current moment forward with no historical lookback. This is the desired behavior as adapters run ephemerally and lookback is mainly for manual backfill jobs. Add a comment to document this intentional default behavior. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- darktrace/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/darktrace/client.go b/darktrace/client.go index 3b57759..ce76420 100644 --- a/darktrace/client.go +++ b/darktrace/client.go @@ -113,6 +113,7 @@ func (c *DarktraceConfig) Validate() error { if c.PrivateToken == "" { return errors.New("missing private token") } + // InitialLookback defaults to zero (current time, no lookback) return nil } From e9a22b7406bfbec7349db96025ee58fe36c864ba Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 2 Mar 2026 16:24:10 -0700 Subject: [PATCH 5/5] darktrace: Add event type to distinguish AI Analyst from Model Breach events Set DataMessage.EventType to "ai_analyst" or "model_breach" so downstream consumers can differentiate between the two API sources. Submit events per-API instead of accumulating into a single batch. Co-Authored-By: Claude Opus 4.6 --- darktrace/client.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/darktrace/client.go b/darktrace/client.go index ce76420..40d1d82 100644 --- a/darktrace/client.go +++ b/darktrace/client.go @@ -136,6 +136,7 @@ func (a *DarktraceAdapter) Close() error { type API struct { Endpoint string Key string + EventType string // value set on DataMessage.EventType to distinguish event sources ResponseType DarktraceResponse Dedupe map[string]int64 timeField string @@ -153,6 +154,7 @@ func (a *DarktraceAdapter) fetchEvents() { { Endpoint: aiAnalystAlerts, Key: "aiAnalyst", + EventType: "ai_analyst", ResponseType: &DarktraceEventsResponse{}, timeFormat: "20060102T150405", timeField: "createdAt", @@ -161,6 +163,7 @@ func (a *DarktraceAdapter) fetchEvents() { { Endpoint: modelBreachAlerts, Key: "modelBreaches", + EventType: "model_breach", ResponseType: &DarktraceEventsResponse{}, timeFormat: "20060102T150405", timeField: "creationTime", @@ -180,8 +183,6 @@ func (a *DarktraceAdapter) fetchEvents() { // Capture current time once for all APIs in this cycle cycleTime := time.Now() - allItems := []utils.Dict{} - for _, api := range APIs { pageURL := fmt.Sprintf("%s%s", a.conf.Url, api.Endpoint) items, err := a.getEvents(pageURL, since[api.Key], cycleTime, api) @@ -189,16 +190,12 @@ func (a *DarktraceAdapter) fetchEvents() { a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.Key, err)) continue } - + if len(items) > 0 { + a.submitEvents(items, api.EventType) since[api.Key] = cycleTime.Add(-queryInterval * time.Second) - allItems = append(allItems, items...) } } - - if len(allItems) > 0 { - a.submitEvents(allItems) - } } } } @@ -377,9 +374,10 @@ func (a *DarktraceAdapter) doRequest(url string, api API) (DarktraceResponse, er } } -func (a *DarktraceAdapter) submitEvents(items []utils.Dict) { +func (a *DarktraceAdapter) submitEvents(items []utils.Dict, eventType string) { for _, item := range items { msg := &protocol.DataMessage{ + EventType: eventType, JsonPayload: item, TimestampMs: uint64(time.Now().UTC().UnixNano() / int64(time.Millisecond)), }