diff --git a/.env.example b/.env.example index 86267446..e5cfd6d9 100644 --- a/.env.example +++ b/.env.example @@ -31,6 +31,12 @@ SECUSCAN_VAULT_KEY=replace-with-output-of-secrets.token_hex-32 # SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key # SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false +# Plugin Capability Policy +# Comma-separated list of capabilities to deny across all plugins. +# Plugins that require any denied capability will fail before execution. +# Supported values: network, filesystem, docker, credentials, intrusive, exploit +# Example: deny all exploitation and credential-accessing plugins: +# SECUSCAN_DENIED_CAPABILITIES=exploit,credentials # Parser Sandbox Limits # Plugin parser.py files run in isolated subprocesses. Adjust these if you have # plugins that produce very large output or need more time to parse. diff --git a/backend/secuscan/capabilities.py b/backend/secuscan/capabilities.py new file mode 100644 index 00000000..ffcabbe3 --- /dev/null +++ b/backend/secuscan/capabilities.py @@ -0,0 +1,203 @@ +""" +Per-plugin capability declarations and pre-execution enforcement. + +Plugins declare a list of capabilities they require in their metadata.json under +the ``capabilities`` key. The enforcer checks that list against the operator's +``denied_capabilities`` setting (``SECUSCAN_DENIED_CAPABILITIES`` env var, comma- +separated) before any command is built or process is spawned. + +Supported capabilities +---------------------- +network - plugin makes outbound network connections +filesystem - plugin reads or writes paths on the local filesystem +docker - plugin requires the Docker daemon at runtime +credentials - plugin pulls secrets from the credential vault +intrusive - plugin performs active probing that may affect target systems +exploit - plugin attempts to exploit vulnerabilities (highest risk, opt-in only) + +Backward compatibility / migration +----------------------------------- +Plugins that do **not** declare a ``capabilities`` list (i.e. all plugins that +pre-date this feature) are **not broken**. Instead, an implied capability set is +derived from their ``safety.level`` field: + + safe → ["network"] + intrusive → ["network", "intrusive"] + exploit → ["network", "intrusive", "exploit"] + +This means: + +* Existing plugins without a ``capabilities`` field continue to load and execute + normally. No plugin metadata files need to be updated for the enforcement + system to become active. +* Operators can still deny capabilities (e.g. ``SECUSCAN_DENIED_CAPABILITIES=exploit``) + and all exploit-level plugins will be blocked even if they lack an explicit + ``capabilities`` declaration. +* Plugin authors are encouraged to add an explicit ``capabilities`` list to their + metadata.json so operators have fine-grained visibility. After adding or + changing the ``capabilities`` field the plugin checksum must be regenerated + (run ``python -m backend.secuscan.plugins_validate --refresh ``). +""" + +from __future__ import annotations + +from enum import Enum +from typing import FrozenSet, List, Optional, Set + +import logging + +logger = logging.getLogger(__name__) + + +class Capability(str, Enum): + """All recognised plugin capability tokens.""" + + NETWORK = "network" + FILESYSTEM = "filesystem" + DOCKER = "docker" + CREDENTIALS = "credentials" + INTRUSIVE = "intrusive" + EXPLOIT = "exploit" + + +ALL_CAPABILITIES: FrozenSet[str] = frozenset(c.value for c in Capability) + +# Capabilities that are implicitly required by a plugin's safety level when the +# plugin has not declared them explicitly. This lets older plugins without a +# ``capabilities`` field degrade gracefully while still being enforceable. +_SAFETY_LEVEL_IMPLIED: dict[str, List[str]] = { + "safe": ["network"], + "intrusive": ["network", "intrusive"], + "exploit": ["network", "intrusive", "exploit"], +} + + +class CapabilityDeniedError(PermissionError): + """Raised when a plugin attempts to use a capability that the operator has denied.""" + + def __init__(self, plugin_id: str, denied: Set[str]) -> None: + self.plugin_id = plugin_id + self.denied_capabilities = denied + caps = ", ".join(sorted(denied)) + super().__init__( + f"Plugin '{plugin_id}' requires capabilities [{caps}] that are denied by " + "operator policy. Update SECUSCAN_DENIED_CAPABILITIES to allow them or " + "choose a plugin that does not require these capabilities." + ) + + +def validate_capability_list(capabilities: List[str], plugin_id: str) -> List[str]: + """Return the normalised capability list, raising ValueError for unknowns.""" + normalised: List[str] = [] + for raw in capabilities: + token = raw.strip().lower() + if token not in ALL_CAPABILITIES: + raise ValueError( + f"Plugin '{plugin_id}' declares unknown capability '{raw}'. " + f"Supported capabilities: {sorted(ALL_CAPABILITIES)}" + ) + normalised.append(token) + return normalised + + +def effective_capabilities( + declared: Optional[List[str]], + safety_level: str, + plugin_id: str, +) -> Set[str]: + """Combine explicitly declared capabilities with safety-level implied ones. + + If the plugin declares an explicit capability list, that list is the source + of truth (implied capabilities are *not* added on top — they were already + considered by the plugin author). If no capabilities are declared at all the + implied set for the plugin's safety level is used so that legacy plugins + remain enforceable. + """ + if declared is not None and len(declared) > 0: + validated = validate_capability_list(declared, plugin_id) + return set(validated) + + implied = _SAFETY_LEVEL_IMPLIED.get(safety_level, ["network"]) + return set(implied) + + +class CapabilityEnforcer: + """Checks plugin capabilities against the operator-configured denied set. + + Instantiate once and reuse across the application lifetime. The denied set + is fixed at construction time so that the enforcer is deterministic and + testable independently of the global settings object. + """ + + def __init__(self, denied_capabilities: Optional[List[str]] = None) -> None: + raw = denied_capabilities or [] + normalised: List[str] = [] + unknown: List[str] = [] + for tok in raw: + token = tok.strip().lower() + if not token: + continue + if token not in ALL_CAPABILITIES: + unknown.append(tok.strip()) + else: + normalised.append(token) + if unknown: + raise ValueError( + f"SECUSCAN_DENIED_CAPABILITIES contains unrecognised capability tokens: " + f"{unknown!r}. Supported capabilities: {sorted(ALL_CAPABILITIES)}. " + "Fix the typo or remove the unknown token — a misconfigured deny-list " + "silently fails to enforce the intended policy." + ) + self._denied: FrozenSet[str] = frozenset(normalised) + if self._denied: + logger.info( + "CapabilityEnforcer: operator has denied capabilities: %s", + sorted(self._denied), + ) + + @property + def denied(self) -> FrozenSet[str]: + return self._denied + + def check( + self, + plugin_id: str, + declared: Optional[List[str]], + safety_level: str, + ) -> None: + """Raise CapabilityDeniedError if the plugin needs a denied capability. + + Args: + plugin_id: The plugin's ``id`` field from metadata. + declared: The ``capabilities`` list from the plugin's metadata (may be None). + safety_level: The plugin's safety level (``safe``, ``intrusive``, ``exploit``). + + Raises: + CapabilityDeniedError: when any required capability is denied. + """ + if not self._denied: + return + + required = effective_capabilities(declared, safety_level, plugin_id) + blocked = required & self._denied + + if blocked: + logger.warning( + "Blocked plugin '%s': requires denied capabilities %s", + plugin_id, + sorted(blocked), + ) + raise CapabilityDeniedError(plugin_id, blocked) + + logger.debug( + "Capability check passed for plugin '%s': required=%s", + plugin_id, + sorted(required), + ) + + +def build_enforcer_from_settings() -> CapabilityEnforcer: + """Construct a CapabilityEnforcer from the global application settings.""" + from .config import settings # local import to avoid circular dependency + + return CapabilityEnforcer(denied_capabilities=list(settings.denied_capabilities)) diff --git a/backend/secuscan/config.py b/backend/secuscan/config.py index 9303b7ac..505d8e04 100644 --- a/backend/secuscan/config.py +++ b/backend/secuscan/config.py @@ -58,6 +58,7 @@ class Settings(BaseSettings): plugin_signature_key: Optional[str] = None enforce_plugin_signatures: bool = False vault_key: Optional[str] = None + denied_capabilities: List[str] = [] # Rate Limiting max_concurrent_tasks: int = 3 diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index fa9dc426..e8f0bbea 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -21,6 +21,7 @@ from .models import TaskStatus, ScanPhase from .ratelimit import concurrent_limiter from .risk_scoring import compute_risk_score, compute_risk_factors +from .capabilities import CapabilityEnforcer, CapabilityDeniedError, build_enforcer_from_settings from .parser_sandbox import run_parser_in_sandbox, ParserSandboxError @@ -89,6 +90,7 @@ def __init__(self): self.running_tasks: Dict[str, asyncio.Task] = {} # PubSub: Map of task_id to list of active async queues listening for output/status updates self._listeners: Dict[str, List[asyncio.Queue]] = {} + self._capability_enforcer: CapabilityEnforcer = build_enforcer_from_settings() def subscribe(self, task_id: str) -> asyncio.Queue: """Subscribe to a task's real-time events.""" @@ -328,8 +330,13 @@ async def execute_task(self, task_id: str): if not plugin: raise ValueError(f"Plugin not found: {plugin_id}") - # Pending records for assets removed - + # Enforce capability policy before any command is built or process spawned + self._capability_enforcer.check( + plugin_id=plugin.id, + declared=plugin.capabilities, + safety_level=plugin.safety.get("level", "safe"), + ) + command = plugin_manager.build_command(plugin_id, inputs) if not command: @@ -458,6 +465,40 @@ async def execute_task(self, task_id: str): await self._invalidate_cached_views() raise # let asyncio complete the cancellation + except CapabilityDeniedError as e: + logger.warning("Task %s blocked by capability policy: %s", task_id, e) + duration = (time.time() - start_time) if "start_time" in locals() else 0 + await db.execute( + """ + UPDATE tasks SET + status = ?, + completed_at = ?, + duration_seconds = ?, + error_message = ? + WHERE id = ? + """, + ( + TaskStatus.FAILED.value, + datetime.now().isoformat(), + duration, + str(e), + task_id, + ), + ) + await self._broadcast(task_id, "status", TaskStatus.FAILED.value) + await self._invalidate_cached_views() + await db.log_audit( + "task_capability_denied", + f"Task blocked by capability policy: {str(e)}", + severity="warning", + context={ + "task_id": task_id, + "denied_capabilities": sorted(e.denied_capabilities), + "plugin_id": plugin_id, + }, + task_id=task_id, + ) + except Exception as e: logger.error(f"Task {task_id} failed: {e}", exc_info=True) diff --git a/backend/secuscan/models.py b/backend/secuscan/models.py index 8310091a..829855f7 100644 --- a/backend/secuscan/models.py +++ b/backend/secuscan/models.py @@ -79,6 +79,7 @@ class PluginMetadata(BaseModel): output: Dict[str, Any] safety: Dict[str, Any] + capabilities: Optional[List[str]] = None learning: Optional[Dict[str, Any]] = None dependencies: Optional[Dict[str, List[str]]] = None docker_image: Optional[str] = None diff --git a/backend/secuscan/plugins.py b/backend/secuscan/plugins.py index e2574a79..8e158ccb 100644 --- a/backend/secuscan/plugins.py +++ b/backend/secuscan/plugins.py @@ -14,6 +14,7 @@ from .models import PluginMetadata, PluginFieldType from .config import settings +from .capabilities import validate_capability_list, ALL_CAPABILITIES # Port specifications: one or more comma-separated port numbers or port ranges. # Valid: "22", "80,443", "1-1000", "22,80,1000-2000" @@ -141,6 +142,14 @@ async def _validate_plugin(self, plugin: PluginMetadata, plugin_dir: Path) -> bo logger.error(f"Invalid safety level: {safety_level}") return False + # Validate declared capabilities against the known set + if plugin.capabilities is not None: + try: + validate_capability_list(plugin.capabilities, plugin.id) + except ValueError as exc: + logger.error("Invalid capabilities in plugin %s: %s", plugin.id, exc) + return False + if not self._verify_plugin_integrity(plugin, plugin_dir): return False @@ -271,6 +280,7 @@ def list_plugins(self) -> List[Dict]: "icon": plugin.icon, "requires_consent": bool(plugin.safety.get("requires_consent", False)), "consent_message": plugin.safety.get("consent_message"), + "capabilities": plugin.capabilities or [], "availability": { "runnable": len(missing_binaries) == 0, "missing_binaries": missing_binaries, diff --git a/plugins/waf_detector/metadata.json b/plugins/waf_detector/metadata.json index b642dcda..6bd96a48 100644 --- a/plugins/waf_detector/metadata.json +++ b/plugins/waf_detector/metadata.json @@ -54,5 +54,6 @@ "python_packages": [], "system_packages": [] }, - "checksum": "60b54af15ff7bad498a02cdbf08ee8611622e117944a3a65301cb3cae1582bb2" + "capabilities": ["network"], + "checksum": "ac518fd15fe9a14f9327812178fd244ebaa2ee95d29d04b93ee928fa3fda7ffa" } diff --git a/testing/backend/unit/test_capabilities.py b/testing/backend/unit/test_capabilities.py new file mode 100644 index 00000000..de838562 --- /dev/null +++ b/testing/backend/unit/test_capabilities.py @@ -0,0 +1,253 @@ +""" +Unit tests for the per-plugin capability enforcement system. + +Covers: +- CapabilityEnforcer allows plugins whose capabilities are not denied +- CapabilityEnforcer blocks plugins that require a denied capability +- Partial denial: only the matching capability triggers a block +- Empty denied set: all plugins pass +- Legacy plugins (no declared capabilities) fall back to safety-level implied set +- validate_capability_list rejects unknown tokens +- effective_capabilities logic for declared vs implied sets +- Exploit capability is correctly blocked/allowed +- CapabilityDeniedError carries the right metadata +- build_enforcer_from_settings round-trips through Settings +""" + +import pytest +from unittest.mock import patch + +from backend.secuscan.capabilities import ( + ALL_CAPABILITIES, + Capability, + CapabilityDeniedError, + CapabilityEnforcer, + effective_capabilities, + validate_capability_list, + build_enforcer_from_settings, +) + + +# --------------------------------------------------------------------------- +# validate_capability_list +# --------------------------------------------------------------------------- + + +class TestValidateCapabilityList: + def test_all_known_capabilities_accepted(self): + known = list(ALL_CAPABILITIES) + result = validate_capability_list(known, "test_plugin") + assert set(result) == ALL_CAPABILITIES + + def test_unknown_token_raises(self): + with pytest.raises(ValueError, match="unknown capability"): + validate_capability_list(["network", "xray_vision"], "test_plugin") + + def test_empty_list_is_valid(self): + assert validate_capability_list([], "test_plugin") == [] + + def test_normalises_to_lowercase(self): + result = validate_capability_list(["NETWORK", "Intrusive"], "test_plugin") + assert result == ["network", "intrusive"] + + def test_whitespace_is_stripped(self): + result = validate_capability_list([" network "], "test_plugin") + assert result == ["network"] + + +# --------------------------------------------------------------------------- +# effective_capabilities +# --------------------------------------------------------------------------- + + +class TestEffectiveCapabilities: + def test_explicit_list_returned_as_is(self): + caps = effective_capabilities(["network", "credentials"], "safe", "plugin") + assert caps == {"network", "credentials"} + + def test_empty_declared_list_falls_back_to_implied(self): + # An *empty* list (no capabilities declared) falls back to implied + caps = effective_capabilities(None, "safe", "plugin") + assert "network" in caps + + def test_intrusive_implied_set(self): + caps = effective_capabilities(None, "intrusive", "plugin") + assert caps >= {"network", "intrusive"} + + def test_exploit_implied_set(self): + caps = effective_capabilities(None, "exploit", "plugin") + assert caps >= {"network", "intrusive", "exploit"} + + def test_safe_implied_set(self): + caps = effective_capabilities(None, "safe", "plugin") + assert "network" in caps + assert "exploit" not in caps + + def test_explicit_empty_list_falls_back_to_implied(self): + # An empty list [] means "no explicit declarations" → use implied + caps = effective_capabilities([], "intrusive", "plugin") + assert "intrusive" in caps + + def test_explicit_list_overrides_implied(self): + # Plugin explicitly declares only "filesystem" even though it's intrusive + caps = effective_capabilities(["filesystem"], "intrusive", "plugin") + assert caps == {"filesystem"} + assert "network" not in caps + + +# --------------------------------------------------------------------------- +# CapabilityEnforcer – basic allow/deny +# --------------------------------------------------------------------------- + + +class TestCapabilityEnforcerAllow: + def test_no_denied_capabilities_always_passes(self): + enforcer = CapabilityEnforcer(denied_capabilities=[]) + # Should not raise for any combination + enforcer.check("nuclei", ["network", "intrusive"], "intrusive") + enforcer.check("sqlmap", ["network", "intrusive", "exploit"], "exploit") + enforcer.check("nmap", ["network"], "safe") + + def test_unrelated_denial_does_not_block(self): + enforcer = CapabilityEnforcer(denied_capabilities=["docker"]) + enforcer.check("nmap", ["network"], "safe") + + def test_plugin_passes_when_all_its_caps_allowed(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + enforcer.check("nuclei", ["network", "intrusive"], "intrusive") + + def test_exploit_plugin_allowed_when_exploit_not_denied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["docker"]) + enforcer.check("sqlmap", ["network", "intrusive", "exploit"], "exploit") + + +class TestCapabilityEnforcerDeny: + def test_single_denied_capability_blocks(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("sqlmap", ["network", "intrusive", "exploit"], "exploit") + assert "exploit" in str(exc_info.value) + + def test_blocks_when_any_required_cap_denied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["intrusive"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("nuclei", ["network", "intrusive"], "intrusive") + + def test_multiple_denied_some_matching(self): + enforcer = CapabilityEnforcer(denied_capabilities=["docker", "exploit"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("zap", ["network", "exploit"], "exploit") + error = exc_info.value + assert "exploit" in error.denied_capabilities + + def test_error_carries_plugin_id(self): + enforcer = CapabilityEnforcer(denied_capabilities=["credentials"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("ssh_runner", ["network", "credentials"], "intrusive") + assert exc_info.value.plugin_id == "ssh_runner" + + def test_error_carries_denied_set(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit", "credentials"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("metasploit", ["network", "exploit", "credentials"], "exploit") + blocked = exc_info.value.denied_capabilities + assert "exploit" in blocked + assert "credentials" in blocked + + def test_legacy_plugin_no_caps_blocked_via_implied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + with pytest.raises(CapabilityDeniedError): + # No declared capabilities, safety=exploit → implied includes exploit + enforcer.check("legacy_exploiter", None, "exploit") + + def test_legacy_intrusive_plugin_blocked(self): + enforcer = CapabilityEnforcer(denied_capabilities=["intrusive"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("old_scanner", None, "intrusive") + + def test_legacy_safe_plugin_not_blocked_by_intrusive_denial(self): + enforcer = CapabilityEnforcer(denied_capabilities=["intrusive"]) + # Safe plugin implied set does not include intrusive + enforcer.check("passive_scanner", None, "safe") + + def test_filesystem_denial_blocks_filesystem_plugin(self): + enforcer = CapabilityEnforcer(denied_capabilities=["filesystem"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("yara_scan", ["filesystem", "intrusive"], "intrusive") + + def test_docker_denial_blocks_docker_plugin(self): + enforcer = CapabilityEnforcer(denied_capabilities=["docker"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("container_scanner", ["network", "docker"], "safe") + + +# --------------------------------------------------------------------------- +# CapabilityEnforcer – denied set normalisation +# --------------------------------------------------------------------------- + + +class TestCapabilityEnforcerNormalisation: + def test_whitespace_in_denied_list_stripped(self): + enforcer = CapabilityEnforcer(denied_capabilities=[" exploit "]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("sqlmap", ["exploit"], "exploit") + + def test_uppercase_denied_capability_normalised(self): + enforcer = CapabilityEnforcer(denied_capabilities=["EXPLOIT"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("sqlmap", ["exploit"], "exploit") + + def test_empty_strings_in_denied_list_ignored(self): + enforcer = CapabilityEnforcer(denied_capabilities=["", " ", "network"]) + enforcer.check("nmap", ["filesystem"], "safe") + + def test_denied_property_is_frozenset(self): + enforcer = CapabilityEnforcer(denied_capabilities=["network", "exploit"]) + assert isinstance(enforcer.denied, frozenset) + + +# --------------------------------------------------------------------------- +# CapabilityDeniedError +# --------------------------------------------------------------------------- + + +class TestCapabilityDeniedError: + def test_message_contains_plugin_id(self): + err = CapabilityDeniedError("my_plugin", {"exploit"}) + assert "my_plugin" in str(err) + + def test_message_contains_capability_name(self): + err = CapabilityDeniedError("my_plugin", {"credentials", "exploit"}) + assert "credentials" in str(err) + assert "exploit" in str(err) + + def test_is_permission_error(self): + err = CapabilityDeniedError("my_plugin", {"exploit"}) + assert isinstance(err, PermissionError) + + +# --------------------------------------------------------------------------- +# build_enforcer_from_settings +# --------------------------------------------------------------------------- + + +class TestBuildEnforcerFromSettings: + def test_empty_denied_capabilities_in_settings(self): + with patch("backend.secuscan.config.settings") as mock_settings: + mock_settings.denied_capabilities = [] + enforcer = build_enforcer_from_settings() + assert enforcer.denied == frozenset() + + def test_denied_capabilities_propagated_from_settings(self): + with patch("backend.secuscan.config.settings") as mock_settings: + mock_settings.denied_capabilities = ["exploit", "docker"] + enforcer = build_enforcer_from_settings() + assert "exploit" in enforcer.denied + assert "docker" in enforcer.denied + + def test_enforcer_blocks_based_on_settings(self): + with patch("backend.secuscan.config.settings") as mock_settings: + mock_settings.denied_capabilities = ["exploit"] + enforcer = build_enforcer_from_settings() + with pytest.raises(CapabilityDeniedError): + enforcer.check("sqlmap", ["network", "exploit"], "exploit") diff --git a/testing/backend/unit/test_plugin_capabilities_inventory.py b/testing/backend/unit/test_plugin_capabilities_inventory.py new file mode 100644 index 00000000..72c5a38d --- /dev/null +++ b/testing/backend/unit/test_plugin_capabilities_inventory.py @@ -0,0 +1,347 @@ +""" +Plugin capability inventory tests (PR #368). + +Verifies: +- Every plugin whose metadata.json declares a `capabilities` field uses only + recognised capability tokens from the ALL_CAPABILITIES set. +- Plugins with safety.level == "exploit" that declare explicit capabilities + include the "exploit" token (high-risk classification consistency). +- CapabilityEnforcer raises ValueError at construction time on an unknown + denied-capability token (operator typo safety). +- CapabilityEnforcer raises on any unknown token even when mixed with valid ones. +- CapabilityEnforcer accepts the empty denied list without error. +- CapabilityEnforcer accepts all recognised capability tokens without error. +- Denied capabilities raise CapabilityDeniedError BEFORE command construction + (execution-blocking regression: executor must enforce policy before any + subprocess is spawned or any command argument is assembled). +- effective_capabilities returns the expected implied set for every known + safety level so legacy plugins remain enforceable without metadata changes. +""" + +import json +from pathlib import Path +import pytest + +from backend.secuscan.capabilities import ( + ALL_CAPABILITIES, + CapabilityDeniedError, + CapabilityEnforcer, + effective_capabilities, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_PLUGINS_ROOT = Path(__file__).parent.parent.parent.parent / "plugins" + + +def _iter_plugin_metadata(): + """Yield (plugin_id, metadata_dict) for every plugin with a metadata.json.""" + for plugin_dir in sorted(_PLUGINS_ROOT.iterdir()): + mf = plugin_dir / "metadata.json" + if not mf.is_file(): + continue + try: + meta = json.loads(mf.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + continue + yield meta.get("id", plugin_dir.name), meta + + +def _plugins_with_explicit_capabilities(): + return [ + (pid, meta) + for pid, meta in _iter_plugin_metadata() + if meta.get("capabilities") is not None + ] + + +def _exploit_plugins_with_explicit_capabilities(): + return [ + (pid, meta) + for pid, meta in _plugins_with_explicit_capabilities() + if meta.get("safety", {}).get("level") == "exploit" + ] + + +# --------------------------------------------------------------------------- +# Capability token inventory +# --------------------------------------------------------------------------- + + +class TestPluginCapabilityInventory: + def test_all_declared_capabilities_are_recognised(self): + """Every capability token declared in any plugin metadata.json must be + in ALL_CAPABILITIES; unknown tokens indicate a typo or an undocumented + capability that was never registered.""" + bad: list[str] = [] + for plugin_id, meta in _plugins_with_explicit_capabilities(): + for token in meta.get("capabilities", []): + if token.strip().lower() not in ALL_CAPABILITIES: + bad.append(f"{plugin_id}: unknown token {token!r}") + assert not bad, ( + "Plugin(s) declare unrecognised capability tokens:\n" + + "\n".join(bad) + + f"\nSupported: {sorted(ALL_CAPABILITIES)}" + ) + + def test_exploit_level_plugins_declare_exploit_capability(self): + """Plugins with safety.level == 'exploit' that explicitly declare + capabilities must include 'exploit' — omitting it would let the + operator's deny-list miss them.""" + missing: list[str] = [] + for plugin_id, meta in _exploit_plugins_with_explicit_capabilities(): + caps = [t.strip().lower() for t in meta.get("capabilities", [])] + if "exploit" not in caps: + missing.append(plugin_id) + assert not missing, ( + "Exploit-level plugin(s) declare explicit capabilities but omit " + f"'exploit': {missing}. Add 'exploit' or remove the explicit " + "declaration to fall back to the implied set." + ) + + def test_intrusive_level_plugins_declare_intrusive_when_explicit(self): + """Plugins with safety.level == 'intrusive' that explicitly declare + capabilities must include 'intrusive'.""" + missing: list[str] = [] + for plugin_id, meta in _plugins_with_explicit_capabilities(): + if meta.get("safety", {}).get("level") != "intrusive": + continue + caps = [t.strip().lower() for t in meta.get("capabilities", [])] + if "intrusive" not in caps: + missing.append(plugin_id) + assert not missing, ( + "Intrusive-level plugin(s) declare explicit capabilities but omit " + f"'intrusive': {missing}." + ) + + def test_capabilities_field_is_a_list_when_present(self): + """The `capabilities` field must be a JSON array, not a string or other type.""" + bad: list[str] = [] + for plugin_id, meta in _plugins_with_explicit_capabilities(): + if not isinstance(meta.get("capabilities"), list): + bad.append( + f"{plugin_id}: 'capabilities' is {type(meta['capabilities']).__name__}, " + "expected list" + ) + assert not bad, "\n".join(bad) + + def test_all_capability_tokens_are_lowercase_strings(self): + """Capability tokens should be lowercase strings (normalisation happens at + runtime, but storing mixed-case values is a maintenance footgun).""" + bad: list[str] = [] + for plugin_id, meta in _plugins_with_explicit_capabilities(): + for token in meta.get("capabilities", []): + if not isinstance(token, str) or token != token.lower(): + bad.append(f"{plugin_id}: non-lowercase token {token!r}") + assert not bad, "\n".join(bad) + + +# --------------------------------------------------------------------------- +# CapabilityEnforcer construction-time token validation +# --------------------------------------------------------------------------- + + +class TestCapabilityEnforcerDeniedTokenValidation: + def test_unknown_token_raises_at_construction(self): + with pytest.raises(ValueError, match="unrecognised capability"): + CapabilityEnforcer(denied_capabilities=["netwrk"]) # typo + + def test_mixed_valid_and_unknown_raises(self): + with pytest.raises(ValueError, match="unrecognised capability"): + CapabilityEnforcer(denied_capabilities=["network", "xray_vision"]) + + def test_error_message_names_the_bad_token(self): + with pytest.raises(ValueError, match="xray_vision"): + CapabilityEnforcer(denied_capabilities=["xray_vision"]) + + def test_error_message_lists_supported_capabilities(self): + with pytest.raises(ValueError, match="Supported capabilities"): + CapabilityEnforcer(denied_capabilities=["bad_token"]) + + def test_empty_denied_list_accepted(self): + enforcer = CapabilityEnforcer(denied_capabilities=[]) + assert enforcer.denied == frozenset() + + def test_none_denied_list_accepted(self): + enforcer = CapabilityEnforcer(denied_capabilities=None) + assert enforcer.denied == frozenset() + + def test_all_recognised_tokens_accepted(self): + enforcer = CapabilityEnforcer(denied_capabilities=list(ALL_CAPABILITIES)) + assert enforcer.denied == ALL_CAPABILITIES + + def test_whitespace_only_tokens_are_ignored(self): + enforcer = CapabilityEnforcer(denied_capabilities=[" ", "\t", "network"]) + assert "network" in enforcer.denied + + def test_case_insensitive_normalisation(self): + enforcer = CapabilityEnforcer(denied_capabilities=["NETWORK", "Exploit"]) + assert "network" in enforcer.denied + assert "exploit" in enforcer.denied + + +# --------------------------------------------------------------------------- +# effective_capabilities — implied set correctness for all safety levels +# --------------------------------------------------------------------------- + + +class TestEffectiveCapabilitiesImpliedSets: + """Verify the implied capability sets for all safety levels so that + legacy plugins (those without an explicit 'capabilities' field) remain + enforceable via deny-list even after the enforcement engine ships.""" + + def test_safe_implies_network(self): + caps = effective_capabilities(None, "safe", "plugin") + assert "network" in caps + + def test_safe_does_not_imply_intrusive_or_exploit(self): + caps = effective_capabilities(None, "safe", "plugin") + assert "intrusive" not in caps + assert "exploit" not in caps + + def test_intrusive_implies_network_and_intrusive(self): + caps = effective_capabilities(None, "intrusive", "plugin") + assert {"network", "intrusive"} <= caps + + def test_intrusive_does_not_imply_exploit(self): + caps = effective_capabilities(None, "intrusive", "plugin") + assert "exploit" not in caps + + def test_exploit_implies_network_intrusive_and_exploit(self): + caps = effective_capabilities(None, "exploit", "plugin") + assert {"network", "intrusive", "exploit"} <= caps + + def test_unknown_level_falls_back_to_network(self): + caps = effective_capabilities(None, "unknown_level", "plugin") + assert "network" in caps + + def test_explicit_empty_list_falls_back_to_implied(self): + caps = effective_capabilities([], "intrusive", "plugin") + assert "intrusive" in caps + + def test_explicit_nonempty_list_overrides_implied(self): + # Plugin explicitly declares only filesystem; implied intrusive set is NOT added. + caps = effective_capabilities(["filesystem"], "intrusive", "plugin") + assert caps == {"filesystem"} + assert "intrusive" not in caps + + def test_all_shipped_plugins_have_enforceable_effective_capabilities(self): + """Every plugin in the plugins/ directory must produce a non-empty + effective_capabilities set so the enforcer can act on it.""" + bad: list[str] = [] + for plugin_id, meta in _iter_plugin_metadata(): + level = meta.get("safety", {}).get("level", "safe") + declared = meta.get("capabilities") + caps = effective_capabilities(declared, level, plugin_id) + if not caps: + bad.append(plugin_id) + assert not bad, ( + f"Plugin(s) produced empty effective_capabilities: {bad}" + ) + + +# --------------------------------------------------------------------------- +# Execution-blocking regression: denied capabilities stop execution BEFORE +# command construction — no subprocess is spawned or command built for a +# plugin whose required capabilities are all denied. +# --------------------------------------------------------------------------- + + +class TestDeniedCapabilityBlocksExecution: + """CapabilityEnforcer.check() must raise CapabilityDeniedError before any + command is built or subprocess is spawned. These tests exercise the + enforcer in isolation and then simulate the executor integration point.""" + + def test_check_raises_capability_denied_error_for_denied_cap(self): + enforcer = CapabilityEnforcer(denied_capabilities=["network"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("my_plugin", declared=None, safety_level="safe") + assert exc_info.value.plugin_id == "my_plugin" + assert "network" in exc_info.value.denied_capabilities + + def test_check_raises_before_command_construction(self): + """Simulate the executor integration point: enforcer.check() is called + before plugin_manager.build_command(). When the capability is denied, + build_command must never be called.""" + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + build_command_called: list[bool] = [] + + def fake_build_command(*args, **kwargs): + build_command_called.append(True) + return ["exploit_tool", "--target", "example.com"] + + with pytest.raises(CapabilityDeniedError): + enforcer.check("exploit_plugin", declared=None, safety_level="exploit") + # This line must not be reached: + fake_build_command("exploit_plugin", {}) + + assert build_command_called == [], ( + "build_command must not be called when capability check raises" + ) + + def test_exploit_plugin_blocked_when_exploit_denied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("sqli_exploiter", declared=None, safety_level="exploit") + + def test_intrusive_plugin_blocked_when_intrusive_denied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["intrusive"]) + with pytest.raises(CapabilityDeniedError): + enforcer.check("nikto", declared=None, safety_level="intrusive") + + def test_safe_plugin_passes_when_only_exploit_denied(self): + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + # Should not raise — safe plugins do not require exploit capability. + enforcer.check("whois_lookup", declared=None, safety_level="safe") + + def test_plugin_with_explicit_caps_blocked_on_denied_token(self): + enforcer = CapabilityEnforcer(denied_capabilities=["credentials"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check( + "vault_plugin", + declared=["network", "credentials"], + safety_level="safe", + ) + assert "credentials" in exc_info.value.denied_capabilities + + def test_error_message_names_plugin_and_denied_caps(self): + enforcer = CapabilityEnforcer(denied_capabilities=["docker"]) + with pytest.raises(CapabilityDeniedError) as exc_info: + enforcer.check("container_scanner", declared=["docker"], safety_level="safe") + msg = str(exc_info.value) + assert "container_scanner" in msg + assert "docker" in msg + + def test_empty_denied_set_never_blocks_any_shipped_plugin(self): + """With an empty deny list every plugin must pass the capability check.""" + enforcer = CapabilityEnforcer(denied_capabilities=[]) + for plugin_id, meta in _iter_plugin_metadata(): + level = meta.get("safety", {}).get("level", "safe") + declared = meta.get("capabilities") + # Must not raise. + enforcer.check(plugin_id, declared=declared, safety_level=level) + + def test_deny_exploit_blocks_all_exploit_level_shipped_plugins(self): + """Denying 'exploit' must block every plugin with safety.level == 'exploit' + regardless of whether they have an explicit capabilities declaration.""" + enforcer = CapabilityEnforcer(denied_capabilities=["exploit"]) + blocked: list[str] = [] + not_blocked: list[str] = [] + for plugin_id, meta in _iter_plugin_metadata(): + if meta.get("safety", {}).get("level") != "exploit": + continue + declared = meta.get("capabilities") + try: + enforcer.check(plugin_id, declared=declared, safety_level="exploit") + not_blocked.append(plugin_id) + except CapabilityDeniedError: + blocked.append(plugin_id) + # Every exploit-level plugin must be blocked. + assert not_blocked == [], ( + f"Exploit-level plugin(s) not blocked when 'exploit' is denied: {not_blocked}" + ) + # Sanity: at least one exploit-level plugin must exist in the test corpus. + assert len(blocked) > 0, "No exploit-level plugins found — check plugins/ directory"