Skip to content

Commit 47c95ca

Browse files
committed
feat(mysql): Wire MySQL to controller and add integration test
1 parent 790b8a6 commit 47c95ca

19 files changed

Lines changed: 833 additions & 314 deletions

File tree

MODULE.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use_repo(
3333
"com_github_go_sql_driver_mysql",
3434
"com_github_gogo_protobuf",
3535
"com_github_stretchr_testify",
36+
"com_github_testcontainers_testcontainers_go",
37+
"com_github_testcontainers_testcontainers_go_modules_mysql",
3638
"com_github_uber_go_tally_v4",
3739
"org_golang_google_grpc",
3840
"org_golang_google_protobuf",

Makefile

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ integration-test:
6868
@echo "Running all service integration tests..."
6969
@$(BAZEL) test //gateway/integration_tests:integration_tests_test //orchestrator/integration_tests:integration_tests_test //speculator/integration_tests:integration_tests_test --test_output=all
7070

71-
# Run end-to-end tests (requires all services to be running)
71+
# Run end-to-end integration tests (hermetic, no manual server setup needed)
7272
e2e-test:
73-
@echo "Running end-to-end tests..."
74-
@$(BAZEL) test //integration_tests:e2e_test --test_output=all
73+
@echo "Running integration tests..."
74+
@$(BAZEL) test //integration_tests:integration_test --test_output=all
7575

7676
# Clean generated files and binaries
7777
clean:
@@ -186,7 +186,9 @@ help:
186186
@echo " make integration-test-orchestrator - Test Orchestrator service"
187187
@echo " make integration-test-speculator - Test Speculator service"
188188
@echo " make integration-test - Test all services"
189-
@echo " make e2e-test - Run end-to-end tests"
189+
@echo ""
190+
@echo "End-to-End Tests (hermetic, no setup needed):"
191+
@echo " make e2e-test - Run integration tests with Testcontainers"
190192
@echo ""
191193
@echo "Run Clients:"
192194
@echo " make run-client-gateway - Run gateway client"

examples/server/gateway/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
importpath = "github.com/uber/submitqueue/examples/server/gateway",
77
visibility = ["//visibility:private"],
88
deps = [
9+
"//extensions/storage/mysql",
910
"//gateway/controller",
1011
"//gateway/protopb",
1112
"@com_github_uber_go_tally_v4//:tally",

examples/server/gateway/main.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/uber-go/tally/v4"
14+
"github.com/uber/submitqueue/extensions/storage/mysql"
1415
"github.com/uber/submitqueue/gateway/controller"
1516
pb "github.com/uber/submitqueue/gateway/protopb"
1617
"go.uber.org/zap"
@@ -21,12 +22,18 @@ import (
2122
// GatewayServer wraps the controller and implements the gRPC service interface
2223
type GatewayServer struct {
2324
pb.UnimplementedSubmitQueueGatewayServer
24-
controller *controller.PingController
25+
pingController *controller.PingController
26+
landController *controller.LandController
2527
}
2628

2729
// Ping delegates to the controller
2830
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
29-
return s.controller.Ping(ctx, req)
31+
return s.pingController.Ping(ctx, req)
32+
}
33+
34+
// Land delegates to the controller
35+
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
36+
return s.landController.Land(ctx, req)
3037
}
3138

3239
func main() {
@@ -75,21 +82,42 @@ func run() error {
7582
metricsWgDone.Wait()
7683
}()
7784

85+
// Initialize MySQL storage factory
86+
mysqlDSN := os.Getenv("MYSQL_DSN")
87+
if mysqlDSN == "" {
88+
mysqlDSN = "root:root@tcp(localhost:3306)/submitqueue?parseTime=true"
89+
}
90+
storeFactory, err := mysql.NewFactory(mysql.MySQLParameters{
91+
DSN: mysqlDSN,
92+
MaxOpenConns: 10,
93+
MaxIdleConns: 5,
94+
ConnMaxLifetime: 5 * time.Minute,
95+
})
96+
if err != nil {
97+
return fmt.Errorf("failed to create MySQL storage factory: %w", err)
98+
}
99+
defer storeFactory.Close()
100+
78101
// Create gRPC server
79102
grpcServer := grpc.NewServer()
80103

81-
// Create ping controller and wrap it for gRPC
104+
// Create controllers and wrap them for gRPC
82105
pingController := controller.NewPingController(logger, scope)
106+
landController := controller.NewLandController(logger, scope, storeFactory)
83107
gatewayServer := &GatewayServer{
84-
controller: pingController,
108+
pingController: pingController,
109+
landController: landController,
85110
}
86111
pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)
87112

88113
// Register reflection service for debugging with grpcurl
89114
reflection.Register(grpcServer)
90115

91-
// Listen on port 8081
92-
port := ":8081"
116+
// Listen on configurable port
117+
port := os.Getenv("PORT")
118+
if port == "" {
119+
port = ":8081"
120+
}
93121
listener, err := net.Listen("tcp", port)
94122
if err != nil {
95123
return fmt.Errorf("failed to listen on port %s: %w", port, err)

examples/server/orchestrator/main.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,11 @@ func run() error {
8888
// Register reflection service for debugging with grpcurl
8989
reflection.Register(grpcServer)
9090

91-
// Listen on port 8082
92-
port := ":8082"
91+
// Listen on configurable port
92+
port := os.Getenv("PORT")
93+
if port == "" {
94+
port = ":8082"
95+
}
9396
listener, err := net.Listen("tcp", port)
9497
if err != nil {
9598
return fmt.Errorf("failed to listen on port %s: %w", port, err)

examples/server/speculator/main.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,11 @@ func run() error {
8888
// Register reflection service for debugging with grpcurl
8989
reflection.Register(grpcServer)
9090

91-
// Listen on port 8083
92-
port := ":8083"
91+
// Listen on configurable port
92+
port := os.Getenv("PORT")
93+
if port == "" {
94+
port = ":8083"
95+
}
9396
listener, err := net.Listen("tcp", port)
9497
if err != nil {
9598
return fmt.Errorf("failed to listen on port %s: %w", port, err)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
filegroup(
2+
name = "schema",
3+
srcs = glob(["*.sql"]),
4+
visibility = ["//visibility:public"],
5+
)

extensions/storage/mysql/schema/request.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ CREATE TABLE IF NOT EXISTS request (
33
seq BIGINT NOT NULL,
44
change_source VARCHAR(255) NOT NULL,
55
change_ids JSON NOT NULL,
6-
land_strategy INT NOT NULL DEFAULT 0,
6+
land_strategy INT NOT NULL,
77
state INT NOT NULL,
8-
version INT NOT NULL DEFAULT 1,
8+
version INT NOT NULL,
99
PRIMARY KEY (queue, seq)
1010
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

gateway/controller/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ go_library(
99
importpath = "github.com/uber/submitqueue/gateway/controller",
1010
visibility = ["//visibility:public"],
1111
deps = [
12+
"//entities",
13+
"//extensions/storage",
1214
"//gateway/protopb",
1315
"@com_github_uber_go_tally_v4//:tally",
1416
"@org_uber_go_zap//:zap",
@@ -23,6 +25,8 @@ go_test(
2325
],
2426
embed = [":controller"],
2527
deps = [
28+
"//entities",
29+
"//extensions/storage",
2630
"//gateway/protopb",
2731
"@com_github_stretchr_testify//assert",
2832
"@com_github_stretchr_testify//require",

gateway/controller/land.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"time"
77

88
"github.com/uber-go/tally/v4"
9+
"github.com/uber/submitqueue/entities"
10+
"github.com/uber/submitqueue/extensions/storage"
911
pb "github.com/uber/submitqueue/gateway/protopb"
1012
"go.uber.org/zap"
1113
)
@@ -14,13 +16,15 @@ import (
1416
type LandController struct {
1517
logger *zap.Logger
1618
metricsScope tally.Scope
19+
storeFactory storage.StoreFactory
1720
}
1821

1922
// NewLandController creates a new instance of the gateway land controller
20-
func NewLandController(logger *zap.Logger, scope tally.Scope) *LandController {
23+
func NewLandController(logger *zap.Logger, scope tally.Scope, storeFactory storage.StoreFactory) *LandController {
2124
return &LandController{
2225
logger: logger,
2326
metricsScope: scope,
27+
storeFactory: storeFactory,
2428
}
2529
}
2630

@@ -33,8 +37,18 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
3337

3438
c.metricsScope.Counter("land_request_count").Inc(1)
3539

36-
// TODO: Implement proper SQID generation and send the request to the appropriate queue. So far unix time to make it sequential.
37-
sqid := fmt.Sprintf("%d", time.Now().Unix())
40+
change := entities.Change{
41+
Source: req.Change.GetSource(),
42+
IDs: req.Change.GetIds(),
43+
}
44+
strategy := entities.RequestLandStrategy(int(req.Strategy))
45+
46+
request, err := c.storeFactory.GetRequestStore().Create(ctx, req.Queue, change, strategy, entities.RequestStateNew)
47+
if err != nil {
48+
return nil, fmt.Errorf("LandController failed to create request for queue=%s: %w", req.Queue, err)
49+
}
50+
51+
sqid := request.GetID()
3852

3953
c.logger.Debug("land request received",
4054
zap.String("queue", req.Queue),

0 commit comments

Comments
 (0)