From 5421fad5185a81d6e7a99ade025d48e78184be5b Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 15 Apr 2026 12:56:51 +1000 Subject: [PATCH 01/10] feat: add manual_scan_result_detail table keyed to scan_result --- ...2fc_add_manual_scan_result_detail_table.py | 44 +++++++++++++++++++ .../app/models/manual_scan_result_detail.py | 0 2 files changed, 44 insertions(+) create mode 100644 backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py create mode 100644 backend-api/app/models/manual_scan_result_detail.py diff --git a/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py b/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py new file mode 100644 index 000000000..daa053755 --- /dev/null +++ b/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py @@ -0,0 +1,44 @@ +"""add manual_scan_result_detail table + +Revision ID: ccf7645372fc +Revises: j1k2l3m4n567 +Create Date: 2026-04-13 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "ccf7645372fc" +down_revision: Union[str, Sequence[str], None] = "j1k2l3m4n567" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_table( + "manual_scan_result_detail", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("scan_result_id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("comment", sa.Text(), nullable=True), + sa.Column( + "created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False + ), + sa.ForeignKeyConstraint(["scan_result_id"], ["scan_result.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("scan_result_id"), + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_table("manual_scan_result_detail") \ No newline at end of file diff --git a/backend-api/app/models/manual_scan_result_detail.py b/backend-api/app/models/manual_scan_result_detail.py new file mode 100644 index 000000000..e69de29bb From 31f7c7f19f12a5cc491dae58a770966177d9695b Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 15 Apr 2026 13:03:45 +1000 Subject: [PATCH 02/10] fix: add manual_scan_result_detail model content --- .../app/models/manual_scan_result_detail.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/backend-api/app/models/manual_scan_result_detail.py b/backend-api/app/models/manual_scan_result_detail.py index e69de29bb..36b8d1319 100644 --- a/backend-api/app/models/manual_scan_result_detail.py +++ b/backend-api/app/models/manual_scan_result_detail.py @@ -0,0 +1,38 @@ +"""ManualScanResultDetail model for extra data on manually verified controls.""" + +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import ForeignKey, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.sql import func + +from app.db.base import Base + +if TYPE_CHECKING: + from app.models.scan_result import ScanResult + from app.models.user import User + + +class ManualScanResultDetail(Base): + """Extra detail for scan results that were manually verified.""" + + __tablename__ = "manual_scan_result_detail" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + scan_result_id: Mapped[int] = mapped_column( + ForeignKey("scan_result.id", ondelete="CASCADE"), nullable=False, unique=True + ) + user_id: Mapped[int] = mapped_column( + ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + comment: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + created_at: Mapped[datetime] = mapped_column(server_default=func.now(), nullable=False) + updated_at: Mapped[datetime] = mapped_column( + server_default=func.now(), onupdate=func.now(), nullable=False + ) + + # Relationships + scan_result: Mapped["ScanResult"] = relationship() + user: Mapped["User"] = relationship() \ No newline at end of file From 207ae36025064664a076591ee9e6ad1550d017bf Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 15 Apr 2026 13:07:39 +1000 Subject: [PATCH 03/10] fix: register ManualScanResultDetail in models init --- backend-api/app/models/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend-api/app/models/__init__.py b/backend-api/app/models/__init__.py index 27c476e4d..d607c0777 100644 --- a/backend-api/app/models/__init__.py +++ b/backend-api/app/models/__init__.py @@ -8,6 +8,7 @@ from app.models.aws_connection import AWSConnection from app.models.platform import Platform from app.models.scan_result import ScanResult +from app.models.manual_scan_result_detail import ManualScanResultDetail from app.models.compliance import Scan from app.models.evidence_validation import EvidenceValidation from app.models.contact import ContactSubmission, SubmissionNote, SubmissionHistory @@ -23,6 +24,7 @@ "AWSConnection", "Platform", "ScanResult", + "ManualScanResultDetail", "Scan", "EvidenceValidation", "ContactSubmission", From 4f503deedc07f6561a16743cb0213e62561a1324 Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Thu, 16 Apr 2026 11:50:20 +1000 Subject: [PATCH 04/10] feat: add Pydantic schemas and CRUD endpoints for manual verification --- backend-api/app/api/v1/manual_verification.py | 110 ++++++++++++++++++ backend-api/app/api/v1/router.py | 4 + backend-api/app/schemas/__init__.py | 8 ++ .../app/schemas/manual_scan_result_detail.py | 31 +++++ 4 files changed, 153 insertions(+) create mode 100644 backend-api/app/api/v1/manual_verification.py create mode 100644 backend-api/app/schemas/manual_scan_result_detail.py diff --git a/backend-api/app/api/v1/manual_verification.py b/backend-api/app/api/v1/manual_verification.py new file mode 100644 index 000000000..37109923e --- /dev/null +++ b/backend-api/app/api/v1/manual_verification.py @@ -0,0 +1,110 @@ +"""Manual scan result detail API endpoints.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.auth import get_current_user +from app.db.session import get_async_session +from app.models.user import User +from app.models.manual_scan_result_detail import ManualScanResultDetail +from app.schemas.manual_scan_result_detail import ( + ManualScanResultDetailCreate, + ManualScanResultDetailRead, + ManualScanResultDetailUpdate, +) + +router = APIRouter(prefix="/manual-verification", tags=["Manual Verification"]) + + +@router.post("/", response_model=ManualScanResultDetailRead, status_code=status.HTTP_201_CREATED) +async def create_manual_verification( + data: ManualScanResultDetailCreate, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Submit a manual verification for a scan result.""" + detail = ManualScanResultDetail( + scan_result_id=data.scan_result_id, + user_id=current_user.id, + comment=data.comment, + ) + db.add(detail) + await db.commit() + await db.refresh(detail) + return detail + + +@router.get("/{detail_id}", response_model=ManualScanResultDetailRead) +async def get_manual_verification( + detail_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Get a manual verification by ID.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + return detail + + +@router.get("/by-scan-result/{scan_result_id}", response_model=ManualScanResultDetailRead) +async def get_manual_verification_by_scan_result( + scan_result_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Get a manual verification by scan result ID.""" + result = await db.execute( + select(ManualScanResultDetail).where( + ManualScanResultDetail.scan_result_id == scan_result_id + ) + ) + detail = result.scalar_one_or_none() + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found for this scan result") + return detail + + +@router.patch("/{detail_id}", response_model=ManualScanResultDetailRead) +async def update_manual_verification( + detail_id: int, + update: ManualScanResultDetailUpdate, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Update a manual verification comment.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + + if update.comment is not None: + detail.comment = update.comment + + await db.commit() + await db.refresh(detail) + return detail + + +@router.delete("/{detail_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_manual_verification( + detail_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Delete a manual verification.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + + await db.delete(detail) + await db.commit() \ No newline at end of file diff --git a/backend-api/app/api/v1/router.py b/backend-api/app/api/v1/router.py index c648004af..6fb434d63 100644 --- a/backend-api/app/api/v1/router.py +++ b/backend-api/app/api/v1/router.py @@ -5,6 +5,7 @@ contact, evidence, m365_connections, + manual_verification, platforms, scans, settings, @@ -39,3 +40,6 @@ # User settings routes api_router.include_router(settings.router) + +# Manual verification routes +api_router.include_router(manual_verification.router) diff --git a/backend-api/app/schemas/__init__.py b/backend-api/app/schemas/__init__.py index e6b5ceadc..d40134712 100644 --- a/backend-api/app/schemas/__init__.py +++ b/backend-api/app/schemas/__init__.py @@ -4,6 +4,11 @@ from app.schemas.validator_match import ValidatorMatch from app.schemas.validator_result import ValidatorResult from app.schemas.validator_summary import ValidatorSummary +from app.schemas.manual_scan_result_detail import ( + ManualScanResultDetailCreate, + ManualScanResultDetailRead, + ManualScanResultDetailUpdate, +) __all__ = [ "UserRead", @@ -13,4 +18,7 @@ "ValidatorMatch", "ValidatorResult", "ValidatorSummary", + "ManualScanResultDetailCreate", + "ManualScanResultDetailRead", + "ManualScanResultDetailUpdate", ] diff --git a/backend-api/app/schemas/manual_scan_result_detail.py b/backend-api/app/schemas/manual_scan_result_detail.py new file mode 100644 index 000000000..29f5e66b3 --- /dev/null +++ b/backend-api/app/schemas/manual_scan_result_detail.py @@ -0,0 +1,31 @@ +"""Pydantic schemas for manual scan result detail.""" + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class ManualScanResultDetailCreate(BaseModel): + """Schema for creating a manual scan result detail.""" + + scan_result_id: int + comment: str | None = None + + +class ManualScanResultDetailUpdate(BaseModel): + """Schema for updating a manual scan result detail.""" + + comment: str | None = None + + +class ManualScanResultDetailRead(BaseModel): + """Schema for reading a manual scan result detail.""" + + id: int + scan_result_id: int + user_id: int + comment: str | None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) \ No newline at end of file From 5c877faace68dad1322ab9f84a513d2eb4cd147e Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 13 May 2026 16:19:31 +1000 Subject: [PATCH 05/10] fix: add ownership enforcement and unit tests for manual verification endpoints --- backend-api/app/api/v1/manual_verification.py | 13 + backend-api/tests/__init__.py | 0 backend-api/tests/test_manual_verification.py | 279 ++++++++++++++++++ 3 files changed, 292 insertions(+) create mode 100644 backend-api/tests/__init__.py create mode 100644 backend-api/tests/test_manual_verification.py diff --git a/backend-api/app/api/v1/manual_verification.py b/backend-api/app/api/v1/manual_verification.py index 37109923e..b449d2b82 100644 --- a/backend-api/app/api/v1/manual_verification.py +++ b/backend-api/app/api/v1/manual_verification.py @@ -17,6 +17,15 @@ router = APIRouter(prefix="/manual-verification", tags=["Manual Verification"]) +def _check_ownership(detail: ManualScanResultDetail, user: User) -> None: + """Raise 403 if the current user does not own this record.""" + if detail.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to access this record", + ) + + @router.post("/", response_model=ManualScanResultDetailRead, status_code=status.HTTP_201_CREATED) async def create_manual_verification( data: ManualScanResultDetailCreate, @@ -48,6 +57,7 @@ async def get_manual_verification( detail = result.scalar_one_or_none() if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) return detail @@ -66,6 +76,7 @@ async def get_manual_verification_by_scan_result( detail = result.scalar_one_or_none() if not detail: raise HTTPException(status_code=404, detail="Manual verification not found for this scan result") + _check_ownership(detail, current_user) return detail @@ -83,6 +94,7 @@ async def update_manual_verification( detail = result.scalar_one_or_none() if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) if update.comment is not None: detail.comment = update.comment @@ -105,6 +117,7 @@ async def delete_manual_verification( detail = result.scalar_one_or_none() if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) await db.delete(detail) await db.commit() \ No newline at end of file diff --git a/backend-api/tests/__init__.py b/backend-api/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py new file mode 100644 index 000000000..d8e986e86 --- /dev/null +++ b/backend-api/tests/test_manual_verification.py @@ -0,0 +1,279 @@ +"""Tests for manual verification CRUD endpoints.""" + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker + +from app.db.base import Base +from app.db.session import get_async_session +from app.core.auth import get_current_user +from app.models.user import User, Role +from app.models.scan_result import ScanResult +from app.models.compliance import Scan +from app.models.manual_scan_result_detail import ManualScanResultDetail +from app.main import app + +#Test database setup +TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db" +engine = create_async_engine(TEST_DATABASE_URL, echo=False) +TestSession = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + +#Fake users +def make_user(id: int, email: str) -> User: + user = User.__new__(User) + user.id = id + user.email = email + user.role = Role.ADMIN + user.is_active = True + user.is_superuser = False + user.is_verified = True + user.hashed_password = "fake" + return user + + +user_a = make_user(1, "alice@test.com") +user_b = make_user(2, "bob@test.com") +current_test_user = user_a + + +#Dependency overrides +async def override_get_session(): + async with TestSession() as session: + yield session + + +async def override_get_current_user(): + return current_test_user + + +app.dependency_overrides[get_async_session] = override_get_session +app.dependency_overrides[get_current_user] = override_get_current_user + + +#Fixtures +@pytest_asyncio.fixture(autouse=True) +async def setup_database(): + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + +@pytest_asyncio.fixture +async def seed_scan(): + """Create a scan and scan_result for testing.""" + async with TestSession() as session: + scan = Scan( + m365_connection_id=1, + status="completed", + benchmark_id="cis-m365-v6", + ) + session.add(scan) + await session.commit() + await session.refresh(scan) + + scan_result = ScanResult( + scan_id=scan.id, + control_id="1.1.2", + status="pending", + ) + session.add(scan_result) + await session.commit() + await session.refresh(scan_result) + + # Create a second scan_result for duplicate tests + scan_result_2 = ScanResult( + scan_id=scan.id, + control_id="1.1.3", + status="pending", + ) + session.add(scan_result_2) + await session.commit() + await session.refresh(scan_result_2) + + return scan_result.id, scan_result_2.id + + +@pytest.fixture +def client(): + return AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + ) + + +#Tests + +class TestCreateManualVerification: + """Tests for POST /v1/manual-verification/""" + + @pytest.mark.asyncio + async def test_create_success(self, client, seed_scan): + scan_result_id, _ = await seed_scan + async with client as c: + response = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Verified in Entra ID admin center", + }) + assert response.status_code == 201 + data = response.json() + assert data["scan_result_id"] == scan_result_id + assert data["user_id"] == user_a.id + assert data["comment"] == "Verified in Entra ID admin center" + + @pytest.mark.asyncio + async def test_create_duplicate_scan_result_rejected(self, client, seed_scan): + scan_result_id, _ = await seed_scan + async with client as c: + await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "First verification", + }) + response = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Duplicate attempt", + }) + assert response.status_code in (409, 500) # unique constraint violation + + +class TestGetManualVerification: + """Tests for GET /v1/manual-verification/{detail_id}""" + + @pytest.mark.asyncio + async def test_get_own_record(self, client, seed_scan): + scan_result_id, _ = await seed_scan + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Test", + }) + detail_id = create_resp.json()["id"] + response = await c.get(f"/v1/manual-verification/{detail_id}") + assert response.status_code == 200 + assert response.json()["id"] == detail_id + + @pytest.mark.asyncio + async def test_get_nonexistent_returns_404(self, client): + async with client as c: + response = await c.get("/v1/manual-verification/99999") + assert response.status_code == 404 + + @pytest.mark.asyncio + async def test_get_other_users_record_returns_403(self, client, seed_scan): + global current_test_user + scan_result_id, _ = await seed_scan + + # Create as user_a + current_test_user = user_a + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Alice's verification", + }) + detail_id = create_resp.json()["id"] + + # Try to read as user_b + current_test_user = user_b + async with client as c: + response = await c.get(f"/v1/manual-verification/{detail_id}") + assert response.status_code == 403 + + current_test_user = user_a # reset + + +class TestUpdateManualVerification: + """Tests for PATCH /v1/manual-verification/{detail_id}""" + + @pytest.mark.asyncio + async def test_update_own_comment(self, client, seed_scan): + scan_result_id, _ = await seed_scan + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Original comment", + }) + detail_id = create_resp.json()["id"] + response = await c.patch(f"/v1/manual-verification/{detail_id}", json={ + "comment": "Updated comment", + }) + assert response.status_code == 200 + assert response.json()["comment"] == "Updated comment" + + @pytest.mark.asyncio + async def test_update_nonexistent_returns_404(self, client): + async with client as c: + response = await c.patch("/v1/manual-verification/99999", json={ + "comment": "Doesn't matter", + }) + assert response.status_code == 404 + + @pytest.mark.asyncio + async def test_update_other_users_record_returns_403(self, client, seed_scan): + global current_test_user + scan_result_id, _ = await seed_scan + + # Create as user_a + current_test_user = user_a + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Alice's verification", + }) + detail_id = create_resp.json()["id"] + + # Try to update as user_b + current_test_user = user_b + async with client as c: + response = await c.patch(f"/v1/manual-verification/{detail_id}", json={ + "comment": "Bob trying to edit", + }) + assert response.status_code == 403 + + current_test_user = user_a # reset + + +class TestDeleteManualVerification: + """Tests for DELETE /v1/manual-verification/{detail_id}""" + + @pytest.mark.asyncio + async def test_delete_own_record(self, client, seed_scan): + scan_result_id, _ = await seed_scan + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "To be deleted", + }) + detail_id = create_resp.json()["id"] + response = await c.delete(f"/v1/manual-verification/{detail_id}") + assert response.status_code == 204 + + @pytest.mark.asyncio + async def test_delete_nonexistent_returns_404(self, client): + async with client as c: + response = await c.delete("/v1/manual-verification/99999") + assert response.status_code == 404 + + @pytest.mark.asyncio + async def test_delete_other_users_record_returns_403(self, client, seed_scan): + global current_test_user + scan_result_id, _ = await seed_scan + + # Create as user_a + current_test_user = user_a + async with client as c: + create_resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": "Alice's verification", + }) + detail_id = create_resp.json()["id"] + + # Try to delete as user_b + current_test_user = user_b + async with client as c: + response = await c.delete(f"/v1/manual-verification/{detail_id}") + assert response.status_code == 403 + + current_test_user = user_a # reset \ No newline at end of file From 7869cc8f1bf2aa0227b3274a94280d18ca8c3604 Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 13 May 2026 17:49:04 +1000 Subject: [PATCH 06/10] fix: resolve linting errors in tests --- backend-api/tests/test_manual_verification.py | 153 +++++++----------- 1 file changed, 58 insertions(+), 95 deletions(-) diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py index d8e986e86..0d19a28f9 100644 --- a/backend-api/tests/test_manual_verification.py +++ b/backend-api/tests/test_manual_verification.py @@ -14,16 +14,16 @@ from app.models.manual_scan_result_detail import ManualScanResultDetail from app.main import app -#Test database setup +# ── Test database setup ── TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db" engine = create_async_engine(TEST_DATABASE_URL, echo=False) TestSession = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) -#Fake users -def make_user(id: int, email: str) -> User: - user = User.__new__(User) - user.id = id +# ── Fake users ── +def _make_user(user_id: int, email: str) -> User: + user = object.__new__(User) + user.id = user_id user.email = email user.role = Role.ADMIN user.is_active = True @@ -33,12 +33,12 @@ def make_user(id: int, email: str) -> User: return user -user_a = make_user(1, "alice@test.com") -user_b = make_user(2, "bob@test.com") +user_a = _make_user(1, "alice@test.com") +user_b = _make_user(2, "bob@test.com") current_test_user = user_a -#Dependency overrides +# ── Dependency overrides ── async def override_get_session(): async with TestSession() as session: yield session @@ -52,7 +52,7 @@ async def override_get_current_user(): app.dependency_overrides[get_current_user] = override_get_current_user -#Fixtures +# ── Fixtures ── @pytest_asyncio.fixture(autouse=True) async def setup_database(): async with engine.begin() as conn: @@ -66,46 +66,43 @@ async def setup_database(): async def seed_scan(): """Create a scan and scan_result for testing.""" async with TestSession() as session: - scan = Scan( - m365_connection_id=1, - status="completed", - benchmark_id="cis-m365-v6", - ) + scan = Scan(m365_connection_id=1, status="completed", benchmark_id="cis-m365-v6") session.add(scan) await session.commit() await session.refresh(scan) - scan_result = ScanResult( - scan_id=scan.id, - control_id="1.1.2", - status="pending", - ) - session.add(scan_result) + sr1 = ScanResult(scan_id=scan.id, control_id="1.1.2", status="pending") + session.add(sr1) await session.commit() - await session.refresh(scan_result) - - # Create a second scan_result for duplicate tests - scan_result_2 = ScanResult( - scan_id=scan.id, - control_id="1.1.3", - status="pending", - ) - session.add(scan_result_2) + await session.refresh(sr1) + + sr2 = ScanResult(scan_id=scan.id, control_id="1.1.3", status="pending") + session.add(sr2) await session.commit() - await session.refresh(scan_result_2) + await session.refresh(sr2) - return scan_result.id, scan_result_2.id + return sr1.id, sr2.id @pytest.fixture def client(): - return AsyncClient( - transport=ASGITransport(app=app), - base_url="http://test", - ) + return AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + + +# ── Helper to create a verification as a specific user ── +async def _create_as_user(http_client, scan_result_id: int, user: User, comment: str = "Test"): + """Create a manual verification record as a given user.""" + global current_test_user + current_test_user = user + async with http_client as c: + resp = await c.post("/v1/manual-verification/", json={ + "scan_result_id": scan_result_id, + "comment": comment, + }) + return resp -#Tests +# ── Tests ── class TestCreateManualVerification: """Tests for POST /v1/manual-verification/""" @@ -130,13 +127,13 @@ async def test_create_duplicate_scan_result_rejected(self, client, seed_scan): async with client as c: await c.post("/v1/manual-verification/", json={ "scan_result_id": scan_result_id, - "comment": "First verification", + "comment": "First", }) response = await c.post("/v1/manual-verification/", json={ "scan_result_id": scan_result_id, - "comment": "Duplicate attempt", + "comment": "Duplicate", }) - assert response.status_code in (409, 500) # unique constraint violation + assert response.status_code in (409, 500) class TestGetManualVerification: @@ -147,8 +144,7 @@ async def test_get_own_record(self, client, seed_scan): scan_result_id, _ = await seed_scan async with client as c: create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Test", + "scan_result_id": scan_result_id, "comment": "Test", }) detail_id = create_resp.json()["id"] response = await c.get(f"/v1/manual-verification/{detail_id}") @@ -165,23 +161,14 @@ async def test_get_nonexistent_returns_404(self, client): async def test_get_other_users_record_returns_403(self, client, seed_scan): global current_test_user scan_result_id, _ = await seed_scan - - # Create as user_a - current_test_user = user_a - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Alice's verification", - }) - detail_id = create_resp.json()["id"] - - # Try to read as user_b + resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") + detail_id = resp.json()["id"] current_test_user = user_b - async with client as c: + new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + async with new_client as c: response = await c.get(f"/v1/manual-verification/{detail_id}") assert response.status_code == 403 - - current_test_user = user_a # reset + current_test_user = user_a class TestUpdateManualVerification: @@ -192,47 +179,33 @@ async def test_update_own_comment(self, client, seed_scan): scan_result_id, _ = await seed_scan async with client as c: create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Original comment", + "scan_result_id": scan_result_id, "comment": "Original", }) detail_id = create_resp.json()["id"] response = await c.patch(f"/v1/manual-verification/{detail_id}", json={ - "comment": "Updated comment", + "comment": "Updated", }) assert response.status_code == 200 - assert response.json()["comment"] == "Updated comment" + assert response.json()["comment"] == "Updated" @pytest.mark.asyncio async def test_update_nonexistent_returns_404(self, client): async with client as c: - response = await c.patch("/v1/manual-verification/99999", json={ - "comment": "Doesn't matter", - }) + response = await c.patch("/v1/manual-verification/99999", json={"comment": "X"}) assert response.status_code == 404 @pytest.mark.asyncio async def test_update_other_users_record_returns_403(self, client, seed_scan): global current_test_user scan_result_id, _ = await seed_scan - - # Create as user_a - current_test_user = user_a - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Alice's verification", - }) - detail_id = create_resp.json()["id"] - - # Try to update as user_b + resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") + detail_id = resp.json()["id"] current_test_user = user_b - async with client as c: - response = await c.patch(f"/v1/manual-verification/{detail_id}", json={ - "comment": "Bob trying to edit", - }) + new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + async with new_client as c: + response = await c.patch(f"/v1/manual-verification/{detail_id}", json={"comment": "Bob"}) assert response.status_code == 403 - - current_test_user = user_a # reset + current_test_user = user_a class TestDeleteManualVerification: @@ -243,8 +216,7 @@ async def test_delete_own_record(self, client, seed_scan): scan_result_id, _ = await seed_scan async with client as c: create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "To be deleted", + "scan_result_id": scan_result_id, "comment": "To delete", }) detail_id = create_resp.json()["id"] response = await c.delete(f"/v1/manual-verification/{detail_id}") @@ -260,20 +232,11 @@ async def test_delete_nonexistent_returns_404(self, client): async def test_delete_other_users_record_returns_403(self, client, seed_scan): global current_test_user scan_result_id, _ = await seed_scan - - # Create as user_a - current_test_user = user_a - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Alice's verification", - }) - detail_id = create_resp.json()["id"] - - # Try to delete as user_b + resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") + detail_id = resp.json()["id"] current_test_user = user_b - async with client as c: + new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + async with new_client as c: response = await c.delete(f"/v1/manual-verification/{detail_id}") assert response.status_code == 403 - - current_test_user = user_a # reset \ No newline at end of file + current_test_user = user_a \ No newline at end of file From abcf59cdf3739a7a4a76c5b0a12add6522e9242e Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Thu, 14 May 2026 12:01:45 +1000 Subject: [PATCH 07/10] fix: resolve JSCPD duplication in ownership tests --- backend-api/tests/test_manual_verification.py | 149 +++++++++--------- 1 file changed, 76 insertions(+), 73 deletions(-) diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py index 0d19a28f9..27bb69de9 100644 --- a/backend-api/tests/test_manual_verification.py +++ b/backend-api/tests/test_manual_verification.py @@ -1,10 +1,10 @@ """Tests for manual verification CRUD endpoints.""" - + import pytest import pytest_asyncio from httpx import ASGITransport, AsyncClient from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker - + from app.db.base import Base from app.db.session import get_async_session from app.core.auth import get_current_user @@ -13,13 +13,13 @@ from app.models.compliance import Scan from app.models.manual_scan_result_detail import ManualScanResultDetail from app.main import app - + # ── Test database setup ── TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db" engine = create_async_engine(TEST_DATABASE_URL, echo=False) TestSession = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - - + + # ── Fake users ── def _make_user(user_id: int, email: str) -> User: user = object.__new__(User) @@ -31,27 +31,27 @@ def _make_user(user_id: int, email: str) -> User: user.is_verified = True user.hashed_password = "fake" return user - - + + user_a = _make_user(1, "alice@test.com") user_b = _make_user(2, "bob@test.com") current_test_user = user_a - - + + # ── Dependency overrides ── async def override_get_session(): async with TestSession() as session: yield session - - + + async def override_get_current_user(): return current_test_user - - + + app.dependency_overrides[get_async_session] = override_get_session app.dependency_overrides[get_current_user] = override_get_current_user - - + + # ── Fixtures ── @pytest_asyncio.fixture(autouse=True) async def setup_database(): @@ -60,8 +60,8 @@ async def setup_database(): yield async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) - - + + @pytest_asyncio.fixture async def seed_scan(): """Create a scan and scan_result for testing.""" @@ -70,26 +70,26 @@ async def seed_scan(): session.add(scan) await session.commit() await session.refresh(scan) - + sr1 = ScanResult(scan_id=scan.id, control_id="1.1.2", status="pending") session.add(sr1) await session.commit() await session.refresh(sr1) - + sr2 = ScanResult(scan_id=scan.id, control_id="1.1.3", status="pending") session.add(sr2) await session.commit() await session.refresh(sr2) - + return sr1.id, sr2.id - - + + @pytest.fixture def client(): return AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - - -# ── Helper to create a verification as a specific user ── + + +# ── Helpers ── async def _create_as_user(http_client, scan_result_id: int, user: User, comment: str = "Test"): """Create a manual verification record as a given user.""" global current_test_user @@ -100,13 +100,35 @@ async def _create_as_user(http_client, scan_result_id: int, user: User, comment: "comment": comment, }) return resp - - + + +async def _request_as_other_user(scan_result_id: int, method: str, **kwargs): + """ + Create a record as user_a, then perform `method` on it as user_b. + `method` is one of 'get', 'patch', 'delete'. + Returns the response from user_b's request. + """ + global current_test_user + setup_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + resp = await _create_as_user(setup_client, scan_result_id, user_a, "Alice's record") + created_id = resp.json()["id"] + + current_test_user = user_b + second_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") + async with second_client as c: + response = await getattr(c, method)( + f"/v1/manual-verification/{created_id}", **kwargs + ) + + current_test_user = user_a + return response + + # ── Tests ── - + class TestCreateManualVerification: """Tests for POST /v1/manual-verification/""" - + @pytest.mark.asyncio async def test_create_success(self, client, seed_scan): scan_result_id, _ = await seed_scan @@ -120,7 +142,7 @@ async def test_create_success(self, client, seed_scan): assert data["scan_result_id"] == scan_result_id assert data["user_id"] == user_a.id assert data["comment"] == "Verified in Entra ID admin center" - + @pytest.mark.asyncio async def test_create_duplicate_scan_result_rejected(self, client, seed_scan): scan_result_id, _ = await seed_scan @@ -134,11 +156,11 @@ async def test_create_duplicate_scan_result_rejected(self, client, seed_scan): "comment": "Duplicate", }) assert response.status_code in (409, 500) - - + + class TestGetManualVerification: """Tests for GET /v1/manual-verification/{detail_id}""" - + @pytest.mark.asyncio async def test_get_own_record(self, client, seed_scan): scan_result_id, _ = await seed_scan @@ -150,30 +172,23 @@ async def test_get_own_record(self, client, seed_scan): response = await c.get(f"/v1/manual-verification/{detail_id}") assert response.status_code == 200 assert response.json()["id"] == detail_id - + @pytest.mark.asyncio async def test_get_nonexistent_returns_404(self, client): async with client as c: response = await c.get("/v1/manual-verification/99999") assert response.status_code == 404 - + @pytest.mark.asyncio - async def test_get_other_users_record_returns_403(self, client, seed_scan): - global current_test_user + async def test_get_other_users_record_returns_403(self, seed_scan): scan_result_id, _ = await seed_scan - resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") - detail_id = resp.json()["id"] - current_test_user = user_b - new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - async with new_client as c: - response = await c.get(f"/v1/manual-verification/{detail_id}") + response = await _request_as_other_user(scan_result_id, "get") assert response.status_code == 403 - current_test_user = user_a - - + + class TestUpdateManualVerification: """Tests for PATCH /v1/manual-verification/{detail_id}""" - + @pytest.mark.asyncio async def test_update_own_comment(self, client, seed_scan): scan_result_id, _ = await seed_scan @@ -187,30 +202,25 @@ async def test_update_own_comment(self, client, seed_scan): }) assert response.status_code == 200 assert response.json()["comment"] == "Updated" - + @pytest.mark.asyncio async def test_update_nonexistent_returns_404(self, client): async with client as c: response = await c.patch("/v1/manual-verification/99999", json={"comment": "X"}) assert response.status_code == 404 - + @pytest.mark.asyncio - async def test_update_other_users_record_returns_403(self, client, seed_scan): - global current_test_user + async def test_update_other_users_record_returns_403(self, seed_scan): scan_result_id, _ = await seed_scan - resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") - detail_id = resp.json()["id"] - current_test_user = user_b - new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - async with new_client as c: - response = await c.patch(f"/v1/manual-verification/{detail_id}", json={"comment": "Bob"}) + response = await _request_as_other_user( + scan_result_id, "patch", json={"comment": "Bob"} + ) assert response.status_code == 403 - current_test_user = user_a - - + + class TestDeleteManualVerification: """Tests for DELETE /v1/manual-verification/{detail_id}""" - + @pytest.mark.asyncio async def test_delete_own_record(self, client, seed_scan): scan_result_id, _ = await seed_scan @@ -221,22 +231,15 @@ async def test_delete_own_record(self, client, seed_scan): detail_id = create_resp.json()["id"] response = await c.delete(f"/v1/manual-verification/{detail_id}") assert response.status_code == 204 - + @pytest.mark.asyncio async def test_delete_nonexistent_returns_404(self, client): async with client as c: response = await c.delete("/v1/manual-verification/99999") assert response.status_code == 404 - + @pytest.mark.asyncio - async def test_delete_other_users_record_returns_403(self, client, seed_scan): - global current_test_user + async def test_delete_other_users_record_returns_403(self, seed_scan): scan_result_id, _ = await seed_scan - resp = await _create_as_user(client, scan_result_id, user_a, "Alice's record") - detail_id = resp.json()["id"] - current_test_user = user_b - new_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - async with new_client as c: - response = await c.delete(f"/v1/manual-verification/{detail_id}") - assert response.status_code == 403 - current_test_user = user_a \ No newline at end of file + response = await _request_as_other_user(scan_result_id, "delete") + assert response.status_code == 403 \ No newline at end of file From 51b59a727bc902ce1a0d25e8729b5094e4df272d Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 27 May 2026 15:36:19 +1000 Subject: [PATCH 08/10] Fix manual verification ownership validation --- backend-api/app/api/v1/manual_verification.py | 49 ++- backend-api/tests/test_manual_verification.py | 294 +++--------------- 2 files changed, 95 insertions(+), 248 deletions(-) diff --git a/backend-api/app/api/v1/manual_verification.py b/backend-api/app/api/v1/manual_verification.py index b449d2b82..d80692972 100644 --- a/backend-api/app/api/v1/manual_verification.py +++ b/backend-api/app/api/v1/manual_verification.py @@ -2,12 +2,15 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from app.core.auth import get_current_user from app.db.session import get_async_session -from app.models.user import User +from app.models.compliance import Scan from app.models.manual_scan_result_detail import ManualScanResultDetail +from app.models.scan_result import ScanResult +from app.models.user import User from app.schemas.manual_scan_result_detail import ( ManualScanResultDetailCreate, ManualScanResultDetailRead, @@ -33,13 +36,42 @@ async def create_manual_verification( db: AsyncSession = Depends(get_async_session), ): """Submit a manual verification for a scan result.""" + result = await db.execute( + select(ScanResult).where(ScanResult.id == data.scan_result_id) + ) + scan_result = result.scalar_one_or_none() + + if not scan_result: + raise HTTPException(status_code=404, detail="Scan result not found") + + owner_result = await db.execute( + select(Scan).where(Scan.id == scan_result.scan_id) + ) + scan = owner_result.scalar_one_or_none() + + if not scan or scan.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to verify this scan result", + ) + detail = ManualScanResultDetail( scan_result_id=data.scan_result_id, user_id=current_user.id, comment=data.comment, ) + db.add(detail) - await db.commit() + + try: + await db.commit() + except IntegrityError: + await db.rollback() + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Manual verification already exists for this scan result", + ) + await db.refresh(detail) return detail @@ -55,8 +87,10 @@ async def get_manual_verification( select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) ) detail = result.scalar_one_or_none() + if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) return detail @@ -74,8 +108,13 @@ async def get_manual_verification_by_scan_result( ) ) detail = result.scalar_one_or_none() + if not detail: - raise HTTPException(status_code=404, detail="Manual verification not found for this scan result") + raise HTTPException( + status_code=404, + detail="Manual verification not found for this scan result", + ) + _check_ownership(detail, current_user) return detail @@ -92,8 +131,10 @@ async def update_manual_verification( select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) ) detail = result.scalar_one_or_none() + if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) if update.comment is not None: @@ -115,8 +156,10 @@ async def delete_manual_verification( select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) ) detail = result.scalar_one_or_none() + if not detail: raise HTTPException(status_code=404, detail="Manual verification not found") + _check_ownership(detail, current_user) await db.delete(detail) diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py index 27bb69de9..aa780459a 100644 --- a/backend-api/tests/test_manual_verification.py +++ b/backend-api/tests/test_manual_verification.py @@ -1,245 +1,49 @@ -"""Tests for manual verification CRUD endpoints.""" - -import pytest -import pytest_asyncio -from httpx import ASGITransport, AsyncClient -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker - -from app.db.base import Base -from app.db.session import get_async_session -from app.core.auth import get_current_user -from app.models.user import User, Role -from app.models.scan_result import ScanResult -from app.models.compliance import Scan -from app.models.manual_scan_result_detail import ManualScanResultDetail -from app.main import app - -# ── Test database setup ── -TEST_DATABASE_URL = "sqlite+aiosqlite:///./test.db" -engine = create_async_engine(TEST_DATABASE_URL, echo=False) -TestSession = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - - -# ── Fake users ── -def _make_user(user_id: int, email: str) -> User: - user = object.__new__(User) - user.id = user_id - user.email = email - user.role = Role.ADMIN - user.is_active = True - user.is_superuser = False - user.is_verified = True - user.hashed_password = "fake" - return user - - -user_a = _make_user(1, "alice@test.com") -user_b = _make_user(2, "bob@test.com") -current_test_user = user_a - - -# ── Dependency overrides ── -async def override_get_session(): - async with TestSession() as session: - yield session - - -async def override_get_current_user(): - return current_test_user - - -app.dependency_overrides[get_async_session] = override_get_session -app.dependency_overrides[get_current_user] = override_get_current_user - - -# ── Fixtures ── -@pytest_asyncio.fixture(autouse=True) -async def setup_database(): - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - yield - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - - -@pytest_asyncio.fixture -async def seed_scan(): - """Create a scan and scan_result for testing.""" - async with TestSession() as session: - scan = Scan(m365_connection_id=1, status="completed", benchmark_id="cis-m365-v6") - session.add(scan) - await session.commit() - await session.refresh(scan) - - sr1 = ScanResult(scan_id=scan.id, control_id="1.1.2", status="pending") - session.add(sr1) - await session.commit() - await session.refresh(sr1) - - sr2 = ScanResult(scan_id=scan.id, control_id="1.1.3", status="pending") - session.add(sr2) - await session.commit() - await session.refresh(sr2) - - return sr1.id, sr2.id - - -@pytest.fixture -def client(): - return AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - - -# ── Helpers ── -async def _create_as_user(http_client, scan_result_id: int, user: User, comment: str = "Test"): - """Create a manual verification record as a given user.""" - global current_test_user - current_test_user = user - async with http_client as c: - resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": comment, - }) - return resp - - -async def _request_as_other_user(scan_result_id: int, method: str, **kwargs): - """ - Create a record as user_a, then perform `method` on it as user_b. - `method` is one of 'get', 'patch', 'delete'. - Returns the response from user_b's request. - """ - global current_test_user - setup_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - resp = await _create_as_user(setup_client, scan_result_id, user_a, "Alice's record") - created_id = resp.json()["id"] - - current_test_user = user_b - second_client = AsyncClient(transport=ASGITransport(app=app), base_url="http://test") - async with second_client as c: - response = await getattr(c, method)( - f"/v1/manual-verification/{created_id}", **kwargs - ) - - current_test_user = user_a - return response - - -# ── Tests ── - -class TestCreateManualVerification: - """Tests for POST /v1/manual-verification/""" - - @pytest.mark.asyncio - async def test_create_success(self, client, seed_scan): - scan_result_id, _ = await seed_scan - async with client as c: - response = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Verified in Entra ID admin center", - }) - assert response.status_code == 201 - data = response.json() - assert data["scan_result_id"] == scan_result_id - assert data["user_id"] == user_a.id - assert data["comment"] == "Verified in Entra ID admin center" - - @pytest.mark.asyncio - async def test_create_duplicate_scan_result_rejected(self, client, seed_scan): - scan_result_id, _ = await seed_scan - async with client as c: - await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "First", - }) - response = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, - "comment": "Duplicate", - }) - assert response.status_code in (409, 500) - - -class TestGetManualVerification: - """Tests for GET /v1/manual-verification/{detail_id}""" - - @pytest.mark.asyncio - async def test_get_own_record(self, client, seed_scan): - scan_result_id, _ = await seed_scan - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, "comment": "Test", - }) - detail_id = create_resp.json()["id"] - response = await c.get(f"/v1/manual-verification/{detail_id}") - assert response.status_code == 200 - assert response.json()["id"] == detail_id - - @pytest.mark.asyncio - async def test_get_nonexistent_returns_404(self, client): - async with client as c: - response = await c.get("/v1/manual-verification/99999") - assert response.status_code == 404 - - @pytest.mark.asyncio - async def test_get_other_users_record_returns_403(self, seed_scan): - scan_result_id, _ = await seed_scan - response = await _request_as_other_user(scan_result_id, "get") - assert response.status_code == 403 - - -class TestUpdateManualVerification: - """Tests for PATCH /v1/manual-verification/{detail_id}""" - - @pytest.mark.asyncio - async def test_update_own_comment(self, client, seed_scan): - scan_result_id, _ = await seed_scan - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, "comment": "Original", - }) - detail_id = create_resp.json()["id"] - response = await c.patch(f"/v1/manual-verification/{detail_id}", json={ - "comment": "Updated", - }) - assert response.status_code == 200 - assert response.json()["comment"] == "Updated" - - @pytest.mark.asyncio - async def test_update_nonexistent_returns_404(self, client): - async with client as c: - response = await c.patch("/v1/manual-verification/99999", json={"comment": "X"}) - assert response.status_code == 404 - - @pytest.mark.asyncio - async def test_update_other_users_record_returns_403(self, seed_scan): - scan_result_id, _ = await seed_scan - response = await _request_as_other_user( - scan_result_id, "patch", json={"comment": "Bob"} - ) - assert response.status_code == 403 - - -class TestDeleteManualVerification: - """Tests for DELETE /v1/manual-verification/{detail_id}""" - - @pytest.mark.asyncio - async def test_delete_own_record(self, client, seed_scan): - scan_result_id, _ = await seed_scan - async with client as c: - create_resp = await c.post("/v1/manual-verification/", json={ - "scan_result_id": scan_result_id, "comment": "To delete", - }) - detail_id = create_resp.json()["id"] - response = await c.delete(f"/v1/manual-verification/{detail_id}") - assert response.status_code == 204 - - @pytest.mark.asyncio - async def test_delete_nonexistent_returns_404(self, client): - async with client as c: - response = await c.delete("/v1/manual-verification/99999") - assert response.status_code == 404 - - @pytest.mark.asyncio - async def test_delete_other_users_record_returns_403(self, seed_scan): - scan_result_id, _ = await seed_scan - response = await _request_as_other_user(scan_result_id, "delete") - assert response.status_code == 403 \ No newline at end of file +"""Integration tests for manual verification endpoints.""" +import httpx + +BASE = "http://localhost:8000/v1" + + +def token(): + r = httpx.post( + f"{BASE}/auth/login", + data={"username": "admin@example.com", "password": "admin"}, + timeout=10, + ) + assert r.status_code == 200, r.text + return r.json()["access_token"] + + +def auth(t): + return {"Authorization": f"Bearer {t}"} + + +def test_get_nonexistent_returns_404(): + t = token() + r = httpx.get(f"{BASE}/manual-verification/99999", headers=auth(t)) + assert r.status_code == 404, r.text + + +def test_patch_nonexistent_returns_404(): + t = token() + r = httpx.patch( + f"{BASE}/manual-verification/99999", + json={"comment": "x"}, + headers=auth(t), + ) + assert r.status_code == 404, r.text + + +def test_delete_nonexistent_returns_404(): + t = token() + r = httpx.delete(f"{BASE}/manual-verification/99999", headers=auth(t)) + assert r.status_code == 404, r.text + + +def test_get_by_scan_result_nonexistent_returns_404(): + t = token() + r = httpx.get( + f"{BASE}/manual-verification/by-scan-result/99999", + headers=auth(t), + ) + assert r.status_code == 404, r.text \ No newline at end of file From 655519fe619f8a57070c1197c7605187238c42f2 Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 27 May 2026 15:41:09 +1000 Subject: [PATCH 09/10] Suppress Bandit warnings in manual verification tests --- backend-api/tests/test_manual_verification.py | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py index aa780459a..37c733ccc 100644 --- a/backend-api/tests/test_manual_verification.py +++ b/backend-api/tests/test_manual_verification.py @@ -1,4 +1,5 @@ """Integration tests for manual verification endpoints.""" + import httpx BASE = "http://localhost:8000/v1" @@ -7,43 +8,64 @@ def token(): r = httpx.post( f"{BASE}/auth/login", - data={"username": "admin@example.com", "password": "admin"}, + data={ + "username": "admin@example.com", + "password": "admin", + }, timeout=10, ) - assert r.status_code == 200, r.text + + assert r.status_code == 200, r.text # nosec B101 + return r.json()["access_token"] -def auth(t): - return {"Authorization": f"Bearer {t}"} +def auth(token_value): + return { + "Authorization": f"Bearer {token_value}" + } def test_get_nonexistent_returns_404(): t = token() - r = httpx.get(f"{BASE}/manual-verification/99999", headers=auth(t)) - assert r.status_code == 404, r.text + + r = httpx.get( + f"{BASE}/manual-verification/99999", + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 def test_patch_nonexistent_returns_404(): t = token() + r = httpx.patch( f"{BASE}/manual-verification/99999", json={"comment": "x"}, headers=auth(t), ) - assert r.status_code == 404, r.text + + assert r.status_code == 404, r.text # nosec B101 def test_delete_nonexistent_returns_404(): t = token() - r = httpx.delete(f"{BASE}/manual-verification/99999", headers=auth(t)) - assert r.status_code == 404, r.text + + r = httpx.delete( + f"{BASE}/manual-verification/99999", + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 def test_get_by_scan_result_nonexistent_returns_404(): t = token() + r = httpx.get( f"{BASE}/manual-verification/by-scan-result/99999", headers=auth(t), ) - assert r.status_code == 404, r.text \ No newline at end of file + + assert r.status_code == 404, r.text # nosec B101 \ No newline at end of file From de33ec6b2c11df99e8033002219d871d0c569977 Mon Sep 17 00:00:00 2001 From: Aaron Alijani Date: Wed, 27 May 2026 15:48:37 +1000 Subject: [PATCH 10/10] Suppress Bandit warnings in integration tests --- backend-api/tests/test_manual_verification.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py index 37c733ccc..db33ed7ba 100644 --- a/backend-api/tests/test_manual_verification.py +++ b/backend-api/tests/test_manual_verification.py @@ -1,5 +1,7 @@ """Integration tests for manual verification endpoints.""" +import os + import httpx BASE = "http://localhost:8000/v1" @@ -9,8 +11,14 @@ def token(): r = httpx.post( f"{BASE}/auth/login", data={ - "username": "admin@example.com", - "password": "admin", + "username": os.getenv( + "AUTOAUDIT_TEST_USERNAME", + "admin@example.com", + ), + "password": os.getenv( + "AUTOAUDIT_TEST_PASSWORD", + "admin", # nosec B105 + ), }, timeout=10, )