Skip to content

Commit 7c00f7b

Browse files
albertywuclaude
andcommitted
refactor(storage): replace batch (queue,state) index with active_batch membership
Replace the secondary index over the batch table's mutable `state` column with an `active_batch` membership table that answers the only queue-scoped query the pipeline needs: "which batches in this queue are still active?" (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist while its batch is non-terminal, so the table stays bounded by the live speculation window rather than growing with batch history. `queue` leads the PK so listing is a PK-prefix scan and the table is shardable by queue — an access pattern that ports cleanly to a key-value store (queue = partition key, batch_id = sort key), unlike a server-maintained secondary index over a mutable non-key column. Membership is best-effort, not an exact mirror of batch state, and is maintained without transactions: - Create writes the membership row before the batch row. This ordering is required for correctness: whenever a batch row is visible to a reader its membership row is already present, so a concurrent ListActive can never miss an active batch. INSERT IGNORE keeps the membership write idempotent across retries. - If the batch insert then fails, Create deliberately leaves the membership row in place. A returned error does not prove the row was not written (an ambiguous failure can commit the batch row and still return an error), so deleting would risk permanently orphaning a live, non-terminal batch from ListActive. A dangling membership is the safe direction. - ListActive resolves each member by primary key: a terminal batch's membership is best-effort removed (race-free — a terminal batch is fully committed and its id is never reused); a missing batch is skipped but NOT removed (it may belong to an in-flight Create that has written its membership but not yet its batch row). Cleanup failures are swallowed so a read never fails on index maintenance, and terminal-state writers (merge, speculate, dlq) need not touch the index. Genuinely dangling rows (failed/crashed creates) and batches stuck in a non-terminal state are left for a future reconcile/prune job, documented in schema/README.md. Integration tests cover the self-heal and membership invariants: - TestActiveBatch_SelfHealsTerminalMembership - TestActiveBatch_SkipsDanglingMembershipWithoutDeleting - TestActiveBatch_CreateKeepsMembershipOnDuplicate - TestActiveBatch_CreateKeepsMembershipOnFailedInsert Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 3cebc2e commit 7c00f7b

13 files changed

Lines changed: 352 additions & 86 deletions

File tree

submitqueue/extension/storage/batch_store.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ type BatchStore interface {
4141
// Version arithmetic is owned by the caller; the store performs a pure conditional write.
4242
UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error
4343

44-
// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
45-
GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error)
44+
// ListActive returns all active (non-terminal) batches in the given queue.
45+
// "Active" means the batch's persisted state is not terminal — see
46+
// entity.BatchState.IsTerminal. Callers that need a narrower set filter the
47+
// result by state in memory.
48+
//
49+
// The store tracks active membership internally (added on Create, self-healed
50+
// on read) so this is a key-prefix read rather than a secondary-index query;
51+
// see the implementation and extension/storage/mysql/schema/README.md.
52+
ListActive(ctx context.Context, queue string) ([]entity.Batch, error)
4653
}

submitqueue/extension/storage/mock/batch_store_mock.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

submitqueue/extension/storage/mysql/batch_store.go

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"encoding/json"
2121
"errors"
2222
"fmt"
23-
"strings"
2423

2524
"github.com/go-sql-driver/mysql"
2625
"github.com/uber-go/tally"
@@ -87,6 +86,17 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err
8786
return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err)
8887
}
8988

89+
// Write membership before the batch row (no transaction) so a batch row is
90+
// never visible to ListActive without its membership. INSERT IGNORE makes this
91+
// idempotent across retries. A create that fails after this point leaves a
92+
// dangling row, which ListActive skips and the reconcile job reclaims.
93+
if _, err = s.db.ExecContext(ctx,
94+
"INSERT IGNORE INTO active_batch (queue, batch_id) VALUES (?, ?)",
95+
batch.Queue, batch.ID,
96+
); err != nil {
97+
return fmt.Errorf("failed to insert active_batch membership for batch entity id=%s queue=%s: %w", batch.ID, batch.Queue, err)
98+
}
99+
90100
_, err = s.db.ExecContext(ctx,
91101
"INSERT INTO batch (id, queue, contains, dependencies, score, state, version) VALUES (?, ?, ?, ?, ?, ?, ?)",
92102
batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.Score, batch.State, batch.Version,
@@ -96,6 +106,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err
96106
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
97107
return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists)
98108
}
109+
// Leave the membership row in place: a returned error doesn't prove the
110+
// batch row was not written (an ambiguous failure can commit it and still
111+
// error), so deleting could permanently hide a live batch from ListActive.
112+
// A dangling row is the safe direction.
99113
return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err)
100114
}
101115

