Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ SECUSCAN_VAULT_KEY=replace-with-output-of-secrets.token_hex-32
# SECUSCAN_PLUGIN_SIGNATURE_KEY=replace-with-your-signing-key
# SECUSCAN_ENFORCE_PLUGIN_SIGNATURES=false

# Plugin Capability Policy
# Comma-separated list of capabilities to deny across all plugins.
# Plugins that require any denied capability will fail before execution.
# Supported values: network, filesystem, docker, credentials, intrusive, exploit
# Example: deny all exploitation and credential-accessing plugins:
# SECUSCAN_DENIED_CAPABILITIES=exploit,credentials
# Parser Sandbox Limits
# Plugin parser.py files run in isolated subprocesses. Adjust these if you have
# plugins that produce very large output or need more time to parse.
Expand Down
203 changes: 203 additions & 0 deletions backend/secuscan/capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""
Per-plugin capability declarations and pre-execution enforcement.

Plugins declare a list of capabilities they require in their metadata.json under
the ``capabilities`` key. The enforcer checks that list against the operator's
``denied_capabilities`` setting (``SECUSCAN_DENIED_CAPABILITIES`` env var, comma-
separated) before any command is built or process is spawned.

Supported capabilities
----------------------
network - plugin makes outbound network connections
filesystem - plugin reads or writes paths on the local filesystem
docker - plugin requires the Docker daemon at runtime
credentials - plugin pulls secrets from the credential vault
intrusive - plugin performs active probing that may affect target systems
exploit - plugin attempts to exploit vulnerabilities (highest risk, opt-in only)

Backward compatibility / migration
-----------------------------------
Plugins that do **not** declare a ``capabilities`` list (i.e. all plugins that
pre-date this feature) are **not broken**. Instead, an implied capability set is
derived from their ``safety.level`` field:

safe → ["network"]
intrusive → ["network", "intrusive"]
exploit → ["network", "intrusive", "exploit"]

This means:

* Existing plugins without a ``capabilities`` field continue to load and execute
normally. No plugin metadata files need to be updated for the enforcement
system to become active.
* Operators can still deny capabilities (e.g. ``SECUSCAN_DENIED_CAPABILITIES=exploit``)
and all exploit-level plugins will be blocked even if they lack an explicit
``capabilities`` declaration.
* Plugin authors are encouraged to add an explicit ``capabilities`` list to their
metadata.json so operators have fine-grained visibility. After adding or
changing the ``capabilities`` field the plugin checksum must be regenerated
(run ``python -m backend.secuscan.plugins_validate --refresh <plugin-id>``).
"""

from __future__ import annotations

from enum import Enum
from typing import FrozenSet, List, Optional, Set

import logging

logger = logging.getLogger(__name__)


class Capability(str, Enum):
"""All recognised plugin capability tokens."""

NETWORK = "network"
FILESYSTEM = "filesystem"
DOCKER = "docker"
CREDENTIALS = "credentials"
INTRUSIVE = "intrusive"
EXPLOIT = "exploit"


ALL_CAPABILITIES: FrozenSet[str] = frozenset(c.value for c in Capability)

# Capabilities that are implicitly required by a plugin's safety level when the
# plugin has not declared them explicitly. This lets older plugins without a
# ``capabilities`` field degrade gracefully while still being enforceable.
_SAFETY_LEVEL_IMPLIED: dict[str, List[str]] = {
"safe": ["network"],
"intrusive": ["network", "intrusive"],
"exploit": ["network", "intrusive", "exploit"],
}


class CapabilityDeniedError(PermissionError):
"""Raised when a plugin attempts to use a capability that the operator has denied."""

def __init__(self, plugin_id: str, denied: Set[str]) -> None:
self.plugin_id = plugin_id
self.denied_capabilities = denied
caps = ", ".join(sorted(denied))
super().__init__(
f"Plugin '{plugin_id}' requires capabilities [{caps}] that are denied by "
"operator policy. Update SECUSCAN_DENIED_CAPABILITIES to allow them or "
"choose a plugin that does not require these capabilities."
)


def validate_capability_list(capabilities: List[str], plugin_id: str) -> List[str]:
"""Return the normalised capability list, raising ValueError for unknowns."""
normalised: List[str] = []
for raw in capabilities:
token = raw.strip().lower()
if token not in ALL_CAPABILITIES:
raise ValueError(
f"Plugin '{plugin_id}' declares unknown capability '{raw}'. "
f"Supported capabilities: {sorted(ALL_CAPABILITIES)}"
)
normalised.append(token)
return normalised


def effective_capabilities(
declared: Optional[List[str]],
safety_level: str,
plugin_id: str,
) -> Set[str]:
"""Combine explicitly declared capabilities with safety-level implied ones.

