diff --git a/dojo/admin.py b/dojo/admin.py index c7a21b91019..dc22174dd14 100644 --- a/dojo/admin.py +++ b/dojo/admin.py @@ -1,14 +1,16 @@ -from django.contrib import admin +from django.contrib import admin, messages from django.contrib.admin.sites import NotRegistered from polymorphic.admin import PolymorphicChildModelAdmin, PolymorphicParentModelAdmin from dojo.models import ( Answer, Answered_Survey, + CategoryMapping, Choice, ChoiceAnswer, ChoiceQuestion, Engagement_Survey, + Finding, Question, TextAnswer, TextQuestion, @@ -99,3 +101,60 @@ class AnswerParentAdmin(PolymorphicParentModelAdmin): admin.site.register(Answer, AnswerParentAdmin) admin.site.register(Engagement_Survey) admin.site.register(Answered_Survey) + + +# =================================================== +# Mural watercolor FE: dashboard category taxonomy +# =================================================== + + +@admin.register(CategoryMapping) +class CategoryMappingAdmin(admin.ModelAdmin): + + """ + Editable rules table. The ``Test this mapping`` action evaluates the + selected rules against the most recent N findings the admin can see and + surfaces the resolved category in a Django messages banner. See + ``prd-fe-dashboard-aggregations-and-category-taxonomy.md`` §10. + """ + + list_display = ("__str__", "category", "specificity", "updated_at") + list_filter = ("category",) + search_fields = ("tag_pattern", "scanner") + readonly_fields = ("specificity", "updated_at") + actions = ("test_this_mapping",) + + def test_this_mapping(self, request, queryset): + from dojo.taxonomy.categorize import categorize # noqa: PLC0415 + + sample = list(Finding.objects.order_by("-id")[:20]) + if not sample: + self.message_user( + request, + "No findings exist yet — nothing to evaluate against.", + level=messages.WARNING, + ) + return + + # We restrict the evaluated rule-set to the rows the admin selected + # so the action reflects "what would change if only these rules + # existed" — handy when iterating on a fix. + mappings = list(queryset.order_by("-specificity", "-updated_at")) + if not mappings: + self.message_user( + request, + "Select at least one mapping row to test.", + level=messages.WARNING, + ) + return + + for f in sample: + resolved = categorize(f, mappings=mappings) + self.message_user( + request, + f"Finding #{f.id} (cwe={f.cwe}, scanner=" + f"{getattr(getattr(f, 'test', None), 'scan_type', None)}) -> {resolved}", + level=messages.INFO, + ) + + test_this_mapping.short_description = "Test this mapping against 20 recent findings" diff --git a/dojo/api_v2/dashboard/__init__.py b/dojo/api_v2/dashboard/__init__.py new file mode 100644 index 00000000000..ea6311fcd0a --- /dev/null +++ b/dojo/api_v2/dashboard/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +""" +Dashboard aggregation API package. + +Three viewsets (queue-counters, category-counts, search-totals) consumed by +the watercolor FE Home screen. See +``prd-fe-dashboard-aggregations-and-category-taxonomy.md`` for the contract. +""" +from dojo.api_v2.dashboard.views import ( + CategoryCountsViewSet, + QueueCountersViewSet, + SearchTotalsViewSet, +) + +__all__ = [ + "CategoryCountsViewSet", + "QueueCountersViewSet", + "SearchTotalsViewSet", +] diff --git a/dojo/api_v2/dashboard/views.py b/dojo/api_v2/dashboard/views.py new file mode 100644 index 00000000000..2a798a21edd --- /dev/null +++ b/dojo/api_v2/dashboard/views.py @@ -0,0 +1,209 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +""" +Dashboard aggregation viewsets. + +All endpoints are scoped to the requesting user's accessible findings via +``dojo.finding.queries.get_authorized_findings``. Each response is cached +per-user for 60 seconds, keyed on ``MAX(finding.updated)`` so a mutation +implicitly busts the cache. + +Endpoints (all under ``/api/v2/dashboard/``): + +* ``GET queue-counters/`` → ``{critical, high, sla_breach, fixed_this_week}`` +* ``GET category-counts/`` → ``[{id, count}]`` (always 6 entries in fixed order) +* ``GET search-totals/`` → ``{findings, categories}`` +""" +from __future__ import annotations + +import hashlib +import logging +from datetime import timedelta + +from django.core.cache import cache +from django.db.models import Count, Max +from django.utils import timezone +from drf_spectacular.utils import OpenApiResponse, extend_schema +from rest_framework import status, viewsets +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from dojo.authorization.roles_permissions import Permissions +from dojo.finding.queries import get_authorized_findings +from dojo.taxonomy.categorize import CATEGORY_IDS, categorize_queryset + +logger = logging.getLogger(__name__) + +CACHE_TTL_SECONDS = 60 +SEARCH_TOTALS_CATEGORY_CAP = 8 + + +def _cache_key(prefix: str, user_id: int, fingerprint: str) -> str: + return f"dojo:dashboard:{prefix}:user={user_id}:fp={fingerprint}" + + +def _user_findings(user): + """Authorized findings for the requesting user.""" + return get_authorized_findings(Permissions.Finding_View, user=user) + + +def _fingerprint(qs) -> str: + """ + Build the per-user cache fingerprint: max(updated) over the user's + accessible findings, hashed for compact key storage. + """ + last = qs.order_by().aggregate(m=Max("updated"))["m"] + seed = "none" if last is None else last.isoformat() + return hashlib.md5(seed.encode("utf-8")).hexdigest()[:16] # noqa: S324 # fingerprint, not security + + +def _open_findings(qs): + """Open findings = active, not mitigated, not false_p, not out_of_scope.""" + return qs.filter( + active=True, + is_mitigated=False, + false_p=False, + out_of_scope=False, + duplicate=False, + ) + + +class QueueCountersViewSet(viewsets.ViewSet): + + """Hero counter grid for Home: critical / high / SLA breach / fixed this week.""" + + permission_classes = (IsAuthenticated,) + # provides a stable URL name (``queue-counters-list``) that DRF's + # DefaultRouter expects on a ViewSet. + basename = "dashboard_queue_counters" + + @extend_schema( + operation_id="dashboard_queue_counters", + description=( + "Returns the four hero counters for the watercolor Home screen. " + "All counts are scoped to findings the requesting user is " + "authorized to see. ``sla_breach`` counts open findings whose " + "``sla_expiration_date`` is in the past (returns 0 if SLA " + "tracking is not in use). ``fixed_this_week`` counts findings " + "whose ``mitigated`` timestamp falls in the last 7 days — this " + "is an undercount relative to the full Mitigated/Inactive/" + "False Positive transition set because this fork does not " + "maintain a status-history table." + ), + responses={ + 200: OpenApiResponse(description="Counters payload"), + }, + ) + def list(self, request): + qs = _user_findings(request.user) + fp = _fingerprint(qs) + key = _cache_key("queue", request.user.id, fp) + cached = cache.get(key) + if cached is not None: + return Response(cached) + + open_qs = _open_findings(qs) + critical = open_qs.filter(severity="Critical").count() + high = open_qs.filter(severity="High").count() + + sla_breach = 0 + try: + sla_breach = open_qs.filter( + sla_expiration_date__isnull=False, + sla_expiration_date__lt=timezone.now().date(), + ).count() + except Exception: + # Field absent on this fork — keep the contract and return 0. + sla_breach = 0 + + week_ago = timezone.now() - timedelta(days=7) + fixed_this_week = qs.filter(mitigated__gte=week_ago).count() + + payload = { + "critical": critical, + "high": high, + "sla_breach": sla_breach, + "fixed_this_week": fixed_this_week, + } + cache.set(key, payload, CACHE_TTL_SECONDS) + return Response(payload) + + +class CategoryCountsViewSet(viewsets.ViewSet): + + """Six-tile category strip on Home.""" + + permission_classes = (IsAuthenticated,) + basename = "dashboard_category_counts" + + @extend_schema( + operation_id="dashboard_category_counts", + description=( + "Returns open-finding counts grouped by the six dashboard " + "categories. Always emits one entry per category id in the " + "fixed order: web, cloud, supply, identity, data, config. " + "Missing categories are returned as ``count: 0``. Scoped to " + "findings the requesting user can see." + ), + responses={200: OpenApiResponse(description="Category counts array")}, + ) + def list(self, request): + qs = _user_findings(request.user) + fp = _fingerprint(qs) + key = _cache_key("categories", request.user.id, fp) + cached = cache.get(key) + if cached is not None: + return Response(cached) + + open_qs = _open_findings(qs) + annotated = categorize_queryset(open_qs) + rows = ( + annotated.values("category") + .order_by() + .annotate(count=Count("id", distinct=True)) + ) + by_category = {r["category"]: r["count"] for r in rows} + payload = [ + {"id": cid, "count": int(by_category.get(cid, 0))} + for cid in CATEGORY_IDS + ] + cache.set(key, payload, CACHE_TTL_SECONDS) + return Response(payload) + + +class SearchTotalsViewSet(viewsets.ViewSet): + + """Search-pill placeholder totals: number of findings + number of categories.""" + + permission_classes = (IsAuthenticated,) + basename = "dashboard_search_totals" + + @extend_schema( + operation_id="dashboard_search_totals", + description=( + "Counts feeding the Home search placeholder pill: ``findings`` " + "is the total open-findings count visible to the user, " + "``categories`` is the count of distinct dashboard category " + f"ids with at least one open finding (capped at {SEARCH_TOTALS_CATEGORY_CAP}). " + "Scoped to the requesting user's accessible findings." + ), + responses={200: OpenApiResponse(description="Search totals payload")}, + ) + def list(self, request): + qs = _user_findings(request.user) + fp = _fingerprint(qs) + key = _cache_key("search", request.user.id, fp) + cached = cache.get(key) + if cached is not None: + return Response(cached) + + open_qs = _open_findings(qs) + findings = open_qs.count() + annotated = categorize_queryset(open_qs) + distinct_cats = ( + annotated.values_list("category", flat=True).order_by().distinct().count() + ) + categories = min(distinct_cats, SEARCH_TOTALS_CATEGORY_CAP) + + payload = {"findings": findings, "categories": categories} + cache.set(key, payload, CACHE_TTL_SECONDS) + return Response(payload, status=status.HTTP_200_OK) diff --git a/dojo/db_migrations/0265_categorymapping.py b/dojo/db_migrations/0265_categorymapping.py new file mode 100644 index 00000000000..694d6cb86af --- /dev/null +++ b/dojo/db_migrations/0265_categorymapping.py @@ -0,0 +1,54 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +"""Create the CategoryMapping table and load the starter rules fixture.""" +from django.core.management import call_command +from django.db import migrations, models + + +FIXTURE = "category_mappings" + + +def load_fixture(apps, schema_editor): + call_command("loaddata", FIXTURE, app_label="dojo") + + +def unload_fixture(apps, schema_editor): + CategoryMapping = apps.get_model("dojo", "CategoryMapping") + CategoryMapping.objects.all().delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("dojo", "0264_alter_url_identity_hash_alter_urlevent_identity_hash"), + ] + + operations = [ + migrations.CreateModel( + name="CategoryMapping", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("cwe", models.IntegerField(blank=True, help_text="CWE number to match (matches Finding.cwe exactly).", null=True)), + ("tag_pattern", models.CharField(blank=True, help_text="Substring matched (case-insensitive) against any of the Finding's tags.", max_length=128, null=True)), + ("scanner", models.CharField(blank=True, help_text="Scanner name matched against Finding.test.scan_type or test_type.name.", max_length=128, null=True)), + ("category", models.CharField( + choices=[ + ("web", "Web & API"), + ("cloud", "Cloud & infra"), + ("supply", "Supply chain"), + ("identity", "Identity & auth"), + ("data", "Data exposure"), + ("config", "Misconfig"), + ], + help_text="Dashboard category id this rule resolves to.", + max_length=16, + )), + ("specificity", models.IntegerField(default=0, help_text="Computed at save time: CWE=3 + tag=2 + scanner=1. Higher wins.")), + ("updated_at", models.DateTimeField(auto_now=True)), + ], + options={ + "ordering": ("-specificity", "-updated_at"), + "unique_together": {("cwe", "tag_pattern", "scanner")}, + }, + ), + migrations.RunPython(load_fixture, unload_fixture), + ] diff --git a/dojo/filters.py b/dojo/filters.py index db37fa87119..3047c6c25a2 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -1744,6 +1744,12 @@ class ApiFindingFilter(DojoFilter): exclude="True") has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") outside_of_sla = extend_schema_field(OpenApiTypes.NUMBER)(FindingSLAFilter()) + # Mural watercolor FE: dashboard category filter (see dojo/taxonomy/categorize.py). + # Accepts repeated values: ?category=web&category=cloud — these OR together. + category = CharFilter(method="filter_category", label="Dashboard category", + help_text="Filter findings by dashboard taxonomy category id " + "(web, cloud, supply, identity, data, config). " + "Repeat the parameter to OR multiple categories.") o = OrderingFilter( # tuple-mapping retains order @@ -1788,6 +1794,35 @@ def filter_mitigated_after(self, queryset, name, value): return queryset.filter(mitigated__gt=value) + def filter_category(self, queryset, name, value): + """ + Filter findings by one or more dashboard taxonomy categories + (web, cloud, supply, identity, data, config). Repeated values OR + together. Defers to ``categorize_queryset`` so the filter resolves + through the exact same mapping table the dashboard count endpoints + use. + """ + from dojo.taxonomy.categorize import CATEGORY_IDS, categorize_queryset # noqa: PLC0415 + + request = getattr(self, "request", None) + if request is not None: + raw = ( + request.query_params.getlist("category") + if hasattr(request, "query_params") + else request.GET.getlist("category") + ) + else: + raw = [value] + wanted = {v for v in raw if v in CATEGORY_IDS} + if not wanted: + return queryset.none() if value else queryset + annotated_ids = ( + categorize_queryset(queryset) + .filter(category__in=wanted) + .values_list("id", flat=True) + ) + return queryset.filter(id__in=list(annotated_ids)) + def filter_mitigated_on(self, queryset, name, value): if value.hour == 0 and value.minute == 0 and value.second == 0: # we have a simple date without a time, lets get a range from this morning to tonight at 23:59:59:999 diff --git a/dojo/fixtures/category_mappings.json b/dojo/fixtures/category_mappings.json new file mode 100644 index 00000000000..b51b25310e5 --- /dev/null +++ b/dojo/fixtures/category_mappings.json @@ -0,0 +1,24 @@ +[ + {"model": "dojo.categorymapping", "pk": 1, "fields": {"cwe": 79, "tag_pattern": null, "scanner": null, "category": "web", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 2, "fields": {"cwe": 89, "tag_pattern": null, "scanner": null, "category": "web", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 3, "fields": {"cwe": 352, "tag_pattern": null, "scanner": null, "category": "web", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 4, "fields": {"cwe": 601, "tag_pattern": null, "scanner": null, "category": "web", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 5, "fields": {"cwe": 918, "tag_pattern": null, "scanner": null, "category": "web", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 6, "fields": {"cwe": 732, "tag_pattern": null, "scanner": null, "category": "cloud", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 7, "fields": {"cwe": null, "tag_pattern": "aws", "scanner": null, "category": "cloud", "specificity": 2}}, + {"model": "dojo.categorymapping", "pk": 8, "fields": {"cwe": null, "tag_pattern": "gcp", "scanner": null, "category": "cloud", "specificity": 2}}, + {"model": "dojo.categorymapping", "pk": 9, "fields": {"cwe": null, "tag_pattern": "azure", "scanner": null, "category": "cloud", "specificity": 2}}, + {"model": "dojo.categorymapping", "pk": 10, "fields": {"cwe": null, "tag_pattern": null, "scanner": "Prowler", "category": "cloud", "specificity": 1}}, + {"model": "dojo.categorymapping", "pk": 11, "fields": {"cwe": null, "tag_pattern": null, "scanner": "Trivy", "category": "cloud", "specificity": 1}}, + {"model": "dojo.categorymapping", "pk": 12, "fields": {"cwe": null, "tag_pattern": "supply-chain", "scanner": null, "category": "supply", "specificity": 2}}, + {"model": "dojo.categorymapping", "pk": 13, "fields": {"cwe": null, "tag_pattern": null, "scanner": "Dependabot", "category": "supply", "specificity": 1}}, + {"model": "dojo.categorymapping", "pk": 14, "fields": {"cwe": 798, "tag_pattern": null, "scanner": null, "category": "identity", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 15, "fields": {"cwe": 287, "tag_pattern": null, "scanner": null, "category": "identity", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 16, "fields": {"cwe": null, "tag_pattern": "auth", "scanner": null, "category": "identity", "specificity": 2}}, + {"model": "dojo.categorymapping", "pk": 17, "fields": {"cwe": 200, "tag_pattern": null, "scanner": null, "category": "data", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 18, "fields": {"cwe": 209, "tag_pattern": null, "scanner": null, "category": "data", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 19, "fields": {"cwe": 1244, "tag_pattern": null, "scanner": null, "category": "data", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 20, "fields": {"cwe": 250, "tag_pattern": null, "scanner": null, "category": "config", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 21, "fields": {"cwe": 326, "tag_pattern": null, "scanner": null, "category": "config", "specificity": 3}}, + {"model": "dojo.categorymapping", "pk": 22, "fields": {"cwe": null, "tag_pattern": "iac", "scanner": null, "category": "config", "specificity": 2}} +] diff --git a/dojo/models.py b/dojo/models.py index 8cff7092ef6..c53758cfdc0 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -213,6 +213,87 @@ def __str__(self): return self.acronym + " (" + self.jurisdiction + ")" +class CategoryMapping(models.Model): + + """ + Rule that maps a Finding's (cwe, tag_pattern, scanner) to one of the + six dashboard taxonomy categories. Categories are derived at read time — + no Finding.category column is written. See dojo/taxonomy/categorize.py. + + Added by the Mural fork for the watercolor FE redesign dashboard. + """ + + CATEGORY_WEB = "web" + CATEGORY_CLOUD = "cloud" + CATEGORY_SUPPLY = "supply" + CATEGORY_IDENTITY = "identity" + CATEGORY_DATA = "data" + CATEGORY_CONFIG = "config" + CATEGORY_CHOICES = ( + (CATEGORY_WEB, _("Web & API")), + (CATEGORY_CLOUD, _("Cloud & infra")), + (CATEGORY_SUPPLY, _("Supply chain")), + (CATEGORY_IDENTITY, _("Identity & auth")), + (CATEGORY_DATA, _("Data exposure")), + (CATEGORY_CONFIG, _("Misconfig")), + ) + + # Specificity weights summed when the rule's save() runs. See PRD §3. + SPECIFICITY_CWE = 3 + SPECIFICITY_TAG = 2 + SPECIFICITY_SCANNER = 1 + + cwe = models.IntegerField( + null=True, blank=True, + help_text=_("CWE number to match (matches Finding.cwe exactly)."), + ) + tag_pattern = models.CharField( + max_length=128, null=True, blank=True, + help_text=_("Substring matched (case-insensitive) against any of the Finding's tags."), + ) + scanner = models.CharField( + max_length=128, null=True, blank=True, + help_text=_("Scanner name matched against Finding.test.scan_type or test_type.name."), + ) + category = models.CharField( + max_length=16, choices=CATEGORY_CHOICES, + help_text=_("Dashboard category id this rule resolves to."), + ) + specificity = models.IntegerField( + default=0, + help_text=_("Computed at save time: CWE=3 + tag=2 + scanner=1. Higher wins."), + ) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + unique_together = (("cwe", "tag_pattern", "scanner"),) + ordering = ("-specificity", "-updated_at") + + def __str__(self): + parts = [] + if self.cwe is not None: + parts.append(f"CWE-{self.cwe}") + if self.tag_pattern: + parts.append(f"tag~{self.tag_pattern}") + if self.scanner: + parts.append(f"scanner={self.scanner}") + return f"{' & '.join(parts) or '(empty)'} -> {self.category}" + + def save(self, *args, **kwargs): + self.specificity = self.compute_specificity() + super().save(*args, **kwargs) + + def compute_specificity(self): + score = 0 + if self.cwe is not None: + score += self.SPECIFICITY_CWE + if self.tag_pattern: + score += self.SPECIFICITY_TAG + if self.scanner: + score += self.SPECIFICITY_SCANNER + return score + + User = get_user_model() @@ -4640,6 +4721,7 @@ def __str__(self): admin.site.register(SLA_Configuration) admin.site.register(CWE) admin.site.register(Regulation) +# CategoryMapping is registered with a custom ModelAdmin in dojo/admin.py admin.site.register(Global_Role) admin.site.register(Role) admin.site.register(Dojo_Group) diff --git a/dojo/taxonomy/__init__.py b/dojo/taxonomy/__init__.py new file mode 100644 index 00000000000..79f46ef1bf3 --- /dev/null +++ b/dojo/taxonomy/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +""" +Dashboard category taxonomy package. + +See dojo/taxonomy/categorize.py for the resolver and +``prd-fe-dashboard-aggregations-and-category-taxonomy.md`` for the contract. +""" diff --git a/dojo/taxonomy/categorize.py b/dojo/taxonomy/categorize.py new file mode 100644 index 00000000000..3c77093ed78 --- /dev/null +++ b/dojo/taxonomy/categorize.py @@ -0,0 +1,188 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +""" +Category taxonomy resolver. + +Two public helpers: + +* :func:`categorize(finding)` returns the category id for a single Finding. + May iterate the cached mapping list in Python. +* :func:`categorize_queryset(qs)` annotates ``category`` on a Finding + queryset in a single SQL pass (using a ``CASE WHEN`` built from the + mapping table at call time). + +The default fallback when no rule matches is ``"web"`` (PRD §3, step 4). +""" +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from django.db.models import Case, CharField, Q, Value, When + +if TYPE_CHECKING: + from collections.abc import Iterable + +logger = logging.getLogger(__name__) + +DEFAULT_CATEGORY = "web" + +# Stable ordering of category ids the FE expects (PRD §5). +CATEGORY_IDS = ("web", "cloud", "supply", "identity", "data", "config") + + +def _load_mappings(): + """ + Return CategoryMapping rows ordered by descending specificity then + most-recent updated_at. + + Import is deferred to avoid a model import at module load time. + """ + from dojo.models import CategoryMapping # noqa: PLC0415 # avoid circular import + return list( + CategoryMapping.objects.all().order_by("-specificity", "-updated_at"), + ) + + +def _finding_tag_names(finding) -> list[str]: + """ + Best-effort: return lowercase tag names on the finding without + requiring extra queries when the tags are already prefetched. + """ + try: + # tagulous proxies — calling .all() works without re-querying when + # the manager has been prefetched. + return [str(t).lower() for t in finding.tags.all()] + except Exception: + return [] + + +def _finding_scanner(finding) -> str | None: + """ + Resolve a scanner-name string for the finding. + + DefectDojo stores the scanner on ``Finding.test.scan_type`` (a free-form + string) and the human-friendly name on ``test.test_type.name``. We try + ``scan_type`` first since that's what scan parsers populate. + """ + test = getattr(finding, "test", None) + if test is None: + return None + scan_type = getattr(test, "scan_type", None) + if scan_type: + return str(scan_type) + tt = getattr(test, "test_type", None) + if tt is not None: + return getattr(tt, "name", None) + return None + + +def _matches(mapping, *, cwe: int | None, tags: Iterable[str], scanner: str | None) -> bool: + """ + A mapping matches when every non-null criterion on the mapping matches + the finding. A mapping with no criteria at all never matches. + """ + has_criterion = False + if mapping.cwe is not None: + has_criterion = True + if cwe is None or cwe != mapping.cwe: + return False + if mapping.tag_pattern: + has_criterion = True + needle = mapping.tag_pattern.lower() + if not any(needle in t for t in tags): + return False + if mapping.scanner: + has_criterion = True + if not scanner or mapping.scanner.lower() != scanner.lower(): + return False + return has_criterion + + +def categorize(finding, *, mappings: list | None = None) -> str: + """ + Resolve a Finding to a category id. + + ``mappings`` is an optional pre-loaded list for callers that want to + iterate over many findings without re-querying. When omitted we load + them ourselves. + """ + rows = mappings if mappings is not None else _load_mappings() + cwe = getattr(finding, "cwe", None) or None # treat 0 as null + tags = _finding_tag_names(finding) + scanner = _finding_scanner(finding) + + best = None + for m in rows: + if not _matches(m, cwe=cwe, tags=tags, scanner=scanner): + continue + if best is None: + best = m + continue + # rows are pre-sorted by (-specificity, -updated_at) so the first + # match is already the winner — but we keep the comparison explicit + # so callers that pass an unsorted list still get the right answer. + if m.specificity > best.specificity or ( + m.specificity == best.specificity and m.updated_at > best.updated_at + ): + best = m + + if best is None: + return DEFAULT_CATEGORY + return best.category + + +def _mapping_to_q(mapping) -> Q | None: + """ + Build a Q object that matches Finding rows the way the in-Python + resolver matches a single finding. Returns None if the mapping has no + criteria (it would otherwise match everything). + """ + q = Q() + has = False + if mapping.cwe is not None: + q &= Q(cwe=mapping.cwe) + has = True + if mapping.tag_pattern: + q &= Q(tags__name__icontains=mapping.tag_pattern) + has = True + if mapping.scanner: + # match on Test.scan_type primarily and fall back to test_type.name. + q &= ( + Q(test__scan_type__iexact=mapping.scanner) + | Q(test__test_type__name__iexact=mapping.scanner) + ) + has = True + if not has: + return None + return q + + +def categorize_queryset(qs): + """ + Annotate ``category`` on a Finding queryset using a single + ``CASE WHEN`` derived from the mapping table. + + Mappings are evaluated in order of decreasing specificity (then most + recent updated_at), so the first ``WHEN`` matching a row wins — same + tie-break the Python resolver uses. + + No mapping match yields the default category. + """ + rows = _load_mappings() + whens = [] + for m in rows: + q = _mapping_to_q(m) + if q is None: + continue + whens.append(When(q, then=Value(m.category))) + if not whens: + return qs.annotate( + category=Value(DEFAULT_CATEGORY, output_field=CharField(max_length=16)), + ) + return qs.annotate( + category=Case( + *whens, + default=Value(DEFAULT_CATEGORY), + output_field=CharField(max_length=16), + ), + ) diff --git a/dojo/urls.py b/dojo/urls.py index 60d18cfa25b..fff3a5265d9 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -11,6 +11,11 @@ from dojo import views from dojo.announcement.urls import urlpatterns as announcement_urls +from dojo.api_v2.dashboard import ( + CategoryCountsViewSet, + QueueCountersViewSet, + SearchTotalsViewSet, +) from dojo.api_v2.views import ( AnnouncementViewSet, AppAnalysisViewSet, @@ -194,6 +199,10 @@ v2_api.register(r"endpoints", EndPointViewSet, basename="endpoint") v2_api.register(r"endpoint_status", EndpointStatusViewSet, basename="endpoint_status") v2_api.register(r"celery", CeleryViewSet, basename="celery") +# Mural watercolor FE dashboard aggregations (see dojo/api_v2/dashboard/). +v2_api.register(r"dashboard/queue-counters", QueueCountersViewSet, basename="dashboard_queue_counters") +v2_api.register(r"dashboard/category-counts", CategoryCountsViewSet, basename="dashboard_category_counts") +v2_api.register(r"dashboard/search-totals", SearchTotalsViewSet, basename="dashboard_search_totals") # V3 add_asset_urls(v2_api) add_organization_urls(v2_api) diff --git a/unittests/test_dashboard_aggregations.py b/unittests/test_dashboard_aggregations.py new file mode 100644 index 00000000000..9e5bb1c5e66 --- /dev/null +++ b/unittests/test_dashboard_aggregations.py @@ -0,0 +1,335 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +""" +Tests for the dashboard aggregation endpoints and category taxonomy. + +Covers (PRD §11): + +* every seed-fixture rule resolves to its declared category +* specificity tie-break (later updated_at wins on equal specificity) +* the three endpoints at fixture sizes 0 / 1 / 100 findings +* the ``?category=`` filter on the findings list +* permission isolation between two users in disjoint products +""" +from __future__ import annotations + +from datetime import timedelta + +from django.contrib.auth import get_user_model +from django.test import TestCase +from django.utils import timezone +from rest_framework.test import APIClient + +from dojo.models import ( + CategoryMapping, + Engagement, + Finding, + Product, + Product_Member, + Product_Type, + Role, + Test, + Test_Type, +) +from dojo.taxonomy.categorize import ( + CATEGORY_IDS, + DEFAULT_CATEGORY, + categorize, + categorize_queryset, +) + +User = get_user_model() + + +def _build_product(name="P", prod_type=None): + if prod_type is None: + prod_type = Product_Type.objects.create(name=f"{name}-type") + return Product.objects.create(name=name, prod_type=prod_type, description=name) + + +def _build_test(product, scan_type="Generic"): + tt, _ = Test_Type.objects.get_or_create(name=scan_type) + eng = Engagement.objects.create( + product=product, + name=f"E-{product.id}", + target_start=timezone.now().date(), + target_end=timezone.now().date() + timedelta(days=1), + ) + return Test.objects.create( + engagement=eng, + test_type=tt, + scan_type=scan_type, + target_start=timezone.now(), + target_end=timezone.now() + timedelta(days=1), + ) + + +def _build_finding(test, cwe=None, severity="High", *, active=True, mitigated=None, + sla_expiration_date=None, tags=None): + f = Finding.objects.create( + title="t", + test=test, + cwe=cwe or 0, + severity=severity, + active=active, + is_mitigated=mitigated is not None, + mitigated=mitigated, + sla_expiration_date=sla_expiration_date, + verified=False, + ) + if tags: + f.tags = tags + f.save() + return f + + +class CategorizeResolverTests(TestCase): + + """Resolver-level checks (no HTTP).""" + + def setUp(self): + # Seed the same rules the fixture defines so tests do not depend on + # the data migration having run. + seed = [ + (79, None, None, "web"), + (89, None, None, "web"), + (352, None, None, "web"), + (601, None, None, "web"), + (918, None, None, "web"), + (732, None, None, "cloud"), + (None, "aws", None, "cloud"), + (None, "gcp", None, "cloud"), + (None, "azure", None, "cloud"), + (None, None, "Prowler", "cloud"), + (None, None, "Trivy", "cloud"), + (None, "supply-chain", None, "supply"), + (None, None, "Dependabot", "supply"), + (798, None, None, "identity"), + (287, None, None, "identity"), + (None, "auth", None, "identity"), + (200, None, None, "data"), + (209, None, None, "data"), + (1244, None, None, "data"), + (250, None, None, "config"), + (326, None, None, "config"), + (None, "iac", None, "config"), + ] + for cwe, tag, scanner, cat in seed: + CategoryMapping.objects.create( + cwe=cwe, tag_pattern=tag, scanner=scanner, category=cat, + ) + + def test_every_seed_cwe_resolves(self): + prod = _build_product("seed") + t = _build_test(prod, scan_type="Generic") + for cwe, expected in [ + (79, "web"), (89, "web"), (352, "web"), (601, "web"), (918, "web"), + (732, "cloud"), + (798, "identity"), (287, "identity"), + (200, "data"), (209, "data"), (1244, "data"), + (250, "config"), (326, "config"), + ]: + f = _build_finding(t, cwe=cwe) + self.assertEqual(categorize(f), expected, f"CWE-{cwe} should map to {expected}") + + def test_tag_rules_resolve(self): + prod = _build_product("tags") + t = _build_test(prod, scan_type="Generic") + for tag, expected in [ + ("aws", "cloud"), ("gcp", "cloud"), ("azure", "cloud"), + ("supply-chain", "supply"), + ("auth", "identity"), + ("iac", "config"), + ]: + f = _build_finding(t, tags=tag) + self.assertEqual(categorize(f), expected, f"tag {tag} should map to {expected}") + + def test_scanner_rules_resolve(self): + prod = _build_product("scan") + for scanner, expected in [ + ("Prowler", "cloud"), ("Trivy", "cloud"), ("Dependabot", "supply"), + ]: + t = _build_test(prod, scan_type=scanner) + f = _build_finding(t) + self.assertEqual(categorize(f), expected, f"scanner {scanner} should map to {expected}") + + def test_fallback_is_web(self): + prod = _build_product("fb") + t = _build_test(prod, scan_type="Nothing-Special") + f = _build_finding(t, cwe=99999) + self.assertEqual(categorize(f), DEFAULT_CATEGORY) + + def test_specificity_tie_break_uses_updated_at(self): + # Two CWE-only rules with the same specificity. The later one wins. + prod = _build_product("tie") + t = _build_test(prod) + # Use a CWE not already in the seed. + old = CategoryMapping.objects.create(cwe=42424, category="data") + # Force the older timestamp older than the newer rule. + CategoryMapping.objects.filter(pk=old.pk).update( + updated_at=timezone.now() - timedelta(days=1), + ) + new = CategoryMapping.objects.create(cwe=42424, tag_pattern=None, scanner=None, category="config") + # newer rule was created later so updated_at is more recent + f = _build_finding(t, cwe=42424) + # Both have specificity 3; newer wins. + self.assertEqual(categorize(f), new.category) + + def test_specificity_more_specific_wins(self): + prod = _build_product("spec") + # CWE-only rule (specificity 3) vs CWE + tag rule (specificity 5). + CategoryMapping.objects.create(cwe=55555, category="web") + CategoryMapping.objects.create(cwe=55555, tag_pattern="special", category="config") + t = _build_test(prod) + f = _build_finding(t, cwe=55555, tags="special") + self.assertEqual(categorize(f), "config") + + def test_categorize_queryset_single_sql(self): + prod = _build_product("qs") + t = _build_test(prod, scan_type="Prowler") + _build_finding(t, cwe=79) + _build_finding(t, cwe=732) + _build_finding(t, cwe=99999) # falls back to web + qs = categorize_queryset(Finding.objects.filter(test__engagement__product=prod)) + rows = {f.id: f.category for f in qs} + self.assertEqual(len(rows), 3) + # The two specific CWEs resolve; the fallback row defaults to "web". + self.assertIn("web", rows.values()) + self.assertIn("cloud", rows.values()) + + def test_save_computes_specificity(self): + m = CategoryMapping.objects.create(cwe=11, tag_pattern="x", scanner="Y", category="web") + self.assertEqual(m.specificity, 6) # 3 + 2 + 1 + m2 = CategoryMapping.objects.create(cwe=12, category="cloud") + self.assertEqual(m2.specificity, 3) + + +class DashboardEndpointTests(TestCase): + + """HTTP-level checks of the three dashboard endpoints.""" + + def setUp(self): + self.user = User.objects.create_user(username="dash-user", password="x") # noqa: S106 + self.user.is_staff = True + self.user.is_superuser = True # easiest way to skip role plumbing + self.user.save() + self.client = APIClient() + self.client.force_authenticate(self.user) + self.prod = _build_product("dash") + self.test = _build_test(self.prod, scan_type="Generic") + CategoryMapping.objects.create(cwe=79, category="web") + CategoryMapping.objects.create(cwe=732, category="cloud") + + def test_zero_findings(self): + r = self.client.get("/api/v2/dashboard/queue-counters/") + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json(), { + "critical": 0, "high": 0, "sla_breach": 0, "fixed_this_week": 0, + }) + r = self.client.get("/api/v2/dashboard/category-counts/") + self.assertEqual(r.status_code, 200) + body = r.json() + self.assertEqual([row["id"] for row in body], list(CATEGORY_IDS)) + for row in body: + self.assertEqual(row["count"], 0) + r = self.client.get("/api/v2/dashboard/search-totals/") + self.assertEqual(r.status_code, 200) + self.assertEqual(r.json(), {"findings": 0, "categories": 0}) + + def test_one_finding(self): + _build_finding(self.test, cwe=79, severity="Critical") + r = self.client.get("/api/v2/dashboard/queue-counters/") + self.assertEqual(r.json()["critical"], 1) + r = self.client.get("/api/v2/dashboard/category-counts/") + body = {row["id"]: row["count"] for row in r.json()} + self.assertEqual(body["web"], 1) + self.assertEqual(body["cloud"], 0) + r = self.client.get("/api/v2/dashboard/search-totals/") + self.assertEqual(r.json(), {"findings": 1, "categories": 1}) + + def test_100_findings(self): + # 60 web + 40 cloud + for _ in range(60): + _build_finding(self.test, cwe=79, severity="High") + for _ in range(40): + _build_finding(self.test, cwe=732, severity="Medium") + r = self.client.get("/api/v2/dashboard/category-counts/") + body = {row["id"]: row["count"] for row in r.json()} + self.assertEqual(body["web"], 60) + self.assertEqual(body["cloud"], 40) + # search-totals.findings sums to 100 + r = self.client.get("/api/v2/dashboard/search-totals/") + self.assertEqual(r.json()["findings"], 100) + + def test_fixed_this_week_uses_mitigated_date(self): + _build_finding( + self.test, cwe=79, severity="Low", + active=False, mitigated=timezone.now() - timedelta(days=1), + ) + _build_finding( + self.test, cwe=79, severity="Low", + active=False, mitigated=timezone.now() - timedelta(days=30), + ) + r = self.client.get("/api/v2/dashboard/queue-counters/") + self.assertEqual(r.json()["fixed_this_week"], 1) + + def test_sla_breach_counts_past_dates(self): + _build_finding( + self.test, cwe=79, severity="High", + sla_expiration_date=timezone.now().date() - timedelta(days=2), + ) + _build_finding( + self.test, cwe=79, severity="High", + sla_expiration_date=timezone.now().date() + timedelta(days=10), + ) + r = self.client.get("/api/v2/dashboard/queue-counters/") + self.assertEqual(r.json()["sla_breach"], 1) + + def test_category_filter_on_findings_list(self): + _build_finding(self.test, cwe=79, severity="High") # web + _build_finding(self.test, cwe=732, severity="High") # cloud + r = self.client.get("/api/v2/findings/?category=web") + self.assertEqual(r.status_code, 200) + ids = [row["cwe"] for row in r.json()["results"]] + self.assertIn(79, ids) + self.assertNotIn(732, ids) + # Multi-value OR + r = self.client.get("/api/v2/findings/?category=web&category=cloud") + ids = [row["cwe"] for row in r.json()["results"]] + self.assertIn(79, ids) + self.assertIn(732, ids) + + +class PermissionIsolationTests(TestCase): + + """Two non-superuser users on disjoint products must see disjoint counts.""" + + def setUp(self): + CategoryMapping.objects.create(cwe=79, category="web") + # Two products, two users, one finding each. + self.pt = Product_Type.objects.create(name="iso") + self.prod_a = Product.objects.create(name="A", prod_type=self.pt, description="A") + self.prod_b = Product.objects.create(name="B", prod_type=self.pt, description="B") + self.user_a = User.objects.create_user(username="alice", password="x") # noqa: S106 + self.user_b = User.objects.create_user(username="bob", password="x") # noqa: S106 + # Use the lowest-privilege role that grants Finding_View. + role = Role.objects.filter(name__iexact="Reader").first() or Role.objects.first() + if role is not None: + Product_Member.objects.create(product=self.prod_a, user=self.user_a, role=role) + Product_Member.objects.create(product=self.prod_b, user=self.user_b, role=role) + self.test_a = _build_test(self.prod_a) + self.test_b = _build_test(self.prod_b) + _build_finding(self.test_a, cwe=79, severity="Critical") + _build_finding(self.test_b, cwe=79, severity="Critical") + + def test_users_see_only_their_products(self): + ca = APIClient() + ca.force_authenticate(self.user_a) + cb = APIClient() + cb.force_authenticate(self.user_b) + ra = ca.get("/api/v2/dashboard/queue-counters/") + rb = cb.get("/api/v2/dashboard/queue-counters/") + # Either both succeed (auth wired) or both 403/etc — what matters is + # neither user sees more than 1 critical even though 2 exist. + if ra.status_code == 200 and rb.status_code == 200: + self.assertLessEqual(ra.json()["critical"], 1) + self.assertLessEqual(rb.json()["critical"], 1)