From 41a86fc8daaa58ede12d5362f630336b153cea72 Mon Sep 17 00:00:00 2001 From: Samarth Sugandhi Date: Fri, 29 May 2026 02:12:02 +0530 Subject: [PATCH] =?UTF-8?q?feat(backend):=20add=20plugin=20execution=20san?= =?UTF-8?q?dbox=20with=20SIGTERM=E2=86=92SIGKILL=20escalation,=20output=20?= =?UTF-8?q?cap,=20and=20POSIX=20resource=20limits?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/secuscan/executor.py | 101 ++++++------ backend/secuscan/sandbox_executor.py | 223 +++++++++++++++++++++++++++ 2 files changed, 278 insertions(+), 46 deletions(-) create mode 100644 backend/secuscan/sandbox_executor.py diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 44bda2ae..c70792d8 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -20,6 +20,7 @@ from .plugins import get_plugin_manager from .models import TaskStatus from .ratelimit import concurrent_limiter +from .sandbox_executor import SandboxConfig, SandboxViolation, run_sandboxed from .risk_scoring import compute_risk_score, compute_risk_factors @@ -325,10 +326,19 @@ async def execute_task(self, task_id: str): # Execute command start_time = time.time() + # Build sandbox config — allow plugin metadata to override defaults + plugin_sandbox_meta = plugin.output.get("sandbox", {}) if isinstance(plugin.output, dict) else {} + sandbox_cfg = SandboxConfig( + timeout_seconds=self._resolve_execution_timeout(inputs), + max_memory_mb=int(plugin_sandbox_meta.get("max_memory_mb", settings.sandbox_memory_mb)), + max_output_bytes=int(plugin_sandbox_meta.get("max_output_bytes", 5 * 1024 * 1024)), + sigterm_grace_seconds=int(plugin_sandbox_meta.get("sigterm_grace_seconds", 3)), + ) output, exit_code = await self._execute_command( command, task_id, - timeout=self._resolve_execution_timeout(inputs), + timeout=sandbox_cfg.timeout_seconds, + sandbox_config=sandbox_cfg, ) duration = time.time() - start_time @@ -465,63 +475,62 @@ async def _execute_command( self, command: list, task_id: str, - timeout: int = 600 + timeout: int = 600, + sandbox_config: Optional[SandboxConfig] = None, ) -> tuple: """ - Execute command in subprocess and stream output. + Execute command in a sandboxed subprocess and stream output. Args: - command: Command as list - task_id: Task identifier for logging - timeout: Execution timeout in seconds + command: Command as list + task_id: Task identifier for logging and broadcast + timeout: Execution timeout in seconds (overrides sandbox_config) + sandbox_config: Optional SandboxConfig; built from settings if not provided Returns: Tuple of (output, exit_code) """ + config = sandbox_config or SandboxConfig( + timeout_seconds=timeout, + max_memory_mb=settings.sandbox_memory_mb, + max_output_bytes=5 * 1024 * 1024, + sigterm_grace_seconds=3, + ) + # Always honour the caller-supplied timeout + config.timeout_seconds = timeout + try: - process = await asyncio.create_subprocess_exec( - *command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT + output, exit_code = await run_sandboxed( + command=command, + task_id=task_id, + config=config, + broadcast_fn=self._broadcast, ) + return output, exit_code - output_lines = [] - - async def read_stream(): - stdout = process.stdout - if stdout is None: - return - - while not stdout.at_eof(): - line = await stdout.readline() - if line: - decoded_line = line.decode('utf-8', errors='replace') - output_lines.append(decoded_line) - await self._broadcast(task_id, "output", decoded_line) - - try: - await asyncio.wait_for(read_stream(), timeout=timeout) - await process.wait() - return "".join(output_lines), process.returncode if process.returncode is not None else -1 - - except asyncio.TimeoutError: - process.kill() - await process.wait() - return "".join(output_lines) + "\nTask timed out", -1 - - except asyncio.CancelledError: - # Handle task cancellation by killing the subprocess - logger.warning(f"Task {task_id} cancelled. Killing process {process.pid}") - try: - process.kill() - await process.wait() - except Exception as e: - logger.error(f"Error killing process for cancelled task {task_id}: {e}") - raise + except SandboxViolation as exc: + logger.warning( + "Sandbox violation for task %s — reason: %s", task_id, exc.reason + ) + # Surface the termination reason in the task status broadcast + await self._broadcast(task_id, "status", f"terminated:{exc.reason}") + # Update DB with a structured error message + db = await get_db() + await db.execute( + "UPDATE tasks SET error_message = ? WHERE id = ?", + (f"[sandbox:{exc.reason}] {exc}", task_id), + ) + await db.log_audit( + "sandbox_violation", + f"Task {task_id} terminated by sandbox: {exc.reason}", + severity="warning", + context={"task_id": task_id, "reason": exc.reason}, + task_id=task_id, + ) + return exc.output or f"[Sandbox terminated: {exc.reason}]", -1 - except Exception as e: - logger.error(f"Failed to execute command: {e}") - return f"Execution error: {str(e)}", -1 + except asyncio.CancelledError: + raise def _resolve_execution_timeout(self, inputs: Dict[str, Any]) -> int: """Resolve per-task process timeout from plugin inputs.""" diff --git a/backend/secuscan/sandbox_executor.py b/backend/secuscan/sandbox_executor.py new file mode 100644 index 00000000..1faaec6f --- /dev/null +++ b/backend/secuscan/sandbox_executor.py @@ -0,0 +1,223 @@ +""" +Plugin execution sandbox with resource limits and timeout enforcement. + +Wraps asyncio subprocess execution with: +- Configurable timeout (SIGTERM → grace period → SIGKILL) +- stdout/stderr byte-stream capping +- POSIX resource limits (RLIMIT_AS, RLIMIT_CPU) via preexec_fn on Linux +- Structured SandboxViolation exception on any breach +""" + +from __future__ import annotations + +import asyncio +import platform +import signal +import sys +from asyncio import subprocess +from dataclasses import dataclass, field +from typing import Optional +import logging + +logger = logging.getLogger(__name__) + +# ── Resource limits (Linux only) ────────────────────────────────────────────── +_IS_LINUX = platform.system() == "Linux" + +if _IS_LINUX: + import resource as _resource + + +def _apply_resource_limits(memory_mb: int, cpu_seconds: int) -> None: + """ + Called as preexec_fn inside the child process (Linux only). + Sets virtual memory and CPU time hard limits before exec(). + """ + if not _IS_LINUX: + return + try: + # Virtual address space limit (bytes) + mem_bytes = memory_mb * 1024 * 1024 + _resource.setrlimit(_resource.RLIMIT_AS, (mem_bytes, mem_bytes)) + except Exception: + pass # best-effort — never crash the child process + + try: + # CPU time limit (seconds) + _resource.setrlimit(_resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds)) + except Exception: + pass + + +# ── Public API ───────────────────────────────────────────────────────────────── + +class SandboxViolation(Exception): + """Raised when a subprocess breaches a sandbox constraint.""" + + def __init__(self, reason: str, output: str = ""): + super().__init__(reason) + self.reason = reason # e.g. "timeout", "memory_limit", "output_limit" + self.output = output # partial output collected before the violation + + +@dataclass +class SandboxConfig: + """ + Per-task sandbox constraints. + + Defaults mirror settings values but can be overridden per plugin via + plugin metadata: { "sandbox": { "timeout_seconds": 30, ... } } + """ + timeout_seconds: int = 600 + max_memory_mb: int = 512 + max_output_bytes: int = 5 * 1024 * 1024 # 5 MB + sigterm_grace_seconds: int = 3 + + +async def run_sandboxed( + command: list[str], + task_id: str, + config: SandboxConfig, + broadcast_fn=None, # async callable(task_id, "output", line_str) | None +) -> tuple[str, int]: + """ + Execute *command* inside a resource-constrained subprocess. + + Args: + command: Command + args list passed to asyncio.create_subprocess_exec + task_id: Used only for logging and broadcast tagging + config: SandboxConfig instance controlling all limits + broadcast_fn: Optional async coroutine to stream each output line + + Returns: + (output_str, exit_code) + + Raises: + SandboxViolation: if timeout, output cap, or memory limit is hit + """ + + # Build preexec_fn for Linux resource limits + preexec = None + if _IS_LINUX: + mem_mb = config.max_memory_mb + cpu_sec = config.timeout_seconds # CPU seconds == wall timeout as upper bound + def preexec(): # noqa: E306 + _apply_resource_limits(mem_mb, cpu_sec) + + process: Optional[asyncio.subprocess.Process] = None + + try: + process = await asyncio.create_subprocess_exec( + *command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=preexec, + ) + + output_chunks: list[str] = [] + total_bytes = 0 + truncated = False + + async def _read_stream() -> None: + nonlocal total_bytes, truncated + stdout = process.stdout + if stdout is None: + return + while not stdout.at_eof(): + line = await stdout.readline() + if not line: + continue + line_bytes = len(line) + + # Output byte cap — stop reading, flag truncation + if total_bytes + line_bytes > config.max_output_bytes: + truncated = True + output_chunks.append( + f"\n[SANDBOX] Output truncated at {config.max_output_bytes // 1024} KB limit\n" + ) + # Drain remaining stdout so the process isn't blocked on write + try: + await asyncio.wait_for(stdout.read(), timeout=2) + except Exception: + pass + return + + decoded = line.decode("utf-8", errors="replace") + output_chunks.append(decoded) + total_bytes += line_bytes + + if broadcast_fn is not None: + try: + await broadcast_fn(task_id, "output", decoded) + except Exception: + pass + + # ── Timeout enforcement ──────────────────────────────────────────── + try: + await asyncio.wait_for(_read_stream(), timeout=config.timeout_seconds) + await process.wait() + except asyncio.TimeoutError: + await _escalate_kill(process, config.sigterm_grace_seconds, task_id) + partial = "".join(output_chunks) + raise SandboxViolation( + reason="timeout", + output=partial + f"\n[SANDBOX] Process killed after {config.timeout_seconds}s timeout", + ) + except asyncio.CancelledError: + await _escalate_kill(process, config.sigterm_grace_seconds, task_id) + raise + + exit_code = process.returncode if process.returncode is not None else -1 + output = "".join(output_chunks) + + # Surface memory limit hit (Linux SIGKILL from RLIMIT_AS → exit -9) + if _IS_LINUX and exit_code in (-9, 137): + raise SandboxViolation( + reason="memory_limit", + output=output + "\n[SANDBOX] Process killed by OS — memory limit exceeded", + ) + + return output, exit_code + + except SandboxViolation: + raise + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error("Sandbox execution error for task %s: %s", task_id, exc) + return f"Execution error: {exc}", -1 + + +async def _escalate_kill( + process: asyncio.subprocess.Process, + grace_seconds: int, + task_id: str, +) -> None: + """ + Send SIGTERM, wait grace_seconds, then SIGKILL if still alive. + Safe on all platforms (Windows falls back to terminate/kill). + """ + pid = process.pid + logger.warning("Sandbox: sending SIGTERM to PID %s (task %s)", pid, task_id) + + try: + if sys.platform == "win32": + process.terminate() + else: + process.send_signal(signal.SIGTERM) + except ProcessLookupError: + return # already dead + + try: + await asyncio.wait_for(process.wait(), timeout=grace_seconds) + logger.info("Sandbox: PID %s exited cleanly after SIGTERM", pid) + except asyncio.TimeoutError: + logger.warning( + "Sandbox: PID %s ignored SIGTERM after %ss — sending SIGKILL", + pid, grace_seconds, + ) + try: + process.kill() + await process.wait() + except ProcessLookupError: + pass