Skip to content

Commit c80effa

Browse files
committed
feat(queue): complete Gateway → Orchestrator queue integration
Wire up the first stage of the queue pipeline: Gateway publishes land requests to the queue, Orchestrator consumes and processes them. **Consumer Infrastructure:** - Add Consumer interface (Register/Start/Stop) for orchestrating multiple controllers - Add consumer.Delivery interface to enforce separation of concerns (type-safe) - Controllers receive consumer.Delivery (no Ack/Nack), Consumer handles ack/nack - Implement subscription lifecycle, automatic ack/nack, metrics, graceful shutdown **Gateway:** - Land controller publishes requests to land_request queue after storage - Queue infrastructure optional (controlled by QUEUE_MYSQL_DSN env var) **Orchestrator:** - Request controller consumes from land_request queue - Wire up consumer with graceful shutdown in main.go **CLAUDE.md:** - Document RPC vs Queue Message controller patterns - Add code style guidelines: use SugaredLogger, use interfaces for contracts All unit and integration tests pass. Backward compatible with existing tests.
1 parent f2b4750 commit c80effa

22 files changed

Lines changed: 2085 additions & 42 deletions

File tree

CLAUDE.md

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,36 @@ Three services, each following the same layout:
4747
```
4848
<service>/
4949
├── controller/ # Business logic (pure, transport-agnostic)
50+
│ ├── {method}.go # RPC controllers (e.g., land.go, ping.go)
51+
│ ├── {method}_test.go
52+
│ └── {step}/ # Queue message controllers (e.g., request/)
53+
│ ├── {step}.go # Step in workflow
54+
│ └── {step}_test.go
5055
├── proto/ # Proto definitions (.proto files)
5156
├── protopb/ # Generated proto code (committed to repo)
5257
└── integration_test/
5358
```
5459

5560
### Controllers
5661

57-
Controllers contain pure business logic, independent of the transport layer (gRPC/YARPC). They live in `{service}/controller/` and are wired up in `example/server/{service}/main.go`.
62+
Controllers contain pure business logic, independent of infrastructure. There are two types:
63+
64+
**RPC Controllers** - Handle synchronous API requests in `{service}/controller/`. Accept protobuf types, independent of gRPC/YARPC transport.
65+
66+
```go
67+
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error)
68+
```
69+
70+
**Queue Message Controllers** - Process async queue messages in `{service}/controller/{step}/`. Implement `consumer.Controller` interface.
71+
72+
```go
73+
// Receives consumer.Delivery (NOT extension/queue.Delivery)
74+
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
75+
// Return nil to ack, error to nack. Consumer handles ack/nack automatically.
76+
}
77+
```
78+
79+
Controllers receive `consumer.Delivery` (subset interface without Ack/Nack methods) to enforce separation: controllers do business logic, consumer framework handles infrastructure.
5880

5981
### Entities
6082

@@ -70,7 +92,10 @@ entity/
7092
**Entity guidelines:**
7193
1. Keep entities pure and framework-agnostic — no external dependencies
7294
2. Use value types, not references
73-
3. Prefer `int64` Unix epoch milliseconds over `time.Time`
95+
3. Prefer `int64` milliseconds over `time.Time` and `time.Duration`:
96+
- Timestamps: Unix epoch milliseconds (e.g., `CreatedAt int64`) — use `time.UnixMilli()` method
97+
- Durations/timeouts: milliseconds (e.g., `TimeoutMs int64`, `DelayMs int64`)
98+
- Use `time.Duration(ms) * time.Millisecond` to convert to `time.Duration` when needed
7499
4. Every field must have a comment explaining its meaning
75100
5. Reference other entities by ID (string or int), not directly
76101
6. Use string enums with clear names; assign sentinel values (`""` for strings, `0` for ints) to unreachable/unknown enum variants
@@ -104,7 +129,9 @@ extension/
104129

105130
### Import Paths
106131

