Skip to content

Commit 45c83ed

Browse files
kevinlnewclaude
andcommitted
feat(pushqueue): add example gateway server and client
Wires up noop VCS and passthrough queue implementations so the pushqueue gateway can start and serve Ping/Land/CheckMergeability RPCs for local development. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c4d2722 commit 45c83ed

4 files changed

Lines changed: 317 additions & 0 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
load("@rules_go//go:def.bzl", "go_binary", "go_library")
2+
3+
go_library(
4+
name = "client_lib",
5+
srcs = ["main.go"],
6+
importpath = "github.com/uber/submitqueue/example/pushqueue/gateway/client",
7+
visibility = ["//visibility:private"],
8+
deps = [
9+
"//pushqueue/gateway/protopb",
10+
"@org_golang_google_grpc//:grpc",
11+
"@org_golang_google_grpc//credentials/insecure",
12+
],
13+
)
14+
15+
go_binary(
16+
name = "client",
17+
embed = [":client_lib"],
18+
visibility = ["//visibility:public"],
19+
)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"flag"
20+
"fmt"
21+
"os"
22+
"time"
23+
24+
pb "github.com/uber/submitqueue/pushqueue/gateway/protopb"
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/credentials/insecure"
27+
)
28+
29+
func main() {
30+
addr := flag.String("addr", "localhost:8084", "pushqueue gateway server address")
31+
message := flag.String("message", "", "message to send in ping request")
32+
timeout := flag.Duration("timeout", 5*time.Second, "request timeout")
33+
flag.Parse()
34+
35+
if err := run(*addr, *message, *timeout); err != nil {
36+
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
37+
os.Exit(1)
38+
}
39+
}
40+
41+
func run(addr, message string, timeout time.Duration) error {
42+
conn, err := grpc.NewClient(
43+
addr,
44+
grpc.WithTransportCredentials(insecure.NewCredentials()),
45+
)
46+
if err != nil {
47+
return fmt.Errorf("failed to connect: %w", err)
48+
}
49+
defer conn.Close()
50+
51+
client := pb.NewPushQueueGatewayClient(conn)
52+
53+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
54+
defer cancel()
55+
56+
req := &pb.PingRequest{
57+
Message: message,
58+
}
59+
60+
fmt.Printf("Sending ping to pushqueue gateway at %s...\n", addr)
61+
resp, err := client.Ping(ctx, req)
62+
if err != nil {
63+
return fmt.Errorf("ping failed: %w", err)
64+
}
65+
66+
fmt.Printf("\nResponse:\n")
67+
fmt.Printf(" Message: %s\n", resp.Message)
68+
fmt.Printf(" Service Name: %s\n", resp.ServiceName)
69+
fmt.Printf(" Timestamp: %d (%s)\n", resp.Timestamp, time.Unix(resp.Timestamp, 0))
70+
fmt.Printf(" Hostname: %s\n", resp.Hostname)
71+
72+
return nil
73+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
load("@rules_go//go:def.bzl", "go_binary", "go_library")
2+
3+
go_library(
4+
name = "server_lib",
5+
srcs = ["main.go"],
6+
importpath = "github.com/uber/submitqueue/example/pushqueue/gateway/server",
7+
visibility = ["//visibility:private"],
8+
deps = [
9+
"//pushqueue/entity",
10+
"//pushqueue/extension/landqueue",
11+
"//pushqueue/extension/vcs",
12+
"//pushqueue/gateway/controller",
13+
"//pushqueue/gateway/protopb",
14+
"@com_github_uber_go_tally_v4//:tally",
15+
"@org_golang_google_grpc//:grpc",
16+
"@org_golang_google_grpc//reflection",
17+
"@org_uber_go_zap//:zap",
18+
],
19+
)
20+
21+
go_binary(
22+
name = "server",
23+
embed = [":server_lib"],
24+
visibility = ["//visibility:public"],
25+
)
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"net"
22+
"os"
23+
"os/signal"
24+
"sync"
25+
"syscall"
26+
"time"
27+
28+
"github.com/uber-go/tally/v4"
29+
"github.com/uber/submitqueue/pushqueue/entity"
30+
"github.com/uber/submitqueue/pushqueue/extension/landqueue"
31+
"github.com/uber/submitqueue/pushqueue/extension/vcs"
32+
"github.com/uber/submitqueue/pushqueue/gateway/controller"
33+
pb "github.com/uber/submitqueue/pushqueue/gateway/protopb"
34+
"go.uber.org/zap"
35+
"google.golang.org/grpc"
36+
"google.golang.org/grpc/reflection"
37+
)
38+
39+
// GatewayServer wraps the controllers and implements the gRPC service interface.
40+
type GatewayServer struct {
41+
pb.UnimplementedPushQueueGatewayServer
42+
pingController *controller.PingController
43+
landController *controller.LandController
44+
}
45+
46+
// Ping delegates to the controller.
47+
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
48+
return s.pingController.Ping(ctx, req)
49+
}
50+
51+
// Land delegates to the controller.
52+
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
53+
return s.landController.Land(ctx, req)
54+
}
55+
56+
// CheckMergeability delegates to the controller.
57+
func (s *GatewayServer) CheckMergeability(ctx context.Context, req *pb.CheckMergeabilityRequest) (*pb.CheckMergeabilityResponse, error) {
58+
return s.landController.CheckMergeability(ctx, req)
59+
}
60+
61+
func main() {
62+
code := 0
63+
if err := run(); err != nil {
64+
if errors.Is(err, context.Canceled) {
65+
fmt.Println("PushQueue gateway server stopped by signal")
66+
code = 128 + int(syscall.SIGTERM)
67+
} else {
68+
fmt.Fprintf(os.Stderr, "PushQueue gateway server failure: %v\n", err)
69+
code = 1
70+
}
71+
}
72+
os.Exit(code)
73+
}
74+
75+
func run() error {
76+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
77+
defer cancel()
78+
79+
logger, err := zap.NewDevelopment()
80+
if err != nil {
81+
return fmt.Errorf("failed to create logger: %w", err)
82+
}
83+
defer logger.Sync()
84+
85+
scope := tally.NewTestScope("pushqueue_gateway", nil)
86+
metricsStopCh := make(chan any, 1)
87+
metricsWgDone := sync.WaitGroup{}
88+
metricsWgDone.Add(1)
89+
go func() {
90+
defer metricsWgDone.Done()
91+
92+
ticker := time.NewTicker(10 * time.Second)
93+
defer ticker.Stop()
94+
95+
for {
96+
select {
97+
case <-metricsStopCh:
98+
return
99+
case <-ticker.C:
100+
snapshot := scope.Snapshot()
101+
logger.Info("metrics snapshot",
102+
zap.Any("counters", snapshot.Counters()),
103+
zap.Any("gauges", snapshot.Gauges()),
104+
zap.Any("timers", snapshot.Timers()),
105+
)
106+
}
107+
}
108+
}()
109+
110+
defer func() {
111+
close(metricsStopCh)
112+
metricsWgDone.Wait()
113+
}()
114+
115+
grpcServer := grpc.NewServer()
116+
117+
pingController := controller.NewPingController(logger.Sugar(), scope)
118+
landController := controller.NewLandController(logger.Sugar(), scope, noopVCS{}, noopQueue{})
119+
srv := &GatewayServer{
120+
pingController: pingController,
121+
landController: landController,
122+
}
123+
pb.RegisterPushQueueGatewayServer(grpcServer, srv)
124+
125+
reflection.Register(grpcServer)
126+
127+
port := os.Getenv("PORT")
128+
if port == "" {
129+
port = ":8084"
130+
}
131+
listener, err := net.Listen("tcp", port)
132+
if err != nil {
133+
return fmt.Errorf("failed to listen on port %s: %w", port, err)
134+
}
135+
136+
fmt.Printf("PushQueue gateway gRPC server is running on %s\n", port)
137+
fmt.Println("Press Ctrl+C to stop, or send a SIGTERM.")
138+
139+
serverErrCh := make(chan error, 1)
140+
go func() {
141+
serverErrCh <- grpcServer.Serve(listener)
142+
}()
143+
144+
var serverErr error
145+
select {
146+
case <-ctx.Done():
147+
fmt.Println("Shutting down pushqueue gateway server due to interruption signal...")
148+
err = ctx.Err()
149+
grpcServer.GracefulStop()
150+
serverErr = <-serverErrCh
151+
case serverErr = <-serverErrCh:
152+
fmt.Println("Shutting down pushqueue gateway server due to critical GRPC server error...")
153+
}
154+
155+
if serverErr != nil {
156+
err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
157+
}
158+
159+
return err
160+
}
161+
162+
// noopVCS is a placeholder VCS that errors on every operation.
163+
type noopVCS struct{}
164+
165+
func (noopVCS) CheckMergeability(_ context.Context, _ entity.QueueTarget, items []entity.LandItem) ([]vcs.MergeabilityResult, error) {
166+
results := make([]vcs.MergeabilityResult, len(items))
167+
for i := range results {
168+
results[i] = vcs.MergeabilityResult{Mergeable: false, Reason: "noop VCS: not configured"}
169+
}
170+
return results, nil
171+
}
172+
173+
func (noopVCS) Prepare(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
174+
return fmt.Errorf("noop VCS: not configured")
175+
}
176+
177+
func (noopVCS) Push(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) (vcs.PushResult, error) {
178+
return vcs.PushResult{}, fmt.Errorf("noop VCS: not configured")
179+
}
180+
181+
func (noopVCS) Finalize(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
182+
return fmt.Errorf("noop VCS: not configured")
183+
}
184+
185+
// noopQueue is a placeholder Queue that passes through immediately.
186+
type noopQueue struct{}
187+
188+
var _ landqueue.Queue = noopQueue{}
189+
190+
func (noopQueue) Enqueue(_ context.Context, _ entity.QueueTarget, _ []entity.LandItem) error {
191+
return nil
192+
}
193+
194+
func (noopQueue) Wait(_ context.Context, _ entity.QueueTarget) error {
195+
return nil
196+
}
197+
198+
func (noopQueue) Dequeue(_ context.Context, _ entity.QueueTarget) error {
199+
return nil
200+
}

0 commit comments

Comments
 (0)