Skip to content

Commit 8c32db0

Browse files
committed
feat(queue/sql): add subscriber with partition leasing
## Why? Implement the Subscriber interface with distributed partition leasing to enable reliable message consumption across multiple workers. ## What? - Subscriber with automatic partition discovery and lease acquisition - Per-partition polling with ack/nack support - DLQ handling for messages exceeding retry limits - Lease renewal and graceful shutdown - Fast test execution (~4s) with comprehensive coverage ## Test Plan - Message delivery and ordering verified - Ack/nack operations work correctly - DLQ handling tested (max retries exceeded) - Multi-partition and multi-consumer scenarios validated - Lease management (acquire, renew, release) tested
1 parent 6b9a01f commit 8c32db0

3 files changed

Lines changed: 1070 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ go_library(
99
"offset_store.go",
1010
"partition_lease_store.go",
1111
"publisher.go",
12+
"subscriber.go",
1213
"test_helpers.go",
1314
],
1415
importpath = "github.com/uber/submitqueue/extensions/queue/sql",
1516
visibility = ["//visibility:public"],
1617
deps = [
1718
"//entities/queue",
19+
"//extensions/queue",
1820
"@com_github_dolthub_go_mysql_server//:go-mysql-server",
1921
"@com_github_dolthub_go_mysql_server//memory",
2022
"@com_github_dolthub_go_mysql_server//server",
@@ -36,11 +38,13 @@ go_test(
3638
"offset_store_test.go",
3739
"partition_lease_store_test.go",
3840
"publisher_test.go",
41+
"subscriber_test.go",
3942
],
4043
data = ["//schema/queue/mysql:schema.sql"],
4144
embed = [":sql"],
4245
deps = [
4346
"//entities/queue",
47+
"//extensions/queue",
4448
"@com_github_go_sql_driver_mysql//:mysql",
4549
"@com_github_stretchr_testify//assert",
4650
"@com_github_stretchr_testify//require",

0 commit comments

Comments
 (0)