Skip to content

Commit 823ec31

Browse files
feat(api/runway): add external merge queue contract
Add Runway's published, language-neutral merge queue contract. merge.proto defines MergeRequest/MergeResult, reusing the shared Change and Strategy types and the topics option, generated into protopb and serialized as protobuf JSON so the queue keeps storing self-describing JSON. The Go helpers wrap protojson and expose the topic binding via reflection; topic keys are co-located with the contract. A drift test round-trips the payloads and verifies every topic key is bound to exactly one message. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent e249674 commit 823ec31

17 files changed

Lines changed: 947 additions & 302 deletions

File tree

CLAUDE.md

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ request.Version = newVersion
3636

3737
```
3838
submitqueue/ # repo root (Go module github.com/uber/submitqueue)
39-
├── api/ # Wire contracts (proto) by domain/service
40-
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/
41-
│ └── stovepipe/{gateway,orchestrator}/{proto,protopb}/
39+
├── api/ # Published wire contracts (cross-domain/external)
40+
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto)
41+
│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/
42+
│ └── runway/messagequeue/ # external queue contracts (JSON Schema + Go binding)
4243
├── platform/ # SHARED cross-domain packages — no domain deps
4344
│ ├── errs/, metrics/, consumer/, http/
4445
│ ├── base/ # SHARED entities (change/, messagequeue/, …)
@@ -68,7 +69,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi
6869

6970
The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services.
7071

71-
The `api/` tree holds all wire contracts (proto definitions and their committed generated stubs), organized by `domain/service`: `api/{domain}/{service}/proto/` for `.proto` sources and `api/{domain}/{service}/protopb/` for generated Go. A service package may hold multiple `.proto` files — its RPC contract (`{service}.proto`) alongside messagequeue contracts (queue payload schemas) — all generating into the same `protopb/`.
72+
The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`.
7273

7374
### Platform notes
7475

@@ -156,9 +157,10 @@ Paths follow the directory layout: shared packages live under `platform/` at the
156157
- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`)
157158
- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}`
158159
- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb`
160+
- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue`
159161
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
160162
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
161-
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
163+
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`)
162164
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
163165
- Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`)
164166
- Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`)
@@ -182,7 +184,13 @@ Generated proto files are committed. When modifying `.proto` files:
182184
2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`)
183185
3. Commit all generated files
184186
185-
To add a new `.proto` to a service (e.g. messagequeue contracts), drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
187+
To add a new `.proto` to a service, drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
188+
189+
### Message Queue Contracts
190+
191+
Queue payloads are JSON, and the contract is a **JSON Schema (draft 2020-12)** file co-located with its Go binding. Location follows audience: external/cross-domain contracts go under `api/{domain}/messagequeue/`; internal contracts (used only within the owning domain) go under `{domain}/core/messagequeue/`. Bazel `visibility` enforces the split — internal targets are domain-scoped, `api/` targets are public. See [doc/rfc/messagequeue-contract.md](doc/rfc/messagequeue-contract.md).
192+
193+
Each schema declares an `x-topics` keyword (the wire topic names it carries) — the single source of truth for the topic-to-schema mapping, distinct from the Go `consumer.TopicKey`. The contract package owns both halves of the contract: the payload (schema + Go binding that `go:embed`s it, providing the struct plus `ToBytes`/`FromBytes`) and the `TopicKey` constants for its topics. Publishers/consumers use the binding like any entity. A drift test keeps schema and binding in sync (entity JSON validates against the embedded schema; every topic key is bound by some `x-topics`). Evolution is additive-only. `api/runway/messagequeue/` is the reference example.
186194
187195
### Naming Conventions
188196

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ GOIMPORTS_VERSION ?= v0.33.0
3131
# (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A
3232
# package may hold multiple .proto files (e.g. an RPC contract plus messagequeue
3333
# contracts); all generated stubs land in the same protopb/ dir.
34-
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
34+
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
3535

