Skip to content
Open
88 changes: 45 additions & 43 deletions backend/secuscan/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import logging
import re

from .redaction import redact
from .redaction import redact, redact_dict
from .cache import get_cache
from .config import settings
from .database import get_db
Expand Down Expand Up @@ -81,6 +81,7 @@ def extract_target(inputs: Dict[str, Any]) -> str:
or inputs.get("domain")
or ""
)

class TaskExecutor:
"""Executes security scanning tasks in isolated environments"""

Expand Down Expand Up @@ -146,29 +147,29 @@ async def create_task(
) -> str:
"""
Create a new scan task.

Args:
plugin_id: Plugin identifier
inputs: User input values
preset: Optional preset name
consent_granted: Whether user granted consent

Returns:
Task ID
"""
task_id = str(uuid.uuid4())
plugin_manager = get_plugin_manager()
plugin = plugin_manager.get_plugin(plugin_id)

if not plugin:
raise ValueError(f"Plugin not found: {plugin_id}")

# Apply preset if provided
if preset and preset in plugin.presets:
preset_values = plugin.presets[preset]
# Merge preset with user inputs (user inputs take precedence)
inputs = {**preset_values, **inputs}

# Store task in database
db = await get_db()
await db.execute(
Expand All @@ -191,7 +192,7 @@ async def create_task(
bool(safe_mode)
)
)

# Log audit event
await db.log_audit(
"task_created",
Expand All @@ -200,9 +201,9 @@ async def create_task(
task_id=task_id,
plugin_id=plugin_id
)

return task_id

async def mark_task_failed(self, task_id: str, reason: str) -> None:
"""
Mark a task as failed without running it.
Expand Down Expand Up @@ -241,7 +242,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None:
async def execute_task(self, task_id: str):
"""
Execute a task asynchronously.

Args:
task_id: Task identifier
"""
Expand Down Expand Up @@ -274,7 +275,7 @@ async def execute_task(self, task_id: str):
if plugin_id in MODULAR_SCANNERS:
scanner_class = MODULAR_SCANNERS[plugin_id]
scanner = scanner_class(task_id, db)

logger.info(f"Executing modular scanner {plugin_id} for task {task_id}")
await self._broadcast(task_id, "status", TaskStatus.RUNNING.value)
await self._broadcast_phase(task_id, ScanPhase.RUNNING_COMMAND.value)
Expand All @@ -283,10 +284,10 @@ async def execute_task(self, task_id: str):
# Run the scanner
result = await scanner.run(target, inputs)
duration = time.time() - start_time

# Update task with results
final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value

await db.execute(
"""
UPDATE tasks SET
Expand Down Expand Up @@ -328,7 +329,7 @@ async def execute_task(self, task_id: str):
raise ValueError(f"Plugin not found: {plugin_id}")

# Pending records for assets removed

command = plugin_manager.build_command(plugin_id, inputs)

if not command:
Expand Down Expand Up @@ -431,11 +432,6 @@ async def execute_task(self, task_id: str):
logger.info(f"Task {task_id} completed in {duration:.2f}s")

except asyncio.CancelledError:
# CancelledError inherits from BaseException, not Exception —
# it bypasses the broad except below, so we handle it explicitly.
# Task.cancelled() returns False while the finally block is still
# executing, so this is the only reliable place to write the
# cancellation status to the DB.
duration = (time.time() - start_time) if 'start_time' in locals() else 0
await db.execute(
"""
Expand All @@ -455,12 +451,11 @@ async def execute_task(self, task_id: str):
)
await self._broadcast(task_id, "status", TaskStatus.CANCELLED.value)
await self._invalidate_cached_views()
raise # let asyncio complete the cancellation
raise

except Exception as e:
logger.error(f"Task {task_id} failed: {e}", exc_info=True)

# Update task as failed
duration = (time.time() - start_time) if 'start_time' in locals() else 0
await db.execute(
"""
Expand Down Expand Up @@ -491,11 +486,9 @@ async def execute_task(self, task_id: str):
task_id=task_id
)
finally:
# Always clean up: remove from the in-memory registry and
# release the concurrency slot regardless of how the task ended.
self.running_tasks.pop(task_id, None)
await concurrent_limiter.release(task_id)

async def _execute_command(
self,
command: list,
Expand Down Expand Up @@ -526,7 +519,7 @@ async def read_stream():
stdout = process.stdout
if stdout is None:
return

while not stdout.at_eof():
line = await stdout.readline()
if line:
Expand All @@ -545,7 +538,6 @@ async def read_stream():
return "".join(output_lines) + "\nTask timed out", -1

except asyncio.CancelledError:
# Handle task cancellation by killing the subprocess
logger.warning(f"Task {task_id} cancelled. Killing process {process.pid}")
try:
process.kill()
Expand Down Expand Up @@ -625,7 +617,6 @@ async def cancel_task(self, task_id: str) -> bool:
task = self.running_tasks[task_id]
task.cancel()

# If docker is enabled, forcefully kill the sandbox container
if settings.docker_enabled:
try:
killer = await asyncio.create_subprocess_exec(
Expand Down Expand Up @@ -653,7 +644,7 @@ async def cancel_task(self, task_id: str) -> bool:
)

return True

async def get_task_status(self, task_id: str) -> Optional[Dict]:
"""Get task status and progress"""
db = await get_db()
Expand Down Expand Up @@ -701,9 +692,12 @@ async def get_task_status(self, task_id: str) -> Optional[Dict]:
async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: str, target: str, status: str, output: str = ""):
"""Persist derived findings and report records into SQLite."""
parsed = self._parse_results(plugin, output)
findings_data = parsed.get("findings", [])

# Update task with structured results

# Redact all findings before any persistence path (structured_json AND findings table)
parsed["findings"] = [redact_dict(f) for f in parsed.get("findings", [])]
findings_data = parsed["findings"]

# Update task with structured results (uses the already-redacted parsed dict)
await db.execute(
"UPDATE tasks SET structured_json = ? WHERE id = ?",
(json.dumps(parsed), task_id)
Expand Down Expand Up @@ -792,8 +786,18 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id:

async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]):
"""Persist modular scanner results into findings, and reports."""
findings_data = result.get("findings", [])


