Skip to content

Commit acd299b

Browse files
committed
Add READY task age metrics to Redis metrics collector
1 parent 85c99c0 commit acd299b

2 files changed

Lines changed: 198 additions & 0 deletions

File tree

django_tasks_redis/metrics/collectors.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import logging
9+
from datetime import datetime, timezone
910
from typing import TYPE_CHECKING
1011

1112
if TYPE_CHECKING:
@@ -31,6 +32,8 @@ class RedisTaskMetricsCollector(Collector):
3132
3233
Metrics exposed:
3334
- django_tasks_queue_length: Gauge for current queue length by status
35+
- django_tasks_queue_oldest_ready_age_seconds: Age of oldest READY task in seconds
36+
- django_tasks_queue_newest_ready_age_seconds: Age of newest READY task in seconds
3437
"""
3538

3639
def __init__(self, backend: "RedisTaskBackend"):
@@ -82,6 +85,61 @@ def collect(self):
8285

8386
yield queue_length
8487

88+
# Calculate age metrics for READY tasks
89+
ready_count = status_counts.get("READY", 0)
90+
if ready_count > 0:
91+
ready_tasks, _ = self.backend.get_all_tasks(
92+
status="READY",
93+
limit=ready_count # Get all READY tasks
94+
)
95+
96+
if ready_tasks:
97+
now = datetime.now(timezone.utc)
98+
ages = []
99+
100+
for task in ready_tasks:
101+
enqueued_at_str = task.get("enqueued_at", "")
102+
if enqueued_at_str:
103+
try:
104+
from django_tasks_redis.utils import deserialize_datetime
105+
enqueued_at = deserialize_datetime(enqueued_at_str)
106+
if enqueued_at:
107+
# Make sure enqueued_at is timezone-aware
108+
if enqueued_at.tzinfo is None:
109+
enqueued_at = enqueued_at.replace(tzinfo=timezone.utc)
110+
age_seconds = (now - enqueued_at).total_seconds()
111+
ages.append(age_seconds)
112+
except Exception as e:
113+
logger.warning(
114+
"Failed to parse enqueued_at for task: %s",
115+
e
116+
)
117+
118+
if ages:
119+
# Create gauge for oldest READY task age
120+
oldest_age = GaugeMetricFamily(
121+
"django_tasks_queue_oldest_ready_age_seconds",
122+
"Age of the oldest READY task in seconds",
123+
labels=["backend"],
124+
)
125+
oldest_age.add_metric(
126+
labels=[self.backend_name],
127+
value=max(ages),
128+
)
129+
yield oldest_age
130+
131+
# Create gauge for newest READY task age
132+
newest_age = GaugeMetricFamily(
133+
"django_tasks_queue_newest_ready_age_seconds",
134+
"Age of the newest READY task in seconds",
135+
labels=["backend"],
136+
)
137+
newest_age.add_metric(
138+
labels=[self.backend_name],
139+
value=min(ages),
140+
)
141+
yield newest_age
142+
85143
logger.debug(
86144
"Collected metrics for backend %s: %s",
87145
self.backend_name,

tests/test_metrics.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Tests for Prometheus metrics integration.
33
"""
44

5+
from datetime import datetime, timedelta, timezone
56
from unittest.mock import Mock, patch
67

78
import pytest
@@ -135,3 +136,142 @@ def test_collect_handles_errors(self):
135136

