Skip to content

Commit e5f2167

Browse files
committed
move URI claim back to start
start writes the change rows on entry; validate stays read-only and uses the change store + request store liveness check to detect duplicates. Trade-off: validation-failing requests leave behind change rows whose owner request will become terminal, which the controller-side liveness check filters out at duplicate-detection time. Reverses the previous move-claim-to-validate commit. The earlier-claim story trades simpler validate (read-only) for orphan claim rows (filtered, not blocking).
1 parent 4738828 commit e5f2167

6 files changed

Lines changed: 122 additions & 75 deletions

File tree

example/server/orchestrator/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
406406
logger,
407407
scope,
408408
store,
409+
changeStore,
409410
registry,
410411
consumer.TopicKeyStart,
411412
"orchestrator-start",

orchestrator/controller/start/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
"//core/request",
1111
"//entity",
1212
"//entity/queue",
13+
"//extension/changestore",
1314
"//extension/storage",
1415
"@com_github_uber_go_tally_v4//:tally",
1516
"@org_uber_go_zap//:zap",
@@ -25,6 +26,7 @@ go_test(
2526
"//core/errs",
2627
"//entity",
2728
"//entity/queue",
29+
"//extension/changestore/mock",
2830
"//extension/queue/mock",
2931
"//extension/storage",
3032
"//extension/storage/mock",

orchestrator/controller/start/start.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,29 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21+
"time"
2122

2223
"github.com/uber-go/tally/v4"
2324
"github.com/uber/submitqueue/core/consumer"
2425
corerequest "github.com/uber/submitqueue/core/request"
2526
"github.com/uber/submitqueue/entity"
2627
entityqueue "github.com/uber/submitqueue/entity/queue"
28+
"github.com/uber/submitqueue/extension/changestore"
2729
"github.com/uber/submitqueue/extension/storage"
2830
"go.uber.org/zap"
2931
)
3032

3133
// Controller handles start queue messages.
32-
// It consumes requests, persists them to storage, and publishes to the validate stage.
33-
// Duplicate detection and URI claiming happen downstream in the validate controller.
34-
// Implements consumer.Controller.
34+
// It consumes requests, persists them to the request store, claims their URIs in
35+
// the change store, and publishes to the validate stage. Both writes are idempotent
36+
// on retries; the duplicate-detection check itself is performed downstream by the
37+
// validate controller, which reads the change store and consults the request store
38+
// for liveness. Implements consumer.Controller.
3539
type Controller struct {
3640
logger *zap.SugaredLogger
3741
metricsScope tally.Scope
3842
store storage.Storage
43+
changeStore changestore.ChangeStore
3944
registry consumer.TopicRegistry
4045
topicKey consumer.TopicKey
4146
consumerGroup string
@@ -49,6 +54,7 @@ func NewController(
4954
logger *zap.SugaredLogger,
5055
scope tally.Scope,
5156
store storage.Storage,
57+
changeStore changestore.ChangeStore,
5258
registry consumer.TopicRegistry,
5359
topicKey consumer.TopicKey,
5460
consumerGroup string,
@@ -57,29 +63,28 @@ func NewController(
5763
logger: logger.Named("start_controller"),
5864
metricsScope: scope.SubScope("start_controller"),
5965
store: store,
66+
changeStore: changeStore,
6067
registry: registry,
6168
topicKey: topicKey,
6269
consumerGroup: consumerGroup,
6370
}
6471
}
6572

6673
// Process processes a request delivery from the queue.
67-
// Deserializes the request, persists it, and publishes to the validate topic.
74+
// Persists the request, claims its URIs in the change store, and publishes to validate.
6875
// Returns nil to ack (success), or error to nack (retry).
6976
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
7077
c.metricsScope.Counter("received").Inc(1)
7178

7279
msg := delivery.Message()
7380

74-
// Deserialize land request from gateway
7581
landRequest, err := entity.LandRequestFromBytes(msg.Payload)
7682
if err != nil {
7783
c.metricsScope.Counter("deserialize_errors").Inc(1)
7884
// Non-retryable: malformed messages will never succeed regardless of retry count
7985
return fmt.Errorf("failed to deserialize land request: %w", err)
8086
}
8187

82-
// Construct the full versioned Request entity with orchestrator-owned fields
8388
request := entity.Request{
8489
ID: landRequest.ID,
8590
Queue: landRequest.Queue,
@@ -103,22 +108,27 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
103108

104109
// Persist request to storage. ErrAlreadyExists means a queue redelivery of the same
105110
// request_id (an at-least-once retry of THIS message), not a cross-request collision.
106-
// Cross-request URI overlap is detected and claims are written downstream in validate.
107111
if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
108112
c.metricsScope.Counter("storage_errors").Inc(1)
109113
return fmt.Errorf("failed to create request: %w", err)
110114
}
111115

112-
// Record the "new" status in the request log
116+
// Claim each URI in the change store. Different requests on the same URI write
117+
// distinct rows (different request_id), so cross-request URI overlap does not
118+
// collide on insert; the validate controller surfaces it via GetByURI + a
119+
// liveness check against the request store.
120+
if err := c.claimURIs(ctx, request); err != nil {
121+
c.metricsScope.Counter("change_store_errors").Inc(1)
122+
return fmt.Errorf("failed to claim URIs for request %s: %w", request.ID, err)
123+
}
124+
125+
// Record the "new" status in the request log.
113126
logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil)
114-
// Using request.ID as the partition key to ensure ordering of log entries for the same request
115-
// and parallel processing of log entries for different requests.
116127
if err := corerequest.PublishLog(ctx, c.registry, logEntry, request.ID); err != nil {
117128
c.metricsScope.Counter("request_log_errors").Inc(1)
118129
return fmt.Errorf("failed to publish request log: %w", err)
119130
}
120131