3636
# Set REPO_ROOT for docker-compose
3737
export REPO_ROOT := $(shell pwd)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "messagequeue",
5+
srcs = [
6+
"merge.go",
7+
"topics.go",
8+
],
9+
importpath = "github.com/uber/submitqueue/api/runway/messagequeue",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//api/base/messagequeue/protopb", # keep
13+
"//api/runway/messagequeue/protopb", # keep
14+
"//platform/consumer",
15+
"@org_golang_google_protobuf//encoding/protojson",
16+
"@org_golang_google_protobuf//proto",
17+
],
18+
)
19+
20+
go_test(
21+
name = "messagequeue_test",
22+
srcs = ["merge_test.go"],
23+
embed = [":messagequeue"],
24+
deps = [
25+
"//api/base/change/protopb",
26+
"//api/base/mergestrategy/protopb",
27+
"@com_github_stretchr_testify//assert",
28+
"@com_github_stretchr_testify//require",
29+
"@org_golang_google_protobuf//proto",
30+
],
31+
)

api/runway/messagequeue/README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Runway message queue contract
2+
3+
The published, language-neutral contract for the merge queues Runway owns. A client — in any language — publishes a merge request and consumes the result without access to Runway's Go types or storage. See [the message queue contract RFC](../../../doc/rfc/messagequeue-contract.md) for the design.
4+
5+
Payloads are defined as proto3 messages in [`proto/merge.proto`](proto/merge.proto) and generated into [`protopb/`](protopb); the proto is the authority and a non-Go client compiles against it directly. On the wire, payloads are serialized as protobuf JSON (`protojson`), so the queue keeps storing self-describing JSON. The Go helpers in this package (`MergeRequestToBytes`/`MergeRequestFromBytes` and the `MergeResult` counterparts) wrap `protojson` for Go callers; field names stay snake_case (`UseProtoNames`) and enums serialize as their UPPER_SNAKE value name.
6+
7+
The shared field types `Change` and `MergeStrategy` come from `api/base/change` and `api/base/mergestrategy`, imported by the contract.
8+
9+
## Topics
10+
11+
The binding between a queue topic and its payload lives in each message's `topics` option (defined in `api/base/messagequeue`); `Topics` reads it back by reflection.
12+
13+
| Message | Direction | Topics |
14+
|---|---|---|
15+
| `MergeRequest` | client → Runway | `merge-conflict-checker`, `merger` |
16+
| `MergeResult` | Runway → client | `merge-conflict-checker-signal`, `merger-signal` |
17+
18+
One message serves a queue pair because a merge-conflict check is a dry run of a merge: Runway applies the same ordered steps onto the same target branch, and the topic the request arrives on decides whether it commits the result and reports the produced revisions. A request on `merge-conflict-checker` is a dry run; a request on `merger` commits.
19+
20+
## Evolution
21+
22+
Contract changes are additive-only: add new fields; never remove, rename, repurpose, or retype an existing field, and never reuse a field number. protojson ignores unknown fields on read and omits zero-valued fields on write, so a new optional field is backward-compatible in both directions.

