@@ -19,6 +19,7 @@ import (
1919 "database/sql"
2020 "errors"
2121 "fmt"
22+ "net"
2223 "os"
2324 "os/signal"
2425 "sync"
@@ -28,6 +29,8 @@ import (
2829 _ "github.com/go-sql-driver/mysql"
2930 "github.com/uber-go/tally"
3031 "go.uber.org/zap"
32+ "google.golang.org/grpc"
33+ "google.golang.org/grpc/reflection"
3134
3235 "github.com/uber/submitqueue/core/consumer"
3336 "github.com/uber/submitqueue/core/errs"
@@ -38,8 +41,10 @@ import (
3841 "github.com/uber/submitqueue/runway/core/topickey"
3942 "github.com/uber/submitqueue/runway/extension/vcs"
4043 "github.com/uber/submitqueue/runway/extension/vcs/noop"
44+ "github.com/uber/submitqueue/runway/orchestrator/controller"
4145 "github.com/uber/submitqueue/runway/orchestrator/controller/check"
4246 "github.com/uber/submitqueue/runway/orchestrator/controller/land"
47+ pb "github.com/uber/submitqueue/runway/orchestrator/protopb"
4348)
4449
4550func main () {
@@ -56,6 +61,17 @@ func main() {
5661 os .Exit (code )
5762}
5863
64+ // OrchestratorServer wraps the controller and implements the gRPC service interface.
65+ type OrchestratorServer struct {
66+ pb.UnimplementedRunwayOrchestratorServer
67+ pingController * controller.PingController
68+ }
69+
70+ // Ping delegates to the controller.
71+ func (s * OrchestratorServer ) Ping (ctx context.Context , req * pb.PingRequest ) (* pb.PingResponse , error ) {
72+ return s .pingController .Ping (ctx , req )
73+ }
74+
5975// noopVCSFactory returns the noop VCS for any configuration.
6076type noopVCSFactory struct {}
6177
@@ -153,16 +169,48 @@ func run() error {
153169 return fmt .Errorf ("failed to start consumer: %w" , err )
154170 }
155171
156- fmt .Println ("Runway orchestrator is running. Press Ctrl+C to stop, or send a SIGTERM." )
172+ // gRPC server for health checks (Ping).
173+ grpcServer := grpc .NewServer ()
174+ pingController := controller .NewPingController (logger , scope )
175+ srv := & OrchestratorServer {pingController : pingController }
176+ pb .RegisterRunwayOrchestratorServer (grpcServer , srv )
177+ reflection .Register (grpcServer )
178+
179+ port := os .Getenv ("PORT" )
180+ if port == "" {
181+ port = ":8085"
182+ }
183+ listener , err := net .Listen ("tcp" , port )
184+ if err != nil {
185+ return fmt .Errorf ("failed to listen on port %s: %w" , port , err )
186+ }
187+
188+ fmt .Printf ("Runway orchestrator is running on %s\n " , port )
189+ fmt .Println ("Press Ctrl+C to stop, or send a SIGTERM." )
157190
158- <- ctx .Done ()
159- fmt .Println ("Shutting down runway orchestrator..." )
191+ serverErrCh := make (chan error , 1 )
192+ go func () {
193+ serverErrCh <- grpcServer .Serve (listener )
194+ }()
160195
161- err = ctx .Err ()
196+ var serverErr error
197+ select {
198+ case <- ctx .Done ():
199+ fmt .Println ("Shutting down runway orchestrator..." )
200+ err = ctx .Err ()
201+ grpcServer .GracefulStop ()
202+ serverErr = <- serverErrCh
203+ case serverErr = <- serverErrCh :
204+ fmt .Println ("Shutting down runway orchestrator due to gRPC server error..." )
205+ }
206+
207+ if serverErr != nil {
208+ err = fmt .Errorf ("gRPC server exited with error: %w" , serverErr )
209+ }
162210
163211 stopErr := primaryConsumer .Stop (30000 )
164212 if stopErr != nil {
165- err = fmt .Errorf ("failed to stop consumer: %w" , stopErr )
213+ err = errors . Join ( err , fmt .Errorf ("failed to stop consumer: %w" , stopErr ) )
166214 }
167215
168216 return err
0 commit comments