From 7fdab86d45dce176ab02e9a9f30730ec28b2bcbd Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 25 Jun 2026 22:59:57 -0700 Subject: [PATCH 1/3] feat(stovepipe): add Request entity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the stovepipe domain's first entity, the foundation for the ingest pipeline described in the workflow RFC. `Request` represents one validation of a queue at a particular commit: an ID namespaced by the queue, following SubmitQueue's convention — `"request//"` (e.g. `request/monorepo/main/42`) — the queue name, the opaque VCS-agnostic commit `URI` (empty until SourceControl resolution is wired in), a `RequestState` (initial state `accepted`), and a `Version` for optimistic locking. It carries `ToBytes`/`FromBytes` and a lightweight `RequestID` payload type for future ID-only queue hops. --- stovepipe/entity/BUILD.bazel | 18 +++++++ stovepipe/entity/request.go | 93 ++++++++++++++++++++++++++++++++ stovepipe/entity/request_test.go | 90 +++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 stovepipe/entity/BUILD.bazel create mode 100644 stovepipe/entity/request.go create mode 100644 stovepipe/entity/request_test.go diff --git a/stovepipe/entity/BUILD.bazel b/stovepipe/entity/BUILD.bazel new file mode 100644 index 00000000..778cf6d1 --- /dev/null +++ b/stovepipe/entity/BUILD.bazel @@ -0,0 +1,18 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "entity", + srcs = ["request.go"], + importpath = "github.com/uber/submitqueue/stovepipe/entity", + visibility = ["//visibility:public"], +) + +go_test( + name = "entity_test", + srcs = ["request_test.go"], + embed = [":entity"], + deps = [ + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/stovepipe/entity/request.go b/stovepipe/entity/request.go new file mode 100644 index 00000000..e6b8ea3a --- /dev/null +++ b/stovepipe/entity/request.go @@ -0,0 +1,93 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "encoding/json" +) + +// RequestState defines the internal state of a Stovepipe validation request as it moves +// through the pipeline. States are internal and used to implement a state machine; a +// customer-facing status type may be layered on top later, as in SubmitQueue. +type RequestState string + +const ( + // RequestStateUnknown is the unreachable zero value. It is set by default when the + // structure is initialized and should never be seen in the system. + RequestStateUnknown RequestState = "" + // RequestStateAccepted is the initial state of a request: a new commit has been observed + // for the queue and the request has been admitted into the pipeline, but no validation + // strategy has been chosen yet. Later stages (process, build, record) add their own states. + RequestStateAccepted RequestState = "accepted" +) + +// Request represents a single validation of a queue at a particular commit. The queue reports +// a newly observed commit, Stovepipe mints a Request (identity namespaced by the queue), and +// the request flows through the pipeline accumulating state. +type Request struct { + // **************** + // Immutable fields, fixed at request entity creation + // **************** + + // ID is the globally unique identifier for the request. Format: "request//" + // (e.g. "request/monorepo/main/42"). + ID string `json:"id"` + // Queue is the name of the queue (a named repo+ref) being validated. It namespaces the ID + // and is the stable handle the ingest caller supplies. + Queue string `json:"queue"` + // URI is the opaque, VCS-agnostic locator of the commit under validation, as produced by the + // SourceControl extension. It is empty until SourceControl resolution is wired in. + URI string `json:"uri"` + + // **************** + // Following fields could be changed throughout the lifecycle of the request + // **************** + + // State is the current state of the request in the pipeline. + State RequestState `json:"state"` + // Version is the version of the object. It is used for optimistic locking. + // Versioning starts at 1 and is incremented for each change to the object. + Version int32 `json:"version"` +} + +// ToBytes serializes the Request to JSON bytes for queue message payload. +func (r Request) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// RequestFromBytes deserializes a Request from JSON bytes. +func RequestFromBytes(data []byte) (Request, error) { + var req Request + err := json.Unmarshal(data, &req) + return req, err +} + +// RequestID is a lightweight entity for publishing and consuming just the request identifier via the queue. +type RequestID struct { + // ID is the globally unique identifier for the request. + ID string `json:"id"` +} + +// ToBytes serializes the RequestID to JSON bytes for queue message payload. +func (r RequestID) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// RequestIDFromBytes deserializes a RequestID from JSON bytes. +func RequestIDFromBytes(data []byte) (RequestID, error) { + var rid RequestID + err := json.Unmarshal(data, &rid) + return rid, err +} diff --git a/stovepipe/entity/request_test.go b/stovepipe/entity/request_test.go new file mode 100644 index 00000000..9d6e202f --- /dev/null +++ b/stovepipe/entity/request_test.go @@ -0,0 +1,90 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRequest_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + req Request + }{ + { + name: "accepted with resolved uri", + req: Request{ + ID: "request/monorepo/main/100", + Queue: "monorepo/main", + URI: "git://remote/monorepo/main/abcdef0123456789", + State: RequestStateAccepted, + Version: 1, + }, + }, + { + name: "uri not yet resolved", + req: Request{ + ID: "request/monorepo/main/101", + Queue: "monorepo/main", + URI: "", + State: RequestStateAccepted, + Version: 1, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, err := tt.req.ToBytes() + require.NoError(t, err) + + deserialized, err := RequestFromBytes(data) + require.NoError(t, err) + + assert.Equal(t, tt.req, deserialized) + }) + } +} + +func TestRequestFromBytes_InvalidJSON(t *testing.T) { + _, err := RequestFromBytes([]byte(`{"invalid": json"}`)) + assert.Error(t, err) +} + +func TestRequestFromBytes_EmptyData(t *testing.T) { + req, err := RequestFromBytes([]byte(`{}`)) + require.NoError(t, err) + + assert.Empty(t, req.ID) + assert.Empty(t, req.Queue) + assert.Empty(t, req.URI) + assert.Equal(t, RequestStateUnknown, req.State) + assert.Equal(t, int32(0), req.Version) +} + +func TestRequestID_SerializationRoundTrip(t *testing.T) { + original := RequestID{ID: "request/monorepo/main/100"} + + data, err := original.ToBytes() + require.NoError(t, err) + + deserialized, err := RequestIDFromBytes(data) + require.NoError(t, err) + + assert.Equal(t, original, deserialized) +} From bb319e491d004d5079e893caf4cccf2cb890468b Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 25 Jun 2026 22:59:57 -0700 Subject: [PATCH 2/3] feat(api/stovepipe): add Ingest RPC to the Stovepipe service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the pipeline's entry RPC. `Ingest(IngestRequest) returns (IngestResponse)` admits a queue's newly observed commit into the validation pipeline: the caller (the external poller) supplies a logical queue name, and the response carries the minted request ID (format `request//`) namespaced by that queue. Only the queue name is on the wire — commit-URI resolution via the SourceControl extension is a follow-up. Includes the regenerated protopb stubs (pb, grpc, yarpc). --- api/stovepipe/proto/stovepipe.proto | 18 +++ api/stovepipe/protopb/stovepipe.pb.go | 122 ++++++++++++++++++-- api/stovepipe/protopb/stovepipe.pb.yarpc.go | 90 ++++++++++++--- api/stovepipe/protopb/stovepipe_grpc.pb.go | 44 ++++++- 4 files changed, 246 insertions(+), 28 deletions(-) diff --git a/api/stovepipe/proto/stovepipe.proto b/api/stovepipe/proto/stovepipe.proto index a56f841a..a00decc9 100644 --- a/api/stovepipe/proto/stovepipe.proto +++ b/api/stovepipe/proto/stovepipe.proto @@ -39,8 +39,26 @@ message PingResponse { string hostname = 4; } +// IngestRequest is the request for the Ingest method. The poller reports that a queue +// (a named repo+ref) has a new commit to validate. +message IngestRequest { + // Logical queue name to validate (e.g. "monorepo/main"). It namespaces the minted + // request ID and is the stable handle for the repo+ref being validated. + string queue = 1; +} + +// IngestResponse is the response for the Ingest method. +message IngestResponse { + // The minted request ID, namespaced by queue. Format: "request//" + // (e.g. "request/monorepo/main/42"). + string id = 1; +} + // Stovepipe provides the Stovepipe API. service Stovepipe { // Ping returns a response indicating the service is alive rpc Ping(PingRequest) returns (PingResponse) {} + // Ingest admits a queue's newly observed commit into the validation pipeline and returns + // the minted request ID. The caller hands off asynchronously; validation happens later. + rpc Ingest(IngestRequest) returns (IngestResponse) {} } diff --git a/api/stovepipe/protopb/stovepipe.pb.go b/api/stovepipe/protopb/stovepipe.pb.go index 1ded621b..25c1a682 100644 --- a/api/stovepipe/protopb/stovepipe.pb.go +++ b/api/stovepipe/protopb/stovepipe.pb.go @@ -155,6 +155,101 @@ func (x *PingResponse) GetHostname() string { return "" } +// IngestRequest is the request for the Ingest method. The poller reports that a queue +// (a named repo+ref) has a new commit to validate. +type IngestRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Logical queue name to validate (e.g. "monorepo/main"). It namespaces the minted + // request ID and is the stable handle for the repo+ref being validated. + Queue string `protobuf:"bytes,1,opt,name=queue,proto3" json:"queue,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *IngestRequest) Reset() { + *x = IngestRequest{} + mi := &file_stovepipe_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *IngestRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IngestRequest) ProtoMessage() {} + +func (x *IngestRequest) ProtoReflect() protoreflect.Message { + mi := &file_stovepipe_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IngestRequest.ProtoReflect.Descriptor instead. +func (*IngestRequest) Descriptor() ([]byte, []int) { + return file_stovepipe_proto_rawDescGZIP(), []int{2} +} + +func (x *IngestRequest) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +// IngestResponse is the response for the Ingest method. +type IngestResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // The minted request ID, namespaced by queue. Format: "request//" + // (e.g. "request/monorepo/main/42"). + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *IngestResponse) Reset() { + *x = IngestResponse{} + mi := &file_stovepipe_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *IngestResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IngestResponse) ProtoMessage() {} + +func (x *IngestResponse) ProtoReflect() protoreflect.Message { + mi := &file_stovepipe_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IngestResponse.ProtoReflect.Descriptor instead. +func (*IngestResponse) Descriptor() ([]byte, []int) { + return file_stovepipe_proto_rawDescGZIP(), []int{3} +} + +func (x *IngestResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + var File_stovepipe_proto protoreflect.FileDescriptor const file_stovepipe_proto_rawDesc = "" + @@ -166,9 +261,14 @@ const file_stovepipe_proto_rawDesc = "" + "\amessage\x18\x01 \x01(\tR\amessage\x12!\n" + "\fservice_name\x18\x02 \x01(\tR\vserviceName\x12\x1c\n" + "\ttimestamp\x18\x03 \x01(\x03R\ttimestamp\x12\x1a\n" + - "\bhostname\x18\x04 \x01(\tR\bhostname2h\n" + + "\bhostname\x18\x04 \x01(\tR\bhostname\"%\n" + + "\rIngestRequest\x12\x14\n" + + "\x05queue\x18\x01 \x01(\tR\x05queue\" \n" + + "\x0eIngestResponse\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id2\xcb\x01\n" + "\tStovepipe\x12[\n" + - "\x04Ping\x12'.uber.submitqueue.stovepipe.PingRequest\x1a(.uber.submitqueue.stovepipe.PingResponse\"\x00Be\n" + + "\x04Ping\x12'.uber.submitqueue.stovepipe.PingRequest\x1a(.uber.submitqueue.stovepipe.PingResponse\"\x00\x12a\n" + + "\x06Ingest\x12).uber.submitqueue.stovepipe.IngestRequest\x1a*.uber.submitqueue.stovepipe.IngestResponse\"\x00Be\n" + "\x1ecom.uber.submitqueue.stovepipeB\x0eStovepipeProtoP\x01Z1github.com/uber/submitqueue/api/stovepipe/protopbb\x06proto3" var ( @@ -183,16 +283,20 @@ func file_stovepipe_proto_rawDescGZIP() []byte { return file_stovepipe_proto_rawDescData } -var file_stovepipe_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_stovepipe_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_stovepipe_proto_goTypes = []any{ - (*PingRequest)(nil), // 0: uber.submitqueue.stovepipe.PingRequest - (*PingResponse)(nil), // 1: uber.submitqueue.stovepipe.PingResponse + (*PingRequest)(nil), // 0: uber.submitqueue.stovepipe.PingRequest + (*PingResponse)(nil), // 1: uber.submitqueue.stovepipe.PingResponse + (*IngestRequest)(nil), // 2: uber.submitqueue.stovepipe.IngestRequest + (*IngestResponse)(nil), // 3: uber.submitqueue.stovepipe.IngestResponse } var file_stovepipe_proto_depIdxs = []int32{ 0, // 0: uber.submitqueue.stovepipe.Stovepipe.Ping:input_type -> uber.submitqueue.stovepipe.PingRequest - 1, // 1: uber.submitqueue.stovepipe.Stovepipe.Ping:output_type -> uber.submitqueue.stovepipe.PingResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type + 2, // 1: uber.submitqueue.stovepipe.Stovepipe.Ingest:input_type -> uber.submitqueue.stovepipe.IngestRequest + 1, // 2: uber.submitqueue.stovepipe.Stovepipe.Ping:output_type -> uber.submitqueue.stovepipe.PingResponse + 3, // 3: uber.submitqueue.stovepipe.Stovepipe.Ingest:output_type -> uber.submitqueue.stovepipe.IngestResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -209,7 +313,7 @@ func file_stovepipe_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_stovepipe_proto_rawDesc), len(file_stovepipe_proto_rawDesc)), NumEnums: 0, - NumMessages: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 1, }, diff --git a/api/stovepipe/protopb/stovepipe.pb.yarpc.go b/api/stovepipe/protopb/stovepipe.pb.yarpc.go index 6731b089..ad6fe9eb 100644 --- a/api/stovepipe/protopb/stovepipe.pb.yarpc.go +++ b/api/stovepipe/protopb/stovepipe.pb.yarpc.go @@ -22,6 +22,7 @@ var _ = ioutil.NopCloser // StovepipeYARPCClient is the YARPC client-side interface for the Stovepipe service. type StovepipeYARPCClient interface { Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) + Ingest(context.Context, *IngestRequest, ...yarpc.CallOption) (*IngestResponse, error) } func newStovepipeYARPCClient(clientConfig transport.ClientConfig, anyResolver v2.AnyResolver, options ...v2.ClientOption) StovepipeYARPCClient { @@ -43,6 +44,7 @@ func NewStovepipeYARPCClient(clientConfig transport.ClientConfig, options ...v2. // StovepipeYARPCServer is the YARPC server-side interface for the Stovepipe service. type StovepipeYARPCServer interface { Ping(context.Context, *PingRequest) (*PingResponse, error) + Ingest(context.Context, *IngestRequest) (*IngestResponse, error) } type buildStovepipeYARPCProceduresParams struct { @@ -66,6 +68,16 @@ func buildStovepipeYARPCProcedures(params buildStovepipeYARPCProceduresParams) [ }, ), }, + { + MethodName: "Ingest", + Handler: v2.NewUnaryHandler( + v2.UnaryHandlerParams{ + Handle: handler.Ingest, + NewRequest: newStovepipeServiceIngestYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, }, OnewayHandlerParams: []v2.BuildProceduresOnewayHandlerParams{}, StreamHandlerParams: []v2.BuildProceduresStreamHandlerParams{}, @@ -190,6 +202,18 @@ func (c *_StovepipeYARPCCaller) Ping(ctx context.Context, request *PingRequest, return response, err } +func (c *_StovepipeYARPCCaller) Ingest(ctx context.Context, request *IngestRequest, options ...yarpc.CallOption) (*IngestResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Ingest", request, newStovepipeServiceIngestYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*IngestResponse) + if !ok { + return nil, v2.CastError(emptyStovepipeServiceIngestYARPCResponse, responseMessage) + } + return response, err +} + type _StovepipeYARPCHandler struct { server StovepipeYARPCServer } @@ -210,6 +234,22 @@ func (h *_StovepipeYARPCHandler) Ping(ctx context.Context, requestMessage proto. return response, err } +func (h *_StovepipeYARPCHandler) Ingest(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *IngestRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*IngestRequest) + if !ok { + return nil, v2.CastError(emptyStovepipeServiceIngestYARPCRequest, requestMessage) + } + } + response, err := h.server.Ingest(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + func newStovepipeServicePingYARPCRequest() proto.Message { return &PingRequest{} } @@ -218,30 +258,44 @@ func newStovepipeServicePingYARPCResponse() proto.Message { return &PingResponse{} } +func newStovepipeServiceIngestYARPCRequest() proto.Message { + return &IngestRequest{} +} + +func newStovepipeServiceIngestYARPCResponse() proto.Message { + return &IngestResponse{} +} + var ( - emptyStovepipeServicePingYARPCRequest = &PingRequest{} - emptyStovepipeServicePingYARPCResponse = &PingResponse{} + emptyStovepipeServicePingYARPCRequest = &PingRequest{} + emptyStovepipeServicePingYARPCResponse = &PingResponse{} + emptyStovepipeServiceIngestYARPCRequest = &IngestRequest{} + emptyStovepipeServiceIngestYARPCResponse = &IngestResponse{} ) var yarpcFileDescriptorClosurefabdb6b3c0b09022 = [][]byte{ // stovepipe.proto []byte{ - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0xb1, 0x4e, 0xc3, 0x30, - 0x10, 0x86, 0x31, 0xad, 0x80, 0x5e, 0x2b, 0x90, 0x3c, 0x45, 0x11, 0x42, 0x25, 0x4b, 0x33, 0xd9, - 0x02, 0xde, 0xa0, 0x0f, 0x80, 0xa2, 0xb0, 0xc1, 0x80, 0xec, 0xe8, 0x94, 0x78, 0x70, 0xec, 0xe6, - 0xec, 0xbe, 0x01, 0xef, 0x8d, 0xe2, 0xd2, 0xd0, 0xa5, 0xb0, 0xf9, 0xce, 0xdf, 0x27, 0xfd, 0xbf, - 0x0d, 0x77, 0x14, 0xdc, 0x1e, 0xbd, 0xf1, 0x28, 0xfc, 0xe0, 0x82, 0xe3, 0x79, 0xd4, 0x38, 0x08, - 0x8a, 0xda, 0x9a, 0xb0, 0x8b, 0x18, 0x51, 0x4c, 0x44, 0xb1, 0x81, 0x65, 0x65, 0xfa, 0xb6, 0xc6, - 0x5d, 0x44, 0x0a, 0x3c, 0x83, 0x6b, 0x8b, 0x44, 0xaa, 0xc5, 0x8c, 0xad, 0x59, 0xb9, 0xa8, 0x8f, - 0x63, 0xf1, 0xc5, 0x60, 0x75, 0x20, 0xc9, 0xbb, 0x9e, 0xf0, 0x3c, 0xca, 0x1f, 0x61, 0x45, 0x38, - 0xec, 0x4d, 0x83, 0x9f, 0xbd, 0xb2, 0x98, 0x5d, 0xa6, 0xeb, 0xe5, 0xcf, 0xee, 0x55, 0x59, 0xe4, - 0xf7, 0xb0, 0x08, 0xc6, 0x22, 0x05, 0x65, 0x7d, 0x36, 0x5b, 0xb3, 0x72, 0x56, 0xff, 0x2e, 0x78, - 0x0e, 0x37, 0x9d, 0xa3, 0x90, 0xe4, 0x79, 0x92, 0xa7, 0xf9, 0xb9, 0x83, 0xc5, 0xdb, 0x31, 0x3d, - 0xff, 0x80, 0xf9, 0x98, 0x89, 0x6f, 0xc4, 0xf9, 0x8a, 0xe2, 0xa4, 0x5f, 0x5e, 0xfe, 0x0f, 0x1e, - 0xea, 0x15, 0x17, 0x5b, 0x84, 0x87, 0xc6, 0xd9, 0x3f, 0x84, 0xed, 0xed, 0x94, 0xa4, 0x1a, 0x1f, - 0xba, 0x62, 0xef, 0x4f, 0xad, 0x09, 0x5d, 0xd4, 0xa2, 0x71, 0x56, 0x8e, 0xa2, 0x3c, 0x11, 0xa5, - 0xf2, 0x46, 0x4e, 0xb2, 0x4c, 0x7f, 0xe3, 0xb5, 0xbe, 0x4a, 0x87, 0x97, 0xef, 0x00, 0x00, 0x00, - 0xff, 0xff, 0xb6, 0x5b, 0x9f, 0x24, 0xb7, 0x01, 0x00, 0x00, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4f, 0x4e, 0xf3, 0x30, + 0x10, 0xc5, 0x3f, 0xb7, 0xfd, 0x0a, 0x9d, 0x96, 0x22, 0x59, 0x2c, 0xa2, 0x08, 0xa1, 0x12, 0x09, + 0xb5, 0xb0, 0x70, 0x04, 0xdc, 0xa0, 0x3b, 0x36, 0xa8, 0x0a, 0x3b, 0x58, 0x20, 0x27, 0x1d, 0xa5, + 0x5e, 0x38, 0x76, 0x63, 0xbb, 0x37, 0xe0, 0x74, 0x5c, 0x0a, 0xc5, 0xf9, 0x43, 0x59, 0xb4, 0xb0, + 0xf3, 0x8c, 0xdf, 0x4f, 0xf3, 0xde, 0x68, 0xe0, 0xdc, 0x58, 0xb5, 0x43, 0x2d, 0x34, 0x32, 0x5d, + 0x2a, 0xab, 0x68, 0xe8, 0x52, 0x2c, 0x99, 0x71, 0xa9, 0x14, 0x76, 0xeb, 0xd0, 0x21, 0xeb, 0x14, + 0xd1, 0x1c, 0xc6, 0x2b, 0x51, 0xe4, 0x09, 0x6e, 0x1d, 0x1a, 0x4b, 0x03, 0x38, 0x91, 0x68, 0x0c, + 0xcf, 0x31, 0x20, 0x33, 0xb2, 0x18, 0x25, 0x6d, 0x19, 0x7d, 0x10, 0x98, 0xd4, 0x4a, 0xa3, 0x55, + 0x61, 0xf0, 0xb0, 0x94, 0x5e, 0xc3, 0xc4, 0x60, 0xb9, 0x13, 0x19, 0xbe, 0x17, 0x5c, 0x62, 0xd0, + 0xf3, 0xdf, 0xe3, 0xa6, 0xf7, 0xcc, 0x25, 0xd2, 0x4b, 0x18, 0x59, 0x21, 0xd1, 0x58, 0x2e, 0x75, + 0xd0, 0x9f, 0x91, 0x45, 0x3f, 0xf9, 0x6e, 0xd0, 0x10, 0x4e, 0x37, 0xca, 0x58, 0x0f, 0x0f, 0x3c, + 0xdc, 0xd5, 0xd1, 0x0d, 0x9c, 0x3d, 0x15, 0x39, 0x1a, 0xdb, 0x5a, 0xbe, 0x80, 0xff, 0x3e, 0x54, + 0xe3, 0xa2, 0x2e, 0xa2, 0x19, 0x4c, 0x5b, 0x59, 0xe3, 0x77, 0x0a, 0x3d, 0xb1, 0x6e, 0x44, 0x3d, + 0xb1, 0x7e, 0xf8, 0x24, 0x30, 0x7a, 0x69, 0xf7, 0x40, 0xdf, 0x60, 0x50, 0xa5, 0xa3, 0x73, 0x76, + 0x78, 0x59, 0x6c, 0x6f, 0x53, 0xe1, 0xe2, 0x77, 0x61, 0x3d, 0x38, 0xfa, 0x47, 0x39, 0x0c, 0x6b, + 0x33, 0xf4, 0xf6, 0x18, 0xf5, 0x23, 0x57, 0x78, 0xf7, 0x17, 0x69, 0x3b, 0x62, 0x89, 0x70, 0x95, + 0x29, 0x79, 0x04, 0x59, 0x4e, 0xbb, 0xb0, 0xab, 0xea, 0x2a, 0x56, 0xe4, 0xf5, 0x3e, 0x17, 0x76, + 0xe3, 0x52, 0x96, 0x29, 0x19, 0x57, 0x60, 0xbc, 0x07, 0xc6, 0x5c, 0x8b, 0xb8, 0x83, 0x63, 0x7f, + 0x48, 0x3a, 0x4d, 0x87, 0xfe, 0xf1, 0xf8, 0x15, 0x00, 0x00, 0xff, 0xff, 0xc1, 0x6e, 0x43, 0xbc, + 0x64, 0x02, 0x00, 0x00, }, } diff --git a/api/stovepipe/protopb/stovepipe_grpc.pb.go b/api/stovepipe/protopb/stovepipe_grpc.pb.go index 3cf25bc3..ec4f58a4 100644 --- a/api/stovepipe/protopb/stovepipe_grpc.pb.go +++ b/api/stovepipe/protopb/stovepipe_grpc.pb.go @@ -34,7 +34,8 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Stovepipe_Ping_FullMethodName = "/uber.submitqueue.stovepipe.Stovepipe/Ping" + Stovepipe_Ping_FullMethodName = "/uber.submitqueue.stovepipe.Stovepipe/Ping" + Stovepipe_Ingest_FullMethodName = "/uber.submitqueue.stovepipe.Stovepipe/Ingest" ) // StovepipeClient is the client API for Stovepipe service. @@ -45,6 +46,9 @@ const ( type StovepipeClient interface { // Ping returns a response indicating the service is alive Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) + // Ingest admits a queue's newly observed commit into the validation pipeline and returns + // the minted request ID. The caller hands off asynchronously; validation happens later. + Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) } type stovepipeClient struct { @@ -65,6 +69,16 @@ func (c *stovepipeClient) Ping(ctx context.Context, in *PingRequest, opts ...grp return out, nil } +func (c *stovepipeClient) Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(IngestResponse) + err := c.cc.Invoke(ctx, Stovepipe_Ingest_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // StovepipeServer is the server API for Stovepipe service. // All implementations must embed UnimplementedStovepipeServer // for forward compatibility. @@ -73,6 +87,9 @@ func (c *stovepipeClient) Ping(ctx context.Context, in *PingRequest, opts ...grp type StovepipeServer interface { // Ping returns a response indicating the service is alive Ping(context.Context, *PingRequest) (*PingResponse, error) + // Ingest admits a queue's newly observed commit into the validation pipeline and returns + // the minted request ID. The caller hands off asynchronously; validation happens later. + Ingest(context.Context, *IngestRequest) (*IngestResponse, error) mustEmbedUnimplementedStovepipeServer() } @@ -86,6 +103,9 @@ type UnimplementedStovepipeServer struct{} func (UnimplementedStovepipeServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } +func (UnimplementedStovepipeServer) Ingest(context.Context, *IngestRequest) (*IngestResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ingest not implemented") +} func (UnimplementedStovepipeServer) mustEmbedUnimplementedStovepipeServer() {} func (UnimplementedStovepipeServer) testEmbeddedByValue() {} @@ -125,6 +145,24 @@ func _Stovepipe_Ping_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Stovepipe_Ingest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(IngestRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StovepipeServer).Ingest(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Stovepipe_Ingest_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StovepipeServer).Ingest(ctx, req.(*IngestRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Stovepipe_ServiceDesc is the grpc.ServiceDesc for Stovepipe service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -136,6 +174,10 @@ var Stovepipe_ServiceDesc = grpc.ServiceDesc{ MethodName: "Ping", Handler: _Stovepipe_Ping_Handler, }, + { + MethodName: "Ingest", + Handler: _Stovepipe_Ingest_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "stovepipe.proto", From 58e63c7c443d4e85091bb8b2fa7588318e078abf Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 25 Jun 2026 22:59:57 -0700 Subject: [PATCH 3/3] feat(stovepipe): add thin Ingest controller and wire it into the example server Add the IngestController, the pipeline's entry point. It validates the queue name, mints a request ID via the counter extension (format `request//`, following SubmitQueue's convention), builds the resulting `Request` (state `accepted`), logs it, and returns the ID. This is intentionally thin: resolving the commit URI via the SourceControl extension, persisting the Request, and publishing it onto the pipeline are follow-ups. The example server wires the controller behind the new Ingest RPC using a minimal in-process counter (a real deployment supplies a persistent counter implementation). --- example/stovepipe/server/main.go | 36 +++++++++-- stovepipe/controller/BUILD.bazel | 15 ++++- stovepipe/controller/ingest.go | 96 +++++++++++++++++++++++++++++ stovepipe/controller/ingest_test.go | 70 +++++++++++++++++++++ 4 files changed, 211 insertions(+), 6 deletions(-) create mode 100644 stovepipe/controller/ingest.go create mode 100644 stovepipe/controller/ingest_test.go diff --git a/example/stovepipe/server/main.go b/example/stovepipe/server/main.go index 5428c8ed..8ab97ae1 100644 --- a/example/stovepipe/server/main.go +++ b/example/stovepipe/server/main.go @@ -33,10 +33,11 @@ import ( "google.golang.org/grpc/reflection" ) -// StovepipeServer wraps the controller and implements the gRPC service interface. +// StovepipeServer wraps the controllers and implements the gRPC service interface. type StovepipeServer struct { pb.UnimplementedStovepipeServer - pingController *controller.PingController + pingController *controller.PingController + ingestController *controller.IngestController } // Ping delegates to the controller. @@ -44,6 +45,31 @@ func (s *StovepipeServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.Pi return s.pingController.Ping(ctx, req) } +// Ingest delegates to the controller. +func (s *StovepipeServer) Ingest(ctx context.Context, req *pb.IngestRequest) (*pb.IngestResponse, error) { + return s.ingestController.Ingest(ctx, req) +} + +// inMemoryCounter is a minimal, process-local counter.Counter used to wire the example +// server. It is not durable; a real deployment supplies a persistent implementation +// (e.g. platform/extension/counter/mysql). +type inMemoryCounter struct { + mu sync.Mutex + values map[string]int64 +} + +func newInMemoryCounter() *inMemoryCounter { + return &inMemoryCounter{values: make(map[string]int64)} +} + +// Next returns the next value in the sequence for the given domain, starting at 1. +func (c *inMemoryCounter) Next(_ context.Context, domain string) (int64, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.values[domain]++ + return c.values[domain], nil +} + func main() { code := 0 if err := run(); err != nil { @@ -108,10 +134,12 @@ func run() error { // Create gRPC server grpcServer := grpc.NewServer() - // Create ping controller and wrap it for gRPC + // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) + ingestController := controller.NewIngestController(logger.Sugar(), scope, newInMemoryCounter()) srv := &StovepipeServer{ - pingController: pingController, + pingController: pingController, + ingestController: ingestController, } pb.RegisterStovepipeServer(grpcServer, srv) diff --git a/stovepipe/controller/BUILD.bazel b/stovepipe/controller/BUILD.bazel index e9a9f4cc..51aa4636 100644 --- a/stovepipe/controller/BUILD.bazel +++ b/stovepipe/controller/BUILD.bazel @@ -2,12 +2,18 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", - srcs = ["ping.go"], + srcs = [ + "ingest.go", + "ping.go", + ], importpath = "github.com/uber/submitqueue/stovepipe/controller", visibility = ["//visibility:public"], deps = [ "//api/stovepipe/protopb", + "//platform/errs", + "//platform/extension/counter", "//platform/metrics", + "//stovepipe/entity", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -15,13 +21,18 @@ go_library( go_test( name = "controller_test", - srcs = ["ping_test.go"], + srcs = [ + "ingest_test.go", + "ping_test.go", + ], embed = [":controller"], deps = [ "//api/stovepipe/protopb", + "//platform/extension/counter/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", ], ) diff --git a/stovepipe/controller/ingest.go b/stovepipe/controller/ingest.go new file mode 100644 index 00000000..8d1fd180 --- /dev/null +++ b/stovepipe/controller/ingest.go @@ -0,0 +1,96 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/stovepipe/protopb" + "github.com/uber/submitqueue/platform/errs" + "github.com/uber/submitqueue/platform/extension/counter" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/entity" + "go.uber.org/zap" +) + +// ErrInvalidRequest is returned when the request fails validation. +// This error should be mapped to codes.InvalidArgument at the gRPC layer. +var ErrInvalidRequest = errs.NewUserError(errors.New("invalid request")) + +// IsInvalidRequest returns true if any error in the error chain is ErrInvalidRequest. +func IsInvalidRequest(err error) bool { + return errors.Is(err, ErrInvalidRequest) +} + +// IngestController handles ingest business logic for stovepipe: it admits a queue's newly +// observed commit into the validation pipeline. +// +// This is the thin entry point. It mints a request ID namespaced by the queue and records the +// resulting Request. Resolving the commit URI via the SourceControl extension, persisting the +// Request, and publishing it onto the pipeline are deliberately out of scope for now. +type IngestController struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter +} + +// NewIngestController creates a new instance of the stovepipe ingest controller. +func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter) *IngestController { + return &IngestController{ + logger: logger, + metricsScope: scope.SubScope("ingest_controller"), + counter: counter, + } +} + +// Ingest admits a queue's newly observed commit into the validation pipeline and returns the minted request ID. +func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) { + const opName = "ingest" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + if req.Queue == "" { + return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest) + } + + queue := req.Queue + + // Generate a globally unique request ID namespaced by the queue. The counter domain + // ("request/") doubles as the ID prefix, so the ID is "/". + domain := "request/" + queue + seq, err := c.counter.Next(ctx, domain) + if err != nil { + return nil, fmt.Errorf("IngestController failed to generate request ID for queue=%s: %w", queue, err) + } + + request := entity.Request{ + ID: fmt.Sprintf("%s/%d", domain, seq), + Queue: queue, + State: entity.RequestStateAccepted, + Version: 1, + } + + c.logger.Infow("accepted request", + "id", request.ID, + "queue", request.Queue, + "state", request.State, + ) + + return &pb.IngestResponse{Id: request.ID}, nil +} diff --git a/stovepipe/controller/ingest_test.go b/stovepipe/controller/ingest_test.go new file mode 100644 index 00000000..c0500251 --- /dev/null +++ b/stovepipe/controller/ingest_test.go @@ -0,0 +1,70 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + pb "github.com/uber/submitqueue/api/stovepipe/protopb" + countermock "github.com/uber/submitqueue/platform/extension/counter/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +func newIngestController(t *testing.T, c *countermock.MockCounter) *IngestController { + t.Helper() + return NewIngestController(zap.NewNop().Sugar(), tally.NewTestScope("test", nil), c) +} + +func TestIngestController_Ingest(t *testing.T) { + ctrl := gomock.NewController(t) + mockCounter := countermock.NewMockCounter(ctrl) + mockCounter.EXPECT().Next(gomock.Any(), "request/monorepo/main").Return(int64(7), nil) + + c := newIngestController(t, mockCounter) + + resp, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) + require.NoError(t, err) + assert.Equal(t, "request/monorepo/main/7", resp.Id) +} + +func TestIngestController_Ingest_EmptyQueue(t *testing.T) { + ctrl := gomock.NewController(t) + mockCounter := countermock.NewMockCounter(ctrl) + // Counter must not be consulted when the queue is missing. + + c := newIngestController(t, mockCounter) + + _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: ""}) + require.Error(t, err) + assert.True(t, IsInvalidRequest(err)) +} + +func TestIngestController_Ingest_CounterError(t *testing.T) { + ctrl := gomock.NewController(t) + mockCounter := countermock.NewMockCounter(ctrl) + mockCounter.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), errors.New("counter unavailable")) + + c := newIngestController(t, mockCounter) + + _, err := c.Ingest(context.Background(), &pb.IngestRequest{Queue: "monorepo/main"}) + require.Error(t, err) + assert.False(t, IsInvalidRequest(err)) +}