Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `otelriver.Middleware` now implements `HookQueueStateCount` and uses it to emit job counts by state for each known job queue. [PR #47](https://github.com/riverqueue/rivercontrib/pull/47).

## [0.7.0] - 2026-01-18

### Added
Expand Down
12 changes: 5 additions & 7 deletions otelriver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

// replace github.com/riverqueue/river => ../../river
replace github.com/riverqueue/river => ../../river

// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver
replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver

// replace github.com/riverqueue/river/riverdriver => ../../river/riverdriver
replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5

// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../river/riverdriver/riverpgxv5
replace github.com/riverqueue/river/rivershared => ../../river/rivershared

// replace github.com/riverqueue/river/rivershared => ../../river/rivershared

// replace github.com/riverqueue/river/rivertype => ../../river/rivertype
replace github.com/riverqueue/river/rivertype => ../../river/rivertype
Comment on lines +43 to +51
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not mergeable as-is, will need to point at a newer release or commit instead of using replace.

32 changes: 32 additions & 0 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type MiddlewareConfig struct {
// Middleware is a River middleware that emits OpenTelemetry metrics when jobs
// are inserted or worked.
type Middleware struct {
river.HookDefaults
river.MiddlewareDefaults

config *MiddlewareConfig
Expand All @@ -78,6 +79,8 @@ type middlewareMetrics struct {
messagingClientOperationDuration metric.Float64Histogram
messagingClientSentMessages metric.Int64Counter
messagingProcessDuration metric.Float64Histogram
queueStateCount metric.Int64Gauge
queueTotalCount metric.Int64Gauge
workCount metric.Int64Counter
workDuration metric.Float64Gauge
workDurationHistogram metric.Float64Histogram
Expand Down Expand Up @@ -116,6 +119,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
queueStateCount: mustInt64Gauge(meter, prefix+"queue_state_count", metric.WithDescription("Number of jobs in a queue by state"), metric.WithUnit("{job}")),
queueTotalCount: mustInt64Gauge(meter, prefix+"queue_total_count", metric.WithDescription("Total number of jobs in a queue across all states"), metric.WithUnit("{job}")),
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
Expand Down Expand Up @@ -182,6 +187,25 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
return insertRes, err
}

func (m *Middleware) QueueStateCount(ctx context.Context, params *rivertype.HookQueueStateCountParams) {
for queue, result := range params.ByQueue {
for state, count := range result.ByState {
m.metrics.queueStateCount.Record(ctx, int64(count),
metric.WithAttributes(
attribute.String("queue", queue),
attribute.String("state", string(state)),
),
)
}

m.metrics.queueTotalCount.Record(ctx, int64(result.Total),
metric.WithAttributes(
attribute.String("queue", queue),
),
)
}
}

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
spanName := prefix + "work"
if m.config.EnableWorkSpanJobKindSuffix {
Expand Down Expand Up @@ -288,6 +312,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo
return metric
}

func mustInt64Gauge(meter metric.Meter, name string, options ...metric.Int64GaugeOption) metric.Int64Gauge {
metric, err := meter.Int64Gauge(name, options...)
if err != nil {
panic(err)
}
return metric
}

func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter {
metric, err := meter.Int64Counter(name, options...)
if err != nil {
Expand Down
81 changes: 81 additions & 0 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

// Verify interface compliance.
var (
_ rivertype.HookQueueStateCount = &Middleware{} // the middleware is both hook and middleware (the hook so it can emit queue counts); kind of weird, but it works fine
_ rivertype.JobInsertMiddleware = &Middleware{}
_ rivertype.WorkerMiddleware = &Middleware{}
)
Expand Down Expand Up @@ -258,6 +259,86 @@ func TestMiddleware(t *testing.T) {
}
})

t.Run("QueueStateCount", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

params := &rivertype.HookQueueStateCountParams{
ByQueue: map[string]rivertype.HookQueueStateCountResult{
"default": {
ByState: map[rivertype.JobState]int{
rivertype.JobStateAvailable: 5,
rivertype.JobStateRunning: 3,
rivertype.JobStateCompleted: 10,
},
Total: 18,
},
"critical": {
ByState: map[rivertype.JobState]int{
rivertype.JobStateAvailable: 2,
rivertype.JobStateScheduled: 1,
},
Total: 3,
},
},
}

// Preflight: verify Total is the sum of all ByState counts so
// test data stays consistent if someone edits it later.
for queue, result := range params.ByQueue {
var sum int
for _, count := range result.ByState {
sum += count
}
require.Equal(t, sum, result.Total, "Total for queue %q doesn't match sum of ByState", queue)
}

middleware.QueueStateCount(ctx, params)

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))

_, metricData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_state_count")

// Build a map of (queue, state) -> value for easy assertion.
type queueState struct{ queue, state string }
counts := make(map[queueState]int64)
for _, dataPoint := range metricData.DataPoints {
var key queueState
for _, attr := range dataPoint.Attributes.ToSlice() {
switch string(attr.Key) {
case "queue":
key.queue = attr.Value.AsString()
case "state":
key.state = attr.Value.AsString()
}
}
counts[key] = dataPoint.Value
}

require.Equal(t, int64(5), counts[queueState{"default", "available"}])
require.Equal(t, int64(3), counts[queueState{"default", "running"}])
require.Equal(t, int64(10), counts[queueState{"default", "completed"}])
require.Equal(t, int64(2), counts[queueState{"critical", "available"}])
require.Equal(t, int64(1), counts[queueState{"critical", "scheduled"}])

// Verify total counts per queue.
_, totalData := requireMetric[metricdata.Gauge[int64]](t, metrics, "river.queue_total_count")

totals := make(map[string]int64)
for _, dataPoint := range totalData.DataPoints {
for _, attr := range dataPoint.Attributes.ToSlice() {
if string(attr.Key) == "queue" {
totals[attr.Value.AsString()] = dataPoint.Value
}
}
}

require.Equal(t, int64(18), totals["default"])
require.Equal(t, int64(3), totals["critical"])
})

t.Run("WorkSuccess", func(t *testing.T) {
t.Parallel()

Expand Down
Loading