Skip to content

Commit cd3ef63

Browse files
feat(merge): make merge asynchronous via runway
Rework the merge stage from a synchronous in-process pusher call into a runway round-trip, mirroring the merge-conflict check. The merge controller now builds a full runway MergeRequest from the batch's member requests (one MergeStep per request, in Contains order) and publishes it to the runway-owned merger queue, keyed by the batch id as the correlation id. A new mergesignal controller consumes the MergeResult off merger-signal, transitions the batch to Succeeded/Failed, and fans out to conclude and speculate; a mergesignal DLQ reconciler fails the batch on an unprocessable result. The in-process pusher extension is retired from the orchestrator wiring (left in-tree but unused, like mergechecker); removal is a follow-up. workflow.md and extension-contract.md updated to reflect both the check and the merge crossing into runway over the shared MergeRequest/ MergeResult contract. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent bc59a27 commit cd3ef63

13 files changed

Lines changed: 987 additions & 531 deletions

File tree

doc/rfc/submitqueue/extension-contract.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ Two facts this grounds: `conflict` already resolves nothing (the baseline), and
3737
| `scorer.Scorer` | score | flat `Change`, per request | `entity.Batch` — resolve + reduce internally | one batch score (`float64`) — unchanged | request store + change provider |
3838
| `changeprovider.ChangeProvider` | validate | `Change` | `entity.Request` | per-URI change info (`[]ChangeInfo`, `URI`-tagged) — unchanged | none — it *is* the resolver |
3939
| `buildrunner.BuildRunner` | build | base/head `[]Change` | base `[]entity.Batch` + head `entity.Batch` | build id, then status/cancel (`BuildID`, `BuildStatus`) — unchanged | request store + change provider |
40-
| `pusher.Pusher` | merge | `[]Change` | ordered `[]entity.Batch` | **per-batch** outcomes (`Result` grouped by `BatchID`) — **changed** | request store + change provider |
40+
| `pusher.Pusher` *(removed)* | merge | | **moved out-of-process to runway** (`merger` / `merger-signal`); see the note below the table || |
4141
| `storage`, `changestore`, `queueconfig` || keys + entities | unchanged — resolution targets | entities ||
4242

43-
**Outputs are unchanged except `pusher`.** This RFC moves the *input* toward identity; four of the five return contracts — conflicts, score, change info, build id/status — are exactly what they are today. `pusher` is the lone exception: because its input becomes a *list* of independently-landed batches, its result regroups per batch (`BatchID`-tagged, per-change commit detail kept underneath) so each batch's outcome stays correlatable — the "output mirrors the input unit" principle above. No other output shape changes.
43+
**Outputs are unchanged.** This RFC moves the *input* toward identity; the four live return contracts — conflicts, score, change info, build id/status — are exactly what they are today. (The `pusher` row is not an in-process extension: merge runs out-of-process in runway, so its output is not part of this catalog — see the note below.) No other output shape changes.
4444

45-
The validate-time mergeability check runs **asynchronously and out-of-process** in runway rather than as an in-process extension: `validate` hands off to the `mergeconflict` controller, which publishes a full check request to the runway-owned `merge-conflict-checker` queue, and `mergeconflictsignal` consumes runway's result (see [workflow.md](workflow.md)). The in-process `mergechecker` package is unused on the validate path.
45+
The validate-time mergeability **check** and the **merge** itself both run **asynchronously and out-of-process** in runway rather than as in-process extensions, over the one shared `MergeRequest`/`MergeResult` contract — a check is a dry run of a merge. `validate` hands off to the `mergeconflict` controller (→ runway `merge-conflict-checker`, result back via `mergeconflictsignal`); `merge` hands the batch to runway (→ `merger`, result back via `mergesignal`) rather than calling an in-process `pusher`. See [workflow.md](workflow.md). The in-process `mergechecker` and `pusher` packages are unused on the pipeline path.
4646

4747
Non-obvious points:
4848

doc/rfc/submitqueue/workflow.md

Lines changed: 83 additions & 78 deletions
Large diffs are not rendered by default.

