Skip to content

Commit 424e438

Browse files
committed
feat(stovepipe): add start controller for the change ingress flow
Stovepipe gateway -> start -> validate skeleton: - ChangeEvent: single-commit, source-agnostic ingress payload ({"uri": "git://owner/repo/branch/revision"}) published to the start topic by the webhook/poller. - start controller deserializes the ChangeEvent, derives the owner/repo partition key so a repo's commits stay ordered, and publishes a thin ChangeURI reference to the validate topic (publish-only until validate lands). - stovepipe/entity/git parses git:// commit identities into a ChangeID, domain-scoped like submitqueue's github/phabricator ChangeID parsers. Deferred to follow-up PRs once their consumers exist: orchestrator commit storage, the gateway status/log sinks, and lifting the cross-domain change identity into the shared entity package (a SubmitQueue refactor unrelated to this flow).
1 parent 2a53a64 commit 424e438

14 files changed

Lines changed: 845 additions & 4 deletions

File tree

example/stovepipe/orchestrator/server/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,17 @@ go_library(
1111
importpath = "github.com/uber/submitqueue/example/stovepipe/orchestrator/server",
1212
visibility = ["//visibility:private"],
1313
deps = [
14+
"//core/consumer",
15+
"//core/errs",
16+
"//core/errs/generic",
17+
"//core/errs/mysql",
18+
"//extension/messagequeue",
19+
"//extension/messagequeue/mysql",
20+
"//stovepipe/core/topickey",
1421
"//stovepipe/orchestrator/controller",
22+
"//stovepipe/orchestrator/controller/start",
1523
"//stovepipe/orchestrator/protopb",
24+
"@com_github_go_sql_driver_mysql//:mysql",
1625
"@com_github_uber_go_tally//:tally",
1726
"@org_golang_google_grpc//:grpc",
1827
"@org_golang_google_grpc//reflection",

example/stovepipe/orchestrator/server/main.go

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package main
1616

1717
import (
1818
"context"
19+
"database/sql"
1920
"errors"
2021
"fmt"
2122
"net"
@@ -25,8 +26,17 @@ import (
2526
"syscall"
2627
"time"
2728

29+
_ "github.com/go-sql-driver/mysql"
2830
"github.com/uber-go/tally"
31+
"github.com/uber/submitqueue/core/consumer"
32+
"github.com/uber/submitqueue/core/errs"
33+
genericerrs "github.com/uber/submitqueue/core/errs/generic"
34+
mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
35+
extqueue "github.com/uber/submitqueue/extension/messagequeue"
36+
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
37+
"github.com/uber/submitqueue/stovepipe/core/topickey"
2938
"github.com/uber/submitqueue/stovepipe/orchestrator/controller"
39+
"github.com/uber/submitqueue/stovepipe/orchestrator/controller/start"
3040
pb "github.com/uber/submitqueue/stovepipe/orchestrator/protopb"
3141
"go.uber.org/zap"
3242
"google.golang.org/grpc"
@@ -102,6 +112,58 @@ func run() error {
102112
metricsWgDone.Wait()
103113
}()
104114

115+
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
116+
if queueDSN == "" {
117+
return fmt.Errorf("QUEUE_MYSQL_DSN environment variable is required")
118+
}
119+
queueDB, err := sql.Open("mysql", queueDSN)
120+
if err != nil {
121+
return fmt.Errorf("failed to open queue database: %w", err)
122+
}
123+
defer queueDB.Close()
124+
125+
mysqlQueue, err := queueMySQL.NewQueue(queueMySQL.Params{
126+
DB: queueDB,
127+
Logger: logger,
128+
MetricsScope: scope.SubScope("queue"),
129+
})
130+
if err != nil {
131+
return fmt.Errorf("failed to create queue: %w", err)
132+
}
133+
defer mysqlQueue.Close()
134+
135+
logger.Info("initialized queue", zap.String("dsn", queueDSN))
136+
137+
subscriberName := os.Getenv("HOSTNAME")
138+
if subscriberName == "" {
139+
subscriberName = fmt.Sprintf("stovepipe-orchestrator-%d", time.Now().Unix())
140+
}
141+
142+
registry, err := newTopicRegistry(mysqlQueue, subscriberName)
143+
if err != nil {
144+
return fmt.Errorf("failed to create topic registry: %w", err)
145+
}
146+
147+
primaryConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
148+
errs.NewClassifierProcessor(
149+
genericerrs.Classifier,
150+
mysqlerrs.Classifier,
151+
),
152+
)
153+
154+
startController := start.NewController(
155+
logger.Sugar(), scope, registry, topickey.TopicKeyStart, "orchestrator-start",
156+
)
157+
if err := primaryConsumer.Register(startController); err != nil {
158+
return fmt.Errorf("failed to register start controller: %w", err)
159+
}
160+
logger.Info("controllers registered", zap.Int("primary", 1))
161+
162+
if err := primaryConsumer.Start(ctx); err != nil {
163+
return fmt.Errorf("failed to start primary consumer: %w", err)
164+
}
165+
logger.Info("consumer started")
166+
105167
grpcServer := grpc.NewServer()
106168

107169
pingController := controller.NewPingController(logger, scope)
@@ -140,11 +202,36 @@ func run() error {
140202
serverErr = <-serverErrCh
141203
case serverErr = <-serverErrCh:
142204
fmt.Println("Shutting down stovepipe orchestrator server due to critical GRPC server error...")
205+
cancel()
143206
}
144207

145208
if serverErr != nil {
146-
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
209+
serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
210+
}
211+
212+
primaryStopErr := primaryConsumer.Stop(30000)
213+
if primaryStopErr != nil {
214+
primaryStopErr = fmt.Errorf("failed to stop consumer: %w", primaryStopErr)
215+
}
216+
217+
if primaryStopErr != nil || serverErr != nil {
218+
err = errors.Join(primaryStopErr, serverErr)
147219
}
148220

149221
return err
150222
}
223+
224+
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
225+
return consumer.NewTopicRegistry([]consumer.TopicConfig{
226+
{
227+
Key: topickey.TopicKeyStart,
228+
Name: "start",
229+
Queue: q,
230+
Subscription: extqueue.DefaultSubscriptionConfig(
231+
subscriberName, "orchestrator-start",
232+
),
233+
},
234+
// Publish-only until the validate controller lands.
235+
{Key: topickey.TopicKeyValidate, Name: "validate", Queue: q},
236+
})
237+
}

stovepipe/core/topickey/topickey.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import "github.com/uber/submitqueue/core/consumer"
2121
type TopicKey = consumer.TopicKey
2222

2323
const (
24-
// TopicKeyStart is the pipeline stage where trunk push events arrive from the gateway.
24+
// TopicKeyStart is the pipeline stage where trunk change events arrive from the gateway.
2525
TopicKeyStart TopicKey = "start"
2626
// TopicKeyValidate is the pipeline stage where commits are published for metadata resolution.
2727
TopicKeyValidate TopicKey = "validate"

stovepipe/entity/BUILD.bazel

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,26 @@
1-
load("@rules_go//go:def.bzl", "go_library")
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "entity",
5-
srcs = ["entity.go"],
5+
srcs = [
6+
"change_event.go",
7+
"change_uri.go",
8+
"entity.go",
9+
],
610
importpath = "github.com/uber/submitqueue/stovepipe/entity",
711
visibility = ["//visibility:public"],
12+
deps = ["//stovepipe/entity/git"],
13+
)
14+
15+
go_test(
16+
name = "entity_test",
17+
srcs = [
18+
"change_event_test.go",
19+
"change_uri_test.go",
20+
],
21+
embed = [":entity"],
22+
deps = [
23+
"@com_github_stretchr_testify//assert",
24+
"@com_github_stretchr_testify//require",
25+
],
826
)

stovepipe/entity/change_event.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
21+
entitygit "github.com/uber/submitqueue/stovepipe/entity/git"
22+
)
23+
24+
// ChangeEvent represents a single new trunk change entering the pipeline, published to the
25+
// start topic. A trunk change is one commit, so the event carries one git-backed URI. It is
26+
// source-agnostic: both the webhook and the reconciliation poller emit it. Additional fields
27+
// (e.g. source, committer time) can be added later as ingestion needs them.
28+
type ChangeEvent struct {
29+
// URI identifies the commit that entered the pipeline (git://owner/repo/branch/revision).
30+
URI string `json:"uri"`
31+
}
32+
33+
// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
34+
func (e ChangeEvent) ToBytes() ([]byte, error) {
35+
return json.Marshal(e)
36+
}
37+
38+
// Validate checks that the change event carries a valid git-backed commit URI.
39+
func (e ChangeEvent) Validate() error {
40+
if e.URI == "" {
41+
return fmt.Errorf("change event requires a commit URI")
42+
}
43+
if _, err := entitygit.ParseChangeID(e.URI); err != nil {
44+
return fmt.Errorf("change event URI: %w", err)
45+
}
46+
return nil
47+
}
48+
49+
// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes.
50+
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
51+
var event ChangeEvent
52+
if err := json.Unmarshal(data, &event); err != nil {
53+
return ChangeEvent{}, err
54+
}
55+
if err := event.Validate(); err != nil {
56+
return ChangeEvent{}, err
57+
}
58+
return event, nil
59+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
const testURI = "git://uber/monorepo/main/abcdef0123456789abcdef0123456789abcdef01"
25+
26+
func TestChangeEvent_Validate(t *testing.T) {
27+
t.Run("valid URI", func(t *testing.T) {
28+
event := ChangeEvent{URI: testURI}
29+
require.NoError(t, event.Validate())
30+
})
31+
32+
t.Run("rejects empty URI", func(t *testing.T) {
33+
require.Error(t, ChangeEvent{}.Validate())
34+
})
35+
36+
t.Run("rejects non-git URI", func(t *testing.T) {
37+
event := ChangeEvent{URI: "github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}
38+
require.Error(t, event.Validate())
39+
})
40+
}
41+
42+
func TestChangeEventFromBytes(t *testing.T) {
43+
original := ChangeEvent{URI: testURI}
44+
data, err := original.ToBytes()
45+
require.NoError(t, err)
46+
47+
got, err := ChangeEventFromBytes(data)
48+
require.NoError(t, err)
49+
assert.Equal(t, original.URI, got.URI)
50+
}

stovepipe/entity/change_uri.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import "encoding/json"
18+
19+
// ChangeURI is the lightweight reference passed between pipeline stages. It
20+
// carries only the change identity; stages re-resolve any state they need.
21+
type ChangeURI struct {
22+
// URI is the change identity (git://owner/repo/branch/revision).
23+
URI string `json:"uri"`
24+
}
25+
26+
// ToBytes serializes the ChangeURI to JSON bytes for a queue message payload.
27+
func (c ChangeURI) ToBytes() ([]byte, error) {
28+
return json.Marshal(c)
29+
}
30+
31+
// ChangeURIFromBytes deserializes a ChangeURI from JSON bytes.
32+
func ChangeURIFromBytes(data []byte) (ChangeURI, error) {
33+
var ref ChangeURI
34+
err := json.Unmarshal(data, &ref)
35+
return ref, err
36+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestChangeURI_RoundTrip(t *testing.T) {
25+
original := ChangeURI{URI: "git://uber/monorepo/main/abcdef0123456789abcdef0123456789abcdef01"}
26+
data, err := original.ToBytes()
27+
require.NoError(t, err)
28+
29+
got, err := ChangeURIFromBytes(data)
30+
require.NoError(t, err)
31+
assert.Equal(t, original, got)
32+
}

stovepipe/entity/git/BUILD.bazel

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "git",
5+
srcs = ["change_id.go"],
6+
importpath = "github.com/uber/submitqueue/stovepipe/entity/git",
7+
visibility = ["//visibility:public"],
8+
)
9+
10+
go_test(
11+
name = "git_test",
12+
srcs = ["change_id_test.go"],
13+
embed = [":git"],
14+
deps = [
15+
"@com_github_stretchr_testify//assert",
16+
"@com_github_stretchr_testify//require",
17+
],
18+
)

0 commit comments

Comments
 (0)