diff --git a/CHANGELOG.md b/CHANGELOG.md index ea739a78..2ed719b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added new `HookQueueStateCount` hook which is run by a River leader to generate queue count statistics. [PR #1203](https://github.com/riverqueue/river/pull/1203). +- Middleware that implements `rivertype.Hook` can be looked up as hooks even if passed into `Config.Middleware`. Similarly, hooks that implement `rivertype.Middleware` can be looked up as middleware even if passed into `Config.Hooks`. [PR #1203](https://github.com/riverqueue/river/pull/1203). + ## [0.34.0] - 2026-04-08 ### Added diff --git a/client.go b/client.go index 28c0ab61..2be27032 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "log/slog" "os" "regexp" + "slices" "strings" "sync" "time" @@ -657,6 +658,7 @@ type clientTestSignals struct { periodicJobEnqueuer *maintenance.PeriodicJobEnqueuerTestSignals queueCleaner *maintenance.QueueCleanerTestSignals queueMaintainerLeader *maintenance.QueueMaintainerLeaderTestSignals + queueStateCounter *maintenance.QueueStateCounterTestSignals reindexer *maintenance.ReindexerTestSignals } @@ -679,6 +681,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) { if ts.queueMaintainerLeader != nil { ts.queueMaintainerLeader.Init(tb) } + if ts.queueStateCounter != nil { + ts.queueStateCounter.Init(tb) + } if ts.reindexer != nil { ts.reindexer.Init(tb) } @@ -759,7 +764,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client config: config, driver: driver, hookLookupByJob: hooklookup.NewJobHookLookup(), - hookLookupGlobal: hooklookup.NewHookLookup(config.Hooks), + hookLookupGlobal: nil, // initialized below after cross-referencing with middleware producersByQueueName: make(map[string]*producer), testSignals: clientTestSignals{}, workCancel: func(cause error) {}, // replaced on 
start, but here in case StopAndCancel is called before start up @@ -780,9 +785,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // the more abstract config.Middleware for middleware are set, but not both, // so in practice we never append all three of these to each other. { - middleware := config.Middleware + middlewares := config.Middleware for _, jobInsertMiddleware := range config.JobInsertMiddleware { - middleware = append(middleware, jobInsertMiddleware) + middlewares = append(middlewares, jobInsertMiddleware) } outerLoop: for _, workerMiddleware := range config.WorkerMiddleware { @@ -798,16 +803,44 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } } - middleware = append(middleware, workerMiddleware) + middlewares = append(middlewares, workerMiddleware) } - for _, middleware := range middleware { + for _, middleware := range middlewares { if withBaseService, ok := middleware.(baseservice.WithBaseService); ok { baseservice.Init(archetype, withBaseService) } } - client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middleware) + // Cross-reference hooks and middleware: any middleware that also + // implements Hook is added to hooks, and any hook that also implements + // Middleware is added to middleware. Deduplication prevents double + // registration when the same struct is passed to both Config.Hooks and + // Config.Middleware. + hooks := config.Hooks + + for _, middleware := range middlewares { + if hook, ok := middleware.(rivertype.Hook); ok { + // Only add if this middleware isn't already in hooks (it may + // have been passed to both config properties). 
+ alreadyInHooks := slices.Contains(hooks, hook) + if !alreadyInHooks { + hooks = append(hooks, hook) + } + } + } + + for _, hook := range config.Hooks { + if middleware, ok := hook.(rivertype.Middleware); ok { + alreadyInMiddleware := slices.Contains(middlewares, middleware) + if !alreadyInMiddleware { + middlewares = append(middlewares, middleware) + } + } + } + + client.hookLookupGlobal = hooklookup.NewHookLookup(hooks) + client.middlewareLookupGlobal = middlewarelookup.NewMiddlewareLookup(middlewares) } pluginDriver, _ := driver.(driverPlugin[TTx]) @@ -961,6 +994,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.testSignals.queueCleaner = &queueCleaner.TestSignals } + { + queueStateCounter := maintenance.NewQueueStateCounter(archetype, &maintenance.QueueStateCounterConfig{ + HookLookupGlobal: client.hookLookupGlobal, + QueueNames: maputil.Keys(config.Queues), + Schema: config.Schema, + }, driver.GetExecutor()) + maintenanceServices = append(maintenanceServices, queueStateCounter) + client.testSignals.queueStateCounter = &queueStateCounter.TestSignals + } + { var scheduleFunc func(time.Time) time.Time if config.ReindexerSchedule != nil { diff --git a/client_test.go b/client_test.go index 24a8be7f..c2c72072 100644 --- a/client_test.go +++ b/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/dbunique" + "github.com/riverqueue/river/internal/hooklookup" "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/internal/maintenance" "github.com/riverqueue/river/internal/middlewarelookup" @@ -7626,6 +7627,46 @@ func Test_NewClient_Validations(t *testing.T) { require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) }, }, + { + name: "Middleware implementing Hook is available in hook lookup", + configFunc: func(config *Config) { + config.Middleware = []rivertype.Middleware{&middlewareWithHook{}} + }, + 
validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, + { + name: "Hook implementing Middleware is available in middleware lookup", + configFunc: func(config *Config) { + config.Hooks = []rivertype.Hook{&hookWithMiddleware{}} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Hook implementing Middleware in both configs is deduplicated in middleware lookup", + configFunc: func(config *Config) { + hm := &hookWithMiddleware{} + config.Hooks = []rivertype.Hook{hm} + config.Middleware = []rivertype.Middleware{hm} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.middlewareLookupGlobal.ByMiddlewareKind(middlewarelookup.MiddlewareKindWorker), 1) + }, + }, + { + name: "Middleware implementing Hook in both configs is deduplicated in hook lookup", + configFunc: func(config *Config) { + mh := &middlewareWithHook{} + config.Hooks = []rivertype.Hook{mh} + config.Middleware = []rivertype.Middleware{mh} + }, + validateResult: func(t *testing.T, client *Client[pgx.Tx]) { //nolint:thelper + require.Len(t, client.hookLookupGlobal.ByHookKind(hooklookup.HookKindWorkBegin), 1) + }, + }, { name: "Middleware not allowed with JobInsertMiddleware", configFunc: func(config *Config) { @@ -8625,3 +8666,31 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook { func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil } func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil } + +// middlewareWithHook is a middleware that also implements HookWorkBegin, +// used to test cross-pollination from middleware to hooks. 
+type middlewareWithHook struct { + MiddlewareDefaults +} + +func (m *middlewareWithHook) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} + +func (m *middlewareWithHook) IsHook() bool { return true } + +func (m *middlewareWithHook) WorkBegin(ctx context.Context, job *rivertype.JobRow) error { + return nil +} + +// hookWithMiddleware is a hook that also implements WorkerMiddleware, +// used to test cross-pollination from hooks to middleware. +type hookWithMiddleware struct { + HookDefaults +} + +func (h *hookWithMiddleware) IsMiddleware() bool { return true } + +func (h *hookWithMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return doInner(ctx) +} diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 93a7f4fe..2b6aee8c 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -33,6 +33,16 @@ func (f HookPeriodicJobsStartFunc) Start(ctx context.Context, params *rivertype. return f(ctx, params) } +// HookQueueStateCountFunc is a convenience helper for implementing +// rivertype.HookQueueStateCount using a simple function instead of a struct. +type HookQueueStateCountFunc func(ctx context.Context, params *rivertype.HookQueueStateCountParams) + +func (f HookQueueStateCountFunc) IsHook() bool { return true } + +func (f HookQueueStateCountFunc) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) { + f(ctx, params) +} + // HookWorkBeginFunc is a convenience helper for implementing // rivertype.HookWorkBegin using a simple function instead of a struct. 
type HookWorkBeginFunc func(ctx context.Context, job *rivertype.JobRow) error diff --git a/internal/hooklookup/hook_lookup.go b/internal/hooklookup/hook_lookup.go index 420debb2..9dfd3a04 100644 --- a/internal/hooklookup/hook_lookup.go +++ b/internal/hooklookup/hook_lookup.go @@ -15,6 +15,7 @@ type HookKind string const ( HookKindInsertBegin HookKind = "insert_begin" HookKindPeriodicJobsStart HookKind = "periodic_job_start" + HookKindQueueStateCount HookKind = "queue_state_count" HookKindWorkBegin HookKind = "work_begin" HookKindWorkEnd HookKind = "work_end" ) @@ -90,6 +91,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook { c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) } } + case HookKindQueueStateCount: + for _, hook := range c.hooks { + if typedHook, ok := hook.(rivertype.HookQueueStateCount); ok { + c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook) + } + } case HookKindWorkBegin: for _, hook := range c.hooks { if typedHook, ok := hook.(rivertype.HookWorkBegin); ok { diff --git a/internal/maintenance/queue_state_counter.go b/internal/maintenance/queue_state_counter.go new file mode 100644 index 00000000..a26e6eef --- /dev/null +++ b/internal/maintenance/queue_state_counter.go @@ -0,0 +1,188 @@ +package maintenance + +import ( + "cmp" + "context" + "errors" + "log/slog" + "time" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riversharedmaintenance" + "github.com/riverqueue/river/rivershared/startstop" + "github.com/riverqueue/river/rivershared/testsignal" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivershared/util/timeutil" + "github.com/riverqueue/river/rivertype" +) + +const QueueStateCounterIntervalDefault = 10 * time.Second + +var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals + +// 
QueueStateCounterTestSignals are internal signals used exclusively in tests. +type QueueStateCounterTestSignals struct { + CountedOnce testsignal.TestSignal[struct{}] // notifies when a count pass finishes +} + +func (ts *QueueStateCounterTestSignals) Init(tb testutil.TestingTB) { + ts.CountedOnce.Init(tb) +} + +type QueueStateCounterConfig struct { + // HookLookupGlobal provides access to globally registered hooks. + HookLookupGlobal hooklookup.HookLookupInterface + + // Interval is the amount of time between count runs. + Interval time.Duration + + // QueueNames is the list of configured queue names. Counts are emitted for + // all of these queues even if they have no jobs, with zero counts for all + // states. + QueueNames []string + + // Schema where River tables are located. Empty string omits schema, causing + // Postgres to default to `search_path`. + Schema string +} + +func (c *QueueStateCounterConfig) mustValidate() *QueueStateCounterConfig { + if c.Interval <= 0 { + panic("QueueStateCounterConfig.Interval must be above zero") + } + + return c +} + +// QueueStateCounter periodically counts jobs by queue and state, logging the +// results. This provides visibility into queue health without requiring +// external monitoring queries. The maintenance service only runs if there is a +// HookQueueStateCount hook registered that consumes the counts. 
+type QueueStateCounter struct { + riversharedmaintenance.QueueMaintainerServiceBase + startstop.BaseStartStop + + // exported for test purposes + Config *QueueStateCounterConfig + TestSignals QueueStateCounterTestSignals + + exec riverdriver.Executor +} + +func NewQueueStateCounter(archetype *baseservice.Archetype, config *QueueStateCounterConfig, exec riverdriver.Executor) *QueueStateCounter { + return baseservice.Init(archetype, &QueueStateCounter{ + Config: (&QueueStateCounterConfig{ + HookLookupGlobal: config.HookLookupGlobal, + Interval: cmp.Or(config.Interval, QueueStateCounterIntervalDefault), + QueueNames: config.QueueNames, + Schema: config.Schema, + }).mustValidate(), + exec: exec, + }) +} + +func (s *QueueStateCounter) Start(ctx context.Context) error { + ctx, shouldStart, started, stopped := s.StartInit(ctx) + if !shouldStart { + return nil + } + + s.StaggerStart(ctx) + + go func() { + started() + defer stopped() // this defer should come first so it's last out + + s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted) + defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped) + + // If no hooks are registered, there's no one to send counts to, so + // start, but don't do anything. 
+ if len(s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount)) < 1 { + <-ctx.Done() + return + } + + ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + if err := s.runOnce(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + s.Logger.ErrorContext(ctx, s.Name+": Error counting queue states", slog.String("error", err.Error())) + } + } + } + }() + + return nil +} + +func (s *QueueStateCounter) runOnce(ctx context.Context) error { + ctx, cancelFunc := context.WithTimeout(ctx, riversharedmaintenance.TimeoutDefault) + defer cancelFunc() + + rawCounts, err := s.exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: s.Config.QueueNames, + Schema: s.Config.Schema, + }) + if err != nil { + return err + } + + byQueue := s.buildResults(ctx, rawCounts) + + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindQueueStateCount) { + hook.(rivertype.HookQueueStateCount).QueueStateCount(ctx, &rivertype.HookQueueStateCountParams{ //nolint:forcetypeassert + ByQueue: byQueue, + }) + } + + s.TestSignals.CountedOnce.Signal(struct{}{}) + + return nil +} + +// buildResults converts raw driver counts into results with all configured +// queues and all job states filled in (zeroed where needed), logging one line +// per queue. +func (s *QueueStateCounter) buildResults(ctx context.Context, rawCounts map[string]map[rivertype.JobState]int) map[string]rivertype.HookQueueStateCountResult { + // Ensure all configured queues are present, even if they have no jobs. 
+ for _, queue := range s.Config.QueueNames { + if rawCounts[queue] == nil { + rawCounts[queue] = make(map[rivertype.JobState]int) + } + } + + countsByQueue := make(map[string]rivertype.HookQueueStateCountResult, len(rawCounts)) + + for queue, stateCounts := range rawCounts { + attrs := make([]slog.Attr, 0, 2+len(jobStateAll)) + attrs = append(attrs, slog.String("queue", queue)) + total := 0 + + for _, state := range jobStateAll { + if _, ok := stateCounts[state]; !ok { + stateCounts[state] = 0 + } + total += stateCounts[state] + attrs = append(attrs, slog.Int(string(state), stateCounts[state])) + } + + attrs = append(attrs, slog.Int("total", total)) + s.Logger.LogAttrs(ctx, slog.LevelInfo, s.Name+": Queue state counts", attrs...) + + countsByQueue[queue] = rivertype.HookQueueStateCountResult{ + ByState: stateCounts, + Total: total, + } + } + + return countsByQueue +} diff --git a/internal/maintenance/queue_state_counter_test.go b/internal/maintenance/queue_state_counter_test.go new file mode 100644 index 00000000..257776ad --- /dev/null +++ b/internal/maintenance/queue_state_counter_test.go @@ -0,0 +1,161 @@ +package maintenance + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/hooklookup" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/startstoptest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivertype" +) + +func TestQueueStateCounter(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + } + + setup := func(t *testing.T, hooks []rivertype.Hook, queueNames []string) (*QueueStateCounter, *testBundle) { + t.Helper() + + var ( + 
tx = riverdbtest.TestTxPgx(ctx, t) + exec = riverpgxv5.New(nil).UnwrapExecutor(tx) + ) + + svc := NewQueueStateCounter( + riversharedtest.BaseServiceArchetype(t), + &QueueStateCounterConfig{ + HookLookupGlobal: hooklookup.NewHookLookup(hooks), + QueueNames: queueNames, + }, + exec, + ) + svc.StaggerStartupDisable(true) + svc.TestSignals.Init(t) + t.Cleanup(svc.Stop) + + return svc, &testBundle{exec: exec} + } + + t.Run("CountsJobsByQueueAndState", func(t *testing.T) { + t.Parallel() + + hook := &capturingQueueStateCountHook{} + svc, bundle := setup(t, []rivertype.Hook{hook}, []string{"queue1", "queue2", "queue_empty"}) + + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + + counts := hook.LastParams().ByQueue + + // All three configured queues are present, including queue_empty + // which has no jobs. + require.Len(t, counts, 3) + + // All job states are present for each queue, even those with zero jobs. 
+ for _, queue := range []string{"queue1", "queue2", "queue_empty"} { + require.Len(t, counts[queue].ByState, len(rivertype.JobStates()), "queue %q should have all states", queue) + } + + require.Equal(t, 2, counts["queue1"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 1, counts["queue1"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 0, counts["queue1"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 3, counts["queue1"].Total) + + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateAvailable]) + require.Equal(t, 2, counts["queue2"].ByState[rivertype.JobStateCompleted]) + require.Equal(t, 1, counts["queue2"].ByState[rivertype.JobStateDiscarded]) + require.Equal(t, 0, counts["queue2"].ByState[rivertype.JobStateRunning]) + require.Equal(t, 4, counts["queue2"].Total) + + // queue_empty has all states present, all zero. + for _, state := range rivertype.JobStates() { + require.Equal(t, 0, counts["queue_empty"].ByState[state], "queue_empty[%s] should be 0", state) + } + require.Equal(t, 0, counts["queue_empty"].Total) + }) + + t.Run("NoopsWithoutHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + + // With no hooks registered, Start still succeeds and the service starts, but its run loop parks until shutdown without doing any counting work. + require.NoError(t, svc.Start(ctx)) + + // A second Start on an already-started service is a no-op (StartInit + // reports it shouldn't start again) and returns nil rather than an error.
+ require.NoError(t, svc.Start(ctx)) + }) + + t.Run("StartStop", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + + require.NoError(t, svc.Start(ctx)) + svc.TestSignals.CountedOnce.WaitOrTimeout() + }) + + t.Run("StartStopStress", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, []rivertype.Hook{&capturingQueueStateCountHook{}}, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) + + t.Run("StartStopStressNoHooks", func(t *testing.T) { + t.Parallel() + + svc, _ := setup(t, nil, nil) + svc.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress + svc.TestSignals = QueueStateCounterTestSignals{} // deinit so channels don't fill + + startstoptest.Stress(ctx, t, svc) + }) +} + +// capturingQueueStateCountHook captures the last params received from the hook +// invocation. 
+type capturingQueueStateCountHook struct { + mu sync.Mutex + lastParams *rivertype.HookQueueStateCountParams +} + +func (h *capturingQueueStateCountHook) IsHook() bool { return true } + +func (h *capturingQueueStateCountHook) QueueStateCount(_ context.Context, params *rivertype.HookQueueStateCountParams) { + h.mu.Lock() + defer h.mu.Unlock() + h.lastParams = params +} + +func (h *capturingQueueStateCountHook) LastParams() *rivertype.HookQueueStateCountParams { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastParams +} diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 43eacb26..1e108c7f 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -202,7 +202,7 @@ type Executor interface { JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error) - JobCountByQueueAndState(ctx context.Context, params *JobCountByQueueAndStateParams) ([]*JobCountByQueueAndStateResult, error) + JobCountByQueueAndState(ctx context.Context, params *JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error) JobDelete(ctx context.Context, params *JobDeleteParams) (*rivertype.JobRow, error) JobDeleteBefore(ctx context.Context, params *JobDeleteBeforeParams) (int, error) @@ -359,12 +359,6 @@ type JobCountByQueueAndStateParams struct { Schema string } -type JobCountByQueueAndStateResult struct { - CountAvailable int64 - CountRunning int64 - Queue string -} - type JobCountByStateParams struct { Schema string State rivertype.JobState diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 14ec9c43..660eab0b 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ 
b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -132,48 +132,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY($1::text[]) +GROUP BY queue, state +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State RiverJobState + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -185,7 +157,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go 
b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ef41a525..ab11fe93 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -222,18 +222,17 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + results := make(map[string]map[rivertype.JobState]int) + for _, row := range rows { + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } return results, nil } diff --git a/riverdriver/riverdrivertest/job_read.go b/riverdriver/riverdrivertest/job_read.go index 5649d910..ba0c1ccf 100644 --- a/riverdriver/riverdrivertest/job_read.go +++ b/riverdriver/riverdrivertest/job_read.go @@ -86,7 +86,7 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx t.Run("JobCountByQueueAndState", func(t *testing.T) { t.Parallel() - t.Run("CountsJobsInAvailableAndRunningForEachOfTheSpecifiedQueues", func(t *testing.T) { + t.Run("CountsAllStatesForSpecifiedQueues", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) @@ -96,10 +96,10 @@ func 
exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateCompleted)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue3"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ QueueNames: []string{"queue1", "queue2"}, @@ -109,62 +109,32 @@ func exerciseJobRead[TTx any](ctx context.Context, t *testing.T, executorWithTx require.Len(t, countsByQueue, 2) - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(2), countsByQueue[0].CountAvailable) - require.Equal(t, int64(3), countsByQueue[0].CountRunning) - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) - }) - - t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: 
ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) - - countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue1", "queue2"}, - Schema: "", - }) - require.NoError(t, err) - - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) + require.Equal(t, 2, countsByQueue["queue1"][rivertype.JobStateAvailable]) + require.Equal(t, 3, countsByQueue["queue1"][rivertype.JobStateRunning]) + require.Equal(t, 1, countsByQueue["queue1"][rivertype.JobStateCompleted]) + require.Equal(t, 0, countsByQueue["queue1"][rivertype.JobStateCancelled]) - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Equal(t, 1, countsByQueue["queue2"][rivertype.JobStateAvailable]) + require.Equal(t, 1, countsByQueue["queue2"][rivertype.JobStateRunning]) }) - t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) { + t.Run("ExcludesQueuesNotRequested", func(t *testing.T) { t.Parallel() exec, _ := setup(ctx, t) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue1"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ - QueueNames: []string{"queue2", "queue1", "queue1"}, + QueueNames: []string{"queue1"}, Schema: "", }) 
require.NoError(t, err) - require.Len(t, countsByQueue, 2) - - require.Equal(t, "queue1", countsByQueue[0].Queue) - require.Equal(t, int64(0), countsByQueue[0].CountAvailable) - require.Equal(t, int64(0), countsByQueue[0].CountRunning) - - require.Equal(t, "queue2", countsByQueue[1].Queue) - require.Equal(t, int64(1), countsByQueue[1].CountAvailable) - require.Equal(t, int64(1), countsByQueue[1].CountRunning) + require.Len(t, countsByQueue, 1) + require.Equal(t, 1, countsByQueue["queue1"][rivertype.JobStateAvailable]) + require.Nil(t, countsByQueue["queue2"]) }) }) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index dde85de4..adf65513 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -86,42 +86,14 @@ FROM /* TEMPLATE: schema */ river_job GROUP BY state; -- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest(@queue_names::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY(@queue_names::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC; + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY(@queue_names::text[]) +GROUP BY queue, state +ORDER BY queue, state; -- name: JobCountByState :one SELECT 
count(*) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index a86a2235..8ba35d1a 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -129,48 +129,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH all_queues AS ( - SELECT DISTINCT unnest($1::text[])::text AS queue -), - -running_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'running' - GROUP BY queue -), - -available_job_counts AS ( - SELECT - queue, - COUNT(*) AS count - FROM - /* TEMPLATE: schema */river_job - WHERE queue = ANY($1::text[]) - AND state = 'available' - GROUP BY queue -) - SELECT - all_queues.queue, - COALESCE(available_job_counts.count, 0) AS count_available, - COALESCE(running_job_counts.count, 0) AS count_running -FROM - all_queues -LEFT JOIN - running_job_counts ON all_queues.queue = running_job_counts.queue -LEFT JOIN - available_job_counts ON all_queues.queue = available_job_counts.queue -ORDER BY all_queues.queue ASC + queue, + state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE queue = ANY($1::text[]) +GROUP BY queue, state +ORDER BY queue, state ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State RiverJobState + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -182,7 +154,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err 
:= rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 9d3fed61..83da6e07 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -230,18 +230,17 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - Queue: row.Queue, + results := make(map[string]map[rivertype.JobState]int) + for _, row := range rows { + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } return results, nil } diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index dfb72dc3..4403b894 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -56,22 +56,14 @@ FROM /* TEMPLATE: schema */river_job GROUP BY state; -- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) 
AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (sqlc.slice('queue_names')) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC; + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (sqlc.slice('queue_names')) +GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC; -- name: JobCountByState :one SELECT count(*) diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index 083dc5c3..468c127b 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -103,28 +103,20 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount } const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many -WITH queue_stats AS ( - SELECT - river_job.queue, - COUNT(CASE WHEN river_job.state = 'available' THEN 1 END) AS count_available, - COUNT(CASE WHEN river_job.state = 'running' THEN 1 END) AS count_running - FROM /* TEMPLATE: schema */river_job - WHERE river_job.queue IN (/*SLICE:queue_names*/?) - GROUP BY river_job.queue -) - SELECT - cast(queue AS text) AS queue, - count_available, - count_running -FROM queue_stats -ORDER BY queue ASC + cast(river_job.queue AS text) AS queue, + cast(river_job.state AS text) AS state, + COUNT(*) AS count +FROM /* TEMPLATE: schema */river_job +WHERE river_job.queue IN (/*SLICE:queue_names*/?) 
+GROUP BY river_job.queue, river_job.state +ORDER BY river_job.queue ASC, river_job.state ASC ` type JobCountByQueueAndStateRow struct { - Queue string - CountAvailable int64 - CountRunning int64 + Queue string + State string + Count int64 } func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNames []string) ([]*JobCountByQueueAndStateRow, error) { @@ -146,7 +138,7 @@ func (q *Queries) JobCountByQueueAndState(ctx context.Context, db DBTX, queueNam var items []*JobCountByQueueAndStateRow for rows.Next() { var i JobCountByQueueAndStateRow - if err := rows.Scan(&i.Queue, &i.CountAvailable, &i.CountRunning); err != nil { + if err := rows.Scan(&i.Queue, &i.State, &i.Count); err != nil { return nil, err } items = append(items, &i) diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1eda9f14..a494ee2d 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -290,51 +290,18 @@ func (e *Executor) JobCountByAllStates(ctx context.Context, params *riverdriver. return countsMap, nil } -func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) ([]*riverdriver.JobCountByQueueAndStateResult, error) { +func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdriver.JobCountByQueueAndStateParams) (map[string]map[rivertype.JobState]int, error) { rows, err := dbsqlc.New().JobCountByQueueAndState(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.QueueNames) if err != nil { return nil, interpretError(err) } - - // The PostgreSQL drivers implement this query with an `all_queues` CTE and - // LEFT JOINs, so they return one row per requested queue, including queues - // that currently have no jobs. The input queue list is deduplicated in SQL. 
- // The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`, - // and we haven't found a workable way to bind a parameterized list through - // `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore - // returns only queues with matching rows, and this wrapper fills in missing - // queues to match PostgreSQL behavior. - countsByQueue := make(map[string]struct { - CountAvailable int64 - CountRunning int64 - }, len(rows)) + results := make(map[string]map[rivertype.JobState]int) for _, row := range rows { - countsByQueue[row.Queue] = struct { - CountAvailable int64 - CountRunning int64 - }{ - CountAvailable: row.CountAvailable, - CountRunning: row.CountRunning, - } - } - - queueNames := slices.Clone(params.QueueNames) - slices.Sort(queueNames) - queueNames = slices.Compact(queueNames) - - results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames)) - for _, queueName := range queueNames { - result := &riverdriver.JobCountByQueueAndStateResult{ - Queue: queueName, - } - if counts, ok := countsByQueue[queueName]; ok { - result.CountAvailable = counts.CountAvailable - result.CountRunning = counts.CountRunning + if results[row.Queue] == nil { + results[row.Queue] = make(map[rivertype.JobState]int) } - - results = append(results, result) + results[row.Queue][rivertype.JobState(row.State)] = int(row.Count) } - return results, nil } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index 13d07f63..0233eca4 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -351,6 +351,35 @@ type HookPeriodicJobsStartParams struct { DurableJobs []*DurablePeriodicJob } +// HookQueueStateCount is an interface to a hook that runs each time the queue +// state counter maintenance service performs a count. +type HookQueueStateCount interface { + Hook + + // QueueStateCount is invoked after the queue state counter has completed a + // count of jobs by queue and state. 
+	QueueStateCount(ctx context.Context, params *HookQueueStateCountParams)
+}
+
+// HookQueueStateCountParams are parameters for HookQueueStateCount.
+type HookQueueStateCountParams struct {
+	// ByQueue is a map of queue name to its count results. All queues
+	// configured with this client are included, even if they don't currently
+	// contain jobs.
+	ByQueue map[string]HookQueueStateCountResult
+}
+
+// HookQueueStateCountResult contains counts for a single queue.
+type HookQueueStateCountResult struct {
+	// ByState is a map of job state to the number of jobs in that state for the
+	// queue. All job states are included, even if they have no jobs in that
+	// state.
+	ByState map[JobState]int
+
+	// Total is the total number of jobs across all states for the queue.
+	Total int
+}
+
 // HookWorkBegin is an interface to a hook that runs after a job has been locked
 // for work and before it's worked.
 type HookWorkBegin interface {