example/submitqueue/orchestrator/server/BUILD.bazel

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ go_library(
3434
"//submitqueue/extension/conflict/fake",
3535
"//submitqueue/extension/conflict/fileoverlap",
3636
"//submitqueue/extension/conflict/none",
37-
"//submitqueue/extension/pusher",
38-
"//submitqueue/extension/pusher/fake",
39-
"//submitqueue/extension/pusher/git",
4037
"//submitqueue/extension/scorer",
4138
"//submitqueue/extension/scorer/composite",
4239
"//submitqueue/extension/scorer/fake",
@@ -53,6 +50,7 @@ go_library(
5350
"//submitqueue/orchestrator/controller/merge",
5451
"//submitqueue/orchestrator/controller/mergeconflict",
5552
"//submitqueue/orchestrator/controller/mergeconflictsignal",
53+
"//submitqueue/orchestrator/controller/mergesignal",
5654
"//submitqueue/orchestrator/controller/score",
5755
"//submitqueue/orchestrator/controller/speculate",
5856
"//submitqueue/orchestrator/controller/start",
@@ -68,7 +66,7 @@ go_library(
6866
)
6967

7068
go_binary(
71-
name = "orchestrator",
69+
name = "server",
7270
embed = [":orchestrator_lib"],
7371
visibility = ["//visibility:public"],
7472
)

example/submitqueue/orchestrator/server/main.go

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ import (
5353
conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake"
5454
"github.com/uber/submitqueue/submitqueue/extension/conflict/fileoverlap"
5555
"github.com/uber/submitqueue/submitqueue/extension/conflict/none"
56-
"github.com/uber/submitqueue/submitqueue/extension/pusher"
57-
pushfake "github.com/uber/submitqueue/submitqueue/extension/pusher/fake"
58-
gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git"
5956
"github.com/uber/submitqueue/submitqueue/extension/scorer"
6057
"github.com/uber/submitqueue/submitqueue/extension/scorer/composite"
6158
scorerfake "github.com/uber/submitqueue/submitqueue/extension/scorer/fake"
@@ -72,6 +69,7 @@ import (
7269
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
7370
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflict"
7471
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergeconflictsignal"
72+
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/mergesignal"
7573
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
7674
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
7775
"github.com/uber/submitqueue/submitqueue/orchestrator/controller/start"
@@ -240,13 +238,12 @@ func run() error {
240238

241239
// Per-extension factories all resolve against the registry by queue name.
242240
cpf := changeProviderFactory{queues}
243-
pshf := pusherFactory{queues}
244241
brf := buildRunnerFactory{queues}
245242
scf := scorerFactory{queues}
246243
cof := analyzerFactory{queues}
247244

248245
// Register controllers
249-
primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, pshf, brf, scf, cof, cnt, store)
246+
primaryCount, err := registerPrimaryControllers(primaryConsumer, logger.Sugar(), scope, registry, cpf, brf, scf, cof, cnt, store)
250247
if err != nil {
251248
return err
252249
}
@@ -384,6 +381,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
384381
{topickey.TopicKeyBuild, "build", "orchestrator-build"},
385382
{topickey.TopicKeyBuildSignal, "buildsignal", "orchestrator-buildsignal"},
386383
{topickey.TopicKeyMerge, "merge", "orchestrator-merge"},
384+
{runwaytopickey.TopicKeyMergeSignal, "merger-signal", "orchestrator-mergesignal"},
387385
{topickey.TopicKeyConclude, "conclude", "orchestrator-conclude"},
388386
}
389387

@@ -443,6 +441,17 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
443441
Queue: q,
444442
})
445443

444+
// Publish-only: the orchestrator hands merge requests to runway via the
445+
// runway-owned merger queue. Runway is the sole consumer, so the
446+
// orchestrator registers no consuming subscription (and no DLQ) here; the
447+
// inbound result arrives on the separate merger-signal queue, which is a
448+
// consumed primary topic above.
449+
configs = append(configs, consumer.TopicConfig{
450+
Key: runwaytopickey.TopicKeyMerge,
451+
Name: "merger",
452+
Queue: q,
453+
})
454+
446455
return consumer.NewTopicRegistry(configs)
447456
}
448457

