Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions backend/secuscan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ class NotificationRuleCreate(BaseModel):
is_active: bool = True


class NotificationRuleUpdate(BaseModel):
"""Partial update payload for a notification rule."""
name: Optional[str] = None
severity_threshold: Optional[NotificationSeverityThreshold] = None
channel_type: Optional[NotificationChannelType] = None
target_url_or_email: Optional[str] = None
is_active: Optional[bool] = None


class NotificationRuleResponse(BaseModel):
"""Stored notification rule returned by the API."""
id: str
Expand Down
221 changes: 219 additions & 2 deletions backend/secuscan/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def _serialize_workflow(row: Dict[str, Any], queued_task_ids: Optional[List[str]
"queued_task_ids": queued_task_ids or [],
}


def is_filesystem_target(target: str) -> bool:
"""
Return True only for genuine local filesystem paths.
Expand Down Expand Up @@ -101,7 +102,9 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str:
from .cache import get_cache
from .models import (
TaskCreateRequest, TaskResponse, TaskResult,
PluginListResponse, ErrorResponse, BulkDeleteRequest
PluginListResponse, ErrorResponse, BulkDeleteRequest,
NotificationRuleCreate, NotificationRuleUpdate,
NotificationChannelType,
)
from .config import settings
from .database import get_db
Expand All @@ -114,7 +117,7 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str:
report_download_limiter, read_heavy_limiter,
resolve_client_identity,
)
from .validation import validate_target, validate_task_start_payload
from .validation import validate_target, validate_task_start_payload, validate_url
from .reporting import reporting
from .vault import VaultCrypto
from .workflows import scheduler
Expand All @@ -124,6 +127,48 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str:
router = APIRouter(prefix="/api/v1")
SSE_RAW_OUTPUT_CHUNK_SIZE = 64 * 1024

_EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def _validate_notification_target(channel_type: NotificationChannelType, target: str) -> str:
cleaned = target.strip()
if not cleaned:
raise HTTPException(status_code=400, detail="Notification target is required")

if channel_type == NotificationChannelType.WEBHOOK:
is_valid, error = validate_url(cleaned)
if not is_valid:
raise HTTPException(status_code=400, detail=error or "Invalid webhook URL")
return cleaned

if not _EMAIL_PATTERN.match(cleaned):
raise HTTPException(status_code=400, detail="Invalid email address")
return cleaned


def _serialize_notification_rule(row: Dict[str, Any]) -> Dict[str, Any]:
return {
"id": row["id"],
"name": row["name"],
"severity_threshold": row["severity_threshold"],
"channel_type": row["channel_type"],
"target_url_or_email": row["target_url_or_email"],
"is_active": bool(row.get("is_active")),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
}


def _serialize_notification_history(row: Dict[str, Any]) -> Dict[str, Any]:
return {
"id": row["id"],
"rule_id": row["rule_id"],
"finding_id": row["finding_id"],
"status": row["status"],
"error_message": row.get("error_message"),
"sent_at": row.get("sent_at"),
}


async def get_or_set_cached(key: str, builder):
"""Read from cache, or build and cache a JSON response."""
Expand Down Expand Up @@ -1197,6 +1242,178 @@ async def trigger_workflow_tick():
return {"tick": "ok"}


@router.get("/notifications/rules")
async def list_notification_rules():
db = await get_db()
rows = await db.fetchall(
"SELECT * FROM notification_rules ORDER BY created_at DESC"
)
rules = [_serialize_notification_rule(row) for row in rows]
return {"rules": rules, "total": len(rules)}


@router.post("/notifications/rules")
async def create_notification_rule(payload: NotificationRuleCreate):
name = payload.name.strip()
if not name:
raise HTTPException(status_code=400, detail="Rule name is required")

target = _validate_notification_target(payload.channel_type, payload.target_url_or_email)
rule_id = str(uuid.uuid4())
db = await get_db()
await db.execute(
"""
INSERT INTO notification_rules (
id, name, severity_threshold, channel_type, target_url_or_email, is_active
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
rule_id,
name,
payload.severity_threshold.value,
payload.channel_type.value,
target,
1 if payload.is_active else 0,
),
)
row = await db.fetchone(
"SELECT * FROM notification_rules WHERE id = ?",
(rule_id,),
)
if not row:
raise HTTPException(status_code=500, detail="Failed to create notification rule")
return _serialize_notification_rule(row)


