Skip to content

Commit 92efd41

Browse files
committed
feat(stovepipe): add thin Ingest controller and wire it into the example server
Add the IngestController, the pipeline's entry point. It validates the queue name, mints a request ID via the counter extension (format `request/<queue>/<counter>`, following SubmitQueue's convention), builds the resulting `Request` (state `accepted`), logs it, and returns the ID. This is intentionally thin: resolving the commit URI via the SourceControl extension, persisting the Request, and publishing it onto the pipeline are follow-ups. The example server wires the controller behind the new Ingest RPC using a minimal in-process counter (a real deployment supplies a persistent counter implementation).
1 parent 36c476f commit 92efd41

4 files changed

Lines changed: 209 additions & 6 deletions

File tree

example/stovepipe/server/main.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,43 @@ import (
3333
"google.golang.org/grpc/reflection"
3434
)
3535

36-
// StovepipeServer wraps the controller and implements the gRPC service interface.
36+
// StovepipeServer wraps the controllers and implements the gRPC service interface.
3737
type StovepipeServer struct {
3838
pb.UnimplementedStovepipeServer
39-
pingController *controller.PingController
39+
pingController *controller.PingController
40+
ingestController *controller.IngestController
4041
}
4142

4243
// Ping delegates to the controller.
4344
func (s *StovepipeServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
4445
return s.pingController.Ping(ctx, req)
4546
}
4647

