@@ -16,6 +16,7 @@ package main
1616
1717import (
1818 "context"
19+ "database/sql"
1920 "errors"
2021 "fmt"
2122 "net"
@@ -25,9 +26,18 @@ import (
2526 "syscall"
2627 "time"
2728
29+ _ "github.com/go-sql-driver/mysql"
2830 "github.com/uber-go/tally"
2931 pb "github.com/uber/submitqueue/api/stovepipe/orchestrator/protopb"
32+ "github.com/uber/submitqueue/platform/consumer"
33+ "github.com/uber/submitqueue/platform/errs"
34+ genericerrs "github.com/uber/submitqueue/platform/errs/generic"
35+ mysqlerrs "github.com/uber/submitqueue/platform/errs/mysql"
36+ extqueue "github.com/uber/submitqueue/platform/extension/messagequeue"
37+ queueMySQL "github.com/uber/submitqueue/platform/extension/messagequeue/mysql"
38+ "github.com/uber/submitqueue/stovepipe/core/topickey"
3039 "github.com/uber/submitqueue/stovepipe/orchestrator/controller"
40+ "github.com/uber/submitqueue/stovepipe/orchestrator/controller/start"
3141 "go.uber.org/zap"
3242 "google.golang.org/grpc"
3343 "google.golang.org/grpc/reflection"
@@ -102,6 +112,62 @@ func run() error {
102112 metricsWgDone .Wait ()
103113 }()
104114
115+ queueDSN := os .Getenv ("QUEUE_MYSQL_DSN" )
116+ if queueDSN == "" {
117+ return fmt .Errorf ("QUEUE_MYSQL_DSN environment variable is required" )
118+ }
119+ queueDB , err := sql .Open ("mysql" , queueDSN )
120+ if err != nil {
121+ return fmt .Errorf ("failed to open queue database: %w" , err )
122+ }
123+ defer queueDB .Close ()
124+
125+ mysqlQueue , err := queueMySQL .NewQueue (queueMySQL.Params {
126+ DB : queueDB ,
127+ Logger : logger ,
128+ MetricsScope : scope .SubScope ("queue" ),
129+ })
130+ if err != nil {
131+ return fmt .Errorf ("failed to create queue: %w" , err )
132+ }
133+ defer mysqlQueue .Close ()
134+
135+ logger .Info ("initialized queue" , zap .String ("dsn" , queueDSN ))
136+
137+ subscriberName := os .Getenv ("HOSTNAME" )
138+ if subscriberName == "" {
139+ subscriberName = fmt .Sprintf ("stovepipe-orchestrator-%d" , time .Now ().Unix ())
140+ }
141+
142+ registry , err := newTopicRegistry (mysqlQueue , subscriberName )
143+ if err != nil {
144+ return fmt .Errorf ("failed to create topic registry: %w" , err )
145+ }
146+
147+ primaryConsumer := consumer .New (logger .Sugar (), scope .SubScope ("consumer" ), registry ,
148+ errs .NewClassifierProcessor (
149+ genericerrs .Classifier ,
150+ mysqlerrs .Classifier ,
151+ ),
152+ )
153+
154+ startController := start .NewController (start.Params {
155+ Logger : logger .Sugar (),
156+ Scope : scope ,
157+ Registry : registry ,
158+ TopicKey : topickey .TopicKeyStart ,
159+ ConsumerGroup : "orchestrator-start" ,
160+ })
161+ if err := primaryConsumer .Register (startController ); err != nil {
162+ return fmt .Errorf ("failed to register start controller: %w" , err )
163+ }
164+ logger .Info ("controllers registered" , zap .Int ("primary" , 1 ))
165+
166+ if err := primaryConsumer .Start (ctx ); err != nil {
167+ return fmt .Errorf ("failed to start primary consumer: %w" , err )
168+ }
169+ logger .Info ("consumer started" )
170+
105171 grpcServer := grpc .NewServer ()
106172
107173 pingController := controller .NewPingController (logger , scope )
@@ -140,11 +206,42 @@ func run() error {
140206 serverErr = <- serverErrCh
141207 case serverErr = <- serverErrCh :
142208 fmt .Println ("Shutting down stovepipe orchestrator server due to critical GRPC server error..." )
209+ cancel ()
143210 }
144211
145212 if serverErr != nil {
146- err = fmt .Errorf ("GRPC server exited with error: %w" , serverErr )
213+ serverErr = fmt .Errorf ("GRPC server exited with error: %w" , serverErr )
214+ }
215+
216+ primaryStopErr := primaryConsumer .Stop (30000 )
217+ if primaryStopErr != nil {
218+ primaryStopErr = fmt .Errorf ("failed to stop consumer: %w" , primaryStopErr )
219+ }
220+
221+ if primaryStopErr != nil || serverErr != nil {
222+ err = errors .Join (primaryStopErr , serverErr )
147223 }
148224
149225 return err
150226}
227+
228+ func newTopicRegistry (q extqueue.Queue , subscriberName string ) (consumer.TopicRegistry , error ) {
229+ return consumer .NewTopicRegistry ([]consumer.TopicConfig {
230+ {
231+ Key : topickey .TopicKeyStart ,
232+ Name : "start" ,
233+ Queue : q ,
234+ Subscription : extqueue .DefaultSubscriptionConfig (
235+ subscriberName , "orchestrator-start" ,
236+ ),
237+ },
238+ {
239+ Key : topickey .TopicKeyValidate ,
240+ Name : "validate" ,
241+ Queue : q ,
242+ Subscription : extqueue .DefaultSubscriptionConfig (
243+ subscriberName , "orchestrator-validate" ,
244+ ),
245+ },
246+ })
247+ }
0 commit comments