From 3eae41879fbafaf92f50b52e86613a996a59202b Mon Sep 17 00:00:00 2001 From: ireland_55 <114760354+0BL1V10N-55@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:46:00 -0600 Subject: [PATCH] updated S3 to support split array Keeper sends batch json events via a json array adding function to split array to separate json events. --- s3/client.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/s3/client.go b/s3/client.go index 36762b5..d6482bb 100644 --- a/s3/client.go +++ b/s3/client.go @@ -2,6 +2,7 @@ package usp_s3 import ( "context" + "encoding/json" "errors" "fmt" "net/http" @@ -81,6 +82,7 @@ type S3Config struct { AccessKey string `json:"access_key" yaml:"access_key"` SecretKey string `json:"secret_key,omitempty" yaml:"secret_key,omitempty"` IsOneTimeLoad bool `json:"single_load" yaml:"single_load"` + IsJsonArray bool `json:"is_json_array" yaml:"is_json_array"` Prefix string `json:"prefix" yaml:"prefix"` ParallelFetch int `json:"parallel_fetch" yaml:"parallel_fetch"` Region string `json:"region" yaml:"region"` @@ -441,6 +443,10 @@ func (a *S3Adapter) lookForFiles() (bool, error) { } func (a *S3Adapter) processEvent(data []byte, isCompressed bool) bool { + if a.conf.IsJsonArray && !isCompressed { + return a.processJsonArray(data) + } + // Since we're dealing with files, we use the // bundle payloads to avoid having to go through // the whole unmarshal+marshal roundtrip. @@ -469,3 +475,34 @@ func (a *S3Adapter) processEvent(data []byte, isCompressed bool) bool { } return true } + +func (a *S3Adapter) processJsonArray(data []byte) bool { + var items []json.RawMessage + if err := json.Unmarshal(data, &items); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("processJsonArray: unmarshal: %v", err)) + return false + } + + for _, raw := range items { + var payload utils.Dict + if err := json.Unmarshal(raw, &payload); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("processJsonArray: unmarshal element: %v", err)) + return false + } + msg := &protocol.DataMessage{ + JsonPayload: payload, + TimestampMs: uint64(time.Now().UnixNano() / int64(time.Millisecond)), + } + if err := a.uspClient.Ship(msg, 10*time.Second); err != nil { + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + err = a.uspClient.Ship(msg, 1*time.Hour) + } + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + return false + } + } + } + return true +}