Skip to content

Commit 30db103

Browse files
committed
refactor(consumer): lift framework to core/consumer with domain topickey packages
Move the shared queue consumer framework to top-level core/consumer so both SubmitQueue and Stovepipe can reuse it. Pipeline stage constants now live in per-domain submitqueue/core/topickey and stovepipe/core/topickey packages.
1 parent 58673fd commit 30db103

75 files changed

Lines changed: 506 additions & 348 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CLAUDE.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ Paths follow the directory layout: shared code is top-level, domain code nests u
147147
- Proto (generated): `github.com/uber/submitqueue/{domain}/{service}/protopb`
148148
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
149149
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
150-
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/consumer`, `.../submitqueue/core/request`)
150+
- Cross-domain consumer framework: `github.com/uber/submitqueue/core/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
151+
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
151152
- Shared entities: `github.com/uber/submitqueue/entity/{name}` (e.g. `.../entity/messagequeue`)
152153
- Shared extensions: `github.com/uber/submitqueue/extension/{name}` (e.g. `.../extension/messagequeue`)
153154
- Cross-domain infra: `github.com/uber/submitqueue/core/{pkg}` (e.g. `.../core/errs`, `.../core/metrics`)

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe
336336

337337
mocks: ## Generate mock files using mockgen
338338
@echo "Generating mocks..."
339-
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
339+
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./core/consumer/...
340340
@echo "Mocks generated successfully!"
341341

