Skip to content

Commit 0e6c4cc

Browse files
committed
feat(queue/sql): add factory and documentation
Add Factory implementation (sql.go) for SQL queue that provides Publisher and Subscriber instances with proper lifecycle management.
1 parent 4c8da50 commit 0e6c4cc

5 files changed

Lines changed: 493 additions & 1 deletion

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"offset_store.go",
1212
"partition_lease_store.go",
1313
"publisher.go",
14+
"sql.go",
1415
"stores.go",
1516
"subscriber.go",
1617
"validation.go",
@@ -34,6 +35,7 @@ go_test(
3435
"offset_store_test.go",
3536
"partition_lease_store_test.go",
3637
"publisher_test.go",
38+
"sql_test.go",
3739
"subscriber_test.go",
3840
],
3941
embed = [":sql"],

extensions/queue/sql/README.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# SQL Queue Implementation
2+
3+
MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery.
4+
5+
## Key Features
6+
7+
- **Partition leasing** - Workers coordinate via database leases with automatic failover
8+
- **Visibility timeout** - Messages retry automatically if worker crashes
9+
- **At-least-once delivery** - Offset tracking for crash recovery
10+
- **Dead letter queue** - Failed messages moved to DLQ after max retries
11+
12+
## Quick Start
13+
14+
```go
15+
import (
16+
"database/sql"
17+
_ "github.com/go-sql-driver/mysql"
18+
queueSQL "github.com/uber/submitqueue/extensions/queue/sql"
19+
"github.com/uber/submitqueue/entities/queue"
20+
)
21+
22+
// Setup
23+
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db")
24+
q, _ := queueSQL.NewQueue(queueSQL.Params{
25+
DB: db,
26+
Logger: logger,
27+
Config: queueSQL.DefaultConfig("orchestrator", "worker-1"),
28+
})
29+
defer q.Close()
30+
31+
// Publish
32+
msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`))
33+
msg.PartitionKey = "repo-123" // Required for ordering
34+
q.Publisher().Publish(ctx, "merge_events", msg)
35+
36+
// Subscribe
37+
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events")
38+
for delivery := range deliveryCh {
39+
if err := process(delivery.Message()); err != nil {
40+
delivery.Nack(ctx, 0) // Retry
41+
continue
42+
}
43+
delivery.Ack(ctx)
44+
}
45+
```
46+
47+
## Configuration
48+
49+
```go
50+
config := queueSQL.DefaultConfig("consumer-group", "worker-id")
51+
config.PollInterval = 50 * time.Millisecond // Poll frequency
52+
config.BatchSize = 20 // Messages per poll
53+
config.VisibilityTimeout = 60 * time.Second // Retry delay
54+
config.Retry.MaxAttempts = 3 // Max retries before DLQ
55+
```
56+
57+
## How It Works
58+
59+
**Partition Leasing:**
60+
1. Workers discover partitions from messages table
61+
2. Workers acquire leases (one worker per partition)
62+
3. Stale leases can be stolen by other workers
63+
64+
**Message Flow:**
65+
1. Fetch visible messages (invisible_until <= now)
66+
2. Process message
67+
3. Ack: DELETE message, UPDATE offset
68+
4. Nack: Message becomes visible after timeout
69+
5. If retry_count >= MaxAttempts: Move to DLQ
70+
71+
**Crash Recovery:**
72+
- Messages become visible after visibility timeout
73+
- Other workers steal stale leases
74+
- Resume from last acked offset
75+
76+
## Partition Ordering
77+
78+
Messages with same `PartitionKey` are processed in order by a single worker.
79+
80+
## Distributed Processing
81+
82+
Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently.

extensions/queue/sql/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
// Config holds configuration for SQL-based queue.
9-
// DB connection, logger, and metrics are passed separately to NewFactory.
9+
// DB connection, logger, and metrics are passed separately to NewQueue.
1010
type Config struct {
1111
// ConsumerGroup identifies this consumer for offset tracking (required)
1212
ConsumerGroup string

extensions/queue/sql/sql.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package sql
2+
3+
import (
4+
"database/sql"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/uber-go/tally/v4"
9+
"go.uber.org/zap"
10+
11+
"github.com/uber/submitqueue/extensions/queue"
12+
)
13+
14+
type queueImpl struct {
15+
publisher queue.Publisher
16+
subscriber queue.Subscriber
17+
closed bool
18+
}
19+
20+
// Params holds dependencies for creating a SQL queue factory
21+
type Params struct {
22+
// DB is the database connection (required)
23+
DB *sql.DB
24+
25+
// Logger for debugging and observability (required)
26+
Logger *zap.Logger
27+
28+
// MetricsScope for metrics collection (required)
29+
MetricsScope tally.Scope
30+
31+
// Config holds queue configuration
32+
Config Config
33+
}
34+
35+
// NewQueue creates a new SQL-based queue factory
36+
func NewQueue(params Params) (queue.Queue, error) {
37+
if err := params.Config.Validate(); err != nil {
38+
return nil, fmt.Errorf("invalid config: %w", err)
39+
}
40+
41+
// Test connection
42+
if err := params.DB.Ping(); err != nil {
43+
return nil, fmt.Errorf("failed to ping database: %w", err)
44+
}
45+
46+
logger := params.Logger.Sugar().Named("queue.sql")
47+
logger.Infow("created SQL queue factory",
48+
"consumer_group", params.Config.ConsumerGroup,
49+
"worker_id", params.Config.WorkerID,
50+
"poll_interval", params.Config.PollInterval,
51+
"batch_size", params.Config.BatchSize,
52+
)
53+
54+
// Create stores
55+
messageStore := newMessageStore(params.DB, params.Config, params.Logger, params.MetricsScope)
56+
offsetStore := newOffsetStore(params.DB, params.Config, params.Logger, params.MetricsScope)
57+
leaseStore := newPartitionLeaseStore(params.DB, params.Config, params.Logger, params.MetricsScope)
58+
59+
queueMetrics := params.MetricsScope.SubScope("queue")
60+
61+
// Create publisher and subscriber
62+
publisher := NewPublisher(
63+
params.Config,
64+
logger.Named("publisher"),
65+
queueMetrics.SubScope("publisher"),
66+
messageStore,
67+
)
68+
69+
subscriber := NewSubscriber(
70+
params.Config,
71+
logger.Named("subscriber"),
72+
queueMetrics.SubScope("subscriber"),
73+
messageStore,
74+
offsetStore,
75+
leaseStore,
76+
)
77+
78+
return &queueImpl{
79+
publisher: publisher,
80+
subscriber: subscriber,
81+
closed: false,
82+
}, nil
83+
}
84+
85+
// Publisher returns a Publisher instance
86+
func (q *queueImpl) Publisher() queue.Publisher {
87+
return q.publisher
88+
}
89+
90+
// Subscriber returns a Subscriber instance
91+
func (q *queueImpl) Subscriber() queue.Subscriber {
92+
return q.subscriber
93+
}
94+
95+
// Close shuts down the factory and all associated resources
96+
func (q *queueImpl) Close() error {
97+
if q.closed {
98+
return nil
99+
}
100+
q.closed = true
101+
102+
// Close subscriber and publisher
103+
var errs []error
104+
105+
if err := q.subscriber.Close(); err != nil {
106+
errs = append(errs, fmt.Errorf("subscriber close failed: %w", err))
107+
}
108+
109+
if err := q.publisher.Close(); err != nil {
110+
errs = append(errs, fmt.Errorf("publisher close failed: %w", err))
111+
}
112+
113+
return errors.Join(errs...)
114+
}

0 commit comments

Comments
 (0)