diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 6f30d723..ae051c1a 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -13,7 +13,7 @@ import logging import re -from .redaction import redact +from .redaction import redact, redact_dict from .cache import get_cache from .config import settings from .database import get_db @@ -81,6 +81,7 @@ def extract_target(inputs: Dict[str, Any]) -> str: or inputs.get("domain") or "" ) + class TaskExecutor: """Executes security scanning tasks in isolated environments""" @@ -146,29 +147,29 @@ async def create_task( ) -> str: """ Create a new scan task. - + Args: plugin_id: Plugin identifier inputs: User input values preset: Optional preset name consent_granted: Whether user granted consent - + Returns: Task ID """ task_id = str(uuid.uuid4()) plugin_manager = get_plugin_manager() plugin = plugin_manager.get_plugin(plugin_id) - + if not plugin: raise ValueError(f"Plugin not found: {plugin_id}") - + # Apply preset if provided if preset and preset in plugin.presets: preset_values = plugin.presets[preset] # Merge preset with user inputs (user inputs take precedence) inputs = {**preset_values, **inputs} - + # Store task in database db = await get_db() await db.execute( @@ -191,7 +192,7 @@ async def create_task( bool(safe_mode) ) ) - + # Log audit event await db.log_audit( "task_created", @@ -200,9 +201,9 @@ async def create_task( task_id=task_id, plugin_id=plugin_id ) - + return task_id - + async def mark_task_failed(self, task_id: str, reason: str) -> None: """ Mark a task as failed without running it. @@ -241,7 +242,7 @@ async def mark_task_failed(self, task_id: str, reason: str) -> None: async def execute_task(self, task_id: str): """ Execute a task asynchronously. 
- + Args: task_id: Task identifier """ @@ -274,7 +275,7 @@ async def execute_task(self, task_id: str): if plugin_id in MODULAR_SCANNERS: scanner_class = MODULAR_SCANNERS[plugin_id] scanner = scanner_class(task_id, db) - + logger.info(f"Executing modular scanner {plugin_id} for task {task_id}") await self._broadcast(task_id, "status", TaskStatus.RUNNING.value) await self._broadcast_phase(task_id, ScanPhase.RUNNING_COMMAND.value) @@ -283,10 +284,10 @@ async def execute_task(self, task_id: str): # Run the scanner result = await scanner.run(target, inputs) duration = time.time() - start_time - + # Update task with results final_status = TaskStatus.COMPLETED.value if result.get("status") != "failed" else TaskStatus.FAILED.value - + await db.execute( """ UPDATE tasks SET @@ -328,7 +329,7 @@ async def execute_task(self, task_id: str): raise ValueError(f"Plugin not found: {plugin_id}") # Pending records for assets removed - + command = plugin_manager.build_command(plugin_id, inputs) if not command: @@ -431,11 +432,6 @@ async def execute_task(self, task_id: str): logger.info(f"Task {task_id} completed in {duration:.2f}s") except asyncio.CancelledError: - # CancelledError inherits from BaseException, not Exception — - # it bypasses the broad except below, so we handle it explicitly. - # Task.cancelled() returns False while the finally block is still - # executing, so this is the only reliable place to write the - # cancellation status to the DB. 
duration = (time.time() - start_time) if 'start_time' in locals() else 0 await db.execute( """ @@ -455,12 +451,11 @@ async def execute_task(self, task_id: str): ) await self._broadcast(task_id, "status", TaskStatus.CANCELLED.value) await self._invalidate_cached_views() - raise # let asyncio complete the cancellation + raise except Exception as e: logger.error(f"Task {task_id} failed: {e}", exc_info=True) - # Update task as failed duration = (time.time() - start_time) if 'start_time' in locals() else 0 await db.execute( """ @@ -491,11 +486,9 @@ async def execute_task(self, task_id: str): task_id=task_id ) finally: - # Always clean up: remove from the in-memory registry and - # release the concurrency slot regardless of how the task ended. self.running_tasks.pop(task_id, None) await concurrent_limiter.release(task_id) - + async def _execute_command( self, command: list, @@ -526,7 +519,7 @@ async def read_stream(): stdout = process.stdout if stdout is None: return - + while not stdout.at_eof(): line = await stdout.readline() if line: @@ -545,7 +538,6 @@ async def read_stream(): return "".join(output_lines) + "\nTask timed out", -1 except asyncio.CancelledError: - # Handle task cancellation by killing the subprocess logger.warning(f"Task {task_id} cancelled. 
Killing process {process.pid}") try: process.kill() @@ -625,7 +617,6 @@ async def cancel_task(self, task_id: str) -> bool: task = self.running_tasks[task_id] task.cancel() - # If docker is enabled, forcefully kill the sandbox container if settings.docker_enabled: try: killer = await asyncio.create_subprocess_exec( @@ -653,7 +644,7 @@ async def cancel_task(self, task_id: str) -> bool: ) return True - + async def get_task_status(self, task_id: str) -> Optional[Dict]: """Get task status and progress""" db = await get_db() @@ -701,9 +692,12 @@ async def get_task_status(self, task_id: str) -> Optional[Dict]: async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: str, target: str, status: str, output: str = ""): """Persist derived findings and report records into SQLite.""" parsed = self._parse_results(plugin, output) - findings_data = parsed.get("findings", []) - - # Update task with structured results + + # Redact all findings before any persistence path (structured_json AND findings table) + parsed["findings"] = [redact_dict(f) for f in parsed.get("findings", [])] + findings_data = parsed["findings"] + + # Update task with structured results (uses the already-redacted parsed dict) await db.execute( "UPDATE tasks SET structured_json = ? 
WHERE id = ?", (json.dumps(parsed), task_id) @@ -792,8 +786,18 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): """Persist modular scanner results into findings, and reports.""" - findings_data = result.get("findings", []) - + + # Redact all findings before any persistence path (structured_json AND findings table) + redacted_findings = [redact_dict(f) for f in result.get("findings", [])] + result["findings"] = redacted_findings + findings_data = redacted_findings + + # Update task with redacted structured results + await db.execute( + "UPDATE tasks SET structured_json = ? WHERE id = ?", + (json.dumps(result), task_id) + ) + # Insert findings for finding in findings_data: u_id = str(uuid.uuid4()).replace("-", "") @@ -871,8 +875,8 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann f"{scanner.name} Report", "professional" if status == TaskStatus.COMPLETED.value else "failed", "ready" if status == TaskStatus.COMPLETED.value else "failed", - len(findings_data), - 2, # Professional reports are typically multi-page + len(redacted_findings), + 2, ), ) @@ -880,12 +884,12 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: """Route to appropriate parser based on plugin metadata.""" parser_type = plugin.output.get("parser") parser_input = self._resolve_parser_input(plugin, output) - + # 1. 
Check for custom parser.py in plugin directory (Recommended) plugin_manager = get_plugin_manager() plugin_dir = plugin_manager.plugins_dir / plugin.id parser_path = plugin_dir / "parser.py" - + if parser_path.exists(): if not plugin_manager.verify_parser_at_exec_time(plugin, plugin_dir): raise ValueError( @@ -917,7 +921,7 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: return self._normalize_parsed_result(plugin, parser_input, self._parse_nmap_output(parser_input)) elif parser_type == "builtin_http": return self._normalize_parsed_result(plugin, parser_input, self._parse_http_output(parser_input)) - + return self._normalize_parsed_result(plugin, parser_input, {"findings": [], "raw": parser_input}) def _resolve_parser_input(self, plugin, output: str) -> str: @@ -1036,7 +1040,6 @@ def _parse_json_fallback_findings(self, plugin, parser_input: str) -> List[Dict[ return findings if isinstance(data, dict): - # Common scanner shape: { "results": [...] } for list_key in ("results", "findings", "issues", "vulnerabilities"): if isinstance(data.get(list_key), list): for idx, item in enumerate(data[list_key], start=1): @@ -1076,8 +1079,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: findings = [] ports = [] services = [] - - # Regex for open ports: 80/tcp open http + port_pattern = re.compile(r"(\d+)/(tcp|udp)\s+open\s+([\w-]+)") for match in port_pattern.finditer(output): port_str, proto, service = match.groups() @@ -1092,7 +1094,7 @@ def _parse_nmap_output(self, output: str) -> Dict[str, Any]: "remediation": "Close unnecessary ports and use a firewall to restrict access.", "metadata": {"port": port_str, "protocol": proto, "service": service} }) - + return { "open_ports": sorted(list(set(ports))), "services": sorted(list(set(services))), @@ -1148,4 +1150,4 @@ async def _invalidate_cached_views(self): # Global executor instance -executor = TaskExecutor() +executor = TaskExecutor() \ No newline at end of file diff --git 
a/testing/backend/unit/test_findings_redaction.py b/testing/backend/unit/test_findings_redaction.py
new file mode 100644
index 00000000..f736fd97
--- /dev/null
+++ b/testing/backend/unit/test_findings_redaction.py
@@ -0,0 +1,196 @@
+"""
+Unit and integration tests for findings redaction before DB persistence.
+
+Verifies that secrets are stripped from finding fields (description,
+remediation, proof) and from tasks.structured_json before any DB write.
+
+Run with:
+    ./testing/test_python.sh
+or directly:
+    pytest testing/backend/unit/test_findings_redaction.py -v
+"""
+
+import asyncio
+import json
+import uuid
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from backend.secuscan.redaction import redact_dict, REDACTED
+
+
+# ── Fake AWS key used across tests ────────────────────────────────────────────
+
+FAKE_AWS_KEY = "AKIAIOSFODNN7EXAMPLE"
+
+
+# ── Unit tests: redact_dict behaviour ─────────────────────────────────────────
+
+def test_redact_dict_redacts_aws_key_in_description():
+    """Secret in description and remediation is replaced with [REDACTED]."""
+    finding = {
+        "title": "Exposed credential",
+        "category": "Secrets",
+        "severity": "critical",
+        "description": f"Found credential {FAKE_AWS_KEY} in config.",
+        "remediation": f"Rotate the key {FAKE_AWS_KEY} immediately.",
+    }
+    result = redact_dict(finding)
+    assert REDACTED in result["description"]
+    assert FAKE_AWS_KEY not in result["description"]
+    assert REDACTED in result["remediation"]
+    assert FAKE_AWS_KEY not in result["remediation"]
+
+
+def test_redact_dict_leaves_clean_finding_unchanged():
+    """Clean findings with no secrets pass through unmodified."""
+    finding = {
+        "title": "Open Port",
+        "category": "Network",
+        "severity": "low",
+        "description": "Port 80 is open and running http.",
+        "remediation": "Close unnecessary ports.",
+    }
+    result = redact_dict(finding)
+    assert result["description"] == finding["description"]
+    assert result["remediation"] == finding["remediation"]
+    assert result["title"] == finding["title"]
+
+
+def test_redact_dict_handles_nested_metadata():
+    """Nested metadata dict is walked recursively; non-strings are untouched."""
+    finding = {
+        "title": "Secret in metadata",
+        "severity": "high",
+        "description": "See metadata.",
+        "metadata": {
+            "raw_value": f"key={FAKE_AWS_KEY}",
+            "port": 443,
+            "nested": {"token": f"Bearer {FAKE_AWS_KEY}"},
+        },
+    }
+    result = redact_dict(finding)
+    assert FAKE_AWS_KEY not in result["metadata"]["raw_value"]
+    assert result["metadata"]["port"] == 443
+    assert FAKE_AWS_KEY not in result["metadata"]["nested"]["token"]
+
+
+def test_redact_dict_handles_none_proof():
+    """None proof field does not raise and is returned as-is."""
+    finding = {
+        "title": "Finding",
+        "severity": "info",
+        "description": "No proof available.",
+        "proof": None,
+    }
+    result = redact_dict(finding)
+    assert result["proof"] is None
+
+
+def test_redact_dict_handles_missing_keys_gracefully():
+    """Minimal finding dict with no description/remediation/proof works fine."""
+    finding = {"title": "Bare finding", "severity": "low"}
+    result = redact_dict(finding)
+    assert result["title"] == "Bare finding"
+    assert result["severity"] == "low"
+
+
+# ── Integration test: DB persistence paths ────────────────────────────────────
+
+@pytest.mark.asyncio
+async def test_upsert_findings_redacts_description_before_insert():
+    """
+    After _upsert_findings_and_report is called:
+      1. The findings table row must not contain the raw secret.
+      2. tasks.structured_json must not contain the raw secret.
+
+    Skipped when the plugin manager has no plugin to borrow metadata from.
+    """
+    from backend.secuscan.executor import TaskExecutor
+    from backend.secuscan.config import settings
+    from backend.secuscan.database import get_db
+    from backend.secuscan.plugins import get_plugin_manager, init_plugins
+
+    try:
+        pm = get_plugin_manager()
+    except RuntimeError:
+        await init_plugins(settings.plugins_dir)
+        pm = get_plugin_manager()
+
+    # Any registered plugin will do; skip instead of letting next() raise
+    # StopIteration (a RuntimeError inside a coroutine) when none exists.
+    plugin_id = next(iter(pm.plugins), None)
+    if plugin_id is None:
+        pytest.skip("No plugins registered; cannot exercise the persistence path")
+    plugin = pm.get_plugin(plugin_id)
+
+    task_id = str(uuid.uuid4())
+    db = await get_db()
+
+    # Seed a task row so the structured_json UPDATE has a row to write to.
+    await db.execute(
+        """
+        INSERT INTO tasks (id, plugin_id, tool_name, target, inputs_json, status, scan_phase, safe_mode)
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+        """,
+        (task_id, plugin_id, plugin.name, "example.com",
+         json.dumps({"target": "example.com"}), "running", "running_command", 0),
+    )
+
+    tainted_finding = {
+        "title": "Exposed AWS key",
+        "category": "Secrets",
+        "severity": "critical",
+        "description": f"AWS key found: {FAKE_AWS_KEY}",
+        "remediation": f"Rotate {FAKE_AWS_KEY} immediately.",
+        "proof": f"curl response contained {FAKE_AWS_KEY}",
+        "metadata": {},
+    }
+
+    # Bypass real parsing: hand the tainted finding straight to persistence.
+    executor = TaskExecutor()
+    with patch.object(executor, "_parse_results", return_value={"findings": [tainted_finding]}):
+        await executor._upsert_findings_and_report(
+            db=db,
+            task_id=task_id,
+            plugin=plugin,
+            plugin_id=plugin_id,
+            target="example.com",
+            status="completed",
+            output="",
+        )
+
+    # Assert: findings table row is clean
+    row = await db.fetchone(
+        "SELECT description, remediation, proof FROM findings WHERE task_id = ?",
+        (task_id,),
+    )
+    assert row is not None, "No finding row was inserted"
+    assert FAKE_AWS_KEY not in (row["description"] or ""), \
+        f"Secret still in findings.description: {row['description']!r}"
+    assert FAKE_AWS_KEY not in (row["remediation"] or ""), \
+        f"Secret still in findings.remediation: {row['remediation']!r}"
+    assert FAKE_AWS_KEY not in (row["proof"] or ""), \
+        f"Secret still in findings.proof: {row['proof']!r}"
+
+    # Assert: structured_json is also clean
+    task_row = await db.fetchone(
+        "SELECT structured_json FROM tasks WHERE id = ?",
+        (task_id,),
+    )
+    assert task_row is not None
+    # Check the serialized document as a whole first, so a secret leaking
+    # through any other key (not only the three below) is caught too.
+    assert FAKE_AWS_KEY not in task_row["structured_json"], \
+        "Secret still present somewhere in tasks.structured_json"
+    structured = json.loads(task_row["structured_json"])
+    findings_in_structured = structured.get("findings", [])
+    assert findings_in_structured, "structured_json contained no findings"
+    first = findings_in_structured[0]
+    assert FAKE_AWS_KEY not in (first.get("description") or ""), \
+        f"Secret still in structured_json finding description: {first.get('description')!r}"
+    assert FAKE_AWS_KEY not in (first.get("remediation") or ""), \
+        f"Secret still in structured_json finding remediation: {first.get('remediation')!r}"
+    assert FAKE_AWS_KEY not in (first.get("proof") or ""), \
+        f"Secret still in structured_json finding proof: {first.get('proof')!r}"