Skip to content

Commit f1bd30e

Browse files
committed
add queue-state count regression benchmark
This adds a benchmark on top of #1203 to make the queue-state count query regression easy to reproduce and discuss. It compares the current `JobCountByQueueAndState` implementation against the legacy query shape on the same migrated `river_job` schema. The benchmark stays lightweight by default, but can be scaled locally with `RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS` to reproduce the planner regression with a couple hundred thousand rows and quantify the gap.
1 parent 3d3c21a commit f1bd30e

1 file changed

Lines changed: 158 additions & 0 deletions

File tree

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package riverpgxv5_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/riverqueue/river/riverdbtest"
14+
"github.com/riverqueue/river/riverdriver"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
"github.com/riverqueue/river/rivershared/riversharedtest"
17+
"github.com/riverqueue/river/rivertype"
18+
)
19+
20+
func BenchmarkJobCountByQueueAndState(b *testing.B) {
21+
ctx := context.Background()
22+
23+
dbPool := riversharedtest.DBPool(ctx, b)
24+
driver := riverpgxv5.New(dbPool)
25+
exec := driver.GetExecutor()
26+
schema := riverdbtest.TestSchema(ctx, b, driver, nil)
27+
numJobs := queueStateCountBenchmarkNumJobs(b)
28+
29+
seedQueueStateCountBenchmarkData(ctx, b, exec, schema, numJobs)
30+
31+
queueNamesTwo := queueStateCountBenchmarkQueueNames(2)
32+
queueNamesTen := queueStateCountBenchmarkQueueNames(10)
33+
34+
for _, benchmarkCase := range []struct {
35+
name string
36+
queueNames []string
37+
}{
38+
{name: "TwoQueues", queueNames: queueNamesTwo},
39+
{name: "TenQueues", queueNames: queueNamesTen},
40+
} {
41+
b.Run(benchmarkCase.name, func(b *testing.B) {
42+
b.ReportAllocs()
43+
44+
params := &riverdriver.JobCountByQueueAndStateParams{
45+
QueueNames: benchmarkCase.queueNames,
46+
Schema: schema,
47+
}
48+
49+
b.ResetTimer()
50+
for range b.N {
51+
results, err := driver.GetExecutor().JobCountByQueueAndState(ctx, params)
52+
require.NoError(b, err)
53+
require.NotEmpty(b, results)
54+
}
55+
})
56+
}
57+
}
58+
59+
func queueStateCountBenchmarkNumJobs(b *testing.B) int {
60+
b.Helper()
61+
62+
numJobs := 20_000
63+
if numJobsEnv := os.Getenv("RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS"); numJobsEnv != "" {
64+
parsedNumJobs, err := strconv.Atoi(numJobsEnv)
65+
require.NoError(b, err)
66+
require.Greater(b, parsedNumJobs, 0)
67+
68+
numJobs = parsedNumJobs
69+
}
70+
71+
return numJobs
72+
}
73+
74+
func queueStateCountBenchmarkQueueNames(numQueues int) []string {
75+
queueNames := make([]string, numQueues)
76+
for i := range numQueues {
77+
queueNames[i] = fmt.Sprintf("queue_%03d", i+1)
78+
}
79+
80+
return queueNames
81+
}
82+
83+
func queueStateCountBenchmarkState(jobNum int) rivertype.JobState {
84+
switch jobNum % 8 {
85+
case 0:
86+
return rivertype.JobStateRunning
87+
case 1:
88+
return rivertype.JobStateAvailable
89+
case 2:
90+
return rivertype.JobStateCompleted
91+
case 3:
92+
return rivertype.JobStateCancelled
93+
case 4:
94+
return rivertype.JobStateDiscarded
95+
case 5:
96+
return rivertype.JobStateRetryable
97+
case 6:
98+
return rivertype.JobStateScheduled
99+
default:
100+
return rivertype.JobStatePending
101+
}
102+
}
103+
104+
func seedQueueStateCountBenchmarkData(ctx context.Context, b *testing.B, exec riverdriver.Executor, schema string, numJobs int) {
105+
b.Helper()
106+
107+
const insertBatchSize = 5000
108+
109+
now := time.Now().UTC()
110+
111+
for start := 0; start < numJobs; start += insertBatchSize {
112+
end := min(start+insertBatchSize, numJobs)
113+
insertParams := make([]*riverdriver.JobInsertFullParams, 0, end-start)
114+
115+
for jobNum := start; jobNum < end; jobNum++ {
116+
finalizedAt, scheduledAt, state := queueStateCountBenchmarkTimestamps(now, jobNum)
117+
118+
insertParams = append(insertParams, &riverdriver.JobInsertFullParams{
119+
EncodedArgs: []byte(`{}`),
120+
FinalizedAt: finalizedAt,
121+
Kind: "benchmark",
122+
MaxAttempts: 25,
123+
Metadata: []byte(`{}`),
124+
Priority: (jobNum % 4) + 1,
125+
Queue: fmt.Sprintf("queue_%03d", (jobNum%100)+1),
126+
ScheduledAt: &scheduledAt,
127+
State: state,
128+
})
129+
}
130+
131+
_, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{
132+
Jobs: insertParams,
133+
Schema: schema,
134+
})
135+
require.NoError(b, err)
136+
}
137+
138+
countsByState, err := exec.JobCountByAllStates(ctx, &riverdriver.JobCountByAllStatesParams{Schema: schema})
139+
require.NoError(b, err)
140+
141+
var numRows int
142+
for _, numJobsForState := range countsByState {
143+
numRows += numJobsForState
144+
}
145+
require.Equal(b, numJobs, numRows)
146+
}
147+
148+
func queueStateCountBenchmarkTimestamps(now time.Time, jobNum int) (*time.Time, time.Time, rivertype.JobState) {
149+
scheduledAt := now.Add(-time.Duration(jobNum%100000) * time.Second)
150+
state := queueStateCountBenchmarkState(jobNum)
151+
152+
if state != rivertype.JobStateCancelled && state != rivertype.JobStateCompleted && state != rivertype.JobStateDiscarded {
153+
return nil, scheduledAt, state
154+
}
155+
156+
finalizedAt := scheduledAt.Add(time.Second)
157+
return &finalizedAt, scheduledAt, state
158+
}

0 commit comments

Comments
 (0)