Skip to content

Commit bf2716b

Browse files
committed
feat(stovepipe): add start controller + change entity for the change ingress flow
Adds the ChangeEvent entity (commit URI + carried, gateway-stamped PartitionKey) and the orchestrator start controller, the pipeline entry point. start is the reading side of the partitioning model: it consumes the partition key stamped by the producer and forwards it to validate verbatim, never re-parsing the URI (mirrors SubmitQueue's request.Queue).
1 parent c4fa997 commit bf2716b

6 files changed

Lines changed: 501 additions & 2 deletions

File tree

stovepipe/entity/BUILD.bazel

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1-
load("@rules_go//go:def.bzl", "go_library")
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "entity",
5-
srcs = ["entity.go"],
5+
srcs = [
6+
"change_event.go",
7+
"entity.go",
8+
],
69
importpath = "github.com/uber/submitqueue/stovepipe/entity",
710
visibility = ["//visibility:public"],
811
)
12+
13+
go_test(
14+
name = "entity_test",
15+
srcs = ["change_event_test.go"],
16+
embed = [":entity"],
17+
deps = [
18+
"@com_github_stretchr_testify//assert",
19+
"@com_github_stretchr_testify//require",
20+
],
21+
)

stovepipe/entity/change_event.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
"net/url"
21+
)
22+
23+
// ChangeEvent represents a change in the pipeline.
24+
// URI represents the identity of the change.
25+
//
26+
// PartitionKey is decided once at ingestion (in the gateway) and carried
27+
// unchanged across stages.
28+
type ChangeEvent struct {
29+
// URI represents the identity of the change. The scheme names the VCS; the rest
30+
// is provider-specific (e.g. git://remote/repo/ref/commit_sha).
31+
URI string `json:"uri"`
32+
33+
// PartitionKey is the repository-scoped ordering key the gateway derived from
34+
// the URI at ingestion.
35+
PartitionKey string `json:"partition_key"`
36+
}
37+
38+
// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
39+
func (e ChangeEvent) ToBytes() ([]byte, error) {
40+
return json.Marshal(e)
41+
}
42+
43+
// Scheme returns the URI scheme identifying the VCS (e.g. "git"), or "" if the
44+
// URI is empty or not scheme-qualified. It is the key used to select a resolver.
45+
func (e ChangeEvent) Scheme() string {
46+
u, err := url.Parse(e.URI)
47+
if err != nil {
48+
return ""
49+
}
50+
return u.Scheme
51+
}
52+
53+
// Validate checks that the change event carries a scheme-qualified commit URI.
54+
func (e ChangeEvent) Validate() error {
55+
if e.URI == "" {
56+
return fmt.Errorf("change event requires a commit URI")
57+
}
58+
if e.Scheme() == "" {
59+
return fmt.Errorf("change event URI %q must be scheme-qualified", e.URI)
60+
}
61+
return nil
62+
}
63+
64+
// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes.
65+
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
66+
var event ChangeEvent
67+
if err := json.Unmarshal(data, &event); err != nil {
68+
return ChangeEvent{}, err
69+
}
70+
if err := event.Validate(); err != nil {
71+
return ChangeEvent{}, err
72+
}
73+
return event, nil
74+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
const testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01"
25+
26+
func TestChangeEvent_Validate(t *testing.T) {
27+
t.Run("valid scheme-qualified URI", func(t *testing.T) {
28+
require.NoError(t, ChangeEvent{URI: testURI}.Validate())
29+
})
30+
31+
t.Run("rejects empty URI", func(t *testing.T) {
32+
require.Error(t, ChangeEvent{}.Validate())
33+
})
34+
35+
t.Run("rejects URI without scheme", func(t *testing.T) {
36+
require.Error(t, ChangeEvent{URI: "not-a-uri"}.Validate())
37+
})
38+
39+
// Validate is VCS-agnostic: a non-git but scheme-qualified URI passes here;
40+
// rejecting an unsupported VCS is the resolver/wiring layer's job.
41+
t.Run("accepts non-git scheme", func(t *testing.T) {
42+
require.NoError(t, ChangeEvent{URI: "hg://example.com/repo/rev"}.Validate())
43+
})
44+
}
45+
46+
func TestChangeEvent_Scheme(t *testing.T) {
47+
assert.Equal(t, "git", ChangeEvent{URI: testURI}.Scheme())
48+
assert.Equal(t, "", ChangeEvent{URI: "not-a-uri"}.Scheme())
49+
}
50+
51+
func TestChangeEventFromBytes(t *testing.T) {
52+
original := ChangeEvent{URI: testURI, PartitionKey: "git.example.com/uber/monorepo"}
53+
data, err := original.ToBytes()
54+
require.NoError(t, err)
55+
56+
got, err := ChangeEventFromBytes(data)
57+
require.NoError(t, err)
58+
assert.Equal(t, original.URI, got.URI)
59+
assert.Equal(t, original.PartitionKey, got.PartitionKey)
60+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "start",
5+
srcs = ["start.go"],
6+
importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//core/consumer",
10+
"//core/metrics",
11+
"//entity/messagequeue",
12+
"//stovepipe/core/topickey",
13+
"//stovepipe/entity",
14+
"@com_github_uber_go_tally//:tally",
15+
"@org_uber_go_zap//:zap",
16+
],
17+
)
18+
19+
go_test(
20+
name = "start_test",
21+
srcs = ["start_test.go"],
22+
embed = [":start"],
23+
deps = [
24+
"//core/consumer",
25+
"//entity/messagequeue",
26+
"//extension/messagequeue/mock",
27+
"//stovepipe/core/topickey",
28+
"//stovepipe/entity",
29+
"@com_github_stretchr_testify//assert",
30+
"@com_github_stretchr_testify//require",
31+
"@com_github_uber_go_tally//:tally",
32+
"@org_uber_go_mock//gomock",
33+
"@org_uber_go_zap//zaptest",
34+
],
35+
)
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package start
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/uber-go/tally"
22+
"github.com/uber/submitqueue/core/consumer"
23+
"github.com/uber/submitqueue/core/metrics"
24+
entityqueue "github.com/uber/submitqueue/entity/messagequeue"
25+
"github.com/uber/submitqueue/stovepipe/core/topickey"
26+
entity "github.com/uber/submitqueue/stovepipe/entity"
27+
"go.uber.org/zap"
28+
)
29+
30+
// Controller handles start queue messages. It is the pipeline entry point: it
31+
// deserializes the inbound change event and forwards the commit reference to the
32+
// validate stage using the partition key for ordering.
33+
//
34+
// Ordering key is decided once at ingestion and carried through the pipeline.
35+
//
36+
// Currently a forwarding stub. Per the Stovepipe workflow RFC, start will
37+
// also record the Commit as `unknown` (keyed by SHA, making ingest idempotent across
38+
// the webhook and poll producers) and emit status + log events.
39+
40+
var _ consumer.Controller = (*Controller)(nil)
41+
42+
type Controller struct {
43+
logger *zap.SugaredLogger
44+
metricsScope tally.Scope
45+
registry consumer.TopicRegistry
46+
topicKey consumer.TopicKey
47+
consumerGroup string
48+
}
49+
50+
// Params are the parameters for creating a new start controller.
51+
type Params struct {
52+
Registry consumer.TopicRegistry
53+
TopicKey consumer.TopicKey
54+
ConsumerGroup string
55+
56+
Scope tally.Scope
57+
Logger *zap.SugaredLogger
58+
}
59+
60+
// NewController creates a new start controller for the orchestrator.
61+
func NewController(p Params) *Controller {
62+
return &Controller{
63+
logger: p.Logger.Named("start_controller"),
64+
metricsScope: p.Scope.SubScope("start_controller"),
65+
registry: p.Registry,
66+
topicKey: p.TopicKey,
67+
consumerGroup: p.ConsumerGroup,
68+
}
69+
}
70+
71+
// Process deserializes the change event and forwards the commit to the validate stage.
72+
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
73+
const opName = "process"
74+
75+
op := metrics.Begin(c.metricsScope, opName)
76+
defer func() { op.Complete(retErr) }()
77+
78+
msg := delivery.Message()
79+
80+
event, err := entity.ChangeEventFromBytes(msg.Payload)
81+
if err != nil {
82+
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
83+
return fmt.Errorf("failed to deserialize change event: %w", err)
84+
}
85+
86+
if event.PartitionKey == "" {
87+
metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1)
88+
return fmt.Errorf("change event for uri=%s is missing a partition key (must be stamped by the producer)", event.URI)
89+
}
90+
91+
c.logger.Infow("received change event",
92+
"uri", event.URI,
93+
"attempt", delivery.Attempt(),
94+
"partition_key", event.PartitionKey,
95+
)
96+
97+
// Core Logic to be added here:
98+
// - Record the commit as `unknown` (keyed by SHA)
99+
// - Emit status + log events
100+
101+
if err := c.publish(ctx, topickey.TopicKeyValidate, event, event.PartitionKey); err != nil {
102+
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
103+
return fmt.Errorf("failed to publish to validate: %w", err)
104+
}
105+
106+
c.logger.Infow("published commit to validate",
107+
"uri", event.URI,
108+
"topic_key", topickey.TopicKeyValidate,
109+
)
110+
111+
return nil
112+
}
113+
114+
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, event entity.ChangeEvent, partitionKey string) error {
115+
payload, err := event.ToBytes()
116+
if err != nil {
117+
return fmt.Errorf("failed to serialize change event: %w", err)
118+
}
119+
120+
msg := entityqueue.NewMessage(event.URI, payload, partitionKey, nil)
121+
122+
q, ok := c.registry.Queue(key)
123+
if !ok {
124+
return fmt.Errorf("no queue registered for topic key %s", key)
125+
}
126+
127+
topicName, ok := c.registry.TopicName(key)
128+
if !ok {
129+
return fmt.Errorf("no topic name registered for topic key %s", key)
130+
}
131+
132+
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
133+
return fmt.Errorf("failed to publish message: %w", err)
134+
}
135+
136+
return nil
137+
}
138+
139+
// Name returns the controller name for logging and metrics.
140+
func (c *Controller) Name() string {
141+
return "start"
142+
}
143+
144+
// TopicKey returns the topic key this controller subscribes to.
145+
func (c *Controller) TopicKey() consumer.TopicKey {
146+
return c.topicKey
147+
}
148+
149+
// ConsumerGroup returns the consumer group for offset tracking.
150+
func (c *Controller) ConsumerGroup() string {
151+
return c.consumerGroup
152+
}

0 commit comments

Comments
 (0)