Skip to content

Commit b1d1d3f

Browse files
authored
Merge pull request #11 from BryceWayne/refactor-memorystore-sharding-13428490407117788011
Refactor MemoryStore to use sharded storage
2 parents a268870 + 26e6da5 commit b1d1d3f

1 file changed

Lines changed: 101 additions & 41 deletions

File tree

memorystore/memorystore.go

Lines changed: 101 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ package memorystore
66

77
import (
88
"context"
9+
"hash/fnv"
910
"sync"
1011
"sync/atomic"
1112
"time"
1213

1314
"github.com/goccy/go-json"
1415
)
1516

17+
const numShards = 256
18+
1619
// item represents a single cache entry with its value and expiration time.
1720
type item struct {
1821
value []byte // Raw data stored as a byte slice
@@ -27,11 +30,18 @@ type StoreMetrics struct {
2730
Evictions int64 // Total number of items evicted (expired)
2831
}
2932

33+
type shard struct {
34+
mu sync.RWMutex
35+
store map[string]item
36+
}
37+
3038
// MemoryStore implements an in-memory cache with automatic cleanup of expired items.
3139
// It is safe for concurrent use by multiple goroutines.
3240
type MemoryStore struct {
33-
mu sync.RWMutex // Protects access to the store map
34-
store map[string]item // Internal storage for cache items
41+
// lifecycleMu protects the lifecycle state (cancelFunc)
42+
lifecycleMu sync.RWMutex
43+
44+
shards []*shard // Sharded storage
3545
ps *pubSubManager // PubSub manager for cache events
3646
ctx context.Context // Context for controlling the cleanup worker
3747
cancelFunc context.CancelFunc // Function to stop the cleanup worker
@@ -49,15 +59,29 @@ type MemoryStore struct {
4959
func NewMemoryStore() *MemoryStore {
5060
ctx, cancel := context.WithCancel(context.Background())
5161
ms := &MemoryStore{
52-
store: make(map[string]item),
62+
shards: make([]*shard, numShards),
5363
ctx: ctx,
5464
cancelFunc: cancel,
5565
}
66+
67+
for i := 0; i < numShards; i++ {
68+
ms.shards[i] = &shard{
69+
store: make(map[string]item),
70+
}
71+
}
72+
5673
ms.initPubSub()
5774
ms.startCleanupWorker()
5875
return ms
5976
}
6077

78+
// getShard returns the shard responsible for the given key.
79+
func (m *MemoryStore) getShard(key string) *shard {
80+
h := fnv.New64a()
81+
h.Write([]byte(key))
82+
return m.shards[h.Sum64()%numShards]
83+
}
84+
6185
// Stop gracefully shuts down the MemoryStore by stopping the cleanup goroutine
6286
// and releasing associated resources. After calling Stop, the store cannot be used.
6387
// Multiple calls to Stop will not cause a panic and return nil.
@@ -67,8 +91,8 @@ func NewMemoryStore() *MemoryStore {
6791
// store := NewMemoryStore()
6892
// defer store.Stop()
6993
func (m *MemoryStore) Stop() error {
70-
m.mu.Lock()
71-
defer m.mu.Unlock()
94+
m.lifecycleMu.Lock()
95+
defer m.lifecycleMu.Unlock()
7296

7397
if m.cancelFunc == nil {
7498
return nil
@@ -83,7 +107,11 @@ func (m *MemoryStore) Stop() error {
83107
m.wg.Wait()
84108

85109
// Clear the store to free up memory
86-
m.store = nil
110+
for _, s := range m.shards {
111+
s.mu.Lock()
112+
s.store = nil
113+
s.mu.Unlock()
114+
}
87115

88116
return nil
89117
}
@@ -98,8 +126,8 @@ func (m *MemoryStore) Stop() error {
98126
// return
99127
// }
100128
func (m *MemoryStore) IsStopped() bool {
101-
m.mu.RLock()
102-
defer m.mu.RUnlock()
129+
m.lifecycleMu.RLock()
130+
defer m.lifecycleMu.RUnlock()
103131
return m.cancelFunc == nil
104132
}
105133

@@ -124,26 +152,31 @@ func (m *MemoryStore) startCleanupWorker() {
124152
}
125153

126154
// cleanupExpiredItems removes all expired items from the cache.
127-
// This method acquires a write lock on the store while performing the cleanup.
155+
// It iterates over shards and cleans them one by one to avoid global locking.
128156
func (m *MemoryStore) cleanupExpiredItems() {
129-
m.mu.Lock()
130-
defer m.mu.Unlock()
131-
for key, item := range m.store {
132-
if time.Now().After(item.expiresAt) {
133-
delete(m.store, key)
134-
atomic.AddInt64(&m.evictions, 1)
157+
now := time.Now()
158+
for _, s := range m.shards {
159+
// Lock only the current shard
160+
s.mu.Lock()
161+
for key, item := range s.store {
162+
if now.After(item.expiresAt) {
163+
delete(s.store, key)
164+
atomic.AddInt64(&m.evictions, 1)
165+
}
135166
}
167+
s.mu.Unlock()
136168
}
137169
}
138170

139171
// Set stores a raw byte slice in the cache with the specified key and duration.
140172
// The item will automatically expire after the specified duration.
141173
// If an error occurs, it will be returned to the caller.
142174
func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) error {
143-
m.mu.Lock()
144-
defer m.mu.Unlock()
175+
s := m.getShard(key)
176+
s.mu.Lock()
177+
defer s.mu.Unlock()
145178

146-
m.store[key] = item{
179+
s.store[key] = item{
147180
value: value,
148181
expiresAt: time.Now().Add(duration),
149182
}
@@ -175,10 +208,11 @@ func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Durat
175208
// Returns the value and a boolean indicating whether the key was found.
176209
// If the item has expired, returns (nil, false).
177210
func (m *MemoryStore) Get(key string) ([]byte, bool) {
178-
m.mu.RLock()
179-
defer m.mu.RUnlock()
211+
s := m.getShard(key)
212+
s.mu.RLock()
213+
defer s.mu.RUnlock()
180214

181-
it, exists := m.store[key]
215+
it, exists := s.store[key]
182216
if !exists || time.Now().After(it.expiresAt) {
183217
atomic.AddInt64(&m.misses, 1)
184218
return nil, false
@@ -217,45 +251,68 @@ func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error) {
217251
// Delete removes an item from the cache.
218252
// If the key doesn't exist, the operation is a no-op.
219253
func (m *MemoryStore) Delete(key string) {
220-
m.mu.Lock()
221-
defer m.mu.Unlock()
222-
delete(m.store, key)
254+
s := m.getShard(key)
255+
s.mu.Lock()
256+
defer s.mu.Unlock()
257+
delete(s.store, key)
223258
}
224259

225260
// SetMulti stores multiple key-value pairs in the cache.
226-
// This is more efficient than calling Set multiple times as it acquires the lock only once.
261+
// This is more efficient than calling Set multiple times as it groups keys by shard.
227262
// All items will have the same expiration duration.
228263
func (m *MemoryStore) SetMulti(items map[string][]byte, duration time.Duration) error {
229-
m.mu.Lock()
230-
defer m.mu.Unlock()
231-
264+
// Group items by shard
265+
shardItems := make(map[*shard]map[string]item)
232266
expiresAt := time.Now().Add(duration)
267+
233268
for key, value := range items {
234-
m.store[key] = item{
269+
s := m.getShard(key)
270+
if _, ok := shardItems[s]; !ok {
271+
shardItems[s] = make(map[string]item)
272+
}
273+
shardItems[s][key] = item{
235274
value: value,
236275
expiresAt: expiresAt,
237276
}
238277
}
278+
279+
// Apply updates per shard
280+
for s, items := range shardItems {
281+
s.mu.Lock()
282+
for k, v := range items {
283+
s.store[k] = v
284+
}
285+
s.mu.Unlock()
286+
}
239287
return nil
240288
}
241289

242290
// GetMulti retrieves multiple values from the cache.
243291
// It returns a map of found items. Keys that don't exist or are expired are omitted.
244292
func (m *MemoryStore) GetMulti(keys []string) map[string][]byte {
245-
m.mu.RLock()
246-
defer m.mu.RUnlock()
247-
248293
result := make(map[string][]byte)
249294
now := time.Now()
250295

296+
// Group keys by shard
297+
shardKeys := make(map[*shard][]string)
251298
for _, key := range keys {
252-
it, exists := m.store[key]
253-
if exists && !now.After(it.expiresAt) {
254-
result[key] = it.value
255-
atomic.AddInt64(&m.hits, 1)
256-
} else {
257-
atomic.AddInt64(&m.misses, 1)
299+
s := m.getShard(key)
300+
shardKeys[s] = append(shardKeys[s], key)
301+
}
302+
303+
// Retrieve from each shard
304+
for s, keys := range shardKeys {
305+
s.mu.RLock()
306+
for _, key := range keys {
307+
it, exists := s.store[key]
308+
if exists && !now.After(it.expiresAt) {
309+
result[key] = it.value
310+
atomic.AddInt64(&m.hits, 1)
311+
} else {
312+
atomic.AddInt64(&m.misses, 1)
313+
}
258314
}
315+
s.mu.RUnlock()
259316
}
260317

261318
return result
@@ -264,9 +321,12 @@ func (m *MemoryStore) GetMulti(keys []string) map[string][]byte {
264321
// GetMetrics returns the current statistics of the MemoryStore.
265322
// It returns a copy of the metrics to ensure thread safety.
266323
func (m *MemoryStore) GetMetrics() StoreMetrics {
267-
m.mu.RLock()
268-
itemCount := len(m.store)
269-
m.mu.RUnlock()
324+
itemCount := 0
325+
for _, s := range m.shards {
326+
s.mu.RLock()
327+
itemCount += len(s.store)
328+
s.mu.RUnlock()
329+
}
270330

271331
return StoreMetrics{
272332
Items: itemCount,

0 commit comments

Comments
 (0)