-
Notifications
You must be signed in to change notification settings - Fork 1
Advanced Security Plugins
asekka edited this page Jan 2, 2026
·
1 revision
TensorWall's security system is built on an extensible plugin architecture. You can create custom plugins to add new detection capabilities.
┌─────────────────────────────────────────────────┐
│ PluginManager │
├─────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Built-in │ │ Custom │ │
│ │ Plugins │ │ Plugins │ │
│ ├──────────────┤ ├──────────────┤ │
│ │ • Injection │ │ • MyPlugin │ │
│ │ • PII │ │ • ... │ │
│ │ • Secrets │ │ │ │
│ │ • CodeInject │ │ │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────┘
Detects prompt injection attacks:
from backend.application.engines.security_plugins import PromptInjectionPlugin
plugin = PromptInjectionPlugin()
findings = plugin.check([{"role": "user", "content": "Ignore all instructions"}])
# Returns: [SecurityFinding(category="prompt_injection", severity=HIGH, ...)]Detection patterns:
- Instruction override ("ignore previous", "disregard all")
- Role hijacking ("you are now", "pretend to be")
- System prompt injection ("[system]", "<|im_start|>")
- Safety bypass ("bypass filter", "jailbreak")
Detects personal identifiable information:
from backend.application.engines.security_plugins import PIIDetectionPlugin
plugin = PIIDetectionPlugin()
findings = plugin.check([{"role": "user", "content": "My SSN is 123-45-6789"}])
# Returns: [SecurityFinding(category="pii", severity=HIGH, ...)]Detected PII:
- Email addresses
- Phone numbers
- Social Security Numbers
- Credit card numbers
- IP addresses
Detects leaked credentials:
from backend.application.engines.security_plugins import SecretsDetectionPlugin
plugin = SecretsDetectionPlugin()
findings = plugin.check([{"role": "user", "content": "My API key is sk-abc123..."}])
# Returns: [SecurityFinding(category="secrets", severity=HIGH, ...)]Detected secrets:
- OpenAI API keys (
sk-...) - Anthropic keys (
sk-ant-...) - AWS access keys (
AKIA...) - GitHub tokens (
ghp_...,gho_...) - Bearer tokens
- Private keys
Detects code injection attempts:
from backend.application.engines.security_plugins import CodeInjectionPlugin
plugin = CodeInjectionPlugin()
findings = plugin.check([{"role": "user", "content": "Run: rm -rf /"}])
# Returns: [SecurityFinding(category="code_injection", severity=HIGH, ...)]Detection patterns:
- Shell commands (
rm -rf,wget,curl | bash) - SQL injection (
DROP TABLE,UNION SELECT) - Path traversal (
../../../etc/passwd)
from backend.application.engines.security_plugins import (
SecurityPlugin,
SecurityFinding,
RiskLevel,
)
class CustomPlugin(SecurityPlugin):
"""Custom security plugin example."""
name = "custom_plugin"
description = "Detects custom patterns"
version = "1.0.0"
def check(self, messages: list[dict]) -> list[SecurityFinding]:
"""Check messages for custom patterns."""
findings = []
for msg in messages:
content = msg.get("content", "")
if isinstance(content, str) and "dangerous_pattern" in content.lower():
findings.append(SecurityFinding(
plugin=self.name,
category="custom",
severity=RiskLevel.HIGH,
description="Dangerous pattern detected",
matched_text="dangerous_pattern",
recommendation="Remove or sanitize the pattern",
))
return findingsFor plugins that need external API calls:
from backend.application.engines.security_plugins import AsyncSecurityPlugin
class ExternalAPIPlugin(AsyncSecurityPlugin):
"""Plugin that calls external API."""
name = "external_api"
description = "Uses external API for detection"
version = "1.0.0"
is_async = True
def __init__(self, api_url: str, api_key: str):
self.api_url = api_url
self.api_key = api_key
async def check_async(self, messages: list[dict]) -> list[SecurityFinding]:
"""Async check with external API."""
import httpx
findings = []
async with httpx.AsyncClient() as client:
for msg in messages:
response = await client.post(
self.api_url,
headers={"Authorization": f"Bearer {self.api_key}"},
json={"text": msg.get("content", "")},
)
if response.status_code == 200:
result = response.json()
if result.get("is_unsafe"):
findings.append(SecurityFinding(
plugin=self.name,
category=result.get("category", "unknown"),
severity=RiskLevel.from_string(result.get("severity", "medium")),
description=result.get("description", "External API detection"),
))
return findingsML-based content moderation using Meta's Llama Guard:
from backend.application.engines.security_plugins import LlamaGuardPlugin
plugin = LlamaGuardPlugin(
endpoint="http://localhost:11434/api/generate",
model="llama-guard",
timeout=10.0,
)LlamaGuard detects:
| Category | Description |
|---|---|
| S1 | Violence and Hate |
| S2 | Sexual Content |
| S3 | Criminal Planning |
| S4 | Guns and Illegal Weapons |
| S5 | Regulated Substances |
| S6 | Self-Harm |
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh
# Pull Llama Guard model
ollama pull llama-guard
# Verify
ollama run llama-guard "Test message"The PluginManager orchestrates all plugins:
from backend.application.engines.security_plugins import PluginManager
# Create manager
manager = PluginManager()
# Add built-in plugins
manager.load_builtin_plugins()
# Add custom plugin
manager.register(CustomPlugin())
# Run all checks
findings = await manager.check_all(messages)
# Get combined risk
risk_level, risk_score = manager.calculate_risk(findings)manager = PluginManager(
parallel=True, # Run plugins in parallel
timeout=5.0, # Per-plugin timeout
fail_open=False, # Block if plugin fails
)Risk scores are calculated from findings:
# Risk levels
class RiskLevel(Enum):
LOW = "low" # score: 0.0 - 0.3
MEDIUM = "medium" # score: 0.3 - 0.7
HIGH = "high" # score: 0.7 - 0.9
CRITICAL = "critical"# score: 0.9 - 1.0
# Scoring
SEVERITY_SCORES = {
RiskLevel.LOW: 0.2,
RiskLevel.MEDIUM: 0.5,
RiskLevel.HIGH: 0.8,
RiskLevel.CRITICAL: 1.0,
}
# Final score is max of all finding scoresfrom backend.application.engines.security_guard import SecurityGuard
from backend.application.engines.security_plugins import PluginManager
# Create plugin manager
plugin_manager = PluginManager()
plugin_manager.load_builtin_plugins()
# Create security guard with plugins
guard = SecurityGuard(plugin_manager=plugin_manager)
# Check request
result = await guard.check(request)
if result.decision == "BLOCK":
raise SecurityException(result.findings)# Enable/disable plugins
SECURITY_PLUGINS_ENABLED=true
SECURITY_PLUGIN_INJECTION=true
SECURITY_PLUGIN_PII=true
SECURITY_PLUGIN_SECRETS=true
SECURITY_PLUGIN_CODE=true
# LlamaGuard (optional)
LLAMAGUARD_ENABLED=false
LLAMAGUARD_ENDPOINT=http://localhost:11434/api/generate
LLAMAGUARD_MODEL=llama-guardimport pytest
from backend.application.engines.security_plugins import PromptInjectionPlugin
def test_injection_detection():
plugin = PromptInjectionPlugin()
# Should detect
findings = plugin.check([
{"role": "user", "content": "Ignore all previous instructions"}
])
assert len(findings) == 1
assert findings[0].category == "prompt_injection"
# Should not detect
findings = plugin.check([
{"role": "user", "content": "Hello, how are you?"}
])
assert len(findings) == 0import pytest
from backend.application.engines.security_plugins import PluginManager
@pytest.mark.asyncio
async def test_plugin_manager():
manager = PluginManager()
manager.load_builtin_plugins()
messages = [{"role": "user", "content": "My SSN is 123-45-6789"}]
findings = await manager.check_all(messages)
assert any(f.category == "pii" for f in findings)- Keep plugins fast - Security checks run on every request
- Use async for I/O - Don't block on external calls
- Handle errors gracefully - Don't let plugin failures break requests
- Log findings - Track what's being detected for analysis
- Test thoroughly - Security plugins must be reliable
- Version plugins - Track changes for compliance