1+ package expr
2+
3+ import (
4+ "context"
5+ "fmt"
6+ "strconv"
7+ "sync"
8+
9+ "github.com/RoaringBitmap/roaring"
10+ "github.com/cespare/xxhash/v2"
11+ "github.com/google/cel-go/common/operators"
12+ "github.com/ohler55/ojg/jp"
13+ )
14+
15+ // bitmapStringLookup is an optimized version of stringLookup that uses Roaring Bitmaps
16+ // for much faster set operations and reduced memory usage
17+ type bitmapStringLookup struct {
18+ // Use sharded locks to reduce contention
19+ shards [64 ]struct {
20+ mu sync.RWMutex
21+ // For each field path, store bitmaps of pause IDs that match specific values
22+ equality map [string ]map [string ]* roaring.Bitmap // fieldPath -> hashedValue -> bitmap
23+ inequality map [string ]map [string ]* roaring.Bitmap // fieldPath -> hashedValue -> bitmap
24+ in map [string ]map [string ]* roaring.Bitmap // fieldPath -> hashedValue -> bitmap
25+ }
26+
27+ // Global tracking of all fields we've seen
28+ vars map [string ]struct {}
29+ varsMu sync.RWMutex
30+
31+ // Mapping from pause ID to stored expression parts for final lookups
32+ pauseIndex map [uint32 ]* StoredExpressionPart
33+ pauseIndexMu sync.RWMutex
34+
35+ concurrency int64
36+ nextPauseID uint32
37+ idMu sync.Mutex
38+ }
39+
40+ func newBitmapStringEqualityMatcher (concurrency int64 ) MatchingEngine {
41+ engine := & bitmapStringLookup {
42+ vars : make (map [string ]struct {}),
43+ pauseIndex : make (map [uint32 ]* StoredExpressionPart ),
44+ concurrency : concurrency ,
45+ }
46+
47+ // Initialize shards
48+ for i := range engine .shards {
49+ engine .shards [i ].equality = make (map [string ]map [string ]* roaring.Bitmap )
50+ engine .shards [i ].inequality = make (map [string ]map [string ]* roaring.Bitmap )
51+ engine .shards [i ].in = make (map [string ]map [string ]* roaring.Bitmap )
52+ }
53+
54+ return engine
55+ }
56+
57+ func (b * bitmapStringLookup ) Type () EngineType {
58+ return EngineTypeStringHash
59+ }
60+
61+ func (b * bitmapStringLookup ) getShard (key string ) * struct {
62+ mu sync.RWMutex
63+ equality map [string ]map [string ]* roaring.Bitmap
64+ inequality map [string ]map [string ]* roaring.Bitmap
65+ in map [string ]map [string ]* roaring.Bitmap
66+ } {
67+ hash := xxhash .Sum64String (key )
68+ return & b .shards [hash % 64 ]
69+ }
70+
71+ func (b * bitmapStringLookup ) getNextPauseID () uint32 {
72+ b .idMu .Lock ()
73+ defer b .idMu .Unlock ()
74+ b .nextPauseID ++
75+ return b .nextPauseID
76+ }
77+
78+ func (b * bitmapStringLookup ) hash (input string ) string {
79+ ui := xxhash .Sum64String (input )
80+ return strconv .FormatUint (ui , 36 )
81+ }
82+
83+ func (b * bitmapStringLookup ) Match (ctx context.Context , input map [string ]any , result * MatchResult ) error {
84+ // Instead of doing complex bitmap operations, let's use the same logic as the original
85+ // but optimize the storage with bitmaps. We'll collect all matching pause IDs
86+ // and let the group validation logic in the main aggregator handle the filtering.
87+
88+ b .varsMu .RLock ()
89+ fieldPaths := make ([]string , 0 , len (b .vars ))
90+ for path := range b .vars {
91+ fieldPaths = append (fieldPaths , path )
92+ }
93+ b .varsMu .RUnlock ()
94+
95+ // For each field path we track, check if it exists in the input and collect matches
96+ for _ , path := range fieldPaths {
97+ shard := b .getShard (path )
98+ shard .mu .RLock ()
99+
100+ x , err := jp .ParseString (path )
101+ if err != nil {
102+ shard .mu .RUnlock ()
103+ continue
104+ }
105+
106+ res := x .Get (input )
107+ if len (res ) == 0 {
108+ res = []any {"" }
109+ }
110+
111+ switch val := res [0 ].(type ) {
112+ case string :
113+ hashedVal := b .hash (val )
114+
115+ // Check equality matches
116+ if valueMap , exists := shard .equality [path ]; exists {
117+ if bitmap , exists := valueMap [hashedVal ]; exists {
118+ b .addBitmapMatches (bitmap , result )
119+ }
120+ }
121+
122+ // Check inequality matches (all except this value)
123+ if valueMap , exists := shard .inequality [path ]; exists {
124+ for value , bitmap := range valueMap {
125+ if value != hashedVal {
126+ b .addBitmapMatches (bitmap , result )
127+ }
128+ }
129+ }
130+
131+ case []any :
132+ // Handle 'in' operations for arrays
133+ for _ , item := range val {
134+ if str , ok := item .(string ); ok {
135+ hashedVal := b .hash (str )
136+ if valueMap , exists := shard .in [path ]; exists {
137+ if bitmap , exists := valueMap [hashedVal ]; exists {
138+ b .addBitmapMatches (bitmap , result )
139+ }
140+ }
141+ }
142+ }
143+ case []string :
144+ // Handle 'in' operations for string arrays
145+ for _ , str := range val {
146+ hashedVal := b .hash (str )
147+ if valueMap , exists := shard .in [path ]; exists {
148+ if bitmap , exists := valueMap [hashedVal ]; exists {
149+ b .addBitmapMatches (bitmap , result )
150+ }
151+ }
152+ }
153+ }
154+
155+ shard .mu .RUnlock ()
156+ }
157+
158+ return nil
159+ }
160+
161+ // addBitmapMatches converts bitmap results to MatchResult format
162+ func (b * bitmapStringLookup ) addBitmapMatches (bitmap * roaring.Bitmap , result * MatchResult ) {
163+ b .pauseIndexMu .RLock ()
164+ defer b .pauseIndexMu .RUnlock ()
165+
166+ for _ , pauseID := range bitmap .ToArray () {
167+ if part , exists := b .pauseIndex [pauseID ]; exists {
168+ result .Add (part .EvaluableID , part .GroupID )
169+ }
170+ }
171+ }
172+
173+ func (b * bitmapStringLookup ) Search (ctx context.Context , variable string , input any , result * MatchResult ) {
174+ // This method is kept for interface compatibility but uses the same logic as Match
175+ testInput := map [string ]any {variable : input }
176+ b .Match (ctx , testInput , result )
177+ }
178+
179+ func (b * bitmapStringLookup ) Add (ctx context.Context , p ExpressionPart ) error {
180+ // Generate a unique pause ID for this expression part
181+ pauseID := b .getNextPauseID ()
182+
183+ // Store the mapping from pause ID to expression part
184+ b .pauseIndexMu .Lock ()
185+ b .pauseIndex [pauseID ] = p .ToStored ()
186+ b .pauseIndexMu .Unlock ()
187+
188+ // Track the variable
189+ b .varsMu .Lock ()
190+ b .vars [p .Predicate .Ident ] = struct {}{}
191+ b .varsMu .Unlock ()
192+
193+ shard := b .getShard (p .Predicate .Ident )
194+ shard .mu .Lock ()
195+ defer shard .mu .Unlock ()
196+
197+ switch p .Predicate .Operator {
198+ case operators .Equals :
199+ hashedVal := b .hash (p .Predicate .LiteralAsString ())
200+
201+ if shard .equality [p .Predicate .Ident ] == nil {
202+ shard .equality [p .Predicate .Ident ] = make (map [string ]* roaring.Bitmap )
203+ }
204+ if shard .equality [p .Predicate .Ident ][hashedVal ] == nil {
205+ shard .equality [p .Predicate .Ident ][hashedVal ] = roaring .New ()
206+ }
207+ shard .equality [p .Predicate .Ident ][hashedVal ].Add (pauseID )
208+
209+ case operators .NotEquals :
210+ hashedVal := b .hash (p .Predicate .LiteralAsString ())
211+
212+ if shard .inequality [p .Predicate .Ident ] == nil {
213+ shard .inequality [p .Predicate .Ident ] = make (map [string ]* roaring.Bitmap )
214+ }
215+ if shard .inequality [p .Predicate .Ident ][hashedVal ] == nil {
216+ shard .inequality [p .Predicate .Ident ][hashedVal ] = roaring .New ()
217+ }
218+ shard .inequality [p .Predicate .Ident ][hashedVal ].Add (pauseID )
219+
220+ case operators .In :
221+ if str , ok := p .Predicate .Literal .(string ); ok {
222+ hashedVal := b .hash (str )
223+
224+ if shard .in [p .Predicate .Ident ] == nil {
225+ shard .in [p .Predicate .Ident ] = make (map [string ]* roaring.Bitmap )
226+ }
227+ if shard .in [p .Predicate .Ident ][hashedVal ] == nil {
228+ shard .in [p .Predicate .Ident ][hashedVal ] = roaring .New ()
229+ }
230+ shard .in [p .Predicate .Ident ][hashedVal ].Add (pauseID )
231+ }
232+
233+ default :
234+ return fmt .Errorf ("BitmapStringHash engines only support string equality/inequality/in operations" )
235+ }
236+
237+ return nil
238+ }
239+
240+ func (b * bitmapStringLookup ) Remove (ctx context.Context , p ExpressionPart ) error {
241+ // Find the pause ID for this expression part
242+ var pauseIDToRemove uint32
243+ var found bool
244+
245+ b .pauseIndexMu .RLock ()
246+ for pauseID , stored := range b .pauseIndex {
247+ if p .EqualsStored (stored ) {
248+ pauseIDToRemove = pauseID
249+ found = true
250+ break
251+ }
252+ }
253+ b .pauseIndexMu .RUnlock ()
254+
255+ if ! found {
256+ return ErrExpressionPartNotFound
257+ }
258+
259+ // Remove from pause index
260+ b .pauseIndexMu .Lock ()
261+ delete (b .pauseIndex , pauseIDToRemove )
262+ b .pauseIndexMu .Unlock ()
263+
264+ shard := b .getShard (p .Predicate .Ident )
265+ shard .mu .Lock ()
266+ defer shard .mu .Unlock ()
267+
268+ switch p .Predicate .Operator {
269+ case operators .Equals :
270+ hashedVal := b .hash (p .Predicate .LiteralAsString ())
271+ if valueMap , exists := shard .equality [p .Predicate .Ident ]; exists {
272+ if bitmap , exists := valueMap [hashedVal ]; exists {
273+ bitmap .Remove (pauseIDToRemove )
274+ if bitmap .IsEmpty () {
275+ delete (valueMap , hashedVal )
276+ }
277+ }
278+ }
279+
280+ case operators .NotEquals :
281+ hashedVal := b .hash (p .Predicate .LiteralAsString ())
282+ if valueMap , exists := shard .inequality [p .Predicate .Ident ]; exists {
283+ if bitmap , exists := valueMap [hashedVal ]; exists {
284+ bitmap .Remove (pauseIDToRemove )
285+ if bitmap .IsEmpty () {
286+ delete (valueMap , hashedVal )
287+ }
288+ }
289+ }
290+
291+ case operators .In :
292+ if str , ok := p .Predicate .Literal .(string ); ok {
293+ hashedVal := b .hash (str )
294+ if valueMap , exists := shard .in [p .Predicate .Ident ]; exists {
295+ if bitmap , exists := valueMap [hashedVal ]; exists {
296+ bitmap .Remove (pauseIDToRemove )
297+ if bitmap .IsEmpty () {
298+ delete (valueMap , hashedVal )
299+ }
300+ }
301+ }
302+ }
303+
304+ default :
305+ return fmt .Errorf ("BitmapStringHash engines only support string equality/inequality/in operations" )
306+ }
307+
308+ return nil
309+ }
0 commit comments