@@ -475,11 +484,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
475484
//
476485
// queueExtensions is the full set of extension implementations for a single
477486
// queue. Grouping them per queue (rather than per extension) lets the wiring
478-
// read as "for this queue, here are its scorer, analyzer, pusher, …", and lets
487+
// read as "for this queue, here are its scorer, analyzer, change provider, …", and lets
479488
// a queue profile start from a baseline and override only what differs.
480489
type queueExtensions struct {
481490
changeProvider changeprovider.ChangeProvider
482-
pusher pusher.Pusher
483491
buildRunner buildrunner.BuildRunner
484492
scorer scorer.Scorer
485493
analyzer conflict.Analyzer
@@ -510,12 +518,6 @@ func (f changeProviderFactory) For(cfg changeprovider.Config) (changeprovider.Ch
510518
return f.reg.get(cfg.QueueName).changeProvider, nil
511519
}
512520

513-
type pusherFactory struct{ reg queueRegistry }
514-
515-
func (f pusherFactory) For(cfg pusher.Config) (pusher.Pusher, error) {
516-
return f.reg.get(cfg.QueueName).pusher, nil
517-
}
518-
519521
type buildRunnerFactory struct{ reg queueRegistry }
520522

521523
func (f buildRunnerFactory) For(cfg buildrunner.Config) (buildrunner.BuildRunner, error) {
@@ -534,7 +536,7 @@ func (f analyzerFactory) For(cfg conflict.Config) (conflict.Analyzer, error) {
534536
return f.reg.get(cfg.QueueName).analyzer, nil
535537
}
536538

537-
func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, pshf pusher.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
539+
func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cpf changeprovider.Factory, brf buildrunner.Factory, scf scorer.Factory, cof conflict.Factory, cnt counter.Counter, store storage.Storage) (int, error) {
538540
var count int
539541
requestController := start.NewController(
540542
logger,
@@ -678,7 +680,7 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
678680
scope,
679681
store,
680682
registry,
681-
pshf,
683+
runwaytopickey.TopicKeyMerge,
682684
topickey.TopicKeyMerge,
683685
"orchestrator-merge",
684686
)
@@ -687,6 +689,19 @@ func registerPrimaryControllers(c consumer.Consumer, logger *zap.SugaredLogger,
687689
}
688690
count++
689691

692+
mergesignalController := mergesignal.NewController(
693+
logger,
694+
scope,
695+
store,
696+
registry,
697+
runwaytopickey.TopicKeyMergeSignal,
698+
"orchestrator-mergesignal",
699+
)
700+
if err := c.Register(mergesignalController); err != nil {
701+
return count, fmt.Errorf("failed to register mergesignal controller: %w", err)
702+
}
703+
count++
704+
690705
concludeController := conclude.NewController(
691706
logger,
692707
scope,
@@ -724,6 +739,7 @@ func registerDLQControllers(c consumer.Consumer, logger *zap.SugaredLogger, scop
724739
{"build_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuild), "orchestrator-build-dlq")},
725740
{"buildsignal_dlq", dlq.NewDLQBuildSignalController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyBuildSignal), "orchestrator-buildsignal-dlq")},
726741
{"merge_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyMerge), "orchestrator-merge-dlq")},
742+
{"mergesignal_dlq", dlq.NewDLQMergeSignalController(logger, dlqScope, store, dlq.TopicKey(runwaytopickey.TopicKeyMergeSignal), "orchestrator-mergesignal-dlq")},
727743
{"conclude_dlq", dlq.NewDLQBatchController(logger, dlqScope, store, dlq.TopicKey(topickey.TopicKeyConclude), "orchestrator-conclude-dlq")},
728744
}
729745
var count int
@@ -784,29 +800,8 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch
784800
}), nil
785801
}
786802

787-
// newPusher creates a git-backed Pusher bound to the configured checkout path,
788-
// remote, and target branch (PUSHER_CHECKOUT_PATH, PUSHER_REMOTE default
789-
// "origin", PUSHER_TARGET default "main"). When PUSHER_CHECKOUT_PATH is unset it
790-
// returns the fake pusher (commits succeed unless a change URI carries a failure
791-
// marker, see pusher/fake), keeping the example runnable without a git checkout.
792-
func newPusher(logger *zap.Logger, scope tally.Scope, resolver changeset.Resolver) (pusher.Pusher, error) {
793-
checkout := os.Getenv("PUSHER_CHECKOUT_PATH")
794-
if checkout == "" {
795-
logger.Warn("PUSHER_CHECKOUT_PATH not set; using fake pusher (commits succeed unless URI-marked)")
796-
return pushfake.New(resolver), nil
797-
}
798-
return gitpusher.NewPusher(gitpusher.Params{
799-
CheckoutPath: checkout,
800-
Remote: getEnv("PUSHER_REMOTE", "origin"),
801-
Target: getEnv("PUSHER_TARGET", "main"),
802-
Resolver: resolver,
803-
Logger: logger.Sugar(),
804-
MetricsScope: scope.SubScope("pusher"),
805-
}), nil
806-
}
807-
808803
// newQueueRegistry builds the per-queue extension profiles for the example.
809-
// Edge integrations (merge checker, change provider, pusher) and the build
804+
// Edge integrations (change provider) and the build
810805
// runner form a shared baseline; each per-queue profile starts from that
811806
// baseline and overrides only the extensions that differ — here the scorer and
812807
// conflict analyzer. Queues without an explicit profile fall back to the
@@ -817,10 +812,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
817812
if err != nil {
818813
return queueRegistry{}, fmt.Errorf("failed to create change provider: %w", err)
819814
}
820-
psh, err := newPusher(logger, scope, resolver)
821-
if err != nil {
822-
return queueRegistry{}, fmt.Errorf("failed to create pusher: %w", err)
823-
}
824815

