Skip to content

Commit 8d4d338

Browse files
committed
feat(queue/sql): add subscriber with partition leasing
## Why? Implement the Subscriber interface with distributed partition leasing to enable reliable message consumption across multiple workers. ## What? - Subscriber with automatic partition discovery and lease acquisition - Per-partition polling with ack/nack support - DLQ handling for messages exceeding retry limits - Lease renewal and graceful shutdown - Fast test execution (~4s) with comprehensive coverage ## Test Plan - Message delivery and ordering verified - Ack/nack operations work correctly - DLQ handling tested (max retries exceeded) - Multi-partition and multi-consumer scenarios validated - Lease management (acquire, renew, release) tested
1 parent 5925535 commit 8d4d338

3 files changed

Lines changed: 914 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"offset_store.go",
1010
"partition_lease_store.go",
1111
"publisher.go",
12+
"subscriber.go",
1213
"test_helpers.go",
1314
],
1415
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
@@ -36,6 +37,7 @@ go_test(
3637
"offset_store_test.go",
3738
"partition_lease_store_test.go",
3839
"publisher_test.go",
40+
"subscriber_test.go",
3941
],
4042
data = ["//schema/queue/mysql:schema.sql"],
4143
embed = [":sql"],

extensions/queue/sql/subscriber.go

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
package sql
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/uber-go/tally/v4"
10+
"go.uber.org/zap"
11+
12+
"github.com/uber/submitqueue/entities/queue"
13+
)
14+
15+
type subscriber struct {
16+
config Config
17+
logger *zap.SugaredLogger
18+
metrics tally.Scope
19+
messageRepo MessageStore
20+
offsetRepo OffsetStore
21+
leaseRepo PartitionLeaseStore
22+
mu sync.RWMutex
23+
closed bool
24+
25+
// Active subscriptions
26+
subscriptions map[string]*subscription
27+
subMu sync.Mutex
28+
}
29+
30+
type subscription struct {
31+
topic string
32+
deliveryCh chan *queue.Delivery
33+
cancelFunc context.CancelFunc
34+
wg sync.WaitGroup
35+
}
36+
37+
func newSubscriber(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageRepo MessageStore, offsetRepo OffsetStore, leaseRepo PartitionLeaseStore) *subscriber {
38+
logger.Infow("created subscriber",
39+
"consumer_group", config.ConsumerGroup,
40+
"worker_id", config.WorkerID,
41+
"poll_interval", config.PollInterval,
42+
"batch_size", config.BatchSize,
43+
"max_retry_attempts", config.Retry.MaxAttempts,
44+
"lease_renewal_interval", config.LeaseRenewalInterval,
45+
)
46+
47+
return &subscriber{
48+
config: config,
49+
logger: logger,
50+
metrics: metrics,
51+
messageRepo: messageRepo,
52+
offsetRepo: offsetRepo,
53+
leaseRepo: leaseRepo,
54+
subscriptions: make(map[string]*subscription),
55+
}
56+
}
57+
58+
// Subscribe starts consuming messages from the specified topic
59+
func (s *subscriber) Subscribe(ctx context.Context, topic string) (<-chan *queue.Delivery, error) {
60+
s.mu.RLock()
61+
if s.closed {
62+
s.mu.RUnlock()
63+
s.logger.Errorw("subscribe failed: subscriber is closed", "topic", topic)
64+
return nil, fmt.Errorf("subscriber is closed")
65+
}
66+
s.mu.RUnlock()
67+
68+
// Validate topic name
69+
if err := validateTopicName(topic); err != nil {
70+
s.logger.Errorw("subscribe failed: invalid topic name", "topic", topic, "error", err)
71+
return nil, err
72+
}
73+
74+
s.subMu.Lock()
75+
defer s.subMu.Unlock()
76+
77+
// Check if already subscribed
78+
if sub, exists := s.subscriptions[topic]; exists {
79+
s.logger.Debugw("reusing existing subscription", "topic", topic)
80+
return sub.deliveryCh, nil
81+
}
82+
83+
s.logger.Infow("creating new subscription", "topic", topic)
84+
85+
// Create new subscription
86+
subCtx, cancel := context.WithCancel(ctx)
87+
sub := &subscription{
88+
topic: topic,
89+
deliveryCh: make(chan *queue.Delivery),
90+
cancelFunc: cancel,
91+
}
92+
93+
s.subscriptions[topic] = sub
94+
95+
// Track active subscription
96+
s.metrics.Tagged(map[string]string{"topic": topic}).Gauge("active_subscriptions").Update(1)
97+
98+
// Start partition leasing and polling goroutine
99+
sub.wg.Add(1)
100+
go s.managePartitions(subCtx, sub)
101+
102+
s.logger.Infow("subscription created", "topic", topic)
103+
return sub.deliveryCh, nil
104+
}
105+
106+
// managePartitions discovers partitions, acquires leases, and polls messages
107+
func (s *subscriber) managePartitions(ctx context.Context, sub *subscription) {
108+
defer sub.wg.Done()
109+
defer close(sub.deliveryCh)
110+
111+
pollTicker := time.NewTicker(s.config.PollInterval)
112+
defer pollTicker.Stop()
113+
114+
leaseTicker := time.NewTicker(s.config.LeaseRenewalInterval)
115+
defer leaseTicker.Stop()
116+
117+
for {
118+
select {
119+
case <-ctx.Done():
120+
// Release all leases on shutdown
121+
s.releaseAllLeases(context.Background(), sub.topic)
122+
return
123+
124+
case <-leaseTicker.C:
125+
// Renew existing leases
126+
s.renewLeases(ctx, sub.topic)
127+
128+
case <-pollTicker.C:
129+
// Fetch and deliver messages from leased partitions
130+
s.pollLeasedPartitions(ctx, sub)
131+
}
132+
}
133+
}
134+
135+
// renewLeases renews leases for all partitions owned by this worker
136+
func (s *subscriber) renewLeases(ctx context.Context, topic string) {
137+
leasedPartitions, err := s.leaseRepo.GetLeasedPartitions(ctx, topic)
138+
if err != nil {
139+
return
140+
}
141+
142+
for _, partitionKey := range leasedPartitions {
143+
s.leaseRepo.RenewLease(ctx, topic, partitionKey)
144+
}
145+
}
146+
147+
// releaseAllLeases releases all leases for a topic
148+
func (s *subscriber) releaseAllLeases(ctx context.Context, topic string) {
149+
leasedPartitions, err := s.leaseRepo.GetLeasedPartitions(ctx, topic)
150+
if err != nil {
151+
return
152+
}
153+
154+
for _, partitionKey := range leasedPartitions {
155+
s.leaseRepo.ReleaseLease(ctx, topic, partitionKey)
156+
}
157+
}
158+
159+
// pollLeasedPartitions fetches and delivers messages from all leased partitions
160+
func (s *subscriber) pollLeasedPartitions(ctx context.Context, sub *subscription) {
161+
// Discover and try to acquire leases for new partitions
162+
acquiredCount, err := s.leaseRepo.DiscoverAndAcquirePartitions(ctx, sub.topic)
163+
if err == nil && acquiredCount > 0 {
164+
s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("leases_acquired").Inc(int64(acquiredCount))
165+
}
166+
167+
// Get currently leased partitions
168+
leasedPartitions, err := s.leaseRepo.GetLeasedPartitions(ctx, sub.topic)
169+
if err != nil {
170+
s.logger.Errorw("failed to get leased partitions", "topic", sub.topic, "error", err)
171+
return
172+
}
173+
174+
// Poll each leased partition
175+
for _, partitionKey := range leasedPartitions {
176+
s.fetchAndDeliverPartition(ctx, sub, partitionKey)
177+
}
178+
}
179+
180+
// fetchAndDeliverPartition fetches messages from a specific partition and delivers them
181+
func (s *subscriber) fetchAndDeliverPartition(ctx context.Context, sub *subscription, partitionKey string) {
182+
start := time.Now()
183+
184+
// Initialize offset for this partition if needed
185+
if err := s.offsetRepo.Initialize(ctx, sub.topic, partitionKey); err != nil {
186+
s.logger.Errorw("failed to initialize offset", "topic", sub.topic, "partition_key", partitionKey, "error", err)
187+
return
188+
}
189+
190+
// Get current offset for this partition
191+
currentOffset, err := s.offsetRepo.GetAckedOffset(ctx, sub.topic, partitionKey)
192+
if err != nil {
193+
s.logger.Errorw("failed to get current offset", "topic", sub.topic, "partition_key", partitionKey, "error", err)
194+
return
195+
}
196+
197+
// Fetch messages for this partition
198+
rows, err := s.messageRepo.FetchByOffset(ctx, sub.topic, partitionKey, currentOffset, s.config.BatchSize)
199+
if err != nil {
200+
return
201+
}
202+
203+
messageCount := 0
204+
for _, row := range rows {
205+
// Check if message has exceeded retry limit (persistent retry_count from DB)
206+
if row.RetryCount >= s.config.Retry.MaxAttempts {
207+
s.logger.Warnw("message exceeded retry limit",
208+
"topic", sub.topic,
209+
"partition_key", partitionKey,
210+
"message_id", row.ID,
211+
"retry_count", row.RetryCount,
212+
)
213+
214+
// Move to DLQ if enabled
215+
if s.config.DLQ.Enabled {
216+
if err := s.messageRepo.MoveToDLQ(ctx, sub.topic, row.ID, row.RetryCount, "exceeded retry limit"); err != nil {
217+
s.logger.Errorw("failed to move message to DLQ",
218+
"topic", sub.topic,
219+
"message_id", row.ID,
220+
"error", err,
221+
)
222+
} else {
223+
s.logger.Infow("moved message to DLQ",
224+
"topic", sub.topic,
225+
"message_id", row.ID,
226+
"retry_count", row.RetryCount,
227+
)
228+
s.metrics.Tagged(map[string]string{"topic": sub.topic}).Counter("messages_moved_to_dlq").Inc(1)
229+
230+
// Update offset since message is now processed (moved to DLQ)
231+
if err := s.offsetRepo.UpdateAckedOffset(ctx, sub.topic, partitionKey, row.Offset); err != nil {
232+
s.logger.Errorw("failed to update offset after DLQ move",
233+
"topic", sub.topic,
234+
"partition_key", partitionKey,
235+
"offset", row.Offset,
236+
"error", err,
237+
)
238+
}
239+
}
240+
}
241+
continue
242+
}
243+
244+
// Create message
245+
msg := queue.NewMessage(row.ID, row.Payload)
246+
msg.Metadata = row.Metadata
247+
msg.PartitionKey = row.PartitionKey
248+
msg.PublishedAt = row.PublishedAt
249+
250+
// Calculate message age for metrics
251+
messageAge := time.Since(row.PublishedAt)
252+
s.metrics.Tagged(map[string]string{"topic": sub.topic}).Timer("message_age").Record(messageAge)
253+
254+
// Create delivery with ack/nack handlers
255+
delivery := queue.NewDelivery(
256+
msg,
257+
s.createAckFunc(sub.topic, partitionKey, row.Offset, msg.ID),
258+
s.createNackFunc(sub.topic, partitionKey, msg.ID, row.Offset),
259+
s.createExtendVisibilityTimeoutFunc(sub.topic, msg.ID),
260+
)
261+
262+
// Deliver message
263+
select {
264+
case sub.deliveryCh <- delivery:
265+
messageCount++
266+
case <-ctx.Done():
267+
return
268+
}
269+
}
270+
271+
// Record metrics
272+
if messageCount > 0 {
273+
elapsed := time.Since(start)
274+
tags := map[string]string{"topic": sub.topic, "partition_key": partitionKey}
275+
s.metrics.Tagged(tags).Counter("messages_received").Inc(int64(messageCount))
276+
s.metrics.Tagged(map[string]string{"topic": sub.topic}).Timer("poll_latency").Record(elapsed)
277+
278+
s.logger.Debugw("delivered messages",
279+
"topic", sub.topic,
280+
"partition_key", partitionKey,
281+
"count", messageCount,
282+
"duration_ms", elapsed.Milliseconds(),
283+
)
284+
}
285+
}
286+
287+
// createAckFunc creates an ack function for a message
288+
func (s *subscriber) createAckFunc(topic string, partitionKey string, offset int64, messageID string) func(ctx context.Context) error {
289+
return func(ctx context.Context) error {
290+
if err := s.offsetRepo.AckMessage(ctx, topic, partitionKey, messageID, offset, s.messageRepo); err != nil {
291+
return err
292+
}
293+
294+
s.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_acked").Inc(1)
295+
return nil
296+
}
297+
}
298+
299+
// createNackFunc creates a nack function for a message with DLQ support
300+
// Note: Retry tracking is now persistent via retry_count in database
301+
func (s *subscriber) createNackFunc(topic string, partitionKey string, messageID string, offset int64) func(ctx context.Context, requeueAfter time.Duration) error {
302+
return func(ctx context.Context, requeueAfter time.Duration) error {
303+
// Set visibility timeout to make message visible after requeueAfter duration
304+
if err := s.messageRepo.SetVisibilityTimeout(ctx, topic, messageID, requeueAfter); err != nil {
305+
s.logger.Errorw("failed to set visibility timeout for nack",
306+
"topic", topic,
307+
"partition_key", partitionKey,
308+
"message_id", messageID,
309+
"error", err,
310+
)
311+
return err
312+
}
313+
314+
s.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_nacked").Inc(1)
315+
316+
s.logger.Infow("message nacked",
317+
"topic", topic,
318+
"partition_key", partitionKey,
319+
"message_id", messageID,
320+
"requeue_after", requeueAfter,
321+
)
322+
323+
return nil
324+
}
325+
}
326+
327+
// createExtendVisibilityTimeoutFunc creates a function to extend visibility timeout for a message
328+
func (s *subscriber) createExtendVisibilityTimeoutFunc(topic string, messageID string) func(ctx context.Context, duration time.Duration) error {
329+
return func(ctx context.Context, duration time.Duration) error {
330+
if err := s.messageRepo.SetVisibilityTimeout(ctx, topic, messageID, duration); err != nil {
331+
return err
332+
}
333+
334+
s.metrics.Tagged(map[string]string{"topic": topic}).Counter("visibility_extended").Inc(1)
335+
return nil
336+
}
337+
}
338+
339+
// Close gracefully shuts down the subscriber
340+
func (s *subscriber) Close() error {
341+
s.mu.Lock()
342+
defer s.mu.Unlock()
343+
344+
if s.closed {
345+
return nil
346+
}
347+
348+
s.logger.Info("closing subscriber")
349+
350+
s.subMu.Lock()
351+
defer s.subMu.Unlock()
352+
353+
// Cancel all subscriptions
354+
for topic, sub := range s.subscriptions {
355+
s.logger.Debugw("closing subscription", "topic", topic)
356+
sub.cancelFunc()
357+
sub.wg.Wait()
358+
359+
// Update metrics
360+
s.metrics.Tagged(map[string]string{"topic": topic}).Gauge("active_subscriptions").Update(0)
361+
}
362+
363+
s.subscriptions = make(map[string]*subscription)
364+
365+
s.closed = true
366+
367+
s.logger.Info("subscriber closed")
368+
return nil
369+
}

0 commit comments

Comments
 (0)