If the plugin declares an explicit capability list, that list is the source
of truth (implied capabilities are *not* added on top — they were already
considered by the plugin author). If no capabilities are declared at all the
implied set for the plugin's safety level is used so that legacy plugins
remain enforceable.
"""
if declared is not None and len(declared) > 0:
validated = validate_capability_list(declared, plugin_id)
return set(validated)

implied = _SAFETY_LEVEL_IMPLIED.get(safety_level, ["network"])
return set(implied)


class CapabilityEnforcer:
"""Checks plugin capabilities against the operator-configured denied set.

Instantiate once and reuse across the application lifetime. The denied set
is fixed at construction time so that the enforcer is deterministic and
testable independently of the global settings object.
"""

def __init__(self, denied_capabilities: Optional[List[str]] = None) -> None:
raw = denied_capabilities or []
normalised: List[str] = []
unknown: List[str] = []
for tok in raw:
token = tok.strip().lower()
if not token:
continue
if token not in ALL_CAPABILITIES:
unknown.append(tok.strip())
else:
normalised.append(token)
if unknown:
raise ValueError(
f"SECUSCAN_DENIED_CAPABILITIES contains unrecognised capability tokens: "
f"{unknown!r}. Supported capabilities: {sorted(ALL_CAPABILITIES)}. "
"Fix the typo or remove the unknown token — a misconfigured deny-list "
"silently fails to enforce the intended policy."
)
self._denied: FrozenSet[str] = frozenset(normalised)
if self._denied:
logger.info(
"CapabilityEnforcer: operator has denied capabilities: %s",
sorted(self._denied),
)

@property
def denied(self) -> FrozenSet[str]:
return self._denied

def check(
self,
plugin_id: str,
declared: Optional[List[str]],
safety_level: str,
) -> None:
"""Raise CapabilityDeniedError if the plugin needs a denied capability.

Args:
plugin_id: The plugin's ``id`` field from metadata.
declared: The ``capabilities`` list from the plugin's metadata (may be None).
safety_level: The plugin's safety level (``safe``, ``intrusive``, ``exploit``).

Raises:
CapabilityDeniedError: when any required capability is denied.
"""
if not self._denied:
return

required = effective_capabilities(declared, safety_level, plugin_id)
blocked = required & self._denied

if blocked:
logger.warning(
"Blocked plugin '%s': requires denied capabilities %s",
plugin_id,
sorted(blocked),
)
raise CapabilityDeniedError(plugin_id, blocked)

logger.debug(
"Capability check passed for plugin '%s': required=%s",
plugin_id,
sorted(required),
)


def build_enforcer_from_settings() -> CapabilityEnforcer:
"""Construct a CapabilityEnforcer from the global application settings."""
from .config import settings # local import to avoid circular dependency

return CapabilityEnforcer(denied_capabilities=list(settings.denied_capabilities))
1 change: 1 addition & 0 deletions backend/secuscan/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class Settings(BaseSettings):
plugin_signature_key: Optional[str] = None
enforce_plugin_signatures: bool = False
vault_key: Optional[str] = None
denied_capabilities: List[str] = []
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Parse denied capabilities as CSV from the environment

This new List[str] setting is documented in .env.example as SECUSCAN_DENIED_CAPABILITIES=exploit,credentials, but it is not included in the existing CSV parser validator used for other comma-separated list settings. In deployments that follow the documented example, pydantic-settings treats list environment values as JSON unless custom parsing is applied, so the server will either fail settings loading or never produce the intended ['exploit', 'credentials'] denied set; include this field in parse_csv_or_list (or use equivalent custom parsing) so the advertised policy knob works.

Useful? React with 👍 / 👎.


# Rate Limiting
max_concurrent_tasks: int = 3
Expand Down
45 changes: 43 additions & 2 deletions backend/secuscan/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .models import TaskStatus, ScanPhase
from .ratelimit import concurrent_limiter
from .risk_scoring import compute_risk_score, compute_risk_factors
from .capabilities import CapabilityEnforcer, CapabilityDeniedError, build_enforcer_from_settings
from .parser_sandbox import run_parser_in_sandbox, ParserSandboxError


Expand Down Expand Up @@ -89,6 +90,7 @@ def __init__(self):
self.running_tasks: Dict[str, asyncio.Task] = {}
# PubSub: Map of task_id to list of active async queues listening for output/status updates
self._listeners: Dict[str, List[asyncio.Queue]] = {}
self._capability_enforcer: CapabilityEnforcer = build_enforcer_from_settings()

def subscribe(self, task_id: str) -> asyncio.Queue:
"""Subscribe to a task's real-time events."""
Expand Down Expand Up @@ -328,8 +330,13 @@ async def execute_task(self, task_id: str):
if not plugin:
raise ValueError(f"Plugin not found: {plugin_id}")

