Skip to content

Commit b7694fa

Browse files
committed
Add hook HookQueueStateCount + read middleware as hooks and vice versa
Here, add a new hook called `HookQueueStateCount` which gets invoked to produce job queue count statistics. We do this by adding a new maintenance service which like other maintenance services, runs only on the leader, so we only have one client performing counts at any given time. Furthermore, in order to not introduce a potential operational problem without opt-in from River users, the counts only run if a `HookQueueStateCount` hook/middleware has been added to the client. The reason we do all this to to implement a feature requested by one of users: for `otelriver` in contrib to be able to emit queue count metrics, which seems like a pretty reasonable ask for the package to be able to do, and something that every River user would likely want access to in their ops charts. A slight oddity, but which I think is _probably_ okay, is that the new hook ideally stays a hook, but the existing `otelriver` middleware is a middleware. It'd be nice not to have to put `otelriver.Middleware` into both a client's `Hooks` and `Middleware` configuration, so we modify client to allow for hooks that middleware and middleware which are hooks. This lets `otelriver.Middleware` continue doing what it was already doing, but also to start producing new counts as a hook.
1 parent 0143b85 commit b7694fa

File tree

18 files changed

+597
-256
lines changed

18 files changed

+597
-256
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added new `HookQueueStateCount` hook which is run by a River leader to generate queue count statistics. [PR #1203](https://github.com/riverqueue/river/pull/1203).
13+
- Middleware that implements `rivertype.Hook` can be looked up as hooks even if passed into `Config.Middleware`. Similarly, hooks that implement `rivertype.Middleware` can be looked up as middleware even if passed into `Config.Hooks`. [PR #1203](https://github.com/riverqueue/river/pull/1203).
14+
1015
## [0.34.0] - 2026-04-08
1116

1217
### Added

client.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -657,6 +658,7 @@ type clientTestSignals struct {
657658
periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals
658659
queueCleaner *maintenance.QueueCleanerTestSignals
659660
queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals
661+
queueStateCounter *maintenance.QueueStateCounterTestSignals
660662
reindexer *maintenance.ReindexerTestSignals
661663
}
662664

@@ -679,6 +681,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
679681
if ts.queueMaintainerLeader != nil {
680682
ts.queueMaintainerLeader.Init(tb)
681683
}
684+
if ts.queueStateCounter != nil {
685+
ts.queueStateCounter.Init(tb)
686+
}
682687
if ts.reindexer != nil {
683688
ts.reindexer.Init(tb)
684689
}
@@ -759,7 +764,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
759764
config: config,
760765
driver: driver,
761766
hookLookupByJob: hooklookup.NewJobHookLookup(),
762-
hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks),
767+
hookLookupGlobal: nil, // initialized below after cross-referencing with middleware
763768
producersByQueueName: make(map[string]*producer),
764769
testSignals: clientTestSignals{},
765770
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
@@ -780,9 +785,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
780785
// the more abstract config.Middleware for middleware are set, but not both,
781786
// so in practice we never append all three of these to each other.
782787
{
783-
middleware := config.Middleware
788+
middlewares := config.Middleware
784789
for _, jobInsertMiddleware := range config.JobInsertMiddleware {
785-
middleware = append(middleware, jobInsertMiddleware)
790+
middlewares = append(middlewares, jobInsertMiddleware)
786791
}
787792
outerLoop:
788793
for _, workerMiddleware := range config.WorkerMiddleware {
@@ -798,16 +803,44 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
798803
}
799804
}
800805

801-
middleware = append(middleware, workerMiddleware)
806+
middlewares = append(middlewares, workerMiddleware)
802807
}
803808

804-
for _, middleware := range middleware {
809+
for _, middleware := range middlewares {
805810
if withBaseService, ok := middleware.(baseservice.WithBaseService); ok {
806811
baseservice.Init(archetype, withBaseService)
807812
}
808813
}
809814

810-
client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware)
815+
// Cross-reference hooks and middleware: any middleware that also
816+
// implements Hook is added to hooks, and any hook that also implements
817+
// Middleware is added to middleware. Deduplication prevents double
818+
// registration when the same struct is passed to both Config.Hooks and
819+
// Config.Middleware.
820+
hooks := config.Hooks
821+
822+
for _, middleware := range middlewares {
823+
if hook, ok := middleware.(rivertype.Hook); ok {
824+
// Only add if this middleware isn't already in hooks (it may
825+
// have been passed to both config properties).
826+
alreadyInHooks := slices.Contains(hooks, hook)
827+
if !alreadyInHooks {
828+
hooks = append(hooks, hook)
829+
}
830+
}
831+
}
832+
833+
for _, hook := range config.Hooks {
834+
if middleware, ok := hook.(rivertype.Middleware); ok {
835+
alreadyInMiddleware := slices.Contains(middlewares, middleware)
836+
if !alreadyInMiddleware {
837+
middlewares = append(middlewares, middleware)
838+
}
839+
}
840+
}
841+
842+
client.hookLookupGlobal = hooklookup.NewHookLookup(hooks)
843+
client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middlewares)
811844
}
812845

813846
pluginDriver, _ := driver.(driverPlugin[TTx])
@@ -961,6 +994,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
961994
client.testSignals.queueCleaner = &queueCleaner.TestSignals
962995
}
963996