121-
// Publish to validate topic
122132
if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil {
123133
c.metricsScope.Counter("publish_errors").Inc(1)
124134
return fmt.Errorf("failed to publish to validate: %w", err)
@@ -130,8 +140,28 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
130140
)
131141

132142
c.metricsScope.Counter("processed").Inc(1)
143+
return nil
144+
}
133145

134-
return nil // Success - message will be acked
146+
// claimURIs persists one ChangeRecord per URI in the request. Each Create call is
147+
// independent; the change store's per-PK idempotency makes the loop safe under
148+
// queue redelivery (same (Queue, URI, RequestID) is a no-op on retry).
149+
func (c *Controller) claimURIs(ctx context.Context, request entity.Request) error {
150+
now := time.Now().UnixMilli()
151+
for _, uri := range request.Change.URIs {
152+
record := entity.ChangeRecord{
153+
URI: uri,
154+
RequestID: request.ID,
155+
Queue: request.Queue,
156+
CreatedAt: now,
157+
UpdatedAt: now,
158+
Version: 1,
159+
}
160+
if err := c.changeStore.Create(ctx, record); err != nil {
161+
return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err)
162+
}
163+
}
164+
return nil
135165
}
136166

137167
// publish publishes a request ID to the specified topic key.

orchestrator/controller/start/start_test.go

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/uber/submitqueue/core/errs"
2727
"github.com/uber/submitqueue/entity"
2828
"github.com/uber/submitqueue/entity/queue"
29+
changemock "github.com/uber/submitqueue/extension/changestore/mock"
2930
queuemock "github.com/uber/submitqueue/extension/queue/mock"
3031
"github.com/uber/submitqueue/extension/storage"
3132
storagemock "github.com/uber/submitqueue/extension/storage/mock"
@@ -34,7 +35,13 @@ import (
3435
)
3536

3637
// newTestController creates a controller with test dependencies.
37-
func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller {
38+
func newTestController(
39+
t *testing.T,
40+
ctrl *gomock.Controller,
41+
store *storagemock.MockStorage,
42+
cs *changemock.MockChangeStore,
43+
publishErr error,
44+
) *Controller {
3845
logger := zaptest.NewLogger(t).Sugar()
3946
scope := tally.NoopScope
4047

@@ -56,7 +63,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock
5663
)
5764
require.NoError(t, err)
5865

59-
return NewController(logger, scope, store, registry, consumer.TopicKeyStart, "orchestrator-start")
66+
return NewController(logger, scope, store, cs, registry, consumer.TopicKeyStart, "orchestrator-start")
6067
}
6168

6269
// newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create.
@@ -69,6 +76,13 @@ func newMockStorage(ctrl *gomock.Controller) *storagemock.MockStorage {
6976
return store
7077
}
7178