107-
- Controllers: `github.com/uber/submitqueue/{service}/controller`
132+
- RPC Controllers: `github.com/uber/submitqueue/{service}/controller`
133+
- Queue Controllers: `github.com/uber/submitqueue/{service}/controller/{step}`
134+
- Consumer: `github.com/uber/submitqueue/consumer`
108135
- Proto (generated): `github.com/uber/submitqueue/{service}/protopb`
109136
- Extensions: `github.com/uber/submitqueue/extension/{extension}`
110137
- Extension impl: `github.com/uber/submitqueue/extension/{extension}/{impl}`
@@ -164,6 +191,11 @@ All generated proto files are **committed to the repository**. When modifying `.
164191
- Tests: `{file}_test.go`
165192
- BUILD files: Always `BUILD.bazel`
166193

194+
### Directory Naming
195+
196+
- Use **singular** names for directories (e.g., `mock/` not `mocks/`, `entity/` not `entities/`)
197+
- This applies to all folders including test mocks, extensions, entities, and service directories
198+
167199
### Common Make Targets
168200

169201
```bash
@@ -189,6 +221,10 @@ make clean-proto # Remove generated proto files
189221
3. Add controller in `{service}/controller/`
190222
4. Wire up in `example/server/{service}/main.go`
191223

224+
**Add new queue message controller:**
225+
1. Create `{service}/controller/{step}/` with controller implementing `consumer.Controller`
226+
2. Wire up in `example/server/{service}/main.go`: register → start → stop on shutdown
227+
192228
**Add new extension implementation:**
193229
1. Create `extension/{extension}/{impl}/` directory
194230
2. Implement factory and core interfaces
@@ -204,3 +240,20 @@ make clean-proto # Remove generated proto files
204240
1. **Avoid asserting on error messages** — assert on error type if it is part of the contract, or assert generic error otherwise.
205241
2. **Avoid blocking operations for synchronization** — do not use `time.Sleep`. Design the tested routine to signal back (channels, callbacks, condition variables).
206242
3. **Use testify assertions** — use `stretchr/assert` or `require` instead of `t.Fatal()`.
243+
244+
### Code Style Guidelines
245+
246+
1. **Use SugaredLogger for structured logging** — always use `zap.SugaredLogger` with structured logging methods:
247+
- `logger.Debugw(msg, key1, val1, key2, val2, ...)` for debug logs
248+
- `logger.Infow(msg, key1, val1, key2, val2, ...)` for info logs
249+
- `logger.Errorw(msg, key1, val1, key2, val2, ...)` for error logs
250+
- Never use unstructured methods like `Debug()`, `Info()`, `Error()`, or `Printf()`
251+
- Example: `logger.Infow("starting consumer", "subscriber_name", subscriberName, "controller_count", len(controllers))`
252+
253+
2. **Use interfaces for contracts** — define interfaces for public APIs and dependencies:
254+
- Public components should return/accept interfaces, not concrete structs
255+
- Unexported structs implement the interfaces
256+
- Makes testing easier through mocking
257+
- Example: `func New(...) Consumer` returns interface, not `*consumer`
258+
- Implementation struct is unexported: `type consumer struct { ... }`
259+

consumer/BUILD.bazel

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "consumer",
5+
srcs = [
6+
"consumer.go",
7+
"controller.go",
8+
],
9+
importpath = "github.com/uber/submitqueue/consumer",
10+
visibility = ["//visibility:public"],
11+
deps = [
12+
"//entity/queue",
13+
"//extension/queue",
14+
"@com_github_uber_go_tally_v4//:tally",
15+
"@org_uber_go_zap//:zap",
16+
],
17+
)
18+
19+
go_test(
20+
name = "consumer_test",
21+
srcs = ["consumer_test.go"],
22+
embed = [":consumer"],
23+
deps = [
24+
"//entity/queue",
25+
"//extension/queue",
26+
"@com_github_stretchr_testify//assert",
27+
"@com_github_stretchr_testify//require",
28+
"@com_github_uber_go_tally_v4//:tally",
29+
"@org_uber_go_zap//zaptest",
30+
],
31+
)

0 commit comments

Comments
 (0)