Skip to content

Commit aca4bfe

Browse files
committed
feat(queue/sql): add subscriber with partition leasing
To implement the SQL-based queue subscriber with partition leasing for distributed consumption and offset tracking for reliable message processing. - Add MessageStore methods for subscriber: Delete, FetchByOffset, MoveToDLQ, SetVisibilityTimeout - Add OffsetStore interface for offset tracking and message acknowledgment - Add PartitionLeaseStore interface for distributed partition leasing - Implement Subscriber with partition leasing, polling, and retry/DLQ logic - Add gomock-based tests for Subscriber - Regenerate gomock mocks for all 3 store interfaces - Change SetVisibilityTimeout to use int64 (milliseconds) instead of time.Duration - Add MessageRow struct for representing fetched message rows - Use external test package pattern (sql_test) to avoid circular dependencies - Unit tests pass: `bazel test //extensions/queue/sql:sql_test` - All tests use gomock exclusively, no manual mocks - Publisher uses defer for RUnlock (cleaner and safer)
1 parent 642429b commit aca4bfe

8 files changed

Lines changed: 1002 additions & 8 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ go_library(
66
"config.go",
77
"publisher.go",
88
"stores.go",
9+
"subscriber.go",
910
"validation.go",
1011
],
1112
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1213
visibility = ["//visibility:public"],
1314
deps = [
1415
"//entities/queue",
16+
"//extensions/queue",
1517
"@com_github_uber_go_tally_v4//:tally",
1618
"@org_uber_go_zap//:zap",
1719
],
@@ -21,11 +23,36 @@ go_test(
2123
name = "sql_test",
2224
srcs = [
2325
"config_test.go",
24-
"publisher_test.go",
2526
],
26-
data = ["//extensions/queue/sql/schema"],
2727
embed = [":sql"],
2828
deps = [
29+
"@com_github_stretchr_testify//assert",
30+
"@com_github_stretchr_testify//require",
31+
],
32+
)
33+
34+
go_test(
35+
name = "sql_publisher_test",
36+
srcs = ["publisher_test.go"],
37+
deps = [
38+
":sql",
39+
"//entities/queue",
40+
"//extensions/queue",
41+
"//extensions/queue/sql/mocks",
42+
"@com_github_stretchr_testify//assert",
43+
"@com_github_stretchr_testify//require",
44+
"@com_github_uber_go_tally_v4//:tally",
45+
"@org_uber_go_mock//gomock",
46+
"@org_uber_go_zap//zaptest",
47+
],
48+
)
49+
50+
go_test(
51+
name = "sql_subscriber_test",
52+
srcs = ["subscriber_test.go"],
53+
data = ["//extensions/queue/sql/schema"],
54+
deps = [
55+
":sql",
2956
"//entities/queue",
3057
"//extensions/queue",
3158
"//extensions/queue/sql/mocks",

extensions/queue/sql/mocks/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//entities/queue",
10+
"//extensions/queue/sql",
1011
"@org_uber_go_mock//gomock",
1112
],
1213
)

extensions/queue/sql/mocks/mock_stores.go

Lines changed: 238 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/queue/sql/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type publisher struct {
2020
closed bool
2121
}
2222

23-
// NewPublisher creates a publisher with the given configuration and dependencies
23+
// NewPublisher creates a publisher for testing with a custom message store
2424
func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher {
2525
return &publisher{
2626
config: config,

extensions/queue/sql/publisher_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
package sql
1+
package sql_test
22

33
import (
44
"context"
55
"fmt"
66
"testing"
77

8+
"github.com/stretchr/testify/assert"
89
"github.com/stretchr/testify/require"
910
"github.com/uber-go/tally/v4"
1011
"go.uber.org/mock/gomock"
1112
"go.uber.org/zap/zaptest"
1213

1314
"github.com/uber/submitqueue/entities/queue"
1415
extqueue "github.com/uber/submitqueue/extensions/queue"
16+
"github.com/uber/submitqueue/extensions/queue/sql"
1517
"github.com/uber/submitqueue/extensions/queue/sql/mocks"
1618
)
1719

@@ -20,9 +22,10 @@ const fixedTimestamp = int64(1234567890000) // Fixed timestamp for test repeatab
2022
func setupPublisherTest(t *testing.T, mockStore *mocks.MockMessageStore) extqueue.Publisher {
2123
t.Helper()
2224

23-
config := DefaultConfig("test-consumer", "test-worker")
25+
config := sql.DefaultConfig("test-consumer", "test-worker")
2426

25-
return NewPublisher(config,
27+
return sql.NewPublisher(
28+
config,
2629
zaptest.NewLogger(t).Sugar().Named("publisher"),
2730
tally.NoopScope.SubScope("publisher"),
2831
mockStore,

0 commit comments

Comments
 (0)