136137
# Should return empty list on error
137138
assert len(metrics) == 0
139+
140+
@pytest.mark.skipif(
141+
not _prometheus_available(),
142+
reason="prometheus-client not installed",
143+
)
144+
def test_collect_with_ready_tasks_age_metrics(self):
145+
"""Test that collect() includes age metrics for READY tasks."""
146+
from django_tasks_redis.metrics.collectors import RedisTaskMetricsCollector
147+
from django_tasks_redis.utils import serialize_datetime
148+
149+
mock_backend = Mock()
150+
mock_backend.alias = "test"
151+
mock_backend.get_status_counts.return_value = {
152+
TaskResultStatus.READY: 3,
153+
TaskResultStatus.RUNNING: 1,
154+
}
155+
156+
# Create mock tasks with different ages
157+
now = datetime.now(timezone.utc)
158+
oldest_task_time = now - timedelta(minutes=10)
159+
middle_task_time = now - timedelta(minutes=5)
160+
newest_task_time = now - timedelta(minutes=1)
161+
162+
mock_backend.get_all_tasks.return_value = (
163+
[
164+
{"enqueued_at": serialize_datetime(oldest_task_time)},
165+
{"enqueued_at": serialize_datetime(middle_task_time)},
166+
{"enqueued_at": serialize_datetime(newest_task_time)},
167+
],
168+
3,
169+
)
170+
171+
collector = RedisTaskMetricsCollector(mock_backend)
172+
173+
# Call collect
174+
metrics = list(collector.collect())
175+
176+
# Verify get_all_tasks was called with READY status
177+
mock_backend.get_all_tasks.assert_called_once_with(
178+
status="READY",
179+
limit=3
180+
)
181+
182+
# Should return 3 metrics: queue_length, oldest_age, newest_age
183+
assert len(metrics) == 3
184+
185+
metric_names = {m.name for m in metrics}
186+
assert "django_tasks_queue_length" in metric_names
187+
assert "django_tasks_queue_oldest_ready_age_seconds" in metric_names
188+
assert "django_tasks_queue_newest_ready_age_seconds" in metric_names
189+
190+
# Check the age metrics values
191+
oldest_metric = next(
192+
m for m in metrics
193+
if m.name == "django_tasks_queue_oldest_ready_age_seconds"
194+
)
195+
newest_metric = next(
196+
m for m in metrics
197+
if m.name == "django_tasks_queue_newest_ready_age_seconds"
198+
)
199+
200+
# Get the metric values (samples are stored in the metric family)
201+
oldest_samples = list(oldest_metric.samples)
202+
newest_samples = list(newest_metric.samples)
203+
204+
assert len(oldest_samples) == 1
205+
assert len(newest_samples) == 1
206+
207+
# Oldest should be ~600 seconds (10 minutes)
208+
assert 590 <= oldest_samples[0].value <= 610
209+
210+
# Newest should be ~60 seconds (1 minute)
211+
assert 50 <= newest_samples[0].value <= 70
212+
213+
@pytest.mark.skipif(
214+
not _prometheus_available(),
215+
reason="prometheus-client not installed",
216+
)
217+
def test_collect_without_ready_tasks_no_age_metrics(self):
218+
"""Test that collect() doesn't include age metrics when no READY tasks."""
219+
from django_tasks_redis.metrics.collectors import RedisTaskMetricsCollector
220+
221+
mock_backend = Mock()
222+
mock_backend.alias = "test"
223+
mock_backend.get_status_counts.return_value = {
224+
TaskResultStatus.RUNNING: 5,
225+
TaskResultStatus.SUCCESSFUL: 10,
226+
}
227+
228+
collector = RedisTaskMetricsCollector(mock_backend)
229+
230+
# Call collect
231+
metrics = list(collector.collect())
232+
233+
# Should only return queue_length metric, no age metrics
234+
assert len(metrics) == 1
235+
assert metrics[0].name == "django_tasks_queue_length"
236+
237+
# get_all_tasks should not be called when no READY tasks
238+
mock_backend.get_all_tasks.assert_not_called()
239+
240+
@pytest.mark.skipif(
241+
not _prometheus_available(),
242+
reason="prometheus-client not installed",
243+
)
244+
def test_collect_with_invalid_enqueued_at(self):
245+
"""Test that collect() handles tasks with invalid enqueued_at gracefully."""
246+
from django_tasks_redis.metrics.collectors import RedisTaskMetricsCollector
247+
from django_tasks_redis.utils import serialize_datetime
248+
249+
mock_backend = Mock()
250+
mock_backend.alias = "test"
251+
mock_backend.get_status_counts.return_value = {
252+
TaskResultStatus.READY: 3,
253+
}
254+
255+
now = datetime.now(timezone.utc)
256+
valid_task_time = now - timedelta(minutes=5)
257+
258+
# Mix of valid and invalid tasks
259+
mock_backend.get_all_tasks.return_value = (
260+
[
261+
{"enqueued_at": serialize_datetime(valid_task_time)},
262+
{"enqueued_at": "invalid-datetime"},
263+
{"enqueued_at": ""},
264+
],
265+
3,
266+
)
267+
268+
collector = RedisTaskMetricsCollector(mock_backend)
269+
270+
# Call collect - should not raise exception
271+
metrics = list(collector.collect())
272+
273+
# Should still return metrics based on valid tasks
274+
metric_names = {m.name for m in metrics}
275+
assert "django_tasks_queue_length" in metric_names
276+
assert "django_tasks_queue_oldest_ready_age_seconds" in metric_names
277+
assert "django_tasks_queue_newest_ready_age_seconds" in metric_names

0 commit comments

Comments
 (0)