# Redact all findings before any persistence path (structured_json AND findings table)
redacted_findings = [redact_dict(f) for f in result.get("findings", [])]
result["findings"] = redacted_findings
findings_data = redacted_findings

# Update task with redacted structured results
await db.execute(
"UPDATE tasks SET structured_json = ? WHERE id = ?",
(json.dumps(result), task_id)
)

# Insert findings
for finding in findings_data:
u_id = str(uuid.uuid4()).replace("-", "")
Expand Down Expand Up @@ -871,21 +875,21 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann
f"{scanner.name} Report",
"professional" if status == TaskStatus.COMPLETED.value else "failed",
"ready" if status == TaskStatus.COMPLETED.value else "failed",
len(findings_data),
2, # Professional reports are typically multi-page
len(redacted_findings),
2,
),
)

def _parse_results(self, plugin, output: str) -> Dict[str, Any]:
"""Route to appropriate parser based on plugin metadata."""
parser_type = plugin.output.get("parser")
parser_input = self._resolve_parser_input(plugin, output)

# 1. Check for custom parser.py in plugin directory (Recommended)
plugin_manager = get_plugin_manager()
plugin_dir = plugin_manager.plugins_dir / plugin.id
parser_path = plugin_dir / "parser.py"

if parser_path.exists():
if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir):
raise ValueError(
Expand Down Expand Up @@ -917,7 +921,7 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]:
return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input))
elif parser_type == "builtin_http":
return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input))

return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input})

def _resolve_parser_input(self, plugin, output: str) -> str:
Expand Down Expand Up @@ -1036,7 +1040,6 @@ def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[
return findings

if isinstance(data, dict):
# Common scanner shape: { "results": [...] }
for list_key in ("results", "findings", "issues", "vulnerabilities"):
if isinstance(data.get(list_key), list):
for idx, item in enumerate(data[list_key], start=1):
Expand Down Expand Up @@ -1076,8 +1079,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]:
findings = []
ports = []
services = []

# Regex for open ports: 80/tcp open http

port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)")
for match in port_pattern.finditer(output):
port_str, proto, service = match.groups()
Expand All @@ -1092,7 +1094,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]:
"remediation": "Close unnecessary ports and use a firewall to restrict access.",
"metadata": {"port": port_str, "protocol": proto, "service": service}
})

return {
"open_ports": sorted(list(set(ports))),
"services": sorted(list(set(services))),
Expand Down Expand Up @@ -1148,4 +1150,4 @@ async def _invalidate_cached_views(self):


# Global executor instance
executor = TaskExecutor()
executor = TaskExecutor()
Loading
Loading