@@ -174,52 +188,81 @@ func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVers
174188
return nil
175189
}
176190

177-
// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
178-
func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) {
179-
op := metrics.Begin(s.scope, "get_by_queue_and_states")
191+
// ListActive returns all active (non-terminal) batches in the given queue.
192+
//
193+
// Membership is tracked in active_batch (queue leads the PK), so listing is a
194+
// PK-prefix scan that ports cleanly to a key-value store. Each member is fetched
195+
// by primary key: a terminal batch's membership is best-effort removed (race-free,
196+
// its id is never reused), while a missing batch is skipped but NOT removed (it
197+
// may belong to an in-flight Create that hasn't written its batch row yet).
198+
func (s *batchStore) ListActive(ctx context.Context, queue string) (ret []entity.Batch, retErr error) {
199+
op := metrics.Begin(s.scope, "list_active")
180200
defer func() { op.Complete(retErr) }()
181201

182-
if len(states) == 0 {
183-
return nil, nil
202+
// Read all membership rows and release the connection before resolving each
203+
// batch, since Get issues its own query.
204+
ids, err := s.activeBatchIDs(ctx, queue)
205+
if err != nil {
206+
return nil, err
184207
}
185208

186-
query := "SELECT id, queue, contains, dependencies, score, state, version FROM batch WHERE queue = ? AND state IN (?" + strings.Repeat(", ?", len(states)-1) + ")"
187-
188-
args := make([]any, 1+len(states))
189-
args[0] = queue
190-
for i, state := range states {
191-
args[i+1] = state
209+
var results []entity.Batch
210+
for _, id := range ids {
211+
batch, err := s.Get(ctx, id)
212+
if err != nil {
213+
if storage.IsNotFound(err) {
214+
// Missing batch: either an in-flight Create or a dangling row. We
215+
// can't tell them apart, so skip without deleting.
216+
continue
217+
}
218+
return nil, fmt.Errorf("failed to get active batch id=%q queue=%q: %w", id, queue, err)
219+
}
220+
if batch.State.IsTerminal() {
221+
// Stale membership: the batch has finished. Race-free to remove since
222+
// its id is never reused.
223+
s.removeActive(ctx, queue, id)
224+
continue
225+
}
226+
results = append(results, batch)
192227
}
193228

194-
rows, err := s.db.QueryContext(ctx, query, args...)
229+
return results, nil
230+
}
231+
232+
// activeBatchIDs reads the batch IDs recorded as active for the queue, owning the
233+
// result set's lifecycle so the caller can resolve each batch after it's closed.
234+
func (s *batchStore) activeBatchIDs(ctx context.Context, queue string) ([]string, error) {
235+
rows, err := s.db.QueryContext(ctx,
236+
"SELECT batch_id FROM active_batch WHERE queue = ?",
237+
queue,
238+
)
195239
if err != nil {
196-
return nil, fmt.Errorf("failed to query batches by queue=%q states=%v from the database: %w", queue, states, err)
240+
return nil, fmt.Errorf("failed to query active batch membership for queue=%q: %w", queue, err)
197241
}
198242
defer rows.Close()
199243

200-
var results []entity.Batch
244+
var ids []string
201245
for rows.Next() {
202-
var batch entity.Batch
203-
var containsJSON []byte
204-
var dependenciesJSON []byte
205-
206-
if err := rows.Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.Score, &batch.State, &batch.Version); err != nil {
207-
return nil, fmt.Errorf("failed to scan batch entity by queue=%q states=%v from the database: %w", queue, states, err)
208-
}
209-
210-
if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil {
211-
return nil, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", batch.ID, err)
212-
}
213-
214-
if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil {
215-
return nil, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", batch.ID, err)
246+
var id string
247+
if err := rows.Scan(&id); err != nil {
248+
return nil, fmt.Errorf("failed to scan active batch membership for queue=%q: %w", queue, err)
216249
}
217-
218-
results = append(results, batch)
250+
ids = append(ids, id)
219251
}
220252
if err := rows.Err(); err != nil {
221-
return nil, fmt.Errorf("failed to iterate batches by queue=%q states=%v from the database: %w", queue, states, err)
253+
return nil, fmt.Errorf("failed to iterate active batch membership for queue=%q: %w", queue, err)
222254
}
255+
return ids, nil
256+
}
223257

224-
return results, nil
258+
// removeActive best-effort deletes a single active_batch membership row, used by
259+
// ListActive to reclaim terminal batches' memberships. Failures are counted and
260+
// ignored — the row is harmless and the next read retries.
261+
func (s *batchStore) removeActive(ctx context.Context, queue, batchID string) {
262+
if _, err := s.db.ExecContext(ctx,
263+
"DELETE FROM active_batch WHERE queue = ? AND batch_id = ?",
264+
queue, batchID,
265+
); err != nil {
266+
metrics.NamedCounter(s.scope, "list_active", "self_heal_errors", 1)
267+
}
225268
}

submitqueue/extension/storage/mysql/schema/README.md

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,23 @@
22

33
## batch table
44

5-
### Secondary index: `idx_queue_state (queue, state)`
5+
The `batch` table is reachable only by its primary key (`id`). It carries no secondary index — every access pattern is expressed as a primary-key get or as a key-prefix scan over a companion membership table (see `active_batch` below). This keeps the access patterns portable to a key-value / document store, where a server-maintained secondary index over a mutable, non-key column (such as `state`) is not a primitive every backend offers cheaply.
66

7-
The `batch` table has a composite secondary index on `(queue, state)`. This index supports the `GetByQueueAndStates` query, which retrieves batches filtered by queue and one or more states. Without this index, the query would require a full table scan.
7+
## active_batch table
88

9-
#### Trade-offs
9+
`active_batch` is the membership index that answers "which batches in this queue are still active?" — the only queue-scoped query the pipeline needs (the batch controller uses it to find conflict dependencies; the cancel controller uses it to find the batch holding a request). A row is intended to exist per non-terminal batch, so the table stays bounded by the live speculation window rather than full batch history. The correspondence is best-effort, not exact: readers treat membership as a hint and resolve each batch by primary key — see *Maintenance and self-healing* below.
1010

11-
- **Write overhead**: Every `INSERT` and `UPDATE` to the `batch` table must also update the secondary index, adding latency to write operations.
12-
- **Storage cost**: The index consumes additional disk space proportional to the number of rows in the table.
13-
- **Lock contention**: Under high write concurrency, index maintenance can increase lock contention on the affected index pages.
11+
`queue` leads the composite primary key `(queue, batch_id)`, so listing a queue's active batches is a primary-key-prefix scan and the table is shardable by queue. On a key-value store the same shape maps directly onto a partition key (`queue`) and sort key (`batch_id`) with no secondary index.
1412

15-
#### Future: Prune job
13+
### Maintenance and self-healing
1614

17-
As the `batch` table grows, the secondary index will grow with it, increasing storage costs and degrading write performance. To mitigate this, a prune job should be introduced to periodically delete batches in terminal states (`succeeded`, `failed`, `cancelled`) that are older than a configurable retention period. This keeps the table and its indexes bounded in size, ensuring consistent query and write performance over time.
15+
`BatchStore.Create` writes the membership row before the batch row, so a batch row is never visible to `ListActive` without its membership. If the batch insert then fails, `Create` leaves the membership row in place: a returned error doesn't prove the row wasn't written (an ambiguous failure can commit it and still error), so deleting could permanently hide a live batch. A dangling row is the safe direction.
16+
17+
On read, `ListActive` resolves each member by primary key. A **terminal** batch's membership is best-effort removed (race-free — its id is never reused). A **missing** batch is skipped but not removed, since it may belong to an in-flight `Create` that hasn't written its batch row yet. Cleanup failures are swallowed, so reads never fail on index maintenance and terminal-state writers (merge, speculate, dlq) never touch the index. Because the two writes are independent (no transaction), the design tolerates partial failure via idempotent retries and read-time reconciliation.
18+
19+
### Future: prune / reconcile job
20+
21+
Read-time reconciliation only removes terminal memberships, so two kinds of stale row need a periodic sweep: dangling memberships whose batch never landed (a failed or crashed create), and memberships of batches that are stuck in a non-terminal state (e.g. an orphan stuck in `created` after a mid-process failure). A reconcile job should sweep both, keeping the table bounded independently of read traffic.
1822

1923
## change table
2024

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
-- active_batch is the membership index for "active (non-terminal) batches in a
2+
-- queue", keeping the set bounded by the live speculation window rather than batch
3+
-- history. queue leads the PK so listing is a PK-prefix scan (shardable by queue;
4+
-- portable to a key-value store with queue = partition key, batch_id = sort key).
5+
-- Membership is best-effort: it is added on Create (before the batch row) and
6+
-- removed on read by ListActive once a batch is terminal. A reconcile job reclaims
7+
-- rows left dangling by a failed or crashed create. See schema/README.md.
8+
CREATE TABLE IF NOT EXISTS active_batch (
9+
queue VARCHAR(255) NOT NULL,
10+
batch_id VARCHAR(255) NOT NULL,
11+
PRIMARY KEY (queue, batch_id)
12+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

submitqueue/extension/storage/mysql/schema/batch.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@ CREATE TABLE IF NOT EXISTS batch (
66
score DOUBLE NOT NULL,
77
state VARCHAR(255) NOT NUll,
88
version INT NOT NULL,
9-
PRIMARY KEY (id),
10-
INDEX idx_queue_state (queue, state)
9+
PRIMARY KEY (id)
1110
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

submitqueue/orchestrator/controller/batch/batch.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
133133
Version: 1,
134134
}
135135

136-
// Get active batches for this queue and ask the conflict analyzer which
137-
// of them the new batch must serialize behind. The dependency set drives
138-
// the speculation graph downstream.
139-
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
140-
entity.BatchStateCreated,
141-
entity.BatchStateSpeculating,
142-
entity.BatchStateMerging,
143-
})
136+
// Ask the conflict analyzer which active batches the new batch must serialize
137+
// behind. ListActive returns all non-terminal batches; we narrow to
138+
// Created/Speculating/Merging so the analyzer only sees batches that can still
139+
// acquire new conflicts.
140+
allActive, err := c.store.GetBatchStore().ListActive(ctx, request.Queue)
144141
if err != nil {
145142
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
146143
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
147144
}
145+
activeBatches := make([]entity.Batch, 0, len(allActive))
146+
for _, b := range allActive {
147+
switch b.State {
148+
case entity.BatchStateCreated, entity.BatchStateSpeculating, entity.BatchStateMerging:
149+
activeBatches = append(activeBatches, b)
150+
}
151+
}
148152

149153
// Dedupe by batch ID since a single (analyzed, in-flight) pair may be
150154
// reported with multiple Conflict entries when different conflict types

submitqueue/orchestrator/controller/batch/batch_test.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M
7878

7979
if mockStorage == nil {
8080
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
81-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
81+
mockBatchStore.EXPECT().ListActive(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
8282
mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
8383

8484
mockReqStore := storagemock.NewMockRequestStore(ctrl)
@@ -209,14 +209,21 @@ func TestController_Process_WithDependencies(t *testing.T) {
209209
Version: 1,
210210
}
211211

212-
// Set up storage with active batches to become dependencies.
212+
// Set up storage with active batches to become dependencies. ListActive
213+
// returns every non-terminal batch; the controller filters to
214+
// Created/Speculating/Merging in memory. The Scored and Cancelling batches
215+
// below must be excluded — note no BatchDependent expectations are registered
216+
// for them, so the default all.New() analyzer would fail the test on an
217+
// unexpected mock call if the filter let them through.
213218
activeBatches := []entity.Batch{
214219
{ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1},
215220
{ID: "test-queue/batch/2", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 2},
221+
{ID: "test-queue/batch/3", Queue: "test-queue", State: entity.BatchStateScored, Version: 1},
222+
{ID: "test-queue/batch/4", Queue: "test-queue", State: entity.BatchStateCancelling, Version: 1},
216223
}
217224

218225
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
219-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil)
226+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil)
220227
mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
221228

222229
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
@@ -268,7 +275,7 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) {
268275
}
269276

270277
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
271-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil)
278+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(activeBatches, nil)
272279
mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
273280

274281
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
@@ -314,7 +321,7 @@ func TestController_Process_AnalyzerFailure(t *testing.T) {
314321
request := testRequest()
315322

316323
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
317-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil)
324+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil)
318325

319326
mockReqStore := storagemock.NewMockRequestStore(ctrl)
320327
mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil)
@@ -410,7 +417,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) {
410417
request := testRequest()
411418

412419
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
413-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil)
420+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil)
414421
// Create must NOT be called — gomock fails if it is.
415422

416423
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
@@ -463,7 +470,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) {
463470
request := testRequest()
464471

465472
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
466-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil)
473+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil)
467474
// Create must NOT be called — gomock fails if it is.
468475

469476
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
@@ -509,7 +516,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) {
509516
request.Version = 2 // prior attempt bumped from 1 → 2
510517

511518
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
512-
mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil)
519+
mockBatchStore.EXPECT().ListActive(gomock.Any(), "test-queue").Return(nil, nil)
513520
mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
514521

515522
mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)

0 commit comments

Comments
 (0)