Skip to content

Commit fcccb0c

Browse files
committed
feat(queue/sql): add subscriber with partition leasing and offset tracking
Implements distributed SQL queue subscriber with partition-based load balancing, offset tracking, and retry/DLQ support. ## Core Features - **Partition Leasing**: Workers acquire exclusive 30s leases on partitions, auto-renewed every 10s. Failed workers release via expiry for automatic rebalancing. - **Offset Tracking**: Per-partition offsets with optimistic locking. Ack deletes message + updates offset atomically. Prevents duplicate processing on restarts. - **Retry/DLQ**: Persistent retry counter, visibility timeout for crash recovery, automatic DLQ move when MaxAttempts exceeded. - **Event-Driven Architecture**: One goroutine per topic with dual timers (poll: 100ms, lease renewal: 10s). Dynamic partition discovery and rebalancing. ## Implementation **Store Interfaces** (in `extensions/queue/sql/store/`): - `MessageStore`: FetchByOffset, MoveToDLQ, SetVisibilityTimeout - `OffsetStore`: Initialize, GetAckedOffset, UpdateAckedOffset, AckMessage - `PartitionLeaseStore`: DiscoverAndAcquirePartitions, RenewLease, ReleaseLease **Subscriber Design**: - `managePartitions()` event loop: poll partitions, renew leases, handle shutdown - Buffered delivery channel (BatchSize*2) for throughput - `sqlDelivery` implements Ack/Nack/ExtendVisibilityTimeout - Standardized metrics with partition-level tags (topic + partition_key) **Polling Flow**: 1. Discover and acquire new partitions 2. Fetch messages from owned partitions (beyond last acked offset) 3. Check retry limit → DLQ if exceeded 4. Send to delivery channel for consumer processing ## Breaking Changes - `SetVisibilityTimeout`: Changed from `time.Duration` to `int64` (milliseconds) - Created `store/` subpackage for interfaces (breaks circular dependency with mocks) - Added `MessageRow` struct for fetched message representation ## Known Limitations 1. Blocking channel send prevents lease renewal if consumer slow (can cause duplicates) 2. Worker lifetime tied to Subscribe() context (dies if context cancelled) 3. No backpressure metrics or channel depth monitoring 4. Sequential partition processing (one slow partition blocks others) Follow-up PRs will add timeout-based sends, channel metrics, and concurrent polling. ## Testing - Unit tests with gomock for all store interfaces - External test package (`sql_test`) avoids circular dependencies - Tests: subscription lifecycle, multiple topics, graceful shutdown, close idempotency ## Verification ```bash bazel test //extensions/queue/sql:sql_test # PASSED ```
1 parent 331f173 commit fcccb0c

8 files changed

Lines changed: 1023 additions & 12 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ go_library(
66
"config.go",
77
"publisher.go",
88
"stores.go",
9+
"subscriber.go",
910
"validation.go",
1011
],
1112
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1213
visibility = ["//visibility:public"],
1314
deps = [
1415
"//entities/queue",
16+
"//extensions/queue",
17+
"//extensions/queue/sql/store",
1518
"@com_github_uber_go_tally_v4//:tally",
1619
"@org_uber_go_zap//:zap",
1720
],
@@ -22,8 +25,8 @@ go_test(
2225
srcs = [
2326
"config_test.go",
2427
"publisher_test.go",
28+
"subscriber_test.go",
2529
],
26-
data = ["//extensions/queue/sql/schema"],
2730
embed = [":sql"],
2831
deps = [
2932
"//entities/queue",

extensions/queue/sql/mocks/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//entities/queue",
10+
"//extensions/queue/sql/store",
1011
"@org_uber_go_mock//gomock",
1112
],
1213
)

extensions/queue/sql/mocks/mock_stores.go

Lines changed: 238 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
load("@rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "store",
5+
srcs = ["stores.go"],
6+
importpath = "github.com/uber/submitqueue/extensions/queue/sql/store",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//entities/queue",
10+
],
11+
)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package store
2+
3+
//go:generate mockgen -source=stores.go -destination=../mocks/mock_stores.go -package=mocks
4+
5+
import (
6+
"context"
7+
8+
"github.com/uber/submitqueue/entities/queue"
9+
)
10+
11+
// MessageRow represents a row from the messages table
12+
type MessageRow struct {
13+
Offset int64
14+
ID string
15+
Payload []byte
16+
Metadata map[string]string
17+
PartitionKey string
18+
RetryCount int
19+
PublishedAt int64
20+
}
21+
22+
// MessageStore handles message table operations
23+
type MessageStore interface {
24+
// Insert inserts messages into the topic table
25+
Insert(ctx context.Context, topic string, messages []queue.Message) error
26+
27+
// Delete deletes a message by ID
28+
Delete(ctx context.Context, topic string, messageID string) error
29+
30+
// FetchByOffset fetches messages with offset > currentOffset for a specific partition
31+
// Only fetches visible messages (invisible_until <= now)
32+
// Atomically sets invisible_until and increments retry_count
33+
FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]MessageRow, error)
34+
35+
// MoveToDLQ moves a message to the dead letter queue
36+
MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error
37+
38+
// SetVisibilityTimeout sets the invisible_until timestamp for a message
39+
// visibilityTimeoutMillis: milliseconds from now to hide the message
40+
// If visibilityTimeoutMillis is 0, makes the message visible immediately
41+
// If visibilityTimeoutMillis > 0, makes the message invisible until now + visibilityTimeoutMillis
42+
SetVisibilityTimeout(ctx context.Context, topic string, messageID string, visibilityTimeoutMillis int64) error
43+
}
44+
45+
// OffsetStore handles offset table operations for per-partition offset tracking
46+
type OffsetStore interface {
47+
// Initialize creates an offset entry for a topic+partition if it doesn't exist
48+
Initialize(ctx context.Context, topic string, partitionKey string) error
49+
50+
// GetAckedOffset returns the current acked offset for a topic+partition
51+
GetAckedOffset(ctx context.Context, topic string, partitionKey string) (int64, error)
52+
53+
// UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater)
54+
UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64) error
55+
56+
// AckMessage atomically deletes a message and updates the acked offset
57+
AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, messageStore MessageStore) error
58+
}
59+
60+
// PartitionLeaseStore handles partition lease operations
61+
type PartitionLeaseStore interface {
62+
// TryAcquireLease attempts to acquire or renew a lease for a partition
63+
// Returns true if lease is acquired/owned by this worker
64+
TryAcquireLease(ctx context.Context, topic string, partitionKey string) (bool, error)
65+
66+
// RenewLease renews the lease for a partition owned by this worker
67+
RenewLease(ctx context.Context, topic string, partitionKey string) error
68+
69+
// ReleaseLease releases the lease for a partition owned by this worker
70+
ReleaseLease(ctx context.Context, topic string, partitionKey string) error
71+
72+
// GetLeasedPartitions returns all partitions currently leased by this worker
73+
GetLeasedPartitions(ctx context.Context, topic string) ([]string, error)
74+
75+
// DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases
76+
// Returns the number of new leases acquired
77+
DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error)
78+
}

0 commit comments

Comments
 (0)