825816
// batchLines buckets a batch by total lines changed across all its changes —
826817
// larger batches are likelier to fail to land.
@@ -842,7 +833,6 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope, resolver changeset.
842833
// below does.
843834
base := queueExtensions{
844835
changeProvider: cp,
845-
pusher: psh,
846836
buildRunner: buildfake.New(resolver),
847837
scorer: scorerfake.New(resolver, heuristic.New(
848838
resolver,

submitqueue/orchestrator/controller/dlq/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ go_library(
88
"dlq.go",
99
"log.go",
1010
"mergeconflictsignal.go",
11+
"mergesignal.go",
1112
"request.go",
1213
],
1314
importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/dlq",
@@ -31,6 +32,7 @@ go_test(
3132
"dlq_test.go",
3233
"log_test.go",
3334
"mergeconflictsignal_test.go",
35+
"mergesignal_test.go",
3436
"request_test.go",
3537
],
3638
embed = [":dlq"],
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dlq
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/uber-go/tally"
22+
"github.com/uber/submitqueue/core/consumer"
23+
"github.com/uber/submitqueue/core/metrics"
24+
runwayentity "github.com/uber/submitqueue/runway/entity"
25+
"github.com/uber/submitqueue/submitqueue/extension/storage"
26+
"go.uber.org/zap"
27+
)
28+
29+
// mergeSignalController is the DLQ reconciler for the mergesignal topic. Its
30+
// payload carries a runway MergeResult whose id is the batch id echoed back, so
31+
// reconciliation fails that batch directly via failBatch (which also fans out
32+
// to the member requests).
33+
type mergeSignalController struct {
34+
logger *zap.SugaredLogger
35+
metricsScope tally.Scope
36+
store storage.Storage
37+
topicKey consumer.TopicKey
38+
consumerGroup string
39+
}
40+
41+
// Verify mergeSignalController implements consumer.Controller at compile time.
42+
var _ consumer.Controller = (*mergeSignalController)(nil)
43+
44+
// NewDLQMergeSignalController builds a DLQ controller for the mergesignal topic.
45+
func NewDLQMergeSignalController(
46+
logger *zap.SugaredLogger,
47+
scope tally.Scope,
48+
store storage.Storage,
49+
topicKey consumer.TopicKey,
50+
consumerGroup string,
51+
) consumer.Controller {
52+
name := string(topicKey) + "_controller"
53+
return &mergeSignalController{
54+
logger: logger.Named(name),
55+
metricsScope: scope.SubScope(name),
56+
store: store,
57+
topicKey: topicKey,
58+
consumerGroup: consumerGroup,
59+
}
60+
}
61+
62+
// Process reconciles a single DLQ delivery for the mergesignal topic.
63+
func (c *mergeSignalController) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
64+
const opName = "process"
65+
66+
op := metrics.Begin(c.metricsScope, opName)
67+
defer func() { op.Complete(retErr) }()
68+
69+
msg := delivery.Message()
70+
71+
result, err := runwayentity.MergeResultFromBytes(msg.Payload)
72+
if err != nil {
73+
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
74+
return fmt.Errorf("failed to decode merge result from dlq payload: %w", err)
75+
}
76+
77+
dmeta := delivery.Metadata()
78+
c.logger.Warnw("dlq message received",
79+
"batch_id", result.ID,
80+
"attempt", delivery.Attempt(),
81+
"dlq_original_topic", dmeta["dlq.original_topic"],
82+
"dlq_failure_count", dmeta["dlq.failure_count"],
83+
"dlq_last_error", dmeta["dlq.last_error"],
84+
)
85+
86+
if err := failBatch(ctx, c.store, c.logger, result.ID); err != nil {
87+
metrics.NamedCounter(c.metricsScope, opName, "reconcile_errors", 1)
88+
return err
89+
}
90+
91+
metrics.NamedCounter(c.metricsScope, opName, "reconciled", 1)
92+
return nil
93+
}
94+
95+
// Name returns the controller name for logging and metrics.
96+
func (c *mergeSignalController) Name() string {
97+
return string(c.topicKey)
98+
}
99+
100+
// TopicKey returns the topic key this controller subscribes to.
101+
func (c *mergeSignalController) TopicKey() consumer.TopicKey {
102+
return c.topicKey
103+
}
104+
105+
// ConsumerGroup returns the consumer group for offset tracking.
106+
func (c *mergeSignalController) ConsumerGroup() string {
107+
return c.consumerGroup
108+
}

0 commit comments

Comments
 (0)