Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ python-multipart>=0.0.9
xhtml2pdf>=0.2.17
aiosqlite>=0.20.0
python-whois>=0.9.4
httpx>=0.28.1
78 changes: 52 additions & 26 deletions backend/secuscan/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def _broadcast(self, task_id: str, event_type: str, data: Any):
event = {"type": event_type, "data": data}
for q in self._listeners[task_id]:
await q.put(event)

async def create_task(
self,
plugin_id: str,
Expand All @@ -119,29 +119,29 @@ async def create_task(
) -> str:
"""
Create a new scan task.

Args:
plugin_id: Plugin identifier
inputs: User input values
preset: Optional preset name
consent_granted: Whether user granted consent

Returns:
Task ID
"""
task_id = str(uuid.uuid4())
plugin_manager = get_plugin_manager()
plugin = plugin_manager.get_plugin(plugin_id)

if not plugin:
raise ValueError(f"Plugin not found: {plugin_id}")

# Apply preset if provided
if preset and preset in plugin.presets:
preset_values = plugin.presets[preset]
# Merge preset with user inputs (user inputs take precedence)
inputs = {**preset_values, **inputs}

# Store task in database
db = await get_db()
await db.execute(
Expand All @@ -163,7 +163,7 @@ async def create_task(
inputs.get("safe_mode", True)
)
)

# Log audit event
await db.log_audit(
"task_created",
Expand All @@ -172,9 +172,9 @@ async def create_task(
task_id=task_id,
plugin_id=plugin_id
)

return task_id

async def mark_task_failed(self, task_id: str, reason: str) -> None:
"""
Mark a task as failed without running it.
Expand Down Expand Up @@ -213,7 +213,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None:
async def execute_task(self, task_id: str):
"""
Execute a task asynchronously.

Args:
task_id: Task identifier
"""
Expand Down Expand Up @@ -246,18 +246,18 @@ async def execute_task(self, task_id: str):
if plugin_id in MODULAR_SCANNERS:
scanner_class = MODULAR_SCANNERS[plugin_id]
scanner = scanner_class(task_id, db)

logger.info(f"Executing modular scanner {plugin_id} for task {task_id}")
await self._broadcast(task_id, "status", TaskStatus.RUNNING.value)

start_time = time.time()
# Run the scanner
result = await scanner.run(target, inputs)
duration = time.time() - start_time

# Update task with results
final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value

await db.execute(
"""
UPDATE tasks SET
Expand Down Expand Up @@ -297,7 +297,7 @@ async def execute_task(self, task_id: str):
raise ValueError(f"Plugin not found: {plugin_id}")

# Pending records for assets removed

command = plugin_manager.build_command(plugin_id, inputs)

if not command:
Expand Down Expand Up @@ -384,6 +384,32 @@ async def execute_task(self, task_id: str):
await self._broadcast(task_id, "status", final_status)
await self._invalidate_cached_views()

# Trigger Webhook Notifications
try:
webhook_rows = await db.fetchall("SELECT key, value FROM settings WHERE type = 'webhook'")
if webhook_rows:
from .models import WebhookConfig
from .notifications import notify_scan_completion
webhook_config_dict = {row["key"]: row["value"] for row in webhook_rows}
webhook_config = WebhookConfig(**webhook_config_dict)

if any([webhook_config.slack_url, webhook_config.discord_url, webhook_config.custom_url]):
findings_rows = await db.fetchall("SELECT severity, count(*) as count FROM findings WHERE task_id = ? GROUP BY severity", (task_id,))
findings_summary = {row["severity"]: row["count"] for row in findings_rows}

asyncio.create_task(
notify_scan_completion(
task_id=task_id,
target=target,
status=final_status,
duration=duration,
findings_summary=findings_summary,
config=webhook_config
)
)
except Exception as e:
logger.error(f"Failed to trigger webhooks for task {task_id}: {e}")

# Log completion
await db.log_audit(
"task_completed",
Expand Down Expand Up @@ -460,7 +486,7 @@ async def execute_task(self, task_id: str):
# release the concurrency slot regardless of how the task ended.
self.running_tasks.pop(task_id, None)
await concurrent_limiter.release(task_id)

async def _execute_command(
self,
command: list,
Expand Down Expand Up @@ -491,7 +517,7 @@ async def read_stream():
stdout = process.stdout
if stdout is None:
return

while not stdout.at_eof():
line = await stdout.readline()
if line:
Expand Down Expand Up @@ -618,13 +644,13 @@ async def cancel_task(self, task_id: str) -> bool:
)

return True

async def get_task_status(self, task_id: str) -> Optional[Dict]:
"""Get task status and progress"""
db = await get_db()
task_row = await db.fetchone(
"""
SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at,
SELECT id, plugin_id, tool_name, target, status, created_at, started_at, completed_at,
duration_seconds, exit_code, error_message, preset, inputs_json
FROM tasks WHERE id = ?
""",
Expand Down Expand Up @@ -667,7 +693,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id:
"""Persist derived findings and report records into SQLite."""
parsed = self._parse_results(plugin, output)
findings_data = parsed.get("findings", [])

# Update task with structured results
await db.execute(
"UPDATE tasks SET structured_json = ? WHERE id = ?",
Expand Down Expand Up @@ -758,7 +784,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id:
async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]):
"""Persist modular scanner results into findings, and reports."""
findings_data = result.get("findings", [])

# Insert findings
for finding in findings_data:
u_id = str(uuid.uuid4()).replace("-", "")
Expand Down Expand Up @@ -845,12 +871,12 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]:
"""Route to appropriate parser based on plugin metadata."""
parser_type = plugin.output.get("parser")
parser_input = self._resolve_parser_input(plugin, output)

# 1. Check for custom parser.py in plugin directory (Recommended)
plugin_manager = get_plugin_manager()
plugin_dir = plugin_manager.plugins_dir / plugin.id
parser_path = plugin_dir / "parser.py"

if parser_path.exists():
if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir):
raise ValueError(
Expand Down Expand Up @@ -882,7 +908,7 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]:
return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input))
elif parser_type == "builtin_http":
return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input))

