Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""add manual_scan_result_detail table

Revision ID: ccf7645372fc
Revises: j1k2l3m4n567
Create Date: 2026-04-13

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "ccf7645372fc"
down_revision: Union[str, Sequence[str], None] = "j1k2l3m4n567"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
op.create_table(
"manual_scan_result_detail",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("scan_result_id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=False),
sa.Column("comment", sa.Text(), nullable=True),
sa.Column(
"created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False
),
sa.ForeignKeyConstraint(["scan_result_id"], ["scan_result.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("scan_result_id"),
)


def downgrade() -> None:
"""Downgrade schema."""
op.drop_table("manual_scan_result_detail")
166 changes: 166 additions & 0 deletions backend-api/app/api/v1/manual_verification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Manual scan result detail API endpoints."""

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.auth import get_current_user
from app.db.session import get_async_session
from app.models.compliance import Scan
from app.models.manual_scan_result_detail import ManualScanResultDetail
from app.models.scan_result import ScanResult
from app.models.user import User
from app.schemas.manual_scan_result_detail import (
ManualScanResultDetailCreate,
ManualScanResultDetailRead,
ManualScanResultDetailUpdate,
)

router = APIRouter(prefix="/manual-verification", tags=["Manual Verification"])


def _check_ownership(detail: ManualScanResultDetail, user: User) -> None:
"""Raise 403 if the current user does not own this record."""
if detail.user_id != user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have permission to access this record",
)


@router.post("/", response_model=ManualScanResultDetailRead, status_code=status.HTTP_201_CREATED)
async def create_manual_verification(
data: ManualScanResultDetailCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_session),
):
"""Submit a manual verification for a scan result."""
result = await db.execute(
select(ScanResult).where(ScanResult.id == data.scan_result_id)
)
scan_result = result.scalar_one_or_none()

if not scan_result:
raise HTTPException(status_code=404, detail="Scan result not found")

owner_result = await db.execute(
select(Scan).where(Scan.id == scan_result.scan_id)
)
scan = owner_result.scalar_one_or_none()

if not scan or scan.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have permission to verify this scan result",
)

detail = ManualScanResultDetail(
scan_result_id=data.scan_result_id,
user_id=current_user.id,
comment=data.comment,
)

db.add(detail)

try:
await db.commit()
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Manual verification already exists for this scan result",
)

await db.refresh(detail)
return detail


@router.get("/{detail_id}", response_model=ManualScanResultDetailRead)
async def get_manual_verification(
detail_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_session),
):
"""Get a manual verification by ID."""
result = await db.execute(
select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id)
)
detail = result.scalar_one_or_none()

if not detail:
raise HTTPException(status_code=404, detail="Manual verification not found")

_check_ownership(detail, current_user)
return detail


@router.get("/by-scan-result/{scan_result_id}", response_model=ManualScanResultDetailRead)
async def get_manual_verification_by_scan_result(
scan_result_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_session),
):
"""Get a manual verification by scan result ID."""
result = await db.execute(
select(ManualScanResultDetail).where(
ManualScanResultDetail.scan_result_id == scan_result_id
)
)
detail = result.scalar_one_or_none()

if not detail:
raise HTTPException(
status_code=404,
detail="Manual verification not found for this scan result",
)

_check_ownership(detail, current_user)
return detail


@router.patch("/{detail_id}", response_model=ManualScanResultDetailRead)
async def update_manual_verification(
detail_id: int,
update: ManualScanResultDetailUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_session),
):
"""Update a manual verification comment."""
result = await db.execute(
select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id)
)
detail = result.scalar_one_or_none()

if not detail:
raise HTTPException(status_code=404, detail="Manual verification not found")

_check_ownership(detail, current_user)

if update.comment is not None:
detail.comment = update.comment

await db.commit()
await db.refresh(detail)
return detail


@router.delete("/{detail_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_manual_verification(
detail_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_async_session),
):
"""Delete a manual verification."""
result = await db.execute(
select(ManualScanResultDetail).where(ManualScanResultDetail.id == detail_id)
)
detail = result.scalar_one_or_none()

if not detail:
raise HTTPException(status_code=404, detail="Manual verification not found")

_check_ownership(detail, current_user)

await db.delete(detail)
await db.commit()
4 changes: 4 additions & 0 deletions backend-api/app/api/v1/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
contact,
evidence,
m365_connections,
manual_verification,
platforms,
scans,
settings,
Expand Down Expand Up @@ -39,3 +40,6 @@

# User settings routes
api_router.include_router(settings.router)

# Manual verification routes
api_router.include_router(manual_verification.router)
2 changes: 2 additions & 0 deletions backend-api/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.models.aws_connection import AWSConnection
from app.models.platform import Platform
from app.models.scan_result import ScanResult
from app.models.manual_scan_result_detail import ManualScanResultDetail
from app.models.compliance import Scan
from app.models.evidence_validation import EvidenceValidation
from app.models.contact import ContactSubmission, SubmissionNote, SubmissionHistory
Expand All @@ -23,6 +24,7 @@
"AWSConnection",
"Platform",
"ScanResult",
"ManualScanResultDetail",
"Scan",
"EvidenceValidation",
"ContactSubmission",
Expand Down
38 changes: 38 additions & 0 deletions backend-api/app/models/manual_scan_result_detail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""ManualScanResultDetail model for extra data on manually verified controls."""

from datetime import datetime
from typing import TYPE_CHECKING, Optional

from sqlalchemy import ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from app.db.base import Base

if TYPE_CHECKING:
from app.models.scan_result import ScanResult
from app.models.user import User


class ManualScanResultDetail(Base):
"""Extra detail for scan results that were manually verified."""

__tablename__ = "manual_scan_result_detail"

id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
scan_result_id: Mapped[int] = mapped_column(
ForeignKey("scan_result.id", ondelete="CASCADE"), nullable=False, unique=True
)
user_id: Mapped[int] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"), nullable=False
)
comment: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

created_at: Mapped[datetime] = mapped_column(server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now(), nullable=False
)

# Relationships
scan_result: Mapped["ScanResult"] = relationship()
user: Mapped["User"] = relationship()
8 changes: 8 additions & 0 deletions backend-api/app/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
from app.schemas.validator_match import ValidatorMatch
from app.schemas.validator_result import ValidatorResult
from app.schemas.validator_summary import ValidatorSummary
from app.schemas.manual_scan_result_detail import (
ManualScanResultDetailCreate,
ManualScanResultDetailRead,
ManualScanResultDetailUpdate,
)

__all__ = [
"UserRead",
Expand All @@ -13,4 +18,7 @@
"ValidatorMatch",
"ValidatorResult",
"ValidatorSummary",
"ManualScanResultDetailCreate",
"ManualScanResultDetailRead",
"ManualScanResultDetailUpdate",
]
31 changes: 31 additions & 0 deletions backend-api/app/schemas/manual_scan_result_detail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Pydantic schemas for manual scan result detail."""

from datetime import datetime

from pydantic import BaseModel, ConfigDict


class ManualScanResultDetailCreate(BaseModel):
"""Schema for creating a manual scan result detail."""

scan_result_id: int
comment: str | None = None


class ManualScanResultDetailUpdate(BaseModel):
"""Schema for updating a manual scan result detail."""

comment: str | None = None


class ManualScanResultDetailRead(BaseModel):
"""Schema for reading a manual scan result detail."""

id: int
scan_result_id: int
user_id: int
comment: str | None
created_at: datetime
updated_at: datetime

model_config = ConfigDict(from_attributes=True)
Empty file added backend-api/tests/__init__.py
Empty file.
Loading
Loading