From 3d3c21a70084c592cd29978e04255126aa6c215d Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 11 Apr 2026 22:49:27 -0500 Subject: [PATCH] Add hook `HookQueueStateCount` + read middleware as hooks and vice versa Here, add a new hook called `HookQueueStateCount` which gets invoked to produce job queue count statistics. We do this by adding a new maintenance service which like other maintenance services, runs only on the leader, so we only have one client performing counts at any given time. Furthermore, in order to not introduce a potential operational problem without opt-in from River users, the counts only run if a `HookQueueStateCount` hook/middleware has been added to the client. The reason we do all this to to implement a feature requested by one of users: for `otelriver` in contrib to be able to emit queue count metrics, which seems like a pretty reasonable ask for the package to be able to do, and something that every River user would likely want access to in their ops charts. A slight oddity, but which I think is _probably_ okay, is that the new hook ideally stays a hook, but the existing `otelriver` middleware is a middleware. It'd be nice not to have to put `otelriver.Middleware` into both a client's `Hooks` and `Middleware` configuration, so we modify client to allow for hooks that middleware and middleware which are hooks. This lets `otelriver.Middleware` continue doing what it was already doing, but also to start producing new counts as a hook. --- CHANGELOG.md | 5 + client.go | 55 ++++- client_test.go | 69 +++++++ hook_defaults_funcs.go | 10 + internal/hooklookup/hook_lookup.go | 7 + internal/maintenance/queue_state_counter.go | 188 ++++++++++++++++++ .../maintenance/queue_state_counter_test.go | 161 +++++++++++++++ riverdriver/river_driver_interface.go | 8 +- .../internal/dbsqlc/river_job.sql.go | 50 +---- .../river_database_sql_driver.go | 13 +- riverdriver/riverdrivertest/job_read.go | 58 ++---- .../riverpgxv5/internal/dbsqlc/river_job.sql | 42 +--- .../internal/dbsqlc/river_job.sql.go | 50 +---- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 13 +- .../riversqlite/internal/dbsqlc/river_job.sql | 22 +- .../internal/dbsqlc/river_job.sql.go | 30 +-- .../riversqlite/river_sqlite_driver.go | 43 +--- rivertype/river_type.go | 29 +++ 18 files changed, 597 insertions(+), 256 deletions(-) create mode 100644 internal/maintenance/queue_state_counter.go create mode 100644 internal/maintenance/queue_state_counter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ea739a78..2ed719b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added new `HookQueueStateCount` hook which is run by a River leader to generate queue count statistics. [PR #1203](https://github.com/riverqueue/river/pull/1203). +- Middleware that implements `rivertype.Hook` can be looked up as hooks even if passed into `Config.Middleware`. Similarly, hooks that implement `rivertype.Middleware` can be looked up as middleware even if passed into `Config.Hooks`. [PR #1203](https://github.com/riverqueue/river/pull/1203). + ## [0.34.0] - 2026-04-08 ### Added diff --git a/client.go b/client.go index 28c0ab61..2be27032 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "regexp" + "slices" "strings" "sync" "time" @@ -657,6 +658,7 @@ type clientTestSignals struct { periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals queueCleaner *maintenance.QueueCleanerTestSignals queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals + queueStateCounter *maintenance.QueueStateCounterTestSignals reindexer *maintenance.ReindexerTestSignals } @@ -679,6 +681,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) { if ts.queueMaintainerLeader != nil { ts.queueMaintainerLeader.Init(tb) } + if ts.queueStateCounter != nil { + ts.queueStateCounter.Init(tb) + } if ts.reindexer != nil { ts.reindexer.Init(tb) } @@ -759,7 +764,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client config: config, driver: driver, hookLookupByJob: hooklookup.NewJobHookLookup(), - hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks), + hookLookupGlobal: nil, // initialized below after cross-referencing with middleware producersByQueueName: make(map[string]*producer), testSignals: clientTestSignals{}, workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up @@ -780,9 +785,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // the more abstract config.Middleware for middleware are set, but not both, // so in practice we never append all three of these to each other. { - middleware := config.Middleware + middlewares := config.Middleware for _, jobInsertMiddleware := range config.JobInsertMiddleware { - middleware = append(middleware, jobInsertMiddleware) + middlewares = append(middlewares, jobInsertMiddleware) } outerLoop: for _, workerMiddleware := range config.WorkerMiddleware { @@ -798,16 +803,44 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } } - middleware = append(middleware, workerMiddleware) + middlewares = append(middlewares, workerMiddleware) } - for _, middleware := range middleware { + for _, middleware := range middlewares { if withBaseService, ok := middleware.(baseservice.WithBaseService); ok { baseservice.Init(archetype, withBaseService) } } - client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware) + // Cross-reference hooks and middleware: any middleware that also + // implements Hook is added to hooks, and any hook that also implements + // Middleware is added to middleware. Deduplication prevents double + // registration when the same struct is passed to both Config.Hooks and + // Config.Middleware. + hooks := config.Hooks + + for _, middleware := range middlewares { + if hook, ok := middleware.(rivertype.Hook); ok { + // Only add if this middleware isn't already in hooks (it may + // have been passed to both config properties). + alreadyInHooks := slices.Contains(hooks, hook) + if !alreadyInHooks { + hooks = append(hooks, hook) + } + } + } + + for _, hook := range config.Hooks { + if middleware, ok := hook.(rivertype.Middleware); ok { + alreadyInMiddleware := slices.Contains(middlewares, middleware) + if !alreadyInMiddleware { + middlewares = append(middlewares, middleware) + } + } + } + + client.hookLookupGlobal = hooklookup.NewHookLookup(hooks) + client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middlewares) } pluginDriver, _ := driver.(driverPlugin[TTx]) @@ -961,6 +994,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.testSignals.queueCleaner = &queueCleaner.TestSignals } + { + queueStateCounter := maintenance.NewQueueStateCounter(archetype, &maintenance.QueueStateCounterConfig{ + HookLookupGlobal: client.hookLookupGlobal, + QueueNames: maputil.Keys(config.Queues), + Schema: config.Schema, + }, driver.GetExecutor()) + maintenanceServices = append(maintenanceServices, queueStateCounter) + client.testSignals.queueStateCounter = &queueStateCounter.TestSignals + } + { var scheduleFunc func(time.Time) time.Time if config.ReindexerSchedule != nil { diff --git a/client_test.go b/client_test.go index 24a8be7f..c2c72072 100644 --- a/client_test.go +++ b/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/middlewarelookup" @@ -7626,6 +7627,46 @@ func Test_NewClient_Validations(t *testing.T) { require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) }, }, + { + name: "Middleware implementing Hook is available in hook lookup", + configFunc: func(config *Config) { + config.Middleware = []rivertype.Middleware{&middlewareWithHook{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, + { + name: "Hook implementing Middleware is available in middleware lookup", + configFunc: func(config *Config) { + config.Hooks = []rivertype.Hook{&hookWithMiddleware{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Hook implementing Middleware in both configs is deduplicated in middleware lookup", + configFunc: func(config *Config) { + hm := &hookWithMiddleware{} + config.Hooks = []rivertype.Hook{hm} + config.Middleware = []rivertype.Middleware{hm} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Middleware implementing Hook in both configs is deduplicated in hook lookup", + configFunc: func(config *Config) { + mh := &middlewareWithHook{} + config.Hooks = []rivertype.Hook{mh} + config.Middleware = []rivertype.Middleware{mh} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, { name: "Middleware not allowed with JobInsertMiddleware", configFunc: func(config *Config) { @@ -8625,3 +8666,31 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook { func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil } func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil } + +// middlewareWithHook is a middleware that also implements HookWorkBegin, +// used to test cross-pollination from middleware to hooks. +type middlewareWithHook struct { + MiddlewareDefaults +} + +func (m *middlewareWithHook) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} + +func (m *middlewareWithHook) IsHook() bool { return true } + +func (m *middlewareWithHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + return nil +} + +// hookWithMiddleware is a hook that also implements WorkerMiddleware, +// used to test cross-pollination from hooks to middleware. +type hookWithMiddleware struct { + HookDefaults +} + +func (h *hookWithMiddleware) IsMiddleware() bool { return true } + +func (h *hookWithMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 93a7f4fe..2b6aee8c 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -33,6 +33,16 @@ func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype. return f(ctx, params) } +// HookQueueStateCountFunc is a convenience helper for implementing +// rivertype.HookQueueStateCount using a simple function instead of a struct. +type HookQueueStateCountFunc func(ctx context.Context, params *rivertype.HookQueueStateCountParams) + +func (f HookQueueStateCountFunc) IsHook() bool { return true } + +func (f HookQueueStateCountFunc) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) { + f(ctx, params) +} + // HookWorkBeginFunc is a convenience helper for implementing // rivertype.HookWorkBegin using a simple function instead of a struct. type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 420debb2..9dfd3a04 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -15,6 +15,7 @@ type HookKind string const ( HookKindInsertBegin HookKind = "insert_begin" HookKindPeriodicJobsStart HookKind = "periodic_job_start" + HookKindQueueStateCount HookKind = "queue_state_count" HookKindWorkBegin HookKind = "work_begin" HookKindWorkEnd HookKind = "work_end" ) @@ -90,6 +91,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindQueueStateCount: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookQueueStateCount); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } case HookKindWorkBegin: for _, hook := range c.hooks { if typedHook, ok := hook.(rivertype.HookWorkBegin); ok { diff --git a/internal/maintenance/queue_state_counter.go b/internal/maintenance/queue_state_counter.go new file mode 100644 index 00000000..a26e6eef --- /dev/null +++ b/internal/maintenance/queue_state_counter.go @@ -0,0 +1,188 @@ +package maintenance + +import ( + "cmp" + "context" + "errors" + "log/slog" + "time" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" + "github.com/riverqueue/river/rivershared/startstop" + "github.com/riverqueue/river/rivershared/testsignal" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivershared/util/timeutil" + "github.com/riverqueue/river/rivertype" +) + +const QueueStateCounterIntervalDefault = 10 * time.Second + +var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals + +// QueueStateCounterTestSignals are internal signals used exclusively in tests. +type QueueStateCounterTestSignals struct { + CountedOnce testsignal.TestSignal[struct{}] // notifies when a count pass finishes +} + +func (ts *QueueStateCounterTestSignals) Init(tb testutil.TestingTB) { + ts.CountedOnce.Init(tb) +} + +type QueueStateCounterConfig struct { + // HookLookupGlobal provides access to globally registered hooks. + HookLookupGlobal hooklookup.HookLookupInterface + + // Interval is the amount of time between count runs. + Interval time.Duration + + // QueueNames is the list of configured queue names. Counts are emitted for + // all of these queues even if they have no jobs, with zero counts for all + // states. + QueueNames []string + + // Schema where River tables are located. Empty string omits schema, causing + // Postgres to default to `search_path`. + Schema string +} + +func (c *QueueStateCounterConfig) mustValidate() *QueueStateCounterConfig { + if c.Interval <= 0 { + panic("QueueStateCounterConfig.Interval must be above zero") + } + + return c +} + +// QueueStateCounter periodically counts jobs by queue and state, logging the +// results. This provides visibility into queue health without requiring +// external monitoring queries. The maintenance service only runs if there is a +// HookQueueStateCount hook registered that consumes the counts. +type QueueStateCounter struct { + riversharedmaintenance.QueueMaintainerServiceBase + startstop.BaseStartStop + + // exported for test purposes + Config *QueueStateCounterConfig + TestSignals QueueStateCounterTestSignals + + exec riverdriver.Executor +} + +func NewQueueStateCounter(archetype *baseservice.Archetype, config *QueueStateCounterConfig, exec riverdriver.Executor) *QueueStateCounter { + return baseservice.Init(archetype, &QueueStateCounter{ + Config: (&QueueStateCounterConfig{ + HookLookupGlobal: config.HookLookupGlobal, + Interval: cmp.Or(config.Interval, QueueStateCounterIntervalDefault), + QueueNames: config.QueueNames, + Schema: config.Schema, + }).mustValidate(), + exec: exec, + }) +} + +func (s *QueueStateCounter) Start(ctx context.Context) error { + ctx, shouldStart, started, stopped := s.StartInit(ctx) + if !shouldStart { + return nil + } + + s.StaggerStart(ctx) + + go func() { + started() + defer stopped() // this defer should come first so it's last out + + s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) + defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) + + // If no hooks are registered, there's no one to send counts to, so + // start, but don't do anything. + if len(s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount)) < 1 { + <-ctx.Done() + return + } + + ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if err := s.runOnce(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error counting queue states", slog.String("error", err.Error())) + } + } + } + }() + + return nil +} + +func (s *QueueStateCounter) runOnce(ctx context.Context) error { + ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault) + defer cancelFunc() + + rawCounts, err := s.exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: s.Config.QueueNames, + Schema: s.Config.Schema, + }) + if err != nil { + return err + } + + byQueue := s.buildResults(ctx, rawCounts) + + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount) { + hook.(rivertype.HookQueueStateCount).QueueStateCount(ctx, &rivertype.HookQueueStateCountParams{ //nolint:forcetypeassert + ByQueue: byQueue, + }) + } + + s.TestSignals.CountedOnce.Signal(struct{}{}) + + return nil +} + +// buildResults converts raw driver counts into results with all configured +// queues and all job states filled in (zeroed where needed), logging one line +// per queue. +func (s *QueueStateCounter) buildResults(ctx context.Context, rawCounts map[string]map[rivertype.JobState]int) map[string]rivertype.HookQueueStateCountResult { + // Ensure all configured queues are present, even if they have no jobs. + for _, queue := range s.Config.QueueNames { + if rawCounts[queue] == nil { + rawCounts[queue] = make(map[rivertype.JobState]int) + } + } + + countsByQueue := make(map[string]rivertype.HookQueueStateCountResult, len(rawCounts)) + + for queue, stateCounts := range rawCounts { + attrs := make([]slog.Attr, 0, 2+len(jobStateAll)) + attrs = append(attrs, slog.String("queue", queue)) + total := 0 + + for _, state := range jobStateAll { + if _, ok := stateCounts[state]; !ok { + stateCounts[state] = 0 + } + total += stateCounts[state] + attrs = append(attrs, slog.Int(string(state), stateCounts[state])) + } + + attrs = append(attrs, slog.Int("total", total)) + s.Logger.LogAttrs(ctx, slog.LevelInfo, s.Name+": Queue state counts", attrs...) + + countsByQueue[queue] = rivertype.HookQueueStateCountResult{ + ByState: stateCounts, + Total: total, + } + } + + return countsByQueue +} diff --git a/internal/maintenance/queue_state_counter_test.go b/internal/maintenance/queue_state_counter_test.go new file mode 100644 index 00000000..257776ad --- /dev/null +++ b/internal/maintenance/queue_state_counter_test.go @@ -0,0 +1,161 @@ +package maintenance + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/startstoptest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivertype" +) + +func TestQueueStateCounter(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + } + + setup := func(t *testing.T, hooks []rivertype.Hook, queueNames []string) (*QueueStateCounter, *testBundle) { + t.Helper() + + var ( + tx = riverdbtest.TestTxPgx(ctx, t) + exec = riverpgxv5.New(nil).UnwrapExecutor(tx) + ) + + svc := NewQueueStateCounter( + riversharedtest.BaseServiceArchetype(t), + &QueueStateCounterConfig{ + HookLookupGlobal: hooklookup.NewHookLookup(hooks), + QueueNames: queueNames, + }, + exec, + ) + svc.StaggerStartupDisable(true) + svc.TestSignals.Init(t) + t.Cleanup(svc.Stop) + + return svc, &testBundle{exec: exec} + } + + t.Run("CountsJobsByQueueAndState", func(t *testing.T) { + t.Parallel() + + hook := &capturingQueueStateCountHook{} + svc, bundle := setup(t, []rivertype.Hook{hook}, []string{"queue1", "queue2", "queue_empty"}) + + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + + counts := hook.LastParams().ByQueue + + // All three configured queues are present, including queue_empty + // which has no jobs. + require.Len(t, counts, 3) + + // All job states are present for each queue, even those with zero jobs. + for _, queue := range []string{"queue1", "queue2", "queue_empty"} { + require.Len(t, counts[queue].ByState, len(rivertype.JobStates()), "queue %q should have all states", queue) + } + + require.Equal(t, 2, counts["queue1"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 1, counts["queue1"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 0, counts["queue1"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 3, counts["queue1"].Total) + + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 2, counts["queue2"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateDiscarded]) + require.Equal(t, 0, counts["queue2"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 4, counts["queue2"].Total) + + // queue_empty has all states present, all zero. + for _, state := range rivertype.JobStates() { + require.Equal(t, 0, counts["queue_empty"].ByState[state], "queue_empty[%s] should be 0", state) + } + require.Equal(t, 0, counts["queue_empty"].Total) + }) + + t.Run("NoopsWithoutHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + + // With no hooks, Start returns immediately without starting the service. + require.NoError(t, svc.Start(ctx)) + + // Service should not have started, so calling Start again should also + // succeed (StartInit would return false if it were already running). + require.NoError(t, svc.Start(ctx)) + }) + + t.Run("StartStop", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + }) + + t.Run("StartStopStress", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) + + t.Run("StartStopStressNoHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) +} + +// capturingQueueStateCountHook captures the last params received from the hook +// invocation. +type capturingQueueStateCountHook struct { + mu sync.Mutex + lastParams *rivertype.HookQueueStateCountParams +} + +func (h *capturingQueueStateCountHook) IsHook() bool { return true } + +func (h *capturingQueueStateCountHook) QueueStateCount(_ context.Context, params *rivertype.HookQueueStateCountParams) { + h.mu.Lock() + defer h.mu.Unlock() + h.lastParams = params +} + +func (h *capturingQueueStateCountHook) LastParams() *rivertype.HookQueueStateCountParams { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastParams +} diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 43eacb26..1e108c7f 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -202,7 +202,7 @@ type Executor interface { JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error) - JobCountByQueueAndState(ctx context.Context, params *JobCountByQueueAndStateParams) ([]*JobCountByQueueAndStateResult, error) + JobCountByQueueAndState(ctx context.Context, params *JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error) JobDelete(ctx context.Context, params *JobDeleteParams) (*rivertype.JobRow, error) JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error) @@ -359,12 +359,6 @@ type JobCountByQueueAndStateParams struct { Schema string } -type JobCountByQueueAndStateResult struct { - CountAvailable int64 - CountRunning int64 - Queue string -} - type JobCountByStateParams struct { Schema string State rivertype.JobState diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 14ec9c43..660eab0b 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -132,48 +132,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY($1::text[]) +GROUP BY queue, state +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State RiverJobState + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -185,7 +157,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ef41a525..ab11fe93 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -222,18 +222,17 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + results := make(map[string]map[rivertype.JobState]int) + for _, row := range rows { + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } return results, nil } diff --git a/riverdriver/riverdrivertest/job_read.go b/riverdriver/riverdrivertest/job_read.go index 5649d910..ba0c1ccf 100644 --- a/riverdriver/riverdrivertest/job_read.go +++ b/riverdriver/riverdrivertest/job_read.go @@ -86,7 +86,7 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx t.Run("JobCountByQueueAndState", func(t *testing.T) { t.Parallel() - t.Run("CountsJobsInAvailableAndRunningForEachOfTheSpecifiedQueues", func(t *testing.T) { + t.Run("CountsAllStatesForSpecifiedQueues", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) @@ -96,10 +96,10 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ QueueNames: []string{"queue1", "queue2"}, @@ -109,62 +109,32 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx require.Len(t, countsByQueue, 2) - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(2), countsByQueue[0].CountAvailable) - require.Equal(t, int64(3), countsByQueue[0].CountRunning) - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) - }) - - t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) - - countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue1", "queue2"}, - Schema: "", - }) - require.NoError(t, err) - - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) + require.Equal(t, 2, countsByQueue["queue1"][rivertype.JobStateAvailable]) + require.Equal(t, 3, countsByQueue["queue1"][rivertype.JobStateRunning]) + require.Equal(t, 1, countsByQueue["queue1"][rivertype.JobStateCompleted]) + require.Equal(t, 0, countsByQueue["queue1"][rivertype.JobStateCancelled]) - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Equal(t, 1, countsByQueue["queue2"][rivertype.JobStateAvailable]) + require.Equal(t, 1, countsByQueue["queue2"][rivertype.JobStateRunning]) }) - t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) { + t.Run("ExcludesQueuesNotRequested", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue2", "queue1", "queue1"}, + QueueNames: []string{"queue1"}, Schema: "", }) require.NoError(t, err) - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) - - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Len(t, countsByQueue, 1) + require.Equal(t, 1, countsByQueue["queue1"][rivertype.JobStateAvailable]) + require.Nil(t, countsByQueue["queue2"]) }) }) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index dde85de4..adf65513 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -86,42 +86,14 @@ FROM /* TEMPLATE: schema */ river_job GROUP BY state; -- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest(@queue_names::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC; + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY(@queue_names::text[]) +GROUP BY queue, state +ORDER BY queue, state; -- name: JobCountByState :one SELECT count(*) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index a86a2235..8ba35d1a 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -129,48 +129,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY($1::text[]) +GROUP BY queue, state +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State RiverJobState + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -182,7 +154,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 9d3fed61..83da6e07 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -230,18 +230,17 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + results := make(map[string]map[rivertype.JobState]int) + for _, row := range rows { + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } return results, nil } diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..4403b894 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -56,22 +56,14 @@ FROM /* TEMPLATE: schema */river_job GROUP BY state; -- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (sqlc.slice('queue_names')) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC; + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (sqlc.slice('queue_names')) +GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC; -- name: JobCountByState :one SELECT count(*) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 083dc5c3..468c127b 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -103,28 +103,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (/*SLICE:queue_names*/?) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (/*SLICE:queue_names*/?) +GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State string + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -146,7 +138,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1eda9f14..a494ee2d 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -290,51 +290,18 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - - // The PostgreSQL drivers implement this query with an `all_queues` CTE and - // LEFT JOINs, so they return one row per requested queue, including queues - // that currently have no jobs. The input queue list is deduplicated in SQL. - // The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`, - // and we haven't found a workable way to bind a parameterized list through - // `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore - // returns only queues with matching rows, and this wrapper fills in missing - // queues to match PostgreSQL behavior. - countsByQueue := make(map[string]struct { - CountAvailable int64 - CountRunning int64 - }, len(rows)) + results := make(map[string]map[rivertype.JobState]int) for _, row := range rows { - countsByQueue[row.Queue] = struct { - CountAvailable int64 - CountRunning int64 - }{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - } - } - - queueNames := slices.Clone(params.QueueNames) - slices.Sort(queueNames) - queueNames = slices.Compact(queueNames) - - results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames)) - for _, queueName := range queueNames { - result := &riverdriver.JobCountByQueueAndStateResult{ - Queue: queueName, - } - if counts, ok := countsByQueue[queueName]; ok { - result.CountAvailable = counts.CountAvailable - result.CountRunning = counts.CountRunning + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } - - results = append(results, result) + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } - return results, nil } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 13d07f63..0233eca4 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -351,6 +351,35 @@ type HookPeriodicJobsStartParams struct { DurableJobs []*DurablePeriodicJob } +// HookQueueStateCount is an interface to a hook that runs each time the queue +// state counter maintenance service performs a count. +type HookQueueStateCount interface { + Hook + + // QueueStateCount is invoked after the queue state counter has completed a + // count of jobs by queue and state. + QueueStateCount(ctx context.Context, params *HookQueueStateCountParams) +} + +// HookQueueStateCountParams are parameters for HookQueueStateCount. +type HookQueueStateCountParams struct { + // ByQueue is a map of queue name to its count results. All queues + // configured with this client are included, even if they don't currently + // contain jobs. + ByQueue map[string]HookQueueStateCountResult +} + +// HookQueueStateCountResult contains coutns for a single queue. +type HookQueueStateCountResult struct { + // ByState is a map of job state to the number of jobs in that state for the + // queue. All job states are included, even if they have no jobs in that + // state. + ByState map[JobState]int + + // Total is the total number of jobs across all states for the queue. + Total int +} + // HookWorkBegin is an interface to a hook that runs after a job has been locked // for work and before it's worked. type HookWorkBegin interface {