Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion dojo/admin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from django.contrib import admin
from django.contrib import admin, messages
from django.contrib.admin.sites import NotRegistered
from polymorphic.admin import PolymorphicChildModelAdmin, PolymorphicParentModelAdmin

from dojo.models import (
Answer,
Answered_Survey,
CategoryMapping,
Choice,
ChoiceAnswer,
ChoiceQuestion,
Engagement_Survey,
Finding,
Question,
TextAnswer,
TextQuestion,
Expand Down Expand Up @@ -99,3 +101,60 @@ class AnswerParentAdmin(PolymorphicParentModelAdmin):
admin.site.register(Answer, AnswerParentAdmin)
admin.site.register(Engagement_Survey)
admin.site.register(Answered_Survey)


# ===================================================
# Mural watercolor FE: dashboard category taxonomy
# ===================================================


@admin.register(CategoryMapping)
class CategoryMappingAdmin(admin.ModelAdmin):

"""
Editable rules table. The ``Test this mapping`` action evaluates the
selected rules against the most recent N findings the admin can see and
surfaces the resolved category in a Django messages banner. See
``prd-fe-dashboard-aggregations-and-category-taxonomy.md`` §10.
"""

list_display = ("__str__", "category", "specificity", "updated_at")
list_filter = ("category",)
search_fields = ("tag_pattern", "scanner")
readonly_fields = ("specificity", "updated_at")
actions = ("test_this_mapping",)

def test_this_mapping(self, request, queryset):
from dojo.taxonomy.categorize import categorize # noqa: PLC0415

sample = list(Finding.objects.order_by("-id")[:20])
if not sample:
self.message_user(
request,
"No findings exist yet — nothing to evaluate against.",
level=messages.WARNING,
)
return

# We restrict the evaluated rule-set to the rows the admin selected
# so the action reflects "what would change if only these rules
# existed" — handy when iterating on a fix.
mappings = list(queryset.order_by("-specificity", "-updated_at"))
if not mappings:
self.message_user(
request,
"Select at least one mapping row to test.",
level=messages.WARNING,
)
return

for f in sample:
resolved = categorize(f, mappings=mappings)
self.message_user(
request,
f"Finding #{f.id} (cwe={f.cwe}, scanner="
f"{getattr(getattr(f, 'test', None), 'scan_type', None)}) -> {resolved}",
level=messages.INFO,
)

test_this_mapping.short_description = "Test this mapping against 20 recent findings"
19 changes: 19 additions & 0 deletions dojo/api_v2/dashboard/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE).
"""
Dashboard aggregation API package.

Three viewsets (queue-counters, category-counts, search-totals) consumed by
the watercolor FE Home screen. See
``prd-fe-dashboard-aggregations-and-category-taxonomy.md`` for the contract.
"""
from dojo.api_v2.dashboard.views import (
CategoryCountsViewSet,
QueueCountersViewSet,
SearchTotalsViewSet,
)

__all__ = [
"CategoryCountsViewSet",
"QueueCountersViewSet",
"SearchTotalsViewSet",
]
209 changes: 209 additions & 0 deletions dojo/api_v2/dashboard/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE).
"""
Dashboard aggregation viewsets.

All endpoints are scoped to the requesting user's accessible findings via
``dojo.finding.queries.get_authorized_findings``. Each response is cached
per-user for 60 seconds, keyed on ``MAX(finding.updated)`` so a mutation
implicitly busts the cache.

Endpoints (all under ``/api/v2/dashboard/``):

* ``GET queue-counters/`` → ``{critical, high, sla_breach, fixed_this_week}``
* ``GET category-counts/`` → ``[{id, count}]`` (always 6 entries in fixed order)
* ``GET search-totals/`` → ``{findings, categories}``
"""
from __future__ import annotations

import hashlib
import logging
from datetime import timedelta

