From 73f690b1b2f790045be07e56667d04d31ccb3f46 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Thu, 19 Feb 2026 20:44:10 +0100 Subject: [PATCH] feat: add System Status page with Celery queue management - Add dedicated System Status page (/system_status) with superuser-only access, accessible from the navigation menu alongside System Settings - Display Celery worker liveness, pending queue length with human-readable duration formatting, and active task timeout/expiry configuration - Add Purge queue button that POSTs to the new API endpoint and reloads the page on success - Fix get_celery_worker_status() to use app.control.ping() via the pidbox control channel, which works correctly even when the task queue is clogged (previously dispatched a task that would never be picked up) - Add purge_celery_queue() utility using a direct broker connection - Add two new superuser-only REST API endpoints: GET /api/v2/celery/status/ - worker status, queue length, config POST /api/v2/celery/queue/purge/ - purge all pending tasks Both use the same permission guards as SystemSettingsViewSet (IsSuperUser + DjangoModelPermissions against System_Settings) - Add DD_CELERY_TASK_TIME_LIMIT (default 12h), DD_CELERY_TASK_SOFT_TIME_LIMIT (default disabled), and DD_CELERY_TASK_DEFAULT_EXPIRES (default 12h) environment variables to settings.dist.py with explanatory comments - Move celery status rendering from server-side Django view to client-side AJAX so dojo-pro can consume the same API endpoints feat: add Refresh button next to Purge button on System Status page remove plan --- dojo/api_v2/serializers.py | 8 ++ dojo/api_v2/views.py | 47 ++++++++ dojo/settings/settings.dist.py | 18 ++++ dojo/system_settings/urls.py | 5 + dojo/system_settings/views.py | 51 +++------ dojo/templates/base.html | 5 + dojo/templates/dojo/system_settings.html | 23 ---- dojo/templates/dojo/system_status.html | 132 +++++++++++++++++++++++ dojo/urls.py | 4 + dojo/utils.py | 21 ++-- 10 files changed, 246 insertions(+), 68 deletions(-) create mode 100644 dojo/templates/dojo/system_status.html diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 1eeb021d165..e6c5c6a74ad 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -3051,6 +3051,14 @@ def validate(self, data): return data +class CeleryStatusSerializer(serializers.Serializer): + worker_status = serializers.BooleanField(read_only=True) + queue_length = serializers.IntegerField(allow_null=True, read_only=True) + task_time_limit = serializers.IntegerField(allow_null=True, read_only=True) + task_soft_time_limit = serializers.IntegerField(allow_null=True, read_only=True) + task_default_expires = serializers.IntegerField(allow_null=True, read_only=True) + + class FindingNoteSerializer(serializers.Serializer): note_id = serializers.IntegerField() diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 3461e54b25a..db7eec305a8 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -33,6 +33,7 @@ from rest_framework.parsers import MultiPartParser from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated from rest_framework.response import Response +from rest_framework.views import APIView import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper @@ -179,9 +180,12 @@ from dojo.utils import ( async_delete, generate_file_response, + get_celery_queue_length, + get_celery_worker_status, get_setting, get_system_setting, process_tag_notifications, + purge_celery_queue, ) logger = logging.getLogger(__name__) @@ -3123,6 +3127,49 @@ def get_queryset(self): return System_Settings.objects.all().order_by("id") +@extend_schema( + responses=serializers.CeleryStatusSerializer, + summary="Get Celery worker and queue status", + description=( + "Returns Celery worker liveness, pending queue length, and the active task " + "timeout/expiry configuration. Uses the Celery control channel (pidbox) for " + "worker status so it works correctly even when the task queue is clogged." + ), +) +class CeleryStatusView(APIView): + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) + queryset = System_Settings.objects.none() + + def get(self, request): + data = { + "worker_status": get_celery_worker_status(), + "queue_length": get_celery_queue_length(), + "task_time_limit": getattr(settings, "CELERY_TASK_TIME_LIMIT", None), + "task_soft_time_limit": getattr(settings, "CELERY_TASK_SOFT_TIME_LIMIT", None), + "task_default_expires": getattr(settings, "CELERY_TASK_DEFAULT_EXPIRES", None), + } + return Response(serializers.CeleryStatusSerializer(data).data) + + +@extend_schema( + request=None, + responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}}, + summary="Purge all pending Celery tasks from the queue", + description=( + "Removes all pending tasks from the default Celery queue. Tasks already being " + "executed by workers are not affected. Note: if deduplication tasks were queued, " + "you may need to re-run deduplication manually via `python manage.py dedupe`." + ), +) +class CeleryQueuePurgeView(APIView): + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) + queryset = System_Settings.objects.none() + + def post(self, request): + purged = purge_celery_queue() + return Response({"purged": purged}) + + # Authorization: superuser @extend_schema_view(**schema_with_prefetch()) class NotificationsViewSet( diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index ca0c28d76f6..22a80dae2b1 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -90,6 +90,17 @@ DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")), DD_CELERY_TASK_SERIALIZER=(str, "pickle"), DD_CELERY_LOG_LEVEL=(str, "INFO"), + # Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup + # code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit) + DD_CELERY_TASK_TIME_LIMIT=(int, 43200), # default: 12 hours + # Raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up before the hard + # kill. Set a few seconds below DD_CELERY_TASK_TIME_LIMIT so cleanup has time to finish. + # (0 = disabled, no limit) + DD_CELERY_TASK_SOFT_TIME_LIMIT=(int, 0), + # If a task sits in the broker queue for longer than this without being picked up by a worker, + # Celery silently discards it — it is never executed and no exception is raised. Does not + # affect tasks that are already running. (0 = disabled, no limit) + DD_CELERY_TASK_DEFAULT_EXPIRES=(int, 43200), # default: 12 hours DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000), # Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5) DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1), @@ -1249,6 +1260,13 @@ def saml2_attrib_map_format(din): CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER") CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL") +if env("DD_CELERY_TASK_TIME_LIMIT") > 0: + CELERY_TASK_TIME_LIMIT = env("DD_CELERY_TASK_TIME_LIMIT") +if env("DD_CELERY_TASK_SOFT_TIME_LIMIT") > 0: + CELERY_TASK_SOFT_TIME_LIMIT = env("DD_CELERY_TASK_SOFT_TIME_LIMIT") +if env("DD_CELERY_TASK_DEFAULT_EXPIRES") > 0: + CELERY_TASK_DEFAULT_EXPIRES = env("DD_CELERY_TASK_DEFAULT_EXPIRES") + if len(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) > 0: CELERY_BROKER_TRANSPORT_OPTIONS = json.loads(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) diff --git a/dojo/system_settings/urls.py b/dojo/system_settings/urls.py index 1452493095c..e25ec7d4b46 100644 --- a/dojo/system_settings/urls.py +++ b/dojo/system_settings/urls.py @@ -8,4 +8,9 @@ views.SystemSettingsView.as_view(), name="system_settings", ), + re_path( + r"^system_status$", + views.SystemStatusView.as_view(), + name="system_status", + ), ] diff --git a/dojo/system_settings/views.py b/dojo/system_settings/views.py index 67bad91dc0e..33771b333ac 100644 --- a/dojo/system_settings/views.py +++ b/dojo/system_settings/views.py @@ -1,6 +1,5 @@ import logging -from django.conf import settings from django.contrib import messages from django.core.exceptions import PermissionDenied from django.http import HttpRequest, HttpResponse @@ -9,7 +8,7 @@ from dojo.forms import SystemSettingsForm from dojo.models import System_Settings -from dojo.utils import add_breadcrumb, get_celery_queue_length, get_celery_worker_status +from dojo.utils import add_breadcrumb logger = logging.getLogger(__name__) @@ -30,15 +29,10 @@ def get_context( request: HttpRequest, ) -> dict: system_settings_obj = self.get_settings_object() - # Set the initial context - context = { + return { "system_settings_obj": system_settings_obj, "form": self.get_form(request, system_settings_obj), } - # Check the status of celery - self.get_celery_status(context) - - return context def get_form( self, @@ -95,35 +89,6 @@ def validate_form( return request, True return request, False - def get_celery_status( - self, - context: dict, - ) -> None: - # Celery needs to be set with the setting: CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' - if hasattr(settings, "CELERY_RESULT_BACKEND"): - # Check the status of Celery by sending calling a celery task - context["celery_bool"] = get_celery_worker_status() - - if context["celery_bool"]: - context["celery_msg"] = "Celery is processing tasks." - context["celery_status"] = "Running" - else: - context["celery_msg"] = "Celery does not appear to be up and running. Please ensure celery is running." - context["celery_status"] = "Not Running" - - q_len = get_celery_queue_length() - if q_len is None: - context["celery_q_len"] = " It is not possible to identify number of waiting tasks." - elif q_len: - context["celery_q_len"] = f"{q_len} tasks are waiting to be proccessed." - else: - context["celery_q_len"] = "No task is waiting to be proccessed." - - else: - context["celery_bool"] = False - context["celery_msg"] = "Celery needs to have the setting CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' set in settings.py." - context["celery_status"] = "Unknown" - def get_template(self) -> str: return "dojo/system_settings.html" @@ -148,9 +113,19 @@ def post( self.permission_check(request) # Set up the initial context context = self.get_context(request) - # Check the status of celery request, _ = self.validate_form(request, context) # Add some breadcrumbs add_breadcrumb(title="System settings", top_level=False, request=request) # Render the page return render(request, self.get_template(), context) + + +class SystemStatusView(View): + def get( + self, + request: HttpRequest, + ) -> HttpResponse: + if not request.user.is_superuser: + raise PermissionDenied + add_breadcrumb(title="System status", top_level=False, request=request) + return render(request, "dojo/system_status.html") diff --git a/dojo/templates/base.html b/dojo/templates/base.html index 09393088809..abf395c8819 100644 --- a/dojo/templates/base.html +++ b/dojo/templates/base.html @@ -613,6 +613,11 @@ {% trans "System Settings" %} +
  • + + {% trans "System Status" %} + +
  • {% endif %} {% if "dojo.view_tool_configuration"|has_configuration_permission:request %}
  • diff --git a/dojo/templates/dojo/system_settings.html b/dojo/templates/dojo/system_settings.html index 4ea772bc7a3..11bb7c4e58e 100644 --- a/dojo/templates/dojo/system_settings.html +++ b/dojo/templates/dojo/system_settings.html @@ -10,29 +10,6 @@ {% block content %} {{ block.super }} - {% block status %} -
    -

    System Status

    -
    -
    -
    -
    - {% if celery_bool %} -

    Celery {{celery_status}}

    - {% else %} -

    Celery {{celery_status}}

    - {% endif %} -
    -
    - {{celery_msg}} -
    -
    - {{celery_q_len}} -
    -
    -
    -
    - {% endblock status %}
    {% block settings %}
    diff --git a/dojo/templates/dojo/system_status.html b/dojo/templates/dojo/system_status.html new file mode 100644 index 00000000000..f108cede1d1 --- /dev/null +++ b/dojo/templates/dojo/system_status.html @@ -0,0 +1,132 @@ +{% extends "base.html" %} +{% load static %} + +{% block content %} + {{ block.super }} +
    +

    System Status

    +
    +
    +
    +
    +

    Celery Loading...

    +
    +
    +
    + +
    + + +
    +

    + Note: Purging the queue removes pending tasks that have not yet been executed, + including deduplication tasks. If deduplication tasks were in the queue, you may need to + re-run deduplication manually using the dedupe management command: + python manage.py dedupe +

    +
    +
    + + + + + + + + +
    SettingValue
    + Read-only. Configured via environment variables: + DD_CELERY_TASK_TIME_LIMIT, + DD_CELERY_TASK_SOFT_TIME_LIMIT, + DD_CELERY_TASK_DEFAULT_EXPIRES +
    +
    +
    +
    +
    +{% endblock content %} + +{% block postscript %} + {{ block.super }} + +{% endblock %} diff --git a/dojo/urls.py b/dojo/urls.py index 2d04ca7c078..04b1609dfb7 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -15,6 +15,8 @@ AnnouncementViewSet, AppAnalysisViewSet, BurpRawRequestResponseViewSet, + CeleryQueuePurgeView, + CeleryStatusView, ConfigurationPermissionViewSet, CredentialsMappingViewSet, CredentialsViewSet, @@ -242,6 +244,8 @@ # Django Rest Framework API v2 re_path(r"^{}api/v2/".format(get_system_setting("url_prefix")), include(v2_api.urls)), re_path(r"^{}api/v2/user_profile/".format(get_system_setting("url_prefix")), UserProfileView.as_view(), name="user_profile"), + re_path(r"^{}api/v2/celery/status/$".format(get_system_setting("url_prefix")), CeleryStatusView.as_view(), name="celery_status_api"), + re_path(r"^{}api/v2/celery/queue/purge/$".format(get_system_setting("url_prefix")), CeleryQueuePurgeView.as_view(), name="celery_queue_purge_api"), ] if hasattr(settings, "API_TOKENS_ENABLED") and hasattr(settings, "API_TOKEN_AUTH_ENDPOINT_ENABLED"): diff --git a/dojo/utils.py b/dojo/utils.py index a5d8a13ed81..822b7144fb0 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -1312,13 +1312,14 @@ def perform_product_grading(product): def get_celery_worker_status(): - from .tasks import celery_status # noqa: PLC0415 circular import - res = celery_status.apply_async() - - # Wait 5 seconds for a response from Celery + from dojo.celery import app # noqa: PLC0415 circular import + # Use the control channel (celery.pidbox) instead of dispatching a task. + # This bypasses the task queue entirely, so it works correctly even when + # the queue is clogged with pending tasks. try: - return res.get(timeout=5) - except: + result = app.control.ping(timeout=3) + return len(result) > 0 + except Exception: return False @@ -1330,10 +1331,16 @@ def get_celery_queue_length(): if "NOT_FOUND" in str(e): return 0 return None - except: + except Exception: return None +def purge_celery_queue(): + from dojo.celery import app # noqa: PLC0415 circular import + with app.connection() as conn, conn.channel() as channel: + return channel.queue_purge(queue="celery") + + # Used to display the counts and enabled tabs in the product view # Uses @cached_property for lazy loading to avoid expensive queries on every page load # See: https://github.com/DefectDojo/django-DefectDojo/issues/10313