@@ -16,6 +16,7 @@ package main
1616
1717import (
1818 "context"
19+ "database/sql"
1920 "errors"
2021 "fmt"
2122 "net"
@@ -25,25 +26,44 @@ import (
2526 "syscall"
2627 "time"
2728
29+ _ "github.com/go-sql-driver/mysql"
2830 "github.com/uber-go/tally"
2931 pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb"
32+ "github.com/uber/submitqueue/platform/consumer"
33+ "github.com/uber/submitqueue/platform/errs"
34+ genericerrs "github.com/uber/submitqueue/platform/errs/generic"
35+ mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql"
36+ mysqlcounter "github.com/uber/submitqueue/platform/extension/counter/mysql"
37+ extqueue "github.com/uber/submitqueue/platform/extension/messagequeue"
38+ queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql"
39+ "github.com/uber/submitqueue/stovepipe/core/topickey"
40+ mysqlstorage "github.com/uber/submitqueue/stovepipe/extension/storage/mysql"
3041 "github.com/uber/submitqueue/stovepipe/gateway/controller"
42+ logctrl "github.com/uber/submitqueue/stovepipe/gateway/controller/log"
3143 "go.uber.org/zap"
3244 "google.golang.org/grpc"
45+ "google.golang.org/grpc/codes"
3346 "google.golang.org/grpc/reflection"
47+ "google.golang.org/grpc/status"
3448)
3549
36- // GatewayServer wraps the controller and implements the gRPC service interface.
50+ // GatewayServer wraps the controllers and implements the gRPC service interface.
3751type GatewayServer struct {
3852 pb.UnimplementedStovepipeGatewayServer
39- pingController * controller.PingController
53+ pingController * controller.PingController
54+ ingestController * controller.IngestController
4055}
4156
4257// Ping delegates to the controller.
4358func (s * GatewayServer ) Ping (ctx context.Context , req * pb.PingRequest ) (* pb.PingResponse , error ) {
4459 return s .pingController .Ping (ctx , req )
4560}
4661
62+ // Ingest delegates to the controller.
63+ func (s * GatewayServer ) Ingest (ctx context.Context , req * pb.IngestRequest ) (* pb.IngestResponse , error ) {
64+ return s .ingestController .Ingest (ctx , req )
65+ }
66+
4767func main () {
4868 code := 0
4969 if err := run (); err != nil {
@@ -105,19 +125,142 @@ func run() error {
105125 metricsWgDone .Wait ()
106126 }()
107127
108- // Create gRPC server
109- grpcServer := grpc .NewServer ()
128+ // Open application database connection.
129+ // Docker Compose healthchecks ensure MySQL is ready before service starts.
130+ appDSN := os .Getenv ("MYSQL_DSN" )
131+ if appDSN == "" {
132+ return fmt .Errorf ("MYSQL_DSN environment variable is required" )
133+ }
134+ appDB , err := sql .Open ("mysql" , appDSN )
135+ if err != nil {
136+ return fmt .Errorf ("failed to open app database: %w" , err )
137+ }
138+ defer appDB .Close ()
139+
140+ // Initialize counter from shared app database connection. The ingest controller uses it to
141+ // mint a SPID per ingest request.
142+ cnt := mysqlcounter .NewCounter (appDB , scope .SubScope ("counter" ))
143+
144+ // Open queue database connection
145+ queueDSN := os .Getenv ("QUEUE_MYSQL_DSN" )
146+ if queueDSN == "" {
147+ return fmt .Errorf ("QUEUE_MYSQL_DSN environment variable is required" )
148+ }
149+ queueDB , err := sql .Open ("mysql" , queueDSN )
150+ if err != nil {
151+ return fmt .Errorf ("failed to open queue database: %w" , err )
152+ }
153+ defer queueDB .Close ()
154+
155+ // Initialize queue
156+ mysqlQueue , err := queueMySQL .NewQueue (queueMySQL.Params {
157+ DB : queueDB ,
158+ Logger : logger ,
159+ MetricsScope : scope .SubScope ("queue" ),
160+ })
161+ if err != nil {
162+ return fmt .Errorf ("failed to create queue: %w" , err )
163+ }
164+ defer mysqlQueue .Close ()
165+
166+ logger .Info ("initialized dependencies" ,
167+ zap .String ("app_dsn" , appDSN ),
168+ zap .String ("queue_dsn" , queueDSN ),
169+ )
110170
111- // Create ping controller and wrap it for gRPC
171+ // Subscriber name for the log-topic consumer. It must be unique per running
172+ // instance: SubscriberName identifies a subscriber for partition leases, so
173+ // two gateway processes on the same host (sharing HOSTNAME) would otherwise
174+ // contend for the same lease. Append the PID to keep co-located instances
175+ // distinct; the PID is stable for the life of the process. Offset tracking
176+ // stays keyed on the shared ConsumerGroup ("gateway-log"), not this name.
177+ // Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs).
178+ hostname := os .Getenv ("HOSTNAME" )
179+ if hostname == "" {
180+ hostname = fmt .Sprintf ("stovepipe-gateway-%d" , time .Now ().Unix ())
181+ }
182+ subscriberName := fmt .Sprintf ("%s-%d" , hostname , os .Getpid ())
183+
184+ // Build the topic registry. The gateway publishes ingest requests to the start of the
185+ // orchestrator pipeline (TopicKeyStart) — publish-only. It additionally consumes the log topic
186+ // (TopicKeyLog): the gateway is the sole writer of the request log, persisting entries that the
187+ // orchestrator publishes there.
188+ registry , err := consumer .NewTopicRegistry ([]consumer.TopicConfig {
189+ {Key : topickey .TopicKeyStart , Name : "start" , Queue : mysqlQueue },
190+ {
191+ Key : topickey .TopicKeyLog ,
192+ Name : "log" ,
193+ Queue : mysqlQueue ,
194+ Subscription : extqueue .DefaultSubscriptionConfig (
195+ subscriberName , "gateway-log" ,
196+ ),
197+ },
198+ })
199+ if err != nil {
200+ return fmt .Errorf ("failed to create topic registry: %w" , err )
201+ }
202+
203+ // Create gRPC server with a unary interceptor that translates user-input
204+ // validation errors (anything in the chain that matches controller.ErrInvalidRequest)
205+ // into codes.InvalidArgument so gRPC clients can distinguish bad input from
206+ // infrastructure failures. Other errors pass through unchanged.
207+ grpcServer := grpc .NewServer (grpc .UnaryInterceptor (
208+ func (ctx context.Context , req interface {}, info * grpc.UnaryServerInfo , handler grpc.UnaryHandler ) (interface {}, error ) {
209+ resp , err := handler (ctx , req )
210+ if err != nil && controller .IsInvalidRequest (err ) {
211+ return nil , status .Error (codes .InvalidArgument , err .Error ())
212+ }
213+ return resp , err
214+ },
215+ ))
216+
217+ // Initialize storage from the shared app database connection. The ingest controller writes the
218+ // accepted entry to the request log directly; the log consumer (registered below) is the sole
219+ // persister of request log entries published by the orchestrator.
220+ store , err := mysqlstorage .NewStorage (appDB , scope .SubScope ("storage" ))
221+ if err != nil {
222+ return fmt .Errorf ("failed to create storage: %w" , err )
223+ }
224+ requestLogStore := store .GetRequestLogStore ()
225+
226+ // Create controllers and wrap them for gRPC
112227 pingController := controller .NewPingController (logger , scope )
113- srv := & GatewayServer {
114- pingController : pingController ,
228+ ingestController := controller .NewIngestController (logger .Sugar (), scope , cnt , requestLogStore , registry )
229+ gatewayServer := & GatewayServer {
230+ pingController : pingController ,
231+ ingestController : ingestController ,
115232 }
116- pb .RegisterStovepipeGatewayServer (grpcServer , srv )
233+
234+ pb .RegisterStovepipeGatewayServer (grpcServer , gatewayServer )
117235
118236 // Register reflection service for debugging with grpcurl
119237 reflection .Register (grpcServer )
120238
239+ // Create the queue consumer and register the log controller. The gateway is
240+ // the sole persister of the request log: the orchestrator publishes entries
241+ // to the log topic and this consumer writes them to storage.
242+ logConsumer := consumer .New (logger .Sugar (), scope .SubScope ("consumer" ), registry ,
243+ errs .NewClassifierProcessor (
244+ // Storage (stovepipe/extension/storage/mysql) and queue (platform/extension/messagequeue/mysql)
245+ // both run on the same MySQL driver, so a single classifier covers
246+ // errors surfaced from either backend.
247+ genericerrs .Classifier ,
248+ mysqlerrs .Classifier ,
249+ ),
250+ )
251+
252+ logController := logctrl .NewController (logger .Sugar (), scope , requestLogStore , topickey .TopicKeyLog , "gateway-log" )
253+ if err := logConsumer .Register (logController ); err != nil {
254+ return fmt .Errorf ("failed to register log controller: %w" , err )
255+ }
256+
257+ if err := logConsumer .Start (ctx ); err != nil {
258+ // The error can also be a result of a context cancellation due to SIGINT or SIGTERM.
259+ // This is expected, just propagate it.
260+ return fmt .Errorf ("failed to start log consumer: %w" , err )
261+ }
262+ logger .Info ("log consumer started" )
263+
121264 // Listen on configurable port
122265 port := os .Getenv ("PORT" )
123266 if port == "" {
@@ -137,9 +280,11 @@ func run() error {
137280 serverErrCh <- grpcServer .Serve (listener )
138281 }()
139282
140- // Wait for interrupt signal or server critical error
141- // If interruption is signaled, gracefully stop the server
142- // If an error happens during shutdown, return the actual error, not the context cancellation error
283+ // Wait for interrupt signal or server critical error.
284+ // If interruption is signaled, gracefully stop the server.
285+ // If the server exits with an error, cancel the context to signal the consumer.
286+ // After this, stop the consumer.
287+ // If an error happens during shutdown, return the actual error, not the context cancellation error.
143288 var serverErr error
144289 select {
145290 case <- ctx .Done ():
@@ -155,10 +300,27 @@ func run() error {
155300 serverErr = <- serverErrCh
156301 case serverErr = <- serverErrCh :
157302 fmt .Println ("Shutting down stovepipe gateway server due to critical GRPC server error..." )
303+
304+ // Cancel the context to signal cancellation to the queue consumer
305+ cancel ()
158306 }
159307
160308 if serverErr != nil {
161- err = fmt .Errorf ("GRPC server exited with error: %w" , serverErr )
309+ serverErr = fmt .Errorf ("GRPC server exited with error: %w" , serverErr )
310+ }
311+
312+ // Stop the consumer with a 30s timeout; by this time the context should be
313+ // cancelled and the processing threads may already be exiting; recollect them.
314+ errStop := logConsumer .Stop (30000 )
315+ if errStop != nil {
316+ errStop = fmt .Errorf ("failed to stop consumer: %w" , errStop )
317+ }
318+
319+ if errStop != nil || serverErr != nil {
320+ // Override context cancellation error with the shutdown error. The server
321+ // error is the primary/root failure, so it leads; the consumer-stop error
322+ // is secondary cleanup.
323+ err = errors .Join (serverErr , errStop )
162324 }
163325
164326 return err
0 commit comments