@router.get("/notifications/rules/{rule_id}")
async def get_notification_rule(rule_id: str):
db = await get_db()
row = await db.fetchone(
"SELECT * FROM notification_rules WHERE id = ?",
(rule_id,),
)
if not row:
raise HTTPException(status_code=404, detail="Notification rule not found")
return _serialize_notification_rule(row)


@router.patch("/notifications/rules/{rule_id}")
async def update_notification_rule(rule_id: str, payload: NotificationRuleUpdate):
db = await get_db()
row = await db.fetchone(
"SELECT * FROM notification_rules WHERE id = ?",
(rule_id,),
)
if not row:
raise HTTPException(status_code=404, detail="Notification rule not found")

updates: List[str] = []
params: List[Any] = []

if payload.name is not None:
name = payload.name.strip()
if not name:
raise HTTPException(status_code=400, detail="Rule name is required")
updates.append("name = ?")
params.append(name)

effective_channel = (
payload.channel_type
if payload.channel_type is not None
else NotificationChannelType(row["channel_type"])
)
if payload.target_url_or_email is not None:
target = _validate_notification_target(
effective_channel,
payload.target_url_or_email,
)
updates.append("target_url_or_email = ?")
params.append(target)
elif payload.channel_type is not None:
target = _validate_notification_target(
effective_channel,
row["target_url_or_email"],
)
updates.append("target_url_or_email = ?")
params.append(target)

if payload.severity_threshold is not None:
updates.append("severity_threshold = ?")
params.append(payload.severity_threshold.value)

if payload.channel_type is not None:
updates.append("channel_type = ?")
params.append(payload.channel_type.value)

if payload.is_active is not None:
updates.append("is_active = ?")
params.append(1 if payload.is_active else 0)

if not updates:
raise HTTPException(status_code=400, detail="No update fields provided")

updates.append("updated_at = datetime('now')")
params.append(rule_id)
await db.execute(
f"UPDATE notification_rules SET {', '.join(updates)} WHERE id = ?",
tuple(params),
)
updated = await db.fetchone(
"SELECT * FROM notification_rules WHERE id = ?",
(rule_id,),
)
if not updated:
raise HTTPException(status_code=404, detail="Notification rule not found")
return _serialize_notification_rule(updated)


@router.delete("/notifications/rules/{rule_id}")
async def delete_notification_rule(rule_id: str):
db = await get_db()
row = await db.fetchone(
"SELECT id FROM notification_rules WHERE id = ?",
(rule_id,),
)
if not row:
raise HTTPException(status_code=404, detail="Notification rule not found")
await db.execute("DELETE FROM notification_rules WHERE id = ?", (rule_id,))
return {"rule_id": rule_id, "deleted": True}


@router.get("/notifications/history", dependencies=[Depends(read_heavy_limiter)])
async def list_notification_history(
rule_id: Optional[str] = None,
limit: int = 50,
offset: int = 0,
):
if limit < 1 or limit > 200:
raise HTTPException(status_code=400, detail="Limit must be between 1 and 200")
if offset < 0:
raise HTTPException(status_code=400, detail="Offset must be non-negative")

db = await get_db()
query = "SELECT * FROM notification_history"
params: List[Any] = []
if rule_id:
query += " WHERE rule_id = ?"
params.append(rule_id)
query += " ORDER BY sent_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])

rows = await db.fetchall(query, tuple(params))
history = [_serialize_notification_history(row) for row in rows]

count_query = "SELECT COUNT(*) AS total FROM notification_history"
count_params: List[Any] = []
if rule_id:
count_query += " WHERE rule_id = ?"
count_params.append(rule_id)
count_row = await db.fetchone(count_query, tuple(count_params))
total = int(count_row["total"]) if count_row else 0

return {"history": history, "total": total, "limit": limit, "offset": offset}


@router.get("/finding/{finding_id}")
async def get_finding_details(finding_id: str):
"""Get detailed information for a specific finding"""
Expand Down
Loading
Loading