From dd91f10d27a6b9ecbe47442be13c91a1d998d3ae Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 12 Apr 2026 01:01:40 -0500 Subject: [PATCH] Generate OTEL queue count metrics See [1] for context, but here we implement `HookQueueStateCount` in the existing OTEL middleware so that it can emit metrics around queue counts, a pretty handy feature that we have on definitive request for, but which I'd guess most River users would like to have access to. [1] https://github.com/riverqueue/river/pull/1203 --- CHANGELOG.md | 4 ++ otelriver/go.mod | 12 +++--- otelriver/middleware.go | 32 ++++++++++++++ otelriver/middleware_test.go | 81 ++++++++++++++++++++++++++++++++++++ 4 files changed, 122 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5957e3..d43958e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `otelriver.Middleware` now implements `HookQueueStateCount` and uses it to emit job counts by state for each known job queue. [PR #47](https://github.com/riverqueue/rivercontrib/pull/47). + ## [0.7.0] - 2026-01-18 ### Added diff --git a/otelriver/go.mod b/otelriver/go.mod index 921397c..3410d91 100644 --- a/otelriver/go.mod +++ b/otelriver/go.mod @@ -40,14 +40,12 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -// replace github.com/riverqueue/river => ../../river +replace github.com/riverqueue/river => ../../river -// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver +replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver -// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver +replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5 -// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5 +replace github.com/riverqueue/river/rivershared => ../../river/rivershared -// replace github.com/riverqueue/river/rivershared => ../../river/rivershared - -// replace github.com/riverqueue/river/rivertype => ../../river/rivertype +replace github.com/riverqueue/river/rivertype => ../../river/rivertype diff --git a/otelriver/middleware.go b/otelriver/middleware.go index be7e10b..3b95a6b 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -60,6 +60,7 @@ type MiddlewareConfig struct { // Middleware is a River middleware that emits OpenTelemetry metrics when jobs // are inserted or worked. type Middleware struct { + river.HookDefaults river.MiddlewareDefaults config *MiddlewareConfig @@ -78,6 +79,8 @@ type middlewareMetrics struct { messagingClientOperationDuration metric.Float64Histogram messagingClientSentMessages metric.Int64Counter messagingProcessDuration metric.Float64Histogram + queueStateCount metric.Int64Gauge + queueTotalCount metric.Int64Gauge workCount metric.Int64Counter workDuration metric.Float64Gauge workDurationHistogram metric.Float64Histogram @@ -116,6 +119,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware { insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")), insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)), insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)), + queueStateCount: mustInt64Gauge(meter, prefix+"queue_state_count", metric.WithDescription("Number of jobs in a queue by state"), metric.WithUnit("{job}")), + queueTotalCount: mustInt64Gauge(meter, prefix+"queue_total_count", metric.WithDescription("Total number of jobs in a queue across all states"), metric.WithUnit("{job}")), workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")), workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)), workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)), @@ -182,6 +187,25 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job return insertRes, err } +func (m *Middleware) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) { + for queue, result := range params.ByQueue { + for state, count := range result.ByState { + m.metrics.queueStateCount.Record(ctx, int64(count), + metric.WithAttributes( + attribute.String("queue", queue), + attribute.String("state", string(state)), + ), + ) + } + + m.metrics.queueTotalCount.Record(ctx, int64(result.Total), + metric.WithAttributes( + attribute.String("queue", queue), + ), + ) + } +} + func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error { spanName := prefix + "work" if m.config.EnableWorkSpanJobKindSuffix { @@ -288,6 +312,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo return metric } +func mustInt64Gauge(meter metric.Meter, name string, options ...metric.Int64GaugeOption) metric.Int64Gauge { + metric, err := meter.Int64Gauge(name, options...) + if err != nil { + panic(err) + } + return metric +} + func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter { metric, err := meter.Int64Counter(name, options...) if err != nil { diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index a0b84e3..bbb3cb8 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -21,6 +21,7 @@ import ( // Verify interface compliance. var ( + _ rivertype.HookQueueStateCount = &Middleware{} // the middleware is both hook and middleware (the hook so it can emit queue counts); kind of weird, but it works fine _ rivertype.JobInsertMiddleware = &Middleware{} _ rivertype.WorkerMiddleware = &Middleware{} ) @@ -258,6 +259,86 @@ func TestMiddleware(t *testing.T) { } }) + t.Run("QueueStateCount", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + params := &rivertype.HookQueueStateCountParams{ + ByQueue: map[string]rivertype.HookQueueStateCountResult{ + "default": { + ByState: map[rivertype.JobState]int{ + rivertype.JobStateAvailable: 5, + rivertype.JobStateRunning: 3, + rivertype.JobStateCompleted: 10, + }, + Total: 18, + }, + "critical": { + ByState: map[rivertype.JobState]int{ + rivertype.JobStateAvailable: 2, + rivertype.JobStateScheduled: 1, + }, + Total: 3, + }, + }, + } + + // Preflight: verify Total is the sum of all ByState counts so + // test data stays consistent if someone edits it later. + for queue, result := range params.ByQueue { + var sum int + for _, count := range result.ByState { + sum += count + } + require.Equal(t, sum, result.Total, "Total for queue %q doesn't match sum of ByState", queue) + } + + middleware.QueueStateCount(ctx, params) + + var metrics metricdata.ResourceMetrics + require.NoError(t, bundle.metricReader.Collect(ctx, &metrics)) + + _, metricData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_state_count") + + // Build a map of (queue, state) -> value for easy assertion. + type queueState struct{ queue, state string } + counts := make(map[queueState]int64) + for _, dataPoint := range metricData.DataPoints { + var key queueState + for _, attr := range dataPoint.Attributes.ToSlice() { + switch string(attr.Key) { + case "queue": + key.queue = attr.Value.AsString() + case "state": + key.state = attr.Value.AsString() + } + } + counts[key] = dataPoint.Value + } + + require.Equal(t, int64(5), counts[queueState{"default", "available"}]) + require.Equal(t, int64(3), counts[queueState{"default", "running"}]) + require.Equal(t, int64(10), counts[queueState{"default", "completed"}]) + require.Equal(t, int64(2), counts[queueState{"critical", "available"}]) + require.Equal(t, int64(1), counts[queueState{"critical", "scheduled"}]) + + // Verify total counts per queue. + _, totalData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_total_count") + + totals := make(map[string]int64) + for _, dataPoint := range totalData.DataPoints { + for _, attr := range dataPoint.Attributes.ToSlice() { + if string(attr.Key) == "queue" { + totals[attr.Value.AsString()] = dataPoint.Value + } + } + } + + require.Equal(t, int64(18), totals["default"]) + require.Equal(t, int64(3), totals["critical"]) + }) + t.Run("WorkSuccess", func(t *testing.T) { t.Parallel()