diff --git a/mimecast/client.go b/mimecast/client.go index 8724e1c..3d181e8 100644 --- a/mimecast/client.go +++ b/mimecast/client.go @@ -3,41 +3,49 @@ package usp_mimecast import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "net" "net/http" + "net/url" + "sort" + "strconv" + "strings" "sync" "time" "github.com/refractionPOINT/go-uspclient" "github.com/refractionPOINT/go-uspclient/protocol" "github.com/refractionPOINT/usp-adapters/utils" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/singleflight" ) const ( - baseURL = "https://api.services.mimecast.com" overlapPeriod = 30 * time.Second + queryInterval = 30 // seconds ) type MimecastAdapter struct { - conf MimecastConfig - uspClient *uspclient.Client - httpClient *http.Client - - chStopped chan struct{} - wgSenders sync.WaitGroup - doStop *utils.Event - - ctx context.Context - - dedupe map[string]int64 -} - -type AuthResponse struct { - AccessToken string `json:"access_token"` + conf MimecastConfig + uspClient *uspclient.Client + httpClient *http.Client + closeOnce sync.Once + fetchOnce sync.Once + chStopped chan struct{} + chFetchLoop chan struct{} + + ctx context.Context + cancel context.CancelFunc + + oauthToken string + tokenExpiry time.Time + tokenMu sync.Mutex + tokenRefreshGroup singleflight.Group } type AuditRequest struct { @@ -51,9 +59,9 @@ type AuditEvent struct { } type ApiResponse struct { - Meta MetaData `json:"meta"` - Data []AuditLog `json:"data"` - Fail []FailureDetails `json:"fail"` + Meta MetaData `json:"meta"` + Data []map[string]interface{} `json:"data"` + Fail []FailureDetails `json:"fail"` } type FailureDetails struct { @@ -76,233 +84,716 @@ type Pagination struct { Next string `json:"next"` } -type AuditLog struct { - ID string `json:"id"` - AuditType string `json:"auditType"` - User string `json:"user"` - EventTime string `json:"eventTime"` - EventInfo string `json:"eventInfo"` - Category string `json:"category"` -} - type MimecastConfig struct { - ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` - ClientId string `json:"client_id" yaml:"client_id"` - ClientSecret string `json:"client_secret" yaml:"client_secret"` + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + ClientId string `json:"client_id" yaml:"client_id"` + ClientSecret string `json:"client_secret" yaml:"client_secret"` + BaseURL string `json:"base_url,omitempty" yaml:"base_url,omitempty"` + InitialLookback time.Duration `json:"initial_lookback,omitempty" yaml:"initial_lookback,omitempty"` // eg, 24h, 30m, 168h, 1h30m + MaxConcurrentWorkers int `json:"max_concurrent_workers,omitempty" yaml:"max_concurrent_workers,omitempty"` } func (c *MimecastConfig) Validate() error { if err := c.ClientOptions.Validate(); err != nil { - return fmt.Errorf("client_options: %v", err) + return fmt.Errorf("client_options: %v", err) // Required } + if c.ClientId == "" { - return errors.New("missing client id") + return errors.New("missing client id") // Required } if c.ClientSecret == "" { - return errors.New("missing client secret") + return errors.New("missing client secret") // Required + } + + if c.BaseURL == "" { + c.BaseURL = "https://api.services.mimecast.com" // Default Global - May process data in other regions during failover + // US Specific - Ensures data stays within the US instance: https://us-api.services.mimecast.com + // UK Specific - Ensures data stays within the UK instance: https://uk-api.services.mimecast.com + } + + // InitialLookback defaults to zero (current time, no lookback) + + if c.MaxConcurrentWorkers == 0 { + c.MaxConcurrentWorkers = 10 // Default + } else if c.MaxConcurrentWorkers > 100 { + return fmt.Errorf("max_concurrent_workers cannot exceed 100, got %d", c.MaxConcurrentWorkers) } return nil } func NewMimecastAdapter(ctx context.Context, conf MimecastConfig) (*MimecastAdapter, chan struct{}, error) { + if err := conf.Validate(); err != nil { + return nil, nil, err + } + var err error + ctxChild, cancel := context.WithCancel(ctx) a := &MimecastAdapter{ conf: conf, - ctx: context.Background(), - doStop: utils.NewEvent(), - dedupe: make(map[string]int64), + ctx: ctxChild, + cancel: cancel, } - a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + a.uspClient, err = uspclient.NewClient(ctxChild, conf.ClientOptions) if err != nil { return nil, nil, err } a.httpClient = &http.Client{ - Timeout: 30 * time.Second, + Timeout: 60 * time.Second, Transport: &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 10 * time.Second, - }).Dial, + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 90 * time.Second, }, } a.chStopped = make(chan struct{}) + a.chFetchLoop = make(chan struct{}) - a.wgSenders.Add(1) go a.fetchEvents() - go func() { - a.wgSenders.Wait() - close(a.chStopped) - }() - return a, a.chStopped, nil } func (a *MimecastAdapter) Close() error { a.conf.ClientOptions.DebugLog("closing") - a.doStop.Set() - a.wgSenders.Wait() - err1 := a.uspClient.Drain(1 * time.Minute) - _, err2 := a.uspClient.Close() - a.httpClient.CloseIdleConnections() + var err1, err2 error + a.closeOnce.Do(func() { + a.cancel() + select { + case <-a.chFetchLoop: + case <-time.After(2 * time.Minute): + a.conf.ClientOptions.OnWarning("timeout waiting for fetch loop to exit; proceeding with cleanup") + } + err1 = a.uspClient.Drain(1 * time.Minute) + _, err2 = a.uspClient.Close() + a.httpClient.CloseIdleConnections() + close(a.chStopped) + }) if err1 != nil { return err1 } - return err2 } +// getAuthHeaders gets valid OAuth bearer token headers +func (a *MimecastAdapter) getAuthHeaders(ctx context.Context) (map[string]string, error) { + // Check if we have a valid token + a.tokenMu.Lock() + if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { + token := a.oauthToken + a.tokenMu.Unlock() + + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", token), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + } + a.tokenMu.Unlock() + + // Need to get a new token - refreshOAuthToken handles its own locking + return a.refreshOAuthToken(ctx) +} + +// refreshOAuthToken gets a new OAuth 2.0 access token using singleflight to prevent thundering herd +func (a *MimecastAdapter) refreshOAuthToken(ctx context.Context) (map[string]string, error) { + // singleflight ensures only one token refresh happens at a time + result, err, _ := a.tokenRefreshGroup.Do("token", func() (interface{}, error) { + // Check if token was refreshed while waiting for singleflight + a.tokenMu.Lock() + if a.oauthToken != "" && time.Now().Before(a.tokenExpiry) { + token := a.oauthToken + a.tokenMu.Unlock() + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", token), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + } + a.tokenMu.Unlock() + + tokenURL := a.conf.BaseURL + "/oauth/token" + + data := url.Values{} + data.Set("client_id", a.conf.ClientId) + data.Set("client_secret", a.conf.ClientSecret) + data.Set("grant_type", "client_credentials") + + loopCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(loopCtx, "POST", tokenURL, strings.NewReader(data.Encode())) + if err != nil { + return nil, fmt.Errorf("failed to create token request: %w", err) + } + + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := a.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to request token: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("oauth token request failed: %d - %s", resp.StatusCode, string(body)) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type"` + } + + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { + return nil, fmt.Errorf("failed to decode token response: %w", err) + } + + gracePeriod := 60 + if tokenResp.ExpiresIn < gracePeriod { + gracePeriod = 0 + } + + a.tokenMu.Lock() + a.oauthToken = tokenResp.AccessToken + a.tokenExpiry = time.Now().Add(time.Duration(tokenResp.ExpiresIn-gracePeriod) * time.Second) + a.tokenMu.Unlock() + + return map[string]string{ + "Authorization": fmt.Sprintf("Bearer %s", tokenResp.AccessToken), + "Accept": "application/json", + "Content-Type": "application/json", + }, nil + }) + + if err != nil { + return nil, err + } + return result.(map[string]string), nil +} + +type API struct { + mu sync.Mutex + key string + endpoint string + since time.Time + dedupe map[string]int64 + active bool + idField string + timeField string + startTimestampField string + endTimestampField string + // nestedKey is the wrapper array key Mimecast uses for this endpoint + // (e.g., "attachmentLogs", "clickLogs"). Empty means the endpoint returns + // records flat in data[]. + nestedKey string + // useCategory: when true and a flat record has a string "category" field, + // use it as EventType instead of key. Only valid for audit events, whose + // native category field identifies the log subtype. + useCategory bool +} + +// EventItem pairs a log payload with the event type LC should route it as. +type EventItem struct { + EventType string + Data utils.Dict +} + +func (api *API) IsActive() bool { + api.mu.Lock() + defer api.mu.Unlock() + return api.active +} + +func (api *API) SetInactive() { + api.mu.Lock() + defer api.mu.Unlock() + api.active = false +} + func (a *MimecastAdapter) fetchEvents() { - defer a.wgSenders.Done() - defer a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", baseURL)) + APIs := []*API{ + { + key: "audit", + endpoint: "/api/audit/get-audit-events", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "id", + timeField: "eventTime", + startTimestampField: "startDateTime", + endTimestampField: "endDateTime", + useCategory: true, + }, + { + key: "attachment", + endpoint: "/api/ttp/attachment/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", + timeField: "date", + nestedKey: "attachmentLogs", + }, + { + key: "impersonation", + endpoint: "/api/ttp/impersonation/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "id", + timeField: "eventTime", + nestedKey: "impersonationLogs", + }, + { + key: "url", + endpoint: "/api/ttp/url/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", + timeField: "date", + nestedKey: "clickLogs", + }, + { + key: "dlp", + endpoint: "/api/dlp/get-logs", + since: time.Now().Add(-1 * a.conf.InitialLookback), + dedupe: make(map[string]int64), + active: true, + idField: "", + timeField: "eventTime", + nestedKey: "dlpLogs", + }, + } - since := time.Now().Add(-400 * time.Hour) + a.RunFetchLoop(APIs) +} - for !a.doStop.WaitFor(30 * time.Second) { - // The makeOneRequest function handles error - // handling and fatal error handling. - items, newSince, _ := a.makeOneRequest(since) - since = newSince - if items == nil { - continue +func (a *MimecastAdapter) shouldShutdown(apis []*API) bool { + // If no APIs are active due to 'Forbidden' messages, shutdown + for _, api := range apis { + if api.IsActive() { + return false } + } + a.conf.ClientOptions.OnWarning("all apis are disabled due to forbidden messages. shutting down") + return true +} - for _, item := range items { - msg := &protocol.DataMessage{ - JsonPayload: item, - TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), - } - if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { - if err == uspclient.ErrorBufferFull { - a.conf.ClientOptions.OnWarning("stream falling behind") - err = a.uspClient.Ship(msg, 1*time.Hour) - } - if err == nil { - continue +func (a *MimecastAdapter) RunFetchLoop(apis []*API) { + ticker := time.NewTicker(queryInterval * time.Second) + defer ticker.Stop() + defer a.fetchOnce.Do(func() { close(a.chFetchLoop) }) + + for { + select { + case <-a.ctx.Done(): + a.conf.ClientOptions.DebugLog(fmt.Sprintf("fetching of %s events exiting", a.conf.BaseURL)) + return + case <-ticker.C: + if err := a.runOneCycle(apis); err != nil { + if a.ctx.Err() != nil { + return // Context cancelled, exit gracefully } - a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) - a.doStop.Set() + // All APIs disabled, shutdown + a.cancel() return } } } } -func (a *MimecastAdapter) makeOneRequest(since time.Time) ([]utils.Dict, time.Time, error) { - var allItems []utils.Dict - currentTime := time.Now() - var start string - var lastDetectionTime time.Time +func (a *MimecastAdapter) runOneCycle(apis []*API) error { + if a.shouldShutdown(apis) { + return fmt.Errorf("all APIs disabled") + } + + cycleTime := time.Now() + g, ctx := errgroup.WithContext(a.ctx) + g.SetLimit(a.conf.MaxConcurrentWorkers) + + // Buffered channel to collect results from workers + resultCh := make(chan []EventItem, len(apis)) + + // Launch fetch workers + for _, api := range apis { + if !api.IsActive() { + continue + } + api := api // capture for goroutine + g.Go(func() error { + items, err := a.makeOneRequest(api, cycleTime) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("%s fetch failed: %w", api.key, err)) + return nil // Don't fail the whole group for one API error + } + if len(items) > 0 { + select { + case resultCh <- items: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + + // Close resultCh when all fetchers are done + go func() { + g.Wait() + close(resultCh) + }() - if t := currentTime.Add(-overlapPeriod); t.Before(since) { - start = since.UTC().Format("2006-01-02T15:04:05-0700") - } else { - start = currentTime.Add(-overlapPeriod).UTC().Format("2006-01-02T15:04:05-0700") + // Ship results as they arrive + for items := range resultCh { + a.submitEvents(items) } - end := currentTime.UTC().Format("2006-01-02T15:04:05-0700") + + return g.Wait() +} + +func (a *MimecastAdapter) makeOneRequest(api *API, cycleTime time.Time) ([]EventItem, error) { + var allItems []EventItem + var start string + var retryCount int + var retryableErrorCount int + var retryCount5xx int + retryDeadline := time.Now().Add(1 * time.Hour) + + api.mu.Lock() + start = api.since.UTC().Format(time.RFC3339) + api.mu.Unlock() + + end := cycleTime.UTC().Format(time.RFC3339) pageToken := "" - url := baseURL + "/api/audit/get-audit-events" + url := a.conf.BaseURL + api.endpoint for { - // Prepare the request. - a.conf.ClientOptions.DebugLog(fmt.Sprintf("requesting from %s start %s end %s", url, start, end)) + var respBody []byte + var status int + var retryAfterInt int + var retryAfterTime time.Time + + var startTimestampField string + var endTimestampField string + + if api.startTimestampField == "" { + startTimestampField = "from" + } else { + startTimestampField = api.startTimestampField + } + if api.endTimestampField == "" { + endTimestampField = "to" + } else { + endTimestampField = api.endTimestampField + } auditData := map[string]interface{}{ "meta": map[string]interface{}{ "pagination": map[string]interface{}{ - "pageSize": 50, + "pageSize": 500, }, }, "data": []map[string]string{ { - "startDateTime": start, - "endDateTime": end, + startTimestampField: start, + endTimestampField: end, }, }, } if pageToken != "" { - auditData["meta"].(map[string]interface{})["pagination"].(map[string]interface{})["pageToken"] = pageToken + if meta, ok := auditData["meta"].(map[string]interface{}); ok { + if pagination, ok := meta["pagination"].(map[string]interface{}); ok { + pagination["pageToken"] = pageToken + } + } } - jsonData, err := json.Marshal(auditData) + err := func() error { + jsonData, err := json.Marshal(auditData) + if err != nil { + return err + } + + loopCtx, cancel := context.WithTimeout(a.ctx, 30*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(loopCtx, "POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + + headers, err := a.getAuthHeaders(loopCtx) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("failed to get OAuth token: %v", err)) + return err + } + + // Set all required Mimecast headers + for key, value := range headers { + req.Header.Set(key, value) + } + resp, err := a.httpClient.Do(req) + + if err != nil { + return err + } + defer resp.Body.Close() + + respBody, err = io.ReadAll(resp.Body) + if err != nil { + return err + } + + status = resp.StatusCode + ra := resp.Header.Get("Retry-After") + + if ra == "" { + retryAfterInt = 0 + retryAfterTime = time.Time{} + } else if secs, parseErr := strconv.Atoi(ra); parseErr == nil { + retryAfterInt = secs + retryAfterTime = time.Time{} + } else if t, parseErr := http.ParseTime(ra); parseErr == nil { + retryAfterInt = 0 + retryAfterTime = t + } + + return nil + }() + if err != nil { - return nil, lastDetectionTime, err + return nil, err } - req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusTooManyRequests { + // Check if we've exceeded the 1 hour retry deadline + if time.Now().After(retryDeadline) { + err := fmt.Errorf("mimecast rate limit: exceeded 1 hour retry deadline for API %s", api.key) + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + if retryAfterInt != 0 { + // Retry-After header with integer seconds value + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header, sleeping %ds before retry", retryAfterInt)) + if err := sleepContext(a.ctx, time.Duration(retryAfterInt)*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } else if !retryAfterTime.IsZero() { + // Retry-After header with HTTP-date value + if retryAfterTime.Before(time.Now()) { + // Retry-After time already passed, wait a minimum of 1 second to avoid tight loop + if err := sleepContext(a.ctx, 1*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } else { + retryDuration := time.Until(retryAfterTime) + a.conf.ClientOptions.OnWarning(fmt.Sprintf("makeOneRequest got 429 with 'Retry-After' header with time %v, sleeping %v before retry", retryAfterTime, retryDuration)) + if err := sleepContext(a.ctx, retryDuration); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } + } else { + // No Retry-After header, use default 60s wait + a.conf.ClientOptions.OnWarning("makeOneRequest got 429 without 'Retry-After' header, sleeping 60s before retry") + if err := sleepContext(a.ctx, 60*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + } + // Try again after waiting 'Retry-After' value + continue } - token, err := a.getAuthToken() - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusUnauthorized { + // Clear the cached token in the ADAPTER and retry once + a.tokenMu.Lock() + a.oauthToken = "" + a.tokenExpiry = time.Time{} + a.tokenMu.Unlock() + + if retryCount < 3 { + a.conf.ClientOptions.OnWarning("received 401, clearing token cache and trying again") + retryCount++ + continue + } + err := fmt.Errorf("mimecast unauthorized: could not reauthenticate for API %s", api.key) + if len(allItems) > 0 { + return allItems, err + } + return nil, err } - req.Header.Set("Authorization", "Bearer "+token) - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return nil, lastDetectionTime, err + if status == http.StatusForbidden { + err := fmt.Errorf("mimecast forbidden: %d\nRESPONSE: %s\nAPI '%s' will be disabled", status, string(respBody), api.key) + a.conf.ClientOptions.OnError(err) + api.SetInactive() + // Since this API is now disabled, update api.since to prevent it from getting stuck + // on this time window forever + api.mu.Lock() + api.since = cycleTime.Add(-overlapPeriod) + api.mu.Unlock() + if len(allItems) > 0 { + return allItems, nil + } + return nil, nil } - defer resp.Body.Close() - - // Evaluate if success. - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api non-200: %s\nREQUEST: %s\nRESPONSE: %s", resp.Status, string(body), string(body))) - return nil, lastDetectionTime, err + if status >= 500 && status < 600 { + // Check if we've exceeded the 1 hour retry deadline + if time.Now().After(retryDeadline) { + err := fmt.Errorf("mimecast server error: %d after 1h retries\nRESPONSE: %s", status, string(respBody)) + a.conf.ClientOptions.OnError(err) + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + // Exponential backoff: 30s, 60s, 90s, ... capped at 5 minutes + backoff := time.Duration(retryCount5xx+1) * 30 * time.Second + if backoff > 5*time.Minute { + backoff = 5 * time.Minute + } + a.conf.ClientOptions.OnWarning(fmt.Sprintf("mimecast server error %d, retrying in %v", status, backoff)) + if err := sleepContext(a.ctx, backoff); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + retryCount5xx++ + continue } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - a.conf.ClientOptions.OnError(fmt.Errorf("error: %v", err)) - return nil, lastDetectionTime, err + if status != http.StatusOK { + err := fmt.Errorf("mimecast api non-200: %d\nRESPONSE: %s", status, string(respBody)) + a.conf.ClientOptions.OnError(err) + if len(allItems) > 0 { + return allItems, err + } + return nil, err } // Parse the response. var response ApiResponse - //a.conf.ClientOptions.DebugLog(fmt.Sprintf("results: %s", body)) - err = json.Unmarshal(body, &response) + err = json.Unmarshal(respBody, &response) if err != nil { a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api invalid json: %v", err)) - return nil, lastDetectionTime, err + return nil, err + } + + // Check for Mimecast-specific errors in the fail array + if len(response.Fail) > 0 { + var errorMessages []string + allRetryable := true + + for _, failure := range response.Fail { + for _, errDetail := range failure.Errors { + errorMessages = append(errorMessages, fmt.Sprintf("%s: %s (retryable: %v)", + errDetail.Code, errDetail.Message, errDetail.Retryable)) + if !errDetail.Retryable { + allRetryable = false + } + } + } + + if allRetryable { + retryableErrorCount++ + + if retryableErrorCount >= 5 { + a.conf.ClientOptions.OnError(fmt.Errorf("max retries exceeded for retryable errors: %v", errorMessages)) + if len(allItems) > 0 { + return allItems, fmt.Errorf("mimecast api errors after %d retries: %v", retryableErrorCount, errorMessages) + } + return nil, fmt.Errorf("mimecast api errors after %d retries: %v", retryableErrorCount, errorMessages) + } + + // Log warning and retry + a.conf.ClientOptions.OnWarning(fmt.Sprintf("mimecast api returned retryable errors: %v, retrying", errorMessages)) + + if err := sleepContext(a.ctx, 5*time.Second); err != nil { + if len(allItems) > 0 { + return allItems, err + } + return nil, err + } + continue // Retry the request + } + + // Non-retryable error - fail + a.conf.ClientOptions.OnError(fmt.Errorf("mimecast api returned non-retryable errors: %v", errorMessages)) + return nil, fmt.Errorf("mimecast api errors: %v", errorMessages) } - //responseStr, _ := json.Marshal(response) - //a.conf.ClientOptions.DebugLog(fmt.Sprintf("results: %s", responseStr)) // Collect items. items := response.Data - var newItems []utils.Dict - lastDetectionTime = since + var newItems []EventItem + for _, item := range items { - newItem := utils.Dict{ - "id": item.ID, - "auditType": item.AuditType, - "user": item.User, - "eventTime": item.EventTime, - "eventInfo": item.EventInfo, - "category": item.Category, + // If this endpoint wraps records under a known key (attachmentLogs, + // clickLogs, etc.) and the item has that wrapper, unpack each record. + if api.nestedKey != "" { + if arr, ok := item[api.nestedKey].([]interface{}); ok { + for _, logInterface := range arr { + logMap, ok := logInterface.(map[string]interface{}) + if !ok { + continue + } + if !a.processLogItem(api, logMap) { + continue + } + newItems = append(newItems, EventItem{ + EventType: api.key, + Data: utils.Dict(logMap), + }) + } + continue + } } - timestamp := item.EventTime - eventid := item.ID - if _, ok := a.dedupe[eventid]; ok { + // Flat record. + if !a.processLogItem(api, item) { continue } - epoch, _ := time.Parse(time.RFC3339, timestamp) - a.dedupe[eventid] = epoch.Unix() - newItems = append(newItems, newItem) - lastDetectionTime = epoch + eventType := api.key + if api.useCategory { + if cat, ok := item["category"].(string); ok && cat != "" { + eventType = cat + } + } + newItems = append(newItems, EventItem{ + EventType: eventType, + Data: utils.Dict(item), + }) } + allItems = append(allItems, newItems...) + // Reset retry counters for next page + retryCount = 0 + retryableErrorCount = 0 + retryCount5xx = 0 + // Check if we need to make another request. if response.Meta.Pagination.Next != "" { pageToken = response.Meta.Pagination.Next @@ -311,42 +802,133 @@ func (a *MimecastAdapter) makeOneRequest(since time.Time) ([]utils.Dict, time.Ti } } - // Cull old dedupe entries. - for k, v := range a.dedupe { - if v < time.Now().Add(-overlapPeriod).Unix() { - delete(a.dedupe, k) + // Update api.since to the end time of this request window + // This ensures we don't miss any events in future requests + api.mu.Lock() + api.since = cycleTime.Add(-overlapPeriod) + api.mu.Unlock() + + // Cull old dedupe entries - keep entries from the last lookback period + // to handle duplicates during the overlap window + // We can clean up regardless of query success + cutoffTime := cycleTime.Add(-1 * time.Hour).Unix() + api.mu.Lock() + for k, v := range api.dedupe { + if v < cutoffTime { + delete(api.dedupe, k) } } - return allItems, lastDetectionTime, nil + api.mu.Unlock() + + return allItems, nil } -func (a *MimecastAdapter) getAuthToken() (string, error) { - url := baseURL + "/oauth/token" - body := "grant_type=client_credentials&client_id=" + a.conf.ClientId + "&client_secret=" + a.conf.ClientSecret +func (a *MimecastAdapter) processLogItem(api *API, logMap map[string]interface{}) bool { + var dedupeID string - req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(body))) - if err != nil { - return "", err + // Determine the deduplication ID + if api.idField != "" { + // Try to use the specified ID field + if idVal, ok := logMap[api.idField]; ok { + if id, ok := idVal.(string); ok && id != "" { + dedupeID = id + } + } } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return "", err + // If no ID field specified or ID is empty, generate hash from log content + if dedupeID == "" { + dedupeID = a.generateLogHash(logMap) } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return "", fmt.Errorf("failed to get token, status code: %d", resp.StatusCode) + // Check for duplicates + api.mu.Lock() + if _, exists := api.dedupe[dedupeID]; exists { + api.mu.Unlock() + return false // Skip duplicate } - respBody, _ := ioutil.ReadAll(resp.Body) - var authResp AuthResponse - if err := json.Unmarshal(respBody, &authResp); err != nil { - return "", err + dedupeTimestamp := time.Now().Unix() // Default fallback + if timeVal, ok := logMap[api.timeField]; ok { + if timestamp, ok := timeVal.(string); ok { + if epoch, err := time.Parse(time.RFC3339, timestamp); err == nil { + dedupeTimestamp = epoch.Unix() + } + } + } + api.dedupe[dedupeID] = dedupeTimestamp + api.mu.Unlock() + + return true // Include this item +} + +func (a *MimecastAdapter) generateLogHash(logMap map[string]interface{}) string { + // Extract and sort keys + keys := make([]string, 0, len(logMap)) + for k := range logMap { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build deterministic string representation using JSON for complex values + var buf bytes.Buffer + for _, k := range keys { + v := logMap[k] + // Use JSON marshaling for deterministic representation of complex types + valueBytes, err := json.Marshal(v) + if err != nil { + // Fallback to fmt if JSON fails + fmt.Fprintf(&buf, "%s:%v|", k, v) + } else { + fmt.Fprintf(&buf, "%s:%s|", k, valueBytes) + } + } + + hash := sha256.Sum256(buf.Bytes()) + return hex.EncodeToString(hash[:]) +} + +func (a *MimecastAdapter) submitEvents(events []EventItem) { + for _, event := range events { + // Check if we're shutting down + select { + case <-a.ctx.Done(): + return + default: + } + + msg := &protocol.DataMessage{ + EventType: event.EventType, + JsonPayload: event.Data, + TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), + } + if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + if err := a.uspClient.Ship(msg, 1*time.Hour); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + // Signal shutdown via context cancellation instead of calling Close() + // to avoid deadlock (Close waits for fetch loop which spawned us) + a.cancel() + return + } + continue + } + // Handle non-ErrorBufferFull errors + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + } } +} + +func sleepContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() - return authResp.AccessToken, nil + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } }