diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 1eeb021d165..e6c5c6a74ad 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -3051,6 +3051,14 @@ def validate(self, data): return data +class CeleryStatusSerializer(serializers.Serializer): + worker_status = serializers.BooleanField(read_only=True) + queue_length = serializers.IntegerField(allow_null=True, read_only=True) + task_time_limit = serializers.IntegerField(allow_null=True, read_only=True) + task_soft_time_limit = serializers.IntegerField(allow_null=True, read_only=True) + task_default_expires = serializers.IntegerField(allow_null=True, read_only=True) + + class FindingNoteSerializer(serializers.Serializer): note_id = serializers.IntegerField() diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 3461e54b25a..db7eec305a8 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -33,6 +33,7 @@ from rest_framework.parsers import MultiPartParser from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated from rest_framework.response import Response +from rest_framework.views import APIView import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper @@ -179,9 +180,12 @@ from dojo.utils import ( async_delete, generate_file_response, + get_celery_queue_length, + get_celery_worker_status, get_setting, get_system_setting, process_tag_notifications, + purge_celery_queue, ) logger = logging.getLogger(__name__) @@ -3123,6 +3127,49 @@ def get_queryset(self): return System_Settings.objects.all().order_by("id") +@extend_schema( + responses=serializers.CeleryStatusSerializer, + summary="Get Celery worker and queue status", + description=( + "Returns Celery worker liveness, pending queue length, and the active task " + "timeout/expiry configuration. Uses the Celery control channel (pidbox) for " + "worker status so it works correctly even when the task queue is clogged." + ), +) +class CeleryStatusView(APIView): + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) + queryset = System_Settings.objects.none() + + def get(self, request): + data = { + "worker_status": get_celery_worker_status(), + "queue_length": get_celery_queue_length(), + "task_time_limit": getattr(settings, "CELERY_TASK_TIME_LIMIT", None), + "task_soft_time_limit": getattr(settings, "CELERY_TASK_SOFT_TIME_LIMIT", None), + "task_default_expires": getattr(settings, "CELERY_TASK_DEFAULT_EXPIRES", None), + } + return Response(serializers.CeleryStatusSerializer(data).data) + + +@extend_schema( + request=None, + responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}}, + summary="Purge all pending Celery tasks from the queue", + description=( + "Removes all pending tasks from the default Celery queue. Tasks already being " + "executed by workers are not affected. Note: if deduplication tasks were queued, " + "you may need to re-run deduplication manually via `python manage.py dedupe`." + ), +) +class CeleryQueuePurgeView(APIView): + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) + queryset = System_Settings.objects.none() + + def post(self, request): + purged = purge_celery_queue() + return Response({"purged": purged}) + + # Authorization: superuser @extend_schema_view(**schema_with_prefetch()) class NotificationsViewSet( diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index ca0c28d76f6..22a80dae2b1 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -90,6 +90,17 @@ DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")), DD_CELERY_TASK_SERIALIZER=(str, "pickle"), DD_CELERY_LOG_LEVEL=(str, "INFO"), + # Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup + # code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit) + DD_CELERY_TASK_TIME_LIMIT=(int, 43200), # default: 12 hours + # Raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up before the hard + # kill. Set a few seconds below DD_CELERY_TASK_TIME_LIMIT so cleanup has time to finish. + # (0 = disabled, no limit) + DD_CELERY_TASK_SOFT_TIME_LIMIT=(int, 0), + # If a task sits in the broker queue for longer than this without being picked up by a worker, + # Celery silently discards it — it is never executed and no exception is raised. Does not + # affect tasks that are already running. (0 = disabled, no limit) + DD_CELERY_TASK_DEFAULT_EXPIRES=(int, 43200), # default: 12 hours DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000), # Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5) DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1), @@ -1249,6 +1260,13 @@ def saml2_attrib_map_format(din): CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER") CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL") +if env("DD_CELERY_TASK_TIME_LIMIT") > 0: + CELERY_TASK_TIME_LIMIT = env("DD_CELERY_TASK_TIME_LIMIT") +if env("DD_CELERY_TASK_SOFT_TIME_LIMIT") > 0: + CELERY_TASK_SOFT_TIME_LIMIT = env("DD_CELERY_TASK_SOFT_TIME_LIMIT") +if env("DD_CELERY_TASK_DEFAULT_EXPIRES") > 0: + CELERY_TASK_DEFAULT_EXPIRES = env("DD_CELERY_TASK_DEFAULT_EXPIRES") + if len(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) > 0: CELERY_BROKER_TRANSPORT_OPTIONS = json.loads(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) diff --git a/dojo/system_settings/urls.py b/dojo/system_settings/urls.py index 1452493095c..e25ec7d4b46 100644 --- a/dojo/system_settings/urls.py +++ b/dojo/system_settings/urls.py @@ -8,4 +8,9 @@ views.SystemSettingsView.as_view(), name="system_settings", ), + re_path( + r"^system_status$", + views.SystemStatusView.as_view(), + name="system_status", + ), ] diff --git a/dojo/system_settings/views.py b/dojo/system_settings/views.py index 67bad91dc0e..33771b333ac 100644 --- a/dojo/system_settings/views.py +++ b/dojo/system_settings/views.py @@ -1,6 +1,5 @@ import logging -from django.conf import settings from django.contrib import messages from django.core.exceptions import PermissionDenied from django.http import HttpRequest, HttpResponse @@ -9,7 +8,7 @@ from dojo.forms import SystemSettingsForm from dojo.models import System_Settings -from dojo.utils import add_breadcrumb, get_celery_queue_length, get_celery_worker_status +from dojo.utils import add_breadcrumb logger = logging.getLogger(__name__) @@ -30,15 +29,10 @@ def get_context( request: HttpRequest, ) -> dict: system_settings_obj = self.get_settings_object() - # Set the initial context - context = { + return { "system_settings_obj": system_settings_obj, "form": self.get_form(request, system_settings_obj), } - # Check the status of celery - self.get_celery_status(context) - - return context def get_form( self, @@ -95,35 +89,6 @@ def validate_form( return request, True return request, False - def get_celery_status( - self, - context: dict, - ) -> None: - # Celery needs to be set with the setting: CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' - if hasattr(settings, "CELERY_RESULT_BACKEND"): - # Check the status of Celery by sending calling a celery task - context["celery_bool"] = get_celery_worker_status() - - if context["celery_bool"]: - context["celery_msg"] = "Celery is processing tasks." - context["celery_status"] = "Running" - else: - context["celery_msg"] = "Celery does not appear to be up and running. Please ensure celery is running." - context["celery_status"] = "Not Running" - - q_len = get_celery_queue_length() - if q_len is None: - context["celery_q_len"] = " It is not possible to identify number of waiting tasks." - elif q_len: - context["celery_q_len"] = f"{q_len} tasks are waiting to be proccessed." - else: - context["celery_q_len"] = "No task is waiting to be proccessed." - - else: - context["celery_bool"] = False - context["celery_msg"] = "Celery needs to have the setting CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' set in settings.py." - context["celery_status"] = "Unknown" - def get_template(self) -> str: return "dojo/system_settings.html" @@ -148,9 +113,19 @@ def post( self.permission_check(request) # Set up the initial context context = self.get_context(request) - # Check the status of celery request, _ = self.validate_form(request, context) # Add some breadcrumbs add_breadcrumb(title="System settings", top_level=False, request=request) # Render the page return render(request, self.get_template(), context) + + +class SystemStatusView(View): + def get( + self, + request: HttpRequest, + ) -> HttpResponse: + if not request.user.is_superuser: + raise PermissionDenied + add_breadcrumb(title="System status", top_level=False, request=request) + return render(request, "dojo/system_status.html") diff --git a/dojo/templates/base.html b/dojo/templates/base.html index 09393088809..abf395c8819 100644 --- a/dojo/templates/base.html +++ b/dojo/templates/base.html @@ -613,6 +613,11 @@ {% trans "System Settings" %} +
+ Note: Purging the queue removes pending tasks that have not yet been executed,
+ including deduplication tasks. If deduplication tasks were in the queue, you may need to
+ re-run deduplication manually using the dedupe management command:
+ python manage.py dedupe
+
| Setting | Value |
|---|---|
+ Read-only. Configured via environment variables:
+ DD_CELERY_TASK_TIME_LIMIT,
+ DD_CELERY_TASK_SOFT_TIME_LIMIT,
+ DD_CELERY_TASK_DEFAULT_EXPIRES
+ |
+ |