-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache.go
More file actions
701 lines (609 loc) · 18.5 KB
/
cache.go
File metadata and controls
701 lines (609 loc) · 18.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
// Copyright (C) 2025 SquareCows
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package pages_server
import (
"bufio"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
// Cache defines the interface for caching file content.
type Cache interface {
Get(key string) ([]byte, bool)
Set(key string, value []byte)
Delete(key string)
DeleteByPrefix(prefix string)
Clear()
}
// MemoryCache implements an in-memory cache with TTL support.
type MemoryCache struct {
mu sync.RWMutex
items map[string]*cacheItem
ttl time.Duration
janitor *janitor
}
// cacheItem represents a cached item with expiration.
type cacheItem struct {
value []byte
expiration int64
}
// janitor periodically cleans up expired items.
type janitor struct {
interval time.Duration
stop chan bool
}
// NewMemoryCache creates a new in-memory cache.
func NewMemoryCache(ttlSeconds int) *MemoryCache {
ttl := time.Duration(ttlSeconds) * time.Second
mc := &MemoryCache{
items: make(map[string]*cacheItem),
ttl: ttl,
}
// Start janitor for cleanup only if TTL is positive
if ttl > 0 {
interval := ttl / 2
if interval < 1*time.Second {
interval = 1 * time.Second // Minimum interval
}
mc.janitor = &janitor{
interval: interval,
stop: make(chan bool),
}
go mc.janitor.run(mc)
}
return mc
}
// Get retrieves a value from the cache.
func (mc *MemoryCache) Get(key string) ([]byte, bool) {
mc.mu.RLock()
defer mc.mu.RUnlock()
item, found := mc.items[key]
if !found {
return nil, false
}
// Check if item has expired (expiration = -1 means never expires)
if item.expiration != -1 && time.Now().UnixNano() > item.expiration {
return nil, false
}
return item.value, true
}
// Set stores a value in the cache.
func (mc *MemoryCache) Set(key string, value []byte) {
mc.mu.Lock()
defer mc.mu.Unlock()
var expiration int64
if mc.ttl <= 0 {
// TTL = 0 or negative means never expire
expiration = -1
} else {
expiration = time.Now().Add(mc.ttl).UnixNano()
}
mc.items[key] = &cacheItem{
value: value,
expiration: expiration,
}
}
// Delete removes a value from the cache.
func (mc *MemoryCache) Delete(key string) {
mc.mu.Lock()
defer mc.mu.Unlock()
delete(mc.items, key)
}
// DeleteByPrefix removes all items from the cache whose keys start with the given prefix.
func (mc *MemoryCache) DeleteByPrefix(prefix string) {
mc.mu.Lock()
defer mc.mu.Unlock()
// Iterate through all items and delete those with matching prefix
for key := range mc.items {
if strings.HasPrefix(key, prefix) {
delete(mc.items, key)
}
}
}
// Clear removes all items from the cache.
func (mc *MemoryCache) Clear() {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.items = make(map[string]*cacheItem)
}
// deleteExpired removes expired items from the cache.
func (mc *MemoryCache) deleteExpired() {
mc.mu.Lock()
defer mc.mu.Unlock()
now := time.Now().UnixNano()
for key, item := range mc.items {
if now > item.expiration {
delete(mc.items, key)
}
}
}
// run starts the janitor cleanup routine.
func (j *janitor) run(mc *MemoryCache) {
ticker := time.NewTicker(j.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
mc.deleteExpired()
case <-j.stop:
return
}
}
}
// Stop stops the janitor cleanup routine.
func (mc *MemoryCache) Stop() {
if mc.janitor != nil {
mc.janitor.stop <- true
}
}
// RedisCache implements a Redis-based cache using the RESP protocol.
// This implementation uses only Go standard library for Yaegi compatibility.
type RedisCache struct {
host string
port int
password string
ttl int
mu sync.RWMutex
fallback *MemoryCache // Used only if Redis connection fails
connPool chan net.Conn // Buffered channel for connection pooling
poolSize int // Size of the connection pool
maxConnections int // Maximum total connections allowed
connWaitTimeout time.Duration // Timeout for waiting for a connection
timeout time.Duration // Timeout for individual Redis operations
connSemaphore chan struct{} // Semaphore to limit total connections
}
// NewRedisCache creates a new Redis cache with connection pooling.
// Parameters:
// - host: Redis server hostname
// - port: Redis server port
// - password: Redis password (empty string for no authentication)
// - ttlSeconds: Default TTL for cached items in seconds
// - poolSize: Number of connections to maintain in the pool
// - maxConnections: Maximum total connections allowed (includes pool + active)
// - connWaitTimeoutSeconds: Timeout in seconds for waiting for an available connection
func NewRedisCache(host string, port int, password string, ttlSeconds int, poolSize int, maxConnections int, connWaitTimeoutSeconds int) *RedisCache {
rc := &RedisCache{
host: host,
port: port,
password: password,
ttl: ttlSeconds,
fallback: NewMemoryCache(ttlSeconds),
poolSize: poolSize,
maxConnections: maxConnections,
connWaitTimeout: time.Duration(connWaitTimeoutSeconds) * time.Second,
timeout: 5 * time.Second,
}
// Initialize connection pool with buffered channel
rc.connPool = make(chan net.Conn, rc.poolSize)
// Initialize semaphore to limit total connections
// The semaphore has maxConnections slots - each connection acquisition takes one slot
rc.connSemaphore = make(chan struct{}, rc.maxConnections)
// Pre-populate pool with connections
// If initial connections fail, we'll fall back to in-memory cache
for i := 0; i < rc.poolSize; i++ {
conn, err := rc.newConnection()
if err == nil {
rc.connPool <- conn
// Acquire semaphore slot for this pooled connection
rc.connSemaphore <- struct{}{}
}
}
return rc
}
// newConnection creates a new Redis connection and authenticates if password is set.
func (rc *RedisCache) newConnection() (net.Conn, error) {
// Connect to Redis server
// Use net.JoinHostPort for proper IPv6 support
addr := net.JoinHostPort(rc.host, strconv.Itoa(rc.port))
conn, err := net.DialTimeout("tcp", addr, rc.timeout)
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
// Authenticate if password is provided
if rc.password != "" {
err = rc.sendCommand(conn, "AUTH", rc.password)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to authenticate: %w", err)
}
// Read authentication response
_, err = rc.readResponse(conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("authentication failed: %w", err)
}
}
return conn, nil
}
// getConnection retrieves a connection from the pool or creates a new one.
// This implementation uses a semaphore to limit the total number of connections.
// If the maximum number of connections is reached, it blocks until a connection is available
// or the wait timeout is exceeded.
func (rc *RedisCache) getConnection() (net.Conn, error) {
// First, try to get a connection from the pool without blocking
select {
case conn := <-rc.connPool:
// Got a pooled connection - test it with PING
err := rc.sendCommand(conn, "PING")
if err != nil {
conn.Close()
// Connection is dead, release semaphore and try to create a new one
<-rc.connSemaphore
return rc.createNewConnectionWithSemaphore()
}
// Read PING response
_, err = rc.readResponse(conn)
if err != nil {
conn.Close()
// Connection is dead, release semaphore and try to create a new one
<-rc.connSemaphore
return rc.createNewConnectionWithSemaphore()
}
// Connection is healthy, return it
return conn, nil
default:
// Pool is empty, need to create a new connection
// Try to acquire semaphore with timeout
return rc.createNewConnectionWithSemaphore()
}
}
// createNewConnectionWithSemaphore creates a new connection while respecting the semaphore limit.
// It will block up to connWaitTimeout waiting for a semaphore slot.
func (rc *RedisCache) createNewConnectionWithSemaphore() (net.Conn, error) {
// Try to acquire semaphore slot with timeout
select {
case rc.connSemaphore <- struct{}{}:
// Successfully acquired semaphore slot, create connection
conn, err := rc.newConnection()
if err != nil {
// Failed to create connection, release semaphore slot
<-rc.connSemaphore
return nil, err
}
return conn, nil
case <-time.After(rc.connWaitTimeout):
// Timeout waiting for semaphore slot - all connections are in use
return nil, fmt.Errorf("connection wait timeout: all %d connections in use", rc.maxConnections)
}
}
// releaseConnection returns a connection to the pool or closes it if the pool is full.
// Always releases the semaphore slot to allow new connections to be created.
func (rc *RedisCache) releaseConnection(conn net.Conn) {
if conn == nil {
// No connection to release, but still release semaphore if it was acquired
// This handles cases where connection creation failed but semaphore was acquired
return
}
// Try to return connection to pool
select {
case rc.connPool <- conn:
// Connection successfully returned to pool
// Semaphore slot is still held by this pooled connection
default:
// Pool is full, close the connection and release semaphore
conn.Close()
<-rc.connSemaphore
}
}
// sendCommand sends a Redis command using RESP protocol.
// RESP format: *<number of arguments>\r\n$<length of argument 1>\r\n<argument 1>\r\n...
func (rc *RedisCache) sendCommand(conn net.Conn, args ...string) error {
// Set write deadline
conn.SetWriteDeadline(time.Now().Add(rc.timeout))
// Build RESP array
var cmd strings.Builder
cmd.WriteString(fmt.Sprintf("*%d\r\n", len(args)))
for _, arg := range args {
cmd.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg))
}
// Send command
_, err := conn.Write([]byte(cmd.String()))
return err
}
// readResponse reads a Redis response using RESP protocol.
// Returns the response as interface{} which can be:
// - string (for simple strings and bulk strings)
// - int64 (for integers)
// - error (for errors)
// - nil (for null bulk strings)
// - []interface{} (for arrays)
func (rc *RedisCache) readResponse(conn net.Conn) (interface{}, error) {
// Set read deadline
conn.SetReadDeadline(time.Now().Add(rc.timeout))
reader := bufio.NewReader(conn)
// Read first byte to determine response type
typeByte, err := reader.ReadByte()
if err != nil {
return nil, fmt.Errorf("failed to read response type: %w", err)
}
switch typeByte {
case '+': // Simple string
line, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read simple string: %w", err)
}
return strings.TrimSuffix(line, "\r\n"), nil
case '-': // Error
line, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read error: %w", err)
}
return nil, fmt.Errorf("redis error: %s", strings.TrimSuffix(line, "\r\n"))
case ':': // Integer
line, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read integer: %w", err)
}
val, err := strconv.ParseInt(strings.TrimSuffix(line, "\r\n"), 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse integer: %w", err)
}
return val, nil
case '$': // Bulk string
// Read length
line, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read bulk string length: %w", err)
}
length, err := strconv.Atoi(strings.TrimSuffix(line, "\r\n"))
if err != nil {
return nil, fmt.Errorf("failed to parse bulk string length: %w", err)
}
// -1 indicates null
if length == -1 {
return nil, nil
}
// Read the actual data
data := make([]byte, length+2) // +2 for \r\n
_, err = io.ReadFull(reader, data)
if err != nil {
return nil, fmt.Errorf("failed to read bulk string data: %w", err)
}
return data[:length], nil // Return without \r\n
case '*': // Array
// Read number of elements
line, err := reader.ReadString('\n')
if err != nil {
return nil, fmt.Errorf("failed to read array length: %w", err)
}
count, err := strconv.Atoi(strings.TrimSuffix(line, "\r\n"))
if err != nil {
return nil, fmt.Errorf("failed to parse array length: %w", err)
}
// Read each element recursively
result := make([]interface{}, count)
for i := 0; i < count; i++ {
elem, err := rc.readResponse(conn)
if err != nil {
return nil, fmt.Errorf("failed to read array element %d: %w", i, err)
}
result[i] = elem
}
return result, nil
default:
return nil, fmt.Errorf("unknown response type: %c", typeByte)
}
}
// Get retrieves a value from Redis cache.
func (rc *RedisCache) Get(key string) ([]byte, bool) {
conn, err := rc.getConnection()
if err != nil {
// Fall back to in-memory cache if Redis is unavailable
return rc.fallback.Get(key)
}
defer rc.releaseConnection(conn)
// Send GET command
err = rc.sendCommand(conn, "GET", key)
if err != nil {
return rc.fallback.Get(key)
}
// Read response
resp, err := rc.readResponse(conn)
if err != nil {
return rc.fallback.Get(key)
}
// Handle nil response (key not found)
if resp == nil {
return nil, false
}
// Convert response to []byte
if data, ok := resp.([]byte); ok {
return data, true
}
return nil, false
}
// Set stores a value in Redis cache with TTL.
func (rc *RedisCache) Set(key string, value []byte) {
rc.SetWithTTL(key, value, rc.ttl)
}
// SetWithTTL stores a value in Redis cache with a specific TTL in seconds.
// This allows storing values with different TTLs than the cache's default.
func (rc *RedisCache) SetWithTTL(key string, value []byte, ttlSeconds int) error {
conn, err := rc.getConnection()
if err != nil {
// Fall back to in-memory cache if Redis is unavailable
rc.fallback.Set(key, value)
return fmt.Errorf("failed to get Redis connection: %w", err)
}
defer rc.releaseConnection(conn)
// If TTL is 0 or negative, store without expiration (persistent)
// Otherwise use SETEX for TTL-based expiration
if ttlSeconds <= 0 {
// Send SET command (no expiration)
// SET key value
err = rc.sendCommand(conn, "SET", key, string(value))
if err != nil {
rc.fallback.Set(key, value)
return fmt.Errorf("failed to send SET command: %w", err)
}
} else {
// Send SETEX command (SET with expiration)
// SETEX key seconds value
err = rc.sendCommand(conn, "SETEX", key, strconv.Itoa(ttlSeconds), string(value))
if err != nil {
rc.fallback.Set(key, value)
return fmt.Errorf("failed to send SETEX command: %w", err)
}
}
// Read response (should be +OK)
_, err = rc.readResponse(conn)
if err != nil {
rc.fallback.Set(key, value)
return fmt.Errorf("failed to read response: %w", err)
}
// Also update fallback cache for consistency
rc.fallback.Set(key, value)
return nil
}
// Delete removes a value from Redis cache.
func (rc *RedisCache) Delete(key string) {
conn, err := rc.getConnection()
if err != nil {
// Fall back to in-memory cache if Redis is unavailable
rc.fallback.Delete(key)
return
}
defer rc.releaseConnection(conn)
// Send DEL command
err = rc.sendCommand(conn, "DEL", key)
if err != nil {
rc.fallback.Delete(key)
return
}
// Read response (returns number of keys deleted)
_, err = rc.readResponse(conn)
if err != nil {
rc.fallback.Delete(key)
return
}
// Also delete from fallback cache
rc.fallback.Delete(key)
}
// DeleteByPrefix removes all items from Redis cache whose keys start with the given prefix.
// Uses SCAN with MATCH pattern to avoid blocking the server (unlike KEYS).
func (rc *RedisCache) DeleteByPrefix(prefix string) {
conn, err := rc.getConnection()
if err != nil {
// Fall back to in-memory cache if Redis is unavailable
rc.fallback.DeleteByPrefix(prefix)
return
}
defer rc.releaseConnection(conn)
// Use SCAN to iterate through matching keys
// SCAN cursor MATCH pattern COUNT count
cursor := "0"
pattern := prefix + "*"
keysToDelete := []string{}
// Loop until cursor returns to 0
for {
// Send SCAN command
err = rc.sendCommand(conn, "SCAN", cursor, "MATCH", pattern, "COUNT", "100")
if err != nil {
rc.fallback.DeleteByPrefix(prefix)
return
}
// Read SCAN response: [cursor_string, [key1, key2, ...]]
resp, err := rc.readResponse(conn)
if err != nil {
rc.fallback.DeleteByPrefix(prefix)
return
}
// Parse response array
respArray, ok := resp.([]interface{})
if !ok || len(respArray) != 2 {
rc.fallback.DeleteByPrefix(prefix)
return
}
// Extract new cursor
cursorBytes, ok := respArray[0].([]byte)
if !ok {
rc.fallback.DeleteByPrefix(prefix)
return
}
cursor = string(cursorBytes)
// Extract keys array
keysArray, ok := respArray[1].([]interface{})
if !ok {
rc.fallback.DeleteByPrefix(prefix)
return
}
// Collect keys to delete
for _, keyInterface := range keysArray {
if keyBytes, ok := keyInterface.([]byte); ok {
keysToDelete = append(keysToDelete, string(keyBytes))
}
}
// If cursor is "0", we've completed the scan
if cursor == "0" {
break
}
}
// Delete all collected keys
for _, key := range keysToDelete {
err = rc.sendCommand(conn, "DEL", key)
if err != nil {
continue // Try to delete remaining keys even if one fails
}
// Read DEL response
_, err = rc.readResponse(conn)
if err != nil {
continue
}
}
// Also delete from fallback cache
rc.fallback.DeleteByPrefix(prefix)
}
// Clear removes all items from Redis cache.
// Note: This uses FLUSHDB which clears the entire database.
// In production, you may want to use key prefixes and delete by pattern instead.
func (rc *RedisCache) Clear() {
conn, err := rc.getConnection()
if err != nil {
// Fall back to in-memory cache if Redis is unavailable
rc.fallback.Clear()
return
}
defer rc.releaseConnection(conn)
// Send FLUSHDB command
err = rc.sendCommand(conn, "FLUSHDB")
if err != nil {
rc.fallback.Clear()
return
}
// Read response
_, err = rc.readResponse(conn)
if err != nil {
rc.fallback.Clear()
return
}
// Also clear fallback cache
rc.fallback.Clear()
}
// Close closes all connections in the pool.
func (rc *RedisCache) Close() {
close(rc.connPool)
for conn := range rc.connPool {
if conn != nil {
conn.Close()
}
}
if rc.fallback != nil {
rc.fallback.Stop()
}
}