diff --git a/stovepipe/extension/storage/BUILD.bazel b/stovepipe/extension/storage/BUILD.bazel new file mode 100644 index 00000000..15ee8578 --- /dev/null +++ b/stovepipe/extension/storage/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "storage", + srcs = [ + "request_store.go", + "request_uri_store.go", + "storage.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/extension/storage", + visibility = ["//visibility:public"], + deps = ["//stovepipe/entity"], +) diff --git a/stovepipe/extension/storage/mock/BUILD.bazel b/stovepipe/extension/storage/mock/BUILD.bazel new file mode 100644 index 00000000..c82888ad --- /dev/null +++ b/stovepipe/extension/storage/mock/BUILD.bazel @@ -0,0 +1,17 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = [ + "request_store_mock.go", + "request_uri_store_mock.go", + "storage_mock.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/extension/storage/mock", + visibility = ["//visibility:public"], + deps = [ + "//stovepipe/entity", + "//stovepipe/extension/storage", + "@org_uber_go_mock//gomock", + ], +) diff --git a/stovepipe/extension/storage/mock/request_store_mock.go b/stovepipe/extension/storage/mock/request_store_mock.go new file mode 100644 index 00000000..089ae8a1 --- /dev/null +++ b/stovepipe/extension/storage/mock/request_store_mock.go @@ -0,0 +1,85 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: request_store.go +// +// Generated by this command: +// +// mockgen -source=request_store.go -destination=mock/request_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/stovepipe/entity" + gomock "go.uber.org/mock/gomock" +) + +// MockRequestStore is a mock of RequestStore interface. +type MockRequestStore struct { + ctrl *gomock.Controller + recorder *MockRequestStoreMockRecorder + isgomock struct{} +} + +// MockRequestStoreMockRecorder is the mock recorder for MockRequestStore. +type MockRequestStoreMockRecorder struct { + mock *MockRequestStore +} + +// NewMockRequestStore creates a new mock instance. +func NewMockRequestStore(ctrl *gomock.Controller) *MockRequestStore { + mock := &MockRequestStore{ctrl: ctrl} + mock.recorder = &MockRequestStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRequestStore) EXPECT() *MockRequestStoreMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockRequestStore) Create(ctx context.Context, request entity.Request) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, request) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockRequestStoreMockRecorder) Create(ctx, request any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRequestStore)(nil).Create), ctx, request) +} + +// Get mocks base method. +func (m *MockRequestStore) Get(ctx context.Context, id string) (entity.Request, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, id) + ret0, _ := ret[0].(entity.Request) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockRequestStoreMockRecorder) Get(ctx, id any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockRequestStore)(nil).Get), ctx, id) +} + +// Update mocks base method. +func (m *MockRequestStore) Update(ctx context.Context, request entity.Request, oldVersion, newVersion int32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, request, oldVersion, newVersion) + ret0, _ := ret[0].(error) + return ret0 +} + +// Update indicates an expected call of Update. +func (mr *MockRequestStoreMockRecorder) Update(ctx, request, oldVersion, newVersion any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockRequestStore)(nil).Update), ctx, request, oldVersion, newVersion) +} diff --git a/stovepipe/extension/storage/mock/request_uri_store_mock.go b/stovepipe/extension/storage/mock/request_uri_store_mock.go new file mode 100644 index 00000000..e50bf237 --- /dev/null +++ b/stovepipe/extension/storage/mock/request_uri_store_mock.go @@ -0,0 +1,70 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: request_uri_store.go +// +// Generated by this command: +// +// mockgen -source=request_uri_store.go -destination=mock/request_uri_store_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockRequestURIStore is a mock of RequestURIStore interface. +type MockRequestURIStore struct { + ctrl *gomock.Controller + recorder *MockRequestURIStoreMockRecorder + isgomock struct{} +} + +// MockRequestURIStoreMockRecorder is the mock recorder for MockRequestURIStore. +type MockRequestURIStoreMockRecorder struct { + mock *MockRequestURIStore +} + +// NewMockRequestURIStore creates a new mock instance. +func NewMockRequestURIStore(ctrl *gomock.Controller) *MockRequestURIStore { + mock := &MockRequestURIStore{ctrl: ctrl} + mock.recorder = &MockRequestURIStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRequestURIStore) EXPECT() *MockRequestURIStoreMockRecorder { + return m.recorder +} + +// Create mocks base method. +func (m *MockRequestURIStore) Create(ctx context.Context, queue, uri, id string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Create", ctx, queue, uri, id) + ret0, _ := ret[0].(error) + return ret0 +} + +// Create indicates an expected call of Create. +func (mr *MockRequestURIStoreMockRecorder) Create(ctx, queue, uri, id any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockRequestURIStore)(nil).Create), ctx, queue, uri, id) +} + +// GetIDByURI mocks base method. +func (m *MockRequestURIStore) GetIDByURI(ctx context.Context, queue, uri string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIDByURI", ctx, queue, uri) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetIDByURI indicates an expected call of GetIDByURI. +func (mr *MockRequestURIStoreMockRecorder) GetIDByURI(ctx, queue, uri any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIDByURI", reflect.TypeOf((*MockRequestURIStore)(nil).GetIDByURI), ctx, queue, uri) +} diff --git a/stovepipe/extension/storage/mock/storage_mock.go b/stovepipe/extension/storage/mock/storage_mock.go new file mode 100644 index 00000000..7da8ebb9 --- /dev/null +++ b/stovepipe/extension/storage/mock/storage_mock.go @@ -0,0 +1,83 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: storage.go +// +// Generated by this command: +// +// mockgen -source=storage.go -destination=mock/storage_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + storage "github.com/uber/submitqueue/stovepipe/extension/storage" + gomock "go.uber.org/mock/gomock" +) + +// MockStorage is a mock of Storage interface. +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder + isgomock struct{} +} + +// MockStorageMockRecorder is the mock recorder for MockStorage. +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance. +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockStorage) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStorageMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStorage)(nil).Close)) +} + +// GetRequestStore mocks base method. +func (m *MockStorage) GetRequestStore() storage.RequestStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRequestStore") + ret0, _ := ret[0].(storage.RequestStore) + return ret0 +} + +// GetRequestStore indicates an expected call of GetRequestStore. +func (mr *MockStorageMockRecorder) GetRequestStore() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestStore", reflect.TypeOf((*MockStorage)(nil).GetRequestStore)) +} + +// GetRequestURIStore mocks base method. +func (m *MockStorage) GetRequestURIStore() storage.RequestURIStore { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRequestURIStore") + ret0, _ := ret[0].(storage.RequestURIStore) + return ret0 +} + +// GetRequestURIStore indicates an expected call of GetRequestURIStore. +func (mr *MockStorageMockRecorder) GetRequestURIStore() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestURIStore", reflect.TypeOf((*MockStorage)(nil).GetRequestURIStore)) +} diff --git a/stovepipe/extension/storage/mysql/BUILD.bazel b/stovepipe/extension/storage/mysql/BUILD.bazel new file mode 100644 index 00000000..2ec40caf --- /dev/null +++ b/stovepipe/extension/storage/mysql/BUILD.bazel @@ -0,0 +1,19 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mysql", + srcs = [ + "request_store.go", + "request_uri_store.go", + "storage.go", + ], + importpath = "github.com/uber/submitqueue/stovepipe/extension/storage/mysql", + visibility = ["//visibility:public"], + deps = [ + "//platform/metrics", + "//stovepipe/entity", + "//stovepipe/extension/storage", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_uber_go_tally//:tally", + ], +) diff --git a/stovepipe/extension/storage/mysql/request_store.go b/stovepipe/extension/storage/mysql/request_store.go new file mode 100644 index 00000000..f9573d84 --- /dev/null +++ b/stovepipe/extension/storage/mysql/request_store.go @@ -0,0 +1,126 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally" + + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" +) + +// mysqlErrDuplicateEntry is MySQL error code 1062 ("Duplicate entry"), returned on a unique/primary +// key violation. It requires a unique index on the table to be raised. +const mysqlErrDuplicateEntry = 1062 + +type requestStore struct { + db *sql.DB + scope tally.Scope +} + +// NewRequestStore creates a new MySQL-backed RequestStore. +func NewRequestStore(db *sql.DB, scope tally.Scope) storage.RequestStore { + return &requestStore{db: db, scope: scope} +} + +// Create persists a new request. Returns ErrAlreadyExists if the request ID already exists. +func (r *requestStore) Create(ctx context.Context, request entity.Request) (retErr error) { + op := metrics.Begin(r.scope, "create") + defer func() { op.Complete(retErr) }() + + _, err := r.db.ExecContext(ctx, + "INSERT INTO request (id, queue, uri, state, version) VALUES (?, ?, ?, ?, ?)", + request.ID, request.Queue, request.URI, request.State, request.Version, + ) + if err != nil { + if isDuplicateEntry(err) { + return fmt.Errorf("request entity id=%s: %w", request.ID, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to insert request entity id=%s: %w", request.ID, err) + } + + return nil +} + +// Get retrieves a request by ID. Returns ErrNotFound if the request is not found. +func (r *requestStore) Get(ctx context.Context, id string) (ret entity.Request, retErr error) { + op := metrics.Begin(r.scope, "get") + defer func() { op.Complete(retErr) }() + + var req entity.Request + err := r.db.QueryRowContext(ctx, + "SELECT id, queue, uri, state, version FROM request WHERE id = ?", + id, + ).Scan(&req.ID, &req.Queue, &req.URI, &req.State, &req.Version) + + if errors.Is(err, sql.ErrNoRows) { + return entity.Request{}, storage.WrapNotFound(err) + } + if err != nil { + return entity.Request{}, fmt.Errorf("failed to get request entity id=%s from the database: %w", id, err) + } + + return req, nil +} + +// Update persists the mutable fields of request (uri, state) if the stored version matches +// oldVersion, writing newVersion. Returns ErrVersionMismatch if the stored version does not match +// (including when the request does not exist). This is a pure conditional write; the caller owns +// version arithmetic. +func (r *requestStore) Update(ctx context.Context, request entity.Request, oldVersion, newVersion int32) (retErr error) { + op := metrics.Begin(r.scope, "update") + defer func() { op.Complete(retErr) }() + + result, err := r.db.ExecContext(ctx, + "UPDATE request SET uri = ?, state = ?, version = ? WHERE id = ? AND version = ?", + request.URI, request.State, newVersion, request.ID, oldVersion, + ) + if err != nil { + return fmt.Errorf( + "failed to update request id=%q oldVersion=%d newVersion=%d: %w", + request.ID, oldVersion, newVersion, err, + ) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf( + "failed to get rows affected from update for id=%q oldVersion=%d newVersion=%d: %w", + request.ID, oldVersion, newVersion, err, + ) + } + + if rowsAffected != 1 { + return fmt.Errorf( + "version mismatch for request update: id=%q expected_version=%d: %w", + request.ID, oldVersion, storage.ErrVersionMismatch, + ) + } + + return nil +} + +// isDuplicateEntry reports whether err is a MySQL duplicate-key (1062) error. +func isDuplicateEntry(err error) bool { + var mysqlErr *mysql.MySQLError + return errors.As(err, &mysqlErr) && mysqlErr.Number == mysqlErrDuplicateEntry +} diff --git a/stovepipe/extension/storage/mysql/request_uri_store.go b/stovepipe/extension/storage/mysql/request_uri_store.go new file mode 100644 index 00000000..1f5c5aea --- /dev/null +++ b/stovepipe/extension/storage/mysql/request_uri_store.go @@ -0,0 +1,83 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/uber-go/tally" + + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/stovepipe/extension/storage" +) + +// requestURIInitialVersion is the version written for a new mapping. The mapping is insert-once, +// so the column is constant today; it exists for record consistency with the other stores and to +// reserve optimistic-locking semantics if the mapping ever becomes mutable. +const requestURIInitialVersion = 1 + +type requestURIStore struct { + db *sql.DB + scope tally.Scope +} + +// NewRequestURIStore creates a new MySQL-backed RequestURIStore. +func NewRequestURIStore(db *sql.DB, scope tally.Scope) storage.RequestURIStore { + return &requestURIStore{db: db, scope: scope} +} + +// Create records the (queue, uri) -> id reverse index. Returns ErrAlreadyExists if (queue, uri) +// is already mapped to a request. +func (r *requestURIStore) Create(ctx context.Context, queue, uri, id string) (retErr error) { + op := metrics.Begin(r.scope, "create") + defer func() { op.Complete(retErr) }() + + _, err := r.db.ExecContext(ctx, + "INSERT INTO request_uri (queue, uri, request_id, version) VALUES (?, ?, ?, ?)", + queue, uri, id, requestURIInitialVersion, + ) + if err != nil { + if isDuplicateEntry(err) { + return fmt.Errorf("request_uri queue=%s uri=%s: %w", queue, uri, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to map queue=%s uri=%s to request id=%s: %w", queue, uri, id, err) + } + + return nil +} + +// GetIDByURI returns the id of the request validating (queue, uri). Returns ErrNotFound if absent. +func (r *requestURIStore) GetIDByURI(ctx context.Context, queue, uri string) (ret string, retErr error) { + op := metrics.Begin(r.scope, "get_id_by_uri") + defer func() { op.Complete(retErr) }() + + var id string + err := r.db.QueryRowContext(ctx, + "SELECT request_id FROM request_uri WHERE queue = ? AND uri = ?", + queue, uri, + ).Scan(&id) + + if errors.Is(err, sql.ErrNoRows) { + return "", storage.WrapNotFound(err) + } + if err != nil { + return "", fmt.Errorf("failed to get request id for queue=%s uri=%s from the database: %w", queue, uri, err) + } + + return id, nil +} diff --git a/stovepipe/extension/storage/mysql/schema/BUILD.bazel b/stovepipe/extension/storage/mysql/schema/BUILD.bazel new file mode 100644 index 00000000..3412d773 --- /dev/null +++ b/stovepipe/extension/storage/mysql/schema/BUILD.bazel @@ -0,0 +1,5 @@ +filegroup( + name = "schema", + srcs = glob(["*.sql"]), + visibility = ["//visibility:public"], +) diff --git a/stovepipe/extension/storage/mysql/schema/request.sql b/stovepipe/extension/storage/mysql/schema/request.sql new file mode 100644 index 00000000..207bdc5b --- /dev/null +++ b/stovepipe/extension/storage/mysql/schema/request.sql @@ -0,0 +1,11 @@ +-- request holds one validation of a queue at a particular commit. uri is the opaque, +-- VCS-agnostic commit locator; it may be empty until SourceControl resolution is wired in. +-- No timestamps: created/updated times are not part of the Request entity. +CREATE TABLE IF NOT EXISTS request ( + id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + uri VARCHAR(255) NOT NULL, + state VARCHAR(64) NOT NULL, + version INT NOT NULL, + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/stovepipe/extension/storage/mysql/schema/request_uri.sql b/stovepipe/extension/storage/mysql/schema/request_uri.sql new file mode 100644 index 00000000..853f2973 --- /dev/null +++ b/stovepipe/extension/storage/mysql/schema/request_uri.sql @@ -0,0 +1,13 @@ +-- request_uri is the reverse index from a validated commit to the request that owns it. The +-- composite PK (queue, uri) enforces exactly one request per (queue, commit URI) — the RFC's +-- dedup guarantee — and a duplicate insert surfaces as the "already being validated" signal. +-- queue leads the PK so the row is addressed by its (queue, uri) key (NoSQL partition/sort style; +-- no secondary indexes). version carries the optimistic-locking version for record consistency +-- with the other stores; the mapping is currently insert-once, so it is written as 1. +CREATE TABLE IF NOT EXISTS request_uri ( + queue VARCHAR(255) NOT NULL, + uri VARCHAR(255) NOT NULL, + request_id VARCHAR(255) NOT NULL, + version INT NOT NULL, + PRIMARY KEY (queue, uri) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/stovepipe/extension/storage/mysql/storage.go b/stovepipe/extension/storage/mysql/storage.go new file mode 100644 index 00000000..93b04eb7 --- /dev/null +++ b/stovepipe/extension/storage/mysql/storage.go @@ -0,0 +1,54 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "database/sql" + + _ "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally" + + "github.com/uber/submitqueue/stovepipe/extension/storage" +) + +type mysqlStorage struct { + db *sql.DB + requestStore storage.RequestStore + requestURIStore storage.RequestURIStore +} + +// NewStorage creates a new MySQL-backed storage. +func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { + return &mysqlStorage{ + db: db, + requestStore: NewRequestStore(db, scope.SubScope("request_store")), + requestURIStore: NewRequestURIStore(db, scope.SubScope("request_uri_store")), + }, nil +} + +// GetRequestStore returns the MySQL-backed RequestStore. +func (f *mysqlStorage) GetRequestStore() storage.RequestStore { + return f.requestStore +} + +// GetRequestURIStore returns the MySQL-backed RequestURIStore. +func (f *mysqlStorage) GetRequestURIStore() storage.RequestURIStore { + return f.requestURIStore +} + +// Close closes the underlying database connection. +func (f *mysqlStorage) Close() error { + return f.db.Close() +} diff --git a/stovepipe/extension/storage/request_store.go b/stovepipe/extension/storage/request_store.go new file mode 100644 index 00000000..4001bf0f --- /dev/null +++ b/stovepipe/extension/storage/request_store.go @@ -0,0 +1,44 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=request_store.go -destination=mock/request_store_mock.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/stovepipe/entity" +) + +// RequestStore persists validation requests, keyed by request ID. The reverse index from a +// validated commit to the request that owns it lives in a separate RequestURIStore (one store per +// table), so the two concerns can evolve and be backed independently. +type RequestStore interface { + // Create persists a new request. The request must have a unique ID already assigned. + // Returns ErrAlreadyExists if a request with the same ID already exists. + Create(ctx context.Context, request entity.Request) error + + // Get retrieves a request by ID. Returns ErrNotFound if the request is not found. + Get(ctx context.Context, id string) (entity.Request, error) + + // Update persists the mutable fields of request if the currently stored version matches + // oldVersion, writing newVersion as the new version. Returns ErrVersionMismatch if the + // stored version does not match (including when the request does not exist). + // + // Version arithmetic is owned by the caller: it computes newVersion (typically oldVersion+1) + // and only assigns request.Version = newVersion after this call succeeds. The store performs + // a pure conditional write and does not read request.Version. + Update(ctx context.Context, request entity.Request, oldVersion, newVersion int32) error +} diff --git a/stovepipe/extension/storage/request_uri_store.go b/stovepipe/extension/storage/request_uri_store.go new file mode 100644 index 00000000..e5f9f6b7 --- /dev/null +++ b/stovepipe/extension/storage/request_uri_store.go @@ -0,0 +1,36 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=request_uri_store.go -destination=mock/request_uri_store_mock.go -package=mock + +import ( + "context" +) + +// RequestURIStore is the reverse index from a validated commit to the request that owns it, +// keyed by (queue, commit URI). It is a separate store from RequestStore because it is a separate +// table — the two are written independently (no cross-table transaction), keeping the contract +// satisfiable by key/value backends. The caller orchestrates "create request, then map URI". +type RequestURIStore interface { + // Create records that request id validates the commit uri for queue. Returns ErrAlreadyExists + // if (queue, uri) is already mapped — the signal a caller uses to detect that the commit is + // already being validated by another request. + Create(ctx context.Context, queue, uri, id string) error + + // GetIDByURI returns the id of the request validating (queue, uri). + // Returns ErrNotFound if no request is mapped to that commit. + GetIDByURI(ctx context.Context, queue, uri string) (string, error) +} diff --git a/stovepipe/extension/storage/storage.go b/stovepipe/extension/storage/storage.go new file mode 100644 index 00000000..d42406bb --- /dev/null +++ b/stovepipe/extension/storage/storage.go @@ -0,0 +1,55 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +//go:generate mockgen -source=storage.go -destination=mock/storage_mock.go -package=mock + +import ( + "errors" + "fmt" +) + +// ErrNotFound is returned by storage implementations when the requested record is not found in the database. +var ErrNotFound = errors.New("record not found") + +// IsNotFound returns true if any error in the error chain is a ErrNotFound. +func IsNotFound(err error) bool { + return errors.Is(err, ErrNotFound) +} + +// WrapNotFound wraps ErrNotFound with the original error from the storage implementation. +func WrapNotFound(err error) error { + return fmt.Errorf("%w: %w", ErrNotFound, err) +} + +// ErrAlreadyExists is returned by storage implementations when attempting to create a record that already exists. +var ErrAlreadyExists = errors.New("record already exists") + +// ErrVersionMismatch is returned by storage implementations when a conditional (CAS) update finds that +// the stored version does not match the expected version. It backs optimistic locking, letting callers +// retry or converge instead of overwriting a concurrent change. +var ErrVersionMismatch = errors.New("version mismatch") + +// Storage is a factory interface that aggregates all entity stores into a single injectable dependency. +type Storage interface { + // GetRequestStore returns the RequestStore instance. + GetRequestStore() RequestStore + + // GetRequestURIStore returns the RequestURIStore instance. + GetRequestURIStore() RequestURIStore + + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. + Close() error +} diff --git a/test/integration/stovepipe/extension/storage/mysql/BUILD.bazel b/test/integration/stovepipe/extension/storage/mysql/BUILD.bazel new file mode 100644 index 00000000..2b3ec00c --- /dev/null +++ b/test/integration/stovepipe/extension/storage/mysql/BUILD.bazel @@ -0,0 +1,24 @@ +load("@rules_go//go:def.bzl", "go_test") + +go_test( + name = "mysql_test", + srcs = ["storage_test.go"], + data = [ + "docker-compose.yml", + "//stovepipe/extension/storage/mysql/schema", + ], + tags = [ + "external", + "integration", + ], + deps = [ + "//stovepipe/entity", + "//stovepipe/extension/storage", + "//stovepipe/extension/storage/mysql", + "//test/testutil", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally//:tally", + ], +) diff --git a/test/integration/stovepipe/extension/storage/mysql/docker-compose.yml b/test/integration/stovepipe/extension/storage/mysql/docker-compose.yml new file mode 100644 index 00000000..79d574b3 --- /dev/null +++ b/test/integration/stovepipe/extension/storage/mysql/docker-compose.yml @@ -0,0 +1,20 @@ +# Docker Compose for Stovepipe MySQL Storage tests. +# Tests the storage extension's MySQL implementation in isolation. + +services: + mysql: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: root + # Database name must match the shared testutil DSN (ConnectMySQLService). + MYSQL_DATABASE: submitqueue + ports: + - "3306" # Random ephemeral port to avoid conflicts + healthcheck: + # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats + # "localhost" as a socket connection, which can be ready before the TCP + # listener — causing dependent services that connect over TCP to fail. + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] + interval: 5s + timeout: 5s + retries: 10 diff --git a/test/integration/stovepipe/extension/storage/mysql/storage_test.go b/test/integration/stovepipe/extension/storage/mysql/storage_test.go new file mode 100644 index 00000000..1efdf7f7 --- /dev/null +++ b/test/integration/stovepipe/extension/storage/mysql/storage_test.go @@ -0,0 +1,188 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally" + + "github.com/uber/submitqueue/stovepipe/entity" + "github.com/uber/submitqueue/stovepipe/extension/storage" + mysqlstorage "github.com/uber/submitqueue/stovepipe/extension/storage/mysql" + "github.com/uber/submitqueue/test/testutil" +) + +// MySQLRequestStoreSuite exercises the MySQL-backed RequestStore against a real MySQL instance +// started via docker-compose. +type MySQLRequestStoreSuite struct { + suite.Suite + stack *testutil.ComposeStack + db *sql.DB + log *testutil.TestLogger + ctx context.Context + store storage.RequestStore + uriStore storage.RequestURIStore +} + +func TestMySQLRequestStore(t *testing.T) { + suite.Run(t, new(MySQLRequestStoreSuite)) +} + +func (s *MySQLRequestStoreSuite) SetupSuite() { + t := s.T() + s.ctx = context.Background() + s.log = testutil.NewTestLogger(t) + + s.stack = testutil.NewComposeStack( + t, + s.log, + s.ctx, + "docker-compose.yml", + "ext-stovepipe-storage-mysql", + ) + + err := s.stack.Up() + require.NoError(t, err, "failed to start compose stack") + + s.db, err = s.stack.ConnectMySQLService("mysql") + require.NoError(t, err, "failed to connect to MySQL") + + schemaDir := testutil.SchemaDir("stovepipe/extension/storage/mysql/schema") + testutil.ApplySchema(t, s.log, s.db, schemaDir) + + store, err := mysqlstorage.NewStorage(s.db, tally.NoopScope) + require.NoError(t, err, "failed to create storage") + s.store = store.GetRequestStore() + s.uriStore = store.GetRequestURIStore() + + t.Cleanup(func() { + if s.db != nil { + s.db.Close() + } + }) +} + +func (s *MySQLRequestStoreSuite) TestCreateAndGet() { + req := entity.Request{ + ID: "request/monorepo/main/1", + Queue: "monorepo/main", + URI: "git://remote/monorepo/main/aaaa1111", + State: entity.RequestStateAccepted, + Version: 1, + } + require.NoError(s.T(), s.store.Create(s.ctx, req)) + + got, err := s.store.Get(s.ctx, req.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), req, got) +} + +func (s *MySQLRequestStoreSuite) TestGetNotFound() { + _, err := s.store.Get(s.ctx, "request/monorepo/main/does-not-exist") + require.True(s.T(), storage.IsNotFound(err)) +} + +func (s *MySQLRequestStoreSuite) TestUpdateCAS() { + req := entity.Request{ + ID: "request/monorepo/main/update", + Queue: "monorepo/main", + State: entity.RequestStateAccepted, + Version: 1, + } + require.NoError(s.T(), s.store.Create(s.ctx, req)) + + // Successful CAS: stored version (1) matches oldVersion; resolve the URI and bump to 2. + updated := req + updated.URI = "git://remote/monorepo/main/resolved" + require.NoError(s.T(), s.store.Update(s.ctx, updated, 1, 2)) + + got, err := s.store.Get(s.ctx, req.ID) + require.NoError(s.T(), err) + require.Equal(s.T(), "git://remote/monorepo/main/resolved", got.URI) + require.Equal(s.T(), int32(2), got.Version) + + // Stale CAS: oldVersion 1 no longer matches the stored version (2). + err = s.store.Update(s.ctx, updated, 1, 2) + require.ErrorIs(s.T(), err, storage.ErrVersionMismatch) +} + +func (s *MySQLRequestStoreSuite) TestUpdateNotFoundIsVersionMismatch() { + missing := entity.Request{ID: "request/monorepo/main/missing", State: entity.RequestStateAccepted} + err := s.store.Update(s.ctx, missing, 1, 2) + require.ErrorIs(s.T(), err, storage.ErrVersionMismatch) +} + +func (s *MySQLRequestStoreSuite) TestCreateDuplicateID() { + req := entity.Request{ + ID: "request/monorepo/main/2", + Queue: "monorepo/main", + State: entity.RequestStateAccepted, + Version: 1, + } + require.NoError(s.T(), s.store.Create(s.ctx, req)) + + err := s.store.Create(s.ctx, req) + require.ErrorIs(s.T(), err, storage.ErrAlreadyExists) +} + +func (s *MySQLRequestStoreSuite) TestURIMappingCreateAndGet() { + const ( + queue = "monorepo/main" + uri = "git://remote/monorepo/main/bbbb2222" + id = "request/monorepo/main/3" + ) + require.NoError(s.T(), s.uriStore.Create(s.ctx, queue, uri, id)) + + got, err := s.uriStore.GetIDByURI(s.ctx, queue, uri) + require.NoError(s.T(), err) + require.Equal(s.T(), id, got) +} + +func (s *MySQLRequestStoreSuite) TestGetIDByURINotFound() { + _, err := s.uriStore.GetIDByURI(s.ctx, "monorepo/main", "git://remote/monorepo/main/unmapped") + require.True(s.T(), storage.IsNotFound(err)) +} + +func (s *MySQLRequestStoreSuite) TestURIMappingDuplicate() { + const ( + queue = "monorepo/main" + uri = "git://remote/monorepo/main/cccc3333" + ) + require.NoError(s.T(), s.uriStore.Create(s.ctx, queue, uri, "request/monorepo/main/4")) + + // A second request claiming the same (queue, uri) is rejected — the dedup signal. + err := s.uriStore.Create(s.ctx, queue, uri, "request/monorepo/main/5") + require.ErrorIs(s.T(), err, storage.ErrAlreadyExists) +} + +func (s *MySQLRequestStoreSuite) TestURIMappingDistinctAcrossQueues() { + const uri = "git://remote/monorepo/shared/dddd4444" + require.NoError(s.T(), s.uriStore.Create(s.ctx, "queue-a", uri, "request/queue-a/1")) + require.NoError(s.T(), s.uriStore.Create(s.ctx, "queue-b", uri, "request/queue-b/1")) + + idA, err := s.uriStore.GetIDByURI(s.ctx, "queue-a", uri) + require.NoError(s.T(), err) + require.Equal(s.T(), "request/queue-a/1", idA) + + idB, err := s.uriStore.GetIDByURI(s.ctx, "queue-b", uri) + require.NoError(s.T(), err) + require.Equal(s.T(), "request/queue-b/1", idB) +}