diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index a739d3c2..8310091a 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -209,6 +209,15 @@ class NotificationRuleCreate(BaseModel): is_active: bool = True +class NotificationRuleUpdate(BaseModel): + """Partial update payload for a notification rule.""" + name: Optional[str] = None + severity_threshold: Optional[NotificationSeverityThreshold] = None + channel_type: Optional[NotificationChannelType] = None + target_url_or_email: Optional[str] = None + is_active: Optional[bool] = None + + class NotificationRuleResponse(BaseModel): """Stored notification rule returned by the API.""" id: str diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index 35f29815..cf62d1ef 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -53,6 +53,7 @@ def _serialize_workflow(row: Dict[str, Any], queued_task_ids: Optional[List[str] "queued_task_ids": queued_task_ids or [], } + def is_filesystem_target(target: str) -> bool: """ Return True only for genuine local filesystem paths. @@ -101,7 +102,9 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: from .cache import get_cache from .models import ( TaskCreateRequest, TaskResponse, TaskResult, - PluginListResponse, ErrorResponse, BulkDeleteRequest + PluginListResponse, ErrorResponse, BulkDeleteRequest, + NotificationRuleCreate, NotificationRuleUpdate, + NotificationChannelType, ) from .config import settings from .database import get_db @@ -114,7 +117,7 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: report_download_limiter, read_heavy_limiter, resolve_client_identity, ) -from .validation import validate_target, validate_task_start_payload +from .validation import validate_target, validate_task_start_payload, validate_url from .reporting import reporting from .vault import VaultCrypto from .workflows import scheduler @@ -124,6 +127,48 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: router = APIRouter(prefix="/api/v1") SSE_RAW_OUTPUT_CHUNK_SIZE = 64 * 1024 +_EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") + + +def _validate_notification_target(channel_type: NotificationChannelType, target: str) -> str: + cleaned = target.strip() + if not cleaned: + raise HTTPException(status_code=400, detail="Notification target is required") + + if channel_type == NotificationChannelType.WEBHOOK: + is_valid, error = validate_url(cleaned) + if not is_valid: + raise HTTPException(status_code=400, detail=error or "Invalid webhook URL") + return cleaned + + if not _EMAIL_PATTERN.match(cleaned): + raise HTTPException(status_code=400, detail="Invalid email address") + return cleaned + + +def _serialize_notification_rule(row: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": row["id"], + "name": row["name"], + "severity_threshold": row["severity_threshold"], + "channel_type": row["channel_type"], + "target_url_or_email": row["target_url_or_email"], + "is_active": bool(row.get("is_active")), + "created_at": row.get("created_at"), + "updated_at": row.get("updated_at"), + } + + +def _serialize_notification_history(row: Dict[str, Any]) -> Dict[str, Any]: + return { + "id": row["id"], + "rule_id": row["rule_id"], + "finding_id": row["finding_id"], + "status": row["status"], + "error_message": row.get("error_message"), + "sent_at": row.get("sent_at"), + } + async def get_or_set_cached(key: str, builder): """Read from cache, or build and cache a JSON response.""" @@ -1197,6 +1242,178 @@ async def trigger_workflow_tick(): return {"tick": "ok"} +@router.get("/notifications/rules") +async def list_notification_rules(): + db = await get_db() + rows = await db.fetchall( + "SELECT * FROM notification_rules ORDER BY created_at DESC" + ) + rules = [_serialize_notification_rule(row) for row in rows] + return {"rules": rules, "total": len(rules)} + + +@router.post("/notifications/rules") +async def create_notification_rule(payload: NotificationRuleCreate): + name = payload.name.strip() + if not name: + raise HTTPException(status_code=400, detail="Rule name is required") + + target = _validate_notification_target(payload.channel_type, payload.target_url_or_email) + rule_id = str(uuid.uuid4()) + db = await get_db() + await db.execute( + """ + INSERT INTO notification_rules ( + id, name, severity_threshold, channel_type, target_url_or_email, is_active + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + rule_id, + name, + payload.severity_threshold.value, + payload.channel_type.value, + target, + 1 if payload.is_active else 0, + ), + ) + row = await db.fetchone( + "SELECT * FROM notification_rules WHERE id = ?", + (rule_id,), + ) + if not row: + raise HTTPException(status_code=500, detail="Failed to create notification rule") + return _serialize_notification_rule(row) + + +@router.get("/notifications/rules/{rule_id}") +async def get_notification_rule(rule_id: str): + db = await get_db() + row = await db.fetchone( + "SELECT * FROM notification_rules WHERE id = ?", + (rule_id,), + ) + if not row: + raise HTTPException(status_code=404, detail="Notification rule not found") + return _serialize_notification_rule(row) + + +@router.patch("/notifications/rules/{rule_id}") +async def update_notification_rule(rule_id: str, payload: NotificationRuleUpdate): + db = await get_db() + row = await db.fetchone( + "SELECT * FROM notification_rules WHERE id = ?", + (rule_id,), + ) + if not row: + raise HTTPException(status_code=404, detail="Notification rule not found") + + updates: List[str] = [] + params: List[Any] = [] + + if payload.name is not None: + name = payload.name.strip() + if not name: + raise HTTPException(status_code=400, detail="Rule name is required") + updates.append("name = ?") + params.append(name) + + effective_channel = ( + payload.channel_type + if payload.channel_type is not None + else NotificationChannelType(row["channel_type"]) + ) + if payload.target_url_or_email is not None: + target = _validate_notification_target( + effective_channel, + payload.target_url_or_email, + ) + updates.append("target_url_or_email = ?") + params.append(target) + elif payload.channel_type is not None: + target = _validate_notification_target( + effective_channel, + row["target_url_or_email"], + ) + updates.append("target_url_or_email = ?") + params.append(target) + + if payload.severity_threshold is not None: + updates.append("severity_threshold = ?") + params.append(payload.severity_threshold.value) + + if payload.channel_type is not None: + updates.append("channel_type = ?") + params.append(payload.channel_type.value) + + if payload.is_active is not None: + updates.append("is_active = ?") + params.append(1 if payload.is_active else 0) + + if not updates: + raise HTTPException(status_code=400, detail="No update fields provided") + + updates.append("updated_at = datetime('now')") + params.append(rule_id) + await db.execute( + f"UPDATE notification_rules SET {', '.join(updates)} WHERE id = ?", + tuple(params), + ) + updated = await db.fetchone( + "SELECT * FROM notification_rules WHERE id = ?", + (rule_id,), + ) + if not updated: + raise HTTPException(status_code=404, detail="Notification rule not found") + return _serialize_notification_rule(updated) + + +@router.delete("/notifications/rules/{rule_id}") +async def delete_notification_rule(rule_id: str): + db = await get_db() + row = await db.fetchone( + "SELECT id FROM notification_rules WHERE id = ?", + (rule_id,), + ) + if not row: + raise HTTPException(status_code=404, detail="Notification rule not found") + await db.execute("DELETE FROM notification_rules WHERE id = ?", (rule_id,)) + return {"rule_id": rule_id, "deleted": True} + + +@router.get("/notifications/history", dependencies=[Depends(read_heavy_limiter)]) +async def list_notification_history( + rule_id: Optional[str] = None, + limit: int = 50, + offset: int = 0, +): + if limit < 1 or limit > 200: + raise HTTPException(status_code=400, detail="Limit must be between 1 and 200") + if offset < 0: + raise HTTPException(status_code=400, detail="Offset must be non-negative") + + db = await get_db() + query = "SELECT * FROM notification_history" + params: List[Any] = [] + if rule_id: + query += " WHERE rule_id = ?" + params.append(rule_id) + query += " ORDER BY sent_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + rows = await db.fetchall(query, tuple(params)) + history = [_serialize_notification_history(row) for row in rows] + + count_query = "SELECT COUNT(*) AS total FROM notification_history" + count_params: List[Any] = [] + if rule_id: + count_query += " WHERE rule_id = ?" + count_params.append(rule_id) + count_row = await db.fetchone(count_query, tuple(count_params)) + total = int(count_row["total"]) if count_row else 0 + + return {"history": history, "total": total, "limit": limit, "offset": offset} + + @router.get("/finding/{finding_id}") async def get_finding_details(finding_id: str): """Get detailed information for a specific finding""" diff --git a/testing/backend/integration/test_notification_routes.py b/testing/backend/integration/test_notification_routes.py new file mode 100644 index 00000000..f519e500 --- /dev/null +++ b/testing/backend/integration/test_notification_routes.py @@ -0,0 +1,155 @@ +import uuid + +from backend.secuscan.models import NotificationDeliveryStatus + + +def _rule_payload( + name: str = "Critical alerts", + severity_threshold: str = "critical", + channel_type: str = "webhook", + target_url_or_email: str = "https://example.com/hook", + is_active: bool = True, +): + return { + "name": name, + "severity_threshold": severity_threshold, + "channel_type": channel_type, + "target_url_or_email": target_url_or_email, + "is_active": is_active, + } + + +def test_notification_rule_crud_contract(test_client): + create_response = test_client.post( + "/api/v1/notifications/rules", + json=_rule_payload(), + ) + assert create_response.status_code == 200 + created = create_response.json() + assert created["id"] + assert created["name"] == "Critical alerts" + assert created["severity_threshold"] == "critical" + assert created["channel_type"] == "webhook" + assert created["target_url_or_email"] == "https://example.com/hook" + assert created["is_active"] is True + assert created["created_at"] + assert created["updated_at"] + + list_response = test_client.get("/api/v1/notifications/rules") + assert list_response.status_code == 200 + listed = list_response.json() + assert listed["total"] == 1 + assert listed["rules"][0]["id"] == created["id"] + + get_response = test_client.get(f"/api/v1/notifications/rules/{created['id']}") + assert get_response.status_code == 200 + assert get_response.json()["id"] == created["id"] + + update_response = test_client.patch( + f"/api/v1/notifications/rules/{created['id']}", + json={"severity_threshold": "high", "is_active": False}, + ) + assert update_response.status_code == 200 + updated = update_response.json() + assert updated["severity_threshold"] == "high" + assert updated["is_active"] is False + + delete_response = test_client.delete( + f"/api/v1/notifications/rules/{created['id']}" + ) + assert delete_response.status_code == 200 + assert delete_response.json()["deleted"] is True + + missing_response = test_client.get(f"/api/v1/notifications/rules/{created['id']}") + assert missing_response.status_code == 404 + + +def test_notification_rule_rejects_invalid_webhook(test_client): + response = test_client.post( + "/api/v1/notifications/rules", + json=_rule_payload(target_url_or_email="not-a-url"), + ) + assert response.status_code == 400 + + +def test_notification_rule_accepts_email_target(test_client): + response = test_client.post( + "/api/v1/notifications/rules", + json=_rule_payload( + channel_type="email", + target_url_or_email="alerts@example.com", + ), + ) + assert response.status_code == 200 + assert response.json()["channel_type"] == "email" + assert response.json()["target_url_or_email"] == "alerts@example.com" + + +def test_notification_history_list_contract(test_client): + import asyncio + + from backend.secuscan.database import get_db + + create_response = test_client.post( + "/api/v1/notifications/rules", + json=_rule_payload(name="History rule"), + ) + assert create_response.status_code == 200 + rule_id = create_response.json()["id"] + + async def seed_history(): + db = await get_db() + task_id = str(uuid.uuid4()) + finding_id = str(uuid.uuid4()) + history_id = str(uuid.uuid4()) + + await db.execute( + """ + INSERT INTO tasks ( + id, plugin_id, tool_name, target, status, inputs_json, consent_granted + ) VALUES (?, 'nmap', 'nmap', '127.0.0.1', 'completed', '{}', 1) + """, + (task_id,), + ) + await db.execute( + """ + INSERT INTO findings ( + id, task_id, plugin_id, title, category, severity, target, description, remediation + ) VALUES (?, ?, 'nmap', 'Open port', 'network', 'critical', '127.0.0.1', 'desc', 'fix') + """, + (finding_id, task_id), + ) + await db.execute( + """ + INSERT INTO notification_history (id, rule_id, finding_id, status, error_message) + VALUES (?, ?, ?, ?, ?) + """, + ( + history_id, + rule_id, + finding_id, + NotificationDeliveryStatus.SUCCESS.value, + None, + ), + ) + return history_id + + history_id = asyncio.run(seed_history()) + + response = test_client.get("/api/v1/notifications/history") + assert response.status_code == 200 + data = response.json() + assert data["total"] >= 1 + assert data["limit"] == 50 + assert data["offset"] == 0 + assert any(item["id"] == history_id for item in data["history"]) + + filtered = test_client.get( + f"/api/v1/notifications/history?rule_id={rule_id}&limit=10" + ) + assert filtered.status_code == 200 + filtered_data = filtered.json() + assert filtered_data["total"] == 1 + assert filtered_data["history"][0]["rule_id"] == rule_id + assert filtered_data["history"][0]["finding_id"] + assert filtered_data["history"][0]["status"] == "success"