# Pending records for assets removed

# Enforce capability policy before any command is built or process spawned
self._capability_enforcer.check(
plugin_id=plugin.id,
declared=plugin.capabilities,
safety_level=plugin.safety.get("level", "safe"),
)

command = plugin_manager.build_command(plugin_id, inputs)

if not command:
Expand Down Expand Up @@ -458,6 +465,40 @@ async def execute_task(self, task_id: str):
await self._invalidate_cached_views()
raise # let asyncio complete the cancellation

except CapabilityDeniedError as e:
logger.warning("Task %s blocked by capability policy: %s", task_id, e)
duration = (time.time() - start_time) if "start_time" in locals() else 0
await db.execute(
"""
UPDATE tasks SET
status = ?,
completed_at = ?,
duration_seconds = ?,
error_message = ?
WHERE id = ?
""",
(
TaskStatus.FAILED.value,
datetime.now().isoformat(),
duration,
str(e),
task_id,
),
)
await self._broadcast(task_id, "status", TaskStatus.FAILED.value)
await self._invalidate_cached_views()
await db.log_audit(
"task_capability_denied",
f"Task blocked by capability policy: {str(e)}",
severity="warning",
context={
"task_id": task_id,
"denied_capabilities": sorted(e.denied_capabilities),
"plugin_id": plugin_id,
},
task_id=task_id,
)

except Exception as e:
logger.error(f"Task {task_id} failed: {e}", exc_info=True)

Expand Down
1 change: 1 addition & 0 deletions backend/secuscan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class PluginMetadata(BaseModel):

output: Dict[str, Any]
safety: Dict[str, Any]
capabilities: Optional[List[str]] = None
learning: Optional[Dict[str, Any]] = None
dependencies: Optional[Dict[str, List[str]]] = None
docker_image: Optional[str] = None
Expand Down
10 changes: 10 additions & 0 deletions backend/secuscan/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .models import PluginMetadata, PluginFieldType
from .config import settings
from .capabilities import validate_capability_list, ALL_CAPABILITIES

# Port specifications: one or more comma-separated port numbers or port ranges.
# Valid: "22", "80,443", "1-1000", "22,80,1000-2000"
Expand Down Expand Up @@ -141,6 +142,14 @@ async def _validate_plugin(self, plugin: PluginMetadata, plugin_dir: Path) -> bo
logger.error(f"Invalid safety level: {safety_level}")
return False

# Validate declared capabilities against the known set
if plugin.capabilities is not None:
try:
validate_capability_list(plugin.capabilities, plugin.id)
except ValueError as exc:
logger.error("Invalid capabilities in plugin %s: %s", plugin.id, exc)
return False

if not self._verify_plugin_integrity(plugin, plugin_dir):
return False

Expand Down Expand Up @@ -271,6 +280,7 @@ def list_plugins(self) -> List[Dict]:
"icon": plugin.icon,
"requires_consent": bool(plugin.safety.get("requires_consent", False)),
"consent_message": plugin.safety.get("consent_message"),
"capabilities": plugin.capabilities or [],
"availability": {
"runnable": len(missing_binaries) == 0,
"missing_binaries": missing_binaries,
Expand Down
3 changes: 2 additions & 1 deletion plugins/waf_detector/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@
"python_packages": [],
"system_packages": []
},
"checksum": "60b54af15ff7bad498a02cdbf08ee8611622e117944a3a65301cb3cae1582bb2"
"capabilities": ["network"],
"checksum": "ac518fd15fe9a14f9327812178fd244ebaa2ee95d29d04b93ee928fa3fda7ffa"
}
Loading
Loading