342342
proto: ## Generate protobuf files from .proto definitions
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ go_library(
77
"controller.go",
88
"registry.go",
99
],
10-
importpath = "github.com/uber/submitqueue/submitqueue/core/consumer",
10+
importpath = "github.com/uber/submitqueue/core/consumer",
1111
visibility = ["//visibility:public"],
1212
deps = [
1313
"//core/errs",
@@ -27,11 +27,12 @@ go_test(
2727
],
2828
deps = [
2929
":consumer",
30+
"//core/consumer/mock",
3031
"//core/errs",
3132
"//entity/messagequeue",
3233
"//extension/messagequeue",
3334
"//extension/messagequeue/mock",
34-
"//submitqueue/core/consumer/mock",
35+
"//submitqueue/core/topickey",
3536
"@com_github_stretchr_testify//assert",
3637
"@com_github_stretchr_testify//require",
3738
"@com_github_uber_go_tally//:tally",
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac
2626

2727
```go
2828
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
29-
{Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
29+
{Key: topickey.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
3030
})
3131

3232
c := consumer.New(logger, scope, registry,
@@ -71,21 +71,21 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip
7171
```go
7272
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
7373
{
74-
Key: consumer.TopicKeyStart,
74+
Key: topickey.TopicKeyStart,
7575
Name: "request",
7676
Queue: q,
7777
Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"),
7878
},
7979
{
80-
Key: consumer.TopicKeyBuild,
80+
Key: topickey.TopicKeyBuild,
8181
Name: "build",
8282
Queue: q,
8383
// No Subscription — publish-only topic
8484
},
8585
})
8686
```
8787

88-
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
88+
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). Constants live in each domain's `core/topickey` package; this package defines only the `TopicKey` type and registry machinery. The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
8989

9090
## Error Handling
9191

submitqueue/core/consumer/consumer_test.go renamed to core/consumer/consumer_test.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,17 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/uber/submitqueue/submitqueue/core/topickey"
27+
2628
"github.com/stretchr/testify/assert"
2729
"github.com/stretchr/testify/require"
2830
"github.com/uber-go/tally"
31+
"github.com/uber/submitqueue/core/consumer"
32+
consumermock "github.com/uber/submitqueue/core/consumer/mock"
2933
"github.com/uber/submitqueue/core/errs"
3034
entityqueue "github.com/uber/submitqueue/entity/messagequeue"
3135
extqueue "github.com/uber/submitqueue/extension/messagequeue"
3236
queuemock "github.com/uber/submitqueue/extension/messagequeue/mock"
33-
"github.com/uber/submitqueue/submitqueue/core/consumer"
34-
consumermock "github.com/uber/submitqueue/submitqueue/core/consumer/mock"
3537
"go.uber.org/mock/gomock"
3638
"go.uber.org/zap/zaptest"
3739
)
@@ -103,7 +105,7 @@ func TestConsumer_Register(t *testing.T) {
103105
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
104106

105107
handler1 := consumermock.NewMockController(ctrl)
106-
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
108+
setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil)
107109

108110
handler2 := consumermock.NewMockController(ctrl)
109111
setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil)
@@ -123,10 +125,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) {
123125
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
124126

125127
handler1 := consumermock.NewMockController(ctrl)
126-
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
128+
setupController(handler1, "handler1", topickey.TopicKeyStart, "group1", nil)
127129

128130
handler2 := consumermock.NewMockController(ctrl)
129-
setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil)
131+
setupController(handler2, "handler2", topickey.TopicKeyStart, "group2", nil)
130132

131133
err := c.Register(handler1)
132134
require.NoError(t, err)
@@ -146,7 +148,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) {
146148
require.NoError(t, err)
147149

148150
handler := consumermock.NewMockController(ctrl)
149-
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
151+
setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil)
150152

151153
err = c.Register(handler)
152154
assert.Error(t, err)
@@ -170,7 +172,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) {
170172
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
171173

172174
handler := consumermock.NewMockController(ctrl)
173-
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
175+
setupController(handler, "handler1", topickey.TopicKeyStart, "group1", nil)
174176

175177
err := c.Register(handler)
176178
require.NoError(t, err)
@@ -189,14 +191,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) {
189191
mockQ := queuemock.NewMockQueue(ctrl)
190192
// Registry has queue but no subscription config
191193
reg, err := consumer.NewTopicRegistry(
192-
[]consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}},
194+
[]consumer.TopicConfig{{Key: topickey.TopicKeyStart, Name: "request", Queue: mockQ}},
193195
)
194196
require.NoError(t, err)
195197

196198
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
197199

198200
handler := consumermock.NewMockController(ctrl)
199-
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
201+
setupController(handler, "handler", topickey.TopicKeyStart, "group", nil)
200202

201203
err = c.Register(handler)
202204
require.NoError(t, err)
@@ -217,12 +219,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) {
217219
mockQ := queuemock.NewMockQueue(ctrl)
218220
mockQ.EXPECT().Subscriber().Return(mockSub)
219221

220-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group")
222+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "group")
221223

222224
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
223225

224226
handler := consumermock.NewMockController(ctrl)
225-
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
227+
setupController(handler, "handler", topickey.TopicKeyStart, "group", nil)
226228

227229
err := c.Register(handler)
228230
require.NoError(t, err)
@@ -243,13 +245,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) {
243245
mockQ := queuemock.NewMockQueue(ctrl)
244246
mockQ.EXPECT().Subscriber().Return(mockSub)
245247

246-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
248+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
247249

248250
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
249251

250252
handledMsg := ""
251253
handler := consumermock.NewMockController(ctrl)
252-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
254+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
253255
func(ctx context.Context, delivery consumer.Delivery) error {
254256
handledMsg = delivery.Message().ID
255257
return nil
@@ -289,12 +291,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) {
289291
mockQ := queuemock.NewMockQueue(ctrl)
290292
mockQ.EXPECT().Subscriber().Return(mockSub)
291293

292-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
294+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
293295

294296
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
295297

296298
handler := consumermock.NewMockController(ctrl)
297-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
299+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
298300
func(ctx context.Context, delivery consumer.Delivery) error {
299301
return errs.NewRetryableError(fmt.Errorf("processing failed"))
300302
},
@@ -331,12 +333,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) {
331333
mockQ := queuemock.NewMockQueue(ctrl)
332334
mockQ.EXPECT().Subscriber().Return(mockSub)
333335

334-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
336+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
335337

336338
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
337339

338340
handler := consumermock.NewMockController(ctrl)
339-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
341+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
340342
func(ctx context.Context, delivery consumer.Delivery) error {
341343
return fmt.Errorf("bad payload")
342344
},
@@ -382,12 +384,12 @@ func TestConsumer_Stop(t *testing.T) {
382384
mockQ := queuemock.NewMockQueue(ctrl)
383385
mockQ.EXPECT().Subscriber().Return(mockSub)
384386

385-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
387+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
386388

387389
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
388390

389391
handler := consumermock.NewMockController(ctrl)
390-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil)
392+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group", nil)
391393

392394
err := c.Register(handler)
393395
require.NoError(t, err)
@@ -440,12 +442,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) {
440442
mockQ := queuemock.NewMockQueue(ctrl)
441443
mockQ.EXPECT().Subscriber().Return(mockSub)
442444

443-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
445+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
444446

445447
testC := consumer.New(logger, testScope, reg, errs.NewClassifierProcessor())
446448

447449
handler := consumermock.NewMockController(ctrl)
448-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
450+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
449451
func(ctx context.Context, delivery consumer.Delivery) error {
450452
return tt.handlerError
451453
},
@@ -515,12 +517,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) {
515517
mockQ := queuemock.NewMockQueue(ctrl)
516518
mockQ.EXPECT().Subscriber().Return(mockSub)
517519

518-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
520+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
519521

520522
c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor())
521523

522524
handler := consumermock.NewMockController(ctrl)
523-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
525+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
524526
func(ctx context.Context, delivery consumer.Delivery) error { return nil },
525527
)
526528

@@ -560,12 +562,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) {
560562
mockQ := queuemock.NewMockQueue(ctrl)
561563
mockQ.EXPECT().Subscriber().Return(mockSub)
562564

563-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
565+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
564566

565567
c := consumer.New(logger, scope, reg, errs.NewClassifierProcessor())
566568

567569
handler := consumermock.NewMockController(ctrl)
568-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
570+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
569571
func(ctx context.Context, delivery consumer.Delivery) error {
570572
return errs.NewRetryableError(fmt.Errorf("processing failed"))
571573
},
@@ -616,7 +618,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
616618
mockQ := queuemock.NewMockQueue(ctrl)
617619
mockQ.EXPECT().Subscriber().Return(mockSub)
618620

619-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
621+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
620622

621623
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
622624

@@ -626,7 +628,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
626628
var partBProcessed atomic.Bool
627629

628630
handler := consumermock.NewMockController(ctrl)
629-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
631+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
630632
func(ctx context.Context, delivery consumer.Delivery) error {
631633
pk := delivery.Message().PartitionKey
632634
if pk == "partition-a" {
@@ -701,7 +703,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
701703
mockQ := queuemock.NewMockQueue(ctrl)
702704
mockQ.EXPECT().Subscriber().Return(mockSub)
703705

704-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
706+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
705707

706708
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
707709

@@ -712,7 +714,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
712714
allDone := make(chan struct{})
713715

714716
handler := consumermock.NewMockController(ctrl)
715-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
717+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
716718
func(ctx context.Context, delivery consumer.Delivery) error {
717719
mu.Lock()
718720
order = append(order, delivery.Message().ID)
@@ -770,14 +772,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) {
770772
mockQ := queuemock.NewMockQueue(ctrl)
771773
mockQ.EXPECT().Subscriber().Return(mockSub)
772774

773-
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
775+
reg := newRegistry(t, mockQ, topickey.TopicKeyStart, "test-group")
774776

775777
c := consumer.New(logger, tally.NoopScope, reg, errs.NewClassifierProcessor())
776778

777779
processedCount := int64(0)
778780

779781
handler := consumermock.NewMockController(ctrl)
780-
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
782+
setupController(handler, "test-handler", topickey.TopicKeyStart, "test-group",
781783
func(ctx context.Context, delivery consumer.Delivery) error {
782784
atomic.AddInt64(&processedCount, 1)
783785
return nil

submitqueue/core/consumer/mock/BUILD.bazel renamed to core/consumer/mock/BUILD.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ load("@rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "mock",
55
srcs = ["controller_mock.go"],
6-
importpath = "github.com/uber/submitqueue/submitqueue/core/consumer/mock",
6+
importpath = "github.com/uber/submitqueue/core/consumer/mock",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//core/consumer",
910
"//entity/messagequeue",
10-
"//submitqueue/core/consumer",
1111
"@org_uber_go_mock//gomock",
1212
],
1313
)

submitqueue/core/consumer/mock/controller_mock.go renamed to core/consumer/mock/controller_mock.go

Lines changed: 3 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)