return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input})

def _resolve_parser_input(self, plugin, output: str) -> str:
Expand Down Expand Up @@ -1041,7 +1067,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]:
findings = []
ports = []
services = []

# Regex for open ports: 80/tcp open http
port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)")
for match in port_pattern.finditer(output):
Expand All @@ -1057,7 +1083,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]:
"remediation": "Close unnecessary ports and use a firewall to restrict access.",
"metadata": {"port": port_str, "protocol": proto, "service": service}
})

return {
"open_ports": sorted(list(set(ports))),
"services": sorted(list(set(services))),
Expand Down
8 changes: 7 additions & 1 deletion backend/secuscan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ class ErrorResponse(BaseModel):

class BulkDeleteRequest(RootModel[Annotated[List[str], Field(max_length=MAX_BULK_DELETE)]]):
"""Accepts a JSON array of task IDs directly. Max 500 per request."""
pass
pass

class WebhookConfig(BaseModel):
"""Configuration for external webhook notifications"""
slack_url: Optional[str] = None
discord_url: Optional[str] = None
custom_url: Optional[str] = None
99 changes: 99 additions & 0 deletions backend/secuscan/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import httpx
import logging
import asyncio
from typing import Dict, Any
from .models import WebhookConfig

logger = logging.getLogger(__name__)

async def _send_with_retry(url: str, payload: Dict[str, Any], headers: Dict[str, str] = None, max_retries: int = 3):
"""Send an HTTP request with exponential backoff retry logic."""
if not headers:
headers = {"Content-Type": "application/json"}

async with httpx.AsyncClient() as client:
for attempt in range(max_retries):
try:
response = await client.post(url, json=payload, headers=headers, timeout=10.0)
if response.status_code < 400:
return response
logger.warning(f"Webhook request to {url} failed with status {response.status_code}. Attempt {attempt+1}/{max_retries}")
except httpx.RequestError as e:
logger.warning(f"Webhook request to {url} failed: {e}. Attempt {attempt+1}/{max_retries}")

if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)

logger.error(f"Failed to send webhook to {url} after {max_retries} attempts.")

async def send_slack_webhook(url: str, payload: Dict[str, Any]):
await _send_with_retry(url, payload)

async def send_discord_webhook(url: str, payload: Dict[str, Any]):
await _send_with_retry(url, payload)

async def send_custom_webhook(url: str, payload: Dict[str, Any]):
await _send_with_retry(url, payload)


async def test_webhook_config(config: WebhookConfig):
payload = {
"text": "This is a test notification from SecuScan.",
"content": "This is a test notification from SecuScan.",
"message": "This is a test notification from SecuScan."
}

tasks = []
if config.slack_url:
tasks.append(send_slack_webhook(config.slack_url, {"text": payload["text"]}))
if config.discord_url:
tasks.append(send_discord_webhook(config.discord_url, {"content": payload["content"]}))
if config.custom_url:
tasks.append(send_custom_webhook(config.custom_url, payload))

if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
else:
raise ValueError("No webhook URLs provided to test.")

async def notify_scan_completion(task_id: str, target: str, status: str, duration: float, findings_summary: Dict[str, int], config: WebhookConfig):
duration_str = f"{duration:.2f}s" if duration else "Unknown"

severity_text = ", ".join(f"{k}: {v}" for k, v in findings_summary.items() if v > 0)
if not severity_text:
severity_text = "No findings"

text_content = (
f"🎯 *Scan Completed*\n"
f"Target: `{target}`\n"
f"Status: {status.upper()}\n"
f"Duration: {duration_str}\n"
f"Findings: {severity_text}"
)

slack_payload = {
"text": text_content
}

discord_payload = {
"content": text_content.replace("*", "**") # Discord uses double asterisk for bold
}

custom_payload = {
"task_id": task_id,
"target": target,
"status": status,
"duration_seconds": duration,
"findings_summary": findings_summary
}

tasks = []
if config.slack_url:
tasks.append(send_slack_webhook(config.slack_url, slack_payload))
if config.discord_url:
tasks.append(send_discord_webhook(config.discord_url, discord_payload))
if config.custom_url:
tasks.append(send_custom_webhook(config.custom_url, custom_payload))

if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
Loading
Loading