From e93216cd331e522f16b4a848470252fbd85d02d7 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Fri, 21 Nov 2025 09:52:03 -0700 Subject: [PATCH 01/21] rewrite Mimecast adapter with multi-API support --- mimecast/client.go | 901 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 743 insertions(+), 158 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 8724e1c..4d8fc83 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -3,12 +3,17 @@ package usp_mimecast import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "net" "net/http" + "net/url" + "strconv" + "strings" "sync" "time" @@ -18,26 +23,25 @@ import ( ) const ( - baseURL = "https://api.services.mimecast.com" overlapPeriod = 30 * time.Second + queryInterval = 30 // seconds ) type MimecastAdapter struct { - conf MimecastConfig - uspClient *uspclient.Client - httpClient *http.Client - - chStopped chan struct{} - wgSenders sync.WaitGroup - doStop *utils.Event - - ctx context.Context - - dedupe map[string]int64 -} - -type AuthResponse struct { - AccessToken string `json:"access_token"` + conf MimecastConfig + uspClient *uspclient.Client + httpClient *http.Client + closeOnce sync.Once + fetchOnce sync.Once + chStopped chan struct{} + chFetchLoop chan struct{} + + ctx context.Context + cancel context.CancelFunc + + oauthToken string + tokenExpiry time.Time + tokenMu sync.Mutex } type AuditRequest struct { @@ -51,9 +55,9 @@ type AuditEvent struct { } type ApiResponse struct { - Meta MetaData `json:"meta"` - Data []AuditLog `json:"data"` - Fail []FailureDetails `json:"fail"` + Meta MetaData `json:"meta"` + Data []map[string]interface{} `json:"data"` + Fail []FailureDetails `json:"fail"` } type FailureDetails struct { @@ -86,32 +90,61 @@ type AuditLog struct { } type MimecastConfig struct { - ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` - ClientId string `json:"client_id" 
yaml:"client_id"` - ClientSecret string `json:"client_secret" yaml:"client_secret"` + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + ClientId string `json:"client_id" yaml:"client_id"` + ClientSecret string `json:"client_secret" yaml:"client_secret"` + BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m + MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` } func (c *MimecastConfig) Validate() error { if err := c.ClientOptions.Validate(); err != nil { - return fmt.Errorf("client_options: %v", err) + return fmt.Errorf("client_options: %v", err) // Required } + if c.ClientId == "" { - return errors.New("missing client id") + return errors.New("missing client id") // Required } if c.ClientSecret == "" { - return errors.New("missing client secret") + return errors.New("missing client secret") // Required + } + + if c.BaseURL == "" { + c.BaseURL = "https://api.services.mimecast.com" // Default Global - May process data in other regions during failover + // US Specific - Ensures data stays within the US instance: https://us-api.services.mimecast.com + // UK Specific - Ensures data stays within the UK instance: https://uk-api.services.mimecast.com + } + + if c.InitialLookback == 0 { + c.InitialLookback = 24 * time.Hour // Default + } else { + if c.InitialLookback < 1*time.Minute { + return fmt.Errorf("initial_lookback must be at least 1 minute, got %s", c.InitialLookback) + } + if c.InitialLookback > 30*24*time.Hour { // 30 days + return fmt.Errorf("initial_lookback must be less than 30 days, got %s", c.InitialLookback) + } + } + + if c.MaxConcurrentWorkers == 0 { + c.MaxConcurrentWorkers = 10 // Default } return nil } func NewMimecastAdapter(ctx context.Context, conf MimecastConfig) (*MimecastAdapter, chan struct{}, error) { + if err 
:= conf.Validate(); err != nil { + return nil, nil, err + } + var err error + ctxChild, cancel := context.WithCancel(ctx) a := &MimecastAdapter{ conf: conf, - ctx: context.Background(), - doStop: utils.NewEvent(), - dedupe: make(map[string]int64), + ctx: ctxChild, + cancel: cancel, } a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) @@ -120,187 +153,665 @@ func NewMimecastAdapter(ctx context.Context, conf MimecastConfig) (*MimecastAdap } a.httpClient = &http.Client{ - Timeout: 30 * time.Second, + Timeout: 60 * time.Second, Transport: &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 10 * time.Second, - }).Dial, + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 90 * time.Second, }, } a.chStopped = make(chan struct{}) + a.chFetchLoop = make(chan struct{}) - a.wgSenders.Add(1) go a.fetchEvents() - go func() { - a.wgSenders.Wait() - close(a.chStopped) - }() - return a, a.chStopped, nil } func (a *MimecastAdapter) Close() error { a.conf.ClientOptions.DebugLog("closing") - a.doStop.Set() - a.wgSenders.Wait() - err1 := a.uspClient.Drain(1 * time.Minute) - _, err2 := a.uspClient.Close() - a.httpClient.CloseIdleConnections() + var err1, err2 error + a.closeOnce.Do(func() { + a.cancel() + select { + case <-a.chFetchLoop: + case <-time.After(10 * time.Second): + a.conf.ClientOptions.OnWarning("timeout waiting for fetch loop to exit; proceeding with cleanup") + } + err1 = a.uspClient.Drain(1 * time.Minute) + _, err2 = a.uspClient.Close() + a.httpClient.CloseIdleConnections() + close(a.chStopped) + }) if err1 != nil { return err1 } - return err2 } +// getAuthHeaders gets valid OAuth bearer token headers +func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string, error) { + // Check if we have a valid token + a.tokenMu.Lock() + if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { + token := a.oauthToken + 
a.tokenMu.Unlock() + + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", token), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + } + a.tokenMu.Unlock() + + // Need to get a new token + return a.refreshOAuthToken(ctx) +} + +// refreshOAuthToken gets a new OAuth 2.0 access token +func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]string, error) { + tokenURL := a.conf.BaseURL + "/oauth/token" + + // Prepare form data + data := url.Values{} + data.Set("client_id", a.conf.ClientId) + data.Set("client_secret", a.conf.ClientSecret) + data.Set("grant_type", "client_credentials") + + loopCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(loopCtx, "POST", tokenURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create token request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := a.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to request token: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("oauth token request failed: %d - %s", resp.StatusCode, string(body)) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type"` + } + + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { + return nil, fmt.Errorf("failed to decode token response: %w", err) + } + + // Store the token in the ADAPTER with 60-second grace period + a.tokenMu.Lock() + a.oauthToken = tokenResp.AccessToken + a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second) + a.tokenMu.Unlock() + + a.conf.ClientOptions.DebugLog(fmt.Sprintf("obtained new OAuth token, expires in %d seconds", tokenResp.ExpiresIn-60)) + + return 
map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil +} + +type API struct { + mu sync.Mutex + key string + endpoint string + since time.Time + dedupe map[string]int64 + active bool + idField string + timeField string + startTimestampField string + endTimestampField string +} + +func (api *API) IsActive() bool { + api.mu.Lock() + defer api.mu.Unlock() + return api.active +} + +func (api *API) SetInactive() { + api.mu.Lock() + defer api.mu.Unlock() + api.active = false +} + +// Returned results from fetch routines +type routineResult struct { + key string + items []utils.Dict + err error +} + func (a *MimecastAdapter) fetchEvents() { - defer a.wgSenders.Done() - defer a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", baseURL)) + APIs := []*API{ + { + key: "auditEvents", + endpoint: "/api/audit/get-audit-events", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "id", + timeField: "eventTime", + startTimestampField: "startDateTime", + endTimestampField: "endDateTime", + }, + { + key: "attachment", + endpoint: "/api/ttp/attachment/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", + timeField: "date", + }, + { + key: "impersonation", + endpoint: "/api/ttp/impersonation/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "id", + timeField: "eventTime", + }, + { + key: "url", + endpoint: "/api/ttp/url/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", + timeField: "date", + }, + { + key: "dlp", + endpoint: "/api/dlp/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", 
+ timeField: "eventTime", + }, + } - since := time.Now().Add(-400 * time.Hour) + a.RunFetchLoop(APIs) +} - for !a.doStop.WaitFor(30 * time.Second) { - // The makeOneRequest function handles error - // handling and fatal error handling. - items, newSince, _ := a.makeOneRequest(since) - since = newSince - if items == nil { - continue +func (a *MimecastAdapter) shouldShutdown(apis []*API) bool { + // If no APIs are active due to 'Forbidden' messages, shutdown + for _, api := range apis { + if api.active { + return false } + } + a.conf.ClientOptions.OnWarning("all apis are disabled due to forbidden messages. shutting down") + return true +} - for _, item := range items { - msg := &protocol.DataMessage{ - JsonPayload: item, - TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), +func (a *MimecastAdapter) RunFetchLoop(apis []*API) { + cycleSem := make(chan struct{}, 1) + shipperSem := make(chan struct{}, 2) + workerSem := make(chan struct{}, a.conf.MaxConcurrentWorkers) + ticker := time.NewTicker(queryInterval * time.Second) + defer ticker.Stop() + + for { + select { + case <-a.ctx.Done(): + a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.BaseURL)) + + // Wait for any ongoing cycle to complete with timeout + select { + case cycleSem <- struct{}{}: + // No cycle in progress, safe to exit + <-cycleSem + case <-time.After(30 * time.Second): + a.conf.ClientOptions.OnWarning("timeout waiting for fetch cycle to complete during shutdown") } - if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { - if err == uspclient.ErrorBufferFull { - a.conf.ClientOptions.OnWarning("stream falling behind") - err = a.uspClient.Ship(msg, 1*time.Hour) - } - if err == nil { - continue - } - a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) - a.doStop.Set() - return + + // Fetch loop isn't running, safe to close + a.fetchOnce.Do(func() { close(a.chFetchLoop) }) + return + case <-ticker.C: + select { + case cycleSem <- struct{}{}: 
+ go func() { + // Hold cycle semaphore until cycle completes + defer func() { <-cycleSem }() + + // If no APIs are active due to forbidden messages, shutdown + if a.shouldShutdown(apis) { + a.Close() + return + } + + // Capture current time once for all APIs in this cycle + cycleTime := time.Now() + + // Communication channel to send results to shipper routine + shipCh := make(chan []utils.Dict) + // Used to flag when the shipper routine is done. + shipDone := make(chan struct{}) + + // shipper routine + go func() { + var shipperWg sync.WaitGroup + var mu sync.Mutex + + count := 0 + + defer func() { + a.conf.ClientOptions.DebugLog(fmt.Sprintf("shipped %d events", count)) + close(shipDone) + }() + + // shipper routine will run until shipCh closes + for events := range shipCh { + eventsCopy := events + // Consume a slot when spinning up a shipper routine + shipperSem <- struct{}{} + shipperWg.Add(1) + go func(events []utils.Dict) { + // Release a slot when done shipping + // Decrement shipperWg + defer func() { + <-shipperSem + shipperWg.Done() + }() + mu.Lock() + count += len(events) + mu.Unlock() + a.submitEvents(events) + }(eventsCopy) + } + shipperWg.Wait() + }() + + // Channel for returning fetch data and checking for errors + resultCh := make(chan routineResult) + + var wg sync.WaitGroup + + // fetchApi routines + for i := range apis { + // Check if signal to close has been sent before starting any fetches + select { + case <-a.ctx.Done(): + return + default: + } + // If the current api is disabled due to forbidden message, skip + if !apis[i].IsActive() { + continue + } + workerSem <- struct{}{} + wg.Add(1) + go func(api *API) { + defer func() { + <-workerSem + wg.Done() + }() + a.fetchApi(api, cycleTime, resultCh) + }(apis[i]) + } + + go func() { + wg.Wait() + // Wait until all fetch goroutines are done to close the channel + close(resultCh) + }() + + // Blocking while fetchApi routines collect events + // Events are passed off as they come in to the shipper 
routine + for res := range resultCh { + if res.err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", res.key, res.err)) + continue + } + if len(res.items) > 0 { + shipCh <- res.items + } + } + + // resultCh has closed, meaning all events have been pooled for shipping + close(shipCh) + + // Wait until shipping is done + <-shipDone + }() + default: + a.conf.ClientOptions.OnWarning("previous fetch cycle is still in progress, skipping this cycle") } } } } -func (a *MimecastAdapter) makeOneRequest(since time.Time) ([]utils.Dict, time.Time, error) { +func (a *MimecastAdapter) fetchApi(api *API, cycleTime time.Time, resultCh chan<- routineResult) { + fetchCtx, cancelFetch := context.WithCancel(a.ctx) + defer cancelFetch() + + // Check for a close signal + select { + case <-fetchCtx.Done(): + return + default: + } + + items, err := a.makeOneRequest(api, cycleTime) + if err != nil { + resultCh <- routineResult{api.key, nil, err} + return + } + + if len(items) > 0 { + resultCh <- routineResult{api.key, items, nil} + } +} + +func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils.Dict, error) { var allItems []utils.Dict - currentTime := time.Now() var start string - var lastDetectionTime time.Time + var retryCount int + var querySucceeded bool // Track if we successfully completed the query - if t := currentTime.Add(-overlapPeriod); t.Before(since) { - start = since.UTC().Format("2006-01-02T15:04:05-0700") - } else { - start = currentTime.Add(-overlapPeriod).UTC().Format("2006-01-02T15:04:05-0700") - } - end := currentTime.UTC().Format("2006-01-02T15:04:05-0700") + api.mu.Lock() + start = api.since.UTC().Format(time.RFC3339) + api.mu.Unlock() + + end := cycleTime.UTC().Format(time.RFC3339) pageToken := "" - url := baseURL + "/api/audit/get-audit-events" + url := a.conf.BaseURL + api.endpoint for { - // Prepare the request. 
- a.conf.ClientOptions.DebugLog(fmt.Sprintf("requesting from %s start %s end %s", url, start, end)) + var respBody []byte + var status int + var retryAfterInt int + var retryAfterTime time.Time + + var startTimestampField string + var endTimestampField string + + if api.startTimestampField == "" { + startTimestampField = "from" + } else { + startTimestampField = api.startTimestampField + } + if api.endTimestampField == "" { + endTimestampField = "to" + } else { + endTimestampField = api.endTimestampField + } auditData := map[string]interface{}{ "meta": map[string]interface{}{ "pagination": map[string]interface{}{ - "pageSize": 50, + "pageSize": 500, }, }, "data": []map[string]string{ { - "startDateTime": start, - "endDateTime": end, + startTimestampField: start, + endTimestampField: end, }, }, } if pageToken != "" { - auditData["meta"].(map[string]interface{})["pagination"].(map[string]interface{})["pageToken"] = pageToken + if meta, ok := auditData["meta"].(map[string]interface{}); ok { + if pagination, ok := meta["pagination"].(map[string]interface{}); ok { + pagination["pageToken"] = pageToken + } + } } - jsonData, err := json.Marshal(auditData) + err := func() error { + jsonData, err := json.Marshal(auditData) + if err != nil { + return err + } + + loopCtx, cancel := context.WithTimeout(a.ctx, 30*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(loopCtx, "POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + + headers, err := a.getAuthHeaders(loopCtx) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("failed to get OAuth token: %v", err)) + return err + } + + // Set all required Mimecast headers + for key, value := range headers { + req.Header.Set(key, value) + } + resp, err := a.httpClient.Do(req) + + if err != nil { + return err + } + defer resp.Body.Close() + + respBody, err = io.ReadAll(resp.Body) + + status = resp.StatusCode + ra := resp.Header.Get("Retry-After") + + if ra == "" { + retryAfterInt = 0 + 
retryAfterTime = time.Time{} + } else if secs, err := strconv.Atoi(ra); err == nil { + retryAfterInt = secs + retryAfterTime = time.Time{} + } else if t, err := http.ParseTime(ra); err == nil { + retryAfterInt = 0 + retryAfterTime = t + } + + if err != nil { + return err + } + + return nil + }() + if err != nil { - return nil, lastDetectionTime, err + return nil, err } - req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusTooManyRequests { + if retryAfterInt != 0 { + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header, sleeping %ds before retry", retryAfterInt)) + if err := sleepContext(a.ctx, time.Duration(retryAfterInt)*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } else if !retryAfterTime.IsZero() { + retryUntilTime := time.Until(retryAfterTime).Seconds() + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, retryUntilTime)) + if err := sleepContext(a.ctx, time.Duration(retryUntilTime)*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } else { + a.conf.ClientOptions.OnWarning("makeOneRequest got 429 without 'Retry-After' header, sleeping 60s before retry") + if err := sleepContext(a.ctx, 60*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } + // Try again after waiting 'Retry-After' value + continue } - token, err := a.getAuthToken() - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusUnauthorized { + // Clear the cached token in the ADAPTER and retry once + a.tokenMu.Lock() + a.oauthToken = "" + a.tokenExpiry = time.Time{} + a.tokenMu.Unlock() + + if retryCount < 3 { + a.conf.ClientOptions.OnWarning("received 401, clearing token cache and 
trying again") + retryCount++ + continue + } + err := fmt.Errorf("mimecast unauthorized: could not reauthenticate for API %s", api.key) + if len(allItems) > 0 { + return allItems, err + } + return nil, err } - req.Header.Set("Authorization", "Bearer "+token) - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusForbidden { + err := fmt.Errorf("mimecast forbidden: %d\nRESPONSE: %s\nAPI '%s' will be disabled", status, string(respBody), api.key) + a.conf.ClientOptions.OnError(err) + api.SetInactive() + // Since this API is now disabled, update api.since to prevent it from getting stuck + // on this time window forever + api.mu.Lock() + api.since = cycleTime.Add(-overlapPeriod) + api.mu.Unlock() + if len(allItems) > 0 { + return allItems, nil + } + return nil, nil } - defer resp.Body.Close() - - // Evaluate if success. - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api non-200: %s\nREQUEST: %s\nRESPONSE: %s", resp.Status, string(body), string(body))) - return nil, lastDetectionTime, err + if status >= 500 && status < 600 { + err := fmt.Errorf("mimecast server error: %d\nRESPONSE: %s", status, string(respBody)) + a.conf.ClientOptions.OnError(err) + // We don't want this to be handled like an error + // The hope is these errors are temporary + if len(allItems) > 0 { + return allItems, nil + } + return nil, nil } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - a.conf.ClientOptions.OnError(fmt.Errorf("error: %v", err)) - return nil, lastDetectionTime, err + if status != http.StatusOK { + err := fmt.Errorf("mimecast api non-200: %d\nRESPONSE: %s", status, string(respBody)) + a.conf.ClientOptions.OnError(err) + if len(allItems) > 0 { + return allItems, err + } + return nil, err } // Parse the response. 
var response ApiResponse - //a.conf.ClientOptions.DebugLog(fmt.Sprintf("results: %s", body)) - err = json.Unmarshal(body, &response) + err = json.Unmarshal(respBody, &response) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api invalid json: %v", err)) - return nil, lastDetectionTime, err + return nil, err + } + + // Check for Mimecast-specific errors in the fail array + if len(response.Fail) > 0 { + var errorMessages []string + for _, failure := range response.Fail { + for _, errDetail := range failure.Errors { + errorMessages = append(errorMessages, fmt.Sprintf("%s: %s (retryable: %v)", errDetail.Code, errDetail.Message, errDetail.Retryable)) + } + } + a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api returned errors: %v", errorMessages)) + return nil, fmt.Errorf("mimecast api errors: %v", errorMessages) } - //responseStr, _ := json.Marshal(response) - //a.conf.ClientOptions.DebugLog(fmt.Sprintf("results: %s", responseStr)) // Collect items. items := response.Data var newItems []utils.Dict - lastDetectionTime = since + for _, item := range items { - newItem := utils.Dict{ - "id": item.ID, - "auditType": item.AuditType, - "user": item.User, - "eventTime": item.EventTime, - "eventInfo": item.EventInfo, - "category": item.Category, + // Check if this item has nested structure (logType -> array of logs) + // or if it's flat structure (item IS the log itself) + // Known nested log types that contain arrays of logs + knownLogTypes := []string{"attachmentLogs", "impersonationLogs", "urlLogs", "dlpLogs"} + hasNestedStructure := false + for _, logType := range knownLogTypes { + if _, exists := item[logType]; exists { + hasNestedStructure = true + break + } } - timestamp := item.EventTime - eventid := item.ID - if _, ok := a.dedupe[eventid]; ok { - continue + if hasNestedStructure { + for logType, logsInterface := range item { + logs, ok := logsInterface.([]interface{}) + if !ok { + // Skip non-array fields (like resultCount, metadata, etc.) 
+ continue + } + + for _, logInterface := range logs { + logMap, ok := logInterface.(map[string]interface{}) + if !ok { + continue + } + + // Handle deduplication FIRST + if !a.processLogItem(api, logMap) { + continue + } + + // Create a new item with the log data plus the logType + newItem := utils.Dict{ + "logType": logType, + "eventType": api.key, + } + + // Copy all fields from the log + for k, v := range logMap { + newItem[k] = v + } + + newItems = append(newItems, newItem) + } + } + } else { + // Handle deduplication FIRST + if !a.processLogItem(api, item) { + continue + } + + newItem := utils.Dict{ + "logType": api.key, + "eventType": api.key, + } + + for k, v := range item { + newItem[k] = v + } + + newItems = append(newItems, newItem) } - epoch, _ := time.Parse(time.RFC3339, timestamp) - a.dedupe[eventid] = epoch.Unix() - newItems = append(newItems, newItem) - lastDetectionTime = epoch } + allItems = append(allItems, newItems...) // Check if we need to make another request. @@ -311,42 +822,116 @@ func (a *MimecastAdapter) makeOneRequest(since time.Time) ([]utils.Dict, time.Ti } } - // Cull old dedupe entries. 
- for k, v := range a.dedupe { - if v < time.Now().Add(-overlapPeriod).Unix() { - delete(a.dedupe, k) + // Mark query as successful - we completed pagination successfully + querySucceeded = true + + // Update api.since to the end time of this request window + // This ensures we don't miss any events in future requests + // ONLY update if query succeeded + if querySucceeded { + api.mu.Lock() + api.since = cycleTime.Add(-overlapPeriod) + a.conf.ClientOptions.DebugLog(fmt.Sprintf("api.since updated to %s (with %v overlap)", api.since.UTC().Format(time.RFC3339), overlapPeriod)) + api.mu.Unlock() + } + + // Cull old dedupe entries - keep entries from the last lookback period + // to handle duplicates during the overlap window + // We can clean up regardless of query success + cutoffTime := cycleTime.Add(-2 * overlapPeriod).Unix() + api.mu.Lock() + for k, v := range api.dedupe { + if v < cutoffTime { + delete(api.dedupe, k) } } + a.conf.ClientOptions.DebugLog(fmt.Sprintf("After Delete, %s %d", api.key, len(api.dedupe))) + api.mu.Unlock() - return allItems, lastDetectionTime, nil + a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s + %d (success=%v)", api.key, len(allItems), querySucceeded)) + return allItems, nil } -func (a *MimecastAdapter) getAuthToken() (string, error) { - url := baseURL + "/oauth/token" - body := "grant_type=client_credentials&client_id=" + a.conf.ClientId + "&client_secret=" + a.conf.ClientSecret +func (a *MimecastAdapter) processLogItem(api *API, logMap map[string]interface{}) bool { + var dedupeID string - req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(body))) - if err != nil { - return "", err + // Determine the deduplication ID + if api.idField != "" { + // Try to use the specified ID field + if idVal, ok := logMap[api.idField]; ok { + if id, ok := idVal.(string); ok && id != "" { + dedupeID = id + } + } } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - client := &http.Client{} - resp, err := client.Do(req) - 
if err != nil { - return "", err + // If no ID field specified or ID is empty, generate hash from log content + if dedupeID == "" { + dedupeID = a.generateLogHash(logMap) } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("failed to get token, status code: %d", resp.StatusCode) + // Check for duplicates + api.mu.Lock() + if _, exists := api.dedupe[dedupeID]; exists { + api.mu.Unlock() + return false // Skip duplicate } - respBody, _ := ioutil.ReadAll(resp.Body) - var authResp AuthResponse - if err := json.Unmarshal(respBody, &authResp); err != nil { - return "", err + dedupeTimestamp := time.Now().Unix() // Default fallback + if timeVal, ok := logMap[api.timeField]; ok { + if timestamp, ok := timeVal.(string); ok { + if epoch, err := time.Parse(time.RFC3339, timestamp); err == nil { + dedupeTimestamp = epoch.Unix() + } + } } + api.dedupe[dedupeID] = dedupeTimestamp + api.mu.Unlock() - return authResp.AccessToken, nil + return true // Include this item +} + +// New function to generate a hash from log content +func (a *MimecastAdapter) generateLogHash(logMap map[string]interface{}) string { + // Convert the log map to a stable JSON representation + jsonBytes, err := json.Marshal(logMap) + if err != nil { + // Fallback: use timestamp if available + return fmt.Sprintf("hash-error-%d", time.Now().UnixNano()) + } + + // Generate SHA256 hash + hash := sha256.Sum256(jsonBytes) + return hex.EncodeToString(hash[:]) +} + +func (a *MimecastAdapter) submitEvents(events []utils.Dict) { + for _, item := range events { + msg := &protocol.DataMessage{ + JsonPayload: item, + TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), + } + if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + a.Close() + 
return + } + } + } + } +} + +func sleepContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } } From a0582e3a71cf8729735a1bd7ca51760de1734c28 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Fri, 21 Nov 2025 11:44:07 -0700 Subject: [PATCH 02/21] Remove Debug Messages --- mimecast/client.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 4d8fc83..f662bec 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -261,8 +261,6 @@ func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]str a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second) a.tokenMu.Unlock() - a.conf.ClientOptions.DebugLog(fmt.Sprintf("obtained new OAuth token, expires in %d seconds", tokenResp.ExpiresIn-60)) - return map[string]string{ "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), "Accept": "application/json", @@ -420,7 +418,6 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { count := 0 defer func() { - a.conf.ClientOptions.DebugLog(fmt.Sprintf("shipped %d events", count)) close(shipDone) }() @@ -831,7 +828,6 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils if querySucceeded { api.mu.Lock() api.since = cycleTime.Add(-overlapPeriod) - a.conf.ClientOptions.DebugLog(fmt.Sprintf("api.since updated to %s (with %v overlap)", api.since.UTC().Format(time.RFC3339), overlapPeriod)) api.mu.Unlock() } @@ -845,10 +841,9 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils delete(api.dedupe, k) } } - a.conf.ClientOptions.DebugLog(fmt.Sprintf("After Delete, %s %d", api.key, len(api.dedupe))) + api.mu.Unlock() - a.conf.ClientOptions.DebugLog(fmt.Sprintf("%s + %d (success=%v)", api.key, len(allItems), querySucceeded)) return allItems, nil } From 
79a085481f3b1e439d27ab582967363b9946bdf2 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 09:36:19 -0700 Subject: [PATCH 03/21] Token Refresh Race Condition --- mimecast/client.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index f662bec..f2d536d 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -208,7 +208,8 @@ func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string "Content-Type": "application/json", }, nil } - a.tokenMu.Unlock() + + defer a.tokenMu.Unlock() // Need to get a new token return a.refreshOAuthToken(ctx) @@ -256,10 +257,8 @@ func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]str } // Store the token in the ADAPTER with 60-second grace period - a.tokenMu.Lock() a.oauthToken = tokenResp.AccessToken a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second) - a.tokenMu.Unlock() return map[string]string{ "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), From e73d07f5dfb86fd32237c684ecc2e2b2f97a2fcd Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 09:47:06 -0700 Subject: [PATCH 04/21] Potential for negative token expiry --- mimecast/client.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index f2d536d..f30bdfd 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -256,9 +256,12 @@ func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]str return nil, fmt.Errorf("failed to decode token response: %w", err) } - // Store the token in the ADAPTER with 60-second grace period + gracePeriod := 60 + if tokenResp.ExpiresIn < gracePeriod { + gracePeriod = 0 // or tokenResp.ExpiresIn / 2, or some other logic + } a.oauthToken = tokenResp.AccessToken - a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-60) * time.Second) + a.tokenExpiry = 
time.Now().Add(time.Duration(tokenResp.ExpiresIn-gracePeriod) * time.Second) return map[string]string{ "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), From e210440a6836be750c88afb9a60ce2539bc23968 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 10:04:09 -0700 Subject: [PATCH 05/21] Deterministic Serialization for hashing IDs for logs without IDs --- mimecast/client.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index f30bdfd..36a7d65 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -12,6 +12,7 @@ import ( "net" "net/http" "net/url" + "sort" "strconv" "strings" "sync" @@ -888,18 +889,22 @@ func (a *MimecastAdapter) processLogItem(api *API, logMap map[string]interface{} return true // Include this item } -// New function to generate a hash from log content func (a *MimecastAdapter) generateLogHash(logMap map[string]interface{}) string { - // Convert the log map to a stable JSON representation - jsonBytes, err := json.Marshal(logMap) - if err != nil { - // Fallback: use timestamp if available - return fmt.Sprintf("hash-error-%d", time.Now().UnixNano()) - } - - // Generate SHA256 hash - hash := sha256.Sum256(jsonBytes) - return hex.EncodeToString(hash[:]) + // Extract and sort keys + keys := make([]string, 0, len(logMap)) + for k := range logMap { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build deterministic string representation + var buf bytes.Buffer + for _, k := range keys { + fmt.Fprintf(&buf, "%s:%v|", k, logMap[k]) + } + + hash := sha256.Sum256(buf.Bytes()) + return hex.EncodeToString(hash[:]) } func (a *MimecastAdapter) submitEvents(events []utils.Dict) { From fda07c3e1a36ea34b16f621a8e8dcecd6e04a5a2 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 10:05:39 -0700 Subject: [PATCH 06/21] Correcting Context Mismatch --- mimecast/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/mimecast/client.go b/mimecast/client.go index 36a7d65..4741eaf 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -148,7 +148,7 @@ func NewMimecastAdapter(ctx context.Context, conf MimecastConfig) (*MimecastAdap cancel: cancel, } - a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + a.uspClient, err = uspclient.NewClient(ctxChild, conf.ClientOptions) if err != nil { return nil, nil, err } From 1aa30f73b1b4ea20f1570f1bedc29f716fdba2b5 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 10:13:41 -0700 Subject: [PATCH 07/21] Potential Negative Retry-After Duration --- mimecast/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mimecast/client.go b/mimecast/client.go index 4741eaf..d514d5a 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -651,6 +651,8 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils } return nil, err } + } else if retryAfterTime.Before(time.Now()) { + continue } else if !retryAfterTime.IsZero() { retryUntilTime := time.Until(retryAfterTime).Seconds() a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, retryUntilTime)) From b3483917e1c2e12ddde1d331d0b4ccd2ef089fe8 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 10:27:56 -0700 Subject: [PATCH 08/21] Setting MaxConcurrentWorkers upper limit --- mimecast/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mimecast/client.go b/mimecast/client.go index d514d5a..2e937be 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -130,6 +130,8 @@ func (c *MimecastConfig) Validate() error { if c.MaxConcurrentWorkers == 0 { c.MaxConcurrentWorkers = 10 // Default + } else if c.MaxConcurrentWorkers > 100 { + return fmt.Errorf("max_concurrent_workers cannot exceed 100, got %d", c.MaxConcurrentWorkers) } return nil From 9d1555d5146a4e43a18335906939b5105a4ec8d4 Mon Sep 17 00:00:00 
2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 10:48:37 -0700 Subject: [PATCH 09/21] Retryable Errors --- mimecast/client.go | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 2e937be..dad29b7 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -533,6 +533,7 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils var allItems []utils.Dict var start string var retryCount int + var retryableErrorCount int var querySucceeded bool // Track if we successfully completed the query api.mu.Lock() @@ -738,12 +739,43 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils // Check for Mimecast-specific errors in the fail array if len(response.Fail) > 0 { var errorMessages []string + allRetryable := true + for _, failure := range response.Fail { for _, errDetail := range failure.Errors { - errorMessages = append(errorMessages, fmt.Sprintf("%s: %s (retryable: %v)", errDetail.Code, errDetail.Message, errDetail.Retryable)) + errorMessages = append(errorMessages, fmt.Sprintf("%s: %s (retryable: %v)", + errDetail.Code, errDetail.Message, errDetail.Retryable)) + if !errDetail.Retryable { + allRetryable = false + } + } + } + + if allRetryable { + retryableErrorCount++ + + if retryableErrorCount >= 5 { + a.conf.ClientOptions.OnError(fmt.Errorf("max retries exceeded for retryable errors: %v", errorMessages)) + if len(allItems) > 0 { + return allItems, fmt.Errorf("mimecast api errors after %d retries: %v", retryableErrorCount, errorMessages) + } + return nil, fmt.Errorf("mimecast api errors after %d retries: %v", retryableErrorCount, errorMessages) + } + + // Log warning and retry + a.conf.ClientOptions.OnWarning(fmt.Sprintf("mimecast api returned retryable errors: %v, retrying", errorMessages)) + + if err := sleepContext(a.ctx, 5*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, 
err } + continue // Retry the request } - a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api returned errors: %v", errorMessages)) + + // Non-retryable error - fail + a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api returned non-retryable errors: %v", errorMessages)) return nil, fmt.Errorf("mimecast api errors: %v", errorMessages) } From 9c543d9cf6182cff4b2b8e14c492b730f81868ae Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 11:03:22 -0700 Subject: [PATCH 10/21] Remove unused variable (dead code) --- mimecast/client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index dad29b7..b6c2b3a 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -420,8 +420,6 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { var shipperWg sync.WaitGroup var mu sync.Mutex - count := 0 - defer func() { close(shipDone) }() @@ -440,7 +438,6 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { shipperWg.Done() }() mu.Lock() - count += len(events) mu.Unlock() a.submitEvents(events) }(eventsCopy) From 0ee034fbf5abafda2cef182dd67312000688edd9 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 11:10:21 -0700 Subject: [PATCH 11/21] MaxConcurrentShippers --- mimecast/client.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index b6c2b3a..b495078 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -91,12 +91,13 @@ type AuditLog struct { } type MimecastConfig struct { - ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` - ClientId string `json:"client_id" yaml:"client_id"` - ClientSecret string `json:"client_secret" yaml:"client_secret"` - BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` - InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m - MaxConcurrentWorkers int 
`json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + ClientId string `json:"client_id" yaml:"client_id"` + ClientSecret string `json:"client_secret" yaml:"client_secret"` + BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m + MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` + MaxConcurrentShippers int `json:"max_concurrent_shippers,omitempty" yaml:"max_concurrent_shippers,omitempty"` } func (c *MimecastConfig) Validate() error { @@ -134,6 +135,12 @@ func (c *MimecastConfig) Validate() error { return fmt.Errorf("max_concurrent_workers cannot exceed 100, got %d", c.MaxConcurrentWorkers) } + if c.MaxConcurrentShippers == 0 { + c.MaxConcurrentShippers = 2 // Default + } else if c.MaxConcurrentShippers > 100 { + return fmt.Errorf("max_concurrent_shippers cannot exceed 100, got %d", c.MaxConcurrentShippers) + } + return nil } @@ -372,7 +379,7 @@ func (a *MimecastAdapter) shouldShutdown(apis []*API) bool { func (a *MimecastAdapter) RunFetchLoop(apis []*API) { cycleSem := make(chan struct{}, 1) - shipperSem := make(chan struct{}, 2) + shipperSem := make(chan struct{}, a.conf.MaxConcurrentShippers) workerSem := make(chan struct{}, a.conf.MaxConcurrentWorkers) ticker := time.NewTicker(queryInterval * time.Second) defer ticker.Stop() From 4f5fffa41cb3c712ed21f80d5d2a865f1f6438db Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 11:13:29 -0700 Subject: [PATCH 12/21] Redundant querySucceeded Flag --- mimecast/client.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index b495078..4d53d7e 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -538,7 +538,6 @@ 
func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils var start string var retryCount int var retryableErrorCount int - var querySucceeded bool // Track if we successfully completed the query api.mu.Lock() start = api.since.UTC().Format(time.RFC3339) @@ -862,17 +861,11 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils } } - // Mark query as successful - we completed pagination successfully - querySucceeded = true - // Update api.since to the end time of this request window - // This ensures we don't miss any events in future requests - // ONLY update if query succeeded - if querySucceeded { - api.mu.Lock() - api.since = cycleTime.Add(-overlapPeriod) - api.mu.Unlock() - } + // This ensures we don't miss any events in future requests + api.mu.Lock() + api.since = cycleTime.Add(-overlapPeriod) + api.mu.Unlock() // Cull old dedupe entries - keep entries from the last lookback period // to handle duplicates during the overlap window From 07dc737fdbac11b15a1aca5a137bfd138c664227 Mon Sep 17 00:00:00 2001 From: Kiersten Gross <72712706+RagingRedRiot@users.noreply.github.com> Date: Mon, 24 Nov 2025 11:15:15 -0700 Subject: [PATCH 13/21] Initial Lookback Validation Removal Co-authored-by: Maxime Lamothe-Brassard --- mimecast/client.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 4d53d7e..7d6afcf 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -118,16 +118,6 @@ func (c *MimecastConfig) Validate() error { // UK Specific - Ensures data stays within the UK instance: https://uk-api.services.mimecast.com } - if c.InitialLookback == 0 { - c.InitialLookback = 24 * time.Hour // Default - } else { - if c.InitialLookback < 1*time.Minute { - return fmt.Errorf("initial_lookback must be at least 1 minute, got %s", c.InitialLookback) - } - if c.InitialLookback > 30*24*time.Hour { // 30 days - return fmt.Errorf("initial_lookback must be less than 30 days, got 
%s", c.InitialLookback) - } - } if c.MaxConcurrentWorkers == 0 { c.MaxConcurrentWorkers = 10 // Default From 87fc0246203efa78eec496684bccd7f07abc39b0 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 24 Nov 2025 12:13:36 -0700 Subject: [PATCH 14/21] Remove unused variable (dead code) --- mimecast/client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 7d6afcf..a080248 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -415,7 +415,6 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { // shipper routine go func() { var shipperWg sync.WaitGroup - var mu sync.Mutex defer func() { close(shipDone) @@ -434,8 +433,6 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { <-shipperSem shipperWg.Done() }() - mu.Lock() - mu.Unlock() a.submitEvents(events) }(eventsCopy) } From c95298f1de6550b8d50490a43223b650fb439b31 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Wed, 10 Dec 2025 16:27:32 -0800 Subject: [PATCH 15/21] Fix multiple issues in Mimecast adapter rewrite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix error variable shadowing bug where err from io.ReadAll was being shadowed by err from strconv.Atoi/http.ParseTime - Fix mutex contention by not holding tokenMu during HTTP calls in refreshOAuthToken - Fix silent error ignore in submitEvents for non-ErrorBufferFull errors - Fix potential deadlock by using context cancellation instead of calling Close() from within fetch loop goroutines - Fix tight loop when Retry-After time has already passed by adding minimum 1 second sleep - Fix 5xx errors being swallowed - now properly returns error so api.since won't be updated and data won't be lost - Fix struct tag alignment inconsistencies in MimecastConfig - Fix generateLogHash to use JSON marshaling for deterministic hashing of complex/nested values - Add shutdown check in submitEvents loop 🤖 Generated with [Claude 
Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mimecast/client.go | 110 ++++++++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 40 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index a080248..acd4234 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -91,13 +91,13 @@ type AuditLog struct { } type MimecastConfig struct { - ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` - ClientId string `json:"client_id" yaml:"client_id"` - ClientSecret string `json:"client_secret" yaml:"client_secret"` - BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` - InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m - MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` - MaxConcurrentShippers int `json:"max_concurrent_shippers,omitempty" yaml:"max_concurrent_shippers,omitempty"` + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + ClientId string `json:"client_id" yaml:"client_id"` + ClientSecret string `json:"client_secret" yaml:"client_secret"` + BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m + MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` + MaxConcurrentShippers int `json:"max_concurrent_shippers,omitempty" yaml:"max_concurrent_shippers,omitempty"` } func (c *MimecastConfig) Validate() error { @@ -208,10 +208,9 @@ func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string "Content-Type": "application/json", }, nil } - - defer a.tokenMu.Unlock() + a.tokenMu.Unlock() - // Need to get a new token + // Need to get a new token - refreshOAuthToken handles its own 
locking return a.refreshOAuthToken(ctx) } @@ -258,10 +257,13 @@ func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]str gracePeriod := 60 if tokenResp.ExpiresIn < gracePeriod { - gracePeriod = 0 // or tokenResp.ExpiresIn / 2, or some other logic + gracePeriod = 0 // or tokenResp.ExpiresIn / 2, or some other logic } + + a.tokenMu.Lock() a.oauthToken = tokenResp.AccessToken a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-gracePeriod) * time.Second) + a.tokenMu.Unlock() return map[string]string{ "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), @@ -400,7 +402,9 @@ func (a *MimecastAdapter) RunFetchLoop(apis []*API) { // If no APIs are active due to forbidden messages, shutdown if a.shouldShutdown(apis) { - a.Close() + // Signal shutdown via context cancellation instead of calling Close() + // to avoid deadlock (Close waits for fetch loop which we're inside) + a.cancel() return } @@ -609,6 +613,9 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils defer resp.Body.Close() respBody, err = io.ReadAll(resp.Body) + if err != nil { + return err + } status = resp.StatusCode ra := resp.Header.Get("Retry-After") @@ -616,18 +623,14 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils if ra == "" { retryAfterInt = 0 retryAfterTime = time.Time{} - } else if secs, err := strconv.Atoi(ra); err == nil { + } else if secs, parseErr := strconv.Atoi(ra); parseErr == nil { retryAfterInt = secs retryAfterTime = time.Time{} - } else if t, err := http.ParseTime(ra); err == nil { + } else if t, parseErr := http.ParseTime(ra); parseErr == nil { retryAfterInt = 0 retryAfterTime = t } - if err != nil { - return err - } - return nil }() @@ -644,8 +647,15 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils } return nil, err } - } else if retryAfterTime.Before(time.Now()) { - continue + } else if retryAfterTime.Before(time.Now()) { + // Retry-After 
time already passed, wait a minimum of 1 second to avoid tight loop + if err := sleepContext(a.ctx, 1*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + continue } else if !retryAfterTime.IsZero() { retryUntilTime := time.Until(retryAfterTime).Seconds() a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, retryUntilTime)) @@ -702,12 +712,12 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils if status >= 500 && status < 600 { err := fmt.Errorf("mimecast server error: %d\nRESPONSE: %s", status, string(respBody)) a.conf.ClientOptions.OnError(err) - // We don't want this to be handled like an error - // The hope is these errors are temporary + // Return collected items with error so caller knows about the failure + // and api.since won't be updated (preventing data loss on retry) if len(allItems) > 0 { - return allItems, nil + return allItems, err } - return nil, nil + return nil, err } if status != http.StatusOK { err := fmt.Errorf("mimecast api non-200: %d\nRESPONSE: %s", status, string(respBody)) @@ -910,25 +920,40 @@ func (a *MimecastAdapter) processLogItem(api *API, logMap map[string]interface{} } func (a *MimecastAdapter) generateLogHash(logMap map[string]interface{}) string { - // Extract and sort keys - keys := make([]string, 0, len(logMap)) - for k := range logMap { - keys = append(keys, k) - } - sort.Strings(keys) - - // Build deterministic string representation - var buf bytes.Buffer - for _, k := range keys { - fmt.Fprintf(&buf, "%s:%v|", k, logMap[k]) - } - - hash := sha256.Sum256(buf.Bytes()) - return hex.EncodeToString(hash[:]) + // Extract and sort keys + keys := make([]string, 0, len(logMap)) + for k := range logMap { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build deterministic string representation using JSON for complex values + var buf bytes.Buffer + for 
_, k := range keys { + v := logMap[k] + // Use JSON marshaling for deterministic representation of complex types + valueBytes, err := json.Marshal(v) + if err != nil { + // Fallback to fmt if JSON fails + fmt.Fprintf(&buf, "%s:%v|", k, v) + } else { + fmt.Fprintf(&buf, "%s:%s|", k, valueBytes) + } + } + + hash := sha256.Sum256(buf.Bytes()) + return hex.EncodeToString(hash[:]) } func (a *MimecastAdapter) submitEvents(events []utils.Dict) { for _, item := range events { + // Check if we're shutting down + select { + case <-a.ctx.Done(): + return + default: + } + msg := &protocol.DataMessage{ JsonPayload: item, TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), @@ -938,10 +963,15 @@ func (a *MimecastAdapter) submitEvents(events []utils.Dict) { a.conf.ClientOptions.OnWarning("stream falling behind") if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) - a.Close() + // Signal shutdown via context cancellation instead of calling Close() + // to avoid deadlock (Close waits for fetch loop which spawned us) + a.cancel() return } + continue } + // Handle non-ErrorBufferFull errors + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) } } } From af7d98a68b834e7d519a5b3caad3d8bd0d3fd81d Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Wed, 10 Dec 2025 19:23:26 -0800 Subject: [PATCH 16/21] Fix zero time check causing incorrect rate limit handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When there was no Retry-After header or it couldn't be parsed, retryAfterTime remained the zero value. The condition retryAfterTime.Before(time.Now()) was always true for the zero value (year 0001 is before current time), causing the code to incorrectly enter the "time already passed" branch (1s wait) instead of the "no header" branch (60s wait). 
Fix by checking !retryAfterTime.IsZero() before the Before check and restructure the conditions for clarity. Also added comment documenting that InitialLookback defaults to zero. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mimecast/client.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index acd4234..62ec650 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -118,6 +118,7 @@ func (c *MimecastConfig) Validate() error { // UK Specific - Ensures data stays within the UK instance: https://uk-api.services.mimecast.com } + // InitialLookback defaults to zero (current time, no lookback) if c.MaxConcurrentWorkers == 0 { c.MaxConcurrentWorkers = 10 // Default @@ -640,6 +641,7 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils if status == http.StatusTooManyRequests { if retryAfterInt != 0 { + // Retry-After header with integer seconds value a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header, sleeping %ds before retry", retryAfterInt)) if err := sleepContext(a.ctx, time.Duration(retryAfterInt)*time.Second); err != nil { if len(allItems) > 0 { @@ -647,25 +649,28 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils } return nil, err } - } else if retryAfterTime.Before(time.Now()) { - // Retry-After time already passed, wait a minimum of 1 second to avoid tight loop - if err := sleepContext(a.ctx, 1*time.Second); err != nil { - if len(allItems) > 0 { - return allItems, err - } - return nil, err - } - continue } else if !retryAfterTime.IsZero() { - retryUntilTime := time.Until(retryAfterTime).Seconds() - a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, retryUntilTime)) - if err := sleepContext(a.ctx, 
time.Duration(retryUntilTime)*time.Second); err != nil { - if len(allItems) > 0 { - return allItems, err + // Retry-After header with HTTP-date value + if retryAfterTime.Before(time.Now()) { + // Retry-After time already passed, wait a minimum of 1 second to avoid tight loop + if err := sleepContext(a.ctx, 1*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } else { + retryUntilTime := time.Until(retryAfterTime).Seconds() + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, retryUntilTime)) + if err := sleepContext(a.ctx, time.Duration(retryUntilTime)*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err } - return nil, err } } else { + // No Retry-After header, use default 60s wait a.conf.ClientOptions.OnWarning("makeOneRequest got 429 without 'Retry-After' header, sleeping 60s before retry") if err := sleepContext(a.ctx, 60*time.Second); err != nil { if len(allItems) > 0 { From 27700ff57c5d486c6f23dee23f9b21cfeb3a7a75 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Thu, 11 Dec 2025 17:47:40 -0800 Subject: [PATCH 17/21] Refactor Mimecast adapter: simplify RunFetchLoop and fix bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace complex nested goroutine structure with errgroup for cleaner concurrency control and automatic cancellation propagation - Fix data race in shouldShutdown() by using api.IsActive() instead of direct field access - Fix token refresh race condition with double-checked locking - Fix Retry-After duration truncation by using time.Duration directly The refactored RunFetchLoop is ~75 lines shorter and eliminates: - 3 levels of nested goroutines - 4 coordination channels (cycleSem, shipperSem, shipCh, shipDone) - Multiple early exit paths that could leak goroutines 🤖 Generated with [Claude 
Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mimecast/client.go | 211 +++++++++++++++------------------------------ 1 file changed, 68 insertions(+), 143 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 62ec650..5ca093c 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -21,6 +21,7 @@ import ( "github.com/refractionPOINT/go-uspclient" "github.com/refractionPOINT/go-uspclient/protocol" "github.com/refractionPOINT/usp-adapters/utils" + "golang.org/x/sync/errgroup" ) const ( @@ -217,6 +218,19 @@ func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string // refreshOAuthToken gets a new OAuth 2.0 access token func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]string, error) { + // Double-check: another goroutine may have refreshed while we waited + a.tokenMu.Lock() + if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { + token := a.oauthToken + a.tokenMu.Unlock() + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", token), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + } + a.tokenMu.Unlock() + tokenURL := a.conf.BaseURL + "/oauth/token" // Prepare form data @@ -298,12 +312,6 @@ func (api *API) SetInactive() { api.active = false } -// Returned results from fetch routines -type routineResult struct { - key string - items []utils.Dict - err error -} func (a *MimecastAdapter) fetchEvents() { APIs := []*API{ @@ -362,7 +370,7 @@ func (a *MimecastAdapter) fetchEvents() { func (a *MimecastAdapter) shouldShutdown(apis []*API) bool { // If no APIs are active due to 'Forbidden' messages, shutdown for _, api := range apis { - if api.active { + if api.IsActive() { return false } } @@ -371,158 +379,75 @@ func (a *MimecastAdapter) shouldShutdown(apis []*API) bool { } func (a *MimecastAdapter) RunFetchLoop(apis []*API) { - cycleSem := make(chan struct{}, 1) - shipperSem := make(chan struct{}, 
a.conf.MaxConcurrentShippers) - workerSem := make(chan struct{}, a.conf.MaxConcurrentWorkers) ticker := time.NewTicker(queryInterval * time.Second) defer ticker.Stop() + defer a.fetchOnce.Do(func() { close(a.chFetchLoop) }) for { select { case <-a.ctx.Done(): a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.BaseURL)) - - // Wait for any ongoing cycle to complete with timeout - select { - case cycleSem <- struct{}{}: - // No cycle in progress, safe to exit - <-cycleSem - case <-time.After(30 * time.Second): - a.conf.ClientOptions.OnWarning("timeout waiting for fetch cycle to complete during shutdown") - } - - // Fetch loop isn't running, safe to close - a.fetchOnce.Do(func() { close(a.chFetchLoop) }) return case <-ticker.C: - select { - case cycleSem <- struct{}{}: - go func() { - // Hold cycle semaphore until cycle completes - defer func() { <-cycleSem }() - - // If no APIs are active due to forbidden messages, shutdown - if a.shouldShutdown(apis) { - // Signal shutdown via context cancellation instead of calling Close() - // to avoid deadlock (Close waits for fetch loop which we're inside) - a.cancel() - return - } - - // Capture current time once for all APIs in this cycle - cycleTime := time.Now() - - // Communication channel to send results to shipper routine - shipCh := make(chan []utils.Dict) - // Used to flag when the shipper routine is done. 
- shipDone := make(chan struct{}) - - // shipper routine - go func() { - var shipperWg sync.WaitGroup - - defer func() { - close(shipDone) - }() - - // shipper routine will run until shipCh closes - for events := range shipCh { - eventsCopy := events - // Consume a slot when spinning up a shipper routine - shipperSem <- struct{}{} - shipperWg.Add(1) - go func(events []utils.Dict) { - // Release a slot when done shipping - // Decrement shipperWg - defer func() { - <-shipperSem - shipperWg.Done() - }() - a.submitEvents(events) - }(eventsCopy) - } - shipperWg.Wait() - }() - - // Channel for returning fetch data and checking for errors - resultCh := make(chan routineResult) - - var wg sync.WaitGroup - - // fetchApi routines - for i := range apis { - // Check if signal to close has been sent before starting any fetches - select { - case <-a.ctx.Done(): - return - default: - } - // If the current api is disabled due to forbidden message, skip - if !apis[i].IsActive() { - continue - } - workerSem <- struct{}{} - wg.Add(1) - go func(api *API) { - defer func() { - <-workerSem - wg.Done() - }() - a.fetchApi(api, cycleTime, resultCh) - }(apis[i]) - } - - go func() { - wg.Wait() - // Wait until all fetch goroutines are done to close the channel - close(resultCh) - }() - - // Blocking while fetchApi routines collect events - // Events are passed off as they come in to the shipper routine - for res := range resultCh { - if res.err != nil { - a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", res.key, res.err)) - continue - } - if len(res.items) > 0 { - shipCh <- res.items - } - } - - // resultCh has closed, meaning all events have been pooled for shipping - close(shipCh) - - // Wait until shipping is done - <-shipDone - }() - default: - a.conf.ClientOptions.OnWarning("previous fetch cycle is still in progress, skipping this cycle") + if err := a.runOneCycle(apis); err != nil { + if a.ctx.Err() != nil { + return // Context cancelled, exit gracefully + } + // All APIs 
disabled, shutdown + a.cancel() + return } } } } -func (a *MimecastAdapter) fetchApi(api *API, cycleTime time.Time, resultCh chan<- routineResult) { - fetchCtx, cancelFetch := context.WithCancel(a.ctx) - defer cancelFetch() - - // Check for a close signal - select { - case <-fetchCtx.Done(): - return - default: +func (a *MimecastAdapter) runOneCycle(apis []*API) error { + if a.shouldShutdown(apis) { + return fmt.Errorf("all APIs disabled") } - items, err := a.makeOneRequest(api, cycleTime) - if err != nil { - resultCh <- routineResult{api.key, nil, err} - return + cycleTime := time.Now() + g, ctx := errgroup.WithContext(a.ctx) + g.SetLimit(a.conf.MaxConcurrentWorkers) + + // Buffered channel to collect results from workers + resultCh := make(chan []utils.Dict, len(apis)) + + // Launch fetch workers + for _, api := range apis { + if !api.IsActive() { + continue + } + api := api // capture for goroutine + g.Go(func() error { + items, err := a.makeOneRequest(api, cycleTime) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.key, err)) + return nil // Don't fail the whole group for one API error + } + if len(items) > 0 { + select { + case resultCh <- items: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) } - if len(items) > 0 { - resultCh <- routineResult{api.key, items, nil} + // Close resultCh when all fetchers are done + go func() { + g.Wait() + close(resultCh) + }() + + // Ship results as they arrive + for items := range resultCh { + a.submitEvents(items) } + + return g.Wait() } func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils.Dict, error) { @@ -660,9 +585,9 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils return nil, err } } else { - retryUntilTime := time.Until(retryAfterTime).Seconds() - a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %vs before retry", retryAfterTime, 
retryUntilTime)) - if err := sleepContext(a.ctx, time.Duration(retryUntilTime)*time.Second); err != nil { + retryDuration := time.Until(retryAfterTime) + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %v before retry", retryAfterTime, retryDuration)) + if err := sleepContext(a.ctx, retryDuration); err != nil { if len(allItems) > 0 { return allItems, err } From 62a8716bd61227e2d8e317577327796a8daf3407 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Thu, 11 Dec 2025 18:02:19 -0800 Subject: [PATCH 18/21] Code review fixes: retry limits, singleflight, cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused MaxConcurrentShippers config field - Remove unused AuditLog type - Add 1-hour retry deadline for 429 rate limiting - Add 5XX retry with exponential backoff (30s-5m), 1h max - Use singleflight for token refresh to prevent thundering herd - Extend dedupe cleanup window from 60s to 1 hour - Fix minor style issues (indentation, blank lines) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mimecast/client.go | 187 ++++++++++++++++++++++++--------------------- 1 file changed, 102 insertions(+), 85 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 5ca093c..7b89cd8 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -22,6 +22,7 @@ import ( "github.com/refractionPOINT/go-uspclient/protocol" "github.com/refractionPOINT/usp-adapters/utils" "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" ) const ( @@ -41,9 +42,10 @@ type MimecastAdapter struct { ctx context.Context cancel context.CancelFunc - oauthToken string - tokenExpiry time.Time - tokenMu sync.Mutex + oauthToken string + tokenExpiry time.Time + tokenMu sync.Mutex + tokenRefreshGroup singleflight.Group } type AuditRequest struct { @@ -82,23 +84,13 @@ type Pagination struct { Next string 
`json:"next"` } -type AuditLog struct { - ID string `json:"id"` - AuditType string `json:"auditType"` - User string `json:"user"` - EventTime string `json:"eventTime"` - EventInfo string `json:"eventInfo"` - Category string `json:"category"` -} - type MimecastConfig struct { ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` ClientId string `json:"client_id" yaml:"client_id"` ClientSecret string `json:"client_secret" yaml:"client_secret"` BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m - MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` - MaxConcurrentShippers int `json:"max_concurrent_shippers,omitempty" yaml:"max_concurrent_shippers,omitempty"` + MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` } func (c *MimecastConfig) Validate() error { @@ -124,13 +116,7 @@ func (c *MimecastConfig) Validate() error { if c.MaxConcurrentWorkers == 0 { c.MaxConcurrentWorkers = 10 // Default } else if c.MaxConcurrentWorkers > 100 { - return fmt.Errorf("max_concurrent_workers cannot exceed 100, got %d", c.MaxConcurrentWorkers) - } - - if c.MaxConcurrentShippers == 0 { - c.MaxConcurrentShippers = 2 // Default - } else if c.MaxConcurrentShippers > 100 { - return fmt.Errorf("max_concurrent_shippers cannot exceed 100, got %d", c.MaxConcurrentShippers) + return fmt.Errorf("max_concurrent_workers cannot exceed 100, got %d", c.MaxConcurrentWorkers) } return nil @@ -216,75 +202,82 @@ func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string return a.refreshOAuthToken(ctx) } -// refreshOAuthToken gets a new OAuth 2.0 access token +// refreshOAuthToken gets a new OAuth 2.0 access token using singleflight to prevent thundering herd func (a *MimecastAdapter) 
refreshOAuthToken(ctx context.Context) (map[string]string, error) { - // Double-check: another goroutine may have refreshed while we waited - a.tokenMu.Lock() - if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { - token := a.oauthToken + // singleflight ensures only one token refresh happens at a time + result, err, _ := a.tokenRefreshGroup.Do("token", func() (interface{}, error) { + // Check if token was refreshed while waiting for singleflight + a.tokenMu.Lock() + if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { + token := a.oauthToken + a.tokenMu.Unlock() + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", token), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + } a.tokenMu.Unlock() - return map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", token), - "Accept": "application/json", - "Content-Type": "application/json", - }, nil - } - a.tokenMu.Unlock() - tokenURL := a.conf.BaseURL + "/oauth/token" + tokenURL := a.conf.BaseURL + "/oauth/token" - // Prepare form data - data := url.Values{} - data.Set("client_id", a.conf.ClientId) - data.Set("client_secret", a.conf.ClientSecret) - data.Set("grant_type", "client_credentials") + data := url.Values{} + data.Set("client_id", a.conf.ClientId) + data.Set("client_secret", a.conf.ClientSecret) + data.Set("grant_type", "client_credentials") - loopCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() + loopCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() - req, err := http.NewRequestWithContext(loopCtx, "POST", tokenURL, strings.NewReader(data.Encode())) - if err != nil { - return nil, fmt.Errorf("failed to create token request: %w", err) - } + req, err := http.NewRequestWithContext(loopCtx, "POST", tokenURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create token request: %w", err) + } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + 
req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - resp, err := a.httpClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to request token: %w", err) - } - defer resp.Body.Close() + resp, err := a.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to request token: %w", err) + } + defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("oauth token request failed: %d - %s", resp.StatusCode, string(body)) - } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("oauth token request failed: %d - %s", resp.StatusCode, string(body)) + } - var tokenResp struct { - AccessToken string `json:"access_token"` - ExpiresIn int `json:"expires_in"` - TokenType string `json:"token_type"` - } + var tokenResp struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type"` + } - if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { - return nil, fmt.Errorf("failed to decode token response: %w", err) - } + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { + return nil, fmt.Errorf("failed to decode token response: %w", err) + } - gracePeriod := 60 - if tokenResp.ExpiresIn < gracePeriod { - gracePeriod = 0 // or tokenResp.ExpiresIn / 2, or some other logic - } + gracePeriod := 60 + if tokenResp.ExpiresIn < gracePeriod { + gracePeriod = 0 + } - a.tokenMu.Lock() - a.oauthToken = tokenResp.AccessToken - a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-gracePeriod) * time.Second) - a.tokenMu.Unlock() + a.tokenMu.Lock() + a.oauthToken = tokenResp.AccessToken + a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-gracePeriod) * time.Second) + a.tokenMu.Unlock() - return map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), - "Accept": "application/json", - 
"Content-Type": "application/json", - }, nil + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + }) + + if err != nil { + return nil, err + } + return result.(map[string]string), nil } type API struct { @@ -312,7 +305,6 @@ func (api *API) SetInactive() { api.active = false } - func (a *MimecastAdapter) fetchEvents() { APIs := []*API{ { @@ -455,6 +447,8 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils var start string var retryCount int var retryableErrorCount int + var retryCount5xx int + retryDeadline := time.Now().Add(1 * time.Hour) api.mu.Lock() start = api.since.UTC().Format(time.RFC3339) @@ -565,6 +559,14 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils } if status == http.StatusTooManyRequests { + // Check if we've exceeded the 1 hour retry deadline + if time.Now().After(retryDeadline) { + err := fmt.Errorf("mimecast rate limit: exceeded 1 hour retry deadline for API %s", api.key) + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } if retryAfterInt != 0 { // Retry-After header with integer seconds value a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header, sleeping %ds before retry", retryAfterInt)) @@ -640,14 +642,29 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils return nil, nil } if status >= 500 && status < 600 { - err := fmt.Errorf("mimecast server error: %d\nRESPONSE: %s", status, string(respBody)) - a.conf.ClientOptions.OnError(err) - // Return collected items with error so caller knows about the failure - // and api.since won't be updated (preventing data loss on retry) - if len(allItems) > 0 { - return allItems, err + // Check if we've exceeded the 1 hour retry deadline + if time.Now().After(retryDeadline) { + err := fmt.Errorf("mimecast server error: %d after 1h 
retries\nRESPONSE: %s", status, string(respBody)) + a.conf.ClientOptions.OnError(err) + if len(allItems) > 0 { + return allItems, err + } + return nil, err } - return nil, err + // Linear backoff: 30s, 60s, 90s, ... capped at 5 minutes + backoff := time.Duration(retryCount5xx+1) * 30 * time.Second + if backoff > 5*time.Minute { + backoff = 5 * time.Minute + } + a.conf.ClientOptions.OnWarning(fmt.Sprintf("mimecast server error %d, retrying in %v", status, backoff)) + if err := sleepContext(a.ctx, backoff); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + retryCount5xx++ + continue } if status != http.StatusOK { err := fmt.Errorf("mimecast api non-200: %d\nRESPONSE: %s", status, string(respBody)) @@ -797,7 +814,7 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils // Cull old dedupe entries - keep entries from the last lookback period // to handle duplicates during the overlap window // We can clean up regardless of query success - cutoffTime := cycleTime.Add(-2 * overlapPeriod).Unix() + cutoffTime := cycleTime.Add(-1 * time.Hour).Unix() api.mu.Lock() for k, v := range api.dedupe { if v < cutoffTime { From 7407d8a9d45481ce090b6eb6dd6fcb5e89ca17c9 Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Thu, 11 Dec 2025 18:10:21 -0800 Subject: [PATCH 19/21] Fix Close() timeout and reset retry counters between pages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Increase Close() timeout from 10s to 2min to allow in-flight HTTP requests (60s timeout) and Ship() calls to complete gracefully - Reset retry counters after each successful page fetch so each page gets a fresh retry budget 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mimecast/client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mimecast/client.go b/mimecast/client.go index 7b89cd8..35a0d7d 100644 ---
a/mimecast/client.go +++ b/mimecast/client.go @@ -167,7 +167,7 @@ func (a *MimecastAdapter) Close() error { a.cancel() select { case <-a.chFetchLoop: - case <-time.After(10 * time.Second): + case <-time.After(2 * time.Minute): a.conf.ClientOptions.OnWarning("timeout waiting for fetch loop to exit; proceeding with cleanup") } err1 = a.uspClient.Drain(1 * time.Minute) @@ -797,6 +797,11 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils allItems = append(allItems, newItems...) + // Reset retry counters for next page + retryCount = 0 + retryableErrorCount = 0 + retryCount5xx = 0 + // Check if we need to make another request. if response.Meta.Pagination.Next != "" { pageToken = response.Meta.Pagination.Next From 09ae21a979076d91d5986802cc0c5783c5e986b8 Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Fri, 17 Apr 2026 15:38:26 -0600 Subject: [PATCH 20/21] Update submitEvents to use EventType in DataMessage --- mimecast/client.go | 51 ++++++++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 35a0d7d..4315663 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -293,6 +293,12 @@ type API struct { endTimestampField string } +// EventItem pairs a log payload with the event type LC should route it as. 
+type EventItem struct { + EventType string + Data utils.Dict +} + func (api *API) IsActive() bool { api.mu.Lock() defer api.mu.Unlock() @@ -403,7 +409,7 @@ func (a *MimecastAdapter) runOneCycle(apis []*API) error { g.SetLimit(a.conf.MaxConcurrentWorkers) // Buffered channel to collect results from workers - resultCh := make(chan []utils.Dict, len(apis)) + resultCh := make(chan []EventItem, len(apis)) // Launch fetch workers for _, api := range apis { @@ -442,8 +448,8 @@ func (a *MimecastAdapter) runOneCycle(apis []*API) error { return g.Wait() } -func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils.Dict, error) { - var allItems []utils.Dict +func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]EventItem, error) { + var allItems []EventItem var start string var retryCount int var retryableErrorCount int @@ -728,7 +734,7 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils // Collect items. items := response.Data - var newItems []utils.Dict + var newItems []EventItem for _, item := range items { // Check if this item has nested structure (logType -> array of logs) @@ -762,18 +768,10 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils continue } - // Create a new item with the log data plus the logType - newItem := utils.Dict{ - "logType": logType, - "eventType": api.key, - } - - // Copy all fields from the log - for k, v := range logMap { - newItem[k] = v - } - - newItems = append(newItems, newItem) + newItems = append(newItems, EventItem{ + EventType: logType, + Data: utils.Dict(logMap), + }) } } } else { @@ -782,16 +780,10 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]utils continue } - newItem := utils.Dict{ - "logType": api.key, - "eventType": api.key, - } - - for k, v := range item { - newItem[k] = v - } - - newItems = append(newItems, newItem) + newItems = append(newItems, EventItem{ + EventType: api.key, + Data: 
utils.Dict(item), + }) } } @@ -897,8 +889,8 @@ func (a *MimecastAdapter) generateLogHash(logMap map[string]interface{}) string return hex.EncodeToString(hash[:]) } -func (a *MimecastAdapter) submitEvents(events []utils.Dict) { - for _, item := range events { +func (a *MimecastAdapter) submitEvents(events []EventItem) { + for _, event := range events { // Check if we're shutting down select { case <-a.ctx.Done(): @@ -907,7 +899,8 @@ func (a *MimecastAdapter) submitEvents(events []utils.Dict) { } msg := &protocol.DataMessage{ - JsonPayload: item, + EventType: event.EventType, + JsonPayload: event.Data, TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), } if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { From d64c60ab84e65d84eded1724ac319bcbbc40089c Mon Sep 17 00:00:00 2001 From: Kiersten Gross Date: Mon, 20 Apr 2026 10:08:54 -0600 Subject: [PATCH 21/21] Use short-noun EventType values and explicit nestedKey unwrap Replace the knownLogTypes allowlist (which had a urlLogs typo that kept clickLogs events wrapped) with an explicit nestedKey field per API config. Every log type now unwraps consistently and ships a flat payload. EventType values switch from raw Mimecast wrapper names (auditEvents, attachmentLogs, clickLogs, impersonationLogs, dlpLogs) to short nouns (audit, attachment, url, impersonation, dlp). Audit events still prefer the event's category field when present. --- mimecast/client.go | 68 +++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/mimecast/client.go b/mimecast/client.go index 4315663..3d181e8 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -291,6 +291,14 @@ type API struct { timeField string startTimestampField string endTimestampField string + // nestedKey is the wrapper array key Mimecast uses for this endpoint + // (e.g., "attachmentLogs", "clickLogs"). Empty means the endpoint returns + // records flat in data[]. 
+ nestedKey string + // useCategory: when true and a flat record has a string "category" field, + // use it as EventType instead of key. Only valid for audit events, whose + // native category field identifies the log subtype. + useCategory bool } // EventItem pairs a log payload with the event type LC should route it as. @@ -314,7 +322,7 @@ func (api *API) SetInactive() { func (a *MimecastAdapter) fetchEvents() { APIs := []*API{ { - key: "auditEvents", + key: "audit", endpoint: "/api/audit/get-audit-events", since: time.Now().Add(-1 * a.conf.InitialLookback), dedupe: make(map[string]int64), @@ -323,6 +331,7 @@ func (a *MimecastAdapter) fetchEvents() { timeField: "eventTime", startTimestampField: "startDateTime", endTimestampField: "endDateTime", + useCategory: true, }, { key: "attachment", @@ -332,6 +341,7 @@ func (a *MimecastAdapter) fetchEvents() { active: true, idField: "", timeField: "date", + nestedKey: "attachmentLogs", }, { key: "impersonation", @@ -341,6 +351,7 @@ func (a *MimecastAdapter) fetchEvents() { active: true, idField: "id", timeField: "eventTime", + nestedKey: "impersonationLogs", }, { key: "url", @@ -350,6 +361,7 @@ func (a *MimecastAdapter) fetchEvents() { active: true, idField: "", timeField: "date", + nestedKey: "clickLogs", }, { key: "dlp", @@ -359,6 +371,7 @@ func (a *MimecastAdapter) fetchEvents() { active: true, idField: "", timeField: "eventTime", + nestedKey: "dlpLogs", }, } @@ -737,54 +750,41 @@ func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]Event var newItems []EventItem for _, item := range items { - // Check if this item has nested structure (logType -> array of logs) - // or if it's flat structure (item IS the log itself) - // Known nested log types that contain arrays of logs - knownLogTypes := []string{"attachmentLogs", "impersonationLogs", "urlLogs", "dlpLogs"} - hasNestedStructure := false - for _, logType := range knownLogTypes { - if _, exists := item[logType]; exists { - hasNestedStructure = true 
- break - } - } - - if hasNestedStructure { - for logType, logsInterface := range item { - logs, ok := logsInterface.([]interface{}) - if !ok { - // Skip non-array fields (like resultCount, metadata, etc.) - continue - } - - for _, logInterface := range logs { + // If this endpoint wraps records under a known key (attachmentLogs, + // clickLogs, etc.) and the item has that wrapper, unpack each record. + if api.nestedKey != "" { + if arr, ok := item[api.nestedKey].([]interface{}); ok { + for _, logInterface := range arr { logMap, ok := logInterface.(map[string]interface{}) if !ok { continue } - - // Handle deduplication FIRST if !a.processLogItem(api, logMap) { continue } - newItems = append(newItems, EventItem{ - EventType: logType, + EventType: api.key, Data: utils.Dict(logMap), }) } - } - } else { - // Handle deduplication FIRST - if !a.processLogItem(api, item) { continue } + } - newItems = append(newItems, EventItem{ - EventType: api.key, - Data: utils.Dict(item), - }) + // Flat record. + if !a.processLogItem(api, item) { + continue + } + eventType := api.key + if api.useCategory { + if cat, ok := item["category"].(string); ok && cat != "" { + eventType = cat + } } + newItems = append(newItems, EventItem{ + EventType: eventType, + Data: utils.Dict(item), + }) } allItems = append(allItems, newItems...)