Skip to content

Commit 2c37061

Browse files
authored
Merge branch 'main' into mnoah1/stovepipe-validate-controller
2 parents d152470 + 2d254c1 commit 2c37061

38 files changed

Lines changed: 2910 additions & 1012 deletions

CLAUDE.md

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ request.Version = newVersion
3636

3737
```
3838
submitqueue/ # repo root (Go module github.com/uber/submitqueue)
39-
├── api/ # Wire contracts (proto) by domain/service
40-
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/
41-
│ └── stovepipe/{gateway,orchestrator}/{proto,protopb}/
39+
├── api/ # Published wire contracts (cross-domain/external)
40+
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto)
41+
│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/
42+
│ └── runway/messagequeue/ # external queue contracts (proto + protojson)
4243
├── platform/ # SHARED cross-domain packages — no domain deps
4344
│ ├── errs/, metrics/, consumer/, http/
4445
│ ├── base/ # SHARED entities (change/, messagequeue/, …)
@@ -68,7 +69,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi
6869

6970
The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services.
7071

71-
The `api/` tree holds all wire contracts (proto definitions and their committed generated stubs), organized by `domain/service`: `api/{domain}/{service}/proto/` for `.proto` sources and `api/{domain}/{service}/protopb/` for generated Go. A service package may hold multiple `.proto` files — its RPC contract (`{service}.proto`) alongside messagequeue contracts (queue payload schemas) — all generating into the same `protopb/`.
72+
The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`.
7273

7374
### Platform notes
7475

@@ -108,6 +109,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
108109

109110
Controllers receive `consumer.Delivery` (subset interface without Ack/Nack) to enforce separation of business logic from infrastructure.
110111

112+
**Queue payloads: IDs within a boundary, full payloads across one.** When producer and consumer share a store (same service — e.g. `build`→`buildsignal`, `validate`→`mergeconflict`), put only the entity **ID** on the queue and reload from storage (the store is the source of truth, messages stay small, redelivery is idempotent). When a queue **crosses a service boundary** (the consumer cannot read the producer's store — e.g. orchestrator→runway), publish the **full payload** the consumer needs, and have the **client own the correlation ID** so it can match the async result back to the work it is tracking. The queue's **owner defines the wire contract and topic keys** (in its own domain package); the other side imports them.
113+
111114
### Entities
112115

113116
Domain objects live under each domain's `entity/` tree, or under `platform/base/` when shared across domains. Guidelines:
@@ -156,9 +159,10 @@ Paths follow the directory layout: shared packages live under `platform/` at the
156159
- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`)
157160
- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}`
158161
- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb`
162+
- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue`
159163
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
160164
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
161-
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
165+
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`)
162166
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
163167
- Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`)
164168
- Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`)
@@ -182,7 +186,13 @@ Generated proto files are committed. When modifying `.proto` files:
182186
2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`)
183187
3. Commit all generated files
184188
185-
To add a new `.proto` to a service (e.g. messagequeue contracts), drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
189+
To add a new `.proto` to a service, drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
190+
191+
### Message Queue Contracts
192+
193+
Queue payloads are defined in **proto3** (`.proto` under `proto/`, generated Go in `protopb/` as the binding) and serialized as **protobuf JSON** (protojson) so the queue keeps storing self-describing JSON. Location follows audience: external/cross-domain contracts go under `api/{domain}/messagequeue/`; internal contracts (used only within the owning domain) go under `{domain}/core/messagequeue/`. Bazel `visibility` enforces the split — internal targets are domain-scoped, `api/` targets are public. See [doc/rfc/messagequeue-contract.md](doc/rfc/messagequeue-contract.md).
194+
195+
The message types are generated; the contract package adds only generic `protojson` glue — `Marshal(m)` / `Unmarshal[T](b, m)` — owning the wire conventions: `UseProtoNames` (snake_case fields), UPPER_SNAKE enum values, int64-as-string, unknown fields discarded on read (additive evolution). The topic key(s) carrying a message are declared on the message via the `topic_keys` proto option — a `google.protobuf.MessageOptions` extension defined in `api/base/messagequeue`. A topic key is a stable logical name, not a concrete wire topic; each implementer maps it to its backend's topic name, and a `TopicKeys(msg)` reflection helper reads the option back. It is contract metadata, not the hot path — publish/consume still routes on `consumer.TopicKey` + `TopicRegistry`. The contract package owns both halves: the proto payload and the `TopicKey` constants for its topic keys. A contract test round-trips the payloads and asserts every topic key is bound to exactly one message. Shared field types (`Change`, `Strategy`) are shared protos under `api/base/{change,mergestrategy}`. `api/runway/messagequeue/` is the reference example.
186196
187197
### Naming Conventions
188198

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ GOIMPORTS_VERSION ?= v0.33.0
3131
# (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A
3232
# package may hold multiple .proto files (e.g. an RPC contract plus messagequeue
3333
# contracts); all generated stubs land in the same protopb/ dir.
34-
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
34+
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
3535

3636
# Set REPO_ROOT for docker-compose
3737
export REPO_ROOT := $(shell pwd)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "messagequeue",
5+
srcs = [
6+
"merge.go",
7+
"topics.go",
8+
],
9+
importpath = "github.com/uber/submitqueue/api/runway/messagequeue",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//api/base/messagequeue/protopb", # keep
13+
"//api/runway/messagequeue/protopb", # keep
14+
"//platform/consumer",
15+
"@org_golang_google_protobuf//encoding/protojson",
16+
"@org_golang_google_protobuf//proto",
17+
],
18+
)
19+
20+
go_test(
21+
name = "messagequeue_test",
22+
srcs = ["merge_test.go"],
23+
embed = [":messagequeue"],
24+
deps = [
25+
"//api/base/change/protopb",
26+
"//api/base/mergestrategy/protopb",
27+
"@com_github_stretchr_testify//assert",
28+
"@com_github_stretchr_testify//require",
29+
"@org_golang_google_protobuf//proto",
30+
],
31+
)

api/runway/messagequeue/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Runway message queue contract
2+
3+
The published, language-neutral contract for the merge queues Runway owns. A client — in any language — publishes a merge request and consumes the result without access to Runway's Go types or storage. See [the message queue contract RFC](../../../doc/rfc/messagequeue-contract.md) for the design.
4+
5+
Payloads are defined as proto3 messages in [`proto/merge.proto`](proto/merge.proto) and generated into [`protopb/`](protopb); the proto is the authority and a non-Go client compiles against it directly. On the wire, payloads are serialized as protobuf JSON (`protojson`), so the queue keeps storing self-describing JSON. The message types are generated, so the Go helpers in this package are just generic `protojson` glue — `Marshal(m)` and `Unmarshal[T](b, m)` — for Go callers; field names stay snake_case (`UseProtoNames`) and enums serialize as their UPPER_SNAKE value name.
6+
7+
The shared field types `Change` and `MergeStrategy` come from `api/base/change` and `api/base/mergestrategy`, imported by the contract.
8+
9+
## Topic keys
10+
11+
The binding between a topic key and its payload lives in each message's `topic_keys` option (defined in `api/base/messagequeue`); `TopicKeys` reads it back by reflection. A topic key is a stable logical name, not a concrete wire topic — each implementer maps the key to whatever topic name its broker/queue requires. Our Go wiring maps it via `consumer.TopicRegistry`.
12+
13+
| Message | Direction | Topic keys |
14+
|---|---|---|
15+
| `MergeRequest` | client → Runway | `merge-conflict-check`, `merge` |
16+
| `MergeResult` | Runway → client | `merge-conflict-check-signal`, `merge-signal` |
17+
18+
One message serves a queue pair because a merge-conflict check is a dry run of a merge: Runway applies the same ordered steps onto the same target branch, and the topic key the request arrives on decides whether it commits the result and reports the produced revisions. A request on `merge-conflict-check` is a dry run; a request on `merge` commits.
19+
20+
## Result shape
21+
22+
`MergeResult.outcome` is an `Outcome` enum (`OUTCOME_UNSPECIFIED`/`SUCCEEDED`/`FAILED`): `SUCCEEDED` means mergeable (check) or merged (commit), `FAILED` a conflict or a failed apply; `reason` carries the explanation when `FAILED`. Per-step detail is in `steps` (request order): each `StepResult.outputs` is the list of `StepOutput`s the step produced on the target branch, **in application order** (the order they were created). A committing merge populates `outputs`; a dry-run check, an already-present change, or a failed step leaves them empty. `StepOutput.id` is the VCS-neutral revision identifier (git SHA, Mercurial hash, Subversion revision, Perforce changelist, …), with room to grow (author, timestamp, …).
23+
24+
## Evolution
25+
26+
Contract changes are additive-only: add new fields; never remove, rename, repurpose, or retype an existing field, and never reuse a field number. protojson ignores unknown fields on read and omits zero-valued fields on write, so a new optional field is backward-compatible in both directions.

api/runway/messagequeue/merge.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package messagequeue holds Runway's external message-queue contract: the wire
16+
// payloads for the merge queues Runway owns, defined by the proto files in
17+
// proto/ and generated into protopb/. The proto is the language-neutral
18+
// authority; the generated Go types in protopb are the binding for Go callers.
19+
//
20+
// The message types are generated into protopb; this package adds only generic
21+
// protojson glue (Marshal/Unmarshal) and the topic-key reflection lookup
22+
// (TopicKeys), so there is no per-message serialization code. Payloads are
23+
// serialized as protobuf JSON, not binary, so the MySQL-backed queue keeps
24+
// storing self-describing JSON. The topic key that carries each payload is
25+
// declared on the message itself via the topic_keys proto option (see
26+
// api/base/messagequeue).
27+
//
28+
// One contract serves two queue pairs because a merge-conflict check is a dry
29+
// run of a merge: Runway applies the same ordered steps onto the same target
30+
// branch, and the only difference is whether it commits the result and reports
31+
// the revisions it produced. The topic key a request arrives on encodes that choice
32+
// — the merge-conflict-check pair for a dry run, the merge pair for a
33+
// committing merge — so MergeRequest and MergeResult are identical on both.
34+
package messagequeue
35+
36+
import (
37+
"google.golang.org/protobuf/encoding/protojson"
38+
"google.golang.org/protobuf/proto"
39+
40+
basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb"
41+
"github.com/uber/submitqueue/api/runway/messagequeue/protopb"
42+
)
43+
44+
// Wire payload types. These alias the generated protobuf bindings so callers
45+
// reference the contract through this curated package rather than protopb.
46+
type (
47+
// MergeRequest is the payload a client publishes to one of Runway's merge
48+
// queues: the merge-conflict-check topic for a dry-run check, the merge
49+
// topic for a committing merge.
50+
MergeRequest = protopb.MergeRequest
51+
// MergeStep is one step of an ordered merge: a single set of change(s)
52+
// applied with a strategy.
53+
MergeStep = protopb.MergeStep
54+
// MergeResult is the payload Runway publishes to the corresponding signal
55+
// queue once a request completes.
56+
MergeResult = protopb.MergeResult
57+
// StepResult reports what happened to a single MergeStep.
58+
StepResult = protopb.StepResult
59+
// StepOutput is a single revision a merge step produced on the target branch.
60+
StepOutput = protopb.StepOutput
61+
)
62+
63+
// marshalOpts keeps the JSON field names identical to the proto field names
64+
// (snake_case), so the wire shape matches the declared contract rather than
65+
// protojson's default lowerCamelCase. Zero-valued fields are omitted.
66+
var marshalOpts = protojson.MarshalOptions{UseProtoNames: true}
67+
68+
// unmarshalOpts tolerates unknown fields so an additive contract change (a new
69+
// field a producer sends but this consumer does not yet know) is ignored rather
70+
// than rejected.
71+
var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true}
72+
73+
// Marshal serializes any contract message to protojson bytes for the queue
74+
// payload, keeping the proto field names (snake_case) on the wire.
75+
func Marshal(m proto.Message) ([]byte, error) {
76+
return marshalOpts.Marshal(m)
77+
}
78+
79+
// Unmarshal deserializes protojson bytes into the contract message m, tolerating
80+
// unknown fields so an additive contract change is ignored rather than rejected.
81+
func Unmarshal[T proto.Message](b []byte, m T) error {
82+
return unmarshalOpts.Unmarshal(b, m)
83+
}
84+
85+
// TopicKeys returns the stable logical topic keys bound to a message via the
86+
// topic_keys proto option — not concrete wire names; a caller maps each key to
87+
// its backend's topic name. Returns nil for a message that declares no keys.
88+
func TopicKeys(m proto.Message) []string {
89+
opts := m.ProtoReflect().Descriptor().Options()
90+
if opts == nil {
91+
return nil
92+
}
93+
keys, ok := proto.GetExtension(opts, basemqpb.E_TopicKeys).([]string)
94+
if !ok {
95+
return nil
96+
}
97+
return keys
98+
}

0 commit comments

Comments
 (0)