diff --git a/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py b/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py new file mode 100644 index 000000000..daa053755 --- /dev/null +++ b/backend-api/alembic/versions/ccf7645372fc_add_manual_scan_result_detail_table.py @@ -0,0 +1,44 @@ +"""add manual_scan_result_detail table + +Revision ID: ccf7645372fc +Revises: j1k2l3m4n567 +Create Date: 2026-04-13 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = "ccf7645372fc" +down_revision: Union[str, Sequence[str], None] = "j1k2l3m4n567" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + op.create_table( + "manual_scan_result_detail", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("scan_result_id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("comment", sa.Text(), nullable=True), + sa.Column( + "created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False + ), + sa.Column( + "updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False + ), + sa.ForeignKeyConstraint(["scan_result_id"], ["scan_result.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("scan_result_id"), + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_table("manual_scan_result_detail") \ No newline at end of file diff --git a/backend-api/app/api/v1/manual_verification.py b/backend-api/app/api/v1/manual_verification.py new file mode 100644 index 000000000..d80692972 --- /dev/null +++ b/backend-api/app/api/v1/manual_verification.py @@ -0,0 +1,166 @@ +"""Manual scan result detail API endpoints.""" + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.auth import get_current_user +from app.db.session import get_async_session +from app.models.compliance import Scan +from app.models.manual_scan_result_detail import ManualScanResultDetail +from app.models.scan_result import ScanResult +from app.models.user import User +from app.schemas.manual_scan_result_detail import ( + ManualScanResultDetailCreate, + ManualScanResultDetailRead, + ManualScanResultDetailUpdate, +) + +router = APIRouter(prefix="/manual-verification", tags=["Manual Verification"]) + + +def _check_ownership(detail: ManualScanResultDetail, user: User) -> None: + """Raise 403 if the current user does not own this record.""" + if detail.user_id != user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to access this record", + ) + + +@router.post("/", response_model=ManualScanResultDetailRead, status_code=status.HTTP_201_CREATED) +async def create_manual_verification( + data: ManualScanResultDetailCreate, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Submit a manual verification for a scan result.""" + result = await db.execute( + select(ScanResult).where(ScanResult.id == data.scan_result_id) + ) + scan_result = result.scalar_one_or_none() + + if not scan_result: + raise HTTPException(status_code=404, detail="Scan result not found") + + owner_result = await db.execute( + select(Scan).where(Scan.id == scan_result.scan_id) + ) + scan = owner_result.scalar_one_or_none() + + if not scan or scan.user_id != current_user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You do not have permission to verify this scan result", + ) + + detail = ManualScanResultDetail( + scan_result_id=data.scan_result_id, + user_id=current_user.id, + comment=data.comment, + ) + + db.add(detail) + + try: + await db.commit() + except IntegrityError: + await db.rollback() + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Manual verification already exists for this scan result", + ) + + await db.refresh(detail) + return detail + + +@router.get("/{detail_id}", response_model=ManualScanResultDetailRead) +async def get_manual_verification( + detail_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Get a manual verification by ID.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + + _check_ownership(detail, current_user) + return detail + + +@router.get("/by-scan-result/{scan_result_id}", response_model=ManualScanResultDetailRead) +async def get_manual_verification_by_scan_result( + scan_result_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Get a manual verification by scan result ID.""" + result = await db.execute( + select(ManualScanResultDetail).where( + ManualScanResultDetail.scan_result_id == scan_result_id + ) + ) + detail = result.scalar_one_or_none() + + if not detail: + raise HTTPException( + status_code=404, + detail="Manual verification not found for this scan result", + ) + + _check_ownership(detail, current_user) + return detail + + +@router.patch("/{detail_id}", response_model=ManualScanResultDetailRead) +async def update_manual_verification( + detail_id: int, + update: ManualScanResultDetailUpdate, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Update a manual verification comment.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + + _check_ownership(detail, current_user) + + if update.comment is not None: + detail.comment = update.comment + + await db.commit() + await db.refresh(detail) + return detail + + +@router.delete("/{detail_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_manual_verification( + detail_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_async_session), +): + """Delete a manual verification.""" + result = await db.execute( + select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id) + ) + detail = result.scalar_one_or_none() + + if not detail: + raise HTTPException(status_code=404, detail="Manual verification not found") + + _check_ownership(detail, current_user) + + await db.delete(detail) + await db.commit() \ No newline at end of file diff --git a/backend-api/app/api/v1/router.py b/backend-api/app/api/v1/router.py index c648004af..6fb434d63 100644 --- a/backend-api/app/api/v1/router.py +++ b/backend-api/app/api/v1/router.py @@ -5,6 +5,7 @@ contact, evidence, m365_connections, + manual_verification, platforms, scans, settings, @@ -39,3 +40,6 @@ # User settings routes api_router.include_router(settings.router) + +# Manual verification routes +api_router.include_router(manual_verification.router) diff --git a/backend-api/app/models/__init__.py b/backend-api/app/models/__init__.py index 27c476e4d..d607c0777 100644 --- a/backend-api/app/models/__init__.py +++ b/backend-api/app/models/__init__.py @@ -8,6 +8,7 @@ from app.models.aws_connection import AWSConnection from app.models.platform import Platform from app.models.scan_result import ScanResult +from app.models.manual_scan_result_detail import ManualScanResultDetail from app.models.compliance import Scan from app.models.evidence_validation import EvidenceValidation from app.models.contact import ContactSubmission, SubmissionNote, SubmissionHistory @@ -23,6 +24,7 @@ "AWSConnection", "Platform", "ScanResult", + "ManualScanResultDetail", "Scan", "EvidenceValidation", "ContactSubmission", diff --git a/backend-api/app/models/manual_scan_result_detail.py b/backend-api/app/models/manual_scan_result_detail.py new file mode 100644 index 000000000..36b8d1319 --- /dev/null +++ b/backend-api/app/models/manual_scan_result_detail.py @@ -0,0 +1,38 @@ +"""ManualScanResultDetail model for extra data on manually verified controls.""" + +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import ForeignKey, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.sql import func + +from app.db.base import Base + +if TYPE_CHECKING: + from app.models.scan_result import ScanResult + from app.models.user import User + + +class ManualScanResultDetail(Base): + """Extra detail for scan results that were manually verified.""" + + __tablename__ = "manual_scan_result_detail" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + scan_result_id: Mapped[int] = mapped_column( + ForeignKey("scan_result.id", ondelete="CASCADE"), nullable=False, unique=True + ) + user_id: Mapped[int] = mapped_column( + ForeignKey("user.id", ondelete="CASCADE"), nullable=False + ) + comment: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + + created_at: Mapped[datetime] = mapped_column(server_default=func.now(), nullable=False) + updated_at: Mapped[datetime] = mapped_column( + server_default=func.now(), onupdate=func.now(), nullable=False + ) + + # Relationships + scan_result: Mapped["ScanResult"] = relationship() + user: Mapped["User"] = relationship() \ No newline at end of file diff --git a/backend-api/app/schemas/__init__.py b/backend-api/app/schemas/__init__.py index e6b5ceadc..d40134712 100644 --- a/backend-api/app/schemas/__init__.py +++ b/backend-api/app/schemas/__init__.py @@ -4,6 +4,11 @@ from app.schemas.validator_match import ValidatorMatch from app.schemas.validator_result import ValidatorResult from app.schemas.validator_summary import ValidatorSummary +from app.schemas.manual_scan_result_detail import ( + ManualScanResultDetailCreate, + ManualScanResultDetailRead, + ManualScanResultDetailUpdate, +) __all__ = [ "UserRead", @@ -13,4 +18,7 @@ "ValidatorMatch", "ValidatorResult", "ValidatorSummary", + "ManualScanResultDetailCreate", + "ManualScanResultDetailRead", + "ManualScanResultDetailUpdate", ] diff --git a/backend-api/app/schemas/manual_scan_result_detail.py b/backend-api/app/schemas/manual_scan_result_detail.py new file mode 100644 index 000000000..29f5e66b3 --- /dev/null +++ b/backend-api/app/schemas/manual_scan_result_detail.py @@ -0,0 +1,31 @@ +"""Pydantic schemas for manual scan result detail.""" + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class ManualScanResultDetailCreate(BaseModel): + """Schema for creating a manual scan result detail.""" + + scan_result_id: int + comment: str | None = None + + +class ManualScanResultDetailUpdate(BaseModel): + """Schema for updating a manual scan result detail.""" + + comment: str | None = None + + +class ManualScanResultDetailRead(BaseModel): + """Schema for reading a manual scan result detail.""" + + id: int + scan_result_id: int + user_id: int + comment: str | None + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) \ No newline at end of file diff --git a/backend-api/tests/__init__.py b/backend-api/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend-api/tests/test_manual_verification.py b/backend-api/tests/test_manual_verification.py new file mode 100644 index 000000000..db33ed7ba --- /dev/null +++ b/backend-api/tests/test_manual_verification.py @@ -0,0 +1,79 @@ +"""Integration tests for manual verification endpoints.""" + +import os + +import httpx + +BASE = "http://localhost:8000/v1" + + +def token(): + r = httpx.post( + f"{BASE}/auth/login", + data={ + "username": os.getenv( + "AUTOAUDIT_TEST_USERNAME", + "admin@example.com", + ), + "password": os.getenv( + "AUTOAUDIT_TEST_PASSWORD", + "admin", # nosec B105 + ), + }, + timeout=10, + ) + + assert r.status_code == 200, r.text # nosec B101 + + return r.json()["access_token"] + + +def auth(token_value): + return { + "Authorization": f"Bearer {token_value}" + } + + +def test_get_nonexistent_returns_404(): + t = token() + + r = httpx.get( + f"{BASE}/manual-verification/99999", + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 + + +def test_patch_nonexistent_returns_404(): + t = token() + + r = httpx.patch( + f"{BASE}/manual-verification/99999", + json={"comment": "x"}, + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 + + +def test_delete_nonexistent_returns_404(): + t = token() + + r = httpx.delete( + f"{BASE}/manual-verification/99999", + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 + + +def test_get_by_scan_result_nonexistent_returns_404(): + t = token() + + r = httpx.get( + f"{BASE}/manual-verification/by-scan-result/99999", + headers=auth(t), + ) + + assert r.status_code == 404, r.text # nosec B101 \ No newline at end of file