-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemory.go
More file actions
153 lines (124 loc) · 3.28 KB
/
memory.go
File metadata and controls
153 lines (124 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package goqueue
import (
"context"
"sync"
)
// MemoryBackend is a queue backend that keeps every queue in process memory,
// using one buffered Go channel per queue name. It suits tests, local
// development, and applications that run as a single process.
//
// The zero value has nil maps; build instances with NewMemoryBackend.
type MemoryBackend struct {
	// mu is locked by the backend's methods before they touch the fields below.
	mu sync.RWMutex

	// queues maps a queue name to the channel that carries its envelopes.
	queues map[string]chan *Envelope
	// pendingMsgs maps a message ID to an envelope that has been published
	// but not yet acked or nacked.
	pendingMsgs map[string]*Envelope
	// closed is set once Close has run.
	closed bool
	// bufferSize is the capacity given to each newly created queue channel.
	bufferSize int
}
// MemoryOption is a functional option: a function that NewMemoryBackend
// applies to the MemoryBackend under construction to adjust its settings.
type MemoryOption func(*MemoryBackend)
// WithBufferSize returns an option that sets the capacity of the channel
// created for each queue. A size of zero or less is ignored, leaving the
// backend's current buffer size untouched.
func WithBufferSize(size int) MemoryOption {
	return func(backend *MemoryBackend) {
		if size <= 0 {
			return
		}
		backend.bufferSize = size
	}
}
// NewMemoryBackend builds an in-memory backend with empty queue and
// pending-message maps and a default per-queue buffer size of 100, then
// applies opts to it in the order they were given.
func NewMemoryBackend(opts ...MemoryOption) *MemoryBackend {
	const defaultBufferSize = 100

	backend := new(MemoryBackend)
	backend.queues = map[string]chan *Envelope{}
	backend.pendingMsgs = map[string]*Envelope{}
	backend.bufferSize = defaultBufferSize

	for i := range opts {
		opts[i](backend)
	}
	return backend
}
// Publish delivers envelope to the named queue, creating the queue's channel
// on first use, and records the envelope as pending under envelope.ID until
// it is acked or nacked.
//
// The send blocks while the queue's buffer is full and gives up when ctx is
// done; in that case the pending entry is removed and ctx.Err() is returned.
// Publish returns ErrQueueStopped if the backend is already closed, or if the
// queue's channel is closed by Close while the send is still in flight.
// envelope must not be nil.
func (b *MemoryBackend) Publish(ctx context.Context, queue string, envelope *Envelope) error {
	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return ErrQueueStopped
	}

	// Get or create the queue channel.
	queueChan, exists := b.queues[queue]
	if !exists {
		queueChan = make(chan *Envelope, b.bufferSize)
		b.queues[queue] = queueChan
	}

	// Register the envelope as pending before it becomes visible to
	// consumers, so an immediate Ack/Nack can find it.
	b.pendingMsgs[envelope.ID] = envelope
	b.mu.Unlock()

	// The send runs without the lock so a full queue cannot stall other
	// callers. That leaves a window in which Close may close queueChan; a
	// send on a closed channel panics, so the panic is recovered here and
	// reported as a stopped queue. Done is evaluated outside the closure so
	// that only the channel send is covered by the recover.
	done := ctx.Done()
	err := func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = ErrQueueStopped
			}
		}()
		select {
		case queueChan <- envelope:
			return nil
		case <-done:
			return ctx.Err()
		}
	}()

	if err != nil {
		// The envelope was never delivered; do not leave it pending.
		b.mu.Lock()
		delete(b.pendingMsgs, envelope.ID)
		b.mu.Unlock()
	}
	return err
}
// Subscribe returns the receive side of the channel backing the named queue,
// creating that channel with the configured buffer size if the queue does not
// exist yet. Every call for the same queue returns the same channel, so
// several subscribers share its messages rather than each getting a copy.
// ctx is accepted but not consulted. After the backend has been closed,
// Subscribe returns a nil channel and ErrQueueStopped.
func (b *MemoryBackend) Subscribe(ctx context.Context, queue string) (<-chan *Envelope, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return nil, ErrQueueStopped
	}

	if ch, ok := b.queues[queue]; ok {
		return ch, nil
	}

	ch := make(chan *Envelope, b.bufferSize)
	b.queues[queue] = ch
	return ch, nil
}
// Ack records that the message with the given ID was processed successfully
// by dropping it from the pending set. It returns ErrQueueStopped if the
// backend is closed and ErrMessageNotFound if no pending message has that ID.
// ctx is accepted but not consulted.
func (b *MemoryBackend) Ack(ctx context.Context, messageID string) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	// The closed check comes first so a stopped backend always reports
	// ErrQueueStopped, whether or not the ID is known.
	switch _, pending := b.pendingMsgs[messageID]; {
	case b.closed:
		return ErrQueueStopped
	case !pending:
		return ErrMessageNotFound
	}

	delete(b.pendingMsgs, messageID)
	return nil
}
// Nack records that processing of the message with the given ID failed. In
// this backend that only drops the message from the pending set; nothing is
// requeued here. It returns ErrQueueStopped if the backend is closed and
// ErrMessageNotFound if no pending message has that ID. ctx is accepted but
// not consulted.
func (b *MemoryBackend) Nack(ctx context.Context, messageID string) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return ErrQueueStopped
	}

	_, pending := b.pendingMsgs[messageID]
	if !pending {
		return ErrMessageNotFound
	}

	// Retrying is not done by this backend. NOTE(review): the original comment
	// says the worker pool owns retry logic; that code is not visible here.
	delete(b.pendingMsgs, messageID)
	return nil
}
// Close marks the backend as closed, closes every queue channel, and replaces
// the pending-message map with an empty one. Calling Close again is safe:
// later calls do nothing. It always returns nil.
func (b *MemoryBackend) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if !b.closed {
		b.closed = true

		// Closing each channel lets receivers drain what is buffered and
		// then observe the end of the stream.
		for name := range b.queues {
			close(b.queues[name])
		}

		// Drop every pending envelope.
		b.pendingMsgs = map[string]*Envelope{}
	}
	return nil
}