from django.core.cache import cache
from django.db.models import Count, Max
from django.utils import timezone
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework import status, viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from dojo.authorization.roles_permissions import Permissions
from dojo.finding.queries import get_authorized_findings
from dojo.taxonomy.categorize import CATEGORY_IDS, categorize_queryset

logger = logging.getLogger(__name__)

CACHE_TTL_SECONDS = 60
SEARCH_TOTALS_CATEGORY_CAP = 8


def _cache_key(prefix: str, user_id: int, fingerprint: str) -> str:
return f"dojo:dashboard:{prefix}:user={user_id}:fp={fingerprint}"


def _user_findings(user):
"""Authorized findings for the requesting user."""
return get_authorized_findings(Permissions.Finding_View, user=user)


def _fingerprint(qs) -> str:
"""
Build the per-user cache fingerprint: max(updated) over the user's
accessible findings, hashed for compact key storage.
"""
last = qs.order_by().aggregate(m=Max("updated"))["m"]
seed = "none" if last is None else last.isoformat()
return hashlib.md5(seed.encode("utf-8")).hexdigest()[:16] # noqa: S324 # fingerprint, not security


def _open_findings(qs):
"""Open findings = active, not mitigated, not false_p, not out_of_scope."""
return qs.filter(
active=True,
is_mitigated=False,
false_p=False,
out_of_scope=False,
duplicate=False,
)


class QueueCountersViewSet(viewsets.ViewSet):

"""Hero counter grid for Home: critical / high / SLA breach / fixed this week."""

permission_classes = (IsAuthenticated,)
# provides a stable URL name (``queue-counters-list``) that DRF's
# DefaultRouter expects on a ViewSet.
basename = "dashboard_queue_counters"

@extend_schema(
operation_id="dashboard_queue_counters",
description=(
"Returns the four hero counters for the watercolor Home screen. "
"All counts are scoped to findings the requesting user is "
"authorized to see. ``sla_breach`` counts open findings whose "
"``sla_expiration_date`` is in the past (returns 0 if SLA "
"tracking is not in use). ``fixed_this_week`` counts findings "
"whose ``mitigated`` timestamp falls in the last 7 days — this "
"is an undercount relative to the full Mitigated/Inactive/"
"False Positive transition set because this fork does not "
"maintain a status-history table."
),
responses={
200: OpenApiResponse(description="Counters payload"),
},
)
def list(self, request):
qs = _user_findings(request.user)
fp = _fingerprint(qs)
key = _cache_key("queue", request.user.id, fp)
cached = cache.get(key)
if cached is not None:
return Response(cached)

open_qs = _open_findings(qs)
critical = open_qs.filter(severity="Critical").count()
high = open_qs.filter(severity="High").count()

sla_breach = 0
try:
sla_breach = open_qs.filter(
sla_expiration_date__isnull=False,
sla_expiration_date__lt=timezone.now().date(),
).count()
except Exception:
# Field absent on this fork — keep the contract and return 0.
sla_breach = 0

week_ago = timezone.now() - timedelta(days=7)
fixed_this_week = qs.filter(mitigated__gte=week_ago).count()

payload = {
"critical": critical,
"high": high,
"sla_breach": sla_breach,
"fixed_this_week": fixed_this_week,
}
cache.set(key, payload, CACHE_TTL_SECONDS)
return Response(payload)


class CategoryCountsViewSet(viewsets.ViewSet):

"""Six-tile category strip on Home."""

permission_classes = (IsAuthenticated,)
basename = "dashboard_category_counts"

@extend_schema(
operation_id="dashboard_category_counts",
description=(
"Returns open-finding counts grouped by the six dashboard "
"categories. Always emits one entry per category id in the "
"fixed order: web, cloud, supply, identity, data, config. "
"Missing categories are returned as ``count: 0``. Scoped to "
"findings the requesting user can see."
),
responses={200: OpenApiResponse(description="Category counts array")},
)
def list(self, request):
qs = _user_findings(request.user)
fp = _fingerprint(qs)
key = _cache_key("categories", request.user.id, fp)
cached = cache.get(key)
if cached is not None:
return Response(cached)

