Skip to content

Commit ff9f23e

Browse files
behinddwalls and claude committed
feat(queue/sql): add factory and documentation
Add Factory implementation (sql.go) for SQL queue that provides Publisher and Subscriber instances with proper lifecycle management. Renamed NewFactory to NewQueue for clarity.

Add comprehensive unit tests (sql_test.go) for factory covering:
- Success cases with all params and nil metrics scope
- Error cases: nil DB, nil Logger, invalid config, ping failure
- Publisher and Subscriber singleton behavior
- Close lifecycle and idempotency (multiple errors combined)
- Full integration test

Tests use table-driven approach for consistency.

Add concise README documenting SQL queue implementation, usage patterns, and configuration options. Removed verbose metric details and examples.

Fixed critical issues:
- Close() now releases lock before calling subscriber/publisher Close() to prevent deadlocks
- Close() combines all errors instead of only returning first error

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4c8da50 commit ff9f23e

5 files changed

Lines changed: 601 additions & 1 deletion

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"offset_store.go",
1212
"partition_lease_store.go",
1313
"publisher.go",
14+
"sql.go",
1415
"stores.go",
1516
"subscriber.go",
1617
"validation.go",
@@ -34,6 +35,7 @@ go_test(
3435
"offset_store_test.go",
3536
"partition_lease_store_test.go",
3637
"publisher_test.go",
38+
"sql_test.go",
3739
"subscriber_test.go",
3840
],
3941
embed = [":sql"],