api/runway/messagequeue/merge.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package messagequeue holds Runway's external message-queue contract: the wire
16+
// payloads for the merge queues Runway owns, defined by the proto files in
17+
// proto/ and generated into protopb/. The proto is the language-neutral
18+
// authority; the generated Go types in protopb are the binding for Go callers.
19+
//
20+
// Payloads are serialized as protobuf JSON (protojson), not binary, so the
21+
// MySQL-backed queue keeps storing self-describing JSON. The topic that carries
22+
// each payload is declared on the message itself via the topics proto option
23+
// (see api/base/messagequeue); Topics reads it back.
24+
//
25+
// One contract serves two queue pairs because a merge-conflict check is a dry
26+
// run of a merge: Runway applies the same ordered steps onto the same target
27+
// branch, and the only difference is whether it commits the result and reports
28+
// the revisions it produced. The topic a request arrives on encodes that choice
29+
// — the merge-conflict-checker pair for a dry run, the merger pair for a
30+
// committing merge — so MergeRequest and MergeResult are identical on both.
31+
package messagequeue
32+
33+
import (
34+
"google.golang.org/protobuf/encoding/protojson"
35+
"google.golang.org/protobuf/proto"
36+
37+
basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb"
38+
"github.com/uber/submitqueue/api/runway/messagequeue/protopb"
39+
)
40+
41+
// Wire payload types. These alias the generated protobuf bindings so callers
42+
// reference the contract through this curated package rather than protopb.
43+
type (
44+
// MergeRequest is the payload a client publishes to one of Runway's merge
45+
// queues: the merge-conflict-checker topic for a dry-run check, the merger
46+
// topic for a committing merge.
47+
MergeRequest = protopb.MergeRequest
48+
// MergeStep is one step of an ordered merge: a single set of change(s)
49+
// applied with a strategy.
50+
MergeStep = protopb.MergeStep
51+
// MergeResult is the payload Runway publishes to the corresponding signal
52+
// queue once a request completes.
53+
MergeResult = protopb.MergeResult
54+
// StepResult reports what happened to a single MergeStep.
55+
StepResult = protopb.StepResult
56+
)
57+
58+
// marshalOpts keeps the JSON field names identical to the proto field names
59+
// (snake_case), so the wire shape matches the declared contract rather than
60+
// protojson's default lowerCamelCase. Zero-valued fields are omitted.
61+
var marshalOpts = protojson.MarshalOptions{UseProtoNames: true}
62+
63+
// unmarshalOpts tolerates unknown fields so an additive contract change (a new
64+
// field a producer sends but this consumer does not yet know) is ignored rather
65+
// than rejected.
66+
var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true}
67+
68+
// MergeRequestToBytes serializes a MergeRequest to protojson bytes for the queue
69+
// payload.
70+
func MergeRequestToBytes(r *MergeRequest) ([]byte, error) {
71+
return marshalOpts.Marshal(r)
72+
}
73+
74+
// MergeRequestFromBytes deserializes a MergeRequest from protojson bytes.
75+
func MergeRequestFromBytes(data []byte) (*MergeRequest, error) {
76+
var req MergeRequest
77+
err := unmarshalOpts.Unmarshal(data, &req)
78+
return &req, err
79+
}
80+
81+
// MergeResultToBytes serializes a MergeResult to protojson bytes for the queue
82+
// payload.
83+
func MergeResultToBytes(r *MergeResult) ([]byte, error) {
84+
return marshalOpts.Marshal(r)
85+
}
86+
87+
// MergeResultFromBytes deserializes a MergeResult from protojson bytes.
88+
func MergeResultFromBytes(data []byte) (*MergeResult, error) {
89+
var res MergeResult
90+
err := unmarshalOpts.Unmarshal(data, &res)
91+
return &res, err
92+
}
93+
94+
// Topics returns the canonical wire topic names bound to a message via the
95+
// topics proto option (the proto-native equivalent of a JSON Schema x-topics
96+
// keyword). It returns nil for a message that declares no topics.
97+
func Topics(m proto.Message) []string {
98+
opts := m.ProtoReflect().Descriptor().Options()
99+
if opts == nil {
100+
return nil
101+
}
102+
topics, ok := proto.GetExtension(opts, basemqpb.E_Topics).([]string)
103+
if !ok {
104+
return nil
105+
}
106+
return topics
107+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package messagequeue
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
"google.golang.org/protobuf/proto"
23+
24+
changepb "github.com/uber/submitqueue/api/base/change/protopb"
25+
strategypb "github.com/uber/submitqueue/api/base/mergestrategy/protopb"
26+
)
27+
28+
func TestMergeRequestRoundTrip(t *testing.T) {
29+
req := &MergeRequest{
30+
Id: "queue-a/42",
31+
QueueName: "queue-a",
32+
Steps: []*MergeStep{
33+
{
34+
StepId: "queue-a/1",
35+
Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/1/0123456789abcdef0123456789abcdef01234567"}}},
36+
Strategy: strategypb.Strategy_REBASE,
37+
},
38+
{
39+
StepId: "queue-a/2",
40+
Changes: []*changepb.Change{{Uris: []string{"github://uber/repo/pull/2/89abcdef0123456789abcdef0123456789abcdef"}}},
41+
Strategy: strategypb.Strategy_MERGE,
42+
},
43+
},
44+
}
45+
46+
data, err := MergeRequestToBytes(req)
47+
require.NoError(t, err)
48+
49+
got, err := MergeRequestFromBytes(data)
50+
require.NoError(t, err)
51+
assert.True(t, proto.Equal(req, got), "round-tripped MergeRequest should equal the original")
52+
}
53+
54+
func TestMergeResultRoundTrip(t *testing.T) {
55+
// A committing merge reports the revisions each step produced on the target;
56+
// a dry-run check leaves output_ids empty and reports a per-step reason on
57+
// failure. Both shapes share the one MergeResult contract.
58+
cases := map[string]*MergeResult{
59+
"merged with produced revisions": {
60+
Id: "queue-a/42",
61+
Success: true,
62+
Steps: []*StepResult{
63+
{StepId: "queue-a/1", OutputIds: []string{"0123456789abcdef0123456789abcdef01234567"}},
64+
},
65+
},
66+
"failed with per-step reason": {
67+
Id: "queue-a/42",
68+
Success: false,
69+
Reason: "conflict in foo.go",
70+
Steps: []*StepResult{{StepId: "queue-a/2", Reason: "conflict in foo.go"}},
71+
},
72+
"minimal": {
73+
Id: "queue-a/42",
74+
Success: true,
75+
},
76+
}
77+
78+
for name, res := range cases {
79+
t.Run(name, func(t *testing.T) {
80+
data, err := MergeResultToBytes(res)
81+
require.NoError(t, err)
82+
83+
got, err := MergeResultFromBytes(data)
84+
require.NoError(t, err)
85+
assert.True(t, proto.Equal(res, got), "round-tripped MergeResult should equal the original")
86+
})
87+
}
88+
}
89+
90+
// TestWireFormat locks the two protojson encoding decisions the contract relies
91+
// on: snake_case field names (UseProtoNames) and proto-conventional UPPER_SNAKE
92+
// enum values on the wire.
93+
func TestWireFormat(t *testing.T) {
94+
data, err := MergeRequestToBytes(&MergeRequest{
95+
Id: "queue-a/42",
96+
QueueName: "queue-a",
97+
Steps: []*MergeStep{{StepId: "queue-a/1", Strategy: strategypb.Strategy_SQUASH_REBASE}},
98+
})
99+
require.NoError(t, err)
100+
101+
assert.Contains(t, string(data), `"queue_name"`, "fields must serialize as snake_case")
102+
assert.Contains(t, string(data), `"SQUASH_REBASE"`, "enums must serialize as their UPPER_SNAKE value name")
103+
}
104+
105+
// TestTopicsBindEveryTopicKey is the topic-binding drift guard: every Runway
106+
// topic key is carried by exactly one message's topics option, and no topics
107+
// option names an unknown topic.
108+
func TestTopicsBindEveryTopicKey(t *testing.T) {
109+
bound := map[string]int{}
110+
for _, m := range []proto.Message{&MergeRequest{}, &MergeResult{}} {
111+
topics := Topics(m)
112+
require.NotEmpty(t, topics, "message must declare a non-empty topics option")
113+
for _, topic := range topics {
114+
bound[topic]++
115+
}
116+
}
117+
118+
keys := []TopicKey{
119+
TopicKeyMergeConflictCheck,
120+
TopicKeyMergeConflictCheckSignal,
121+
TopicKeyMerge,
122+
TopicKeyMergeSignal,
123+
}
124+
125+
valid := map[string]bool{}
126+
for _, k := range keys {
127+
valid[k.String()] = true
128+
assert.Equalf(t, 1, bound[k.String()], "topic key %q must be bound to exactly one message via the topics option", k)
129+
}
130+
for topic := range bound {
131+
assert.Truef(t, valid[topic], "topics option names unknown topic %q", topic)
132+
}
133+
}

0 commit comments

Comments
 (0)