997+
{
998+
queueStateCounter := maintenance.NewQueueStateCounter(archetype, &maintenance.QueueStateCounterConfig{
999+
HookLookupGlobal: client.hookLookupGlobal,
1000+
QueueNames: maputil.Keys(config.Queues),
1001+
Schema: config.Schema,
1002+
}, driver.GetExecutor())
1003+
maintenanceServices = append(maintenanceServices, queueStateCounter)
1004+
client.testSignals.queueStateCounter = &queueStateCounter.TestSignals
1005+
}
1006+
9641007
{
9651008
var scheduleFunc func(time.Time) time.Time
9661009
if config.ReindexerSchedule != nil {

client_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/tidwall/sjson"
2525

2626
"github.com/riverqueue/river/internal/dbunique"
27+
"github.com/riverqueue/river/internal/hooklookup"
2728
"github.com/riverqueue/river/internal/jobexecutor"
2829
"github.com/riverqueue/river/internal/maintenance"
2930
"github.com/riverqueue/river/internal/middlewarelookup"
@@ -7626,6 +7627,46 @@ func Test_NewClient_Validations(t *testing.T) {
76267627
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
76277628
},
76287629
},
7630+
{
7631+
name: "Middleware implementing Hook is available in hook lookup",
7632+
configFunc: func(config *Config) {
7633+
config.Middleware = []rivertype.Middleware{&middlewareWithHook{}}
7634+
},
7635+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7636+
require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1)
7637+
},
7638+
},
7639+
{
7640+
name: "Hook implementing Middleware is available in middleware lookup",
7641+
configFunc: func(config *Config) {
7642+
config.Hooks = []rivertype.Hook{&hookWithMiddleware{}}
7643+
},
7644+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7645+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
7646+
},
7647+
},
7648+
{
7649+
name: "Hook implementing Middleware in both configs is deduplicated in middleware lookup",
7650+
configFunc: func(config *Config) {
7651+
hm := &hookWithMiddleware{}
7652+
config.Hooks = []rivertype.Hook{hm}
7653+
config.Middleware = []rivertype.Middleware{hm}
7654+
},
7655+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7656+
require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1)
7657+
},
7658+
},
7659+
{
7660+
name: "Middleware implementing Hook in both configs is deduplicated in hook lookup",
7661+
configFunc: func(config *Config) {
7662+
mh := &middlewareWithHook{}
7663+
config.Hooks = []rivertype.Hook{mh}
7664+
config.Middleware = []rivertype.Middleware{mh}
7665+
},
7666+
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper
7667+
require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1)
7668+
},
7669+
},
76297670
{
76307671
name: "Middleware not allowed with JobInsertMiddleware",
76317672
configFunc: func(config *Config) {
@@ -8625,3 +8666,31 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook {
86258666
func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil }
86268667

86278668
func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil }
8669+
8670+
// middlewareWithHook is a middleware that also implements HookWorkBegin,
8671+
// used to test cross-pollination from middleware to hooks.
8672+
type middlewareWithHook struct {
8673+
MiddlewareDefaults
8674+
}
8675+
8676+
func (m *middlewareWithHook) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
8677+
return doInner(ctx)
8678+
}
8679+
8680+
func (m *middlewareWithHook) IsHook() bool { return true }
8681+
8682+
func (m *middlewareWithHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error {
8683+
return nil
8684+
}
8685+
8686+
// hookWithMiddleware is a hook that also implements WorkerMiddleware,
8687+
// used to test cross-pollination from hooks to middleware.
8688+
type hookWithMiddleware struct {
8689+
HookDefaults
8690+
}
8691+
8692+
func (h *hookWithMiddleware) IsMiddleware() bool { return true }
8693+
8694+
func (h *hookWithMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
8695+
return doInner(ctx)
8696+
}

hook_defaults_funcs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype.
3333
return f(ctx, params)
3434
}
3535

36+
// HookQueueStateCountFunc is a convenience helper for implementing
37+
// rivertype.HookQueueStateCount using a simple function instead of a struct.
38+
type HookQueueStateCountFunc func(ctx context.Context, params *rivertype.HookQueueStateCountParams)
39+
40+
func (f HookQueueStateCountFunc) IsHook() bool { return true }
41+
42+
func (f HookQueueStateCountFunc) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) {
43+
f(ctx, params)
44+
}
45+
3646
// HookWorkBeginFunc is a convenience helper for implementing
3747
// rivertype.HookWorkBegin using a simple function instead of a struct.
3848
type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error

internal/hooklookup/hook_lookup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type HookKind string
1515
const (
1616
HookKindInsertBegin HookKind = "insert_begin"
1717
HookKindPeriodicJobsStart HookKind = "periodic_job_start"
18+
HookKindQueueStateCount HookKind = "queue_state_count"
1819
HookKindWorkBegin HookKind = "work_begin"
1920
HookKindWorkEnd HookKind = "work_end"
2021
)
@@ -90,6 +91,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
9091
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
9192
}
9293
}
94+
case HookKindQueueStateCount:
95+
for _, hook := range c.hooks {
96+
if typedHook, ok := hook.(rivertype.HookQueueStateCount); ok {
97+
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
98+
}
99+
}
93100
case HookKindWorkBegin:
94101
for _, hook := range c.hooks {
95102
if typedHook, ok := hook.(rivertype.HookWorkBegin); ok {

0 commit comments

Comments
 (0)