extensions/queue/sql/README.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# SQL Queue Implementation
2+
3+
MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery.
4+
5+
## Key Features
6+
7+
- **Partition leasing** - Workers coordinate via database leases with automatic failover
8+
- **Visibility timeout** - Messages retry automatically if a worker crashes
9+
- **At-least-once delivery** - Offset tracking for crash recovery
10+
- **Dead letter queue** - Failed messages moved to DLQ after max retries
11+
12+
## Quick Start
13+
14+
```go
15+
import (
16+
"database/sql"
17+
_ "github.com/go-sql-driver/mysql"
18+
queueSQL "github.com/uber/submitqueue/extensions/queue/sql"
19+
"github.com/uber/submitqueue/entities/queue"
20+
)
21+
22+
// Setup
23+
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db")
24+
q, _ := queueSQL.NewQueue(queueSQL.Params{
25+
DB: db,
26+
Logger: logger,
27+
Config: queueSQL.DefaultConfig("orchestrator", "worker-1"),
28+
})
29+
defer q.Close()
30+
31+
// Publish
32+
msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`))
33+
msg.PartitionKey = "repo-123" // Required for ordering
34+
q.Publisher().Publish(ctx, "merge_events", msg)
35+
36+
// Subscribe
37+
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events")
38+
for delivery := range deliveryCh {
39+
if err := process(delivery.Message()); err != nil {
40+
delivery.Nack(ctx, 0) // Retry
41+
continue
42+
}
43+
delivery.Ack(ctx)
44+
}
45+
```
46+
47+
## Configuration
48+
49+
```go
50+
config := queueSQL.DefaultConfig("consumer-group", "worker-id")
51+
config.PollInterval = 50 * time.Millisecond // Poll frequency
52+
config.BatchSize = 20 // Messages per poll
53+
config.VisibilityTimeout = 60 * time.Second // Retry delay
54+
config.Retry.MaxAttempts = 3 // Max retries before DLQ
55+
```
56+
57+
## How It Works
58+
59+
**Partition Leasing:**
60+
1. Workers discover partitions from messages table
61+
2. Workers acquire leases (one worker per partition)
62+
3. Stale leases can be stolen by other workers
63+
64+
**Message Flow:**
65+
1. Fetch visible messages (invisible_until <= now)
66+
2. Process message
67+
3. Ack: DELETE message, UPDATE offset
68+
4. Nack: Message becomes visible after timeout
69+
5. If retry_count >= MaxAttempts: Move to DLQ
70+
71+
**Crash Recovery:**
72+
- Messages become visible after visibility timeout
73+
- Other workers steal stale leases
74+
- Resume from last acked offset
75+
76+
## Partition Ordering
77+
78+
Messages with the same `PartitionKey` are processed in order by a single worker.
79+
80+
## Distributed Processing
81+
82+
Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently.

extensions/queue/sql/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
// Config holds configuration for SQL-based queue.
9-
// DB connection, logger, and metrics are passed separately to NewFactory.
9+
// DB connection, logger, and metrics are passed separately to NewQueue.
1010
type Config struct {
1111
// ConsumerGroup identifies this consumer for offset tracking (required)
1212
ConsumerGroup string

extensions/queue/sql/sql.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package sql
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/uber-go/tally/v4"
9+
"go.uber.org/zap"
10+
11+
"github.com/uber/submitqueue/extensions/queue"
12+
)
13+
14+
type factory struct {
15+
db *sql.DB
16+
config Config
17+
logger *zap.SugaredLogger
18+
metrics tally.Scope
19+
messageStore messageStore
20+
offsetStore offsetStore
21+
leaseStore partitionLeaseStore
22+
publisher queue.Publisher
23+
subscriber queue.Subscriber
24+
mu sync.RWMutex
25+
closed bool
26+
}
27+
28+
// Params holds dependencies for creating a SQL queue factory
29+
type Params struct {
30+
// DB is the database connection (required)
31+
DB *sql.DB
32+
33+
// Logger for debugging and observability (required)
34+
Logger *zap.Logger
35+
36+
// MetricsScope for metrics collection (optional, uses noop if nil)
37+
MetricsScope tally.Scope
38+
39+
// Config holds queue configuration
40+
Config Config
41+
}
42+
43+
// NewQueue creates a new SQL-based queue factory
44+
func NewQueue(params Params) (queue.Queue, error) {
45+
if params.DB == nil {
46+
return nil, fmt.Errorf("DB is required")
47+
}
48+
if params.Logger == nil {
49+
return nil, fmt.Errorf("Logger is required")
50+
}
51+
52+
if err := params.Config.Validate(); err != nil {
53+
return nil, fmt.Errorf("invalid config: %w", err)
54+
}
55+
56+
// Use noop scope if not provided
57+
metricsScope := params.MetricsScope
58+
if metricsScope == nil {
59+
metricsScope = tally.NoopScope
60+
}
61+
62+
// Test connection
63+
if err := params.DB.Ping(); err != nil {
64+
return nil, fmt.Errorf("failed to ping database: %w", err)
65+
}
66+
67+
logger := params.Logger.Sugar().Named("queue.sql")
68+
logger.Infow("created SQL queue factory",
69+
"consumer_group", params.Config.ConsumerGroup,
70+
"worker_id", params.Config.WorkerID,
71+
"poll_interval", params.Config.PollInterval,
72+
"batch_size", params.Config.BatchSize,
73+
)
74+
75+
// Create stores
76+
messageStore := newMessageStore(params.DB, params.Config, params.Logger, metricsScope)
77+
offsetStore := newOffsetStore(params.DB, params.Config, params.Logger, metricsScope)
78+
leaseStore := newPartitionLeaseStore(params.DB, params.Config, params.Logger, metricsScope)
79+
80+
f := &factory{
81+
db: params.DB,
82+
config: params.Config,
83+
logger: logger,
84+
metrics: metricsScope.SubScope("queue"),
85+
messageStore: messageStore,
86+
offsetStore: offsetStore,
87+
leaseStore: leaseStore,
88+
}
89+
90+
return f, nil
91+
}
92+
93+
// Publisher returns a Publisher instance (singleton)
94+
func (f *factory) Publisher() queue.Publisher {
95+
f.mu.Lock()
96+
defer f.mu.Unlock()
97+
98+
if f.publisher == nil {
99+
f.publisher = &publisher{
100+
config: f.config,
101+
logger: f.logger.Named("publisher"),
102+
metrics: f.metrics.SubScope("publisher"),
103+
messageStore: f.messageStore,
104+
}
105+
}
106+
107+
return f.publisher
108+
}
109+
110+
// Subscriber returns a Subscriber instance (singleton)
111+
func (f *factory) Subscriber() queue.Subscriber {
112+
f.mu.Lock()
113+
defer f.mu.Unlock()
114+
115+
if f.subscriber == nil {
116+
f.subscriber = NewSubscriber(
117+
f.config,
118+
f.logger.Named("subscriber"),
119+
f.metrics.SubScope("subscriber"),
120+
f.messageStore,
121+
f.offsetStore,
122+
f.leaseStore,
123+
)
124+
}
125+
126+
return f.subscriber
127+
}
128+
129+
// Close shuts down the factory and all associated resources
130+
func (f *factory) Close() error {
131+
// Mark as closed and grab references under lock
132+
f.mu.Lock()
133+
if f.closed {
134+
f.mu.Unlock()
135+
return nil
136+
}
137+
f.closed = true
138+
sub := f.subscriber
139+
pub := f.publisher
140+
f.mu.Unlock()
141+
142+
// Close subscriber and publisher outside the lock to avoid deadlocks
143+
var errs []error
144+
145+
if sub != nil {
146+
if err := sub.Close(); err != nil {
147+
errs = append(errs, fmt.Errorf("subscriber close failed: %w", err))
148+
}
149+
}
150+
151+
if pub != nil {
152+
if err := pub.Close(); err != nil {
153+
errs = append(errs, fmt.Errorf("publisher close failed: %w", err))
154+
}
155+
}
156+
157+
// Return all errors if any occurred
158+
if len(errs) == 0 {
159+
return nil
160+
}
161+
162+
// Combine all errors into a single descriptive message
163+
errMsg := "multiple close errors: "
164+
for i, err := range errs {
165+
if i > 0 {
166+
errMsg += "; "
167+
}
168+
errMsg += err.Error()
169+
}
170+
return fmt.Errorf(errMsg)
171+
}

0 commit comments

Comments
 (0)