Skip to content

Commit fc19779

Browse files
committed
feat(queue/sql): add MySQL schema and configuration
## Why? Need database schema and configuration for SQL-based queue implementation to support distributed message processing. ## What? - MySQL schema with 4 tables: messages, offsets, partition leases, and DLQ - Configuration struct with validation for consumer groups, timeouts, and retry policies - Bazel build integration ## Test Plan - Config validation tests pass - Default config values are correct - Invalid configs are rejected appropriately
1 parent 0b4f26f commit fc19779

8 files changed

Lines changed: 426 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "sql",
5+
srcs = ["config.go"],
6+
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
7+
visibility = ["//visibility:public"],
8+
)
9+
10+
go_test(
11+
name = "sql_test",
12+
srcs = ["config_test.go"],
13+
embed = [":sql"],
14+
deps = [
15+
"@com_github_stretchr_testify//assert",
16+
"@com_github_stretchr_testify//require",
17+
],
18+
)

extensions/queue/sql/config.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package sql
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
const (
9+
// Fixed table names for single-table design
10+
MessagesTableName = "queue_messages"
11+
PartitionLeasesTableName = "queue_partition_leases"
12+
OffsetsTableName = "queue_offsets"
13+
DLQTableName = "queue_dlq"
14+
)
15+
16+
// Config holds configuration for SQL-based queue.
17+
// DB connection, logger, and metrics are passed separately to NewFactory.
18+
type Config struct {
19+
// ConsumerGroup identifies this consumer for offset tracking (required)
20+
ConsumerGroup string
21+
22+
// WorkerID uniquely identifies this worker instance (required for partition leases)
23+
// Example: hostname, pod name, UUID, etc.
24+
WorkerID string
25+
26+
// PollInterval is how often to poll for new messages
27+
PollInterval time.Duration
28+
29+
// BatchSize is the number of messages to fetch per poll
30+
BatchSize int
31+
32+
// VisibilityTimeout is how long a message is invisible after being fetched
33+
// If worker crashes or gets stuck, message becomes visible again after this duration
34+
VisibilityTimeout time.Duration
35+
36+
// LeaseRenewalInterval is how often to renew partition leases
37+
LeaseRenewalInterval time.Duration
38+
39+
// LeaseDuration is how long a lease is valid without renewal
40+
// Stale leases (not renewed within this duration) can be stolen by other workers
41+
LeaseDuration time.Duration
42+
43+
// Retry configuration for message retry
44+
Retry RetryConfig
45+
46+
// DLQ configuration
47+
DLQ DLQConfig
48+
}
49+
50+
// RetryConfig configures message retry behavior
51+
type RetryConfig struct {
52+
// MaxAttempts is the maximum number of processing attempts
53+
// After this many retries, message is moved to DLQ (if enabled)
54+
// This includes both visibility timeout retries and explicit Nack retries
55+
MaxAttempts int
56+
57+
// InitialBackoff is the initial backoff duration for explicit Nack retries
58+
InitialBackoff time.Duration
59+
60+
// MaxBackoff is the maximum backoff duration
61+
MaxBackoff time.Duration
62+
63+
// BackoffMultiplier is the multiplier for exponential backoff
64+
BackoffMultiplier float64
65+
}
66+
67+
// DLQConfig configures dead letter queue
68+
type DLQConfig struct {
69+
// Enabled enables dead letter queue
70+
Enabled bool
71+
}
72+
73+
// DefaultConfig returns a Config with sensible defaults
74+
func DefaultConfig(consumerGroup, workerID string) Config {
75+
return Config{
76+
ConsumerGroup: consumerGroup,
77+
WorkerID: workerID,
78+
PollInterval: 100 * time.Millisecond,
79+
BatchSize: 10,
80+
VisibilityTimeout: 60 * time.Second,
81+
LeaseRenewalInterval: 10 * time.Second,
82+
LeaseDuration: 30 * time.Second,
83+
Retry: RetryConfig{
84+
MaxAttempts: 3,
85+
InitialBackoff: 1 * time.Second,
86+
MaxBackoff: 30 * time.Second,
87+
BackoffMultiplier: 2.0,
88+
},
89+
DLQ: DLQConfig{
90+
Enabled: true,
91+
},
92+
}
93+
}
94+
95+
// Validate checks if the configuration is valid
96+
func (c *Config) Validate() error {
97+
if c.ConsumerGroup == "" {
98+
return fmt.Errorf("ConsumerGroup is required")
99+
}
100+
if c.WorkerID == "" {
101+
return fmt.Errorf("WorkerID is required")
102+
}
103+
if c.PollInterval <= 0 {
104+
return fmt.Errorf("PollInterval must be positive")
105+
}
106+
if c.BatchSize <= 0 {
107+
return fmt.Errorf("BatchSize must be positive")
108+
}
109+
if c.VisibilityTimeout <= 0 {
110+
return fmt.Errorf("VisibilityTimeout must be positive")
111+
}
112+
if c.LeaseRenewalInterval <= 0 {
113+
return fmt.Errorf("LeaseRenewalInterval must be positive")
114+
}
115+
if c.LeaseDuration <= 0 {
116+
return fmt.Errorf("LeaseDuration must be positive")
117+
}
118+
if c.LeaseRenewalInterval >= c.LeaseDuration {
119+
return fmt.Errorf("LeaseRenewalInterval must be less than LeaseDuration")
120+
}
121+
if c.Retry.MaxAttempts < 1 {
122+
return fmt.Errorf("Retry.MaxAttempts must be at least 1")
123+
}
124+
if c.Retry.InitialBackoff <= 0 {
125+
return fmt.Errorf("Retry.InitialBackoff must be positive")
126+
}
127+
if c.Retry.MaxBackoff <= 0 {
128+
return fmt.Errorf("Retry.MaxBackoff must be positive")
129+
}
130+
if c.Retry.BackoffMultiplier < 1.0 {
131+
return fmt.Errorf("Retry.BackoffMultiplier must be >= 1.0")
132+
}
133+
return nil
134+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package sql
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestDefaultConfig(t *testing.T) {
12+
cfg := DefaultConfig("test-consumer", "test-worker")
13+
14+
assert.Equal(t, "test-consumer", cfg.ConsumerGroup)
15+
assert.Equal(t, "test-worker", cfg.WorkerID)
16+
assert.Equal(t, 100*time.Millisecond, cfg.PollInterval)
17+
assert.Equal(t, 10, cfg.BatchSize)
18+
assert.Equal(t, 60*time.Second, cfg.VisibilityTimeout)
19+
assert.Equal(t, 10*time.Second, cfg.LeaseRenewalInterval)
20+
assert.Equal(t, 30*time.Second, cfg.LeaseDuration)
21+
assert.True(t, cfg.DLQ.Enabled)
22+
assert.Equal(t, 3, cfg.Retry.MaxAttempts)
23+
assert.Equal(t, 1*time.Second, cfg.Retry.InitialBackoff)
24+
assert.Equal(t, 30*time.Second, cfg.Retry.MaxBackoff)
25+
assert.Equal(t, 2.0, cfg.Retry.BackoffMultiplier)
26+
}
27+
28+
func TestConfigValidation(t *testing.T) {
29+
tests := []struct {
30+
name string
31+
config Config
32+
expectError bool
33+
errorMsg string
34+
}{
35+
{
36+
name: "valid config",
37+
config: DefaultConfig("test-consumer", "test-worker"),
38+
expectError: false,
39+
},
40+
{
41+
name: "empty consumer group",
42+
config: Config{
43+
ConsumerGroup: "",
44+
WorkerID: "test-worker",
45+
PollInterval: 100 * time.Millisecond,
46+
BatchSize: 10,
47+
VisibilityTimeout: 60 * time.Second,
48+
LeaseRenewalInterval: 10 * time.Second,
49+
LeaseDuration: 30 * time.Second,
50+
Retry: DefaultConfig("dummy", "dummy").Retry,
51+
},
52+
expectError: true,
53+
errorMsg: "ConsumerGroup is required",
54+
},
55+
{
56+
name: "empty worker ID",
57+
config: Config{
58+
ConsumerGroup: "test",
59+
WorkerID: "",
60+
PollInterval: 100 * time.Millisecond,
61+
BatchSize: 10,
62+
VisibilityTimeout: 60 * time.Second,
63+
LeaseRenewalInterval: 10 * time.Second,
64+
LeaseDuration: 30 * time.Second,
65+
Retry: DefaultConfig("dummy", "dummy").Retry,
66+
},
67+
expectError: true,
68+
errorMsg: "WorkerID is required",
69+
},
70+
{
71+
name: "invalid poll interval",
72+
config: Config{
73+
ConsumerGroup: "test",
74+
WorkerID: "test-worker",
75+
PollInterval: 0,
76+
BatchSize: 10,
77+
VisibilityTimeout: 60 * time.Second,
78+
LeaseRenewalInterval: 10 * time.Second,
79+
LeaseDuration: 30 * time.Second,
80+
Retry: DefaultConfig("dummy", "dummy").Retry,
81+
},
82+
expectError: true,
83+
errorMsg: "PollInterval must be positive",
84+
},
85+
{
86+
name: "invalid batch size",
87+
config: Config{
88+
ConsumerGroup: "test",
89+
WorkerID: "test-worker",
90+
PollInterval: 100 * time.Millisecond,
91+
BatchSize: 0,
92+
VisibilityTimeout: 60 * time.Second,
93+
LeaseRenewalInterval: 10 * time.Second,
94+
LeaseDuration: 30 * time.Second,
95+
Retry: DefaultConfig("dummy", "dummy").Retry,
96+
},
97+
expectError: true,
98+
errorMsg: "BatchSize must be positive",
99+
},
100+
}
101+
102+
for _, tt := range tests {
103+
t.Run(tt.name, func(t *testing.T) {
104+
err := tt.config.Validate()
105+
if tt.expectError {
106+
require.Error(t, err)
107+
assert.Contains(t, err.Error(), tt.errorMsg)
108+
} else {
109+
require.NoError(t, err)
110+
}
111+
})
112+
}
113+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
filegroup(
2+
name = "schema",
3+
srcs = [
4+
"queue_dlq.sql",
5+
"queue_messages.sql",
6+
"queue_offsets.sql",
7+
"queue_partition_leases.sql",
8+
],
9+
visibility = ["//visibility:public"],
10+
)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
-- DEAD LETTER QUEUE TABLE
2+
-- Failed messages that exhausted retry attempts.
3+
4+
CREATE TABLE IF NOT EXISTS queue_dlq (
5+
-- Auto-incrementing global offset for ordering/acking in DLQ
6+
offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
7+
8+
-- Original topic and partition
9+
topic VARCHAR(255) NOT NULL,
10+
partition_key VARCHAR(255) NOT NULL,
11+
12+
-- Message identification (for deduplication)
13+
id VARCHAR(255) NOT NULL,
14+
15+
-- Message data
16+
payload BLOB NOT NULL,
17+
metadata JSON,
18+
19+
-- Original timestamps (epoch milliseconds)
20+
created_at BIGINT UNSIGNED NOT NULL,
21+
published_at BIGINT UNSIGNED NOT NULL,
22+
23+
-- DLQ-specific fields
24+
failed_at BIGINT UNSIGNED NOT NULL,
25+
failure_count INT UNSIGNED NOT NULL,
26+
last_error TEXT,
27+
28+
-- Supports: SELECT ... WHERE topic=? AND partition_key=? AND failed_at>=? ORDER BY failed_at
29+
-- Used for fetching recently failed messages for a specific topic/partition, e.g., for retrying or monitoring
30+
INDEX idx_topic_partition_failed (topic, partition_key, failed_at),
31+
32+
-- Supports: SELECT ... WHERE topic=? AND failed_at>=? ORDER BY failed_at
33+
-- Used for fetching recently failed messages across all partitions of a topic
34+
INDEX idx_failed_at (topic, failed_at),
35+
36+
-- Unique constraint to prevent duplicate entries for the same message in the DLQ
37+
-- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent DLQ operations
38+
-- Also enables efficient lookups for retrying or inspecting specific failed messages
39+
UNIQUE KEY idx_topic_partition_id (topic, partition_key, id)
40+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
-- MESSAGES TABLE
2+
-- Single table for all topics. Partition key determines distribution across workers.
3+
-- Example: topic="merge_queue", partition_key="uber/cadence"
4+
5+
CREATE TABLE IF NOT EXISTS queue_messages (
6+
-- Auto-incrementing global offset for ordering
7+
offset BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
8+
9+
-- Topic identifies the queue type
10+
topic VARCHAR(255) NOT NULL,
11+
12+
-- Partition key for distributing work across workers
13+
-- Example: repo ID, user ID, tenant ID
14+
partition_key VARCHAR(255) NOT NULL,
15+
16+
-- Message identification
17+
id VARCHAR(255) NOT NULL,
18+
19+
-- Message data
20+
payload BLOB NOT NULL,
21+
metadata JSON,
22+
23+
-- Retry tracking (persistent across workers)
24+
retry_count INT UNSIGNED NOT NULL,
25+
26+
-- Visibility timeout (epoch milliseconds)
27+
-- Messages invisible until this timestamp expires
28+
invisible_until BIGINT UNSIGNED NOT NULL,
29+
30+
-- Timestamps (epoch milliseconds)
31+
created_at BIGINT UNSIGNED NOT NULL,
32+
published_at BIGINT UNSIGNED NOT NULL,
33+
34+
-- Supports: SELECT ... WHERE topic=? AND partition_key=? AND invisible_until<=? ORDER BY offset
35+
-- Used by subscribers to poll for ready-to-process messages within their assigned partition
36+
INDEX idx_topic_partition_visible_offset (topic, partition_key, invisible_until, offset),
37+
38+
-- Supports: INSERT ... ON DUPLICATE KEY to enforce idempotent publishes
39+
-- Also enables efficient lookups for message updates/deletes by ID
40+
UNIQUE KEY idx_topic_partition_id (topic, partition_key, id)
41+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- CONSUMER OFFSETS TABLE
2+
-- Tracks consumption progress per consumer group + topic + partition.
3+
-- Each partition has independent offset tracking for crash recovery.
4+
5+
CREATE TABLE IF NOT EXISTS queue_offsets (
6+
-- Consumer group consuming the topic
7+
consumer_group VARCHAR(255) NOT NULL,
8+
9+
-- Topic being consumed
10+
topic VARCHAR(255) NOT NULL,
11+
12+
-- Partition being consumed
13+
partition_key VARCHAR(255) NOT NULL,
14+
15+
-- Last offset that was successfully acked for this partition
16+
offset_acked BIGINT UNSIGNED NOT NULL,
17+
18+
-- Last update timestamp (epoch milliseconds)
19+
updated_at BIGINT UNSIGNED NOT NULL,
20+
21+
-- Primary key ensures each consumer group has one offset per topic/partition
22+
-- Supports: INSERT ... ON DUPLICATE KEY UPDATE for idempotent offset updates
23+
-- Also enables efficient lookups: SELECT ... WHERE consumer_group=? AND topic=? AND partition_key=?
24+
PRIMARY KEY (consumer_group, topic, partition_key),
25+
26+
-- Supports: SELECT ... WHERE consumer_group=?
27+
-- Used for querying all offsets for a specific consumer group (e.g., for monitoring or rebalancing)
28+
INDEX idx_consumer_group (consumer_group),
29+
30+
-- Supports: SELECT ... WHERE topic=?
31+
-- Used for querying all consumer groups consuming a specific topic
32+
INDEX idx_topic (topic)
33+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

0 commit comments

Comments
 (0)