diff --git a/.env.example b/.env.example index b0448ff2..86267446 100644 --- a/.env.example +++ b/.env.example @@ -31,6 +31,12 @@ SECUSCAN_VAULT_KEY=replace-with-output-of-secrets.token_hex-32 # SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key # SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false +# Parser Sandbox Limits +# Plugin parser.py files run in isolated subprocesses. Adjust these if you have +# plugins that produce very large output or need more time to parse. +# SECUSCAN_PARSER_SANDBOX_TIMEOUT_SECONDS=30 +# SECUSCAN_PARSER_SANDBOX_MAX_OUTPUT_BYTES=8388608 + # Frontend Overrides # Leave these unset for the default local dev flow. # VITE_API_PROXY_TARGET=http://127.0.0.1:8000 diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index 3b67d255..9303b7ac 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -90,6 +90,10 @@ class Settings(BaseSettings): task_start_max_field_length: int = 1_000 # max chars per string input value task_start_max_array_length: int = 50 # max items in any list/multiselect input + # Parser sandbox limits + parser_sandbox_timeout_seconds: int = 30 + parser_sandbox_max_output_bytes: int = 8 * 1024 * 1024 # 8 MB + # Logging log_level: str = "INFO" log_file: str = str(PROJECT_ROOT / "logs" / "secuscan.log") diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 6f30d723..026de20c 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -21,6 +21,7 @@ from .models import TaskStatus, ScanPhase from .ratelimit import concurrent_limiter from .risk_scoring import compute_risk_score, compute_risk_factors +from .parser_sandbox import run_parser_in_sandbox, ParserSandboxError def _parse_discovered_at(finding: dict) -> Optional[datetime]: @@ -894,23 +895,18 @@ def _parse_results(self, plugin, output: str) -> Dict[str, Any]: "reinstall the plugin before retrying." ) try: - import importlib.util - spec = importlib.util.spec_from_file_location(f"parser_{plugin.id}", parser_path) - if spec is not None: - loader = spec.loader - if loader is not None: - module = importlib.util.module_from_spec(spec) - loader.exec_module(module) - if hasattr(module, "parse"): - logger.info(f"Using custom parser for {plugin.id}") - parsed = module.parse(parser_input) - return self._normalize_parsed_result(plugin, parser_input, parsed) - else: - logger.warning(f"Custom parser {parser_path} missing 'parse' function") - except ValueError: - raise - except Exception as e: - logger.error(f"Error executing custom parser for {plugin.id}: {e}") + parsed = run_parser_in_sandbox( + parser_path=parser_path, + plugin_id=plugin.id, + parser_input=parser_input, + timeout_seconds=settings.parser_sandbox_timeout_seconds, + max_output_bytes=settings.parser_sandbox_max_output_bytes, + ) + return self._normalize_parsed_result(plugin, parser_input, parsed) + except ParserSandboxError as exc: + logger.error("Parser sandbox error for plugin '%s': %s", plugin.id, exc) + except Exception as exc: + logger.error("Unexpected error running parser sandbox for '%s': %s", plugin.id, exc) # 2. Fallback to legacy built-in parsers if parser_type == "builtin_nmap": diff --git a/backend/secuscan/parser_sandbox.py b/backend/secuscan/parser_sandbox.py new file mode 100644 index 00000000..f6763c00 --- /dev/null +++ b/backend/secuscan/parser_sandbox.py @@ -0,0 +1,261 @@ +""" +Sandboxed parser execution for custom plugin parser.py files. + +Plugin parsers run untrusted third-party code. This module executes each +parser in a fresh, short-lived subprocess so that: + + - A crash, infinite loop, or memory explosion in the parser cannot kill the + backend process. + - The parser cannot access the backend's secrets, database handles, or any + other in-process state. + - Environment variables (which may contain SECUSCAN_VAULT_KEY, API keys, etc.) + are stripped from the child process. + - Execution is bounded by a configurable timeout. + - Output size is capped so a runaway parser cannot exhaust backend memory. + +Communication contract +---------------------- + stdin → JSON line: {"input": } + stdout → JSON line: + stderr → captured for diagnostics only + +The child process is a minimal Python bootstrap that imports the plugin's +parser.py, calls parse(input_data), and writes the result to stdout. It +imports nothing from the backend package, so no application state leaks. +""" + +from __future__ import annotations + +import json +import os +import sys +import subprocess +import textwrap +import logging +from pathlib import Path +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + +# Defaults — overridden by the Settings values passed at call time. +_DEFAULT_TIMEOUT_SECONDS: int = 30 +_DEFAULT_MAX_OUTPUT_BYTES: int = 8 * 1024 * 1024 # 8 MB + + +class ParserSandboxError(RuntimeError): + """Raised when the sandboxed parser fails for any reason.""" + + def __init__(self, plugin_id: str, reason: str, stderr: str = "") -> None: + self.plugin_id = plugin_id + self.reason = reason + self.stderr_excerpt = stderr[:2000] if stderr else "" + detail = f": {stderr[:200]}" if stderr.strip() else "" + super().__init__(f"Parser sandbox failed for '{plugin_id}' ({reason}){detail}") + + +# --------------------------------------------------------------------------- +# Bootstrap script injected into the child process via -c +# --------------------------------------------------------------------------- + +_BOOTSTRAP_TEMPLATE = textwrap.dedent( + """\ + import sys, json, os + + # Hard limit: refuse to read more than {max_input_bytes} bytes from stdin. + MAX_INPUT = {max_input_bytes} + raw = sys.stdin.buffer.read(MAX_INPUT + 1) + if len(raw) > MAX_INPUT: + sys.stderr.write("Parser input exceeded size limit\\n") + sys.exit(2) + + try: + envelope = json.loads(raw.decode("utf-8", errors="replace")) + parser_input = envelope["input"] + except Exception as exc: + sys.stderr.write(f"Failed to decode envelope: {{exc}}\\n") + sys.exit(3) + + # Load the plugin's parser module from an absolute path. + import importlib.util + parser_path = {parser_path!r} + spec = importlib.util.spec_from_file_location("_plugin_parser", parser_path) + if spec is None or spec.loader is None: + sys.stderr.write(f"Cannot load parser from {{parser_path}}\\n") + sys.exit(4) + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + if not hasattr(module, "parse"): + sys.stderr.write("Parser module missing 'parse' function\\n") + sys.exit(5) + + result = module.parse(parser_input) + + # Write result as a single JSON line. + sys.stdout.write(json.dumps(result, default=str)) + sys.stdout.flush() +""" +) + + +def _sanitised_env() -> Dict[str, str]: + """Return a minimal environment for the child process. + + Retains PATH and PYTHONPATH (needed to locate the interpreter and any + installed packages) while stripping all credentials and application + secrets present in the parent's environment. + """ + keep_keys = {"PATH", "PYTHONPATH", "HOME", "TMPDIR", "TEMP", "TMP", "LANG", "LC_ALL"} + return {k: v for k, v in os.environ.items() if k in keep_keys} + + +def run_parser_in_sandbox( + parser_path: Path, + plugin_id: str, + parser_input: str, + timeout_seconds: int = _DEFAULT_TIMEOUT_SECONDS, + max_output_bytes: int = _DEFAULT_MAX_OUTPUT_BYTES, +) -> Dict[str, Any]: + """Execute plugin parser.py in an isolated subprocess and return its result. + + Args: + parser_path: Absolute path to the plugin's parser.py. + plugin_id: Plugin identifier used in log and error messages. + parser_input: The raw string output from the scanner to parse. + timeout_seconds: Hard wall-clock timeout; the child is killed when exceeded. + max_output_bytes: Maximum bytes accepted from the child's stdout. + + Returns: + The dict returned by the parser's ``parse()`` function. + + Raises: + ParserSandboxError: on timeout, crash, oversized output, or malformed JSON. + """ + if not parser_path.exists(): + raise ParserSandboxError(plugin_id, "parser.py not found") + + max_input_bytes = max(len(parser_input.encode("utf-8")) + 128, 64 * 1024) + + bootstrap = _BOOTSTRAP_TEMPLATE.format( + parser_path=str(parser_path), + max_input_bytes=max_input_bytes, + ) + + envelope = json.dumps({"input": parser_input}) + stdin_bytes = envelope.encode("utf-8") + + import threading + import time + + stdout_chunks: list[bytes] = [] + stdout_total = 0 + overflow = False + stderr_chunks: list[bytes] = [] + + proc = subprocess.Popen( + [sys.executable, "-c", bootstrap], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=_sanitised_env(), + ) + + def _read_stdout() -> None: + nonlocal stdout_total, overflow + assert proc.stdout is not None + while True: + chunk = proc.stdout.read(65536) + if not chunk: + break + stdout_total += len(chunk) + if stdout_total > max_output_bytes: + overflow = True + proc.kill() + break + stdout_chunks.append(chunk) + + def _read_stderr() -> None: + assert proc.stderr is not None + while True: + chunk = proc.stderr.read(4096) + if not chunk: + break + stderr_chunks.append(chunk) + + t_out = threading.Thread(target=_read_stdout, daemon=True) + t_err = threading.Thread(target=_read_stderr, daemon=True) + t_out.start() + t_err.start() + + try: + proc.stdin.write(stdin_bytes) # type: ignore[union-attr] + proc.stdin.close() # type: ignore[union-attr] + except BrokenPipeError: + pass + + timed_out = False + try: + proc.wait(timeout=timeout_seconds) + except subprocess.TimeoutExpired: + timed_out = True + proc.kill() + + t_out.join(timeout=5) + t_err.join(timeout=5) + + stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace") + + if overflow: + raise ParserSandboxError( + plugin_id, + f"output exceeded {max_output_bytes // (1024 * 1024)} MB limit", + ) + + if timed_out: + logger.warning( + "Parser sandbox timed out after %ds for plugin '%s'", + timeout_seconds, + plugin_id, + ) + raise ParserSandboxError(plugin_id, f"timed out after {timeout_seconds}s", stderr_text) + + if proc.returncode != 0: + logger.error( + "Parser sandbox exited with code %d for plugin '%s': %s", + proc.returncode, + plugin_id, + stderr_text[:500], + ) + raise ParserSandboxError( + plugin_id, + f"subprocess exited with code {proc.returncode}", + stderr_text, + ) + + stdout_bytes = b"".join(stdout_chunks) + + if not stdout_bytes.strip(): + logger.warning( + "Parser sandbox produced no output for plugin '%s'; treating as empty result", + plugin_id, + ) + return {} + + try: + parsed = json.loads(stdout_bytes.decode("utf-8", errors="replace")) + except json.JSONDecodeError as exc: + raise ParserSandboxError( + plugin_id, + f"parser returned non-JSON output: {exc}", + stderr_text, + ) + + if not isinstance(parsed, (dict, list)): + raise ParserSandboxError( + plugin_id, + f"parser returned unexpected type {type(parsed).__name__}; expected dict or list", + ) + + logger.info("Parser sandbox completed successfully for plugin '%s'", plugin_id) + return parsed if isinstance(parsed, dict) else {"findings": parsed} diff --git a/testing/backend/unit/test_parser_sandbox.py b/testing/backend/unit/test_parser_sandbox.py new file mode 100644 index 00000000..d6b0168c --- /dev/null +++ b/testing/backend/unit/test_parser_sandbox.py @@ -0,0 +1,376 @@ +""" +Unit tests for the parser_sandbox module. + +Covers: +- Successful parse: dict result propagated correctly +- Successful parse: list result wrapped in {findings: [...]} +- Parser timeout: ParserSandboxError raised with reason containing "timed out" +- Parser crash (sys.exit / unhandled exception): ParserSandboxError raised +- Parser returns malformed JSON: ParserSandboxError raised +- Parser missing parse() function: ParserSandboxError raised +- Parser produces oversized output: ParserSandboxError raised +- Missing parser.py: ParserSandboxError raised +- Environment sanitisation: secrets not leaked to child process +- Stderr captured in error when subprocess fails +- Empty stdout treated as empty result +- parse() returning non-dict/list raises ParserSandboxError +""" + +import json +import os +import sys +import tempfile +import textwrap +from pathlib import Path + +import pytest + +from backend.secuscan.parser_sandbox import ( + ParserSandboxError, + _sanitised_env, + run_parser_in_sandbox, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_parser(tmp_path: Path, body: str) -> Path: + """Write a parser.py with the given body and return its path.""" + p = tmp_path / "parser.py" + p.write_text(textwrap.dedent(body)) + return p + + +# --------------------------------------------------------------------------- +# Successful parsing +# --------------------------------------------------------------------------- + + +class TestRunParserSuccessful: + def test_returns_dict_from_parser(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"findings": [], "summary": "ok"} + """, + ) + result = run_parser_in_sandbox(p, "test_plugin", "some scanner output") + assert result == {"findings": [], "summary": "ok"} + + def test_parser_receives_correct_input(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"echo": output} + """, + ) + result = run_parser_in_sandbox(p, "test_plugin", "SCANNER OUTPUT") + assert result["echo"] == "SCANNER OUTPUT" + + def test_list_result_wrapped_in_findings(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return [{"title": "finding1"}, {"title": "finding2"}] + """, + ) + result = run_parser_in_sandbox(p, "test_plugin", "") + assert "findings" in result + assert len(result["findings"]) == 2 + + def test_unicode_input_handled(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"length": len(output)} + """, + ) + input_str = "テスト scan output 🔍" + result = run_parser_in_sandbox(p, "test_plugin", input_str) + assert result["length"] == len(input_str) + + def test_empty_input_string_accepted(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"empty": output == ""} + """, + ) + result = run_parser_in_sandbox(p, "test_plugin", "") + assert result["empty"] is True + + def test_large_output_within_limit_accepted(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"findings": [{"title": f"f{i}"} for i in range(1000)]} + """, + ) + result = run_parser_in_sandbox(p, "test_plugin", "data", max_output_bytes=10 * 1024 * 1024) + assert len(result["findings"]) == 1000 + + +# --------------------------------------------------------------------------- +# Timeout +# --------------------------------------------------------------------------- + + +class TestParserTimeout: + def test_timeout_raises_parser_sandbox_error(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + import time + def parse(output): + time.sleep(60) + return {} + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "slow_plugin", "data", timeout_seconds=1) + assert "timed out" in str(exc_info.value) + assert exc_info.value.plugin_id == "slow_plugin" + + def test_reason_contains_timeout_duration(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + import time + def parse(output): + time.sleep(60) + return {} + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "slow_plugin", "data", timeout_seconds=1) + assert "1s" in exc_info.value.reason + + +# --------------------------------------------------------------------------- +# Parser crashes +# --------------------------------------------------------------------------- + + +class TestParserCrash: + def test_unhandled_exception_raises_sandbox_error(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + raise RuntimeError("parser exploded") + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "crash_plugin", "data") + assert exc_info.value.plugin_id == "crash_plugin" + + def test_explicit_sys_exit_raises_sandbox_error(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + import sys + def parse(output): + sys.exit(42) + """, + ) + with pytest.raises(ParserSandboxError): + run_parser_in_sandbox(p, "exit_plugin", "data") + + def test_stderr_captured_in_error(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + import sys + def parse(output): + sys.stderr.write("detailed crash info\\n") + raise ValueError("boom") + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "verbose_crash", "data") + assert "detailed crash info" in exc_info.value.stderr_excerpt + + def test_syntax_error_in_parser_raises(self, tmp_path): + p = tmp_path / "parser.py" + p.write_text("def parse(output:\n return {}") # syntax error + with pytest.raises(ParserSandboxError): + run_parser_in_sandbox(p, "syntax_plugin", "data") + + +# --------------------------------------------------------------------------- +# Malformed / missing parse function +# --------------------------------------------------------------------------- + + +class TestMalformedParser: + def test_missing_parse_function_raises(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def not_parse(output): + return {} + """, + ) + with pytest.raises(ParserSandboxError): + run_parser_in_sandbox(p, "no_func_plugin", "data") + + def test_parse_returns_non_json_serialisable_raises(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return "just a string" + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "string_plugin", "data") + assert "unexpected type" in exc_info.value.reason + + def test_parse_returns_none_treated_as_empty(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return None + """, + ) + with pytest.raises(ParserSandboxError): + run_parser_in_sandbox(p, "none_plugin", "data") + + +# --------------------------------------------------------------------------- +# Output size limit +# --------------------------------------------------------------------------- + + +class TestOutputSizeLimit: + def test_oversized_output_raises(self, tmp_path): + p = _write_parser( + tmp_path, + """\ + def parse(output): + return {"data": "x" * 1_000_000} + """, + ) + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "big_plugin", "data", max_output_bytes=100) + assert "limit" in exc_info.value.reason + + def test_oversized_output_process_killed_before_full_buffer(self, tmp_path): + """Regression: parser generating >limit bytes must be killed immediately. + + The process is terminated as soon as the cap is hit, so it cannot force + the parent to buffer the full output in memory first. + """ + # Parser streams 20 MB in a tight loop so it would fill memory fast if + # the parent waited for it to finish before checking size. + p = _write_parser( + tmp_path, + """\ + import sys + def parse(output): + # Write 20 MB to stdout directly so the parent reader sees it. + chunk = b"x" * 65536 + for _ in range(320): # 320 * 64 KB = 20 MB + sys.stdout.buffer.write(chunk) + sys.stdout.buffer.flush() + return {} + """, + ) + cap = 512 * 1024 # 512 KB cap + import time + start = time.monotonic() + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(p, "overflow_plugin", "data", max_output_bytes=cap) + elapsed = time.monotonic() - start + assert "limit" in exc_info.value.reason + # Must be killed well before it finishes writing 20 MB — should take < 10s + assert elapsed < 10, f"Overflow enforcement took too long: {elapsed:.1f}s" + + +# --------------------------------------------------------------------------- +# Missing parser file +# --------------------------------------------------------------------------- + + +class TestMissingParserFile: + def test_nonexistent_parser_path_raises(self, tmp_path): + missing = tmp_path / "does_not_exist.py" + with pytest.raises(ParserSandboxError) as exc_info: + run_parser_in_sandbox(missing, "ghost_plugin", "data") + assert "not found" in exc_info.value.reason + assert exc_info.value.plugin_id == "ghost_plugin" + + +# --------------------------------------------------------------------------- +# Environment sanitisation +# --------------------------------------------------------------------------- + + +class TestEnvironmentSanitisation: + def test_secret_env_vars_not_leaked_to_child(self, tmp_path): + os.environ["SECUSCAN_VAULT_KEY"] = "super-secret-key-12345" + p = _write_parser( + tmp_path, + """\ + import os + def parse(output): + return {"vault_key": os.environ.get("SECUSCAN_VAULT_KEY", "NOT_FOUND")} + """, + ) + try: + result = run_parser_in_sandbox(p, "env_test_plugin", "data") + assert result.get("vault_key") == "NOT_FOUND" + finally: + del os.environ["SECUSCAN_VAULT_KEY"] + + def test_sanitised_env_excludes_app_secrets(self): + os.environ["SECUSCAN_VAULT_KEY"] = "should-not-pass" + os.environ["MY_API_TOKEN"] = "token-123" + try: + env = _sanitised_env() + assert "SECUSCAN_VAULT_KEY" not in env + assert "MY_API_TOKEN" not in env + finally: + del os.environ["SECUSCAN_VAULT_KEY"] + del os.environ["MY_API_TOKEN"] + + def test_sanitised_env_retains_path(self): + env = _sanitised_env() + assert "PATH" in env + + +# --------------------------------------------------------------------------- +# ParserSandboxError +# --------------------------------------------------------------------------- + + +class TestParserSandboxError: + def test_is_runtime_error(self): + err = ParserSandboxError("plugin_x", "something went wrong") + assert isinstance(err, RuntimeError) + + def test_plugin_id_stored(self): + err = ParserSandboxError("plugin_x", "reason") + assert err.plugin_id == "plugin_x" + + def test_reason_stored(self): + err = ParserSandboxError("plugin_x", "custom reason") + assert err.reason == "custom reason" + + def test_stderr_excerpt_truncated_to_2000_chars(self): + err = ParserSandboxError("p", "r", stderr="x" * 5000) + assert len(err.stderr_excerpt) == 2000 + + def test_str_contains_plugin_id(self): + err = ParserSandboxError("my_plugin", "bad thing") + assert "my_plugin" in str(err) diff --git a/testing/backend/unit/test_plugin_parser_rce.py b/testing/backend/unit/test_plugin_parser_rce.py index fbd3c3e7..5983fffc 100644 --- a/testing/backend/unit/test_plugin_parser_rce.py +++ b/testing/backend/unit/test_plugin_parser_rce.py @@ -200,8 +200,8 @@ def test_integrity_failure_raises_and_blocks_exec(self, tmp_path, monkeypatch): assert len(exec_called) == 0, "exec_module must not be called when integrity check fails" - def test_exec_module_called_when_integrity_passes(self, tmp_path, monkeypatch): - """When verify_parser_at_exec_time returns True, exec_module must run.""" + def test_sandbox_called_when_integrity_passes(self, tmp_path, monkeypatch): + """When verify_parser_at_exec_time returns True, run_parser_in_sandbox must be called.""" monkeypatch.setattr(settings, "enforce_plugin_signatures", False) parser_src = "def parse(output):\n return {'findings': []}\n" @@ -213,29 +213,18 @@ def test_exec_module_called_when_integrity_passes(self, tmp_path, monkeypatch): mgr.plugins_dir = tmp_path mgr.plugins[plugin.id] = plugin - exec_called = [] - - def _fake_exec(module): - exec_called.append(True) - module.parse = lambda output: {"findings": []} + sandbox_called = [] - with patch("importlib.util.spec_from_file_location") as mock_spec: - mock_loader = MagicMock() - mock_loader.exec_module = MagicMock(side_effect=_fake_exec) - mock_spec_obj = MagicMock() - mock_spec_obj.loader = mock_loader - mock_spec.return_value = mock_spec_obj + def _fake_sandbox(parser_path, plugin_id, parser_input, **kwargs): + sandbox_called.append(plugin_id) + return {"findings": []} - fake_module = MagicMock() - fake_module.parse = lambda output: {"findings": []} + from backend.secuscan import executor as executor_module + exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) - with patch("importlib.util.module_from_spec", return_value=fake_module): - from backend.secuscan import executor as executor_module - exec_instance = executor_module.TaskExecutor.__new__(executor_module.TaskExecutor) - - with patch( - "backend.secuscan.executor.get_plugin_manager", return_value=mgr - ): - result = exec_instance._parse_results(plugin, "raw output") + with patch("backend.secuscan.executor.get_plugin_manager", return_value=mgr), \ + patch("backend.secuscan.executor.run_parser_in_sandbox", side_effect=_fake_sandbox): + result = exec_instance._parse_results(plugin, "raw output") - assert len(exec_called) == 1, "exec_module must be called once when integrity check passes" + assert len(sandbox_called) == 1, "run_parser_in_sandbox must be called once when integrity check passes" + assert sandbox_called[0] == plugin.id