79+
// newMockChangeStore returns a MockChangeStore that accepts any Create call.
80+
func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore {
81+
cs := changemock.NewMockChangeStore(ctrl)
82+
cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
83+
return cs
84+
}
85+
7286
// makeDelivery builds a MockDelivery wrapping a serialized LandRequest.
7387
func makeDelivery(t *testing.T, ctrl *gomock.Controller, lr entity.LandRequest) *queuemock.MockDelivery {
7488
payload, err := lr.ToBytes()
@@ -83,7 +97,7 @@ func makeDelivery(t *testing.T, ctrl *gomock.Controller, lr entity.LandRequest)
8397

8498
func TestNewController(t *testing.T) {
8599
ctrl := gomock.NewController(t)
86-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
100+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil)
87101

88102
require.NotNil(t, controller)
89103
assert.Equal(t, consumer.TopicKeyStart, controller.TopicKey())
@@ -93,7 +107,7 @@ func TestNewController(t *testing.T) {
93107

94108
func TestController_Process_Success(t *testing.T) {
95109
ctrl := gomock.NewController(t)
96-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
110+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil)
97111

98112
delivery := makeDelivery(t, ctrl, entity.LandRequest{
99113
ID: "test-queue/123",
@@ -107,7 +121,7 @@ func TestController_Process_Success(t *testing.T) {
107121

108122
func TestController_Process_InvalidJSON(t *testing.T) {
109123
ctrl := gomock.NewController(t)
110-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
124+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil)
111125

112126
invalidPayload := []byte(`{"invalid": json"}`)
113127
msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil)
@@ -134,7 +148,7 @@ func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) {
134148
store := storagemock.NewMockStorage(ctrl)
135149
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
136150

137-
controller := newTestController(t, ctrl, store, nil)
151+
controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil)
138152

139153
landRequest := entity.LandRequest{
140154
ID: "test-queue/42",
@@ -166,7 +180,7 @@ func TestController_Process_AllStrategies(t *testing.T) {
166180
for _, tt := range tests {
167181
t.Run(tt.name, func(t *testing.T) {
168182
ctrl := gomock.NewController(t)
169-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
183+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil)
170184

171185
delivery := makeDelivery(t, ctrl, entity.LandRequest{
172186
ID: fmt.Sprintf("queue/%s", tt.strategy),
@@ -182,27 +196,46 @@ func TestController_Process_AllStrategies(t *testing.T) {
182196

183197
func TestController_Process_MultipleChanges(t *testing.T) {
184198
ctrl := gomock.NewController(t)
185-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
186199

187-
delivery := makeDelivery(t, ctrl, entity.LandRequest{
188-
ID: "queue/999",
189-
Queue: "test-queue",
190-
Change: entity.Change{
191-
URIs: []string{
192-
"github://uber/monorepo/pull/1/aaa111",
193-
"github://uber/monorepo/pull/2/bbb222",
194-
"github://uber/monorepo/pull/3/ccc333",
195-
},
200+
cs := changemock.NewMockChangeStore(ctrl)
201+
var captured []entity.ChangeRecord
202+
cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn(
203+
func(ctx context.Context, record entity.ChangeRecord) error {
204+
captured = append(captured, record)
205+
return nil
196206
},
207+
).Times(3)
208+
209+
controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil)
210+
211+
uris := []string{
212+
"github://uber/monorepo/pull/1/aaa111",
213+
"github://uber/monorepo/pull/2/bbb222",
214+
"github://uber/monorepo/pull/3/ccc333",
215+
}
216+
delivery := makeDelivery(t, ctrl, entity.LandRequest{
217+
ID: "queue/999",
218+
Queue: "test-queue",
219+
Change: entity.Change{URIs: uris},
197220
LandStrategy: entity.RequestLandStrategySquashRebase,
198221
})
199222

200223
require.NoError(t, controller.Process(context.Background(), delivery))
224+
225+
require.Len(t, captured, len(uris))
226+
for i, r := range captured {
227+
assert.Equal(t, uris[i], r.URI)
228+
assert.Equal(t, "queue/999", r.RequestID)
229+
assert.Equal(t, "test-queue", r.Queue)
230+
assert.Equal(t, int32(1), r.Version)
231+
assert.Positive(t, r.CreatedAt)
232+
assert.Equal(t, r.CreatedAt, r.UpdatedAt)
233+
}
201234
}
202235

203236
func TestController_Process_PublishFailure(t *testing.T) {
204237
ctrl := gomock.NewController(t)
205-
controller := newTestController(t, ctrl, newMockStorage(ctrl), fmt.Errorf("publish failed"))
238+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), fmt.Errorf("publish failed"))
206239

207240
delivery := makeDelivery(t, ctrl, entity.LandRequest{
208241
ID: "test-queue/123",
@@ -222,7 +255,7 @@ func TestController_Process_StorageFailure(t *testing.T) {
222255
store := storagemock.NewMockStorage(ctrl)
223256
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
224257

225-
controller := newTestController(t, ctrl, store, nil)
258+
controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil)
226259

227260
delivery := makeDelivery(t, ctrl, entity.LandRequest{
228261
ID: "test-queue/123",
@@ -244,7 +277,7 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) {
244277
store := storagemock.NewMockStorage(ctrl)
245278
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
246279

247-
controller := newTestController(t, ctrl, store, nil)
280+
controller := newTestController(t, ctrl, store, newMockChangeStore(ctrl), nil)
248281

249282
delivery := makeDelivery(t, ctrl, entity.LandRequest{
250283
ID: "test-queue/123",
@@ -256,9 +289,27 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) {
256289
require.NoError(t, controller.Process(context.Background(), delivery))
257290
}
258291

292+
func TestController_Process_ChangeStoreFailure(t *testing.T) {
293+
ctrl := gomock.NewController(t)
294+
295+
cs := changemock.NewMockChangeStore(ctrl)
296+
cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("change store down"))
297+
298+
controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil)
299+
300+
delivery := makeDelivery(t, ctrl, entity.LandRequest{
301+
ID: "test-queue/123",
302+
Queue: "test-queue",
303+
Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}},
304+
LandStrategy: entity.RequestLandStrategyRebase,
305+
})
306+
307+
require.Error(t, controller.Process(context.Background(), delivery))
308+
}
309+
259310
func TestController_InterfaceImplementation(t *testing.T) {
260311
ctrl := gomock.NewController(t)
261-
controller := newTestController(t, ctrl, newMockStorage(ctrl), nil)
312+
controller := newTestController(t, ctrl, newMockStorage(ctrl), newMockChangeStore(ctrl), nil)
262313

263314
var _ consumer.Controller = controller
264315
}

0 commit comments

Comments
 (0)