Skip to content

Commit 604a616

Browse files
authored
feat(queue/sql): add subscriber with partition leasing (#23)
## Summary ## Why? Need Subscriber implementation with partition leasing to enable distributed message processing from SQL queue. ## What? - Subscriber discovers partitions and acquires leases for distributed work - Partition leasing ensures each partition processed by exactly one worker - Automatic failover via stale lease stealing - Visibility timeout with automatic retry on worker crash - DLQ integration for messages exceeding retry limit - Thread-safe delivery acknowledgment - Moved Store in separate package to avoid circular dependency with mocks ## Test Plan bazel test
1 parent 7371c3c commit 604a616

8 files changed

Lines changed: 1054 additions & 35 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ go_library(
44
name = "sql",
55
srcs = [
66
"config.go",
7+
"errors.go",
78
"mock_stores.go",
89
"publisher.go",
910
"stores.go",
11+
"subscriber.go",
1012
"validation.go",
1113
],
1214
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1315
visibility = ["//visibility:public"],
1416
deps = [
1517
"//entities/queue",
18+
"//extensions/queue",
1619
"@com_github_uber_go_tally_v4//:tally",
1720
"@org_uber_go_mock//gomock",
1821
"@org_uber_go_zap//:zap",
@@ -24,6 +27,7 @@ go_test(
2427
srcs = [
2528
"config_test.go",
2629
"publisher_test.go",
30+
"subscriber_test.go",
2731
],
2832
embed = [":sql"],
2933
deps = [

extensions/queue/sql/errors.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package sql
2+
3+
import "fmt"
4+
5+
// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed
6+
type ErrAlreadyAcknowledged struct {
7+
DeliveryID string
8+
}
9+
10+
func (e *ErrAlreadyAcknowledged) Error() string {
11+
return fmt.Sprintf("delivery %s already acknowledged or nacked", e.DeliveryID)
12+
}

extensions/queue/sql/mock_stores.go

Lines changed: 250 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ type publisher struct {
1515
config Config
1616
logger *zap.SugaredLogger
1717
metrics tally.Scope
18-
messageStore MessageStore
18+
messageStore messageStore
1919
mu sync.RWMutex
2020
closed bool
2121
}
2222

2323
// NewPublisher creates a publisher with the given configuration and dependencies
24-
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
24+
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore messageStore) *publisher {
2525
return &publisher{
2626
config: config,
2727
logger: logger,

0 commit comments

Comments
 (0)