48+
// Ingest delegates to the controller.
49+
func (s *StovepipeServer) Ingest(ctx context.Context, req *pb.IngestRequest) (*pb.IngestResponse, error) {
50+
return s.ingestController.Ingest(ctx, req)
51+
}
52+
53+
// inMemoryCounter is a minimal, process-local counter.Counter used to wire the example
54+
// server. It is not durable; a real deployment supplies a persistent implementation
55+
// (e.g. platform/extension/counter/mysql).
56+
type inMemoryCounter struct {
57+
mu sync.Mutex
58+
values map[string]int64
59+
}
60+
61+
func newInMemoryCounter() *inMemoryCounter {
62+
return &inMemoryCounter{values: make(map[string]int64)}
63+
}
64+
65+
// Next returns the next value in the sequence for the given domain, starting at 1.
66+
func (c *inMemoryCounter) Next(_ context.Context, domain string) (int64, error) {
67+
c.mu.Lock()
68+
defer c.mu.Unlock()
69+
c.values[domain]++
70+
return c.values[domain], nil
71+
}
72+
4773
func main() {
4874
code := 0
4975
if err := run(); err != nil {
@@ -108,10 +134,12 @@ func run() error {
108134
// Create gRPC server
109135
grpcServer := grpc.NewServer()
110136

111-
// Create ping controller and wrap it for gRPC
137+
// Create controllers and wrap them for gRPC
112138
pingController := controller.NewPingController(logger, scope)
139+
ingestController := controller.NewIngestController(logger.Sugar(), scope, newInMemoryCounter())
113140
srv := &StovepipeServer{
114-
pingController: pingController,
141+
pingController: pingController,
142+
ingestController: ingestController,
115143
}
116144
pb.RegisterStovepipeServer(grpcServer, srv)
117145

stovepipe/controller/BUILD.bazel

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,37 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "controller",
5-
srcs = ["ping.go"],
5+
srcs = [
6+
"ingest.go",
7+
"ping.go",
8+
],
69
importpath = "github.com/uber/submitqueue/stovepipe/controller",
710
visibility = ["//visibility:public"],
811
deps = [
912
"//api/stovepipe/protopb",
13+
"//platform/errs",
14+
"//platform/extension/counter",
1015
"//platform/metrics",
16+
"//stovepipe/entity",
1117
"@com_github_uber_go_tally//:tally",
1218
"@org_uber_go_zap//:zap",
1319
],
1420
)
1521

1622
go_test(
1723
name = "controller_test",
18-
srcs = ["ping_test.go"],
24+
srcs = [
25+
"ingest_test.go",
26+
"ping_test.go",
27+
],
1928
embed = [":controller"],
2029
deps = [
2130
"//api/stovepipe/protopb",
31+
"//platform/extension/counter/mock",
2232
"@com_github_stretchr_testify//assert",
2333
"@com_github_stretchr_testify//require",
2434
"@com_github_uber_go_tally//:tally",
35+
"@org_uber_go_mock//gomock",
2536
"@org_uber_go_zap//:zap",
2637
],
2738
)

stovepipe/controller/ingest.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package controller
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
22+
"github.com/uber-go/tally"
23+
pb "github.com/uber/submitqueue/api/stovepipe/protopb"
24+
"github.com/uber/submitqueue/platform/errs"
25+
"github.com/uber/submitqueue/platform/extension/counter"
26+
"github.com/uber/submitqueue/platform/metrics"
27+
"github.com/uber/submitqueue/stovepipe/entity"
28+
"go.uber.org/zap"
29+
)
30+
31+
// ErrInvalidRequest is returned when the request fails validation.
32+
// This error should be mapped to codes.InvalidArgument at the gRPC layer.
33+
var ErrInvalidRequest = errs.NewUserError(errors.New("invalid request"))
34+
35+
// IsInvalidRequest returns true if any error in the error chain is ErrInvalidRequest.
36+
func IsInvalidRequest(err error) bool {
37+
return errors.Is(err, ErrInvalidRequest)
38+
}
39+
40+
// IngestController handles ingest business logic for stovepipe: it admits a queue's newly
41+
// observed commit into the validation pipeline.
42+
//
43+
// This is the thin entry point. It mints a request ID namespaced by the queue and records the
44+
// resulting Request. Resolving the commit URI via the SourceControl extension, persisting the
45+
// Request, and publishing it onto the pipeline are deliberately out of scope for now.
46+
type IngestController struct {
47+
logger *zap.SugaredLogger
48+
metricsScope tally.Scope
49+
counter counter.Counter
50+
}
51+
52+
// NewIngestController creates a new instance of the stovepipe ingest controller.
53+
func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter) *IngestController {
54+
return &IngestController{
55+
logger: logger,
56+
metricsScope: scope.SubScope("ingest_controller"),
57+
counter: counter,
58+
}
59+
}
60+
61+
// Ingest admits a queue's newly observed commit into the validation pipeline and returns the minted request ID.
62+
func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) {
63+
const opName = "ingest"
64+
65+
op := metrics.Begin(c.metricsScope, opName)
66+
defer func() { op.Complete(retErr) }()
67+
68+
if req.Queue == "" {
69+
return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest)
70+
}
71+
72+
queue := req.Queue
73+
74+
// Generate a globally unique request ID namespaced by the queue.
75+
seq, err := c.counter.Next(ctx, "request/"+queue)
76+
if err != nil {
77+
return nil, fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err)
78+
}
79+
80+
request := entity.Request{
81+
ID: fmt.Sprintf("request/%s/%d", queue, seq),
82+
Queue: queue,
83+
State: entity.RequestStateAccepted,
84+
Version: 1,
85+
}
86+
87+
c.logger.Infow("accepted request",
88+
"id", request.ID,
89+
"queue", request.Queue,
90+
"state", request.State,
91+
)
92+
93+
return &pb.IngestResponse{Id: request.ID}, nil
94+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package controller
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
"github.com/uber-go/tally"
25+
pb "github.com/uber/submitqueue/api/stovepipe/protopb"
26+
countermock "github.com/uber/submitqueue/platform/extension/counter/mock"
27+
"go.uber.org/mock/gomock"
28+
"go.uber.org/zap"
29+
)
30+
31+
func newIngestController(t *testing.T, c *countermock.MockCounter) *IngestController {
32+
t.Helper()
33+
return NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), c)
34+
}
35+
36+
func TestIngestController_Ingest(t *testing.T) {
37+
ctrl := gomock.NewController(t)
38+
mockCounter := countermock.NewMockCounter(ctrl)
39+
mockCounter.EXPECT().Next(gomock.Any(), "request/monorepo/main").Return(int64(7), nil)
40+
41+
c := newIngestController(t, mockCounter)
42+
43+
resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"})
44+
require.NoError(t, err)
45+
assert.Equal(t, "request/monorepo/main/7", resp.Id)
46+
}
47+
48+
func TestIngestController_Ingest_EmptyQueue(t *testing.T) {
49+
ctrl := gomock.NewController(t)
50+
mockCounter := countermock.NewMockCounter(ctrl)
51+
// Counter must not be consulted when the queue is missing.
52+
53+
c := newIngestController(t, mockCounter)
54+
55+
_, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: ""})
56+
require.Error(t, err)
57+
assert.True(t, IsInvalidRequest(err))
58+
}
59+
60+
func TestIngestController_Ingest_CounterError(t *testing.T) {
61+
ctrl := gomock.NewController(t)
62+
mockCounter := countermock.NewMockCounter(ctrl)
63+
mockCounter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable"))
64+
65+
c := newIngestController(t, mockCounter)
66+
67+
_, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"})
68+
require.Error(t, err)
69+
assert.False(t, IsInvalidRequest(err))
70+
}

0 commit comments

Comments
 (0)