Skip to content

Commit 561f86e

Browse files
refactor(request-log): address review on gateway sole-owner PR
- Make the log-consumer subscriber name unique per instance (hostname+PID) so co-located gateway processes don't contend for the same partition lease. - Report the gRPC server error first in the shutdown errors.Join (it is the primary failure; consumer-stop is secondary cleanup). - Clarify in the README that the gateway is the sole owner (writer and reader) of the request log; Status/Cancel read directly, orchestrator only publishes. - Extract named poll constants (persistTimeout/persistPollInterval) in the gateway integration and e2e suites with a comment explaining that the in-container consumer is observed black-box via Status, so a bounded poll is used in lieu of an in-process channel/HookSignal wait. Follow-ups split out: design doc (#211) and DLQ PublishLog() (#212). Co-Authored-By: Oz <oz-agent@warp.dev>
1 parent 4e5ae69 commit 561f86e

4 files changed

Lines changed: 46 additions & 11 deletions

File tree

example/submitqueue/gateway/server/main.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,18 @@ func run() error {
178178
zap.String("queue_dsn", queueDSN),
179179
)
180180

181-
// Stable subscriber name for the log-topic consumer. Falls back to a
182-
// time-seeded name when HOSTNAME is unset (e.g. local runs).
183-
subscriberName := os.Getenv("HOSTNAME")
184-
if subscriberName == "" {
185-
subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix())
181+
// Subscriber name for the log-topic consumer. It must be unique per running
182+
// instance: SubscriberName identifies a subscriber for partition leases, so
183+
// two gateway processes on the same host (sharing HOSTNAME) would otherwise
184+
// contend for the same lease. Append the PID to keep co-located instances
185+
// distinct; the PID is stable for the life of the process. Offset tracking
186+
// stays keyed on the shared ConsumerGroup ("gateway-log"), not this name.
187+
// Falls back to a time-seeded name when HOSTNAME is unset (e.g. local runs).
188+
hostname := os.Getenv("HOSTNAME")
189+
if hostname == "" {
190+
hostname = fmt.Sprintf("gateway-%d", time.Now().Unix())
186191
}
192+
subscriberName := fmt.Sprintf("%s-%d", hostname, os.Getpid())
187193

188194
// Build the topic registry. The gateway publishes to the start of the
189195
// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) —
@@ -337,8 +343,10 @@ func run() error {
337343
}
338344

339345
if errStop != nil || serverErr != nil {
340-
// Override context cancellation error with the shutdown error
341-
err = errors.Join(errStop, serverErr)
346+
// Override context cancellation error with the shutdown error. The server
347+
// error is the primary/root failure, so it leads; the consumer-stop error
348+
// is secondary cleanup.
349+
err = errors.Join(serverErr, errStop)
342350
}
343351

344352
return err

submitqueue/gateway/README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ orchestrator pipeline asynchronously via the message queue.
66

77
## Request log ownership
88

9-
The gateway is the **sole writer of the request log**. No other service persists
10-
request log entries:
9+
The gateway is the **sole owner of the request log** — the only service that
10+
both writes and reads it. No other service persists or reads request log
11+
entries:
1112

1213
- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on
1314
`Cancel`), the gateway writes directly to storage so the entry is visible the
@@ -16,5 +17,9 @@ request log entries:
1617
the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a
1718
consumer that drains the `log` topic and persists each entry to storage.
1819

20+
Reads are likewise gateway-only: the `Status` and `Cancel` RPCs read the request
21+
log directly from storage. The orchestrator only *publishes* log entries and
22+
never touches the request log store.
23+
1924
This keeps a single service responsible for the request log while letting the
2025
orchestrator remain free of storage writes for it.

test/e2e/submitqueue/suite_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ func TestE2EIntegration(t *testing.T) {
5656
suite.Run(t, new(E2EIntegrationSuite))
5757
}
5858

59+
// The gateway log consumer runs inside the gateway-service container, so this
60+
// suite can only observe persistence black-box through the Status RPC — there is
61+
// no in-process channel/HookSignal to wait on across the container boundary. A
62+
// bounded poll is therefore the deterministic-enough analog: persistTimeout is a
63+
// safety net (a failure here means something is genuinely stuck, not a timing
64+
// race), and persistPollInterval bounds how often we re-query.
65+
const (
66+
persistTimeout = 30 * time.Second
67+
persistPollInterval = 500 * time.Millisecond
68+
)
69+
5970
func (s *E2EIntegrationSuite) SetupSuite() {
6071
t := s.T()
6172
s.ctx = context.Background()
@@ -202,7 +213,7 @@ func (s *E2EIntegrationSuite) TestLandRequest_PersistsStartedLogViaGatewayConsum
202213
}
203214
s.log.Logf("Status(%s) = %q", sqid, resp.Status)
204215
return resp.Status == string(entity.RequestStatusStarted)
205-
}, 30*time.Second, 500*time.Millisecond,
216+
}, persistTimeout, persistPollInterval,
206217
"request %s should reach status %q via the gateway log consumer", sqid, entity.RequestStatusStarted)
207218

208219
s.log.Logf("Gateway consumer persisted orchestrator-published 'started' log for sqid=%s", sqid)

test/integration/submitqueue/gateway/suite_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ func TestGatewayIntegration(t *testing.T) {
6060
suite.Run(t, new(GatewayIntegrationSuite))
6161
}
6262

63+
// The log consumer runs inside the gateway-service container, so this suite can
64+
// only observe persistence black-box through the Status RPC — there is no
65+
// in-process channel/HookSignal to wait on across the container boundary. A
66+
// bounded poll is therefore the deterministic-enough analog: persistTimeout is a
67+
// safety net (a failure here means something is genuinely stuck, not a timing
68+
// race), and persistPollInterval bounds how often we re-query.
69+
const (
70+
persistTimeout = 30 * time.Second
71+
persistPollInterval = 500 * time.Millisecond
72+
)
73+
6374
func (s *GatewayIntegrationSuite) SetupSuite() {
6475
t := s.T()
6576
s.ctx = context.Background()
@@ -190,7 +201,7 @@ func (s *GatewayIntegrationSuite) TestRequestLogConsumer() {
190201
return false
191202
}
192203
return resp.Status == string(entity.RequestStatusStarted)
193-
}, 30*time.Second, 500*time.Millisecond,
204+
}, persistTimeout, persistPollInterval,
194205
"gateway log consumer should persist the published request log for sqid=%s", sqid)
195206

196207
s.log.Logf("Request log consumer test passed: entry persisted and readable via Status")

0 commit comments

Comments
 (0)