Skip to content

Commit 790b8a6

Browse files
committed
feat(storage) Implement sample MySQL storage engine for Requests
1 parent 674e49b commit 790b8a6

9 files changed

Lines changed: 214 additions & 24 deletions

File tree

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_deps.from_file(go_mod = "//:go.mod")
3030
# All *direct* Go dependencies of the module have to be listed explicitly
3131
use_repo(
3232
go_deps,
33+
"com_github_go_sql_driver_mysql",
3334
"com_github_gogo_protobuf",
3435
"com_github_stretchr_testify",
3536
"com_github_uber_go_tally_v4",

entities/request.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,34 @@ import (
99
// RequestLandStrategy defines the possible source control integration methods.
1010
type RequestLandStrategy int
1111

12+
// do not use iota here, as values should be fixed and consistent across versions.
1213
const (
1314
// RequestLandStrategyDefault lets the server decide based on configuration.
1415
RequestLandStrategyDefault RequestLandStrategy = 0
1516
// RequestLandStrategyRebase rebases commits onto the target branch before landing.
16-
RequestLandStrategyRebase = 1
17+
RequestLandStrategyRebase RequestLandStrategy = 1
1718
// RequestLandStrategySquashRebase squashes commits into a single commit before rebase.
18-
RequestLandStrategySquashRebase = 2
19+
RequestLandStrategySquashRebase RequestLandStrategy = 2
1920
// RequestLandStrategyMerge merges commits into the target branch by creating a separate merge commit, preserving the commit history along with hashes.
20-
RequestLandStrategyMerge = 3
21+
RequestLandStrategyMerge RequestLandStrategy = 3
2122
)
2223

24+
// RequestState defines the possible states of a land request.
2325
type RequestState int
2426

2527
// TODO: define all states
28+
// do not use iota here, as values should be fixed and consistent across versions.
2629
const (
2730
// RequestStateUnknown is the unreachable state. It is set by default when the structure is initialized. It should never be seen in the system.
2831
RequestStateUnknown RequestState = 0
2932
// RequestStateNew is the initial state of a land request. It is confirmed by the system but the processing is not started yet.
3033
RequestStateNew RequestState = 1
3134
// RequestStateProcessing is the state of a land request that is being processed.
32-
RequestStateProcessing = 2
35+
RequestStateProcessing RequestState = 2
3336
// RequestStateLanded is the state of a land request that has been successfully processed and landed. This is the final state.
34-
RequestStateLanded = 3
37+
RequestStateLanded RequestState = 3
3538
// RequestStateError is the state of a land request that has encountered an error. This is the final state.
36-
RequestStateError = 4
39+
RequestStateError RequestState = 4
3740
)
3841

3942
// Change represents a set of related code changes identified by one or more IDs from a particular code change provider, like Github Pull Requests.

extensions/storage/mysql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ go_library(
1111
deps = [
1212
"//entities",
1313
"//extensions/storage",
14+
"@com_github_go_sql_driver_mysql//:mysql",
1415
],
1516
)
Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,71 @@
11
package mysql
22

3-
import "github.com/uber/submitqueue/extensions/storage"
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"time"
47

5-
// MySQLParameters defines the parameters for the MySQL storage factory.
8+
_ "github.com/go-sql-driver/mysql"
9+
10+
"github.com/uber/submitqueue/extensions/storage"
11+
)
12+
13+
// MySQLParameters defines the configuration for the MySQL storage factory.
14+
// TODO: integrate with configuration system.
615
type MySQLParameters struct {
7-
}
16+
// DSN is the MySQL Data Source Name.
17+
// Format: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
18+
DSN string
819

9-
// NewFactory creates a new MySQL storage factory
10-
func NewFactory(p MySQLParameters) (storage.StoreFactory, error) {
11-
return &factory{
12-
requestStore: NewRequestStore(),
13-
}, nil
20+
// MaxOpenConns sets the maximum number of open connections to the database. 0 means unlimited.
21+
MaxOpenConns int
22+
23+
// MaxIdleConns sets the maximum number of idle connections in the pool. 0 means no idle connections are retained.
24+
MaxIdleConns int
25+
26+
// ConnMaxLifetime sets the maximum amount of time a connection may be reused. 0 means connections are not closed due to age.
27+
ConnMaxLifetime time.Duration
1428
}
1529

1630
type factory struct {
31+
db *sql.DB
1732
requestStore storage.RequestStore
1833
}
1934

20-
// GetRequestStore returns the MySQL-backed RequestStore
35+
// NewFactory creates a new MySQL storage factory.
36+
func NewFactory(p MySQLParameters) (storage.StoreFactory, error) {
37+
db, err := sql.Open("mysql", p.DSN)
38+
if err != nil {
39+
return nil, fmt.Errorf("failed to open MySQL connection: %w", err)
40+
}
41+
42+
if p.MaxOpenConns > 0 {
43+
db.SetMaxOpenConns(p.MaxOpenConns)
44+
}
45+
if p.MaxIdleConns > 0 {
46+
db.SetMaxIdleConns(p.MaxIdleConns)
47+
}
48+
if p.ConnMaxLifetime > 0 {
49+
db.SetConnMaxLifetime(p.ConnMaxLifetime)
50+
}
51+
52+
if err := db.Ping(); err != nil {
53+
db.Close()
54+
return nil, fmt.Errorf("failed to ping MySQL: %w", err)
55+
}
56+
57+
return &factory{
58+
db: db,
59+
requestStore: NewRequestStore(db),
60+
}, nil
61+
}
62+
63+
// GetRequestStore returns the MySQL-backed RequestStore.
2164
func (f *factory) GetRequestStore() storage.RequestStore {
2265
return f.requestStore
2366
}
67+
68+
// Close closes the underlying database connection.
69+
func (f *factory) Close() error {
70+
return f.db.Close()
71+
}

extensions/storage/mysql/request_store.go

Lines changed: 116 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,142 @@ package mysql
22

33
import (
44
"context"
5+
"database/sql"
6+
"encoding/json"
57
"errors"
8+
"fmt"
9+
10+
"github.com/go-sql-driver/mysql"
611

712
"github.com/uber/submitqueue/entities"
813
"github.com/uber/submitqueue/extensions/storage"
914
)
1015

16+
const maxCreateRetries = 1000
17+
1118
type requestStore struct {
19+
db *sql.DB
1220
}
1321

1422
// NewRequestStore creates a new MySQL-backed RequestStore.
15-
func NewRequestStore() storage.RequestStore {
16-
return &requestStore{}
23+
func NewRequestStore(db *sql.DB) storage.RequestStore {
24+
return &requestStore{db: db}
1725
}
1826

1927
// Get retrieves a land request by ID. Returns ErrNotFound if the request is not found.
2028
func (r *requestStore) Get(ctx context.Context, id string) (entities.Request, error) {
21-
// TODO: implement GET operation
22-
return entities.Request{}, errors.New("not implemented")
29+
queue, seq, err := entities.ParseRequestID(id)
30+
if err != nil {
31+
return entities.Request{}, fmt.Errorf("failed to parse request ID %s: %w", id, err)
32+
}
33+
34+
var req entities.Request
35+
var changeIDsJSON []byte
36+
37+
err = r.db.QueryRowContext(ctx,
38+
"SELECT queue, seq, change_source, change_ids, land_strategy, state, version FROM request WHERE queue = ? AND seq = ?",
39+
queue, seq,
40+
).Scan(&req.Queue, &req.Seq, &req.Change.Source, &changeIDsJSON, &req.LandStrategy, &req.State, &req.Version)
41+
42+
if errors.Is(err, sql.ErrNoRows) {
43+
return entities.Request{}, storage.WrapNotFound(err)
44+
}
45+
if err != nil {
46+
return entities.Request{}, fmt.Errorf("failed to get request entity id=%s from the database: %w", id, err)
47+
}
48+
49+
if err := json.Unmarshal(changeIDsJSON, &req.Change.IDs); err != nil {
50+
return entities.Request{}, fmt.Errorf("failed to unmarshal change IDs for request entity id=%s from the database: %w", id, err)
51+
}
52+
53+
return req, nil
2354
}
2455

2556
// Create creates a new land request. Returns the created request object with generated sequence number.
57+
// It uses optimistic locking: obtains the current max sequence number, attempts to insert with seq+1,
58+
// and retries with an incremented sequence number on primary key conflict.
2659
func (r *requestStore) Create(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
27-
// TODO: implement CREATE operation
28-
return entities.Request{}, errors.New("not implemented")
60+
changeIDsJSON, err := json.Marshal(change.IDs)
61+
if err != nil {
62+
return entities.Request{}, fmt.Errorf("failed to marshal change IDs=%v queue=%s for Create request entity: %w", change.IDs, queue, err)
63+
}
64+
65+
var seq int64
66+
err = r.db.QueryRowContext(ctx,
67+
"SELECT COALESCE(MAX(seq), 0) + 1 FROM request WHERE queue = ?",
68+
queue,
69+
).Scan(&seq)
70+
if err != nil {
71+
return entities.Request{}, fmt.Errorf("failed to get next sequence number for queue=%s: %w", queue, err)
72+
}
73+
74+
// Version always start from 1 as per protocol.
75+
version := int32(1)
76+
77+
// retry up to maxCreateRetries times to insert the request entity, incrementing the sequence number on primary key conflict
78+
for attempt := 0; attempt < maxCreateRetries; attempt++ {
79+
_, err = r.db.ExecContext(ctx,
80+
"INSERT INTO request (queue, seq, change_source, change_ids, land_strategy, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)",
81+
queue, seq, change.Source, changeIDsJSON, strategy, state, version,
82+
)
83+
if err == nil {
84+
return entities.Request{
85+
Queue: queue,
86+
Seq: seq,
87+
Change: change,
88+
LandStrategy: strategy,
89+
State: state,
90+
Version: version,
91+
}, nil
92+
}
93+
94+
// if the error is a MySQL primary key conflict error, increment the sequence number and retry
95+
// It relies on MySQL-specific error code 1062 for primary key conflict. Hopefully this will not change in the future.
96+
var mysqlErr *mysql.MySQLError
97+
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
98+
seq++
99+
continue
100+
}
101+
102+
return entities.Request{}, fmt.Errorf("failed to insert request entity queue=%s seq=%d: %w", queue, seq, err)
103+
}
104+
105+
return entities.Request{}, fmt.Errorf("failed to insert request entity queue=%s change=%v: exceeded %d retry attempts due to primary key conflicts", queue, change, maxCreateRetries)
29106
}
30107

31108
// UpdateState updates the state of a land request if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
32-
// The implementation should increment the version by 1 atomically with the state update.
109+
// The implementation increments the version by 1 atomically with the state update.
33110
func (r *requestStore) UpdateState(ctx context.Context, id string, version int32, newState entities.RequestState) error {
34-
// TODO: implement UPDATE STATE operation
35-
return errors.New("not implemented")
111+
queue, seq, err := entities.ParseRequestID(id)
112+
if err != nil {
113+
return fmt.Errorf("failed to parse request ID=%q: %w", id, err)
114+
}
115+
116+
result, err := r.db.ExecContext(ctx,
117+
"UPDATE request SET state = ?, version = version + 1 WHERE queue = ? AND seq = ? AND version = ?",
118+
newState, queue, seq, version,
119+
)
120+
if err != nil {
121+
return fmt.Errorf(
122+
"failed to update request state for queue=%q seq=%d version=%d newState=%v: %w",
123+
queue, seq, version, newState, err,
124+
)
125+
}
126+
127+
rowsAffected, err := result.RowsAffected()
128+
if err != nil {
129+
return fmt.Errorf(
130+
"failed to get rows affected from update for queue=%q seq=%d version=%d newState=%v: %w",
131+
queue, seq, version, newState, err,
132+
)
133+
}
134+
135+
if rowsAffected != 1 {
136+
return fmt.Errorf(
137+
"version mismatch for request update: queue=%q seq=%d expected_version=%d newState=%v: %w",
138+
queue, seq, version, newState, storage.ErrVersionMismatch,
139+
)
140+
}
141+
142+
return nil
36143
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE IF NOT EXISTS request (
2+
queue VARCHAR(255) NOT NULL,
3+
seq BIGINT NOT NULL,
4+
change_source VARCHAR(255) NOT NULL,
5+
change_ids JSON NOT NULL,
6+
land_strategy INT NOT NULL DEFAULT 0,
7+
state INT NOT NULL,
8+
version INT NOT NULL DEFAULT 1,
9+
PRIMARY KEY (queue, seq)
10+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

extensions/storage/storage.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
package storage
22

33
import "errors"
4+
import "fmt"
45

56
// ErrNotFound is returned by storage implementations when the requested record is not found in the database.
67
var ErrNotFound = errors.New("record not found")
78

9+
// IsNotFound returns true if any error in the error chain is a ErrNotFound.
10+
func IsNotFound(err error) bool {
11+
return errors.Is(err, ErrNotFound)
12+
}
13+
14+
// WrapNotFound wraps ErrNotFound with the original error from the storage implementation.
15+
func WrapNotFound(err error) error {
16+
return fmt.Errorf("%w: %w", ErrNotFound, err)
17+
}
18+
819
// ErrVersionMismatch is returned by storage implementations when the expected entity version does not match the current version of the object.
920
// This is used to implement an optimistic locking mechanism, allowing multiple clients to update the same entity concurrently
1021
// and either retry or implement idempotent operations.
@@ -15,4 +26,7 @@ var ErrVersionMismatch = errors.New("version mismatch")
1526
type StoreFactory interface {
1627
// GetRequestStore creates a new RequestStore instance.
1728
GetRequestStore() RequestStore
29+
30+
// Close closes the store factory and all underlying connections. Should only be called once at the end of the program.
31+
Close() error
1832
}

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/uber/submitqueue
33
go 1.24.5
44

55
require (
6+
github.com/go-sql-driver/mysql v1.9.3
67
github.com/gogo/protobuf v1.3.2
78
github.com/stretchr/testify v1.9.0
89
github.com/uber-go/tally/v4 v4.1.17
@@ -14,6 +15,7 @@ require (
1415
)
1516

1617
require (
18+
filippo.io/edwards25519 v1.1.0 // indirect
1719
github.com/BurntSushi/toml v1.2.1 // indirect
1820
github.com/beorn7/perks v1.0.1 // indirect
1921
github.com/cespare/xxhash/v2 v2.3.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
2+
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
3+
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
24
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
35
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
46
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
@@ -29,6 +31,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
2931
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
3032
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
3133
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
34+
github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
35+
github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
3236
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
3337
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
3438
github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0=

0 commit comments

Comments
 (0)