Skip to content

Commit 03a6c15

Browse files
committed
feat(stovepipe): add start controller + change entity for the change ingress flow
Adds the ChangeEvent entity (commit URI + carried, gateway-stamped PartitionKey) and the orchestrator start controller, the pipeline entry point. start is the reading side of the partitioning model: it consumes the partition key stamped by the producer and forwards it to validate verbatim, never re-parsing the URI (mirrors SubmitQueue's request.Queue).
1 parent c4fa997 commit 03a6c15

6 files changed

Lines changed: 493 additions & 2 deletions

File tree

stovepipe/entity/BUILD.bazel

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1-
load("@rules_go//go:def.bzl", "go_library")
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "entity",
5-
srcs = ["entity.go"],
5+
srcs = [
6+
"change_event.go",
7+
"entity.go",
8+
],
69
importpath = "github.com/uber/submitqueue/stovepipe/entity",
710
visibility = ["//visibility:public"],
811
)
12+
13+
go_test(
14+
name = "entity_test",
15+
srcs = ["change_event_test.go"],
16+
embed = [":entity"],
17+
deps = [
18+
"@com_github_stretchr_testify//assert",
19+
"@com_github_stretchr_testify//require",
20+
],
21+
)

stovepipe/entity/change_event.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"encoding/json"
19+
"fmt"
20+
"net/url"
21+
)
22+
23+
// ChangeEvent represents a change in the pipeline. URI represents the identity
24+
// of the change.
25+
//
26+
// The repository-scoped ordering key is carried on the queue message envelope
27+
// (Message.PartitionKey), stamped once at ingestion and propagated unchanged
28+
// across stages, so it is deliberately not duplicated here.
29+
type ChangeEvent struct {
30+
// URI represents the identity of the change. The scheme names the VCS; the rest
31+
// is provider-specific (e.g. git://remote/repo/ref/commit_sha).
32+
URI string `json:"uri"`
33+
}
34+
35+
// ToBytes serializes the ChangeEvent to JSON bytes for queue message payload.
36+
func (e ChangeEvent) ToBytes() ([]byte, error) {
37+
return json.Marshal(e)
38+
}
39+
40+
// Scheme returns the URI scheme identifying the VCS (e.g. "git"), or "" if the
41+
// URI is empty or not scheme-qualified. It is the key used to select a resolver.
42+
func (e ChangeEvent) Scheme() string {
43+
u, err := url.Parse(e.URI)
44+
if err != nil {
45+
return ""
46+
}
47+
return u.Scheme
48+
}
49+
50+
// Validate checks that the change event carries a scheme-qualified commit URI.
51+
func (e ChangeEvent) Validate() error {
52+
if e.URI == "" {
53+
return fmt.Errorf("change event requires a commit URI")
54+
}
55+
if e.Scheme() == "" {
56+
return fmt.Errorf("change event URI %q must be scheme-qualified", e.URI)
57+
}
58+
return nil
59+
}
60+
61+
// ChangeEventFromBytes deserializes a ChangeEvent from JSON bytes.
62+
func ChangeEventFromBytes(data []byte) (ChangeEvent, error) {
63+
var event ChangeEvent
64+
if err := json.Unmarshal(data, &event); err != nil {
65+
return ChangeEvent{}, err
66+
}
67+
if err := event.Validate(); err != nil {
68+
return ChangeEvent{}, err
69+
}
70+
return event, nil
71+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
const testURI = "git://git.example.com/uber/monorepo/refs%2Fheads%2Fmain/abcdef0123456789abcdef0123456789abcdef01"
25+
26+
func TestChangeEvent_Validate(t *testing.T) {
27+
t.Run("valid scheme-qualified URI", func(t *testing.T) {
28+
require.NoError(t, ChangeEvent{URI: testURI}.Validate())
29+
})
30+
31+
t.Run("rejects empty URI", func(t *testing.T) {
32+
require.Error(t, ChangeEvent{}.Validate())
33+
})
34+
35+
t.Run("rejects URI without scheme", func(t *testing.T) {
36+
require.Error(t, ChangeEvent{URI: "not-a-uri"}.Validate())
37+
})
38+
39+
// Validate is VCS-agnostic: a non-git but scheme-qualified URI passes here;
40+
// rejecting an unsupported VCS is the resolver/wiring layer's job.
41+
t.Run("accepts non-git scheme", func(t *testing.T) {
42+
require.NoError(t, ChangeEvent{URI: "hg://example.com/repo/rev"}.Validate())
43+
})
44+
}
45+
46+
func TestChangeEvent_Scheme(t *testing.T) {
47+
assert.Equal(t, "git", ChangeEvent{URI: testURI}.Scheme())
48+
assert.Equal(t, "", ChangeEvent{URI: "not-a-uri"}.Scheme())
49+
}
50+
51+
func TestChangeEventFromBytes(t *testing.T) {
52+
original := ChangeEvent{URI: testURI}
53+
data, err := original.ToBytes()
54+
require.NoError(t, err)
55+
56+
got, err := ChangeEventFromBytes(data)
57+
require.NoError(t, err)
58+
assert.Equal(t, original.URI, got.URI)
59+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "start",
5+
srcs = ["start.go"],
6+
importpath = "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//core/consumer",
10+
"//core/metrics",
11+
"//entity/messagequeue",
12+
"//stovepipe/core/topickey",
13+
"//stovepipe/entity",
14+
"@com_github_uber_go_tally//:tally",
15+
"@org_uber_go_zap//:zap",
16+
],
17+
)
18+
19+
go_test(
20+
name = "start_test",
21+
srcs = ["start_test.go"],
22+
embed = [":start"],
23+
deps = [
24+
"//core/consumer",
25+
"//entity/messagequeue",
26+
"//extension/messagequeue/mock",
27+
"//stovepipe/core/topickey",
28+
"//stovepipe/entity",
29+
"@com_github_stretchr_testify//assert",
30+
"@com_github_stretchr_testify//require",
31+
"@com_github_uber_go_tally//:tally",
32+
"@org_uber_go_mock//gomock",
33+
"@org_uber_go_zap//zaptest",
34+
],
35+
)
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package start
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/uber-go/tally"
22+
"github.com/uber/submitqueue/core/consumer"
23+
"github.com/uber/submitqueue/core/metrics"
24+
entityqueue "github.com/uber/submitqueue/entity/messagequeue"
25+
"github.com/uber/submitqueue/stovepipe/core/topickey"
26+
entity "github.com/uber/submitqueue/stovepipe/entity"
27+
"go.uber.org/zap"
28+
)
29+
30+
// Controller handles start queue messages. It is the pipeline entry point: it
31+
// deserializes the inbound change event and forwards the commit reference to the
32+
// validate stage using the partition key for ordering.
33+
//
34+
// Ordering key is decided once at ingestion and carried through the pipeline.
35+
//
36+
// Currently a forwarding stub. Per the Stovepipe workflow RFC, start will
37+
// also record the Commit as `unknown` (keyed by SHA, making ingest idempotent across
38+
// the webhook and poll producers) and emit status + log events.
39+
40+
var _ consumer.Controller = (*Controller)(nil)
41+
42+
type Controller struct {
43+
logger *zap.SugaredLogger
44+
metricsScope tally.Scope
45+
registry consumer.TopicRegistry
46+
topicKey consumer.TopicKey
47+
consumerGroup string
48+
}
49+
50+
// Params are the parameters for creating a new start controller.
51+
type Params struct {
52+
Registry consumer.TopicRegistry
53+
TopicKey consumer.TopicKey
54+
ConsumerGroup string
55+
56+
Scope tally.Scope
57+
Logger *zap.SugaredLogger
58+
}
59+
60+
// NewController creates a new start controller for the orchestrator.
61+
func NewController(p Params) *Controller {
62+
return &Controller{
63+
logger: p.Logger.Named("start_controller"),
64+
metricsScope: p.Scope.SubScope("start_controller"),
65+
registry: p.Registry,
66+
topicKey: p.TopicKey,
67+
consumerGroup: p.ConsumerGroup,
68+
}
69+
}
70+
71+
// Process deserializes the change event and forwards the commit to the validate stage.
72+
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
73+
const opName = "process"
74+
75+
op := metrics.Begin(c.metricsScope, opName)
76+
defer func() { op.Complete(retErr) }()
77+
78+
msg := delivery.Message()
79+
80+
event, err := entity.ChangeEventFromBytes(msg.Payload)
81+
if err != nil {
82+
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
83+
return fmt.Errorf("failed to deserialize change event: %w", err)
84+
}
85+
86+
// The ordering key lives on the message envelope, stamped by the producer at
87+
// ingestion; the controller propagates it verbatim to the next stage.
88+
partitionKey := msg.PartitionKey
89+
if partitionKey == "" {
90+
metrics.NamedCounter(c.metricsScope, opName, "missing_partition_key", 1)
91+
return fmt.Errorf("change event for uri=%s is missing a partition key (must be stamped by the producer)", event.URI)
92+
}
93+
94+
c.logger.Infow("received change event",
95+
"uri", event.URI,
96+
"attempt", delivery.Attempt(),
97+
"partition_key", partitionKey,
98+
)
99+
100+
// Core Logic to be added here:
101+
// - Record the commit as `unknown` (keyed by SHA)
102+
// - Emit status + log events
103+
104+
if err := c.publish(ctx, topickey.TopicKeyValidate, event, partitionKey); err != nil {
105+
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
106+
return fmt.Errorf("failed to publish to validate: %w", err)
107+
}
108+
109+
c.logger.Infow("published commit to validate",
110+
"uri", event.URI,
111+
"topic_key", topickey.TopicKeyValidate,
112+
)
113+
114+
return nil
115+
}
116+
117+
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, event entity.ChangeEvent, partitionKey string) error {
118+
payload, err := event.ToBytes()
119+
if err != nil {
120+
return fmt.Errorf("failed to serialize change event: %w", err)
121+
}
122+
123+
msg := entityqueue.NewMessage(event.URI, payload, partitionKey, nil)
124+
125+
q, ok := c.registry.Queue(key)
126+
if !ok {
127+
return fmt.Errorf("no queue registered for topic key %s", key)
128+
}
129+
130+
topicName, ok := c.registry.TopicName(key)
131+
if !ok {
132+
return fmt.Errorf("no topic name registered for topic key %s", key)
133+
}
134+
135+
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
136+
return fmt.Errorf("failed to publish message: %w", err)
137+
}
138+
139+
return nil
140+
}
141+
142+
// Name returns the controller name for logging and metrics.
143+
func (c *Controller) Name() string {
144+
return "start"
145+
}
146+
147+
// TopicKey returns the topic key this controller subscribes to.
148+
func (c *Controller) TopicKey() consumer.TopicKey {
149+
return c.topicKey
150+
}
151+
152+
// ConsumerGroup returns the consumer group for offset tracking.
153+
func (c *Controller) ConsumerGroup() string {
154+
return c.consumerGroup
155+
}

0 commit comments

Comments
 (0)