Skip to content

Commit f8f0bc0

Browse files
committed
add queue-state count regression benchmark
This adds a benchmark on top of #1203 to make the queue-state count query regression easy to reproduce and discuss. It compares the current `JobCountByQueueAndState` implementation against the legacy query shape on the same migrated `river_job` schema. The benchmark stays lightweight by default, but can be scaled locally with `RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS` to reproduce the planner regression with a couple hundred thousand rows and quantify the gap.
1 parent 3d3c21a commit f8f0bc0

1 file changed

Lines changed: 209 additions & 0 deletions

File tree

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package riverpgxv5_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
"testing"
9+
10+
"github.com/jackc/pgx/v5/pgxpool"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/riverqueue/river/riverdbtest"
14+
"github.com/riverqueue/river/riverdriver"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
"github.com/riverqueue/river/rivershared/riversharedtest"
17+
)
18+
19+
// Sizing knobs for the table seeded by BenchmarkJobCountByQueueAndState.
const (
	// queueStateCountBenchmarkNumJobsDefault is the number of river_job rows
	// seeded when the environment override is unset or empty.
	queueStateCountBenchmarkNumJobsDefault = 20000

	// queueStateCountBenchmarkNumJobsEnvVar names the environment variable
	// that overrides the seeded row count.
	queueStateCountBenchmarkNumJobsEnvVar = "RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS"
)
23+
24+
func BenchmarkJobCountByQueueAndState(b *testing.B) {
25+
ctx := context.Background()
26+
27+
dbPool := riversharedtest.DBPool(ctx, b)
28+
driver := riverpgxv5.New(dbPool)
29+
schema := riverdbtest.TestSchema(ctx, b, driver, nil)
30+
numJobs := queueStateCountBenchmarkNumJobs(b)
31+
32+
seedQueueStateCountBenchmarkData(ctx, b, dbPool, schema, numJobs)
33+
34+
queueNamesTwo := queueStateCountBenchmarkQueueNames(2)
35+
queueNamesTen := queueStateCountBenchmarkQueueNames(10)
36+
37+
for _, benchmarkCase := range []struct {
38+
name string
39+
queueNames []string
40+
}{
41+
{name: "TwoQueues", queueNames: queueNamesTwo},
42+
{name: "TenQueues", queueNames: queueNamesTen},
43+
} {
44+
b.Run("Current/"+benchmarkCase.name, func(b *testing.B) {
45+
b.ReportAllocs()
46+
47+
params := &riverdriver.JobCountByQueueAndStateParams{
48+
QueueNames: benchmarkCase.queueNames,
49+
Schema: schema,
50+
}
51+
52+
b.ResetTimer()
53+
for range b.N {
54+
results, err := driver.GetExecutor().JobCountByQueueAndState(ctx, params)
55+
require.NoError(b, err)
56+
require.NotEmpty(b, results)
57+
}
58+
})
59+
60+
b.Run("Legacy/"+benchmarkCase.name, func(b *testing.B) {
61+
b.ReportAllocs()
62+
63+
query := legacyJobCountByQueueAndStateQuery(schema)
64+
65+
b.ResetTimer()
66+
for range b.N {
67+
rows, err := dbPool.Query(ctx, query, benchmarkCase.queueNames)
68+
require.NoError(b, err)
69+
70+
var numRows int
71+
for rows.Next() {
72+
var (
73+
countAvailable int64
74+
countRunning int64
75+
queue string
76+
)
77+
78+
require.NoError(b, rows.Scan(&queue, &countAvailable, &countRunning))
79+
numRows++
80+
}
81+
82+
rows.Close()
83+
require.NoError(b, rows.Err())
84+
require.Equal(b, len(benchmarkCase.queueNames), numRows)
85+
}
86+
})
87+
}
88+
}
89+
90+
// legacyJobCountByQueueAndStateQuery returns the legacy queue/state count SQL
// with schema substituted in front of both river_job references.
//
// The query takes a single parameter, $1, a text[] of queue names. It yields
// one row per distinct requested queue, ordered by queue name, with columns
// queue, count_available and count_running. Queues with no matching jobs get
// zero counts through COALESCE.
func legacyJobCountByQueueAndStateQuery(schema string) string {
	const queryTemplate = `
		WITH all_queues AS (
			SELECT DISTINCT unnest($1::text[])::text AS queue
		),

		running_job_counts AS (
			SELECT
				queue,
				COUNT(*) AS count
			FROM %s.river_job
			WHERE queue = ANY($1::text[])
				AND state = 'running'
			GROUP BY queue
		),

		available_job_counts AS (
			SELECT
				queue,
				COUNT(*) AS count
			FROM %s.river_job
			WHERE queue = ANY($1::text[])
				AND state = 'available'
			GROUP BY queue
		)

		SELECT
			all_queues.queue,
			COALESCE(available_job_counts.count, 0) AS count_available,
			COALESCE(running_job_counts.count, 0) AS count_running
		FROM
			all_queues
		LEFT JOIN
			running_job_counts ON all_queues.queue = running_job_counts.queue
		LEFT JOIN
			available_job_counts ON all_queues.queue = available_job_counts.queue
		ORDER BY all_queues.queue ASC
	`

	// One argument per %s: running_job_counts first, then available_job_counts.
	return fmt.Sprintf(queryTemplate, schema, schema)
}
129+
130+
func queueStateCountBenchmarkNumJobs(b *testing.B) int {
131+
b.Helper()
132+
133+
numJobs := queueStateCountBenchmarkNumJobsDefault
134+
if numJobsEnv := os.Getenv(queueStateCountBenchmarkNumJobsEnvVar); numJobsEnv != "" {
135+
parsedNumJobs, err := strconv.Atoi(numJobsEnv)
136+
require.NoError(b, err)
137+
require.Greater(b, parsedNumJobs, 0)
138+
139+
numJobs = parsedNumJobs
140+
}
141+
142+
return numJobs
143+
}
144+
145+
// queueStateCountBenchmarkQueueNames returns numQueues queue names numbered
// from 1: "queue_001", "queue_002", and so on. The number is zero-padded to at
// least three digits, the same naming the seed SQL produces with lpad.
func queueStateCountBenchmarkQueueNames(numQueues int) []string {
	names := make([]string, numQueues)
	for number := 1; number <= numQueues; number++ {
		names[number-1] = fmt.Sprintf("queue_%03d", number)
	}

	return names
}
153+
154+
func seedQueueStateCountBenchmarkData(ctx context.Context, b *testing.B, dbPool *pgxpool.Pool, schema string, numJobs int) {
155+
b.Helper()
156+
157+
query := fmt.Sprintf(`
158+
WITH generated_jobs AS (
159+
SELECT
160+
CASE gs %% 8
161+
WHEN 0 THEN 'running'
162+
WHEN 1 THEN 'available'
163+
WHEN 2 THEN 'completed'
164+
WHEN 3 THEN 'cancelled'
165+
WHEN 4 THEN 'discarded'
166+
WHEN 5 THEN 'retryable'
167+
WHEN 6 THEN 'scheduled'
168+
ELSE 'pending'
169+
END AS state,
170+
now() - ((gs %% 100000)::text || ' seconds')::interval AS scheduled_at,
171+
'queue_' || lpad(((gs %% 100) + 1)::text, 3, '0') AS queue
172+
FROM generate_series(1, %d) AS gs
173+
)
174+
INSERT INTO %s.river_job (
175+
args,
176+
finalized_at,
177+
kind,
178+
max_attempts,
179+
metadata,
180+
queue,
181+
scheduled_at,
182+
state
183+
)
184+
SELECT
185+
'{}'::jsonb,
186+
CASE
187+
WHEN state IN ('cancelled', 'completed', 'discarded') THEN scheduled_at + interval '1 second'
188+
ELSE NULL
189+
END AS finalized_at,
190+
'benchmark',
191+
25,
192+
'{}'::jsonb,
193+
queue,
194+
scheduled_at,
195+
state::%s.river_job_state
196+
FROM generated_jobs;
197+
198+
ANALYZE %s.river_job;
199+
`, numJobs, schema, schema, schema)
200+
201+
_, err := dbPool.Exec(ctx, query)
202+
require.NoError(b, err)
203+
204+
row := dbPool.QueryRow(ctx, "SELECT count(*) FROM "+schema+".river_job")
205+
206+
var numRows int
207+
require.NoError(b, row.Scan(&numRows))
208+
require.Equal(b, numJobs, numRows)
209+
}

0 commit comments

Comments
 (0)