open_qs = _open_findings(qs)
annotated = categorize_queryset(open_qs)
rows = (
annotated.values("category")
.order_by()
.annotate(count=Count("id", distinct=True))
)
by_category = {r["category"]: r["count"] for r in rows}
payload = [
{"id": cid, "count": int(by_category.get(cid, 0))}
for cid in CATEGORY_IDS
]
cache.set(key, payload, CACHE_TTL_SECONDS)
return Response(payload)


class SearchTotalsViewSet(viewsets.ViewSet):

"""Search-pill placeholder totals: number of findings + number of categories."""

permission_classes = (IsAuthenticated,)
basename = "dashboard_search_totals"

@extend_schema(
operation_id="dashboard_search_totals",
description=(
"Counts feeding the Home search placeholder pill: ``findings`` "
"is the total open-findings count visible to the user, "
"``categories`` is the count of distinct dashboard category "
f"ids with at least one open finding (capped at {SEARCH_TOTALS_CATEGORY_CAP}). "
"Scoped to the requesting user's accessible findings."
),
responses={200: OpenApiResponse(description="Search totals payload")},
)
def list(self, request):
qs = _user_findings(request.user)
fp = _fingerprint(qs)
key = _cache_key("search", request.user.id, fp)
cached = cache.get(key)
if cached is not None:
return Response(cached)

open_qs = _open_findings(qs)
findings = open_qs.count()
annotated = categorize_queryset(open_qs)
distinct_cats = (
annotated.values_list("category", flat=True).order_by().distinct().count()
)
categories = min(distinct_cats, SEARCH_TOTALS_CATEGORY_CAP)

payload = {"findings": findings, "categories": categories}
cache.set(key, payload, CACHE_TTL_SECONDS)
return Response(payload, status=status.HTTP_200_OK)
54 changes: 54 additions & 0 deletions dojo/db_migrations/0265_categorymapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE).
"""Create the CategoryMapping table and load the starter rules fixture."""
from django.core.management import call_command
from django.db import migrations, models


FIXTURE = "category_mappings"


def load_fixture(apps, schema_editor):
call_command("loaddata", FIXTURE, app_label="dojo")


def unload_fixture(apps, schema_editor):
CategoryMapping = apps.get_model("dojo", "CategoryMapping")
CategoryMapping.objects.all().delete()


class Migration(migrations.Migration):

dependencies = [
("dojo", "0264_alter_url_identity_hash_alter_urlevent_identity_hash"),
]

operations = [
migrations.CreateModel(
name="CategoryMapping",
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("cwe", models.IntegerField(blank=True, help_text="CWE number to match (matches Finding.cwe exactly).", null=True)),
("tag_pattern", models.CharField(blank=True, help_text="Substring matched (case-insensitive) against any of the Finding's tags.", max_length=128, null=True)),
("scanner", models.CharField(blank=True, help_text="Scanner name matched against Finding.test.scan_type or test_type.name.", max_length=128, null=True)),
("category", models.CharField(
choices=[
("web", "Web & API"),
("cloud", "Cloud & infra"),
("supply", "Supply chain"),
("identity", "Identity & auth"),
("data", "Data exposure"),
("config", "Misconfig"),
],
help_text="Dashboard category id this rule resolves to.",
max_length=16,
)),
("specificity", models.IntegerField(default=0, help_text="Computed at save time: CWE=3 + tag=2 + scanner=1. Higher wins.")),
("updated_at", models.DateTimeField(auto_now=True)),
],
options={
"ordering": ("-specificity", "-updated_at"),
"unique_together": {("cwe", "tag_pattern", "scanner")},
},
),
migrations.RunPython(load_fixture, unload_fixture),
]
Loading
Loading