Skip to content

Commit 9ea80b3

Browse files
committed
feat(queue/sql): add subscriber with partition leasing and offset tracking
- Implement Subscriber interface with partition-based message polling - Add partition lease management for distributed workers - Add offset tracking per partition for consumption progress - Extend MessageStore interface with FetchByOffset, SetVisibilityTimeout, MoveToDLQ - Add OffsetStore interface for offset management - Add PartitionLeaseStore interface for partition leasing - Generate mocks for all three store interfaces - Add standardized metric tags (topic + partition_key) across all operations - Add comprehensive test coverage for subscription, ack/nack, and error handling
1 parent d7597af commit 9ea80b3

5 files changed

Lines changed: 991 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ go_library(
77
"mock_stores.go",
88
"publisher.go",
99
"stores.go",
10+
"subscriber.go",
1011
"validation.go",
1112
],
1213
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1314
visibility = ["//visibility:public"],
1415
deps = [
1516
"//entities/queue",
17+
"//extensions/queue",
1618
"@com_github_uber_go_tally_v4//:tally",
1719
"@org_uber_go_mock//gomock",
1820
"@org_uber_go_zap//:zap",
@@ -24,6 +26,7 @@ go_test(
2426
srcs = [
2527
"config_test.go",
2628
"publisher_test.go",
29+
"subscriber_test.go",
2730
],
2831
embed = [":sql"],
2932
deps = [

extensions/queue/sql/mock_stores.go

Lines changed: 235 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/stores.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,73 @@ import (
88
"github.com/uber/submitqueue/entities/queue"
99
)
1010

11+
// StoreConfig contains configuration needed by store implementations
12+
13+
// MessageRow represents a row from the messages table
14+
type MessageRow struct {
15+
Offset int64
16+
ID string
17+
Payload []byte
18+
Metadata map[string]string
19+
PartitionKey string
20+
RetryCount int
21+
PublishedAt int64
22+
}
23+
1124
// MessageStore handles message table operations
1225
type MessageStore interface {
1326
// Insert inserts messages into the topic table
1427
Insert(ctx context.Context, topic string, messages []queue.Message) error
28+
29+
// Delete deletes a message by ID
30+
Delete(ctx context.Context, topic string, messageID string) error
31+
32+
// FetchByOffset fetches messages with offset > currentOffset for a specific partition
33+
// Only fetches visible messages (invisible_until <= now)
34+
// Atomically sets invisible_until and increments retry_count
35+
FetchByOffset(ctx context.Context, topic string, partitionKey string, currentOffset int64, limit int) ([]MessageRow, error)
36+
37+
// MoveToDLQ moves a message to the dead letter queue
38+
MoveToDLQ(ctx context.Context, topic string, messageID string, failureCount int, lastError string) error
39+
40+
// SetVisibilityTimeout sets the invisible_until timestamp for a message
41+
// visibilityTimeoutMillis: milliseconds from now to hide the message
42+
// If visibilityTimeoutMillis is 0, makes the message visible immediately
43+
// If visibilityTimeoutMillis > 0, makes the message invisible until now + visibilityTimeoutMillis
44+
SetVisibilityTimeout(ctx context.Context, topic string, messageID string, visibilityTimeoutMillis int64) error
45+
}
46+
47+
// OffsetStore handles offset table operations for per-partition offset tracking
48+
type OffsetStore interface {
49+
// Initialize creates an offset entry for a topic+partition if it doesn't exist
50+
Initialize(ctx context.Context, topic string, partitionKey string) error
51+
52+
// GetAckedOffset returns the current acked offset for a topic+partition
53+
GetAckedOffset(ctx context.Context, topic string, partitionKey string) (int64, error)
54+
55+
// UpdateAckedOffset updates the offset_acked for a topic+partition (only if new offset is greater)
56+
UpdateAckedOffset(ctx context.Context, topic string, partitionKey string, offset int64) error
57+
58+
// AckMessage atomically deletes a message and updates the acked offset
59+
AckMessage(ctx context.Context, topic string, partitionKey string, messageID string, offset int64, messageStore MessageStore) error
60+
}
61+
62+
// PartitionLeaseStore handles partition lease operations
63+
type PartitionLeaseStore interface {
64+
// TryAcquireLease attempts to acquire or renew a lease for a partition
65+
// Returns true if lease is acquired/owned by this worker
66+
TryAcquireLease(ctx context.Context, topic string, partitionKey string) (bool, error)
67+
68+
// RenewLease renews the lease for a partition owned by this worker
69+
RenewLease(ctx context.Context, topic string, partitionKey string) error
70+
71+
// ReleaseLease releases the lease for a partition owned by this worker
72+
ReleaseLease(ctx context.Context, topic string, partitionKey string) error
73+
74+
// GetLeasedPartitions returns all partitions currently leased by this worker
75+
GetLeasedPartitions(ctx context.Context, topic string) ([]string, error)
76+
77+
// DiscoverAndAcquirePartitions discovers partitions from messages table and tries to acquire leases
78+
// Returns the number of new leases acquired
79+
DiscoverAndAcquirePartitions(ctx context.Context, topic string) (int, error)
1580
}

0 commit comments

Comments
 (0)