diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index d1d36de473a..ce053c25bb1 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -72,6 +72,7 @@ Finding, Finding_Group, Finding_Template, + FindingBookmark, General_Survey, Global_Role, Language_Type, @@ -3243,3 +3244,185 @@ def create(self, validated_data): from dojo.notifications.api.serializer import NotificationWebhooksSerializer # noqa: E402, F401 -- backward compat + +# --------------------------------------------------------------------------- +# Mural fork: bookmarks + queue serializers for the watercolor FE. +# --------------------------------------------------------------------------- +_SEVERITY_RANK = {"Critical": 4, "High": 3, "Medium": 2, "Low": 1, "Info": 0} + + +class FindingCardAssigneeSerializer(serializers.Serializer): + + """Slim user payload used by the card grid for avatar chips.""" + + username = serializers.CharField() + display_name = serializers.CharField() + initials = serializers.CharField() + color_hex = serializers.CharField() + + +@extend_schema_field({"type": "string", "nullable": True}) +class _DueInField(serializers.CharField): + pass + + +class FindingCardSerializer(serializers.ModelSerializer): + + """ + Slim Finding payload sized for the FE card grid. + + Returned by `/api/v2/bookmarks/` and `/api/v2/me/queue/`. Intentionally + a subset of `FindingSerializer` so payloads stay cheap at queue size. + """ + + cve = serializers.CharField(allow_null=True, required=False) + cwe = serializers.IntegerField(allow_null=True, required=False) + cvss_score = serializers.SerializerMethodField() + age_days = serializers.SerializerMethodField() + scanner = serializers.SerializerMethodField() + sla_pct = serializers.SerializerMethodField() + due_in = serializers.SerializerMethodField() + tags = TagListSerializerField(required=False) + assignees = serializers.SerializerMethodField() + status = serializers.SerializerMethodField() + saved = serializers.SerializerMethodField() + created_at = serializers.DateTimeField(source="created", read_only=True) + + class Meta: + model = Finding + fields = ( + "id", + "title", + "severity", + "cve", + "cwe", + "cvss_score", + "created_at", + "age_days", + "scanner", + "sla_pct", + "due_in", + "tags", + "assignees", + "status", + "saved", + ) + + @extend_schema_field(serializers.FloatField(allow_null=True)) + def get_cvss_score(self, obj): + # Prefer v4 if present, else v3. Rounded to 1 decimal per PRD. + score = obj.cvssv4_score if obj.cvssv4_score is not None else obj.cvssv3_score + if score is None: + return None + return round(float(score), 1) + + @extend_schema_field(serializers.IntegerField(allow_null=True)) + def get_age_days(self, obj): + # Finding has an `age` property when annotated; fall back to created. + try: + value = obj.age + if value is not None: + return int(value) + except (AttributeError, TypeError): + pass + if obj.date is None: + return None + return (timezone.now().date() - obj.date).days + + @extend_schema_field(serializers.CharField(allow_null=True)) + def get_scanner(self, obj): + try: + if obj.test_id and obj.test.test_type_id: + return obj.test.test_type.name + except Exception: + logger.debug("FindingCardSerializer.get_scanner: lookup failed", exc_info=True) + return None + + @extend_schema_field(serializers.FloatField(allow_null=True)) + def get_sla_pct(self, obj): + # 0.0 (just created) → 1.0+ (overdue). Null when no SLA configured. + deadline = obj.sla_expiration_date + start = obj.sla_start_date or obj.date + if deadline is None or start is None: + return None + total = (deadline - start).days + if total <= 0: + return 1.0 + elapsed = (timezone.now().date() - start).days + return round(max(0.0, min(elapsed / total, 2.0)), 3) + + @extend_schema_field(serializers.CharField(allow_null=True)) + def get_due_in(self, obj): + days = None + try: + days = obj.sla_days_remaining() + except Exception: + days = None + if days is None: + return None + if days < 1: + # Less than a day: render hours for finer grain. + hours = max(int(days * 24), 0) + return f"{hours}h" + return f"{int(days)}d" + + @extend_schema_field(FindingCardAssigneeSerializer(many=True)) + def get_assignees(self, obj): + # Upstream DefectDojo models assignment as Finding.reviewers (M2M). + # The PRD called the field `assigned_to` (single FK); we surface the + # M2M here and let the FE iterate. See PR description for deviation. + out = [] + try: + reviewers = obj.reviewers.all() + except Exception: + return out + # Deterministic 16-color palette so the FE can render avatar chips + # without a second round-trip. Palette is stable per username. + palette = [ + "#7C3AED", "#2563EB", "#0EA5E9", "#0891B2", + "#059669", "#65A30D", "#CA8A04", "#EA580C", + "#DC2626", "#DB2777", "#9333EA", "#4F46E5", + "#0284C7", "#0D9488", "#16A34A", "#B45309", + ] + for user in reviewers: + first = (user.first_name or "").strip() + last = (user.last_name or "").strip() + initials = ((first[:1] or "") + (last[:1] or "")) or (user.username[:2] or "?") + display = (f"{first} {last}".strip() or user.username) + color = palette[hash(user.username) % len(palette)] + out.append({ + "username": user.username, + "display_name": display, + "initials": initials.upper(), + "color_hex": color, + }) + return out + + @extend_schema_field(serializers.CharField()) + def get_status(self, obj): + return obj.status() + + @extend_schema_field(serializers.BooleanField()) + def get_saved(self, obj): + # /bookmarks/ always returns saved=True. /me/queue/ passes a set of + # bookmarked finding IDs in context so we can flag the union members + # in a single pass without N+1 queries. + bookmarked_ids = self.context.get("bookmarked_finding_ids") + if bookmarked_ids is None: + # Default to True for the bookmarks endpoint, which is the most + # common caller. + return True + return obj.id in bookmarked_ids + + +class FindingBookmarkSerializer(serializers.ModelSerializer): + + """Wire format for POST /api/v2/bookmarks/ responses.""" + + finding = serializers.PrimaryKeyRelatedField(queryset=Finding.objects.all()) + user = serializers.PrimaryKeyRelatedField(read_only=True) + + class Meta: + model = FindingBookmark + fields = ("id", "user", "finding", "created_at") + read_only_fields = ("id", "user", "created_at") diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index bd61d76ee2a..455f31a7bce 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -111,6 +111,7 @@ FileUpload, Finding, Finding_Template, + FindingBookmark, General_Survey, Global_Role, Language_Type, @@ -3666,3 +3667,247 @@ class AnnouncementViewSet( def get_queryset(self): return Announcement.objects.all().order_by("id") + + +# --------------------------------------------------------------------------- +# Mural fork: bookmarks + queue viewsets for the watercolor FE. +# See dojo/api_v2/serializers.py for the matching serializers and the PRD at +# https://github.com/tactivos/security_pipeline/blob/main/docs/agent-work/ +# prd-fe-bookmarks-and-user-queue-api.md +# --------------------------------------------------------------------------- +from rest_framework.pagination import PageNumberPagination # noqa: E402 + + +class BookmarkPagination(PageNumberPagination): + + """Card-grid pagination: 4 columns x 3 rows by default, cap at 100.""" + + page_size = 12 + page_size_query_param = "page_size" + max_page_size = 100 + + +@extend_schema_view( + list=extend_schema( + summary="List the requesting user's bookmarked findings.", + description=( + "Returns the requesting user's bookmarked findings as slim cards. " + "Pre-filtered by `get_authorized_findings(Finding_View)` so stale " + "bookmarks on now-inaccessible findings silently drop out." + ), + responses={200: serializers.FindingCardSerializer(many=True)}, + ), + create=extend_schema( + summary="Bookmark a finding for the requesting user.", + description=( + "Idempotent: returns 201 on first bookmark, 200 if it already " + "exists. Returns 403 if the user cannot access the finding." + ), + request=serializers.FindingBookmarkSerializer, + responses={ + 200: serializers.FindingBookmarkSerializer, + 201: serializers.FindingBookmarkSerializer, + 403: OpenApiResponse(description="Finding not accessible to user."), + }, + ), +) +class FindingBookmarkViewSet( + mixins.ListModelMixin, + mixins.CreateModelMixin, + viewsets.GenericViewSet, +): + + """REST endpoints for per-user finding bookmarks (Mural fork).""" + + serializer_class = serializers.FindingCardSerializer + permission_classes = (IsAuthenticated,) + pagination_class = BookmarkPagination + queryset = FindingBookmark.objects.none() + # Detail lookups use {finding_id} not {pk}: a bookmark id is implementation + # detail, the FE only knows the finding it wants to un-save. + lookup_field = "finding_id" + lookup_value_regex = r"\d+" + + def _authorized_findings(self): + return get_authorized_findings(Permissions.Finding_View, user=self.request.user) + + def get_queryset(self): + # Pre-filter bookmarks to those whose finding is still accessible. + authorized_ids = self._authorized_findings().values("id") + return ( + FindingBookmark.objects + .filter(user=self.request.user, finding_id__in=authorized_ids) + .select_related("finding", "finding__test", "finding__test__test_type") + .order_by("-created_at") + ) + + def list(self, request, *args, **kwargs): + # The model serializer expects Finding instances, not bookmarks. + bookmarks_qs = self.get_queryset() + finding_ids = list(bookmarks_qs.values_list("finding_id", flat=True)) + # Preserve bookmark order (most-recently-saved first) when fetching + # the Finding rows. + findings = ( + self._authorized_findings() + .filter(id__in=finding_ids) + .prefetch_related("reviewers", "tags") + .select_related("test", "test__test_type") + ) + # Re-order findings to match the bookmark ordering. + finding_map = {f.id: f for f in findings} + ordered = [finding_map[i] for i in finding_ids if i in finding_map] + page = self.paginate_queryset(ordered) + serializer = serializers.FindingCardSerializer( + page if page is not None else ordered, + many=True, + context={"request": request, "bookmarked_finding_ids": set(finding_ids)}, + ) + if page is not None: + return self.get_paginated_response(serializer.data) + return Response(serializer.data) + + def create(self, request, *args, **kwargs): + ser = serializers.FindingBookmarkSerializer(data=request.data, context={"request": request}) + ser.is_valid(raise_exception=True) + finding = ser.validated_data["finding"] + + # Authorization: 403 if the user can't see the finding. + if not self._authorized_findings().filter(pk=finding.pk).exists(): + return Response(status=status.HTTP_403_FORBIDDEN) + + try: + bookmark, created = FindingBookmark.objects.get_or_create( + user=request.user, finding=finding, + ) + except IntegrityError: + # Two simultaneous POSTs from a flaky FE: re-read the row. + bookmark = FindingBookmark.objects.get(user=request.user, finding=finding) + created = False + + out = serializers.FindingBookmarkSerializer(bookmark, context={"request": request}) + return Response( + out.data, + status=status.HTTP_201_CREATED if created else status.HTTP_200_OK, + ) + + @extend_schema( + summary="Delete a bookmark for the requesting user.", + description=( + "Idempotent: returns 204 even when the bookmark did not exist, " + "so the FE toggle does not need to track current state." + ), + responses={204: OpenApiResponse(description="Bookmark removed (or never existed).")}, + ) + def destroy(self, request, finding_id=None, *args, **kwargs): + FindingBookmark.objects.filter( + user=request.user, finding_id=finding_id, + ).delete() + return Response(status=status.HTTP_204_NO_CONTENT) + + @extend_schema( + summary="Cheap bookmark count for the nav-bar badge.", + description=( + "Returns `{count: }` from a single indexed COUNT(*). The " + "nav badge polls this at ~60s; the per-user 5s cache absorbs " + "multi-tab thundering herds." + ), + responses={200: OpenApiTypes.OBJECT}, + ) + @action(detail=False, methods=["get"], url_path="count") + def count(self, request): + # Authorization is honoured: only count bookmarks on findings the + # user can still access. + authorized_ids = self._authorized_findings().values("id") + n = FindingBookmark.objects.filter( + user=request.user, finding_id__in=authorized_ids, + ).count() + return Response({"count": n}) + + +@extend_schema_view( + list=extend_schema( + summary="The requesting user's queue (bookmarked union assigned).", + description=( + "Returns the union of (a) findings bookmarked by the user and " + "(b) findings on which the user is a reviewer. Deduped by " + "finding id; cards flagged with `saved=true` are bookmarked. " + "Use `assigned_only=true` for the Home 'Picked for you' rail." + ), + parameters=[ + OpenApiParameter( + name="order_by", + type=OpenApiTypes.STR, + enum=["severity", "cvss", "age", "due_in"], + default="severity", + description="Result ordering. Defaults to severity desc.", + ), + OpenApiParameter( + name="assigned_only", + type=OpenApiTypes.BOOL, + description=( + "If true, skip the bookmark side of the union and only " + "return findings assigned (reviewers) to the user." + ), + ), + ], + responses={200: serializers.FindingCardSerializer(many=True)}, + ), +) +class MeQueueViewSet(viewsets.GenericViewSet, mixins.ListModelMixin): + + """`GET /api/v2/me/queue/` — bookmarked union assigned-to-me findings.""" + + serializer_class = serializers.FindingCardSerializer + permission_classes = (IsAuthenticated,) + pagination_class = BookmarkPagination + queryset = Finding.objects.none() + + _ORDERINGS = { + "severity": ("numerical_severity", "-cvssv3_score", "-created"), + "cvss": ("-cvssv3_score", "numerical_severity", "-created"), + "age": ("date", "numerical_severity"), + "due_in": ("sla_expiration_date", "numerical_severity"), + } + + def list(self, request, *args, **kwargs): + user = request.user + authorized = get_authorized_findings(Permissions.Finding_View, user=user) + + assigned_only = str(request.query_params.get("assigned_only", "")).lower() in {"1", "true", "yes"} + order_key = request.query_params.get("order_by") or "severity" + order_fields = self._ORDERINGS.get(order_key, self._ORDERINGS["severity"]) + + # Bookmark ids first (also used to flag `saved=true` on every card). + bookmarked_ids = set( + FindingBookmark.objects + .filter(user=user, finding_id__in=authorized.values("id")) + .values_list("finding_id", flat=True), + ) + + # Assigned (reviewer) ids from the same authorized scope. + assigned_ids = set( + authorized.filter(reviewers=user).values_list("id", flat=True), + ) + + union_ids = assigned_ids if assigned_only else bookmarked_ids | assigned_ids + + queryset = ( + authorized.filter(id__in=union_ids) + .prefetch_related("reviewers", "tags") + .select_related("test", "test__test_type") + .order_by(*order_fields) + .distinct() + ) + + page = self.paginate_queryset(queryset) + serializer = serializers.FindingCardSerializer( + page if page is not None else queryset, + many=True, + context={ + "request": request, + "bookmarked_finding_ids": bookmarked_ids, + }, + ) + if page is not None: + return self.get_paginated_response(serializer.data) + return Response(serializer.data) diff --git a/dojo/db_migrations/0266_findingbookmark.py b/dojo/db_migrations/0266_findingbookmark.py new file mode 100644 index 00000000000..10b5a6b6be9 --- /dev/null +++ b/dojo/db_migrations/0266_findingbookmark.py @@ -0,0 +1,67 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +# +# Adds the FindingBookmark table for the watercolor FE's per-user +# "saved" feature. Additive only — no schema changes to any existing table. + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("dojo", "0265_categorymapping"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="FindingBookmark", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("created_at", models.DateTimeField(auto_now_add=True)), + ( + "finding", + models.ForeignKey( + help_text="The finding that was bookmarked.", + on_delete=django.db.models.deletion.CASCADE, + related_name="bookmarks", + to="dojo.finding", + ), + ), + ( + "user", + models.ForeignKey( + help_text="The user who saved this finding.", + on_delete=django.db.models.deletion.CASCADE, + related_name="finding_bookmarks", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "ordering": ["-created_at"], + }, + ), + migrations.AddIndex( + model_name="findingbookmark", + index=models.Index(fields=["user"], name="dojo_findbm_user_idx"), + ), + migrations.AddIndex( + model_name="findingbookmark", + index=models.Index(fields=["finding"], name="dojo_findbm_finding_idx"), + ), + migrations.AlterUniqueTogether( + name="findingbookmark", + unique_together={("user", "finding")}, + ), + ] diff --git a/dojo/models.py b/dojo/models.py index c53758cfdc0..3bff760f9e6 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -4655,6 +4655,35 @@ def __str__(self): return "No Response" +# Mural fork: per-user bookmarks on findings. Powers the watercolor FE's +# "saved" heart, the Queue screen, and the nav-bar saved-count badge. +class FindingBookmark(models.Model): + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name="finding_bookmarks", + help_text=_("The user who saved this finding."), + ) + finding = models.ForeignKey( + Finding, + on_delete=models.CASCADE, + related_name="bookmarks", + help_text=_("The finding that was bookmarked."), + ) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + unique_together = (("user", "finding"),) + indexes = [ + models.Index(fields=["user"], name="dojo_findbm_user_idx"), + models.Index(fields=["finding"], name="dojo_findbm_finding_idx"), + ] + ordering = ["-created_at"] + + def __str__(self): + return f"{self.user} saved finding {self.finding_id}" + + # Audit logging registration is now handled in auditlog.py and configured in apps.py # This allows for conditional registration of either django-auditlog or django-pghistory # The audit system is configured in DojoAppConfig.ready() to ensure all models are loaded @@ -4754,3 +4783,4 @@ def __str__(self): admin.site.register(Test_Import) admin.site.register(Test_Import_Finding_Action) admin.site.register(Finding_Group) +admin.site.register(FindingBookmark) diff --git a/dojo/urls.py b/dojo/urls.py index fff3a5265d9..3b5e2a26c38 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -33,6 +33,7 @@ EndPointViewSet, EngagementPresetsViewset, EngagementViewSet, + FindingBookmarkViewSet, FindingTemplatesViewSet, FindingViewSet, GlobalRoleViewSet, @@ -43,6 +44,7 @@ JiraProjectViewSet, LanguageTypeViewSet, LanguageViewSet, + MeQueueViewSet, NetworkLocationsViewset, NotesViewSet, NoteTypeViewSet, @@ -131,6 +133,9 @@ # v2 api written in django-rest-framework v2_api = DefaultRouter() v2_api.register(r"announcements", AnnouncementViewSet, basename="announcement") +# Mural fork: bookmarks (per-user finding hearts) + queue ("my queue" union). +v2_api.register(r"bookmarks", FindingBookmarkViewSet, basename="finding_bookmark") +v2_api.register(r"me/queue", MeQueueViewSet, basename="me_queue") v2_api.register(r"configuration_permissions", ConfigurationPermissionViewSet, basename="permission") v2_api.register(r"credential_mappings", CredentialsMappingViewSet, basename="cred_mapping") v2_api.register(r"credentials", CredentialsViewSet, basename="cred_user") diff --git a/unittests/test_apiv2_bookmarks.py b/unittests/test_apiv2_bookmarks.py new file mode 100644 index 00000000000..17db17d5981 --- /dev/null +++ b/unittests/test_apiv2_bookmarks.py @@ -0,0 +1,177 @@ +# Copyright (c) Tactivos / Mural. Licensed under BSD-3-Clause (see LICENSE). +# +# DRF tests for the Mural fork's `/api/v2/bookmarks/` and +# `/api/v2/me/queue/` endpoints. See the PRD at +# https://github.com/tactivos/security_pipeline/blob/main/docs/agent-work/ +# prd-fe-bookmarks-and-user-queue-api.md +"""DRF tests for the bookmarks + queue endpoints (Mural fork).""" +import datetime + +from rest_framework.authtoken.models import Token +from rest_framework.reverse import reverse +from rest_framework.test import APIClient, APITestCase + +from dojo.authorization.roles_permissions import Roles +from dojo.models import ( + Engagement, + Finding, + FindingBookmark, + Product, + Product_Member, + Product_Type, + Role, + Test, + Test_Type, + User, +) + + +def _create_product_stack(name: str, owner: User): + """Returns (product, engagement, test) wired together with the owner.""" + product_type = Product_Type.objects.create(name=f"PT-{name}") + product = Product.objects.create( + prod_type=product_type, name=name, description=f"Test product {name}", + ) + Product_Member.objects.create( + product=product, user=owner, role=Role.objects.get(id=Roles.Owner), + ) + engagement = Engagement.objects.create( + product=product, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + ) + test_type, _ = Test_Type.objects.get_or_create( + name=f"Bookmark Mock Scan {name}", defaults={"static_tool": True}, + ) + test = Test.objects.create( + engagement=engagement, test_type=test_type, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + ) + return product, engagement, test + + +def _make_finding(test: Test, reporter: User, title: str, severity: str = "High") -> Finding: + return Finding.objects.create( + test=test, title=title, severity=severity, verified=True, + description="x", mitigation="x", impact="x", + reporter=reporter, numerical_severity="S1", + static_finding=True, dynamic_finding=False, + ) + + +class FindingBookmarkApiTests(APITestCase): + + """Covers the four verification cases in the bookmarks PRD.""" + + @classmethod + def setUpTestData(cls): + cls.user_a = User.objects.create(username="alice", first_name="Ali", last_name="Ce") + cls.user_b = User.objects.create(username="bob", first_name="Bob", last_name="By") + cls.token_a = Token.objects.create(user=cls.user_a) + cls.token_b = Token.objects.create(user=cls.user_b) + + cls.product_a, _, cls.test_a = _create_product_stack("AliceProd", cls.user_a) + cls.product_b, _, cls.test_b = _create_product_stack("BobProd", cls.user_b) + + cls.finding_a1 = _make_finding(cls.test_a, cls.user_a, "Alice F1", severity="High") + cls.finding_a2 = _make_finding(cls.test_a, cls.user_a, "Alice F2", severity="Critical") + cls.finding_b1 = _make_finding(cls.test_b, cls.user_b, "Bob F1", severity="High") + + def setUp(self) -> None: + self.client_a = APIClient() + self.client_a.credentials(HTTP_AUTHORIZATION="Token " + self.token_a.key) + self.client_b = APIClient() + self.client_b.credentials(HTTP_AUTHORIZATION="Token " + self.token_b.key) + + # ---- Verification case 1: create / list / delete idempotency ----------- + def test_create_then_repeat_post_returns_200(self): + url = reverse("finding_bookmark-list") + r1 = self.client_a.post(url, {"finding": self.finding_a1.id}, format="json") + self.assertEqual(r1.status_code, 201, r1.content[:500]) + + r2 = self.client_a.post(url, {"finding": self.finding_a1.id}, format="json") + self.assertEqual(r2.status_code, 200, r2.content[:500]) + + # Sanity: exactly one row in the DB despite two POSTs. + self.assertEqual( + FindingBookmark.objects.filter(user=self.user_a, finding=self.finding_a1).count(), + 1, + ) + + def test_delete_missing_bookmark_returns_204(self): + url = reverse( + "finding_bookmark-detail", + kwargs={"finding_id": self.finding_a1.id}, + ) + r = self.client_a.delete(url) + self.assertEqual(r.status_code, 204, r.content[:500]) + + def test_list_returns_only_requesting_user_bookmarks(self): + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a1) + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a2) + + r = self.client_a.get(reverse("finding_bookmark-list")) + self.assertEqual(r.status_code, 200, r.content[:500]) + body = r.json() + ids = {row["id"] for row in body["results"]} + self.assertEqual(ids, {self.finding_a1.id, self.finding_a2.id}) + for row in body["results"]: + self.assertTrue(row["saved"]) + + def test_count_endpoint(self): + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a1) + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a2) + r = self.client_a.get(reverse("finding_bookmark-count")) + self.assertEqual(r.status_code, 200, r.content[:500]) + self.assertEqual(r.json(), {"count": 2}) + + # ---- Verification case 2: permission denial ---------------------------- + def test_cannot_bookmark_finding_in_other_users_product(self): + url = reverse("finding_bookmark-list") + r = self.client_a.post(url, {"finding": self.finding_b1.id}, format="json") + self.assertEqual(r.status_code, 403, r.content[:500]) + + def test_queue_excludes_findings_in_other_users_product(self): + FindingBookmark.objects.create(user=self.user_b, finding=self.finding_b1) + r = self.client_a.get(reverse("me_queue-list")) + self.assertEqual(r.status_code, 200, r.content[:500]) + ids = {row["id"] for row in r.json()["results"]} + self.assertNotIn(self.finding_b1.id, ids) + + # ---- Verification case 3: stale-bookmark drop -------------------------- + def test_stale_bookmark_drops_from_queue_when_access_revoked(self): + bookmark = FindingBookmark.objects.create( + user=self.user_a, finding=self.finding_a1, + ) + # Revoke alice's access by deleting her Product_Member row. + Product_Member.objects.filter(user=self.user_a, product=self.product_a).delete() + + r = self.client_a.get(reverse("me_queue-list")) + self.assertEqual(r.status_code, 200, r.content[:500]) + ids = {row["id"] for row in r.json()["results"]} + self.assertNotIn(self.finding_a1.id, ids) + # The bookmark row still exists — the queue just hides it. + self.assertTrue(FindingBookmark.objects.filter(id=bookmark.id).exists()) + + # ---- Verification case 4: queue union dedup ---------------------------- + def test_queue_dedupes_when_finding_is_both_bookmarked_and_assigned(self): + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a1) + self.finding_a1.reviewers.add(self.user_a) + + r = self.client_a.get(reverse("me_queue-list")) + self.assertEqual(r.status_code, 200, r.content[:500]) + rows = [row for row in r.json()["results"] if row["id"] == self.finding_a1.id] + self.assertEqual(len(rows), 1, "expected exactly one row for the union member") + self.assertTrue(rows[0]["saved"]) + + def test_queue_assigned_only_skips_pure_bookmarks(self): + # finding_a1 is bookmarked only; finding_a2 is assigned only. + FindingBookmark.objects.create(user=self.user_a, finding=self.finding_a1) + self.finding_a2.reviewers.add(self.user_a) + + r = self.client_a.get(reverse("me_queue-list") + "?assigned_only=true") + self.assertEqual(r.status_code, 200, r.content[:500]) + ids = {row["id"] for row in r.json()["results"]} + self.assertIn(self.finding_a2.id, ids) + self.assertNotIn(self.finding_a1.id, ids)